我正在用python 3编写一个脚本,该脚本正在侦听隧道,并根据收到的消息在MySQL中保存和更新数据。
我进入了奇怪的行为,我使用pymysql模块与 MySQL 进行了简单的连接,一切正常,在一段时间后这个简单的连接关闭。
所以我决定实现与MySQL的池连接,这里出现了问题。有些事情没有发生错误,但问题如下:
我的光标 =yield self._pool.execute(query, list(filters.values(
)))游标结果 =tornado_mysql.pools.pool 对象位于0x0000019DE5D71F98
像这样的堆栈不再做任何事情
如果我从光标中删除产量,则通过该行,下一行会抛出错误
响应 =yield c.fetchall()
属性错误:"未来"对象没有属性"fetchall">
如何修复 MySQL 池连接以正常工作?
我尝试过:
我使用很少的模块进行池连接,一切都在同一个问题中
恢复了与pymysql的简单连接并再次工作
在我的代码下面:
蟒蛇脚本文件
import pika
from model import SyncModel
_model = SyncModel(conf, _server_id)
@coroutine
def main():
credentials = pika.PlainCredentials('user', 'password')
try:
cp = pika.ConnectionParameters(
host='127.0.0.1',
port=5671,
credentials=credentials,
ssl=False,
)
connection = pika.BlockingConnection(cp)
channel = connection.channel()
@coroutine
def callback(ch, method, properties, body):
if 'messageType' in properties.headers:
message_type = properties.headers['messageType']
if message_type in allowed_message_types:
result = ptoto_file._reflection.ParseMessage(descriptors[message_type], body)
if result:
result = protobuf_to_dict(result)
if message_type == 'MyMessage':
yield _model.message_event(data=result)
else:
print('Message type not in allowed list = ' + str(message_type))
print('continue listening...')
channel.basic_consume(callback, queue='queue', no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
except Exception as e:
print('Could not connect to host 127.0.0.1 on port 5671')
print(str(e))
if __name__ == '__main__':
main()
同步模型
from tornado_mysql import pools
from tornado.gen import coroutine, Return
from tornado_mysql.cursors import DictCursor
class SyncModel(object):
def __init__(self, conf, server_id):
self.conf = conf
servers = [i for i in conf.mysql.servers]
for s in servers:
if s['server_id'] == server_id:
// s hold all data as, host, user, port, autocommit, charset, db, password
s['cursorclass'] = DictCursor
self._pool = pools.Pool(s, max_idle_connections=1, max_recycle_sec=3)
@coroutine
def message_event(self, data):
table_name = 'table_name'
query = ''
data = data['message']
filters = {
'id': data['id']
}
// here the connection fails as describe above
response = yield self.query_select(table_name, self._pool, filters=filters)
@coroutine
def query_select(self, table_name, _pool, filters=None):
if filters is None:
filters = {}
combined_filters = ['`%s` = %%s' % i for i in filters.keys()]
where = 'WHERE ' + ' AND '.join(combined_filters) if combined_filters else ''
query = """SELECT * FROM `%s` %s""" % (table_name, where)
c = self._pool.execute(query, list(filters.values()))
response = yield c.fetchall()
raise Return({response})
所有代码都只与数据库的简单连接,在我开始使用池示例后不再起作用。将不胜感激在此问题上的任何帮助。
这是一个独立的脚本。
池连接不起作用,因此切换回 pymysql 并仔细检查连接
我想发布有效的答案,只有这个解决方案对我有用
-
在连接到MySQL之前检查连接是否打开,如果没有重新连接
if not self.mysql.open: self.mysql.ping(reconnect=True)