我想测试 RMQSource 类以从 RabbitMQ 接收数据,但我不知道如何为我的交易所配置 Rabbit 虚拟主机,我认为这是我遇到的问题。我的代码:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
object rabbitjob {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.addSource(new RMQSource[String]("192.168.1.11", 5672,"user","pass", "inbound.input.data",false, new SimpleStringSchema())).print
def main (args:Array[String]){
env.execute("Test Rabbit")
}
}
智能 IDE 中的错误: Error:(10, 29) could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[String]
val stream = env.addSource(new RMQSource[String]("192.168.1.11", 5672,"user","pass", "inbound.input.data",false, new SimpleStringSchema())).print
^
Error:(10, 29) not enough arguments for method addSource: (implicit evidence$7: org.apache.flink.api.common.typeinfo.TypeInformation[String])org.apache.flink.streaming.api.scala.DataStream[String].
Unspecified value parameter evidence$7.
val stream = env.addSource(new RMQSource[String]("192.168.1.11", 5672,"user","pass", "inbound.input.data",false, new SimpleStringSchema())).print
^
知道如何解决它或替代方案吗?提前谢谢你。
随着时间的推移,情况发生了变化。请查看 RMQConnectionConfig:在这里您可以找到通过构建器模式指定虚拟主机的方法。
您看到的错误是由于某些所需的导入不存在而导致的 Scala 编译时错误。 每当使用 Flink Scala API 时,都应该包含以下内容:
import org.apache.flink.api.scala._
这将解决您遇到的编译时问题。
您还需要提供虚拟主机名称。看看 AMQP URI 规范。
在您的情况下,整个 AMQP URI 看起来像是 "user:pass@192.168.1.11:5672/TestVHost"
.