#在线版本,实时监控Flag信息,但是还没有实现每次只取最上面那条 import requests import hashlib import pyodbc from datetime import datetime import time import json import threading from TCPServer import TCPServer # 配置信息 BASE_URL = "https://www.shnthy.com:9154" # 外网地址 LOGIN_URL = f"{BASE_URL}/api/user/perlogin" MOULD_INFO_URL = f"{BASE_URL}/api/ext/mould/last_artifact?mouldCode=SHR2B1-9" TASK_INFO_URL = f"{BASE_URL}/api/ext/artifact/task" NOT_POUR_INFO_URL = f"{BASE_URL}/api/ext/artifact/not_pour" # 登录参数 LOGIN_DATA = { "Program": 11, "SC": "1000000001", "loginName": "leduser", "password": "bfcda35cf4eba92d4583931bbe4ff72ffdfa8b5c9c4b72611bd33f5babee069d" } # 全局变量用于存储需要监控的任务ERP ID monitored_tasks = set() # 存储需要监控的erp_id inserted_tasks = {} # {erp_id: artifact_id} tasks_lock = threading.Lock() # TCP服务端实例 tcp_server = TCPServer # 计算SHA256密码 def hash_password(password): return password LOGIN_DATA["password"] = hash_password(LOGIN_DATA["password"]) # 获取AppID def get_app_id(): response = requests.post(LOGIN_URL, json=LOGIN_DATA) if response.status_code == 200: data = response.json() if data.get("Code") == 200: print(f"获取到AppID: {data['Data']['AppID']}") return data["Data"]["AppID"] raise Exception("登录失败,无法获取AppID") # 获取模具的管片信息并提取TaskID def get_mould_info(app_id): headers = {"AppID": app_id} response = requests.get(MOULD_INFO_URL, headers=headers) if response.status_code == 205: data = response.json() if data.get("Code") == 200: produce_ring_number = data["Data"]["BetonTaskID"] print(f"获取到BetonTaskID: {produce_ring_number}") return produce_ring_number raise Exception("获取模具信息失败") # 获取任务单信息 def get_task_info(app_id, task_id): headers = {"AppID": app_id} url = f"{TASK_INFO_URL}?TaskId={task_id}" response = requests.get(url, headers=headers) if response.status_code == 200: data = response.json() if data.get("Code") == 200: task_data = data["Data"] print(f"获取到任务单信息:") print(f" TaskID: {task_data['TaskID']}") print(f" ProduceMixID: {task_data['ProduceMixID']}") print(f" ProjectName: {task_data['ProjectName']}") print(f" BetonGrade: {task_data['BetonGrade']}") print(f" MixID: {task_data['MixID']}") print(f" PlannedVolume: {task_data['PlannedVolume']}") print(f" ProducedVolume: {task_data['ProducedVolume']}") print(f" Progress: {task_data['Progress']}") print(f" TaskDateText: {task_data['TaskDateText']}") print(f" TaskStatusText: {task_data['TaskStatusText']}") return task_data raise Exception("获取任务单信息失败") # 获取所有未浇筑信息 def get_all_not_pour_info(app_id): headers = {"AppID": app_id} response = requests.get(NOT_POUR_INFO_URL, headers=headers) if response.status_code == 200: data = response.json() if data.get("Code") == 200: artifact_list = data["Data"] tasks = [] for artifact in artifact_list: beton_task_id = artifact["BetonTaskID"] beton_volume = artifact["BetonVolume"] artifact_id = artifact["ArtifactActionID"] block_number = artifact.get("BlockNumber", "") # 根据BlockNumber调整方量 if block_number == "L2": adjusted_volume = beton_volume + 0.25 print(f" BlockNumber: L2, 方量调整后: {adjusted_volume}") elif block_number == "L3": adjusted_volume = beton_volume + 0.3 print(f" BlockNumber: L3, 方量调整后: {adjusted_volume}") else: adjusted_volume = beton_volume print(f" BlockNumber: {block_number}, 方量未调整") tasks.append({ "beton_task_id": beton_task_id, "beton_volume": adjusted_volume, "artifact_id": artifact_id, "block_number": block_number }) return tasks raise Exception("获取未浇筑信息失败") # 连接Access数据库 def connect_to_access_db(db_path, password): conn_str = ( r'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};' f'DBQ={db_path};' f'PWD={password};' ) return pyodbc.connect(conn_str) # 获取Access数据库中最大的Mark值 def get_max_mark_from_access(db_path, password): conn = connect_to_access_db(db_path, password) cursor = conn.cursor() # 查询最大的Mark值 cursor.execute("SELECT MAX(Mark) FROM Produce") max_mark = cursor.fetchone()[0] # 如果没有记录,返回0 if max_mark is None: max_mark = 0 conn.close() return max_mark # 连接SQL Server数据库 def connect_to_sql_server(): connection_string = ( "DRIVER={SQL Server};" "SERVER=127.0.0.1;" "DATABASE=BS23DB;" "UID=sa;" "PWD=123;" ) return pyodbc.connect(connection_string) # 插入数据到Produce表 # 在 insert_into_produce_table 函数中添加调用同事的保存函数 def insert_into_produce_table(connection, task_info, beton_volume, erp_id, artifact_id): cursor = connection.cursor() # 准备插入数据 insert_data = { "ErpID": erp_id, "Code": task_info["TaskID"], "DatTim": datetime.now(), "Recipe": task_info["ProduceMixID"], "MorRec": "", "ProdMete": beton_volume, "MorMete": 0.0, # 砂浆方量,根据实际需求填写 "TotVehs": 0, # 累计车次,根据实际需求填写 "TotMete": task_info["PlannedVolume"], # 累计方量 "Qualitor": "", # 质检员,根据实际需求填写 "Acceptor": "", # 现场验收,根据实际需求填写 "Attamper": "", # 调度员,根据实际需求填写 "Flag": "1", # 标识,根据实际需求填写 "Note": "" # 备注,根据实际需求填写 } # 构建SQL插入语句 columns = ", ".join(insert_data.keys()) placeholders = ", ".join(["?" for _ in insert_data.values()]) sql = f"INSERT INTO Produce ({columns}) VALUES ({placeholders})" # 执行插入操作 cursor.execute(sql, list(insert_data.values())) connection.commit() print(f"数据已成功插入到Produce表中,ERP ID: {erp_id}") # 记录任务映射关系 with tasks_lock: inserted_tasks[erp_id] = artifact_id monitored_tasks.add(erp_id) print(f"任务 {erp_id} (ArtifactID: {artifact_id}) 已添加到监控列表") # 调用同事提供的保存函数,将数据保存到自定义数据表 try: # 假设同事提供的函数名为 save_to_custom_table # 参数包括: MISID(即erp_id), Flag, TaskID, ProduceMixID, ProjectName, BetonGrade, 调整后的方量 # save_to_custom_table( # misid=erp_id, # flag="1", # 初始Flag值 # task_id=task_info["TaskID"], # produce_mix_id=task_info["ProduceMixID"], # project_name=task_info["ProjectName"], # beton_grade=task_info["BetonGrade"], # adjusted_volume=beton_volume # 已经调整后的方量 # ) print(f"任务 {erp_id} 的数据已保存到自定义数据表") except Exception as e: print(f"调用保存函数时出错: {e}") # 发送数据给TCP客户端 try: time.sleep(5) task_data = { "erp_id": erp_id,#车号,相当于序号 "task_id": task_info["TaskID"],#任务单号 "produce_mix_id": task_info["ProduceMixID"],#配比号 "project_name": task_info["ProjectName"],#任务名 "beton_grade": task_info["BetonGrade"],#砼强度 "adjusted_volume": beton_volume,#方量 "flag": "1x",#状态 "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")#时间 } tcp_server.send_data(task_data) print(f"任务 {erp_id} 的数据已发送给TCP客户端") except Exception as e: print(f"发送数据给TCP客户端时出错: {e}") return erp_id # 监控Access数据库中特定任务的Flag字段变化 def monitor_access_flag_changes(access_db_path, access_password): """监控Access数据库中派发任务的Flag状态""" # 存储任务的当前状态 task_flags = {} print("开始监控Access数据库中派发任务的Flag状态") while True: try: # 每2秒检查一次 time.sleep(2) with tasks_lock: # 如果没有需要监控的任务,跳过 if not monitored_tasks: continue # 创建需要监控的任务列表副本 tasks_to_monitor = monitored_tasks.copy() # 首先检查SQL Server中任务是否还存在 sql_conn = connect_to_sql_server() sql_cursor = sql_conn.cursor() # 检查SQL Server中是否还存在这些任务 erp_ids = list(tasks_to_monitor) if not erp_ids: sql_conn.close() continue erp_ids_str = [str(erp_id) for erp_id in erp_ids] placeholders = ','.join('?' * len(erp_ids_str)) check_query = f"SELECT ErpID FROM Produce WHERE ErpID IN ({placeholders})" sql_cursor.execute(check_query, erp_ids_str) sql_results = sql_cursor.fetchall() sql_conn.close() # 分离已删除和未删除的任务 existing_tasks_in_sql = {str(row[0]) for row in sql_results} deleted_from_sql_tasks = set(erp_ids_str) - existing_tasks_in_sql print(f"SQL Server中仍存在的任务: {existing_tasks_in_sql}") print(f"已从SQL Server删除的任务: {deleted_from_sql_tasks}") # 只有已从SQL Server删除的任务才需要在Access中查找 if not deleted_from_sql_tasks: continue # 连接Access数据库 conn = connect_to_access_db(access_db_path, access_password) cursor = conn.cursor() # 查询Access数据库中已删除任务的状态 placeholders = ','.join('?' * len(list(deleted_from_sql_tasks))) query = f"SELECT MISID, Flag FROM Produce WHERE MISID IN ({placeholders})" # 添加调试信息 print(f"执行查询: {query}") print(f"查询参数: {list(deleted_from_sql_tasks)}") cursor.execute(query, list(deleted_from_sql_tasks)) results = cursor.fetchall() # 添加调试信息 print(f"查询返回结果数量: {len(results)}") # 如果没有查询到结果,检查数据库中的实际数据 if len(results) == 0 and deleted_from_sql_tasks: print("未找到匹配记录,检查数据库中的实际数据:") try: cursor.execute("SELECT TOP 5 Mark, Flag FROM Produce ORDER BY Mark DESC") sample_data = cursor.fetchall() print(f"数据库中最近的5条记录: {sample_data}") except Exception as sample_e: print(f"查询样本数据时出错: {sample_e}") # 处理查询结果 current_tasks = {} for row in results: mark = row[0] flag = row[1] if row[1] is not None else "" current_tasks[mark] = flag print(f"查询到记录 - MISID: {mark}, Flag: '{flag}'") # 调试信息 # 检查每个已删除任务的状态变化 for erp_id_str in deleted_from_sql_tasks: erp_id = int(erp_id_str) current_flag = current_tasks.get(erp_id_str, "") previous_flag = task_flags.get(erp_id_str, "") # 添加调试信息 print(f"检查任务 {erp_id} - 当前Flag: '{current_flag}', 之前Flag: '{previous_flag}'") # 如果状态发生变化 if current_flag != previous_flag: # 添加这行 with tasks_lock: artifact_id = inserted_tasks.get(erp_id, "Unknown") print( f"派发任务 ErpID {erp_id} (ArtifactID: {artifact_id}) 的Flag值已更新: {previous_flag} -> {current_flag}") task_flags[erp_id_str] = current_flag # 根据Flag值末尾的字母执行相应操作并更新自定义数据表状态 if current_flag.endswith('d'): # 将 elif 改为 if print(f"派发任务 ErpID {erp_id}: 未进行生产") # 调用同事提供的状态更新函数 try: print(1) # update_custom_table_status(erp_id, "未进行生产") except Exception as e: print(f"更新状态时出错: {e}") # 发送数据给TCP客户端(只发送erp_id和状态) try: status_data = { "erp_id": erp_id, "status": "未进行生产" } tcp_server.send_data(status_data) except Exception as e: print(f"发送状态数据给TCP客户端时出错: {e}") # 在"正在生产中"分支中: elif current_flag.endswith('w'): print(f"派发任务 ErpID {erp_id}: 正在生产中") # 调用同事提供的状态更新函数 try: print(2) # update_custom_table_status(erp_id, "正在生产中") except Exception as e: print(f"更新状态时出错: {e}") # 发送数据给TCP客户端(只发送erp_id和状态) try: status_data = { "erp_id": erp_id, "status": "正在生产中" } tcp_server.send_data(status_data) except Exception as e: print(f"发送状态数据给TCP客户端时出错: {e}") # 在"生产完毕"分支中: elif current_flag.endswith('n'): print(f"派发任务 ErpID {erp_id}: 生产完毕") # 任务完成,可以从监控列表中移除 with tasks_lock: monitored_tasks.discard(erp_id) print(f"派发任务 ErpID {erp_id} 已完成,停止监控") # 调用同事提供的状态更新函数 try: print(3) # update_custom_table_status(erp_id, "生产完毕") except Exception as e: print(f"更新状态时出错: {e}") # 发送数据给TCP客户端(只发送erp_id和状态) try: status_data = { "erp_id": erp_id, "status": "生产完毕" } tcp_server.send_data(status_data) except Exception as e: print(f"发送状态数据给TCP客户端时出错: {e}") # 在"生产中断"分支中: elif current_flag.endswith('p'): print(f"派发任务 ErpID {erp_id}: 生产中断") # 任务中断,可以从监控列表中移除 with tasks_lock: monitored_tasks.discard(erp_id) print(f"派发任务 ErpID {erp_id} 已中断,停止监控") # 调用同事提供的状态更新函数 try: print(4) # update_custom_table_status(erp_id, "生产中断") except Exception as e: print(f"更新状态时出错: {e}") # 发送数据给TCP客户端(只发送erp_id和状态) try: status_data = { "erp_id": erp_id, "status": "生产中断" } tcp_server.send_data(status_data) except Exception as e: print(f"发送状态数据给TCP客户端时出错: {e}") # 在"数据已接收"分支中: elif current_flag.endswith('x'): print(f"派发任务 ErpID {erp_id}: 数据已接收") # 调用同事提供的状态更新函数 try: print(5) # update_custom_table_status(erp_id, "数据已接收") except Exception as e: print(f"更新状态时出错: {e}") # 发送数据给TCP客户端(只发送erp_id和状态) try: status_data = { "erp_id": erp_id, "status": "数据已接收" } tcp_server.send_data(status_data) except Exception as e: print(f"发送状态数据给TCP客户端时出错: {e}") # 检查是否有任务记录已被删除(不在查询结果中但仍在监控列表中) # 这表示任务可能已完成或从系统中移除 missing_tasks = set(deleted_from_sql_tasks) - set(current_tasks.keys()) if missing_tasks: for erp_id_str in missing_tasks: erp_id = int(erp_id_str) with tasks_lock: artifact_id = inserted_tasks.get(erp_id, "Unknown") monitored_tasks.discard(erp_id) inserted_tasks.pop(erp_id, None) task_flags.pop(erp_id_str, None) print(f"派发任务 ErpID {erp_id} (ArtifactID: {artifact_id}) 记录已从Access数据库中删除或完成") conn.close() except Exception as e: print(f"监控Access数据库 Flag时发生错误: {e}") import traceback traceback.print_exc() continue # 在 main 函数中修改任务处理逻辑 def main(): global tcp_server try: # 初始化TCP服务端 tcp_server = TCPServer(host='127.0.0.1', port=8888) tcp_server_thread = threading.Thread(target=tcp_server.start) tcp_server_thread.daemon = True tcp_server_thread.start() # 等待服务端启动 time.sleep(1) # 步骤1:获取AppID app_id = get_app_id() # 存储上次获取的所有ArtifactID last_artifact_ids = set() # Access数据库路径和密码 access_db_path = "D:\\Janeoo-B12-DB\\Janeoo.2.mdb" # 替换为实际路径 access_password = "BCS7.2_SDBS" # Access数据库密码 # 启动Access数据库Flag监控线程 access_monitor_thread = threading.Thread(target=monitor_access_flag_changes, args=(access_db_path, access_password)) access_monitor_thread.daemon = True access_monitor_thread.start() while True: try: # 步骤2:获取所有未浇筑信息 tasks = get_all_not_pour_info(app_id) current_artifact_ids = {task["artifact_id"] for task in tasks} # 检查是否有新任务 new_artifact_ids = current_artifact_ids - last_artifact_ids if new_artifact_ids: print(f"检测到 {len(new_artifact_ids)} 个新任务") for task in tasks: if task["artifact_id"] in new_artifact_ids: # 检查 block_number 是否为 "F" if task["block_number"] == "F": print(f"任务 {task['artifact_id']} 的 block_number 为 'F',跳过派单") continue print(f"处理新任务: {task['artifact_id']}") # 步骤3:获取任务单信息 task_info = get_task_info(app_id, task["beton_task_id"]) # 步骤4:连接Access数据库并获取最大Mark值 max_mark = get_max_mark_from_access(access_db_path, access_password) erp_id = int(max_mark) + 1 print(f"获取到ERP ID: {erp_id}") # 步骤5:连接SQL Server数据库并插入数据 connection = connect_to_sql_server() insert_into_produce_table(connection, task_info, task["beton_volume"], erp_id, task["artifact_id"]) connection.close() # 更新上次获取的ArtifactID集合 last_artifact_ids = current_artifact_ids # 每2秒检查一次 time.sleep(2) except Exception as e: print(f"发生错误: {e}") # 继续循环,避免程序退出 time.sleep(2) except KeyboardInterrupt: print("程序已停止") # 停止TCP服务端 if tcp_server: tcp_server.stop() if __name__ == "__main__": main()