Flink 1.11.1 Cassandra Sink在存在时间戳时不保存



我不确定根本问题是什么,但这是我能解决的最基本的问题。当我通过kafka运行某个东西,并且我的流媒体工作接收到它时,它会贯穿整个过程,直到该将它保存到Cassandra时,它才会挂起。任何和所有的帮助都是感激的,我的头撞了这个太久了

显示以下基本问题的代码段。

StreamingJob.java:

final DataStream<Pojo> stream = env.addSource(source)
.process(new MyProcess());
CassandraSink.addSink(stream).setClusterBuilder(new ClusterBuilder() {
@Override
protected Cluster buildCluster(Cluster.Builder builder) {
return builder.withCredentials("","")
.addContactPoint("127.0.0.1").withPort(9042).build();
}
})
.setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(false)})
.setDefaultKeyspace("my_keyspace").build();
env.execute(jobConfig.getName());

MyProcess.java

@Override
Pojo myPojo = doSomethingtoMyInput();
out.collect(myPojo);
//Debugging this proves it works to this point

MyPojo.java

@Table(keyspace = "my_keyspace", name="my_table")
public class MyPojo {
@PartitionKey(0)
@Column
String user_id;
@PartitionKey(1)
@Column
String other_id;
@ClusteringColumn
@Column
java.util.Date time_id;
//Getters and setters using standard notation
}

我的卡桑德拉模式

CREATE TABLE my_table (user_id text,
other_id text,
time_idtimestamp,
PRIMARY KEY ((user_id, other_id), time_id)
) WITH CLUSTERING ORDER BY (time_id DESC)

您需要验证源中time_id的格式,因为它可能与CQL列不兼容。

在POJO中,您已经将其映射到java.util.Date,如果源中的字段确实包含日期,那么这可能是它不起作用的原因。

CQLtimestamp是一个64位带符号的int,表示自Unix epoch以来的毫秒数。源中字段的值可以是(a(整数,也可以是(b(类似于yyyy-mm-dd HH:mm的文字字符串。有效ISO 8601格式的列表在这里可用--CQL时间戳。干杯

经过多次战斗找到了答案。Flink和Cassandra是一种非常严格和脆弱的联系。所有东西都必须完全对齐,Cassandra中的decimal在Java中需要decimal,更令人困惑的是,Cassandr中的时间戳在Java中只能使用长值。

希望这能帮助其他遇到同样问题的人。

最新更新