我正在尝试在dynamodb流事件上触发用Java编写的AWS lambda函数。Amazon有一个同样的指南,在这里使用NodeJShttp://docs.aws.amazon.com/lambda/latest/dg/wt-ddb-create-test-function.html
NodeJS的测试输入(来自上面的链接)看起来像一个SNS事件,所以我尝试使用Java中相应的SNSEvent类作为我的处理程序方法的输入。
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.events.SNSEvent;
import com.amazonaws.services.lambda.runtime.events.SNSEvent.SNSRecord;
import java.util.List;
public class RecomFunction {
public void handler(SNSEvent event, Context context) {
LambdaLogger logger = context.getLogger();
List<SNSRecord> records = event.getRecords();
if (records != null) {
for (SNSRecord record : records) {
if (record != null) {
logger.log("SNS record: " + record.getSNS().getMessage());
}
}
}
}
}
不幸的是,record.getSNS()返回NULL,导致NullPointer异常
有一个相关的问题,但没有给出具体的答案:使用Lambda
这段代码对我有效。您可以使用它来接收和处理Lambda函数中的DynamoDB事件-
public class Handler implements RequestHandler<DynamodbEvent, Void> {
@Override
public Void handleRequest(DynamodbEvent dynamodbEvent, Context context) {
for (DynamodbStreamRecord record : dynamodbEvent.getRecords()) {
if (record == null) {
continue;
}
// Your code here
}
return null;
}
}
同样,您可以使用SNSEvent
和SNSRecord
来处理Amazon SNS事件。
DynamoDB流事件:
import com.amazonaws.services.lambda.runtime.RequestHandler;
...
public class DynamoStreamHandler implements RequestHandler<Object, Void> {
@Override
public Void handleRequest(Object o, Context context) {
LinkedHashMap lhm = (LinkedHashMap) o;
...etc.
}
}
似乎他们使用了一个使用Map
和List
对象的定制JSON映射器。通过测试和打印日志来验证其他事件类型的这一点非常简单(但很乏味)。(叹气)
EDIT:如果~ 5mb的开销是ok的,您可以使用aws-lambda-java-events
库v1.1.0提供的DynamodbEvent.DynamodbStreamRecord
,如AWS Lambda文档中的AWS Lambda Walkthrough 3: Process Amazon DynamoDB Events (Java)所述。
创建一个处理程序,该处理程序接受InputStream,读取InputStream的内容(只是JSON),然后对其进行反序列化以获得所需的数据。
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import com.amazonaws.services.lambda.runtime.Context;
public class MyHandler {
public void handler(InputStream inputStream, OutputStream outputStream, Context context) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
int letter;
while((letter = inputStream.read()) != -1)
{
baos.write(letter);
}
//Send the contents of baos to a JSON deserializer ...
}
}
这有点麻烦,但据我所知,AWS目前没有提供更高级别的Java lambda接口来消费DynamoDB Streams。这里有一个完整的示例,详细说明了如何反序列化JSON流以获取数据的Java对象。
您的代码导致CloudWatch日志中出现以下异常,
类没有实现合适的处理程序接口....
一旦我将代码更改为以下内容,我就可以很好地获得SNS消息。
public class RecomFunction implements RequestHandler<SNSEvent, Void> {
public Void handleRequest(SNSEvent event, Context context) {
...
return null;
}
}
从事件处理程序中获取反序列化对象:1)在处理程序中获取json从输入流使用这样的东西:
private String getJsonFrom(InputStream stream) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
int letter;
while ((letter = stream.read()) != -1)
baos.write(letter);
return new String(baos.toByteArray());
}
2)然后创建一个特定的反序列化器从json派生对象。'NewImage'在DynamoDB事件的情况下。这里有一个例子
以上答案对我都没有帮助。使用对象,我得到了内容,但这还不够。为了使DynamodbEvent
工作,我必须将适当的事件JSON事件发送到我的lambda。使用AWS测试JSON内容时,缺少了几个属性。在将带有流的DynamoDB附加到Lambda并在那里进行一些更改时,我将获得适当的JSON内容,DynamodbEvent
将在RequestHandler
中工作。因此,在JSON事件中放入紧密属性可以使其工作(like: "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": "...", "eventSource": "...")