我正在尝试在kubernetes集群上运行一个简单的spark作业。我部署了一个启动pyspark shell的pod,并在该shell中更改spark配置,如下所示:
>>> sc.stop()
>>> sparkConf = SparkConf()
>>> sparkConf.setMaster("k8s://https://kubernetes.default.svc:443")
>>> sparkConf.setAppName("pyspark_test")
>>> sparkConf.set("spark.submit.deployMode", "client")
>>> sparkConf.set("spark.executor.instances", 2)
>>> sparkConf.set("spark.kubernetes.container.image", "us.icr.io/testspark/spark:v1")
>>> sparkConf.set("spark.kubernetes.namespace", "anonymous")
>>> sparkConf.set("spark.driver.memory", "1g")
>>> sparkConf.set("spark.executor.memory", "1g")
>>> sparkConf.set("spark.driver.host", "testspark")
>>> sparkConf.set("spark.driver.port", "37771")
>>> sparkConf.set("spark.kubernetes.driver.pod.name", "testspark")
>>> sparkConf.set("spark.driver.bindAddress", "0.0.0.0")
>>>
>>> spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()
>>> sc = spark.sparkContext
启动两个新的执行器pod,但都失败了:
satyam@Satyams-MBP ~ % kubectl get pods -n anonymous
NAME READY STATUS RESTARTS AGE
pysparktest-c1c8f177591feb60-exec-1 0/2 Error 0 111m
pysparktest-c1c8f177591feb60-exec-2 0/2 Error 0 111m
testspark 2/2 Running 0 116m
我检查了其中一个执行器pod的日志,它显示以下错误:
satyam@Satyams-MBP ~ % kubectl logs -n anonymous pysparktest-c1c8f177591feb60-exec-1 -c spark-kubernetes-executor
++ id -u
+ myuid=185
++ id -g
+ mygid=0
+ set +e
++ getent passwd 185
+ uidentry=
+ set -e
+ '[' -z '' ']'
+ '[' -w /etc/passwd ']'
+ echo '185:x:185:0:anonymous uid:/opt/spark:/bin/false'
+ SPARK_CLASSPATH=':/opt/spark/jars/*'
+ env
+ grep SPARK_JAVA_OPT_
+ sed 's/[^=]*=(.*)/1/g'
+ sort -t_ -k4 -n
+ readarray -t SPARK_EXECUTOR_JAVA_OPTS
+ '[' -n '' ']'
+ '[' '' == 2 ']'
+ '[' '' == 3 ']'
+ '[' -n '' ']'
+ '[' -z ']'
+ case "$1" in
+ shift 1
+ CMD=(${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP)
+ exec /usr/bin/tini -s -- /usr/local/openjdk-8/bin/java -Dio.netty.tryReflectionSetAccessible=true -Dspark.driver.port=37771 -Xms1g -Xmx1g -cp ':/opt/spark/jars/*:' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@testspark:37771 --executor-id 1 --cores 1 --app-id spark-application-1612108001966 --hostname 172.30.174.196
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/01/31 15:46:49 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 14@pysparktest-c1c8f177591feb60-exec-1
21/01/31 15:46:49 INFO SignalUtils: Registered signal handler for TERM
21/01/31 15:46:49 INFO SignalUtils: Registered signal handler for HUP
21/01/31 15:46:49 INFO SignalUtils: Registered signal handler for INT
21/01/31 15:46:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/01/31 15:46:49 INFO SecurityManager: Changing view acls to: 185,root
21/01/31 15:46:49 INFO SecurityManager: Changing modify acls to: 185,root
21/01/31 15:46:49 INFO SecurityManager: Changing view acls groups to:
21/01/31 15:46:49 INFO SecurityManager: Changing modify acls groups to:
21/01/31 15:46:49 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(185, root); groups with view permissions: Set(); users with modify permissions: Set(185, root); groups with modify permissions: Set()
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1748)
at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:61)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:283)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:272)
at org.apache.spark.executor.CoarseGrainedExecutorBackend.main(CoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:302)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$3(CoarseGrainedExecutorBackend.scala:303)
at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
at scala.collection.immutable.Range.foreach(Range.scala:158)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$1(CoarseGrainedExecutorBackend.scala:301)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:62)
at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
... 4 more
Caused by: java.io.IOException: Failed to connect to testspark/172.30.174.253:37771
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:253)
at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:195)
at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:204)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:202)
at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:198)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: testspark/172.30.174.253:37771
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
我还根据这里的指令https://spark.apache.org/docs/latest/running-on-kubernetes.html#client-mode-networking创建了一个无头服务。下面是服务和驱动pod的yaml:
apiVersion: v1
kind: Service
metadata:
name: testspark
spec:
clusterIP: "None"
selector:
spark-app-selector: testspark
ports:
- name: driver-rpc-port
protocol: TCP
port: 37771
targetPort: 37771
- name: blockmanager
protocol: TCP
port: 37772
targetPort: 37772
司机Pod
apiVersion: v1
kind: Pod
metadata:
name: testspark
labels:
spark-app-selector: testspark
spec:
containers:
- name: testspark
securityContext:
runAsUser: 0
image: jupyter/pyspark-notebook
ports:
- containerPort: 37771
command: ["tail", "-f", "/dev/null"]
serviceAccountName: default-editor
这应该允许executor pods连接到驱动程序(我检查了它的正确ip 172.30.174.249)。为了调试网络,我在驱动程序容器中启动了一个shell,并对侦听端口进行netstat。下面是相同操作的输出:
root@testspark:/opt/spark/work-dir# netstat -tulpn
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 127.0.0.1:15000 0.0.0.0:* LISTEN -
tcp 0 0 0.0.0.0:15001 0.0.0.0:* LISTEN -
tcp 0 0 0.0.0.0:15090 0.0.0.0:* LISTEN -
tcp6 0 0 :::4040 :::* LISTEN 35/java
tcp6 0 0 :::37771 :::* LISTEN 35/java
tcp6 0 0 :::15020 :::* LISTEN -
tcp6 0 0 :::41613 :::* LISTEN 35/java
我还尝试通过telnet从同一名称空间上的另一个运行pod连接到端口37771上的驱动pod,并且它能够连接。
root@test:/# telnet 172.30.174.249 37771
Trying 172.30.174.249...
Connected to 172.30.174.249.
Escape character is '^]'.
我不知道为什么我的执行器pod不能连接到同一端口的驱动程序。我是否缺少任何配置或我做错了什么?如果需要,我可以提供更多的信息。
我用以下docker文件创建了一个假的spark执行器映像:
FROM us.icr.io/testspark/spark:v1
ENTRYPOINT ["tail", "-f", "/dev/null"]
并在实例化spark上下文时将此图像作为spark.kubernetes.container.image
config传递。我有两个正在运行的执行舱。我用命令kubectl exec -n anonymous -it pysparktest-c1c8f177591feb60-exec-1 -c spark-kubernetes-executor bash
执行其中一个,并运行以下命令/opt/entrypoint.sh executor
,令我惊讶的是,执行者可以很好地连接到驱动程序。下面是相同的堆栈跟踪:
++ id -u
+ myuid=185
++ id -g
+ mygid=0
+ set +e
++ getent passwd 185
+ uidentry='185:x:185:0:anonymous uid:/opt/spark:/bin/false'
+ set -e
+ '[' -z '185:x:185:0:anonymous uid:/opt/spark:/bin/false' ']'
+ SPARK_CLASSPATH=':/opt/spark/jars/*'
+ env
+ grep SPARK_JAVA_OPT_
+ sort -t_ -k4 -n
+ sed 's/[^=]*=(.*)/1/g'
+ readarray -t SPARK_EXECUTOR_JAVA_OPTS
+ '[' -n '' ']'
+ '[' '' == 2 ']'
+ '[' '' == 3 ']'
+ '[' -n '' ']'
+ '[' -z ']'
+ case "$1" in
+ shift 1
+ CMD=(${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP)
+ exec /usr/bin/tini -s -- /usr/local/openjdk-8/bin/java -Dio.netty.tryReflectionSetAccessible=true -Dspark.driver.port=37771 -Xms1g -Xmx1g -cp ':/opt/spark/jars/*:' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@testspark.anonymous.svc.cluster.local:37771 --executor-id 1 --cores 1 --app-id spark-application-1612191192882 --hostname 172.30.174.249
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/02/01 15:00:16 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 39@pysparktest-27b678775e1556d9-exec-1
21/02/01 15:00:16 INFO SignalUtils: Registered signal handler for TERM
21/02/01 15:00:16 INFO SignalUtils: Registered signal handler for HUP
21/02/01 15:00:16 INFO SignalUtils: Registered signal handler for INT
21/02/01 15:00:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/02/01 15:00:17 INFO SecurityManager: Changing view acls to: 185,root
21/02/01 15:00:17 INFO SecurityManager: Changing modify acls to: 185,root
21/02/01 15:00:17 INFO SecurityManager: Changing view acls groups to:
21/02/01 15:00:17 INFO SecurityManager: Changing modify acls groups to:
21/02/01 15:00:17 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(185, root); groups with view permissions: Set(); users with modify permissions: Set(185, root); groups with modify permissions: Set()
21/02/01 15:00:17 INFO TransportClientFactory: Successfully created connection to testspark.anonymous.svc.cluster.local/172.30.174.253:37771 after 173 ms (0 ms spent in bootstraps)
21/02/01 15:00:18 INFO SecurityManager: Changing view acls to: 185,root
21/02/01 15:00:18 INFO SecurityManager: Changing modify acls to: 185,root
21/02/01 15:00:18 INFO SecurityManager: Changing view acls groups to:
21/02/01 15:00:18 INFO SecurityManager: Changing modify acls groups to:
21/02/01 15:00:18 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(185, root); groups with view permissions: Set(); users with modify permissions: Set(185, root); groups with modify permissions: Set()
21/02/01 15:00:18 INFO TransportClientFactory: Successfully created connection to testspark.anonymous.svc.cluster.local/172.30.174.253:37771 after 3 ms (0 ms spent in bootstraps)
21/02/01 15:00:18 INFO DiskBlockManager: Created local directory at /var/data/spark-839bad93-b01c-4bc9-a33f-51c7493775e3/blockmgr-ad6a42b9-cfe2-4cdd-aa28-37a0ab77fb16
21/02/01 15:00:18 INFO MemoryStore: MemoryStore started with capacity 413.9 MiB
21/02/01 15:00:19 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@testspark.anonymous.svc.cluster.local:37771
21/02/01 15:00:19 INFO ResourceUtils: ==============================================================
21/02/01 15:00:19 INFO ResourceUtils: Resources for spark.executor:
21/02/01 15:00:19 INFO ResourceUtils: ==============================================================
21/02/01 15:00:19 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
21/02/01 15:00:19 INFO Executor: Starting executor ID 1 on host 172.30.174.249
21/02/01 15:00:19 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40515.
21/02/01 15:00:19 INFO NettyBlockTransferService: Server created on 172.30.174.249:40515
21/02/01 15:00:19 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/02/01 15:00:19 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(1, 172.30.174.249, 40515, None)
21/02/01 15:00:19 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(1, 172.30.174.249, 40515, None)
21/02/01 15:00:19 INFO BlockManager: Initialized BlockManager: BlockManagerId(1, 172.30.174.249, 40515, None)
我实际上很困惑为什么会发生这种情况。是否有任何工作,我可以尝试让这个东西自动工作,而不是我必须手动运行它?
在同事的帮助下我终于解决了这个问题。我只是添加了这两个配置来禁用istio sidecar注入,它开始工作了。
sparkConf.set("spark.kubernetes.driver.annotation.sidecar.istio.io/inject", "false")
sparkConf.set("spark.kubernetes.executor.annotation.sidecar.istio.io/inject", "false")
我对PySpark没有太多的经验,但我曾经设置Java Spark在Kubernetes集群上以客户端模式运行,就像你现在正在尝试的那样。我相信配置应该基本相同。
首先,您应该检查无头服务是否按预期工作。第一个:
kubectl describe -n anonymous testspark
并查看是否有任何端点和整个描述。第二,从你的一个Pod内部,你可以检查nslookup是否解析了你期望你的驱动Pod拥有的主机名。
kubectl exec -n <namespace> -it <pod-name> -- bash // exec into a Pod which have nslookup
nslookup testspark
nslookup testspark.testspark
如果这些名称正确地解析为驱动Pod当前的ip地址,那么它可能与Spark配置有关。
我发现你的配置和我在java中使用的配置之间的唯一区别是,作为spark.driver.host,我使用的是类似的东西:
service-name.namespace.svc.cluster.local
,但在理论上,它应该是相同的。
另一个在调试这个问题时可能有用的东西,是对其中一个执行器pod的描述,只是为了检查配置是否正确。
编辑:
这是我使用的spark-submit命令:
sh /opt/spark/bin/spark-submit --master k8s://https://master-ip:6443 --deploy-mode client --name ${POD_NAME} --class "my.spark.application.main.Processor" --conf "spark.executor.instances=2" --conf "spark.kubernetes.namespace=${POD_NAMESPACE}" --conf "spark.kubernetes.driver.pod.name=${POD_NAME}" --conf "spark.kubernetes.container.image=${SPARK_IMAGE}" --conf "spark.kubernetes.executor.request.cores=2" --conf "spark.kubernetes.executor.limit.cores=2" --conf "spark.driver.memory=4g" --conf "spark.executor.cores=2" --conf "spark.executor.memory=4g" --conf "spark.driver.host=${SPARK_DRIVER_HOST}" --conf "spark.ui.dagGraph.retainedRootRDDs=1000" --conf "spark.driver.port=${SPARK_DRIVER_PORT}" --conf "spark.driver.extraJavaOptions=${DRIVER_JAVA_ARGS}" --conf "spark.executor.extraJavaOptions=${EXECUTOR_JAVA_ARGS}" /opt/spark/jars/app.jar -jobConfig ${JOB_CONFIGURATION}
大多数配置,如spark_driver_port或spark_driver_host,与你的完全相似,只是名称和端口不同。