C语言 如何使用 Thrift 而不是 RPC 进行消息传递



我知道Thrift主要针对成熟的客户端 - 服务器RPC,但从高级体系结构来看,在我看来,它也应该完全可用于双向消息传递。

我想在两端(C,.NET Core(上构建的内容如下:

  • 接收方法:引用套接字,读取完整消息,反序列化,返回
  • 发送方法:引用套接字,序列化给定的消息,将其发送到线路。

我不需要线程服务器,...任何花哨的东西。从本质上讲,我希望在Protobuffs提供的内容之上获得的是开箱即用的处理,在接收端缓冲整个消息,通常是消息帧。

问题是我找不到任何关于如何使用当前库(我个人对 .NET Core 和 C 库感兴趣(API 开始构建它的文档。我唯一发现的是这个问题,但它并没有真正指向任何资源。

做一些非常相似的事情的注意事项:

  • 同时使用 C#(.Net Core 和 Framework 的混合(和C++的客户端
  • 使用 Thrift RPC 以及"纯消息"进行发布/订阅和常规序列化

将所有消息放在顶级联合中的建议很好,因为它可以更轻松地反序列化消息。

鉴于以下节俭:

struct SubscribeRequest {
1: string topic,
2: string appid,
}
struct SubscribeReply {
1: bool success,
2: string topic,
}
service HttpService {
HttpSDKDataTypes.SubscribeReply Subscribe(1: HttpSDKDataTypes.SubscribeRequest message),
}

thrift -gen netcore为您提供:

public async Task<Ruyi.SDK.Http.SubscribeReply> SubscribeAsync(Ruyi.SDK.Http.SubscribeRequest message, CancellationToken cancellationToken)
{
await OutputProtocol.WriteMessageBeginAsync(new TMessage("Subscribe", TMessageType.Call, SeqId), cancellationToken);

消息标识符包含在 RPC 调用中。 如果不使用 RPC 调用,则会得到"原始"结构,其中没有指示如何反序列化它们。

将它们放在union中:

union UnionExample {
1: SubscribeRequest request,
2: SubscribeReply reply,    
}

为您处理:

public async Task WriteAsync(TProtocol oprot, CancellationToken cancellationToken)
{
oprot.IncrementRecursionDepth();
try
{
var struc = new TStruct("SubscribeRequest");
await oprot.WriteStructBeginAsync(struc, cancellationToken);
var field = new TField();
if (Topic != null && __isset.topic)
{
field.Name = "topic";
field.Type = TType.String;
field.ID = 1;

反序列化后,可以使用以下命令处理它们:

public void handler(UnionExample example)
{
if (example.__isset.request)
{
SubscribeRequest msg = example.request;
// ...
}
else if (example.__isset.reply)
{
SubscribeReply msg = example.reply;
// ...
}

检查生成器中可用的选项。thrift -gen "csharp:union,async"让我们使用模式匹配:

public void handler(UnionExample example)
{
switch (example.Data)
{
case SubscribeRequest msg:
//...
case SubscribeReply msg:
//...

不幸的是,netcore生成器在 0.11.0 中没有这样做。

节俭的 github 存储库具有序列化到内存(而不是使用 RPC(的示例。 一般来说,它类似于:

Stream stm = new MemoryStream();
TTransport trans = new TStreamTransport(null, stm);
TProtocol prot = new TJSONProtocol(trans);

如果您要创建大量MemoryStream实例,请查看Microsoft.IO.RecycableMemoryStream.

使用自定义协议/传输将简化发送消息的过程,因为它将处理样板(并避免首先序列化到内存的额外对象,然后无论您使用它做什么(。 已经提到了节俭contrib/文件夹。 这是我们在 ZeroMQ 上使用 Thrift 的 C# 示例。

最后一点。 如果要使用 RPC 功能,请使用单个结构参数编写服务方法。 意义:

service HttpService {
// Do this
string Subscribe(1: SubscribeRequest message),
// Not this
string Subscribe(1: string topic, 2: string appid,),
}

这将使摆脱 RPC 和/或重用消息变得更加容易。

Thrift 是一个 RPC 和序列化框架。这意味着,您也可以只使用序列化部分而不使用 RPC。

与消息传递系统结合使用时,要走的路通常(大致(如下:

  • 将消息序列化到缓冲区中
  • 通过您想要的任何方式发送该缓冲区
  • 接收端反序列化缓冲区并处理数据

如果计划通过同一通道发送不同类型的邮件,最好使用包含所有可能的邮件正文的union信封结构:

struct MessageOne {
// contents of this message type
}
struct MessageTwo {
// contents of this message type
}
struct MessageThree {
// contents of this message type
}
union MyMessageEnvelope {
1: MessageOne   one
2: MessageTwo   two
3: MessageThree  three
// easily extendable 
}

为了使它更优雅/可重用,还可以实现自定义传输以满足需求并进一步封装逻辑。Thrift的模块化结构使其变得容易(您链接的帖子也提到了这一点(。源代码树的/contrib文件夹中有一些示例可以作为起点。

如果您完全不知道从哪里开始:查看教程,然后查看测试套件程序,两者都非常适合作为节俭初学者的学习资源。

最新更新