我正在尝试使用CassandraPojoSink类编写 Flink 的 Cassandra SINK 连接器。我没有收到任何错误/异常,但没有记录提交到 Cassandra 表中。
我正在使用以下代码。
==========接收器连接器代码快照===
==============DataStream<Event> stream = eventStream.flatMap(new EventTransformation());
try {
stream.addSink(new CassandraPojoSink<>(Event.class, new ClusterBuilder() {
private static final long serialVersionUID = -2485105213096858846L;
@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint("localhost").withPort(9042).build();
}
}));
} catch (Exception e) {
e.printStackTrace();
}
======POJO类===
=============@Table(keyspace= "cloud", name = "event")
public class Event implements Serializable {
private static final long serialVersionUID = 3284839826384795926L;
@Column(name = "name")
private String name;
@Column(name = "msg")
private String msg;
public Event(){
}
//......
}
Flink 作业无法产生任何输出的原因有很多。一些常见原因包括:
- 该应用程序不调用 env.execute((
- 该应用程序设置为使用事件时间,但没有水印生成器 水印
- 逻辑不知何故被混淆,并且没有生成水印(例如,应用程序根据 CPU 时钟而不是事件时间戳生成水印,导致每个事件延迟(
将 POJO 更改为元组时,添加时间戳水印代码非常有效。 我能够看到我的数据被写入Cassandra数据库。
DataStream> events = event_stream.flatMap(new EventTransformation(((.assignTimestampsAndWatermarks( new AssignerWithPeriodicWatermarks>(( {
private static final long serialVersionUID = 1L;
private final long maxOutOfOrderness = 1_000L; // 1
// second
private long currentMaxTimestamp = 0;
@Override
public long extractTimestamp(Tuple3<String, String, Long> arg0,
long arg1) {
long timestamp = arg0.f3; // get
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
});
event_stream.addSink(new CassandraTupleSink<Tuple3<String, String, Long>("INSERT INTO cloud.condition (name, msg, time) VALUES (?,?,?);", new ClusterBuilder() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
protected Cluster buildCluster(Builder builder) {
return builder.addContactPoint("localhost").withPort(9042).build();
}
}));
env.setParallelism(2);
env.execute();