在JAVA中使用Lambda的AWS DynamoDB触发器



我正在尝试在dynamodb流事件上触发用Java编写的AWS lambda函数。Amazon有一个同样的指南,在这里使用NodeJShttp://docs.aws.amazon.com/lambda/latest/dg/wt-ddb-create-test-function.html

NodeJS的测试输入(来自上面的链接)看起来像一个SNS事件,所以我尝试使用Java中相应的SNSEvent类作为我的处理程序方法的输入。

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
import com.amazonaws.services.lambda.runtime.events.SNSEvent.SNSRecord;
import java.util.List;
public class RecomFunction {
    public void handler(SNSEvent event, Context context) {
        LambdaLogger logger = context.getLogger();
        List<SNSRecord> records = event.getRecords();
        if (records != null) {
            for (SNSRecord record : records) {
                if (record != null) {
                    logger.log("SNS record: " + record.getSNS().getMessage());
                }
            }
        }
    }
}

不幸的是,record.getSNS()返回NULL,导致NullPointer异常

有一个相关的问题,但没有给出具体的答案:使用Lambda

设置DynamoDB触发器

这段代码对我有效。您可以使用它来接收和处理Lambda函数中的DynamoDB事件-

public class Handler implements RequestHandler<DynamodbEvent, Void> {
    @Override
    public Void handleRequest(DynamodbEvent dynamodbEvent, Context context) {
        for (DynamodbStreamRecord record : dynamodbEvent.getRecords()) {
            if (record == null) {
                continue;
            }
            // Your code here
        }
        return null;
    }
}

同样,您可以使用SNSEventSNSRecord来处理Amazon SNS事件。

DynamoDB流事件:

import com.amazonaws.services.lambda.runtime.RequestHandler;
...
public class DynamoStreamHandler implements RequestHandler<Object, Void> {
    @Override
    public Void handleRequest(Object o, Context context) {
        LinkedHashMap lhm = (LinkedHashMap) o;
        ...etc.
    }
}

似乎他们使用了一个使用MapList对象的定制JSON映射器。通过测试和打印日志来验证其他事件类型的这一点非常简单(但很乏味)。(叹气)

EDIT:如果~ 5mb的开销是ok的,您可以使用aws-lambda-java-events库v1.1.0提供的DynamodbEvent.DynamodbStreamRecord,如AWS Lambda文档中的AWS Lambda Walkthrough 3: Process Amazon DynamoDB Events (Java)所述。

创建一个处理程序,该处理程序接受InputStream,读取InputStream的内容(只是JSON),然后对其进行反序列化以获得所需的数据。

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import com.amazonaws.services.lambda.runtime.Context; 
public class MyHandler {    
    public void handler(InputStream inputStream, OutputStream outputStream, Context context) throws IOException {           
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        int letter;        
        while((letter = inputStream.read()) != -1)
        {
            baos.write(letter);                     
        }        
        //Send the contents of baos to a JSON deserializer ...          
    }      
}

这有点麻烦,但据我所知,AWS目前没有提供更高级别的Java lambda接口来消费DynamoDB Streams。这里有一个完整的示例,详细说明了如何反序列化JSON流以获取数据的Java对象。

您的代码导致CloudWatch日志中出现以下异常,

类没有实现合适的处理程序接口....

一旦我将代码更改为以下内容,我就可以很好地获得SNS消息。

public class RecomFunction implements RequestHandler<SNSEvent, Void> {
    public Void handleRequest(SNSEvent event, Context context) {
        ...
        return null;
    }
}

从事件处理程序中获取反序列化对象:1)在处理程序中获取json从输入流使用这样的东西:

private String getJsonFrom(InputStream stream) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    int letter;
    while ((letter = stream.read()) != -1)
        baos.write(letter);
    return new String(baos.toByteArray());
}

2)然后创建一个特定的反序列化器从json派生对象。'NewImage'在DynamoDB事件的情况下。这里有一个例子

以上答案对我都没有帮助。使用对象,我得到了内容,但这还不够。为了使DynamodbEvent工作,我必须将适当的事件JSON事件发送到我的lambda。使用AWS测试JSON内容时,缺少了几个属性。在将带有流的DynamoDB附加到Lambda并在那里进行一些更改时,我将获得适当的JSON内容,DynamodbEvent将在RequestHandler中工作。因此,在JSON事件中放入紧密属性可以使其工作(like: "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": "...", "eventSource": "...")

最新更新