Files
Feeding_control_system/busisness/dals.py
2025-10-31 14:30:42 +08:00

204 lines
8.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from dataclasses import fields
from typing import List, Optional, Dict, Any
from datetime import datetime
from models import ArtifactInfoModel,PDRecordModel
from common.sqlite_handler import SQLiteHandler
def filter_dict_for_model(data_dict: Dict[str, Any], model_class) -> Dict[str, Any]:
    """Return the entries of ``data_dict`` whose keys are fields of ``model_class``.

    Args:
        data_dict: Mapping of names to values (for example a database row
            converted to a dict).
        model_class: A dataclass whose field names decide which keys are kept.

    Returns:
        A new dict with only the keys that are dataclass field names, in the
        order they occur in ``data_dict``. ``data_dict`` itself is not modified.

    Raises:
        TypeError: Propagated from ``dataclasses.fields`` when ``model_class``
            is not a dataclass.
    """
    # Collect the declared field names once so each key test is a set lookup.
    allowed = set()
    for spec in fields(model_class):
        allowed.add(spec.name)

    # Copy over only the recognised keys.
    kept: Dict[str, Any] = {}
    for key, value in data_dict.items():
        if key in allowed:
            kept[key] = value
    return kept
class BaseDal:
    """Base class for the data-access classes in this module.

    Stores the handler returned by ``SQLiteHandler.get_instance`` on
    ``self.db_dao`` so subclasses can run their queries through it.
    """

    def __init__(self) -> None:
        """Obtain the SQLite handler and keep it as ``self.db_dao``."""
        # The database file is assumed to live in the "db" directory; the
        # path is relative, so presumably it is resolved against the current
        # working directory -- TODO confirm.
        handler = SQLiteHandler.get_instance(
            "db/three.db",
            max_readers=50,
            busy_timeout=4000,
        )
        self.db_dao = handler
class ArtifactDal(BaseDal):
    """Data-access object for the ``ArtifactTask`` table.

    Every method catches ``Exception``, prints a diagnostic message and
    returns a fallback value (``[]``, ``None`` or ``False``) instead of
    propagating the error to the caller.
    """

    # Columns copied onto the model by get_top_artifact, in assignment order.
    _TOP_COLUMNS = (
        "ArtifactID",
        "ProduceRingNumber",
        "MouldCode",
        "SkeletonID",
        "RingTypeCode",
        "SizeSpecification",
        "BuriedDepth",
        "BlockNumber",
        "BetonVolume",
        "BetonTaskID",
        "Status",
    )

    def __init__(self):
        """Initialise the base class, which sets ``self.db_dao``."""
        super().__init__()

    def get_all(self) -> List[ArtifactInfoModel]:
        """Return every ``ArtifactTask`` row as an ``ArtifactInfoModel``.

        Returns:
            One model per row; columns that are not model fields are dropped.
            An empty list is returned when the query or conversion fails.
        """
        try:
            rows = self.db_dao.execute_read("SELECT * FROM ArtifactTask")
            # Drop columns the model does not declare so the constructor
            # never receives an unexpected keyword argument.
            return [
                ArtifactInfoModel(**filter_dict_for_model(dict(row), ArtifactInfoModel))
                for row in rows
            ]
        except Exception as e:
            print(f"获取所有构件任务失败: {e}")
            return []

    def get_top_artifact(self, top: int, desc: str = "ArtifactID asc", where: str = "1=1") -> List[ArtifactInfoModel]:
        """Return at most ``top`` ``ArtifactTask`` rows.

        Args:
            top: Maximum number of rows; must be a positive ``int``.
            desc: ``ORDER BY`` clause body (default: ``ArtifactID`` ascending).
            where: ``WHERE`` clause body (default: match everything).

        Returns:
            Models with only the columns in ``_TOP_COLUMNS`` populated. An
            empty list is returned when ``top`` is invalid or the query fails.

        Warning:
            ``where`` and ``desc`` are interpolated into the SQL text verbatim
            and cannot be bound as parameters. Callers must pass only trusted,
            code-defined fragments -- never user input.
        """
        try:
            # An invalid ``top`` is reported through the same print-and-return
            # path as every other failure in this method.
            if not isinstance(top, int) or top <= 0:
                raise ValueError("top参数必须是正整数")

            sql = f"SELECT * FROM ArtifactTask WHERE {where} ORDER BY {desc} LIMIT ?"
            results = self.db_dao.execute_read(sql, (top,))

            artifacts = []
            for row in results:
                # Copy column by column so row keys and model attributes are
                # guaranteed to line up; a missing column raises here.
                artifact = ArtifactInfoModel()
                for column in self._TOP_COLUMNS:
                    setattr(artifact, column, row[column])
                artifacts.append(artifact)
            return artifacts
        except Exception as e:
            print(f"获取top构件任务失败: {e}")
            return []

    def get_by_id(self, artifact_id: int) -> Optional[ArtifactInfoModel]:
        """Return the ``ArtifactTask`` row whose ``ArtifactID`` matches.

        Args:
            artifact_id: Value compared against the ``ArtifactID`` column.

        Returns:
            The first matching row as a model, or ``None`` when no row matches
            or the lookup fails.
        """
        try:
            sql = "SELECT * FROM ArtifactTask WHERE ArtifactID = ?"
            rows = list(self.db_dao.execute_read(sql, (artifact_id,)))
            if not rows:
                return None
            # Filter like get_all does: without it, any table column that is
            # not a model field makes the constructor raise TypeError, which
            # would be swallowed below and reported as "not found".
            return ArtifactInfoModel(**filter_dict_for_model(dict(rows[0]), ArtifactInfoModel))
        except Exception as e:
            print(f"根据ID获取构件任务失败: {e}")
            return None

    def insert_artifact(self, artifact_data: dict) -> Optional[int]:
        """Insert one row into ``ArtifactTask``.

        Args:
            artifact_data: Data passed to ``self.db_dao.insert`` for the new row.

        Returns:
            Whatever ``self.db_dao.insert`` returns (presumably the new row
            id -- confirm against SQLiteHandler), or ``None`` on failure.
        """
        try:
            return self.db_dao.insert("ArtifactTask", artifact_data)
        except Exception as e:
            print(f"插入构件任务失败: {e}")
            return None

    def update_artifact(self, artifact_id: int, update_data: dict) -> bool:
        """Update the ``ArtifactTask`` row(s) with the given ``ArtifactID``.

        Args:
            artifact_id: ``ArtifactID`` used as the WHERE condition.
            update_data: Data passed to ``self.db_dao.update`` as the new values.

        Returns:
            ``True`` when the value returned by ``self.db_dao.update`` is
            greater than zero, ``False`` otherwise or on failure.
        """
        try:
            where_condition = {"ArtifactID": artifact_id}
            affected_rows = self.db_dao.update("ArtifactTask", update_data, where_condition)
            return affected_rows > 0
        except Exception as e:
            print(f"更新构件任务失败: {e}")
            return False

    def validate_artifact(self, artifact_info: ArtifactInfoModel) -> bool:
        """Check ``artifact_info`` against the business rules.

        The only rule implemented is that ``artifact_info.Name`` exists and is
        truthy.

        NOTE(review): no other code in this file reads or writes a ``Name``
        attribute; confirm that ``ArtifactInfoModel`` defines it, otherwise
        this method always returns ``False``.

        Returns:
            ``True`` when the check passes, ``False`` otherwise or on failure.
        """
        try:
            # Required-field check.
            if not hasattr(artifact_info, 'Name') or not artifact_info.Name:
                return False
            # Further business rules (e.g. artifact number format, date
            # validation) can be added here.
            return True
        except Exception as e:
            print(f"验证构件信息失败: {e}")
            return False
class PDRecordDal(BaseDal):
    """Data-access object for the ``PDRecord`` table."""

    def __init__(self):
        """Initialise the base class, which sets ``self.db_dao``."""
        super().__init__()

    def get_top_pd(self, top: int, desc: str = "ID desc", where: str = "1=1") -> List[PDRecordModel]:
        """Return at most ``top`` ``PDRecord`` rows.

        Args:
            top: Maximum number of rows; must be a positive ``int``.
            desc: ``ORDER BY`` clause body (default: ``ID`` descending).
            where: ``WHERE`` clause body (default: match everything).

        Returns:
            One ``PDRecordModel`` per row. An empty list is returned when
            ``top`` is invalid or the query fails; the error is printed.

        Note:
            ``where`` and ``desc`` are placed into the SQL text as-is, so they
            must only ever carry trusted, code-defined fragments.
        """
        # Columns copied from each row onto the model, in assignment order.
        column_names = (
            "ID",
            "TaskID",
            "ProjectName",
            "ProduceMixID",
            "VinNo",
            "BetonVolume",
            "Status",
            "OptTime",
        )
        try:
            # An invalid ``top`` goes through the same print-and-return path
            # as any other failure.
            if not isinstance(top, int) or top <= 0:
                raise ValueError("top参数必须是正整数")

            query = f"SELECT * FROM PDRecord WHERE {where} ORDER BY {desc} LIMIT ?"
            found = self.db_dao.execute_read(query, (top,))

            collected = []
            for record in found:
                model = PDRecordModel()
                for column in column_names:
                    setattr(model, column, record[column])
                collected.append(model)
            return collected
        except Exception as e:
            print(f"获取top PD官片任务失败: {e}")
            return []
if __name__ == "__main__":
    # Manual smoke test: load the ArtifactTask rows and dump their fields.
    artifact_dal = ArtifactDal()
    # ArtifactDal defines no get_artifact_task(); get_all() is the method that
    # returns every row, built from the full set of row columns.
    artifacts = artifact_dal.get_all()

    # Show the data that was fetched.
    print(f"获取到 {len(artifacts)} 条构件任务数据:")

    # Attributes printed for each record, in output order.
    field_names = (
        "ID",
        "ArtifactID",
        "ProduceRingNumber",
        "MouldCode",
        "SkeletonID",
        "RingTypeCode",
        "SizeSpecification",
        "BuriedDepth",
        "BlockNumber",
        "HoleRingMarking",
        "GroutingPipeMarking",
        "PolypropyleneFiberMarking",
        "BetonVolume",
        "BetonTaskID",
        "FK_PDID",
        "Status",
        "BeginTime",
        "EndTime",
        "PStatus",
        "Source",
        "FBetonVolume",
        "OptTime",
        # More fields can be added here.
    )
    for i, artifact in enumerate(artifacts, 1):
        print(f"\n记录 {i}:")
        for field_name in field_names:
            print(f"{field_name}: {getattr(artifact, field_name)}")

    print('\ntest success')