- import logging
-
- import psycopg2
- from DBUtils.PooledDB import PooledDB
- from psycopg2 import pool
- from psycopg2._psycopg import ProgrammingError
- from psycopg2.extras import DictCursor
-
- from utils.ERP.erpsettings import ERP_SQL_INFO
-
- logger = logging.getLogger('django')
-
-
class ERPDB(object):
    """Small helper around a pooled psycopg2 connection to the ERP database.

    The first instance lazily creates one ``PooledDB`` pool that is stored on
    the class and reused by every later instance. Because of that, the
    connection parameters passed to later instances are ignored once the pool
    exists. Each instance holds one connection and one cursor taken from the
    pool; call :meth:`dispose` when finished with it.
    """

    # Pool shared by all instances; created by the first __init__ call.
    __pool = None

    def __init__(self, host=ERP_SQL_INFO.get('HOST'), user=ERP_SQL_INFO.get('USER'),
                 password=ERP_SQL_INFO.get('PASSWORD'), database=ERP_SQL_INFO.get('DATABASE'),
                 port=ERP_SQL_INFO.get('PORT')):
        """Take a connection and a cursor from the shared pool.

        Connection failures are logged, not raised. In that case ``_conn``
        and ``_cursor`` stay ``None`` and later calls on this instance will
        fail.

        :param host: database host (default from ``ERP_SQL_INFO``)
        :param user: database user (default from ``ERP_SQL_INFO``)
        :param password: database password (default from ``ERP_SQL_INFO``)
        :param database: database name (default from ``ERP_SQL_INFO``)
        :param port: database port (default from ``ERP_SQL_INFO``)
        """
        # Always define the attributes so a failed init leaves a known state.
        self._conn = None
        self._cursor = None
        try:
            if ERPDB.__pool is None:
                # Assign to the class attribute. A bare ``__pool = ...`` here
                # would only bind a local and the pool would never be shared.
                ERPDB.__pool = PooledDB(creator=psycopg2,
                                        mincached=1,
                                        maxcached=20,
                                        host=host,
                                        port=port,
                                        user=user,
                                        password=password,
                                        dbname=database,
                                        cursor_factory=DictCursor)
            self._conn = ERPDB.__pool.connection()
            self._cursor = self._conn.cursor()
        except Exception as e:
            logger.error('初始化erp数据库连接失败:{}'.format(e))

    def execute(self, sql, param=None):
        """Execute one statement and commit it.

        :param sql: SQL text, using placeholders for values
        :param param: optional sequence/mapping of values for the placeholders
        :return: the cursor's ``rowcount`` on success, ``-1`` if the statement
            failed (the error is logged and the transaction rolled back)
        """
        count = -1
        try:
            self._cursor.execute(sql, param)
            count = self._cursor.rowcount
            self._conn.commit()
        except Exception as e:
            logger.error('psycopg2 提交失败 {}'.format(e))
            self._conn.rollback()
        return count

    def get_one(self, sql, param=None):
        """Run a query through :meth:`execute` and return the first row.

        :param sql: SQL text, using placeholders for values
        :param param: optional values for the placeholders
        :return: the first row, ``None`` when the result set is empty, or an
            empty ``dict`` when fetching raises ``ProgrammingError``
        """
        try:
            self.execute(sql, param)
            return self._cursor.fetchone()
        except ProgrammingError:
            return dict()

    def get_all(self, sql, param=None):
        """Run a query through :meth:`execute` and return every row.

        :param sql: SQL text, using placeholders for values
        :param param: optional values for the placeholders
        :return: whatever the cursor's ``fetchall()`` returns
        """
        self.execute(sql, param)
        return self._cursor.fetchall()

    def dispose(self):
        """Close this instance's cursor and connection.

        The connection is closed even if closing the cursor raises. For a
        pooled connection, ``close()`` is expected to hand it back to the pool
        (see the DBUtils documentation).
        """
        try:
            if self._cursor is not None:
                self._cursor.close()
        finally:
            if self._conn is not None:
                self._conn.close()

    def getAll(self, sql, param=None):
        """Execute a query and return all rows as plain dictionaries.

        Unlike :meth:`get_all`, this neither commits nor rolls back, and
        errors raised while executing the statement propagate to the caller.

        :param sql: query SQL; pass condition values through ``param``
            instead of formatting them into the SQL text
        :param param: optional condition values (tuple/list)
        :return: a list of ``{column_name: value}`` dicts, or ``False`` when
            the result cannot be read (e.g. the statement returned no rows
            set, so the cursor has no ``description``)
        """
        # Passing ``None`` as the parameters is the same as passing none.
        self._cursor.execute(sql, param)
        try:
            columns = [column[0] for column in self._cursor.description]
            result = [dict(zip(columns, row)) for row in self._cursor.fetchall()]
        except Exception:
            result = False
        return result

    def getCount(self, sql, param=None):
        """Execute a statement and return the cursor's ``rowcount``.

        This neither commits nor rolls back, and execution errors propagate.

        :param sql: SQL text, using placeholders for values
        :param param: optional values for the placeholders
        :return: the cursor's ``rowcount`` after execution
        """
        self._cursor.execute(sql, param)
        return self._cursor.rowcount
Django 使用原生 SQL 的方法
- 2021-12-31
- Admin
原文:https://blog.csdn.net/zhouxuan612/article/details/122249805
ERP
联系站长
QQ:769220720