Google Cloud Composer and Google Cloud SQL



我们可以通过哪些方式从新引入的Google Cloud Composer连接到Google Cloud SQL(MySQL(实例?目的是将数据从Cloud SQL实例获取到BigQuery中(也许通过Cloud Storage进行中间步骤(。

  1. Cloud SQL 代理能否以某种方式在托管 Composer 的 Kubernetes 集群的 pod 上公开?

  2. 如果不能,可以使用 Kubernetes Service Broker 引入 Cloud SQL 代理吗?-> https://cloud.google.com/kubernetes-engine/docs/concepts/add-on/service-broker

  3. Airflow 是否应该用于调度和调用 GCP API 命令,例如 1( 将 mysql 表导出到云存储 2( 将 mysql 导出读取到 bigquery 中?

  4. 也许我缺少其他方法来完成此操作

"Cloud SQL Proxy 提供对 Cloud SQL 第二代实例的安全访问,而无需将 IP 地址列入白名单或配置 SSL。

CloudSQL Proxy似乎是连接到CloudSQL的推荐方式。 所以在 Composer 中,从 1.6.1 版本开始,我们可以创建一个新的 Kubernetes Pod 来运行 gcr.io/cloudsql-docker/gce-proxy:latest 映像,通过服务公开它,然后在 Composer 中创建一个连接以在运算符中使用。

要进行设置:

  • 关注谷歌的文档

  • 使用 Arik's Medium Post 中的信息测试连接

    • 检查容器是否已创建kubectl get pods --all-namespaces

    • 检查服务是否已创建kubectl get services --all-namespaces

    • 跳转到工作节点kubectl --namespace=composer-1-6-1-airflow-1-10-1-<some-uid> exec -it airflow-worker-<some-uid> bash

      • 测试 mysql 连接mysql -u composer -p --host <service-name>.default.svc.cluster.local

笔记:

  • 作曲家现在使用命名空间来组织 pod

  • 不同命名空间中的 Pod 不会相互通信,除非您为它们提供完整的路径<k8-service-name>.<k8-namespace-name>.svc.cluster.local

  • 使用完整路径创建新的作曲家连接将启用成功的连接

我们遇到了同样的问题,但有一个 Postgres 实例。这就是我们所做的,并让它工作:

  • 在运行 airflow 的 Kubernetes 集群中创建 sqlproxy 部署。这是默认airflow_db连接使用的现有 airflow-sqlproxy 的副本,对部署文件进行了以下更改:

    • 将 Airflow-SQLProxy 的所有实例替换为新的代理名称
    • 在"规范:模板:规范:容器:命令:-实例"下编辑,将现有实例名称替换为我们要连接到的新实例
  • 创建一个 Kubernetes 服务,再次作为现有 Airflow-SQLPROXY-Service 的副本,并进行以下更改:

    • 将 Airflow-SQLProxy 的所有实例替换为新的代理名称
    • 在"规范:端口"下,更改为相应的端口(我们对 Postgres 实例使用了 5432(
  • 在气流 UI 中,添加 Postgres 类型的连接,并将主机设置为新创建的服务名称。

您可以按照这些说明在群集中启动新的 Cloud SQL 代理实例。

re#3:这听起来是个好计划。据我所知,没有Cloud SQL到BigQuery运算符,因此您必须像描述的那样分两个阶段进行操作。

将评论中的媒体帖子从@Leo添加到顶级 https://medium.com/@ariklevliber/connecting-to-gcp-composer-tasks-to-cloud-sql-7566350c5f53。按照该文章进行操作并设置服务后,可以使用 SQLAlchemy 从 DAG 进行连接,如下所示:

import os
from datetime import datetime, timedelta
import logging
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
logger = logging.getLogger(os.path.basename(__file__))
INSTANCE_CONNECTION_NAME = "phil-new:us-east1:phil-db"
default_args = {
'start_date': datetime(2019, 7, 16)
}

def connect_to_cloud_sql():
'''
Create a connection to CloudSQL
:return:
'''
import sqlalchemy
try:
PROXY_DB_URL = "mysql+pymysql://<user>:<password>@<cluster_ip>:3306/<dbname>"
logger.info("DB URL", PROXY_DB_URL)
engine = sqlalchemy.create_engine(PROXY_DB_URL, echo=True)
for result in engine.execute("SELECT NOW() as now"):
logger.info(dict(result))
except Exception:
logger.exception("Unable to interact with CloudSQL")

dag = DAG(
dag_id="example_sqlalchemy",
default_args=default_args,
# schedule_interval=timedelta(minutes=5),
catchup=False  # If you don't set this then the dag will run according to start date
)

t1 = PythonOperator(
task_id="example_sqlalchemy",
python_callable=connect_to_cloud_sql,
dag=dag
)

if __name__ == "__main__":
connect_to_cloud_sql()

在这里,在Hoffa对类似问题的回答中,您可以找到有关Wepay如何使用Airflow运算符每15分钟保持一次同步的参考。

从所说的答案:

看看WePay是如何做到这一点的:

  • https://wecode.wepay.com/posts/bigquery-wepay

MySQL to GCS 运算符对 MySQL 执行 SELECT 查询 桌子。SELECT 提取所有大于(或等于(最后一个的数据 高水位线。高水位线是 表(如果表是仅追加的(或修改时间戳 列(如果表收到更新(。再次,SELECT 语句 还可以回到时间(或行(上一点以捕获可能掉落的 最后一个查询中的行(由于上述问题(。

通过Airflow,他们设法使BigQuery与MySQL保持同步。 数据库每 15 分钟一次。

现在我们可以连接到云 SQL,而无需自己创建云代理。操作员将自动创建它。代码如下所示:

from airflow.models import DAG
from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceExportOperator
export_body = {
'exportContext': {
'fileType': 'CSV',
'uri': EXPORT_URI,
'databases': [DB_NAME],
'csvExportOptions': {
'selectQuery': SQL
}
}
}
default_dag_args = {}
with DAG(
'postgres_test',
schedule_interval='@once',
default_args=default_dag_args) as dag:
sql_export_task = CloudSqlInstanceExportOperator(
project_id=GCP_PROJECT_ID,
body=export_body,
instance=INSTANCE_NAME,
task_id='sql_export_task'
)

相关内容

  • 没有找到相关文章

最新更新