我正在经历一些我无法完全弄清楚的行为。 我正在使用 Cassandra 来存储消息对象,并且我正在使用 Celery 对数据库进行异步拉取和推送。 一切都很好,除了一个芹菜任务;使用相同代码/类的其他任务工作。 以下是代码逻辑的粗略细分:
db_manager = DBManager()
class User(object):
def __init__(self, user_id):
... normal init stuff ...
self.loader()
@run_async
def loader(self):
... loads from database if found, otherwise pulls from API ...
# THIS WORKS
@celery.task(name='user-to-db', filter=task_method)
def to_db(self):
# db_manager is a custom backend that handles relevant db reads, writes, etc.
db_manager.add('users', self.user_payload)
# THIS WORKS
@celery.task(name='load-friends', filter=task_method)
def load_friends(self):
# Checks secondary redis index for friends of user
friends = redis.srandmember('users:the-users-id:friends', self.id, 20)
if not friends:
profiles = load_friends_from_api(user_id=self.id)
else:
query = "SELECT * FROM keyspace.users WHERE id IN ({friends})".format(friends=friends)
# Init a User object for every friend
loaded_friends = [User(friend) for friend in profiles]
# Returns a class container with all the instances of User(friend), accessible through a class property
return FriendContainer(self.id, loaded_friends)
# THIS DOES NOT WORK
@celery.task(name='get-user-messages', filter=task_method)
def get_user_messages(self):
# THIS IS WHERE IT FAILS #
messages = db_manager.get("SELECT message FROM keyspace.message_timelines WHERE user_id = {user_id}".format(user_id=self.id))
# THAT LINE ABOVE #
# Init a message class object for every message payload in database
msgs = [Message(m, user=self) for m in messages]
# Returns a message container class holding all the message objects, accessible through a class property
return MessageContainer(msgs)
最后一个类方法引发错误:
File "/usr/local/lib/python2.7/dist-packages/kombu/serialization.py", line 356, in pickle_dumps
return dumper(obj, protocol=pickle_protocol)
EncodeError: Can't pickle <class 'cassandra.io.eventletreactor.message'>: attribute lookup cassandra.io.eventletreactor.message failed
cassandra.io.eventletreactor.message
指向 Cassandra 中的用户定义类型,我将其用作每个用户的消息对象的容器。 引发此错误的行是:
messages = db_manager.get("SELECT message FROM keyspace.message_timelines WHERE user_id = {user_id}".format(user_id=self.id))
这是来自DBManager()
的方法:
class DBManager(object):
... stuff ...
def get(self, query):
# I do some stuff to prepare the query, namely substituting `WHERE this = that` for `WHERE this = ?` to create a Cassandra prepared statement.
statement = cassandra.prepare(query_prepared)
# I want these messages as a dict, not the default namedtuple
cassandra.row_factory = dict_factory
# User id is parsed out of query
results = cassandra.execute(statement, (user_id,))
rows = results.current_rows
# rows is a list of dicts, no weird class references or anything in there
return rows
我读过类外的 Celery 任务是一种实验性的,但我无法弄清楚为什么所有其他方法 qu 使用相同实例的任务DBManager
都在工作。
该问题似乎局限于用户定义的类型message
的某些问题,该问题在 Cassandra 驱动程序中表现不佳;但是,如果我在 Celery 任务本身中从DBManager
运行 get
方法,它就可以工作了。 也就是说,如果我将错误从DBManager.get
复制/粘贴到User.get_user_messages
的代码,它可以正常工作。 如果我尝试从User.get_user_messages
内部调用DBManager.get
,它会中断。
我只是无法弄清楚问题出在哪里。 我可以很好地完成以下所有操作:
- 在没有芹菜的情况下运行
get_user_messages
方法,它可以工作。 - 如果我直接在 Celery 任务方法本身中运行
get
方法代码,请与 Celery 一起运行get_user_messages
方法。 - 我可以运行注册为 Celery 任务的其他方法,这些方法指向
DBManager
中使用 Cassandra 驱动程序的其他方法,甚至是将相同的message
用户定义类型插入数据库的方法。 - 我自己尝试过腌制所有的东西,并以各种组合,但无法重现错误。
我没有尝试过的:
- 将序列化程序更改为
json
或yaml
。 db 有效负载中有一些方便的项目不会使用这两个中的任何一个进行序列化。 - 使用
dill
而不是pickle
。 似乎这应该无需切换序列化程序即可工作,因为我可以让各个部分单独工作。
我可以说搞砸它并直接通过 Cassandra 驱动程序而不是我的 DBManager
类运行查询,但我觉得这应该是可以解决的,我只是错过了一些非常非常明显的东西,如此明显以至于我没有看到它。 任何关于哪里看的建议将不胜感激。
在相关性的情况下:Cassandra 3.3,CQL 3.4,DataStax蟒蛇驱动程序3.1
呵呵,我发现了问题,而且非常明显。 我想我实际上并没有尝试腌制所有的东西,只是大部分的东西,而且我没有在凌晨 4 点的调试昏迷中抓住这一点。
无论如何,cassandra.row_factory = dict_factory
在用户定义的类型上调用时,实际上并没有将所有内容作为字典返回。 它给出了一个{'label': message(x='this', y='that')}
字典,其中message
是一个命名元组。 Cassandra 驱动程序在类实例中动态创建命名元组,因此 pickle 找不到它。