2013年12月10日火曜日

Tableau上でRを使用する(インストール編)

Tableau Desktop 8.1からR scriptの実行がサポートされたようです。
http://www.tableausoftware.com/ja-jp/about/blog/2013/10/tableau-81-and-r-25327

Tableau上でRを実行するには、同じPC上(もしくはネットワーク接続可能な他のPC上)でRserveが動作している必要があります。
上記URLにも記載されていますが、セットアップ手順としては以下になります。

  1. Rをインストール
  2. Rを起動し、Rserveパッケージをインストール( install.packages("Rserve"); )
  3. Rserveを起動( library(Rserve); Rserve() )
  4. TableauからRserveに接続( ヘルプ→設定とパフォーマンス→R接続の管理 )
ただ、まっさらなR環境だと上記では足らず、追加でplyrパッケージのインストールが必要になります。( install.packages("plry"); )

これで、無事動作するようになります。

詳しくは追ってないですが、Tableau x Rのインテグレーションは、Tableauで収集したデータセットをRserve経由でR環境にわたし、実行結果を再度Tableauで受け取り、表示をする、という感じになっているように見えます。


2013年11月22日金曜日

RedshiftのJSON functions

単純に見落としていただけで以前から有ったのかもしれませんが、RedshiftにもJSON関連のfunctionが存在していました。。
http://docs.aws.amazon.com/redshift/latest/dg/json-functions.html

PostgreSQLにもJSON functionは各種存在しますが、Redshiftのそれについては関連性はなく、独自の実装に見受けられます。


◯JSON_ARRAY_LENGTH function
http://docs.aws.amazon.com/redshift/latest/dg/JSON_ARRAY_LENGTH.html
対象となる文字列がJSONのarray表現であるとして、そのarrayの長さを返却するfunctionです。
hoge=# select json, json_array_length(json) from hoge;
| 0
["a<","b","c"] | 3 
渡された文字列が空文字の場合、0が返却されます。
また、文字列以外の値など、JSONのparsingが行えない値を渡した場合はエラーになります。
ERROR:  JSON parsing error
DETAIL:  
  -----------------------------------------------
  error:  JSON parsing error
  code:      8001
  context:   invalid json array object 2013-11-21 00:36:08
  query:     2416921
  location:  funcs_json.h:81
  process:   query1_20 [pid=9891]
  -----------------------------------------------

◯JSON_EXTRACT_ARRAY_ELEMENT_TEXT function
http://docs.aws.amazon.com/redshift/latest/dg/JSON_EXTRACT_ARRAY_ELEMENT_TEXT.html
渡されたJSON Array文字列の中から指定した要素の値を取得します。
hoge=# select json, json_extract_array_element_text(json, 0);
 |  
["a","b","c"] | a
当該要素が存在しない場合は空文字が返ります。


◯JSON_EXTRACT_PATH_TEXT function
http://docs.aws.amazon.com/redshift/latest/dg/JSON_EXTRACT_PATH_TEXT.html
渡されたJSON文字列の中から、指定したkeyに合致するvalue値を取得します。
hoge=# select json, json_extract_path_text(json, 'a')
 | 
{"a":1, "b":2, "c": 3} | 1
該当するkeyが存在しない場合は空文字が返ります。
また、{"a": {"b":{"c" :3} } } のようにネスト構造のjsonから"a.b.c"のvalueを取得したい場合は、 json_extract_path_text(json, 'a', 'b', 'c') という風に書くことで取得できます。


2013年10月24日木曜日

Redshiftでの"String contains invalid or unsupported UTF8 codepoints"問題とACCEPTINVCHARSオプション

Redshiftにおけるvarcharなど文字列型は基本的にUTF-8文字列しかサポートしていないようなので、非UTF-8と思われる文字列が含まれているTSV/USVをRedshiftにロードしようとすると以下のエラーが発生します。
String contains invalid or unsupported UTF8 codepoints. Bad UTF8 hex sequence:

基本的には取り込むデータ中に非UTF-8と思われる文字コードが含まれないよう、前処理にてデータの整形を行うことが正攻法だと思います。
しかし試験的にデータを取り込みたかったり、ひとまずRedshiftにデータを入れたい場合は、'ACCEPTINVCHARS'オプションを用いて当該文字を他の文字にreplaceしてRedshiftに取り込む、ということができます。

http://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html#acceptinvchars
ACCEPTINVCHARS AS '_'
のような形でオプション指定します。ASで置換後の任意の文字列を指定できます(デフォルトは'?')

'ACCEPTINVCHARS'を指定した場合、たとえ非UTF-8文字列が含まれていても自動的に置換されRedshiftに取り込まれます。なお、取り込み時に以下のようなメッセージが表示されます。
INFO: Load into table 'hoge' completed, 59 record(s) were loaded with replacements made for ACCEPTINVCHARS. COPY

参考:Amazon Redshift / Analytics / Knowledge Base - Papertrail Support

2013年10月3日木曜日

nginxのmerge_slashes

nginxや、その他多くのWebサーバーでは、リクエストされたURLの中の連続しているスラッシュをマージして解釈するような動きをする事が多いです。

たとえば
http://www.sada.co.jp//////index.html → http://www.sada.co.jp/index.html
のような感じで。

多くの場合上記の動作をしてくれて問題無いのですが、連続したスラッシュに意味がある場合もあり、その際は自動的にmerge slashされると都合が悪いケースもあります。

nginxの場合は"merge_slashes"というsyntaxがあるようで、こちらはデフォルトではonなのですが、offにすることで自動的にmerge slashをしないようにすることができるようです
merge_slashes off
こんな感じ。

参考:nginxはデフォルトでmerge slashする

2013年9月14日土曜日

redshiftのSQLでunixtimeをtimestamp型に変換

timestamp から unixtimeに変換するのは Extract 関数があるので楽ですが、逆の変換はto_timestampなどがredshiftに存在しないため面倒です。

以下のようなSQLを書くことで実現できます。なお、以下の場合は、カラムtime_stampに秒単位でunixtimeが入っている場合です。
select  TIMESTAMP 'epoch' + unix_time * INTERVAL '1 second' as time_stamp from hoge limit 10;
timezoneの変換を合わせて行う場合は、 CONVERT_TIMEZONE 関数を使用します。以下はtimezoneをJSTに変換する例。
select CONVERT_TIMEZONE('JST', TIMESTAMP 'epoch' + unix_time * INTERVAL '1 second') time_stamp from hoge limit 10;


2013年9月7日土曜日

redshiftのカラムに適切なencodeを設定する

redshiftはいわゆるcolumner型のデータ構造を持っているので、各columnに適切なencode(圧縮)方式を適用することが運用に際して非常に重要になります。


◯redshiftに任せる
redshiftは、当該テーブルへの初回copy時に適切なencodeを自動検出し適用する機能があります(10万レコードからサンプリングし、推定)。最もナイーブな方式としてはこちらに任せる、という方法があります。

適用されたencodeは、以下のクエリーを実行することで確認できます。
select * from pg_table_def where tablename = 'hoge';


◯analyze compressionを実行した結果を適用する
ただし、初回のサンプリングだけでは限界があり、運用を重ねていった結果実情に則さなくなるケースも多いです。

現在蓄えられたデータから最適なencodeを推薦する"analyze compression"というコマンドがredshiftには提供されています。
http://docs.aws.amazon.com/redshift/latest/dg/r_ANALYZE_COMPRESSION.html
analyze compression hoge;

こちらの実行結果と、初回copy時に割り当てられたencodeでは、同じ場合もありますし、異なる場合もあります.
異なる場合はどうするか?
alter tableなどで後付でencodeが変更できればよいですが、流石にそのような機能はredshiftに提供されていないので、基本的には以下の手順でtableを作り直します。

  1. 既存のテーブルhogeと全く同じcolumnを持つtemporary table を作成。
  2. hogeのデータをtemporary tableに退避(insert into hoge_tmp (select * from hoge))
  3. hogeをdrop(drop table hoge)
  4. 適切なencodeを指定して、hoge table を作りなおす(create table ~ )
  5. temporary tableのデータをhogeにコピーする( insert into hoge (select * from hoge_tmp))
  6. temproary table を削除



◯encodeの種類
以下にまとめられています。
http://docs.aws.amazon.com/redshift/latest/dg/c_Compression_encodings.html

run-lengthは、同一カラム内で同じ値を持つデータが連続して出現する可能性が高い場合、非常に有効です。

同じ値ではないけれど近しい値が連続して出現する場合は、差分情報を用いるdeltaならびにdelta32kが有効です。

数値系の値で、その型が持つサイズ(integer であれば32bit、bigintであれば64bit)を使いきらず、本来のサイズよりも少ないbit数で表現可能な値がカラム中に多い場合はmostly8/16/32が有効です。

辞書式圧縮が有効なケースの場合は、文字列の場合はtext255/32k、そうでない場合はbytedictになります。

いずれにも当てはまらず、圧縮が意味が無いケースの場合はraw(圧縮しないでそのまま)、という感じです。


◯脳筋的な運用をするのであれば定期的にanalyze compression → 作り直し
redshiftを信じるのであれば、何も考えずにanalyze compressionを定期的に実行し、必要であればその結果を元に作り変えるのが良いのでは無いかと思います。



なお、redshiftは、そのデータ構造の性質上、データの出現順序(insert順序)によって圧縮効率が大きく変わるようです。
圧縮しやすい形でデータを保存したいのであれば、ある程度データをあらかじめソートした上で投入するのが、効率的と思われます。
また、encodeの選定についても、どのような順序でデータが投入されるかをあらかじめ想定しておくと、最初のcreate tableの段階で妥当なencodeが何なのか類推しやすくなると思います。


※参考文献:
本記事と直接的な関連は無いですが、encodeの理解に非常に参考になったスライド
Redshift Compression Encodings(圧縮アルゴリズム)についてもっと調べてみた

2013年9月4日水曜日

fluent-plugin-redshiftとその他pluginを組み合わせてredshiftへデータ保存

http://aws.amazon.com/jp/redshift/
AWSから安価で使用可能なDWH製品Redshiftが公開されてしばらく立ちます。
非常に興味深いサービスなのですが、Redshiftへのデータの登録が独特(S3に置いたCSV/TSVをcopyコマンドを用いて登録)という事もあり、ちょっと面倒くさく感じていました。

最近、掲題のようにredshiftへのデータ保存が行えるFluentdプラグインがあるのを発見したので、こちらと他プラグインを組み合わせて、Fluentdを用いたRedshiftへのデータ保存を試してみました。


◯fluent-plugin-redshift
https://github.com/hapyrus/fluent-plugin-redshift
BufferedOutputプラグインの一つで、仕組みとしてはchunk単位でS3にデータを書き込んだ上でcopyコマンドを使用してRedshiftにデータ登録を行っています。S3上のファイルはRedshift登録後もそのまま残るので、作業の証跡としてや、他用途への活用(EMRを使っての解析等)にも使用できるようになっています。

Redshftへの書き込みはchunk単位で行われるので、書き込み頻度については"buffer_chunk_limit"や"flush_interval"などの設定で調整します。

一つおもしろい形になっているのがデータ登録形式で、json形式のデータを投入したい場合
{"log":{"a":1."b":2,"c":3}}
といった形で、"log"というキーの中にjsonデータをすべて包含させる必要があります。
※"log"の値は設定ファイル中で"record_log_tag"に任意の値を指定することで変更可能です。

なぜこのような仕様になっているかは定かではないですが、現状実装がそのようになっていますし、TSV/CSV形式で流しこむのに比してスキーマ情報(カラム名)を明示的に指定できるメリットもあるので、JSONデータ形式で流し込みたいところです。
とはいえ、情報ソース側で出力するJSONの形式をredshiftプラグインの仕様にあわせて変更する、というのも手間です。


◯fluent-plugin-jsonbucket
こちらの課題を解決することだけを目的に、fluent-plugin-jsonbucketというプラグインを作りました。
https://github.com/moaikids/fluent-plugin-jsonbucket

こちらは、READMEに記載している通リ
{“a”:1, “b”:2, “c”:3 } -> {“bucket”: {“a”:1, “b”:2, “c”:3 } }
という形で、JSONデータを特定のkeyのvalue値として内包させるためのプラグインです。

jsonbucketと組み合わせる事で、情報ソース側で特別な加工をすることなく、pluginでJSONの変換処理を吸収させる事ができます。


fluent-plugin-reassemble
基本的には上記の組み合わせでOKですが、情報ソースのうちすべての項目をredshiftに保存するのも無駄が多いですし、redshiftに登録する時にデータの簡単な変換を行いたいケースもあります。
たとえば、JSON上は文字列として扱われている数値をredshift側ではintegerで保存したい、日付情報についてログ上ではunixtimeで保存されているけれど、見通しを良くしたりもしくはテーブル設計上の都合でredshift側では"yyyy-mm-dd"のdate型で保存したい、といったケースです。

また、元々のデータがネスト構造になっていたりするとredshift的には扱いづらいので、redshift保存のためにフラットな構成に置き換える必要があります。

その辺を情報ソース側ですべてコントロールするのもコレまた手間ですので、plugin側でこの手のtransform処理を行うfluent-plugin-reassembleを作成しました。
https://github.com/moaikids/fluent-plugin-reassemble

保存したいkeyの情報と、valueをどのように変換するかを設定ファイル上で記載することで、指定通りにデータの加工(reassemble)が行えるプラグインです。

こちらもREADMEに例を載せていますが、
ex1. 
assemble foo1:bar1:to_i, foo2:bar2:to_s
record => {"foo1": "1", "foo2": 2}
reassemble => {"bar1": 1, "bar2": "2"}
ex2. 
assemble foo1,foo2:bar2,foo3:bar3:unixtime_to_datetime,foo4:bar4:url_to_host
record => {"foo1": "1", "foo2": 2, "foo3": 1377946931, "foo4": "http://www.sada.co.jp/concert.html"}
reassemble => {"foo1": "1", "bar2": 2, "bar3": "2013-08-31 20:00:02", "bar4": "www.sada.co.jp"}
といった形で、"assemble"に所定のフォーマットで対象のキーとoperation内容を記述することで、plugin上でデータの加工が行えます。
(指定のフォーマットは以下)
{extract_key1}:{replaced_key1}:{operation1},{extract_key2}:{replaced_key2}:{operation2},....{extract_keyN}:{replaced_keyN}:{operationN}
こちらを使う事で、情報ソースのうち必要なデータを必要な形式で変換・加工し、redshiftに登録する、といった事がpluginだけで完結させる事ができます。
(もちろんreassembleプラグインについてはredshiftだけに限らず、すべての用途向けに仕えるpluginですが)


という感じで、これらのpluginを組み合わせる事で、非常に手軽にredshiftへデータの保存が行えるようになります。

2013年8月29日木曜日

fluent-plugin-jsonbucket

Fluentdのプラグインをいくつか作成しているのですが、多くが実証実験中(?)なので、rubygemsなどへの公開テスト的に簡単なものを書いてみようということで、掲題のものを10分くらいで作ってみました。

https://github.com/moaikids/fluent-plugin-jsonbucket

このoutput pluginは、Fluentd内で処理をしているjsonデータ(msgpackデータ)を、 key : {json} のような形で一つのキーにひとまとめにするプラグインです。

何をいっているのか自分でもよくわからなくなってきましたが、
{“a”:1, “b”:2, “c”:3 } 
というjsonデータ(msgpackデータ)を
{“bucket”: {“a”:1, “b”:2, “c”:3 } } 
という様な形式に変換するプラグインです。

「何に使うの??」というのが正直な感想だと思いますが、用途は一般的では無いのでヒミツです。


2013年8月21日水曜日

Elastic MapReduceの実行状況をグラフ化するmunin plugin

最近muninのプラグインを作ることが多くなりましたが、掲題のようにElastic MapReduceのプラグインを作成してみました。AWSのCloudWatchで用意されているグラフは非常に見辛いので何とかしたかったのが、作成の動機です。。。

https://github.com/moaikids/munin-emr

内部ではPythonのAWS SDK実装である"boto"を用いてjobflowsの値を取得し、stateごとに件数をグラフ化している感じです。


インストール方法は、上記のgitからpluginを取得し、munin-nodeがインストールされている環境に設置するだけです。ただしあらかじめPython(2.6 >= )がインストールされていることと、先述の"boto"ならびにtimezone関連のutil "pytz" のインストールが必要です。

pip install boto
pip install pytz

git clone https://github.com/moaikids/munin-emr.git
cp munin-fluentd/plugins/emr/* /usr/share/munin/plugins/
ln -s  /usr/share/munin/plugins/emr_jobflows  /etc/munin/plugins/emr_jobflows 

また、AWSへの接続を行うため、AWSへアクセスするためのcredential情報をmunin-nodeの設定ファイルに記載する必要があります。

$ vim /etc/munin/plugin-conf.d/munin-node 
[emr_jobflows]
user root
env.access_key_id [ACCESS_KEY_ID]
env.access_secret_key [ACCESS_SECRET_KEY]
env.region [REGION]
env.time_range 10
一番最後の"time_range"では、何分前のジョブ情報を取得するかを指定します(単位:分)。
内部実装的には、describe_jobflows APIのcreated_afterの値に、「現在時刻 - time_range」の値を渡すようになっています。

当該プラグインをmunin環境に適用すると、以下のようなグラフが表示されるようになります。

2013年8月19日月曜日

awsのpython sdkを使ってEMRのジョブステータスを取得する。

https://github.com/boto/boto
pythonのAWS SDKのひとつ"boto"を用いて、EMRのジョブステータスの取得を行ってみました。
import boto.emr
import datetime

access_key_id = "….."
access_secret_key = "….."
conn = boto.emr.connect_to_region("ap-northeast-1", aws_access_key_id=access_key_id,aws_secret_access_key=acces    s_secret_key)

created_after = datetime.datetime.today() - datetime.timedelta(hours=24)
print created_after
jobflows = conn.describe_jobflows(None, None, created_after, None )
for jobflow in jobflows:
    print jobflow.jobflowid
    print "  " + jobflow.name
    print "  " + jobflow.loguri
    print "  " + jobflow.state
    print

表示される結果は、先のRuby版の記事の内容とだいたい同じです。
http://itsneatlife.blogspot.jp/2013/08/awsruby-sdkemr.html

2013年8月16日金曜日

awsのruby sdkを使ってEMRのジョブステータスを取得する。

↓な感じになる。
以下は直近1日以内のデータのみを取得するよう、'created_after'パラメータに値を指定してます。
#!/usr/bin/env ruby
require 'rubygems'
require 'aws-sdk'

emr=AWS::EMR.new
created_after = Time.at(Time.now().to_i - (24 * 60 * 60)).iso8601
p created_after
resp = emr.client.describe_job_flows({:created_after => created_after})

resp.data[:job_flows].map{ |job|
    p job[:job_flow_id]
    p "    " + job[:name]
    p "    " + job[:log_uri]
    p "    " + job[:execution_status_detail][:state]
    p
}
実行結果例
"hogehoge"
"    hogejob.name"
"    s3n://hoge-log/emr/logs/"
"    COMPLETED"


2013年8月12日月曜日

Linuxのdateコマンドでunixtimeと日付情報の変換

何気に知っていると便利でした。
# 現在時刻をunixtimeで表示
$ date +%s
1376285909
# 指定したunixtimeを日付情報で表示
$ date --date @1376285909
2013年  8月 12日 月曜日 14:38:29 JST

2013年8月6日火曜日

redisで特定の条件にマッチするkeyを削除する

redisのコマンドラインとxargsを使うと楽ですね。
#全削除
redis-cli KEYS "*" | xargs redis-cli DEL
#前方一致
redis-cli KEYS "prefix*" | xargs redis-cli DEL
#後方一致
redis-cli KEYS "*suffix" | xargs redis-cli DEL

2013年8月2日金曜日

aws-sdk-rubyを使用してspot instanceのpricing historyを取得

https://github.com/aws/aws-sdk-ruby
gem install aws-sdk

aws-sdk-rubyをインストールした上で、使用しているAWSのID情報を設定し
export AWS_ACCESS_KEY_ID='...'
export AWS_SECRET_ACCESS_KEY='...'
export AWS_REGION='ap-northeast-1'
以下のようなruby srouceを実行すると、spot instanceのpricing historyが取得出来ます。
#!/usr/bin/env ruby
require 'rubygems'
require 'aws-sdk'

ec2=AWS::EC2.new
resp = ec2.client.describe_spot_price_history(
    :instance_types => ['m1.small', 'm1.medium', 'm1.large', 'm1.xlarge'],
    :start_time => (Time.now - 60 * 60 * 24).iso8601, 
    :availability_zone => 'ap-northeast-1b')

resp.data[:spot_price_history_set].map{ |history| 
    if history[:product_description] == "Linux/UNIX"
        p history[:instance_type] + " => " + history[:spot_price] + " (" + history[:timestamp].to_s + ")" 
    end
}
実行結果。
"m1.small => 0.017000 (2013-08-01 22:52:34 UTC)"
"m1.large => 0.067000 (2013-08-01 22:48:47 UTC)"
"m1.medium => 0.035000 (2013-08-01 22:48:44 UTC)"
"m1.xlarge => 0.134000 (2013-08-01 22:48:43 UTC)"
"m1.small => 0.000100 (2013-08-01 22:47:34 UTC)"
"m1.xlarge => 0.134000 (2013-08-01 13:12:40 UTC)"
"m1.medium => 0.035000 (2013-08-01 06:47:28 UTC)"
"m1.small => 0.017000 (2013-08-01 02:42:36 UTC)"
"m1.large => 0.067000 (2013-08-01 00:04:50 UTC)"
"m1.xlarge => 0.134000 (2013-07-31 13:11:39 UTC)"
"m1.medium => 0.035000 (2013-07-31 06:44:00 UTC)"


2013年7月9日火曜日

MRJobのmapper/reducerに引数を渡す

Java版のHadoopとかだと、プログラムに環境変数的な値を渡すのはConfiguraitonクラスを色々駆使しなくてはいけなくて結構面倒くさい印象がありますが、MRJobの場合は以下のような書き方で、optparseチックに引数を渡すことができます。

たとえば"--date"というパラメータで日付情報を渡すようにする場合
    def configure_options(self):
        super(MR, self).configure_options()
        self.add_passthrough_option('--date', default=None)
のような形で"configure_options"メソッドを上書きし、add_passthrough_optionメソッドに定義を書いてあげると、各mapper、reducerの処理中から
date = self.options.date
のように参照できるようになります。

実行時は以下のような形
python hoge.py --date 2013-07-09 data/hoge.tsv


MRJobで複数ステップのMap-Reduceを実行する

"def steps(self)" メソッドをoverrideして、各ステップにどのような処理を実施するか指定します。

MRJobのサンプルページに載ってる例だと↓な感じ。
def steps(self):
    return [self.mr(mapper=self.transform_input,
                    reducer=self.consolidate_1),
            self.mr(reducer_init=self.log_mapper_init,
                    reducer=self.consolidate_2)]
steps関数の戻り値として、配列で各ステップごとにどんな振る舞いをさせるかをmr関数の戻り値として記載するようになっています。複数要素の配列を返せば、複数のステップが実行されます。
mr関数に指定できる振る舞いは、以下のようです。
  • mapper
  • reducer
  • combiner
  • mapper_init
  • mapper_final
  • reducer_init
  • reducer_final
  • combiner_init
  • combiner_final
ナイーブにMRJobを動かすと、1ステップごとに、outout出力されたファイルをS3から手元の環境にコピーするようになっています。その転送コストがかなりオーバーヘッドが高いので、S3上で出来る限り処理をしたい…という時に、複数ステップで実行すると嬉しいケースが多いです。

MySQLからデータをサンプリングして取得する

何かしらのデータの傾向を調べたい時に、たとえばMySQLに保存されているデータをdumpして調べる、みたいな事を行います。
ただ、この方法は待機系サーバーであったり、データ件数が少ないうちはあまり気にせずに行なっても大丈夫ですが、本番系で大量のデータを持つテーブルからfull dumpなどを行うのは現実的では有りません。

ということで、対象テーブルから乱数を用いてサンプリングデータの抽出をしてみました。本当の意味ではメルセンヌ・ツイスターなどの精度が保証されている乱数器を用いるのがベターなのでしょうが、今回はその話は無しで。



MySQLには乱数を生成する”rand()”という関数があるので、こちらをwhere句に記述し、テーブルの各レコードに対して乱数を付与し、かつその値が一定の範囲に含まれるもののみ抽出対象とする、という方法で、手軽にサンプリングを行う事ができます。

たとえば約90,000件のレコードが存在する以下のテーブルに対して
mysql> select count(*) from hoge;
+----------+
| count(*) |
+----------+
|    88975 |
+----------+
以下のようなクエリーを発行すると、おおよそ全レコードの10%のデータがランダムに抽出されます。
mysql> select count(*) from hoge where rand() <= .1;
+----------+
| count(*) |
+----------+
|     8997 |
+----------+

内部的には、すべての対象レコードに対して"rand()"関数を実行し、当該乱数が0.1以下のデータを抽出、という動きになっていると思います。
全レコードを走査しながら、かつ内部関数を実行する、という、クエリー的には鬼のような挙動なので、どう考えても高トラフィックのサイトのサービス側で使用するクエリーとしては、使えません。

アドホックにデータ抽出する際でも、件数の多いテーブルに対して当該処理を行うと処理に膨大に時間がかかります。

そのため、パフォーマンス・チューニングとしては以下のような感じになると思います。
  1. '*'指定ではなく、primary key もしくは index上のカラムを抽出対象にする
  2. 1.で取得したデータと、元のテーブルのJOINでデータを抽出する
  3. 1.で取得する件数を全件ではなく、一定の範囲に収める

すべてのデータを走査対象にするよりは、indexのみで完結するほうが走査コストが低くすむので、"where rand()" を使用する際の抽出条件はprimary keyや、index上のカラムのみにします。
たとえば、"id"というカラムがprimary keyなのであれば、以下のようにします。
mysql> select id from hoge where rand() <= .1;
explainしてみると、id指定の場合は"type=index"となり、'*'指定の場合は"type=ALL"となります。
(count句を使用した場合は例外で、primary keyが指定されているテーブルであればデータ件数はprimary keyの数を数えるだけで済むので、count(*)でも"type=index"になることがあります)


そのうえで、以下のようにJOINしてデータを取得してあげると、ナイーブにrand()を使う時よりも速くなります。
mysql> select * from hoge as a, (select id from hoge where rand() <= .1) as b where a.id = b.id;

また、subqueryの結果返されるidの数が少なければ少ないほどtemporary領域などのリソースを食いつぶす率が減りますし、JOINコストが減りますので、rand()をかけるデータの範囲を絞ってあげるのも1クエリーあたりのスループット向上と言う意味では効果があります。
mysql> select * from hoge as a, (select id from hoge where (id between 1 and 1000000) and rand() <= .1) as b where a.id = b.id; 
mysql> select * from hoge as a, (select id from hoge where (id between 1000001 and 2000000) and rand() <= .1) as b where a.id = b.id; 
・
・
・
のような形。
もう既に無作為抽出などは望むべくも無い状態になってますが…まあ、全データdumpでなく、一部のデータをMySQLにあまり負荷をかけずに取得する、という方法の紹介でした。


2013年7月2日火曜日

fluent-plugin-uniqcount を用いて Fluentd で数値集計

https://github.com/KazkiMatz/fluent-plugin-uniqcount
http://d.hatena.ne.jp/kazuk_i/20130506

ログデータをFluentdで収集し、HadoopやMongoDBなどのストレージに収集した後、クエリーを発行して必要なデータを取得する。
たいていのデータ解析では上述のような後付けのバッチ処理で十分ですが、

  • 準リアルタイムで収集をしたい
  • Fluentdでリレーするデータは、当該集計以外には使用しない
    (ストレージに格納したとしても、基本的に再利用はされない)
といったケースの場合、ストレージにデータをすべて保持するのは無駄ですし、即時性の観点でもFluentdをデータが流れる際にあわせて数値集計が行われた方が効率が良いです。

記事先頭にリンクをした「fluent-plugin-uniqcount」は、名前の通リ特定の条件に合致したデータのユニーク件数について、集計が行えるFluentdプラグインです。


◯内容
詳しくは作者の方のブログに詳細な記述があるので、こちらを参照下さい。


◯事例
ある程度URLのバリエーションが固定されているとあるWebサービスで、時間単位内でURLごとにそれぞれどの程度のリクエストがあるか集計したい、という需要がありました。
1日単位での集計で十分であればログを溜めた上でHadoop(Hive)等での集計で十分なのですが、最低でも1分単位でデータを把握したかったため当該プラグインを使用しました。
access_log -- [in_tail] --> Fluentd -- [out_uniqcount] -- [out_http] --> (API Server) --> MySQL
概ねの流れは上記の通リで、in_tailで読み取ったログ情報をURLごとに集計し、out_httpを経由して最終的にMySQLに書き込みます。

out_uniqcountの設定は、たとえば以下のような形。
<match hoge.access_log>
    type uniq_count
    list1_label hoge_trends
    list1_key1 uri
    list1_key2 referer
    list1_span 60
    list1_offset 3
    list1_out_tag hoge.uniq_referer
    list1_out_num 1000
    list1_out_interval 60
</match>
上記であれば、あるuriにアクセスされたrefererごとにFluentd上で数値が集計され、1分ごとにアクセスの多い上位1000件が"hoge.uniq_referer"タグにて出力されます。

このように、色々なルールの組み合わせでFluentd上で手軽に数値集計が可能になります。

システム的なメトリクス情報の取得であれば、uriとstatus codeの組み合せで特定URLへのステータスコード別リクエスト数、なども出せます。
また、作者ブログの事例にもあるように、urlとip addressの組み合わせで集計することで特定URLの簡易ユニークアクセス数なども集計できます。


◯注意点
すべての集計データをいったんFluentdプロセスのメモリ上に保持する形になります。なので、作者のブログにも記載されていますが、データパターンが増えるとメモリが溢れてしまう可能性があります。単位時間内に大量の組み合わせが発生するような集計処理では大量のシステムリソースを必要とするので、注意です。


機能としては非常にシンプルですし、作者の意向かFluentd Pluginsのページにも公開されてませんが、現実的な性能を出してくれますし、手軽に数値集計が行えるので、データ規模が一定の範囲で収まる用途であれば有効な手法だと思います。





2013年6月28日金曜日

cut コマンドでCSV/TSVの特定のカラム情報のみを抽出する

http://linuxjm.sourceforge.jp/html/GNU_textutils/man1/cut.1.html

CSVやTSVが手元にある時、ある特定のカラムの情報だけを抜き出したい場合、cutコマンドを使用するのが便利そうです。
$ vi hoge.csv
a,b,c
1,2,3
4,5,6
7,8,9
というCSVがあった時に、カラム"b"だけを抜き出したい場合は
$ cut -d "," -f 2 hoge.csv
b
2
5
8
という感じで抜き出せます。

"-d" はデリミタで、デフォルトはタブ区切り文字です。TSVの場合は-dは未指定でも可。
"-f"は抜き出すカラム(一番左が1)。複数抜き出す場合は"2,3"のようにカンマ区切りで複数指定すればOKです。

2013年6月25日火曜日

mac に jq(command-line JSON Prosesser) をインストールする

ここからダウンロードするのが一番ラクだということに気づいた…
http://stedolan.github.io/jq/

ダウンロード後、実行パーミッション(chmod +x )を加えればOKです。

Fluentd + out_s3 でタグごとに出力先フォルダを変更する

勿論タグごとに match エレメントを記載してベタ書きしても良いのですが、こういう際は、複数の設定を動的に行えるforest pluginが便利です。

https://github.com/tagomoris/fluent-plugin-forest

◯設定例
<match hoge.**>
    type forest
    subtype s3
    <template>
        aws_key_id [aws_key]
        aws_sec_key [secret_key]
        s3_bucket [bucket_name]
        s3_endpoint [region_name].amazonaws.com
        path fluentd_logs/${tag}/
        buffer_path /var/log/fluentd/${tag}
    </template>
</match>
データの保存先(path ならびに buffer_path)にplace holderとして${tag}を指定しています。
このように設定しておくと、"hoge.**"  のログデータについてタグごとに fluentd_logs/ 配下に保存されるようになります。

一点注意として、out_s3はfile typeのbufferを標準では使用するようになっており、いったんバッファ情報を特定フォルダ(buffer_path)に保存します。その際、buffer_path の保存先をタグごとに振り分けて置かないと、同じbufferファイルに複数タグのデータが保存され、S3保存時にも一緒くたに保存されてしまいます。


◯json内に含まれた値に応じて保存先を変更する
あらかじめ設定されたタグではなく、jsonに含まれた値に応じて保存先を変更したい場合は、rewrite_tag_filter を使用してタグを書き換える、という方法があります。
https://github.com/y-ken/fluent-plugin-rewrite-tag-filter

たとえばFluentdに流れているデータが以下のようなフォーマットになっていて、
1970-01-01T00:00:00+09:00   log.pc    {"event":"register"}
"event" に各種イベント名が含まれており、タグ名とイベント名の組み合わせごとに保存先を変更したい、という場合は以下のように設定します。
<match log.**>
    type forest
    subtype rewrite_tag_filter
    <template>
        remove_tag_prefix log
        capitalize_regex_backreference yes
        rewriterule1 event (.*) hoge.${tag}_$1
    </template>
</match>
log.pc というタグを hoge.pc_register というタグに書き換えています。
こちらと先述のout_s3の定義を組み合わせると、最終的に [bucket_name]/fluentd_logs/hoge.pc_register/ というS3のディレクトリにファイルが保存されます。


s3 の Reduced Redundancy Storage に書き込む

S3に多少のロストが許容できるようなそれほど重要でない情報を少し安く保存する、という手段として、Reduced Redundancy Storage(RSS)として書き込む、という方法があります。

以下は3年前のブログなので、改めて触れるまでも無く大多数の方はご存知でしょうが…
http://aws.typepad.com/aws/2010/05/new-amazon-s3-reduced-redundancy-storage-rrs.html

ノーマルオプション(Standard)では 99.999999999% の可用性が保証されているのに対し、RSSオプションでは 99.99% の可用性で、もし10,000ファイルをS3に置いた場合、1年の間に1つかそれ以上のファイルがロストする、という計算。ただしその分価格が33%割安、となります。
あまり長期保存しないファイルを一時的に置く、万が一ロストしても構わない、という場合RSSを選択するとかなりお得になります。


◯rubyのaws-sdkでreduced_redundancyを有効にする
http://aws.amazon.com/jp/sdkforruby/
require 'aws-sdk'

s3 = AWS::S3.new({})
bucket = s3.buckets[s3_bucket]
bucket.objects[s3path].write(Pathname.new(filepath), :reduced_redundancy => true)

◯pythonのaws-sdkでreduced_redundancyを有効にする
http://aws.amazon.com/sdkforpython/
import boto

s3 = boto.connect_s3()
bucket = s3.lookup(s3_bucket)
key = bucket.new_key(s3path)
key.set_contents_from_filename(filepath, reduced_redundancy=True)

◯Fluentdのout_s3でreduced_redundancyを有効にする。
現バージョン(2013/6/25現在)ではreduced_redundancyに対応していないので、以下のような修正を行う必要があります。
https://github.com/moaikids/fluent-plugin-s3/commit/183708f6a9a5c2eb12e8d64a9f02233c25cb3a9d

2013年6月24日月曜日

mrjob で手軽に elastic map reduce

http://pythonhosted.org/mrjob/

mrjobは、AWSのElastic MapReduceサービスを手軽に使用するためのPython実装です。
EMRのクライアントライブラリとして有名な"Amazon Elastic MapReduce Ruby Client( http://aws.amazon.com/developertools/2264 ) "と比べると出来ることは少ない印象ですが、その分シンプルに使うことができます。


◯インストール
pip install mrjob

◯jobの作成
mrjobに含まれている"MRJob" というクラスを継承することで、手軽にmapper / reducerのjob作成する事ができます。以下はmrjobのサイトにも記載されている、word countを行うサンプルのソース。
#mrjob_sample.py
from mrjob.job import MRJob


class MRWordCounter(MRJob):

    def mapper(self, key, line):
        for word in line.split():
            yield word, 1

    def reducer(self, word, occurrences):
        yield word, sum(occurrences)


if __name__ == '__main__':
    MRWordCounter.run()

◯ローカル環境で実行
ローカル環境で作成したjobの動作確認ができます。
python mrjob_sample.py test.txt > result.txt
test.txtの中身が以下のようなものだった場合
A job consists of an MRJob subclass, one or more mapper, combiner, or reducer methods, a call at the bottom to invoke mrj    ob’s logic, and all associated dependencies.
For jobs that consist only of one mapper, combiner, and reducer, in that order, you can simply override one or more of th    e mapper(), combiner(), and reducer() methods:
MapReduceの実行結果が出力されたresult.txtの中身は以下のようになります。
"A" 1
"For"   1
"MRJob" 1
"a" 1
"all"   1
"an"    1
"and"   3
"associated"    1
"at"    1
"bottom"    1
(snip.)

◯EMRの設定
EMRの環境でjobを実行するには、設定ファイルを作成する必要があります。
この中でAWSのcertificate情報や、起動するインスタンスの種類などを指定します。
以下は最低限動作させるための一例です。詳しくは公式サイトのドキュメントを参考ください。
http://pythonhosted.org/mrjob/guides/quickstart.html#configuration
#emr.conf
runners:
    emr:
        aws_access_key_id: [access_key_id]
        aws_region: [region]
        aws_secret_access_key: [secret_access_key]
        ec2_instance_type: m1.small
        num_ec2_instances: 3
        cmdenv:
        s3_log_uri: s3://[bucket_name]/emr/logs/

◯EMR環境で実行
emr環境で実行する場合は" -r emr" オプションを指定します。また、解析対象のデータのs3のパスを指定します。
python mrjob_sample.py -r emr --conf-path ./emr.conf s3://[bucket_name]/[file_path] > result.txt

◯バグ?
実は、pip installでmrjobをダウンロードした場合、上記の手順では正常にEMR上で動作させる事ができませんでした。
ローカル実行は可能ですが、EMR上で動作させる際に"returned an invalid certificate"というエラーメッセージが表示し、emrのエンドポイントへの接続が正しく行われませんでした。

同様のエラーが以下issueに報告されていますが、現在は未だ修正が行われていないようです。
https://github.com/Yelp/mrjob/issues/621

上記issueにも記載されているように、mrjobのemr.py中に記載されているEMRのendpoint URLを
    #'ap-northeast-1': 'elasticmapreduce.ap-northeast-1.amazonaws.com',
    'ap-northeast-1': 'ap-northeast-1.elasticmapreduce.amazonaws.com',
という形で書き換えてあげると、うまく動作するようになります。



mrjobはシンプルとはいえ、色々な記述方法、設定方法があり、各種管理コマンドも用意されているようなので、以降も気が向いたら記事を書いていこうと思います。




2013年6月20日木曜日

あるページがfacebookで"いいね""シェア"をされている数を取得

メモ。一番簡単なのは以下。ただし"いいね"と"シェア"の合算値なのかなんだかよく分からない"shares"という値だけが返ってくる。
curl http://graph.facebook.com/http://itsneatlife.blogspot.jp/


jetty の出力するアクセスログをカスタマイズする

かなりjetty独自の方法になっていたのでメモ。

基本的には、${JETTY_HOME}/etc/jetty-requestlog.xml を編集することで行います。
なお、以下は一般的なログ形式であるNCSAに準拠した”NCSARequestLog”を使用した例です。

  • filename : 出力するログファイル名。デフォルトでは"yyyy_mm_dd.request.log"という形で保存されdaily rotateされます。
  • filenameDateFormat : ログファイルに含める日付表現のフォーマット
  • retainDays : 何日間保存するか。デフォルトでは90日
  • extended : trueにするとcombined方式で出力されます。デフォルトはfalse(common)
  • logDateFormat : ログの日付領域の日付表現
  • logLocale : 出力ログのロケールの設定。
  • LogTimeZone : 出力ログのタイムゾーンの設定。デフォルトはGMT
他にも設定項目は多種ありますが、それらしきドキュメントに辿りつけなかったので、直接ソースやJavadocを参照するのが良いと思います。


2013年6月19日水曜日

redis2.6 + lua scripting で snowflake 風の timestamp based id を生成

https://github.com/twitter/snowflake
twitterのID生成に使用されているという噂のsnowflakeですが、こちらは導入にZooKeeperやCassandraが必要という事があり、若干敷居が高いです。

もう少しカジュアルに初められる手段として、redisと、2.6以降から機能追加されたLua Scripting機能を組み合わせて、同種のid生成処理を試してみました。


◯snowflake
snowflakeは、timestampの数値をベースとした64bit idを生成するための仕組みです。
timestamp値を基にid生成することで、以下のメリットが得られます。
  • 生成されたidから、idが生成された時刻を復元することができる。
  • 生成日時が古いidより生成日時が新しいidの方が基本的に数値が大きくなるため、idを用いた時間sortができる。

ただし、通常多くの言語処理系ではtimestampは64bitで表現されています。
また、timestamp値だけをidとして用いると、並列処理でid生成を行った際に重複が発生する可能性があります。
そのため64bitでidを表現したいのであれば、timestamp部のサイズの省サイズ化と、idとして重複を発生させない仕組みが必要です。

省サイズ化については、timestampをそのまま使うのでなく、ある一定時刻のtimestamp値(epoch)を減算することで少ないbit数で表現するようにしています。基本的に今後生成するid値は未来のtimestamp値をベースにすれば良いので、id生成を始める前の過去分の情報を削ることでサイズを節約する、というアプローチです。
実際のsnowflakeのソースでは以下のようになっています(抜粋)
val twepoch = 1288834974657L // Thu Nov 04 10:42:54 JST 2010
var timestamp = System.currentTimeMillis()
timestamp - twepoch

重複の抑制については、id情報にnode idと、各ノードが発行するsequenceという値を付与することによってある程度防いでいます。
実際のsnowfalke id のbit layoutは以下。
  • time - 41 bits (millisecond precision w/ a custom epoch gives us 69 years) 
  • configured machine id - 10 bits - gives us up to 1024 machines 
  • sequence number - 12 bits - rolls over every 4096 per machine (with protection to avoid rollover in the same ms)
unsigned な long で無くても整数値で扱える事を意図してか、実際は 63bit (41+10+12)の情報量でidの表現を行なっています。


◯redis + lua scripting でのid生成
evalコマンドによるscripting機能対応は2.6より追加された機能なので、まず2.6以降のredisをインストールする必要があります。
http://redis.io/download

なお、scripting処理はevalコマンドを用いて行えます。
http://redis.io/commands/eval

次に、redis nodeのnode id と sequence 情報を表現するための入れ物を用意します。
ここではnode idのkeyは"node_id", sequenceについては"sequence"とします。
redisを複数台用意する場合は、node_idのvalueは各redis serverごとに異なる数値を割り当てます。
$ redis-cli set sequence 1
$ redis-cli set node_id 1

続いて、id生成処理を行うlua scriptを用意します。lua内部で'sequence'はインクリメントし、自分自身のnode_idの値を取得し、timestamp - epoch の値とを組み合わせてidを生成しています。
-- id.lua
local epoch = 1288834974657
local seq = tonumber(redis.call('INCR', 'sequence')) % 4096
local node = tonumber(redis.call('GET', 'node_id')) % 1024
local time = redis.call('TIME')
local time41 = ((tonumber(time[1]) * 1000) + (tonumber(time[2]) / 1000)) - epoch
return (time41 * (2 ^ 22)) + (node * (2 ^ 12)) + seq

その上で以下のようにevalコマンドでluaスクリプトを発行すると、id値が生成されます。
$ redis-cli --eval id.lua
(integer) 347205555082385408
生成に関しては以上です。


◯運用にあたっての制約
本体のsnowflakeについても同じ事が言えますが、単一ノード内で、1ミリ秒以内に4096回以上idの生成を行うと、idが重複してしまう可能性があります。
なので、"redisノード数 × 4096回 / ms" 以上のオーダーでid生成が行われそうな場合は、逐次redisのノードを増やしていく必要があります。

また、idの重複に関しては、上記の仕組みだけでは検知できないので、生成したidを取得した側で重複チェックを行う必要があります。

また、各redisノードに設定されたnode idが万が一重複してしまうとid重複の発生確率も高くなるので、運用的に重複が起こらないようにする必要があります。

また、osの時刻が過去にずれてしまった場合、仕組み的にidの重複が起こりやすくなります。この部分についても実際は考慮が必要になると思います。

実際、一意のidを少ないビット量で表現するのにはどうしても制約がつきまとうので、運用には工夫が必要になるとは思います。
厳密にuuidを運用したい場合は、以下のRFCの記述なども参考になると思います。
http://www.ietf.org/rfc/rfc4122.txt

まあ、手軽にやる方法として、一応ご紹介いたしました、みたいな記事でした。



2013年6月18日火曜日

td-agent/fluentdのmonitor_agentを使用してbuffer使用領域の監視

fluentdの0.10.33からin_monitor_agentという標準プラグインが追加され、これによりfluentdで使用しているpluginのmetrics情報が外部から取得できるようになりました。

◯設定
<source>
  type monitor_agent
  bind 0.0.0.0
  port 24220
</source>
※参考:http://docs.fluentd.org/articles/monitoring#monitoring-agent


◯確認
上記のように設定後、以下のように24220ポート経由でAPIに接続すると、metrics情報がjson形式で取得できます。
$ curl -s http://localhost:24220/api/plugins.json | jq "."
{
  "plugins": [
    {
      "config": {
        "type": "forward"
      },
      "output_plugin": false,
      "type": "forward",
      "plugin_id": "object:3ff71e700648"
    },
    {
      "config": {
        "port": "24220",
        "type": "monitor_agent"
      },
      "output_plugin": false,
      "type": "monitor_agent",
      "plugin_id": "object:3ff71e6ffb30"
    },
(snip.)
ここで確認できるのは基本的に以下になります。

  1. 登録されているpluginの一覧
  2. 各pluginの設定情報(上記monitor_agentで言えば port:24220)
  3. buffered output pluginのmetrics

このうち、運用者としては3が特に大事になります。大量のデータを取り扱うfluentdノードでは、buffer処理をするoutput pluginでbuffer領域のqueueを使い尽くし(queue size exceeds limit)てしまうことがあります。
そちらを事前に検知して強制的にflushをしたり、使用領域サイズをグラフにプロットしたり、という事が運用上必要となり、解決方法としてはmetrics pluginを使用する、というのが良い策になります。

具体的に、out_forwardのmetrics内容を見てみると以下の様な感じになっています。
    {
      "config": {
        "type": "forward"
      },
      "retry_count": 0,
      "buffer_total_queued_size": 0,
      "buffer_queue_length": 0,
      "output_plugin": true,
      "type": "forward",
      "plugin_id": "object:3ff7206ce948"
    },
retry_count, buffer_total_queued_size, buffer_queue_lengthあたりが、ウォッチが必要な項目になりますね。


◯強制的なflush
たとえばmonitor_agentで取得したbuffer sizeの値が大きくなっていて手動でflushをしたい、といった場合、以下のようにsignalを送る事で強制flushができるようです。
pkill -USR1 -f fluentd
実際のところ、queueがあふれる理由は、トラフィックの急増やoutputの先にあるシステムの負荷高騰やキャパシティ低減が考えられるため、fluentdだけを強制的にflushしてもシステム全体のキャパシティは解決せず、解決することは少ない気もしますが、一応。


◯munin plugin
可視化側の施策の一つとして、monitor agentの値をグラフ化するmunin pluginを作成しました。
https://github.com/moaikids/munin-fluentd

muninについての説明はここでは割愛しますが、システムメトリクスのグラフ可視化ツールでは有名なものです。

上述のgithub内には以下の3種のmunin pluginが含まれています。
  • fluentd_process : プロセス数の取得
  • fluentd_resource : fluentdの使用メモリ量の取得
  • fluentd_monitor_agent : monitor_agentから取得したbuffered output pluginの各種metricsの取得

設定、使い方は、多くのmunin pluginと同様に簡単です。
  • munin ならびに munin-nodeのインストール
  • python(<= 2.6)のインストール ※munin-fluentdのプラグインはpythonで書いてるので
  • munin-nodeサーバーに、上記munin-fluentdのプラグインをインストール
  • munin-nodeの設定ファイルに設定を記述

munin-fluentdのプラグインのインストールは以下のような形で。所定のplugins置き場に、pluginのシンボリックリンクを作成します。
git clone https://github.com/moaikids/munin-fluentd.git
cp munin-fluentd/plugins/fluentd/* /usr/share/munin/plugins/
chmod +x /usr/share/munin/plugins/fluentd_*
ln -s  /usr/share/munin/plugins/fluentd_process /etc/munin/plugins/fluentd_process
ln -s  /usr/share/munin/plugins/fluentd_resource /etc/munin/plugins/fluentd_resource
ln -s  /usr/share/munin/plugins/fluentd_monitor_agent /etc/munin/plugins/fluentd_monitor_agent
また、設定ファイルに以下のような記述が必要です。
[fluentd*]
user munin
env.host localhost
env.port 24220
env.package td-agent #td-agentの場合はこちら、fluentdの場合は"fluentd"と指定
上記設定の上、munin-nodeを再起動すれば、muninのグラフに取得した数値が反映されるようになると思います。
なお、fluentd_monitor_agent では標準では三種の値(retry_count, buffer_total_queued_size, buffer_queue_length)を同一グラフで表示するようにしていますが、それぞれ数値の取りうる値の範囲に大きな差があるため正直見づらくなります。
ln -s  /usr/share/munin/plugins/fluentd_monitor_agent /etc/munin/plugins/fluentd_monitor_agent_buffer_queue_length
ln -s  /usr/share/munin/plugins/fluentd_monitor_agent /etc/munin/plugins/fluentd_monitor_agent_buffer_total_queued_size
ln -s  /usr/share/munin/plugins/fluentd_monitor_agent /etc/munin/plugins/fluentd_monitor_agent_retry_count
上記のように、シンボリックリンクのファイル名のsuffixに各種metricsの値を指定すると、その値だけをグラフ化するグラフが作成できるようになります。

グラフ化した一例は以下のような感じ。












2013年6月11日火曜日

nginxで特定URLへの接続をhttps強制にする

location /hoge {
    if($sheme = 'http'){
        rewrite ^(.*) https://$host$1 permanent;
    }
    (snip.)
}
たとえばこんな感じ。
しかしこの方法はend-to-end方式でのHTTPS接続に対応していない場合のみ有効です。
nginxの前面にロードバランサーが置いてあり、証明書をロードバランサー側に設置し、
(client) --https--> (Load Balancer)  --http--> (nginx)
のような構成になっている場合は使用出来ません。

2013年6月7日金曜日

jstackを用いてJavaのThread Dumpを取得する

kill -3 (process id)
以外の方法でも、Javaに標準で付属の"jstack"というコマンドを使用すればthread dumpの取得が行えるようです。
jstack -F (process id) > dump.log.`date +"%Y%m%d%H%M%S"` 2>&1
みたいな感じで。


AWSのpython client "boto" を用いて特定のタグのインスタンス情報だけ取得する

botoは↓
http://aws.amazon.com/jp/sdkforpython/

インストールはpipで出来ます。
pip install boto

import boto.ec2

conn = boto.ec2.connect_to_region("ap-northeast-1", aws_access_key_id=<access_key_id>,aws_secret_access_key=<secret_access_key>)
reservations = conn.get_all_instances(filters={'tag-key':'Name', 'tag-value': 'hoge'})
for reservation in reservations:
  print reservation
上記のように書くと、Nameというタグの値に'hoge'と指定されているインスタンスのみ取得します。

応用としては、fabricとの組み合わせがあります。
fabricは、pythonで作られた、複数のサーバー群に対して手軽にコマンド実行を行う事ができるツールです。

例えばec2インスタンスに”Group”といったタグを割り当て、値を設定しておき、fabric側では当該グループの情報を取得するようにしておくと、手軽に特定サーバー群へのコマンド一括実行ができますね。
こういった管理手法は、Cloud Design Patternでは”Cloud DI”と呼ぶようです。

2013年6月5日水曜日

JedisでRedis2.6のevalコマンドを使用する

JedisはJavaによるRedisクライアント実装です。
Redisは2.6よりevalコマンドによるServer Side Scriptingに対応しました。この機能は非常に便利なので使用したいのですが、現在のJedis(2013年6月5日現在)の最新版:2.1.0ではevalコマンドにバグが存在するようです。

https://github.com/xetorthio/jedis/issues/340
こちらのissueを見る限りだと2.2.0では修正予定で、master branchには既に修正が反映されているようです。
今すぐにJedisでevalコマンドを使用したい場合は、ソース一式を取得し、自分でmavenコマンドを実行し、 packageを行う必要があります。
git clone https://github.com/xetorthio/jedis.git
cd jedis
mvn package
上記の結果、targetディレクトリに"jedis-2.2.0-SNAPSHOT.jar"が作成されますので、こちらを任意の場所に置き、クラスパスを設定してあげればOKです。


なお、Jedisでのevalコマンドは以下のような感じで使用出来ます。
Jedis jedis = new Jedis("localhost");
Object ret = jedis.eval("return 'hello redis lua world.'");
System.out.println(ret == null ? "(null)" : ret.toString());
実行結果は以下。
hello redis lua world.





nginxでltsv形式のaccess.logを出力する

人間による可読性の良さと、parse処理のしやすさ、1行中の出力内容の順序にたいしてロバスト(項目の順番が変わったり追加されてもparseに失敗することは無い)という点でltsv(Labeled Tab-Separated Value)というファイルフォーマットが一時期流行りました。

もちろん一時の流行ではなく、非常に実用的な手法なので、このたびnginxの出力ログをltsv形式に変えてみました。

◯設定
naoyaさんのブログ(http://d.hatena.ne.jp/naoya/20130205/1360088927)を引用すると、以下のような形になります。
http {
    …
    log_format  ltsv  'time:$time_local\t'
                      'host:$remote_addr\t'
                      'request:$request\t'
                      'status:$status\t'
                      'size:$body_bytes_sent\t'
                      'referer:$http_referer\t'
                      'ua:$http_user_agent\t'
                      'reqtime:$request_time\t'
                      'upsttime:$upstream_response_time';

    access_log  /var/log/nginx/access.log  ltsv;   

nginxで定義されている変数を用いて、タブ区切りでフォーマットを記述していく感じですね。

個人的な環境では上記では項目が不足していたので、以下のように項目を追加しています。
  log_format  ltsv  'time:$time_local\t'
                    'msec:$msec\t'
                    'host:$remote_addr\t'
                    'forwardedfor:$http_x_forwarded_for\t'
                    'req:$request\t'
                    'method:$request_method\t'
                    'uri:$request_uri\t'
                    'status:$status\t'
                    'size:$body_bytes_sent\t'
                    'referer:$http_referer\t'
                    'ua:$http_user_agent\t'
                    'reqtime:$request_time\t'
                    'upsttime:$upstream_response_time\t'
                    'cache:$upstream_http_x_cache\t'
                    'runtime:$upstream_http_x_runtime\t'
                    'vhost:$host';

  • アクセス時間は秒単位では情報として不足しているので、milliseconds($msec)まで出力するようにした。
  • $request は "GET /hoge" のようにrequest methodとrequest pathがスペース区切りで出力されますが、後々の集計の際に一緒になっていると面倒なので、$request_methodと$request_uriをそれぞれ個別に出力するようにした。
  • reverse proxyとして動作した際に有用な情報を追加($upstream~)
あたりが追加した動機です。

なお、WEB+DB Pressの74号で「LTSVでログ活用」という記事が書かれています。こちらに、「推奨ラベル一覧」という表があるので、各項目のlabel名をどのようにするか、参考にすると良いでしょう。
http://www.amazon.co.jp/dp/4774156078


◯Fluentdで読み込み
おそらく多くの現場で、出力したログはFluentd経由で読み込み、各種metricsの可視化や、解析サーバーへの転送などを行なっていると思います。

apacheのログを読み込む際は、たとえばin_tailプラグインなどを用いて、正規表現でparseをする形になります。commonsやcombined形式であればFluentdに標準パーサーがありますが、独自の拡張が行われたログの場合は自分でparseのフォーマットを記述する必要がありました。

ltsvの場合、最新(<= 0.10.32 )のFluentdであればltsv用の標準パーサーがありますので、非常に手軽に取り込めます。
<source>
  type tail
  format ltsv
  (snip.)
</source>
出力項目が変更になってもparse処理の部分は特に気にする必要が無いので、そういう意味では楽ですね。


ltsvは若干出力サイズが冗長になると思いますが、それを補って余りあるメリットがあるので、それほど大規模でないサイトやアプリのロギング方式としては適切だと思います。



2013年6月3日月曜日

munin pluginの動作確認を行う

muninをインストールしたり、pluginを作成した際、pluginの動作確認をしたい時があります。
その際の手法を幾つか。

あらかじめ、動作確認したいpluginについては /etc/munin/plugins/ 配下に設置した上で、munin-nodeの再起動をしておく必要があります。

◯munin-run 
munin-nodeの動作する環境で、ローカルに設定されているpluginの動作確認をする場合はこちらでしょう。
$ munin-run cpu
user.value 11097897
nice.value 129809178
system.value 6680422
(snip.)
$ munin-run cpu config
graph_title CPU usage
graph_order system user nice idle iowait irq softirq
graph_args --base 1000 -r --lower-limit 0 --upper-limit 300
(snip.)

◯muninwalk
muninサーバーから、各munin-nodeがインストールされているサーバーに接続して、正しく値が収集できるか確認したいケースがあります。
その際の便利ツールとして muninwalk というperl製のツールが公開されています。
ref. http://pocketstudio.jp/log3/2012/04/13/muninwalk_and_muninget/


◯ncコマンド
muninwalkは便利なのですが、すべてのサーバーにインストールして回るのは大変なので、多くのLinux OSにインストールされているncコマンドを使用して確認するのが、個人的には楽です。
ncコマンドを実行後、コンソールで " fetch (plugin name)" と入力すると、当該pluginのメトリクスの値が取得出来ます。
$ nc munin-node 4949
# munin node at munin-node
fetch cpu
user.value 11104466
nice.value 129888815
system.value 6684441
(snip.)
munin-nodeのポートに接続出来ない場合はネットワークの設定か、接続先munin-nodeのmunin-node.confの設定に問題がある可能性があります。

pluginの値が取得出来ず"# Unknown service" という文字列が返される場合は当該pluginが正しくmunin-nodeに認識されていない可能性があります。
また"#Bad exit" と返される場合は当該pluginが正しく動作していない可能性があります。

こういう感じで、動作確認や問題の切り分けを行なっていく、という形になります。




2013年5月25日土曜日

2013年5月17日金曜日

muninの監視通知をpushoverで受信する

”muinin"と書いていますが、特にmuninに限った話ではなく、pushoverの提供しているAPIを経由して何かしらアラート的な情報をiPhone/Android端末にpush通知しましょう、という話です。


◯Pushover
https://pushover.net/

iPhoneもしくはAndroidに手軽にPushメッセージを通知する事のできるサービスです。
会員登録すると、各ユーザーに固有のtokenが割り当てられます。それを使用してAPI経由でPushoverにメッセージを送ると、tokenに紐付いたユーザーの携帯端末宛に、pushメッセージが送信されます、
なお、pushの受信には専用のアプリをダウンロードする必要があり、アプリは有料になっています。


◯Pushoverにメッセージを送る
手順としては以下。

  1. 会員登録を行う。https://pushover.net/ より
  2. 会員登録完了後、あらためて https://pushover.net/ にログインすると、画面右側にユーザーに割り当てられたtoken(user key)が表示されます。こちらを何かしらメモに控えておきます。
  3. 同画面の一番下の「You Applications」より、「Edit」をクリックし、アプリケーション登録画面に進みます。ここで「Create a New Application」をクリックし必要事項を入力します。
    ひと通り完了すると、アプリケーションごとのtoken(API Token/Key)が発行されるので、こちらを何かしらメモに控えておきます。
  4. 入手したtoken情報を使用してAPIにメッセージを送るプログラムを用意します。
    各種言語での操作方法がpushoverのページの掲載されていますが、ここでは一番手軽なshell scriptsによるものを例示します。こちらを"pushover.sh"として保存します。
    #!/bin/sh
    
    API_TOKEN="(API Token/Key)"
    USER_TOKEN="(user key)"
    MESSAGE=`< /dev/stdin`
    
    echo "send notify message : '${MESSAGE}'"
    
    curl -s -F "token=${API_TOKEN}" -F "user=${USER_TOKEN}" -F "message=${MESSAGE}" https://api.pushover.net/1/messages.json
  5. 以下の様な形で標準入力でshell scriptsに文字列を与えてあげると、その内容をpushover APIに送信します。
    echo "help me!!" | ./pushover.sh

◯muninとの連携
muninのエラーメッセージ通知先としてpushoverを使用する場合は、muninがインストールされているサーバー上に上記shellを設置した上で、"munin.conf"に以下のように記述をすればOKです。
一般的なメール通知の方法も参考程度に併記しておきます。また、"max_messages" "always_send"の設定は任意です。下記だとcriticalエラーのみpush通知されます。
contacts admin push
contact.admin.command        mail -s "Munin ${var:group}::${var:host}" alert@hoge.com
contact.admin.max_messages      1
contact.admin.always_send       warning critical
contact.push.command         /var/lib/pushover.sh
contact.push.max_messages       1
contact.push.always_send        critical


◯使用してみての感想
メール受信にありがちな配送遅延が少なく、かなり迅速にエラーが検知できるようになると思います。
pushoverではアラートの取得のみ行うという使い分けにしておけば取得漏れも極力減らせますし、通知音のカスタマイズなども行いやすいので、運用的にもメールと分離するのは便利な事が多いです。

ただ、pushoverはメッセージの保存件数に上限があり、かつ検索機能も貧弱なため、速報的な通知はpushover経由でpushで受け取り、過去の履歴についてはメールや他ツールで管理する、というやり方が望ましいのではないかと思います。

多少の制約はあれども、個人的にはいまさら「メールでのアラート受信」の時代には戻れない感があります。便利な世の中になりましたね。
ただ、pushoverはひと月あたり7,500件のメッセージまでしか送れないっぽいので(以降は10,000件追加あたり$50必要?)、送りすぎには注意。



2013年5月1日水曜日

crontabの中で"%"を使用したい場合はエスケープする必要がある

知らなかった…
The "sixth" field (the rest of the line) specifies the command to be run. The entire command portion of the line, up to a newline or % character, will be executed by /bin/sh or by the shell specified in the SHELL variable of the cronfile. Percent-signs (%) in the command, unless escaped with backslash (\), will be changed into newline characters, and all data after the first % will be sent to the command as standard input.

(ref. http://linux.die.net/man/5/crontab)

たとえば
`date --date '1 day ago' +'%Y-%m-%d'`
みたいにdateコマンドの出力書式を指定したい時などに”%"を使用しますが、crontabに記載したい場合は以下のようにエスケープしてあげる必要があります。
`date --date '1 day ago' +'\%Y-\%m-\%d'`


2013年4月23日火曜日

Tomcat / Jettyの環境変数の設定

ついついcatalina.shなど起動シェルに直書きしてしまいがちですが、そんなのが許されるの(ry
Tomcat、Jettyについて、それぞれ以下のように行うのが望ましいです。

◯Tomcat
Tomcat付属のcatalina.sh、もしくはdaemon.shには、以下のような行があります。
if [ -r "$CATALINA_BASE/bin/setenv.sh" ]; then
  . "$CATALINA_BASE/bin/setenv.sh"
elif [ -r "$CATALINA_HOME/bin/setenv.sh" ]; then
  . "$CATALINA_HOME/bin/setenv.sh"
fi
環境変数 $CATALINA_BASE、もしくは$CATALINA_HOME配下の"/bin/setenv.sh"を読み込むようになっています。
パッケージ配布されたものを展開した際にはこちらのファイルは存在しないので、所定の場所にsetenv.shを作成し、その中で環境変数の設定を行うようにしてあげれば、OKです。

◯Jetty
Jetty付属のstart.shには、以下のような行があります。
ETC=/etc
if [ $UID != 0 ]
then
  ETC=$HOME/etc
fi

for CONFIG in $ETC/default/jetty{,9} $HOME/.jettyrc; do
  if [ -f "$CONFIG" ] ; then
    readConfig "$CONFIG"
  fi
done

  • /etc/default/jetty
  • ${HOME}/.jettyrc
の順番に設定を読みにいくようです。ちなみにreadConfigはjetty.sh内に記載されている設定読み込み用の関数です。
readConfig()
{
  (( DEBUG )) && echo "Reading $1.."
  source "$1"
}
全体定義についてはetcの下に置き、ユーザー固有の設定についてはシェル実行ユーザーの{user_home}/.jettyrcに記載する、という形が良いのだと思います。


2013年4月16日火曜日

wget / ab でproxyを設定して接続を行う

メモ

wget
wget -e "http_proxy=http://proxy.server.de:8080" http://www.yahoo.co.jp/

ab
ab -n 1 -c 1 -X proxy.server.de:8080 http://www.yahoo.co.jp/



2013年4月9日火曜日

EC2のインスタンスに割り当てられているPublic IP ならびに Local IPを取得する方法

当該インスタンスにログインした上で以下。
wget -qO- http://instance-data/latest/meta-data/public-ipv4
wget -qO- http://instance-data/latest/meta-data/local-ipv4
Local IPに関しては"/sbin/ifconfig"でも確認できますね。

vagrantで立ち上げたインスタンスとSCPでやりとりする

そういえばどうするんだろう?という風に悩みましたが、以下の手順で行えます。
以下は"vagrant up"実施済みを想定しています。
vagrant ssh-config > .vagrant.ssh.config
scp -F .vagrant.ssh.config default:/hoge/fuga ./
"ssh-config"というコマンドで、vagrantで立ち上げたインスタンスに対するsshのconfig情報が取得出来ます。
Host default
  HostName ****
  User root
  Port 22
...
ホスト名はデフォルトでは"default"になっているようです。


2013年3月30日土曜日

aws : ec2で自身のインスタンスidを取得する

メモ。こんな方法なんですね。
$ wget -q -O - http://169.254.169.254/latest/meta-data/instance-id


2013年3月29日金曜日

vagrant-awsを使ってAWSインスタンを立ち上げる

Virtual Boxの便利なクライアントラッパーとして有名だったvagrantが、version1.1以降ではAWSやRackspaceなど有名なクラウド環境もサポートするようになりました。

AWSについては、vagrant-aws というプラグインを導入することで、vagrantからaws instanceの立ち上げ〜廃棄を行うことができます。

◯install
vagrantについては以下のサイトなどから適当に(1.1以上必須)。
http://www.vagrantup.com/

vagrant-awsについては以下のコマンドで追加できます。
$ vagrant plugin install vagrant-aws

$ vagrant plugin list
vagrant-aws (0.1.2)
◯init
$ vagrant init
まず"vagrant init"によってVagrantfileのひな形を作成します。
ここではawsのインスタンスを立ち上げる事を目標としているので、Vagrantfileをaws向けの設定に書き換えます。
vgrant-awsのREADMEを引用すると以下のような書式になります。
Vagrant.configure("2") do |config|
  config.vm.box = "dummy"

  config.vm.provider :aws do |aws|
    aws.access_key_id = "YOUR KEY"
    aws.secret_access_key = "YOUR SECRET KEY"
    aws.keypair_name = "KEYPAIR NAME"
    aws.ssh_private_key_path = "PATH TO YOUR PRIVATE KEY"

    aws.ami = "ami-7747d01e"
    aws.ssh_username = "ubuntu"
  end
end
まず 、"Vagrant.configure("2")"はおまじないみたいなもので、vagrantの1.1を使用する、という宣言です。
続いて、config.vm.boxで使用するboxを指定します。awsの場合、ec2側で登録されているamiを使用する形になるので、本来はこの設定は不要です。ただ、vagrant的には値の設定が必要なようなので、dummyのboxを指定します。
なおdummyのboxはvagrant-awsのgit上に用意されているので、あらかじめvagrantにaddしておきましょう。
vagrant box add dummy https://github.com/mitchellh/vagrant-aws/raw/master/dummy.box   
aws provider周りの設定については、それぞれの項目に応じて設定します。あらかじめaccess keyやsecret access keyなどの値を控えておきましょう。
詳細な項目についてはvagrant-awsのREADMEの”Configuration"の項を参照ください。
https://github.com/mitchellh/vagrant-aws

なお、security group についてはssh portが空いているものを設定する必要があります。sshが接続できないとvagrantからコントロールすることが出来ず、インスタンスだけ立ち上がるけれどvagrant側はエラーになりdestroyできなくなる、という状況に陥ります。
※ 設定で嵌りそうな点についてはこの辺のブログなどを参照してください。
http://d.hatena.ne.jp/naoya/20130315/1363340698

◯operation
ひと通り設定が終わったら、vagrant upを実行します。provider情報として"aws"を指定する必要があります。
$ vagrant up --provider=aws
ec2のインスタンスが立ち上がったら、Virtual Boxの時と同じようにssh接続することができます。
$ vagrant ssh
使用が終わったら、destroyをすると立ち上げたインスタンスがterminateされます。
$ vagrant destroy
なお、私が試した環境では、"vagrant halt"が上手く動作しませんでした。イメージ的にはインスタンスがstopされて欲しいのですが…

◯所感
vagrant経由で簡単にawsをハンドリングする事ができるので、手軽にec2のインスタンスを立ち上げ、アプリケーションや構成管理ツール(chefなど)の動作確認を行うには非常に便利なツールだと思います。
ただ、ユースケースとしては、あくまで検証向きだと思います。本番環境の複数台のサーバーに対してvagrantで管理する、というイメージがあまり持てないので、検証領域での使用に留めるのが妥当かなと思っています。






2013年3月27日水曜日

jenkinsでロール管理を行う

jenkinsはユーザー単位で権限管理もできますが、ロール単位でも管理ができます。
ただし、標準機能では出来ないので、"Role Strategy Plugin"を追加する必要があります。

プラグインを追加した後の手順は、以下ブログが詳細で分かりやすいです。
http://d.hatena.ne.jp/tyuki39/20110111/1294759823

githubにブロックされずにssh接続を行う

githubに22番ポートで接続をしていたら、あるタイミングで接続できなくなりました。
ssh: connect to host github.com port 22: Connection refused
githubのhelpを読んでいると”Using SSH over the HTTPS port”という項があり、HTTPSのポートを使用してSSHを接続することが推奨されています。
https://help.github.com/articles/using-ssh-over-the-https-port

ここに記載されているように、"/.ssh/config"に以下のような記述を追加すれば、ブロックされること無くSSH接続ができるようになります。
Host github.com
  Hostname ssh.github.com
  Port 443



64bit CentOS環境のjenkinsでAndroidのテストを実行する

jenkinsには"Android Emulator Plugin"が存在するので、こちらを利用すればAndroidアプリのテストをjenkinsの継続インテグレーションに組み込むことができます。

手順については、@ITの以下記事が良くまとまっているので、ここでは引用まで。
http://www.atmarkit.co.jp/fsmart/articles/androidtest06/01.html

ただ、手元の環境がCentOS6.3だったのですが、その環境でemulatorを動作させようとするとlibstdc++が見つからないという事でエラーになりました。
android create avd -f -a -c 64M -s WVGA800 -n hudson_ja-JP_160_WVGA_android-8 -t android-8
Auto-selecting single ABI armeabi
Android 2.2 is a basic Android platform.
Do you wish to create a custom hardware profile [no]
Error: /home/jenkins/.jenkins/tools/android-sdk/tools/mksdcard: error while loading shared libraries: libstdc++.so.6: cannot open shared object file: No such file or directory
Error: Failed to create the SD card.
Error: Failed to create sdcard in the AVD folder.
どうも以下のブログ記事などを見る限りだと、Linux系向けのAndroid SDKは32bitアプリケーションのため、64bit OS環境で動作させるためにはいくつかライブラリをインストールする必要があります。
http://iexcel.wordpress.com/2010/01/25/ia32-libs-for-android-sdk-on-fedora-12-x86_64/

上記ブログはFedora向けの説明ですが、概ねCentOSでも同じで大丈夫です。私は以下のライブラリについて"yum install xxxx"して、ひと通りjenkinsでテストが動くようになることを確認しました。
glibc.i686 
glibc-devel.i686 
libstdc++.i686 
zlib-devel.i686
ncurses-devel.i686 
libX11-devel.i686 
libXrender.i686 
libXrandr.i686



mavenと認証設定済みのnexusの連携

認証設定をしたnexusサーバーにpom.xml経由でjarの取得、もしくは登録を行いたい場合、~/.m2/settings.xml に以下のような設定を記載する必要があります。
 <settings xmlns="http://maven.apache.org/SETTINGS/1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLoc    ation="http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd">
     <servers>
         <server>
             <id>hoge</id>
             <username>fuga</username>
             <password>password</password>
         </server>
</settings>
idの部分に、pom.xmlでserver情報を一意に特定するためのサービスID(任意)を設定します。usernameならびにpasswordはnexusで使用するID・passwordです。

pom.xmlで管理しているプロジェクトのjarを"mvn deploy"でnexusに登録したい場合は、pom.xmlに以下のような記述を記載します。
 <distributionManagement>
      <repository>
           <id>hoge</id>
           <url>http://nexus.hoge.jp/content/repositories/releases</url>
      </repository>
 </distributionManagement>
上記pom.xmlのid"hoge"と、settings.xml中のid"hoge"は対応しています。ですので上記URLに接続する際、認証が求められた場合はsettings.xmlに記載した認証情報を元に接続をすることになります。

nexus上のrepositoryを取得したい場合は以下です。
 <repositories>
    <repository>
       <id>hoge</id>
       <url>http://nexus.hoge.jp/nexus/content/repositories/releases</url>
    </repository>
 </repositories>

ローカルPCの開発環境で上記設定を行う場合は、ログインしているユーザーの~/.m2/settings.xml を編集すればOKです。
jenkinsなどサーバー環境で設定が必要な場合は、たとえばjenkinsの場合はjenkins serverの実行ユーザーのsettings.xmlを、サーバー上で直接編集してあげる必要があります。

また、ID・パスワード認証以外の方法...たとえば公開鍵認証方式...でサーバーに接続する場合は、以下のmanualの内容を参考にしてください。
http://maven.apache.org/settings.html#Servers