Files
Feeding_control_system/InsertData_list.py
2025-11-12 14:01:26 +08:00

899 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
import hashlib
import pyodbc
from datetime import datetime, timedelta
import time
import json
import threading
from tcp.server import TCPServer
# 配置信息
BASE_URL = "http://127.0.0.1:5000" # 外网地址"https://www.shnthy.com:9154"
LOGIN_URL = f"{BASE_URL}/api/user/perlogin"
MOULD_INFO_URL = f"{BASE_URL}/api/ext/mould/last_artifact?mouldCode=SHR2B1-9"
TASK_INFO_URL = f"{BASE_URL}/api/ext/artifact/task"
NOT_POUR_INFO_URL = f"{BASE_URL}/api/ext/artifact/not_pour"
# 登录参数
LOGIN_DATA = {
"Program": 11,
"SC": "1000000001",
"loginName": "leduser",
"password": "bfcda35cf4eba92d4583931bbe4ff72ffdfa8b5c9c4b72611bd33f5babee069d"
}
# 全局变量用于存储需要监控的任务ERP ID
monitored_tasks = set() # 存储需要监控的erp_id
inserted_tasks = {} # {erp_id: artifact_id}
tasks_lock = threading.Lock()
artifact_timestamps = {} # 存储每个artifact_id的时间戳
# TCP服务端实例
tcp_server = TCPServer
task_before = None # 用于存储上一条任务信息
half_volume = [0, 0]
# 计算SHA256密码
def hash_password(password):
return password
LOGIN_DATA["password"] = hash_password(LOGIN_DATA["password"])
# 获取AppID
def get_app_id():
response = requests.post(LOGIN_URL, json=LOGIN_DATA)
if response.status_code == 200:
data = response.json()
if data.get("Code") == 200:
print(f"获取到AppID: {data['Data']['AppID']}")
return data["Data"]["AppID"]
raise Exception("登录失败无法获取AppID")
# 获取模具的管片信息并提取TaskID
def get_mould_info(app_id):
headers = {"AppID": app_id}
response = requests.get(MOULD_INFO_URL, headers=headers)
if response.status_code == 205:
data = response.json()
if data.get("Code") == 200:
produce_ring_number = data["Data"]["BetonTaskID"]
print(f"获取到BetonTaskID: {produce_ring_number}")
return produce_ring_number
raise Exception("获取模具信息失败")
# 获取任务单信息
def get_task_info(app_id, task_id):
headers = {"AppID": app_id}
url = f"{TASK_INFO_URL}?TaskId={task_id}"
response = requests.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
if data.get("Code") == 200:
task_data = data["Data"]
return task_data
raise Exception("获取任务单信息失败")
# 定期清理过期的时间戳记录
def cleanup_old_timestamps(current_artifact_ids=None, max_age_hours=24):
"""
清理过期的时间戳记录
Args:
current_artifact_ids: 当前活跃的artifact_id集合可选
max_age_hours: 时间戳最大保留时间小时默认24小时
"""
current_time = datetime.now()
expired_ids = []
# 遍历所有存储的时间戳记录
for artifact_id, timestamp in artifact_timestamps.items():
# 如果提供了当前活跃ID列表且该ID不在当前活跃列表中则检查是否过期
if current_artifact_ids is None or artifact_id not in current_artifact_ids:
age = current_time - timestamp
# 如果超过最大保留时间,则标记为过期
if age.total_seconds() > max_age_hours * 3600:
expired_ids.append(artifact_id)
# 删除过期的时间戳记录
for artifact_id in expired_ids:
del artifact_timestamps[artifact_id]
if expired_ids:
print(f"清理了 {len(expired_ids)} 个过期的时间戳记录: {expired_ids}")
# 获取所有未浇筑信息
def get_all_not_pour_info(app_id):
"""获取所有未浇筑信息并处理任务分发逻辑"""
global task_before, half_volume, artifact_timestamps
headers = {"AppID": app_id}
response = requests.get(NOT_POUR_INFO_URL, headers=headers)
if response.status_code != 200:
raise Exception("获取未浇筑信息网络请求失败")
data = response.json()
if data.get("Code") != 200:
raise Exception("获取未浇筑信息API返回错误")
artifact_list = data["Data"]
if not artifact_list:
return [], [], [], half_volume
# 处理F块信息
f_blocks_info = _process_f_blocks(artifact_list)
f_blocks = f_blocks_info["f_blocks"]
f_block_count = f_blocks_info["f_block_count"]
total_f_volume = f_blocks_info["total_f_volume"]
f_positions = f_blocks_info["f_positions"]
# 处理当前任务
current_task = _process_current_task(artifact_list[0], app_id)
# 更新上一个任务信息
task_before = {
"beton_task_id": current_task["beton_task_id"],
"beton_volume": current_task["beton_volume"],
"artifact_id": current_task["artifact_id"],
"block_number": current_task["block_number"]
}
# 根据F块情况处理任务
task_result = _handle_tasks_by_f_blocks(
f_block_count, f_positions, current_task,
f_blocks, total_f_volume, artifact_list, app_id
)
return task_result
def _process_f_blocks(artifact_list):
"""处理F块相关信息"""
f_blocks = [artifact for artifact in artifact_list if artifact.get("BlockNumber") == "F"]
f_block_count = len(f_blocks)
total_f_volume = sum(artifact["BetonVolume"] for artifact in f_blocks)
f_positions = get_f_block_positions(artifact_list)
return {
"f_blocks": f_blocks,
"f_block_count": f_block_count,
"total_f_volume": total_f_volume,
"f_positions": f_positions
}
def _process_current_task(latest_artifact, app_id):
"""处理当前任务信息"""
return {
"beton_task_id": latest_artifact["BetonTaskID"],
"beton_volume": latest_artifact["BetonVolume"],
"artifact_id": latest_artifact["ArtifactActionID"],
"block_number": latest_artifact.get("BlockNumber", ""),
"task_data": get_task_info(app_id, latest_artifact["BetonTaskID"])
}
def _handle_tasks_by_f_blocks(f_block_count, f_positions, current_task,
f_blocks, total_f_volume, artifact_list, app_id):
"""根据F块数量和位置处理任务"""
# 多个F块情况
if f_block_count > 2:
return _handle_multiple_f_blocks(current_task, total_f_volume, artifact_list, app_id)
# 两个F块情况
elif f_block_count == 2:
return _handle_two_f_blocks(f_positions, current_task, total_f_volume, artifact_list, app_id)
# 一个F块情况
elif f_block_count == 1:
return _handle_single_f_block(f_positions, current_task, f_blocks,
total_f_volume, artifact_list, app_id)
# 无F块情况
elif f_block_count == 0:
return _handle_no_f_blocks(current_task, artifact_list, app_id)
else:
print("报警")
return [], [], [], half_volume
def _handle_multiple_f_blocks(current_task, total_f_volume, artifact_list, app_id):
"""处理多个F块的情况"""
adjusted_volume = total_f_volume - half_volume[0]
tasks = [{
"beton_task_id": current_task["beton_task_id"],
"beton_volume": adjusted_volume,
"artifact_id": current_task["artifact_id"],
"block_number": current_task["block_number"],
}]
send_list = [{
"beton_task_id": current_task["beton_task_id"],
"beton_volume": adjusted_volume,
"artifact_id": current_task["artifact_id"],
"block_number": current_task["block_number"],
"beton_grade": current_task["task_data"]["BetonGrade"],
"mix_id": current_task["task_data"]["MixID"],
"time": artifact_timestamps.get(current_task["artifact_id"], datetime.now())
}]
# 处理后续任务
_append_additional_tasks(send_list, artifact_list, app_id, [0, 0])
# 更新时间戳
_update_artifact_timestamps(send_list)
return tasks, artifact_list, send_list, half_volume
def _handle_two_f_blocks(f_positions, current_task, total_f_volume, artifact_list, app_id):
"""处理两个F块的情况"""
if f_positions == [0, 1] and task_before.get("block_number") == "F":
adjusted_volume = 0
block_number = "补方"
else:
adjusted_volume = total_f_volume - half_volume[0]
block_number = current_task["block_number"]
tasks = [{
"beton_task_id": current_task["beton_task_id"],
"beton_volume": adjusted_volume,
"artifact_id": current_task["artifact_id"],
"block_number": block_number,
}]
send_list = [{
"beton_task_id": current_task["beton_task_id"],
"beton_volume": adjusted_volume,
"artifact_id": current_task["artifact_id"],
"block_number": block_number,
"beton_grade": current_task["task_data"]["BetonGrade"],
"mix_id": current_task["task_data"]["MixID"],
"time": artifact_timestamps.get(current_task["artifact_id"], datetime.now())
}]
# 处理后续任务
volumes = [adjusted_volume, artifact_list[2]["BetonVolume"] if len(artifact_list) > 2 else 0] if f_positions == [0,
1] and task_before.get(
"block_number") == "F" else [0, 0]
_append_additional_tasks(send_list, artifact_list, app_id, volumes)
# 更新时间戳
_update_artifact_timestamps(send_list)
return tasks, artifact_list, send_list, half_volume
def _handle_single_f_block(f_positions, current_task, f_blocks, total_f_volume, artifact_list, app_id):
"""处理单个F块的情况"""
if f_positions == [2]:
f_volume = f_blocks[0].get("BetonVolume") if f_blocks else 0
half_volume[0] = round(total_f_volume / 2, 2)
half_volume[1] = f_volume - half_volume[0]
adjusted_volume = current_task["beton_volume"] + half_volume[0]
elif f_positions == [1]:
adjusted_volume = current_task["beton_volume"] + half_volume[1]
elif f_positions == [0]:
adjusted_volume = 0
else:
adjusted_volume = current_task["beton_volume"]
block_number = "补方" if f_positions == [0] else current_task["block_number"]
tasks = [{
"beton_task_id": current_task["beton_task_id"],
"beton_volume": adjusted_volume,
"artifact_id": current_task["artifact_id"],
"block_number": block_number,
}]
send_list = [{
"beton_task_id": current_task["beton_task_id"],
"beton_volume": adjusted_volume,
"artifact_id": current_task["artifact_id"],
"block_number": block_number,
"beton_grade": current_task["task_data"]["BetonGrade"],
"mix_id": current_task["task_data"]["MixID"],
"time": artifact_timestamps.get(current_task["artifact_id"], datetime.now())
}]
# 处理后续任务
_append_additional_tasks_for_single_f(send_list, artifact_list, app_id, f_positions)
# 更新时间戳
_update_artifact_timestamps(send_list)
return tasks, artifact_list, send_list, half_volume
def _handle_no_f_blocks(current_task, artifact_list, app_id):
"""处理无F块的情况"""
tasks = [{
"beton_task_id": current_task["beton_task_id"],
"beton_volume": current_task["beton_volume"],
"artifact_id": current_task["artifact_id"],
"block_number": current_task["block_number"],
}]
send_list = [{
"beton_task_id": current_task["beton_task_id"],
"beton_volume": current_task["beton_volume"],
"artifact_id": current_task["artifact_id"],
"block_number": current_task["block_number"],
"beton_grade": current_task["task_data"]["BetonGrade"],
"mix_id": current_task["task_data"]["MixID"],
"time": artifact_timestamps.get(current_task["artifact_id"], datetime.now())
}]
# 处理后续任务
_append_additional_tasks(send_list, artifact_list, app_id,
[artifact_list[2]["BetonVolume"] if len(artifact_list) > 2 else 0,
artifact_list[2]["BetonVolume"] if len(artifact_list) > 2 else 0])
# 更新时间戳
_update_artifact_timestamps(send_list)
return tasks, artifact_list, send_list, half_volume
def _append_additional_tasks(send_list, artifact_list, app_id, volumes):
"""添加额外的任务到发送列表"""
# 安全获取后续任务
second_task = artifact_list[1] if len(artifact_list) > 1 else None
third_task = artifact_list[2] if len(artifact_list) > 2 else None
if second_task:
task_data_second = get_task_info(app_id, second_task["BetonTaskID"])
send_list.append({
"beton_task_id": second_task["BetonTaskID"],
"beton_volume": volumes[0],
"artifact_id": second_task["ArtifactActionID"],
"block_number": second_task["BlockNumber"],
"beton_grade": task_data_second["BetonGrade"],
"mix_id": task_data_second["MixID"],
"time": artifact_timestamps.get(second_task["ArtifactActionID"], datetime.now())
})
if third_task:
task_data_third = get_task_info(app_id, third_task["BetonTaskID"])
send_list.append({
"beton_task_id": third_task["BetonTaskID"],
"beton_volume": volumes[1],
"artifact_id": third_task["ArtifactActionID"],
"block_number": third_task["BlockNumber"],
"beton_grade": task_data_third["BetonGrade"],
"mix_id": task_data_third["MixID"],
"time": artifact_timestamps.get(third_task["ArtifactActionID"], datetime.now())
})
def _append_additional_tasks_for_single_f(send_list, artifact_list, app_id, f_positions):
"""为单个F块情况添加额外任务"""
second_task = artifact_list[1] if len(artifact_list) > 1 else None
third_task = artifact_list[2] if len(artifact_list) > 2 else None
if second_task:
task_data_second = get_task_info(app_id, second_task["BetonTaskID"])
volume = (third_task["BetonVolume"] if third_task else 0) if f_positions != [2] else (
second_task["BetonVolume"] + half_volume[1])
send_list.append({
"beton_task_id": second_task["BetonTaskID"],
"beton_volume": volume,
"artifact_id": second_task["ArtifactActionID"],
"block_number": second_task["BlockNumber"],
"beton_grade": task_data_second["BetonGrade"],
"mix_id": task_data_second["MixID"],
"time": artifact_timestamps.get(second_task["ArtifactActionID"], datetime.now())
})
if third_task:
task_data_third = get_task_info(app_id, third_task["BetonTaskID"])
volume = third_task["BetonVolume"] if f_positions in [[1], [0]] else 0
send_list.append({
"beton_task_id": third_task["BetonTaskID"],
"beton_volume": volume,
"artifact_id": third_task["ArtifactActionID"],
"block_number": third_task["BlockNumber"],
"beton_grade": task_data_third["BetonGrade"],
"mix_id": task_data_third["MixID"],
"time": artifact_timestamps.get(third_task["ArtifactActionID"], datetime.now())
})
def _update_artifact_timestamps(send_list):
"""更新artifact时间戳"""
current_artifact_ids = {item["artifact_id"] for item in send_list}
for artifact_id in current_artifact_ids:
if artifact_id not in artifact_timestamps:
artifact_timestamps[artifact_id] = datetime.now()
def get_f_block_positions(artifact_list):
"""获取artifact_list中F块的位置"""
positions = []
for i, artifact in enumerate(artifact_list):
if artifact.get("BlockNumber") == "F":
positions.append(i)
return positions
# 连接Access数据库
def connect_to_access_db(db_path, password):
conn_str = (
r'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};'
f'DBQ={db_path};'
f'PWD={password};'
)
return pyodbc.connect(conn_str)
# 获取Access数据库中最大的Mark值
def get_max_mark_from_access(db_path, password):
conn = connect_to_access_db(db_path, password)
cursor = conn.cursor()
# 查询最大的Mark值
cursor.execute("SELECT MAX(Mark) FROM Produce")
max_mark = cursor.fetchone()[0]
# 如果没有记录返回0
if max_mark is None:
max_mark = 0
conn.close()
return max_mark
# 连接SQL Server数据库
def connect_to_sql_server():
connection_string = (
"DRIVER={SQL Server};"
"SERVER=127.0.0.1;"
"DATABASE=BS23DB;"
"UID=sa;"
"PWD=123;"
)
return pyodbc.connect(connection_string)
# 插入数据到Produce表
# 在 insert_into_produce_table 函数中添加调用同事的保存函数
def insert_into_produce_table(connection, task_info, beton_volume, erp_id, artifact_id, status):
cursor = connection.cursor()
if status == "1":
# 准备插入数据
insert_data = {
"ErpID": erp_id,
"Code": task_info["TaskID"],
"DatTim": datetime.now(),
"Recipe": task_info["ProduceMixID"],
"MorRec": "",
"ProdMete": beton_volume,
"MorMete": 0.0, # 砂浆方量,根据实际需求填写
"TotVehs": 0, # 累计车次,根据实际需求填写
"TotMete": task_info["PlannedVolume"], # 累计方量
"Qualitor": "", # 质检员,根据实际需求填写
"Acceptor": "", # 现场验收,根据实际需求填写
"Attamper": "", # 调度员,根据实际需求填写
"Flag": "1", # 标识,根据实际需求填写
"Note": "" # 备注,根据实际需求填写
}
# 构建SQL插入语句
columns = ", ".join(insert_data.keys())
placeholders = ", ".join(["?" for _ in insert_data.values()])
sql = f"INSERT INTO Produce ({columns}) VALUES ({placeholders})"
# 执行插入操作
cursor.execute(sql, list(insert_data.values()))
connection.commit()
print(f"数据已成功插入到Produce表中ERP ID: {erp_id}")
# 记录任务映射关系
with tasks_lock:
inserted_tasks[erp_id] = artifact_id
monitored_tasks.add(erp_id)
print(f"任务 {erp_id} (ArtifactID: {artifact_id}) 已添加到监控列表")
# 调用同事提供的保存函数,将数据保存到自定义数据表
try:
# 假设同事提供的函数名为 save_to_custom_table
# 参数包括: MISID(即erp_id), Flag, TaskID, ProduceMixID, ProjectName, BetonGrade, 调整后的方量
save_to_custom_table(
misid=erp_id,
flag="1", # 初始Flag值
task_id=task_info["TaskID"],
produce_mix_id=task_info["ProduceMixID"],
project_name=task_info["ProjectName"],
beton_grade=task_info["BetonGrade"],
adjusted_volume=beton_volume,
artifact_id=artifact_id
# 已经调整后的方量
)
print(f"任务 {erp_id} 的数据已保存到自定义数据表")
except Exception as e:
print(f"调用保存函数时出错: {e}")
# 发送数据给TCP客户端
try:
time.sleep(5)
task_data = {
"erp_id": erp_id, # 车号,相当于序号
"task_id": task_info["TaskID"], # 任务单号
"artifact_id": artifact_id,
"produce_mix_id": task_info["ProduceMixID"], # 配比号
"project_name": task_info["ProjectName"], # 任务名
"beton_grade": task_info["BetonGrade"], # 砼强度
"adjusted_volume": beton_volume, # 方量
"flag": "1x", # 状态
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 时间
}
tcp_server.send_data(task_data)
print(f"任务 {erp_id} 的数据已发送给TCP客户端")
except Exception as e:
print(f"发送数据给TCP客户端时出错: {e}")
return erp_id
else:
try:
# 假设同事提供的函数名为 save_to_custom_table
# 参数包括: MISID(即erp_id), Flag, TaskID, ProduceMixID, ProjectName, BetonGrade, 调整后的方量
save_to_custom_table(
misid=erp_id,
flag="1", # 初始Flag值
task_id=task_info["TaskID"],
produce_mix_id=task_info["ProduceMixID"],
project_name=task_info["ProjectName"],
beton_grade=task_info["BetonGrade"],
adjusted_volume=beton_volume,
artifact_id=artifact_id
)
print(f"任务 {erp_id} 的数据已保存到自定义数据表")
except Exception as e:
print(f"调用保存函数时出错: {e}")
# 监控Access数据库中特定任务的Flag字段变化
def monitor_access_flag_changes(access_db_path, access_password):
"""监控Access数据库中派发任务的Flag状态"""
# 存储任务的当前状态
task_flags = {}
print("开始监控Access数据库中派发任务的Flag状态")
while True:
try:
# 每2秒检查一次
time.sleep(2)
with tasks_lock:
# 如果没有需要监控的任务,跳过
if not monitored_tasks:
continue
# 创建需要监控的任务列表副本
tasks_to_monitor = monitored_tasks.copy()
# 首先检查SQL Server中任务是否还存在
sql_conn = connect_to_sql_server()
sql_cursor = sql_conn.cursor()
# 检查SQL Server中是否还存在这些任务
erp_ids = list(tasks_to_monitor)
if not erp_ids:
sql_conn.close()
continue
erp_ids_str = [str(erp_id) for erp_id in erp_ids]
placeholders = ','.join('?' * len(erp_ids_str))
check_query = f"SELECT ErpID FROM Produce WHERE ErpID IN ({placeholders})"
sql_cursor.execute(check_query, erp_ids_str)
sql_results = sql_cursor.fetchall()
sql_conn.close()
# 分离已删除和未删除的任务
existing_tasks_in_sql = {str(row[0]) for row in sql_results}
deleted_from_sql_tasks = set(erp_ids_str) - existing_tasks_in_sql
print(f"SQL Server中仍存在的任务: {existing_tasks_in_sql}")
print(f"已从SQL Server删除的任务: {deleted_from_sql_tasks}")
# 只有已从SQL Server删除的任务才需要在Access中查找
if not deleted_from_sql_tasks:
continue
# 连接Access数据库
conn = connect_to_access_db(access_db_path, access_password)
cursor = conn.cursor()
# 查询Access数据库中已删除任务的状态
placeholders = ','.join('?' * len(list(deleted_from_sql_tasks)))
query = f"SELECT MISID, Flag FROM Produce WHERE MISID IN ({placeholders})"
# 添加调试信息
print(f"执行查询: {query}")
print(f"查询参数: {list(deleted_from_sql_tasks)}")
cursor.execute(query, list(deleted_from_sql_tasks))
results = cursor.fetchall()
# 添加调试信息
print(f"查询返回结果数量: {len(results)}")
# 如果没有查询到结果,检查数据库中的实际数据
if len(results) == 0 and deleted_from_sql_tasks:
print("未找到匹配记录,检查数据库中的实际数据:")
try:
cursor.execute("SELECT TOP 5 Mark, Flag FROM Produce ORDER BY Mark DESC")
sample_data = cursor.fetchall()
print(f"数据库中最近的5条记录: {sample_data}")
except Exception as sample_e:
print(f"查询样本数据时出错: {sample_e}")
# 处理查询结果
current_tasks = {}
for row in results:
mark = row[0]
flag = row[1] if row[1] is not None else ""
current_tasks[mark] = flag
print(f"查询到记录 - MISID: {mark}, Flag: '{flag}'") # 调试信息
# 检查每个已删除任务的状态变化
for erp_id_str in deleted_from_sql_tasks:
erp_id = int(erp_id_str)
current_flag = current_tasks.get(erp_id_str, "")
previous_flag = task_flags.get(erp_id_str, "")
# 添加调试信息
print(f"检查任务 {erp_id} - 当前Flag: '{current_flag}', 之前Flag: '{previous_flag}'")
# 如果状态发生变化
if current_flag != previous_flag: # 添加这行
with tasks_lock:
artifact_id = inserted_tasks.get(erp_id, "Unknown")
print(
f"派发任务 ErpID {erp_id} (ArtifactID: {artifact_id}) 的Flag值已更新: {previous_flag} -> {current_flag}")
task_flags[erp_id_str] = current_flag
# 根据Flag值末尾的字母执行相应操作并更新自定义数据表状态
if current_flag.endswith('d'): # 将 elif 改为 if
print(f"派发任务 ErpID {erp_id}: 未进行生产")
# 调用同事提供的状态更新函数
try:
print(1)
# update_custom_table_status(erp_id, "未进行生产")
except Exception as e:
print(f"更新状态时出错: {e}")
# 发送数据给TCP客户端只发送erp_id和状态
try:
status_data = {
"erp_id": erp_id,
"status": "未进行生产"
}
tcp_server.send_data(status_data)
except Exception as e:
print(f"发送状态数据给TCP客户端时出错: {e}")
# 在"正在生产中"分支中:
elif current_flag.endswith('w'):
print(f"派发任务 ErpID {erp_id}: 正在生产中")
# 调用同事提供的状态更新函数
try:
print(2)
# update_custom_table_status(erp_id, "正在生产中")
except Exception as e:
print(f"更新状态时出错: {e}")
# 发送数据给TCP客户端只发送erp_id和状态
try:
status_data = {
"erp_id": erp_id,
"status": "正在生产中"
}
tcp_server.send_data(status_data)
except Exception as e:
print(f"发送状态数据给TCP客户端时出错: {e}")
# 在"生产完毕"分支中:
elif current_flag.endswith('n'):
print(f"派发任务 ErpID {erp_id}: 生产完毕")
# 任务完成,可以从监控列表中移除
with tasks_lock:
monitored_tasks.discard(erp_id)
print(f"派发任务 ErpID {erp_id} 已完成,停止监控")
# 调用同事提供的状态更新函数
try:
print(3)
# update_custom_table_status(erp_id, "生产完毕")
except Exception as e:
print(f"更新状态时出错: {e}")
# 发送数据给TCP客户端只发送erp_id和状态
try:
status_data = {
"erp_id": erp_id,
"status": "生产完毕"
}
tcp_server.send_data(status_data)
except Exception as e:
print(f"发送状态数据给TCP客户端时出错: {e}")
# 在"生产中断"分支中:
elif current_flag.endswith('p'):
print(f"派发任务 ErpID {erp_id}: 生产中断")
# 任务中断,可以从监控列表中移除
with tasks_lock:
monitored_tasks.discard(erp_id)
print(f"派发任务 ErpID {erp_id} 已中断,停止监控")
# 调用同事提供的状态更新函数
try:
print(4)
# update_custom_table_status(erp_id, "生产中断")
except Exception as e:
print(f"更新状态时出错: {e}")
# 发送数据给TCP客户端只发送erp_id和状态
try:
status_data = {
"erp_id": erp_id,
"status": "生产中断"
}
tcp_server.send_data(status_data)
except Exception as e:
print(f"发送状态数据给TCP客户端时出错: {e}")
# 在"数据已接收"分支中:
elif current_flag.endswith('x'):
print(f"派发任务 ErpID {erp_id}: 数据已接收")
# 调用同事提供的状态更新函数
try:
print(5)
update_custom_table_status(erp_id, "数据已接收")
except Exception as e:
print(f"更新状态时出错: {e}")
# 发送数据给TCP客户端只发送erp_id和状态
try:
status_data = {
"erp_id": erp_id,
"status": "数据已接收"
}
tcp_server.send_data(status_data)
except Exception as e:
print(f"发送状态数据给TCP客户端时出错: {e}")
# 检查是否有任务记录已被删除(不在查询结果中但仍在监控列表中)
# 这表示任务可能已完成或从系统中移除
missing_tasks = set(deleted_from_sql_tasks) - set(current_tasks.keys())
if missing_tasks:
for erp_id_str in missing_tasks:
erp_id = int(erp_id_str)
with tasks_lock:
artifact_id = inserted_tasks.get(erp_id, "Unknown")
monitored_tasks.discard(erp_id)
inserted_tasks.pop(erp_id, None)
task_flags.pop(erp_id_str, None)
print(f"派发任务 ErpID {erp_id} (ArtifactID: {artifact_id}) 记录已从Access数据库中删除或完成")
conn.close()
except Exception as e:
print(f"监控Access数据库 Flag时发生错误: {e}")
import traceback
traceback.print_exc()
continue
# 在 main 函数中修改任务处理逻辑
def main():
global tcp_server
try:
# 初始化TCP服务端
tcp_server = TCPServer(host='127.0.0.1', port=8888)
tcp_server_thread = threading.Thread(target=tcp_server.start)
tcp_server_thread.daemon = True
tcp_server_thread.start()
# 等待服务端启动
time.sleep(1)
# 步骤1获取AppID
app_id = get_app_id()
# 存储上次获取的所有ArtifactID
last_artifact_ids = set()
last_artifact_list = [] # 用于存储上一次的完整artifact_list
# Access数据库路径和密码
access_db_path = "D:\\Janeoo-B12-DB\\Janeoo.2.mdb" # 替换为实际路径
access_password = "BCS7.2_SDBS" # Access数据库密码
# 启动Access数据库Flag监控线程
access_monitor_thread = threading.Thread(target=monitor_access_flag_changes,
args=(access_db_path, access_password))
access_monitor_thread.daemon = True
access_monitor_thread.start()
while True:
try:
# 步骤2获取所有未浇筑信息
tasks, artifact_list, send_list, half_volume = get_all_not_pour_info(app_id)
print(tasks)
print(artifact_list)
print(send_list)
print(half_volume)
current_artifact_ids = {task["artifact_id"] for task in tasks}
# 检查artifact_list是否发生变化
if artifact_list != last_artifact_list:
print(f"检测到artifact_list更新: {artifact_list}")
# 处理新出现的任务
new_artifact_ids = current_artifact_ids - last_artifact_ids
if new_artifact_ids:
print(f"检测到 {len(new_artifact_ids)} 个新任务")
for task in tasks:
if task["artifact_id"] in new_artifact_ids:
task_info = get_task_info(app_id, task["beton_task_id"])
# 步骤4连接Access数据库并获取最大Mark值
max_mark = get_max_mark_from_access(access_db_path, access_password)
erp_id = int(max_mark) + 1
# print(f"获取到ERP ID: {erp_id}")
# 步骤5连接SQL Server数据库并插入数据
connection = connect_to_sql_server()
# 检查 block_number 是否为 "F"
if task["block_number"] == "补方":
print(f"任务 {task['artifact_id']} 的 block_number 为 '补方',跳过派单")
insert_into_produce_table(connection, task_info, task["beton_volume"], erp_id,
task["artifact_id"], 0)
connection.close()
continue
print(f"处理新任务: {task['artifact_id']}")
# 步骤3获取任务单信息
insert_into_produce_table(connection, task_info, task["beton_volume"], erp_id,
task["artifact_id"], 1)
connection.close()
# 更新上次获取的ArtifactID集合和artifact_list
last_artifact_ids = current_artifact_ids
last_artifact_list = artifact_list.copy() # 保存当前的artifact_list
# 每10分钟清理一次过期的时间戳记录
cleanup_old_timestamps(current_artifact_ids, max_age_hours=24)
# 每10秒检查一次
time.sleep(10)
except Exception as e:
print(f"发生错误: {e}")
# 继续循环,避免程序退出
time.sleep(2)
except KeyboardInterrupt:
print("程序已停止")
# 停止TCP服务端
if tcp_server:
tcp_server.stop()
if __name__ == "__main__":
main()