2014年2月19日水曜日

Redshiftで実行中のqueryをkillする

以下のページにまとめられていました。
Cancel a query

'stv_recents'テーブルから実行中のquery(status='Running')を検索してkillしたいqueryのpidを取得し、cancelコマンドをpid指定して実行すればOKです。
select pid, trim(user_name), starttime, substring(query,1,20) 
from stv_recents
where status='Running';
cancel [pid];

2014年2月7日金曜日

CPUのcoreごとの使用率を調べたい

メモ
$ mpstat -P ALL
10時19分43秒  CPU    %usr   %nice    %sys %iowait    %irq   %soft  %steal  %guest   %idle
10時19分43秒  all    8.73    0.01    1.20    0.19    0.00    0.10    0.04    0.00   89.74
10時19分43秒    0   17.30    0.00    2.28    0.44    0.00    0.32    0.06    0.00   79.60
10時19分43秒    1    7.72    0.01    1.02    0.12    0.00    0.04    0.04    0.00   91.06
10時19分43秒    2    5.24    0.01    0.80    0.11    0.00    0.02    0.03    0.00   93.79
10時19分43秒    3    4.69    0.01    0.70    0.09    0.00    0.01    0.04    0.00   94.46
4つcoreがある場合は上記のような形で表示されます。

定期実行する場合は、上記にプラスして引数に数字(例えば1)を指定すると、指定した秒数ごとに最新の情報が表示されます。
$ mpstat -P ALL 1

2014年2月5日水曜日

ITエンジニア外出7つ小道具

インターネット時代のITエンジニアの宿命として、24時間365日、いかなる場所でも仕事をしなければいけない状況に陥る事があります。
そのため、多くの技術者もそうだと思いますが、常に持ち歩く道具というものが存在します。ここでは、ノートPCや携帯などの大きめな道具を除いた、僕が常に持ち歩く7つ小道具について紹介します。


1.WiFiルーター
外出先でもインターネットに接続するため。写真のものは、イーモバイルのPocketWiFiのLTE版です。
スマートフォンのテザリング機能を用いてネットワーク通信を行っても良いのですが、スマフォでは電源の持ちが心許ないので、電源を分散する意味でも専用のものを持ち歩いた方が便利です。WiFiルーターも、余計な機能がついてなく(ex.液晶タッチパネル)、バッテリーが大容量で、通信速度が速いものを選ぶのが望ましいです。
日本ではイーモバイルを使っていますが、外国に行くときはその国ごとにWiFiルーターをレンタルします。

2.iPhone/iPod向けLightningケーブル
iPhone持ち向け。急な充電や開発の際に使用。純正品が望ましいです。

3.MicroUSBケーブル
iPhone以外の多くのモバイル機器(Android端末や、上述のWiFiルーターなど)がMicroUSBに対応しているので。急な充電や開発の際に使用。純製のケーブルでないとAndroid端末が開発者モード時に認識してくれない事があるので、Android端末の純正品で揃えるのが望ましいです。

4.VGAアダプタ
急な会議やプレゼンなどで使用する事があるため。
HDMI対応のディスプレイやプロジェクタがもっと普及してくれればHDMIケーブルに置き換えます。

5.USB Ethernet アダプタ
主に最近のMacbook Pro/Airを持ち運ぶ人向け。有線LANしかインターネット環境が無い時に備えて。

6.SDカードリーダー
デジカメなどで撮影した写真を使用して資料やレポートなどを作成する事が必要な時があるため。多くのノートPCではSDカード読み取りスロットがありますが、様々なメディアからでも吸い出しできるように、一応。

7.モバイルバッテリー
写真には写ってないですが。電子機器の電池切れは死を意味します。


以前はこれ以外に以下のものも持ち歩いていました。

  • USBマイク(急なskypeミーティングに対応するため)
  • USBメモリ

上記のような小道具をいつも持ち歩いていれば、たいていのケースで対応できて焦ることは無いかな、と思います。


ちなみに大道具は以下で、それぞれ必要に応じて持ち運びます。

  • ノートPC
  • タブレット
  • スマフォ(Android+iOS の2台)
  • それぞれの機器の充電器

2014年1月29日水曜日

SparkでS3上のデータを使用する

http://spark.incubator.apache.org/docs/latest/ec2-scripts.html

SparkではローカルファイルやHDFS上のファイル以外に、S3上のファイルもデータとして使用することができます。
読み込む際に、SparkContextにAWSのACCESS_KEYとSECRET_KEYを認識させる必要がありますが、ネット上では色々情報が錯綜していてちょっと良くわかりませんでした。
(Hadoopクラスタのcore-site.xmlに書くとか、S3のURLに含ませるとか)

0.8.1のSparkContext.scala(core/src/main/scala/org/apache/spark/SparkContext.scala)のソースを見てみたら、以下のようになっていました。
  /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
  val hadoopConfiguration = {
    val env = SparkEnv.get
    val conf = SparkHadoopUtil.get.newConfiguration()
    // Explicitly check for S3 environment variables
    if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
        System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
      conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
      conf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
      conf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
      conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
    }
    // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
    Utils.getSystemProperties.foreach { case (key, value) =>
      if (key.startsWith("spark.hadoop.")) {
        conf.set(key.substring("spark.hadoop.".length), value)
      }
    }
    val bufferSize = System.getProperty("spark.buffer.size", "65536")
    conf.set("io.file.buffer.size", bufferSize)
    conf
  }
要するに

  • OSの環境変数にAWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEYが指定されていたら、それぞれHadoop Configurationのfs.s3(n).awsAccessKeyId/fs.s3(n).awsSecretAccessKeyにマップされる
  • Java(Scala)のSystem Propertiesに指定されている設定はそのままConfigurationにマップされる。
という風に環境変数は扱われているようです。

一例として、OSの環境変数に指定してあげると、spark-shellやpysparkなどでもS3からのインタラクティブロードが可能になります。
$ export AWS_ACCESS_KEY_ID=xxxxxx
$ export AWS_SECRET_ACCESS_KEY=xxxxxx
$ ./spark-shell
(snip..)
scala> val data = sc.textFile("s3n://(bucket)/(path))
scala> data.count()
res0: Long = 1607157
みたいな感じで。

OSの環境変数として指定するもよし、プログラム中で閉じるもよし、この辺はケースバイケースだと思います。

2014年1月28日火曜日

Spark + YARN + CDHの相性

http://spark.incubator.apache.org/
Sparkは、Hadoop上で動作するクラスターコンピューティングの基盤です。
という説明で正確かは自信ありません。。

生のHadoop+HDFS構成でMapReduceを動作させると基本的にHDFSに対してシーケンシャルアクセスが発生しますが、たとえばMapReduce上でiterableな解析処理を実行しようとすると毎回HDFSをシーケンシャルに読み込むために性能劣化につながります。
そのためSparkではRDD(Resilient Distributed Dataset)というデータ機構を独自に用意し、一度読み込んだデータはメモリ上にキャッシュするなどの対策でiterableな解析処理も多少は最適化するようになっています。

この特性を活かして、MLlibのような機械学習ツールや、Bagelのようなグラフ解析ツールも用意されています。
http://spark.incubator.apache.org/docs/latest/mllib-guide.html
http://spark.incubator.apache.org/docs/latest/bagel-programming-guide.html

また、インタラクティブコンソール上でscalaやpythonを用いてduck typing的に処理が書けるのも特徴です。

Sparkは独自のCluster Managerを持ち、この上で分散処理をコントロールします。Cluster Managerはstandaloneで動作させることもできますし、MesosやYARNなどのResource Managerに任せることもできます。スペック的には。。



Sparkでは、YARNと連携する際、ダウンロードしてきたソース・ファイルを用いてYARNの環境向けにビルド(assembly)しなおす必要があります。
ドキュメントは以下。
http://spark.incubator.apache.org/docs/latest/running-on-yarn.html

基本的には、YARNの動作環境にあわせて環境変数(SPARK_HADOOP_VERSION)に適切な値を設定し、「sbt assembly」を実行します。
指定すべきバージョンは以下が参考になります。
http://spark.incubator.apache.org/docs/latest/hadoop-third-party-distributions.html

試しに、CDH4.5とSpark8.1を組み合わせてYARN連携をしようとしてみました。マニュアル通りSparkのインストールディレクトリで以下を実行します。
./sbt/sbt clean
SPARK_HADOOP_VERSION=2.0.0-cdh4.5.0 SPARK_YARN=true ./sbt/sbt assembly
ただし上記を実行すると、コンパイル時にエラーになります。
エラーメッセージは以下のような感じで
https://gist.github.com/moaikids/8663958

要約すると、org.apache.spark.deploy.yarn.YarnAllocationHandler.scala内で
import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId}
というimport文があるのですが、このうち「AMResponse」インターフェースが存在しないため、コンパイルエラーになります。

Apache Hadoopの例えばr2.0.5-alphaには含まれていますが、
http://hadoop.apache.org/docs/r2.0.5-alpha/api/org/apache/hadoop/yarn/api/records/AMResponse.html
CDH4.5には含まれていない、
http://archive.cloudera.com/cdh4/cdh/4/hadoop-2.0.0-cdh4.5.0/api/org/apache/hadoop/yarn/api/records/
という状態です。

色々調べてみると、現在のSparkはApache Hadoop2.2でstableになったクラスを前提としているようで、CDH4.5(Hadoop2.0ベース)ではYARN経由での連携は出来ないようです。
詳細については以下のMailing Listのやりとりが詳しいです。
https://groups.google.com/forum/#!topic/spark-users/T1soH67C5M4

基本的には、CDH4.5でSpark8.1を動かす場合はstandaloneモードが無難ですし、どうしてもYARN連携したい場合は現在(2014/01/28現在)Beta1のCDH5を使って構築してみるか、のいずれかの選択肢になります。



jmapを用いたJVM Heapの状態調査

メモ。
参考:http://www.javainthebox.net/laboratory/JavaSE6/managementtools/mngtools.html

jmapは、JVMのHeapメモリ内の情報をdumpするためのツールです。
http://docs.oracle.com/javase/jp/6/technotes/tools/share/jmap.html
引数にjavaのプロセスIDを渡してあげると、当該プロセスのメモリ情報をダンプすることができます。
jmap -dump:format=b,file=[file name] [pid]
上記の例では、バイナリ形式でheapのダンプ情報を出力します。
ダンプしたファイルは色々なツールで可視化が可能ですが、javaの標準コマンドであるjhatが一番楽だと思います。
jhat [file name] 
引数のファイルには、上述のjmapで出力したダンプファイルを指定します。
このように指定してあげると、デフォルトでは7000番のポートでサーバーが立ち上がり、ブラウザで接続すると以下のような分析画面が表示されます。

お察しの通り、上記はJettyの起動プロセスからjmapでダンプした結果を可視化したものです。
解析画面にはいくつか項目があり、そのうち画面下部の「Heap Histogram」を選択するとHeap上に存在するインスタンスの数とバイトサイズを確認することができます。


このように、jmapは手軽にJVMのHeapの状態を確認するには便利なのですが、すべてのデータをダンプすると、JVMのHeap Sizeによってはダンプファイルのサイズが非常に大きくなり、解析が難しくなります。
僕の例だと、数GBのダンプファイルをjhatで読み込ませようとしたら、30分くらい解析に時間がかかった結果最終的にOutOfMemoryでプロセスが死にました。。。

たとえばHeap Histogramだけを知りたい、という場合、以下のオプションを指定してあげるとjhatの上記の例で表示したのと同じようなデータが取得できます。
jmap -histo [pid] > [file name]
結果は以下

なお、以下のように「live」オプションを付与すると、生存中のオブジェクトについてのみ結果出力されます。
jmap -histo:live [pid] > [file name]

2013年12月10日火曜日

Tableau上でRを使用する(インストール編)

Tableau Desktop 8.1からR scriptの実行がサポートされたようです。
http://www.tableausoftware.com/ja-jp/about/blog/2013/10/tableau-81-and-r-25327

Tableau上でRを実行するには、同じPC上(もしくはネットワーク接続可能な他のPC上)でRserveが動作している必要があります。
上記URLにも記載されていますが、セットアップ手順としては以下になります。

  1. Rをインストール
  2. Rを起動し、Rserveパッケージをインストール( install.packages("Rserve"); )
  3. Rserveを起動( library(Rserve); Rserve() )
  4. TableauからRserveに接続( ヘルプ→設定とパフォーマンス→R接続の管理 )
ただ、まっさらなR環境だと上記では足らず、追加でplyrパッケージのインストールが必要になります。( install.packages("plry"); )

これで、無事動作するようになります。

詳しくは追ってないですが、Tableau x Rのインテグレーションは、Tableauで収集したデータセットをRserve経由でR環境にわたし、実行結果を再度Tableauで受け取り、表示をする、という感じになっているように見えます。