Cassandraでのデータのread (get) の実装 - Consistency LevelがONEの場合

Consistency LevelがONEの場合の、read部分の実装をまとめました。

依然書いたSEDA部分の内容が理解できていれば、読むのはあまり難しくないです。逆に、そこが理解できてないと、さっぱり処理手順がわからないので、事前に以下のエントリを読むことをおすすめします。
http://dann.g.hatena.ne.jp/dann/20100307/p1
http://dann.g.hatena.ne.jp/dann/20100307/p2

処理概要

  • read requestをCassandra cluster中の一つのノードに送る。
    • ノードは次のように決定する。Cassandraのnodeのclusterを表すStorageProxyは、ReplicationStrategyにしたがって、キーに対応するデータを保持するN個のノードのエンドポイントを取得して、proximityに従ってsortした後、その中の生きているノードを一つ返す
  • read処理実行
    • ターゲットのノードからデータ取得
  • read repairをbackgroundで実行(別スレッドで)
    • ノードが最新の値を保持するようにtimestampが古い値を返すノードに対して、write requestを送信して値を更新

データの取得処理概要

Read処理の流れは、以下のような感じ。

CassandraServer -> StorageProxy -> ReadVerbHandler(1) -> ResponseVerbHandler

Read処理中に、Repair処理も、以下のような流れで実行される。

ReadVerbHandler(1) -> StorageService -> ConsistencyManager -> ReadVerbHandler(2 Digest計算) -> ResponseHandler (callback指定されているDigestResponseHandler実行) -> DigestResponseHandler

以下、処理の手順と内容を書いていきます。

CassandraServer
  • get
  • multigetInternal
  • SliceByNamesReadCommandを作成して、readColumnFamily
  • rows = StorageProxy.readProtocol(commands, consistency_level);
StorageProxy

readProtocolメソッドは、consistency levelによって動作が変わる。

consistency_levelがConsistencyLevel.ONEの場合

  • weakRemoteRead
  • Endpoint選択
    • StorageServiceにcommand.table, command.keyを与えて、適切なエンドポイントを選択
      • Cassandraのnodeのclusterを表すStorageProxyは、ReplicationStrategyにしたがって、キーに対応するデータを保持するN個のノードのエンドポイントを取得して、proximityに従ってsortした後、その中の生きているノードを一つ返す
  • ReadMessage送信(非同期)
  • ReadMessageをReadCommand群から生成して、MessagingServiceでendpointにメッセージ生成。
    • StageManager.READ_STAGEが指定されているので、このメッセージはReadVerbHandlerで処理される
      • メッセージヘッダにはRepairを指定している

consistency_levelがConsistencyLevel.ONE以外の場合

  • strongRead
ReadVerbHandler (ReadMessageを処理するHandler)
  • ReadMessageをでデシリアライズして、ReadCommandを生成
  • readCommandのテーブル名でtable openして、SliceByNamesReadCommand#getRow(table)
    • レスポンス用のメッセージを作成して、メッセージの送信元にMessagingServiceでメッセージ送信(非同期)
      • reply用メッセージは、StageManager.RESPONSE_STAGEのStageで処理される。)
  • リペア処理実行
    • メッセージヘッダにDO_REPAIRが設定されているので、リペア処理を開始
    • 与えられたキーをストアしているN個の生きているEndpointを取得して、
    • StorageServiceインスタンスで取得したエンドポイントのConsistencyCheckを行う
    • JMXEnabledThreadPoolExecutorでConsistencyManagerスレッドを実行し、consistencyチェックを開始
    • ReadCommand(digest用)作って、DigestMessageをMessagingServiceで送信
      • ただし、DigestResponseHandlerを指定して送信
      • public String sendRR(Message message, InetAddress to, IAsyncCallback cb)
    • ReadVerbHandlerが実行されて、digest用の処理の時には、ColumnFamily.digest(row.cf)でCFのdigest値を計算
    • digest値をreadResponseを作って返す
  • ResponseHandlerがresponseを処理
    • DigestResponseHandlerがcallbackに指定されているので、readResponseがDigetResponseHandlerのcallback(responseメソッド)で処理される
    • ColumnFamilyのdigest値がdigest値と違ったら、DigestResponseHandler がdoReadRepairメソッドでread repair処理を開始(長くなるので、この先は別エントリで)
ResponseVerbHandler
  • ReadVerbHandlerで送信されたreply用メッセージは、StageManager.RESPONSE_STAGEのStageで処理されるので、ResponseVerbHandlerが実行される。
  • ResponseVerbHandlerでは、AsyncResultにmessageを設定してレスポンスを返す
StorageProxy
  • weakReadRemote
    • 戻ってきたレスポンスをrowに追加していって、rowを返す