我有一个 flink 项目,它将作为批处理作业将数据插入 cassandra 表中。我已经有一个 flink 流项目,它正在将一个 pojo 写入同一个 cassandra 表,但是 cassandraOutputFormat 需要将数据作为元组(希望在某些时候更改为接受像 CassandraSink 那样的 pojo)。所以这是我拥有的pojo:
@Table(keyspace="mykeyspace", name="mytablename")
public class AlphaGroupingObject implements Serializable {
@Column(name = "jobId")
private String jobId;
@Column(name = "datalist")
@Frozen("list<frozen<dataobj>")
private List<CustomDataObj> dataobjs;
@Column(name = "userid")
private String userid;
//Getters and Setters
}
以及我从这个pojo制作的元组数据集:
DataSet<Tuple3<String, List<CustomDataObj>, String>> outputDataSet = listOfAlphaGroupingObject.map(new AlphaGroupingObjectToTuple3Mapper());
这也是触发输出的行:
outputDataSet.output(new CassandraOutputFormat<>("INSERT INTO mykeyspace.mytablename (jobid, datalist, userid) VALUES (?,?,?);", clusterThatWasBuilt));
现在我遇到的问题是,当我尝试运行它时,当它尝试将其输出到 cassandra 表时出现此错误:
Caused by: com.datastax.driver.core.exceptions.CodecNotFoundException:
Codec not found for requested operation: [frozen<mykeyspace.dataobj> <-> flink.custom.data.CustomDataObj]
所以我知道当它是一个pojo时,我只需要将@Frozen注释添加到字段中,但我不知道如何为元组执行此操作。解决此问题的最佳/正确方法是什么?还是我在做一些不必要的事情,因为实际上有一种方法可以通过我还没有找到的 cassandraOutputFormat 发送 pojos?
提前感谢您的任何帮助!
编辑:
下面是 CustomDataObj 类的代码:
@UDT(name="dataobj", keyspace = "mykeyspace")
public class CustomDataObj implements Serializable {
@Field(name = "userid")
private String userId;
@Field(name = "groupid")
private String groupId;
@Field(name = "valuetext")
private String valueText;
@Field(name = "comments")
private String comments;
//Getters and setters
}
编辑 2
在 cassandra 中包含 CustomDataObj 绑定的表架构和 mytablename 架构。
CREATE TYPE mykeyspace.dataobj (
userid text,
groupid text,
valuetext text,
comments text
);
CREATE TABLE mykeyspace.mytablename (
jobid text,
datalist list<frozen<dataobj>>,
userid text,
PRIMARY KEY (jobid, userid)
);
在类CustomDataObj
添加UDT
注释
@UDT(name = "dataobj")
public class CustomDataObj {
//......
}
编辑
将"jobid
批注"更改为"@Column(name = "jobid")
",dataobjs
"冻结批注"更改为"@Frozen
@Table(keyspace="mykeyspace", name="mytablename")
public class AlphaGroupingObject implements Serializable {
@Column(name = "jobid")
private String jobId;
@Column(name = "datalist")
@Frozen
private List<CustomDataObj> dataobjs;
@Column(name = "userid")
private String userid;
//Getters and Setters
}
我相信我已经找到了比必须为 cassandraOutputFormat 提供元组更好的方法,但它在技术上仍然没有回答这个问题,所以我不会将其标记为答案。我最终使用了 cassandra 的对象映射器,这样我就可以将 pojo 发送到桌子上。仍然需要验证数据是否成功到达表格,并且一切都按照其实施方式正常工作,但我觉得这将帮助任何面临类似问题的人。
以下是概述解决方案的文档:http://docs.datastax.com/en/developer/java-driver/2.1/manual/object_mapper/using/