Python mysql(使用pymysql)自动重新连接



我不确定这是否可能,但我正在寻找一种在连接丢失时重新连接到mysql数据库的方法。所有的连接都保存在一个事件队列中,但我认为这应该无关紧要。我确信,如果我投入一些时间,我可以想出一种重新连接到数据库的方法。然而,我正在浏览pymysql代码,我看到在连接类中有一个'ping'方法,我不确定如何使用。

该方法看起来像它将重新连接第一次,但之后,它切换重新连接标志为False再次?我可以使用这种方法吗?或者如果它丢失了,有其他方法可以建立连接吗?即使它不是pymysql人们如何解决,数据库服务器下降,不得不重新建立连接到mysql服务器?

def ping(self, reconnect=True):
    ''' Check if the server is alive '''
    if self.socket is None:
        if reconnect:
            self._connect()
            reconnect = False
        else:
            raise Error("Already closed")
    try:
        self._execute_command(COM_PING, "")
        return self._read_ok_packet()
    except Exception:
        if reconnect:
            self._connect()
            return self.ping(False)
        else:
            raise

嗯,我在我的应用程序中遇到了同样的问题,我在PyMySQL文档中找到了一个方法,ping到服务器并检查连接是否关闭,如果它关闭了,那么它再次重新连接。

from pymysql import connect
from pymysql.cursors import DictCursor
# create the connection
connection = connect(host='host', port='port', user='user', 
                     password='password', db='db', 
                     cursorclass=DictCursor)
# get the cursor
cursor = connection.cursor()
# if the connection was lost, then it reconnects
connection.ping(reconnect=True)      
# execute the query
cursor.execute(query)

终于有了一个可行的解决方案,可能会对某人有所帮助。

from gevent import monkey
monkey.patch_socket()
import logging
import gevent
from gevent.queue import Queue
import pymysql as db
logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger("connection_pool")

class ConnectionPool:
    def __init__(self, db_config, time_to_sleep=30, test_run=False):
        self.username = db_config.get('user')
        self.password = db_config.get('password')
        self.host = db_config.get('host')
        self.port = int(db_config.get('port'))
        self.max_pool_size = 20
        self.test_run = test_run
        self.pool = None
        self.time_to_sleep = time_to_sleep
        self._initialize_pool()
    def get_initialized_connection_pool(self):
        return self.pool
    def _initialize_pool(self):
        self.pool = Queue(maxsize=self.max_pool_size)
        current_pool_size = self.pool.qsize()
        if current_pool_size < self.max_pool_size:  # this is a redundant check, can be removed
            for _ in xrange(0, self.max_pool_size - current_pool_size):
                try:
                    conn = db.connect(host=self.host,
                                      user=self.username,
                                      passwd=self.password,
                                      port=self.port)
                    self.pool.put_nowait(conn)
                except db.OperationalError, e:
                    LOGGER.error("Cannot initialize connection pool - retrying in {} seconds".format(self.time_to_sleep))
                    LOGGER.exception(e)
                    break
        self._check_for_connection_loss()
    def _re_initialize_pool(self):
        gevent.sleep(self.time_to_sleep)
        self._initialize_pool()
    def _check_for_connection_loss(self):
        while True:
            conn = None
            if self.pool.qsize() > 0:
                conn = self.pool.get()
            if not self._ping(conn):
                if self.test_run:
                    self.port = 3306
                self._re_initialize_pool()
            else:
                self.pool.put_nowait(conn)
            if self.test_run:
                break
            gevent.sleep(self.time_to_sleep)
    def _ping(self, conn):
        try:
            if conn is None:
                conn = db.connect(host=self.host,
                                  user=self.username,
                                  passwd=self.password,
                                  port=self.port)
            cursor = conn.cursor()
            cursor.execute('select 1;')
            LOGGER.debug(cursor.fetchall())
            return True
        except db.OperationalError, e:
            LOGGER.warn('Cannot connect to mysql - retrying in {} seconds'.format(self.time_to_sleep))
            LOGGER.exception(e)
            return False
# test (pytest compatible) -------------------------------------------------------------------------------------------
import logging
from src.py.ConnectionPool import ConnectionPool
logging.basicConfig(level=logging.DEBUG)
LOGGER = logging.getLogger("test_connection_pool")

def test_get_initialized_connection_pool():
    config = {
        'user': 'root',
        'password': '',
        'host': '127.0.0.1',
        'port': 3305
    }
    conn_pool = ConnectionPool(config, time_to_sleep=5, test_run=True)
    pool = conn_pool.get_initialized_connection_pool()
    # when in test run the port will be switched back to 3306
    # so the queue size should be 20 - will be nice to work 
    # around this rather than test_run hack
    assert pool.qsize() == 20

最简单的方法是在发送查询之前检查连接。

您可以通过创建一个包含两个方法的小类来做到这一点:connectquery:

import pymysql
import pymysql.cursors
class DB:
    def connect(self):
        self.conn = pymysql.connect(
                             host=hostname,
                             user=username,
                             password=password,
                             db=dbname,
                             charset='utf8mb4',
                             cursorclass=pymysql.cursors.DictCursor,
                             port=3306)
    def query(self, sql):
        try:
            cursor = self.conn.cursor()
            cursor.execute(sql)
        except pymysql.OperationalError:
            self.connect()
            cursor = self.conn.cursor()
            cursor.execute(sql)
        return cursor
db = DB()

现在,无论何时使用db.query("example SQL")发送查询,请求都会自动准备遇到连接错误,并在需要时使用self.connect()重新连接。

记住:这是一个简化的示例。通常,您希望让PyMySQL帮助您转义查询中的特殊字符。要做到这一点,你必须在query方法中添加第二个参数,并从那里开始。

逻辑很简单,如果连接关闭,然后尝试重新连接几次,在这种情况下,我使用max重试15次来重新连接或ping。

import pymysql, pymysql.cursors
conn = pymysql.connect(
                         host=hostname,
                         user=username,
                         password=password,
                         db=dbname,
                         charset='utf8mb4',
                         cursorclass=pymysql.cursors.DictCursor,
                         )
cursor = conn.cursor()
# you can do transactions to database and when you need conn later, just make sure the server is still connected
if conn.open is False:
   max_try = 15
   try = 0
   while conn.open is False:
       if try < max_try:
           conn.ping() # autoreconnect is true by default
       try +=1
# check the conn again to make sure it connected
if conn.open:
    # statements when conn is successfully reconnect to the server
else:
    # it must be something wrong : server, network etc

旧的,但我遇到了一个类似的问题,在程序中访问托管数据库。我最终使用的解决方案是创建一个装饰器,以便在查询时自动重新连接。

给定一个连接函数:

def connect(self):
    self.conn = mysql.connector.connect(host=self.host, user=self.user, 
    database=self.database, password=self.password)
    self.cursor = self.conn.cursor()
    print("Established connectionn...")
我创建

def _reconnect(func):
    @wraps(func)
    def rec(self,*args,**kwargs):
        try:
            result = func(self,*args,**kwargs)
            return result
        except (mysql.connector.Error, mysql.connector.Warning) as e:
            self.connect()
            result = func(self,*args,**kwargs)
            return result
    return rec 

这样任何使用该连接的函数现在都可以这样装饰

@_reconnect
def check_user_exists(self,user_id):
    self.cursor.execute("SELECT COUNT(*) FROM _ where user_id={};".format(user_id))
    if self.cursor.fetchall()[0][0]==0:
        return False 
    else:
        return True

这个装饰器将重新建立一个连接,并重新运行任何涉及到数据库查询的函数。

您可以在每次执行查询时使用属性来保持连接活动:

import pymysql
import pymysql.cursors
import pandas as pd 
class DB:
    def __init__(self, hostname='1.1.1.1', username='root', password='password',
                 database=None, port=3306, charset="utf8mb4"):
        self.hostname = hostname
        self.database = database
        self.username = username 
        self.password = password
        self.port = port
        self.charset = charset
        self.connect()
    
    @property
    def conn(self): 
        if not self.connection.open:
            print('Going to reconnect')
        self.connection.ping(reconnect=True)
        return self.connection
    def connect(self):
        self.connection = pymysql.connect(
                             host=self.hostname,
                             user=self.username,
                             password=self.password,
                             db=self.database,
                             charset=self.charset,
                             cursorclass=pymysql.cursors.DictCursor,
                             port=self.port)
    def query(self, sql):
        return pd.read_sql_query(sql, con=self.conn)
db = DB(hostname='1.1.1.1', username='root', password='password', database=None, port=3306, charset="utf8mb4")

相关内容

  • 没有找到相关文章

最新更新