使用Pyspark中的循环连接不同的DataFrames



我在一个文件中有5个CSV文件,并希望将它们连接到Pyspark中的一个数据帧中:我使用下面的代码

name_file =['A', 'B', 'C', 'D', 'V']
for n in name_file:
n= spark.read.csv(fullpath+n+'.csv'
,header=False, 
inferSchema= True)
full_data=full_data.join(n,["id"])

错误:我得到了一个意外的结果>最后一个数据帧仅与自身连接。

预期结果:应该有6列,每个CSV有2个数据帧,其中一个数据帧与其他数据帧相同。联接应在此列上。因此,最终的数据帧应该有一个公共列和每个CSV文件中的5个特殊列。

代码似乎有几个问题,或者您可能没有提供完整的代码。

  1. 你定义完整路径了吗
  2. 你已经设置了header=False,那么spark怎么会知道有"id";列
  3. 在for循环下,您的缩进看起来是错误的
  4. full_data尚未定义,那么如何在for循环中评估的右侧?我怀疑你将其初始化为第一个csv文件,然后尝试加入再次使用第一个csv

我对下面的代码进行了一个小测试,它对我有效,并解决了我上面提出的问题。你可以根据自己的需要进行调整。

fullpath = '/content/sample_data/'
full_data = spark.read.csv(fullpath+'Book1.csv'
,header=True, 
inferSchema= True)
name_file =['Book2', 'Book3']
for n in name_file:
n= spark.read.csv(fullpath+n+'.csv'
,header=True, 
inferSchema= True)
full_data=full_data.join(n,["id"])
full_data.show(5)

最新更新