我有一个kafka主题,我在其中收到了大约500k的活动。
目前,我需要将这些事件插入蜂巢表中。由于事件是时间驱动的,因此我决定使用以下策略:
1(定义HDFS内的路由,我称之为用户。在此路线内部,将有几个镶木式文件,每个文件都与某个日期相对应。例如:20180412,20180413,20180414等(格式yyyymmdd(。2(创建一个蜂巢表,并使用格式yyyymmdd的日期作为分区。这个想法是将用户HDFS目录中的每个文件用作表的分区,只需通过命令添加相应的镶链文件:
ALTER TABLE users DROP IF EXISTS PARTITION
(fecha='20180412') ;
ALTER TABLE users ADD PARTITION
(fecha='20180412') LOCATION '/users/20180412';
3(从最早的事件中迭代读取Kafka主题的数据,获取事件中的日期值(在参数 dateclient (中,鉴于该日期值,将值插入相应的Parque文件。4(为了完成点3,我阅读了每个事件并将其保存在一个临时的HDFS文件中,我从中使用Spark读取文件。之后,我使用Spark将临时文件内容转换为数据框架。5(使用SPARK,我设法将数据框架值插入镶木件文件。
代码遵循此方法:
val conf = ConfigFactory.parseResources("properties.conf")
val brokersip = conf.getString("enrichment.brokers.value")
val topics_in = conf.getString("enrichment.topics_in.value")
val spark = SparkSession
.builder()
.master("yarn")
.appName("ParaTiUserXY")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
val properties = new Properties
properties.put("key.deserializer", classOf[StringDeserializer])
properties.put("value.deserializer", classOf[StringDeserializer])
properties.put("bootstrap.servers", brokersip)
properties.put("auto.offset.reset", "earliest")
properties.put("group.id", "UserXYZ2")
//Schema para transformar los valores del topico de Kafka a JSON
val my_schema = new StructType()
.add("longitudCliente", StringType)
.add("latitudCliente", StringType)
.add("dni", StringType)
.add("alias", StringType)
.add("segmentoCliente", StringType)
.add("timestampCliente", StringType)
.add("dateCliente", StringType)
.add("timeCliente", StringType)
.add("tokenCliente", StringType)
.add("telefonoCliente", StringType)
val consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe( util.Collections.singletonList("geoevents") )
val fs = {
val conf = new Configuration()
FileSystem.get(conf)
}
val temp_path:Path = new Path("hdfs:///tmp/tmpstgtopics")
if( fs.exists(temp_path)){
fs.delete(temp_path, true)
}
while(true)
{
val records=consumer.poll(100)
for (record<-records.asScala){
val data = record.value.toString
val dataos: FSDataOutputStream = fs.create(temp_path)
val bw: BufferedWriter = new BufferedWriter( new OutputStreamWriter(dataos, "UTF-8"))
bw.append(data)
bw.close
val data_schema = spark.read.schema(my_schema).json("hdfs:///tmp/tmpstgtopics")
val fechaCliente = data_schema.select("dateCliente").first.getString(0)
if( fechaCliente < date){
data_schema.select("longitudCliente", "latitudCliente","dni", "alias",
"segmentoCliente", "timestampCliente", "dateCliente", "timeCliente",
"tokenCliente", "telefonoCliente").coalesce(1).write.mode(SaveMode.Append)
.parquet("/desa/landing/parati/xyusers/" + fechaCliente)
}
else{
break
}
}
}
consumer.close()
但是,此方法花费大约1秒钟来处理我的群集中的每个记录。到目前为止,这意味着我将花费大约6天才能处理所有事件。
这是将整个事件插入Kafka主题中的最佳方法吗?
还有哪些其他替代方案,或者我还能对我的代码进行哪些升级?
除了您无法正确使用Spark流到Kafka的民意调查之外(您写了一段段循环的香草scala kafka消费者(,而 coalesce(1)
始终是瓶颈迫使一位执行人收集记录,我只是说您真的在这里重新发明了方向盘。
存在哪些其他替代品
我知道并且都是开源的
- gobblin(替换Camus(由LinkedIn
- kafka连接w/hdfs接收器连接器(内置在汇合平台中,但也从github上的源构建(
- 流菜
- apache nifi
- pinterest的secor
从列出的那些人中,拥有JSON或AVRO编码的Kafka消息对您来说是有益的,而不是平坦的字符串。这样,您可以将文件按原样放入Hive Serde中,而不会在消耗它们时解析它们。如果您无法编辑生产者代码,请进行单独的Kafka流式作业,将其解析,然后将其写入AVRO或JSON的新主题。
如果您选择AVRO(您真的应该为Hive支持(,则可以使用Contruent模式注册表。或者,如果您正在运行HortonWorks,则它们提供了类似的注册表。
Avro上的Hive比文本或JSON的运行要好得多。AVRO可以轻松地转换为镶木木材,我相信上述每个选项至少提供了木木支持,而其他选项也可以做到兽人(目前Kafka Connect目前不使用兽人(。
上面的每一个还基于Kafka记录时间支持某些自动蜂巢分区的生成。
您可以通过增加Kafka主题的分区,并拥有一个或多个消费者群体,其中有多个消费者在每个分区中消费一对一。
作为,板球_007提到您可以使用一个OpenSource框架之一,也可以让更多的消费者组消耗同一主题来卸载数据。