SparkではローカルファイルやHDFS上のファイル以外に、S3上のファイルもデータとして使用することができます。
読み込む際に、SparkContextにAWSのACCESS_KEYとSECRET_KEYを認識させる必要がありますが、ネット上では色々情報が錯綜していてちょっと良くわかりませんでした。
(Hadoopクラスタのcore-site.xmlに書くとか、S3のURLに含ませるとか)
0.8.1のSparkContext.scala(core/src/main/scala/org/apache/spark/SparkContext.scala)のソースを見てみたら、以下のようになっていました。
要するに/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */ val hadoopConfiguration = { val env = SparkEnv.get val conf = SparkHadoopUtil.get.newConfiguration() // Explicitly check for S3 environment variables if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) { conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) conf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) conf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) } // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar" Utils.getSystemProperties.foreach { case (key, value) => if (key.startsWith("spark.hadoop.")) { conf.set(key.substring("spark.hadoop.".length), value) } } val bufferSize = System.getProperty("spark.buffer.size", "65536") conf.set("io.file.buffer.size", bufferSize) conf }
- OSの環境変数にAWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEYが指定されていたら、それぞれHadoop Configurationのfs.s3(n).awsAccessKeyId/fs.s3(n).awsSecretAccessKeyにマップされる
- Java(Scala)のSystem Propertiesに指定されている設定はそのままConfigurationにマップされる。
という風に環境変数は扱われているようです。
一例として、OSの環境変数に指定してあげると、spark-shellやpysparkなどでもS3からのインタラクティブロードが可能になります。
$ export AWS_ACCESS_KEY_ID=xxxxxx $ export AWS_SECRET_ACCESS_KEY=xxxxxx $ ./spark-shell (snip..) scala> val data = sc.textFile("s3n://(bucket)/(path)) scala> data.count() res0: Long = 1607157
みたいな感じで。
OSの環境変数として指定するもよし、プログラム中で閉じるもよし、この辺はケースバイケースだと思います。