Python操作SQLite3数据库辅助类(查询构造器)

!本文可能 超过1年没有更新,今后内容也许不会被维护或者支持,部分内容可能具有时效性,涉及技术细节或者软件使用方面,本人不保证相应的兼容和可操作性。

接触Python时间也不是很长的,最近有个项目需要分析数据,于是选用Python为编程语言,除了语言特性外主要还是看重Python对于SQLite3数据库良好的支持能力了,因为需要灵活处理大量的中间数据。

刚开始一些模块我还乐此不疲的写SQL语句,后来渐渐厌倦了,回想到以前捣鼓C#的时候利用反射初步构建了个SQL查询构造器,直到发现linq,于是放弃了这个计划,当然微软后来又推出了Entity Framework,这些都是后话了,而且现在我对微软的东西兴趣不是很大的,好了,扯多了,下面继续正文。

对了,再扯一句,优秀的博客程序Drupal也使用了类似的查询构造器进行数据库查询,避免直接写SQL语句,另外这样做的一点点好处就是,可以一定程度的屏蔽平台相关性,对于数据库迁移还是有帮助的。

不过我今天介绍的数据库辅助类查询构造器是个很简单的东东,甚至只限于SQLite数据库,如果有童鞋感兴趣可以完善下,我目前只要操作SQLite顺手就可以了,对于比较大的数据库应用就直接上ORM吧。

先看代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
import sqlite3
 
# ***************************************************
# *
# * Description: Python操作SQLite3数据库辅助类(查询构造器)
# * Author: wangye
# * Website: http://wangye.org
# *
# ***************************************************
 
def _wrap_value(value):
    return repr(value)
 
def _wrap_values(values):
    return list(map(_wrap_value, values))
 
def _wrap_fields(fields):
    for key,value in fields.items():
        fields[key] = _wrap_value(value)
    return fields
 
def _concat_keys(keys):
    return "[" + "],[".join(keys) + "]"
 
def _concat_values(values):
    return ",".join(values)
 
def _concat_fields(fields, operator = (None, ",")):
    if operator:
        unit_operator, group_operator = operator
    # fields = _wrap_fields(fields)
    compiled = []
    for key,value in fields.items():
        compiled.append("[" + key + "]")
        if unit_operator:
            compiled.append(unit_operator)
            compiled.append(value)
        compiled.append(group_operator)
    compiled.pop() # pop last group_operator
    return " ".join(compiled)
 
class DataCondition(object):
    """
        本类用于操作SQL构造器辅助类的条件语句部分
 
        例如:
        DataCondition(("=", "AND"), id = 26)
        DataCondition(("=", "AND"), True, id = 26)
    """
 
    def __init__(self, operator = ("=", "AND"), ingroup = True, **kwargs):
        """
            构造方法
            参数:
                operator 操作符,分为(表达式操作符, 条件运算符)
                ingroup  是否分组,如果分组,将以括号包含
                kwargs   键值元组,包含数据库表的列名以及值
                         注意这里的等于号不等于实际生成SQL语句符号
                         实际符号是由operator[0]控制的
            例如:
            DataCondition(("=", "AND"), id = 26)
            (id=26)
            DataCondition((">", "OR"), id = 26, age = 35)
            (id>26 OR age>35)
            DataCondition(("LIKE", "OR"), False, name = "John", company = "Google")
            name LIKE 'John' OR company LIKE "Google"
        """
        self.ingroup = ingroup
        self.fields = kwargs
        self.operator = operator
 
    def __unicode__(self):
        self.fields = _wrap_fields(self.fields)
        result = _concat_fields(self.fields, self.operator)
        if self.ingroup:
            return "(" + result + ")"
        return result
 
    def __str__(self):
        return self.__unicode__()
 
    def toString(self):
        return self.__unicode__()
 
class DataHelper(object):
 
    """
        SQLite3 数据查询辅助类
    """
 
    def __init__(self, filename):
        """
            构造方法
            参数: filename 为SQLite3 数据库文件名
        """
        self.file_name = filename
 
    def open(self):
        """
            打开数据库并设置游标
        """
        self.connection = sqlite3.connect(self.file_name)
        self.cursor = self.connection.cursor()
        return self
 
    def close(self):
        """
            关闭数据库,注意若不显式调用此方法,
            在类被回收时也会尝试调用
        """
        if hasattr(self, "connection") and self.connection:
            self.connection.close()
 
    def __del__(self):
        """
            析构方法,做一些清理工作
        """
        self.close()
 
    def commit(self):
        """
            提交事务
            SELECT语句不需要此操作,默认的execute方法的
            commit_at_once设为True会隐式调用此方法,
            否则就需要显示调用本方法。
        """
        self.connection.commit()
 
    def execute(self, sql = None, commit_at_once = True):
        """
            执行SQL语句
            参数:
                sql  要执行的SQL语句,若为None,则调用构造器生成的SQL语句。
                commit_at_once 是否立即提交事务,如果不立即提交,
                对于非查询操作,则需要调用commit显式提交。
        """
        if not sql:
            sql = self.sql
        self.cursor.execute(sql)
        if commit_at_once:
            self.commit()
 
    def fetchone(self, sql = None):
        """
            取一条记录
        """
        self.execute(sql, False)
        return self.cursor.fetchone()
 
    def fetchall(self, sql = None):
        """
            取所有记录
        """
        self.execute(sql, False)
        return self.cursor.fetchall()
 
    def __concat_keys(self, keys):
        return _concat_keys(keys)
 
    def __concat_values(self, values):
        return _concat_values(values)
 
    def table(self, *args):
        """
            设置查询的表,多个表名用逗号分隔
        """
        self.tables = args
        self.tables_snippet = self.__concat_keys(self.tables)
        return self
 
    def __wrap_value(self, value):
        return _wrap_value(value)
 
    def __wrap_values(self, values):
        return _wrap_values(values)
 
    def __wrap_fields(self, fields):
        return _wrap_fields(fields)
 
    def __where(self):
        # self.condition_snippet
        if hasattr(self, "condition_snippet"):
            self.where_snippet = " WHERE " + self.condition_snippet
 
    def __select(self):
        template = "SELECT %(keys)s FROM %(tables)s"
        body_snippet_fields = {
            "tables" : self.tables_snippet,
            "keys" : self.__concat_keys(self.body_keys), 
        }
        self.sql = template % body_snippet_fields
 
    def __insert(self):
        template = "INSERT INTO %(tables)s (%(keys)s) VALUES (%(values)s)"
        body_snippet_fields = {
            "tables" : self.tables_snippet,
            "keys" : self.__concat_keys(list(self.body_fields.keys())),
            "values" : self.__concat_values(list(self.body_fields.values()))
        }
        self.sql = template % body_snippet_fields
 
    def __update(self):
        template = "UPDATE %(tables)s SET %(fields)s"
        body_snippet_fields = {
            "tables" : self.tables_snippet,
            "fields" : _concat_fields(self.body_fields, ("=",","))
        }
        self.sql = template % body_snippet_fields
 
    def __delete(self):
        template = "DELETE FROM %(tables)s"
        body_snippet_fields = {
            "tables" : self.tables_snippet
        }
        self.sql = template % body_snippet_fields
 
    def __build(self):
        {
            "SELECT": self.__select,
            "INSERT": self.__insert,
            "UPDATE": self.__update,
            "DELETE": self.__delete
        }[self.current_token]()
 
    def __unicode__(self):
        return self.sql
 
    def __str__(self):
        return self.__unicode__()
 
    def select(self, *args):
        self.current_token = "SELECT"
        self.body_keys = args
        self.__build()
        return self
 
    def insert(self, **kwargs):
        self.current_token = "INSERT"
        self.body_fields = self.__wrap_fields(kwargs)
        self.__build()
        return self
 
    def update(self, **kwargs):
        self.current_token = "UPDATE"
        self.body_fields = self.__wrap_fields(kwargs)
        self.__build()
        return self
 
    def delete(self, *conditions):
        self.current_token = "DELETE"
        self.__build()
        #if *conditions:
        self.where(*conditions)
        return self
 
    def where(self, *conditions):
        conditions = list(map(str, conditions))
        self.condition_snippet = " AND ".join(conditions)
        self.__where()
        if hasattr(self, "where_snippet"):
            self.sql += self.where_snippet
        return self

下面举几个例子供大家参考吧:

db = DataHelper("/home/wangye/sample.db3")
db.open() # 打开数据库
db.execute("""
    CREATE TABLE [staffs] (
      [staff_id] INTEGER PRIMARY KEY AUTOINCREMENT,
      [staff_name] TEXT NOT NULL,
      [staff_cardnum] TEXT NOT NULL,
      [staff_reserved] INTEGER NOT NULL
)
""") # 直接执行SQL语句,注意这里commit_at_once默认为True
 
db.table("staffs").insert(staff_name="John", staff_cardnum="1001", staff_reserved=0)
# 插入一条记录
 
rs = db.table("staffs").select("staff_id", "staff_name").fetchall()
# 直接取出所有staff_id和staff_name
 
rs = db.table("staffs").select("staff_name").where(DataCondition(("=", "AND"), id = 1)).fetchone()
# 取一条staff_id为1的staff_name
 
rs = db.table("staffs").select("staff_name").where(DataCondition(("<", "AND"), id = 100), DataCondition(("=", "AND"), staff_reserved = 1)).fetchone()
# 取一条id小于100并且staff_reserved为1的staff_name记录
 
db.close() # 关闭数据库

目前还没有让其支持星号(*)操作符,另外在多表同名列操作方面处理得也不是很好,这个只用于日常简单的脚本操作,最好不要用于生产环境,因为可能有未知问题。

若无特别说明,本网站文章均为原创,原则上这些文章不允许转载,但是如果阁下是出于研究学习目的可以转载到阁下的个人博客或者主页,转载遵循创作共同性“署名-非商业性使用-相同方式共享”原则,请转载时注明作者出处谢绝商业性、非署名、采集站、垃圾站或者纯粹为了流量的转载。谢谢合作!
请稍后...

发表评论

电子邮件地址不会被公开。 必填项已用*标注