我不确定根本问题是什么,但这是我能解决的最基本的问题。当我通过kafka运行某个东西,并且我的流媒体工作接收到它时,它会贯穿整个过程,直到该将它保存到Cassandra时,它才会挂起。任何和所有的帮助都是感激的,我的头撞了这个太久了
显示以下基本问题的代码段。
StreamingJob.java:
final DataStream<Pojo> stream = env.addSource(source)
.process(new MyProcess());
CassandraSink.addSink(stream).setClusterBuilder(new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.withCredentials("","")
.addContactPoint("127.0.0.1").withPort(9042).build();
}
})
.setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(false)})
.setDefaultKeyspace("my_keyspace").build();
env.execute(jobConfig.getName());
MyProcess.java
@Override
Pojo myPojo = doSomethingtoMyInput();
out.collect(myPojo);
//Debugging this proves it works to this point
MyPojo.java
@Table(keyspace = "my_keyspace", name="my_table")
public class MyPojo {
@PartitionKey(0)
@Column
String user_id;
@PartitionKey(1)
@Column
String other_id;
@ClusteringColumn
@Column
java.util.Date time_id;
//Getters and setters using standard notation
}
我的卡桑德拉模式
CREATE TABLE my_table (user_id text,
other_id text,
time_idtimestamp,
PRIMARY KEY ((user_id, other_id), time_id)
) WITH CLUSTERING ORDER BY (time_id DESC)
您需要验证源中time_id
的格式,因为它可能与CQL列不兼容。
在POJO中,您已经将其映射到java.util.Date
,如果源中的字段确实包含日期,那么这可能是它不起作用的原因。
CQLtimestamp
是一个64位带符号的int,表示自Unix epoch以来的毫秒数。源中字段的值可以是(a(整数,也可以是(b(类似于yyyy-mm-dd HH:mm
的文字字符串。有效ISO 8601格式的列表在这里可用--CQL时间戳。干杯
经过多次战斗找到了答案。Flink和Cassandra是一种非常严格和脆弱的联系。所有东西都必须完全对齐,Cassandra中的decimal在Java中需要decimal,更令人困惑的是,Cassandr中的时间戳在Java中只能使用长值。
希望这能帮助其他遇到同样问题的人。