Django 使用原生 SQL 的方法

  • 2021-12-31
  • Admin
  1. import logging
  2. import psycopg2
  3. from DBUtils.PooledDB import PooledDB
  4. from psycopg2 import pool
  5. from psycopg2._psycopg import ProgrammingError
  6. from psycopg2.extras import DictCursor
  7. from utils.ERP.erpsettings import ERP_SQL_INFO
  8. logger = logging.getLogger('django')
  9. class ERPDB(object):
  10. __pool = None
  11. def __init__(self, host=ERP_SQL_INFO.get('HOST'), user=ERP_SQL_INFO.get('USER'),
  12. password=ERP_SQL_INFO.get('PASSWORD'), database=ERP_SQL_INFO.get('DATABASE'),
  13. port=ERP_SQL_INFO.get('PORT')):
  14. try:
  15. # self.connectPool = pool.SimpleConnectionPool(2, 10, host=host, port=port,
  16. # user=user, password=password,
  17. # database=database, keepalives=1,
  18. # keepalives_idle=30, keepalives_interval=10,
  19. # keepalives_count=5)
  20. # self.conn, self.cursor = self.__get_connection()
  21. if ERPDB.__pool is None:
  22. __pool = PooledDB(creator=psycopg2,
  23. mincached=1,
  24. maxcached=20,
  25. host=host,
  26. port=port,
  27. user=user,
  28. password=password,
  29. dbname=database,
  30. # use_unicode=False,
  31. # charset="utf8",
  32. cursor_factory=DictCursor)
  33. self._conn = __pool.connection()
  34. self._cursor = self._conn.cursor()
  35. except Exception as e:
  36. logger.info('初始化erp数据库连接失败:{}'.format(e))
  37. def execute(self, sql, param=None):
  38. count = -1
  39. try:
  40. # print(sql)
  41. self._cursor.execute(sql, param)
  42. count = self._cursor.rowcount
  43. self._conn.commit()
  44. except Exception as e:
  45. logger.error('psycopg2 提交失败 {}'.format(e))
  46. self._conn.rollback()
  47. return count
  48. def get_one(self, sql, param=None):
  49. try:
  50. self.execute(sql, param)
  51. result = self._cursor.fetchone()
  52. return result
  53. except ProgrammingError:
  54. return dict()
  55. def get_all(self, sql, param=None):
  56. self.execute(sql, param)
  57. result = self._cursor.fetchall()
  58. return result
  59. def dispose(self):
  60. self._cursor.close()
  61. self._conn.close()
  62. def getAll(self, sql, param=None):
  63. """
  64. @summary: 执行查询,并取出所有结果集
  65. @param sql:查询SQL,如果有查询条件,请只指定条件列表,并将条件值使用参数[param]传递进来
  66. @param param: 可选参数,条件列表值(元组/列表
  67. @return: result list(字典对象)/boolean 查询到的结果集
  68. """
  69. if param is None:
  70. self._cursor.execute(sql)
  71. else:
  72. self._cursor.execute(sql, param)
  73. try:
  74. list_header = [row[0] for row in self._cursor.description]
  75. list_result = [[item for item in row] for row in self._cursor.fetchall()]
  76. result = [dict(zip(list_header, row)) for row in list_result]
  77. except:
  78. result = False
  79. return result
  80. def getCount(self, sql, param=None):
  81. if param is None:
  82. self._cursor.execute(sql)
  83. else:
  84. self._cursor.execute(sql, param)
  85. return self._cursor.rowcount

原文:https://blog.csdn.net/zhouxuan612/article/details/122249805

联系站长

QQ:769220720