使用 python 将日志数据转换为 Spark 中的时间序列频率



更新

从技术上讲,我想在 Spark 中将日志数据转换为时间序列频率。我搜索了很多,但没有找到处理大数据的好方法。

我知道 pd.dataframe 可以获取某些功能的计数,但我的数据集太大而无法使用数据帧。

这意味着我需要通过MapReduce处理每一行。

我尝试过的可能是愚蠢的....

我有一个RDD,它的行是元组列表,看起来像:

[(datetime.datetime(2015, 9, 1, 0, 4, 12), 1),((datetime.datetime(2015, 9, 2, 0, 4, 12), 1),(datetime.datetime(2015, 4, 4, 1, 0, 4, 12), 1),

(datetime.datetime(2015, 9, 1, 0, 4, 12),1)]

[(datetime.datetime(2015, 10, 1, 0, 4, 12), 1),(datetime.datetime(2015, 7, 1, 0, 4, 12), 1)]

在每个元组中,第一个元素是一个日期,我可以通过python在Spark中编写一个地图函数,以根据元组中的(x,y,z)坐标的日期(月,日,小时)将具有相同(月,日,小时)的元组计数填充到3-D数组中。

这是我所做的:

def write_array(input_rdd, array):
    for item in input_rdd:
        requestTime = item[0]
        array[requestTime.month - 1, requestTime.day  -1, requestTime.hour] += 1
array_to_fill = np.zeros([12, 31, 24], dtype=np.int)
filled_array = RDD_to_fill.map(lambda s:write_array(s, array_to_fill)).collect()
with open("output.txt", 'w') as output:
    json.dump(traffic, output)

错误是:

    Traceback (most recent call last):
  File "traffic_count.py", line 67, in <module>
    main()
  File "traffic_count.py", line 58, in main
    traffic = organic_userList.Map(lambda s: write_array(s, traffic_array)) 
AttributeError: 'PipelinedRDD' object has no attribute 'Map'

我认为一定有某种方法可以将RDD每一行中的元素保存到现有的数据结构中.....有人可以帮助我吗?

非常感谢!

如果可以将输出数据设置为((month, day, hour), count)值的列表,则以下内容应该有效:

from pyspark import SparkConf, SparkContext
import datetime
conf = SparkConf().setMaster("local[*]").setAppName("WriteDates")
sc = SparkContext(conf = conf)
RDD_to_fill = sc.parallelize([(datetime.datetime(2015, 9, 1, 0, 4, 12), 1),(datetime.datetime(2015, 9, 2, 0, 4, 12), 1),(datetime.datetime(2015, 4, 1, 0, 4, 12), 1),(datetime.datetime(2015, 9, 1, 0, 4, 12),1), (datetime.datetime(2015, 10, 1, 0, 4, 12), 1), (datetime.datetime(2015, 7, 1, 0, 4, 12), 1)])
def map_date(tup):
    return ((tup[0].month, tup[0].day, tup[0].hour), tup[1])
date_rdd = RDD_to_fill.map(map_date).reduceByKey(lambda x, y: x + y)
# create a tuple for every (month, day, hour) and set the value to 0
zeros = []
for month in range(1,13):
    for day in range(1,32):
        for hour in range(24):
            zeros.append(((month, day, hour), 0))
zeros_rdd = sc.parallelize(zeros)
# union the rdd with the date_rdd (dates with non-zero values) with the zeros_rdd (dates with all zero values)
# and then add aggregate them together (via addition) by key (i.e., date tuple)
filled_tups = date_rdd.union(zeros_rdd).reduceByKey(lambda x, y: x + y).collect()

然后,如果要访问任何(月、日、小时)期间的计数,可以轻松执行以下操作:

filled_dict = dict(filled_tups)
# get count for Sept 1 at 00:00
print(filled_dict[(9,1,0)]) # prints 2

请注意,此代码未正确说明不存在的日子,例如 2 月 30

日、2 月 31 日、4 月 31 日、6 月 31 日...

相关内容

  • 没有找到相关文章

最新更新