您如何从进入AWS Hive表中过滤多行JSON数据



我有一个AWS IoT规则,该规则将传入的JSON发送到Kinesis Firehose。

我物联网发布的JSON数据全部在一行 - ex:

{"count":4950, "dateTime8601": "2017-03-09T17:15:28.314Z"}

管理UI中的IoT"测试"部分允许您发布消息,默认为以下(注意格式化的多行JSON):

{
  "message": "Hello from AWS IoT console"
}

我将消防式流式传输到S3,然后将EMR转换为柱状格式,以最终被雅典娜使用。

问题是,在转换为柱状格式期间,Hive(特别是JSON SERDE)无法处理跨越一条线的JSON对象。它将炸毁转换,而不会转换好单行,JSON记录。

我的问题是

  • 您如何设置FireHose忽略多行JSON?
  • 如果不可能,您如何告诉Hive在加载到表格或至少捕获异常之前删除新线并尝试继续?

在定义蜂巢表时,我已经试图忽略畸形的JSON:

DROP TABLE site_sensor_data_raw;
CREATE EXTERNAL TABLE site_sensor_data_raw (
 count int,     
 dateTime8601 timestamp
)
PARTITIONED BY(year int, month int, day int, hour int)
ROW FORMAT  serde 'org.apache.hive.hcatalog.data.JsonSerDe'
with serdeproperties (
 'ignore.malformed.json' = 'true',
 "timestamp.formats"="yyyy-MM-dd'T'HH:mm:ss.SSS'Z',millis"
)
LOCATION 's3://...';

这是我进行转换的完整HQL:

--Example of converting to OEX/columnar formats
DROP TABLE site_sensor_data_raw;
CREATE EXTERNAL TABLE site_sensor_data_raw (
    count int,
    dateTime8601 timestamp
)
PARTITIONED BY(year int, month int, day int, hour int)
ROW FORMAT  serde 'org.apache.hive.hcatalog.data.JsonSerDe'
with serdeproperties (
 'ignore.malformed.json' = 'true',
 "timestamp.formats"="yyyy-MM-dd'T'HH:mm:ss.SSS'Z',millis"
)
LOCATION 's3://bucket.me.com/raw/all-sites/';
ALTER TABLE site_sensor_data_raw ADD PARTITION (year='2017',month='03',day='09',hour='15') location 's3://bucket.me.com/raw/all-sites/2017/03/09/15';
ALTER TABLE site_sensor_data_raw ADD PARTITION (year='2017',month='03',day='09',hour='16') location 's3://bucket.me.com/raw/all-sites/2017/03/09/16';
ALTER TABLE site_sensor_data_raw ADD PARTITION (year='2017',month='03',day='09',hour='17') location 's3://bucket.me.com/raw/all-sites/2017/03/09/17';
DROP TABLE to_orc;
CREATE EXTERNAL TABLE to_orc (
      count int,
      dateTime8601 timestamp
)
STORED AS ORC
LOCATION 's3://bucket.me.com/orc'
TBLPROPERTIES ("orc.compress"="ZLIB");
INSERT OVERWRITE TABLE to_orc SELECT count,dateTime8601 FROM site_sensor_data_raw where year=2017 AND month=03 AND day=09 AND hour=15;

好吧,默认的JSON SERDE在EMR上使用,雅典娜无法在多行JSON记录上使用。每个JSON记录都应在一行。

在多线JSON上,我看到了Hive/Hadoop甚至Presto(在Athean)Perspective

的两个问题
  • 给定文件,显然是Hive/Hadoop和JSON Serde将无法识别JSON记录的结束和开始以返回其对象表示。
  • 给定多个文件,多行JSON文件不能像普通/N界定的JSON文件那样分配。

要从EMR/Athena End解决此问题,您需要根据数据结构和捕获异常等编写自己的自定义Serde。

您如何设置FireHose忽略多行JSON?

Firehose没有能力忽略特定格式。它将使用其API(PutRecord或PutRecorDBatch)作为数据斑点,并将其发送到目的地。

http://docs.aws.amazon.com/firehose/latest/apireference/api_putrecordbatch.html

无论如何,AWS FireHose提供使用AWS lambda 的数据转换,您可以在其中使用lambda函数在Firehose上转换数据输入数据,并将转换的数据放在目的地。因此,您可能会使用该功能来识别并在手动之前识别多行JSON。如果记录未正确格式化,您也可能会丢弃它们。您将需要探讨IoT如何将多行JSON数据发送到FireHose(例如Line等)以编写您自己的功能。

https://aws.amazon.com/blogs/compute/amazon-kinesis-firehose-data-transformation-with-aws-lambda/

如果不可能,您如何告诉Hive之前删除Newlines 加载到桌子或至少捕获例外并尝试继续吗?

如果您的Firehose目的地仍然有多行JSON,则由于您的ETL中有EMR,因此可以使用其Compute而不是Lambda将JSON弄平。Spark上的此功能也可以帮助您实现这一目标。https://issues.apache.org/jira/browse/spark-18352

然后,您可以摄入此数据以创建塔列格式,以供雅典娜(Athena)处理。

最新更新