_failedResult = new Message<string, string>(Consts.Topic, Consts.Partition, Consts.Offset, Consts.Key, It.IsAny<string>(), default(Timestamp), new Error(ErrorCode.Local_MsgTimedOut, MsgTimeoutReason));
我在升级到最新的 Kafka 时遇到问题,因为我的单元测试使用的是旧消息格式。我似乎找不到列出如何正确格式化消息的文档。任何帮助将不胜感激。
谢谢
您可以在confluent-kafka-dotnet
中看到当前稳定v1.3.0
的消息格式:
https://github.com/confluentinc/confluent-kafka-dotnet/blob/v1.3.0/src/Confluent.Kafka/Message.cs
您可以通过以下方式生产:
var message = new SomeDto();
var dr = await _producer.ProduceAsync(
"topic-name",
new Message<Null, SomeDto>
{
Value = message
},
cancellationToken);
消息中包含的数据,如Topic
、Partition
不再是消息的一部分(它们是生产者配置或生产方法的一部分(