My HDFS sink connector config:
{
  "name": "hdfs-sink1",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "3",
    "topics": "mysql-prod-registrations-",
    "hadoop.conf.dir": "/usr/hdp/current/hadoop-client/conf",
    "hadoop.home": "/usr/hdp/current/hadoop-client",
    "hdfs.url": "hdfs://HACluster:8020",
    "topics.dir": "/topics",
    "logs.dir": "/logs",
    "flush.size": "100",
    "rotate.interval.ms": "60000",
    "format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
    "value.converter.schemas.enable": "true",
    "partitioner.class": "io.confluent.connect.hdfs.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "1800000",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/",
    "locale": "kor",
    "timezone": "Asia/Kolkata"
  }
}
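As a side note on what the time-based settings above actually control: a minimal java.time sketch of how a TimeBasedPartitioner-style formatter turns a record timestamp into a directory path (illustration only, not the connector's own code; the connector uses Joda-Time patterns, so yyyy below stands in for the YYYY in path.format):

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Illustration: how a path.format pattern plus a timezone setting map a
// record timestamp to an HDFS directory name.
public class PartitionPathDemo {
    static final DateTimeFormatter PATH_FMT = DateTimeFormatter
            .ofPattern("'year'=yyyy/'month'=MM/'day'=dd/'hour'=HH")
            .withZone(ZoneId.of("Asia/Kolkata")); // the "timezone" setting

    static String partitionPath(Instant recordTs) {
        return PATH_FMT.format(recordTs);
    }

    public static void main(String[] args) {
        // A record stamped 18:30 UTC lands in the *next* local day's
        // midnight partition, because Asia/Kolkata is UTC+05:30:
        System.out.println(partitionPath(Instant.parse("2018-12-14T18:30:00Z")));
        // year=2018/month=12/day=15/hour=00
    }
}
```

The timezone only shifts which directory a record lands in; it does not rewrite the timestamp stored inside the Avro records.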
But when reading the data in Hive, the timestamps come out 5:30 ahead, matching the "timezone": "Asia/Kolkata" setting. How can I get the timestamp values in Indian time?
The connector ran for two days, but now it throws the following error:
ERROR WorkerSinkTask{id=hdfs-sink1-2} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
java.lang.NullPointerException
at io.confluent.connect.hdfs.HdfsSinkTask.open(HdfsSinkTask.java:133)
at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:612)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:69)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:672)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:283)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:343)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2018-12-14 12:22:39,670] ERROR WorkerSinkTask{id=hdfs-sink1-2} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
Asia/Kolkata is +05:30 ahead of UTC, so that makes sense... and the timezone config only applies to the path.format value, not to the internal values of the Kafka records.
I'm not sure which tool you're using to query, but that could be the problem: some tools assume the data was written in UTC and then "shift" it to "display" a formatted local timestamp... So I'd suggest having the HDFS sink connector actually write in UTC, and letting your SQL tool and your OS handle the actual TZ conversion themselves.
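A minimal sketch of that split, assuming the stored values are UTC epoch millis (toKolkata is a hypothetical helper standing in for whatever your query layer does):

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

// Sketch of the suggested split: store UTC, convert only at read time.
public class ReadSideTz {
    // Hypothetical read-side helper; the stored bytes are never rewritten.
    static ZonedDateTime toKolkata(long utcEpochMillis) {
        return Instant.ofEpochMilli(utcEpochMillis)
                      .atZone(ZoneId.of("Asia/Kolkata"));
    }

    public static void main(String[] args) {
        long stored = Instant.parse("2018-12-14T12:00:00Z").toEpochMilli();
        // The stored value stays UTC; only the display is shifted +05:30:
        System.out.println(toKolkata(stored)); // 2018-12-14T17:30+05:30[Asia/Kolkata]
    }
}
```

If the query tool is Hive, the equivalent read-side conversion should be from_utc_timestamp(ts, 'Asia/Kolkata'); either way, the files on HDFS keep UTC values.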