Dagster PySpark not running on EMR



I am trying to set up a pipeline in Dagster that does the following:

  1. Launch an EMR cluster using the EmrJobRunner class's run_job_flow function.

  2. Add one or more steps to that cluster that process data in PySpark, using the emr_pyspark_step_launcher resource.

  3. Shut down the cluster once all steps have completed (a rough sketch of this follows the list).
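Nothing in my code below handles the shutdown in step 3 yet. A minimal sketch of what that op might look like, talking to EMR through boto3 directly (terminate_emr_cluster and its config are hypothetical and not part of my current pipeline):

import boto3
from dagster import In, Nothing, op

@op(ins={"start": In(Nothing)}, config_schema={"cluster_name": str})
def terminate_emr_cluster(context) -> None:
    # Hypothetical op for step 3: look up the active cluster by name and terminate it.
    emr_client = boto3.client("emr", region_name="eu-central-1")
    active = emr_client.list_clusters(
        ClusterStates=["STARTING", "BOOTSTRAPPING", "RUNNING", "WAITING"]
    )["Clusters"]
    matches = [c for c in active if c["Name"] == context.op_config["cluster_name"]]
    if not matches:
        context.log.warning("No active EMR cluster found with that name; nothing to terminate.")
        return
    # terminate_job_flows takes a list of cluster (job flow) IDs.
    emr_client.terminate_job_flows(JobFlowIds=[matches[0]["Id"]])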

I started by following this tutorial, which assumes you have an EMR cluster already running and that the EMR cluster ID is hard-coded as part of the job spec. That approach works: I can see my steps running on EMR. However, when I tried to automate the process, I noticed that PySpark was running locally instead of on EMR. I tried wrapping emr_pyspark_step_launcher in a resource that sets the cluster ID as part of the pipeline. The cluster ID can be obtained with a function on the EmrJobRunner class that returns the cluster ID when given a cluster name. I attempted to add the cluster ID dynamically during the job, after the cluster is launched, but this did not work as expected.
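For reference, the hard-coded setup that did run on EMR looked roughly like this (a sketch; the cluster ID below is a placeholder for a cluster that is already running):

emr_pyspark_step_launcher_configured = emr_pyspark_step_launcher.configured(
    {
        "cluster_id": "j-XXXXXXXXXXXXX",  # placeholder: ID of an already-running cluster
        "local_pipeline_package_path": str(Path(__file__).parent.parent),
        "deploy_local_pipeline_package": True,
        "region_name": "eu-central-1",
        "staging_bucket": "EMR_STAGING_BUCKET",
        "wait_for_logs": True,
    }
)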

Here is my code; any help would be greatly appreciated.

from pathlib import Path
from dagster_aws.emr import emr_pyspark_step_launcher
from dagster_aws.emr.emr import EmrJobRunner
from dagster_aws.s3 import s3_resource
from dagster_pyspark import pyspark_resource
from pyspark.sql import DataFrame
from transformations import execute_transformation
from dagster import IOManager, graph, io_manager, op, resource, In, Nothing, Out
from utils.configs import get_emr_cluster_config
import logging

class ParquetIOManager(IOManager):
    def _get_path(self, context):
        return "/".join(
            [
                context.resource_config["path_prefix"],
                context.run_id,
                context.step_key,
                context.name,
            ]
        )

    def handle_output(self, context, obj):
        if isinstance(obj, DataFrame):
            obj.write.parquet(self._get_path(context))
        # return obj

    def load_input(self, context):
        spark = context.resources.pyspark.spark_session
        return spark.read.parquet(self._get_path(context.upstream_output))

@io_manager(required_resource_keys={"pyspark"}, config_schema={"path_prefix": str})
def parquet_io_manager():
    return ParquetIOManager()

@resource
def emr_job_runner(init_context):
    return EmrJobRunner(region="eu-central-1")

@resource(
    config_schema={"cluster_name": str}, required_resource_keys={"emr_job_runner"}
)
def my_pyspark_step_launcher(init_context):
    cluster_id = init_context.resources.emr_job_runner.cluster_id_from_name(
        cluster_name=init_context.resource_config["cluster_name"]
    )
    init_context.log.info(f"CLUSTER ID during resource initialization: {cluster_id}")
    return emr_pyspark_step_launcher.configured(
        {
            "cluster_id": cluster_id,
            "local_pipeline_package_path": str(Path(__file__).parent.parent),
            "deploy_local_pipeline_package": True,
            "region_name": "eu-central-1",
            "staging_bucket": "EMR_STAGING_BUCKET",
            "wait_for_logs": True,
        }
    )

def launch_cluster(emr: EmrJobRunner, log: logging.Logger, emr_config: dict) -> str:
    emr_config = get_emr_cluster_config(
        release_label=emr_config["emr_release_label"],
        cluster_name=emr_config["cluster_name"],
        master_node_instance_type=emr_config["master_node_instance_type"],
        worker_node_instance_type=emr_config["worker_node_instance_type"],
        worker_node_instance_count=emr_config["worker_node_instance_count"],
        ec2_subnet_id=emr_config["ec2_subnet_id"],
        bid_price=emr_config["worker_node_spot_bid_price"],
    )
    return emr.run_job_flow(log=log, cluster_config=emr_config)

@op(
    config_schema={
        "emr_release_label": str,
        "cluster_name": str,
        "master_node_instance_type": str,
        "worker_node_instance_type": str,
        "worker_node_instance_count": int,
        "ec2_subnet_id": str,
        "worker_node_spot_bid_price": str,
    },
    required_resource_keys={"emr_job_runner"},
    out=Out(Nothing),
)
def launch_emr_cluster(context) -> None:
    op_config = context.op_config
    cluster_id = launch_cluster(
        emr=context.resources.emr_job_runner, log=context.log, emr_config=op_config
    )
    context.log.info(f"CLUSTER ID: {cluster_id}")

@op(
    ins={"start": In(Nothing)},
    required_resource_keys={"pyspark", "pyspark_step_launcher"},
)
def get_dataframe(context) -> DataFrame:
    return execute_transformation(spark_session=context.resources.pyspark.spark_session)

@graph
def make_and_filter_data():
    get_dataframe(launch_emr_cluster())

run_data_emr = make_and_filter_data.to_job(
    name="prod",
    resource_defs={
        "pyspark_step_launcher": my_pyspark_step_launcher,
        "pyspark": pyspark_resource,
        "s3": s3_resource.configured({"region_name": "eu-central-1"}),
        "io_manager": parquet_io_manager.configured(
            {"path_prefix": "s3://EMR_STEP_OUTPUT"}
        ),
        "emr_job_runner": emr_job_runner,
    },
)
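For completeness, I launch the job with run config along these lines; every value here is a placeholder for my actual environment settings:

result = run_data_emr.execute_in_process(
    run_config={
        "ops": {
            "launch_emr_cluster": {
                "config": {
                    "emr_release_label": "emr-6.9.0",      # placeholder
                    "cluster_name": "my-dagster-cluster",  # placeholder
                    "master_node_instance_type": "m5.xlarge",
                    "worker_node_instance_type": "m5.xlarge",
                    "worker_node_instance_count": 2,
                    "ec2_subnet_id": "subnet-XXXXXXXX",    # placeholder
                    "worker_node_spot_bid_price": "0.10",
                }
            }
        },
        "resources": {
            "pyspark_step_launcher": {"config": {"cluster_name": "my-dagster-cluster"}},
        },
    }
)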

This is a bit tricky, because resources are initialized per op, and they are initialized before the op starts. So you can't modify or pass values from an op into a resource's initialization. In this particular case, I think you may want to try launching the cluster inside the step launcher resource definition; that way you have access to the cluster ID when the step launcher is initialized. Something like this:

@resource
def emr_job_runner(init_context):
    return EmrJobRunner(region="eu-central-1")

def launch_cluster(emr: EmrJobRunner, log: logging.Logger, emr_config: dict) -> str:
    emr_config = get_emr_cluster_config(
        release_label=emr_config["emr_release_label"],
        cluster_name=emr_config["cluster_name"],
        master_node_instance_type=emr_config["master_node_instance_type"],
        worker_node_instance_type=emr_config["worker_node_instance_type"],
        worker_node_instance_count=emr_config["worker_node_instance_count"],
        ec2_subnet_id=emr_config["ec2_subnet_id"],
        bid_price=emr_config["worker_node_spot_bid_price"],
    )
    return emr.run_job_flow(log=log, cluster_config=emr_config)

@resource(
    config_schema={
        "emr_release_label": str,
        "cluster_name": str,
        "master_node_instance_type": str,
        "worker_node_instance_type": str,
        "worker_node_instance_count": int,
        "ec2_subnet_id": str,
        "worker_node_spot_bid_price": str,
    },
    required_resource_keys={"emr_job_runner"},
)
def cluster_launcher(init_context):
    config = init_context.resource_config
    # TODO: handle if cluster already exists, as this resource will be initialized
    # for each op / resource that requires it
    cluster_id = launch_cluster(
        emr=init_context.resources.emr_job_runner,
        log=init_context.log,
        emr_config=config,
    )
    init_context.log.info(f"CLUSTER ID: {cluster_id}")
    return cluster_id

@resource(
    config_schema={"cluster_name": str},
    required_resource_keys={"emr_job_runner", "cluster_launcher"},
)
def my_pyspark_step_launcher(init_context):
    cluster_id = init_context.resources.cluster_launcher
    init_context.log.info(f"CLUSTER ID during resource initialization: {cluster_id}")
    return emr_pyspark_step_launcher.configured(
        {
            "cluster_id": cluster_id,
            "local_pipeline_package_path": str(Path(__file__).parent.parent),
            "deploy_local_pipeline_package": True,
            "region_name": "eu-central-1",
            "staging_bucket": "EMR_STAGING_BUCKET",
            "wait_for_logs": True,
        }
    )
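If you go this route, the new cluster_launcher resource also has to be wired into the job, and the launch_emr_cluster op is probably no longer needed since the resource now launches the cluster. Adapting your to_job call, roughly:

run_data_emr = make_and_filter_data.to_job(
    name="prod",
    resource_defs={
        "pyspark_step_launcher": my_pyspark_step_launcher,
        "pyspark": pyspark_resource,
        "s3": s3_resource.configured({"region_name": "eu-central-1"}),
        "io_manager": parquet_io_manager.configured(
            {"path_prefix": "s3://EMR_STEP_OUTPUT"}
        ),
        "emr_job_runner": emr_job_runner,
        # cluster_launcher is initialized before my_pyspark_step_launcher because the
        # step launcher declares it as a required resource, so the cluster ID exists
        # by the time the step launcher is configured.
        "cluster_launcher": cluster_launcher,
    },
)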
