Map Reduce: Simplified Data Processing on Large Clustersのメモ


前回宣言した通りMapReduceの勉強を始めました。
MapReduceについてまず読まなければいけないのはGoogleが出した論文。

http://labs.google.com/papers/mapreduce-osdi04.pdf

確か2007年ぐらいにも読んだのですがまったく覚えていないので今回は気になるところだけメモを取ることにしました。

1.Introduction
2.Programmming Model
2.1 Example

ワードカウント疑似コードはこんな感じ

map(String key, String value): 
// key: document name 
// value: document contents for each word w in value:
	EmitIntermediate(w, "1");

reduce(String key, Iterator values): 
	// key: a word 
	// values: a list of counts 
	int result = 0;
	for each v in values: 
		result += ParseInt(v);
	Emit(AsString(result));

rubyにあるmap/reduceメソッド使って書くとこんな感じになります。

documents = {:one => "this is a pen", :two => "this is also a pen", :three => "this is a banana"}
intermediate = []
documents.map do |key, words|
 words.split.each do |w|
  intermediate << {w.to_sym => 1}
 end
end
irb(main):095:0> intermediate
=> [{:this=>1}, {:is=>1}, {:a=>1}, {:pen=>1}, {:this=>1}, {:is=>1}, {:also=>1}, {:a=>1}, {:pen=>1}, {:this=>1}, {:is=>1}, {:a=>1}, {:banana=>1}]

intermediate.reduce(0) do |result, current |
  result = result + 1
end
=> 13

疑似コードの違いとしては「 reduceの第一引数が疑似コードはkeyをとっているけれどrubyだとresultになる」といった点だけでしょうか。
疑似コードのmapもreduceも第一引数のkeyは実際には使用していませんが、ひょっとしたらワードカウントに使わないだけかもしれません。

2.3 More Examples

ワードカウント以外に使える例としていかがあげられています。

  • grep
  • アクセスカウンター = mapが「⟨URL,1⟩ 」のリストを作りreduceが「⟨URL, total(count))⟩」のリストを作る
  • Web Link のグラフ情報 = mapが「⟨target, source⟩ 」のリストを作りreduceが「⟨target, list(source)⟩」のリストを作る
  • Term Vector (頻出語解析方法の一つ?) = mapが「ドキュメントごとの⟨hostname, term vector⟩ 」のリストを作りreduceが全ドキュメントの「⟨ostname, term vector⟩」を作る
  • Inverted Index = mapが「⟨word, document ID) のリストを作りreduceが⟨word,list(document ID))のリストを作る
  • sort

[TODO]その他の例もRubyのmapとreduceで可能か試してみる

3. Implementation

3.1 Execution Overview

以上のような例を普通に1プロセスでやってもしょうがないので、いかに分散させ、並列的に実行させるかが鍵になってくる。
master-workerモデルになっていて、masterプロセスが各worker間への仕事の実行を割り振る。
実際にはmapの時点で16M 〜64Mに区切り各workerにわりふる。reduceも複数に分けることにより並列実行させるようにする。

[QUESTION]map,reduceはワーカーに割り振ることで並列化可能だけれどmasterの仕事は並列化しないのだろうか?もししないとしたらそれはボトルネックにはならないのだろうか?

3.3 Fault Tolerance

Workerが失敗したときはMasterが他のをたちあげる。Masterがこけることは(数がすくないので)あまりないから、こけた時にはエラーを出し、その場でエラーを発見する。

[QUESTION]「Semantics in the Presence of Failures」というのがよくわからなかった

3.4 Locality

MapReduceはたくさんのマシーンを使って並列実行できるのがメリットだが、ネットワーク越しにデータをやりとりするのはやはりリソースの無駄遣い。
GoogleはGFSというファイルシステムをつかい、一つのデータに対して3つのコピーが存在する。Masterはどこにどのデータがあるかを管理していてなるべくmap処理に使うデータは同一マシンに存在するものをつかうようにする。

3.5 Task Granularity

MapをするMタスクとReduceをするRタスクの合計は実際のWorkerよりはるかに多いとWorker間でロードバランスできて良い。
ただタスクの量を細分化しすぎるのも問題。Masterプロセスがスケジューリングを決める際のコストはO(M + R)の割合で増大するから。

(最適なMとRの数を算出する方法ものっていたけど今は無視)

3.6 Backup Tasks

MapReduceの際にはかならず「straggler」と呼ばれる通常よりはるかに時間のかかるタスクが出てきたりする。理由は様々だがそういうのに足をとられないために、MapReduceのおわる少し手前ぐらいから、残りのタスクをすべて別のワーカーにてもう一回実行しておく(バックアップタスク)。最初に実行されたタスク(プライマリタスク)かバックアップタスクのいずれかが終了した時点でそのタスクに「完了マークを」つけておく。

これによりStagglerによって全タスクを終了させる時間が遅れるのをある程度防ぐことができる。

4. Refinements
4.1 Partitioning
4.2 Ordering
4.3 Combiner
ワードカウントなどの例の場合、mapファンクションに膨大な重複が生じることが多々ある

  • (例:{:hello => 1, :hello =>1, :hello => 1, :hello => 1, :bye => 1})

そういう場合には「Combiner」というオプショナルな機能を使ってユーザーが少しデータをマージできるようにもする(

  • 例:{:hello =>2, :hello => 2, :bye => 1})

4.4 Input and Output Types

MapReduceにデータを入れる時は、別にファイルをそのまま渡すのではなく、"reader"メソッドのようなものでユーザーが自分の好きなフォーマットを指定できるようにする。たとえばkey=>valueのペア(keyはvalueの文書内での位置情報)の配列であったり、それをKeyであらかじめソートしたりとか。

4.5 Side-Effects

Atomic Two Phase CommitはMapReduce側では提供しないのでアプリ側でなんとかせよとのこと

4.6 Skipping Bad Records

4.7 Local Execution

何台にも分散された状況でデバックするのは困難だから、デバッグ用に全ての実行を(並列ではなく)逐一ローカルマシンで実行できるMapReduceの別実装も提供してある。

4.8 Status Information
MapReduceの実行状況を監視できるよう、MasterはステータスをHTTPではきだすようにしている。

4.9 Counters

これもデバッグ用。疑似コードはこんなかんじ

Counter* uppercase; 
uppercase = GetCounter("uppercase");
map(String name, String contents): 
	for each word w in contents:
		if (IsCapitalized(w)): 
			uppercase->Increment(); 
		EmitIntermediate(w, "1");

5. Performance

1800台のマシーンを用いて1テラバイトのデータをGrepしたりSortした結果の解説。Input, Shuffle, Outputのどの時点でどれくらい時間がかかっているとか、ワーカーを200プロセスKillした場合の影響などについていろいろ比較してある。

5.1 Cluster Configuration
5.2 Grep
5.3 Sort
5.4 Effect of Backup Failure
5.5 Machine Failures
6 Experience

2003年からGoogleないのいろいろなところで使われている例の紹介。並列性や障害対策などをMapReduceの中に隠蔽することにより、プログラマはビジネスコードを書くことに集中でき3800行あったC++ のコードが700行まで減った例も紹介されている。

6.1 Large Scale Indexing

7. Related Work

既存の他の理論とMapReduceとの比較

8, Conclusions

MapReduceは使いやすく、いろんなことに応用が利き、スケーラブル。

習った教訓としては

  • MapReduceというフレームワーク)プログラミングモデルを制限することで並列化が用意になった
  • 高速化の鍵は局所性などを利用していかにネットワークごしにデータをやりとりするのを減らすかにかかる
  • タスクの冗長化によって一部マシンのパフォーマンス低下や障害の影響を防ぐことがか

読んでみての感想

  • MapReduceの基本概念は簡単。でも並列化の鍵はWorker のスケジューリングにかかってくるけど、この文章だけでは自分で実装までは難しそう
  • 使用例としてワードカウントは有名だったけど、ソートにも有効だとは思っていなかった。
  • 結局「並列化でスピードアップ」が売りなMapReduceだけど、やはりネットワーク越しのデータのやりとりは少ないほどスピードが上がるのは、意外なようで常識的な気がした

次に読む予定のもの(たぶん)