kafka Connect FileStreamSink连接器删除了引号,并更改结肠以等于JSON消息的符号



摘要

当我用控制台生产商流式传输

{"id":1337,"status":"example_topic_1 success"}

我从我的FileStream消费者那里得到了此信息

/data/example_topic_1.txt

{id=1337, status=example_topic_1 success}

这对我来说是一个主要问题,因为如果不对引号的假设做出假设,就无法恢复原始的JSON消息。在保留报价标记的同时,如何将消息输出到文件?

详细信息

  1. 首先,我启动文件接收器连接器。
    # sh bin/connect-standalone.sh 
    >   config/worker.properties 
    >   config/connect-file-sink-example_topic_1.properties
    
  2. 第二,我启动了控制台消费者(也内置到kafka(,这样我就可以轻松的视觉确认消息可以正确通过。
    # sh bin/kafka-console-consumer.sh 
    >   --bootstrap-server kafka_broker:9092 
    >   --topic example_topic_1
    
  3. 最后,我启动了一个用于发送消息的控制台生产商,然后输入消息。

    # sh bin/kafka-console-producer.sh 
    >   --broker-list kafka_broker:9092 
    >   --topic example_topic_1
    

    从控制台消费者那里,消息正确弹出,引号。

    {"id":1337,"status":"example_topic_1 success"}
    

    但我从我的FileStreamSink消费者那里得到了:

    /data/example_topic_1.txt

    {id=1337, status=example_topic_1 success}
    

我的配置

config/worker.properties

offset.storage.file.filename=/tmp/example.offsets
bootstrap.servers=kafka_broker:9092
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

config/connect-file-sink-example_topic_1.properties

name=file-sink-example_topic_1
connector.class=FileStreamSink
tasks.max=1
file=/data/example_topic_1.txt
topics=example_topic_1

,因为您实际上不想解析JSON数据,而只是将其直接传递为一块文本,您需要使用StringConverter:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

本文详细介绍了转换器的细微差别:https://rmoff.net/2019/05/08/when-a-a-kafka-connect-connect-converter-converter-is-is-inot-a-a-a-a-pher-converter>/。尽管使用kafkacat代替了控制台生产商/消费者,但这说明了您要做什么的示例。

相关内容

  • 没有找到相关文章

最新更新