更新
从技术上讲,我想在 Spark 中将日志数据转换为时间序列频率。我搜索了很多,但没有找到处理大数据的好方法。
我知道 pd.dataframe 可以获取某些功能的计数,但我的数据集太大而无法使用数据帧。
这意味着我需要通过MapReduce处理每一行。
我尝试过的可能是愚蠢的....
我有一个RDD,它的行是元组列表,看起来像:
[(datetime.datetime(2015, 9, 1, 0, 4, 12), 1),((datetime.datetime(2015, 9, 2, 0, 4, 12), 1),(datetime.datetime(2015, 4, 4, 1, 0, 4, 12), 1),(datetime.datetime(2015, 9, 1, 0, 4, 12),1)]
[(datetime.datetime(2015, 10, 1, 0, 4, 12), 1),(datetime.datetime(2015, 7, 1, 0, 4, 12), 1)]
在每个元组中,第一个元素是一个日期,我可以通过python在Spark中编写一个地图函数,以根据元组中的(x,y,z)坐标的日期(月,日,小时)将具有相同(月,日,小时)的元组计数填充到3-D数组中。
这是我所做的:
def write_array(input_rdd, array):
for item in input_rdd:
requestTime = item[0]
array[requestTime.month - 1, requestTime.day -1, requestTime.hour] += 1
array_to_fill = np.zeros([12, 31, 24], dtype=np.int)
filled_array = RDD_to_fill.map(lambda s:write_array(s, array_to_fill)).collect()
with open("output.txt", 'w') as output:
json.dump(traffic, output)
错误是:
Traceback (most recent call last):
File "traffic_count.py", line 67, in <module>
main()
File "traffic_count.py", line 58, in main
traffic = organic_userList.Map(lambda s: write_array(s, traffic_array))
AttributeError: 'PipelinedRDD' object has no attribute 'Map'
我认为一定有某种方法可以将RDD每一行中的元素保存到现有的数据结构中.....有人可以帮助我吗?
非常感谢!
如果可以将输出数据设置为((month, day, hour), count)
值的列表,则以下内容应该有效:
from pyspark import SparkConf, SparkContext
import datetime
conf = SparkConf().setMaster("local[*]").setAppName("WriteDates")
sc = SparkContext(conf = conf)
RDD_to_fill = sc.parallelize([(datetime.datetime(2015, 9, 1, 0, 4, 12), 1),(datetime.datetime(2015, 9, 2, 0, 4, 12), 1),(datetime.datetime(2015, 4, 1, 0, 4, 12), 1),(datetime.datetime(2015, 9, 1, 0, 4, 12),1), (datetime.datetime(2015, 10, 1, 0, 4, 12), 1), (datetime.datetime(2015, 7, 1, 0, 4, 12), 1)])
def map_date(tup):
return ((tup[0].month, tup[0].day, tup[0].hour), tup[1])
date_rdd = RDD_to_fill.map(map_date).reduceByKey(lambda x, y: x + y)
# create a tuple for every (month, day, hour) and set the value to 0
zeros = []
for month in range(1,13):
for day in range(1,32):
for hour in range(24):
zeros.append(((month, day, hour), 0))
zeros_rdd = sc.parallelize(zeros)
# union the rdd with the date_rdd (dates with non-zero values) with the zeros_rdd (dates with all zero values)
# and then add aggregate them together (via addition) by key (i.e., date tuple)
filled_tups = date_rdd.union(zeros_rdd).reduceByKey(lambda x, y: x + y).collect()
然后,如果要访问任何(月、日、小时)期间的计数,可以轻松执行以下操作:
filled_dict = dict(filled_tups)
# get count for Sept 1 at 00:00
print(filled_dict[(9,1,0)]) # prints 2
请注意,此代码未正确说明不存在的日子,例如 2 月 30