Bringing up a Kafka-based Ordering Service¶
Caveat emptor¶
このドキュメントは、読者がKafkaクラスタとZooKeeperアンサンブルをセットアップして、 一般的な利用のために不正アクセスを防いで安全に維持する方法を知っていることを仮定しています。 このガイドの目的は、Kafkaクラスタを使用してHyperledger Fabricのオーダリングサービスノード(OSN)を構築し、 ブロックチェーンネットワークにオーダリングサービスを提供するために必要なステップを明らかにすることです。
ネットワークやトランザクションフローにおけるOrdererの役割については、 The Ordering Service ドキュメントを参照してください。
オーダリングノードのセットアップ方法については、 Setting up an ordering node ドキュメントを参照してください。
Raft オーダリングサービスの設定については、 Configuring and operating a Raft ordering service を参照してください。
Big picture¶
各チャネルはKafkaで個別のシングルパーティショントピックにマッピングされます。
OSNが Broadcast
RPC経由でトランザクションを受信すると、
ブロードキャストするクライアントがチャネルへの書き込み権限を持っていることを確認し、
Kafkaの適切なパーティションにトランザクションをリレー(つまり生成)します。
このパーティションはOSNでも使用され、受信したトランザクションをローカルでブロックにグループ化し、
ローカル台帳に保存し、 Deliver
RPCを経由して受信するクライアントに提供されます。
低レベルの詳細については、
the document that describes how we came to this design
を参照してください。
図8 は、上記のプロセスを模式的に表したものです。
Steps¶
K
と Z
は、それぞれKafkaクラスタとZooKeeperアンサンブルのノード数を示すとします。
- 最低でも
K
を4に設定する必要があります。 (以下のステップ4で説明するように、これはクラッシュ故障耐性を示すために必要な最小ノード数です。 つまり、4つのブローカーがあれば、1つのブローカーがダウンしても、 すべてのチャネルは書き込みと読み込みが可能で、新しいチャネルを作成できます)。 Z
は3、5、7のいずれかになります。 これはスプリットブレインシナリオを避けるため、奇数にする必要があります。 また、単一障害点を避けるため、1より大きい数でなければなりません。 7台以上のZooKeeperサーバーは過剰とみなされます。
それから、次のように進めます。:
Orderers: Kafkaに関連する情報をネットワークのジェネシスブロックにエンコードします。
configtxgen
を使用している場合、configtx.yaml
を編集します。 または、システムチャネルのジェネシスブロックでプリセットプロファイルを選択します。Orderer.OrdererType
をkafka
に設定します。Orderer.Kafka.Brokers
には、クラスタ内の 少なくとも2つの KafkaブローカーのアドレスをIP:port
形式で指定します。 このリストは完全である必要はありません。(これらはブートストラップ・ブローカーです。)
Orderers: 最大ブロックサイズを設定します。 各ブロックは最大で
Orderer.AbsoluteMaxBytes
バイト(ヘッダーを除く)であり、 この値はconfigtx.yaml
で設定できます。 ここで選んだ値をA
とし、メモしておいてください。 これは、ステップ6でのKafkaブローカーの設定に影響します。Orderers: ジェネシスブロックを作成します。
configtxgen
を使用します。 上記のステップ3と4で選んだ設定は、システム全体の設定です。 つまり、すべてのOSNに対するネットワーク全体に適用されます。 ジェネシスブロックの場所をメモしておきます。Kafka cluster: Kafkaブローカーを適切に設定します。 すべてのKafkaブローカーに以下の項目が設定されていることを確認します。
unclean.leader.election.enable = false
— ブロックチェーン環境では、データの一貫性が重要です。 同期しているレプリカセットの外部からチャネルリーダーを選ぶことはできません。 さもないと、前のリーダーが作成したオフセットを上書きするリスクがあります。 その結果、Ordererが作成するブロックチェーンを書き換えてしまいます。min.insync.replicas = M
—M
が1 < M < N
となる値を指定します。 (以下のdefault.replication.factor
を参照してください。) データは、少なくともM
のレプリカに書き込まれた時点でコミットされたとみなされます。 (このとき、これらのレプリカは同期中とみなされ、同期中レプリカセット(ISR)に属します。) それ以外の場合、書き込み操作はエラーを返します。 それから:- チャネルデータの書き込み先である
N
のうち、最大でN-M
のレプリカが利用できなくなった場合まで、運用は正常に続行されます。 - さらに多くのレプリカが利用できなくなると、Kafka は
M
のISRセットを維持できなくなり、書き込みの受付を停止します。 読み込みは問題なく行えます。M
のレプリカが同期すると、チャネルは再び書き込み可能になります。
- チャネルデータの書き込み先である
default.replication.factor = N
—N
がN < K
となる値を指定します。 レプリケーションファクターがN
の場合、各チャネルはN
個のブローカーにデータを複製します。 これらはチャネルのISRセットの候補です。 上記のmin.insync.replicas section
で述べたように、すべてのブローカーが常に利用可能である必要はありません。N
はK
よりも 小さく 設定する必要があります。 なぜなら、ブローカーがN
よりも少ない場合は、チャネルの作成を続行できないからです。 つまり、N = K
と設定すると、ブローカーが1つでもダウンすると、ブロックチェーンネットワーク上に新しいチャネルを作成できなくなります。 — オーダリングサービスのクラッシュ故障耐性が無くなります。ここまで説明した内容から、
M
とN
の最小許容値はそれぞれ2と3になります。 この設定により、新しいチャネルの作成が進み、すべてのチャネルが書き込み可能な状態を維持します。message.max.bytes
とreplica.fetch.max.bytes
には、 上記のステップ4でOrderer.AbsoluteMaxBytes
に設定したA
より大きな値を設定してください。 ヘッダを考慮したバッファを追加してください --- 1 MiBあれば十分です。 次のような条件があります。Orderer.AbsoluteMaxBytes < replica.fetch.max.bytes <= message.max.bytes
(完全を期すために、
message.max.bytes
はsocket.request.max.bytes
よりも必ず小さくなるように注意してください。 デフォルトでは100MiBに設定されています。 もし100MiBより大きなブロックを持ちたい場合は、fabric/orderer/kafka/config.go
のbrokerConfig.Producer.MaxMessageBytes
にハードコードされている値を編集し、 バイナリをソースからリビルドする必要があります。 これは推奨されません。)log.retention.ms = -1
。オーダリングサービスがKafkaログのプルーニングをサポートするまでは、 時間ベースの保持を無効にして、セグメントが期限切れになることを防ぎます。 (サイズベースの保持 —log.retention.bytes
を参照 — は、 執筆時点ではKafkaのデフォルトで無効になっているので、明示的に設定する必要はありません。)
Orderers: それぞれのOSNをジェネシスブロックに指定します。
orderer.yaml
のGeneral.BootstrapFile
を編集し、上記のステップ5で作成したジェネシスブロックを指定します。 ついでに、そのYAMLファイルの他のすべてのキーが適切に設定されていることを確認します。Orderers: ポーリング間隔とタイムアウトを調整します。 (オプションのステップ)
orderer.yaml
ファイルのKafka.Retry
セクションでは、 メタデータ/プロデューサー/コンシューマーの要求頻度や、ソケットのタイムアウトを調整できます (これらはすべて、Kafka プロデューサーまたはコンシューマーで見られる設定です。)- なお、新たなチャネルが作成された時、および、既存チャネルがリロードされた時
(再起動したばかりのOrdererの場合)、Ordererは以下の方法でKafkaクラスタとやり取りします。
- チャネルに対応するKafkaパーティションに対して、Kafkaプロデューサー(ライター)を作成します。
そのプロデューサーを使用して、パーティションに
CONNECT
メッセージをポストします。 そのパーティションに対して、Kafkaコンシューマー(リーダー)を作成します。 - これらのステップのいずれかが失敗した場合、繰り返す頻度を調整できます。
具体的には、
Kafka.Retry.ShortInterval
ごとに再試行し、合計でKafka.Retry.ShortTotal
となります。 そして、成功するまで、Kafka.Retry.LongInterval
ごとに、合計Kafka.Retry.LongTotal
となります。 上記のすべてのステップが正常に完了するまで、Ordererがチャネルへの読み書きをできなくなることに注意してください。
- チャネルに対応するKafkaパーティションに対して、Kafkaプロデューサー(ライター)を作成します。
そのプロデューサーを使用して、パーティションに
OSNとKafkaクラスタがSSLで通信するようにセットアップします。 (オプションのステップですが、強くお勧めします。) Kafkaクラスタ側については、 the Confluent guide を参照し、 各OSNの
orderer.yaml
内のKafka.TLS
配下のキーを設定します。以下の順番でノードを立ち上げます: ZooKeeperアンサンブル、Kafkaクラスタ、オーダリングサービスノード。
Additional considerations¶
- 好ましいメッセージサイズ
上記のステップ4(Steps セクション参照)では、
Orderer.Batchsize.PreferredMaxBytes
キーを設定することで、ブロックサイズを好みで設定できます。 Kafkaは比較的小さなメッセージを扱うときに高いスループットを発揮します。 1MiB以下の値を心掛けてください。 - 環境変数による設定の上書き
Fabricに付属するKafkaとZookeeperのサンプルDockerイメージ
(それぞれ
images/kafka
とimages/zookeeper
を参照) を使用すると、 KafkaブローカーとZooKeeperサーバーの設定を環境変数で上書きできます。 設定キーのドットをアンダースコアに置き換えます。 例えば、KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE=false
とすると、 デフォルト値のunclean.leader.election.enable
を上書きできます。 同じことが、OSNの ローカル 設定、つまりorderer.yaml
で設定できる内容にも当てはまります。 例えば、ORDER_KAFKA_RETRY_SHORTINTERVAL=1s
とすると、Orderer.Kafka.Retry.ShortInterval
のデフォルト値を上書きできます。
Kafka Protocol Version Compatibility¶
Fabricは sarama client library を使用し、 Kafka 0.10 から 1.0 をサポートするバージョンをベンダリングしていますが、 古いバージョンでも動作することが確認されています。
orderer.yaml
の Kafka.Version
キーを使用すると、
Kafkaクラスタのブローカーとの通信に使用するKafkaプロトコルのバージョンを設定できます。
Kafkaブローカーは古いプロトコルのバージョンと下位互換性があります。
Kafkaブローカーは古いプロトコルのバージョンと下位互換性があるため、
Kafkaブローカーを新しいバージョンにアップグレードしても、
Kafka.Version
キーの値を更新する必要はありませんが、
古いプロトコルのバージョンを使用していると、Kafkaクラスタは
performance penalty を被ることがあります。
Debugging¶
環境変数 FABRIC_LOGGING_SPEC
に DEBUG
を設定し、 orderer.yaml
で Kafka.Verbose
に true
を設定してください。