我需要关于一个我想以parquet格式放入HDF的KAFKA主题(带有每日分区者)的帮助。
我在Kafka主题中有很多数据,这些数据基本上是这样的JSON数据:
{"title":"Die Hard","year":1988,"cast":["Bruce Willis","Alan Rickman","Bonnie Bedelia","William Atherton","Paul Gleason","Reginald VelJohnson","Alexander Godunov"],"genres":["Action"]}
{"title":"Toy Story","year":1995,"cast":["Tim Allen","Tom Hanks","(voices)"],"genres":["Animated"]}
{"title":"Jurassic Park","year":1993,"cast":["Sam Neill","Laura Dern","Jeff Goldblum","Richard Attenborough"],"genres":["Adventure"]}
{"title":"The Lord of the Rings: The Fellowship of the Ring","year":2001,"cast":["Elijah Wood","Ian McKellen","Liv Tyler","Sean Astin","Viggo Mortensen","Orlando Bloom","Sean Bean","Hugo Weaving","Ian Holm"],"genres":["Fantasy »]}
{"title":"The Matrix","year":1999,"cast":["Keanu Reeves","Laurence Fishburne","Carrie-Anne Moss","Hugo Weaving","Joe Pantoliano"],"genres":["Science Fiction"]}
这个主题的名称是:test
我想以镶木格式将这些数据放入我的HDFS群集中。但是我在水槽连接器配置方面挣扎。我为此使用汇合的HDFS-sink-connector。
这是我到目前为止设法做的:
{
"name": "hdfs-sink",
"config": {
"name": "hdfs-sink",
"connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
"tasks.max": "1",
"topics": "test",
"hdfs.url": "hdfs://hdfs-IP:8020",
"hadoop.home": "/user/test-user/TEST",
"flush.size": "3",
"locale": "fr-fr",
"timezone": "UTC",
"format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
"partitioner.class": "io.confluent.connect.hdfs.partitioner.DailyPartitioner",
"consumer.auto.offset.reset": "earliest",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true"
}
}
关于为什么我这样配置这样的连接器的一些解释:
- 我每天都有很多填充我的话题的数据
- 最终目标是在我的HDF中每天有一个木板文件
我知道,也许我必须使用模式注册将数据格式化为Parquet,但我不知道该怎么做。这是必要的吗?
您能帮我吗?
谢谢
我没有亲自使用ParquetFormat
,但是您的数据必须具有模式,这意味着以下一个
- 您的数据是使用Contruent Avro Serializer 生成的
- 您的数据是作为Protobuf生成的,您将Protobuf转换器添加到您的连接工人
- 您使用Kafka Connect的特殊JSON格式,该格式在您的记录中包含一个模式。
基本上,它不能是"普通的JSON"。IE。您目前有"value.converter.schemas.enable": "true"
,我猜您的连接器不起作用,因为您的记录不采用上述格式。
基本上,如果没有模式,JSON解析器就无法知道镶木木需要编写的"列"。
和每日分区者每天不会创建一个文件,而只能创建一个目录。您将获得每个flush.size
的文件,并且还具有用于浮式文件的计划旋转间隔的配置。此外,每个kafka分区将有一个文件。
另外,"consumer.auto.offset.reset": "earliest",
仅在connect-distribtued.properties
文件中起作用,而不是在afaik的每个连接基础上。
由于我没有亲自使用ParquetFormat
,所以这就是我可以提供的所有建议,但是我已经使用NIFI等其他工具来实现类似的目标,这将使您无法更改现有的KAFKA生产者代码。
另外,请改用JSONFormat
,但是,Hive集成将不起作用,并且必须预定表(这将需要您对主题具有架构)。
和另一个选项只是配置Hive以直接从Kafka读取