Beam发射flink的环境配置



我希望在运行Beam wordcount.py演示时能得到关于如何设置--environment_config的指导。

它与DirectRunner一起运行良好。Flink的wordcount也运行良好(即通过flink run运行Flink(。

我想使用Flink runner运行Beam,使用Beam文档中描述的"单独的Flink集群"。我不能使用Docker,所以我计划使用--environment_type=PROCESS

我在python代码中使用以下内容来设置environment_config:

environment_config = dict()
environment_config['os'] = platform.system().lower()
environment_config['arch'] = platform.machine()
environment_config['command'] = 'ls'
ec = "--environment_config={}".format(json.dumps(environment_config))

显然,这个命令是错误的。当我运行这个时,Flink确实接收并成功处理了DataSource子任务。它最终在CHAIN MapPartitions上超时。

有人能就如何设置environment_config提供指导(或链接(吗?我正在奇点容器中运行Beam。

对于environment_type=DOCKER,大多数事情都由您处理,但在流程模式下,您必须自己进行大量设置。您要查找的命令是sdks/python/container/build/target/launcher/linux_amd64/boot。您将需要在所有工作机器上同时拥有可执行文件(可以使用./gradlew :sdks:python:container:build从源代码构建(和Python安装,包括Beam和其他依赖项。

我所知道的最好的例子是:https://github.com/apache/beam/blob/cbf8a900819c52940a0edd90f59bf6aec55c817a/sdks/python/test-suites/portable/py2/build.gradle#L146-L165

相关内容

  • 没有找到相关文章