2017-11-23

How to play with the new S3A committers


Following up on yesterday's post on the S3A committers, here's what you need to pick them up.
  1. Apache Hadoop trunk; builds to 3.1.0-SNAPSHOT.
  2. The documentation on their use.
  3. An AWS keypair; try not to commit it to git. Tip for the Uber team: git-secrets is something you can add as a check-in hook. Do as I do: keep the keys elsewhere.
  4. If you want to use the magic committer, turn S3Guard on. Initially I'd use the staging committer, specifically the "directory" one.
  5. Switch s3a:// to use that committer: fs.s3a.committer.name = directory (there's a config sketch below).
  6. Run your MR queries.
  7. Look in _SUCCESS for committer info. If it's zero bytes long: the classic FileOutputCommitter. If it's a bit of JSON naming the committer, the files committed and some metrics (SuccessData): you are using an S3A committer.
If you do that, I'd like to see the numbers comparing the FileOutputCommitter (which must have S3Guard) against the new committers. For benchmark consistency, leave S3Guard on.
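To make step 5 concrete, here's a minimal sketch of those switches set programmatically rather than in core-site.xml; the property names are the ones from the committer documentation, the values are just where I'd start. Once a job has run, _SUCCESS tells you which committer was actually picked up.

{code}
import org.apache.hadoop.conf.Configuration

val conf = new Configuration()

// Route all s3a:// output through the S3A committer factory;
// without this the classic FileOutputCommitter is still used.
conf.set("mapreduce.outputcommitter.factory.scheme.s3a",
  "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")

// Which committer the factory hands back: "directory", "partitioned" or "magic".
conf.set("fs.s3a.committer.name", "directory")

// Only for the magic committer, which also needs S3Guard on the bucket.
// conf.set("fs.s3a.committer.magic.enabled", "true")
{code}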

If you can't get things to work because the docs are wrong: file a JIRA with a patch. If the code is wrong: submit a patch with the fix & tests.

Spark?
  1. Spark Master has a couple of patches to deal with integration issues (FNFE on magic output paths, Parquet being over-fussy about committers), though I think the committer binding has enough workarounds for these to work with Spark 2.2.
  2. Check out my cloud-integration for Apache Spark repo, and its production-time redistributable, spark-cloud-integration.
  3. Read its docs and use it.
  4. If you want to use Parquet over other formats, use this committer.  
  5. Again, check _SUCCESS to see what's going on.
  6. There's a test module with various (scalable) tests, as well as a copy-and-paste of some of the Spark SQL tests.
  7. Spark can work with the Partitioned committer. This is a staging committer which only worries about file conflicts in the final partitions. This lets you do in-situ updates of existing datasets, adding new partitions or overwriting existing ones while leaving the rest alone. Hence: no need to move the output of a job into the reference datasets. (There's a sketch of this after the list.)
  8. Problems? File an issue. I've just seen Ewan has a couple of PRs I'd better look at, actually.
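To make item 7 a bit more concrete, here's a rough sketch of binding a Spark job to the partitioned committer via Spark's spark.hadoop. prefix; the conflict-mode option is the one documented alongside the staging committers, and you still need the committer binding from the module above for Parquet to pick it up.

{code}
import org.apache.spark.SparkConf

// Sketch: route s3a:// output through the S3A committer factory and pick the
// partitioned staging committer. Conflict mode decides what happens when a
// generated partition already exists: "fail", "append" or "replace".
val sparkConf = new SparkConf()
  .set("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
    "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
  .set("spark.hadoop.fs.s3a.committer.name", "partitioned")
  .set("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "replace")
{code}

With "replace", only the partitions the job actually generates get overwritten; the rest of the dataset is left alone.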
Committer-wise, that spark-cloud-integration module is ultimately transient. I think we can identify the remaining issues with committer setup in Spark core, after which a Hadoop 3.0+ specific module should be able to work out of the box with the new committers.

There are still other things there, like
  • Cloud store optimised file input stream source
  • ParallizedWithLocalityRDD: an RDD which lets you provide custom functions to declare locality on a row-by-row basis. Used in my demo of implementing DistCp in Spark: every row is a filename, which gets pushed out to a worker close to the data, and that worker does the upload. This is very much a subset of DistCp, but it shows the point: you can do this kind of thing with RDDs and cloud storage (rough sketch below).
  • + all the tests
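The DistCp-in-Spark trick is easier to see in code. This is a deliberately stripped-down sketch: it uses a plain parallelize() rather than ParallizedWithLocalityRDD, so it skips the whole "schedule each file close to its data" part, and the paths are made up.

{code}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileUtil, Path}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("MiniDistCp").getOrCreate()

val srcDir = new Path("hdfs://nn/datasets/in")     // hypothetical source
val destDir = "s3a://mybucket/datasets/in-copy"    // hypothetical destination

// Driver side: list the source files; each filename becomes one row of the RDD.
val files = srcDir.getFileSystem(spark.sparkContext.hadoopConfiguration)
  .listStatus(srcDir).filter(_.isFile).map(_.getPath.toString)

// Worker side: each task copies one file. Credentials and filesystem settings
// are assumed to be in the cluster's Hadoop configuration.
spark.sparkContext.parallelize(files.toSeq).foreach { name =>
  val conf = new Configuration()
  val from = new Path(name)
  val to = new Path(destDir, from.getName)
  FileUtil.copy(from.getFileSystem(conf), from, to.getFileSystem(conf), to,
    false /* deleteSource */, conf)
}
{code}

The real demo gets the placement right by declaring per-row locality to the scheduler; that's exactly the part this sketch leaves out.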
I think maybe Apache Bahir would be the ultimate home for this. For now, a bit too unstable.

(photo: spices on sale in a Mombasa market)

2 comments:

  1. Can S3A Committers (Staging and Magic) be used for running a Distcp job?

  2. I've been experimenting with S3Guard, writing parquet files directly to S3. I see that the FileOutputCommitter is used to write files to local disk first prior to submitting to S3. The performance is 3x slower than writing to an HDFS cluster.

    Spark config looks like this:
    {code}
    val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName(getClass.getSimpleName)
    .set("spark.ui.enabled", "true")
    .set("spark.app.id", "LocalSparkSessionProvider" + math.floor(math.random * 10E4).toLong.toString)
    .set("spark.sql.shuffle.partitions", "32")
    .set("spark.default.parallelism", "32")
    .set("spark.speculation", "false")
    .set("spark.locality.wait", "0")
    .set("spark.sql.parquet.compression.codec", "snappy")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.hadoop.dfs.nameservices", "hadooplocaltestcluster")
    .set("spark.hadoop.fs.s3a.access.key", s3Account.accessKey)
    .set("spark.hadoop.fs.s3a.secret.key", s3Account.secretKey)
    .set("spark.hadoop.fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A")
    .set("spark.hadoop.fs.s3a.connection.maximum", "100")
    .set("spark.hadoop.fs.s3a.region", s3Account.region.getName)
    .set("kuduMaster", kuduMaster)
    .set("spark.hadoop.fs.s3a.metadatastore.impl", "org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore")
    .set("spark.hadoop.fs.s3a.metadatastore.authoritative", "true")
    .set("spark.hadoop.fs.s3a.s3guard.ddb.region", s3Account.region.getName)
    .set("spark.hadoop.fs.s3a.s3guard.ddb.table", s"metastore-npe-${s3Account.region.getName}-rms-com")
    .set("spark.hadoop.fs.s3a.s3guard.ddb.table.create", "true")
    .set("spark.hadoop.fs.s3a.committer.name", "magic")
    .set("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
    .set("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a", "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
    .set("spark.hadoop.fs.s3a.s3guard.disabled.warn.level", "fail")
    {code}

    Am I missing something?


Comments are usually moderated - sorry.