对于你们中的一些人来说,这似乎是一个非常复杂的问题。我想使用 Apache Flink 对来自 SocketStream 的数据应用一些算法。但是,这些算法是我使用 Scala 的 sys.process
包运行的外部可执行文件。以下是我希望 Flink 做的事情:
-
从 SocketStream 获取单个行:
val text = env.socketTextStream(hostName, port) val lines = text.flatMap { _.toLowerCase.split("\n") filter { _.nonEmpty } }
-
使用这些行作为命令行参数调用我的可执行算法。有点像这样:
var op = "./Somefile.py "+lines!
-
打印我从可执行文件中获得的输出。
op.print()
显然,这不是执行我正在尝试执行的操作的正确方法,因为op
与lines
不同,它不是数据接收器,因此不会打印任何内容。有什么方法可以实现吗?
如果将所有参数放入单个 String 值中,则可以从 MapFunction
调用外部可执行文件。
这看起来像:
val args: DataStream[String] = env.socketTextStream(hostName, port)
// assume each text line has all elements
val out: DataStream[String] = args.map(new ExternalCaller())
// print result
out.print()
跟
class ExternalCaller extends MapFunction[String, String] {
override def map(args: String): String = {
// call external executable with args here and return output
}
}