使用芹菜分发数学处理



Intro

嘿伙计们,我对芹菜和任务队列非常陌生,所以我有一个可能相当幼稚的问题。

我想拿一个相当大的.csv文件(转换为熊猫数据帧(,并对所有列对运行皮尔逊测试(统计数学函数(。 单个内核大约需要 9 分钟才能到期,我们有数百个这样的.csv文件!

因此,我想在 3 服务器群集上的所有内核之间分配此处理。 这是到目前为止我的代码原型....

from celery import Celery
import numpy as np
import pandas as pd
import scipy.stats as stats
import itertools
app = Celery()
minute_CSV = pd.read_csv('./test_dataframe.csv')
cycle_length = 300
row_max = minute_CSV.shape[0]
r_vector_data = pd.DataFrame()
column_combinations = itertools.combinations(minute_CSV.filter(regex='FREQ').keys(),2)
xy_cols = list(column_combinations)
@app.task
def data_processing(minute_CSV, cycle_length, row_max, x, y):
return np.array([stats.pearsonr(minute_CSV[x][c-cycle_length:c],
minute_CSV[y][c-cycle_length:c])[0] for c in range(cycle_length,row_max)])
for i in range(0, len(xy_cols)):
x = xy_cols[i][0]
y = xy_cols[i][1]
r_vector_data[x + ' to ' + y] = data_processing.delay(minute_CSV, cycle_length, row_max, x, y)
pd.DataFrame.to_csv(r_vector_data, processed_dataframe.csv)

当我运行这个时,我收到以下消息:

"[1200 行 x 870 列] 不可序列化 JSON ">

数学

Pearson 相关性的工作方式如下:取 300 个(在我的例子中(两列的连续行,运行相关性并将结果存储在新的 DataFrame (r_vector_data( 中。 这是针对以下行执行的:(0..299(、(1..300(、(2..301(等。

此外,此脚本仅考虑一个.csv文件,但稍后将在:)进行修改。

关于从这里去哪里的想法? 我将如何使用芹菜来实现这一点,因为我在文档中有点迷失。

谢谢!

您看到错误是因为 Celery 正在尝试对minute_CSV进行 JSON 序列化。 默认情况下,Celery 中的每条消息都使用 JSON 进行编码。 有关详细信息,请参阅 http://docs.celeryproject.org/projects/kombu/en/latest/userguide/serialization.html。

为了限制数据传输,您可能只想将每次调用的相关行发送到data_processing任务。

最新更新