AWS Kinesis Firehose to Lambda , Lambda to S3 using java



对于我的极化,创建了 AWS 消防水带流并配置 Lambda 函数并将数据移动到 S3。消防水带到S3它工作正常,没有任何问题。如果我启用 lamda 功能,则在 S3 失败存储桶中出现以下错误。

{"attemptsMade":4,"arrivalTimestamp":1570727830210,"errorCode":"Lambda.FunctionError","errorMessage":"The Lambda function was successfully invoked but it returned an error result."

Lambda Java Code:

public class LambdaFunctionHandler implements RequestHandler<KinesisEvent, String> {
@Override
public String handleRequest(KinesisEvent event, Context context) {
context.getLogger().log("Input: " + event);
StringBuffer sb = new StringBuffer();
for (KinesisEventRecord record : event.getRecords()) {
String payload = new String(record.getKinesis().getData().array());
if (payload.toLowerCase().contains("scala"))
sb.append(payload);
sb.append("n");
}
return sb.toString();
}
}

基本上,过滤传入的流数据并推送到 S3。 我也有问题。 1 .我正在将打孔 JSON 数据传递给消防水管。"record.getKinesis((.getData(("方法将逐行读取记录,并将记录一堆读取到整个json字符串。 2.书面日志声明。在哪里检查我的日志。 如何处理这种情况?请告知

AWS Lambda Java Events 2.x 库支持 KinesisFirehoseEvent。 1.x 库没有此类。

您的代码如下所示:

public class LambdaFunctionHandler implements RequestHandler<KinesisFirehoseEvent, String> {
@Override
public String handleRequest(KinesisFirehoseEvent event, Context context) {
}
}

在 Lambda 测试环境中,事件将如下所示:

{
"invocationId": "invocationIdExample",
"deliveryStreamArn": "arn:aws:kinesis:EXAMPLE",
"region": "us-west-2",
"records": [
{
"recordId": "49546986683135544286507457936321625675700192471156785154",
"approximateArrivalTimestamp": 1495072949453,
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4="
}
]
}

最新更新