我希望在运行Beam wordcount.py演示时能得到关于如何设置--environment_config
的指导。
它与DirectRunner一起运行良好。Flink的wordcount也运行良好(即通过flink run
运行Flink(。
我想使用Flink runner运行Beam,使用Beam文档中描述的"单独的Flink集群"。我不能使用Docker,所以我计划使用--environment_type=PROCESS
。
我在python代码中使用以下内容来设置environment_config:
environment_config = dict()
environment_config['os'] = platform.system().lower()
environment_config['arch'] = platform.machine()
environment_config['command'] = 'ls'
ec = "--environment_config={}".format(json.dumps(environment_config))
显然,这个命令是错误的。当我运行这个时,Flink确实接收并成功处理了DataSource
子任务。它最终在CHAIN MapPartition
s上超时。
有人能就如何设置environment_config提供指导(或链接(吗?我正在奇点容器中运行Beam。
对于environment_type=DOCKER,大多数事情都由您处理,但在流程模式下,您必须自己进行大量设置。您要查找的命令是sdks/python/container/build/target/launcher/linux_amd64/boot
。您将需要在所有工作机器上同时拥有可执行文件(可以使用./gradlew :sdks:python:container:build
从源代码构建(和Python安装,包括Beam和其他依赖项。
我所知道的最好的例子是:https://github.com/apache/beam/blob/cbf8a900819c52940a0edd90f59bf6aec55c817a/sdks/python/test-suites/portable/py2/build.gradle#L146-L165