NetworkX可扩展性,用于节点/边缘的自定义存储



NetworkX是否支持自定义节点、边和属性的存储位置?例如,我想尝试2个选项:

  1. 使用类似LevelDB/Kyoto Cabinet的东西作为后备存储。

  2. 使用一些分布式数据库(Neo4j甚至HBase-I只需要节点/边缘的分布式存储)作为后备存储。

NetworkX支持这些东西的可扩展性点是什么?

我将发布为外部存储设置NetworkX的细微差别。Kikohs正确地指出,每一本字典都有工厂。这些可以被覆盖。

对于持久化存储,唯一真正需要特别注意的字典是节点字典。

必须特别注意类似dict的实现的行为。NetworkX类中有一些代码可以更改从内存中的字典返回的值,而无需对其进行设置。

例如:

self.succ[u][v]=datadict
self.pred[v][u]=datadict

这些值将不会持久化回存储后端。为了适应这种情况,我使用了一个内存缓存,它将对象保存在内存中,当它们被逐出时,它会将它们写入底层存储。

对于内存中的缓存,我使用了cachetools。关于驱逐,请参阅:Python functools.ru_cache驱逐回调或等效

对于底层存储,我使用了plyvel(https://plyvel.readthedocs.org/en/latest/)它是LevelDB的Python接口。

下面我还给出了字典的实现。请注意,代码中仍然存在错误和bug,而且还没有经过适当的测试,但您已经了解了大致情况。

class PlyvelBatchWrite(object):
    def __init__(self, plv_dict):
        self.__batch = plv_dict._db.write_batch()
        self.__plvd = plv_dict
    def set(self, key, val):
        self.__batch.put(self.__plvd.serializer.pack(key), self.__plvd.serializer.pack(val))
    def delete(self, key):
        self.__batch.delete(self.__plvd.serializer.pack(key))
    def clear(self):
        self.__batch.clear()
    def commit(self):
        self.__batch.write()

class PlyvelDict(MutableMapping):
    def __init__(self, directory='', db=None, serializer_factory=None, cache_factory=None, **kwargs):
        self.__directory = directory
        ensure_directory(directory)
        if isinstance(db, str) or db is None:
            if db is None:
                # generate UUID
                db = str(uuid.uuid4())
            self.__db = db
            db = plyvel.DB(self.name(), **kwargs)
        else:
            self.__db = kwargs['db']
        self._db = db
        if serializer_factory:
            self.serializer = serializer_factory()
        else:
            self.serializer = None
        if cache_factory:
            self.__cache = cache_factory(self.__cache_miss, self.__cache_evict)
        else:
            self.__cache = None
    def name(self):
        full_path = os.path.join(self.__directory, self.__db)
        return full_path
    def __cache_miss(self, key):
        b_item = self._db.get(self.serializer.pack(key))
        if b_item is not None:
            return self.serializer.unpack(b_item)
        else:
            raise KeyError(key)
    def __cache_evict(self, key, val):
        self._db.put(self.serializer.pack(key), self.serializer.pack(val))
    def __copy__(self):
        return type(self)(self.__directory, self._db, type(self.serializer), type(self.__cache), db=self.__db)
    def __getitem__(self, key):
        return self.__cache[key]
    def __setitem__(self, key, value):
        if key in self.__cache:
            self.__cache[key] = value
        self.__write_to_db(key, value)
    def __write_to_db(self, key, value):
        self._db.put(self.serializer.pack(key), self.serializer.pack(value))
    def __delitem__(self, key):
        if key in self.__cache:
            del self.__cache[key]
        self._db.delete(self.serializer.pack(key))
    def __iter__(self):
        return self.iterkeys()
    def __keytransform__(self, key):
        return key
    def __len__(self):
        return self.count()
    def __del__(self):
        self.flush()
        if not self._db.closed:
            self._db.close()
    # improved methods
    def flush(self, write_to_db=False):
        if self.__cache:
            if write_to_db:
                batch = self.set_batch()
                for key, val in self.__cache.items():
                    batch.set(key, val)
                batch.commit()
            self.__cache.clear()
    def set_batch(self):
        return PlyvelBatchWrite(self)
    def iteritems(self):
        self.flush()
        for key, value in self._db.iterator(include_key=True, include_value=True):
            yield (self.serializer.unpack(key), self.serializer.unpack(value))
    def iterkeys(self):
        self.flush()
        for key in self._db.iterator(include_key=True, include_value=False):
            yield self.serializer.unpack(key)
    def itervalues(self):
        self.flush()
        for val in self._db.iterator(include_key=False, include_value=True):
            yield self.serializer.unpack(val)
    def keys(self):
        self.flush()
        # fixes default method which calls __len__
        return list(self.iterkeys())
    def values(self):
        self.flush()
        return list(self.itervalues())
    def has_key(self, key):
        return key in self
    def clear(self):
        self.flush()
        for k in self:
            del self[k]
    def count(self):
        self.flush()
        return sum(1 for key in self)

图形类:

class PersistedGraph(nx.Graph):
    def __init__(self, data=None, node_dict_factory=None, adjlist_dict_factory=None, edge_attr_dict_factory=None,
                 **attr):
        if node_dict_factory:
            self.node_dict_factory = node_dict_factory
        if adjlist_dict_factory:
            self.adjlist_dict_factory = adjlist_dict_factory
        if edge_attr_dict_factory:
            self.edge_attr_dict_factory = edge_attr_dict_factory
        nx.Graph.__init__(self, data, **attr)

应该可以通过子类化Graph类并提供用户定义的工厂函数来扩展networkx。这些函数可以查询数据库并将结果缓存在networkx使用的词典中。

我在网上文档中找不到这些行,但在你的代码中:

子类(高级):

Graph类使用dict数据结构中dict的dict。

外部dict(node_dict)包含按节点键控的邻接列表。下一个dict(adjlist)表示邻接列表并保持由邻居键入的边缘数据。内部dict(edge_attr)表示边缘数据并保存由属性名称键控的边缘属性值。

这三个dict中的每一个都可以被用户定义的类似dict的对象。一般来说,dict类功能应该是保持,但可以添加额外的功能。替换其中一个dicts通过更改class(!)变量创建一个新的图形类为类似dict的结构保留工厂。变量名称分别是node_dict_factory、adjlist_dict_ffactory和edge_attr_dict_fFactory。

    node_dict_factory : function, (default: dict)
    Factory function to be used to create the outer-most dict
    in the data structure that holds adjacency lists keyed by node.
    It should require no arguments and return a dict-like object.
    adjlist_dict_factory : function, (default: dict)
    Factory function to be used to create the adjacency list
    dict which holds edge data keyed by neighbor.
    It should require no arguments and return a dict-like object
    edge_attr_dict_factory : function, (default: dict)
    Factory function to be used to create the edge attribute
    dict which holds attrbute values keyed by attribute name.
    It should require no arguments and return a dict-like object.

我不知道networkx有任何正式的扩展。

最新更新