我有一个pyspark应用程序。 我将一个 hive 表复制到我的 hdfs 目录中,在 python 中,我sqlContext.sql
这个表的查询。现在这个变量是我称之为rows
的数据帧。 我需要随机洗牌rows
,所以我不得不将它们转换为行列表rows_list = rows.collect()
。 因此,我shuffle(rows_list)
将列表打乱到位。 我取我需要的随机行数x
:
for r in range(x):
allrows2add.append(rows_list[r])
现在我想将allrows2add保存为hive表或附加现有的hive表(以更容易做到的为准)。 问题是我不能这样做:
all_df = sc.parallelize(allrows2add).toDF()
无法执行此操作,无法推断架构 ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling
无需放入整个架构。 rows
的架构有 117 列,所以我不想输入它们。 有没有办法提取rows
的架构来帮助我使allrows2添加数据帧或以某种方式另存为hive表?我能做到 rows.printSchema()
但不确定如何将其作为变量转换为模式格式以传递toDF()
而无需解析所有文本
谢谢
添加循环信息
#Table is a List of Rows from small Hive table I loaded using
#query = "SELECT * FROM Table"
#Table = sqlContext.sql(query).collect()
for i in range(len(Table)):
rows = sqlContext.sql(qry)
val1 = Table[i][0]
val2 = Table[i][1]
count = Table[i][2]
x = 100 - count
#hivetemp is a table that I copied from Hive to my hfs using:
#create external table IF NOT EXISTS hive temp LIKE hivetableIwant2copy LOCATION "/user/name/hiveBackup";
#INSERT OVERWRITE TABLE hivetemp SELECT * FROM hivetableIwant2copy;
query = "SELECT * FROM hivetemp WHERE col1<>""+val1+"" AND col2 ==""+val2+"" ORDER BY RAND() LIMIT "+str(x)
rows = sqlContext.sql(query)
rows = rows.withColumn("col4", lit(10))
rows = rows.withColumn("col5", lit(some_string))
#writing to parquet is heck slow AND I can't work with pandas due to the library not installed on the server
rows.saveAsParquetFile("rows"+str(i)+".parquet")
#tried this before and heck slow also
#rows_list = rows.collect()
#shuffle(rows_list)
当无法推断架构时,通常有一个原因。 toDF
是 createDataFrame
函数的语法糖,默认情况下只使用前 100 行(尽管文档说它只使用第一行)来确定架构应该是什么。要更改此设置,您可以提高采样率以查看更大百分比的数据:
df = rdd.toDF(sampleRatio=0.2)
# or...
df = sqlContext.createDataFrame(rdd, samplingRatio=0.2)
您的随机样本也可能碰巧只对某些特定列进行具有空值的行。如果是这种情况,您可以从头开始创建架构,如下所示:
from pyspark.sql.types import *
# all DataFrame rows are StructType
# can create a new StructType with combinations of StructField
schema = StructType([
StructField("column_1", StringType(), True),
StructField("column_2", IntegerType(), True),
# etc.
])
df = sqlContext.createDataFrame(rdd, schema=schema)
或者,可以通过访问 schema
值从以前创建的数据帧中获取架构:
df2 = sqlContext.createDataFrame(rdd, schema=df1.schema)
请注意,如果您的 RDD 的行未StructType
(也称为 Row
) 对象而不是字典或列表,您将无法从它们创建数据框。如果您的RDD行是字典,则可以将它们转换为Row
对象,如下所示:
rdd = rdd.map(lambda x: pyspark.sql.Row(**x))
# ** is to unpack the dictionary since the Row constructor
# only takes keyword arguments