在SparkSQL连接中无法解析列名



我不知道为什么会发生这种情况。在PySpark中,我读取两个数据框架并打印出它们的列名,它们如预期的那样,但是当执行SQL连接时,我得到一个无法解析给定输入的列名的错误。我已经简化了合并只是为了让它工作,但我需要添加更多的连接条件,这就是为什么我使用SQL(将添加:"和b.mnvr_bgn mnvr_temp_idx_prev_temp

中被重命名为'_col7'。
mnvr_temp_idx_prev = mnvr_3.select('device_id', 'mnvr_bgn', 'mnvr_end')
print mnvr_temp_idx_prev.columns
['device_id', 'mnvr_bgn', 'mnvr_end']
raw_data_filtered = raw_data.select('device_id', 'trip_id', 'idx').groupby('device_id', 'trip_id').agg(F.max('idx').alias('idx_trip_end'))
print raw_data_filtered.columns
['device_id', 'trip_id', 'idx_trip_end']
raw_data_filtered.registerTempTable('raw_data_filtered_temp')
mnvr_temp_idx_prev.registerTempTable('mnvr_temp_idx_prev_temp') 
test = sqlContext.sql('SELECT a.device_id, a.idx_trip_end, b.mnvr_bgn, b.mnvr_end 
                          FROM raw_data_filtered_temp as a  
                             INNER JOIN mnvr_temp_idx_prev_temp as b 
                                ON a.device_id = b.device_id')

Traceback(最近一次调用):AnalysisException: u"cannot resolve 'b.device_id'给定输入列:[_col7, trip_id, device_id, mnvr_end, mnvr_bgn, idx_trip_end];第1行pos 237"

任何帮助都是感激的!

我建议在至少一个数据帧中重命名'device_id'字段的名称。我修改了你的查询只是一点,并测试了它(在scala)。下面的查询作品

test = sqlContext.sql("select * FROM raw_data_filtered_temp a INNER JOIN mnvr_temp_idx_prev_temp b ON a.device_id = b.device_id")
[device_id: string, mnvr_bgn: string, mnvr_end: string, device_id: string, trip_id: string, idx_trip_end: string]

现在,如果你在上面的语句中做一个'select * ',它将工作。但是如果你尝试选择'device_id',你会得到一个错误"引用'device_id'是模棱两可的"。正如你在上面的"test"数据帧定义中看到的,它有两个相同名称的字段(device_id)。因此,为了避免这种情况,我建议更改其中一个数据框架中的字段名。

mnvr_temp_idx_prev = mnvr_3.select('device_id', 'mnvr_bgn', 'mnvr_end')
                           .withColumnRenamned("device_id","device")  
raw_data_filtered = raw_data.select('device_id', 'trip_id', 'idx').groupby('device_id', 'trip_id').agg(F.max('idx').alias('idx_trip_end'))

现在使用dataframe或sqlContext

//using dataframes with multiple conditions
  val test = mnvr_temp_idx_prev.join(raw_data_filtered,$"device" === $"device_id"
                                                   && $"mnvr_bgn" < $"idx_trip_id","inner")

//在SQL Context中

 test = sqlContext.sql("select * FROM raw_data_filtered_temp a INNER JOIN mnvr_temp_idx_prev_temp b ON a.device_id = b.device and a. idx_trip_id < b.mnvr_bgn")

以上查询将解决您的问题。如果您的数据集太大,我建议不要在Join条件中使用'>'或'<'操作符,因为它会导致交叉连接,如果数据集很大,这是一个昂贵的操作。

相关内容

  • 没有找到相关文章

最新更新