How to sort a dictionary or tuple by value in a Spark DataFrame column



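I need to sort a dictionary by value in a Spark DataFrame. I have tried many different approaches, including some not shown here. I found plenty of answers about sorting Python dictionaries, but none of them worked in my case.

I tried OrderedDict and sorted.

I'm not picky about the output being a dictionary; it can also be a list of tuples.

Sample data: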

a = ["This is dummy data this dummy data is being used for word counts","See if this will work see if working not working", "Is this working is this working maybe it is maybe it isnt", "hopefully this works"]
b = [1,2,1,2]
df = sqlContext.createDataFrame(zip(b, a), schema=['id', 'text'])

The code I wrote to prepare the data:

import heapq
from collections import Counter
from operator import itemgetter
from pyspark.ml.feature import Tokenizer
from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType, IntegerType

def MostCommonWords(data):
  #agg text by id
  GroupedText = data.groupby("id").agg(F.concat_ws(", ", F.collect_list(data.text)).alias('aggText'))
  #tokenizing text to count in the next step
  tokenizer = Tokenizer(inputCol='aggText', outputCol='textTokenized')
  GroupedText = tokenizer.transform(GroupedText)
  #creating udf from counter function and applying udf to tokenized text
  CounterUDF = F.udf(lambda x: dict(Counter(x)), MapType(StringType(), IntegerType()))
  GroupedText = GroupedText.withColumn('WordFrequency', CounterUDF(F.col("textTokenized")))
  #Top 10 most frequent words for each id
  Nlargest_UDF = F.udf(lambda x: dict(heapq.nlargest(10, x.items(), key=itemgetter(1))), MapType(StringType(), IntegerType()))
  MostCommon = GroupedText.withColumn('MostCommon', Nlargest_UDF(F.col("WordFrequency")))
  MostCommon = MostCommon.select('id','MostCommon')
  return MostCommon
MostCommon = MostCommonWords(df)

My attempts to sort the dictionary in each row:

import collections
from collections import OrderedDict
from operator import itemgetter

naming = collections.namedtuple('Word', 'Count')
#SorterUDF = F.udf(lambda x: sorted([naming(v,k) for (k,v) in x.items(), key=itemgetter(1)], MapType(StringType(), IntegerType(), reverse=True)))
#SorterUDF = F.udf(lambda x: {k: v for k, v in sorted(x.items(), key=itemgetter(1), reverse = True)})
#SorterUDF = F.udf(lambda x: dict(sorted(x.items(), key=itemgetter(1))), MapType(StringType(), IntegerType()))
SorterUDF = F.udf(lambda x: OrderedDict(sorted(x.items(), key=itemgetter(1))), MapType(StringType(), IntegerType()))
Sortedd = MostCommon.withColumn('SortedMostCommon', SorterUDF(F.col("MostCommon")))

I don't get an error; the result just isn't sorted. Expected result: the dictionary or tuples in each row sorted by value.

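Spark doesn't have an ordered MapType. Currently a MapType value is converted internally to a Python dict, so the order you build in the UDF is not kept. I think we have to use an ArrayType of StructType instead: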

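import operator
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType

def sort_dict_f(x):
    # sort the (word, count) pairs by count, ascending
    sorted_x = sorted(x.items(), key=operator.itemgetter(1))
    return sorted_x
# an array keeps element order, unlike a map
schema = ArrayType(StructType([
    StructField("word", StringType(), False), StructField("count", IntegerType(), False)
]))
SorterUDF = F.udf(sort_dict_f, schema)
df = MostCommon.withColumn('SortedMostCommon', SorterUDF("MostCommon"))
df.show()
print(df.take(1)[0]['SortedMostCommon'])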

Output:

+---+--------------------+--------------------+
| id|          MostCommon|    SortedMostCommon|
+---+--------------------+--------------------+
|  1|[dummy -> 2, isnt...|[[isnt,, 1], [bei...|
|  2|[not -> 1, see ->...|[[will, 1], [work...|
+---+--------------------+--------------------+
[Row(word='isnt,', count=1), Row(word='being', count=1), Row(word='used', count=1), Row(word='working', count=2), Row(word='maybe', count=2), Row(word='it', count=2), Row(word='dummy', count=2), Row(word='data', count=2), Row(word='this', count=4), Row(word='is', count=5)]

You can see that the words are now correctly sorted by their counts.
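If the most frequent words should come first, the sort can be reversed. Below is a minimal plain-Python sketch of that variant, runnable without Spark. The function names `sort_by_count_desc` and `top_n_sorted` are hypothetical and not part of the original post. It also relies on the fact that `heapq.nlargest` already returns its result ordered from largest to smallest, so the top-10 step and the sort could in principle be a single step.

```python
import heapq
from collections import Counter
from operator import itemgetter


def sort_by_count_desc(word_counts):
    """Return (word, count) pairs ordered by count, highest first."""
    # sorted() is stable, so words with equal counts keep their dict order
    return sorted(word_counts.items(), key=itemgetter(1), reverse=True)


def top_n_sorted(tokens, n=10):
    """Count tokens and return the n most frequent as (word, count) pairs."""
    counts = Counter(tokens)
    # nlargest returns a list already ordered from largest to smallest count
    return heapq.nlargest(n, counts.items(), key=itemgetter(1))


print(sort_by_count_desc({"dummy": 2, "is": 5, "used": 1}))
print(top_n_sorted("see if this will work see if working".split(), n=2))
```

Either function returns a list of tuples, so (as an assumption, following the same pattern as `sort_dict_f` above) it could be wrapped with `F.udf(..., schema)` using the array-of-struct schema from the answer.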
