目次

    GoとNATS Streaming入門|高信頼メッセージング基盤の構築と実践
    GoNATSNATS Streamingメッセージングマイクロサービス非同期通信分散システム
    202507-07
    メッセージング基盤のアーキテクチャ(Go × NATS Streaming)

    GoとNATS Streaming入門|高信頼メッセージング基盤の構築と実践

    はじめに:Go メッセージキューで実現する高信頼システム

    マイクロサービス間の連携を、HTTP/gRPCによる同期的なAPI呼び出しだけで実装していませんか?一つのサービスがダウンすると、依存する全てのサービスが影響を受ける…そんな脆いシステムになっていないでしょうか?

    この課題を解決するのが、サービス間を「疎結合」にするメッセージングシステムです。

    現在の推奨技術:NATS JetStream

    新規プロジェクトではNATS JetStreamを選択してください。 NATS Server 2.2.0以降、コアNATSに永続化機能が直接統合され、高性能で信頼性の高いメッセージング基盤として提供されています。

    JetStreamは、ストリーム(メッセージの永続化)とコンシューマ(メッセージの配信制御)という概念で、エンタープライズレベルの信頼性を実現します。

    NATS Streaming 入門で学ぶ基礎概念

    この記事では、歴史的経緯として**NATS Streaming (STAN)**を使って、メッセージング基盤の基本概念を学びます。ここで習得する永続サブスクリプションやACKの概念は、JetStreamでも中心的な役割を果たす普遍的な知識です。

    学習の流れ:

    1. NATS Streamingで基本概念を理解
    2. JetStreamでの実装方法を習得
    3. 運用面のモニタリング手法を学習

    NATSとNATS Streaming (STAN) の基本概念

    • コアNATS: 「Fire-and-Forget」の超高速・軽量なPub/Subシステム。「At-Most-Once(最大1回)」配信。
    • NATS Streaming (STAN): コアNATSの上に、メッセージの永続化At-Least-Once配信保証を追加した、信頼性の高いレイヤー。

    重要: NATS Streaming (STAN)は、現在メンテナンスモードであり、新規プロジェクトでの採用は推奨されていません。


    実践!GoでNATS Streamingを使ってみる

    ステップ①:環境構築 (Docker)

    注意: NATS Streamingは非推奨のため、本番環境では後継のJetStreamの使用を強く推奨します。

    docker-compose.ymlを使い、NATS Streaming Serverを簡単に起動できます。

    yaml
    1# docker-compose.yml
    2version: '3.6'
    3services:
    4  nats-streaming:
    5    image: nats-streaming:0.25.5
    6    ports:
    7      - "4222:4222" # Client Port
    8      - "8222:8222" # Monitoring Port
    9    command: ["-p", "4222", "-m", "8222", "-cid", "test-cluster", "-store", "file", "-dir", "/data"]
    10    volumes:
    11      - ./nats-data:/data

    ファイル永続化の注意

    開発環境でも「障害復旧シナリオ」を確認したい場合は、

    yaml
    1command: ["-p", "4222", "-m", "8222", "-cid", "test-cluster", "-store", "file", "-dir", "/data"]
    2volumes:
    3  - ./nats-data:/data

    のように -store file を指定し、コンテナ外にボリュームをマウントしましょう。 これにより、サーバ再起動後もメッセージが残り、ACK 動作を正しく検証できます。

    ステップ②:Publisherの実装

    GoのNATS Streamingクライアントライブラリを使い、メッセージをPublish(発行)します。

    go
    1// publisher/main.go
    2package main
    3
    4import (
    5    "log"
    6    "github.com/nats-io/nats.go"
    7    "github.com/nats-io/stan.go"
    8)
    9
    10func main() {
    11    nc, err := nats.Connect("nats://localhost:4222")
    12    if err != nil {
    13        log.Fatal("NATS接続エラー:", err)
    14    }
    15    defer nc.Close()
    16
    17    sc, err := stan.Connect("test-cluster", "publisher-1", stan.NatsConn(nc))
    18    if err != nil {
    19        log.Fatal("STAN接続エラー:", err)
    20    }
    21    defer sc.Close()
    22
    23    err = sc.Publish("order.created", []byte("Hello NATS Streaming!"))
    24    if err != nil {
    25        log.Fatal("メッセージ送信エラー:", err)
    26    }
    27    
    28    log.Println("メッセージを送信しました")
    29}

    ステップ③:Subscriberの実装と「Durable Subscription」

    メッセージをSubscribe(購読)します。 DurableName(永続サブスクリプション) を指定することで、Subscriberが再起動しても、未処理のメッセージから処理を再開できます。

    go
    1// subscriber/main.go
    2package main
    3
    4import (
    5    "log"
    6    "os"
    7    "time"
    8    "github.com/nats-io/nats.go"
    9    "github.com/nats-io/stan.go"
    10)
    11
    12func main() {
    13    nc, err := nats.Connect("nats://localhost:4222")
    14    if err != nil {
    15        log.Fatal("NATS接続エラー:", err)
    16    }
    17    defer nc.Close()
    18
    19    sc, err := stan.Connect("test-cluster", "subscriber-1", stan.NatsConn(nc))
    20    if err != nil {
    21        log.Fatal("STAN接続エラー:", err)
    22    }
    23    defer sc.Close()
    24
    25    // 注意:StartWithLastReceived()とDurableNameの併用について
    26    // 初回起動時は最新メッセージから開始し、再起動時は未処理メッセージから再開します
    27    // 本番環境では、この動作が期待通りかを十分に検証してください
    28    
    29    // 実運用では、初回だけ特定時刻から開始する場合の例:
    30    var startOpt stan.SubscriptionOption
    31    if isFirstRun() {
    32        // 初回起動時は特定時刻から開始(重複配信を防ぐ)
    33        startOpt = stan.StartAtTime(time.Now().Add(-1 * time.Hour))
    34    } else {
    35        // 再起動時はDurableで未処理メッセージから再開
    36        startOpt = stan.DeliverAllAvailable()
    37    }
    38    
    39    _, err = sc.Subscribe(
    40        "order.created",
    41        func(msg *stan.Msg) {
    42            log.Printf("受信:%s", string(msg.Data))
    43            // 実際の処理をここに記述
    44            if err := processMessage(msg.Data); err != nil {
    45                log.Printf("メッセージ処理エラー: %v", err)
    46                return // Ackしないことで再配信される
    47            }
    48            msg.Ack() // 処理完了を通知
    49        },
    50        stan.DurableName("order-processor"),     // 永続サブスクリプション名
    51        stan.SetManualAckMode(),                 // 手動Ackモード
    52        stan.AckWait(30*time.Second),            // Ack待機時間(実運用では重要)
    53        startOpt,                                // 動的に決定される開始位置
    54    )
    55    if err != nil {
    56        log.Fatal("サブスクリプション作成エラー:", err)
    57    }
    58    
    59    log.Println("メッセージ待機中...")
    60    select {} // main が即終了しないようブロック
    61}
    62
    63func processMessage(data []byte) error {
    64    // ここに実際のビジネスロジックを実装
    65    log.Printf("メッセージを処理中: %s", string(data))
    66    return nil
    67}
    68
    69func isFirstRun() bool {
    70    // 実装例:ファイルの存在チェックで初回起動を判定
    71    if _, err := os.Stat("./subscriber.lock"); os.IsNotExist(err) {
    72        // ロックファイルを作成
    73        file, _ := os.Create("./subscriber.lock")
    74        file.Close()
    75        return true
    76    }
    77    return false
    78}

    ステップ④:高信頼性を支えるACK(受信確認)

    stan.SetManualAckMode()msg.Ack()は、「At-Least-Once」配信保証の核となる仕組みです。Subscriberがメッセージを「確かに処理した」とサーバーに通知するまで、サーバーはメッセージを保持し続けます。


    JetStream実践:ストリームとコンシューマの定義

    STANで学んだ概念を、現在推奨されるJetStreamで実装してみましょう。

    JetStreamサーバーの起動

    bash
    1# JetStreamを有効化した単体サーバー(開発用)
    2docker run -p 4222:4222 -p 8222:8222 nats -js -m 8222

    ストリームとコンシューマをコードから作成

    go
    1// JetStream のストリームとコンシューマをコードから作成
    2package main
    3
    4import (
    5    "log"
    6    "time"
    7    "github.com/nats-io/nats.go"
    8)
    9
    10func main() {
    11    nc, err := nats.Connect("nats://localhost:4222")
    12    if err != nil {
    13        log.Fatal("NATS接続エラー:", err)
    14    }
    15    defer nc.Close()
    16
    17    js, err := nc.JetStream()
    18    if err != nil {
    19        log.Fatal("JetStream取得エラー:", err)
    20    }
    21
    22    // ストリーム作成
    23    _, err = js.AddStream(&nats.StreamConfig{
    24        Name:      "ORDERS",
    25        Subjects:  []string{"order.*"},
    26        Storage:   nats.FileStorage,
    27        Retention: nats.LimitsPolicy,
    28    })
    29    if err != nil {
    30        log.Printf("ストリーム作成エラー(既存の場合は無視): %v", err)
    31    }
    32
    33    // コンシューマ作成
    34    _, err = js.AddConsumer("ORDERS", &nats.ConsumerConfig{
    35        Durable:       "order-processor",
    36        AckPolicy:     nats.AckExplicitPolicy,
    37        AckWait:       30 * time.Second,
    38        DeliverPolicy: nats.DeliverLastPolicy,
    39    })
    40    if err != nil {
    41        log.Printf("コンシューマ作成エラー(既存の場合は無視): %v", err)
    42    }
    43
    44    // メッセージ購読
    45    _, err = js.Subscribe("order.created", func(m *nats.Msg) {
    46        log.Printf("JetStreamで受信: %s", string(m.Data))
    47        m.Ack()
    48    }, nats.Durable("order-processor"))
    49    if err != nil {
    50        log.Fatal("サブスクリプション作成エラー:", err)
    51    }
    52
    53    log.Println("JetStreamメッセージ待機中...")
    54    select {}
    55}

    Pull型サブスクリプション(バックプレッシャ制御)

    Push型(上記の例)に加えて、Pull型サブスクリプションも利用できます。これにより、ワーカー側でメッセージ処理のペースを制御できます:

    go
    1// Pull Subscribe の例(JetStream)
    2sub, err := js.PullSubscribe("order.created", "order-processor")
    3if err != nil {
    4    log.Fatal("Pull Subscribe エラー:", err)
    5}
    6
    7for {
    8    msgs, err := sub.Fetch(10, nats.MaxWait(2*time.Second))
    9    if err != nil {
    10        log.Printf("Fetch エラー: %v", err)
    11        continue
    12    }
    13    
    14    for _, m := range msgs {
    15        log.Printf("Pull型で受信: %s", string(m.Data))
    16        // 処理...
    17        m.Ack()
    18    }
    19}

    ポイント: バッチ数と MaxWait を調整することで、ワーカー側のスループットを動的に制御できます。実運用では、処理能力に応じてこれらのパラメータを調整し、効率的なメッセージ処理を実現できます。

    STANで学んだDurable SubscriptionやAckの概念はそのまま活用できます。既存のSTANデータの移行には、公式のstan2jsツールが提供されています。


    運用面:モニタリングとメトリクス収集

    NATS Streaming モニタリングエンドポイント

    NATS Streamingサーバーは、運用に必要なメトリクスを提供します:

    • /varz: サーバー全体の統計情報
    • /streamingz: ストリーミング固有のメトリクス(チャンネル、サブスクリプション情報)
    bash
    1# モニタリング情報の確認
    2curl http://localhost:8222/varz
    3curl http://localhost:8222/streamingz

    Prometheus Exporter

    本格的な運用では、NATS Prometheus Exporterを使用してメトリクスを収集し、Grafanaでダッシュボードを構築することを推奨します。

    bash
    1# Prometheus Exporterの起動例
    2docker run -p 7777:7777 natsio/prometheus-nats-exporter -varz http://nats-server:8222

    Grafana ダッシュボード例

    可観測性を向上させるため、以下のメトリクスを監視することを推奨します:

    主要メトリクス:

    • gnatsd_varz_connections: アクティブな接続数
    • gnatsd_varz_in_msgs: 受信メッセージ数
    • gnatsd_varz_out_msgs: 送信メッセージ数
    • gnatsd_varz_slow_consumers: 遅いコンシューマ数

    サンプルGrafanaクエリ:

    promql
    1# メッセージスループット(1分間の平均)
    2rate(gnatsd_varz_in_msgs[1m])
    3
    4# 接続数の推移
    5gnatsd_varz_connections
    6
    7# エラー率
    8rate(gnatsd_varz_in_msgs[5m]) - rate(gnatsd_varz_out_msgs[5m])

    公式のGrafanaダッシュボードテンプレートはNATS Community Dashboardで入手できます。


    まとめ:非同期アーキテクチャへの第一歩

    NATS Streaming (STAN)で学んだ、永続サブスクリプションやAckの概念は、後継のNATS JetStreamでも中心的な役割を果たす普遍的な知識です。

    エンジニアとしての市場価値

    非同期メッセージングを使いこなす能力は、スケーラブルで回復力のあるマイクロサービスを設計できる、市場価値の高いエンジニアであることの証明となります。

    私たちGoForceは、Go言語の並行処理能力を最大限に活かし、NATS/JetStreamのようなモダンなメッセージング基盤を構築できる、プロフェッショナルなフリーランスエンジニアとの強いネットワークを持っています。あなたのその分散システム設計能力を、次の挑戦的なプロジェクトで活かしませんか?ぜひ一度私たち]にご相談ください!

    参考文献

    会員登録はこちら

    最適なGo案件を今すぐチェック!

    会員登録

    生年月日 *

    /

    /

    Go経験年数 *

    /

    利用規約プライバシーポリシーに同意してお申し込みください。