<hadoop Pig>pig使ってみた

今回はhadoop pigを使って情報の抽出を行ってみました。
文献が少ないので大したことをしていないのですが書き残しておこうかなと思います。

目次

  1. 今回すること
  2. hadoop pigとは
  3. どのような流れか
  4. 実際のコード . その他

1. 今回すること

今回は気象庁から提供される台風情報のCSVデータ(偶然見つけた)をもとにして、各台風について気圧が最低になったときの情報を抽出します。 元データの例

2. hadoop Pigとは

実際のコードを書く前にhadoop Pigがどのようなものかを簡単に書いておきます。
分散処理を行う際に用いられるhadoopというフレームワーク上で動くスクリプト言語がpigです。ベースはJavaらしいです。 Apacheが提供しているみたいです。 Apache Pig

3.どのような流れか

データ読み込み

元になるCSVファイルをすべて読み込みます。 今回はdata/input/以下に複数の対象データを保存しています。

読み込みデータをソート

読み込まれたデータは日時等の順番がばらばらになっているので年月日時でソート処理を行います。

ソートしたデータのグループ化

ソートしたデータを台風それぞれでグループ化します。各台風での情報の比較がしやすい状態にしていきます。

各台風が最低気圧だった時の情報を抽出

これが今回作成するプログラムの肝になっています。グループ化したそれぞれの台風について、気圧が一番低くなっている時の情報(勢力が一番強くなっている)を切り出すようにします。

古いデータの削除

あんまり気にすることもないとは思うんですが、Pigの仕様として古い出力データが残っているとエラー起こすっていう問題があります。1回しか実行しないなら特にいらないんですが、デバッグとかで回数実行するときは古いデータの削除を行う必要があります。

処理結果の出力

処理結果をファイル出力します。今回は更に加工しようかなと思っていたのでCSVに近い形式で出力します。各行についてカンマ区切りでデータを書き出しました。

4.実際のコード

実際のコードも上と同じ順番で軽く書いていきます。

データの読み込み

データを読み込むときにはLOADメゾッドを使います

-- データを読み込む typhoon_input = LOAD 'data/input/'USING PigStorage( ',' ) AS (year:int, month:int, day:int, time:int,num:int, name:chararray,level:int, lat:double, long:double,power:double );

USING PigStorage(',')を使うことで、データの区切り文字を指定できます。 AS以降には各データの形式を指定しています。ASは必須ではないのですが、あったほうが後々扱いやすくなるのではないかなと思います。
ホントは経度をlongと書きたかったのですが予約語になっているので諦めます。logにしました。

読み込みデータをソート

データのソートをするときにはORDER BYを用います。

--データを月、日、時でソート
order_data = ORDER typhoon_input BY year,month,day,time ;

ORDER 対象 BY キーで並べ替えができます。このときのキーは複数でも扱うことができます。 今回は複数キーを同時に扱っています。

ソートしたデータをグループ化

ソートしたデータを各台風単位でグループにします。

-- ソートしたデータを名称ごとグループ化 typhoon_data_all = Group order_data BY num;

GROUP 対象 BY キーでグループ化できます。

各台風が最低気圧だった時の情報を抽出

グループ化された各データの中でも気圧が最低の時の情報を抽出します。

-- 一番気圧が低下した時のデータ、各台風のデータ数を記録 hpower_data = FOREACH typhoon_data_all {sorted = ORDER order_data BY power; mdata = LIMIT sorted 1; GENERATE group , FLATTEN(mdata.power) AS power ,FLATTEN(mdata.lat) AS lat , FLATTEN(mdata.log) AS log , COUNT(order_data),FLATTEN(mdata.year) AS year,FLATTEN(mdata.month) AS month,FLATTEN(mdata.day) AS day,FLATTEN(mdata.time) AS time;};

FOREACHを使って対象の各グループについて実行しています。 各グループについて気圧を基準にして並べ替えをしました。 そのデータの先頭にあるデータが気圧が最低になった時のデータになっているのでそのデータをFLATTENというグループ化の解除をしながら呼び出します。 また、各台風のデータ数を確認するためにCOUNTを用いています。

古いデータの削除

Pig内でコマンドが使えるらしいです。 rmを用いることも可能なのですが、フォルダ系の削除に対応したrmfを用いることで出力の古いデータの確実な削除をしています。

-- outputデータを削除 rmf data/output;

データの出力

データをファイルに出力します。 そのときにはSTOREを用いることで出力が可能になります。

-- 取得したデータをoutput ;STORE hpower_data INTO 'data/output' USING PigStorage(',');

そうするとこんな感じで出力されます。

f:id:poolbooyer:20190125141828p:plain
出力結果のサンプル

その他

今回の参考文献はこちら

pig.apache.org

次のエントリでこの出力結果を加工するお(すでにしてるんだけどエントリに書く時間がなさげ)