我最近开始学习流处理,正在尝试Apache Flink。我正在尝试编写一个从 Kafka 主题读取事件的作业,可能执行一些无状态链式转换,并对另一个应用程序进行 REST 调用以 POST 每个转换后的事件。例如,我的主要方法可能看起来像这样 -
public class KafkaSourceToRestSinkJob {
public static void main(String[] args) {
String configPath = args[0];
//Read configuration for the job (like kafka properties, rest uri for sink, possibly operators to invoke)
...
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), kafkaProps));
dataStream.addSink(new RestSinkFunction<>()); //Custom sink function implementing org.apache.flink.streaming.api.functions.sink.SinkFunction
//Chain some operators depending on some parameters in the config file
...
env.execute("Confused Job");
}
}
我的目标是为具有相同类型的源和接收器的多个作业提供一个通用的 jar 工件。如果我需要一个作业来执行转换 A、B 和 C(实现将出现在 jar 中(,我可以在配置文件中指定它们,并在程序 args 中传递文件的路径。
现在这是我的问题——
- 是否可以动态调用运算符?
- 我知道在接收器中进行 REST 调用可能会导致一些不必要的延迟,但在我的应用程序中,这是可以容忍的。我也不在乎回应。记住这一点,我有理由避免使用 REST 接收器吗?
- 总的来说,我错得很严重吗?
谢谢!
您无法动态修改作业图的拓扑,但例如,您可以实现一个平面映射运算符,该运算符动态加载类(在配置中指定(,然后使用它来转换事件流。
至于 REST sink,如果你需要保证一次语义,端到端的,那么你需要小心地将接收器与 Flink 的检查点配合进来。FlinkKafkaConsumer 通过倒带和重播自上次检查点以来的事件来处理恢复。如果不小心,这将导致在恢复期间将重复结果推送到 REST 接收器。如果 REST 接收器仅在外部系统上执行幂等更新,则这很好,但否则需要使 REST 接收器具有状态和事务性。
我可能会看看Flink SQL。你可以定义公共源/接收器,然后只将 SQL 查询传递给 Flink。
我过去在Spark SQL上也有类似的设置,它运行得很好。您不需要发明自己的规范语言,而且更容易理解。