How to configure the Beam Python SDK with Spark in a Kubernetes environment



TLDR

How do I configure the Apache Beam pipeline options with "environment_type" = EXTERNAL or PROCESS?

Description

Currently we have a standalone Spark cluster running inside Kubernetes and, following this solution (and its setup), we launch a Beam pipeline that creates an embedded Spark job server on the Spark worker, which needs to run the Python SDK alongside it. Apache Beam allows running the Python SDK in 4 different ways:

  • "DOCKER"-默认,在Kubernetes集群内不可能(将使用"container-in-container")
  • "回送"-仅用于测试,不可能有1个以上的工作舱
  • "外部"-理想的设置,";只是";创建一个sidecar容器,以便与spark工作程序在同一个pod中运行
  • "工艺"-在火花工人中执行一个过程,虽然不理想,但也可能是

Development

  1. Using "EXTERNAL" - a Spark worker with the python-sdk sidecar in the same pod:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: spark-worker
  labels:
    app: spark-worker
spec:
  selector:
    matchLabels:
      app: spark-worker
  template:
    metadata:
      labels:
        app: spark-worker
    spec:
      containers:
        - name: spark-worker
          image: spark-py-custom:latest
          imagePullPolicy: Never
          ports:
            - containerPort: 8081
              protocol: TCP
          command: ["/bin/bash", "-c", "--"]
          args: ["/start-worker.sh"]
          resources:
            requests:
              cpu: 4
              memory: "5Gi"
            limits:
              cpu: 4
              memory: "5Gi"
          volumeMounts:
            - name: spark-jars
              mountPath: "/tmp"
        - name: python-beam-sdk
          image: apachebeam/python3.7_sdk:latest
          command: ["/opt/apache/beam/boot", "--worker_pool"]
          ports:
            - containerPort: 50000
          resources:
            limits:
              cpu: "1"
              memory: "1Gi"
      volumes:
        - name: spark-jars
          persistentVolumeClaim:
            claimName: spark-jars
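Because both containers share the pod's network namespace, the worker pool started by the python-beam-sdk sidecar should be reachable from the spark-worker container at localhost:50000. A minimal way to sanity-check that the listener is up (run from inside the spark-worker container; this only probes the TCP socket, not the gRPC service itself) could look like this:

# Quick connectivity probe: verify the python-beam-sdk sidecar's worker pool
# is reachable on localhost:50000 from inside the spark-worker container.
import socket

def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

if __name__ == "__main__":
    print("worker pool reachable:", port_is_open("localhost", 50000))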

Then, if we execute the command

python3 wordcount.py \
--output ./data_test/counts \
--runner=SparkRunner \
--spark_submit_uber_jar \
--spark_job_server_jar=beam-runners-spark-job-server-2.28.0.jar \
--spark_master_url=spark://spark-master:7077 \
--spark_rest_url=http://spark-master:6066 \
--environment_type=EXTERNAL \
--environment_config=localhost:50000

we get a job stuck in state "RUNNING":

INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.client:Timeout attempting to reach GCE metadata service.
WARNING:apache_beam.internal.gcp.auth:Unable to find default credentials to use: The Application Default Credentials are not available. They are available if running in Google Compute Engine. Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials. See https://developers.google.com/accounts/docs/application-default-credentials for more information.
Connecting anonymously.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:root:Default Python SDK image for environment is apache/beam_python3.7_sdk:2.28.0
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function lift_combiners at 0x7fc360c0b8c8> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x7fc360c0f048> ====================
INFO:apache_beam.runners.portability.abstract_job_service:Artifact server started on port 36369
INFO:apache_beam.runners.portability.abstract_job_service:Running job 'job-2448721e-e686-41d4-b924-5f8c5ae73ac2'
INFO:apache_beam.runners.portability.spark_uber_jar_job_server:Submitted Spark job with ID driver-20210305172421-0000
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED
INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING

And in the Spark worker log:

21/03/05 17:24:25 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=45203" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@spark-worker-64fd4ddd6-tqdrs:45203" "--executor-id" "0" "--hostname" "172.18.0.20" "--cores" "3" "--app-id" "app-20210305172425-0000" "--worker-url" "spark://Worker@172.18.0.20:44365"

And on the Python SDK sidecar:

2021/03/05 17:19:52 Starting worker pool 1: python -m apache_beam.runners.worker.worker_pool_main --service_port=50000 --container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=', '--artifact_endpoint=', '--provision_endpoint=', '--control_endpoint=']
2021/03/05 17:24:32 No logging endpoint provided.

Checking the Spark worker stderr (on localhost:8081):

Spark Executor Command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=45203" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@spark-worker-64fd4ddd6-tqdrs:45203" "--executor-id" "0" "--hostname" "172.18.0.20" "--cores" "3" "--app-id" "app-20210305172425-0000" "--worker-url" "spark://Worker@172.18.0.20:44365"
========================================
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/03/05 17:24:26 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 230@spark-worker-64fd4ddd6-tqdrs
21/03/05 17:24:26 INFO SignalUtils: Registered signal handler for TERM
21/03/05 17:24:26 INFO SignalUtils: Registered signal handler for HUP
21/03/05 17:24:26 INFO SignalUtils: Registered signal handler for INT
21/03/05 17:24:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/03/05 17:24:27 INFO SecurityManager: Changing view acls to: root
21/03/05 17:24:27 INFO SecurityManager: Changing modify acls to: root
21/03/05 17:24:27 INFO SecurityManager: Changing view acls groups to: 
21/03/05 17:24:27 INFO SecurityManager: Changing modify acls groups to: 
21/03/05 17:24:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/03/05 17:24:27 INFO TransportClientFactory: Successfully created connection to spark-worker-64fd4ddd6-tqdrs/172.18.0.20:45203 after 50 ms (0 ms spent in bootstraps)
21/03/05 17:24:27 INFO SecurityManager: Changing view acls to: root
21/03/05 17:24:27 INFO SecurityManager: Changing modify acls to: root
21/03/05 17:24:27 INFO SecurityManager: Changing view acls groups to: 
21/03/05 17:24:27 INFO SecurityManager: Changing modify acls groups to: 
21/03/05 17:24:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/03/05 17:24:28 INFO TransportClientFactory: Successfully created connection to spark-worker-64fd4ddd6-tqdrs/172.18.0.20:45203 after 1 ms (0 ms spent in bootstraps)
21/03/05 17:24:28 INFO DiskBlockManager: Created local directory at /tmp/spark-bdffc2b3-f57a-42fa-a720-e22274b86b67/executor-f1eff7ca-d2cd-4ff4-b18b-c8d6a520f590/blockmgr-c61fb65f-ea97-4bd5-bf15-e0025845a251
21/03/05 17:24:28 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
21/03/05 17:24:28 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@spark-worker-64fd4ddd6-tqdrs:45203
21/03/05 17:24:28 INFO WorkerWatcher: Connecting to worker spark://Worker@172.18.0.20:44365
21/03/05 17:24:28 INFO TransportClientFactory: Successfully created connection to /172.18.0.20:44365 after 1 ms (0 ms spent in bootstraps)
21/03/05 17:24:28 INFO WorkerWatcher: Successfully connected to spark://Worker@172.18.0.20:44365
21/03/05 17:24:28 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
21/03/05 17:24:28 INFO Executor: Starting executor ID 0 on host 172.18.0.20
21/03/05 17:24:28 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 42561.
21/03/05 17:24:28 INFO NettyBlockTransferService: Server created on 172.18.0.20:42561
21/03/05 17:24:28 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/03/05 17:24:28 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(0, 172.18.0.20, 42561, None)
21/03/05 17:24:28 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(0, 172.18.0.20, 42561, None)
21/03/05 17:24:28 INFO BlockManager: Initialized BlockManager: BlockManagerId(0, 172.18.0.20, 42561, None)

Where it gets stuck forever. Checking the Python SDK source code, we can see that "No logging endpoint provided" is fatal, and it is caused by the lack of configuration sent to it (no logging/artifact/provision/control endpoints). If I try adding "--artifact_endpoint" to the python command, I get gRPC errors about communication failure, because the job server creates its own artifact endpoint. In this setup it would be necessary to configure all of these endpoints with fixed ports (probably as localhost, since the SDK and the worker are in the same pod), but I could not find how to configure them. Checking SO, I could find a related issue, but in that case the Python SDK gets its configuration automatically (maybe a Spark runner issue?).

  2. Using "PROCESS" - Trying to run the Python SDK inside a process, I built the Python SDK with ./gradlew :sdks:python:container:py37:docker, copied the sdks/python/container/build/target/launcher/linux_amd64/boot executable to /python_sdk/boot inside the Spark worker container, and used the command:
python3 wordcount.py \
--output ./data_test/counts \
--runner=SparkRunner \
--spark_submit_uber_jar \
--spark_master_url=spark://spark-master:7077 \
--spark_rest_url=http://spark-master:6066 \
--environment_type=PROCESS \
--spark_job_server_jar=beam-runners-spark-job-server-2.28.0.jar \
--environment_config='{"os":"linux","arch":"x84_64","command":"/python_sdk/boot"}'

Which results in a "RuntimeError" in the terminal:

INFO:apache_beam.runners.portability.portable_runner:Job state changed to FAILED
Traceback (most recent call last):
  File "wordcount.py", line 91, in <module>
    run()
  File "wordcount.py", line 86, in run
    output | "Write" >> WriteToText(known_args.output)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/pipeline.py", line 581, in __exit__
    self.result.wait_until_finish()
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 608, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline job-95c13aa5-96ab-4d1d-bc68-7f9d203c8251 failed in state FAILED: unknown error

And checking the Spark worker stderr log again, I can see that the problem is java.lang.IllegalArgumentException: No filesystem found for scheme classpath, and I don't know the reason for it.

21/03/05 18:33:12 INFO Executor: Adding file:/opt/spark/work/app-20210305183309-0000/0/./javax.servlet-api-3.1.0.jar to class loader
21/03/05 18:33:12 INFO TorrentBroadcast: Started reading broadcast variable 0
21/03/05 18:33:12 INFO TransportClientFactory: Successfully created connection to spark-worker-89c5c4c87-5q45s/172.18.0.20:34783 after 1 ms (0 ms spent in bootstraps)
21/03/05 18:33:12 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 12.3 KB, free 366.3 MB)
21/03/05 18:33:12 INFO TorrentBroadcast: Reading broadcast variable 0 took 63 ms
21/03/05 18:33:12 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.5 KB, free 366.3 MB)
21/03/05 18:33:13 INFO MemoryStore: Block rdd_13_0 stored as values in memory (estimated size 16.0 B, free 366.3 MB)
21/03/05 18:33:13 INFO MemoryStore: Block rdd_17_0 stored as values in memory (estimated size 16.0 B, free 366.3 MB)
21/03/05 18:33:13 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 5427 bytes result sent to driver
21/03/05 18:33:14 ERROR SerializingExecutor: Exception while executing runnable org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@5f917914
java.lang.IllegalArgumentException: No filesystem found for scheme classpath
at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:467)
at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:537)
at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:125)
at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:99)
at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:327)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
21/03/05 18:33:16 ERROR SerializingExecutor: Exception while executing runnable org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@67fb2b2c
java.lang.IllegalArgumentException: No filesystem found for scheme classpath
at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:467)
at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:537)
at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:125)
at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:99)
at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:327)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
21/03/05 18:33:19 INFO ProcessEnvironmentFactory: Still waiting for startup of environment '/python_sdk/boot' for worker id 1-1

Probably some configuration parameter is missing.
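For reference, the same PROCESS configuration can be assembled programmatically rather than as a hand-written JSON string in the shell; this is only a sketch mirroring the options above and does not change the outcome:

# Sketch: building the PROCESS environment_config as JSON from a dict,
# which avoids shell quoting mistakes. Mirrors the command above.
import json
from apache_beam.options.pipeline_options import PipelineOptions

process_env = {
    "os": "linux",
    "arch": "x86_64",              # the command above says "x84_64", likely a typo
    "command": "/python_sdk/boot", # boot binary copied into the worker image
}

options = PipelineOptions([
    "--runner=SparkRunner",
    "--spark_master_url=spark://spark-master:7077",
    "--environment_type=PROCESS",
    "--environment_config=" + json.dumps(process_env),
])
# options would then be passed to beam.Pipeline(options=options) as usual.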

Observation

If I execute the command

python3 wordcount.py \
--output ./data_test/counts \
--runner=SparkRunner \
--spark_submit_uber_jar \
--spark_job_server_jar=beam-runners-spark-job-server-2.28.0.jar \
--spark_master_url=spark://spark-master:7077 \
--spark_rest_url=http://spark-master:6066 \
--environment_type=LOOPBACK

inside our Spark worker (we have only one worker in the Spark cluster), we get a fully working Beam pipeline with these logs.

  1. Using "EXTERNAL": this definitely seems like a bug in Beam. The worker endpoints are supposed to be set up to use localhost; I don't think it is possible to configure them. I'm not sure why they would be missing; my best educated guess is that the servers silently fail to start, leaving the endpoints empty. I have filed a bug report (BEAM-11957) for this issue.
  2. Using "PROCESS": the scheme classpath corresponds to ClassLoaderFileSystem. This filesystem is usually loaded using AutoService, which depends on ClassLoaderFileSystemRegistrar being present on the classpath (not to be confused with the name of the filesystem itself). The classpath of the job jar is based on spark_job_server_jar. Where are you getting your beam-runners-spark-job-server-2.28.0.jar from?
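One way to check which filesystem registrars a given job server jar actually ships is to inspect its AutoService registrations. The sketch below assumes the standard service file location META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar, which is an assumption on my part rather than something confirmed in this thread:

# Hedged sketch: inspect the job server jar for AutoService filesystem
# registrations. Adjust SERVICE_FILE if your Beam version lays it out differently.
import zipfile

JAR = "beam-runners-spark-job-server-2.28.0.jar"
SERVICE_FILE = "META-INF/services/org.apache.beam.sdk.io.FileSystemRegistrar"

with zipfile.ZipFile(JAR) as jar:
    if SERVICE_FILE in jar.namelist():
        print("registered FileSystemRegistrars:")
        for line in jar.read(SERVICE_FILE).decode().splitlines():
            print("  ", line)
    else:
        print("no FileSystemRegistrar service file found in", JAR)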

Latest update