将SparkSubmitOperator与Airflow一起使用时,hdfs路径无效


//etl.py
start = DummyOperator(task_id = 'start', dag = dag) 
job1 = SparkSubmitOperator(task_id = 't1', application = '/home/airflow/dags/test.py',
name = 'test', conf = {'spark.master': 'yarn'}, dag = dag)

start >> job1
//test.py
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-1.8.0-openjdk-amd64'
os.environ['SPARK_HOME'] = '/opt/spark3'
os.environ['YARN_CONF_DIR'] = '/opt/hadoop/etc/hadoop'
os.environ['HADOOP_CONF_DIR'] = '/opt/hadoop/etc/hadoop'
spark = SparkSession.builder.master("yarn").appName('test1').getOrCreate()
target_dir = "hdfs:/localhost:9000/hospital/data/test.csv"
file = spark.read.format('csv').options(header='True').options(inferSchema='True').load(target_dir)

我把";test.csv";在…上hdfs://hospital/data/test.csv,我运行了airflow Web服务器,但我得到了一个n错误的

java.lang.IollegalArgumentException:路径名/来自hdfs:/localhost:9000/hopital/data不是有效的DFS文件名。

我也试过hdfs:///localhost:9000/hospital/data,hdfs::/医院/数据。。。等等,但总是出现同样的错误。

我该如何解决?

Pathname应该是hdfs服务器中的路径,而不是完整的url。

要配置您的spark会话以连接到hdfs服务器:

spark = (
SparkSession.builder.master("yarn").appName('test1')
.set("spark.hadoop.fs.default.name", "hdfs://localhost:9000")
.set("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000")
.getOrCreate()
)            

路径只是/hospital/data/test.csv

最新更新