将行列表保存到 Piespark 中的 Hive 表中



我有一个pyspark应用程序。 我将一个 hive 表复制到我的 hdfs 目录中,在 python 中,我sqlContext.sql这个表的查询。现在这个变量是我称之为rows的数据帧。 我需要随机洗牌rows,所以我不得不将它们转换为行列表rows_list = rows.collect()。 因此,我shuffle(rows_list)将列表打乱到位。 我取我需要的随机行数x

for r in range(x): allrows2add.append(rows_list[r]) 现在我想将allrows2add保存为hive表或附加现有的hive表(以更容易做到的为准)。 问题是我不能这样做:

all_df = sc.parallelize(allrows2add).toDF() 无法执行此操作,无法推断架构 ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling

无需放入整个架构。 rows的架构有 117 列,所以我不想输入它们。 有没有办法提取rows的架构来帮助我使allrows2添加数据帧或以某种方式另存为hive表?我能做到 rows.printSchema()但不确定如何将其作为变量转换为模式格式以传递toDF()而无需解析所有文本

谢谢

添加循环信息

#Table is a List of Rows from small Hive table I loaded using
#query = "SELECT * FROM Table"
#Table = sqlContext.sql(query).collect()
for i in range(len(Table)):
    rows = sqlContext.sql(qry)
    val1 = Table[i][0]
    val2 = Table[i][1]
    count = Table[i][2]
    x = 100 - count
#hivetemp is a table that I copied from Hive to my hfs using:
#create external table IF NOT EXISTS hive temp LIKE hivetableIwant2copy LOCATION "/user/name/hiveBackup";
#INSERT OVERWRITE TABLE hivetemp SELECT * FROM hivetableIwant2copy;
    query = "SELECT * FROM hivetemp WHERE col1<>""+val1+"" AND col2 ==""+val2+"" ORDER BY RAND() LIMIT "+str(x)
    rows = sqlContext.sql(query)
    rows = rows.withColumn("col4", lit(10))
    rows = rows.withColumn("col5", lit(some_string))
#writing to parquet is heck slow AND I can't work with pandas due to the library not installed on the server
    rows.saveAsParquetFile("rows"+str(i)+".parquet")
#tried this before and heck slow also
    #rows_list = rows.collect()
    #shuffle(rows_list)

当无法推断架构时,通常有一个原因。 toDFcreateDataFrame 函数的语法糖,默认情况下只使用前 100 行(尽管文档说它只使用第一行)来确定架构应该是什么。要更改此设置,您可以提高采样率以查看更大百分比的数据:

df = rdd.toDF(sampleRatio=0.2)
# or...
df = sqlContext.createDataFrame(rdd, samplingRatio=0.2)

您的随机样本也可能碰巧只对某些特定列进行具有空值的行。如果是这种情况,您可以从头开始创建架构,如下所示:

from pyspark.sql.types import *
# all DataFrame rows are StructType
# can create a new StructType with combinations of StructField
schema = StructType([
    StructField("column_1", StringType(), True),
    StructField("column_2", IntegerType(), True),
    # etc.
])
df = sqlContext.createDataFrame(rdd, schema=schema)

或者,可以通过访问 schema 值从以前创建的数据帧中获取架构:

df2 = sqlContext.createDataFrame(rdd, schema=df1.schema)

请注意,如果您的 RDD 的行未StructType(也称为 Row ) 对象而不是字典或列表,您将无法从它们创建数据框。如果您的RDD行是字典,则可以将它们转换为Row对象,如下所示:

rdd = rdd.map(lambda x: pyspark.sql.Row(**x))
# ** is to unpack the dictionary since the Row constructor
# only takes keyword arguments

相关内容

  • 没有找到相关文章

最新更新