マイクロサービス間の連携を、HTTP/gRPCによる同期的なAPI呼び出しだけで実装していませんか?一つのサービスがダウンすると、依存する全てのサービスが影響を受ける…そんな脆いシステムになっていないでしょうか?
この課題を解決するのが、サービス間を「疎結合」にするメッセージングシステムです。
新規プロジェクトではNATS JetStreamを選択してください。 NATS Server 2.2.0以降、コアNATSに永続化機能が直接統合され、高性能で信頼性の高いメッセージング基盤として提供されています。
JetStreamは、ストリーム(メッセージの永続化)とコンシューマ(メッセージの配信制御)という概念で、エンタープライズレベルの信頼性を実現します。
この記事では、歴史的経緯として**NATS Streaming (STAN)**を使って、メッセージング基盤の基本概念を学びます。ここで習得する永続サブスクリプションやACKの概念は、JetStreamでも中心的な役割を果たす普遍的な知識です。
学習の流れ:
重要: NATS Streaming (STAN)は、現在メンテナンスモードであり、新規プロジェクトでの採用は推奨されていません。
注意: NATS Streamingは非推奨のため、本番環境では後継のJetStreamの使用を強く推奨します。
docker-compose.yml
を使い、NATS Streaming Serverを簡単に起動できます。
yaml1# docker-compose.yml 2version: '3.6' 3services: 4 nats-streaming: 5 image: nats-streaming:0.25.5 6 ports: 7 - "4222:4222" # Client Port 8 - "8222:8222" # Monitoring Port 9 command: ["-p", "4222", "-m", "8222", "-cid", "test-cluster", "-store", "file", "-dir", "/data"] 10 volumes: 11 - ./nats-data:/data
開発環境でも「障害復旧シナリオ」を確認したい場合は、
yaml1command: ["-p", "4222", "-m", "8222", "-cid", "test-cluster", "-store", "file", "-dir", "/data"] 2volumes: 3 - ./nats-data:/data
のように -store file
を指定し、コンテナ外にボリュームをマウントしましょう。
これにより、サーバ再起動後もメッセージが残り、ACK 動作を正しく検証できます。
GoのNATS Streamingクライアントライブラリを使い、メッセージをPublish(発行)します。
go1// publisher/main.go 2package main 3 4import ( 5 "log" 6 "github.com/nats-io/nats.go" 7 "github.com/nats-io/stan.go" 8) 9 10func main() { 11 nc, err := nats.Connect("nats://localhost:4222") 12 if err != nil { 13 log.Fatal("NATS接続エラー:", err) 14 } 15 defer nc.Close() 16 17 sc, err := stan.Connect("test-cluster", "publisher-1", stan.NatsConn(nc)) 18 if err != nil { 19 log.Fatal("STAN接続エラー:", err) 20 } 21 defer sc.Close() 22 23 err = sc.Publish("order.created", []byte("Hello NATS Streaming!")) 24 if err != nil { 25 log.Fatal("メッセージ送信エラー:", err) 26 } 27 28 log.Println("メッセージを送信しました") 29}
メッセージをSubscribe(購読)します。 DurableName
(永続サブスクリプション) を指定することで、Subscriberが再起動しても、未処理のメッセージから処理を再開できます。
go1// subscriber/main.go 2package main 3 4import ( 5 "log" 6 "os" 7 "time" 8 "github.com/nats-io/nats.go" 9 "github.com/nats-io/stan.go" 10) 11 12func main() { 13 nc, err := nats.Connect("nats://localhost:4222") 14 if err != nil { 15 log.Fatal("NATS接続エラー:", err) 16 } 17 defer nc.Close() 18 19 sc, err := stan.Connect("test-cluster", "subscriber-1", stan.NatsConn(nc)) 20 if err != nil { 21 log.Fatal("STAN接続エラー:", err) 22 } 23 defer sc.Close() 24 25 // 注意:StartWithLastReceived()とDurableNameの併用について 26 // 初回起動時は最新メッセージから開始し、再起動時は未処理メッセージから再開します 27 // 本番環境では、この動作が期待通りかを十分に検証してください 28 29 // 実運用では、初回だけ特定時刻から開始する場合の例: 30 var startOpt stan.SubscriptionOption 31 if isFirstRun() { 32 // 初回起動時は特定時刻から開始(重複配信を防ぐ) 33 startOpt = stan.StartAtTime(time.Now().Add(-1 * time.Hour)) 34 } else { 35 // 再起動時はDurableで未処理メッセージから再開 36 startOpt = stan.DeliverAllAvailable() 37 } 38 39 _, err = sc.Subscribe( 40 "order.created", 41 func(msg *stan.Msg) { 42 log.Printf("受信:%s", string(msg.Data)) 43 // 実際の処理をここに記述 44 if err := processMessage(msg.Data); err != nil { 45 log.Printf("メッセージ処理エラー: %v", err) 46 return // Ackしないことで再配信される 47 } 48 msg.Ack() // 処理完了を通知 49 }, 50 stan.DurableName("order-processor"), // 永続サブスクリプション名 51 stan.SetManualAckMode(), // 手動Ackモード 52 stan.AckWait(30*time.Second), // Ack待機時間(実運用では重要) 53 startOpt, // 動的に決定される開始位置 54 ) 55 if err != nil { 56 log.Fatal("サブスクリプション作成エラー:", err) 57 } 58 59 log.Println("メッセージ待機中...") 60 select {} // main が即終了しないようブロック 61} 62 63func processMessage(data []byte) error { 64 // ここに実際のビジネスロジックを実装 65 log.Printf("メッセージを処理中: %s", string(data)) 66 return nil 67} 68 69func isFirstRun() bool { 70 // 実装例:ファイルの存在チェックで初回起動を判定 71 if _, err := os.Stat("./subscriber.lock"); os.IsNotExist(err) { 72 // ロックファイルを作成 73 file, _ := os.Create("./subscriber.lock") 74 file.Close() 75 return true 76 } 77 return false 78}
stan.SetManualAckMode()
とmsg.Ack()
は、「At-Least-Once」配信保証の核となる仕組みです。Subscriberがメッセージを「確かに処理した」とサーバーに通知するまで、サーバーはメッセージを保持し続けます。
STANで学んだ概念を、現在推奨されるJetStreamで実装してみましょう。
bash1# JetStreamを有効化した単体サーバー(開発用) 2docker run -p 4222:4222 -p 8222:8222 nats -js -m 8222
go1// JetStream のストリームとコンシューマをコードから作成 2package main 3 4import ( 5 "log" 6 "time" 7 "github.com/nats-io/nats.go" 8) 9 10func main() { 11 nc, err := nats.Connect("nats://localhost:4222") 12 if err != nil { 13 log.Fatal("NATS接続エラー:", err) 14 } 15 defer nc.Close() 16 17 js, err := nc.JetStream() 18 if err != nil { 19 log.Fatal("JetStream取得エラー:", err) 20 } 21 22 // ストリーム作成 23 _, err = js.AddStream(&nats.StreamConfig{ 24 Name: "ORDERS", 25 Subjects: []string{"order.*"}, 26 Storage: nats.FileStorage, 27 Retention: nats.LimitsPolicy, 28 }) 29 if err != nil { 30 log.Printf("ストリーム作成エラー(既存の場合は無視): %v", err) 31 } 32 33 // コンシューマ作成 34 _, err = js.AddConsumer("ORDERS", &nats.ConsumerConfig{ 35 Durable: "order-processor", 36 AckPolicy: nats.AckExplicitPolicy, 37 AckWait: 30 * time.Second, 38 DeliverPolicy: nats.DeliverLastPolicy, 39 }) 40 if err != nil { 41 log.Printf("コンシューマ作成エラー(既存の場合は無視): %v", err) 42 } 43 44 // メッセージ購読 45 _, err = js.Subscribe("order.created", func(m *nats.Msg) { 46 log.Printf("JetStreamで受信: %s", string(m.Data)) 47 m.Ack() 48 }, nats.Durable("order-processor")) 49 if err != nil { 50 log.Fatal("サブスクリプション作成エラー:", err) 51 } 52 53 log.Println("JetStreamメッセージ待機中...") 54 select {} 55}
Push型(上記の例)に加えて、Pull型サブスクリプションも利用できます。これにより、ワーカー側でメッセージ処理のペースを制御できます:
go1// Pull Subscribe の例(JetStream) 2sub, err := js.PullSubscribe("order.created", "order-processor") 3if err != nil { 4 log.Fatal("Pull Subscribe エラー:", err) 5} 6 7for { 8 msgs, err := sub.Fetch(10, nats.MaxWait(2*time.Second)) 9 if err != nil { 10 log.Printf("Fetch エラー: %v", err) 11 continue 12 } 13 14 for _, m := range msgs { 15 log.Printf("Pull型で受信: %s", string(m.Data)) 16 // 処理... 17 m.Ack() 18 } 19}
ポイント: バッチ数と MaxWait を調整することで、ワーカー側のスループットを動的に制御できます。実運用では、処理能力に応じてこれらのパラメータを調整し、効率的なメッセージ処理を実現できます。
STANで学んだDurable SubscriptionやAckの概念はそのまま活用できます。既存のSTANデータの移行には、公式のstan2js
ツールが提供されています。
NATS Streamingサーバーは、運用に必要なメトリクスを提供します:
/varz
: サーバー全体の統計情報/streamingz
: ストリーミング固有のメトリクス(チャンネル、サブスクリプション情報)bash1# モニタリング情報の確認 2curl http://localhost:8222/varz 3curl http://localhost:8222/streamingz
本格的な運用では、NATS Prometheus Exporterを使用してメトリクスを収集し、Grafanaでダッシュボードを構築することを推奨します。
bash1# Prometheus Exporterの起動例 2docker run -p 7777:7777 natsio/prometheus-nats-exporter -varz http://nats-server:8222
可観測性を向上させるため、以下のメトリクスを監視することを推奨します:
主要メトリクス:
gnatsd_varz_connections
: アクティブな接続数gnatsd_varz_in_msgs
: 受信メッセージ数gnatsd_varz_out_msgs
: 送信メッセージ数gnatsd_varz_slow_consumers
: 遅いコンシューマ数サンプルGrafanaクエリ:
promql1# メッセージスループット(1分間の平均) 2rate(gnatsd_varz_in_msgs[1m]) 3 4# 接続数の推移 5gnatsd_varz_connections 6 7# エラー率 8rate(gnatsd_varz_in_msgs[5m]) - rate(gnatsd_varz_out_msgs[5m])
公式のGrafanaダッシュボードテンプレートはNATS Community Dashboardで入手できます。
NATS Streaming (STAN)で学んだ、永続サブスクリプションやAckの概念は、後継のNATS JetStreamでも中心的な役割を果たす普遍的な知識です。
非同期メッセージングを使いこなす能力は、スケーラブルで回復力のあるマイクロサービスを設計できる、市場価値の高いエンジニアであることの証明となります。
私たちGoForceは、Go言語の並行処理能力を最大限に活かし、NATS/JetStreamのようなモダンなメッセージング基盤を構築できる、プロフェッショナルなフリーランスエンジニアとの強いネットワークを持っています。あなたのその分散システム設計能力を、次の挑戦的なプロジェクトで活かしませんか?ぜひ一度私たち]にご相談ください!
最適なGo案件を今すぐチェック!