Passing parameters between Flink jobs



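I have a job that reads data from Cassandra and stores it in a List (the fillOnceGeoFencesFromDB() method in the code below). I then create a StreamExecutionEnvironment and consume data from a Kafka queue.

During the transformations on the DataStream, I try to reference the static ArrayList that was just populated, but it is empty.

What is the best practice for passing the previously populated list to the next job? Any ideas would be appreciated.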

private static ArrayList<GeoFences> allGeoFences = new ArrayList<>();

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(5000); // checkpoint every 5000 ms
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
    kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
    kafkaProps.setProperty("group.id", KAFKA_GROUP);
    kafkaProps.setProperty("auto.offset.reset", "earliest");

    // Populates the static ArrayList<GeoFences> allGeoFences.
    // This runs in the JVM that executes main(), while the job is being defined.
    fillOnceGeoFencesFromDB();

    DataStream<Tuple6<UUID, String, String, String, String, Timestamp>> stream_parsed_with_timestamps = env
        .addSource(new FlinkKafkaConsumer010<>(KAFKA_SUBSCRIBE_TOPIC, new SimpleStringSchema(), kafkaProps))
        .rebalance()
        .map(new MapFunction<String, Tuple4<UUID, String, String, Timestamp>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple4<UUID, String, String, Timestamp> map(String value) throws Exception {
                return mapToTuple4(value);
            }
        })

. . . . . .

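Keep in mind that anything that happens inside the map function runs on the task managers, whereas all the code in main is only used to define your job. A static list filled in main therefore lives in the JVM that ran main, not necessarily in the JVM that later executes map.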
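To illustrate why the static list can look empty, here is a minimal sketch in plain Java with no Flink dependency. It relies on two facts: Flink ships user functions to the task managers as serialized Java objects, and Java serialization never writes static fields. The class names FenceLookup and StaticStateDemo are made up for this illustration, and the hand-written reset of the static field only simulates a fresh worker JVM, since the demo runs in a single process.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a function object; this is not a Flink class.
class FenceLookup implements Serializable {
    private static final long serialVersionUID = 1L;

    // Static state belongs to the class inside one JVM and is never serialized.
    static List<String> staticFences = new ArrayList<>();

    // Instance state is written together with the object.
    final List<String> shippedFences;

    FenceLookup(List<String> fences) {
        this.shippedFences = new ArrayList<>(fences);
    }
}

class StaticStateDemo {
    static byte[] serialize(Serializable obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    static FenceLookup deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (FenceLookup) in.readObject();
        }
    }

    // Simulates shipping from client to worker.
    // Returns {size of static list, size of instance list} as seen on the "worker".
    static int[] shipAndInspect() throws IOException, ClassNotFoundException {
        // "Client" side: main() fills the static list and builds the function object.
        FenceLookup.staticFences = new ArrayList<>(Arrays.asList("fence-a", "fence-b"));
        byte[] shipped = serialize(new FenceLookup(FenceLookup.staticFences));

        // "Worker" side: a fresh JVM would load the class with an empty static list.
        // The reset below simulates that, because this demo runs in one JVM.
        FenceLookup.staticFences = new ArrayList<>();
        FenceLookup onWorker = deserialize(shipped);

        return new int[] {FenceLookup.staticFences.size(), onWorker.shippedFences.size()};
    }

    public static void main(String[] args) throws Exception {
        int[] sizes = shipAndInspect();
        System.out.println("static list on worker:   " + sizes[0]);
        System.out.println("instance list on worker: " + sizes[1]);
    }
}
```

The instance field survives the round trip while the static field does not, which is the reason for moving the list into the function object itself.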

Pass the parameter to the MapFunction explicitly (this also makes the code easier to read).

private static class GeoFenceMapper implements MapFunction<String, Tuple4<UUID, String, String, Timestamp>> {
    private static final long serialVersionUID = 1L;

    // Shipped to the task managers together with the mapper instance,
    // so GeoFences must implement java.io.Serializable.
    private final ArrayList<GeoFences> allGeoFences;

    public GeoFenceMapper(ArrayList<GeoFences> allGeoFences) {
        this.allGeoFences = allGeoFences;
    }

    @Override
    public Tuple4<UUID, String, String, Timestamp> map(String value) throws Exception {
        return mapToTuple4(value);
    }
}
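Because the list now travels inside the mapper instance, everything it references has to be serializable. The following sketch, again plain Java without Flink, shows one way to check that before submitting a job. PlainFence, SerializableFence and SerializationCheck are hypothetical names used only here; the real GeoFences class is not shown in the question, so whether it already implements Serializable is unknown.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;

// Hypothetical fence type that does NOT implement Serializable.
class PlainFence {
    final String name;

    PlainFence(String name) {
        this.name = name;
    }
}

// Hypothetical fence type that can be shipped with a function object.
class SerializableFence implements Serializable {
    private static final long serialVersionUID = 1L;
    final String name;

    SerializableFence(String name) {
        this.name = name;
    }
}

class SerializationCheck {
    // Returns true if Java serialization can write the object and everything it references.
    static boolean canShip(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ArrayList<PlainFence> plain = new ArrayList<>(Arrays.asList(new PlainFence("a")));
        ArrayList<SerializableFence> ok = new ArrayList<>(Arrays.asList(new SerializableFence("a")));
        System.out.println("plain fences shippable:        " + canShip(plain));
        System.out.println("serializable fences shippable: " + canShip(ok));
    }
}
```

If the check fails for the real GeoFences type, making that class implement Serializable (or passing a serializable representation of it to the mapper) is the usual remedy.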

Then use this new mapper:

// Assumes fillOnceGeoFencesFromDB() is changed to return the populated ArrayList<GeoFences>.
DataStream<Tuple6<UUID, String, String, String, String, Timestamp>> stream_parsed_with_timestamps = env
    .addSource(new FlinkKafkaConsumer010<>(KAFKA_SUBSCRIBE_TOPIC, new SimpleStringSchema(), kafkaProps))
    .rebalance()
    .map(new GeoFenceMapper(fillOnceGeoFencesFromDB()))

Hope this helps!
