I am trying to use the AWS Data Pipeline tool to move data from Amazon S3 to Amazon Redshift.
Is it possible to modify the data during the transfer with a SQL statement, so that the result of that SQL statement becomes the input to Redshift?
I have only found the plain copy operation, like the following:
{
  "id": "S3Input",
  "type": "S3DataNode",
  "schedule": {
    "ref": "MySchedule"
  },
  "filePath": "s3://example-bucket/source/inputfile.csv"
},
Source: https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-get-started-copy-data-cli.html
Yes, it is possible. There are two ways:
- Use transformSql of RedshiftCopyActivity.
transformSql is useful when the transformation can be expressed per batch: the change is applied only to the batch of records being copied, not to the entire table.
Here is an excerpt from the documentation:
transformSql: The SQL SELECT expression used to transform the input data. When you copy data from DynamoDB or Amazon S3, AWS Data Pipeline creates a table called staging and initially loads the data in there. Data from this table is used to update the target table. If the transformSql option is specified, a second staging table is created from the specified SQL statement. The data from this second staging table is then updated in the final target table. Therefore, transformSql must be run on the table named staging, and the output schema of transformSql must match the final target table's schema.
Below is an example that uses transformSql. Note that the select is from the staging table. It will effectively run CREATE TEMPORARY TABLE staging2 AS SELECT <...> FROM staging;. Also, all fields must be included and must match the existing table in the Redshift database.
{
  "id": "LoadUsersRedshiftCopyActivity",
  "name": "Load Users",
  "insertMode": "OVERWRITE_EXISTING",
  "transformSql": "SELECT u.id, u.email, u.first_name, u.last_name, u.admin, u.guest, CONVERT_TIMEZONE('US/Pacific', u.created_at_pst) AS created_at_pst, CONVERT_TIMEZONE('US/Pacific', u.updated_at_pst) AS updated_at_pst FROM staging u;",
  "type": "RedshiftCopyActivity",
  "runsOn": {
    "ref": "OregonEc2Resource"
  },
  "schedule": {
    "ref": "HourlySchedule"
  },
  "input": {
    "ref": "OregonUsersS3DataNode"
  },
  "output": {
    "ref": "OregonUsersDashboardRedshiftDatabase"
  },
  "onSuccess": {
    "ref": "LoadUsersSuccessSnsAlarm"
  },
  "onFail": {
    "ref": "LoadUsersFailureSnsAlarm"
  },
  "dependsOn": {
    "ref": "BewteenRegionsCopyActivity"
  }
}
- Use script of SqlActivity.
SqlActivity allows operating on the whole dataset, and it can be scheduled to run after particular events via the dependsOn mechanism:
{
  "name": "Add location ID",
  "id": "AddCardpoolLocationSqlActivity",
  "type": "SqlActivity",
  "script": "INSERT INTO locations (id) SELECT 100000 WHERE NOT EXISTS (SELECT * FROM locations WHERE id = 100000);",
  "database": {
    "ref": "DashboardRedshiftDatabase"
  },
  "schedule": {
    "ref": "HourlySchedule"
  },
  "output": {
    "ref": "LocationsDashboardRedshiftDatabase"
  },
  "runsOn": {
    "ref": "OregonEc2Resource"
  },
  "dependsOn": {
    "ref": "LoadLocationsRedshiftCopyActivity"
  }
}
RedshiftCopyActivity has an optional field called 'transformSql':
http://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-redshiftcopyactivity.html
I have not used this personally, but from the looks of it, you would stage your S3 data in a temporary table, and this SQL statement returns the transformed data for Redshift to insert.
So you need to list all the fields in the select, whether or not you are transforming a given field.
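For example, if the target table had only the columns id, email, and created_at, and only created_at needed transforming, the select would still have to enumerate every column. A minimal sketch (the activity id, column names, and timezone here are assumptions for illustration, not from the original question):

```json
{
  "id": "ExampleRedshiftCopyActivity",
  "type": "RedshiftCopyActivity",
  "transformSql": "SELECT id, email, CONVERT_TIMEZONE('US/Pacific', created_at) AS created_at FROM staging;"
}
```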
AWS Datapipeline SqlActivity
{
  "id": "MySqlActivity",
  "type": "SqlActivity",
  "database": { "ref": "MyDatabase" },
  "script": "insert into AnalyticsTable (select (cast(requestEndTime as bigint) - cast(requestBeginTime as bigint)) as requestTime, hostname from StructuredLogs where hostname LIKE '%.domain.sfx');",
  "schedule": { "ref": "Hour" },
  "queue": "priority"
}
So basically, any SQL script, transformation, or Amazon Redshift SQL command can go into "script".
transformSql works as well, but only a SQL SELECT expression used to transform the input data is supported (ref: RedshiftCopyActivity).
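As a hedged sketch of the distinction (table and column names are hypothetical): transformSql accepts only a SELECT over the implicit staging table, while data-modifying statements such as INSERT or UPDATE have to go into a SqlActivity "script" instead:

```json
{
  "type": "RedshiftCopyActivity",
  "transformSql": "SELECT id, UPPER(name) AS name FROM staging;"
}
```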