我有一个涉及大量数据的步骤,需要在Python中完成(目前使用pandas)。我只是好奇地提出了一个建议,以确保我有足够的资源来执行大型数据操作,针对几种不同的客户端配置,以及两个如何使这个过程更有效(即使用Pyspark或其他工具,这些都是新的,所以请承担任何后续问题)。谢谢你的帮助,如果需要的话,我会试着添加更多的细节,只是想保持这个相当高的水平。
管道目前是一些PythonOps,一些BashOperators和一些BigQuery操作符(通过GCP Composer运行)
Cloud Composer用于调度管道。
For第一次查询:
根据您正在使用的资源,如果在DAG中运行多个任务,并且操作符的数量更多,那么您需要配置Composer Environment以满足标准。
- 节点计数可以根据任务实例而增加。
- 由于可以在单个DAG中安排多个任务,因此可以将机器类型更改为工作负载优化的机器和磁盘大小,以最大限度地减少环境的减速。
第二个查询:
PySpark支持Apache Spark,它通过并行和批处理系统处理大型数据集。因此,可以使用Pyspark来提高数据处理的效率。
因此,为了在Cloud Composer中运行PySpark代码,您需要创建一个Dataproc集群,因为PySpark作业在Dataproc集群中运行。在DAG中,您可以使用DataprocCreateClusterOperator来调度创建一个dataprocc集群。创建集群之后,可以使用DataprocSubmitJobOperator将PySpark作业提交到Dataproc集群。要向集群提交作业,需要提供作业源文件。您可以参考下面的代码片段。
PySpark Job code:
import pyspark
from operator import add
sc = pyspark.SparkContext()
data = sc.parallelize(list("Hello World"))
counts = data.map(lambda x:
(x, 1)).reduceByKey(add).sortBy(lambda x: x[1],
ascending=False).collect()
for (word, count) in counts:
print("{}: {}".format(word, count))
DAG code:
import os
import datetime
from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
DataprocCreateClusterOperator,
DataprocSubmitJobOperator
)
from airflow.providers.google.cloud.sensors.dataproc import DataprocJobSensor
from airflow.utils.dates import days_ago
PROJECT_ID = "give your project id"
CLUSTER_NAME = "your dataproc cluster name that you want to create"
REGION = "us-central1"
ZONE = "us-central1-a"
PYSPARK_URI = "GCS location of your PySpark Code i.e gs://[input file]"
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
default_dag_args = {
'start_date': YESTERDAY,
}
# Cluster definition
# [START how_to_cloud_dataproc_create_cluster]
CLUSTER_CONFIG = {
"master_config": {
"num_instances": 1,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
},
"worker_config": {
"num_instances": 2,
"machine_type_uri": "n1-standard-4",
"disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
},
}
with models.DAG(
"dataproc",
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
# [START how_to_cloud_dataproc_create_cluster_operator]
create_cluster = DataprocCreateClusterOperator(
task_id="create_cluster",
project_id=PROJECT_ID,
cluster_config=CLUSTER_CONFIG,
region=REGION,
cluster_name=CLUSTER_NAME,
)
PYSPARK_JOB = {
"reference": {"project_id": PROJECT_ID},
"placement": {"cluster_name": CLUSTER_NAME},
"pyspark_job": {"main_python_file_uri": PYSPARK_URI},
}
pyspark_task = DataprocSubmitJobOperator(
task_id="pyspark_task", job=PYSPARK_JOB, location=REGION, project_id=PROJECT_ID
)
create_cluster >> pyspark_task
Dataflow也可以用于批处理和流数据的数据处理。