import requests import hashlib import pyodbc from datetime import datetime, timedelta import time import json import threading from tcp.server import TCPServer # 配置信息 BASE_URL = "http://127.0.0.1:5000" # 外网地址"https://www.shnthy.com:9154" LOGIN_URL = f"{BASE_URL}/api/user/perlogin" MOULD_INFO_URL = f"{BASE_URL}/api/ext/mould/last_artifact?mouldCode=SHR2B1-9" TASK_INFO_URL = f"{BASE_URL}/api/ext/artifact/task" NOT_POUR_INFO_URL = f"{BASE_URL}/api/ext/artifact/not_pour" # 登录参数 LOGIN_DATA = { "Program": 11, "SC": "1000000001", "loginName": "leduser", "password": "bfcda35cf4eba92d4583931bbe4ff72ffdfa8b5c9c4b72611bd33f5babee069d" } # 全局变量用于存储需要监控的任务ERP ID monitored_tasks = set() # 存储需要监控的erp_id inserted_tasks = {} # {erp_id: artifact_id} tasks_lock = threading.Lock() artifact_timestamps = {} # 存储每个artifact_id的时间戳 # TCP服务端实例 tcp_server = TCPServer task_before = None # 用于存储上一条任务信息 half_volume = [0, 0] # 计算SHA256密码 def hash_password(password): return password LOGIN_DATA["password"] = hash_password(LOGIN_DATA["password"]) # 获取AppID def get_app_id(): response = requests.post(LOGIN_URL, json=LOGIN_DATA) if response.status_code == 200: data = response.json() if data.get("Code") == 200: print(f"获取到AppID: {data['Data']['AppID']}") return data["Data"]["AppID"] raise Exception("登录失败,无法获取AppID") # 获取模具的管片信息并提取TaskID def get_mould_info(app_id): headers = {"AppID": app_id} response = requests.get(MOULD_INFO_URL, headers=headers) if response.status_code == 205: data = response.json() if data.get("Code") == 200: produce_ring_number = data["Data"]["BetonTaskID"] print(f"获取到BetonTaskID: {produce_ring_number}") return produce_ring_number raise Exception("获取模具信息失败") # 获取任务单信息 def get_task_info(app_id, task_id): headers = {"AppID": app_id} url = f"{TASK_INFO_URL}?TaskId={task_id}" response = requests.get(url, headers=headers) if response.status_code == 200: data = response.json() if data.get("Code") == 200: task_data = data["Data"] return task_data raise Exception("获取任务单信息失败") # 定期清理过期的时间戳记录 def cleanup_old_timestamps(current_artifact_ids=None, max_age_hours=24): """ 清理过期的时间戳记录 Args: current_artifact_ids: 当前活跃的artifact_id集合,可选 max_age_hours: 时间戳最大保留时间(小时),默认24小时 """ current_time = datetime.now() expired_ids = [] # 遍历所有存储的时间戳记录 for artifact_id, timestamp in artifact_timestamps.items(): # 如果提供了当前活跃ID列表,且该ID不在当前活跃列表中,则检查是否过期 if current_artifact_ids is None or artifact_id not in current_artifact_ids: age = current_time - timestamp # 如果超过最大保留时间,则标记为过期 if age.total_seconds() > max_age_hours * 3600: expired_ids.append(artifact_id) # 删除过期的时间戳记录 for artifact_id in expired_ids: del artifact_timestamps[artifact_id] if expired_ids: print(f"清理了 {len(expired_ids)} 个过期的时间戳记录: {expired_ids}") # 获取所有未浇筑信息 def get_all_not_pour_info(app_id): """获取所有未浇筑信息并处理任务分发逻辑""" global task_before, half_volume, artifact_timestamps headers = {"AppID": app_id} response = requests.get(NOT_POUR_INFO_URL, headers=headers) if response.status_code != 200: raise Exception("获取未浇筑信息网络请求失败") data = response.json() if data.get("Code") != 200: raise Exception("获取未浇筑信息API返回错误") artifact_list = data["Data"] if not artifact_list: return [], [], [], half_volume # 处理F块信息 f_blocks_info = _process_f_blocks(artifact_list) f_blocks = f_blocks_info["f_blocks"] f_block_count = f_blocks_info["f_block_count"] total_f_volume = f_blocks_info["total_f_volume"] f_positions = f_blocks_info["f_positions"] # 处理当前任务 current_task = _process_current_task(artifact_list[0], app_id) # 更新上一个任务信息 task_before = { "beton_task_id": current_task["beton_task_id"], "beton_volume": current_task["beton_volume"], "artifact_id": current_task["artifact_id"], "block_number": current_task["block_number"] } # 根据F块情况处理任务 task_result = _handle_tasks_by_f_blocks( f_block_count, f_positions, current_task, f_blocks, total_f_volume, artifact_list, app_id ) return task_result def _process_f_blocks(artifact_list): """处理F块相关信息""" f_blocks = [artifact for artifact in artifact_list if artifact.get("BlockNumber") == "F"] f_block_count = len(f_blocks) total_f_volume = sum(artifact["BetonVolume"] for artifact in f_blocks) f_positions = get_f_block_positions(artifact_list) return { "f_blocks": f_blocks, "f_block_count": f_block_count, "total_f_volume": total_f_volume, "f_positions": f_positions } def _process_current_task(latest_artifact, app_id): """处理当前任务信息""" return { "beton_task_id": latest_artifact["BetonTaskID"], "beton_volume": latest_artifact["BetonVolume"], "artifact_id": latest_artifact["ArtifactActionID"], "block_number": latest_artifact.get("BlockNumber", ""), "task_data": get_task_info(app_id, latest_artifact["BetonTaskID"]) } def _handle_tasks_by_f_blocks(f_block_count, f_positions, current_task, f_blocks, total_f_volume, artifact_list, app_id): """根据F块数量和位置处理任务""" # 多个F块情况 if f_block_count > 2: return _handle_multiple_f_blocks(current_task, total_f_volume, artifact_list, app_id) # 两个F块情况 elif f_block_count == 2: return _handle_two_f_blocks(f_positions, current_task, total_f_volume, artifact_list, app_id) # 一个F块情况 elif f_block_count == 1: return _handle_single_f_block(f_positions, current_task, f_blocks, total_f_volume, artifact_list, app_id) # 无F块情况 elif f_block_count == 0: return _handle_no_f_blocks(current_task, artifact_list, app_id) else: print("报警") return [], [], [], half_volume def _handle_multiple_f_blocks(current_task, total_f_volume, artifact_list, app_id): """处理多个F块的情况""" adjusted_volume = total_f_volume - half_volume[0] tasks = [{ "beton_task_id": current_task["beton_task_id"], "beton_volume": adjusted_volume, "artifact_id": current_task["artifact_id"], "block_number": current_task["block_number"], }] send_list = [{ "beton_task_id": current_task["beton_task_id"], "beton_volume": adjusted_volume, "artifact_id": current_task["artifact_id"], "block_number": current_task["block_number"], "beton_grade": current_task["task_data"]["BetonGrade"], "mix_id": current_task["task_data"]["MixID"], "time": artifact_timestamps.get(current_task["artifact_id"], datetime.now()) }] # 处理后续任务 _append_additional_tasks(send_list, artifact_list, app_id, [0, 0]) # 更新时间戳 _update_artifact_timestamps(send_list) return tasks, artifact_list, send_list, half_volume def _handle_two_f_blocks(f_positions, current_task, total_f_volume, artifact_list, app_id): """处理两个F块的情况""" if f_positions == [0, 1] and task_before.get("block_number") == "F": adjusted_volume = 0 block_number = "补方" elif f_positions == [1, 2]: adjusted_volume = artifact_list[0]["BetonVolume"] else: adjusted_volume = total_f_volume - half_volume[0] block_number = current_task["block_number"] tasks = [{ "beton_task_id": current_task["beton_task_id"], "beton_volume": adjusted_volume, "artifact_id": current_task["artifact_id"], "block_number": block_number, }] send_list = [{ "beton_task_id": current_task["beton_task_id"], "beton_volume": adjusted_volume, "artifact_id": current_task["artifact_id"], "block_number": block_number, "beton_grade": current_task["task_data"]["BetonGrade"], "mix_id": current_task["task_data"]["MixID"], "time": artifact_timestamps.get(current_task["artifact_id"], datetime.now()) }] # 处理后续任务 volumes = [adjusted_volume, artifact_list[2]["BetonVolume"] if len(artifact_list) > 2 else 0] if f_positions == [0, 1] and task_before.get( "block_number") == "F" else [0, 0] _append_additional_tasks(send_list, artifact_list, app_id, volumes) # 更新时间戳 _update_artifact_timestamps(send_list) return tasks, artifact_list, send_list, half_volume def _handle_single_f_block(f_positions, current_task, f_blocks, total_f_volume, artifact_list, app_id): """处理单个F块的情况""" if f_positions == [2]: f_volume = f_blocks[0].get("BetonVolume") if f_blocks else 0 half_volume[0] = round(total_f_volume / 2, 2) half_volume[1] = f_volume - half_volume[0] adjusted_volume = current_task["beton_volume"] + half_volume[0] elif f_positions == [1]: adjusted_volume = current_task["beton_volume"] + half_volume[1] elif f_positions == [0]: adjusted_volume = 0 else: adjusted_volume = current_task["beton_volume"] block_number = "补方" if f_positions == [0] else current_task["block_number"] tasks = [{ "beton_task_id": current_task["beton_task_id"], "beton_volume": adjusted_volume, "artifact_id": current_task["artifact_id"], "block_number": block_number, }] send_list = [{ "beton_task_id": current_task["beton_task_id"], "beton_volume": adjusted_volume, "artifact_id": current_task["artifact_id"], "block_number": block_number, "beton_grade": current_task["task_data"]["BetonGrade"], "mix_id": current_task["task_data"]["MixID"], "time": artifact_timestamps.get(current_task["artifact_id"], datetime.now()) }] # 处理后续任务 _append_additional_tasks_for_single_f(send_list, artifact_list, app_id, f_positions) # 更新时间戳 _update_artifact_timestamps(send_list) return tasks, artifact_list, send_list, half_volume def _handle_no_f_blocks(current_task, artifact_list, app_id): """处理无F块的情况""" tasks = [{ "beton_task_id": current_task["beton_task_id"], "beton_volume": current_task["beton_volume"], "artifact_id": current_task["artifact_id"], "block_number": current_task["block_number"], }] send_list = [{ "beton_task_id": current_task["beton_task_id"], "beton_volume": current_task["beton_volume"], "artifact_id": current_task["artifact_id"], "block_number": current_task["block_number"], "beton_grade": current_task["task_data"]["BetonGrade"], "mix_id": current_task["task_data"]["MixID"], "time": artifact_timestamps.get(current_task["artifact_id"], datetime.now()) }] # 处理后续任务 _append_additional_tasks(send_list, artifact_list, app_id, [artifact_list[2]["BetonVolume"] if len(artifact_list) > 2 else 0, artifact_list[2]["BetonVolume"] if len(artifact_list) > 2 else 0]) # 更新时间戳 _update_artifact_timestamps(send_list) return tasks, artifact_list, send_list, half_volume def _append_additional_tasks(send_list, artifact_list, app_id, volumes): """添加额外的任务到发送列表""" # 安全获取后续任务 second_task = artifact_list[1] if len(artifact_list) > 1 else None third_task = artifact_list[2] if len(artifact_list) > 2 else None if second_task: task_data_second = get_task_info(app_id, second_task["BetonTaskID"]) send_list.append({ "beton_task_id": second_task["BetonTaskID"], "beton_volume": volumes[0], "artifact_id": second_task["ArtifactActionID"], "block_number": second_task["BlockNumber"], "beton_grade": task_data_second["BetonGrade"], "mix_id": task_data_second["MixID"], "time": artifact_timestamps.get(second_task["ArtifactActionID"], datetime.now()) }) if third_task: task_data_third = get_task_info(app_id, third_task["BetonTaskID"]) send_list.append({ "beton_task_id": third_task["BetonTaskID"], "beton_volume": volumes[1], "artifact_id": third_task["ArtifactActionID"], "block_number": third_task["BlockNumber"], "beton_grade": task_data_third["BetonGrade"], "mix_id": task_data_third["MixID"], "time": artifact_timestamps.get(third_task["ArtifactActionID"], datetime.now()) }) def _append_additional_tasks_for_single_f(send_list, artifact_list, app_id, f_positions): """为单个F块情况添加额外任务""" second_task = artifact_list[1] if len(artifact_list) > 1 else None third_task = artifact_list[2] if len(artifact_list) > 2 else None if second_task: task_data_second = get_task_info(app_id, second_task["BetonTaskID"]) volume = (third_task["BetonVolume"] if third_task else 0) if f_positions != [2] else ( second_task["BetonVolume"] + half_volume[1]) send_list.append({ "beton_task_id": second_task["BetonTaskID"], "beton_volume": volume, "artifact_id": second_task["ArtifactActionID"], "block_number": second_task["BlockNumber"], "beton_grade": task_data_second["BetonGrade"], "mix_id": task_data_second["MixID"], "time": artifact_timestamps.get(second_task["ArtifactActionID"], datetime.now()) }) if third_task: task_data_third = get_task_info(app_id, third_task["BetonTaskID"]) volume = third_task["BetonVolume"] if f_positions in [[1], [0]] else 0 send_list.append({ "beton_task_id": third_task["BetonTaskID"], "beton_volume": volume, "artifact_id": third_task["ArtifactActionID"], "block_number": third_task["BlockNumber"], "beton_grade": task_data_third["BetonGrade"], "mix_id": task_data_third["MixID"], "time": artifact_timestamps.get(third_task["ArtifactActionID"], datetime.now()) }) def _update_artifact_timestamps(send_list): """更新artifact时间戳""" current_artifact_ids = {item["artifact_id"] for item in send_list} for artifact_id in current_artifact_ids: if artifact_id not in artifact_timestamps: artifact_timestamps[artifact_id] = datetime.now() def get_f_block_positions(artifact_list): """获取artifact_list中F块的位置""" positions = [] for i, artifact in enumerate(artifact_list): if artifact.get("BlockNumber") == "F": positions.append(i) return positions # 连接Access数据库 def connect_to_access_db(db_path, password): conn_str = ( r'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};' f'DBQ={db_path};' f'PWD={password};' ) return pyodbc.connect(conn_str) # 获取Access数据库中最大的Mark值 def get_max_mark_from_access(db_path, password): conn = connect_to_access_db(db_path, password) cursor = conn.cursor() # 查询最大的Mark值 cursor.execute("SELECT MAX(Mark) FROM Produce") max_mark = cursor.fetchone()[0] # 如果没有记录,返回0 if max_mark is None: max_mark = 0 conn.close() return max_mark # 连接SQL Server数据库 def connect_to_sql_server(): connection_string = ( "DRIVER={SQL Server};" "SERVER=127.0.0.1;" "DATABASE=BS23DB;" "UID=sa;" "PWD=123;" ) return pyodbc.connect(connection_string) # 插入数据到Produce表 # 在 insert_into_produce_table 函数中添加调用同事的保存函数 def insert_into_produce_table(connection, task_info, beton_volume, erp_id, artifact_id, status): cursor = connection.cursor() if status == "1": # 准备插入数据 insert_data = { "ErpID": erp_id, "Code": task_info["TaskID"], "DatTim": datetime.now(), "Recipe": task_info["ProduceMixID"], "MorRec": "", "ProdMete": beton_volume, "MorMete": 0.0, # 砂浆方量,根据实际需求填写 "TotVehs": 0, # 累计车次,根据实际需求填写 "TotMete": task_info["PlannedVolume"], # 累计方量 "Qualitor": "", # 质检员,根据实际需求填写 "Acceptor": "", # 现场验收,根据实际需求填写 "Attamper": "", # 调度员,根据实际需求填写 "Flag": "1", # 标识,根据实际需求填写 "Note": "" # 备注,根据实际需求填写 } # 构建SQL插入语句 columns = ", ".join(insert_data.keys()) placeholders = ", ".join(["?" for _ in insert_data.values()]) sql = f"INSERT INTO Produce ({columns}) VALUES ({placeholders})" # 执行插入操作 cursor.execute(sql, list(insert_data.values())) connection.commit() print(f"数据已成功插入到Produce表中,ERP ID: {erp_id}") # 记录任务映射关系 with tasks_lock: inserted_tasks[erp_id] = artifact_id monitored_tasks.add(erp_id) print(f"任务 {erp_id} (ArtifactID: {artifact_id}) 已添加到监控列表") # 调用同事提供的保存函数,将数据保存到自定义数据表 try: # 假设同事提供的函数名为 save_to_custom_table # 参数包括: MISID(即erp_id), Flag, TaskID, ProduceMixID, ProjectName, BetonGrade, 调整后的方量 save_to_custom_table( misid=erp_id, flag="1", # 初始Flag值 task_id=task_info["TaskID"], produce_mix_id=task_info["ProduceMixID"], project_name=task_info["ProjectName"], beton_grade=task_info["BetonGrade"], adjusted_volume=beton_volume, artifact_id=artifact_id # 已经调整后的方量 ) print(f"任务 {erp_id} 的数据已保存到自定义数据表") except Exception as e: print(f"调用保存函数时出错: {e}") # 发送数据给TCP客户端 try: time.sleep(5) task_data = { "erp_id": erp_id, # 车号,相当于序号 "task_id": task_info["TaskID"], # 任务单号 "artifact_id": artifact_id, "produce_mix_id": task_info["ProduceMixID"], # 配比号 "project_name": task_info["ProjectName"], # 任务名 "beton_grade": task_info["BetonGrade"], # 砼强度 "adjusted_volume": beton_volume, # 方量 "flag": "1x", # 状态 "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 时间 } tcp_server.send_data(task_data) print(f"任务 {erp_id} 的数据已发送给TCP客户端") except Exception as e: print(f"发送数据给TCP客户端时出错: {e}") return erp_id else: try: # 假设同事提供的函数名为 save_to_custom_table # 参数包括: MISID(即erp_id), Flag, TaskID, ProduceMixID, ProjectName, BetonGrade, 调整后的方量 save_to_custom_table( misid=erp_id, flag="1", # 初始Flag值 task_id=task_info["TaskID"], produce_mix_id=task_info["ProduceMixID"], project_name=task_info["ProjectName"], beton_grade=task_info["BetonGrade"], adjusted_volume=beton_volume, artifact_id=artifact_id ) print(f"任务 {erp_id} 的数据已保存到自定义数据表") except Exception as e: print(f"调用保存函数时出错: {e}") # 监控Access数据库中特定任务的Flag字段变化 def monitor_access_flag_changes(access_db_path, access_password): """监控Access数据库中派发任务的Flag状态""" # 存储任务的当前状态 task_flags = {} print("开始监控Access数据库中派发任务的Flag状态") while True: try: # 每2秒检查一次 time.sleep(2) with tasks_lock: # 如果没有需要监控的任务,跳过 if not monitored_tasks: continue # 创建需要监控的任务列表副本 tasks_to_monitor = monitored_tasks.copy() # 首先检查SQL Server中任务是否还存在 sql_conn = connect_to_sql_server() sql_cursor = sql_conn.cursor() # 检查SQL Server中是否还存在这些任务 erp_ids = list(tasks_to_monitor) if not erp_ids: sql_conn.close() continue erp_ids_str = [str(erp_id) for erp_id in erp_ids] placeholders = ','.join('?' * len(erp_ids_str)) check_query = f"SELECT ErpID FROM Produce WHERE ErpID IN ({placeholders})" sql_cursor.execute(check_query, erp_ids_str) sql_results = sql_cursor.fetchall() sql_conn.close() # 分离已删除和未删除的任务 existing_tasks_in_sql = {str(row[0]) for row in sql_results} deleted_from_sql_tasks = set(erp_ids_str) - existing_tasks_in_sql print(f"SQL Server中仍存在的任务: {existing_tasks_in_sql}") print(f"已从SQL Server删除的任务: {deleted_from_sql_tasks}") # 只有已从SQL Server删除的任务才需要在Access中查找 if not deleted_from_sql_tasks: continue # 连接Access数据库 conn = connect_to_access_db(access_db_path, access_password) cursor = conn.cursor() # 查询Access数据库中已删除任务的状态 placeholders = ','.join('?' * len(list(deleted_from_sql_tasks))) query = f"SELECT MISID, Flag FROM Produce WHERE MISID IN ({placeholders})" # 添加调试信息 print(f"执行查询: {query}") print(f"查询参数: {list(deleted_from_sql_tasks)}") cursor.execute(query, list(deleted_from_sql_tasks)) results = cursor.fetchall() # 添加调试信息 print(f"查询返回结果数量: {len(results)}") # 如果没有查询到结果,检查数据库中的实际数据 if len(results) == 0 and deleted_from_sql_tasks: print("未找到匹配记录,检查数据库中的实际数据:") try: cursor.execute("SELECT TOP 5 Mark, Flag FROM Produce ORDER BY Mark DESC") sample_data = cursor.fetchall() print(f"数据库中最近的5条记录: {sample_data}") except Exception as sample_e: print(f"查询样本数据时出错: {sample_e}") # 处理查询结果 current_tasks = {} for row in results: mark = row[0] flag = row[1] if row[1] is not None else "" current_tasks[mark] = flag print(f"查询到记录 - MISID: {mark}, Flag: '{flag}'") # 调试信息 # 检查每个已删除任务的状态变化 for erp_id_str in deleted_from_sql_tasks: erp_id = int(erp_id_str) current_flag = current_tasks.get(erp_id_str, "") previous_flag = task_flags.get(erp_id_str, "") # 添加调试信息 print(f"检查任务 {erp_id} - 当前Flag: '{current_flag}', 之前Flag: '{previous_flag}'") # 如果状态发生变化 if current_flag != previous_flag: # 添加这行 with tasks_lock: artifact_id = inserted_tasks.get(erp_id, "Unknown") print( f"派发任务 ErpID {erp_id} (ArtifactID: {artifact_id}) 的Flag值已更新: {previous_flag} -> {current_flag}") task_flags[erp_id_str] = current_flag # 根据Flag值末尾的字母执行相应操作并更新自定义数据表状态 if current_flag.endswith('d'): # 将 elif 改为 if print(f"派发任务 ErpID {erp_id}: 未进行生产") # 调用同事提供的状态更新函数 try: print(1) # update_custom_table_status(erp_id, "未进行生产") except Exception as e: print(f"更新状态时出错: {e}") # 发送数据给TCP客户端(只发送erp_id和状态) try: status_data = { "erp_id": erp_id, "status": "未进行生产" } tcp_server.send_data(status_data) except Exception as e: print(f"发送状态数据给TCP客户端时出错: {e}") # 在"正在生产中"分支中: elif current_flag.endswith('w'): print(f"派发任务 ErpID {erp_id}: 正在生产中") # 调用同事提供的状态更新函数 try: print(2) # update_custom_table_status(erp_id, "正在生产中") except Exception as e: print(f"更新状态时出错: {e}") # 发送数据给TCP客户端(只发送erp_id和状态) try: status_data = { "erp_id": erp_id, "status": "正在生产中" } tcp_server.send_data(status_data) except Exception as e: print(f"发送状态数据给TCP客户端时出错: {e}") # 在"生产完毕"分支中: elif current_flag.endswith('n'): print(f"派发任务 ErpID {erp_id}: 生产完毕") # 任务完成,可以从监控列表中移除 with tasks_lock: monitored_tasks.discard(erp_id) print(f"派发任务 ErpID {erp_id} 已完成,停止监控") # 调用同事提供的状态更新函数 try: print(3) # update_custom_table_status(erp_id, "生产完毕") except Exception as e: print(f"更新状态时出错: {e}") # 发送数据给TCP客户端(只发送erp_id和状态) try: status_data = { "erp_id": erp_id, "status": "生产完毕" } tcp_server.send_data(status_data) except Exception as e: print(f"发送状态数据给TCP客户端时出错: {e}") # 在"生产中断"分支中: elif current_flag.endswith('p'): print(f"派发任务 ErpID {erp_id}: 生产中断") # 任务中断,可以从监控列表中移除 with tasks_lock: monitored_tasks.discard(erp_id) print(f"派发任务 ErpID {erp_id} 已中断,停止监控") # 调用同事提供的状态更新函数 try: print(4) # update_custom_table_status(erp_id, "生产中断") except Exception as e: print(f"更新状态时出错: {e}") # 发送数据给TCP客户端(只发送erp_id和状态) try: status_data = { "erp_id": erp_id, "status": "生产中断" } tcp_server.send_data(status_data) except Exception as e: print(f"发送状态数据给TCP客户端时出错: {e}") # 在"数据已接收"分支中: elif current_flag.endswith('x'): print(f"派发任务 ErpID {erp_id}: 数据已接收") # 调用同事提供的状态更新函数 try: print(5) update_custom_table_status(erp_id, "数据已接收") except Exception as e: print(f"更新状态时出错: {e}") # 发送数据给TCP客户端(只发送erp_id和状态) try: status_data = { "erp_id": erp_id, "status": "数据已接收" } tcp_server.send_data(status_data) except Exception as e: print(f"发送状态数据给TCP客户端时出错: {e}") # 检查是否有任务记录已被删除(不在查询结果中但仍在监控列表中) # 这表示任务可能已完成或从系统中移除 missing_tasks = set(deleted_from_sql_tasks) - set(current_tasks.keys()) if missing_tasks: for erp_id_str in missing_tasks: erp_id = int(erp_id_str) with tasks_lock: artifact_id = inserted_tasks.get(erp_id, "Unknown") monitored_tasks.discard(erp_id) inserted_tasks.pop(erp_id, None) task_flags.pop(erp_id_str, None) print(f"派发任务 ErpID {erp_id} (ArtifactID: {artifact_id}) 记录已从Access数据库中删除或完成") conn.close() except Exception as e: print(f"监控Access数据库 Flag时发生错误: {e}") import traceback traceback.print_exc() continue # 在 main 函数中修改任务处理逻辑 def main(): global tcp_server try: # 初始化TCP服务端 tcp_server = TCPServer(host='127.0.0.1', port=8888) tcp_server_thread = threading.Thread(target=tcp_server.start) tcp_server_thread.daemon = True tcp_server_thread.start() # 等待服务端启动 time.sleep(1) # 步骤1:获取AppID app_id = get_app_id() # 存储上次获取的所有ArtifactID last_artifact_ids = set() last_artifact_list = [] # 用于存储上一次的完整artifact_list # Access数据库路径和密码 access_db_path = "D:\\Janeoo-B12-DB\\Janeoo.2.mdb" # 替换为实际路径 access_password = "BCS7.2_SDBS" # Access数据库密码 # 启动Access数据库Flag监控线程 access_monitor_thread = threading.Thread(target=monitor_access_flag_changes, args=(access_db_path, access_password)) access_monitor_thread.daemon = True access_monitor_thread.start() while True: try: # 步骤2:获取所有未浇筑信息 tasks, artifact_list, send_list, half_volume = get_all_not_pour_info(app_id) print(tasks) print(artifact_list) print(send_list) print(half_volume) current_artifact_ids = {task["artifact_id"] for task in tasks} # 检查artifact_list是否发生变化 if artifact_list != last_artifact_list: print(f"检测到artifact_list更新: {artifact_list}") # 处理新出现的任务 new_artifact_ids = current_artifact_ids - last_artifact_ids if new_artifact_ids: print(f"检测到 {len(new_artifact_ids)} 个新任务") for task in tasks: if task["artifact_id"] in new_artifact_ids: task_info = get_task_info(app_id, task["beton_task_id"]) # 步骤4:连接Access数据库并获取最大Mark值 max_mark = get_max_mark_from_access(access_db_path, access_password) erp_id = int(max_mark) + 1 # print(f"获取到ERP ID: {erp_id}") # 步骤5:连接SQL Server数据库并插入数据 connection = connect_to_sql_server() # 检查 block_number 是否为 "F" if task["block_number"] == "补方": print(f"任务 {task['artifact_id']} 的 block_number 为 '补方',跳过派单") insert_into_produce_table(connection, task_info, task["beton_volume"], erp_id, task["artifact_id"], 0) connection.close() continue print(f"处理新任务: {task['artifact_id']}") # 步骤3:获取任务单信息 insert_into_produce_table(connection, task_info, task["beton_volume"], erp_id, task["artifact_id"], 1) connection.close() # 更新上次获取的ArtifactID集合和artifact_list last_artifact_ids = current_artifact_ids last_artifact_list = artifact_list.copy() # 保存当前的artifact_list # 每10分钟清理一次过期的时间戳记录 cleanup_old_timestamps(current_artifact_ids, max_age_hours=24) # 每10秒检查一次 time.sleep(10) except Exception as e: print(f"发生错误: {e}") # 继续循环,避免程序退出 time.sleep(2) except KeyboardInterrupt: print("程序已停止") # 停止TCP服务端 if tcp_server: tcp_server.stop() if __name__ == "__main__": main()