Cassandraでのデータのread (get) の実装 - Consistency LevelがONEの場合
Consistency LevelがONEの場合の、read部分の実装をまとめました。
依然書いたSEDA部分の内容が理解できていれば、読むのはあまり難しくないです。逆に、そこが理解できてないと、さっぱり処理手順がわからないので、事前に以下のエントリを読むことをおすすめします。
http://dann.g.hatena.ne.jp/dann/20100307/p1
http://dann.g.hatena.ne.jp/dann/20100307/p2
処理概要
- read requestをCassandra cluster中の一つのノードに送る。
- ノードは次のように決定する。Cassandraのnodeのclusterを表すStorageProxyは、ReplicationStrategyにしたがって、キーに対応するデータを保持するN個のノードのエンドポイントを取得して、proximityに従ってsortした後、その中の生きているノードを一つ返す
- read処理実行
- ターゲットのノードからデータ取得
- read repairをbackgroundで実行(別スレッドで)
- ノードが最新の値を保持するようにtimestampが古い値を返すノードに対して、write requestを送信して値を更新
データの取得処理概要
Read処理の流れは、以下のような感じ。
CassandraServer -> StorageProxy -> ReadVerbHandler(1) -> ResponseVerbHandler
Read処理中に、Repair処理も、以下のような流れで実行される。
ReadVerbHandler(1) -> StorageService -> ConsistencyManager -> ReadVerbHandler(2 Digest計算) -> ResponseHandler (callback指定されているDigestResponseHandler実行) -> DigestResponseHandler
以下、処理の手順と内容を書いていきます。
CassandraServer
- get
- multigetInternal
- SliceByNamesReadCommandを作成して、readColumnFamily
- rows = StorageProxy.readProtocol(commands, consistency_level);
StorageProxy
readProtocolメソッドは、consistency levelによって動作が変わる。
consistency_levelがConsistencyLevel.ONEの場合
- weakRemoteRead
- Endpoint選択
- StorageServiceにcommand.table, command.keyを与えて、適切なエンドポイントを選択
- Cassandraのnodeのclusterを表すStorageProxyは、ReplicationStrategyにしたがって、キーに対応するデータを保持するN個のノードのエンドポイントを取得して、proximityに従ってsortした後、その中の生きているノードを一つ返す
- StorageServiceにcommand.table, command.keyを与えて、適切なエンドポイントを選択
- ReadMessage送信(非同期)
- ReadMessageをReadCommand群から生成して、MessagingServiceでendpointにメッセージ生成。
- StageManager.READ_STAGEが指定されているので、このメッセージはReadVerbHandlerで処理される
- メッセージヘッダにはRepairを指定している
- StageManager.READ_STAGEが指定されているので、このメッセージはReadVerbHandlerで処理される
consistency_levelがConsistencyLevel.ONE以外の場合
- strongRead
ReadVerbHandler (ReadMessageを処理するHandler)
- ReadMessageをでデシリアライズして、ReadCommandを生成
- ここでデシリアライズされるのは、SliceByNamesReadCommand
- readCommandのテーブル名でtable openして、SliceByNamesReadCommand#getRow(table)
- レスポンス用のメッセージを作成して、メッセージの送信元にMessagingServiceでメッセージ送信(非同期)
- reply用メッセージは、StageManager.RESPONSE_STAGEのStageで処理される。)
- レスポンス用のメッセージを作成して、メッセージの送信元にMessagingServiceでメッセージ送信(非同期)
- リペア処理実行
- メッセージヘッダにDO_REPAIRが設定されているので、リペア処理を開始
- 与えられたキーをストアしているN個の生きているEndpointを取得して、
- StorageServiceインスタンスで取得したエンドポイントのConsistencyCheckを行う
- JMXEnabledThreadPoolExecutorでConsistencyManagerスレッドを実行し、consistencyチェックを開始
- ReadCommand(digest用)作って、DigestMessageをMessagingServiceで送信
- ただし、DigestResponseHandlerを指定して送信
- public String sendRR(Message message, InetAddress to, IAsyncCallback cb)
- ReadVerbHandlerが実行されて、digest用の処理の時には、ColumnFamily.digest(row.cf)でCFのdigest値を計算
- digest値をreadResponseを作って返す
- ResponseHandlerがresponseを処理
- DigestResponseHandlerがcallbackに指定されているので、readResponseがDigetResponseHandlerのcallback(responseメソッド)で処理される
- ColumnFamilyのdigest値がdigest値と違ったら、DigestResponseHandler がdoReadRepairメソッドでread repair処理を開始(長くなるので、この先は別エントリで)
ResponseVerbHandler
- ReadVerbHandlerで送信されたreply用メッセージは、StageManager.RESPONSE_STAGEのStageで処理されるので、ResponseVerbHandlerが実行される。
- ResponseVerbHandlerでは、AsyncResultにmessageを設定してレスポンスを返す
StorageProxy
- weakReadRemote
- 戻ってきたレスポンスをrowに追加していって、rowを返す