How to get PySpark working on Google Cloud Composer



I find Google Cloud Composer to be a very promising managed Apache Airflow service, but I can't figure out how to use Cloud Composer to run pipelines that contain PySpark code. I am able to install other Python packages, such as Pandas, and use them with Cloud Composer.

Any pointers are much appreciated.

Cloud Composer is for orchestrating and scheduling pipelines; it does not run Spark itself.

So, to run PySpark code from Cloud Composer, you need a Dataproc cluster, because PySpark jobs run on Dataproc. In the DAG, DataprocCreateClusterOperator schedules the creation of a Dataproc cluster. Once the cluster exists, DataprocSubmitJobOperator submits the PySpark job to it. To submit a job, you have to point the operator at the job's source file (stored in GCS). You can refer to the code below.

PySpark code:


import pyspark
from operator import add

# Simple character-count job: counts occurrences of each character in "Hello World"
sc = pyspark.SparkContext()
data = sc.parallelize(list("Hello World"))
counts = (
    data.map(lambda x: (x, 1))
    .reduceByKey(add)
    .sortBy(lambda x: x[1], ascending=False)
    .collect()
)
for (word, count) in counts:
    print("{}: {}".format(word, count))

DAG code:


import datetime

from airflow import models
from airflow.providers.google.cloud.operators.dataproc import (
    DataprocCreateClusterOperator,
    DataprocSubmitJobOperator,
)

PROJECT_ID = "give your project id"
CLUSTER_NAME = "your dataproc cluster name that you want to create"
REGION = "us-central1"
PYSPARK_URI = "GCS location of your PySpark code, i.e. gs://[input file]"

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_dag_args = {
    "start_date": YESTERDAY,
}

# Cluster definition
CLUSTER_CONFIG = {
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
    },
    "worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-4",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 1024},
    },
}

with models.DAG(
    "dataproc",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:
    # Create the Dataproc cluster that the PySpark job will run on
    create_cluster = DataprocCreateClusterOperator(
        task_id="create_cluster",
        project_id=PROJECT_ID,
        cluster_config=CLUSTER_CONFIG,
        region=REGION,
        cluster_name=CLUSTER_NAME,
    )

    # Job definition: point the job at the PySpark source file in GCS
    PYSPARK_JOB = {
        "reference": {"project_id": PROJECT_ID},
        "placement": {"cluster_name": CLUSTER_NAME},
        "pyspark_job": {"main_python_file_uri": PYSPARK_URI},
    }

    # Submit the PySpark job to the cluster created above
    pyspark_task = DataprocSubmitJobOperator(
        task_id="pyspark_task",
        job=PYSPARK_JOB,
        region=REGION,
        project_id=PROJECT_ID,
    )

    create_cluster >> pyspark_task
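
If you don't want the cluster to keep running (and billing) after the job finishes, an optional follow-up is to tear it down at the end of the DAG. Here is a minimal sketch using DataprocDeleteClusterOperator from the same provider package (the task name is my own choice; TriggerRule.ALL_DONE makes the teardown run even if the PySpark task fails):

from airflow.providers.google.cloud.operators.dataproc import DataprocDeleteClusterOperator
from airflow.utils.trigger_rule import TriggerRule

# Inside the `with models.DAG(...)` block:
delete_cluster = DataprocDeleteClusterOperator(
    task_id="delete_cluster",
    project_id=PROJECT_ID,
    cluster_name=CLUSTER_NAME,
    region=REGION,
    # Run the teardown even when upstream tasks fail, so the cluster never lingers.
    trigger_rule=TriggerRule.ALL_DONE,
)

create_cluster >> pyspark_task >> delete_cluster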
