shema
//
// service
//
// RecorderService
service RecorderService {
// Upload ストリーミングしつつファイルをアップロードする
// ファイルをアップロードした結果は今は取り扱わないので空のレスポンス
rpc Upload(stream FileUploadRequest) returns (EmptyResponse) {}
}
//
// response
//
// 空レスポンス
// 公式にも定義済みなのでインポートしてそちらを使う方が流儀に沿ってそう
message EmptyResponse {
}
//
// file
//
// ファイルのメタデータ
// HTTP ヘッダーみたいに色々な用途で使えそう
message MetaData {
string name = 1;
string type = 2;
}
// ファイルの本体は bytes (音声でも動画でもテキストでもなんでもござれ)
message File {
bytes content = 1;
}
// ファイルをアップロードするリクエストボディ
message FileUploadRequest {
// oneof を指定すると、「どちらか一方がセットされるの意味」
oneof request {
// ファイルはサイズによって分割してストリーミングできるといいが、
// メタデータは最初に一回送ればそれで済むので oneof でどちらかが飛ぶようになってる
MetaData metaData = 1;
File file = 2;
}
}
server
func (a *RecorderAdapter) Upload(in recorderpb.RecorderService_UploadServer) error {
// ストリーミングの場合, context はリクエストの中に
ctx := in.Context()
grpc_ctxtags.Extract(ctx)
l := ctxlogrus.Extract(ctx)
l.Info("RecorderAdapter.Upload started")
// このメソッドブロック内の変数はここでのみ有効
// リクエスト毎の変数となるため排他的な処理は必要ない(なぜかいるんじゃないかと勘違いした)
var f *os.File
defer func() {
if f == nil {
// file が入る前に err とか飛ぶとここへ。忘れると後続の f.Close が nil 参照例外に
// f?.Close() とかでかけるようにならんか...
return
}
f.Close()
}()
// サーバー側は, クライアントからのストリーミングを待機するような実装になる
// チャットとかだとここが待ちっぱなしになる感じ
for {
resp, err := in.Recv()
if err == io.EOF {
// これはストリーミングの終了を表す
break
}
if err != nil {
return err
}
// メタデータかファイルかが飛んでくるので処理を分ける
mt := resp.GetMetaData()
if mt != nil {
// 今回は簡易なので tmp ディレクトリ下にベタベタにファイル作成
// 本来の処理だと, 「処理中断」 「ユーザー毎のファイル」 などなど色々と考えないとダメそう
f, err = os.Create(filepath.Join("tmp", fmt.Sprintf("%s.%s", mt.Name, mt.Type)))
} else {
if resp.GetFile() == nil {
// メタデータがないのにファイルに情報がないのは明確におかしいのでエラー
// grpc の code で not found とかつけてあげるのがいいかも
return errors.New("upload failed: resp file is nil")
}
v := resp.GetFile().Content
f.Write(v)
}
}
l.Info("RecorderAdapter.Upload end")
return in.SendAndClose(&utilpb.EmptyResponse{})
}
client
...
...
Future<void> upload(String filePath) async {
// 本来は try ... catch するなり例外処理入れるのがただしそう
final channel = createChannel();
final stub = _createStub(channel);
await stub.upload(_uploadStream(filePath));
await channel.shutdown(); // gRPC Client の Close 処理 go でも毎回切るのが正しいのか判断に悩む
}
// 返り値が若干ややこしい
// yield を使う以外にも Stream.value(T) でも
$async.Stream<$utilpb.FileUploadRequest> _uploadStream(
String filePath,
) async* /* Stream の場合 async* が必要らしい(何これ) */ {
final fileName = filepathToFileName(filePath);
// Dart での文字列操作まだ慣れない ...
final mt = $utilpb.MetaData()
..name = fileName.split('.')[0]
..type = fileName.split('.')[1];
// 一度目の返りでメタデータを送信
yield $utilpb.FileUploadRequest()..metaData = mt;
// 続いてファイルを byte 分割して送信
// ストリーミングと言いつつ通信回数は減らせた方がいいような気がするので
// ここの分割間隔については調整した方がただしそう
final bytes = await File(filePath).readAsBytes();
for (final byte in bytes) {
yield $utilpb.FileUploadRequest()
..file = ($utilpb.File()..content = [byte]);
}
}