KAFKA主题数据使用HDFS接收器连接器配置问题到HDFS Parquet文件



我需要关于一个我想以parquet格式放入HDF的KAFKA主题(带有每日分区者)的帮助。

我在Kafka主题中有很多数据,这些数据基本上是这样的JSON数据:

{"title":"Die Hard","year":1988,"cast":["Bruce Willis","Alan Rickman","Bonnie Bedelia","William Atherton","Paul Gleason","Reginald VelJohnson","Alexander Godunov"],"genres":["Action"]}
{"title":"Toy Story","year":1995,"cast":["Tim Allen","Tom Hanks","(voices)"],"genres":["Animated"]}
{"title":"Jurassic Park","year":1993,"cast":["Sam Neill","Laura Dern","Jeff Goldblum","Richard Attenborough"],"genres":["Adventure"]}
{"title":"The Lord of the Rings: The Fellowship of the Ring","year":2001,"cast":["Elijah Wood","Ian McKellen","Liv Tyler","Sean Astin","Viggo Mortensen","Orlando Bloom","Sean Bean","Hugo Weaving","Ian Holm"],"genres":["Fantasy »]}
{"title":"The Matrix","year":1999,"cast":["Keanu Reeves","Laurence Fishburne","Carrie-Anne Moss","Hugo Weaving","Joe Pantoliano"],"genres":["Science Fiction"]}

这个主题的名称是:test

我想以镶木格式将这些数据放入我的HDFS群集中。但是我在水槽连接器配置方面挣扎。我为此使用汇合的HDFS-sink-connector。

这是我到目前为止设法做的:

{
  "name": "hdfs-sink",
  "config": {
    "name": "hdfs-sink",
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "test",
    "hdfs.url": "hdfs://hdfs-IP:8020",
    "hadoop.home": "/user/test-user/TEST",
    "flush.size": "3",
    "locale": "fr-fr",
    "timezone": "UTC",
    "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.hdfs.partitioner.DailyPartitioner",
    "consumer.auto.offset.reset": "earliest",
    "value.converter":  "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "true",
    "value.converter.schemas.enable": "true"
    
  }
}

关于为什么我这样配置这样的连接器的一些解释:

  • 我每天都有很多填充我的话题的数据
  • 最终目标是在我的HDF中每天有一个木板文件

我知道,也许我必须使用模式注册将数据格式化为Parquet,但我不知道该怎么做。这是必要的吗?

您能帮我吗?

谢谢

我没有亲自使用ParquetFormat,但是您的数据必须具有模式,这意味着以下一个

之一
  1. 您的数据是使用Contruent Avro Serializer
  2. 生成的
  3. 您的数据是作为Protobuf生成的,您将Protobuf转换器添加到您的连接工人
  4. 您使用Kafka Connect的特殊JSON格式,该格式在您的记录中包含一个模式。

基本上,它不能是"普通的JSON"。IE。您目前有"value.converter.schemas.enable": "true",我猜您的连接器不起作用,因为您的记录不采用上述格式。

基本上,如果没有模式,JSON解析器就无法知道镶木木需要编写的"列"。


和每日分区者每天不会创建一个文件,而只能创建一个目录。您将获得每个flush.size的文件,并且还具有用于浮式文件的计划旋转间隔的配置。此外,每个kafka分区将有一个文件。


另外,"consumer.auto.offset.reset": "earliest",仅在connect-distribtued.properties文件中起作用,而不是在afaik的每个连接基础上。


由于我没有亲自使用ParquetFormat,所以这就是我可以提供的所有建议,但是我已经使用NIFI等其他工具来实现类似的目标,这将使您无法更改现有的KAFKA生产者代码。


另外,请改用JSONFormat,但是,Hive集成将不起作用,并且必须预定表(这将需要您对主题具有架构)。

>

和另一个选项只是配置Hive以直接从Kafka读取

最新更新