用例
我正在阅读一个已经创建的Kafka主题,其中一个单独的集群正在生成一些键和值。我的最终目标是以JSON格式编写HDFS,为此我已经用Kafka HDFSSink 5.3进行了一段时间的实验。我面临的问题是,我无法吸收并将主题中的所有记录写入HDFS。到目前为止,如果我的主题包含数百万条记录的每小时数据,我只能根据10万条记录来写。
以下是我用于kafka connect standalone.properties和我的HDFS快速启动HDFS.properties的配置
kafka-connect-standalone.properties
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
schema.enable=false
offset.flush.interval.ms=10000
group.id=x-hdfs-consumer-group
consumer.session.timeout.ms=10000
consumer.heartbeat.interval.ms=3000
consumer.request.timeout.ms=1810000
consumer.max.poll.interval.ms=1800000
quickstart-hdfs.properties
name=hdfs-sink-mapr
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=10
topics=topic_name
hdfs.url=maprfs:///hive/x/poc_kafka_connect/
flush.size=20000
errors.tolerance=all
format.class=io.confluent.connect.hdfs.json.JsonFormat
partitioner.class=io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner
timestamp.extractor=RecordField
timestamp.field=timestamp
partition.duration.ms=3600000
path.format='year'=YYYY/'month'=MM/'day'=dd/'hour'=HH
locale=en
timezone=UTC
如果我不使用错误。容差=所有属性,那么我只生成大约500条记录。
就工人日志而言,我没有收到任何错误,所以我不确定我遗漏了什么。
由于我对卡夫卡连接器还比较陌生,并且已经尝试了一段时间,如果有人能对我做错了什么提供一些见解,我将不胜感激。
跟进问题
卡夫卡连接器也在2天内死亡。也就是说,它可以正常工作近2天,但一段时间后,它停止读取数据,也没有产生任何结果。我在独立模式下运行它,这可能是原因吗?我试着描述消费者群体,似乎所有的消费者都死了。
kafka/kafka_2.12-2.3.0/bin/kafka-consumer-groups.sh --bootstrap-server <server>:9092 --describe --group connect-ajay-hdfs-sink-mapr
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
connect-ajay-hdfs-sink-mapr topic_name 21 1186755480 1187487551 732071 - - -
connect-ajay-hdfs-sink-mapr topic_name 12 957021804 957736810 715006 - - -
connect-ajay-hdfs-sink-mapr topic_name 17 957031965 957746941 714976 - - -
connect-ajay-hdfs-sink-mapr topic_name 24 957496491 958212413 715922 - - -
connect-ajay-hdfs-sink-mapr topic_name 0 956991807 957716202 724395 - - -
connect-ajay-hdfs-sink-mapr topic_name 28 956940273 957668689 728416 - - -
connect-ajay-hdfs-sink-mapr topic_name 5 957182822 957899308 716486 - - -
connect-ajay-hdfs-sink-mapr topic_name 3 956974180 957695189 721009 - - -
connect-ajay-hdfs-sink-mapr topic_name 19 956878365 957590196 711831 - - -
connect-ajay-hdfs-sink-mapr topic_name 2 956968023 957685835 717812 - - -
connect-ajay-hdfs-sink-mapr topic_name 16 957010175 957726139 715964 - - -
connect-ajay-hdfs-sink-mapr topic_name 7 956900190 957624746 724556 - - -
connect-ajay-hdfs-sink-mapr topic_name 8 957020325 957739604 719279 - - -
connect-ajay-hdfs-sink-mapr topic_name 22 957064283 957788487 724204 - - -
connect-ajay-hdfs-sink-mapr topic_name 29 957026931 957744496 717565 - - -
connect-ajay-hdfs-sink-mapr topic_name 13 957400623 958129555 728932 - - -
connect-ajay-hdfs-sink-mapr topic_name 6 956892063 957618485 726422 - - -
connect-ajay-hdfs-sink-mapr topic_name 11 957117685 957841645 723960 - - -
connect-ajay-hdfs-sink-mapr topic_name 1 957003873 957734649 730776 - - -
connect-ajay-hdfs-sink-mapr topic_name 18 957007813 957734011 726198 - - -
connect-ajay-hdfs-sink-mapr topic_name 27 957047658 957766131 718473 - - -
connect-ajay-hdfs-sink-mapr topic_name 10 956975729 957689182 713453 - - -
connect-ajay-hdfs-sink-mapr topic_name 15 957046441 957775251 728810 - - -
connect-ajay-hdfs-sink-mapr topic_name 23 957011972 957727996 716024 - - -
connect-ajay-hdfs-sink-mapr topic_name 14 957151628 957881644 730016 - - -
connect-ajay-hdfs-sink-mapr topic_name 4 957118644 957845399 726755 - - -
connect-ajay-hdfs-sink-mapr topic_name 9 957109152 957838497 729345 - - -
connect-ajay-hdfs-sink-mapr topic_name 25 956923833 957646070 722237 - - -
connect-ajay-hdfs-sink-mapr topic_name 26 957026885 957742112 715227 - - -
connect-ajay-hdfs-sink-mapr topic_name 20 957010071 957733605 723534 - - -
为了将主题中的所有现有记录获取到接收器连接器,请将其添加到辅助属性中,然后重新启动连接
consumer.auto.offset.reset=earliest
如果你已经启动了一个连接器,你需要重置它的使用者组,或者在配置中更改名称来创建一个新的组