当 Flink 在 Kubernetes 中作为 pod 运行时,有人知道如何使用 Flink 运行 Beam Python 管道吗?
我已经成功地使用可移植运行器和指向在Docker容器中运行的本地Flink服务器的作业服务运行了Beam Python管道。
我能够实现在我的 Flink 容器中挂载 Docker 套接字,并将 Flink 作为根进程运行,因此类 DockerEnvironmentFactory 可以创建 Python 线束容器。
不幸的是,当 Flink 在 Kubernetes 中运行时,我不能使用相同的解决方案。此外,我不想使用我的 pods 中的 Docker 命令创建 Python 线束容器。
似乎 Bean runner 会自动选择 Docker 来执行 Python 管道。但是,我注意到有一个名为ExternalEnvironmentFactory的实现,但我不确定如何使用它。
有没有办法部署侧容器并使用不同的工厂来运行 Python 线束过程?正确的方法是什么?
这是 DockerEnvironmentFactory 的补丁:
diff -pr beam-release-2.15.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java beam-release-2.15.0-1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
*** beam-release-2.15.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java 2019-08-14 22:33:41.000000000 +0100
--- beam-release-2.15.0-1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java 2019-09-09 16:02:07.000000000 +0100
*************** package org.apache.beam.runners.fnexecut
*** 19,24 ****
--- 19,26 ----
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
*************** public class DockerEnvironmentFactory im
*** 127,133 ****
ImmutableList.<String>builder()
.addAll(gcsCredentialArgs())
// NOTE: Host networking does not work on Mac, but the command line flag is accepted.
! .add("--network=host")
// We need to pass on the information about Docker-on-Mac environment (due to missing
// host networking on Mac)
.add("--env=DOCKER_MAC_CONTAINER=" + System.getenv("DOCKER_MAC_CONTAINER"));
--- 129,135 ----
ImmutableList.<String>builder()
.addAll(gcsCredentialArgs())
// NOTE: Host networking does not work on Mac, but the command line flag is accepted.
! .add("--network=flink")
// We need to pass on the information about Docker-on-Mac environment (due to missing
// host networking on Mac)
.add("--env=DOCKER_MAC_CONTAINER=" + System.getenv("DOCKER_MAC_CONTAINER"));
*************** public class DockerEnvironmentFactory im
*** 222,228 ****
private static ServerFactory getServerFactory() {
ServerFactory.UrlFactory dockerUrlFactory =
! (host, port) -> HostAndPort.fromParts(DOCKER_FOR_MAC_HOST, port).toString();
if (RUNNING_INSIDE_DOCKER_ON_MAC) {
// If we're already running in a container, we need to use a fixed port range due to
// non-existing host networking in Docker-for-Mac. The port range needs to be published
--- 224,230 ----
private static ServerFactory getServerFactory() {
ServerFactory.UrlFactory dockerUrlFactory =
! (host, port) -> HostAndPort.fromParts(getCanonicalHostName(), port).toString();
if (RUNNING_INSIDE_DOCKER_ON_MAC) {
// If we're already running in a container, we need to use a fixed port range due to
// non-existing host networking in Docker-for-Mac. The port range needs to be published
*************** public class DockerEnvironmentFactory im
*** 237,242 ****
--- 239,252 ----
}
}
+ private static String getCanonicalHostName() throws RuntimeException {
+ try {
+ return InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
/** Provider for DockerEnvironmentFactory. */
public static class Provider implements EnvironmentFactory.Provider {
private final boolean retainDockerContainer;
*************** public class DockerEnvironmentFactory im
*** 269,275 ****
public ServerFactory getServerFactory() {
switch (getPlatform()) {
case LINUX:
! return ServerFactory.createDefault();
case MAC:
return DockerOnMac.getServerFactory();
default:
--- 279,286 ----
public ServerFactory getServerFactory() {
switch (getPlatform()) {
case LINUX:
! return DockerOnMac.getServerFactory();
! // return ServerFactory.createDefault();
case MAC:
return DockerOnMac.getServerFactory();
default:
这是我用来运行 Flink 的 Docker 撰写文件:
version: '3.4'
services:
jobmanager:
image: tenx/flink:1.8.1
command: 'jobmanager'
environment:
JOB_MANAGER_RPC_ADDRESS: 'jobmanager'
DOCKER_MAC_CONTAINER: 1
FLINK_JM_HEAP: 128
volumes:
- jobmanager-data:/data
- /var/run/docker.sock:/var/run/docker.sock
ports:
- target: 8081
published: 8081
protocol: tcp
mode: ingress
networks:
- flink
taskmanager:
image: tenx/flink:1.8.1
command: 'taskmanager'
environment:
JOB_MANAGER_RPC_ADDRESS: 'jobmanager'
DOCKER_MAC_CONTAINER: 1
FLINK_TM_HEAP: 1024
TASK_MANAGER_NUMBER_OF_TASK_SLOTS: 2
networks:
- flink
volumes:
- taskmanager-data:/data
- /var/run/docker.sock:/var/run/docker.sock
- /var/folders:/var/folders
volumes:
jobmanager-data:
taskmanager-data:
networks:
flink:
external: true
这是我的Python管道:
import apache_beam as beam
import logging
class LogElements(beam.PTransform):
class _LoggingFn(beam.DoFn):
def __init__(self, prefix=''):
super(LogElements._LoggingFn, self).__init__()
self.prefix = prefix
def process(self, element, **kwargs):
logging.info(self.prefix + str(element))
yield element
def __init__(self, label=None, prefix=''):
super(LogElements, self).__init__(label)
self.prefix = prefix
def expand(self, input):
input | beam.ParDo(self._LoggingFn(self.prefix))
from apache_beam.options.pipeline_options import PipelineOptions
options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099"])
p = beam.Pipeline(options=options)
(p | beam.Create([1, 2, 3, 4, 5]) | LogElements())
p.run()
这是我运行作业服务的方式:
gradle :runners:flink:1.8:job-server:runShadow -PflinkMasterUrl=localhost:8081
自动选择 Docker 来执行 Python 工具。
我可以更改用于运行 Python 容器的图像:
options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099", "--environment_type=DOCKER", "--environment_config=beam/python:latest"](
我可以禁用Docker并启用ExternalEnvironmentFactory:
options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099", "--environment_type=EXTERNAL", "--environment_config=server"](
但我必须在 http://server:80 上实现一些回调应答。
是否有可用的实现?
要回答上面的问题,基本上你想在同一个 pod 中beam_worker_pool容器和 flink 任务管理器容器一起添加。因此,在用于部署 flink 任务管理器的 yaml 文件中,添加一个新容器:
- name: beam-worker-pool
image: apache/beam_python3.7_sdk:2.22.0
args: ["--worker_pool"]
ports:
- containerPort: 50000
name: pool
livenessProbe:
tcpSocket:
port: 50000
initialDelaySeconds: 30
periodSeconds: 60
volumeMounts:
- name: flink-config-volume
mountPath: /opt/flink/conf/
securityContext:
runAsUser: 9999
我找到了解决方案。新版本的 Apache Beam 2.16.0 提供了一个与环境类型 EXTERNAL 结合使用的实现。该实现基于为支持 Kubernetes 而创建的worker_pool_main。
我知道它有点过时了,但现在有一个用于 Kubernetes 的 Flink 运算符。
以下是如何使用运算符使用 Flink 运行 Apache Beam 的示例:
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/tree/master/examples/beam