Dart gRPC connection problem when the first call is a streaming-response RPC



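When working with gRPC in Dart, if the first RPC call has a streaming response type, the client application fails to connect to the server by the time the stream handler is invoked. I found this while building on the helloworld example from the package.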

Is there any way to make sure the connection is established? Or am I doing something wrong?

I tried await channel.getConnection(); but it made no difference.

grpc version: 3.0.2

  1. helloworld.proto
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";
package helloworld;

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
  rpc SayHelloStream (HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

  2. server.dart
// Copyright (c) 2018, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// Dart implementation of the gRPC helloworld.Greeter server.
import 'package:grpc/grpc.dart';
import 'package:helloworld/src/generated/helloworld.pbgrpc.dart';

class GreeterService extends GreeterServiceBase {
  @override
  Stream<HelloReply> sayHelloStream(
      ServiceCall call, HelloRequest request) async* {
    for (var i = 0; i < 10; i++) {
      yield HelloReply()..message = 'Hello, ${request.name}!';
      await Future.delayed(Duration(seconds: 1));
    }
  }

  @override
  Future<HelloReply> sayHello(ServiceCall call, HelloRequest request) async {
    return HelloReply()..message = 'Hello, ${request.name}!';
  }
}

Future<void> main(List<String> args) async {
  final server = Server(
    [GreeterService()],
    const <Interceptor>[],
    CodecRegistry(codecs: const [GzipCodec(), IdentityCodec()]),
  );
  await server.serve(port: 50051);
  print('Server listening on port ${server.port}...');
}

  3. client.dart
// Copyright (c) 2018, the gRPC project authors. Please see the AUTHORS file
// for details. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// Dart implementation of the gRPC helloworld.Greeter client.
import 'package:grpc/grpc.dart';
import 'package:helloworld/src/generated/helloworld.pbgrpc.dart';

Future<void> main(List<String> args) async {
  final channel = ClientChannel(
    'localhost',
    port: 50051,
    options: ChannelOptions(
      credentials: ChannelCredentials.insecure(),
      codecRegistry:
          CodecRegistry(codecs: const [GzipCodec(), IdentityCodec()]),
    ),
  );
  final stub = GreeterClient(channel);
  final name = args.isNotEmpty ? args[0] : 'world';
  try {
    // works if this call is made first
    // final response = await stub.sayHello(HelloRequest()..name = name);
    // print('Greeter client received: ${response.message}');
    // this has no effect
    // await channel.getConnection();
    final responseStream = stub.sayHelloStream(
      HelloRequest()..name = name,
    );

    // This doesn't work standalone.
    responseStream
        .listen((value) => print('Greeter client received: ${value.message}'));
    // Works when using await for
    // await for (var value in responseStream) {
    //   print('Greeter client received: ${value.message}');
    // }
  } catch (e) {
    print('Caught error: $e');
  }
  await channel.shutdown();
}

Expected result: the client should work and print Greeter client received: ${value.message} 10 times at 1-second intervals.

Actual result: running client.dart produces the following error.

gRPC Error (code: 14, codeName: UNAVAILABLE, message: Error connecting: Connection shutting down., details: null, rawResponse: null, trailers: {})

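When the following lines are added (they are shown commented out in client.dart), there is no problem and the results are printed 1+10 times as expected.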

// final response = await stub.sayHello(HelloRequest()..name = name);
// print('Greeter client received: ${response.message}');

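You should shut down the channel only after you have finished processing the stream. In your code, listen() returns immediately without waiting for any events, so channel.shutdown() runs right away and gRPC cannot keep delivering updates to the stream, because you have already closed the connection. This is also why the await for variant works: it does not reach channel.shutdown() until the stream is done.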
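
The ordering problem can be reproduced without a gRPC server. The following is a minimal sketch that uses only dart:async. The names fakeSayHelloStream, listenWithoutWaiting, and listenThenWait are hypothetical and not part of the grpc package: fakeSayHelloStream stands in for stub.sayHelloStream, and appending 'shutdown' to a log stands in for channel.shutdown(). It only illustrates when listen() returns relative to the stream's events.

```dart
import 'dart:async';

/// Hypothetical stand-in for the server-streaming call in the question:
/// emits [count] greetings, one every [interval].
Stream<String> fakeSayHelloStream(
  String name, {
  int count = 3,
  Duration interval = const Duration(milliseconds: 10),
}) {
  return Stream<String>.periodic(interval, (_) => 'Hello, $name!').take(count);
}

/// Mirrors the failing client: listen() returns at once, so the
/// "shutdown" step runs before any greeting has been received.
Future<List<String>> listenWithoutWaiting() async {
  final log = <String>[];
  final sub = fakeSayHelloStream('world').listen(log.add);
  log.add('shutdown'); // stands in for channel.shutdown()
  await sub.cancel(); // the call is torn down; no events are delivered
  return log;
}

/// Mirrors the fix: wait for the stream's done event, then "shut down".
Future<List<String>> listenThenWait() async {
  final log = <String>[];
  final sub = fakeSayHelloStream('world').listen(log.add);
  // asFuture() completes when the stream is done, or fails with the
  // stream's first error.
  await sub.asFuture<void>();
  log.add('shutdown'); // stands in for channel.shutdown()
  return log;
}

Future<void> main() async {
  print(await listenWithoutWaiting());
  // prints: [shutdown]
  print(await listenThenWait());
  // prints: [Hello, world!, Hello, world!, Hello, world!, shutdown]
}
```

In the client from the question, the corresponding change would presumably be to keep the subscription returned by responseStream.listen(...) and await its asFuture() before calling channel.shutdown(), or to use the await for loop that is already shown commented out.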
