从连续产生的分散数据更新dask数组



我正在执行一项"连续不断"产生数据的分析。这是用来更新任务数组的。下面是一个最小的示例,旨在说明工作流程。

有谁知道我应该怎么做或者有什么想法吗?我想避免在磁盘上存储数据。

A = da.zeros((10000, 10000), chunks=(1000, 1000))
def generate_send_data(i):

for i in range(100):  # long loop
x = np.random.randint(0, 10000, 100)
y = np.random.randint(0, 10000, 100)
z = np.random.randn(100)

# send data to appropriate chunk in A in order to update A: 
# A[x, y] = A[x, y] + z

# wait for event
sleep(1+i)
F = client.map(generate_send_data, range(10))

任务数组的操作是惰性的、函数式的。这意味着每个操作都有一个唯一的键,如果重复操作,结果将是相同的,因此任何工作人员都可以进行计算,并且可以重新生成图形。

简而言之:days数组将不符合您的期望,如果您改变它们。

你需要使用一个不同的范例,也许通过使用submit来不断地发送新的数据,变量(由调度程序持有),或者任务actor(驻留在工人上的有状态对象)。

最新更新