我得到一个数据集,位于一个.txt文件中,包含1000万行RDF三元组形式,如下所示:
wsdbm:User0 wsdbm:follows wsdbm:User300 .
wsdbm:User6 wsdbm:likes wsdbm:Product92 .
wsdbm:Product0 rev:hasReview wsdbm:Review478 .
wsdbm:User2 wsdbm:friendOf wsdbm:User119 .
....
由于这些是RDF三元组,在我们的例子中我们有
Subjects: User0, User6, Product0, User2
Predicates: follows, likes, hasReview, friendOf
Objects: User300, Product92, Review478, User119
我的目标是写一个SQL形式的查询:
SELECT follows.subject, follows.object, friendOf.object,
likes.object, hasReview.object
FROM follows, friendOf, likes, hasReview
WHERE follows.object = friendOf.subject
AND friendOf.object = likes.subject
AND likes.object = hasReview.subject
到目前为止,我创建了一个名为PropertyTables的类,它有一个方法,该方法遍历初始文件并将每个主语、谓词和宾语转换为整数,以缩短连接的计算时间并节省内存:
class PropertyTables():
    """
    This class holds all 4 Property Tables necessary for the required query.
    Each Property Table is an instance of the class 'PropertyTable'.

    Attributes:
        property_tables: mapping from predicate name to its table.
        hash_map: ``HashDict`` instance used to map terms to integers.
    """

    # Predicate names (as they look after prefix removal) used by the query.
    # Kept as a class-level tuple so it is not rebuilt for every input line.
    _QUERY_PREDICATES = ('follows', 'friendOf', 'likes', 'hasReview')

    def __init__(self):
        # NOTE(review): a defaultdict without a factory behaves like a plain
        # dict (missing keys still raise KeyError).
        self.property_tables = defaultdict()
        self.hash_map = HashDict()

    def parse_file(self, file_path, remove_prefix = False):
        """
        Read a triple file and store the triples of the query predicates.

        Each non-blank line is expected to look like
        ``subject <ws> predicate <ws> object <ws> .`` where ``<ws>`` is a tab
        (or any other run of whitespace).

        Args:
            file_path: path of the text file to read.
            remove_prefix: when true, ``self.remove_prefix`` is applied to
                subject, predicate and object before the predicate check.
                NOTE(review): the check compares against bare names such as
                'follows', so prefixed predicates ('wsdbm:follows') presumably
                only match when this flag is set - confirm against
                ``remove_prefix``.

        Raises:
            ValueError: if a non-blank line has fewer than three fields.
        """
        # 'with' guarantees the file is closed even if a line fails to parse.
        with open(file_path, 'r') as data:
            for line in data:
                # Drop the line terminator and the trailing '.', then split off
                # subject and predicate; the remainder is the object.
                fields = line.strip().rstrip('.').split(None, 2)
                if not fields:
                    continue  # blank line
                subj, prop, obj = fields
                obj = obj.rstrip()

                if remove_prefix:
                    subj, prop, obj = [self.remove_prefix(s) for s in (subj, prop, obj)]

                if prop in self._QUERY_PREDICATES:
                    self.hash_and_store(subj, prop, obj)
类PropertyTable,在docstring中提到:
class PropertyTable():
    """
    This class represents a single Property Table, i.e. it holds every Subject and Object
    pair (or, for a join result, every concatenated row) as a tuple in ``self.table``.
    """

    def __init__(self):
        self.table = []

    def insert(self, r, s):
        """
        Append one row built from ``r`` and ``s``.

        If ``r`` is already a tuple (a row of an existing table), ``s`` must be
        a tuple as well and the two rows are concatenated. Otherwise ``r`` and
        ``s`` are treated as plain values and stored as the pair ``(r, s)``;
        this is the case when the tables are first filled from the data.
        """
        # isinstance (rather than an exact type comparison) also accepts tuple
        # subclasses such as namedtuples.
        if isinstance(r, tuple):
            self.table.append(r + s)
        else:
            self.table.append((r, s))
类HashDict()
是一个简单的散列值字典,因此我们可以在连接后再次检索它们。
为了不让帖子过长,我目前只有一个哈希连接算法:
def hash_join(self, property_1: PropertyTable, index_0, property_2: PropertyTable, index_1):
    """
    Hash-join two property tables on one column each.

    Args:
        property_1: build side; its rows are indexed by column ``index_0``.
        index_0: position of the join column inside the rows of ``property_1``.
        property_2: probe side; each row is looked up by column ``index_1``.
        index_1: position of the join column inside the rows of ``property_2``.

    Returns:
        A new ``PropertyTable`` whose rows are a ``property_1`` row followed by
        the matching ``property_2`` row.
    """
    ht = defaultdict(list)

    # Create Hash Table for table1
    for s in property_1.table:
        ht[s[index_0]].append(s)

    # Join Tables
    joined_table = PropertyTable()
    no_match = ()
    for r in property_2.table:
        # Use .get() for probing: indexing a defaultdict would insert an empty
        # list for every probe key without a partner and grow the hash table.
        for s in ht.get(r[index_1], no_match):
            joined_table.insert(s, r)

    return joined_table
根据前面的要求,我使用这个函数顺序地连接每个表。
WHERE follows.object = friendOf.subject
AND friendOf.object = likes.subject
AND likes.object = hasReview.subject
# Each join appends the right-hand (subject, object) pair to the left-hand row,
# so the row grows by two columns per step.
# Step 1: follows.object (column 1) = friendOf.subject (column 0).
# Rows: (follows.subject, follows.object, friendOf.subject, friendOf.object)
join_follows_friendOf = hash_join(pt.property_tables['follows'], 1, pt.property_tables['friendOf'], 0)
# Step 2: friendOf.object is now column 3 of the joined row; match it with likes.subject.
join_friendOf_likes = hash_join(join_follows_friendOf, 3, pt.property_tables['likes'], 0)
# Step 3: likes.object is now column 5 of the joined row; match it with hasReview.subject.
join_likes_hasReview = hash_join(join_friendOf_likes, 5, pt.property_tables['hasReview'], 0)
对于小表,结果是正确的,但1000万行的数据会直接导致内存不足的错误,我正在寻找避免这种情况的方法。很抱歉这个帖子写得这么长,但我认为要得到一些建议,这些细节是必要的!
编辑:
Line # Mem usage Increment Occurrences Line Contents
=============================================================
53 68.0 MiB 68.0 MiB 1 @profile
54 def hash_and_store(self, subj, prop, obj):
55
56 68.0 MiB 0.0 MiB 1 hashed_subj, hashed_obj = self.hash_map.hash_values(subj, obj)
57
58 68.0 MiB 0.0 MiB 1 if prop not in self.property_tables:
59 self.property_tables[prop] = PropertyTable()
60 68.0 MiB 0.0 MiB 1 self.property_tables[prop].insert(hashed_subj, hashed_obj)
Line # Mem usage Increment Occurrences Line Contents
=============================================================
32 68.1 MiB 68.1 MiB 1 @profile
33 def parse_file(self, file_path, remove_prefix = False):
34
35 68.1 MiB 0.0 MiB 1 data = open(file_path, 'r')
36
37
38
39
40
41 80.7 MiB 0.3 MiB 109311 for line in data:
42 80.7 MiB 0.0 MiB 109310 subj, prop, *obj = line.rstrip('n.').split('t')
43 80.7 MiB 0.5 MiB 109310 obj = obj[0].rstrip()
44
45 80.7 MiB 0.0 MiB 109310 if remove_prefix:
46 80.7 MiB 9.0 MiB 655860 subj, prop, obj = [self.remove_prefix(s) for s in (subj, prop, obj)]
47
48 80.7 MiB 0.0 MiB 109310 if prop in ['follows', 'friendOf', 'likes', 'hasReview']:
49 80.7 MiB 2.8 MiB 80084 self.hash_and_store(subj, prop, obj)
50
51 80.7 MiB 0.0 MiB 1 data.close()
Line # Mem usage Increment Occurrences Line Contents
=============================================================
38 80.7 MiB 80.7 MiB 1 @profile
39 def hash_join(self, property_1: PropertyTable, index_0, property_2: PropertyTable, index_1):
40
41 80.7 MiB 0.0 MiB 1 ht = defaultdict(list)
42
43 # Create Hash Table for table1
44
45 81.2 MiB 0.0 MiB 31888 for s in property_1.table:
46 81.2 MiB 0.5 MiB 31887 ht[s[index_0]].append(s)
47
48 # Join Tables
49
50 81.2 MiB 0.0 MiB 1 joined_table = PropertyTable()
51
52 203.8 MiB 0.0 MiB 45713 for r in property_2.table:
53 203.8 MiB 0.0 MiB 1453580 for s in ht[r[index_1]]:
54 203.8 MiB 122.6 MiB 1407868 joined_table.insert(s, r)
55
56 203.8 MiB 0.0 MiB 1 return joined_table
你的问题的核心是:
小表的结果是正确的,但1000万行只是导致内存错误,我正在寻找方法来避免这种情况。
按照你最开始的问题描述,使用一个不那么通用的结构,我们可以这样做:
def runQuery(dataLines):
    """
    Evaluate the join follows -> friendOf -> likes -> hasReview.

    Equivalent to::

        SELECT follows.subject, follows.object, friendOf.object,
               likes.object, hasReview.object
        FROM follows, friendOf, likes, hasReview
        WHERE follows.object = friendOf.subject
          AND friendOf.object = likes.subject
          AND likes.object = hasReview.subject

    Args:
        dataLines: iterable of triple lines of the form
            ``subject <ws> predicate <ws> object <ws> .`` (tab or other
            whitespace as separator). Blank lines are skipped. Subjects and
            objects are identified by their trailing number
            (``wsdbm:User12`` -> 12).

    Returns:
        List of ``(follows.subject, follows.object, friendOf.object,
        likes.object, hasReview.object)`` integer tuples. Duplicate input
        triples yield a row only once; row order is unspecified.

    Raises:
        ValueError: if a non-blank line has fewer than three fields.
    """
    from collections import defaultdict

    pred = dict(zip(['follows','friendOf','likes','hasReview'],range(4)))
    tables = [defaultdict(list) for _ in pred]

    def local_name(term):
        # Part of a term after the last '#', '/' or ':' ('rev:hasReview' -> 'hasReview').
        term = term.strip('<>')
        for sep in '#/:':
            term = term.rpartition(sep)[2]
        return term

    def encode(term):
        # Trailing decimal number of a term as int, or None if there is none.
        # Safe for all-digit and empty terms.
        term = term.strip('<>')
        head = term.rstrip('0123456789')
        digits = term[len(head):]
        return int(digits) if digits else None

    for line in dataLines:
        # Drop the line terminator and the trailing '.', then split off
        # subject and predicate; the remainder is the object.
        fields = line.strip().rstrip('.').split(None, 2)
        if not fields:
            continue
        subj, prop, obj = fields
        # Exact match on the predicate's local name, so that e.g. 'dislikes'
        # is not mistaken for 'likes'.
        table_index = pred.get(local_name(prop))
        if table_index is None:
            continue
        subj, obj = encode(subj), encode(obj.rstrip())
        if subj is None or obj is None:
            continue  # term without a numeric id cannot take part in the join
        tables[table_index][subj].append(obj)

    # Tuples are more compact than lists once the tables are complete.
    tables = [{k: tuple(v) for k, v in table.items()} for table in tables]

    def reverse_index(table, allowed=None):
        # Map object -> set of subjects, keeping only subjects found in
        # `allowed` (when given). This prunes rows that cannot reach the
        # previous relation of the join chain.
        index = defaultdict(set)
        for subject, objects in table.items():
            if allowed is None or subject in allowed:
                for obj in objects:
                    index[obj].add(subject)
        return index

    # followed user -> users following them
    object_of_follows = reverse_index(tables[pred['follows']])
    # friend -> users (themselves followed by someone) who are friendOf them
    object_of_friendOf = reverse_index(tables[pred['friendOf']], object_of_follows)
    # product -> users (themselves a friend of a followed user) who like it
    object_of_likes = reverse_index(tables[pred['likes']], object_of_friendOf)
    # review -> products (liked by a surviving user) that have this review
    object_of_hasReview = reverse_index(tables[pred['hasReview']], object_of_likes)

    # Walk the chain backwards from every surviving review. Each level is
    # looked up from the element chosen one level above, so every emitted row
    # satisfies all three join conditions. A plain cross product of the
    # per-level sets would also emit combinations that are not connected.
    result = []
    for e, products in object_of_hasReview.items():
        for d in products:
            for c in object_of_likes[d]:
                for b in object_of_friendOf[c]:
                    for a in object_of_follows[b]:
                        result.append((a, b, c, d, e))

    print(f'result row count {len(result):,}')
    return result
解释:
- 创建一个包含4个表的列表(follows, friendOf, likes, hasReview),每个表都是一个将主语映射到宾语元组的字典。
- 创建4个反向索引(object_of_follows, object_of_friendOf, object_of_likes, object_of_hasReview);例如:
  - object_of_follows 是一个字典,它把 follows 中作为宾语出现的每个用户映射到一个用户集合,集合中的每个用户都是 follows 中关注该宾语的主语。
  - object_of_friendOf 是一个字典,它把 friendOf 中的每个宾语(用户)映射到一个用户集合,集合中的每个用户都是 friendOf 中与该宾语相关联、并且同时出现在 object_of_follows 中的主语(换句话说,该主语是 follows 中一个或多个主语的宾语)。
  - 其余以此类推。
- 将 object_of_hasReview 中幸存的每个评论展开为多个结果行,每一行对应查询所指定的一个唯一结果 follows.subject, follows.object, friendOf.object, likes.object, hasReview.object。
- 返回所有这些展开后的行组成的列表。
1000万行的测试代码:
# Generated triple lines; filled by the loops below and passed to runQuery.
dataLines = []
# Number of 'follows' subjects (the outermost loop runs over range(numFollowers)).
numFollowers = 1000
# Width of every generated id range, i.e. how many objects each subject gets per level.
numChildren = 10
# Step between the id ranges of consecutive parents; when smaller than
# numChildren, neighbouring ranges overlap and parents share children.
overlapFactor = max(1, numChildren // 2)
def largerPowerOfTen(x):
    """Return the smallest power of ten that is strictly greater than x (at least 1)."""
    y = 1
    while x >= y:
        y *= 10
    return y
# Offsets that keep the id ranges of the different levels apart.
aCeil = largerPowerOfTen(numFollowers)
bCeil = largerPowerOfTen(aCeil * numChildren)
cCeil = largerPowerOfTen(bCeil * numChildren)
dCeil = largerPowerOfTen(cCeil * numChildren)

# (subject, object) pairs already emitted, so that overlapping id ranges do
# not produce duplicate friendOf / likes lines.
friendOf, likes = set(), set()

# NOTE(review): the loop nesting was reconstructed so that it reproduces the
# sample tables shown in this document: the d and e loops are not guarded by
# the duplicate checks, which is why hasReview lines repeat. Fields are
# tab-separated and every line ends in '.' and a newline, matching the parser.
for a in range(numFollowers):
    for b in range(aCeil + a * overlapFactor, aCeil + a * overlapFactor + numChildren):
        dataLines.append(f'wsdbm:User{a}\twsdbm:follows\twsdbm:User{b}\t.\n')
        for c in range(bCeil + b * overlapFactor, bCeil + b * overlapFactor + numChildren):
            if (b,c) not in friendOf:
                dataLines.append(f'wsdbm:User{b}\twsdbm:friendOf\twsdbm:User{c}\t.\n')
                friendOf.add((b,c))
            for d in range(cCeil + c * overlapFactor, cCeil + c * overlapFactor + numChildren):
                if (c,d) not in likes:
                    dataLines.append(f'wsdbm:User{c}\twsdbm:likes\twsdbm:Product{d}\t.\n')
                    likes.add((c,d))
                for e in range(dCeil * (d + 1), dCeil * (d + 1) + numChildren):
                    dataLines.append(f'wsdbm:Product{d}\twsdbm:hasReview\twsdbm:Review{e}\t.\n')

print(f'dataLines row count {len(dataLines):,}')

# Time a single run of the query over the generated data. The setup string
# imports from __main__, so this must be executed as a script.
from timeit import timeit
n = 1
print(f'Timeit results:')
t = timeit(f"runQuery(dataLines)", setup=f"from __main__ import dataLines, runQuery", number=n) / n
print(f'======== runQuery ran in {t} seconds using {n} iterations')

# Disabled alternative: run once and print every result row.
'''
result = runQuery(dataLines)
print(f'result row count {len(result):,}')
print(f'{"follows.subject":>20}{"follows.object":>20}{"friendsOf.object":>20}{"likes.object":>20}{"hasReview.object":>20}')
[print(f'{a:20}{b:20}{c:20}{d:20}{e:20}') for a,b,c,d,e in result]
'''
输出:
dataLines row count 10,310,350
Timeit results:
result row count 12,398,500
======== runQuery ran in 81.53253880003467 seconds using 1 iterations
这是一个小规模样本运行的输入/输出:
参数
numFollowers = 3
numChildren = 3
overlapFactor = 2
输入(在表中存储后):
follows
{0: (10, 11, 12), 1: (12, 13, 14), 2: (14, 15, 16)}
friendOf
{10: (120, 121, 122), 11: (122, 123, 124), 12: (124, 125, 126), 13: (126, 127, 128), 14: (128, 129, 130), 15: (130, 131, 132), 16: (132, 133, 134)}
likes
{120: (1240, 1241, 1242), 121: (1242, 1243, 1244), 122: (1244, 1245, 1246), 123: (1246, 1247, 1248), 124: (1248, 1249, 1250), 125: (1250, 1251, 1252), 126: (1252, 1253, 1254), 127: (1254, 1255, 1256), 128: (1256, 1257, 1258), 129: (1258, 1259, 1260), 130: (1260, 1261, 1262), 131: (1262, 1263, 1264), 132: (1264, 1265, 1266), 133: (1266, 1267, 1268), 134: (1268, 1269, 1270)}
hasReview
{1240: (12410000, 12410001, 12410002), 1241: (12420000, 12420001, 12420002), 1242: (12430000, 12430001, 12430002, 12430000, 12430001, 12430002), 1243: (12440000, 12440001, 12440002), 1244: (12450000, 12450001, 12450002, 12450000, 12450001, 12450002, 12450000, 12450001, 12450002), 1245: (12460000, 12460001, 12460002, 12460000, 12460001, 12460002), 1246: (12470000, 12470001, 12470002, 12470000, 12470001, 12470002, 12470000, 12470001, 12470002), 1247: (12480000, 12480001, 12480002), 1248: (12490000, 12490001, 12490002, 12490000, 12490001, 12490002, 12490000, 12490001, 12490002, 12490000, 12490001, 12490002), 1249: (12500000, 12500001, 12500002, 12500000, 12500001, 12500002, 12500000, 12500001, 12500002), 1250: (12510000, 12510001, 12510002, 12510000, 12510001, 12510002, 12510000, 12510001, 12510002, 12510000, 12510001, 12510002, 12510000, 12510001, 12510002), 1251: (12520000, 12520001, 12520002, 12520000, 12520001, 12520002), 1252: (12530000, 12530001, 12530002, 12530000, 12530001, 12530002, 12530000, 12530001, 12530002, 12530000, 12530001, 12530002, 12530000, 12530001, 12530002), 1253: (12540000, 12540001, 12540002, 12540000, 12540001, 12540002, 12540000, 12540001, 12540002), 1254: (12550000, 12550001, 12550002, 12550000, 12550001, 12550002, 12550000, 12550001, 12550002, 12550000, 12550001, 12550002), 1255: (12560000, 12560001, 12560002), 1256: (12570000, 12570001, 12570002, 12570000, 12570001, 12570002, 12570000, 12570001, 12570002, 12570000, 12570001, 12570002), 1257: (12580000, 12580001, 12580002, 12580000, 12580001, 12580002, 12580000, 12580001, 12580002), 1258: (12590000, 12590001, 12590002, 12590000, 12590001, 12590002, 12590000, 12590001, 12590002, 12590000, 12590001, 12590002, 12590000, 12590001, 12590002), 1259: (12600000, 12600001, 12600002, 12600000, 12600001, 12600002), 1260: (12610000, 12610001, 12610002, 12610000, 12610001, 12610002, 12610000, 12610001, 12610002, 12610000, 12610001, 12610002, 12610000, 12610001, 12610002), 1261: (12620000, 12620001, 
12620002, 12620000, 12620001, 12620002, 12620000, 12620001, 12620002), 1262: (12630000, 12630001, 12630002, 12630000, 12630001, 12630002, 12630000, 12630001, 12630002, 12630000, 12630001, 12630002), 1263: (12640000, 12640001, 12640002), 1264: (12650000, 12650001, 12650002, 12650000, 12650001, 12650002, 12650000, 12650001, 12650002), 1265: (12660000, 12660001, 12660002, 12660000, 12660001, 12660002), 1266: (12670000, 12670001, 12670002, 12670000, 12670001, 12670002, 12670000, 12670001, 12670002), 1267: (12680000, 12680001, 12680002), 1268: (12690000, 12690001, 12690002, 12690000, 12690001, 12690002), 1269: (12700000, 12700001, 12700002), 1270: (12710000, 12710001, 12710002)}
输出result row count 351
follows.subject follows.object friendsOf.object likes.object hasReview.object
0 10 120 1240 12410000
0 10 120 1240 12410001
0 10 120 1240 12410002
0 10 120 1241 12420000
0 10 120 1241 12420001
0 10 120 1241 12420002
0 10 120 1242 12430000
0 10 121 1242 12430000
0 10 120 1242 12430001
0 10 121 1242 12430001
0 10 120 1242 12430002
0 10 121 1242 12430002
0 10 121 1243 12440000
0 10 121 1243 12440001
0 10 121 1243 12440002
0 10 121 1244 12450000
0 11 121 1244 12450000
0 10 122 1244 12450000
0 11 122 1244 12450000
0 10 121 1244 12450001
0 11 121 1244 12450001
0 10 122 1244 12450001
0 11 122 1244 12450001
0 10 121 1244 12450002
0 11 121 1244 12450002
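etc.
注意:上面列出的部分行并不满足查询的连接条件。例如行 0 11 121 1244 12450000 要求 friendOf 中存在 (11, 121),但输入表中 friendOf[11] = (122, 123, 124),并不包含 121。这类行来自对各层集合直接做笛卡尔积;正确的结果应当只包含沿 follows → friendOf → likes → hasReview 逐层相连的组合,因此实际的结果行数会少于上面的 351。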