How to run an Apache Beam Python pipeline on a Flink cluster on Kubernetes



I'm trying to run the wordcount example on minikube following the Flink Kubernetes instructions here, but the job never completes. The Python Beam SDK worker pool appears to do no work.

In addition to following the instructions for setting up a Flink Kubernetes cluster, I added a Python SDK worker pool to the taskmanager deployment. If I understand correctly, the purpose of the worker pool is to execute the Python portions of the pipeline. See the full k8s deployment below.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: flink-test
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: flink:1.10.2-scala_2.11
        workingDir: /opt/flink
        command: ["/bin/bash", "-c", "$FLINK_HOME/bin/taskmanager.sh start;
          while :;
          do
            if [[ -f $(find log -name '*taskmanager*.log' -print -quit) ]];
              then tail -f -n +1 log/*taskmanager*.log;
            fi;
          done"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      - name: beam-worker-pool
        image: apache/beam_python3.7_sdk:2.24.0
        args: ["--worker_pool"]
        ports:
        - containerPort: 50000
          name: pool
        livenessProbe:
          tcpSocket:
            port: 50000
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j.properties
            path: log4j.properties

I run the example as follows:

python -m apache_beam.examples.wordcount \
  --output /tmp/results/count \
  --runner FlinkRunner \
  --flink_master=localhost:8081 \
  --environment_type=EXTERNAL \
  --environment_config=localhost:50000

I followed https://beam.apache.org/documentation/runtime/sdk-harness-config/ to set environment_type and environment_config.

The job gets submitted to the job manager and I can see it in the Flink UI, but it never completes. I started digging through the container logs and noticed the Beam worker pool logs the following:

Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=localhost:46005', '--artifact_endpoint=localhost:43851', '--provision_endpoint=localhost:37079', '--control_endpoint=localhost:37961']
2020/09/28 16:44:00 Provision info:
pipeline_options:<fields: ...> logging_endpoint:<...> artifact_endpoint:<...> control_endpoint:<...> dependencies:<...>
2020/09/28 16:44:00 Initializing python harness: /opt/apache/beam/boot --id=1-1 --logging_endpoint=localhost:46005 --artifact_endpoint=localhost:43851 --provision_endpoint=localhost:37079 --control_endpoint=localhost:37961
2020/09/28 16:44:08 Failed to retrieve staged files: failed to retrieve /tmp/staged in 3 attempts: failed to retrieve chunk for /tmp/staged/pickled_main_session
caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/pickled_main_session
caused by:
rpc error: code = Unknown desc = 

Meanwhile, the taskmanager is logging:

2020-09-28 16:46:00,155 INFO  org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory  - Still waiting for startup of environment from localhost:50000 for worker id 1-1

Not sure what I'm missing. I checked /tmp/staging/pickled_main_session on the worker pool and it was empty.
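For anyone reproducing this, the staging directory can be inspected from outside the pod. A minimal sketch, assuming the deployment, namespace, and container names from the manifest above:

```shell
# Inspect the Beam worker pool's staging directory from outside the pod.
# Names (flink-test, flink-taskmanager, beam-worker-pool) are taken from
# the deployment manifest above; adjust if yours differ.
kubectl exec -n flink-test deploy/flink-taskmanager -c beam-worker-pool -- \
  ls -la /tmp/staged
```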

Note that this question is similar to these earlier questions: "How to run a Beam Python pipeline with Flink deployed on Kubernetes?" and "Running an Apache Beam python pipeline in Kubernetes".

By default (as of this writing), Beam stages runtime dependencies ("artifacts") to a directory (/tmp/staged by default) that needs to be accessible to both the job server (in your case, the client) and the Beam workers.

You can work around this by setting the --flink_submit_uber_jar pipeline option. When --flink_submit_uber_jar is set, Beam wraps all your dependencies into the jar that is submitted to Flink, so no shared staging directory is needed.
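With that flag set, the wordcount invocation from the question becomes (only the final flag is new; all other values are unchanged from above):

```shell
# Same wordcount run as in the question, but with the uber-jar workaround.
# --flink_submit_uber_jar bundles the pipeline's dependencies into the jar
# submitted to Flink, avoiding the shared /tmp/staged requirement.
python -m apache_beam.examples.wordcount \
  --output /tmp/results/count \
  --runner FlinkRunner \
  --flink_master=localhost:8081 \
  --environment_type=EXTERNAL \
  --environment_config=localhost:50000 \
  --flink_submit_uber_jar
```

Note that this submission path goes through the Flink REST API, so --flink_master should point at the Flink REST endpoint (port 8081 here, as in the original command).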
