From bd0815d0e79a3bc757bab6cb24167841e0554427 Mon Sep 17 00:00:00 2001 From: fujinliang Date: Fri, 31 Oct 2025 14:30:42 +0800 Subject: [PATCH] db-api --- busisness/blls.py | 53 ++ busisness/dals.py | 204 +++++ busisness/models.py | 310 ++++++++ common/__init__.py | 5 + common/ini_handler.py | 278 +++++++ common/logging_service.py | 84 +++ common/sqlite_handler.py | 838 +++++++++++++++++++++ config/__pycache__/__init__.cpython-39.pyc | Bin 150 -> 0 bytes config/__pycache__/settings.cpython-39.pyc | Bin 1529 -> 0 bytes config/ini_manager.py | 157 ++++ config/settings.py | 4 + controller/main_controller.py | 2 +- db/three.db | Bin 0 -> 53248 bytes db/three.db-shm | Bin 0 -> 32768 bytes db/three.db-wal | Bin 0 -> 45352 bytes doc/table表设计.doc | Bin 0 -> 87603 bytes doc/~$ble表设计.doc | Bin 0 -> 162 bytes logs/app.log | 29 +- resources/resources_rc.py | 2 +- service/__init__.py | 1 + service/api_http_client.py | 285 +++++++ service/mould_service.py | 158 ++++ settings.ini | 12 + 23 files changed, 2394 insertions(+), 28 deletions(-) create mode 100644 busisness/blls.py create mode 100644 busisness/dals.py create mode 100644 busisness/models.py create mode 100644 common/__init__.py create mode 100644 common/ini_handler.py create mode 100644 common/logging_service.py create mode 100644 common/sqlite_handler.py delete mode 100644 config/__pycache__/__init__.cpython-39.pyc delete mode 100644 config/__pycache__/settings.cpython-39.pyc create mode 100644 config/ini_manager.py create mode 100644 db/three.db create mode 100644 db/three.db-shm create mode 100644 db/three.db-wal create mode 100644 doc/table表设计.doc create mode 100644 doc/~$ble表设计.doc create mode 100644 service/__init__.py create mode 100644 service/api_http_client.py create mode 100644 service/mould_service.py create mode 100644 settings.ini diff --git a/busisness/blls.py b/busisness/blls.py new file mode 100644 index 0000000..d35259c --- /dev/null +++ b/busisness/blls.py @@ -0,0 +1,53 @@ +from dataclasses import fields +from typing import List, Optional, Dict, Any +from datetime import datetime +from models import ArtifactInfoModel,PDRecordModel +from dals import ArtifactDal,PDRecordDal + +class ArtifactBll: + def __init__(self): + """初始化数据访问层,创建数据库连接""" + # 假设数据库文件在db目录下 + self.dal = ArtifactDal() + pass + + def get_artifact_task(self) -> List[ArtifactInfoModel]: + """获取官片任务数据""" + return self.dal.get_top_artifact(5,"ArtifactID asc") + +class PDRecordBll: + def __init__(self): + """初始化数据访问层,创建数据库连接""" + # 假设数据库文件在db目录下 + self.dal = PDRecordDal() + pass + + def get_PD_record(self) -> List[PDRecordModel]: + """获取PD官片任务数据""" + return self.dal.get_top_pd(5,"ID desc") + + +if __name__ == "__main__": + artifact_dal = ArtifactBll() + + artifacts = artifact_dal.get_artifact_task() + print("\n打印artifacts数据:") + for i, artifact in enumerate(artifacts): + # 如果是数据类对象,转换为字典输出 + if hasattr(artifact, '__dataclass_fields__'): + print(f"第{i+1}条: {artifact.__dict__}") + else: + print(f"第{i+1}条: {artifact}") + + pdrecord_dal=PDRecordBll() + pdrecords = pdrecord_dal.get_PD_record() + print("\n打印pdrecords数据:") + for i, record in enumerate(pdrecords): + # 如果是数据类对象,转换为字典输出 + if hasattr(record, '__dataclass_fields__'): + print(f"第{i+1}条: {record.__dict__}") + else: + print(f"第{i+1}条: {record}") + + + print('\ntest success') \ No newline at end of file diff --git a/busisness/dals.py b/busisness/dals.py new file mode 100644 index 0000000..3d8b45e --- /dev/null +++ b/busisness/dals.py @@ -0,0 +1,204 @@ +from dataclasses import fields +from typing import List, Optional, Dict, Any +from datetime import datetime +from models import ArtifactInfoModel,PDRecordModel +from common.sqlite_handler import SQLiteHandler + + +def filter_dict_for_model(data_dict: Dict[str, Any], model_class) -> Dict[str, Any]: + """过滤字典,只保留模型中定义的字段""" + # 获取模型中定义的所有字段名称 + model_fields = {field.name for field in fields(model_class)} + # 过滤字典,只保留模型中存在的字段 + return {k: v for k, v in data_dict.items() if k in model_fields} + +class BaseDal: + def __init__(self) -> None: + """初始化数据访问层,创建数据库连接""" + # 假设数据库文件在db目录下 + self.db_dao = SQLiteHandler.get_instance("db/three.db", max_readers=50, busy_timeout=4000) + +class ArtifactDal(BaseDal): + def __init__(self): + super().__init__() + + def get_all(self) -> List[ArtifactInfoModel]: + """获取所有ArtifactTask记录""" + try: + # 查询所有记录 + sql = "SELECT * FROM ArtifactTask" + results = self.db_dao.execute_read(sql) + + # 将查询结果转换为ArtifactInfo对象列表 + artifacts = [] + for row in results: + # 过滤字典,只保留模型中定义的字段 + filtered_data = filter_dict_for_model(dict(row), ArtifactInfoModel) + artifact = ArtifactInfoModel(**filtered_data) + artifacts.append(artifact) + + return artifacts + except Exception as e: + print(f"获取所有构件任务失败: {e}") + return [] + + def get_top_artifact(self, top: int,desc:str="ArtifactID asc",where:str="1=1") -> List[ArtifactInfoModel]: + """获取top条数数据,根据ArtifactID升序""" + try: + # 确保top为正整数 + if not isinstance(top, int) or top <= 0: + raise ValueError("top参数必须是正整数") + + # 查询指定数量的记录,按ArtifactID升序排列 + sql = f"SELECT * FROM ArtifactTask WHERE {where} ORDER BY {desc} LIMIT ?" + results = self.db_dao.execute_read(sql, (top,)) + + # 将所有查询结果转换为ArtifactInfoModel对象列表 + artifacts = [] + for row in results: + # 保证row的变量和模板变量一致 + artifact = ArtifactInfoModel() + artifact.ArtifactID=row["ArtifactID"] + artifact.ProduceRingNumber=row["ProduceRingNumber"] + artifact.MouldCode=row["MouldCode"] + artifact.SkeletonID=row["SkeletonID"] + artifact.RingTypeCode=row["RingTypeCode"] + artifact.SizeSpecification=row["SizeSpecification"] + artifact.BuriedDepth=row["BuriedDepth"] + artifact.BlockNumber=row["BlockNumber"] + artifact.BetonVolume=row["BetonVolume"] + artifact.BetonTaskID=row["BetonTaskID"] + artifact.Status=row["Status"] + artifacts.append(artifact) + + return artifacts + except Exception as e: + print(f"获取top构件任务失败: {e}") + return [] + + + def get_by_id(self, artifact_id: int) -> Optional[ArtifactInfoModel]: + """根据构件ID获取构件任务""" + try: + sql = "SELECT * FROM ArtifactTask WHERE ArtifactID = ?" + results = self.db_dao.execute_read(sql, (artifact_id,)) + + rows = list(results) + if not rows: + return None + + return ArtifactInfoModel(**dict(rows[0])) + except Exception as e: + print(f"根据ID获取构件任务失败: {e}") + return None + + def insert_artifact(self, artifact_data: dict) -> Optional[int]: + """插入一条构件任务记录""" + try: + # 使用insert方法插入数据 + row_id = self.db_dao.insert("ArtifactTask", artifact_data) + return row_id + except Exception as e: + print(f"插入构件任务失败: {e}") + return None + + def update_artifact(self, artifact_id: int, update_data: dict) -> bool: + """更新构件任务记录""" + try: + # 构建WHERE条件 + where_condition = {"ArtifactID": artifact_id} + # 使用update方法更新数据 + affected_rows = self.db_dao.update("ArtifactTask", update_data, where_condition) + return affected_rows > 0 + except Exception as e: + print(f"更新构件任务失败: {e}") + return False + + def validate_artifact(self, artifact_info: ArtifactInfoModel) -> bool: + """验证构件信息是否符合业务规则""" + try: + # 检查必要字段 + if not hasattr(artifact_info, 'Name') or not artifact_info.Name: + return False + + # 可以添加更多业务规则验证 + # 例如:检查构件编号格式、验证日期等 + + return True + except Exception as e: + print(f"验证构件信息失败: {e}") + return False + +class PDRecordDal(BaseDal): + def __init__(self): + super().__init__() + + def get_top_pd(self, top: int,desc:str="ID desc",where:str="1=1") -> List[PDRecordModel]: + """获取top条数数据,根据ID降序""" + try: + # 确保top为正整数 + if not isinstance(top, int) or top <= 0: + raise ValueError("top参数必须是正整数") + + # 查询指定数量的记录,按ID降序排列 + sql = f"SELECT * FROM PDRecord WHERE {where} ORDER BY {desc} LIMIT ?" + results = self.db_dao.execute_read(sql, (top,)) + + pdrecords = [] + for row in results: + # + pdrecord = PDRecordModel() + pdrecord.ID=row["ID"] + pdrecord.TaskID=row["TaskID"] + pdrecord.ProjectName=row["ProjectName"] + pdrecord.ProduceMixID=row["ProduceMixID"] + pdrecord.VinNo=row["VinNo"] + pdrecord.BetonVolume=row["BetonVolume"] + pdrecord.Status=row["Status"] + pdrecord.OptTime=row["OptTime"] + pdrecords.append(pdrecord) + + return pdrecords + except Exception as e: + print(f"获取top PD官片任务失败: {e}") + return [] + + + + + +if __name__ == "__main__": + artifact_dal = ArtifactDal() + + artifacts = artifact_dal.get_artifact_task() + + # 显示获取到的数据 + print(f"获取到 {len(artifacts)} 条构件任务数据:") + for i, artifact in enumerate(artifacts, 1): + print(f"\n记录 {i}:") + print(f"ID: {artifact.ID}") + print(f"ArtifactID: {artifact.ArtifactID}") + print(f"ProduceRingNumber: {artifact.ProduceRingNumber}") + print(f"MouldCode: {artifact.MouldCode}") + print(f"SkeletonID: {artifact.SkeletonID}") + print(f"RingTypeCode: {artifact.RingTypeCode}") + print(f"SizeSpecification: {artifact.SizeSpecification}") + print(f"BuriedDepth: {artifact.BuriedDepth}") + print(f"BlockNumber: {artifact.BlockNumber}") + print(f"HoleRingMarking: {artifact.HoleRingMarking}") + print(f"GroutingPipeMarking: {artifact.GroutingPipeMarking}") + print(f"PolypropyleneFiberMarking: {artifact.PolypropyleneFiberMarking}") + print(f"BetonVolume: {artifact.BetonVolume}") + print(f"BetonTaskID: {artifact.BetonTaskID}") + print(f"FK_PDID: {artifact.FK_PDID}") + print(f"Status: {artifact.Status}") + print(f"BeginTime: {artifact.BeginTime}") + print(f"EndTime: {artifact.EndTime}") + print(f"PStatus: {artifact.PStatus}") + print(f"Source: {artifact.Source}") + print(f"FBetonVolume: {artifact.FBetonVolume}") + print(f"OptTime: {artifact.OptTime}") + + # 可以打印更多字段... + + print('\ntest success') \ No newline at end of file diff --git a/busisness/models.py b/busisness/models.py new file mode 100644 index 0000000..6b9b36f --- /dev/null +++ b/busisness/models.py @@ -0,0 +1,310 @@ +from dataclasses import dataclass +from typing import Optional, Dict, Any, List + + +@dataclass +class LoginRequest: + """登录请求模型""" + Program: int + SC: str + loginName: str + password: str + + +@dataclass +class LoginResponse: + """登录响应模型""" + Code: int + Message: Optional[str] + Data: Optional[Dict[str, Any]] + + @property + def app_id(self) -> Optional[str]: + """获取AppID""" + return self.Data.get('AppID') if self.Data else None + + @property + def expire_time(self) -> Optional[str]: + """获取过期时间""" + return self.Data.get('ExpireTime') if self.Data else None + + @property + def sign_token(self) -> Optional[str]: + """获取SignToken""" + return self.Data.get('SignToken') if self.Data else None + + @property + def zr_jwt(self) -> Optional[str]: + """获取ZrJwt""" + return self.Data.get('ZrJwt') if self.Data else None + + +@dataclass +class ArtifactInfo: + """管片信息模型""" + #管片编号 + ArtifactID: str + #管片ID + ArtifactActionID: int + #管片副标识1 + ArtifactIDVice1: str + #产品环号 + ProduceRingNumber: int + #模具编号 + MouldCode: str + #骨架ID + SkeletonID: str + #环类型编码 + RingTypeCode: str + #尺寸规格 + SizeSpecification: str + #埋深 + BuriedDepth: str + #块号 + BlockNumber: str + #环号标记 + HoleRingMarking: str + #出管标记 + GroutingPipeMarking: str + #聚丙烯纤维标记 + PolypropyleneFiberMarking: str + # 浇筑方量 + BetonVolume: float + # 任务单号(混凝土) + BetonTaskID: str + +@dataclass +class ArtifactInfoModel: + def __init__(self): + pass + """管片表模型""" + ID: int + #管片编号 + ArtifactID: str + #管片ID + ArtifactActionID: int + #管片副标识1 + ArtifactIDVice1: str + #产品环号 + ProduceRingNumber: int + #模具编号 + MouldCode: str + #骨架ID + SkeletonID: str + #环类型编码 + RingTypeCode: str + #尺寸规格 + SizeSpecification: str + #埋深 + BuriedDepth: str + #块号 + BlockNumber: str + #环号标记 + HoleRingMarking: str + #出管标记 + GroutingPipeMarking: str + #聚丙烯纤维标记 + PolypropyleneFiberMarking: str + # 浇筑方量 + BetonVolume: float + # 任务单号 + BetonTaskID: str + #FK_PDID + FK_PDID: int=0 + #状态 + Status: int=1 + #开始时间 + BeginTime: str="" + #结束时间 + EndTime: str="" + #PStatus + PStatus: int=0 + #Source + Source: int=1 + #FBetonVolume + FBetonVolume: float=0.0 + #OptTime + OptTime: str="" + + +@dataclass +class ArtifactResponse: + """管片信息响应模型""" + Code: int + Message: Optional[str] + Data: Optional[ArtifactInfo] + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'ArtifactResponse': + """ + 从字典创建响应对象 + + Args: + data: 响应数据字典 + + Returns: + ArtifactResponse: 响应对象 + """ + response_data = data.get('Data') + artifact_info = None + if response_data: + artifact_info = ArtifactInfo(**response_data) + + return cls( + Code=data.get('Code'), + Message=data.get('Message'), + Data=artifact_info + ) + + +@dataclass +class TaskInfo: + """任务单信息模型""" + #任务单ID + TaskID: str + #任务单状态 + TaskStatus: int + #任务单状态文本 + TaskStatusText: str + #计划生产数量 + TaskCount: int + #已生产数量 + AlreadyProduceCount: int + #生产进度 + Progress: str + #工程名称 + ProjectName: str + #计划生产日期 + TaskPlanDateText: str + #强度等级 + BetonGrade: str + #设计配合比编号 + MixID: str + #生产配合比编号 + ProduceMixID: str + #计划方量 + PlannedVolume: float + #已供方量 + ProducedVolume: float + #出洞环标记 + HoleRingMarking: str + #注浆管标记 + GroutingPipeMarking: str + #聚丙烯纤维标记 + PolypropyleneFiberMarking: str + #生产日期 + TaskDateText: str + #盘数 + PlateCount: int + #任务单下发状态 + SendStatus: int + #任务单下发状态 + SendStatusText: str + #配合比下发状态 + MixSendStatus: int + #配合比下发状态 + MixSendStatusText: str + + +@dataclass +class TaskResponse: + """任务单响应模型""" + Code: int + Message: Optional[str] + Data: Optional[TaskInfo] + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'TaskResponse': + """ + 从字典创建响应对象 + + Args: + data: 响应数据字典 + + Returns: + TaskResponse: 响应对象 + """ + response_data = data.get('Data') + task_info = None + if response_data: + task_info = TaskInfo(**response_data) + + return cls( + Code=data.get('Code'), + Message=data.get('Message'), + Data=task_info + ) + + +@dataclass +class NotPourArtifactResponse: + """未浇筑管片列表响应模型""" + Code: int + Message: Optional[str] + Data: Optional[List[ArtifactInfo]] + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> 'NotPourArtifactResponse': + """ + 从字典创建响应对象 + + Args: + data: 响应数据字典 + + Returns: + NotPourArtifactResponse: 响应对象 + """ + response_data = data.get('Data') + artifacts = None + if response_data: + artifacts = [ArtifactInfo(**item) for item in response_data] + + return cls( + Code=data.get('Code'), + Message=data.get('Message'), + Data=artifacts + ) + + +@dataclass +class PDRecordModel: + def __init__(self): + pass + """管片表模型""" + ID: int + #派单编号 + PDCode: str + #任务单号 + TaskID: int + #工程名称 + ProjectName: str + #生产配合比编号 + ProduceMixID: str + #车架号 + VinNo: str + #派单方量 + BetonVolume: float + #模具编号 + MouldCode: str + #骨架编号 + SkeletonID: str + #环类型编码 + RingTypeCode: str + #尺寸规格 + SizeSpecification: str + #埋深 + BuriedDepth: str + #块号 + BlockNumber: str + # 派单模式(1自动派单 2手动派单0未知 ) + Mode: int=0 + # 派单状态(1计划中2已下发0未知),默认1 + Status: int=1 + #搅拌生产状态() + GStatus: int=0 + #数据来源(1 api 2离线) + Source: int=1 + #创建时间 + CreateTime: str="" + #派单时间(下发) + OptTime: str="" diff --git a/common/__init__.py b/common/__init__.py new file mode 100644 index 0000000..0cf7fa7 --- /dev/null +++ b/common/__init__.py @@ -0,0 +1,5 @@ +# Common utilities package + +from .sqlite_handler import SQLiteHandler + +__all__ = ['SQLiteHandler'] \ No newline at end of file diff --git a/common/ini_handler.py b/common/ini_handler.py new file mode 100644 index 0000000..6f19baf --- /dev/null +++ b/common/ini_handler.py @@ -0,0 +1,278 @@ +import os +import configparser +from typing import Dict, Any, Optional, List + + +class IniHandlerError(Exception): + """INI处理器异常基类""" + def __init__(self, message: str, file_path: str = None): + self.file_path = file_path + if file_path: + message = f"{message} (文件: {file_path})" + super().__init__(message) + +class IniHandler: + """ + INI文件操作处理器(基于文件路径的缓存) + """ + + _instances = {} # 按文件路径缓存的实例 + + def __new__(cls, file_path: str): + if file_path not in cls._instances: + instance = super().__new__(cls) + instance._file_path = file_path + instance._config = None # 单个文件的配置对象 + cls._instances[file_path] = instance + return cls._instances[file_path] + + def __init__(self, file_path: str): + """ + 初始化INI处理器 + + Args: + file_path: INI文件路径 + """ + # 文件路径已经在__new__中设置,这里不需要重复设置 + pass + + def load_config(self) -> configparser.ConfigParser: + """ + 加载并缓存INI配置文件 + """ + if self._config is not None: + return self._config + + if not os.path.exists(self._file_path): + raise IniHandlerError("INI文件不存在", self._file_path) + + try: + config = configparser.ConfigParser() + config.read(self._file_path, encoding='utf-8') + self._config = config + return config + except Exception as e: + raise IniHandlerError(f"读取INI文件失败: {e}", self._file_path) + + + def get_value(self, section: str, option: str) -> Any: + """ + 获取INI文件中的值 + + Args: + section: 节名 + option: 选项名 + default: 默认值 + + Returns: + 获取的值,如果不存在则返回默认值 + """ + try: + config = self.load_config() + if config.has_section(section) and config.has_option(section, option): + return config.get(section, option) + except IniHandlerError: + # 文件不存在或读取失败时返回默认值 + return default + except Exception as e: + raise IniHandlerError(f"获取INI值失败: {e}", self._file_path) + + def get_int_value(self, section: str, option: str) -> int: + """ + 获取INI文件中的整数值 + + Args: + section: 节名 + option: 选项名 + default: 默认值 + + Returns: + 获取的整数值,如果不存在或转换失败则返回默认值 + """ + try: + config = self.load_config() + if config.has_section(section) and config.has_option(section, option): + return config.getint(section, option) + except IniHandlerError: + raise + except Exception as e: + raise IniHandlerError(f"获取INI整数值失败: {e}", self._file_path) + + def get_float_value(self, section: str, option: str) -> float: + """ + 获取INI文件中的浮点数值 + Args: + section: 节名 + option: 选项名 + default: 默认值 + + Returns: + 获取的浮点数值,如果不存在或转换失败则返回默认值 + """ + try: + config = self.load_config() + if config.has_section(section) and config.has_option(section, option): + return config.getfloat(section, option) + except IniHandlerError: + raise + except Exception as e: + raise IniHandlerError(f"获取INI浮点数值失败: {e}", self._file_path) + + def get_boolean_value(self, section: str, option: str) -> bool: + """ + 获取INI文件中的布尔值 + Args: + section: 节名 + option: 选项名 + default: 默认值 + + Returns: + 获取的布尔值,如果不存在或转换失败则返回默认值 + """ + try: + config = self.load_config() + if config.has_section(section) and config.has_option(section, option): + return config.getboolean(section, option) + except IniHandlerError: + raise + except Exception as e: + raise IniHandlerError(f"获取INI布尔值失败: {e}", self._file_path) + + + def get_section(self, section: str) -> Dict[str, str]: + """ + 获取INI文件中的整个节 + Args: + section: 节名 + Returns: + 节中所有键值对的字典,如果节不存在则返回空字典 + """ + try: + config = self.load_config() + if config.has_section(section): + return dict(config[section]) + return {} + except IniHandlerError: + raise + except Exception as e: + raise IniHandlerError(f"获取INI节失败: {e}", self._file_path) + + def get_sections(self) -> List[str]: + """ + 获取INI文件中的所有节名 + Returns: + 节名列表 + """ + try: + config = self.load_config() + return config.sections() + except IniHandlerError: + raise + except Exception as e: + raise IniHandlerError(f"获取INI节名列表失败: {e}", self._file_path) + + def remove_section(self, section: str) -> None: + """ + 移除INI文件中的节 + Args: + section: 节名 + """ + try: + config = self.load_config() + if config.has_section(section): + config.remove_section(section) + except IniHandlerError: + raise + except Exception as e: + raise IniHandlerError(f"移除INI节失败: {e}", self._file_path) + + def remove_option(self, section: str, option: str) -> None: + """ + 移除INI文件中的选项 + Args: + section: 节名 + option: 选项名 + """ + try: + config = self.load_config() + if config.has_section(section) and config.has_option(section, option): + config.remove_option(section, option) + except IniHandlerError: + raise + except Exception as e: + raise IniHandlerError(f"移除INI选项失败: {e}", self._file_path) + + def has_section(self, section: str) -> bool: + """ + 检查INI文件中是否存在指定的节 + + Args: + section: 节名 + + Returns: + bool: 是否存在 + """ + try: + config = self.load_config() + return config.has_section(section) + except IniHandlerError: + raise + except Exception as e: + raise IniHandlerError(f"检查INI节失败: {e}", self._file_path) + + def has_option(self, section: str, option: str) -> bool: + """ + 检查INI文件中是否存在指定的选项 + + Args: + section: 节名 + option: 选项名 + Returns: + bool: 是否存在 + """ + try: + config = self.load_config() + return config.has_option(section, option) + except IniHandlerError: + raise + except Exception as e: + raise IniHandlerError(f"检查INI选项失败: {e}", self._file_path) + + def write_ini_file(self) -> bool: + """ + 写入INI文件(持久化,写入前先set_value,set_value设置1或多个值后调用此方法) + Returns: + bool: 是否写入成功 + """ + try: + # 确保目录存在 + dir_path = os.path.dirname(self._file_path) + if dir_path and not os.path.exists(dir_path): + os.makedirs(dir_path) + # 写入文件 + with open(self._file_path, 'w', encoding='utf-8') as f: + self._config.write(f) + return True + except Exception as e: + raise IniHandlerError(f"写入INI文件失败: {e}", self._file_path) + + def set_value(self, section: str, option: str, value: Any) -> None: + """ + 设置INI文件中的值 + + Args: + section: 节名 + option: 选项名 + value: 要设置的值 + """ + try: + config = self.load_config() + if not config.has_section(section): + config.add_section(section) + config.set(section, option, str(value)) + except IniHandlerError: + raise + except Exception as e: + raise IniHandlerError(f"设置INI值失败: {e}", self._file_path) + + \ No newline at end of file diff --git a/common/logging_service.py b/common/logging_service.py new file mode 100644 index 0000000..10a244a --- /dev/null +++ b/common/logging_service.py @@ -0,0 +1,84 @@ +import logging +import queue +import threading +import time +from logging.handlers import TimedRotatingFileHandler +from PySide6.QtCore import Signal, QObject +import os + + + +# class QTextLogger(logging.Handler): +# def __init__(self): +# super().__init__() + +# def emit(self, record): +# pass + + +class LoggingService(QObject): + # log_info_signal = Signal(str) + # log_warning_signal = Signal(str) + # log_error_signal = Signal(str) + def __init__(self): + super().__init__() + # self.logger_textEdit_info = logging.getLogger('info_logger') + # self.logger_textEdit_info.setLevel(logging.INFO) + # self.logger_textEdit_warning = logging.getLogger('warning_logger') + # self.logger_textEdit_warning.setLevel(logging.WARNING) + self.logger_file_info = logging.getLogger('file_logger') + self.logger_file_info.setLevel(logging.INFO) + self.log_queue = queue.Queue() + self.logger_thread = threading.Thread(target=self._process_logs, daemon=True,name="util_log") + self.logger_thread.start() + + def init_log(self,file_path): + # text_edit_handler = QTextLogger() + # formatter = logging.Formatter('%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S') + # text_edit_handler.setFormatter(formatter) + # self.logger_textEdit_info.addHandler(text_edit_handler) + + # text_edit_handler_warning = QTextLogger() + # formatter_warning = logging.Formatter('%(asctime)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S') + # text_edit_handler_warning.setFormatter(formatter_warning) + # self.logger_textEdit_warning.addHandler(text_edit_handler_warning) + # 确保日志目录存在 + log_dir = os.path.dirname(file_path) + if log_dir and not os.path.exists(log_dir): + os.makedirs(log_dir, exist_ok=True) + + handler = TimedRotatingFileHandler(file_path, when='D', interval=1, backupCount=30,encoding='utf-8') + handler.suffix = "%Y-%m-%d" + formatter = logging.Formatter('%(asctime)s - %(levelname)s- %(message)s', datefmt='%Y-%m-%d %H:%M:%S') + handler.setLevel(logging.INFO) + handler.setFormatter(formatter) + self.logger_file_info.addHandler(handler) + + # 添加错误专用日志文件 + error_handler = TimedRotatingFileHandler( + file_path.replace('.log', '_error.log'), + when='D', interval=1, backupCount=30, encoding='utf-8') + error_handler.suffix = "%Y-%m-%d" + error_handler.setLevel(logging.ERROR) # 只处理错误级别 + error_handler.setFormatter(formatter) + self.logger_file_info.addHandler(error_handler) + + + def _process_logs(self): + while True: + time.sleep(0.1) + level, message = self.log_queue.get() + if level == logging.INFO: + # self.log_info_signal.emit(message) + self.logger_file_info.info(message) + elif level == logging.ERROR: + self.logger_file_info.error(message) + # self.log_error_signal.emit(message) + elif level == logging.WARNING: + self.logger_file_info.warning(message) + # self.log_warning_signal.emit(message) + def log_message(self,level,message): + self.log_queue.put((level, message)) + + +logging_service = LoggingService() diff --git a/common/sqlite_handler.py b/common/sqlite_handler.py new file mode 100644 index 0000000..3c71444 --- /dev/null +++ b/common/sqlite_handler.py @@ -0,0 +1,838 @@ +""" +SQLite数据库操作通用公共类 +提供数据库连接、表管理、CRUD操作等通用功能 +""" +import sqlite3 +import threading +import time +from typing import List, Dict, Any, Optional, Union +from pathlib import Path +from datetime import datetime + + +class SQLiteError(Exception): + """SQLite操作基础异常类""" + def __init__(self, message: str, sqlite_error_code: int = None, sql: str = None, + params: tuple = None, db_path: str = None): + super().__init__(message) + self.message = message + self.sqlite_error_code = sqlite_error_code + self.sql = sql + self.params = params + self.db_path = db_path + + def __str__(self): + error_info = f"SQLiteError: {self.message}" + if self.sqlite_error_code: + error_info += f" (错误代码: {self.sqlite_error_code})" + if self.sql: + error_info += f"\nSQL: {self.sql}" + if self.params: + error_info += f"\n参数: {self.params}" + if self.db_path: + error_info += f"\n数据库: {self.db_path}" + return error_info + + +class SQLiteBusyError(SQLiteError): + """SQLite BUSY错误异常(用于并发控制)""" + def __init__(self, message: str, sqlite_error_code: int = None, sql: str = None, + params: tuple = None, db_path: str = None, retry_attempt: int = None, max_retries: int = None): + super().__init__(message, sqlite_error_code, sql, params, db_path) + self.retry_attempt = retry_attempt + self.max_retries = max_retries + + def __str__(self): + error_info = f"SQLiteBusyError: {self.message}" + if self.sqlite_error_code: + error_info += f" (错误代码: {self.sqlite_error_code})" + if self.retry_attempt is not None and self.max_retries is not None: + error_info += f" (重试 {self.retry_attempt}/{self.max_retries})" + if self.sql: + error_info += f"\nSQL: {self.sql}" + if self.params: + error_info += f"\n参数: {self.params}" + if self.db_path: + error_info += f"\n数据库: {self.db_path}" + return error_info + + +def _handle_sqlite_error(error: sqlite3.Error, operation: str, sql: str = None, + params: tuple = None, db_path: str = None, + retry_attempt: int = None, max_retries: int = None) -> SQLiteError: + """ + 将SQLite错误转换为自定义异常 + + Args: + error: 原始SQLite错误 + operation: 操作描述 + sql: 执行的SQL语句 + params: SQL参数 + db_path: 数据库路径 + retry_attempt: 重试次数(用于BUSY错误) + max_retries: 最大重试次数 + + Returns: + SQLiteError: 对应的自定义异常 + """ + error_code = getattr(error, 'sqlite_errorcode', None) + error_name = getattr(error, 'sqlite_errorname', None) + + error_message = f"{operation}失败: {str(error)}" + if error_name: + error_message += f" ({error_name})" + + # 如果是SQLITE_BUSY错误,返回SQLiteBusyError(用于并发控制) + if error_name == 'SQLITE_BUSY': + return SQLiteBusyError( + message=error_message, + sqlite_error_code=error_code, + sql=sql, + params=params, + db_path=db_path, + retry_attempt=retry_attempt, + max_retries=max_retries + ) + + # 其他所有错误都返回通用的SQLiteError + return SQLiteError( + message=error_message, + sqlite_error_code=error_code, + sql=sql, + params=params, + db_path=db_path + ) + + +class SQLiteHandler: + """SQLite数据库操作通用类(单例模式)""" + + _lock = threading.Lock() # 单例锁 + _instance = None + @classmethod + def get_instance(cls, *args, **kwargs): + if cls._instance is None: + with cls._lock: + if cls._instance is None: # 双重检查 + cls._instance = cls(*args, **kwargs) + return cls._instance + + def __init__(self, db_path: str = "three.db", max_readers: int = 10, busy_timeout: int = 5000): + """ + 初始化SQLite处理器 + + Args: + db_path: 数据库文件路径,默认为app.db + max_readers: 最大并发读取数,默认为10 + busy_timeout: SQLITE_BUSY超时时间(毫秒),默认为5000毫秒 + """ + self.db_path = db_path + self.max_readers = max_readers + self.busy_timeout = busy_timeout + + # 读写分离锁 + self._read_lock = threading.RLock() # 读锁(允许多个读) + self._write_lock = threading.Lock() # 写锁 + self._active_readers = 0 # 当前活跃读取数 + + # 连接配置 + self._connection_params = { + # 是否检测相同的线程(False允许在不同线程间共享连接) + "check_same_thread": False, + # 设置数据库操作超时时间(秒) + "timeout": 10 + } + + # SQLITE_BUSY重试配置 + self.busy_retry_attempts = 3 # 重试次数 + self.busy_retry_delay = 1 # 重试延迟(秒) + + # 确保数据库目录存在 + db_dir = Path(db_path).parent + if not db_dir.exists(): + db_dir.mkdir(parents=True, exist_ok=True) + + # 初始化数据库参数 + self._setup_database_params() + + def _setup_database_params(self): + """设置数据库连接参数""" + try: + # 创建临时连接来设置参数 + conn = sqlite3.connect(self.db_path, **self._connection_params) + + # 启用WAL模式(Write-Ahead Logging) + cursor = conn.execute("PRAGMA journal_mode = WAL") + journal_mode = cursor.fetchone()[0] + print(f"WAL模式设置: {journal_mode}") + + # 启用外键约束 + conn.execute("PRAGMA foreign_keys = ON") + + # 设置SQLITE_BUSY超时 + conn.execute(f"PRAGMA busy_timeout = {self.busy_timeout}") + + # 设置行工厂为字典形式 + conn.row_factory = sqlite3.Row + + conn.close() + print("数据库参数设置完成") + except sqlite3.Error as e: + print(f"设置数据库参数失败: {e}") + raise + + def _create_connection(self) -> sqlite3.Connection: + """创建新的数据库连接""" + conn = sqlite3.connect(self.db_path, **self._connection_params) + conn.row_factory = sqlite3.Row + return conn + + def _acquire_read_lock(self): + """获取读锁""" + with self._read_lock: + if self._active_readers >= self.max_readers: + raise SQLiteBusyError(f"已达到最大并发读取数: {self.max_readers}") + self._active_readers += 1 + + def _release_read_lock(self): + """释放读锁""" + with self._read_lock: + self._active_readers -= 1 + + def _acquire_write_lock(self): + """获取写锁,超时时间为30秒""" + if not self._write_lock.acquire(timeout=30): + raise SQLiteBusyError("获取写锁超时(30秒),可能发生死锁") + + + + def execute_read(self, sql: str, params: tuple = None) -> list: + """ + 执行读操作SQL语句(使用读写锁) + + Args: + sql: SQL语句 + params: 参数元组 + + Returns: + list: 查询结果列表 + + Raises: + SQLiteError: SQL执行错误 + SQLiteBusyError: 并发读取数超过限制 + """ + start_time = time.time() + + # 获取读锁 + self._acquire_read_lock() + + conn = self._create_connection() + try: + if params: + cursor = conn.execute(sql, params) + else: + cursor = conn.execute(sql) + # 获取所有结果并返回,避免连接关闭后游标失效 + results = cursor.fetchall() + return results + except sqlite3.Error as e: + # 抛出自定义异常 + raise _handle_sqlite_error(e, "执行读操作", sql, params, self.db_path) + finally: + # 关闭连接 + if conn: + conn.close() + # 释放读锁 + self._release_read_lock() + + # 记录执行时间 + execution_time = time.time() - start_time + if execution_time > 1.0: + print(f"⚠️ 读操作执行时间过长: {execution_time:.3f}秒 - SQL: {sql[:100]}{'...' if len(sql) > 100 else ''}") + + def execute_write(self, sql: str, params: tuple = None) -> sqlite3.Cursor: + """ + 执行写操作SQL语句(使用读写锁和BEGIN IMMEDIATE事务) + + Args: + sql: SQL语句 + params: 参数元组 + + Returns: + sqlite3.Cursor: 执行结果游标 + + Raises: + SQLiteError: SQL执行错误 + SQLiteBusyError: SQLITE_BUSY错误,达到最大重试次数 + """ + start_time = time.time() + + # 获取写锁 + self._acquire_write_lock() + + conn = self._create_connection() + cursor = None + + try: + # 使用BEGIN IMMEDIATE事务 + for attempt in range(self.busy_retry_attempts): + try: + # 开始IMMEDIATE事务 + conn.execute("BEGIN IMMEDIATE") + + # 执行SQL + if params: + cursor = conn.execute(sql, params) + else: + cursor = conn.execute(sql) + + # 提交事务 + conn.commit() + return cursor + + except sqlite3.OperationalError as e: + # 如果是SQLITE_BUSY错误,进行重试 + if "database is locked" in str(e) or "SQLITE_BUSY" in str(e): + if attempt < self.busy_retry_attempts - 1: + conn.rollback() + print(f"SQLITE_BUSY错误,第{attempt + 1}次重试...") + time.sleep(self.busy_retry_delay) + continue + else: + # 最后一次尝试失败 + conn.rollback() + raise _handle_sqlite_error( + e, "执行写操作", sql, params, self.db_path, + attempt, self.busy_retry_attempts + ) + else: + # 其他操作错误,直接抛出 + conn.rollback() + raise _handle_sqlite_error(e, "执行写操作", sql, params, self.db_path) + except sqlite3.Error as e: + # 其他SQLite错误,回滚并抛出 + conn.rollback() + raise _handle_sqlite_error(e, "执行写操作", sql, params, self.db_path) + + except sqlite3.Error as e: + raise + finally: + # 关闭连接 + if conn: + conn.close() + # 释放写锁 + self._write_lock.release() + + # 记录执行时间 + execution_time = time.time() - start_time + if execution_time > 1.0: + print(f"⚠️ 写操作执行时间过长: {execution_time:.3f}秒 - SQL: {sql[:100]}{'...' if len(sql) > 100 else ''}") + + def executemany_write(self, sql: str, params_list: List[tuple]) -> sqlite3.Cursor: + """ + 执行批量写操作 + + Args: + sql: SQL语句 + params_list: 参数列表 + + Returns: + sqlite3.Cursor: 游标对象 + + Raises: + SQLiteError: SQL执行错误 + SQLiteBusyError: SQLITE_BUSY错误,达到最大重试次数 + """ + start_time = time.time() + + # 获取写锁 + self._acquire_write_lock() + + conn = self._create_connection() + cursor = None + + try: + # 使用BEGIN IMMEDIATE事务 + for attempt in range(self.busy_retry_attempts): + try: + # 开始IMMEDIATE事务 + conn.execute("BEGIN IMMEDIATE") + + # 执行批量SQL + cursor = conn.cursor() + cursor.executemany(sql, params_list) + + # 提交事务 + conn.commit() + return cursor + + except sqlite3.OperationalError as e: + # 如果是SQLITE_BUSY错误,进行重试 + if "database is locked" in str(e) or "SQLITE_BUSY" in str(e): + if attempt < self.busy_retry_attempts - 1: + conn.rollback() + print(f"SQLITE_BUSY错误,第{attempt + 1}次重试...") + time.sleep(self.busy_retry_delay) + continue + else: + # 最后一次尝试失败 + conn.rollback() + raise _handle_sqlite_error( + e, "执行批量写操作", sql, params_list, self.db_path, + attempt, self.busy_retry_attempts + ) + else: + # 其他操作错误,直接抛出 + conn.rollback() + raise _handle_sqlite_error(e, "执行批量写操作", sql, params_list, self.db_path) + except sqlite3.Error as e: + # 其他SQLite错误,回滚并抛出 + conn.rollback() + raise _handle_sqlite_error(e, "执行批量写操作", sql, params_list, self.db_path) + + except sqlite3.Error as e: + raise + finally: + # 关闭连接 + if conn: + conn.close() + # 释放写锁 + self._write_lock.release() + + # 记录执行时间 + execution_time = time.time() - start_time + if execution_time > 1.0: + print(f"⚠️ 批量写操作执行时间过长: {execution_time:.3f}秒 - SQL: {sql[:100]}{'...' if len(sql) > 100 else ''} (批量数量: {len(params_list)})") + + def fetch_all(self, sql: str, params: tuple = None) -> List[Dict[str, Any]]: + """ + 查询所有记录(使用读连接) + + Args: + sql: 查询SQL + params: 查询参数 + + Returns: + List[Dict[str, Any]]: 查询结果列表 + """ + start_time = time.time() + + conn = None + try: + # 获取读锁 + self._acquire_read_lock() + + conn = self._create_connection() + cursor = conn.cursor() + + # 执行查询 + if params: + cursor.execute(sql, params) + else: + cursor.execute(sql) + + rows = cursor.fetchall() + return [dict(row) for row in rows] + except sqlite3.Error as e: + # 抛出自定义异常 + raise _handle_sqlite_error(e, "执行读操作", sql, params, self.db_path) + finally: + # 关闭连接 + if conn: + conn.close() + # 释放读锁 + self._release_read_lock() + + # 记录执行时间 + execution_time = time.time() - start_time + if execution_time > 1.0: + print(f"⚠️ fetch_all执行时间过长: {execution_time:.3f}秒 - SQL: {sql[:100]}{'...' if len(sql) > 100 else ''}") + + def fetch_one(self, sql: str, params: tuple = None) -> Optional[Dict[str, Any]]: + """ + 查询单条记录(使用读连接) + + Args: + sql: 查询SQL + params: 查询参数 + + Returns: + Optional[Dict[str, Any]]: 查询结果,如果没有记录返回None + """ + start_time = time.time() + + conn = None + try: + # 获取读锁 + self._acquire_read_lock() + + conn = self._create_connection() + cursor = conn.cursor() + + # 执行查询 + if params: + cursor.execute(sql, params) + else: + cursor.execute(sql) + + row = cursor.fetchone() + return dict(row) if row else None + except sqlite3.Error as e: + # 抛出自定义异常 + raise _handle_sqlite_error(e, "执行读操作", sql, params, self.db_path) + finally: + # 关闭连接 + if conn: + conn.close() + # 释放读锁 + self._release_read_lock() + + # 记录执行时间 + execution_time = time.time() - start_time + if execution_time > 1.0: + print(f"⚠️ fetch_one执行时间过长: {execution_time:.3f}秒 - SQL: {sql[:100]}{'...' if len(sql) > 100 else ''}") + + def insert(self, table: str, data: Dict[str, Any]) -> int: + """ + 插入单条记录(使用写连接) + + Args: + table: 表名 + data: 插入数据字典 + + Returns: + int: 插入记录的主键ID + """ + columns = ', '.join(data.keys()) + placeholders = ', '.join(['?'] * len(data)) + sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})" + + cursor = self.execute_write(sql, tuple(data.values())) + return cursor.lastrowid + + def insert_select(self, target_table: str, target_columns: List[str], + source_table: str, source_columns: List[str], + where_condition: str = None, where_params: tuple = None) -> int: + """ + 使用 INSERT INTO ... SELECT ... WHERE 语法从源表向目标表插入数据 + Args: + target_table: 目标表名 + target_columns: 目标表的列名列表 + source_table: 源表名 + source_columns: 源表的列名列表,与目标表列一一对应 + where_condition: WHERE条件语句(不含WHERE关键字) + where_params: WHERE条件参数 + + Returns: + int: 插入的记录数量 + """ + # 构建目标列和源列的字符串 + target_cols_str = ', '.join(target_columns) + source_cols_str = ', '.join(source_columns) + + # 构建基础SQL + sql = f"INSERT INTO {target_table} ({target_cols_str}) SELECT {source_cols_str} FROM {source_table}" + + # 添加WHERE条件(如果有) + if where_condition: + sql += f" WHERE {where_condition}" + + # 执行SQL语句 + cursor = self.execute_write(sql, where_params) + return cursor.rowcount + + def insert_many(self, table: str, data_list: List[Dict[str, Any]]) -> int: + """ + 批量插入记录(使用写连接) + + Args: + table: 表名 + data_list: 数据字典列表 + + Returns: + int: 插入的记录数量 + """ + if not data_list: + return 0 + + columns = ', '.join(data_list[0].keys()) + placeholders = ', '.join(['?'] * len(data_list[0])) + sql = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})" + + params_list = [tuple(data.values()) for data in data_list] + cursor = self.executemany_write(sql, params_list) + return cursor.rowcount + + def update(self, table: str, data: Dict[str, Any], where: str, where_params: tuple = None) -> int: + """ + 更新记录(使用写连接) + + Args: + table: 表名 + data: 更新数据字典 + where: WHERE条件 + where_params: WHERE条件参数 + + Returns: + int: 更新的记录数量 + """ + set_clause = ', '.join([f"{key} = ?" for key in data.keys()]) + sql = f"UPDATE {table} SET {set_clause} WHERE {where}" + + params = tuple(data.values()) + if where_params: + params += where_params + + cursor = self.execute_write(sql, params) + return cursor.rowcount + + def delete(self, table: str, where: str, where_params: tuple = None) -> int: + """ + 删除记录(使用写连接) + + Args: + table: 表名 + where: WHERE条件 + where_params: WHERE条件参数 + + Returns: + int: 删除的记录数量 + """ + sql = f"DELETE FROM {table} WHERE {where}" + cursor = self.execute_write(sql, where_params) + return cursor.rowcount + + def add_column(self, table_name: str, column_name: str, column_type: str) -> bool: + """ + 添加列到表(使用写连接) + + Args: + table_name: 表名 + column_name: 列名 + column_type: 列数据类型 + + Returns: + bool: 是否添加成功 + """ + if not self.table_exists(table_name): + print(f"表 {table_name} 不存在") + return False + + sql = f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_type}" + + try: + self.execute_write(sql) + print(f"列 {column_name} 添加到表 {table_name} 成功") + return True + except sqlite3.Error as e: + print(f"添加列 {column_name} 到表 {table_name} 失败: {e}") + return False + + def table_exists(self, table_name: str) -> bool: + """ + 检查表是否存在(使用读连接) + + Args: + table_name: 表名 + + Returns: + bool: 表是否存在 + """ + sql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?" + result = self.fetch_one(sql, (table_name,)) + return result is not None + + def create_table(self, table_name: str, columns: Dict[str, str]) -> bool: + """ + 创建表(使用写连接) + + Args: + table_name: 表名 + columns: 列定义字典,格式为 {列名: 数据类型} + + Returns: + bool: 是否创建成功 + """ + if self.table_exists(table_name): + print(f"表 {table_name} 已存在") + return True + + # 构建CREATE TABLE语句 + column_definitions = ', '.join([f"{col_name} {col_type}" for col_name, col_type in columns.items()]) + sql = f"CREATE TABLE {table_name} ({column_definitions})" + + try: + self.execute_write(sql) + print(f"表 {table_name} 创建成功") + return True + except sqlite3.Error as e: + print(f"创建表 {table_name} 失败: {e}") + return False + + def get_table_info(self, table_name: str) -> List[Dict[str, Any]]: + """ + 获取表结构信息(使用读连接) + + Args: + table_name: 表名 + + Returns: + List[Dict[str, Any]]: 表结构信息列表 + """ + if not self.table_exists(table_name): + print(f"表 {table_name} 不存在") + return [] + + sql = f"PRAGMA table_info({table_name})" + return self.fetch_all(sql) + + def get_table_count(self, table_name: str, where: str = None, where_params: tuple = None) -> int: + """ + 获取表记录数量(使用读连接) + + Args: + table_name: 表名 + where: WHERE条件 + where_params: WHERE条件参数 + + Returns: + int: 记录数量 + """ + sql = f"SELECT COUNT(*) as count FROM {table_name}" + if where: + sql += f" WHERE {where}" + + result = self.fetch_one(sql, where_params) + return result['count'] if result else 0 + + def vacuum(self,max_retries=3): + """执行VACUUM操作,优化数据库(使用写连接)""" + for attempt in range(max_retries): + try: + self.execute_write("VACUUM") + return True + except SQLiteBusyError: + if attempt < max_retries - 1: + time.sleep(2 ** attempt) # 指数退避 + else: + raise + + def backup_database(self, backup_path: str) -> bool: + """ + 备份数据库到指定文件(使用写连接) + + Args: + backup_path: 备份文件路径 + + Returns: + bool: 备份是否成功 + """ + try: + # 确保备份目录存在 + backup_dir = Path(backup_path).parent + if not backup_dir.exists(): + backup_dir.mkdir(parents=True, exist_ok=True) + + # 使用SQLite的备份API + source_conn = self._create_connection() + backup_conn = sqlite3.connect(backup_path) + + # 执行备份 + source_conn.backup(backup_conn) + backup_conn.close() + + print(f"数据库备份成功: {backup_path}") + return True + except sqlite3.Error as e: + print(f"数据库备份失败: {e}") + return False + + def create_incremental_backup(self, backup_dir: str, max_backups: int = 10) -> bool: + """ + 创建增量备份,自动管理备份文件数量 + + Args: + backup_dir: 备份目录 + max_backups: 最大备份文件数量 + + Returns: + bool: 备份是否成功 + """ + try: + # 确保备份目录存在 + backup_path = Path(backup_dir) + if not backup_path.exists(): + backup_path.mkdir(parents=True, exist_ok=True) + + # 生成备份文件名(包含时间戳) + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + db_name = Path(self.db_path).stem + backup_file = backup_path / f"{db_name}_backup_{timestamp}.db" + + # 执行备份 + success = self.backup_database(str(backup_file)) + + if success: + # 清理旧的备份文件 + backup_files = list(backup_path.glob(f"{db_name}_backup_*.db")) + backup_files.sort(key=lambda x: x.stat().st_mtime, reverse=True) + + # 删除超出数量的旧备份 + for old_backup in backup_files[max_backups:]: + old_backup.unlink() + print(f"删除旧备份: {old_backup.name}") + + print(f"增量备份完成,当前备份数量: {min(len(backup_files), max_backups)}") + + return success + except Exception as e: + print(f"增量备份失败: {e}") + return False + +# 使用示例 +if __name__ == "__main__": + # 创建数据库处理器 + db = SQLiteHandler.get_instance("db/three.db", max_readers=50, busy_timeout=4000) + + # # 创建表 + # columns = { + # "id": "INTEGER PRIMARY KEY AUTOINCREMENT", + # "name": "TEXT NOT NULL", + # "age": "INTEGER", + # "created_at": "TIMESTAMP DEFAULT CURRENT_TIMESTAMP" + # } + # db.create_table("users", columns) + + # # 插入数据 + # user_data = {"name": "张三", "age": 25} + # user_id = db.insert("users", user_data) + # print(f"插入用户ID: {user_id}") + + # # 批量插入 + # users_data = [ + # {"name": "李四", "age": 30}, + # {"name": "王五", "age": 28} + # ] + # count = db.insert_many("users", users_data) + # print(f"批量插入数量: {count}") + + # # 查询数据 + # users = db.fetch_all("SELECT * FROM users") + # print("所有用户:", users) + + # # 更新数据 + # update_count = db.update("users", {"age": 26}, "id = ?", (user_id,)) + # print(f"更新记录数: {update_count}") + + # # 查询单条 + # user = db.fetch_one("SELECT * FROM users WHERE id = ?", (user_id,)) + # print("单个用户:", user) + + # # 删除数据 + # delete_count = db.delete("users", "age > ?", (25,)) + # print(f"删除记录数: {delete_count}") + + # # 获取表信息 + # table_info = db.get_table_info("users") + # print("表结构:", table_info) + \ No newline at end of file diff --git a/config/__pycache__/__init__.cpython-39.pyc b/config/__pycache__/__init__.cpython-39.pyc deleted file mode 100644 index b40288b4d664a05c8eadf21811a36e799c055165..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 150 zcmYe~<>g`kg6%BVGC=fW5P=LBfgA@QE@lA|DGb33nv8xc8Hzx{2;!Hui&acPWpYMh zQEov|epYI7NsL=+YD#8adVF$zUP)1YPJD4?aY<@!43L|anI01#pP83g5+AQuPY0MGN#`6i5!m!oY&&tjKjJlG$C^ zk&r`y=Fp!}_tt+=p#2NI@Km5D-&&xj0tGrtaUG#c?8k4uo!K4Dju`ZMKEd(td%wkh zH3|8{7@NaGV~SH9A^-s=A(VVczy(bDA1`N7>lPuDIT^SmOg4h9?GA~wN8 zY(WjN4Ru5x8i*ZeB6gvL*n>9W7Wjz$l5ax?`3`guccF(kfGxx!^bz-98*v|Y5N|{T zc7HVI9hBMJd_~UcFo5BuIrs2dJA}PUlh3UUWxrHzZzwlP<<7NYb~=EYmnQqWaESO8 z+(LXCZX><}1Z#HZc-5VJ_+UJFd^CQ4JUJ3y|3a{?B7Y-K#*ZfAK9-->1DK1Hw!Zml z^5BDqj~-u{%8I`c#rFe*U)zL4DB*ZGt;jt~o>_8i$az1jLk&C8Kd1?W@kLqxM8NLFL@Fy zS)8eBo@Y|iPKe<$V0xKz+O7~E@m@)SyAD?1AR^d9mZDdRvVO6rEPPD zrNPUJ_DhB7SaKC-2~aC!3I5JB4RL7hHl6cSZ>l|W(dEUIk$c!Diy&cP%){qQq?pWkkkErSvHx4? zn^d`!nZ$NkqnwnvxKRo(tLh` z=1fE|8lce(0&Luc1K-hQdCcIpl3AmIj!SVhP0g09=Xy;CExHjrI045 PXnnFi=2H{2j`aQmsd=MJ diff --git a/config/ini_manager.py b/config/ini_manager.py new file mode 100644 index 0000000..f58499c --- /dev/null +++ b/config/ini_manager.py @@ -0,0 +1,157 @@ +# config/config_manager.py +import logging +import os +import json +import configparser +import time +import threading +from common.logging_service import logging_service +from common.ini_handler import IniHandler + + +class IniManager: + """ + settings.init配置管理类,读取API相关配置 + 暂不支持多个配置文件 + """ + + def __init__(self): + """初始化实例变量""" + self._DEFAULT_CONFIG_FILE = os.path.join( + os.path.dirname(os.path.dirname(os.path.abspath(__file__))), + 'settings.ini' + ) + self.ini_handler = IniHandler(self._DEFAULT_CONFIG_FILE) + # 默认配置值 + self._DEFAULT_API_MAX_RETRIES = 3 + self._DEFAULT_API_RETRY_INTERVAL = 1.0 + self._DEFAULT_API_TIMEOUT = 30.0 + self._DEFAULT_API_AUTH_TIMEOUT = 12 * 60 * 60 # 12小时 + self._DEFAULT_API_BASE_URL = "https://www.shnthy.com:9154" + self._DEFAULT_API_LOGIN_MODEL = { + "Program": 11, + "SC": "1000000001", + "loginName": "leduser", + "password": "bfcda35cf4eba92d4583931bbe4ff72ffdfa8b5c9c4b72611bd33f5babee069d" + } + + # 参数缓存字典,格式:{参数名: {'value': 值, 'mtime': 文件修改时间}} + self._param_cache = {} + # 缓存锁,确保多线程安全 + self._cache_lock = threading.RLock() + # 文件修改时间缓存 + self._file_mtime_cache = None + + def _get_file_mtime(self): + """获取文件修改时间,带缓存""" + try: + current_mtime = os.path.getmtime(self._DEFAULT_CONFIG_FILE) + return current_mtime + except (FileNotFoundError, OSError): + raise FileNotFoundError(f"配置文件不存在或无法访问: settings.ini") + + def _read_config_value(self, section, key, default_value, value_type=str): + """读取配置值,带参数级缓存 + + Args: + section: 配置段 + key: 配置键 + default_value: 默认值 + value_type: 值类型 + + Returns: + 配置值 + """ + cache_key = f"{section}.{key}" + current_mtime = self._get_file_mtime() + # 检查缓存 + if cache_key in self._param_cache: + cached_data = self._param_cache[cache_key] + # 如果文件未修改,直接返回缓存值 + if cached_data['value'] is not None and cached_data['mtime'] == current_mtime: + return cached_data['value'] + + try: + # 获取值 + if value_type == int: + value = self.ini_handler.get_int_value(section, key) + elif value_type == float: + value = self.ini_handler.get_float_value(section, key) + else: + value = self.ini_handler.get_value(section, key) + + except Exception as e: + value = default_value + logging_service.log_message(logging.ERROR,str(e)) + + # 使用锁确保缓存操作的线程安全 + with self._cache_lock: + self._param_cache[cache_key] = { + 'value': value, + 'mtime': current_mtime + } + return value + + @property + def api_max_retries(self): + """获取API最大重试次数""" + return self._read_config_value('api', 'max_retries', self._DEFAULT_API_MAX_RETRIES, int) + + @property + def api_retry_interval(self): + """获取API重试间隔(秒)""" + return self._read_config_value('api', 'retry_interval', self._DEFAULT_API_RETRY_INTERVAL, float) + + @property + def api_timeout(self): + """获取API请求超时时间(秒)""" + return self._read_config_value('api', 'timeout', self._DEFAULT_API_TIMEOUT, float) + + @property + def api_auth_timeout(self): + """获取API授权失效前最大时间(秒)""" + return self._read_config_value('api', 'auth_timeout', self._DEFAULT_API_AUTH_TIMEOUT, int) + + @property + def api_base_url(self): + """获取API基础URL""" + return self._read_config_value('api', 'base_url', self._DEFAULT_API_BASE_URL, str) + + @property + def api_login_url(self): + """获取API登录URL""" + return f"{self.api_base_url}/api/user/perlogin" + + @property + def api_login_model(self): + """获取API登录模型(字典格式)""" + login_model_str = self._read_config_value('api', 'login_model', None, str) + if login_model_str: + try: + value = json.loads(login_model_str) + except Exception as e: + print(f"解析API登录模型失败: {e}") + value = self._DEFAULT_API_LOGIN_MODEL + else: + value = self._DEFAULT_API_LOGIN_MODEL + + return value + + @property + def log_path(self): + """获取日志文件路径""" + return self._read_config_value('app', 'LOG_PATH', 'logs/app.log', str) + +ini_manager = IniManager() + + +# if __name__ == '__main__': +# logging_service.init_log(ini_manager.log_path) +# try: + +# logging_service.log_message(logging.INFO, "测试INI配置管理类") +# logging_service.log_message(logging.ERROR, f"api_max_retries") +# except Exception as e: +# logging_service.log_message(logging.ERROR,str(e)) + +# time.sleep(1000) \ No newline at end of file diff --git a/config/settings.py b/config/settings.py index 9587a08..6ee5883 100644 --- a/config/settings.py +++ b/config/settings.py @@ -49,3 +49,7 @@ class Settings: self.visual_check_interval = 1.0 # 视觉检查间隔(秒) self.alignment_check_interval = 0.5 # 对齐检查间隔(秒) self.max_error_count = 3 # 最大错误计数 + + #是否在线生产 + self.is_online_control = True # 是否API在线 + diff --git a/controller/main_controller.py b/controller/main_controller.py index 1c0767c..051dc75 100644 --- a/controller/main_controller.py +++ b/controller/main_controller.py @@ -18,4 +18,4 @@ class MainController: self.main_window.show() def _initSubViews(self): - pass \ No newline at end of file + pass diff --git a/db/three.db b/db/three.db new file mode 100644 index 0000000000000000000000000000000000000000..cb2afc3f5746f3058c061dd9cb8ade4bc64a6337 GIT binary patch literal 53248 zcmeI4Uu;{|9mlWj#CFmqPRXRGs-hmHs_Z~K?mgFz(=e_0U!AB)@N5&&2q|*yTkz`m z2KQR5+si6MDJ@khZImJ?-6TNW3k<1hA;6~byayh5J5~52sPkKg8_lMMMgk__YiqRAmCntIS zVIj!#wOb!luU&}H!#}P)?fdStHOkYVz|%v1UQmO6f%l6%5rWf!z_iHMuDoBp^e%g; z`+t|`Am{no^H0xJ&mTO;JR$ZG7YKj=2!H?xfB*=900@8p2!H?xbRU5sm*b!EWuOI*d zAOHd&00JNY0w4eaAOHeCIe{Je{6FmffAWz>10VnbAOHd&00JNY0w4eaAOHg0L|}*R z|DCokxPc#bjk)Is=X>|rzOenfn+D~<>+9K=%QX^@+g{nFWRDl6_78ttrk&DU(NsK~ ziIYq?G8ZRfowAM*-#|YhW3#a_GP{t8C*moROwG=RQ^&~B_%RZGGV{dj0;_L6zL1&F zvYOA%REsQzQ_-1l$|utC#>8+=Daa*Dt&PdEvBg485;iLkCXEV`WjSBTNvT5VnT5*A zaj9IFORr&Gt`zf8IWM&+n?5NOB}H#`i)3nznbWIMyRwDfOX*c9S2$6~WmVZ*)E}vo z3sOENtt!uskxcw414yJO=T4f!n(a;*{fVdoEy~5p3X_-(%beZ~En0mbsg+_K!jYrP z$(RKmiN%kEpPb7OUN4CR z#Hz9xRTs{e&nl9_O8dMe`FZaI@iH*kq8j5JH*h4%az$Yyku0oA^V#xA7S!AFnZRb` zqBi=iQ^lDjNJzbPa(vbq2AIeSTeLUubf5<2LHV83fMViOr^q#1`a z)QF5lq@Pob+UQSH^j5{GXJ8uhVyeeAGCFEI|GcI#$yiFt$>qFx?`_ZoGghDTgv`cF z>#~jOAS4-6&981;h~H0YH~vw*`0DLXE?3uXee>bPJ1>0t&4=r^|F*8qB|>!jxM_-5 z`Lg_5DW@!CSBy&ZjE2*gFZ_baXb(%SXERzF!|y%B2XE>pcHx_@XcO$4HqzJ2~??T=?z34TJDFiIpEi)hY+khJDf z>h>3XQ;kJi*4&l8a#@fswHq5BU%B(%+pMkXr+=!xd{K{7C-tJkH4+Zn&WE;h1Ll%Oop2401Mg{=_#IYK zZ~%sb-!j*+jxv^s_h=o(CU(;99C_ps+c9lgpL(uXP^9Hk(sLE5l#`l1IvOT$Q=+fL z?C1n5IsO9Q>l!(5z;@QF+kf56s_I}^Ts7!ysLPV@KR5!zw41ACbLFes{FYWnEfJL~ zCB-y6hBXQonGMOGHe5if9GcNjl(Woon(Y~j&AW3bW}!_XlSO8=S~ZoHO8J)3GuPBM z&1x6hTEND|G@AqiyFcQ3?AN)zXPpD~>%0GD|I357A4ZSvYXWXn`m@LT^)rpcc3wEx zVKWxo-C%q=c;e+CeDQ U&ohDsBPLRCIuMu^`MOB|1vE8pp8x;= literal 0 HcmV?d00001 diff --git a/db/three.db-shm b/db/three.db-shm new file mode 100644 index 0000000000000000000000000000000000000000..6199341e7697a45c23dd8cf504ddd879f4f1a21f GIT binary patch literal 32768 zcmeI*zX`%X6bImo#=k+dbOD=ywHpWlr*RD%_plYQ5nFe#(na)6M1q}mf$s;8M~>t2 z_y)L}-sd5uA1e!CQp7%rb>1Gvi~L%x_q$73mABcwIG$FU<@Cq>@u*kQxsO`j%b%A1 zS+92d?!@};X-xxTC)bY(tfB*pk c1PBlyK!5-N0t5&UAV7cs0RjXF5comh3DAlqp8x;= literal 0 HcmV?d00001 diff --git a/db/three.db-wal b/db/three.db-wal new file mode 100644 index 0000000000000000000000000000000000000000..08e8bfd5e826fdd8253e526788f13251d12a34a2 GIT binary patch literal 45352 zcmeI*Z%i9y90%}gk3s8t90Zp!39*O^3#RzobJt$4{@I`+UF_V#h|!LSgw14eh6MLY zU+B%5MIo?E6O4-Z2ONp<56QHIMKNj&(Kj03EYTOv&{j;$UMw>t#y`*X+O9`g?FfHX z_+DE6Ja^Z7?( zdCl+IbZOb_S3>y%950M>!noL3K4t8Q1p*L&00bZa0SG_<0uX=z1lCPJuea~u8cJ*J zwGCCb?x-xQcQ~9*hs?=URaI^6w1vvVFHx@+{7h^UG?SpgOZKmY;|fB*y_009U<00Izzz$Osb>fmdg z2kvV>($m{_xT1{TX>0B~e)RF~p5q`)%7eu3q0-FO->cKsueAv$@%f2P7wJoSS+n z{z)`8b>g?rCg$~*=cdQzpLG|Pvi4WgSym%_{ z%IMtmSsH)qR64T2W;smH1X=LZk=dytQh;g!Dm6*6A`zc;jjR}JR87%jA3H5ezCiWK z`RD8Y{(8X77udcyUtl{f1PDL?0uX=z1Rwwb2tWV=5P-lY5OA?{0b4pX7Y$_MWr8YajW zSk?uPoge@K2tWV=5P$##AOHafKwx7CSn>t??M7UH7Y3900_+zS z2tWV=5P$##AOHafKmY;|SQ7z#AHOc+>tX6q+ER~5po(7&Mr5ClkVc6T5|2*CM*G9m zI*M+LuWRfcb;(Oq>l!w`u8=0m#@&4y@&$@uegVt8gW{*BXZHS7q6+Nx0p4|3b-@kKlnTx@Bjb+ literal 0 HcmV?d00001 diff --git a/doc/table表设计.doc b/doc/table表设计.doc new file mode 100644 index 0000000000000000000000000000000000000000..9d8a23d21eb9a36cac9734b9f84f00d35afb0189 GIT binary patch literal 87603 zcmeI54SZD9o%heYFnQrELI|%yz(|9IFob|ek%sVM2oORN0`d~x@&@Ehh^T-uQeRLg zMM{xnSt=}BZP&Hzifk9%O07$+ORZANvef0Fe5{Y#59^bq6w$WK^ZnnOftd-pVKT$a zWO8pl-`tm(bM86kch3KR?!9O3dHZHs$9K`HnC zRhP@Q+dU+4$@VF26?17Q#(!(bQ!cR&i<2}9v4kP5@#F8C@8hY^qlBf$xyARR_S28@BR za5s#Ddtf|#4JN?7FcH2EnJ@{mU@}aBsW1)hgKU@%IWPle!Ys&z*^mcwARh{#5avP= z%!B!GKP-TSum~1IF)V?lund;N3Mhe6D1&mSfR(TcRzoFJK{eDsEj$2gU@g?agRl-B zf`{P|SPvUuBRmTA&;X)+J~7o0GMSuLvtFZx;KQ3-lBh96qb(M~IO42Y$YXkVDavb$ z^Hr#U@9jqG_uF(p<&gGVR>cNkthcTB9AQBmXs8e)VrR|DUR?7CVRKtdfZ4 z-?;ISA4y_aI-z^o-;DdhYfAdaTC(WBB+Ty_l#clGl=EsW6V=`ltPx z&hOyR&E{XuKRR760QoOT5ly5VMHY$6-}W!x7tE!7mB3Y6OuSYD5Zz;udU5W_?_T_3 zc16P9KNqXM`juDzQ|#?#J`f)upNsurX%IUjb*BNOZtVo|9fv`D$1xD!aRMalX%HXs zF^C_z010pj#FzXT#Gf>95~-Vqh+OvVL^_DI)X^!l)=JGH>c=Gh+-r8D5+6~*ULCDk z|6JlZjp5E@t<=*4SG_XZqHXsb_~X~JKhm}v1~>M0+b4IMh-X9E#veJUoLl_IJS=~K zO2!A2@wXD|QHnhug&#`ee;R%%jqTCekuiKSjjgHdr{k5f@H8XX8jEL=tqhKjA+00u zM5EYBXUoZYGGwqd4xAjFta&OuEhORE(vp6Xe(SG)@$0>5{S&_ZJo*j);*}{J>*RCn zZH_g7=q8$q&tqxGTtTI zqNv$Yz0m8E^bbn$xjCF;0lkh&JpFV$ehn#~O^;$OpUq;O%YH8Dn?!xNpU>~-`2BeH z>C_Tg%kf-}&*jVuSj+w_YU(t$XEhzmC9l(oXC`USAzdptUPL?vd~c3+zLk8wUOVq} zKA)w<|1j}N{41&JbEyB)0`#<}upZ0zGPJYZ!`39Wvsg>ry_*u4#GWvfW9h8NkSa;p zXg8iJP5J(PxD#QXKjbCwf2fx`rAuZm z5mf?K)x}gLq7-S5iRsD}`Th;}t+}si&W1nb?$58-oJr}1@(z)v!qope8vCj)`6mpW5F8$u%jn6K5(m%QJd5cJ|T(^(n zOui`?#Z`?v1vU9G700*8w_?1mF6GKla$L^Uk=#36-jr+KQW*5$i@85stLI`C{mTcn z>(R+VN}3n+WJ+fl0ZP^+8D+c(wja(ijdk#cuPcNz=1%Bv-O>*xL9 zTG7@WzJNTgCtQIRdIfv>{SLWW)=R(T9E@LdW#QY;OsJ5OlPmExgeWD~a_6R&@D&`d zZdUf=w37e#dH-9bhpVKdR*--1QufWe9?FqFZ=PP#S)4<9^VMA6)=+Z#-Q5_i%q16c zx4Dw%3!~_V52u%#MuANb8wf@OeS3>+`Drnlor# zoJ(Sv#+`;G)SM|zs>%Ir&sMT_PGLhA*Xex+w~Wtql9o+nU32A_gw58{e!Zs6(pt!B z(l7NchBCQvIea;1VAT)T##9{IBC*^@$xFQx#UxxFYpDx28{4Oq)2jAwaL1OXrAca& z#9B+el6qMU(}-7Z1veY>f`-1i|F8C7{iXWru@{g|savn zT(QMjwk4;~;Mh5?ea)$a=Kex-en+pa&C{>#<0QiB*y| z>f5GkORHCumT#|?`jbp*YxzbpR_pag-mKSCF^wAUu01zj1NSYQy34l)mTPA(pM~f>zn0>;xY#q6>z+$`4$GSAv#*uXdtBp}e6M)=h8OFv+r@BN)jaaAKgTgD zwbScrOC`Mp)HLywH=~f$QE71!PPh0s7h1Rao}pWB8KrZ+t<`P44Hqn#mG4>8N3M7& z|GC{yuhqS8p!J$tJKX2Bf;?MIgyE0|888;c!vx5LKYsd~Bj4Zp$fD`PTffr1XL3$2 zH9N8HdNjT4rQhRaJJJeJxaH+hglsNw!5TidA>E9+GcGZv1`8UQKGV z+%1Z=OwYQ@Qd=>2@L)ALCs8XLuYyTbQYvNH%JRUw^4MH%>$AF1G3n>9_}ARBUEJ}iI=xbU+Bj~2-QtALi^<$6}L^7F1-Y8l0P(`6XEbQ#4It~A|T zD^d>8xe^*+6YPdPun*3>wryqB?JbFKlxv!~A4;^QtADYDE?wc$)+mSmrAawN=L7IP zI1cZ?DL5_sbCMM87$LmX)P6zS_jN@d3U#zLv&7s;gAb?Pz+1K)IV7LwMcC*${{*eKqb^e1MGs` zVCoi;KkIM?f0)= z8r;~FLv-#310fT#Kt`M`0NVe`DJ@?AwU&}_^|gYhT<~Iw)Ur|z(Rn$P!A7Wuov;f` z{iEOhi_Uvs9~_1wAa@^6fT@4<>n}Q=gi~+;^JfAs4wI(LI4=m$elki8{Yqke$UvN#|~$&EqJA`9HMg$EP|y_4Yg1Yr_nzY_y3LZ6RuulaP>FJVQ_1b zf90hdqH_c6hP`kU-hh*E=C$psrwwj)^VjD<@bcE)V)C!7Lz6rI(xe=s^M`O2&OhJV(ER@(AGy7DLz5@}T1X_ODRkDvU;RbrAut@AupCNYJ&6AgxBU;7_Q6#wxUm`4 zaDVj|oj1a>unqRYK6n`-L4W`9V^lK!)uiAm7Tnl^mrLakonL_y@IHJ5XW;@wlK&4^ zJuoWk_Max@5S>4R9t;Icf+3Is>0s(_QAYavXHh~ElYezmIYj3S$cG}RhFW+QO#O}Z z_phuYQ-9HUD{P0|uoqr|Lm=b-rnI^L-~LM{QoVNYYLaq@&d1;_I1cZ@NjL|A=^s4n z7_OQWI{m|yL*)>i&%;Hy1Xn;_&SnSfe>-gd(XX0O`}eOVDTnCX0}>$xhJq6^U@WxW z2MAtjjACm4*2E}x{;6gZlYix;VnpLi$b)>C4-23emO=?sz{Owu><7=UFVB(p>b8Bk zxnCtj?r(c;b^1Gd{BS6ED!2TbAQdOtRzfvwgnHNt+aWsbf4JJ3k)Gj7lX8g8dto0O zh9htkehl3IZ{zqs|E!Qv$@mvj=zM>0(-l7bMd!ERJ$N5Jflr|ka){2CAfC7Tb%7KZ3Tbd!#(%c?`rp46Vq^ot)jC3xCjWBRn*RR9rW~SkI^@7i zSPmty1|sG6pZ?{^NHzcJWN^#cC|$wJp?@(chv>Wx8ekLbhCT2CnB#vuf2-dyV{unO973juWixZ#^nEk(C>2GBF!|9WhLv$Vh zPLMZNXG0F;gQmaVzyA+sej>G;BQ;&hAvza9HPpgWuo-rM^#4OK{ws9W(Z9MGJk^31 zQ)^}AUk;T+blwF&f}`+0oP;xA>L0A%e}qndfBS!;^I5nAm!S)j{B(mJVCo0XPW%1c%`)wEpa0o&*OkZ$>fsmoKB3{3~yxnA(4ultXkr4~=jUWb!&I^Z--; z=pX+nIw!#pNP%?7fP66ZkAD3{=OQS9GFS%>!?R%OAN~4^&fDMs9E8Jg1df5JfAs4w zI=>Ca;S=~2K7&TM1ef7UxC&>~iR0hj^+?(Dv27axc&kUxwfFcvNBVZ*P^Us=uCZ=$elX8g8KY*9vD7*ph zzzGOk|41zBNK8}s^cS5k!X>aWeYPFCK@TwXH}3xjuO@|0f6;j;q{0N42-AT1JEBzo zaFw-@Cka=YltXl03d>F_qwo%#0JHxe ztmnU>>;L4-K##cEAtd6*vT~_W_Mc zDO@(!Ow-NHBdTIV;~&E@I1cZ?33v}q!YMcd7uAP9Irv=NvI%W_^wTVqkwX4ikM{o5 z=T@g*DT8N8aiZ-<@O!ubU&2*TNz(Qi|JO{pTXVpuK7`AP1h=ncltce&yirX4m6dWR znfDQ7{%4u@S>}9}`JS`jFP>8W$#drm&z;|R?)iz_sD(D7_At}W&z05*kge$C)le=+<&NBbA%>69$K9_jP zG|h7-)pMuCZ7o+eTZ%2jC8G9dMcdn5y5ozwtBg|)WwF?JxG5V}Tcy5XcOQx6NT*YM z>=ee0=y}RXHeK@3&8~ZmwIp7)T6)Hwnx;HExYi@o*2?U};xWX!%XfAVq%e>z*7j#hke;e(8 zAf;gX4!wuqum2a#?*f_sTju`uYX7GH_qPw%fFWo{zC+nWbD0Bt704XmUizEKWJ$J5c4{3b_MA4M%qafz8-E1U8Fyb)p|F`Y_zi3_z zOQ9D2>zcA`gh+o=yS8W-lvZOC%^P43?Df_Eb?74G&+kD=yjm+`yxmz%_3q~Bu zCYrwiAHq39{r~E^<{f$~808)*aVVQ;ejef(V>Sd#|KG8$|3&jOaEkU|`u{eK-E6=l z%C(}5$PCdnV=!!pvWey!VJmF&)!+30w`kmLFtiIs9Lgq|?}Jz1JwyFX|F3s@f>G{~ z5{I&h=I_H<_zX<{-?99^Xr9QN?@5pXrvL9){$DiDfI=wp)!+30QSbjn^IF&ndkpnA z{eLw2f6;s|9D=vN^#2{p|BL4D!$~*~rvL9){$Dh|1Xkt%w}Y4drvHz6|1X*+!ca&z z)Zg^~(dhq0^9&dZ`LF=I`T)!IT;r- zZ|F$=Uo`L1o$Ehvf>- zn%BWP*aTkfKg9Z*{y+5oUo_tX2jHNu{$BRq^#9(kRt)BQ7z43jxN-VX*qI+*@Hpm+F&p$`{` z?NBz+JO>J)$X9>U|A%NSe;^tL5?0wn^IF&pI}P*&Zhgo8TMp z1Uv~(!PBrAo`Gj!3v7jN!Z!F8Y=`Gy2YefL!gpX7JP*4;Mh$RjuKpVK!r#C?_%3`O z_QL`ATQ~?(%f1IM!b@60dH<(ljsFwPKZ3Jx8O;7) z$Ljxy=G|^%3>YMVm;PS;zoA&+Ru&9A)0>2O4vv_egDo9N!$A5D$|jnp!4#NjsK43& zGp_xC=n$^3$|joU!hEOzFaPf~!Y}!T*&kxQ|Hq;gjP|qd_GWh&|Apo|VHX?(ukqg@ z(%*dU)0XExTFWyYGuG;xx48e0=7-@NocGn=Yy3Yp%6Ewnf4@Mrw{{mXR?{Y$Uj%Cp z`heio{=M`!$N%fqKT@;{Mmm&DG*5yQ$bjXZZ9LO+r+NDiMhb!vhum0-gzJv>XNQzc zG%tiACnX6o5k>baBUxf95C zAa_5^-~V}!O7rhMq{=ROUxq)!RcQJCZ-*KGCsnz_{QgfgxA)}Q4^p6I{muKo#x^bx zE2f1tzL#@L=8;!6(R?Umz)bM6{UILz9Y_h9{m{1bL%IKt=0z|cs==%Mhgg48!?tJ` zP^+62V+EodH+B3_y0xn9q=-| z0$$_)%<*4Q|NeK;{16<2ci=tv5dPxX+W+La<5ff6^!#MM=T3*&4&?q@xWZl=HPuP9 zJ_R4a1qh1&564J`_CJPLT1<$?9Vy#TdxGjgb+B8yD?16WXzkXnVp$8k9@G%+>$mWq zoIjL{tGHB@Mm$S&mGpi_9u==`K~+d$85ZZ8qPr@ zNTp1GWN<s}}G-L}Nj(ZemE=>OMcRjheqfWBDOiq%h-D#i5S zYFSE?(N|72Ij5JJonuvbITkgCb#V@#=k!rCvaPBrJI=lqsEL%AqV4s*<*0wN>gHcK4B3j##xkNSxdWReLgDRexZ8eX*%Ik4>o9GEb?6#M%9( z;%vj?W0kJDUoS~Vit3c2x}>OuDXQl|r52^AL{`270s2$kiIg|(LMv~sQ3AJ`2iAt9c5UtkxUfOrJWhI%*&$;=VD9$tnEkj{Kj&w|VucMfC@ zA(=l$=8}-P1bz&Skj6U&*T4&K4*F1Y@?i&@fgb$sFb8(PS?Ix7)O>grj)9C#m9dF3 zHciH<$T$UgPA&P8XBKkLTJB%T{R_E%m456QP)T~7B&mwzY5t4TYZaH%;O}_39}rYE zeO>ityASk*e$XEV!0kXUO$~x%7z|wNsyl$%sqTcK@D)gfVQ?3G6^6qINQ05!gi(+V zqag#vz*x8&#=$)>9=-+>;9i&rUx!SX1X(Z{rodE?%k=v|w9bMBunE2g=Rx$&gGx96 zqWKjVh}I<_x*vg4&1uOl;wA-@>@muy_)=HNq)B?zg>{=>rzi-9IK3Fl<{m*S7cm^jBSwT+frZTIhQ=^ zko()!5qkk0w{#JPz0C=H6QK= zY$f)Bh6{_K7?!|NSO&{s1(X10RFy$FRKQAD1*@SFs-PNbpcWp0HLw=y;6Ydi55dFm z2&{((VWHIDcV`Pcd}ZocAHhs z`tgPoud^o0*C!KuC1+kv;fe$<7j?^PvD55I`A#KgI4w1z*^|3;N>EBf9Cwk`i?z>cy!5Eq07?trD ziLXnCXXFraax@=XD4Tzv$M0Sv%B*)XD?@Or^RJ; z#kk^JU0mH<{apQBgIxEzGF^*Yi(SR8b1pe^D3?^4#|F~gCwV|}tTW!(#o5i7;7oKT zIb%uR*IWx-zjggC;8exNywK?|zY(@P-oqJ^+#UP#Uz!BF(dqK!;IKwlcZ*74`z3>~ zdInGT+|jnXHds_0p#@o_&bYVTYcV&{4y5`04&2MXby;s)HSihxXX>2&=Y`3!caQvi z{Or2lbRN8MR+l{+S9C95UY&6Dk-FQ4ePvdUPtrc<**jxMQujVdeeRe%tj~j)D+eB_ ze|(Uw>h-~Urv7B`7vmnf!?j_{op;~!Olr~tqlZ0SRWt1NwQmnQH0rLe_AdBf__nFP z9Pvf{wzN5&2aViNIB{e}^2w1;&ilj2pD&o^yrXW#s9oLONUxr9X!IAVa!g(BrZIQL zJU{laz6s+V8UEI|+({cIzI^XbCnn7NMpjM9(8>QWW&h+)hUZSXIPkHlU9JByb;i7r z(^fzB-_u@A9DLt|%8#>M{flO9ojZ0`=55<&9k*P`eXW1byb&|rEf`VyN#Usp8ATtA zTv;?b|J`}%byx2{v!Zxm#_+twe<(Y=_k@BizXUegCWaHeGH1Fx@pX-%KF zb!)G7FMjBUz2nw*DgL+hugCY@c&TRN#*b#EJ-YDW#r5A!* Dict[str, Any]: + """ + 发送HTTP请求,支持重试机制 + + Args: + method: HTTP方法 + url: 请求URL + data: 请求数据 + headers: 请求头 + timeout: 超时时间 + retries: 重试次数(可选,默认使用配置中的重试次数) + retry_interval: 重试间隔(秒,默认1秒) + **kwargs: 其他参数 + + Returns: + Dict[str, Any]: 响应数据 + """ + + if headers is None: + headers = {} + # 重试逻辑 + for attempt in range(retries + 1): + try: + if method.upper() == 'GET': + response = self._session.get( + url, headers=headers, timeout=timeout, **kwargs + ) + elif method.upper() == 'POST': + response = self._session.post( + url, json=data, headers=headers, timeout=timeout, **kwargs + ) + else: + raise ValueError(f"不支持的HTTP方法: {method}") + + # 检查响应状态 + response.raise_for_status() + + # 解析JSON响应 + return response.json() + + except exception as e: + # 如果是最后一次尝试,直接抛出异常 + if attempt == retries: + print(f"请求失败(第{attempt + 1}次尝试): {e}") + raise + # 打印重试信息 + print(f"请求失败(第{attempt + 1}次尝试),{retry_interval}秒后重试: {e}") + # 等待重试间隔 + time.sleep(retry_interval) + + def get(self, url: str, headers: Dict[str, str] = None, + timeout: int = None, retries: int = 0, retry_interval: float = 1.0, + **kwargs) -> Dict[str, Any]: + """ + GET请求 + + Args: + url: 请求URL + headers: 请求头 + timeout: 超时时间 + **kwargs: 其他参数 + + Returns: + Dict[str, Any]: 响应数据 + """ + return self.request('GET', url, headers=headers, timeout=timeout, retries=retries, retry_interval=retry_interval, **kwargs) + + def post(self, url: str, data: Dict[str, Any] = None, + headers: Dict[str, str] = None, timeout: int = None, + retries: int = 0, retry_interval: float = 1.0, + **kwargs) -> Dict[str, Any]: + """ + POST请求 + + Args: + url: 请求URL + data: 请求数据 + headers: 请求头 + timeout: 超时时间 + retries: 重试次数(可选,默认使用配置中的重试次数) + retry_interval: 重试间隔(秒,默认1秒) + **kwargs: 其他参数 + + Returns: + Dict[str, Any]: 响应数据 + """ + return self.request('POST', url, data=data, headers=headers, timeout=timeout, retries=retries, retry_interval=retry_interval, **kwargs) + + +class ApiHttpClient(BaseHttpClient): + """API客户端 - 业务API调用,整合认证和单例功能""" + + def __init__(self): + """初始化API客户端""" + """初始化API客户端""" + super().__init__() + # 认证缓存 + self._auth_cache = { + 'app_id': None, + 'expire_time': None, + 'sign_token': None, + 'zr_jwt': None + } + self._cache_lock = threading.RLock() + + @property + def settings(self): + """获取配置对象,由子类实现""" + if self._settings is None: + self._settings = ini_manager + return self._settings + + + def login(self) -> bool: + """ + 用户登录获取AppID + + Args: + url: 登录URL(可选,默认使用配置中的URL) + login_model: 登录请求模型(可选,默认使用配置中的模型) + + Returns: + bool: 登录是否成功 + """ + url = self.settings.api_login_url + login_model = self.settings.api_login_model + + print("开始登录...") + try: + # 发送登录请求 + response_data = self.request( + method='POST', + url=url, + data=login_model, + timeout=self.settings.api_timeout + ) + + # 解析登录响应 + login_response = LoginResponse(**response_data) + + if login_response.Code != 200: + error_msg = login_response.Message or "登录失败" + print(f"获取AppID失败: {error_msg}") + return False + + # 更新认证缓存 + with self._cache_lock: + self._auth_cache.update({ + 'app_id': login_response.app_id, + 'expire_time': login_response.expire_time, + 'sign_token': login_response.sign_token, + 'zr_jwt': login_response.zr_jwt + }) + + print(f"成功获取AppID: {self._auth_cache['app_id']}") + print(f"过期时间: {self._auth_cache['expire_time']}") + return True + + except Exception as e: + print(f"登录过程中出现异常: {e}") + self._clear_auth_cache() + return False + + def is_app_id_valid(self) -> bool: + """检查AppID是否有效""" + with self._cache_lock: + expire_time = self._auth_cache.get('expire_time') + + if not expire_time: + return False + + # 检查是否过期(提前12小时过期,避免临界情况) + try: + expire_timestamp = time.mktime(time.strptime(expire_time, '%Y-%m-%d %H:%M:%S')) + is_valid = time.time() < expire_timestamp - self.settings.api_auth_timeout + if not is_valid: + print("认证信息已过期") + return is_valid + except (ValueError, TypeError): + print("日期格式不正确") + return False + + def _get_auth_headers(self) -> Dict[str, str]: + """获取认证头信息""" + with self._cache_lock: + app_id = self._auth_cache.get('app_id') + + headers = { + 'AppID': app_id, + 'Content-Type': 'application/json' + } + + return headers + + + def get(self, url: str, auth: bool = True, **kwargs) -> Dict[str, Any]: + """ + GET请求(支持认证检查) + + Args: + url: 请求URL + auth: 是否需要认证 + timeout: 超时时间 + retries: 重试次数 + retry_interval: 重试间隔 + **kwargs: 其他参数 + + Returns: + Dict[str, Any]: 响应数据 + """ + if auth: + if not self.is_app_id_valid(): + self.login() + if not self.is_app_id_valid(): + raise Exception("登录失败,无法获取有效AppID") + auth_headers = self._get_auth_headers() + return self.request(method='GET', url=url, headers=auth_headers, timeout=self.settings.api_timeout, + retries=self.settings.api_max_retries, retry_interval=self.settings.api_retry_interval, **kwargs) + else: + return self.request(method='GET', url=url, timeout=self.settings.api_timeout, + retries=self.settings.api_max_retries, retry_interval=self.settings.api_retry_interval, **kwargs) + + def post(self, url: str, data: Dict[str, Any] = None, auth: bool = True,**kwargs) -> Dict[str, Any]: + """ + POST请求(支持认证检查) + + Args: + url: 请求URL + data: 请求数据 + auth: 是否需要认证 + timeout: 超时时间 + retries: 重试次数 + retry_interval: 重试间隔 + **kwargs: 其他参数 + + Returns: + Dict[str, Any]: 响应数据 + """ + if auth: + if not self.is_app_id_valid(): + self.login() + if not self.is_app_id_valid(): + raise Exception("登录失败,无法获取有效AppID") + auth_headers = self._get_auth_headers() + return self.request(method='POST', url=url, data=data, headers=auth_headers, timeout=self.settings.api_timeout, + retries=self.settings.api_max_retries, retry_interval=self.settings.api_retry_interval, **kwargs) + else: + return self.request(method='POST', url=url, data=data, timeout=self.settings.api_timeout, + retries=self.settings.api_max_retries, retry_interval=self.settings.api_retry_interval, **kwargs) + + def _clear_auth_cache(self): + """清除认证缓存""" + with self._cache_lock: + self._auth_cache.clear() + # 重新初始化必要的键 + self._auth_cache.update({ + 'app_id': None, + 'expire_time': None, + 'sign_token': None, + 'zr_jwt': None + }) + print("认证缓存已清除") + + +api_http_client = ApiHttpClient() \ No newline at end of file diff --git a/service/mould_service.py b/service/mould_service.py new file mode 100644 index 0000000..115f7ee --- /dev/null +++ b/service/mould_service.py @@ -0,0 +1,158 @@ +from datetime import datetime, timedelta +from common.sqlite_handler import SQLiteHandler +from typing import Optional, List +from api_http_client import api_http_client +from busisness.models import ArtifactInfo, TaskInfo, LoginRequest +from config.ini_manager import ini_manager + + +class MouldService: + """模具服务类,提供模具相关的API调用""" + + def __init__(self): + """初始化模具服务""" + self._api_client = api_http_client + self._host = ini_manager.api_base_url + + def get_task_info(self, task_id: str) -> Optional[TaskInfo]: + """ + 获取任务单信息 + + Args: + task_id: 任务单编号 + + Returns: + 任务单信息对象,如果失败返回None + """ + url = f"{self._host}/api/ext/artifact/task?TaskId={task_id}" + + try: + # 调用API获取数据 + response_data = self._api_client.get(url, auth=True) + + # 检查响应状态 + if response_data.get('Code') != 200: + print(f"获取任务单信息失败: {response_data.get('Message')}") + return None + + # 解析数据 + data = response_data.get('Data', {}) + if not data: + print(f"未获取到任务单 {task_id} 的信息") + return None + + # 转换为任务单信息对象 + task_info = TaskInfo(**data) + return task_info + + except Exception as e: + print(f"请求任务单信息异常: {e}") + return None + + def get_not_pour_artifacts(self) -> Optional[List[ArtifactInfo]]: + """ + 获取已入模绑定未浇筑的管片信息 + + Returns: + 未浇筑管片列表,如果失败返回None + """ + url = f"{self._host}/api/ext/artifact/not_pour" + + try: + # 调用API获取数据 + response_data = self._api_client.get(url, auth=True) + + # 检查响应状态 + if response_data.get('Code') != 200: + print(f"获取未浇筑管片信息失败: {response_data.get('Message')}") + return None + + # 解析数据 + data_list = response_data.get('Data', []) + if not data_list: + print("当前没有未浇筑的管片") + return [] + + # 转换为管片信息对象列表 + artifacts = [ArtifactInfo(**item) for item in data_list] + return artifacts + + except Exception as e: + print(f"请求未浇筑管片信息异常: {e}") + return None + + + +if __name__ == "__main__": + # 创建模具服务实例 + mould_service = MouldService() + db = SQLiteHandler.get_instance("db/three.db", max_readers=50, busy_timeout=4000) + # 测试获取未浇筑管片信息 + not_poured = mould_service.get_not_pour_artifacts() + if not_poured: + for item in not_poured: + artifact = db.fetch_one("SELECT * FROM ArtifactTask WHERE ArtifactID = ?", (item.ArtifactID,)) + if not artifact: + dict={ + "ArtifactID": item.ArtifactID, + "ArtifactActionID": item.ArtifactActionID, + "ArtifactIDVice1": item.ArtifactIDVice1, + "ProduceRingNumber": item.ProduceRingNumber, + "MouldCode": item.MouldCode, + "SkeletonID": item.SkeletonID, + "RingTypeCode": item.RingTypeCode, + "SizeSpecification": item.SizeSpecification, + "BuriedDepth": item.BuriedDepth, + "BlockNumber": item.BlockNumber, + "BetonVolume": item.BetonVolume, + "BetonTaskID": item.BetonTaskID, + "HoleRingMarking": item.HoleRingMarking, + "GroutingPipeMarking": item.GroutingPipeMarking, + "PolypropyleneFiberMarking": item.PolypropyleneFiberMarking, + "Status": 1, + "Source": 1 + } + db.insert("ArtifactTask", dict) + + dict={ + "TaskID": item.BetonTaskID, + "ProjectName": "上海市轨道交通19号线工程盾构区间管片生产2标", + "ProduceMixID": "20251030-02", + "VinNo": "", + "BetonVolume": item.BetonVolume, + "MouldCode": item.MouldCode, + "SkeletonID": item.SkeletonID, + "RingTypeCode": item.RingTypeCode, + "SizeSpecification": item.SizeSpecification, + "BuriedDepth": item.BuriedDepth, + "BlockNumber": item.BlockNumber, + "Mode": 1, + "Status": 1, + "Source": 1, + "OptTime": str(datetime.now() - timedelta(minutes=5)), + "CreateTime": str(datetime.now()) + } + db.insert("PDRecord", dict) + # for i in range(2, 5): + # row = db.fetch_one("SELECT * FROM ArtifactTask WHERE ID = ?", (i,)) + # if row: + # dict={ + # "TaskID": row["BetonTaskID"], + # "ProjectName": "上海市轨道交通19号线工程盾构区间管片生产2标", + # "ProduceMixID": "20251030-02", + # "VinNo": "", + # "BetonVolume": row["BetonVolume"], + # "MouldCode": row["MouldCode"], + # "SkeletonID": row["SkeletonID"], + # "RingTypeCode": row["RingTypeCode"], + # "SizeSpecification": row["SizeSpecification"], + # "BuriedDepth": row["BuriedDepth"], + # "BlockNumber": row["BlockNumber"], + # "Mode": 1, + # "Status": 1, + # "Source": 1, + # "OptTime": str(datetime.now() - timedelta(minutes=5)), + # "CreateTime": str(datetime.now()) + # } + # db.insert("PDRecord", dict) + diff --git a/settings.ini b/settings.ini new file mode 100644 index 0000000..c491aa3 --- /dev/null +++ b/settings.ini @@ -0,0 +1,12 @@ +[api] +max_retries = 3 +retry_interval = 1.0 +timeout = 30.0 +auth_timeout = 43200 +base_url = https://www.shnthy.com:9154 +login_model = {"Program": 11, "SC": "1000000001", "loginName": "leduser", "password": "bfcda35cf4eba92d4583931bbe4ff72ffdfa8b5c9c4b72611bd33f5babee069d"} + +[app] +log_path = logs/app.log +db_path = db/three.db +