Files
Feeding_control_system/services/task_service.py
2025-11-12 16:49:40 +08:00

367 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""任务处理服务"""
from datetime import datetime
from API.client import APIClient
from database.access_db import AccessDB
from database.sql_server import SQLServerDB
from config.settings import ACCESS_DB_PATH, ACCESS_DB_PASSWORD
from utils.helpers import get_f_block_positions
class TaskService:
def __init__(self):
    """Create the API client and the per-instance bookkeeping state."""
    # artifact id -> datetime remembered by _update_artifact_timestamps.
    self.artifact_timestamps = dict()
    # Summary of the task handled by the latest process_not_pour_info call.
    self.task_before = None
    # Two shares of an F block's volume; written by _handle_single_f_block.
    self.half_volume = [0] * 2
    self.api_client = APIClient()
def process_not_pour_info(self):
    """Fetch the not-yet-poured artifacts and build the task lists for them.

    Returns:
        The 4-tuple ``(tasks, artifact_list, send_list, half_volume)``
        produced by ``_handle_tasks_by_f_blocks``. When the API returns
        nothing, the three lists are empty and ``half_volume`` is the
        instance's current ``self.half_volume``.
    """
    pending = self.api_client.get_not_pour_info()
    if not pending:
        return [], [], [], self.half_volume

    # Summarise the F blocks found in the pending list.
    f_info = self._process_f_blocks(pending)
    f_blocks, f_block_count, total_f_volume, f_positions = (
        f_info[key]
        for key in ("f_blocks", "f_block_count", "total_f_volume", "f_positions")
    )

    # The first pending artifact is treated as the current task.
    head_task = self._process_current_task(pending[0])

    # NOTE(review): task_before is overwritten with the *current* task before
    # the handlers run, so a handler that reads self.task_before sees the
    # current task rather than the one from the previous call - confirm
    # this ordering is intended.
    self.task_before = {
        key: head_task[key]
        for key in ("beton_task_id", "beton_volume", "artifact_id", "block_number")
    }

    # Dispatch on the number and positions of F blocks.
    return self._handle_tasks_by_f_blocks(
        f_block_count,
        f_positions,
        head_task,
        f_blocks,
        total_f_volume,
        pending,
    )
def _process_f_blocks(self, artifact_list):
    """Summarise the artifacts whose ``BlockNumber`` is ``"F"``.

    Returns:
        A dict with the F-block artifacts (``f_blocks``), how many there are
        (``f_block_count``), the sum of their ``BetonVolume``
        (``total_f_volume``) and the value returned by
        ``get_f_block_positions(artifact_list)`` (``f_positions``).
    """
    f_only = list(filter(lambda item: item.get("BlockNumber") == "F", artifact_list))

    volume_sum = 0
    for item in f_only:
        volume_sum += item["BetonVolume"]

    positions = get_f_block_positions(artifact_list)

    return dict(
        f_blocks=f_only,
        f_block_count=len(f_only),
        total_f_volume=volume_sum,
        f_positions=positions,
    )
def _process_current_task(self, latest_artifact):
"""处理当前任务信息"""
task_data = self.api_client.get_task_info(latest_artifact["BetonTaskID"])
return {
"beton_task_id": latest_artifact["BetonTaskID"],
"beton_volume": latest_artifact["BetonVolume"],
"artifact_id": latest_artifact["ArtifactActionID"],
"block_number": latest_artifact.get("BlockNumber", ""),
"task_data": task_data
}
def _handle_tasks_by_f_blocks(self, f_block_count, f_positions, current_task,
f_blocks, total_f_volume, artifact_list):
"""根据F块数量和位置处理任务"""
# 多个F块情况
if f_block_count > 2:
return self._handle_multiple_f_blocks(current_task, total_f_volume, artifact_list)
# 两个F块情况
elif f_block_count == 2:
return self._handle_two_f_blocks(f_positions, current_task, total_f_volume, artifact_list)
# 一个F块情况
elif f_block_count == 1:
return self._handle_single_f_block(f_positions, current_task, f_blocks,
total_f_volume, artifact_list)
# 无F块情况
elif f_block_count == 0:
return self._handle_no_f_blocks(current_task, artifact_list)
else:
print("报警")
return [], [], [], self.half_volume
def _handle_multiple_f_blocks(self, current_task, total_f_volume, artifact_list):
"""处理多个F块的情况"""
adjusted_volume = total_f_volume - self.half_volume[0]
tasks = [{
"beton_task_id": current_task["beton_task_id"],
"beton_volume": adjusted_volume,
"artifact_id": current_task["artifact_id"],
"block_number": current_task["block_number"],
}]
send_list = [{
"beton_task_id": current_task["beton_task_id"],
"beton_volume": adjusted_volume,
"artifact_id": current_task["artifact_id"],
"block_number": current_task["block_number"],
"beton_grade": current_task["task_data"]["BetonGrade"],
"mix_id": current_task["task_data"]["MixID"],
"time": self.artifact_timestamps.get(current_task["artifact_id"], datetime.now())
}]
# 处理后续任务
self._append_additional_tasks(send_list, artifact_list, [0, 0])
# 更新时间戳
self._update_artifact_timestamps(send_list)
return tasks, artifact_list, send_list, self.half_volume
def _handle_two_f_blocks(self, f_positions, current_task, total_f_volume, artifact_list):
"""处理两个F块的情况"""
if f_positions == [0, 1] and self.task_before.get("block_number") == "F":
adjusted_volume = 0
block_number = "补方"
else:
adjusted_volume = total_f_volume - self.half_volume[0]
block_number = current_task["block_number"]
tasks = [{
"beton_task_id": current_task["beton_task_id"],
"beton_volume": adjusted_volume,
"artifact_id": current_task["artifact_id"],
"block_number": block_number,
}]
send_list = [{
"beton_task_id": current_task["beton_task_id"],
"beton_volume": adjusted_volume,
"artifact_id": current_task["artifact_id"],
"block_number": block_number,
"beton_grade": current_task["task_data"]["BetonGrade"],
"mix_id": current_task["task_data"]["MixID"],
"time": self.artifact_timestamps.get(current_task["artifact_id"], datetime.now())
}]
# 处理后续任务
volumes = [adjusted_volume,
artifact_list[2]["BetonVolume"] if len(artifact_list) > 2 else 0] if f_positions == [0,
1] and self.task_before.get(
"block_number") == "F" else [0, 0]
self._append_additional_tasks(send_list, artifact_list, volumes)
# 更新时间戳
self._update_artifact_timestamps(send_list)
return tasks, artifact_list, send_list, self.half_volume
def _handle_single_f_block(self, f_positions, current_task, f_blocks, total_f_volume, artifact_list):
"""处理单个F块的情况"""
if f_positions == [2]:
f_volume = f_blocks[0].get("BetonVolume") if f_blocks else 0
self.half_volume[0] = round(total_f_volume / 2, 2)
self.half_volume[1] = f_volume - self.half_volume[0]
adjusted_volume = current_task["beton_volume"] + self.half_volume[0]
elif f_positions == [1]:
adjusted_volume = current_task["beton_volume"] + self.half_volume[1]
elif f_positions == [0]:
adjusted_volume = 0
else:
adjusted_volume = current_task["beton_volume"]
block_number = "补方" if f_positions == [0] else current_task["block_number"]
tasks = [{
"beton_task_id": current_task["beton_task_id"],
"beton_volume": adjusted_volume,
"artifact_id": current_task["artifact_id"],
"block_number": block_number,
}]
send_list = [{
"beton_task_id": current_task["beton_task_id"],
"beton_volume": adjusted_volume,
"artifact_id": current_task["artifact_id"],
"block_number": block_number,
"beton_grade": current_task["task_data"]["BetonGrade"],
"mix_id": current_task["task_data"]["MixID"],
"time": self.artifact_timestamps.get(current_task["artifact_id"], datetime.now())
}]
# 处理后续任务
self._append_additional_tasks_for_single_f(send_list, artifact_list, f_positions)
# 更新时间戳
self._update_artifact_timestamps(send_list)
return tasks, artifact_list, send_list, self.half_volume
def _handle_no_f_blocks(self, current_task, artifact_list):
"""处理无F块的情况"""
tasks = [{
"beton_task_id": current_task["beton_task_id"],
"beton_volume": current_task["beton_volume"],
"artifact_id": current_task["artifact_id"],
"block_number": current_task["block_number"],
}]
send_list = [{
"beton_task_id": current_task["beton_task_id"],
"beton_volume": current_task["beton_volume"],
"artifact_id": current_task["artifact_id"],
"block_number": current_task["block_number"],
"beton_grade": current_task["task_data"]["BetonGrade"],
"mix_id": current_task["task_data"]["MixID"],
"time": self.artifact_timestamps.get(current_task["artifact_id"], datetime.now())
}]
# 处理后续任务
self._append_additional_tasks(send_list, artifact_list,
[artifact_list[2]["BetonVolume"] if len(artifact_list) > 2 else 0,
artifact_list[2]["BetonVolume"] if len(artifact_list) > 2 else 0])
# 更新时间戳
self._update_artifact_timestamps(send_list)
return tasks, artifact_list, send_list, self.half_volume
def _append_additional_tasks(self, send_list, artifact_list, volumes):
"""添加额外的任务到发送列表"""
# 安全获取后续任务
second_task = artifact_list[1] if len(artifact_list) > 1 else None
third_task = artifact_list[2] if len(artifact_list) > 2 else None
if second_task:
task_data_second = self.api_client.get_task_info(second_task["BetonTaskID"])
send_list.append({
"beton_task_id": second_task["BetonTaskID"],
"beton_volume": volumes[0],
"artifact_id": second_task["ArtifactActionID"],
"block_number": second_task["BlockNumber"],
"beton_grade": task_data_second["BetonGrade"],
"mix_id": task_data_second["MixID"],
"time": self.artifact_timestamps.get(second_task["ArtifactActionID"], datetime.now())
})
if third_task:
task_data_third = self.api_client.get_task_info(third_task["BetonTaskID"])
send_list.append({
"beton_task_id": third_task["BetonTaskID"],
"beton_volume": volumes[1],
"artifact_id": third_task["ArtifactActionID"],
"block_number": third_task["BlockNumber"],
"beton_grade": task_data_third["BetonGrade"],
"mix_id": task_data_third["MixID"],
"time": self.artifact_timestamps.get(third_task["ArtifactActionID"], datetime.now())
})
def _append_additional_tasks_for_single_f(self, send_list, artifact_list, f_positions):
"""为单个F块情况添加额外任务"""
second_task = artifact_list[1] if len(artifact_list) > 1 else None
third_task = artifact_list[2] if len(artifact_list) > 2 else None
if second_task:
task_data_second = self.api_client.get_task_info(second_task["BetonTaskID"])
volume = (third_task["BetonVolume"] if third_task else 0) if f_positions != [2] else (
second_task["BetonVolume"] + self.half_volume[1])
send_list.append({
"beton_task_id": second_task["BetonTaskID"],
"beton_volume": volume,
"artifact_id": second_task["ArtifactActionID"],
"block_number": second_task["BlockNumber"],
"beton_grade": task_data_second["BetonGrade"],
"mix_id": task_data_second["MixID"],
"time": self.artifact_timestamps.get(second_task["ArtifactActionID"], datetime.now())
})
if third_task:
task_data_third = self.api_client.get_task_info(third_task["BetonTaskID"])
volume = third_task["BetonVolume"] if f_positions in [[1], [0]] else 0
send_list.append({
"beton_task_id": third_task["BetonTaskID"],
"beton_volume": volume,
"artifact_id": third_task["ArtifactActionID"],
"block_number": third_task["BlockNumber"],
"beton_grade": task_data_third["BetonGrade"],
"mix_id": task_data_third["MixID"],
"time": self.artifact_timestamps.get(third_task["ArtifactActionID"], datetime.now())
})
def _update_artifact_timestamps(self, send_list):
"""更新artifact时间戳"""
current_artifact_ids = {item["artifact_id"] for item in send_list}
for artifact_id in current_artifact_ids:
if artifact_id not in self.artifact_timestamps:
self.artifact_timestamps[artifact_id] = datetime.now()
def insert_into_produce_table(self, connection, task_info, beton_volume, erp_id, artifact_id, status):
    """Insert a row into the Produce table and save a custom-table record.

    Args:
        connection: not used by the visible body of this method.
        task_info: mapping read for ``TaskID``, ``ProduceMixID``,
            ``PlannedVolume``, ``ProjectName`` and ``BetonGrade``.
        beton_volume: volume written as ``ProdMete`` and passed on as the
            adjusted volume.
        erp_id: value written as ``ErpID`` and passed on as ``misid``.
        artifact_id: passed on to the custom-table save.
        status: when equal to the string ``"1"`` the Produce row is
            inserted before the custom-table save; otherwise only the
            custom-table save is attempted.

    Returns:
        ``erp_id`` when ``status == "1"``; the other branch, as far as it
        is visible here, falls off the end without an explicit return.
    """

    def _save_custom_row():
        # Best-effort save: any failure is printed and swallowed.
        # NOTE(review): save_to_custom_table is not among the imports
        # visible at the top of this file; if it is undefined the
        # resulting NameError is caught here and only printed - confirm
        # where it is defined.
        try:
            save_to_custom_table(
                misid=erp_id,
                flag="1",  # initial Flag value
                task_id=task_info["TaskID"],
                produce_mix_id=task_info["ProduceMixID"],
                project_name=task_info["ProjectName"],
                beton_grade=task_info["BetonGrade"],
                adjusted_volume=beton_volume,  # volume after adjustment
                artifact_id=artifact_id,
            )
            print(f"任务 {erp_id} 的数据已保存到自定义数据表")
        except Exception as e:
            print(f"调用保存函数时出错: {e}")

    sql_db = SQLServerDB()
    if status == "1":
        # Row for the Produce table.
        insert_data = {
            "ErpID": erp_id,
            "Code": task_info["TaskID"],
            "DatTim": datetime.now(),
            "Recipe": task_info["ProduceMixID"],
            "MorRec": "",
            "ProdMete": beton_volume,
            "MorMete": 0.0,  # mortar volume; fill in as required
            "TotVehs": 0,  # cumulative truck count; fill in as required
            "TotMete": task_info["PlannedVolume"],  # cumulative volume
            "Qualitor": "",  # quality inspector; fill in as required
            "Acceptor": "",  # on-site acceptor; fill in as required
            "Attamper": "",  # dispatcher; fill in as required
            "Flag": "1",  # flag; fill in as required
            "Note": "",  # remark; fill in as required
        }
        sql_db.insert_produce_data(insert_data)
        print(f"数据已成功插入到Produce表中ERP ID: {erp_id}")

        # Also store the record in the custom data table.
        _save_custom_row()
        return erp_id
    else:
        _save_custom_row()