在我们的应用程序中,生产者正在发送不同的数据类型,并且可能会碰到一个分区可以具有不同的数据类型对象,因为我们不想基于数据类型进行分区。
在Kafka流中,我试图使用标题。生产者正在将标题添加到字节对象,并将数据推向Kafka。
标题是一种特定的数据类型(customObject)。现在基于标题,我想解析Kafka流中接收到的字节对象,但我通过使用ProcessorInterface来限制我必须通过实际的Deserializer
是否有任何方法我不必事先指定供应序列化,然后根据processorContext中的标题以获取收到的记录,我可以对象
public class StreamHeaderProcessor extends AbstractProcessor<String, Bytes>{
@Override
public void process(String key, Bytes value) {
Iterator<Header> it = context().headers().iterator();
while (it.hasNext()) {
Header head = it.next();
if (head.key().equals("dataType")) {
String headerValue = new String(head.value());
if (headerValue.equals("X")) {
} else if(headerValue.equals("Y")) {
}
}
}
}
}
如果您在 StreamsConfig
中的set serdes且不在 builder.stream(..., Consumed.with(/*Serdes*/))
kafka流上设置serdes,则默认情况下将使用 ByteArraySerde
,因此,键和值将复制到 byte[]
数组中为数据类型。(使用处理器API类似,并且不要在topology.addSource(...)
上设置SERDE。)
因此,您可以在数据流上应用Processor
或Transformer
,检查标头并在您自己的代码中调用相应的避难所。您需要提前知道所有可能的数据类型。
public class MyProcessor implements Processor {
// add corresponding deserializers for all expected types (eg, String)
private StringDeserializer stringDeserializer = new StringDeserializer();
// other methods omitted
void process(byte[] key, byte[] value) {
// inspect header
if (header.equals("StringType") {
// get `context` via `init()` method
String stringValue = stringDeserializer.deserialize(context.topic(), value);
// similar for `key`
// apply processing logic for String type
}
}
}