Tags: gRPC Go Golang

gRpc の client-side streaming RPC を試す

前回 gRPC の環境構築を行ったので、実際に client-side streaming RPC を作ってみる。

開発環境

  • Windows 10 Pro バージョン20H2
  • docker desktop 3.3.1
  • WSL2 のディストリビューションは Ubuntu 20.04

本文

https://grpc.io/docs/languages/go/basics/
を見ると RPC には 4 種類あるとわかる。
前回のサンプルで作ったのが Simple RPC で、その他に

  • server-side streaming RPC
  • client-side streaming RPC
  • bidirectional streaming RPC

がある。
上からやっていこうと思ったのだが、どうしても server-side streaming RPC を使う場面がうまく想像できなかったので client-side streaming RPC からやることにした。

なお今回の完成品は下記にある。下記を前提として以下の文章を書いていく。
https://gitlab.com/k1350/daybreak_grpc_sample/-/tree/feature/sample_client_stream

環境自体は 前回 作った環境をベースとした。

protocol buffers

client-side streaming RPC を使う場面としてファイルの分割アップロードを想定し、upload.proto を以下のように作成した。

syntax = "proto3";

package upload;

option go_package = "github.com/k1350/proto/upload";

service UploadFile {  
  // ファイルを分割アップロードし、結果を返す
  rpc Upload (stream File) returns (Result) {};
}

message File { bytes Data = 1; }

// 脚注:本当は Code というように大文字から始めるべきだったのだが間違って小文字にしてしまった。
// しかしコンパイル結果は勝手に大文字になっていた。
message Result { int32 code = 1; }

stream でファイルを受け取り、正常に受け取ったら 200 を返そうと思っている。

これをコンパイルすると upload.pb.go と upload_grpc.pb.go が生成される。今回は自力で実装するのできちんと中身を把握しておく必要がある。が、初見だとよくわからないので
https://grpc.io/docs/languages/go/basics/
を見ながらやるほうがいい。

サーバサイドの実装

特に見るべきなのは upload_grpc.pb.go である。ここに実装すべき interface 等が定義されている。サーバーサイドの部分をチェックすると

// UploadFileServer is the server API for UploadFile service.
// All implementations must embed UnimplementedUploadFileServer
// for forward compatibility
type UploadFileServer interface {
	// ファイルを分割アップロードし、結果を返す
	Upload(UploadFile_UploadServer) error
	mustEmbedUnimplementedUploadFileServer()
}

というのが書いてある。これはサーバサイドで Upload 関数を実装しなければならないというのと、UnimplementedUploadFileServer というのを Embed しろと書いてある。
UnimplementedUploadFileServer を Embed するとはどういうことかというと、結論を言えば下記のように定義してやるといい。

package upload

import (
    //(中略)
	pb "github.com/k1350/server/api/upload"
)

type Server struct {
    // ここで UnimplementedUploadFileServer を Embed する
	pb.UnimplementedUploadFileServer
}

func (s *Server) Upload(stream pb.UploadFile_UploadServer) error {
	// (中略)
}

このように Embed しておくことで、未実装の関数がある場合に “method Upload not implemented” というエラーを返してくれるらしい。

さて 肝心の Upload 関数の中身だが以下のようにした。

func (s *Server) Upload(stream pb.UploadFile_UploadServer) error {
    // tmp ディレクトリ内に tmp.jpg というファイルを作成
	file, err := os.Create(filepath.Join("tmp", "tmp.jpg"))
	defer file.Close()
	if err != nil {
		return err
	}

	for {
		res, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
        // tmp.jpg に書き込み
		file.Write(res.Data)
	}
	err = stream.SendAndClose(&pb.Result{Code: 200})
	if err != nil {
		return err
	}
	return nil
}

stream.Recv() とか stream.SendAndClose() とかは upload_grpc.pb.go に実装がある。中身は下記の通り。

type UploadFile_UploadServer interface {
	SendAndClose(*Result) error
	Recv() (*File, error)
	grpc.ServerStream
}

type uploadFileUploadServer struct {
	grpc.ServerStream
}

func (x *uploadFileUploadServer) SendAndClose(m *Result) error {
	return x.ServerStream.SendMsg(m)
}

func (x *uploadFileUploadServer) Recv() (*File, error) {
	m := new(File)
	if err := x.ServerStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

あとは main.go から Upload を呼ぶだけだが、呼び方が直接呼ぶわけではなくて以下のように呼ぶ。

package main

import (
	"net"
	"log"

	pb "github.com/k1350/server/api/upload"
	up "github.com/k1350/server/internal/upload"
	"google.golang.org/grpc"
)

const (
	port = ":50051"
)

func main() {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	pb.RegisterUploadFileServer(s, &up.Server{})
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

grpc.NewServer で新規サーバーを作り、そこに今回作ったサーバーを登録するみたいな感じっぽい。なかなか急に理解するのが難しいのでサンプル丸写しになった。

クライアントサイド

見るべきなのはこちらも upload_grpc.pb.go であるが、クライアントサイドは既に実装済みの物をうまく使う感じになる。
先に結果を出すがクライアントサイドのファイル分割アップロード部は以下のようになった。

package upload

import (
	"context"
	"log"
	"os"
	"time"
	"io"

	pb "github.com/k1350/client/api/upload"
)

const (
	// 読み取りバッファーサイズ
	buffersize = 256
)

func RunUpload(client pb.UploadFileClient, filepath string) {
	file, err := os.Open(filepath)
	defer file.Close()
	if err != nil {
		log.Fatalf("%v", err);
	}
	buf := make([]byte, buffersize)

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	stream, err := client.Upload(ctx)
	if err != nil {
		log.Fatalf("%v.Upload(_) = _, %v", client, err)
	}

	for {
		_, err := file.Read(buf)
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("%v.Send(%v) = %v", stream, buf, err)
		}
		stream.Send(&pb.File{Data: buf})
	}

	res, err := stream.CloseAndRecv()
	if err != nil {
		log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
	}
	log.Printf("Result: %v", res)
}

client.Upload() とか stream.Send() とか stream.CloseAndRecv() とかが upload_grpc.pb.go に実装されているので使っていく。
upload_grpc.pb.go 内の実装は以下のようになっている。

func (c *uploadFileClient) Upload(ctx context.Context, opts ...grpc.CallOption) (UploadFile_UploadClient, error) {
	stream, err := c.cc.NewStream(ctx, &UploadFile_ServiceDesc.Streams[0], "/upload.UploadFile/Upload", opts...)
	if err != nil {
		return nil, err
	}
	x := &uploadFileUploadClient{stream}
	return x, nil
}

type UploadFile_UploadClient interface {
	Send(*File) error
	CloseAndRecv() (*Result, error)
	grpc.ClientStream
}

type uploadFileUploadClient struct {
	grpc.ClientStream
}

func (x *uploadFileUploadClient) Send(m *File) error {
	return x.ClientStream.SendMsg(m)
}

func (x *uploadFileUploadClient) CloseAndRecv() (*Result, error) {
	if err := x.ClientStream.CloseSend(); err != nil {
		return nil, err
	}
	m := new(Result)
	if err := x.ClientStream.RecvMsg(m); err != nil {
		return nil, err
	}
	return m, nil
}

そして実装した RunUpload を main.go から呼び出すが、こっちも実装済みの物を使っていく感じになる。やはり初見ではよくわからずサンプルのコピペになっているが以下のようになった。

package main

import (
	"log"

	pb "github.com/k1350/client/api/upload"
	up "github.com/k1350/client/internal/upload"
	"google.golang.org/grpc"
)

const (
	address = "server:50051"
)

func main() {
	var opts []grpc.DialOption
	opts = append(opts, grpc.WithInsecure())
	opts = append(opts, grpc.WithBlock())
	conn, err := grpc.Dial(address, opts...)
	if err != nil {
		log.Fatalf("fail to dial: %v", err)
	}
	defer conn.Close()
	client := pb.NewUploadFileClient(conn)

    // sample.jpg をアップロードする
	up.RunUpload(client, "./sample.jpg")
}

起動

完成品 を clone して README 通りに動かせば多分動く。

感想

proto ファイルをコンパイルしたことで自動生成されるコードを見ないと「UploadFile_UploadServer って何?」となったりするので、やはりサンプルを丸ごと写すのではなく自分で書くのが大事だと思った。

あとファイル分割アップロードというのも実装したことがなくて下記参考にした。(記事の中身は今回やりたいことそのものだが、少し古い記事なので丸パクリしても多分うまくいかない。ファイル分割アップロードと、分割受信したのを保存するロジックだけ参考にした。)

今回はここまで。