我在本地中有一个示例数据集,我正在尝试在集群上进行一些基本的操作。
import dask.dataframe as ddf
from dask.distributed import Client
client = Client('Ip address of the scheduler')
import dask.dataframe as ddf
csvdata = ddf.read_csv('Path to the CSV file')
客户端连接到调度程序,该调度程序又连接到两名工人(在其他计算机上(。
我的问题可能很琐碎。
是否应该在其他工人节点上存在此CSV文件?
我似乎找不到文件。
使用,
futures=client.scatter(csvdata) x = ddf.from_delayed([future], meta=df) #Price is a column in the data df.Price.sum().compute(get=client.get) #returns" dd.Scalar<series-..., dtype=float64>" How do I access it? client.submit(sum, x.Price) #returns "distributed.utils - ERROR - 6dc5a9f58c30954f77913aa43c792cc8"
另外,我确实提到了这个从客户端加载本地文件到dask分布式群集和http://distributed.readthedocs.io/en/latest/manage-compaint.html
我以为我在这里混合了很多东西,我的理解被混乱了。任何帮助都将不胜感激。
是的,这里dask.dataframe假设您在客户端代码中所指的文件也可以由工人访问。如果不是这种情况,那么您将在本地计算机中明确阅读数据并将其分散给工人。
看起来您正在尝试做到这一点,只是您要散布DASK数据范围而不是Pandas DataFrames。实际上,您将必须在分散磁盘之前具体地从磁盘上加载熊猫数据。如果您的数据适合内存,那么您应该能够准确地执行您现在正在做的事情,但是用pd.read_csv
替换dd._read_csv调用csvdata = pandas.read_csv('Path to the CSV file')
[future] = client.scatter([csvdata])
x = ddf.from_delayed([future], meta=df).repartition(npartitions=10).persist()
#Price is a column in the data
df.Price.sum().compute(get=client.get) # Should return an integer
如果您的数据太大,那么您可以考虑使用本地使用DASK将数据逐步读取到群集。
import dask.dataframe as dd
ddf = dd.read_csv('filename')
futures = ddf.map_partitions(lambda part: c.scatter([part])[0]).compute(get=dask.get) # single threaded local scheduler
ddf = dd.from_delayed(list(futures), meta=ddf.meta)