我正在使用databases
python包(https://pypi.org/project/databases/)管理到我的postgresql数据库的连接
从文档中(https://www.encode.io/databases/database_queries/#queries)上面说我可以用
# Fetch multiple rows without loading them all into memory at once
query = notes.select()
async for row in database.iterate(query=query):
...
或
# Fetch multiple rows
query = notes.select()
rows = await database.fetch_all(query=query)
以下是我尝试过的:
def check_all_orders():
query = "SELECT * FROM orders WHERE shipped=True"
return database.fetch_all(query)
...
...
...
@app.task
async def check_orders():
query = await check_all_orders()
today = datetime.utcnow()
for q in query:
if q.last_notification is not None:
if (today - q.last_notification).total_seconds() < q.cooldown:
continue
和
@app.task
async def check_orders():
query = "SELECT * FROM orders WHERE shipped=True"
today = datetime.utcnow()
async for q in database.iterate(query=query):
if q.last_notification is not None:
if (today - q.last_notification).total_seconds() < q.cooldown:
continue
我同时使用了,但出现以下错误:
引发TypeError(f'类型为{o.类.名称}的对象'kombu.exceptions.EncodeError:coroutine类型的对象不是JSON可序列化的
以下的完整错误
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/celery/app/trace.py", line 472, in trace_task
mark_as_done(
File "/usr/local/lib/python3.9/site-packages/celery/backends/base.py", line 154, in mark_as_done
self.store_result(task_id, result, state, request=request)
File "/usr/local/lib/python3.9/site-packages/celery/backends/base.py", line 434, in store_result
self._store_result(task_id, result, state, traceback,
File "/usr/local/lib/python3.9/site-packages/celery/backends/base.py", line 856, in _store_result
self._set_with_state(self.get_key_for_task(task_id), self.encode(meta), state)
File "/usr/local/lib/python3.9/site-packages/celery/backends/base.py", line 324, in encode
_, _, payload = self._encode(data)
File "/usr/local/lib/python3.9/site-packages/celery/backends/base.py", line 328, in _encode
return dumps(data, serializer=self.serializer)
File "/usr/local/lib/python3.9/site-packages/kombu/serialization.py", line 220, in dumps
payload = encoder(data)
File "/usr/local/lib/python3.9/contextlib.py", line 135, in __exit__
self.gen.throw(type, value, traceback)
File "/usr/local/lib/python3.9/site-packages/kombu/serialization.py", line 53, in _reraise_errors
reraise(wrapper, wrapper(exc), sys.exc_info()[2])
File "/usr/local/lib/python3.9/site-packages/kombu/exceptions.py", line 21, in reraise
raise value.with_traceback(tb)
File "/usr/local/lib/python3.9/site-packages/kombu/serialization.py", line 49, in _reraise_errors
yield
File "/usr/local/lib/python3.9/site-packages/kombu/serialization.py", line 220, in dumps
payload = encoder(data)
File "/usr/local/lib/python3.9/site-packages/kombu/utils/json.py", line 65, in dumps
return _dumps(s, cls=cls or _default_encoder,
File "/usr/local/lib/python3.9/json/__init__.py", line 234, in dumps
return cls(
File "/usr/local/lib/python3.9/json/encoder.py", line 199, in encode
chunks = self.iterencode(o, _one_shot=True)
File "/usr/local/lib/python3.9/json/encoder.py", line 257, in iterencode
return _iterencode(o, 0)
File "/usr/local/lib/python3.9/site-packages/kombu/utils/json.py", line 55, in default
return super().default(o)
File "/usr/local/lib/python3.9/json/encoder.py", line 179, in default
raise TypeError(f'Object of type {o.__class__.__name__} '
kombu.exceptions.EncodeError: Object of type coroutine is not JSON serializable
Schema.jsonify方法——就像Flask中的jsonify一样——返回kombu无法序列化的响应对象(IIRC kombu默认序列化为JSON(。您可能应该使用dump而不是jsonify来返回字典。