我们正在从Redshift迁移到Spark。我有一个Redshift表,我需要导出到S3。这将从S3提供给Apache Spark (EMR)。
我发现只有一种方法可以从红移导出数据。这就是UNLOAD
命令。并且卸载时不能导出类型化数据。它导出csv,这是一个字符串表。基于不同的格式(引号、分隔符等),Spark似乎不能很好地识别它。所以我正在寻找一种方法来卸载它们,并确保它们被spark以正确的类型读取。
是否有办法卸载数据为JSON或其他类型的格式,是可识别的Spark?
最后,我用字符串连接手工构建了JSON,
# UPLOAD AS JSON
UNLOAD ('SELECT CHR(123)||
'"receiver_idfa":"'||nvl(receiver_idfa,'')||'",'||
'"brand":"'||nvl(brand,'')||'",'||
'"total":'||nvl(total,0)||','||
'"screen_dpi":'||nvl(screen_dpi,0)||','||
'"city":"'||nvl(city,'')||'",'||
'"wifi":'||nvl(convert(integer,wifi),0)||','||
'"duration":'||nvl(duration,0)||','||
'"carrier":"'||nvl(carrier,'')||'",'||
'"screen_width":'||nvl(screen_width,0)||','||
'"time":'||nvl("time",0)||','||
'"ts":"'||nvl(ts,'1970-01-01 00:00:00')||'",'||
'"month":'||nvl(month,0)||','||
'"year":'||nvl(year,0)||','||
'"day":'||nvl(day,0)||','||
'"hour":'||nvl(hour,0)||','||
'"minute":'||nvl(minute,0)||
chr(125) from event_logs')
TO 's3://BUCKET/PREFIX/KEY'
WITH CREDENTIALS AS 'CREDENTIALS...'
GZIP
DELIMITER AS 't'
;
,
-
nvl
函数用于替换null -
convert
用于替换布尔值为int -
||
是红移 中的串联运算符 -
chr
用于生成{
和}
字符
此操作不如直接卸载csv快。要多花2-3倍的时间。但是我们只需要做一次就可以了。我卸载了大约16亿条记录,并成功地将它们导入到Spark中。
注意:通过spark解析json不是有效的方法。还有其他更快的格式,如parquet文件,序列文件。所以对spark来说,这可能不是一条正确的路径。但对于以JSON形式卸载,您可以使用此解决方案。
查看spark-redshift
库,该库旨在允许Apache Spark使用UNLOAD
从Redshift进行批量读取;它自动管理转义和模式处理。
您可以直接对从Redshift加载的数据运行Spark查询,或者您可以将Redshift数据保存为类似Parquet的类型格式,然后查询该数据。
完全声明:我是这个库的主要维护者。
由于Redshift是基于postgresql的RDBMS;可能没有简单的方法将其提取为json。当你运行卸载命令输出'csv'后,你可以将csv转换为json格式
这是一个github项目:https://github.com/darwin/csv2json
如果你想要一个基于命令的工具:https://www.npmjs.com/package/csvtojson
从2月22日开始,RedShift原生支持JSON格式的数据卸载。