Flink RMQSource



我想测试 RMQSource 类以从 RabbitMQ 接收数据,但我不知道如何为我的交易所配置 Rabbit 虚拟主机,我认为这是我遇到的问题。我的代码:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
object rabbitjob {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val stream = env.addSource(new RMQSource[String]("192.168.1.11", 5672,"user","pass", "inbound.input.data",false, new SimpleStringSchema())).print

def main (args:Array[String]){
    env.execute("Test Rabbit")
 }
} 

智能 IDE 中的错误: Error:(10, 29) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String] val stream = env.addSource(new RMQSource[String]("192.168.1.11", 5672,"user","pass", "inbound.input.data",false, new SimpleStringSchema())).print

^

Error:(10, 29) not enough arguments for method addSource: (implicit evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[String])org.apache.flink.streaming.api.scala.DataStream[String]. Unspecified value parameter evidence$7. val stream = env.addSource(new RMQSource[String]("192.168.1.11", 5672,"user","pass", "inbound.input.data",false, new SimpleStringSchema())).print

^

知道如何解决它或替代方案吗?提前谢谢你。

随着时间的推移,情况发生了变化。请查看 RMQConnectionConfig:在这里您可以找到通过构建器模式指定虚拟主机的方法。

您看到的错误是由于某些所需的导入不存在而导致的 Scala 编译时错误。 每当使用 Flink Scala API 时,都应该包含以下内容:

import org.apache.flink.api.scala._

这将解决您遇到的编译时问题。

您还需要提供虚拟主机名称。看看 AMQP URI 规范。

在您的情况下,整个 AMQP URI 看起来像是 "user:pass@192.168.1.11:5672/TestVHost" .

相关内容

  • 没有找到相关文章

最新更新