Unable to connect to Snowflake from a newly created Airflow environment on EC2



I have an AWS CloudFormation template that creates a basic Airflow environment (a single EC2 t3.small instance hosting both the webserver and the scheduler, no external database, no Celery executor). The environment connects to a Snowflake data warehouse to push files from S3 into a database on Snowflake. The Airflow environment gets created on the EC2 instance successfully; the last thing the CFT does is start the Airflow scheduler. The logs appear to show a clean start, and the webserver comes up without issue. When I run a test DAG that does nothing but connect to Snowflake, I get this error:

[2020-02-11 11:23:25,422] {__init__.py:1580} ERROR - 250001 (08001): None: Failed to connect to DB: xxxxxxxxxxx.us-east-1.snowflakecomputing.com:443. IP [69995088867598] is not allowed to access Snowflake.  Contact your local security administrator.

Here is what we know:

1) The IP address coming from the EC2 instance is fine; it is neither blacklisted nor missing from the whitelist on the Snowflake side, so this error is somewhat puzzling (see the sketch just after this list for one way to confirm the instance's public IP).

2) Running the Python script manually outside of Airflow works fine: the connection to Snowflake happens as expected.

3) Killing the Airflow scheduler, starting it again with "airflow scheduler -D", and then re-running the DAG results in a successful run.

4) The logs show no errors other than the one posted above.
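To confirm the instance's public IP for point 1, we query the EC2 instance metadata endpoint. This is only a minimal sketch and assumes IMDSv1 is enabled (with IMDSv2 a session token would have to be requested first and passed as a header); everything other than the standard metadata URL is illustrative.

# Minimal sketch: print the public IP the EC2 instance exposes, via instance metadata.
# Assumes IMDSv1; with IMDSv2 you would first request a token and send it as a header.
import urllib.request

METADATA_URL = 'http://169.254.169.254/latest/meta-data/public-ipv4'

with urllib.request.urlopen(METADATA_URL, timeout=2) as resp:
    public_ip = resp.read().decode('utf-8').strip()

print('EC2 public IP: {}'.format(public_ip))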

Here is the UserData section of my CFT:

'Fn::Base64':
!Sub |
#!/bin/bash -x
exec > /tmp/user-data.log 2>&1
# Path settings
export HOME=/root
export AIRFLOW_HOME=$HOME/airflow
export PATH=$PATH:/root/airflow:/root/airflow/bin
# Egress proxy and exceptions
# Step 1: Install Python version
apt-get update -y
apt-get install build-essential checkinstall -y
apt-get install libssl-dev openssl zlib1g-dev libffi-dev libsqlite3-dev libpq-dev postgresql postgresql-client python3-psycopg2 -y
wget https://www.python.org/ftp/python/3.7.3/Python-3.7.3.tgz
tar xzvf Python-3.7.3.tgz
cd Python-3.7.3
./configure
make
make altinstall
# Step 2: Create virtual environment with new Python version
cd ~
python3.7 -m venv airflow
# Create pip.conf
echo '[global]
proxy = http://http.proxy.com:8000
index-url = https://${ArtifactoryUsername}:${ArtifactoryPassword}@artifactory.com/api/pypi/pypi-prereleases/simple
index = https://${ArtifactoryUsername}:${ArtifactoryPassword}@artifactory.com/api/pypi/pypi-prereleases/simple' > $AIRFLOW_HOME/pip.conf
# Allow these ports through the ufw firewall
sudo ufw allow 8080
# Upgrade Pip
$AIRFLOW_HOME/bin/pip install --upgrade pip
/usr/local/bin/aws s3 sync s3://${S3CDAPBucket}/dags $AIRFLOW_HOME/dags/
# Install required PIP packages into virtual environment
$AIRFLOW_HOME/bin/pip install -r $AIRFLOW_HOME/dags/requirements.txt --no-cache-dir --retries 10
# Setup airflow local db; edit config file as needed
$AIRFLOW_HOME/bin/airflow initdb
sed -i 's/load_examples\s=\sTrue/load_examples = False/g' $AIRFLOW_HOME/airflow.cfg
sed -i 's/default_dag_run_display_number\s=\s25/default_dag_run_display_number = 5/g' $AIRFLOW_HOME/airflow.cfg
sed -i "/remote_logging\s=\sFalse/cremote_logging = True" $AIRFLOW_HOME/airflow.cfg
sed -i "/remote_base_log_folder\s=/cremote_base_log_folder = s3://${S3CDAPBucket}/logs" $AIRFLOW_HOME/airflow.cfg
# Re-init the database to use the postgres instance; start scheduler and webserver
$AIRFLOW_HOME/bin/airflow pool -i $AIRFLOW_HOME/dags/config/airflow/airflow-pool-config.json
sed -i "/snowflake_batch_password/c   "snowflake_batch_password" : "${SnowflakeBatchPassword}"," $AIRFLOW_HOME/dags/config/airflow/airflow-variables-dev.json
$AIRFLOW_HOME/bin/airflow variables -i $AIRFLOW_HOME/dags/config/airflow/airflow-variables-dev.json
sudo apt autoremove -y
chgrp -R cloud-user /root
chmod -R g+rwx /root
$AIRFLOW_HOME/bin/airflow webserver -D
$AIRFLOW_HOME/bin/airflow scheduler -D

The DAG that tests the connection to Snowflake looks like this:

from airflow import DAG
from datetime import datetime, timedelta
import json
import os
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow import AirflowException
from snowflake import snowflake_hook
from aws.s3 import FileOps
import boto3
from dag_admin import common_dag_tasks, landing_zone_dag_tasks, raw_zone_dag_tasks
from logger import logger
from functools import partial
import traceback
from data_validation import batch_validation
from snowflake.snowflake_hook import SnowflakeHook


def snowflake_task():
    my_snowflake = SnowflakeHook(account='xxxxxxxxxxx.us-east-1',
                                 username='SNOWFLAKE_DEV_BATCH_USER',
                                 password='password',
                                 warehouse='DEV_BATCH_XSMALL',
                                 role='DEV_DB_DEVELOPER_ROLE')
    conn = my_snowflake.get_conn()
    cur = conn.cursor()
    try:
        cur.execute('select count(*) as row_cnt from DEV.LANDING__MST_DC_DBO.DC_ACCOUNTING_ITEM__BULK')
        for (row_cnt,) in cur:
            print('Row Count = {}'.format(row_cnt))
    finally:
        cur.close()
        conn.close()


dag_id = 'snowflake-test'
my_dag = DAG(dag_id=dag_id,
             start_date=datetime.today(),
             schedule_interval=None)

start_task = PythonOperator(
    task_id="start-task",
    dag=my_dag,
    python_callable=snowflake_task)
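As a sanity check for point 2 above, a minimal standalone script along these lines can be run from a shell on the instance; it bypasses the in-house SnowflakeHook and uses snowflake-connector-python (already in requirements.txt) directly. The hook is assumed to wrap something equivalent, and CURRENT_IP_ADDRESS() is a Snowflake context function that reports the client IP the service sees for the session.

# Standalone connectivity check, run outside Airflow with the same credentials as the DAG.
# Uses the public snowflake-connector-python API; the in-house SnowflakeHook is assumed
# to wrap something equivalent internally.
import snowflake.connector

conn = snowflake.connector.connect(
    account='xxxxxxxxxxx.us-east-1',
    user='SNOWFLAKE_DEV_BATCH_USER',
    password='password',
    warehouse='DEV_BATCH_XSMALL',
    role='DEV_DB_DEVELOPER_ROLE',
)
cur = conn.cursor()
try:
    # CURRENT_IP_ADDRESS() shows which client IP Snowflake actually sees for this session.
    cur.execute('select current_ip_address(), current_region()')
    print(cur.fetchone())
finally:
    cur.close()
    conn.close()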

requirements.txt:

apache-airflow==1.10.3
Jinja2==2.10.0
Werkzeug==0.14.1
tzlocal==1.5.1
Flask==1.0.4
snowflake-connector-python==2.0.3
inhouse-snowflake==1.0.0
marshmallow-sqlalchemy==0.17.1
inhouse-aws==1.0.0
inhouse-dag-admin==1.0.0
psycopg2==2.8.4
boto3==1.9.253
inhouse-logging==1.0.0
inhouse-data-validation==1.0.0

Based on the error, you probably have a network policy enabled on your Snowflake account. You need to get the public IP of the EC2 machine and add it to the allowed IP list.
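If that is the case, someone with the appropriate privileges can inspect and extend the policy over SQL. Below is a minimal sketch using snowflake-connector-python; the policy name, admin credentials, and IP values are placeholders, and note that ALTER NETWORK POLICY ... SET ALLOWED_IP_LIST replaces the list wholesale, so the existing entries must be included too.

# Sketch: inspect and update a Snowflake network policy. Requires an admin role
# (e.g. SECURITYADMIN); policy name, credentials, and IPs are placeholders.
import snowflake.connector

admin = snowflake.connector.connect(
    account='xxxxxxxxxxx.us-east-1',
    user='YOUR_ADMIN_USER',        # placeholder
    password='YOUR_PASSWORD',      # placeholder
    role='SECURITYADMIN',
)
cur = admin.cursor()
try:
    cur.execute('SHOW NETWORK POLICIES')
    print(cur.fetchall())
    # Re-set the allowed list including the EC2 public IP (placeholder values;
    # keep any pre-existing entries in the list).
    cur.execute(
        "ALTER NETWORK POLICY my_policy SET "
        "ALLOWED_IP_LIST = ('203.0.113.10', '198.51.100.0/24')"
    )
finally:
    cur.close()
    admin.close()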

Here is the full tutorial. Also, you should consider using an Elastic IP and attaching it to your machine so that the public IP does not change; even if the machine is terminated, you can re-attach the same public IP to its replacement.
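In a CloudFormation stack this would normally be an AWS::EC2::EIP resource plus an association, but as a quick illustration here is a sketch using the boto3 that is already in requirements.txt; the instance ID is a placeholder.

# Sketch: allocate an Elastic IP and associate it with the Airflow instance so the
# public IP survives instance replacement. The instance ID below is a placeholder.
import boto3

ec2 = boto3.client('ec2', region_name='us-east-1')

allocation = ec2.allocate_address(Domain='vpc')
ec2.associate_address(
    InstanceId='i-0123456789abcdef0',   # placeholder: the Airflow EC2 instance
    AllocationId=allocation['AllocationId'],
)
print('Elastic IP {} associated'.format(allocation['PublicIp']))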
