我是spark streaming和scala的新手。我有一个Json数据和其他一些随机日志数据来自kafka主题。我能够过滤掉json数据,像这样
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2).filter (x => x.matches("^[{].*" ))
我的json数据是这样的
{"time":"125573","randomcol":"abchdre","address":{"city":"somecity","zip":"123456"}}
我试图解析json数据并将其放入hive表。有人能告诉我正确的方向吗?由于
有多种方法可以做到这一点。
- 创建一个外部hive表,其中包含所需的列并指向此数据位置。
- 创建表时,可以使用默认的JSON服务器,然后使用get_json_object hive函数并将这些原始数据加载到最终表中。有关函数详细信息,请参考
或
你可以尝试avro服务器,并根据你的json消息提到avro模式来创建hive表。请参考此示例以获取其他示例.
希望有所帮助