我有一个AWS IoT规则,该规则将传入的JSON发送到Kinesis Firehose。
我物联网发布的JSON数据全部在一行 - ex:
{"count":4950, "dateTime8601": "2017-03-09T17:15:28.314Z"}
管理UI中的IoT"测试"部分允许您发布消息,默认为以下(注意格式化的多行JSON):
{
"message": "Hello from AWS IoT console"
}
我将消防式流式传输到S3,然后将EMR转换为柱状格式,以最终被雅典娜使用。
问题是,在转换为柱状格式期间,Hive(特别是JSON SERDE)无法处理跨越一条线的JSON对象。它将炸毁转换,而不会转换好单行,JSON记录。
我的问题是:
- 您如何设置FireHose忽略多行JSON?
- 如果不可能,您如何告诉Hive在加载到表格或至少捕获异常之前删除新线并尝试继续?
在定义蜂巢表时,我已经试图忽略畸形的JSON:
DROP TABLE site_sensor_data_raw;
CREATE EXTERNAL TABLE site_sensor_data_raw (
count int,
dateTime8601 timestamp
)
PARTITIONED BY(year int, month int, day int, hour int)
ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe'
with serdeproperties (
'ignore.malformed.json' = 'true',
"timestamp.formats"="yyyy-MM-dd'T'HH:mm:ss.SSS'Z',millis"
)
LOCATION 's3://...';
这是我进行转换的完整HQL:
--Example of converting to OEX/columnar formats
DROP TABLE site_sensor_data_raw;
CREATE EXTERNAL TABLE site_sensor_data_raw (
count int,
dateTime8601 timestamp
)
PARTITIONED BY(year int, month int, day int, hour int)
ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe'
with serdeproperties (
'ignore.malformed.json' = 'true',
"timestamp.formats"="yyyy-MM-dd'T'HH:mm:ss.SSS'Z',millis"
)
LOCATION 's3://bucket.me.com/raw/all-sites/';
ALTER TABLE site_sensor_data_raw ADD PARTITION (year='2017',month='03',day='09',hour='15') location 's3://bucket.me.com/raw/all-sites/2017/03/09/15';
ALTER TABLE site_sensor_data_raw ADD PARTITION (year='2017',month='03',day='09',hour='16') location 's3://bucket.me.com/raw/all-sites/2017/03/09/16';
ALTER TABLE site_sensor_data_raw ADD PARTITION (year='2017',month='03',day='09',hour='17') location 's3://bucket.me.com/raw/all-sites/2017/03/09/17';
DROP TABLE to_orc;
CREATE EXTERNAL TABLE to_orc (
count int,
dateTime8601 timestamp
)
STORED AS ORC
LOCATION 's3://bucket.me.com/orc'
TBLPROPERTIES ("orc.compress"="ZLIB");
INSERT OVERWRITE TABLE to_orc SELECT count,dateTime8601 FROM site_sensor_data_raw where year=2017 AND month=03 AND day=09 AND hour=15;
好吧,默认的JSON SERDE在EMR上使用,雅典娜无法在多行JSON记录上使用。每个JSON记录都应在一行。
在多线JSON上,我看到了Hive/Hadoop甚至Presto(在Athean)Perspective
的两个问题- 给定文件,显然是Hive/Hadoop和JSON Serde将无法识别JSON记录的结束和开始以返回其对象表示。
- 给定多个文件,多行JSON文件不能像普通/N界定的JSON文件那样分配。
要从EMR/Athena End解决此问题,您需要根据数据结构和捕获异常等编写自己的自定义Serde。
您如何设置FireHose忽略多行JSON?
Firehose没有能力忽略特定格式。它将使用其API(PutRecord或PutRecorDBatch)作为数据斑点,并将其发送到目的地。
http://docs.aws.amazon.com/firehose/latest/apireference/api_putrecordbatch.html
无论如何,AWS FireHose提供使用AWS lambda 的数据转换,您可以在其中使用lambda函数在Firehose上转换数据输入数据,并将转换的数据放在目的地。因此,您可能会使用该功能来识别并在手动之前识别多行JSON。如果记录未正确格式化,您也可能会丢弃它们。您将需要探讨IoT如何将多行JSON数据发送到FireHose(例如Line等)以编写您自己的功能。
https://aws.amazon.com/blogs/compute/amazon-kinesis-firehose-data-transformation-with-aws-lambda/
如果不可能,您如何告诉Hive之前删除Newlines 加载到桌子或至少捕获例外并尝试继续吗?
如果您的Firehose目的地仍然有多行JSON,则由于您的ETL中有EMR,因此可以使用其Compute而不是Lambda将JSON弄平。Spark上的此功能也可以帮助您实现这一目标。https://issues.apache.org/jira/browse/spark-18352
然后,您可以摄入此数据以创建塔列格式,以供雅典娜(Athena)处理。