一个不使用构建器类,纯原生编写的动态 SQL 拼接方案。
这种方式更加直接,适合简单场景或需要完全控制 SQL 生成过程的情况。

核心实现思路
纯原生实现的关键是:
- 使用列表收集 SQL 片段和参数
- 根据条件动态添加片段到列表中
- 最后将列表拼接成完整 SQL 语句
- 始终使用参数化查询防止 SQL 注入
完整实现方案
1. 基础查询函数
import pymysql
from typing import Dict, List, Union, Optional, Tuple
def build_dynamic_query(table: str, filters: Dict, columns: Union[str, List[str]] = "*") -> Tuple[str, List]:
"""
构建动态查询SQL语句
Args:
table: 查询的表名
filters: 查询条件字典
columns: 要查询的字段列表
Returns:
完整的SQL语句和参数列表
"""
# 处理查询字段
if isinstance(columns, list):
columns_str = ", ".join(columns)
else:
columns_str = columns
# 初始化SQL片段和参数
sql_fragments = [f"SELECT {columns_str} FROM {table}"]
params = []
# 处理WHERE条件
where_clauses = []
# 处理等于查询
if 'eq' in filters:
for column, value in filters['eq'].items():
if value is not None:
where_clauses.append(f"{column} = %s")
params.append(value)
# 处理不等于查询
if 'ne' in filters:
for column, value in filters['ne'].items():
if value is not None:
where_clauses.append(f"{column} != %s")
params.append(value)
# 处理大于查询
if 'gt' in filters:
for column, value in filters['gt'].items():
if value is not None:
where_clauses.append(f"{column} > %s")
params.append(value)
# 处理大于等于查询
if 'ge' in filters:
for column, value in filters['ge'].items():
if value is not None:
where_clauses.append(f"{column} >= %s")
params.append(value)
# 处理小于查询
if 'lt' in filters:
for column, value in filters['lt'].items():
if value is not None:
where_clauses.append(f"{column} < %s")
params.append(value)
# 处理小于等于查询
if 'le' in filters:
for column, value in filters['le'].items():
if value is not None:
where_clauses.append(f"{column} <= %s")
params.append(value)
# 处理模糊查询
if 'like' in filters:
for column, value_info in filters['like'].items():
if isinstance(value_info, dict):
value = value_info.get('value')
mode = value_info.get('mode', 'both')
else:
value = value_info
mode = 'both'
if value:
if mode == 'both':
pattern = f"%{value}%"
elif mode == 'start':
pattern = f"{value}%"
elif mode == 'end':
pattern = f"%{value}"
else:
continue
where_clauses.append(f"{column} LIKE %s")
params.append(pattern)
# 处理包含查询
if 'in' in filters:
for column, values in filters['in'].items():
if values and isinstance(values, list) and len(values) > 0:
placeholders = ", ".join(["%s"] * len(values))
where_clauses.append(f"{column} IN ({placeholders})")
params.extend(values)
# 处理区间查询
if 'between' in filters:
for column, range_info in filters['between'].items():
min_val = range_info.get('min')
max_val = range_info.get('max')
if min_val is not None and max_val is not None:
where_clauses.append(f"{column} BETWEEN %s AND %s")
params.extend([min_val, max_val])
# 处理NULL查询
if 'null' in filters:
for column, is_null in filters['null'].items():
if is_null:
where_clauses.append(f"{column} IS NULL")
else:
where_clauses.append(f"{column} IS NOT NULL")
# 处理OR条件组
if 'or' in filters:
for or_group in filters['or']:
or_clauses = []
for condition in or_group:
column = condition.get('column')
operator = condition.get('operator')
value = condition.get('value')
if column and operator and value is not None:
or_clauses.append(f"{column} {operator} %s")
params.append(value)
if or_clauses:
where_clauses.append(f"({' OR '.join(or_clauses)})")
# 添加WHERE子句
if where_clauses:
sql_fragments.append("WHERE " + " AND ".join(where_clauses))
# 处理排序
if 'order_by' in filters:
order_clauses = []
for column, direction in filters['order_by'].items():
direction = direction.upper() if direction else 'ASC'
if direction not in ['ASC', 'DESC']:
direction = 'ASC'
order_clauses.append(f"{column} {direction}")
if order_clauses:
sql_fragments.append("ORDER BY " + ", ".join(order_clauses))
# 处理分页
if 'limit' in filters:
limit = filters['limit']
offset = filters.get('offset', 0)
sql_fragments.append("LIMIT %s OFFSET %s")
params.extend([limit, offset])
# 拼接完整SQL
sql = " ".join(sql_fragments)
return sql, params
2. 数据库连接和执行函数
def create_db_connection(host: str, user: str, password: str, database: str, port: int = 3306):
"""创建数据库连接"""
try:
connection = pymysql.connect(
host=host,
user=user,
password=password,
database=database,
port=port,
cursorclass=pymysql.cursors.DictCursor,
charset='utf8mb4'
)
return connection
except Exception as e:
print(f"数据库连接失败: {e}")
raise
def execute_query(connection, sql: str, params: List = None) -> List[Dict]:
"""执行查询并返回结果"""
try:
with connection.cursor() as cursor:
cursor.execute(sql, params or [])
return cursor.fetchall()
except Exception as e:
print(f"查询执行失败: {e}")
print(f"SQL: {sql}")
print(f"Params: {params}")
raise
finally:
connection.close()
def execute_update(connection, sql: str, params: List = None) -> int:
"""执行更新操作"""
try:
with connection.cursor() as cursor:
result = cursor.execute(sql, params or [])
connection.commit()
return result
except Exception as e:
connection.rollback()
print(f"更新执行失败: {e}")
print(f"SQL: {sql}")
print(f"Params: {params}")
raise
finally:
connection.close()
3. 查询执行封装函数
def dynamic_search(table: str, filters: Dict, db_config: Dict,columns: Union[str, List[str]] = "*") -> List[Dict]:
"""
动态查询函数
Args:
table: 表名
filters: 查询条件
columns: 查询字段
db_config: 数据库配置
Returns:
查询结果列表
"""
# 构建SQL和参数
sql, params = build_dynamic_query(table, filters, columns)
# 创建数据库连接
connection = create_db_connection(**db_config)
# 执行查询
return execute_query(connection, sql, params)
使用示例
示例 1: 基础查询
# 数据库配置
db_config = {
'host': 'localhost',
'user': 'your_username',
'password': 'your_password',
'database': 'your_database'
}
# 示例1: 简单查询
filters1 = {
'eq': {'status': 'active'},
'ge': {'price': 1000},
'le': {'price': 5000},
'like': {'name': '手机'}
}
# 执行查询
results1 = dynamic_search('products', filters1, db_config=db_config)
print(f"查询结果1: {len(results1)} 条记录")
# 生成的SQL:
# SELECT * FROM products WHERE status = %s AND price >= %s AND price <= %s AND name LIKE %s
# 参数: ['active', 1000, 5000, '%手机%']
示例 2: 复杂查询(包含 OR 条件和分页)
# 示例2: 复杂查询
filters2 = {
'in': {'category_id': [1, 3, 5]},
'between': {'created_at': {'min': '2023-01-01', 'max': '2023-12-31'}},
'or': [
[
{'column': 'name', 'operator': 'LIKE', 'value': '%苹果%'},
{'column': 'brand', 'operator': '=', 'value': 'Apple'}
]
],
'order_by': {'price': 'ASC', 'created_at': 'DESC'},
'limit': 20,
'offset': 0
}
results2 = dynamic_search('products', filters2, ['id', 'name', 'price', 'brand'], db_config=db_config)
print(f"查询结果2: {len(results2)} 条记录")
# 生成的SQL:
# SELECT id, name, price, brand FROM products
# WHERE category_id IN (%s, %s, %s)
# AND created_at BETWEEN %s AND %s
# AND (name LIKE %s OR brand = %s)
# ORDER BY price ASC, created_at DESC
# LIMIT %s OFFSET %s
# 参数: [1, 3, 5, '2023-01-01', '2023-12-31', '%苹果%', 'Apple', 20, 0]
示例 3: 动态条件组合
def search_products(filters: Dict) -> List[Dict]:
"""产品搜索函数"""
# 基础查询条件
query_filters = {}
# 处理名称搜索
if 'keyword' in filters and filters['keyword']:
query_filters['like'] = {
'name': {'value': filters['keyword'], 'mode': 'both'},
'description': {'value': filters['keyword'], 'mode': 'both'}
}
# 处理价格筛选
price_filters = {}
if 'min_price' in filters and filters['min_price']:
price_filters['ge'] = {'price': filters['min_price']}
if 'max_price' in filters and filters['max_price']:
price_filters['le'] = {'price': filters['max_price']}
if price_filters:
query_filters.update(price_filters)
# 处理分类筛选
if 'category' in filters and filters['category']:
if isinstance(filters['category'], list):
query_filters['in'] = {'category_id': filters['category']}
else:
query_filters['eq'] = {'category_id': filters['category']}
# 处理状态筛选
if 'status' in filters and filters['status']:
query_filters['eq'] = {'status': filters['status']}
# 处理排序
if 'sort' in filters and filters['sort']:
sort_info = filters['sort'].split(':')
sort_column = sort_info[0]
sort_dir = sort_info[1].upper() if len(sort_info) > 1 else 'ASC'
query_filters['order_by'] = {sort_column: sort_dir}
# 处理分页
if 'page' in filters and 'page_size' in filters:
page = filters['page']
page_size = filters['page_size']
offset = (page - 1) * page_size
query_filters['limit'] = page_size
query_filters['offset'] = offset
return dynamic_search('products', query_filters, db_config=db_config)
# 使用动态搜索函数
search_params = {
'keyword': '笔记本电脑',
'min_price': 3000,
'max_price': 8000,
'category': [2, 4],
'sort': 'price:asc',
'page': 1,
'page_size': 15
}
products = search_products(search_params)
print(f"搜索结果: {len(products)} 条记录")
支持的查询条件类型
|
条件类型 |
关键字 |
示例 |
说明 |
|
等于 |
eq |
{'eq': {'status': 'active'}} |
字段等于指定值 |
|
不等于 |
ne |
{'ne': {'status': 'deleted'}} |
字段不等于指定值 |
|
大于 |
gt |
{'gt': {'price': 1000}} |
字段大于指定值 |
|
大于等于 |
ge |
{'ge': {'price': 1000}} |
字段大于等于指定值 |
|
小于 |
lt |
{'lt': {'price': 5000}} |
字段小于指定值 |
|
小于等于 |
le |
{'le': {'price': 5000}} |
字段小于等于指定值 |
|
模糊查询 |
like |
{'like': {'name': '手机'}} |
支持前缀、后缀、全匹配 |
|
包含查询 |
in |
{'in': {'category_id': [1,2,3]}} |
字段值在指定列表中 |
|
区间查询 |
between |
{'between': {'price': {'min': 1000, 'max': 5000}}} |
字段值在指定区间内 |
|
NULL 查询 |
null |
{'null': {'description': True}} |
字段值为 NULL 或非 NULL |
|
OR 条件组 |
or |
{'or': [[{'column': 'name', 'operator': 'LIKE', 'value': '%苹果%'}]]} |
OR 连接的条件组 |
|
排序 |
order_by |
{'order_by': {'price': 'ASC'}} |
结果排序 |
|
分页 |
limit + offset |
{'limit': 20, 'offset': 0} |
结果分页 |
安全性思考
- 防止 SQL 注入:所有参数都使用%s占位符参数值通过 pymysql 的参数化查询传递永远不直接拼接用户输入到 SQL 字符串中
- 输入验证:提议在使用前验证输入参数的类型和格式可以添加字段白名单,限制允许查询的字段
- SQL 注入防护示例:
# 安全的方式
filters = {'eq': {'username': user_input}}
# 不安全的方式 (永远不要这样做!)
# sql = f"SELECT * FROM users WHERE username = '{user_input}'"
性能优化提议
- 索引优化:为常用查询条件的字段建立索引特别是用于 WHERE、ORDER BY 的字段
- 查询优化:只查询需要的字段,避免使用SELECT *合理使用 LIMIT 限制返回结果数量
- 连接管理:思考使用连接池管理数据库连接避免频繁创建和关闭连接
总结
这个纯原生的动态 SQL 拼接方案具有以下特点:
- 简洁直接: 不使用复杂的类结构,代码易于理解和维护
- 安全可靠: 严格使用参数化查询,有效防止 SQL 注入
- 功能完整: 支持各种常见的查询条件类型
- 灵活扩展: 容易添加新的查询条件类型
- 易于集成: 可以方便地集成到现有的项目中
这种方式适合对 SQL 生成过程需要完全控制的场景,或者项目规模较小、不需要复杂抽象的情况。在实际使用中,你可以根据具体需求对这个基础方案进行调整和扩展。


