从芹菜中执行insert时,mysql命令不同步



当使用自定义数据库库和芹菜时,我正在运行可怕的MySQL命令不同步。

库如下:

import pymysql
import pymysql.cursors
from furl import furl
from flask import current_app
class LegacyDB:
    """Db
    Legacy Database connectivity library
    """
    def __init__(self,app):
        with app.app_context():
            self.rc = current_app.config['RAVEN']
            self.logger = current_app.logger
            self.data = {}
            # setup Mysql
            try:
                uri = furl(current_app.config['DBCX'])
                self.dbcx = pymysql.connect(
                    host=uri.host,
                    user=uri.username,
                    passwd=uri.password,
                    db=str(uri.path.segments[0]),
                    port=int(uri.port),
                    cursorclass=pymysql.cursors.DictCursor
                    )
            except:
                self.rc.captureException()
    def query(self, sql, params = None, TTL=36):
        # INPUT 1 : SQL query
        # INPUT 2 : Parameters
        # INPUT 3 : Time To Live
        # OUTPUT  : Array of result
        # check that we're still connected to the
        # database before we fire off the query
        try:
            db_cursor = self.dbcx.cursor()
            if params:
              self.logger.debug("%s : %s" % (sql, params))
              db_cursor.execute(sql,params)
              self.dbcx.commit()
            else:
              self.logger.debug("%s" % sql)
              db_cursor.execute(sql)
            self.data = db_cursor.fetchall()
            if self.data == None:
              self.data = {}
            db_cursor.close()
        except Exception as ex:
            if ex[0] == "2006":
                db_cursor.close()
                self.connect()
                db_cursor = self.dbcx.cursor()
                if params:
                  db_cursor.execute(sql,params)
                  self.dbcx.commit()
                else:
                  db_cursor.execute(sql)
                self.data = db_cursor.fetchall()
                db_cursor.close()
            else:
                self.rc.captureException()
        return self.data

该库的目的是与SQLAlchemy一起工作,同时将遗留数据库模式从基于C++的系统迁移到基于Python的系统。

所有配置都是通过一个Flask应用程序完成的,app.config['DBCX']值读取的值与SQLAlchemyString("mysql://user:pass@host:port/dbname")允许我在未来轻松切换。

我有许多任务通过芹菜运行"INSERT"语句,所有这些任务都使用这个库。正如你所想象的,运行Celery的主要原因是为了提高这个应用程序的吞吐量,但我的库或应用程序中的线程似乎遇到了问题,因为过了一段时间(大约500条已处理的消息),我在日志中看到了以下内容:

Stacktrace (most recent call last):
  File "legacy/legacydb.py", line 49, in query
    self.dbcx.commit()
  File "pymysql/connections.py", line 662, in commit
    self._read_ok_packet()
  File "pymysql/connections.py", line 643, in _read_ok_packet
    raise OperationalError(2014, "Command Out of Sync")

很明显,我犯了这个错误,但MySQL是否启用/禁用了自动提交,或者我将connection.commit()调用放在哪里似乎都无关紧要。

如果我省略了connection.commit(),那么我就不会在数据库中插入任何内容。

我最近从mysqldb转到了pymysql,出现的次数似乎更低,但考虑到这些都是简单的"插入"命令,而不是复杂的选择(这个数据库上甚至没有任何外键约束!)我很难弄清楚问题出在哪里。

就目前的情况来看,我无法使用executemany,因为我无法提前准备语句(我从"firehose"消息队列中提取数据,并将其存储在本地以备稍后处理)。

首先,确保celery thingamagig自以来使用自己的连接

>>> pymysql.threadsafety
1

这意味着:"线程可以共享模块,但不能共享连接"。

init是调用一次,还是每个工作者调用一次?如果只有一次,您需要移动初始化。

在第一次调用查询时,在线程本地变量中延迟初始化连接如何?

相关内容

  • 没有找到相关文章

最新更新