从 Flink 调用外部脚本



对于你们中的一些人来说,这似乎是一个非常复杂的问题。我想使用 Apache Flink 对来自 SocketStream 的数据应用一些算法。但是,这些算法是我使用 Scala 的 sys.process 包运行的外部可执行文件。以下是我希望 Flink 做的事情:

  1. 从 SocketStream 获取单个行:

    val text = env.socketTextStream(hostName, port) val lines = text.flatMap { _.toLowerCase.split("\n") filter { _.nonEmpty } }

  2. 使用这些行作为命令行参数调用我的可执行算法。有点像这样:

    var op = "./Somefile.py "+lines!

  3. 打印我从可执行文件中获得的输出。

    op.print()

显然,这不是执行我正在尝试执行的操作的正确方法,因为oplines不同,它不是数据接收器,因此不会打印任何内容。有什么方法可以实现吗?

如果将所有参数放入单个 String 值中,则可以从 MapFunction 调用外部可执行文件。

这看起来像:

val args: DataStream[String] = env.socketTextStream(hostName, port) 
// assume each text line has all elements
val out: DataStream[String] = args.map(new ExternalCaller())
// print result
out.print()

class ExternalCaller extends MapFunction[String, String] {
  override def map(args: String): String = {
    // call external executable with args here and return output
  }
}

相关内容

  • 没有找到相关文章

最新更新