我每天有三个任务。第一个检查csv文件是否存在,第二个创建MySQL表,第三个将csv文件中的数据插入MySQL表。前两个任务成功,但最后一个没有说,由于secure-file-priv选项,我没有权利从文件插入数据到数据库。
default_args = {
"owner": "airflow",
"start_date": airflow.utils.dates.days_ago(1),
"retries": 1,
"retry_delay": timedelta(seconds=5)
}
with DAG('sql_operator_from_csv_to_mysql',default_args=default_args,schedule_interval='@daily', template_searchpath=['/opt/airflow/sql_files_mysql'], catchup=True) as dag:
t1 = BashOperator(task_id='check_file_exists', bash_command='ls /opt/airflow/store_files/students_data.csv | sha1sum', retries=2, retry_delay=timedelta(seconds=15))
t2 = MySqlOperator(task_id='create_mysql_table', mysql_conn_id="mysql_conn", sql="create_table.sql")
t3 = MySqlOperator(task_id='insert_into_table', mysql_conn_id="mysql_conn", sql="insert_into_table.sql")
SQL语句:
LOAD DATA INFILE '/store_files/students_data.csv' INTO TABLE students_db FIELDS TERMINATED BY ',' LINES TERMINATED BY 'n' IGNORE 1 ROWS;
mysql.cnf文件如下:
!includedir /etc/mysql/conf.d/
!includedir /etc/mysql/mysql.conf.d/
[mysqld]
pid-file = /var/run/mysqld/mysqld.pid
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql
secure-file-priv= ""
我尝试将'secure-file-priv'选项标记为空字符串并显示csv文件的位置,但它们都不工作。
secure-file-priv= ""
secure-file-priv= "/opt/airflow/store_files"
错误提示:
[2022-06-12, 09:44:24 UTC] {taskinstance.py:1774} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/mysql/operators/mysql.py", line 84, in execute
hook.run(self.sql, autocommit=self.autocommit, parameters=self.parameters)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/hooks/dbapi.py", line 205, in run
self._run_command(cur, sql_statement, parameters)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/hooks/dbapi.py", line 229, in _run_command
cur.execute(sql_statement)
File "/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 206, in execute
res = self._query(query)
File "/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/cursors.py", line 319, in _query
db.query(q)
File "/home/airflow/.local/lib/python3.7/site-packages/MySQLdb/connections.py", line 254, in query
_mysql.connection.query(self, query)
MySQLdb._exceptions.OperationalError: (1290, 'The MySQL server is running with the --secure-file-priv option so it cannot execute this statement')
[2022-06-12, 09:44:24 UTC] {taskinstance.py:1288} INFO - Marking task as FAILED. dag_id=sql_operator_from_csv_to_mysql, task_id=insert_into_table, execution_date=20220612T094413, start_date=20220612T094424, end_date=20220612T094424
[2022-06-12, 09:44:24 UTC] {standard_task_runner.py:98} ERROR - Failed to execute job 10 for task insert_into_table ((1290, 'The MySQL server is running with the --secure-file-priv option so it cannot execute this statement'); 1190)
[2022-06-12, 09:44:24 UTC] {local_task_job.py:154} INFO - Task exited with return code 1
[2022-06-12, 09:44:24 UTC] {local_task_job.py:264} INFO - 0 downstream tasks scheduled from follow-on schedule check
有什么办法可以解决这个问题吗?我试着对docker-compose做了一些小的修改。yaml文件。
我docker-compose。Yaml文件如下
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
#
# WARNING: This configuration is for local development. Do not use it in a production deployment.
#
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow.
# Default: apache/airflow:2.3.2
# AIRFLOW_UID - User ID in Airflow containers
# Default: 50000
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if requested).
# Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account (if requested).
# Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
# Default: ''
#
# Feel free to modify this file to suit your needs.
---
version: '3'
x-airflow-common:
&airflow-common
# In order to add custom dependencies or upgrade provider packages you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, Then run `docker-compose build` to build the images.
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.3.2}
# build: .
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# For backward compatibility, with Airflow <2.3
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
volumes:
- ./dags:/usr/local/airflow/dags
- ./store_files:/usr/local/airflow/store_files_airflow
- ./sql_files_mysql:/usr/local/airflow/sql_files
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
&airflow-common-depends-on
redis:
condition: service_healthy
postgres:
condition: service_healthy
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
retries: 5
restart: always
redis:
image: redis:latest
expose:
- 6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 30s
retries: 50
restart: always
mysql:
image: mysql:latest
environment:
- MYSQL_ROOT_PASSWORD=airflow
volumes:
- ./store_files:/var/lib/mysql-files
- ./mysql.cnf:/etc/mysql/mysql.cnf
restart: always
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8080:8080
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-worker:
<<: *airflow-common
command: celery worker
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 10s
timeout: 10s
retries: 5
environment:
<<: *airflow-common-env
# Required to handle warm shutdown of the celery workers properly
# See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
DUMB_INIT_SETSID: "0"
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-triggerer:
<<: *airflow-common
command: triggerer
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-init:
<<: *airflow-common
entrypoint: /bin/bash
# yamllint disable rule:line-length
command:
- -c
- |
function ver() {
printf "%04d%04d%04d%04d" $${1//./ }
}
airflow_version=$$(gosu airflow airflow version)
airflow_version_comparable=$$(ver $${airflow_version})
min_airflow_version=2.2.0
min_airflow_version_comparable=$$(ver $${min_airflow_version})
if (( airflow_version_comparable < min_airflow_version_comparable )); then
echo
echo -e "