206 lines
8.2 KiB
Python
206 lines
8.2 KiB
Python
from dataclasses import fields
|
||
from typing import List, Optional, Dict, Any
|
||
from datetime import datetime
|
||
from models import ArtifactInfoModel,PDRecordModel
|
||
from common.sqlite_handler import SQLiteHandler
|
||
|
||
|
||
def filter_dict_for_model(data_dict: Dict[str, Any], model_class) -> Dict[str, Any]:
|
||
"""过滤字典,只保留模型中定义的字段"""
|
||
# 获取模型中定义的所有字段名称
|
||
model_fields = {field.name for field in fields(model_class)}
|
||
# 过滤字典,只保留模型中存在的字段
|
||
return {k: v for k, v in data_dict.items() if k in model_fields}
|
||
|
||
class BaseDal:
|
||
def __init__(self) -> None:
|
||
"""初始化数据访问层,创建数据库连接"""
|
||
# 假设数据库文件在db目录下
|
||
self.db_dao = SQLiteHandler.get_instance("db/three.db", max_readers=50, busy_timeout=4000)
|
||
|
||
class ArtifactDal(BaseDal):
|
||
def __init__(self):
|
||
super().__init__()
|
||
|
||
def get_all(self) -> List[ArtifactInfoModel]:
|
||
"""获取所有ArtifactTask记录"""
|
||
try:
|
||
# 查询所有记录
|
||
sql = "SELECT * FROM ArtifactTask"
|
||
results = self.db_dao.execute_read(sql)
|
||
|
||
# 将查询结果转换为ArtifactInfo对象列表
|
||
artifacts = []
|
||
for row in results:
|
||
# 过滤字典,只保留模型中定义的字段
|
||
filtered_data = filter_dict_for_model(dict(row), ArtifactInfoModel)
|
||
artifact = ArtifactInfoModel(**filtered_data)
|
||
artifacts.append(artifact)
|
||
|
||
return artifacts
|
||
except Exception as e:
|
||
print(f"获取所有构件任务失败: {e}")
|
||
return []
|
||
|
||
def get_top_artifact(self, top: int,desc:str="ArtifactID asc",where:str="1=1") -> List[ArtifactInfoModel]:
|
||
"""获取top条数数据,根据ArtifactID升序"""
|
||
try:
|
||
# 确保top为正整数
|
||
if not isinstance(top, int) or top <= 0:
|
||
raise ValueError("top参数必须是正整数")
|
||
|
||
# 查询指定数量的记录,按ArtifactID升序排列
|
||
sql = f"SELECT * FROM ArtifactTask WHERE {where} ORDER BY {desc} LIMIT ?"
|
||
results = self.db_dao.execute_read(sql, (top,))
|
||
|
||
# 将所有查询结果转换为ArtifactInfoModel对象列表
|
||
artifacts = []
|
||
for row in results:
|
||
# 保证row的变量和模板变量一致
|
||
artifact = ArtifactInfoModel()
|
||
artifact.ArtifactID=row["ArtifactID"]
|
||
artifact.ProduceRingNumber=row["ProduceRingNumber"]
|
||
artifact.MouldCode=row["MouldCode"]
|
||
artifact.SkeletonID=row["SkeletonID"]
|
||
artifact.RingTypeCode=row["RingTypeCode"]
|
||
artifact.SizeSpecification=row["SizeSpecification"]
|
||
artifact.BuriedDepth=row["BuriedDepth"]
|
||
artifact.BlockNumber=row["BlockNumber"]
|
||
artifact.BetonVolume=row["BetonVolume"]
|
||
artifact.BetonTaskID=row["BetonTaskID"]
|
||
artifact.Status=row["Status"]
|
||
artifact.BeginTime=row["BeginTime"]
|
||
artifacts.append(artifact)
|
||
|
||
return artifacts
|
||
except Exception as e:
|
||
print(f"获取top构件任务失败: {e}")
|
||
return []
|
||
|
||
|
||
def get_by_id(self, artifact_id: int) -> Optional[ArtifactInfoModel]:
|
||
"""根据构件ID获取构件任务"""
|
||
try:
|
||
sql = "SELECT * FROM ArtifactTask WHERE ArtifactID = ?"
|
||
results = self.db_dao.execute_read(sql, (artifact_id,))
|
||
|
||
rows = list(results)
|
||
if not rows:
|
||
return None
|
||
|
||
return ArtifactInfoModel(**dict(rows[0]))
|
||
except Exception as e:
|
||
print(f"根据ID获取构件任务失败: {e}")
|
||
return None
|
||
|
||
|
||
def insert_artifact(self, artifact_data: dict) -> Optional[int]:
|
||
"""插入一条构件任务记录"""
|
||
try:
|
||
# 使用insert方法插入数据
|
||
row_id = self.db_dao.insert("ArtifactTask", artifact_data)
|
||
return row_id
|
||
except Exception as e:
|
||
print(f"插入构件任务失败: {e}")
|
||
return None
|
||
|
||
def update_artifact(self, artifact_id: int, update_data: dict) -> bool:
|
||
"""更新构件任务记录"""
|
||
try:
|
||
# 构建WHERE条件
|
||
where_condition = {"ArtifactID": artifact_id}
|
||
# 使用update方法更新数据
|
||
affected_rows = self.db_dao.update("ArtifactTask", update_data, where_condition)
|
||
return affected_rows > 0
|
||
except Exception as e:
|
||
print(f"更新构件任务失败: {e}")
|
||
return False
|
||
|
||
def validate_artifact(self, artifact_info: ArtifactInfoModel) -> bool:
|
||
"""验证构件信息是否符合业务规则"""
|
||
try:
|
||
# 检查必要字段
|
||
if not hasattr(artifact_info, 'Name') or not artifact_info.Name:
|
||
return False
|
||
|
||
# 可以添加更多业务规则验证
|
||
# 例如:检查构件编号格式、验证日期等
|
||
|
||
return True
|
||
except Exception as e:
|
||
print(f"验证构件信息失败: {e}")
|
||
return False
|
||
|
||
class PDRecordDal(BaseDal):
|
||
def __init__(self):
|
||
super().__init__()
|
||
|
||
def get_top_pd(self, top: int,desc:str="ID desc",where:str="1=1") -> List[PDRecordModel]:
|
||
"""获取top条数数据,根据ID降序"""
|
||
try:
|
||
# 确保top为正整数
|
||
if not isinstance(top, int) or top <= 0:
|
||
raise ValueError("top参数必须是正整数")
|
||
|
||
# 查询指定数量的记录,按ID降序排列
|
||
sql = f"SELECT * FROM PDRecord WHERE {where} ORDER BY {desc} LIMIT ?"
|
||
results = self.db_dao.execute_read(sql, (top,))
|
||
|
||
pdrecords = []
|
||
for row in results:
|
||
#
|
||
pdrecord = PDRecordModel()
|
||
pdrecord.ID=row["ID"]
|
||
pdrecord.TaskID=row["TaskID"]
|
||
pdrecord.ProjectName=row["ProjectName"]
|
||
pdrecord.ProduceMixID=row["ProduceMixID"]
|
||
pdrecord.VinNo=row["VinNo"]
|
||
pdrecord.BetonVolume=row["BetonVolume"]
|
||
pdrecord.Status=row["Status"]
|
||
pdrecord.OptTime=row["OptTime"]
|
||
pdrecords.append(pdrecord)
|
||
|
||
return pdrecords
|
||
except Exception as e:
|
||
print(f"获取top PD官片任务失败: {e}")
|
||
return []
|
||
|
||
|
||
|
||
|
||
|
||
if __name__ == "__main__":
|
||
artifact_dal = ArtifactDal()
|
||
|
||
artifacts = artifact_dal.get_artifact_task()
|
||
|
||
# 显示获取到的数据
|
||
print(f"获取到 {len(artifacts)} 条构件任务数据:")
|
||
for i, artifact in enumerate(artifacts, 1):
|
||
print(f"\n记录 {i}:")
|
||
print(f"ID: {artifact.ID}")
|
||
print(f"ArtifactID: {artifact.ArtifactID}")
|
||
print(f"ProduceRingNumber: {artifact.ProduceRingNumber}")
|
||
print(f"MouldCode: {artifact.MouldCode}")
|
||
print(f"SkeletonID: {artifact.SkeletonID}")
|
||
print(f"RingTypeCode: {artifact.RingTypeCode}")
|
||
print(f"SizeSpecification: {artifact.SizeSpecification}")
|
||
print(f"BuriedDepth: {artifact.BuriedDepth}")
|
||
print(f"BlockNumber: {artifact.BlockNumber}")
|
||
print(f"HoleRingMarking: {artifact.HoleRingMarking}")
|
||
print(f"GroutingPipeMarking: {artifact.GroutingPipeMarking}")
|
||
print(f"PolypropyleneFiberMarking: {artifact.PolypropyleneFiberMarking}")
|
||
print(f"BetonVolume: {artifact.BetonVolume}")
|
||
print(f"BetonTaskID: {artifact.BetonTaskID}")
|
||
print(f"FK_PDID: {artifact.FK_PDID}")
|
||
print(f"Status: {artifact.Status}")
|
||
print(f"BeginTime: {artifact.BeginTime}")
|
||
print(f"EndTime: {artifact.EndTime}")
|
||
print(f"PStatus: {artifact.PStatus}")
|
||
print(f"Source: {artifact.Source}")
|
||
print(f"FBetonVolume: {artifact.FBetonVolume}")
|
||
print(f"OptTime: {artifact.OptTime}")
|
||
|
||
# 可以打印更多字段...
|
||
|
||
print('\ntest success') |