gRPC は、高性能でクロスプラットフォームな API を構築するための強力なフレームワークとして台頭している。その中核では、トランスポートに HTTP/2 を、インターフェース定義言語に Protocol Buffers を活用し、マイクロサービス間で効率的かつ厳密に型付けされた通信を可能にする。多くの開発者は基本的なリクエスト/レスポンスパターンから始めるが、gRPC の真の力はその 4 つの異なる通信モデルにある。これらのパターンを理解することは、堅牢でスケーラブルなシステムを設計する鍵となる。
この記事では、gRPC の 4 つの通信パターン(Unary、サーバー ストリーミング、クライアント ストリーミング、双方向ストリーミング)のそれぞれを、Go で簡単なチャットサービスを実装しながら探求していく。.proto
ファイルの定義、サーバー側のハンドラ、そして各パターンのクライアント側のロジックを順に見ていこう。
始める前に、Go プログラミング言語と Protocol Buffers コンパイラ(protoc
)がインストールされていることを確認する必要がある。また、gRPC と Protobufs の Go プラグインも必要だ。
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2
まず、.proto
ファイルでサービスを定義する。4 つの通信タイプごとにメソッドを持つChatService
を作成する。
chat/chat.proto
syntax = "proto3";
package chat;
option go_package = "github.com/your-repo/grpc-blog/chat";
// ユーザー名とメッセージを含むリクエストメッセージ
message Message {
string user = 1;
string content = 2;
}
// サービス定義
service ChatService {
// Unary RPC: 単一のメッセージを送信し、単一のレスポンスを受け取る
rpc SayHello(Message) returns (Message) {}
// Server Streaming RPC: 単一のメッセージを送信し、レスポンスのストリームを受け取る
rpc BroadcastMessages(Message) returns (stream Message) {}
// Client Streaming RPC: メッセージのストリームを送信し、単一のレスポンスを受け取る
rpc UploadStream(stream Message) returns (Message) {}
// Bidirectional Streaming RPC: メッセージのストリームを送信し、レスポンスのストリームを受け取る
rpc ChatStream(stream Message) returns (stream Message) {}
}
サービスを定義した後、protoc
を使って Go のコードを生成する。
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
chat/chat.proto
このコマンドはchat/chat.pb.go
とchat/chat_grpc.pb.go
を生成する。これらには、サーバーとクライアントに必要なインターフェースと構造体が含まれている。
Unary RPCは最もシンプルなパターンで、従来の関数呼び出しに似ている。クライアントはサーバーに単一のリクエストメッセージを送信し、単一のレスポンスメッセージを返してもらう。
サーバー側では、ChatServiceServer
インターフェースで定義されたSayHello
メソッドを実装する。このメソッドは、コンテキストとリクエストメッセージを引数に取り、レスポンスメッセージとエラーを返す。
// server/main.go
type server struct {
chat.UnimplementedChatServiceServer
}
func (s *server) SayHello(ctx context.Context, in *chat.Message) (*chat.Message, error) {
log.Printf("Unaryメッセージ受信 from %s: %s", in.User, in.Content)
return &chat.Message{
User: "Server",
Content: "こんにちは、" + in.User + "さん!メッセージありがとう。",
}, nil
}
クライアントは接続を確立し、新しいクライアントスタブをインスタンス化し、ローカル関数のようにリモートメソッドを呼び出す。
// client/main.go
c := chat.NewChatServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
res, err := c.SayHello(ctx, &chat.Message{User: "Marko", Content: "これはUnary呼び出しだ。"})
if err != nil {
log.Fatalf("挨拶ができなかった: %v", err)
}
log.Printf("Unaryレスポンス from %s: %s", res.User, res.Content)
ユースケース: リソースの作成、ユーザープロファイルの取得、ステータスチェックなど、自己完結型の単純な操作に最適である。
サーバー ストリーミング RPCでは、クライアントは単一のリクエストを送信するが、サーバーはメッセージのストリームで応答する。サーバーは接続を開いたままにし、送信するデータがなくなるまで複数のメッセージを返す。
サーバーメソッドはリクエストとストリームオブジェクトを受け取る。そして、ストリームのSend()
メソッドを複数回使用してレスポンスを返信できる。
// server/main.go
func (s *server) BroadcastMessages(in *chat.Message, stream chat.ChatService_BroadcastMessagesServer) error {
log.Printf("Broadcastリクエスト受信 from %s", in.User)
messages := []chat.Message{
{User: "Server", Content: "ブロードキャストへようこそ!"},
{User: "Server", Content: "こちらがメッセージ#1だ。"},
{User: "Server", Content: "そしてこちらがメッセージ#2。"},
{User: "Server", Content: "ブロードキャスト終了。"},
}
for _, msg := range messages {
if err := stream.Send(&msg); err != nil {
return err
}
time.Sleep(500 * time.Millisecond) // 処理をシミュレート
}
return nil
}
クライアントはメソッドを呼び出し、戻り値としてストリームオブジェクトを受け取る。その後、ループに入り、Recv()
を呼び出してストリームからメッセージを読み取る。サーバーによってストリームが閉じられるとio.EOF
エラーを受け取り、ループを終了する。
// client/main.go
stream, err := c.BroadcastMessages(ctx, &chat.Message{User: "Marko"})
if err != nil {
log.Fatalf("ブロードキャストストリームを開けなかった: %v", err)
}
for {
msg, err := stream.Recv()
if err == io.EOF {
// ストリーム終了
break
}
if err != nil {
log.Fatalf("ブロードキャストメッセージの受信に失敗した: %v", err)
}
log.Printf("Broadcast from %s: %s", msg.User, msg.Content)
}
ユースケース: 株価のティッカー、スポーツのライブスコア、長時間実行されるタスクの進捗通知など、サーバーがクライアントに更新をプッシュする必要があるシナリオに最適だ。
クライアント ストリーミング RPCはサーバーストリーミングの逆である。クライアントはストリームを使用して一連のメッセージをサーバーに送信する。クライアントがメッセージの書き込みを完了すると、サーバーがそれらを処理し、単一のレスポンスを返すのを待つ。
サーバーのハンドラはストリームオブジェクトを受け取る。Recv()
を使ったループで、クライアントからのすべての着信メッセージを読み取る。クライアントがストリームを閉じると、Recv()
はio.EOF
エラーを返す。その後、サーバーは最終的な計算を行い、SendAndClose()
を使用して単一のレスポンスを送信できる。
// server/main.go
func (s *server) UploadStream(stream chat.ChatService_UploadStreamServer) error {
var messageCount int32
var lastUser string
for {
msg, err := stream.Recv()
if err == io.EOF {
log.Printf("アップロードストリーム終了。%d件のメッセージを受信した。", messageCount)
return stream.SendAndClose(&chat.Message{
User: "Server",
Content: fmt.Sprintf("%sからのストリーム受信完了。%d件のメッセージを取得した。", lastUser, messageCount),
})
}
if err != nil {
return err
}
lastUser = msg.User
messageCount++
log.Printf("クライアントストリームからメッセージ受信: %s", msg.Content)
}
}
クライアントはメソッドを呼び出してストリームオブジェクトを取得する。その後、ループを使用してSend()
で複数のメッセージを送信する。終了後、CloseAndRecv()
を呼び出してサーバーに完了を通知し、最終的な要約レスポンスを受け取る。
// client/main.go
stream, err := c.UploadStream(ctx)
if err != nil {
log.Fatalf("アップロードストリームを開けなかった: %v", err)
}
messages := []chat.Message{
{User: "Marko", Content: "ログエントリ 1"},
{User: "Marko", Content: "ログエントリ 2"},
{User: "Marko", Content: "ログエントリ 3"},
}
for _, msg := range messages {
if err := stream.Send(&msg); err != nil {
log.Fatalf("ストリームへのメッセージ送信に失敗した: %v", err)
}
time.Sleep(300 * time.Millisecond)
}
res, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("アップロードストリームからのレスポンス受信に失敗した: %v", err)
}
log.Printf("アップロードストリームのレスポンス: %s", res.Content)
ユースケース: ログファイル、IoT デバイスのメトリクス、動画ファイルの一部など、大規模なデータのアップロードに優れている。
双方向ストリーミング RPCは最も柔軟なモデルである。クライアントとサーバーの両方が、単一の gRPC 接続を介して独立してメッセージのストリームを相互に送信できる。読み取りと書き込みの順序は保証されず、アプリケーションのロジックに依存する。
サーバーハンドラは、読み取りと書き込みの両方に使用できるストリームを受け取る。いつでもRecv()
でメッセージを読み取り、Send()
でレスポンスを書き込むことが可能だ。
// server/main.go
func (s *server) ChatStream(stream chat.ChatService_ChatStreamServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
log.Printf("ChatStreamでメッセージ受信 from %s: %s", in.User, in.Content)
// メッセージをエコーバックする
if err := stream.Send(&chat.Message{User: "Server", Content: "Echo: " + in.Content}); err != nil {
return err
}
}
}
クライアント側のロジックは、送受信を同時に処理する必要があるため、より複雑になる。一般的なパターンは、2 つの別々の goroutine を使用することだ。1 つはメッセージの送信、もう 1 つは受信を担当する。sync.WaitGroup
などを使用して、調整と完了の通知を行う。
// client/main.go
stream, err := c.ChatStream(context.Background()) // background contextを使用
if err != nil {
log.Fatalf("チャットストリームを開けなかった: %v", err)
}
var wg sync.WaitGroup
wg.Add(2)
// メッセージ送信用のgoroutine
go func() {
defer wg.Done()
messages := []chat.Message{
{User: "Marko", Content: "こんにちは!"},
{User: "Marko", Content: "元気かい?"},
{User: "Marko", Content: "双方向ストリームを使っている!"},
}
for _, msg := range messages {
log.Printf("メッセージ送信: %s", msg.Content)
if err := stream.Send(&msg); err != nil {
log.Fatalf("メッセージの送信に失敗した: %v", err)
}
time.Sleep(time.Second)
}
stream.CloseSend() // 重要: ストリームの送信側を閉じる
}()
// メッセージ受信用のgoroutine
go func() {
defer wg.Done()
for {
res, err := stream.Recv()
if err == io.EOF {
return // サーバーがストリームを閉じた
}
if err != nil {
log.Fatalf("メッセージの受信に失敗した: %v", err)
}
log.Printf("サーバーからの受信: %s", res.Content)
}
}()
wg.Wait()
log.Println("双方向ストリーム終了。")
ユースケース: リアルタイムのチャットサービス、共同ドキュメント編集、ライブゲームセッションなど、複雑でインタラクティブなアプリケーションの基盤となる。
gRPC の 4 つの通信パターンをマスターすることで、正確なニーズに合わせた非常に効率的で専門的な API を構築する能力が手に入る。Unary RPC は多くの標準的なユースケースをカバーするが、ストリーミングパターンはリアルタイムのデータフロー、大規模なデータセット、そして完全にインタラクティブなサービスを処理する力を提供する。適切なパターンを選択することは、Go と gRPC で回復力とパフォーマンスに優れたマイクロサービスアーキテクチャを設計する上で重要なステップである。