2013年12月10日火曜日

cdh-twitter-exampleで遊ぶ

この記事はHadoop Advent Calendar 2013, 13日目のエントリです。ブログなんざ書いたことないので勝手が良く分かりませんが@sudabonさん@kernel023さんのお前も何か書けやコラという圧力に負けてBloggerにページを用意してみました。

Hadoopの利用目的としてデータの分析に取り組んでいる、もしくはこれから取り組もうとしているユーザは多いのではないかと思います。中でもログ分析と並んでソーシャルメディアの分析は各社が積極的に取り組もうとしているテーマであり、私が勝手にTwitter分析3部作と呼んでいる以下のブログを読まれた方も多いのではないでしょうか:
  1. http://blog.cloudera.com/blog/2012/09/analyzing-twitter-data-with-hadoop/
  2. http://blog.cloudera.com/blog/2012/10/analyzing-twitter-data-with-hadoop-part-2-gathering-data-with-flume/
  3. http://blog.cloudera.com/blog/2012/11/analyzing-twitter-data-with-hadoop-part-3-querying-semi-structured-data-with-hive/
まだ読まれていない方のためにざっくりと説明しますと、Flumeを利用してTwitterからデータをぶっこ抜いてきてHDFSに流し込んでやれば、あとはHiveにJSON SerDeを組み込むだけでSQLでTwitterを分析する準備ができちゃいます、ということが紹介されてます。



前置きが長くなりましたがこのエントリでは上の図のTwitter分析用のデータパイプラインに細工してデータを流し込む先としてHBaseを追加してみたり、HBaseに書き込まれたデータについてインデックスを作成して全文検索が出来るようにする方法を紹介してみたいと思います。せっかちな方のためにどんな技術トピックが関係するかを先に列記しておくとFlume Interceptor, Flume flow multiplexer, Flume HBaseSink, Lily HBase NRT Indexer Service, Cloudera Search (SolrCloud)といったものになります。ここから先はTwitter分析3部作のブログは読んでいる、もしくは同等の内容を知っているものとして話を進めます。Twitter分析3部作のオリジナルのコードはGithubの以下のリポジトリに置かれています:
これを勝手にforkしていじくったものをGithubの以下のリポジトリに置いています。興味がある人はのぞいてみてください。



InterceptorでFlume eventをネコババする

先に見たデータパイプラインにある「カスタム作成のFlume Source」の正体はcom.cloudera.flume.source.TwitterSourceクラスです。このクラスは設定ファイルで指定されたキーワードで検索したtweetをJSONフォーマットで出力してきます。1イベントについて1つのJSONレコードが含まれています。以下は例です:

{"filter_level":"medium","contributors":null,"text":"In the past, our team at Bing Ads has applied big data insight to ad copy strategy in the travel and financial servi\u2026http://t.co/A3A5xRJE8b","geo":null,"retweeted":false,"in_reply_to_screen_name":null,"possibly_sensitive":false,"truncated":false,"lang":"en","entities":{"symbols":[],"urls":[{"expanded_url":"http://lnkd.in/bipZAX3","indices":[117,139],"display_url":"lnkd.in/bipZAX3","url":"http://t.co/A3A5xRJE8b"}],"hashtags":[],"user_mentions":[]},"in_reply_to_status_id_str":null,"id":390880848465375232,"source":"<a href=\"http://www.linkedin.com/\" rel=\"nofollow\">LinkedIn<\/a>","in_reply_to_user_id_str":null,"favorited":false,"in_reply_to_status_id":null,"retweet_count":0,"created_at":"Thu Oct 17 16:43:51 +0000 2013","in_reply_to_user_id":null,"favorite_count":0,"id_str":"390880848465375232","place":null,"user":{"location":"Raleigh, NC","default_profile":true,"statuses_count":3010,"profile_background_tile":false,"lang":"en","profile_link_color":"0084B4","id":474950294,"following":null,"favourites_count":0,"protected":false,"profile_text_color":"333333","description":null,"verified":false,"contributors_enabled":false,"profile_sidebar_border_color":"C0DEED","name":"Dan Damato","profile_background_color":"C0DEED","created_at":"Thu Jan 26 14:35:17 +0000 2012","default_profile_image":false,"followers_count":139,"profile_image_url_https":"https://si0.twimg.com/profile_images/1785237335/images_normal.jpg","geo_enabled":false,"profile_background_image_url":"http://abs.twimg.com/images/themes/theme1/bg.png","profile_background_image_url_https":"https://abs.twimg.com/images/themes/theme1/bg.png","follow_request_sent":null,"url":null,"utc_offset":null,"time_zone":null,"notifications":null,"profile_use_background_image":true,"friends_count":389,"profile_sidebar_fill_color":"DDEEF6","screen_name":"onlinesalesnut","id_str":"474950294","profile_image_url":"http://a0.twimg.com/profile_images/1785237335/images_normal.jpg","listed_count":1,"is_translator":false},"coordinates":null}

JSONレコード1つをHBaseの1カラムに格納してもいいのですが、ここではtweetの中から取り敢えず興味のある
  1. id : tweetを一意に識別する識別子(だと予想)
  2. text : tweetの本文
  3. user.screen_name : twitterユーザのアカウント(例:kmizumar)
  4. user.name : twitterユーザのフルネーム(例:Kiyoshi Mizumaru)

だけを抽出してそれぞれHBaseのtid, text, screen, userというカラムに格納することにします。 これを実現するためにはJSONレコードをパースして興味のある名前と値のペアだけを抜き出すという処理が必要になりますが、このためにTwitterSourceクラスを改造することは得策ではありません。Flumeには任意のSourceにくっつけてそのSourceへ入力されてくるイベントを横取りして好きなようにイベントに細工を施すことのできるInterceptorが用意されてますのでこれを使います。TwitterSourceにくっつけて上記の処理を行わせるために作成したものがcom.cloudera.flume.interceptor.TwitterInterceptorクラスです:


private byte[] readJsonStream(InputStream)メソッド

JSONレコードをパースして”id”, “text”, “user.name”, “user.screen_name”に該当する値だけを抜き出します。抜き出した値を使用して

____ID____:<idの値>____TEXT____:<textの値>____USER____:<user.nameの値>____SCREEN____:<user.screen_nameの値>

という内容を持つbyte[]を組み立てて返します。パーサーはcom.google.gson.stream.JsonReaderクラスを利用しています。 


pubic Event intercept(Event)メソッド

イベントをひとつ受け取ったら
  1. 元のイベントのヘッダをコピーしたものに SINKTYPE → HBASE というデータを追加
  2. 上記ヘッダとreadJsonStreamから戻ってきた内容をボディに持つ新しいイベントを作成
して返します。ここではイベントのボディを完全に別の物にすり替えています。


public List<Event> intercept(List<Event>)メソッド

イベントのリストを受け取ります。リストに含まれているイベントそれぞれについて
  1. 元のイベントのヘッダに SINKTYPE → HDFS というデータを追加する
  2. Event intercept(Event)メソッドを呼び出して SINKTYPE → HBASE の新しいイベントを取得する
という処理を行いHDFS用とHBase用の2つのイベントを作成してリストに入れて返します。入力されてきたイベントのリストはHDFS用とHBase用のイベントを含む倍の長さのイベントのリストになって出力されることになります。



仕方がないのでTwitterSourceを改造する

ここまでで作成したTwitterInterceptorを使うことで、TwitterSourceに流れてくるイベントを横取りしてHDFS向けに従来通りのJSONレコードを保持するevent、HBASE向けにJSONレコードから欲しいところだけ抜き出した内容を保持するevent、の2つのeventを吐き出すことができるようになるはずです。はずです、と書いたのには理由があって実はオリジナルのTwitterSourceクラスの実装では期待した結果が得られません。これはTwitterSourceクラスの中で使用しているStatusListenerクラスのインスタンスがJSONレコードをラップするイベントオブジェクトを作成したら、イベントオブジェクト1つを引数にして後段のチャネルに流しているのが原因です。これではTwitterInterceptorクラスのEvent intercept(Event)メソッドしか呼ばれないためイベントを捨てるか(nullを返す)、イベントを改変して返すか、そのまま返すか、しかできることがありません。現在のTwitterInterceptorの実装ではすべてのイベントがHBase用に置き換えられて出てくるだけで、いずれにしてもこのままでは今回やろうとしている「1つのイベントを受け取って2つのイベントを返す」ということはできないのでチャネルに流す前にイベントオブジェクトをList<Event>に突っ込んでチャネルにはリストを流すように変更してしまいます。もっと上手い方法はありそうですがQuick and Dirty Hackでここは乗り切ることにします。



diff --git a/flume-sources/src/main/java/com/cloudera/flume/source/TwitterSource.java b/flume-sources/src/main/java/com/
index 53c2f60..0305702 100644
--- a/flume-sources/src/main/java/com/cloudera/flume/source/TwitterSource.java
+++ b/flume-sources/src/main/java/com/cloudera/flume/source/TwitterSource.java
@@ -18,7 +18,9 @@
 
 package com.cloudera.flume.source;
 
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.flume.Context;
@@ -110,8 +112,8 @@ public class TwitterSource extends AbstractSource
         headers.put("timestamp", String.valueOf(status.getCreatedAt().getTime()));
         Event event = EventBuilder.withBody(
             DataObjectFactory.getRawJSON(status).getBytes(), headers);
-        
-        channel.processEvent(event);
+       List<event> events = Arrays.asList(event);
+        channel.processEventBatch(events);
       }
       
       // This listener will ignore everything except for new tweets


Flow multiplexerでFlume eventを撃ち分ける

Flumeのデータフローを組み立てます。登場するのはsourceとしてTwitterからデータを入力するものが1つ、sinkとしてHDFSへ流し込むのとHBaseへ流し込むものが2つ、そしてそれぞれのsinkに対応するchannelが2つです。flume.confからポイントとなる部分だけ抜粋します。TwitterAgentという名前でエージェントを作成しています。

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel MemChannel2
TwitterAgent.sinks = HDFS HBASE

TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel MemChannel2

続いてこのTwitterAgentにインターセプターを設定します。インターセプターの型を指定する*.typeプロパティにはインターセプターのクラス名ではなくインターセプターのビルダークラスを指定しなければならないのに気付くまでかなり時間をムダに費やしたのは秘密です。

TwitterAgent.sources.Twitter.interceptors = TwitterInterceptor
TwitterAgent.sources.Twitter.interceptors.TwitterInterceptor.type = com.cloudera.flume.interceptor.TwitterInterceptor$Builder

これでTwitterSourceが取得してきたイベントはTwitterInterceptorを経由してからチャネルに流れ出るようになります。どのイベントをどのチャネルに流し込むかを選択するにはsourceにマルチプレクサを設定して、イベントのどのヘッダを見て、その内容に応じてどのチャネルに流し込んで欲しいかを指定します。ここではSINKTYPEヘッダの内容を見てそれが"HDFS"だったらMemChannelへ、"HBASE"だったらMemChannel2へ流し込むようにしています。いまのTwitterInterceptorの実装ではSINKTYPEがHDFSかHBASEのどちらかしか出力してきませんが、将来的に他のタイプのものを出力するようにしたときにHBase側に流れても正しく扱えないのでデフォルトをHDFS側のMemChannelに倒しています(HDFSなら任意のJSONレコードをそのまま格納できるので)。

TwitterAgent.sources.Twitter.selector.type = multiplexing
TwitterAgent.sources.Twitter.selector.header = SINKTYPE
TwitterAgent.sources.Twitter.selector.mapping.HDFS = MemChannel
TwitterAgent.sources.Twitter.selector.mapping.HBASE = MemChannel2
TwitterAgent.sources.Twitter.selector.default = MemChannel


HBaseSinkでHBaseにデータを流し込む

MemChannel側の設定はTwitter分析3部作のブログと変わらずHdfsSinkに接続してHDFSに書き込んでいるだけなので説明は省略します。MemChannel2側のチャネルにはHBaseSinkを接続してデータをHBaseに流し込むことにします。

TwitterAgent.sinks.HBASE.channel = MemChannel2
TwitterAgent.sinks.HBASE.type = hbase
TwitterAgent.sinks.HBASE.table = tweets_hbase
TwitterAgent.sinks.HBASE.columnFamily = cf1
TwitterAgent.sinks.HBASE.batchSize = 1000
TwitterAgent.sinks.HBASE.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
TwitterAgent.sinks.HBASE.serializer.regex = ^____ID____:(.*)____TEXT____:(.*)____USER____:(.*)____SCREEN____:(.*)$
TwitterAgent.sinks.HBASE.serializer.colNames = tid,text,user,screen

ここでは次のことを指定しています:
  1. 書き込む先のHBaseテーブルの名前をtweets_hbase
  2. カラムファミリ名をcf1
  3. RegexHbaseEventSerializerを使ってイベントボディからデータを取り出す(ここ正しくはデシリアライザじゃないのかなぁ... 名前)
  4. データを取り出すための正規表現パターン
  5. マッチしたデータをHBaseテーブルのどのカラムに書き込むか
ここまでの内容に合わせてHBaseにテーブルを作成しておきます。


% hbase shell
hbase(main):001:0> create 'tweets_hbase', 'cf1'

ここまでの構成が問題なくできていればFlume agentを立ち上げることでTwitterからデータの取り込みが開始されてHDFS, HBaseの双方にデータが書き込まれて行きます。画面はHueのHBaseブラウザでtweets_hbaseテーブルを表示してみたところです。指定したcf1:screen, cf1:tid, cf1:text, cf1:userカラムにそれぞれ値が格納されている様子がわかります:


Lily HBase NRT Indexer Serviceでインデックス作成

そろそろ飽きてきたころだと思いますがここまでくればもう少しです。まずはインデックスを作成したいHBaseのテーブルでレプリケーションが有効になっていることが要求されますのでこれを片付けておきます:

% hbase shell
hbase(main):001:0> disable 'tweets_hbase'
hbase(main):002:0> alter 'tweets_hbase', {NAME => 'cf1', REPLICATION_SCOPE => 1}
hbase(main):003:0> enable 'tweets_hbase'

立て続けにmorphlines.confを設定します。後ほど作成するSolrコレクション hbase-collection1に対してインデックスを登録していくこと、HBaseテーブルのどのカラムを持ってきてSolrスキーマのどのフィールドに割り当てるか、を指定しています。この例ではcf1:textカラムの値がSolrスキーマのtextフィールドとして出力されることになります。


SOLR_LOCATOR : {
  # Name of solr collection
  collection : hbase-collection1

  # ZooKeeper ensemble
  zkHost : "$ZK_HOST" 
}


morphlines : [
  {
    id : morphline
    importCommands : ["com.cloudera.**", "com.ngdata.**"]

    commands : [                    
      {
        extractHBaseCells {
          mappings : [
            {
              inputColumn : "cf1:tid"
              outputField : "tid" 
              type : string 
              source : value
            }
            {
              inputColumn : "cf1:text"
              outputField : "text" 
              type : string 
              source : value
            }
            {
              inputColumn : "cf1:user"
              outputField : "user" 
              type : string 
              source : value
            }
            {
              inputColumn : "cf1:screen"
              outputField : "screen" 
              type : string 
              source : value
            }
          ]
        }
      }

      { logDebug { format : "output record: {}", args : ["@{}"] } }
    ]
  }
]

Solrのコレクションを作ります。スキーマ定義は上記のmorphlines.confに合わせて書き換えたものをGithubに置いてますのでこれを利用してSolrCloudにインスタンスディレクトリの作成とコレクションの作成を行います。下の例ではコレクションの作成時にshardの数を4と指定していますが利用しているSolrCloudのノード数に合わせて適宜変更する必要があるので注意してください。Solrスキーマの詳細について知りたいときはWebサイトや専門の書籍を参照してください。深遠なる検索システムの世界が貴方を待っています。

% solrctl instancedir --create hbase-collection1 hbase-collection1
% solrctl collection --create hbase-collection1 -s 4

最後にインデクサの設定ファイルを作成します。この設定ファイルではインデクサがどのHBaseテーブルを読みに行って、どのmorphlines.confの設定に従って動作するべきなのかを指定します。Githubにmorphline-hbase-mapper.xmlという名前で置いてあるサンプルの内容を以下に示します。ここでtweets_hbaseテーブルをインデックスの作成対象として指示しています。パラメータmorphlineFileの値がフルパスで指定されていないのはCloudera Managerの管理下にあるSolrCloudを利用しているときの設定の仕方になります。

<?xml version="1.0"?>
<indexer table="tweets_hbase" mapper="com.ngdata.hbaseindexer.morphline.MorphlineResultToSolrMapper">
   <!-- The relative or absolute path on the local file system to the morphline configuration file. -->
   <!-- Use relative path "morphlines.conf" for morphlines managed by Cloudera Manager -->
   <param name="morphlineFile" value="morphlines.conf"/>
   <!--
   <param name="morphlineFile" value="/etc/hbase-solr/conf/morphlines.conf"/>
   -->
   <!-- The optional morphlineId identifies a morphline if there are multiple morphlines in morphlines.conf -->
   <!-- <param name="morphlineId" value="morphline1"/> -->
</indexer>

準備がすべて整ったところでインデクサを登録します。動作しているLily HBase NRT Indexer Serviceに対して設定ファイルの登録を行い、正しく登録されていることを確認できたらすべての作業は終了です。登録に用いるhbase-indexer add-indexerコマンドの実行例を以下に示します。ZooKeeperアンサンブルの指定は利用している環境に合わせて適宜変更してください。

% hbase-indexer add-indexer -n myindexer \
  --indexer-conf morphline-hbase-mapper.xml \
  -cp solr.zk=nn1.demo.dev,nn2.demo.dev,ms3.demo.dev/solr \
  -cp solr.collection=hbase-collection1 -z nn1.demo.dev

インデクサの一覧を見るにはhbase-indexer list-indexersコマンドを使用します。設定ファイルに間違いがあるときは一番下に表示されるProcessesの項目に実行中のプロセスが1個もなくてshard数分のプロセスがfailしているという状態になりますので注意して見てください:

% hbase-indexer list-indexers -z nn1.demo.dev
Number of indexes: 1

myindexer
  + Lifecycle state: ACTIVE
  + Incremental indexing state: SUBSCRIBE_AND_CONSUME
  + Batch indexing state: INACTIVE
  + SEP subscription ID: Indexer_myindexer
  + SEP subscription timestamp: 2013-12-03T03:38:40.206+09:00
  + Connection type: solr
  + Connection params:
    + solr.collection = hbase-collection1
    + solr.zk = nn1.demo.dev,nn2.demo.dev,ms3.demo.dev/solr
  + Indexer config:
      645 bytes, use -dump to see content
  + Batch index config:
      (none)
  + Default batch index config:
      (none)
  + Processes
    + 4 running processes
    + 0 failed processes

CDH4.4 + Cloudera Manager 4.7の組み合わせではCloudera SearchとCloudera Managerの統合がそんなに進んでいないのでSolrCloudの構成状況やドキュメントの登録状況を確認するためにはSolr Admin Console画面を参照します。CollectionのStatistics画面で時間が経つにつれて登録されているドキュメントの数(= tweets_hbaseテーブルに格納されたtweetの数)がどんどん増えて行く様子が見て取れます。またHueのSearch AppでCollection Managerからhbase-collection1を登録して適当なキーワードを投げたりして遊んでみてください。



おわりに

駆け足でTwitterSourceからデータを横取りしてHBaseに放り込み全文検索用のインデックスを張るところまでを眺めてみました。Flumeからしかデータを書き込まないのであればFlume Morphline Solr Sinkを利用してインデックス作成とデータ書き込みを同時にやってのけるという芸当も実際には可能です。興味のある人は是非挑戦してみてください。

Hadoop Advent Calendar 2013, 14日目はs-wool@githubさんです。おたのしみに。