Bringing up a Kafka-based Ordering Service¶
Caveat emptor¶
このドキュメントは、読者がKafkaクラスタとZooKeeperアンサンブルをセットアップして、 一般的な利用のために不正アクセスを防いで安全に維持する方法を知っていることを仮定しています。 このガイドの目的は、Kafkaクラスタを使用してHyperledger Fabricのオーダリングサービスノード(OSN)を構築し、 ブロックチェーンネットワークにオーダリングサービスを提供するために必要なステップを明らかにすることです。
ネットワークやトランザクションフローにおけるOrdererの役割については、 The Ordering Service ドキュメントを参照してください。
オーダリングノードのセットアップ方法については、 orderer_deploy ドキュメントを参照してください。
Raft オーダリングサービスの設定については、 Configuring and operating a Raft ordering service を参照してください。
Big picture¶
各チャネルはKafkaで個別のシングルパーティショントピックにマッピングされます。
OSNが Broadcast RPC経由でトランザクションを受信すると、
ブロードキャストするクライアントがチャネルへの書き込み権限を持っていることを確認し、
Kafkaの適切なパーティションにトランザクションをリレー(つまり生成)します。
このパーティションはOSNでも使用され、受信したトランザクションをローカルでブロックにグループ化し、
ローカル台帳に保存し、 Deliver RPCを経由して受信するクライアントに提供されます。
低レベルの詳細については、
the document that describes how we came to this design
を参照してください。
図8 は、上記のプロセスを模式的に表したものです。
Steps¶
K と Z は、それぞれKafkaクラスタとZooKeeperアンサンブルのノード数を示すとします。
- 最低でも
Kを4に設定する必要があります。 (以下のステップ4で説明するように、これはクラッシュ故障耐性を示すために必要な最小ノード数です。 つまり、4つのブローカーがあれば、1つのブローカーがダウンしても、 すべてのチャネルは書き込みと読み込みが可能で、新しいチャネルを作成できます)。 Zは3、5、7のいずれかになります。 これはスプリットブレインシナリオを避けるため、奇数にする必要があります。 また、単一障害点を避けるため、1より大きい数でなければなりません。 7台以上のZooKeeperサーバーは過剰とみなされます。
それから、次のように進めます。:
Orderers: Kafkaに関連する情報をネットワークのジェネシスブロックにエンコードします。
configtxgenを使用している場合、configtx.yamlを編集します。 または、システムチャネルのジェネシスブロックでプリセットプロファイルを選択します。Orderer.OrdererTypeをkafkaに設定します。Orderer.Kafka.Brokersには、クラスタ内の 少なくとも2つの KafkaブローカーのアドレスをIP:port形式で指定します。 このリストは完全である必要はありません。(これらはブートストラップ・ブローカーです。)
Orderers: 最大ブロックサイズを設定します。 各ブロックは最大で
Orderer.AbsoluteMaxBytesバイト(ヘッダーを除く)であり、 この値はconfigtx.yamlで設定できます。 ここで選んだ値をAとし、メモしておいてください。 これは、ステップ6でのKafkaブローカーの設定に影響します。Orderers: ジェネシスブロックを作成します。
configtxgenを使用します。 上記のステップ3と4で選んだ設定は、システム全体の設定です。 つまり、すべてのOSNに対するネットワーク全体に適用されます。 ジェネシスブロックの場所をメモしておきます。Kafka cluster: Kafkaブローカーを適切に設定します。 すべてのKafkaブローカーに以下の項目が設定されていることを確認します。
unclean.leader.election.enable = false— ブロックチェーン環境では、データの一貫性が重要です。 同期しているレプリカセットの外部からチャネルリーダーを選ぶことはできません。 さもないと、前のリーダーが作成したオフセットを上書きするリスクがあります。 その結果、Ordererが作成するブロックチェーンを書き換えてしまいます。min.insync.replicas = M—Mが1 < M < Nとなる値を指定します。 (以下のdefault.replication.factorを参照してください。) データは、少なくともMのレプリカに書き込まれた時点でコミットされたとみなされます。 (このとき、これらのレプリカは同期中とみなされ、同期中レプリカセット(ISR)に属します。) それ以外の場合、書き込み操作はエラーを返します。 それから:- チャネルデータの書き込み先である
Nのうち、最大でN-Mのレプリカが利用できなくなった場合まで、運用は正常に続行されます。 - さらに多くのレプリカが利用できなくなると、Kafka は
MのISRセットを維持できなくなり、書き込みの受付を停止します。 読み込みは問題なく行えます。Mのレプリカが同期すると、チャネルは再び書き込み可能になります。
- チャネルデータの書き込み先である
default.replication.factor = N—NがN < Kとなる値を指定します。 レプリケーションファクターがNの場合、各チャネルはN個のブローカーにデータを複製します。 これらはチャネルのISRセットの候補です。 上記のmin.insync.replicas sectionで述べたように、すべてのブローカーが常に利用可能である必要はありません。NはKよりも 小さく 設定する必要があります。 なぜなら、ブローカーがNよりも少ない場合は、チャネルの作成を続行できないからです。 つまり、N = Kと設定すると、ブローカーが1つでもダウンすると、ブロックチェーンネットワーク上に新しいチャネルを作成できなくなります。 — オーダリングサービスのクラッシュ故障耐性が無くなります。ここまで説明した内容から、
MとNの最小許容値はそれぞれ2と3になります。 この設定により、新しいチャネルの作成が進み、すべてのチャネルが書き込み可能な状態を維持します。message.max.bytesとreplica.fetch.max.bytesには、 上記のステップ4でOrderer.AbsoluteMaxBytesに設定したAより大きな値を設定してください。 ヘッダを考慮したバッファを追加してください --- 1 MiBあれば十分です。 次のような条件があります。Orderer.AbsoluteMaxBytes < replica.fetch.max.bytes <= message.max.bytes
(完全を期すために、
message.max.bytesはsocket.request.max.bytesよりも必ず小さくなるように注意してください。 デフォルトでは100MiBに設定されています。 もし100MiBより大きなブロックを持ちたい場合は、fabric/orderer/kafka/config.goのbrokerConfig.Producer.MaxMessageBytesにハードコードされている値を編集し、 バイナリをソースからリビルドする必要があります。 これは推奨されません。)log.retention.ms = -1。オーダリングサービスがKafkaログのプルーニングをサポートするまでは、 時間ベースの保持を無効にして、セグメントが期限切れになることを防ぎます。 (サイズベースの保持 —log.retention.bytesを参照 — は、 執筆時点ではKafkaのデフォルトで無効になっているので、明示的に設定する必要はありません。)
Orderers: それぞれのOSNをジェネシスブロックに指定します。
orderer.yamlのGeneral.BootstrapFileを編集し、上記のステップ5で作成したジェネシスブロックを指定します。 ついでに、そのYAMLファイルの他のすべてのキーが適切に設定されていることを確認します。Orderers: ポーリング間隔とタイムアウトを調整します。 (オプションのステップ)
orderer.yamlファイルのKafka.Retryセクションでは、 メタデータ/プロデューサー/コンシューマーの要求頻度や、ソケットのタイムアウトを調整できます (これらはすべて、Kafka プロデューサーまたはコンシューマーで見られる設定です。)- なお、新たなチャネルが作成された時、および、既存チャネルがリロードされた時
(再起動したばかりのOrdererの場合)、Ordererは以下の方法でKafkaクラスタとやり取りします。
- チャネルに対応するKafkaパーティションに対して、Kafkaプロデューサー(ライター)を作成します。
そのプロデューサーを使用して、パーティションに
CONNECTメッセージをポストします。 そのパーティションに対して、Kafkaコンシューマー(リーダー)を作成します。 - これらのステップのいずれかが失敗した場合、繰り返す頻度を調整できます。
具体的には、
Kafka.Retry.ShortIntervalごとに再試行し、合計でKafka.Retry.ShortTotalとなります。 そして、成功するまで、Kafka.Retry.LongIntervalごとに、合計Kafka.Retry.LongTotalとなります。 上記のすべてのステップが正常に完了するまで、Ordererがチャネルへの読み書きをできなくなることに注意してください。
- チャネルに対応するKafkaパーティションに対して、Kafkaプロデューサー(ライター)を作成します。
そのプロデューサーを使用して、パーティションに
OSNとKafkaクラスタがSSLで通信するようにセットアップします。 (オプションのステップですが、強くお勧めします。) Kafkaクラスタ側については、 the Confluent guide を参照し、 各OSNの
orderer.yaml内のKafka.TLS配下のキーを設定します。以下の順番でノードを立ち上げます: ZooKeeperアンサンブル、Kafkaクラスタ、オーダリングサービスノード。
Additional considerations¶
- 好ましいメッセージサイズ
上記のステップ4(Steps セクション参照)では、
Orderer.Batchsize.PreferredMaxBytesキーを設定することで、ブロックサイズを好みで設定できます。 Kafkaは比較的小さなメッセージを扱うときに高いスループットを発揮します。 1MiB以下の値を心掛けてください。 - 環境変数による設定の上書き
Fabricに付属するKafkaとZookeeperのサンプルDockerイメージ
(それぞれ
images/kafkaとimages/zookeeperを参照) を使用すると、 KafkaブローカーとZooKeeperサーバーの設定を環境変数で上書きできます。 設定キーのドットをアンダースコアに置き換えます。 例えば、KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE=falseとすると、 デフォルト値のunclean.leader.election.enableを上書きできます。 同じことが、OSNの ローカル 設定、つまりorderer.yamlで設定できる内容にも当てはまります。 例えば、ORDER_KAFKA_RETRY_SHORTINTERVAL=1sとすると、Orderer.Kafka.Retry.ShortIntervalのデフォルト値を上書きできます。
Kafka Protocol Version Compatibility¶
Fabricは sarama client library を使用し、 Kafka 0.10 から 1.0 をサポートするバージョンをベンダリングしていますが、 古いバージョンでも動作することが確認されています。
orderer.yaml の Kafka.Version キーを使用すると、
Kafkaクラスタのブローカーとの通信に使用するKafkaプロトコルのバージョンを設定できます。
Kafkaブローカーは古いプロトコルのバージョンと下位互換性があります。
Kafkaブローカーは古いプロトコルのバージョンと下位互換性があるため、
Kafkaブローカーを新しいバージョンにアップグレードしても、
Kafka.Version キーの値を更新する必要はありませんが、
古いプロトコルのバージョンを使用していると、Kafkaクラスタは
performance penalty を被ることがあります。
Debugging¶
環境変数 FABRIC_LOGGING_SPEC に DEBUG を設定し、 orderer.yaml で Kafka.Verbose に true を設定してください。