使用 AMQSource 的 Flink 作业不会生成输出



我使用的是liscationemq的Apache Bahir的Amqsource Connector,但是当我运行Flink作业以消耗ActiveMQ的数据时,没有产生输出。

例如,连接器正在收听包含4条消息的ActiveMQ,但是当我运行Flink作业时,没有数据被消耗。

val brokerURL = "tcp://localhost:61616"
val destinationName = "TEST.FOO"
val filePath = "C:\output" + System.currentTimeMillis + ".csv"
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStateBackend(new MemoryStateBackend(1000, false))

val config = new AMQSourceConfig.AMQSourceConfigBuilder[String]()
  .setConnectionFactory(new ActiveMQConnectionFactory(brokerURL))
  .setDestinationName(destinationName)
  .setDeserializationSchema(new SimpleStringSchema)
  .setDestinationType(DestinationType.QUEUE)
  .setRunningChecker(new RunningChecker).build
val amqSource = new AMQSource[String](config)
val stream = env.addSource(amqSource)
stream.map(/*Some MapFunction*/)
stream.writeAsText(filePath)
stream.print
env.execute

amqsource期望消息是字节,请参见amqsource.class下的运行方法的代码:

Message message = this.consumer.receive(1000L);
if (!(**message instanceof BytesMessage**)) {
LOG.warn("Active MQ source received non bytes message: {}", message);
return;
}

当将数据生成ActiveMQ而不是短信时:

val message = session.createTextMessage(text)

使用字节消息:

val message = session.createBytesMessage()
message.writeBytes(text.getBytes)

相关内容

  • 没有找到相关文章

最新更新