What should the step type be for a Spark app? The problem I am facing: the job fails with "A master URL must be set in your configuration", so the yarn master is never picked up. When using EmrAddStepsOperator, the application seems to be treated as a simple JAR instead of being run in spark-submit mode. Please find the Airflow logs, the error, and the EMR screenshots attached.
In the Amazon EMR console I can manually add the Spark job as a step: after choosing a "Spark application" step type instead of a "Custom JAR" step,
I can set the spark-submit options and the main-method arguments. The available step types there are Streaming, Spark application, and Custom JAR.
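For context, the two console step types correspond to roughly these programmatic step definitions (a sketch only; bucket, jar, and class names below are illustrative placeholders, not the real job):

# Sketch: what the console's "Custom JAR" step type roughly produces.
# Hadoop's RunJar executes the jar's main class directly, so nothing
# ever passes a master URL to Spark.
CUSTOM_JAR_STEP = {
    'Name': 'MyCustomJarStep',
    'ActionOnFailure': 'CONTINUE',
    'HadoopJarStep': {
        'Jar': 's3://my-bucket/jars/my-app.jar',  # illustrative path
        'MainClass': 'com.example.Main',          # illustrative class
        'Args': ['--someArg', 'someValue'],
    },
}

# Sketch: what the console's "Spark application" step type roughly produces.
# EMR's command-runner.jar runs spark-submit on the master node, which
# injects the yarn master for you.
SPARK_APP_STEP = {
    'Name': 'MySparkAppStep',
    'ActionOnFailure': 'CONTINUE',
    'HadoopJarStep': {
        'Jar': 'command-runner.jar',
        'Args': ['spark-submit',
                 '--deploy-mode', 'cluster',
                 '--class', 'com.example.Main',
                 's3://my-bucket/jars/my-app.jar',
                 '--someArg', 'someValue'],
    },
}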
My Error Message:
> Exception in thread "main" org.apache.spark.SparkException: A master URL must be set in your configuration
at org.apache.spark.SparkContext.<init>(SparkContext.scala:385)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2574)
at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:934)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:928)
at com.mcf.ExtractcustomerCategoryWiseSummarizedViews$.main(ExtractcustomerCategoryWiseSummarizedViews.scala:13)
at com.mcf.ExtractcustomerCategoryWiseSummarizedViews.main(ExtractcustomerCategoryWiseSummarizedViews.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
at org.apache.hadoop.util.RunJar.main(RunJar.java:236)
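Note the bottom frames of the trace: org.apache.hadoop.util.RunJar calls main() directly, so spark-submit never runs and no master URL is injected. The same failure mode can be reproduced with a minimal PySpark sketch launched as a plain Python process (a hypothetical repro; the actual job is Scala):

from pyspark.sql import SparkSession

# Run with plain `python repro.py` (not via spark-submit): because no master
# is configured anywhere, SparkContext raises
# "A master URL must be set in your configuration".
spark = SparkSession.builder.appName("repro").getOrCreate()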
Here is my AWS EMR pipeline as an example.
It starts by creating the cluster, adds the steps/operations, watches the step, and finally terminates the cluster.
import time
from airflow.operators.python import PythonOperator
from datetime import timedelta
from airflow import DAG
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor
from airflow.utils.dates import days_ago
SPARK_STEPS = [
    {
        'Name': 'PerformETL',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',
            # 'MainClass': 'com.sadim.main',
            'Args': ['spark-submit',
                     '--deploy-mode',
                     'cluster',
                     '--master',
                     'yarn',
                     '--class',
                     'com.sadim.ExtractcustomerCategoryWiseSummarizedViews',
                     '--mode',
                     'DeltaLoadByDays',
                     '--noOfDaysBehindTodayForDeltaLoad',
                     '1',
                     '--s3InputPath',
                     's3://data-lake/documents/accountscore/categoriseddata/',
                     '--s3OutputPathcustomerCategoryWiseSummarizedViews',
                     's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],
        },
    }
]
SPARK_STEPS2 = [
    {
        'Name': 'sadim_test3',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 's3://test-data/jars/scalatestnadeem-0.0.1-SNAPSHOT_v2.jar',
            'MainClass': 'com.sadim.scalatestnadeem.Test',
            'Args': ['spark-submit',
                     '--deploy-mode',
                     'client',
                     '--master',
                     'yarn',
                     '--conf',
                     'spark.yarn.submit.waitAppCompletion=true'],
        },
    }
]
SPARK_STEPS3 = [
    {
        'Name': 'sadim_test3',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT_masteryarnwithoutdependencyandtest.jar',
            'MainClass': 'com.sadim.TestSadim',
            'Args': ['spark-submit',
                     '--deploy-mode',
                     'client',
                     '--master',
                     'yarn',
                     '--conf',
                     'spark.yarn.submit.waitAppCompletion=true'],
        },
    }
]
SPARK_STEPS4 = [
    {
        'Name': 'PerformETL',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',
            # 'MainClass': 'com.sadim.ExtractcustomerCategoryWiseSummarizedViews',
            'Args': ['com.sadim.ExtractcustomerCategoryWiseSummarizedViews',
                     'spark-submit',
                     '--deploy-mode',
                     'client',
                     '--master',
                     'yarn',
                     '--mode',
                     'DeltaLoadByDays',
                     '--noOfDaysBehindTodayForDeltaLoad',
                     '1',
                     '--s3InputPath',
                     's3://data-lake/documents/accountscore/categoriseddata/',
                     '--s3OutputPathcustomerCategoryWiseSummarizedViews',
                     's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],
        },
    }
]
SPARK_STEPS5 = [
    {
        'Name': 'PerformETL',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',
            # 'MainClass': 'com.sadim.main',
            'Args': ['com.sadim.ExtractcustomerCategoryWiseSummarizedViews',
                     '--mode',
                     'DeltaLoadByDays',
                     '--noOfDaysBehindTodayForDeltaLoad',
                     '1',
                     '--s3InputPath',
                     's3://data-lake/documents/accountscore/categoriseddata/',
                     '--s3OutputPathcustomerCategoryWiseSummarizedViews',
                     's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],
        },
    }
]
JOB_FLOW_OVERRIDES = {
    'Name': 'ob_emr_airflow_automation',
    'ReleaseLabel': 'emr-6.6.0',
    'LogUri': 's3://test-data/emr_logs/',
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master node',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1
            },
            {
                'Name': 'Slave nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'CORE',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1
            }
        ],
        'Ec2SubnetId': 'subnet-03129248888a14196',
        'Ec2KeyName': 'datalake-emr-nodes',
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False
    },
    'BootstrapActions': [
        {
            'Name': 'Java11InstallBootstrap',
            'ScriptBootstrapAction': {
                'Path': 's3://test-data/jars/bootstrap.sh',
                'Args': []
            }
        }
    ],
    'Configurations': [
        {
            "Classification": "spark-defaults",
            "Properties": {
                "spark.driver.defaultJavaOptions": "-XX:OnOutOfMemoryError='kill -9 %p' -XX:MaxHeapFreeRatio=70",
                "spark.executor.defaultJavaOptions": "-verbose:gc -Xlog:gc*::time -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p' -XX:MaxHeapFreeRatio=70 -XX:+IgnoreUnrecognizedVMOptions"
            }
        }
    ],
    'JobFlowRole': 'DL_EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
}
with DAG(
    dag_id='emr_job_flow_manual_steps_dag_v6',
    default_args={
        'owner': 'airflow',
        'depends_on_past': False,
        'email': ['sadim.nadeem@sadim.com'],
        'email_on_failure': False,
        'email_on_retry': False,
    },
    dagrun_timeout=timedelta(hours=1),
    start_date=days_ago(1),
    schedule_interval='0 3 * * *',
    tags=['example'],
) as dag:
    # [START howto_operator_emr_manual_steps_tasks]
    cluster_creator = EmrCreateJobFlowOperator(
        task_id='create_job_flow',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )

    # Gives the cluster time to finish bootstrapping before steps are added.
    delay_python_task: PythonOperator = PythonOperator(
        task_id="delay_python_task",
        dag=dag,
        python_callable=lambda: time.sleep(400),
    )

    step_adder = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id=cluster_creator.output,
        aws_conn_id='aws_default',
        steps=SPARK_STEPS5,
    )

    step_checker = EmrStepSensor(
        task_id='watch_step',
        job_flow_id=cluster_creator.output,
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default',
    )

    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='remove_cluster',
        job_flow_id=cluster_creator.output,
        aws_conn_id='aws_default',
    )

    cluster_creator >> delay_python_task >> step_adder >> step_checker >> cluster_remover
    # [END howto_operator_emr_manual_steps_tasks]

    # Task dependencies also created implicitly via `XComArgs`:
    # cluster_creator >> step_checker
    # cluster_creator >> cluster_remover
The issue is now fixed. We need to use command-runner.jar as the 'Jar' option in EmrAddStepsOperator and pass the specific ETL job's jar inside 'Args', as part of the spark-submit arguments:
SPARK_STEPS = [
    {
        'Name': 'DetectAndLoadOrphanTransactions',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            # 'MainClass': 'com.sadim.main',
            'Args': [
                'spark-submit',
                '--deploy-mode',
                'cluster',
                '--master',
                'yarn',
                '--class',
                'com.sadim.DetectAndLoadOrphanTransactions',
                's3://nadeem-test-data/jars/open-0.0.1-SNAPSHOT_yarn_yarndep_maininpom.jar',
                '--Txnspath',
                's3://nadeem-test-data/spark_output//CategorizedTxnsDataset//2022_load_v2',
                '--demographyPath',
                # Note the trailing comma here: without it Python silently
                # concatenates this string with '--outPath'.
                's3://nadeem-test-data/spark_output//customerDemographics//2022_load_v2',
                '--outPath',
                's3://nadeem-test-data/spark_output//orphan_ihub_ids//2022_load_v2'],
        },
    }
]
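command-runner.jar executes spark-submit on the master node, which is exactly what the console's "Spark application" step type does, so YARN now receives the master setting and the error goes away. The same step definition can also be sanity-checked outside Airflow with boto3 (a minimal sketch; the region and the cluster id are placeholders):

import boto3

# Submit the command-runner.jar step to an already-running cluster to verify
# the step definition outside Airflow.
emr = boto3.client('emr', region_name='us-east-1')  # placeholder region
response = emr.add_job_flow_steps(
    JobFlowId='j-XXXXXXXXXXXXX',  # placeholder cluster id
    Steps=SPARK_STEPS,
)
print(response['StepIds'])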