我有一个大的查找表,其中保存整数作为键和字符串列表作为值。我需要这个查找表做一些过滤和转换的数据,我通过spark加载。
import numpy as np
import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
conf = SparkConf()
conf.setMaster("local[20]")
conf.setAppName("analysis")
conf.set("spark.local.dir", "./tmp")
#conf.set("spark.executor.memory", "20g")
#conf.set("spark.driver.memory", "20g")
conf.set("spark.python.worker.reuse", "yes")
sc = SparkContext(conf=conf)
sqlc = SQLContext(sc)
在启动pyspark时,我甚至使用--driver-memory 20g
选项。
我的机器有500gb内存和27个内核。我首先在内存中加载一个名为lookup_tbl
的字典,它有17457954行。
当我尝试运行以下代码时,我在超过10分钟的时间内没有得到任何输出。等了这么久之后,我关闭了这个进程。我需要查找表功能。我甚至尝试过使用broadcast
功能。
sc.broadcast(lookup_tbl)
def clean_data(x, transform=lambda k: (int(k[0]), "t".join(k[1:]))):
x = x.split('t')
return transform(x)
def check_self(x):
from_id = x[0]
to_id = x[1]
self_ = 1
try:
common_items = set(lookup_tbl[from_id]).intersection(set(lookup_tbl[to_id]))
except KeyError:
common_items = set()
if len(common_items ) < 1:
common_items = set("-")
self_ = 0
return (((from_id, to_id, k, self_) for k in common_items ))
pair = sc.textFile("data/pair.tsv").map(lambda x: clean_data(x, transform=lambda k: (int(k[0]), int(k[1])))).flatMap(check_self)
csv_data = pair.map(lambda x: "t".join("%s" for k in xrange(len(x))) % x)
csv_data.saveAsTextFile("out/pair_transformed")
这是spark的问题还是我没有正确运行?此外,我还尝试为执行程序和驱动程序内存(~20g
)设置各种值,但没有得到任何改进。
据我所知,spark在发送给所有本地进程之前首先尝试序列化这个字典。有没有一种方法可以从公共位置使用这个字典?
首先要访问广播变量,你必须使用它的value
属性:
# You can use get instead of catching KeyError
s1 = set(lookup_tbl.value.get(from_id, set()))
s2 = set(lookup_tbl.value.get(to_id, set()))
common_items = s1.intersection(s2)
为了避免广播,你可以在mapPartitions
中本地加载lookup_tbl
:
def check_partition(iter):
lookup_tbl = ...
for x in iter:
yield check_self
identity = lambda x: x
pair = (sc.textFile(...)
.map(lambda x: clean_data(...)
.mapPartitions(check_partition)
.flatMap(identity))
如果lookup_tbl
相对较大,它仍然可能相当昂贵。有很多方法可以处理这个问题:
使用SQLite连接代替局部变量。
import sqlite3 conn = sqlite3.connect('path/to/lookup.db') c.execute("SELECT key FROM lookup WHERE id = '%s'" % from_id) s1 = {x[0] for x in c.fetchall()} c.execute("SELECT key FROM lookup WHERE id = '%s'" % to_id) s2 = {x[0] for x in c.fetchall()} common_items = s1.intersection(s2)
它很容易设置,如果数据正确索引,应该足够快
使用单个数据库服务器进行查找。MongoDB应该工作得很好,并且使用适当的内存映射可以显着减少总体内存占用
使用
join
代替广播swap = lambda x: (x[1], x[0]) def reshape1(record): (k1, (items, k2)) = record return (k2, (k1, items)) def reshape2(record): (k1, (items1, (k2, items2))) = record return (k1, k2, set(items1) & set(items2)) pairs = sc.textFile(...).map(lambda x: clean_data(...)) n = ... # Number of partitions lookup_rdd = sc.parallelize(lookup_tbl.items()).partitionBy(n) lookup_rdd.join(lookup_rdd.join(pairs).map(reshape1)).map(reshape2)