""" SQLite数据库操作通用公共类 提供数据库连接、表管理、CRUD操作等通用功能 """ import sqlite3 import threading import time from typing import List, Dict, Any, Optional, Union from pathlib import Path from datetime import datetime class SQLiteError(Exception): """SQLite操作基础异常类""" def __init__(self, message: str, sqlite_error_code: int = None, sql: str = None, params: tuple = None, db_path: str = None): super().__init__(message) self.message = message self.sqlite_error_code = sqlite_error_code self.sql = sql self.params = params self.db_path = db_path def __str__(self): error_info = f"SQLiteError: {self.message}" if self.sqlite_error_code: error_info += f" (错误代码: {self.sqlite_error_code})" if self.sql: error_info += f"\nSQL: {self.sql}" if self.params: error_info += f"\n参数: {self.params}" if self.db_path: error_info += f"\n数据库: {self.db_path}" return error_info class SQLiteBusyError(SQLiteError): """SQLite BUSY错误异常(用于并发控制)""" def __init__(self, message: str, sqlite_error_code: int = None, sql: str = None, params: tuple = None, db_path: str = None, retry_attempt: int = None, max_retries: int = None): super().__init__(message, sqlite_error_code, sql, params, db_path) self.retry_attempt = retry_attempt self.max_retries = max_retries def __str__(self): error_info = f"SQLiteBusyError: {self.message}" if self.sqlite_error_code: error_info += f" (错误代码: {self.sqlite_error_code})" if self.retry_attempt is not None and self.max_retries is not None: error_info += f" (重试 {self.retry_attempt}/{self.max_retries})" if self.sql: error_info += f"\nSQL: {self.sql}" if self.params: error_info += f"\n参数: {self.params}" if self.db_path: error_info += f"\n数据库: {self.db_path}" return error_info def _handle_sqlite_error(error: sqlite3.Error, operation: str, sql: str = None, params: tuple = None, db_path: str = None, retry_attempt: int = None, max_retries: int = None) -> SQLiteError: """ 将SQLite错误转换为自定义异常 Args: error: 原始SQLite错误 operation: 操作描述 sql: 执行的SQL语句 params: SQL参数 db_path: 数据库路径 retry_attempt: 重试次数(用于BUSY错误) max_retries: 最大重试次数 Returns: SQLiteError: 对应的自定义异常 """ error_code = getattr(error, 'sqlite_errorcode', None) error_name = getattr(error, 'sqlite_errorname', None) error_message = f"{operation}失败: {str(error)}" if error_name: error_message += f" ({error_name})" # 如果是SQLITE_BUSY错误,返回SQLiteBusyError(用于并发控制) if error_name == 'SQLITE_BUSY': return SQLiteBusyError( message=error_message, sqlite_error_code=error_code, sql=sql, params=params, db_path=db_path, retry_attempt=retry_attempt, max_retries=max_retries ) # 其他所有错误都返回通用的SQLiteError return SQLiteError( message=error_message, sqlite_error_code=error_code, sql=sql, params=params, db_path=db_path ) class SQLiteHandler: """SQLite数据库操作通用类(单例模式)""" _lock = threading.Lock() # 单例锁 _instances = {} @classmethod def get_instance(cls,db_path, *args, **kwargs): # 使用文件路径作为键 key = db_path if key not in cls._instances: with cls._lock: if key not in cls._instances: # 双重检查 cls._instances[key] = cls(db_path, *args, **kwargs) return cls._instances[key] def __init__(self, db_path: str = "three.db", max_readers: int = 10, busy_timeout: int = 5000): """ 初始化SQLite处理器 Args: db_path: 数据库文件路径,默认为app.db max_readers: 最大并发读取数,默认为10 busy_timeout: SQLITE_BUSY超时时间(毫秒),默认为5000毫秒 """ self.db_path = db_path self.max_readers = max_readers self.busy_timeout = busy_timeout # 读写分离锁 self._read_lock = threading.RLock() # 读锁(允许多个读) self._write_lock = threading.Lock() # 写锁 self._active_readers = 0 # 当前活跃读取数 # 连接配置 self._connection_params = { # 是否检测相同的线程(False允许在不同线程间共享连接) "check_same_thread": False, # 设置数据库操作超时时间(秒) "timeout": 10 } # SQLITE_BUSY重试配置 self.busy_retry_attempts = 3 # 重试次数 self.busy_retry_delay = 1 # 重试延迟(秒) # 确保数据库目录存在 db_dir = Path(db_path).parent if not db_dir.exists(): db_dir.mkdir(parents=True, exist_ok=True) # 初始化数据库参数 self._setup_database_params() def _setup_database_params(self): """设置数据库连接参数""" try: # 创建临时连接来设置参数 conn = sqlite3.connect(self.db_path, **self._connection_params) # 启用WAL模式(Write-Ahead Logging) cursor = conn.execute("PRAGMA journal_mode = WAL") journal_mode = cursor.fetchone()[0] print(f"WAL模式设置: {journal_mode}") # 启用外键约束 conn.execute("PRAGMA foreign_keys = ON") # 设置SQLITE_BUSY超时 conn.execute(f"PRAGMA busy_timeout = {self.busy_timeout}") # 设置行工厂为字典形式 conn.row_factory = sqlite3.Row conn.close() print("数据库参数设置完成") except sqlite3.Error as e: print(f"设置数据库参数失败: {e}") raise def _create_connection(self) -> sqlite3.Connection: """创建新的数据库连接""" conn = sqlite3.connect(self.db_path, **self._connection_params) conn.set_trace_callback(lambda x: print(x)) conn.row_factory = sqlite3.Row return conn def _acquire_read_lock(self): """获取读锁""" with self._read_lock: if self._active_readers >= self.max_readers: raise SQLiteBusyError(f"已达到最大并发读取数: {self.max_readers}") self._active_readers += 1 def _release_read_lock(self): """释放读锁""" with self._read_lock: self._active_readers -= 1 def _acquire_write_lock(self): """获取写锁,超时时间为30秒""" if not self._write_lock.acquire(timeout=30): raise SQLiteBusyError("获取写锁超时(30秒),可能发生死锁") def execute_read(self, sql: str, params: tuple = None) -> list: """ 执行读操作SQL语句(使用读写锁) Args: sql: SQL语句 params: 参数元组 Returns: list: 查询结果列表 Raises: SQLiteError: SQL执行错误 SQLiteBusyError: 并发读取数超过限制 """ start_time = time.time() # 获取读锁 self._acquire_read_lock() conn = self._create_connection() try: if params: cursor = conn.execute(sql, params) else: cursor = conn.execute(sql) # 获取所有结果并返回,避免连接关闭后游标失效 results = cursor.fetchall() return results except sqlite3.Error as e: # 抛出自定义异常 raise _handle_sqlite_error(e, "执行读操作", sql, params, self.db_path) finally: # 关闭连接 if conn: conn.close() # 释放读锁 self._release_read_lock() # 记录执行时间 execution_time = time.time() - start_time if execution_time > 1.0: print(f"⚠️ 读操作执行时间过长: {execution_time:.3f}秒 - SQL: {sql[:100]}{'...' if len(sql) > 100 else ''}") def execute_write(self, sql: str, params: tuple = None) -> sqlite3.Cursor: """ 执行写操作SQL语句(使用读写锁和BEGIN IMMEDIATE事务) Args: sql: SQL语句 params: 参数元组 Returns: sqlite3.Cursor: 执行结果游标 Raises: SQLiteError: SQL执行错误 SQLiteBusyError: SQLITE_BUSY错误,达到最大重试次数 """ start_time = time.time() # 获取写锁 self._acquire_write_lock() conn = self._create_connection() cursor = None try: # 使用BEGIN IMMEDIATE事务 for attempt in range(self.busy_retry_attempts): try: # 开始IMMEDIATE事务 conn.execute("BEGIN IMMEDIATE") # 执行SQL if params: cursor = conn.execute(sql, params) else: cursor = conn.execute(sql) # 提交事务 conn.commit() return cursor except sqlite3.OperationalError as e: # 如果是SQLITE_BUSY错误,进行重试 if "database is locked" in str(e) or "SQLITE_BUSY" in str(e): if attempt < self.busy_retry_attempts - 1: conn.rollback() print(f"SQLITE_BUSY错误,第{attempt + 1}次重试...") time.sleep(self.busy_retry_delay) continue else: # 最后一次尝试失败 conn.rollback() raise _handle_sqlite_error( e, "执行写操作", sql, params, self.db_path, attempt, self.busy_retry_attempts ) else: # 其他操作错误,直接抛出 conn.rollback() raise _handle_sqlite_error(e, "执行写操作", sql, params, self.db_path) except sqlite3.Error as e: # 其他SQLite错误,回滚并抛出 conn.rollback() raise _handle_sqlite_error(e, "执行写操作", sql, params, self.db_path) except sqlite3.Error as e: raise finally: # 关闭连接 if conn: conn.close() # 释放写锁 self._write_lock.release() # 记录执行时间 execution_time = time.time() - start_time if execution_time > 1.0: print(f"⚠️ 写操作执行时间过长: {execution_time:.3f}秒 - SQL: {sql[:100]}{'...' if len(sql) > 100 else ''}") def executemany_write(self, sql: str, params_list: List[tuple]) -> sqlite3.Cursor: """ 执行批量写操作 Args: sql: SQL语句 params_list: 参数列表 Returns: sqlite3.Cursor: 游标对象 Raises: SQLiteError: SQL执行错误 SQLiteBusyError: SQLITE_BUSY错误,达到最大重试次数 """ start_time = time.time() # 获取写锁 self._acquire_write_lock() conn = self._create_connection() cursor = None try: # 使用BEGIN IMMEDIATE事务 for attempt in range(self.busy_retry_attempts): try: # 开始IMMEDIATE事务 conn.execute("BEGIN IMMEDIATE") # 执行批量SQL cursor = conn.cursor() cursor.executemany(sql, params_list) # 提交事务 conn.commit() return cursor except sqlite3.OperationalError as e: # 如果是SQLITE_BUSY错误,进行重试 if "database is locked" in str(e) or "SQLITE_BUSY" in str(e): if attempt < self.busy_retry_attempts - 1: conn.rollback() print(f"SQLITE_BUSY错误,第{attempt + 1}次重试...") time.sleep(self.busy_retry_delay) continue else: # 最后一次尝试失败 conn.rollback() raise _handle_sqlite_error( e, "执行批量写操作", sql, params_list, self.db_path, attempt, self.busy_retry_attempts ) else: # 其他操作错误,直接抛出 conn.rollback() raise _handle_sqlite_error(e, "执行批量写操作", sql, params_list, self.db_path) except sqlite3.Error as e: # 其他SQLite错误,回滚并抛出 conn.rollback() raise _handle_sqlite_error(e, "执行批量写操作", sql, params_list, self.db_path) except sqlite3.Error as e: raise finally: # 关闭连接 if conn: conn.close() # 释放写锁 self._write_lock.release() # 记录执行时间 execution_time = time.time() - start_time if execution_time > 1.0: print(f"⚠️ 批量写操作执行时间过长: {execution_time:.3f}秒 - SQL: {sql[:100]}{'...' if len(sql) > 100 else ''} (批量数量: {len(params_list)})") def fetch_all(self, sql: str, params: tuple = None) -> List[Dict[str, Any]]: """ 查询所有记录(使用读连接) Args: sql: 查询SQL params: 查询参数 Returns: List[Dict[str, Any]]: 查询结果列表 """ start_time = time.time() conn = None try: # 获取读锁 self._acquire_read_lock() conn = self._create_connection() cursor = conn.cursor() # 执行查询 if params: cursor.execute(sql, params) else: cursor.execute(sql) rows = cursor.fetchall() return [dict(row) for row in rows] except sqlite3.Error as e: # 抛出自定义异常 raise _handle_sqlite_error(e, "执行读操作", sql, params, self.db_path) finally: # 关闭连接 if conn: conn.close() # 释放读锁 self._release_read_lock() # 记录执行时间 execution_time = time.time() - start_time if execution_time > 1.0: print(f"⚠️ fetch_all执行时间过长: {execution_time:.3f}秒 - SQL: {sql[:100]}{'...' if len(sql) > 100 else ''}") def fetch_one(self, sql: str, params: tuple = None) -> Optional[Dict[str, Any]]: """ 查询单条记录(使用读连接) Args: sql: 查询SQL params: 查询参数 Returns: Optional[Dict[str, Any]]: 查询结果,如果没有记录返回None """ start_time = time.time() conn = None try: # 获取读锁 self._acquire_read_lock() conn = self._create_connection() cursor = conn.cursor() # 执行查询 if params: cursor.execute(sql, params) else: cursor.execute(sql) row = cursor.fetchone() return dict(row) if row else None except sqlite3.Error as e: # 抛出自定义异常 raise _handle_sqlite_error(e, "执行读操作", sql, params, self.db_path) finally: # 关闭连接 if conn: conn.close() # 释放读锁 self._release_read_lock() # 记录执行时间 execution_time = time.time() - start_time if execution_time > 1.0: print(f"⚠️ fetch_one执行时间过长: {execution_time:.3f}秒 - SQL: {sql[:100]}{'...' if len(sql) > 100 else ''}") def insert(self, table: str, data: Dict[str, Any]) -> int: """ 插入单条记录(使用写连接) Args: table: 表名 data: 插入数据字典 Returns: int: 插入记录的主键ID """ columns = ', '.join(data.keys()) placeholders = ', '.join(['?'] * len(data)) sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})" cursor = self.execute_write(sql, tuple(data.values())) return cursor.lastrowid def insert_select(self, target_table: str, target_columns: List[str], source_table: str, source_columns: List[str], where_condition: str = None, where_params: tuple = None) -> int: """ 使用 INSERT INTO ... SELECT ... WHERE 语法从源表向目标表插入数据 Args: target_table: 目标表名 target_columns: 目标表的列名列表 source_table: 源表名 source_columns: 源表的列名列表,与目标表列一一对应 where_condition: WHERE条件语句(不含WHERE关键字) where_params: WHERE条件参数 Returns: int: 插入的记录数量 """ # 构建目标列和源列的字符串 target_cols_str = ', '.join(target_columns) source_cols_str = ', '.join(source_columns) # 构建基础SQL sql = f"INSERT INTO {target_table} ({target_cols_str}) SELECT {source_cols_str} FROM {source_table}" # 添加WHERE条件(如果有) if where_condition: sql += f" WHERE {where_condition}" # 执行SQL语句 cursor = self.execute_write(sql, where_params) return cursor.rowcount def insert_many(self, table: str, data_list: List[Dict[str, Any]]) -> int: """ 批量插入记录(使用写连接) Args: table: 表名 data_list: 数据字典列表 Returns: int: 插入的记录数量 """ if not data_list: return 0 columns = ', '.join(data_list[0].keys()) placeholders = ', '.join(['?'] * len(data_list[0])) sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})" params_list = [tuple(data.values()) for data in data_list] cursor = self.executemany_write(sql, params_list) return cursor.rowcount def update(self, table: str, data: Dict[str, Any], where: str, where_params: tuple = None) -> int: """ 更新记录(使用写连接) Args: table: 表名 data: 更新数据字典 where: WHERE条件 where_params: WHERE条件参数 Returns: int: 更新的记录数量 """ set_clause = ', '.join([f"{key} = ?" for key in data.keys()]) sql = f"UPDATE {table} SET {set_clause} WHERE {where}" params = tuple(data.values()) if where_params: params += where_params cursor = self.execute_write(sql, params) return cursor.rowcount def delete(self, table: str, where: str, where_params: tuple = None) -> int: """ 删除记录(使用写连接) Args: table: 表名 where: WHERE条件 where_params: WHERE条件参数 Returns: int: 删除的记录数量 """ sql = f"DELETE FROM {table} WHERE {where}" cursor = self.execute_write(sql, where_params) return cursor.rowcount def add_column(self, table_name: str, column_name: str, column_type: str) -> bool: """ 添加列到表(使用写连接) Args: table_name: 表名 column_name: 列名 column_type: 列数据类型 Returns: bool: 是否添加成功 """ if not self.table_exists(table_name): print(f"表 {table_name} 不存在") return False sql = f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_type}" try: self.execute_write(sql) print(f"列 {column_name} 添加到表 {table_name} 成功") return True except sqlite3.Error as e: print(f"添加列 {column_name} 到表 {table_name} 失败: {e}") return False def table_exists(self, table_name: str) -> bool: """ 检查表是否存在(使用读连接) Args: table_name: 表名 Returns: bool: 表是否存在 """ sql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?" result = self.fetch_one(sql, (table_name,)) return result is not None def create_table(self, table_name: str, columns: Dict[str, str]) -> bool: """ 创建表(使用写连接) Args: table_name: 表名 columns: 列定义字典,格式为 {列名: 数据类型} Returns: bool: 是否创建成功 """ if self.table_exists(table_name): print(f"表 {table_name} 已存在") return True # 构建CREATE TABLE语句 column_definitions = ', '.join([f"{col_name} {col_type}" for col_name, col_type in columns.items()]) sql = f"CREATE TABLE {table_name} ({column_definitions})" try: self.execute_write(sql) print(f"表 {table_name} 创建成功") return True except sqlite3.Error as e: print(f"创建表 {table_name} 失败: {e}") return False def get_table_info(self, table_name: str) -> List[Dict[str, Any]]: """ 获取表结构信息(使用读连接) Args: table_name: 表名 Returns: List[Dict[str, Any]]: 表结构信息列表 """ if not self.table_exists(table_name): print(f"表 {table_name} 不存在") return [] sql = f"PRAGMA table_info({table_name})" return self.fetch_all(sql) def get_table_count(self, table_name: str, where: str = None, where_params: tuple = None) -> int: """ 获取表记录数量(使用读连接) Args: table_name: 表名 where: WHERE条件 where_params: WHERE条件参数 Returns: int: 记录数量 """ sql = f"SELECT COUNT(*) as count FROM {table_name}" if where: sql += f" WHERE {where}" result = self.fetch_one(sql, where_params) return result['count'] if result else 0 def vacuum(self,max_retries=3): """执行VACUUM操作,优化数据库(使用写连接)""" for attempt in range(max_retries): try: self.execute_write("VACUUM") return True except SQLiteBusyError: if attempt < max_retries - 1: time.sleep(2 ** attempt) # 指数退避 else: raise def backup_database(self, backup_path: str) -> bool: """ 备份数据库到指定文件(使用写连接) Args: backup_path: 备份文件路径 Returns: bool: 备份是否成功 """ try: # 确保备份目录存在 backup_dir = Path(backup_path).parent if not backup_dir.exists(): backup_dir.mkdir(parents=True, exist_ok=True) # 使用SQLite的备份API source_conn = self._create_connection() backup_conn = sqlite3.connect(backup_path) # 执行备份 source_conn.backup(backup_conn) backup_conn.close() print(f"数据库备份成功: {backup_path}") return True except sqlite3.Error as e: print(f"数据库备份失败: {e}") return False def create_incremental_backup(self, backup_dir: str, max_backups: int = 10) -> bool: """ 创建增量备份,自动管理备份文件数量 Args: backup_dir: 备份目录 max_backups: 最大备份文件数量 Returns: bool: 备份是否成功 """ try: # 确保备份目录存在 backup_path = Path(backup_dir) if not backup_path.exists(): backup_path.mkdir(parents=True, exist_ok=True) # 生成备份文件名(包含时间戳) timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") db_name = Path(self.db_path).stem backup_file = backup_path / f"{db_name}_backup_{timestamp}.db" # 执行备份 success = self.backup_database(str(backup_file)) if success: # 清理旧的备份文件 backup_files = list(backup_path.glob(f"{db_name}_backup_*.db")) backup_files.sort(key=lambda x: x.stat().st_mtime, reverse=True) # 删除超出数量的旧备份 for old_backup in backup_files[max_backups:]: old_backup.unlink() print(f"删除旧备份: {old_backup.name}") print(f"增量备份完成,当前备份数量: {min(len(backup_files), max_backups)}") return success except Exception as e: print(f"增量备份失败: {e}") return False # 使用示例 if __name__ == "__main__": # 创建数据库处理器 db = SQLiteHandler.get_instance("db/three.db", max_readers=50, busy_timeout=4000) # # 创建表 # columns = { # "id": "INTEGER PRIMARY KEY AUTOINCREMENT", # "name": "TEXT NOT NULL", # "age": "INTEGER", # "created_at": "TIMESTAMP DEFAULT CURRENT_TIMESTAMP" # } # db.create_table("users", columns) # # 插入数据 # user_data = {"name": "张三", "age": 25} # user_id = db.insert("users", user_data) # print(f"插入用户ID: {user_id}") # # 批量插入 # users_data = [ # {"name": "李四", "age": 30}, # {"name": "王五", "age": 28} # ] # count = db.insert_many("users", users_data) # print(f"批量插入数量: {count}") # # 查询数据 # users = db.fetch_all("SELECT * FROM users") # print("所有用户:", users) # # 更新数据 # update_count = db.update("users", {"age": 26}, "id = ?", (user_id,)) # print(f"更新记录数: {update_count}") # # 查询单条 # user = db.fetch_one("SELECT * FROM users WHERE id = ?", (user_id,)) # print("单个用户:", user) # # 删除数据 # delete_count = db.delete("users", "age > ?", (25,)) # print(f"删除记录数: {delete_count}") # # 获取表信息 # table_info = db.get_table_info("users") # print("表结构:", table_info)