为了账号安全,请及时绑定邮箱和手机立即绑定

Psycopg2 在类中自动重新连接

Psycopg2 在类中自动重新连接

函数式编程 2022-12-14 20:45:00
我有课程可以连接到我的数据库。import psycopg2, psycopg2.extensionsfrom parseini import configimport pandas as pd, pandas.io.sql as sqlioclass MyDatabase:    def __init__(self, name='mydb.ini'):        self.params = config(filename=name)        self.my_connection = psycopg2.connect(**self.params)        self.my_cursor = self.my_connection.cursor()    def fetch_all_as_df(self, sql_statement):        return sqlio.read_sql_query(sql_statement, self.my_connection)    def df_to_sql(self, df):        table = 'sometable'        return sqlio.to_sql(df, table, self.my_connection)    def __del__(self):        self.my_cursor.close()        self.my_connection.close()在我的案例中,如何重新连接到数据库并处理 psycopg2.OperationalError?
查看完整描述

1 回答

?
饮歌长啸

TA贡献1951条经验 获得超3个赞

psycopg2.InterfaceError您可以制作一个装饰器,在或psycopg2.OperationalError被提升时尝试重新连接。

这只是它如何工作的一个例子,可能需要调整:


import time

from functools import wraps

import psycopg2, psycopg2.extensions



def retry(fn):

    @wraps(fn)

    def wrapper(*args, **kw):

        cls = args[0]

        for x in range(cls._reconnectTries):

            print(x, cls._reconnectTries)

            try:

                return fn(*args, **kw)

            except (psycopg2.InterfaceError, psycopg2.OperationalError) as e:

                print ("\nDatabase Connection [InterfaceError or OperationalError]")

                print ("Idle for %s seconds" % (cls._reconnectIdle))

                time.sleep(cls._reconnectIdle)

                cls._connect()

    return wrapper



class MyDatabase:

    _reconnectTries = 5

    _reconnectIdle = 2  # wait seconds before retying


    def __init__(self, name='mydb.ini'):

        self.my_connection = None

        self.my_cursor = None

        self.params = config(filename=name)

        self._connect()


    def _connect(self):

        self.my_connection = psycopg2.connect(**self.params)

        self.my_cursor = self.my_connection.cursor()


    @retry

    def fetch_all_as_df(self, sql_statement):

        return sqlio.read_sql_query(sql_statement, self.my_connection)


    @retry

    def dummy(self):

        self.my_cursor.execute('select 1+2 as result')

        return self.my_cursor.fetchone()


    @retry

    def df_to_sql(self, df):

        table = 'sometable'

        return sqlio.to_sql(df, table, self.my_connection)


    def __del__(self):

        # Maybe there is a connection but no cursor, whatever close silently!

        for c in (self.my_cursor, self.my_connection):

            try:

                c.close()

            except:

                pass



db = MyDatabase()

time.sleep(30)  # some time to shutdown the database

print(db.dummy())

输出:


Database Connection [InterfaceError or OperationalError]

Idle for 2 seconds


Database Connection [InterfaceError or OperationalError]

Idle for 2 seconds


Database Connection [InterfaceError or OperationalError]

Idle for 2 seconds


Database Connection [InterfaceError or OperationalError]

Idle for 2 seconds

(3,)

注意:_connect它本身没有修饰,所以这段代码假定初始连接总是有效!


查看完整回答
反对 回复 2022-12-14
  • 1 回答
  • 0 关注
  • 237 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信