How to produce message records that a Spring Cloud Stream consumer can consume



On the producer side, I use the Kinesis PutRecord API to publish messages to the stream. On the consumer side, I use the spring-cloud-stream-binder integration.

I adapted the code that builds the PutRecordRequest object from org.springframework.integration.aws.outbound.KinesisMessageHandler:

PutRecordRequest buildPutRecordRequest(Message<?> message) {
    MessageHeaders messageHeaders = message.getHeaders();
    Object payload = message.getPayload();
    ByteBuffer data = null;
    Message<?> messageToEmbed = null;
    logger.info("messageHeaders = {}", messageHeaders);
    logger.info("payload = {}", payload);
    if (payload instanceof ByteBuffer) {
        data = (ByteBuffer) payload;
        if (this.embeddedHeadersMapper != null) {
            messageToEmbed = new MutableMessage<>(data.array(), messageHeaders);
        }
    } else {
        byte[] bytes =
                (byte[]) (payload instanceof byte[]
                        ? payload
                        : this.messageConverter.fromMessage(message, byte[].class));
        Assert.notNull(bytes, "payload cannot be null");
        if (this.embeddedHeadersMapper != null) {
            messageToEmbed = new MutableMessage<>(bytes, messageHeaders);
        } else {
            data = ByteBuffer.wrap(bytes);
        }
    }
    if (messageToEmbed != null) {
        try {
            byte[] bytes = this.embeddedHeadersMapper.fromMessage(messageToEmbed);
            Assert.notNull(bytes, "payload cannot be null");
            data = ByteBuffer.wrap(bytes);
        } catch (Exception ex) {
            throw new MessageConversionException(message, "Cannot embedded headers to payload", ex);
        }
    }
    byte[] bytes = (byte[]) this.messageConverter.fromMessage(message, byte[].class);
    data = ByteBuffer.wrap(bytes);

    PutRecordRequest putRecordRequest = new PutRecordRequest();
    putRecordRequest.setData(data);

    String explicitHashKey = explicitHashKeyExtractor(event);
    putRecordRequest.setPartitionKey(partitionKey);
    putRecordRequest.setExplicitHashKey(explicitHashKey);
    putRecordRequest.setStreamName(streamName);
    return putRecordRequest;
}

void push(Event event) {
    Message<Event> genericMessage = MessageBuilder.withPayload(event).setHeader(AwsHeaders.STREAM, streamName).build();
    PutRecordResult putRecordResult = kinesisClient.putRecord(buildPutRecordRequest(genericMessage));
}

On the consumer side, I get the following error:

2022-09-12 16:01:38.266  INFO 816470 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : Got an exception during sending a 'GenericMessage [payload=byte[319], headers={skip-input-type-conversion=false, aws_shard=shardId-000000000000, id=d5e996ce-af62-c8b3-850c-3f1ee21c23b6, sourceData={SequenceNumber: 49632961872623851461858864802247332802270083416600870914,ApproximateArrivalTimestamp: Mon Sep 12 16:01:36 IST 2022,Data: java.nio.HeapByteBuffer[pos=0 lim=319 cap=319],PartitionKey: 673354558,}, contentType=application/json, aws_receivedPartitionKey=673354558, aws_receivedStream=test_stream, aws_receivedSequenceNumber=49632961872623851461858864802247332802270083416600870914, timestamp=1662978698237}]'
for the '{SequenceNumber: 49632961872623851461858864802247332802270083416600870914,ApproximateArrivalTimestamp: Mon Sep 12 16:01:36 IST 2022,Data: java.nio.HeapByteBuffer[pos=0 lim=319 cap=319],PartitionKey: 673354558,}'.
Consider to use 'errorChannel' flow for the compensation logic.
Caused by: java.lang.ClassCastException: [B cannot be cast to demo.stream.Event
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeConsumer(SimpleFunctionRegistry.java:784) ~[spring-cloud-function-context-3.1.2.jar!/:3.1.2]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.doApply(SimpleFunctionRegistry.java:589) ~[spring-cloud-function-context-3.1.2.jar!/:3.1.2]
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:435) ~[spring-cloud-function-context-3.1.2.jar!/:3.1.2]

How should I change my producer code so that my consumer can correctly consume the message payload?


Update

Using EmbeddedHeaderUtils, with the consumer.headerMode property set to embeddedHeaders, worked for me. Below is the producer code that creates the PutRecordRequest.

Map<String, Object> map = new HashMap<>();
map.put(AwsHeaders.STREAM, streamName); // sample header
byte[] eventAsBytes = SerializationUtils.serialize(event);
MessageValues messageValues = new MessageValues(eventAsBytes, map);
byte[] bytes = EmbeddedHeaderUtils.embedHeaders(messageValues, AwsHeaders.STREAM);
ByteBuffer data = ByteBuffer.wrap(bytes);
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setData(data);

The consumer function signature is:

@Bean
public Consumer<Message<byte[]>> processOrder(OrderRepository orders) {
    return message -> {
        byte[] bytes = (byte[]) message.getPayload();
        Event event = (Event) SerializationUtils.deserialize(bytes);
        // do something
    };
}
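The update above carries the payload as Java-serialized bytes, so Event has to implement java.io.Serializable and the same class has to be on the consumer's classpath. The following is a minimal sketch of that round trip using only JDK streams. The Event class here is a hypothetical stand-in (the real fields are not shown in the question), and the helper names are illustrative, not the SerializationUtils API itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationRoundTrip {

    // Hypothetical stand-in for the asker's Event class; it must be Serializable.
    static class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;

        Event(String id) {
            this.id = id;
        }
    }

    // Producer side: turn the object into the bytes that become the record payload.
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // Consumer side: rebuild the object from the payload bytes.
    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] payload = serialize(new Event("order-1"));
        Event copy = (Event) deserialize(payload);
        System.out.println(copy.id);
    }
}
```

Java serialization ties producer and consumer to the same class definition. If the two sides are deployed independently, a JSON payload may be easier to evolve.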

The Spring Cloud Stream AWS Kinesis Binder works in HeaderMode.embeddedHeaders by default, simply because AWS Kinesis itself has no notion of record headers.

So when you send records to a Kinesis stream manually, make sure you follow the embedded-headers algorithm that Spring Cloud Stream implements in org.springframework.cloud.stream.binder.EmbeddedHeaderUtils.
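To make the embedded-headers format concrete, here is a rough sketch of the byte layout as I understand it from the EmbeddedHeaderUtils source: a 0xff marker byte, a one-byte header count, then for each header a one-byte name length, the name, a four-byte value length, and the JSON-encoded value, followed by the original payload. Treat the layout as an assumption to verify against the version you run. The class and method names below are hypothetical and are not part of Spring Cloud Stream. In real code, prefer calling EmbeddedHeaderUtils.embedHeaders directly.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: mirrors the assumed embedded-headers layout using plain JDK classes.
class EmbeddedHeadersSketch {

    // jsonHeaders maps header names to values that are ALREADY JSON-encoded,
    // e.g. the string application/json is passed as "\"application/json\"".
    static byte[] embed(Map<String, String> jsonHeaders, byte[] payload) {
        int size = 2 + payload.length; // marker byte + count byte + payload
        for (Map.Entry<String, String> e : jsonHeaders.entrySet()) {
            size += 1 + e.getKey().getBytes(StandardCharsets.UTF_8).length
                    + 4 + e.getValue().getBytes(StandardCharsets.UTF_8).length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.put((byte) 0xff);               // marks a record with embedded headers
        buf.put((byte) jsonHeaders.size()); // number of headers
        for (Map.Entry<String, String> e : jsonHeaders.entrySet()) {
            byte[] name = e.getKey().getBytes(StandardCharsets.UTF_8);
            byte[] value = e.getValue().getBytes(StandardCharsets.UTF_8);
            buf.put((byte) name.length);    // header name length (1 byte)
            buf.put(name);
            buf.putInt(value.length);       // header value length (4 bytes, big-endian)
            buf.put(value);
        }
        buf.put(payload);
        return buf.array();
    }

    // Reverses embed(): fills headersOut and returns the original payload bytes.
    static byte[] extract(byte[] record, Map<String, String> headersOut) {
        ByteBuffer buf = ByteBuffer.wrap(record);
        if ((buf.get() & 0xff) != 0xff) {
            throw new IllegalArgumentException("record has no embedded-headers marker");
        }
        int count = buf.get() & 0xff;
        for (int i = 0; i < count; i++) {
            byte[] name = new byte[buf.get() & 0xff];
            buf.get(name);
            byte[] value = new byte[buf.getInt()];
            buf.get(value);
            headersOut.put(new String(name, StandardCharsets.UTF_8),
                    new String(value, StandardCharsets.UTF_8));
        }
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("contentType", "\"application/json\"");
        byte[] record = embed(headers, "{\"id\":1}".getBytes(StandardCharsets.UTF_8));

        Map<String, String> decoded = new LinkedHashMap<>();
        byte[] payload = extract(record, decoded);
        System.out.println(decoded + " " + new String(payload, StandardCharsets.UTF_8));
    }
}
```

A record written without the marker and header block is not in the format a consumer in embeddedHeaders mode expects, which is why a hand-built PutRecordRequest needs either this framing or a consumer configured with header mode none.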

Alternatively, consider setting the header mode to none for the corresponding Spring Cloud Stream consumer: https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_consumer_properties
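As a configuration sketch for that option: assuming the consumer is the functional bean processOrder from the update, its input binding would conventionally be named processOrder-in-0, and the property could look like the fragment below. The binding name is an assumption based on the default functional naming convention, so adjust it to your own binding.

```properties
# Assumes a Consumer bean named processOrder, whose default input binding is processOrder-in-0.
spring.cloud.stream.bindings.processOrder-in-0.consumer.header-mode=none
```

With header mode none, the consumer should receive the raw record bytes as the payload, so the producer would write only the serialized event (for example JSON) with no embedded header block.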
