Airflow KubernetesExecutor: worker pod terminates right after creation

When I try to run a DAG with the KubernetesExecutor, the worker pod throws an exception and terminates immediately after it starts:

One more question: is it correct behavior that the scheduler sends LocalExecutor as the env variable that can be seen in the pod describe result below?

Please find all the required files below:

  1. airflow.cfg
  2. Worker pod describe output
  3. Worker pod logs
  4. DAG file

Worker pod describe result:

Name:         tutorialv01printhello-50d3b9099ea64c19a51e2fb035eef8ac
Namespace:    default
Priority:     0
Node:         worker01/<node-ip>
Start Time:   <date-time>
Labels:       airflow-worker=<airflow-dummy>
              airflow_version=1.10.11
              dag_id=tutorial_v01
              execution_date=
              kubernetes_executor=True
              task_id=print_hello
              try_number=1
Annotations:  <none>
Status:       Failed
IP:           <Node Ip>
IPs:
  IP:  <Node Ip>
Containers:
  base:
    Container ID:  <container-id>
    Image:         <repo-name>/k8-airflow:latest
    Image ID:      docker-pullable://<repo-name>/k8-
    Port:          <none>
    Host Port:     <none>
    Command:
      airflow
      run
      tutorial_v01
      print_hello
      <date time>
      --local
      --pool
      default_pool
      -sd
      /usr/local/airflow/dags/tutorial_01.py
    State:          Terminated
      Reason:       Error
      Exit Code:    1
      Started:      Thu, 06 Aug 2020 13:20:21 +0000
      Finished:     Thu, 06 Aug 2020 13:20:22 +0000
    Ready:          False
    Restart Count:  0
    Environment Variables from:
      airflow-configmap  ConfigMap  Optional: false
    Environment:
      AIRFLOW__CORE__EXECUTOR:          LocalExecutor
      AIRFLOW__CORE__DAGS_FOLDER:       /usr/local/airflow/dags/repo/
      AIRFLOW__CORE__SQL_ALCHEMY_CONN:  <alchemy-postgres-conn-url>
    Mounts:
      /usr/local/airflow/dags from airflow-dags (ro)
      /usr/local/airflow/logs from airflow-logs (rw)
      /var/run/secrets/kubernetes.io/serviceaccount from default-token-sdfdfdd (ro)
Conditions:
  Type              Status
  Initialized       True
  Ready             False
  ContainersReady   False
  PodScheduled      True
Volumes:
  airflow-dags:
    Type:       PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
    ClaimName:  airflow-dags
    ReadOnly:   false
  airflow-logs:
    Type:       EmptyDir (a temporary directory that shares a pod's lifetime)
    Medium:
    SizeLimit:  <unset>
  default-token-mnh2t:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  default-token-mnh2t
    Optional:    false
QoS Class:       BestEffort
Node-Selectors:  <none>
Tolerations:     node.kubernetes.io/not-ready:NoExecute for 300s
                 node.kubernetes.io/unreachable:NoExecute for 300s
Events:
  Type    Reason     Age        From               Message
  ----    ------     ----       ----               -------
  Normal  Scheduled  <unknown>  default-scheduler  Successfully assigned default/tutorialv01printhello-50d3b9099ea64c19a51e2fb035eef8ac to worker01
  Normal  Pulling    8m4s       kubelet, worker01  Pulling image "<repo-name>/k8-airflow:latest"
  Normal  Pulled     8m1s       kubelet, worker01  Successfully pulled image "<repo-name>/k8-airflow:latest"
  Normal  Created    8m1s       kubelet, worker01  Created container base
  Normal  Started    8m1s       kubelet, worker01  Started container base

Worker pod logs:

File "/usr/bin/airflow", line 25, in <module>
from airflow.configuration import conf
File "/usr/lib/python3.6/site-packages/airflow/__init__.py", line 31, in <module>
from airflow.utils.log.logging_mixin import LoggingMixin
File "/usr/lib/python3.6/site-packages/airflow/utils/__init__.py", line 24, in <module>
from .decorators import apply_defaults as _apply_defaults
File "/usr/lib/python3.6/site-packages/airflow/utils/decorators.py", line 36, in <module>
from airflow import settings
File "/usr/lib/python3.6/site-packages/airflow/settings.py", line 37, in <module>
from airflow.configuration import conf, AIRFLOW_HOME, WEBSERVER_CONFIG  # NOQA F401
File "/usr/lib/python3.6/site-packages/airflow/configuration.py", line 636, in <module>
with open(TEST_CONFIG_FILE, 'w') as f:
PermissionError: [Errno 13] Permission denied: '/usr/local/airflow/unittests.cfg'

Please find airflow.cfg:

apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-configmap
  labels:
    env: airflow-test
data:
  airflow.cfg: |
    [core]
    dags_folder = /usr/local/airflow/dags
    base_log_folder = /usr/local/airflow/logs
    logging_level = INFO
    executor = KubernetesExecutor
    parallelism = 32
    load_examples = False
    plugins_folder = /usr/local/airflow/plugins
    sql_alchemy_conn = postgresql+psycopg2://<username>:<pwd>@airflow-metastore:5432/airflow

    [celery]
    broker_url =
    result_backend =

    [webserver]
    base_url = http://0.0.0.0:8080
    rbac = False
    web_server_host = 0.0.0.0
    web_server_port = 8080
    dag_default_view = tree

    [kubernetes]
    namespace = default
    airflow_configmap =
    worker_service_account_name = default
    worker_container_image_pull_policy = Always
    worker_dags_folder = /usr/local/airflow/dags
    worker_container_repository = <repo-name>/k8-airflow
    worker_container_tag = latest
    delete_worker_pods = false
    env_from_configmap_ref = airflow-configmap
    git_repo = https://github.com/<repo-name>/airflow-dags
    git_branch = master
    git_sync_credentials_secret = git-credentials
    git_sync_root = /tmp/git
    git_dags_folder_mount_point = /usr/local/airflow/dags
    git_sync_container_repository = <repo-name>/git-sync
    git_sync_container_tag = latest
    git_sync_init_container_name = git-sync-clone
    dags_volume_claim = airflow-dags
    in_cluster = True
    dags_volume_subpath =
    dags_volume_mount_point =

    [kubernetes_environment_variables]
    AIRFLOW__CORE__EXECUTOR = KubernetesExecutor
    AIRFLOW__CORE__DAGS_FOLDER = /usr/local/airflow/dags

    [admin]
    hide_sensitive_variable_fields = True

And the Kubernetes files:

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: default
  namespace: default
  labels:
    env: airflow-test
rules:
  - apiGroups: [""] # "" indicates the core API group
    resources: ["pods"]
    verbs: ["get", "list", "watch", "create", "update", "delete"]
  - apiGroups: ["batch", "extensions"]
    resources: ["jobs"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: default
  namespace: default
  labels:
    env: airflow-test
subjects:
  - kind: ServiceAccount
    name: default # Name of the ServiceAccount
    namespace: default
roleRef:
  kind: Role # This must be Role or ClusterRole
  name: default # This must match the name of the Role or ClusterRole you wish to bind to
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: default
  labels:
    env: airflow-test
spec:
  replicas: 1
  selector:
    matchLabels:
      env: airflow-test
  template:
    metadata:
      labels:
        env: airflow-test
    spec:
      initContainers:
        - name: "init"
          image: <repo-name>/k8-airflow
          imagePullPolicy: Always
          volumeMounts:
            - name: airflow-configmap
              mountPath: /usr/local/airflow/airflow.cfg
              subPath: airflow.cfg
            - name: airflow-dags
              mountPath: /usr/local/airflow/dags
          env:
            - name: SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-secrets
                  key: sql_alchemy_conn
          command:
            - "bash"
          args:
            - "-cx"
            - "initdb.sh"
      containers:
        - name: webserver
          image: <repo-name>/k8-airflow
          imagePullPolicy: IfNotPresent
          env:
            - name: NODE
              value: "webserver"
          envFrom:
            - configMapRef:
                name: airflow-configmap
          ports:
            - name: webserver
              protocol: TCP
              containerPort: 8080
          volumeMounts:
            - mountPath: /usr/local/airflow/dags
              name: airflow-dags
            - mountPath: /usr/local/airflow/airflow.cfg
              name: airflow-configmap
              subPath: airflow.cfg
            - name: airflow-logs
              mountPath: /usr/local/airflow/logs
        - name: scheduler
          image: <repo-name>/k8-airflow
          imagePullPolicy: IfNotPresent
          env:
            - name: NODE
              value: "scheduler"
          envFrom:
            - configMapRef:
                name: airflow-configmap
          ports:
            - name: webserver
              protocol: TCP
              containerPort: 8080
          volumeMounts:
            - mountPath: /usr/local/airflow/dags
              name: airflow-dags
            - mountPath: /usr/local/airflow/airflow.cfg
              name: airflow-configmap
              subPath: airflow.cfg
            - name: airflow-logs
              mountPath: /usr/local/airflow/logs
      volumes:
        - name: airflow-configmap
          configMap:
            name: airflow-configmap
        - name: airflow-dags
          persistentVolumeClaim:
            claimName: airflow-dags
        - name: airflow-logs
          persistentVolumeClaim:
            claimName: airflow-logs
---
apiVersion: v1
kind: Service
metadata:
  name: airflow
  namespace: default
  labels:
    env: airflow-test
spec:
  type: NodePort
  ports:
    - name: webserver
      protocol: TCP
      port: 8080
      targetPort: 8080
      nodePort: 30003
  selector:
    env: airflow-test

DAG file:

import datetime as dt  # needed for default_args below

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


def print_world():
    print('world_1')


default_args = {
    'start_date': dt.datetime(2020, 8, 6, 9, 45, 0),
    'retries': 1,
    'retry_delay': dt.timedelta(minutes=5),
}

with DAG('tutorial_v01',
         default_args=default_args,
         schedule_interval='*/30 * * * *',
         ) as dag:
    print_hello = BashOperator(task_id='print_hello',
                               bash_command='echo "hello"')
    sleep = BashOperator(task_id='sleep',
                         bash_command='sleep 5')
    print_world = PythonOperator(task_id='print_world',
                                 python_callable=print_world)

    print_hello >> sleep >> print_world

In the image being used I have already given 777 permissions to /usr/local/airflow/. Please let me know if anything else is needed.

K8s runs Airflow as a Docker container. When you spin up the container, you need to run it as the airflow user.

This can be achieved in your Dockerfile: you can instruct it to run as that user (see the sketch below). Let me know if you want more details on this.
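
For illustration only, a minimal Dockerfile sketch of that idea; the base image, user name, and uid below are assumptions, not taken from the actual k8-airflow image:

# Sketch: adjust the base image to whatever your k8-airflow image builds from.
FROM python:3.6-slim

ENV AIRFLOW_HOME=/usr/local/airflow

# Create an unprivileged airflow user that owns AIRFLOW_HOME,
# so Airflow can write files such as unittests.cfg there.
RUN useradd -u 1000 -m -d /usr/local/airflow airflow

# Run the webserver, scheduler and worker pods as that user instead of root.
USER airflow
WORKDIR /usr/local/airflow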

Also, for your question above, please refer to this:

https://issues.apache.org/jira/browse/AIRFLOW-6754

Hope this answers your question. Let me know.

Follow the reference links from @hopeIsTheonlyWeapon to make it work.

Either define run_as_user = <user uid> in airflow.cfg,

or define the env variable AIRFLOW__KUBERNETES__RUN_AS_USER=<user uid> in the ConfigMap; that works too.
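
A minimal sketch of the ConfigMap route, assuming you keep the existing airflow-configmap and the env_from_configmap_ref setting from the config above (the uid value 1000 is only an example, taken from the steps below):

apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-configmap
data:
  # Plain keys in the ConfigMap are injected as env variables into pods
  # that reference it via envFrom / env_from_configmap_ref.
  AIRFLOW__KUBERNETES__RUN_AS_USER: "1000"
  airflow.cfg: |
    # ... existing config unchanged ...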

For example, if your AIRFLOW_HOME path is accessible to the airflow user rather than the user the container currently runs as:

  • Identify that user's uid by running id -u airflow
    • In my case the uid is 1000
  • So my airflow.cfg now has the config run_as_user = 1000 (see the snippet below)
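
With that uid, the relevant airflow.cfg section would simply be:

[kubernetes]
run_as_user = 1000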

ref: https://airflow.apache.org/docs/stable/configurations-ref.html#run-as-user
