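I have an application where:
1. I read JSON files from S3 into a DataFrame using SqlContext.read.json.
2. I then apply some transformations to the DataFrame.
3. Finally, I want to persist the records to DynamoDB, using one of the record values as the key and the remaining JSON parameters as values/columns.
I am trying something like this: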
JobConf jobConf = new JobConf(sc.hadoopConfiguration());
jobConf.set("dynamodb.servicename", "dynamodb");
jobConf.set("dynamodb.input.tableName", "my-dynamo-table"); // Pointing to DynamoDB table
jobConf.set("dynamodb.endpoint", "dynamodb.us-east-1.amazonaws.com");
jobConf.set("dynamodb.regionid", "us-east-1");
jobConf.set("dynamodb.throughput.read", "1");
jobConf.set("dynamodb.throughput.read.percent", "1");
jobConf.set("dynamodb.version", "2011-12-05");
jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat");
jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat");

DataFrame df = sqlContext.read().json("s3n://mybucket/abc.json");
RDD<String> jsonRDD = df.toJSON();
JavaRDD<String> jsonJavaRDD = jsonRDD.toJavaRDD();

PairFunction<String, Text, DynamoDBItemWritable> keyData = new PairFunction<String, Text, DynamoDBItemWritable>() {
    public Tuple2<Text, DynamoDBItemWritable> call(String row) {
        DynamoDBItemWritable writeable = new DynamoDBItemWritable();
        try {
            System.out.println("JSON : " + row);
            JSONObject jsonObject = new JSONObject(row);
            System.out.println("JSON Object: " + jsonObject);

            Map<String, AttributeValue> attributes = new HashMap<String, AttributeValue>();
            AttributeValue attributeValue = new AttributeValue();
            attributeValue.setS(row);
            attributes.put("values", attributeValue);

            AttributeValue attributeKeyValue = new AttributeValue();
            attributeValue.setS(jsonObject.getString("external_id"));
            attributes.put("primary_key", attributeKeyValue);

            AttributeValue attributeSecValue = new AttributeValue();
            attributeValue.setS(jsonObject.getString("123434335"));
            attributes.put("creation_date", attributeSecValue);

            writeable.setItem(attributes);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return new Tuple2(new Text(row), writeable);
    }
};

JavaPairRDD<Text, DynamoDBItemWritable> pairs = jsonJavaRDD
        .mapToPair(keyData);
Map<Text, DynamoDBItemWritable> map = pairs.collectAsMap();
System.out.println("Results : " + map);
pairs.saveAsHadoopDataset(jobConf);
However, I don't see any data being written to DynamoDB, and I don't get any error messages either.
I'm not sure, but your code seems more complicated than it needs to be.
I have used the following to write an RDD to DynamoDB successfully:
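import java.util
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf

// Turn each (key, value) pair into a DynamoDB item.
val ddbInsertFormattedRDD = inputRDD.map { case (skey, svalue) =>
  val ddbMap = new util.HashMap[String, AttributeValue]()
  val key = new AttributeValue()
  key.setS(skey.toString)
  ddbMap.put("DynamoDbKey", key)
  // The value needs its own attribute name; putting it under "DynamoDbKey"
  // again would overwrite the key. "DynamoDbValue" is a placeholder name.
  val value = new AttributeValue()
  value.setS(svalue.toString)
  ddbMap.put("DynamoDbValue", value)
  val item = new DynamoDBItemWritable()
  item.setItem(ddbMap)
  (new Text(""), item)
}

// Write-side configuration: output table, write throughput share, formats.
val ddbConf = new JobConf(sc.hadoopConfiguration)
ddbConf.set("dynamodb.output.tableName", "my-dynamo-table")
ddbConf.set("dynamodb.throughput.write.percent", "0.5")
ddbConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")
ddbConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
ddbInsertFormattedRDD.saveAsHadoopDataset(ddbConf)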
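One visible difference from the configuration in the question: the question sets dynamodb.input.tableName and the read throughput keys, but never dynamodb.output.tableName. A small pre-flight check could catch that before saveAsHadoopDataset is called. The sketch below is plain Java with no Spark or Hadoop dependency. The names DynamoWriteConfCheck, missingWriteKeys and questionConf are made up for illustration, and the list of required keys is only an assumption based on what the configuration above sets.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DynamoWriteConfCheck {
    // Write-side keys, assumed from the working configuration shown above.
    static final String[] WRITE_KEYS = {
        "dynamodb.output.tableName",
        "mapred.output.format.class"
    };

    // Returns the write-side keys that are absent or blank in the given settings.
    static List<String> missingWriteKeys(Map<String, String> conf) {
        List<String> missing = new ArrayList<>();
        for (String key : WRITE_KEYS) {
            String value = conf.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    // The table and format settings as posted in the question.
    static Map<String, String> questionConf() {
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("dynamodb.input.tableName", "my-dynamo-table");
        conf.put("mapred.output.format.class",
                "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat");
        conf.put("mapred.input.format.class",
                "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat");
        return conf;
    }

    public static void main(String[] args) {
        // prints [dynamodb.output.tableName]
        System.out.println(missingWriteKeys(questionConf()));
    }
}
```

If the check reports dynamodb.output.tableName, adding jobConf.set("dynamodb.output.tableName", "my-dynamo-table") to the setup in the question is the first thing I would try.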
Also, have you checked that you have increased the table's provisioned capacity correctly?
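It may also be worth looking at how the item is built in the question's PairFunction. attributeKeyValue and attributeSecValue are created, but setS is always called on attributeValue, so primary_key and creation_date end up without a string value. In addition, if JSONObject is the org.json class and the record has no field named "123434335", getString throws, the catch block only prints the stack trace, and writeable.setItem is never reached. Here is a minimal sketch of the intended mapping, with one attribute object per field. Attr is a stand-in for the SDK's AttributeValue so the example runs without the AWS SDK, and ItemMappingSketch, stringAttr and toItem are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

class ItemMappingSketch {
    // Stand-in for com.amazonaws.services.dynamodbv2.model.AttributeValue.
    static class Attr {
        private String s;
        void setS(String s) { this.s = s; }
        String getS() { return s; }
    }

    // Creates a fresh attribute per call, so no two fields share one object.
    static Attr stringAttr(String value) {
        Attr attr = new Attr();
        attr.setS(value);
        return attr;
    }

    // Builds the item using the attribute names from the question.
    static Map<String, Attr> toItem(String rawJson, String externalId, String creationDate) {
        Map<String, Attr> item = new HashMap<>();
        item.put("values", stringAttr(rawJson));
        item.put("primary_key", stringAttr(externalId));
        item.put("creation_date", stringAttr(creationDate));
        return item;
    }

    public static void main(String[] args) {
        Map<String, Attr> item = toItem("{\"external_id\":\"abc-1\"}", "abc-1", "123434335");
        System.out.println(item.get("primary_key").getS());
    }
}
```

With the real SDK class, new AttributeValue().withS(value) plays the role of stringAttr. Rethrowing from the catch block instead of only printing the stack trace would also make a failed record show up as a job failure rather than as an empty item.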