Flink: dynamically updating a streaming job



I receive Avro-format events on a number of different topics. I want to consume these and write them to S3 in Parquet format. I wrote the job below, which creates a separate stream for each event, fetches the event's schema from the Confluent Schema Registry, and creates a Parquet sink for it.
This works fine, but the one problem I'm facing is that whenever a new event type starts arriving, I have to change the YAML configuration and restart the job every time. Is there any way I can have the job start consuming a new set of events without restarting it?

YamlReader reader = new YamlReader(topologyConfig);
EventTopologyConfig eventTopologyConfig = reader.read(EventTopologyConfig.class);
long checkPointInterval = eventTopologyConfig.getCheckPointInterval();
topics = eventTopologyConfig.getTopics();
List<EventConfig> eventTypesList = eventTopologyConfig.getEventsType();
CachedSchemaRegistryClient registryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);

FlinkKafkaConsumer<GenericRecord> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topics,
        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
        properties);
DataStream<GenericRecord> dataStream = streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("source");

try {
    for (EventConfig eventConfig : eventTypesList) {
        LOG.info("creating a stream for {}", eventConfig.getEvent_name());

        // One Parquet sink per event type, using the Avro schema fetched
        // from the schema registry for that event's subject.
        final StreamingFileSink<GenericRecord> sink = StreamingFileSink
                .forBulkFormat(path, ParquetAvroWriters.forGenericRecord(
                        SchemaUtils.getSchema(eventConfig.getSchema_subject(), registryClient)))
                .withBucketAssigner(new EventTimeBucketAssigner())
                .build();

        // Route only this event type's records to its sink.
        DataStream<GenericRecord> outStream = dataStream.filter((FilterFunction<GenericRecord>) genericRecord ->
                genericRecord != null && genericRecord.get(EVENT_NAME).toString().equals(eventConfig.getEvent_name()));

        outStream.addSink(sink).name(eventConfig.getSink_id()).setParallelism(parallelism);
    }
} catch (Exception e) {
    e.printStackTrace();
}

YAML file:

!com.bounce.config.EventTopologyConfig
eventsType:
  - !com.bounce.config.EventConfig
    event_name: "search_list_keyless"
    schema_subject: "search_list_keyless-com.bounce.events.keyless.bookingflow.search_list_keyless"
    topic: "search_list_keyless"
  - !com.bounce.config.EventConfig
    event_name: "bike_search_details"
    schema_subject: "bike_search_details-com.bounce.events.keyless.bookingflow.bike_search_details"
    topic: "bike_search_details"
  - !com.bounce.config.EventConfig
    event_name: "keyless_bike_lock"
    schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_lock"
    topic: "analytics-keyless"
  - !com.bounce.config.EventConfig
    event_name: "keyless_bike_unlock"
    schema_subject: "analytics-keyless-com.bounce.events.keyless.bookingflow.keyless_bike_unlock"
    topic: "analytics-keyless"

checkPointInterval: 1200000
topics: ["search_list_keyless","bike_search_details","analytics-keyless"]

Thanks.

I think you want to use a custom BucketAssigner that uses the genericRecord.get(EVENT_NAME).toString() value as the bucket ID, along with whatever event-time bucketing the EventTimeBucketAssigner is currently doing.

Then there is no need to create multiple streams, and it should be dynamic: whenever a new event-name value shows up in the records being written, you get a new output sink.
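
A minimal sketch of such an assigner, assuming the records expose the event name under an "event_name" field (whatever your EVENT_NAME constant points at); the class name EventNameBucketAssigner is hypothetical, and it falls back to processing time when a record carries no event timestamp:

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

/**
 * Buckets records by event name plus an hourly event-time partition, so a
 * previously unseen event name simply produces a new bucket directory.
 */
public class EventNameBucketAssigner implements BucketAssigner<GenericRecord, String> {

    private static final DateTimeFormatter HOUR_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneOffset.UTC);

    @Override
    public String getBucketId(GenericRecord record, Context context) {
        // Same field the job's filters already read (EVENT_NAME in the question).
        String eventName = record.get("event_name").toString();

        // Use the record's event timestamp if present, otherwise fall back
        // to processing time.
        Long ts = context.timestamp();
        long millis = (ts != null) ? ts : context.currentProcessingTime();

        return eventName + "/" + HOUR_FORMAT.format(Instant.ofEpochMilli(millis));
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

With this in place, a single sink built with .withBucketAssigner(new EventNameBucketAssigner()) replaces the per-event loop, and records for a new event name land in a new bucket without a restart. One caveat: ParquetAvroWriters.forGenericRecord still takes a single Avro schema, so this works as-is only if your events share a schema (or can be written with a common one).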
