我正在通过ActiveMQ获取数据,我想使用Apache Flink DataStreams实时处理这些数据。支持许多消息传递服务,如RabbitMQ和Kafka,但我看不到对ActiveMQ的任何支持。如何使用它?
由于不支持ActiveMQ
,我建议实现一个自定义源代码。
您基本上必须实现SourceFunction
接口。如果要使用恰好一次语义,则可以将实现基于MultipleIdsMessageAcknowledgingSourceBase
类。
我建议您从实施SourceFunction
开始
找到 Flink 的 JMS 连接器:
https://github.com/jkirsch/senser/blob/master/src/main/java/edu/tuberlin/senser/images/flink/io/FlinkJMSStreamSource.java