如何在键值数据库中进行并行化



我的目的是对大型csv文件进行版本控制,因此,我正在使用键值数据库,其中键将是完整行中的列,值将是行本身。例如:

Name, Age, Roll.No
Aviral, 22, 1
Apoorv, 19, 2

如果我将 Roll no 作为键,我的目的是将数据库中的键作为 rollno(可能是它的哈希)并将值作为完整的行:Aviral, 22, 1

我已经完成了上面的实现,但为了处理大型 csv 文件(甚至是 20GB 的 534M 行),速度太慢了。我正在实现 dask,但它比普通的熊猫顺序流慢。我的疑问是,如何在键值数据库中进行并行插入?

import json
import sys
from datetime import datetime
from hashlib import md5
import dask.dataframe as dd
import dask.multiprocessing
import pandas as pd
from kyotocabinet import *

class IndexInKyoto:
    def hash_string(self, string):
        return md5(string.encode('utf-8')).hexdigest()
    def dbproc(self, db):
        db[self.hash_string(self.key)] = self.row
    def index_row(self, key, row):
        self.row = row
        self.key = key
        DB.process(self.dbproc, "index.kch")
# destination = "/Users/aviralsrivastava/dev/levelsdb-learning/10gb.csv"
destination = "10M_rows.csv"
df = dd.read_csv(destination)
df_for_file_attributes = pd.read_csv(destination, nrows=2)
column_list = list(df_for_file_attributes)
# df = df.compute(scheduler='processes')     # convert to pandas
start_time = datetime.utcnow()
row_counter = 0
ob = IndexInKyoto()
# function to apply to each sub-dataframe
@dask.delayed
def print_a_block(d):
    #for row in d.itertuples(index=False):
    # print(row)
    print("a block called!")
    d = d.to_dict(orient='records')
    for row in d:
        key = str(row["0"])
        row = json.dumps(row, default=str)
        ob.index_row(key, row)
print("Calling compute!")
dask.compute(*[print_a_block(d) for d in df.to_delayed()])
print(datetime.utcnow() - start_time)

京都内阁不允许你并行化插入(https://fallabs.com/kyotocabinet/spex.html),每个写入器将阻塞,直到另一个写入器完成,所以你不能在京都橱柜中并行插入,但 Redis 将允许这样的插入,以优化进一步的使用 Redis 流水线 (https://redis.io/topics/pipelining),它将批处理您的数据并在加载大量数据时在很大程度上减少 RTT。

任务运行速度慢于顺序处理的原因是按顺序管理多进程写入数据库的开销。

最新更新