如何在蜂巢表中快速插入KAFKA主题中的数据



我有一个kafka主题,我在其中收到了大约500k的活动。

目前,我需要将这些事件插入蜂巢表中。由于事件是时间驱动的,因此我决定使用以下策略:

1(定义HDFS内的路由,我称之为用户。在此路线内部,将有几个镶木式文件,每个文件都与某个日期相对应。例如:20180412,20180413,20180414等(格式yyyymmdd(。2(创建一个蜂巢表,并使用格式yyyymmdd的日期作为分区。这个想法是将用户HDFS目录中的每个文件用作表的分区,只需通过命令添加相应的镶链文件:

ALTER TABLE users DROP IF EXISTS PARTITION 
(fecha='20180412') ;
ALTER TABLE users ADD PARTITION
(fecha='20180412') LOCATION '/users/20180412';

3(从最早的事件中迭代读取Kafka主题的数据,获取事件中的日期值(在参数 dateclient (中,鉴于该日期值,将值插入相应的Parque文件。4(为了完成点3,我阅读了每个事件并将其保存在一个临时的HDFS文件中,我从中使用Spark读取文件。之后,我使用Spark将临时文件内容转换为数据框架。5(使用SPARK,我设法将数据框架值插入镶木件文件。

代码遵循此方法:

val conf = ConfigFactory.parseResources("properties.conf")
val brokersip = conf.getString("enrichment.brokers.value")
val topics_in = conf.getString("enrichment.topics_in.value")
val spark = SparkSession
    .builder()
    .master("yarn")
    .appName("ParaTiUserXY")
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
val properties = new Properties
properties.put("key.deserializer", classOf[StringDeserializer])
properties.put("value.deserializer", classOf[StringDeserializer])
properties.put("bootstrap.servers", brokersip)
properties.put("auto.offset.reset", "earliest")
properties.put("group.id", "UserXYZ2")
//Schema para transformar los valores del topico de Kafka a JSON
val my_schema = new StructType()
    .add("longitudCliente", StringType)
    .add("latitudCliente", StringType)
    .add("dni", StringType)
    .add("alias", StringType)
    .add("segmentoCliente", StringType)
    .add("timestampCliente", StringType)
    .add("dateCliente", StringType)
    .add("timeCliente", StringType)
    .add("tokenCliente", StringType)
    .add("telefonoCliente", StringType)
val consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe( util.Collections.singletonList("geoevents") )
val fs = {
    val conf = new Configuration()
    FileSystem.get(conf)
}
val temp_path:Path = new Path("hdfs:///tmp/tmpstgtopics")
    if( fs.exists(temp_path)){
        fs.delete(temp_path, true)
}
while(true)
{
    val records=consumer.poll(100)
    for (record<-records.asScala){
        val data = record.value.toString
        val dataos: FSDataOutputStream = fs.create(temp_path)
        val bw: BufferedWriter = new BufferedWriter( new OutputStreamWriter(dataos, "UTF-8"))
        bw.append(data)
        bw.close
        val data_schema = spark.read.schema(my_schema).json("hdfs:///tmp/tmpstgtopics")
        val fechaCliente = data_schema.select("dateCliente").first.getString(0)
        if( fechaCliente < date){
            data_schema.select("longitudCliente", "latitudCliente","dni", "alias", 
            "segmentoCliente", "timestampCliente", "dateCliente", "timeCliente", 
            "tokenCliente", "telefonoCliente").coalesce(1).write.mode(SaveMode.Append)
            .parquet("/desa/landing/parati/xyusers/" + fechaCliente)
          }
          else{
              break
          }
        }
    }
  consumer.close()

但是,此方法花费大约1秒钟来处理我的群集中的每个记录。到目前为止,这意味着我将花费大约6天才能处理所有事件。

这是将整个事件插入Kafka主题中的最佳方法吗?

还有哪些其他替代方案,或者我还能对我的代码进行哪些升级?

除了您无法正确使用Spark流到Kafka的民意调查之外(您写了一段段循环的香草scala kafka消费者(,而 coalesce(1)始终是瓶颈迫使一位执行人收集记录,我只是说您真的在这里重新发明了方向盘。

存在哪些其他替代品

我知道并且都是开源的

  • gobblin(替换Camus(由LinkedIn
  • kafka连接w/hdfs接收器连接器(内置在汇合平台中,但也从github上的源构建(
  • 流菜
  • apache nifi
  • pinterest的secor

从列出的那些人中,拥有JSON或AVRO编码的Kafka消息对您来说是有益的,而不是平坦的字符串。这样,您可以将文件按原样放入Hive Serde中,而不会在消耗它们时解析它们。如果您无法编辑生产者代码,请进行单独的Kafka流式作业,将其解析,然后将其写入AVRO或JSON的新主题。

如果您选择AVRO(您真的应该为Hive支持(,则可以使用Contruent模式注册表。或者,如果您正在运行HortonWorks,则它们提供了类似的注册表。

Avro上的Hive比文本或JSON的运行要好得多。AVRO可以轻松地转换为镶木木材,我相信上述每个选项至少提供了木木支持,而其他选项也可以做到兽人(目前Kafka Connect目前不使用兽人(。

上面的每一个还基于Kafka记录时间支持某些自动蜂巢分区的生成。

您可以通过增加Kafka主题的分区,并拥有一个或多个消费者群体,其中有多个消费者在每个分区中消费一对一。

作为,板球_007提到您可以使用一个OpenSource框架之一,也可以让更多的消费者组消耗同一主题来卸载数据。

最新更新