ユーザーの行動ログ、IoTデバイスからのセンサーデータ、マイクロサービスのイベント。現代のアプリケーションが生成する膨大なデータストリームを、どうすれば確実かつリアルタイムに処理できるでしょうか?
この課題に対する業界標準の答えが、分散メッセージングシステム「Apache Kafka」です。そして、そのKafkaと対話する高性能なクライアントを開発する上で、Go言語が最高の選択肢の一つであることは間違いありません。
segmentio/kafka-go
(本記事で採用): Pure Goで実装されており、CGO不要で導入が非常にシンプル。confluentinc/confluent-kafka-go
: C言語製のlibrdkafka
をラップしており非常に高性能だが、導入にはCコンパイラ環境が必要。kafka-go
で始めるデータストリーミングハンズオン形式で、具体的な開発フローを解説していきます。
まずKafkaと、その管理に必要なZookeeperをローカルで簡単に起動するため、compose.yaml
を作成します。
yaml1# compose.yaml (Docker Compose v2推奨形式) 2version: '3.8' 3services: 4 zookeeper: 5 image: confluentinc/cp-zookeeper:7.4.0 6 environment: 7 ZOOKEEPER_CLIENT_PORT: 2181 8 ZOOKEEPER_TICK_TIME: 2000 9 10 kafka: 11 image: confluentinc/cp-kafka:7.4.0 12 depends_on: 13 - zookeeper 14 ports: 15 - "9092:9092" 16 environment: 17 KAFKA_BROKER_ID: 1 18 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 19 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 20 KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 21 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 22 KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
注意: Kafka 3.7以降ではKRaftモードが推奨されますが、学習コストを抑えるため、ここでは安定版のZookeeper構成(7.4.0)を使用しています。Confluent 7.4は Kafka 3.5 ベースであることにご注意ください。
Kafka 3.5 以降では KRaft が GA となり、Zookeeper を用いないシンプルな構成が推奨されています。以下は bitnami/kafka
を使った Compose 例です。
yaml1# compose-kraft.yaml (KRaftモード構成) 2version: '3.8' 3services: 4 kafka: 5 image: bitnami/kafka:3.6 6 ports: 7 - "9092:9092" 8 environment: 9 - KAFKA_CFG_NODE_ID=1 10 - KAFKA_CFG_PROCESS_ROLES=broker,controller 11 - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka:9093 12 - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 13 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 14 - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER 15 - KAFKA_ENABLE_KRAFT=yes 16 - ALLOW_PLAINTEXT_LISTENER=yes
Zookeeper が不要になることで起動が高速化し、コンテナ数も削減できます。
bash1docker compose up -d
kafka.Writer
を使い、KafkaのTopicにメッセージを書き込むアプリケーションです。
go1package main 2 3import ( 4 "context" 5 "log" 6 "github.com/segmentio/kafka-go" 7) 8 9func main() { 10 writer := &kafka.Writer{ 11 Addr: kafka.TCP("localhost:9092"), 12 Topic: "user-events", 13 Balancer: &kafka.LeastBytes{}, 14 RequiredAcks: kafka.RequireAll, // 全ISRへの書き込み完了を確認 15 AllowAutoTopicCreation: true, 16 } 17 defer func() { 18 if err := writer.Close(); err != nil { 19 log.Printf("Failed to close writer: %v", err) 20 } 21 }() 22 23 // メッセージを送信 24 err := writer.WriteMessages(context.Background(), 25 kafka.Message{ 26 Key: []byte("user-1"), 27 Value: []byte("user logged in"), 28 }, 29 ) 30 if err != nil { 31 log.Printf("Failed to write message: %v", err) 32 } 33}
kafka.Writer
は v0.4.42 以降、Idempotent Producer をサポートしています。重複レコードを防ぎたい場合は以下のオプションを有効にしましょう。
go1import ( 2 "time" 3 "github.com/segmentio/kafka-go" 4) 5 6writer := &kafka.Writer{ 7 Addr: kafka.TCP("localhost:9092"), 8 Topic: "user-events", 9 RequiredAcks: kafka.RequireAll, 10 Balancer: &kafka.LeastBytes{}, 11 Idempotent: true, // Idempotent Producer 12 AllowAutoTopicCreation: true, 13 14 // 高可用性のためのリトライ・バックオフ設定 15 BatchSize: 100, // バッチサイズ(メッセージ数) 16 BatchBytes: 1048576, // バッチサイズ(1MB) 17 BatchTimeout: 10 * time.Millisecond, // バッチタイムアウト 18 ReadTimeout: 10 * time.Second, // 読み取りタイムアウト 19 WriteTimeout: 10 * time.Second, // 書き込みタイムアウト 20 21 // エラー時の自動リトライ設定 22 MaxAttempts: 3, // 最大リトライ回数 23}
高可用性設定のポイント:
Idempotent Producer を有効にすると同一メッセージの重複書き込みが防止され、Exactly-Once Delivery へ近づけます(※トランザクションを併用するとより確実です)。
go1import ( 2 "context" 3 "log" 4 "github.com/segmentio/kafka-go" 5) 6 7func main() { 8 ctx := context.Background() 9 10 msg1 := kafka.Message{Key: []byte("id-1"), Value: []byte("foo")} 11 msg2 := kafka.Message{Key: []byte("id-2"), Value: []byte("bar")} 12 13 writer := &kafka.Writer{ 14 Addr: kafka.TCP("localhost:9092"), 15 Topic: "user-events", 16 TransactionalID: "tx-user-events-001", 17 Idempotent: true, 18 RequiredAcks: kafka.RequireAll, 19 } 20 21 if err := writer.BeginTransaction(); err != nil { 22 log.Fatal(err) 23 } 24 25 if err := writer.WriteMessages(ctx, msg1, msg2); err != nil { 26 _ = writer.AbortTransaction(ctx) 27 log.Fatal(err) 28 } 29 30 if err := writer.CommitMessages(ctx); err != nil { 31 _ = writer.AbortTransaction(ctx) 32 log.Fatal(err) 33 } 34}
Transaction を張ることで「複数 Topic / Partition への一括書き込み」も安全にロールバックできます。
kafka.Reader
を使い、KafkaのTopicからメッセージを読み込むアプリケーションです。GroupID
を設定することで、同じグループのConsumerが協調して処理を分担します。
go1package main 2 3import ( 4 "context" 5 "fmt" 6 "log" 7 "os" 8 "os/signal" 9 "syscall" 10 "github.com/segmentio/kafka-go" 11) 12 13func main() { 14 // Graceful Shutdown用のコンテキスト 15 ctx, cancel := context.WithCancel(context.Background()) 16 defer cancel() 17 18 // OSシグナルを受け取るチャネル 19 sigChan := make(chan os.Signal, 1) 20 signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) 21 22 reader := kafka.NewReader(kafka.ReaderConfig{ 23 Brokers: []string{"localhost:9092"}, 24 Topic: "user-events", 25 GroupID: "user-event-processors", // Consumer Group ID 26 }) 27 defer reader.Close()
高スループットが求められる環境では、以下のパラメータでConsumerを最適化できます:
go1import "time" 2 3reader := kafka.NewReader(kafka.ReaderConfig{ 4 Brokers: []string{"localhost:9092"}, 5 Topic: "user-events", 6 GroupID: "optimized-processors", 7 8 // スループット最適化設定 9 MinBytes: 10e3, // 最小読み取りバイト数(10KB) 10 MaxBytes: 10e6, // 最大読み取りバイト数(10MB) 11 MaxWait: 1 * time.Second, // 最大待機時間 12 13 // コミット設定 14 CommitInterval: 1 * time.Second, // 自動コミット間隔 15 16 // 注意:StartOffsetはGroupID指定時には無視されます 17 // 新規Consumer Groupの場合のみ有効 18 StartOffset: kafka.LastOffset, // 最新メッセージから開始 19})
チューニングのポイント:
本番環境では、セキュリティ設定が必須です。以下はReaderConfig.Dialer
を使ったTLS/SASL設定例です:
go1import ( 2 "crypto/tls" 3 "github.com/segmentio/kafka-go" 4 "github.com/segmentio/kafka-go/sasl/scram" 5) 6 7// SASL/SCRAM認証の設定 8mechanism, err := scram.Mechanism(scram.SHA512, "username", "password") 9if err != nil { 10 log.Fatal(err) 11} 12 13// TLS設定 14dialer := &kafka.Dialer{ 15 Timeout: 10 * time.Second, 16 DualStack: true, 17 TLS: &tls.Config{}, 18 SASLMechanism: mechanism, 19} 20 21reader := kafka.NewReader(kafka.ReaderConfig{ 22 Brokers: []string{"secure-kafka:9093"}, 23 Topic: "user-events", 24 GroupID: "secure-processors", 25 Dialer: dialer, // セキュア接続設定 26})
セキュリティ設定のポイント:
go1// Graceful Shutdown処理 2 go func() { 3 <-sigChan 4 log.Println("Shutdown signal received, stopping consumer...") 5 cancel() 6 }() 7 8 for { 9 select { 10 case <-ctx.Done(): 11 log.Println("Consumer stopped gracefully") 12 return 13 default: 14 message, err := reader.ReadMessage(ctx) 15 if err != nil { 16 if ctx.Err() != nil { 17 return // コンテキストキャンセル時は正常終了 18 } 19 log.Printf("Failed to read message: %v", err) 20 continue 21 } 22 23 // メッセージ処理 24 fmt.Printf("Received: key=%s, value=%s\n", 25 string(message.Key), string(message.Value)) 26 } 27 } 28}
Consumerがどこまでメッセージを処理したか、その位置(オフセット)をKafkaに記録する行為が「Commit」です。kafka-go
では、GroupIDを設定した場合のみ、デフォルトでReadMessage
の後に自動Commitが行われます。
注意: 自動Commitはメッセージ読み込み直後に実行されるため、処理中にエラーが発生してもCommitされてしまいます。より確実な処理にはFetchMessage
とCommitMessages
を使った手動Commitを検討してください。
go1// 手動Commitの実装例 2package main 3 4import ( 5 "context" 6 "fmt" 7 "log" 8 "github.com/segmentio/kafka-go" 9) 10 11func manualCommitExample() { 12 reader := kafka.NewReader(kafka.ReaderConfig{ 13 Brokers: []string{"localhost:9092"}, 14 Topic: "user-events", 15 GroupID: "manual-commit-group", 16 CommitInterval: 0, // 自動Commitを無効化 17 }) 18 defer reader.Close() 19 20 for { 21 message, err := reader.FetchMessage(context.Background()) 22 if err != nil { 23 log.Printf("Failed to fetch message: %v", err) 24 continue 25 } 26 27 // メッセージ処理 28 if err := processMessage(message); err != nil { 29 log.Printf("Failed to process message: %v", err) 30 continue // 処理失敗時はCommitしない 31 } 32 33 // 処理成功後に手動Commit 34 if err := reader.CommitMessages(context.Background(), message); err != nil { 35 log.Printf("Failed to commit message: %v", err) 36 } 37 } 38} 39 40func processMessage(message kafka.Message) error { 41 fmt.Printf("Processing: key=%s, value=%s\n", 42 string(message.Key), string(message.Value)) 43 // ここで実際のビジネスロジックを実行 44 return nil 45}
本番環境では、失敗時のリトライポリシーとDead Letter Topicの設計が重要です。
go1// リトライ機能付きConsumer例 2func processWithRetry(message kafka.Message, maxRetries int) error { 3 for attempt := 0; attempt <= maxRetries; attempt++ { 4 if err := processMessage(message); err != nil { 5 if attempt == maxRetries { 6 // 最大リトライ回数に達した場合、Dead Letter Topicに送信 7 return sendToDeadLetterTopic(message, err) 8 } 9 log.Printf("Processing failed (attempt %d/%d): %v", attempt+1, maxRetries+1, err) 10 continue 11 } 12 return nil // 成功 13 } 14 return nil 15} 16 17func sendToDeadLetterTopic(message kafka.Message, originalErr error) error { 18 // Dead Letter Topic への送信ロジック 19 log.Printf("Sending to dead letter topic: %v", originalErr) 20 return nil 21}
bash1# 依存関係のインストール 2go mod init kafka-example 3go get github.com/segmentio/[email protected] 4 5# Docker環境起動 6docker compose up -d 7 8# Producer実行(別ターミナル) 9go run producer.go 10 11# Consumer実行(別ターミナル) 12go run consumer.go
本番環境では、Kafkaクラスタとアプリケーションの監視が重要です。以下はkafka_exporter
とjmx_exporter
を使った監視設定例です:
yaml1# docker-compose-monitoring.yaml 2version: '3.8' 3services: 4 kafka-exporter: 5 image: danielqsj/kafka-exporter:latest 6 ports: 7 - "9308:9308" 8 command: 9 - --kafka.server=kafka:9092 10 - --web.listen-address=:9308 11 depends_on: 12 - kafka 13 14 jmx-exporter: 15 image: sscaling/jmx-prometheus-exporter:latest 16 ports: 17 - "5556:5556" 18 environment: 19 - CONFIG_YML=/etc/jmx_exporter/config.yml 20 volumes: 21 - ./jmx_config.yml:/etc/jmx_exporter/config.yml 22 depends_on: 23 - kafka 24 25 prometheus: 26 image: prom/prometheus:latest 27 ports: 28 - "9090:9090" 29 volumes: 30 - ./prometheus.yml:/etc/prometheus/prometheus.yml
yaml1# prometheus.yml 2global: 3 scrape_interval: 15s 4 5scrape_configs: 6 - job_name: 'kafka-exporter' 7 static_configs: 8 - targets: ['kafka-exporter:9308'] 9 10 - job_name: 'jmx-exporter' 11 static_configs: 12 - targets: ['jmx-exporter:5556']
監視メトリクスの重要指標:
kafka-go
でのログ出力とメトリクス収集go1import ( 2 "log" 3 "os" 4 "github.com/segmentio/kafka-go" 5) 6 7// カスタムロガーの設定 8logger := log.New(os.Stdout, "kafka: ", log.LstdFlags) 9 10writer := &kafka.Writer{ 11 Addr: kafka.TCP("localhost:9092"), 12 Topic: "user-events", 13 Logger: logger, // ログ出力を有効化 14 ErrorLogger: logger, // エラーログも出力 15} 16 17reader := kafka.NewReader(kafka.ReaderConfig{ 18 Brokers: []string{"localhost:9092"}, 19 Topic: "user-events", 20 GroupID: "monitoring-group", 21 Logger: logger, // Reader側でもログ出力 22 ErrorLogger: logger, 23})
kafka-go
vs sarama
項目 | segmentio/kafka-go | Shopify/sarama |
---|---|---|
実装言語 | Pure Go | Pure Go |
依存関係 | CGO不要 | CGO不要 |
API設計 | シンプル・直感的 | 高機能・複雑 |
パフォーマンス | 中程度 | 高性能 |
メモリ使用量 | 少ない | 多い |
学習コスト | 低い | 高い |
Admin API | 限定的 | 充実 |
トランザクション | 基本サポート | 完全サポート |
コミュニティ | 活発 | 非常に活発 |
選定指針:
kafka-go
(シンプルで始めやすい)sarama
(機能豊富だが複雑)Goの軽量な並行処理とKafkaの分散アーキテクチャは、極めて親和性の高い組み合わせです。goroutine
を使って多数のConsumerを効率的に起動できるなど、Goの特性はデータストリーミング処理で大きなアドバンテージとなります。
現代のマイクロサービスアーキテクチャにおいて、イベント駆動型の設計は必須の技術となっています。ログ収集、イベントソーシング、リアルタイム分析基盤など、この分野のスキルは現代のバックエンドエンジニアにとって非常に市場価値が高いものです。
私たちGoForceは、Go言語とKafkaのような分散システム技術を駆使して、大規模なデータ基盤を構築できる、プロフェッショナルなフリーランスエンジニアとの強いネットワークを持っています。あなたのその高度なスキルセットを社会のインフラとなるような、挑戦的なプロジェクトで活かしませんか?ぜひ一度、私たちにご相談ください。
最適なGo案件を今すぐチェック!