气流emraddsteppsoperator无法执行火花遮光罐



spark app的step type应该是什么?我面临的问题是,主类型没有设置或无法识别纱线。当使用emraddsteppsoperator时,似乎正在考虑将应用程序视为简单的jar而不是spark提交模式。请查收附件气流日,误差和emr截图

amazon emr cloud console手动添加spark job作为步骤

添加spark jar类型步骤而不是自定义jar步骤后…

设置spark提交参数和主方法参数

步骤类型可以是流或spark应用程序或自定义jar

My Error Message:

> Exception in thread "main" org.apache.spark.SparkException: A master URL must be set in your configuration
at org.apache.spark.SparkContext.<init>(SparkContext.scala:385)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2574)
at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:934)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:928)
at com.mcf.ExtractcustomerCategoryWiseSummarizedViews$.main(ExtractcustomerCategoryWiseSummarizedViews.scala:13)
at com.mcf.ExtractcustomerCategoryWiseSummarizedViews.main(ExtractcustomerCategoryWiseSummarizedViews.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
at org.apache.hadoop.util.RunJar.main(RunJar.java:236)

这是一个AWS EMR管道的示例。

从创建集群开始,添加步骤/操作,检查步骤,最后完成终止集群。

import time    
from airflow.operators.python import PythonOperator     
from datetime import timedelta    
from airflow import DAG    
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator    
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator    
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator     
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor    
from airflow.utils.dates import days_ago    
SPARK_STEPS = [    
{    
'Name': 'PerformETL',    
'ActionOnFailure': 'CONTINUE',    
'HadoopJarStep': {
'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',    
#'MainClass': 'com.sadim.main',    
'Args': ['spark-submit',    
'--deploy-mode',    
'cluster',    
'--master',    
'yarn',    
'--class',    
'com.sadim.ExtractcustomerCategoryWiseSummarizedViews',     
'--mode',    
'DeltaLoadByDays',    
'--noOfDaysBehindTodayForDeltaLoad',    
'1',    
'--s3InputPath',    
's3://data-lake/documents/accountscore/categoriseddata/',    
'--s3OutputPathcustomerCategoryWiseSummarizedViews',    
's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],    
},
}
]
SPARK_STEPS2 = [
{
'Name': 'sadim_test3',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 's3://test-data/jars/scalatestnadeem-0.0.1-SNAPSHOT_v2.jar',
'MainClass': 'com.sadim.scalatestnadeem.Test',
'Args': ['spark-submit',    
'--deploy-mode',    
'client',    
'--master',    
'yarn',    
'--conf',    
'spark.yarn.submit.waitAppCompletion=true'],    
},    
}    
]    
SPARK_STEPS3 = [    
{    
'Name': 'sadim_test3',    
'ActionOnFailure': 'CONTINUE',    
'HadoopJarStep': {    
'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT_masteryarnwithoutdependencyandtest.jar',    
'MainClass': 'com.sadim.TestSadim',    
'Args': ['spark-submit',     
'--deploy-mode',    
'client',     
'--master',     
'yarn',    
'--conf',    
'spark.yarn.submit.waitAppCompletion=true'],    
},    
}    
]    
SPARK_STEPS4 = [    
{    
'Name': 'PerformETL',    
'ActionOnFailure': 'CONTINUE',    
'HadoopJarStep': {    
'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',    
#'MainClass': 'com.sadim.ExtractcustomerCategoryWiseSummarizedViews',    
'Args': ['com.sadim.ExtractcustomerCategoryWiseSummarizedViews',    
'spark-submit',    
'--deploy-mode',    
'client',    
'--master',    
'yarn',                    
'--mode',    
'DeltaLoadByDays',    
'--noOfDaysBehindTodayForDeltaLoad',    
'1',    
'--s3InputPath',    
's3://data-lake/documents/accountscore/categoriseddata/',    
'--s3OutputPathcustomerCategoryWiseSummarizedViews',    
's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],    
},    
}    
]    
SPARK_STEPS5 = [    
{    
'Name': 'PerformETL',    
'ActionOnFailure': 'CONTINUE',    
'HadoopJarStep': {    
'Jar': 's3://mydata/jars/open-0.0.1-SNAPSHOT.jar',    
#'MainClass': 'com.sadim.main',    
'Args': ['com.sadim.ExtractcustomerCategoryWiseSummarizedViews',    
'--mode',    
'DeltaLoadByDays',    
'--noOfDaysBehindTodayForDeltaLoad',    
'1',    
'--s3InputPath',    
's3://data-lake/documents/accountscore/categoriseddata/',    
'--s3OutputPathcustomerCategoryWiseSummarizedViews',    
's3://test-data/spark_output//customerCategoryWiseSummarizedViews//test'],    
},    
}    
]    
JOB_FLOW_OVERRIDES = {    
'Name': 'ob_emr_airflow_automation',    
'ReleaseLabel': 'emr-6.6.0',    
'LogUri': 's3://test-data/emr_logs/',    
'Instances': {    
'InstanceGroups': [    
{    
'Name': 'Master node',    
'Market': 'ON_DEMAND',    
'InstanceRole': 'MASTER',    
'InstanceType': 'm5.xlarge',    
'InstanceCount': 1    
},    
{    
'Name': "Slave nodes",    
'Market': 'ON_DEMAND',
'InstanceRole': 'CORE',    
'InstanceType': 'm5.xlarge',    
'InstanceCount': 1    
}    
],    
'Ec2SubnetId': 'subnet-03129248888a14196',    
'Ec2KeyName': 'datalake-emr-nodes',    
'KeepJobFlowAliveWhenNoSteps': True,    
'TerminationProtected': False    
},    
'BootstrapActions': [    
{    
'Name': 'Java11InstallBootstrap',    
'ScriptBootstrapAction': {    
'Path': 's3://test-data/jars/bootstrap.sh',    
'Args': [    
]    
}    
}    
],    
'Configurations': [    
{    
"Classification":"spark-defaults",    
"Properties":{    
"spark.driver.defaultJavaOptions":"-XX:OnOutOfMemoryError='kill -9 %p' -    XX:MaxHeapFreeRatio=70",    
"spark.executor.defaultJavaOptions":"-verbose:gc -Xlog:gc*::time -    XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p' -XX:MaxHeapFreeRatio=70 -XX:+IgnoreUnrecognizedVMOptions"
}    
}    
],    
'JobFlowRole': 'DL_EMR_EC2_DefaultRole',    
'ServiceRole': 'EMR_DefaultRole',    
}    
with DAG(    
dag_id='emr_job_flow_manual_steps_dag_v6',    
default_args={    
'owner': 'airflow',    
'depends_on_past': False,    
'email': ['sadim.nadeem@sadim.com'],    
'email_on_failure': False,    
'email_on_retry': False,    
},    
dagrun_timeout=timedelta(hours=1),    
start_date=days_ago(1),
schedule_interval='0 3 * * *',
tags=['example'],
) as dag:
# [START howto_operator_emr_manual_steps_tasks]
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
emr_conn_id='emr_default',
)
delay_python_task: PythonOperator = PythonOperator(task_id="delay_python_task",
dag=dag,
python_callable=lambda: time.sleep(400))
step_adder = EmrAddStepsOperator(
task_id='add_steps',
job_flow_id=cluster_creator.output,
aws_conn_id='aws_default',
steps=SPARK_STEPS5,
)
step_checker = EmrStepSensor(
task_id='watch_step',
job_flow_id=cluster_creator.output,
step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
aws_conn_id='aws_default',
)
cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id=cluster_creator.output,
aws_conn_id='aws_default',
)
cluster_creator >> step_adder >> step_checker >> cluster_remover
# [END howto_operator_emr_manual_steps_tasks]
# Task dependencies created via `XComArgs`:
#   cluster_creator >> step_checker
#   cluster_creator >> cluster_remover

问题已修复。我们需要使用命令运行器jar作为emraddsteppsoperator中的jar选项,并在args中传递特定的ETL作业jar作为

SPARK_STEPS = [
{
'Name': 'DetectAndLoadOrphanTransactions',
'ActionOnFailure': 'CONTINUE',
'HadoopJarStep': {
'Jar': 'command-runner.jar',
#'MainClass': 'com.sadim.main',
'Args': [ 
'spark-submit',
'--deploy-mode',
'cluster',
'--master',
'yarn',
'--class',
'com.sadim.DetectAndLoadOrphanTransactions',
's3://nadeem-test-data/jars/open-0.0.1-SNAPSHOT_yarn_yarndep_maininpom.jar',
'--Txnspath',
's3://nadeem-test-data/spark_output//CategorizedTxnsDataset//2022_load_v2',
'--demographyPath',
's3://nadeem-test-data/spark_output//customerDemographics//2022_load_v2'
'--outPath',
's3://nadeem-test-data/spark_output//orphan_ihub_ids//2022_load_v2'],
},
}
]

相关内容

  • 没有找到相关文章

最新更新