pyspark dataframe column : Hive column



我有一个蜂巢表如下:

hive> describe stock_quote;
OK
tickerid                string                                      
tradeday                string                                      
tradetime               string                                      
openprice               string                                      
highprice               string                                      
lowprice                string                                      
closeprice              string                                      
volume                  string

遵循火花代码读取CSV文件,并试图将记录插入蜂巢表:

sc = spark.sparkContext
lines = sc.textFile('file:///<File Location>')
rows = lines.map(lambda line : line.split(','))
rows_map = rows.map(lambda row : Row(TickerId = row[0], TradeDay = row[1], TradeTime = row[2], OpenPrice = row[3], HighPrice = row[4], LowPrice = row[5], ClosePrice = row[6], Volume = row[7]))
rows_df = spark.createDataFrame(rows_map)
rows_df.write.mode('append').insertInto('default.stock_quote')

我面临的问题是,当我在dataframe上调用show((函数时,它按字母顺序打印列,例如以下

|ClosePrice|HighPrice|LowPrice|OpenPrice|TickerId|TradeDay|TradeTime|Volume|

,在表中,它插入了Cloteprice(DF中的第一列(在TickerId(蜂巢表中的第一列(列,高价列中的高价值等。

尝试在dataframe上调用select((函数,没有帮助。尝试将列名列表放置如下:

rows_df = spark.createDataFrame(rows_map, ["TickerId", "TradeDay", "TradeTime", "OpenPrice", "HighPrice", "LowPrice", "ClosePrice", "Volume"])

上面更改列的名称顺序,但值保持在同一位置,这更不正确。

任何帮助都将不胜感激。

您也可以使用saveAsTable而不是insertInto

来自文档:

insertInto不同,saveAsTable将使用列名来查找正确的列位置

您应该使用namedtuple而不是Row,因为'row'尝试订购列名。因此,订购的列名与default.stock_quote 表的列顺序不匹配,请检查PySpark中的Scala Case类等效的是什么?有关更多详细信息

所以你应该做

from collections import namedtuple
table = namedtuple('table', ['TickerId', 'TradeDay', 'TradeTime', 'OpenPrice', 'HighPrice', 'LowPrice', 'ClosePrice', 'Volume'])
rows_map = rows.map(lambda row : table(row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7]))

和 @user6910411所建议的:" 普通元组也可以做"

rows_map = rows.map(lambda row : (row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7]))
rows_df = spark.createDataFrame(rows_map, ['TickerId', 'TradeDay', 'TradeTime', 'OpenPrice', 'HighPrice', 'LowPrice', 'ClosePrice', 'Volume'])

现在insertInto应该工作

是如何发生的,它是按字母顺序排序的?这是CSV文件中的内容吗?

无论如何,我会按照以下步骤进行操作:

  • 从您的表中选择列
  • 根据表
  • 的列重新安排数据框
# pyspark below
list_columns = spark.sql('select * from table').columns # there might be simpler way
dataframe.select(*list_columns)

最新更新