2013年7月9日火曜日

MRJobのmapper/reducerに引数を渡す

Java版のHadoopとかだと、プログラムに環境変数的な値を渡すのはConfiguraitonクラスを色々駆使しなくてはいけなくて結構面倒くさい印象がありますが、MRJobの場合は以下のような書き方で、optparseチックに引数を渡すことができます。

たとえば"--date"というパラメータで日付情報を渡すようにする場合
    def configure_options(self):
        super(MR, self).configure_options()
        self.add_passthrough_option('--date', default=None)
のような形で"configure_options"メソッドを上書きし、add_passthrough_optionメソッドに定義を書いてあげると、各mapper、reducerの処理中から
date = self.options.date
のように参照できるようになります。

実行時は以下のような形
python hoge.py --date 2013-07-09 data/hoge.tsv


MRJobで複数ステップのMap-Reduceを実行する

"def steps(self)" メソッドをoverrideして、各ステップにどのような処理を実施するか指定します。

MRJobのサンプルページに載ってる例だと↓な感じ。
def steps(self):
    return [self.mr(mapper=self.transform_input,
                    reducer=self.consolidate_1),
            self.mr(reducer_init=self.log_mapper_init,
                    reducer=self.consolidate_2)]
steps関数の戻り値として、配列で各ステップごとにどんな振る舞いをさせるかをmr関数の戻り値として記載するようになっています。複数要素の配列を返せば、複数のステップが実行されます。
mr関数に指定できる振る舞いは、以下のようです。
  • mapper
  • reducer
  • combiner
  • mapper_init
  • mapper_final
  • reducer_init
  • reducer_final
  • combiner_init
  • combiner_final
ナイーブにMRJobを動かすと、1ステップごとに、outout出力されたファイルをS3から手元の環境にコピーするようになっています。その転送コストがかなりオーバーヘッドが高いので、S3上で出来る限り処理をしたい…という時に、複数ステップで実行すると嬉しいケースが多いです。

MySQLからデータをサンプリングして取得する

何かしらのデータの傾向を調べたい時に、たとえばMySQLに保存されているデータをdumpして調べる、みたいな事を行います。
ただ、この方法は待機系サーバーであったり、データ件数が少ないうちはあまり気にせずに行なっても大丈夫ですが、本番系で大量のデータを持つテーブルからfull dumpなどを行うのは現実的では有りません。

ということで、対象テーブルから乱数を用いてサンプリングデータの抽出をしてみました。本当の意味ではメルセンヌ・ツイスターなどの精度が保証されている乱数器を用いるのがベターなのでしょうが、今回はその話は無しで。



MySQLには乱数を生成する”rand()”という関数があるので、こちらをwhere句に記述し、テーブルの各レコードに対して乱数を付与し、かつその値が一定の範囲に含まれるもののみ抽出対象とする、という方法で、手軽にサンプリングを行う事ができます。

たとえば約90,000件のレコードが存在する以下のテーブルに対して
mysql> select count(*) from hoge;
+----------+
| count(*) |
+----------+
|    88975 |
+----------+
以下のようなクエリーを発行すると、おおよそ全レコードの10%のデータがランダムに抽出されます。
mysql> select count(*) from hoge where rand() <= .1;
+----------+
| count(*) |
+----------+
|     8997 |
+----------+

内部的には、すべての対象レコードに対して"rand()"関数を実行し、当該乱数が0.1以下のデータを抽出、という動きになっていると思います。
全レコードを走査しながら、かつ内部関数を実行する、という、クエリー的には鬼のような挙動なので、どう考えても高トラフィックのサイトのサービス側で使用するクエリーとしては、使えません。

アドホックにデータ抽出する際でも、件数の多いテーブルに対して当該処理を行うと処理に膨大に時間がかかります。

そのため、パフォーマンス・チューニングとしては以下のような感じになると思います。
  1. '*'指定ではなく、primary key もしくは index上のカラムを抽出対象にする
  2. 1.で取得したデータと、元のテーブルのJOINでデータを抽出する
  3. 1.で取得する件数を全件ではなく、一定の範囲に収める

すべてのデータを走査対象にするよりは、indexのみで完結するほうが走査コストが低くすむので、"where rand()" を使用する際の抽出条件はprimary keyや、index上のカラムのみにします。
たとえば、"id"というカラムがprimary keyなのであれば、以下のようにします。
mysql> select id from hoge where rand() <= .1;
explainしてみると、id指定の場合は"type=index"となり、'*'指定の場合は"type=ALL"となります。
(count句を使用した場合は例外で、primary keyが指定されているテーブルであればデータ件数はprimary keyの数を数えるだけで済むので、count(*)でも"type=index"になることがあります)


そのうえで、以下のようにJOINしてデータを取得してあげると、ナイーブにrand()を使う時よりも速くなります。
mysql> select * from hoge as a, (select id from hoge where rand() <= .1) as b where a.id = b.id;

また、subqueryの結果返されるidの数が少なければ少ないほどtemporary領域などのリソースを食いつぶす率が減りますし、JOINコストが減りますので、rand()をかけるデータの範囲を絞ってあげるのも1クエリーあたりのスループット向上と言う意味では効果があります。
mysql> select * from hoge as a, (select id from hoge where (id between 1 and 1000000) and rand() <= .1) as b where a.id = b.id; 
mysql> select * from hoge as a, (select id from hoge where (id between 1000001 and 2000000) and rand() <= .1) as b where a.id = b.id; 
・
・
・
のような形。
もう既に無作為抽出などは望むべくも無い状態になってますが…まあ、全データdumpでなく、一部のデータをMySQLにあまり負荷をかけずに取得する、という方法の紹介でした。


2013年7月2日火曜日

fluent-plugin-uniqcount を用いて Fluentd で数値集計

https://github.com/KazkiMatz/fluent-plugin-uniqcount
http://d.hatena.ne.jp/kazuk_i/20130506

ログデータをFluentdで収集し、HadoopやMongoDBなどのストレージに収集した後、クエリーを発行して必要なデータを取得する。
たいていのデータ解析では上述のような後付けのバッチ処理で十分ですが、

  • 準リアルタイムで収集をしたい
  • Fluentdでリレーするデータは、当該集計以外には使用しない
    (ストレージに格納したとしても、基本的に再利用はされない)
といったケースの場合、ストレージにデータをすべて保持するのは無駄ですし、即時性の観点でもFluentdをデータが流れる際にあわせて数値集計が行われた方が効率が良いです。

記事先頭にリンクをした「fluent-plugin-uniqcount」は、名前の通リ特定の条件に合致したデータのユニーク件数について、集計が行えるFluentdプラグインです。


◯内容
詳しくは作者の方のブログに詳細な記述があるので、こちらを参照下さい。


◯事例
ある程度URLのバリエーションが固定されているとあるWebサービスで、時間単位内でURLごとにそれぞれどの程度のリクエストがあるか集計したい、という需要がありました。
1日単位での集計で十分であればログを溜めた上でHadoop(Hive)等での集計で十分なのですが、最低でも1分単位でデータを把握したかったため当該プラグインを使用しました。
access_log -- [in_tail] --> Fluentd -- [out_uniqcount] -- [out_http] --> (API Server) --> MySQL
概ねの流れは上記の通リで、in_tailで読み取ったログ情報をURLごとに集計し、out_httpを経由して最終的にMySQLに書き込みます。

out_uniqcountの設定は、たとえば以下のような形。
<match hoge.access_log>
    type uniq_count
    list1_label hoge_trends
    list1_key1 uri
    list1_key2 referer
    list1_span 60
    list1_offset 3
    list1_out_tag hoge.uniq_referer
    list1_out_num 1000
    list1_out_interval 60
</match>
上記であれば、あるuriにアクセスされたrefererごとにFluentd上で数値が集計され、1分ごとにアクセスの多い上位1000件が"hoge.uniq_referer"タグにて出力されます。

このように、色々なルールの組み合わせでFluentd上で手軽に数値集計が可能になります。

システム的なメトリクス情報の取得であれば、uriとstatus codeの組み合せで特定URLへのステータスコード別リクエスト数、なども出せます。
また、作者ブログの事例にもあるように、urlとip addressの組み合わせで集計することで特定URLの簡易ユニークアクセス数なども集計できます。


◯注意点
すべての集計データをいったんFluentdプロセスのメモリ上に保持する形になります。なので、作者のブログにも記載されていますが、データパターンが増えるとメモリが溢れてしまう可能性があります。単位時間内に大量の組み合わせが発生するような集計処理では大量のシステムリソースを必要とするので、注意です。


機能としては非常にシンプルですし、作者の意向かFluentd Pluginsのページにも公開されてませんが、現実的な性能を出してくれますし、手軽に数値集計が行えるので、データ規模が一定の範囲で収まる用途であれば有効な手法だと思います。