kafka流媒体标头支持



在我们的应用程序中,生产者正在发送不同的数据类型,并且可能会碰到一个分区可以具有不同的数据类型对象,因为我们不想基于数据类型进行分区。

在Kafka流中,我试图使用标题。生产者正在将标题添加到字节对象,并将数据推向Kafka。

标题是一种特定的数据类型(customObject)。现在基于标题,我想解析Kafka流中接收到的字节对象,但我通过使用ProcessorInterface来限制我必须通过实际的Deserializer

是否有任何方法我不必事先指定供应序列化,然后根据processorContext中的标题以获取收到的记录,我可以对象

public class StreamHeaderProcessor extends AbstractProcessor<String, Bytes>{
    @Override
    public void process(String key, Bytes value) {
        Iterator<Header> it = context().headers().iterator();
        while (it.hasNext()) {
            Header head = it.next();
            if (head.key().equals("dataType")) {
                String headerValue = new String(head.value());
                if (headerValue.equals("X")) {
                } else if(headerValue.equals("Y")) {
                }
            }
        }
    }
}

如果您在 StreamsConfig中的set serdes且不在 builder.stream(..., Consumed.with(/*Serdes*/)) kafka流上设置serdes,则默认情况下将使用 ByteArraySerde,因此,键和值将复制到 byte[]数组中为数据类型。(使用处理器API类似,并且不要在topology.addSource(...)上设置SERDE。)

因此,您可以在数据流上应用ProcessorTransformer,检查标头并在您自己的代码中调用相应的避难所。您需要提前知道所有可能的数据类型。

public class MyProcessor implements Processor {
    // add corresponding deserializers for all expected types (eg, String)
    private StringDeserializer stringDeserializer = new StringDeserializer();
    // other methods omitted
    void process(byte[] key, byte[] value) {
        // inspect header
        if (header.equals("StringType") {
            // get `context` via `init()` method
            String stringValue = stringDeserializer.deserialize(context.topic(), value);
            // similar for `key`
            // apply processing logic for String type
        }
    }
}

相关内容

  • 没有找到相关文章

最新更新