I am trying to run a Spark job via spark-submit on an EKS Fargate profile.
Creating the Job:
apiVersion: batch/v1
kind: Job
metadata:
  name: data-processor-external-spark-job
  namespace: fargate-profile-selector
  labels:
    app: data-processor
spec:
  template:
    metadata:
      labels:
        app: data-processor-external-spark
        sdr.appname: spark
    spec:
      securityContext:
        runAsUser: 1000
        runAsGroup: 1000
        fsGroup: 1000
      hostname: data-processor
      serviceAccountName: data-processor
      restartPolicy: OnFailure
      containers:
        - name: data-processor
          image: <account-id>.dkr.ecr.us-west-2.amazonaws.com/data-processor:1.0.1-507
          imagePullPolicy: IfNotPresent
          args:
            - "spark-submit"
            - "--master"
            - "spark://spark-core:7077"
            - "--verbose"
            - "--class"
            - "com.extraction.DataExtractionJob"
            - "--deploy-mode"
            - "client"
            - "--conf"
            - "spark.driver.cores=4"
            - "--conf"
            - "spark.kubernetes.driver.request.cores=2"
            - "--conf"
            - "spark.kubernetes.driver.limit.cores=4"
            - "--conf"
            - "spark.executor.cores=4"
            - "--conf"
            - "spark.kubernetes.executor.request.cores=3"
            - "--conf"
            - "spark.kubernetes.executor.limit.cores=4"
            - "--conf"
            - "spark.executor.instances=1"
            - "--conf"
            - "spark.driver.memory=2G"
            - "--conf"
            - "spark.executor.memory=3GB"
            - "--conf"
            - "spark.driver.supervise=false"
            - "--conf"
            - "spark.driver.maxResultSize=4g"
            - "--conf"
            - "spark.port.maxRetries=25"
            - "--conf"
            - "spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled=true"
            - "--conf"
            - "spark.kubernetes.container.image=<account-id>.dkr.ecr.us-west-2.amazonaws.com/data-processor-scaling:1.0.1-508"
            - "--conf"
            - "spark.kubernetes.namespace=fargate-profile-selector"
            - "--conf"
            - "spark.kubernetes.authenticate.driver.serviceAccountName=data-processor"
            - "--conf"
            - "spark.kubernetes.authenticate.executor.serviceAccountName=data-processor"
            - "--conf"
            - "spark.kubernetes.submission.waitAppCompletion=false"
            - "--conf"
            - "spark.kubernetes.driver.label.execution_id=c5cfaccb-d934-4150-ac59-471a7b6dac57"
            - "--conf"
            - "spark.kubernetes.executor.label.execution_id=c5cfaccb-d934-4150-ac59-471a7b6dac57"
            - "--conf"
            - "spark.kubernetes.driver.pod.name=extract-c5cfaccb-d934-4150-ac59-471a7b6dac57"
            - "--conf"
            - "spark.kubernetes.executor.podNamePrefix=extract-c5cfaccb-d934-4150-ac59-471a7b6dac57"
            - "--conf"
            - "spark.kubernetes.driverEnv.AWS_STS_REGIONAL_ENDPOINTS=regional"
            - "--conf"
            - "spark.executorEnv.AWS_STS_REGIONAL_ENDPOINTS=regional"
            - "--conf"
            - "spark.executorEnv.AWS_ROLE_ARN=arn:aws:iam::<account-id>:role/data-processor-service-account"
            - "--conf"
            - "spark.kubernetes.driverEnv.AWS_ROLE_ARN=arn:aws:iam::<account-id>:role/data-processor-service-account"
            - "local:///opt/etl.jar"
            - "--spark_master=spark://spark-core:7077"
            - "--trace_id=54321"
            - "--job_execution_id=12213131"
            - "--s3_endpoint=https://s3.us-west-2.amazonaws.com/"
            - "--input_data_uri=s3://uploads-dev/coreattribute.csv"
            - "--output_data_uri=s3://uploads-dev/fargate-test/test-lead/output"
            - "--error_data_uri=s3://uploads-dev/fargate-test/test-lead/error"
            - "--s3_kms_key_id=arn:aws:kms:us-west-2:<account-id>:key/40859321-de3a-4751-b5c4-44a0378a17c0"
          ports:
            - containerPort: 8080
            - containerPort: 40000
            - containerPort: 40001
          envFrom:
            - secretRef:
                name: data-processor-secrets
          env:
            - name: AWS_STS_REGIONAL_ENDPOINTS
              value: regional
            - name: S3_KMS_KEY_ID
              value: arn:aws:kms:us-west-2:<account-id>:key/40859321-de3a-4751-b5c4-44a0378a17c0
            - name: SPARK_PROMETHEUS_METRICS_REPORTING_ENABLED
              value: "false"
            - name: KUBERNETES_API
              value: https://<id>.us-west-2.eks.amazonaws.com
            - name: SPARK_DRIVER_CORES_REQUEST
              value: "2"
            - name: SPARK_DRIVER_CORES_LIMIT
              value: "4"
            - name: SPARK_MEMORY_OVERHEAD_FACTOR
              value: "0.4"
            - name: SPARK_MEMORY_OVERHEAD
              value: "1.0"
            - name: CONNECTOR_MOUNT_PATH
              value: /etc/secrets/connector
            - name: S3_TLS_ENABLED
              value: "true"
            - name: S3_PATH_STYLE_ENABLED
              value: "true"
            - name: SPARK_MASTER_HOST
              value: spark-core
            - name: DATA_PROCESSOR_HOST
              value: data-processor
            - name: SPARK_DRIVER_MEMORY
              value: 2G
            - name: SPARK_DRIVER_CORES
              value: "4"
            - name: ZOOKEEPER_HOST
              value: zookeeper
            - name: ZOOKEEPER_PORT
              value: "2181"
            - name: SPARK_CLUSTER_NODES
              value: "6"
            - name: SPARK_CLUSTER_NODE_MEMORY_GB
              value: "24"
            - name: SPARK_CLUSTER_NODE_CORES
              value: "12"
            - name: SPARK_UNZIP_CORES_LIMIT
              value: "18"
            - name: SPARK_UNZIP_MEMORY_GB_LIMIT
              value: "18"
            - name: SPARK_UNION_CORES_LIMIT
              value: "12"
            - name: SPARK_UNION_MEMORY_GB_LIMIT
              value: "24"
            - name: SPARK_JOIN_CORES_LIMIT
              value: "12"
            - name: SPARK_JOIN_MEMORY_GB_LIMIT
              value: "32"
            - name: SPARK_EXPORT_CORES_LIMIT
              value: "12"
            - name: SPARK_EXPORT_MEMORY_GB_LIMIT
              value: "24"
            - name: SERVICE_ACCOUNT
              value: data-processor
            - name: AWS_DEFAULT_REGION
              value: us-west-2
            - name: AWS_REGION
              value: us-west-2
            - name: AWS_ROLE_ARN
              value: arn:aws:iam::<account-id>:role/data-processor-service-account
            - name: AWS_WEB_IDENTITY_TOKEN_FILE
              value: /var/run/secrets/eks.amazonaws.com/serviceaccount/token
          volumeMounts:
            - name: secrets
              mountPath: /etc/secrets
              readOnly: true
          resources:
            requests:
              cpu: 100m
              memory: 1Gi
            limits:
              cpu: 400m
              memory: 2Gi
          securityContext:
            allowPrivilegeEscalation: false
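To confirm that the credential-related --conf flags above actually reach the driver, they can be read back at runtime. A minimal sketch, assuming a SparkSession named spark is available inside the driver (the keys simply mirror the flags in the manifest):

// Print credential-related settings exactly as Spark resolved them;
// "<unset>" means the flag never made it into the driver's configuration.
Seq(
  "spark.kubernetes.authenticate.driver.serviceAccountName",
  "spark.kubernetes.driverEnv.AWS_ROLE_ARN",
  "spark.executorEnv.AWS_ROLE_ARN",
  "spark.executorEnv.AWS_STS_REGIONAL_ENDPOINTS"
).foreach { key =>
  println(s"$key -> ${spark.conf.getOption(key).getOrElse("<unset>")}")
}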
aws eks describe-fargate-profile --cluster-name dev-fargate-cluster-poc-2 --fargate-profile-name fargateprofile:
{
    "fargateProfile": {
        "fargateProfileName": "fargateprofile",
        "fargateProfileArn": "arn:aws:eks:us-west-2:<account-id>:fargateprofile/dev-fargate-cluster-poc-2/fargateprofile/60c21523-d172-5442-5548-5ca118185ddf",
        "clusterName": "dev-fargate-cluster-poc-2",
        "createdAt": "2022-10-30T21:04:09.930000+02:00",
        "podExecutionRoleArn": "arn:aws:iam::<account-id>:role/dev-fargate-cluster-poc-2-FargateNodeInstanceRole-3QDTOVKG62WU",
        "subnets": [
            "subnet-0371427591c2240ab",
            "subnet-08895c527023616c1",
            "subnet-0b398db79033a2be5"
        ],
        "selectors": [
            {
                "namespace": "fargate-profile-selector",
                "labels": {}
            }
        ],
        "status": "ACTIVE",
        "tags": {
            "sys_tag_version": "2020-04-29",
            "sys_owner_div": "0056",
            "sys_owner_dept": "4755",
            "sys_sub_env": "dev",
            "sys_app_id": "0",
            "sys_env": "dev",
            "sys_app_name": "cloud-dev-dev-5828"
        }
    }
}
aws iam list-attached-role-policies --role-name dev-fargate-cluster-poc-FargateNodeInstanceRole-3QDTOVKG62WU (the AmazonS3FullAccess policy is present):
{
    "AttachedPolicies": [
        {
            "PolicyName": "CloudWatchAgentServerPolicy",
            "PolicyArn": "arn:aws:iam::aws:policy/CloudWatchAgentServerPolicy"
        },
        {
            "PolicyName": "AmazonSSMManagedInstanceCore",
            "PolicyArn": "arn:aws:iam::aws:policy/AmazonSSMManagedInstanceCore"
        },
        {
            "PolicyName": "AmazonEKS_CNI_Policy",
            "PolicyArn": "arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy"
        },
        {
            "PolicyName": "AmazonEC2ContainerRegistryReadOnly",
            "PolicyArn": "arn:aws:iam::aws:policy/AmazonEC2ContainerRegistryReadOnly"
        },
        {
            "PolicyName": "AmazonEKSWorkerNodePolicy",
            "PolicyArn": "arn:aws:iam::aws:policy/AmazonEKSWorkerNodePolicy"
        },
        {
            "PolicyName": "AmazonS3FullAccess",
            "PolicyArn": "arn:aws:iam::aws:policy/AmazonS3FullAccess"
        },
        {
            "PolicyName": "AmazonEKSFargatePodExecutionRolePolicy",
            "PolicyArn": "arn:aws:iam::aws:policy/AmazonEKSFargatePodExecutionRolePolicy"
        },
        {
            "PolicyName": "AISSystemLogsPolicy",
            "PolicyArn": "arn:aws:iam::<account-id>:policy/AISSystemLogsPolicy"
        }
    ]
}
kubectl describe serviceaccount data-processor:
Name:                data-processor
Namespace:           fargate-profile-selector
Labels:              app=data-processor
                     app.kubernetes.io/managed-by=Helm
                     chart=data-processor-0.0.1-366
                     heritage=Helm
                     release=data-processor
Annotations:         eks.amazonaws.com/role-arn: arn:aws:iam::<account-id>:role/data-processor-service-account
                     meta.helm.sh/release-name: data-processor
                     meta.helm.sh/release-namespace: fargate-profile-selector
Image pull secrets:  <none>
Mountable secrets:   data-processor-token-2cfhr
Tokens:              data-processor-token-2cfhr
Events:              <none>
aws iam list-attached-role-policies --role-name data-processor-service-account (the AmazonS3FullAccess policy is present):
{
    "AttachedPolicies": [
        {
            "PolicyName": "AmazonS3FullAccess",
            "PolicyArn": "arn:aws:iam::aws:policy/AmazonS3FullAccess"
        },
        {
            "PolicyName": "kms-key",
            "PolicyArn": "arn:aws:iam::<account-id>:policy/kms-key"
        },
        {
            "PolicyName": "msk-admin-policy",
            "PolicyArn": "arn:aws:iam::<account-id>:policy/msk-admin-policy"
        },
        {
            "PolicyName": "NoDeleteOnGPG",
            "PolicyArn": "arn:aws:iam::<account-id>:policy/NoDeleteOnGPG"
        }
    ]
}
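Given the eks.amazonaws.com/role-arn annotation above, EKS should inject AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE into the pod (the Job manifest also sets them explicitly). A minimal sketch to verify this from inside the driver JVM:

// Check that the IRSA environment variables are visible to the JVM
// and that the projected token file is actually readable.
Seq("AWS_ROLE_ARN", "AWS_WEB_IDENTITY_TOKEN_FILE").foreach { name =>
  println(s"$name=${sys.env.getOrElse(name, "<missing>")}")
}
sys.env.get("AWS_WEB_IDENTITY_TOKEN_FILE").foreach { path =>
  println(s"token file readable: ${new java.io.File(path).canRead}")
}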
The S3 configuration on the codebase side:
// Imports for the provider classes below; the packages assume the
// Hadoop 3.3.x s3a client plus the AWS SDK v1 bundle on the classpath.
import com.amazonaws.auth.{EnvironmentVariableCredentialsProvider, WebIdentityTokenCredentialsProvider}
import org.apache.hadoop.fs.s3a.{SimpleAWSCredentialsProvider, TemporaryAWSCredentialsProvider}
import org.apache.hadoop.fs.s3a.auth.IAMInstanceCredentialsProvider

// The ordered credential provider chain handed to s3a.
val s3ProviderClasses = Seq(
    classOf[WebIdentityTokenCredentialsProvider],
    classOf[TemporaryAWSCredentialsProvider],
    classOf[SimpleAWSCredentialsProvider],
    classOf[EnvironmentVariableCredentialsProvider],
    classOf[IAMInstanceCredentialsProvider])
  .map(_.getName)
  .mkString(",")

spark.sparkContext.hadoopConfiguration.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", s3Conf.s3Endpoint)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.connection.ssl.enabled", s3Conf.s3TlsEnabled.toString)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.connection.maximum", "15000")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.connection.establish.timeout", "60000")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.connection.timeout", "120000")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.attempts.maximum", "10")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.threads.max", "500")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.multipart.size", "256M")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.multipart.threshold", "5G")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.fast.upload.buffer", "disk")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.path.style.access", s3Conf.s3PathStyleEnabled.toString)
spark.sparkContext.hadoopConfiguration.setBoolean("fs.s3a.impl.disable.cache", false)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.aws.credentials.provider", s3ProviderClasses)

// SSE-KMS is enabled only when a KMS key id is configured.
s3Conf.s3KmsKeyId.foreach { kmsKeyId =>
  log.info(s"S3 SSE-KMS encryption is enabled for $appName job. KMS Key ID: $kmsKeyId")
  spark.sparkContext.hadoopConfiguration.set("fs.s3a.server-side-encryption-algorithm", "SSE-KMS")
  spark.sparkContext.hadoopConfiguration.set("fs.s3a.server-side-encryption.key", kmsKeyId)
}
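One way to test the web-identity path in isolation, bypassing the s3a chain entirely, is to ask the SDK provider for credentials directly. A hedged sketch, assuming the aws-java-sdk-bundle on the classpath is recent enough to ship com.amazonaws.auth.WebIdentityTokenCredentialsProvider:

import com.amazonaws.auth.WebIdentityTokenCredentialsProvider

// If this throws, the problem is the token/role wiring rather than the
// s3a configuration; only a prefix of the key id is printed.
val creds = new WebIdentityTokenCredentialsProvider().getCredentials
println(s"resolved access key id: ${creds.getAWSAccessKeyId.take(4)}...")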
The error:
java.nio.file.AccessDeniedException: s3://uploads-dev/coreattribute.csv: org.apache.hadoop.fs.s3a.auth.NoAuthWithAWSException: No AWS Credentials provided by WebIdentityTokenCredentialsProvider TemporaryAWSCredentialsProvider SimpleAWSCredentialsProvider EnvironmentVariableCredentialsProvider IAMInstanceCredentialsProvider : com.amazonaws.SdkClientException: Unable to load AWS credentials from environment variables (AWS_ACCESS_KEY_ID (or AWS_ACCESS_KEY) and AWS_SECRET_KEY (or AWS_SECRET_ACCESS_KEY))
I expected WebIdentityTokenCredentialsProvider to be used, not SimpleAWSCredentialsProvider. WebIdentityTokenCredentialsProvider should apply here, since at the very least the service account data-processor, with the role data-processor-service-account, has full S3 access.
So why does it expect the AWS_ACCESS_KEY_ID variable even though the container runs under a service account with full S3 access?
Thank you.
There is no support for WebIdentityTokenCredentialsProvider in s3a in the ASF releases of hadoop-aws <= 3.3.5. See https://issues.apache.org/jira/browse/HADOOP-18154 if you want to do a PR. That PR is still awaiting updates; if you want to take it over and finish it, you are strongly encouraged to do so, especially if you can rerun the whole hadoop-aws integration test suite with the new provider.
Open source development depends on developers contributing the features they need. This is your chance to contribute!
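In the meantime it may be worth checking where the SDK fallback actually fails: com.amazonaws.auth.WebIdentityTokenCredentialsProvider typically cannot resolve credentials when AWS_ROLE_ARN or the token file is absent in a given JVM, and with --master spark://spark-core:7077 the executors run on standalone workers that may not carry the IRSA environment at all. A hedged diagnostic sketch, assuming a SparkSession named spark:

// Run a few tasks and report whether the IRSA environment is visible
// inside the executor JVMs (where the s3a reads actually happen).
val report = spark.sparkContext
  .parallelize(1 to 8, 8)
  .mapPartitions { _ =>
    val role    = sys.env.getOrElse("AWS_ROLE_ARN", "<missing>")
    val token   = sys.env.getOrElse("AWS_WEB_IDENTITY_TOKEN_FILE", "<missing>")
    val canRead = token != "<missing>" && new java.io.File(token).canRead
    Iterator(s"role=$role token=$token readable=$canRead")
  }
  .distinct()
  .collect()
report.foreach(println)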