pyspark RDD中aggregateByKey的替代或更好的方法



我有一个天气数据csv文件,其中每个条目都有站点ID和当天记录的最小或最大值。第二个元素是了解值代表什么的关键字。示例输入如下。

stationID    feature value
ITE00100554    TMAX  -75  
ITE00100554    TMIN -148         
GM000010962    PRCP    0         
EZE00100082    TMAX  -86         
EZE00100082    TMIN -135         
ITE00100554    TMAX  -60         
ITE00100554    TMIN -125         
GM000010962    PRCP    0         
EZE00100082    TMAX  -44         
EZE00100082    TMIN -130         
ITE00100554    TMAX  -23 

我已经用TMIN或TMAX过滤掉了条目。为给定的数据记录每个条目。我在构建RDD时剥离了Date,因为它不感兴趣。我的目标是在所有记录中找到每个站点的最小值和最大值,即

ITE00100554, 'TMIN', <global_min_value recorded by that station>
ITE00100554, 'TMAX', <global_max_value>
EZE00100082, 'TMIN', <global_min_value>
EZE00100082, 'TMAX', <global_max_value>

我可以使用aggregateByKey完成这项工作,但根据这个链接https://backtobazics.com/big-data/spark/apache-spark-aggregatebykey-example/我不必使用aggregateByKey,因为输入值和输出值的格式是相同的。所以我想知道是否有一种替代或更好的方法可以在不定义这么多函数的情况下进行编码。

stationtemps = entries.filter(lambda x: x[1] in ['TMIN', 'TMAX']).map(lambda x: (x[0], (x[1], x[2])))  # (stationID, (tempkey, value))
max_temp = stationtemps.values().values().max()
min_temp = stationtemps.values().values().min()

def max_seqOp(accumulator, element):
return (accumulator if accumulator[1] > element[1] else element)

def max_combOp(accu1, accu2):
return (accu1 if accu1[1] > accu2[1] else accu2)

def min_seqOp(accumulator, element):
return (accumulator if accumulator[1] < element[1] else element)

def min_combOp(accu1, accu2):
return (accu1 if accu1[1] < accu2[1] else accu2)

station_max_temps = stationtemps.aggregateByKey(('', min_temp), max_seqOp, max_combOp).sortByKey()
station_min_temps = stationtemps.aggregateByKey(('', max_temp), min_seqOp, min_combOp).sortByKey()
min_max_temps = station_max_temps.zip(station_min_temps).collect()
with open('1800_min_max.csv', 'w') as fd:
writer = csv.writer(fd)
writer.writerows(map(lambda x: list(list(x)), min_max_temps))

我正在学习pyspark,还没有掌握所有不同的转换函数。

这里模拟输入,如果最小值和最大值填写正确,那么为什么需要指示器TMIN、TMAX?实际上不需要蓄电池。

rdd = sc.parallelize([  ('s1','tmin',-3), ('s1','tmax', 5), ('s2','tmin',0), ('s2','tmax', 7), ('s0','tmax',14), ('s0','tmin', 3)  ])
rddcollect = rdd.collect()
#print(rddcollect)
rdd2 = rdd.map(lambda x:  (x[0], x[2]))
#rdd2collect = rdd2.collect()
#print(rdd2collect)
rdd3 = rdd2.groupByKey().sortByKey()
rdd4 = rdd3.map(lambda k_v: ( k_v[0], (sorted(k_v[1])))  )
rdd4.collect()

退货:

Out[27]: [('s0', [3, 14]), ('s1', [-3, 5]), ('s2', [0, 7])]

替代答案

  • 澄清后
  • 假设最小值和最大值有意义
  • 用我自己的数据
  • BTW还有其他解决方案

如下:

include = ['tmin','tmax']
rdd0 = sc.parallelize([  ('s1','tmin',-3), ('s1','tmax', 5), ('s2','tmin',0), ('s2','tmin',-12), ('s2','tmax', 7), ('s2','tmax', 17), ('s2','tother', 17), ('s0','tmax',14), ('s0','tmin', 3)  ])
rdd1 = rdd0.filter(lambda x: any(e in x for e in include) )
rdd2 = rdd1.map(lambda x:  ( (x[0],x[1]), x[2]))
rdd3 = rdd2.groupByKey().sortByKey()
rdd4Min = rdd3.filter(lambda k_v: k_v[0][1] == 'tmin').map(lambda k_v: ( k_v[0][0], min( k_v[1]  ) ))
rdd4Max = rdd3.filter(lambda k_v: k_v[0][1] == 'tmax').map(lambda k_v: ( k_v[0][0], max( k_v[1]  ) ))
rdd5=rdd4Min.union(rdd4Max)
rdd6 = rdd5.groupByKey().sortByKey()
res = rdd6.map(lambda k_v: ( k_v[0], (sorted(k_v[1]))))
rescollect = res.collect()
print(rescollect)

退货:

[('s0', [3, 14]), ('s1', [-3, 5]), ('s2', [-12, 17])]

遵循与@thebluephantom相同的逻辑,这是我从csv 读取时的最后一段代码

def get_temp_points(item):
if item[0][1] == 'TMIN':
return (item[0], min(item[1]))
else:
return (item[0], max(item[1]))

data = lines.filter(lambda x: any(ele for ele in x if ele in ['TMIN', 'TMAX']))
temps = data.map(lambda x: ((x[0], x[2]), float(x[3]))
temp_list = temps.groupByKey().mapValues(list) 
##((stationID, 'TMIN'/'TMAX'), listofvalues)
min_max_temps = temp_list.map(get_temp_points).collect()

最新更新