From bbbe478d3e137a0c441eac9d42de880d808de46a Mon Sep 17 00:00:00 2001 From: xiongyi <827523911@qq.com> Date: Wed, 29 Oct 2025 21:23:07 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=B0=BE=E6=95=B0=E6=8E=A7?= =?UTF-8?q?=E5=88=B6=E5=92=8C=E7=8A=B6=E6=80=81=E7=9B=91=E6=8E=A7=E9=80=BB?= =?UTF-8?q?=E8=BE=91=EF=BC=8C=E6=B7=BB=E5=8A=A0TCP=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E4=BC=A0=E8=BE=93=E5=92=8C=E6=95=B0=E6=8D=AE=E5=BA=93=E5=AD=98?= =?UTF-8?q?=E5=82=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- InsertDataFlag.py | 278 +++++++++++++++++++-------- InsertData_tran.py | 461 +++++++++++++++++++++++++++++++++++++++++++++ TCPServer.py | 133 +++++++++++++ 3 files changed, 790 insertions(+), 82 deletions(-) create mode 100644 InsertData_tran.py create mode 100644 TCPServer.py diff --git a/InsertDataFlag.py b/InsertDataFlag.py index b3655f9..1e88356 100644 --- a/InsertDataFlag.py +++ b/InsertDataFlag.py @@ -21,6 +21,11 @@ LOGIN_DATA = { "password": "bfcda35cf4eba92d4583931bbe4ff72ffdfa8b5c9c4b72611bd33f5babee069d" } +# 全局变量用于存储需要监控的任务ERP ID +monitored_tasks = set() # 存储需要监控的erp_id +inserted_tasks = {} # {erp_id: artifact_id} +tasks_lock = threading.Lock() + # 计算SHA256密码 def hash_password(password): @@ -74,33 +79,24 @@ def get_task_info(app_id, task_id): print(f" Progress: {task_data['Progress']}") print(f" TaskDateText: {task_data['TaskDateText']}") print(f" TaskStatusText: {task_data['TaskStatusText']}") - - - - return task_data raise Exception("获取任务单信息失败") -# 获取未浇筑信息(新增) -def get_not_pour_info(app_id): +# 获取所有未浇筑信息 +def get_all_not_pour_info(app_id): headers = {"AppID": app_id} response = requests.get(NOT_POUR_INFO_URL, headers=headers) if response.status_code == 200: data = response.json() if data.get("Code") == 200: - # 处理列表数据 artifact_list = data["Data"] - if len(artifact_list) > 0: - first_artifact = artifact_list[0] # 获取第一个元素 - beton_task_id = first_artifact["BetonTaskID"] - beton_volume = first_artifact["BetonVolume"] - artifact_id = first_artifact["ArtifactActionID"] # 获取ArtifactID - block_number = first_artifact.get("BlockNumber", "") - print(f"获取到BetonTaskID: {beton_task_id}") - print(f"获取到BetonVolume: {beton_volume}") - print(f"获取到ArtifactActionID: {artifact_id}") - print(f"获取到BlockNumber: {block_number}") + tasks = [] + for artifact in artifact_list: + beton_task_id = artifact["BetonTaskID"] + beton_volume = artifact["BetonVolume"] + artifact_id = artifact["ArtifactActionID"] + block_number = artifact.get("BlockNumber", "") # 根据BlockNumber调整方量 if block_number == "L2": @@ -113,12 +109,14 @@ def get_not_pour_info(app_id): adjusted_volume = beton_volume print(f" BlockNumber: {block_number}, 方量未调整") - # 更新BetonVolume为调整后的值 - beton_volume = adjusted_volume + tasks.append({ + "beton_task_id": beton_task_id, + "beton_volume": adjusted_volume, + "artifact_id": artifact_id, + "block_number": block_number + }) - return beton_task_id, beton_volume, artifact_id - else: - raise Exception("未找到未浇筑信息") + return tasks raise Exception("获取未浇筑信息失败") @@ -162,7 +160,7 @@ def connect_to_sql_server(): # 插入数据到Produce表 -def insert_into_produce_table(connection, task_info, beton_volume, erp_id): +def insert_into_produce_table(connection, task_info, beton_volume, erp_id, artifact_id): cursor = connection.cursor() # 准备插入数据 @@ -191,101 +189,217 @@ def insert_into_produce_table(connection, task_info, beton_volume, erp_id): # 执行插入操作 cursor.execute(sql, list(insert_data.values())) connection.commit() - print(f"数据已成功插入到Produce表中") + print(f"数据已成功插入到Produce表中,ERP ID: {erp_id}") -# def to_system(): -# get_not_pour_info + # 记录任务映射关系 + with tasks_lock: + inserted_tasks[erp_id] = artifact_id + monitored_tasks.add(erp_id) + print(f"任务 {erp_id} (ArtifactID: {artifact_id}) 已添加到监控列表") -# 监控Flag字段变化 -def monitor_flag_changes(db_path, password): - conn = connect_to_access_db(db_path, password) - cursor = conn.cursor() + return erp_id - # 获取初始Flag值 - cursor.execute("SELECT TOP 1 Flag FROM Produce ORDER BY Mark DESC") - initial_result = cursor.fetchone() - initial_flag = initial_result[0] if initial_result[0] is not None else "" - print(f"初始Flag值: {initial_flag}") - if initial_flag.endswith('x'): - print("数据已接收") +# 监控Access数据库中特定任务的Flag字段变化 +def monitor_access_flag_changes(access_db_path, access_password): + """监控Access数据库中派发任务的Flag状态""" + + # 存储任务的当前状态 + task_flags = {} + + print("开始监控Access数据库中派发任务的Flag状态") while True: try: - # 每2秒检查一次Flag值 + # 每2秒检查一次 time.sleep(2) - # 使用TOP 1获取最新记录 - cursor.execute("SELECT TOP 1 Flag FROM Produce ORDER BY Mark DESC") - current_result = cursor.fetchone() - current_flag = current_result[0] if current_result[0] is not None else "" + with tasks_lock: + # 如果没有需要监控的任务,跳过 + if not monitored_tasks: + continue - # 检查Flag值是否发生变化 - if current_flag != initial_flag: - print(f"Flag值已更新: {initial_flag} -> {current_flag}") - initial_flag = current_flag + # 创建需要监控的任务列表副本 + tasks_to_monitor = monitored_tasks.copy() + + # 首先检查SQL Server中任务是否还存在 + sql_conn = connect_to_sql_server() + sql_cursor = sql_conn.cursor() + + # 检查SQL Server中是否还存在这些任务 + erp_ids = list(tasks_to_monitor) + if not erp_ids: + sql_conn.close() + continue + + erp_ids_str = [str(erp_id) for erp_id in erp_ids] + placeholders = ','.join('?' * len(erp_ids_str)) + check_query = f"SELECT ErpID FROM Produce WHERE ErpID IN ({placeholders})" + sql_cursor.execute(check_query, erp_ids_str) + sql_results = sql_cursor.fetchall() + sql_conn.close() + + # 分离已删除和未删除的任务 + existing_tasks_in_sql = {str(row[0]) for row in sql_results} + deleted_from_sql_tasks = set(erp_ids_str) - existing_tasks_in_sql + + print(f"SQL Server中仍存在的任务: {existing_tasks_in_sql}") + print(f"已从SQL Server删除的任务: {deleted_from_sql_tasks}") + + # 只有已从SQL Server删除的任务才需要在Access中查找 + if not deleted_from_sql_tasks: + continue + + # 连接Access数据库 + conn = connect_to_access_db(access_db_path, access_password) + cursor = conn.cursor() + + # 查询Access数据库中已删除任务的状态 + placeholders = ','.join('?' * len(list(deleted_from_sql_tasks))) + query = f"SELECT MISID, Flag FROM Produce WHERE MISID IN ({placeholders})" + + # 添加调试信息 + print(f"执行查询: {query}") + print(f"查询参数: {list(deleted_from_sql_tasks)}") + + cursor.execute(query, list(deleted_from_sql_tasks)) + results = cursor.fetchall() + + # 添加调试信息 + print(f"查询返回结果数量: {len(results)}") + + # 如果没有查询到结果,检查数据库中的实际数据 + if len(results) == 0 and deleted_from_sql_tasks: + print("未找到匹配记录,检查数据库中的实际数据:") + try: + cursor.execute("SELECT TOP 5 Mark, Flag FROM Produce ORDER BY Mark DESC") + sample_data = cursor.fetchall() + print(f"数据库中最近的5条记录: {sample_data}") + except Exception as sample_e: + print(f"查询样本数据时出错: {sample_e}") + + # 处理查询结果 + current_tasks = {} + for row in results: + mark = row[0] + flag = row[1] if row[1] is not None else "" + current_tasks[mark] = flag + print(f"查询到记录 - MISID: {mark}, Flag: '{flag}'") # 调试信息 + + # 检查每个已删除任务的状态变化 + for erp_id_str in deleted_from_sql_tasks: + erp_id = int(erp_id_str) + current_flag = current_tasks.get(erp_id_str, "") + previous_flag = task_flags.get(erp_id_str, "") # 添加调试信息 - print(f"当前Flag值: {current_flag}") - # 根据Flag值末尾的字母执行相应操作 + print(f"检查任务 {erp_id} - 当前Flag: '{current_flag}', 之前Flag: '{previous_flag}'") - if current_flag.endswith('d'): - print("未进行生产") - elif current_flag.endswith('w'): - print("正在生产中") - elif current_flag.endswith('n'): - print("生产完毕") - elif current_flag.endswith('p'): - print("生产中断") + # 如果状态发生变化 + if current_flag != previous_flag: + with tasks_lock: + artifact_id = inserted_tasks.get(erp_id, "Unknown") + print( + f"派发任务 ErpID {erp_id} (ArtifactID: {artifact_id}) 的Flag值已更新: {previous_flag} -> {current_flag}") + task_flags[erp_id_str] = current_flag + + # 根据Flag值末尾的字母执行相应操作 + if current_flag.endswith('d'): + print(f"派发任务 ErpID {erp_id}: 未进行生产") + elif current_flag.endswith('w'): + print(f"派发任务 ErpID {erp_id}: 正在生产中") + elif current_flag.endswith('n'): + print(f"派发任务 ErpID {erp_id}: 生产完毕") + # 任务完成,可以从监控列表中移除 + with tasks_lock: + monitored_tasks.discard(erp_id) + print(f"派发任务 ErpID {erp_id} 已完成,停止监控") + elif current_flag.endswith('p'): + print(f"派发任务 ErpID {erp_id}: 生产中断") + # 任务中断,可以从监控列表中移除 + with tasks_lock: + monitored_tasks.discard(erp_id) + print(f"派发任务 ErpID {erp_id} 已中断,停止监控") + elif current_flag.endswith('x'): + print(f"派发任务 ErpID {erp_id}: 数据已接收") + + # 检查是否有任务记录已被删除(不在查询结果中但仍在监控列表中) + # 这表示任务可能已完成或从系统中移除 + missing_tasks = set(deleted_from_sql_tasks) - set(current_tasks.keys()) + if missing_tasks: + for erp_id_str in missing_tasks: + erp_id = int(erp_id_str) + with tasks_lock: + artifact_id = inserted_tasks.get(erp_id, "Unknown") + monitored_tasks.discard(erp_id) + inserted_tasks.pop(erp_id, None) + task_flags.pop(erp_id_str, None) + print(f"派发任务 ErpID {erp_id} (ArtifactID: {artifact_id}) 记录已从Access数据库中删除或完成") + + conn.close() except Exception as e: - print(f"监控Flag时发生错误: {e}") + print(f"监控Access数据库 Flag时发生错误: {e}") + import traceback + traceback.print_exc() continue -# 主函数 +# 在 main 函数中修改任务处理逻辑 def main(): try: # 步骤1:获取AppID app_id = get_app_id() - # 上次获取的ArtifactID,用于检测变化 - last_artifact_id = None + # 存储上次获取的所有ArtifactID + last_artifact_ids = set() # Access数据库路径和密码 access_db_path = "D:\\Janeoo-B12-DB\\Janeoo.2.mdb" # 替换为实际路径 access_password = "BCS7.2_SDBS" # Access数据库密码 - # 启动Flag监控线程 - flag_monitor_thread = threading.Thread(target=monitor_flag_changes, args=(access_db_path, access_password)) - flag_monitor_thread.daemon = True - flag_monitor_thread.start() + # 启动Access数据库Flag监控线程 + access_monitor_thread = threading.Thread(target=monitor_access_flag_changes, + args=(access_db_path, access_password)) + access_monitor_thread.daemon = True + access_monitor_thread.start() while True: try: - # 步骤2:获取未浇筑信息中的BetonTaskID、BetonVolume和ArtifactID - beton_task_id, beton_volume, artifact_id = get_not_pour_info(app_id) - # beton_task_id = "20251016-01" + # 步骤2:获取所有未浇筑信息 + tasks = get_all_not_pour_info(app_id) + current_artifact_ids = {task["artifact_id"] for task in tasks} - # 检查ArtifactID是否发生变化 - if artifact_id != last_artifact_id: - print(f"检测到新任务: {artifact_id}") + # 检查是否有新任务 + new_artifact_ids = current_artifact_ids - last_artifact_ids + if new_artifact_ids: + print(f"检测到 {len(new_artifact_ids)} 个新任务") - # 步骤3:使用BetonTaskID获取任务单信息 - task_info = get_task_info(app_id, beton_task_id) + for task in tasks: + if task["artifact_id"] in new_artifact_ids: + # 检查 block_number 是否为 "F" + if task["block_number"] == "F": + print(f"任务 {task['artifact_id']} 的 block_number 为 'F',跳过派单") + continue - # 步骤4:连接Access数据库并获取最大Mark值 - max_mark = get_max_mark_from_access(access_db_path, access_password) - erp_id = int(max_mark) + 1 # 在最大Mark值基础上加1 - print(f"获取到ERP ID: {erp_id}") + print(f"处理新任务: {task['artifact_id']}") - # 步骤5:连接SQL Server数据库并插入数据 - connection = connect_to_sql_server() - insert_into_produce_table(connection, task_info, beton_volume, erp_id) - connection.close() + # 步骤3:获取任务单信息 + task_info = get_task_info(app_id, task["beton_task_id"]) - # 更新上次获取的ArtifactID - last_artifact_id = artifact_id + # 步骤4:连接Access数据库并获取最大Mark值 + max_mark = get_max_mark_from_access(access_db_path, access_password) + erp_id = int(max_mark) + 1 + print(f"获取到ERP ID: {erp_id}") + + # 步骤5:连接SQL Server数据库并插入数据 + connection = connect_to_sql_server() + insert_into_produce_table(connection, task_info, task["beton_volume"], erp_id, + task["artifact_id"]) + connection.close() + + # 更新上次获取的ArtifactID集合 + last_artifact_ids = current_artifact_ids # 每2秒检查一次 time.sleep(2) diff --git a/InsertData_tran.py b/InsertData_tran.py new file mode 100644 index 0000000..8078b6c --- /dev/null +++ b/InsertData_tran.py @@ -0,0 +1,461 @@ +import requests +import hashlib +import pyodbc +from datetime import datetime +import time +import json +import threading + +# 配置信息 +BASE_URL = "https://www.shnthy.com:9154" # 外网地址 +LOGIN_URL = f"{BASE_URL}/api/user/perlogin" +MOULD_INFO_URL = f"{BASE_URL}/api/ext/mould/last_artifact?mouldCode=SHR2B1-9" +TASK_INFO_URL = f"{BASE_URL}/api/ext/artifact/task" +NOT_POUR_INFO_URL = f"{BASE_URL}/api/ext/artifact/not_pour" # 新增接口 + +# 登录参数 +LOGIN_DATA = { + "Program": 11, + "SC": "1000000001", + "loginName": "leduser", + "password": "bfcda35cf4eba92d4583931bbe4ff72ffdfa8b5c9c4b72611bd33f5babee069d" +} + +# 全局变量用于存储需要监控的任务ERP ID +monitored_tasks = set() # 存储需要监控的erp_id +inserted_tasks = {} # {erp_id: artifact_id} +tasks_lock = threading.Lock() + + +# 计算SHA256密码 +def hash_password(password): + return password + + +LOGIN_DATA["password"] = hash_password(LOGIN_DATA["password"]) + + +# 获取AppID +def get_app_id(): + response = requests.post(LOGIN_URL, json=LOGIN_DATA) + if response.status_code == 200: + data = response.json() + if data.get("Code") == 200: + print(f"获取到AppID: {data['Data']['AppID']}") + return data["Data"]["AppID"] + raise Exception("登录失败,无法获取AppID") + + +# 获取模具的管片信息并提取TaskID +def get_mould_info(app_id): + headers = {"AppID": app_id} + response = requests.get(MOULD_INFO_URL, headers=headers) + if response.status_code == 205: + data = response.json() + if data.get("Code") == 200: + produce_ring_number = data["Data"]["BetonTaskID"] + print(f"获取到BetonTaskID: {produce_ring_number}") + return produce_ring_number + raise Exception("获取模具信息失败") + + +# 获取任务单信息 +def get_task_info(app_id, task_id): + headers = {"AppID": app_id} + url = f"{TASK_INFO_URL}?TaskId={task_id}" + response = requests.get(url, headers=headers) + if response.status_code == 200: + data = response.json() + if data.get("Code") == 200: + task_data = data["Data"] + print(f"获取到任务单信息:") + print(f" TaskID: {task_data['TaskID']}") + print(f" ProduceMixID: {task_data['ProduceMixID']}") + print(f" ProjectName: {task_data['ProjectName']}") + print(f" BetonGrade: {task_data['BetonGrade']}") + print(f" MixID: {task_data['MixID']}") + print(f" PlannedVolume: {task_data['PlannedVolume']}") + print(f" ProducedVolume: {task_data['ProducedVolume']}") + print(f" Progress: {task_data['Progress']}") + print(f" TaskDateText: {task_data['TaskDateText']}") + print(f" TaskStatusText: {task_data['TaskStatusText']}") + return task_data + raise Exception("获取任务单信息失败") + + +# 获取所有未浇筑信息 +def get_all_not_pour_info(app_id): + headers = {"AppID": app_id} + response = requests.get(NOT_POUR_INFO_URL, headers=headers) + if response.status_code == 200: + data = response.json() + if data.get("Code") == 200: + artifact_list = data["Data"] + tasks = [] + for artifact in artifact_list: + beton_task_id = artifact["BetonTaskID"] + beton_volume = artifact["BetonVolume"] + artifact_id = artifact["ArtifactActionID"] + block_number = artifact.get("BlockNumber", "") + + # 根据BlockNumber调整方量 + if block_number == "L2": + adjusted_volume = beton_volume + 0.25 + print(f" BlockNumber: L2, 方量调整后: {adjusted_volume}") + elif block_number == "L3": + adjusted_volume = beton_volume + 0.3 + print(f" BlockNumber: L3, 方量调整后: {adjusted_volume}") + else: + adjusted_volume = beton_volume + print(f" BlockNumber: {block_number}, 方量未调整") + + tasks.append({ + "beton_task_id": beton_task_id, + "beton_volume": adjusted_volume, + "artifact_id": artifact_id, + "block_number": block_number + }) + + return tasks + raise Exception("获取未浇筑信息失败") + + +# 连接Access数据库 +def connect_to_access_db(db_path, password): + conn_str = ( + r'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};' + f'DBQ={db_path};' + f'PWD={password};' + ) + return pyodbc.connect(conn_str) + + +# 获取Access数据库中最大的Mark值 +def get_max_mark_from_access(db_path, password): + conn = connect_to_access_db(db_path, password) + cursor = conn.cursor() + + # 查询最大的Mark值 + cursor.execute("SELECT MAX(Mark) FROM Produce") + max_mark = cursor.fetchone()[0] + + # 如果没有记录,返回0 + if max_mark is None: + max_mark = 0 + + conn.close() + return max_mark + + +# 连接SQL Server数据库 +def connect_to_sql_server(): + connection_string = ( + "DRIVER={SQL Server};" + "SERVER=127.0.0.1;" + "DATABASE=BS23DB;" + "UID=sa;" + "PWD=123;" + ) + return pyodbc.connect(connection_string) + + +# 插入数据到Produce表 +# 在 insert_into_produce_table 函数中添加调用同事的保存函数 +def insert_into_produce_table(connection, task_info, beton_volume, erp_id, artifact_id): + cursor = connection.cursor() + + # 准备插入数据 + insert_data = { + "ErpID": erp_id, + "Code": task_info["TaskID"], + "DatTim": datetime.now(), + "Recipe": task_info["ProduceMixID"], + "MorRec": "", + "ProdMete": beton_volume, + "MorMete": 0.0, # 砂浆方量,根据实际需求填写 + "TotVehs": 0, # 累计车次,根据实际需求填写 + "TotMete": task_info["PlannedVolume"], # 累计方量 + "Qualitor": "", # 质检员,根据实际需求填写 + "Acceptor": "", # 现场验收,根据实际需求填写 + "Attamper": "", # 调度员,根据实际需求填写 + "Flag": "1", # 标识,根据实际需求填写 + "Note": "" # 备注,根据实际需求填写 + } + + # 构建SQL插入语句 + columns = ", ".join(insert_data.keys()) + placeholders = ", ".join(["?" for _ in insert_data.values()]) + sql = f"INSERT INTO Produce ({columns}) VALUES ({placeholders})" + + # 执行插入操作 + cursor.execute(sql, list(insert_data.values())) + connection.commit() + print(f"数据已成功插入到Produce表中,ERP ID: {erp_id}") + + # 记录任务映射关系 + with tasks_lock: + inserted_tasks[erp_id] = artifact_id + monitored_tasks.add(erp_id) + print(f"任务 {erp_id} (ArtifactID: {artifact_id}) 已添加到监控列表") + + # 调用同事提供的保存函数,将数据保存到自定义数据表 + try: + # 假设同事提供的函数名为 save_to_custom_table + # 参数包括: MISID(即erp_id), Flag, TaskID, ProduceMixID, ProjectName, BetonGrade, 调整后的方量 + save_to_custom_table( + misid=erp_id, + flag="1", # 初始Flag值 + task_id=task_info["TaskID"], + produce_mix_id=task_info["ProduceMixID"], + project_name=task_info["ProjectName"], + beton_grade=task_info["BetonGrade"], + adjusted_volume=beton_volume # 已经调整后的方量 + ) + print(f"任务 {erp_id} 的数据已保存到自定义数据表") + except Exception as e: + print(f"调用保存函数时出错: {e}") + + return erp_id + +# 监控Access数据库中特定任务的Flag字段变化 +# 监控Access数据库中特定任务的Flag字段变化 +def monitor_access_flag_changes(access_db_path, access_password): + """监控Access数据库中派发任务的Flag状态""" + + # 存储任务的当前状态 + task_flags = {} + + print("开始监控Access数据库中派发任务的Flag状态") + + while True: + try: + # 每2秒检查一次 + time.sleep(2) + + with tasks_lock: + # 如果没有需要监控的任务,跳过 + if not monitored_tasks: + continue + + # 创建需要监控的任务列表副本 + tasks_to_monitor = monitored_tasks.copy() + + # 首先检查SQL Server中任务是否还存在 + sql_conn = connect_to_sql_server() + sql_cursor = sql_conn.cursor() + + # 检查SQL Server中是否还存在这些任务 + erp_ids = list(tasks_to_monitor) + if not erp_ids: + sql_conn.close() + continue + + erp_ids_str = [str(erp_id) for erp_id in erp_ids] + placeholders = ','.join('?' * len(erp_ids_str)) + check_query = f"SELECT ErpID FROM Produce WHERE ErpID IN ({placeholders})" + sql_cursor.execute(check_query, erp_ids_str) + sql_results = sql_cursor.fetchall() + sql_conn.close() + + # 分离已删除和未删除的任务 + existing_tasks_in_sql = {str(row[0]) for row in sql_results} + deleted_from_sql_tasks = set(erp_ids_str) - existing_tasks_in_sql + + print(f"SQL Server中仍存在的任务: {existing_tasks_in_sql}") + print(f"已从SQL Server删除的任务: {deleted_from_sql_tasks}") + + # 只有已从SQL Server删除的任务才需要在Access中查找 + if not deleted_from_sql_tasks: + continue + + # 连接Access数据库 + conn = connect_to_access_db(access_db_path, access_password) + cursor = conn.cursor() + + # 查询Access数据库中已删除任务的状态 + placeholders = ','.join('?' * len(list(deleted_from_sql_tasks))) + query = f"SELECT MISID, Flag FROM Produce WHERE MISID IN ({placeholders})" + + # 添加调试信息 + print(f"执行查询: {query}") + print(f"查询参数: {list(deleted_from_sql_tasks)}") + + cursor.execute(query, list(deleted_from_sql_tasks)) + results = cursor.fetchall() + + # 添加调试信息 + print(f"查询返回结果数量: {len(results)}") + + # 如果没有查询到结果,检查数据库中的实际数据 + if len(results) == 0 and deleted_from_sql_tasks: + print("未找到匹配记录,检查数据库中的实际数据:") + try: + cursor.execute("SELECT TOP 5 Mark, Flag FROM Produce ORDER BY Mark DESC") + sample_data = cursor.fetchall() + print(f"数据库中最近的5条记录: {sample_data}") + except Exception as sample_e: + print(f"查询样本数据时出错: {sample_e}") + + # 处理查询结果 + current_tasks = {} + for row in results: + mark = row[0] + flag = row[1] if row[1] is not None else "" + current_tasks[mark] = flag + print(f"查询到记录 - MISID: {mark}, Flag: '{flag}'") # 调试信息 + + # 检查每个已删除任务的状态变化 + for erp_id_str in deleted_from_sql_tasks: + erp_id = int(erp_id_str) + current_flag = current_tasks.get(erp_id_str, "") + previous_flag = task_flags.get(erp_id_str, "") + + # 添加调试信息 + print(f"检查任务 {erp_id} - 当前Flag: '{current_flag}', 之前Flag: '{previous_flag}'") + + # 如果状态发生变化 + if current_flag != previous_flag: + with tasks_lock: + artifact_id = inserted_tasks.get(erp_id, "Unknown") + print( + f"派发任务 ErpID {erp_id} (ArtifactID: {artifact_id}) 的Flag值已更新: {previous_flag} -> {current_flag}") + task_flags[erp_id_str] = current_flag + + # 根据Flag值末尾的字母执行相应操作并更新自定义数据表状态 + if current_flag.endswith('d'): + print(f"派发任务 ErpID {erp_id}: 未进行生产") + # 调用同事提供的状态更新函数 + try: + update_custom_table_status(erp_id, "未进行生产") + except Exception as e: + print(f"更新状态时出错: {e}") + elif current_flag.endswith('w'): + print(f"派发任务 ErpID {erp_id}: 正在生产中") + # 调用同事提供的状态更新函数 + try: + update_custom_table_status(erp_id, "正在生产中") + except Exception as e: + print(f"更新状态时出错: {e}") + elif current_flag.endswith('n'): + print(f"派发任务 ErpID {erp_id}: 生产完毕") + # 任务完成,可以从监控列表中移除 + with tasks_lock: + monitored_tasks.discard(erp_id) + print(f"派发任务 ErpID {erp_id} 已完成,停止监控") + # 调用同事提供的状态更新函数 + try: + update_custom_table_status(erp_id, "生产完毕") + except Exception as e: + print(f"更新状态时出错: {e}") + elif current_flag.endswith('p'): + print(f"派发任务 ErpID {erp_id}: 生产中断") + # 任务中断,可以从监控列表中移除 + with tasks_lock: + monitored_tasks.discard(erp_id) + print(f"派发任务 ErpID {erp_id} 已中断,停止监控") + # 调用同事提供的状态更新函数 + try: + update_custom_table_status(erp_id, "生产中断") + except Exception as e: + print(f"更新状态时出错: {e}") + elif current_flag.endswith('x'): + print(f"派发任务 ErpID {erp_id}: 数据已接收") + # 调用同事提供的状态更新函数 + try: + update_custom_table_status(erp_id, "数据已接收") + except Exception as e: + print(f"更新状态时出错: {e}") + + # 检查是否有任务记录已被删除(不在查询结果中但仍在监控列表中) + # 这表示任务可能已完成或从系统中移除 + missing_tasks = set(deleted_from_sql_tasks) - set(current_tasks.keys()) + if missing_tasks: + for erp_id_str in missing_tasks: + erp_id = int(erp_id_str) + with tasks_lock: + artifact_id = inserted_tasks.get(erp_id, "Unknown") + monitored_tasks.discard(erp_id) + inserted_tasks.pop(erp_id, None) + task_flags.pop(erp_id_str, None) + print(f"派发任务 ErpID {erp_id} (ArtifactID: {artifact_id}) 记录已从Access数据库中删除或完成") + + conn.close() + + except Exception as e: + print(f"监控Access数据库 Flag时发生错误: {e}") + import traceback + traceback.print_exc() + continue + + + +# 在 main 函数中修改任务处理逻辑 +def main(): + try: + # 步骤1:获取AppID + app_id = get_app_id() + + # 存储上次获取的所有ArtifactID + last_artifact_ids = set() + + # Access数据库路径和密码 + access_db_path = "D:\\Janeoo-B12-DB\\Janeoo.2.mdb" # 替换为实际路径 + access_password = "BCS7.2_SDBS" # Access数据库密码 + + # 启动Access数据库Flag监控线程 + access_monitor_thread = threading.Thread(target=monitor_access_flag_changes, + args=(access_db_path, access_password)) + access_monitor_thread.daemon = True + access_monitor_thread.start() + + while True: + try: + # 步骤2:获取所有未浇筑信息 + tasks = get_all_not_pour_info(app_id) + current_artifact_ids = {task["artifact_id"] for task in tasks} + + # 检查是否有新任务 + new_artifact_ids = current_artifact_ids - last_artifact_ids + if new_artifact_ids: + print(f"检测到 {len(new_artifact_ids)} 个新任务") + + for task in tasks: + if task["artifact_id"] in new_artifact_ids: + # 检查 block_number 是否为 "F" + if task["block_number"] == "F": + print(f"任务 {task['artifact_id']} 的 block_number 为 'F',跳过派单") + continue + + print(f"处理新任务: {task['artifact_id']}") + + # 步骤3:获取任务单信息 + task_info = get_task_info(app_id, task["beton_task_id"]) + + # 步骤4:连接Access数据库并获取最大Mark值 + max_mark = get_max_mark_from_access(access_db_path, access_password) + erp_id = int(max_mark) + 1 + print(f"获取到ERP ID: {erp_id}") + + # 步骤5:连接SQL Server数据库并插入数据 + connection = connect_to_sql_server() + insert_into_produce_table(connection, task_info, task["beton_volume"], erp_id, + task["artifact_id"]) + connection.close() + + # 更新上次获取的ArtifactID集合 + last_artifact_ids = current_artifact_ids + + # 每2秒检查一次 + time.sleep(2) + + except Exception as e: + print(f"发生错误: {e}") + # 继续循环,避免程序退出 + time.sleep(2) + + except KeyboardInterrupt: + print("程序已停止") + + +if __name__ == "__main__": + main() diff --git a/TCPServer.py b/TCPServer.py new file mode 100644 index 0000000..12db040 --- /dev/null +++ b/TCPServer.py @@ -0,0 +1,133 @@ +import socket +import threading +import json +import time +from datetime import datetime + +class TCPServer: + def __init__(self, host='localhost', port=8080): + self.host = host + self.port = port + self.server_socket = None + self.client_socket = None + self.client_address = None + self.running = False + + def start_server(self): + """启动TCP服务端""" + try: + self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.server_socket.bind((self.host, self.port)) + self.server_socket.listen(1) + self.running = True + + print(f"TCP服务端已启动,监听 {self.host}:{self.port}") + + while self.running: + try: + self.client_socket, self.client_address = self.server_socket.accept() + print(f"客户端已连接: {self.client_address}") + + # 启动客户端处理线程 + client_thread = threading.Thread(target=self.handle_client) + client_thread.daemon = True + client_thread.start() + + except Exception as e: + if self.running: + print(f"接受客户端连接时出错: {e}") + + except Exception as e: + print(f"启动服务端时出错: {e}") + finally: + self.stop_server() + + def handle_client(self): + """处理客户端通信""" + try: + while self.running and self.client_socket: + try: + # 设置接收超时 + self.client_socket.settimeout(1.0) + + # 接收客户端数据 + data = self.client_socket.recv(1024) + if not data: + break + + # 解析JSON数据 + message = data.decode('utf-8') + json_data = json.loads(message) + + # 处理客户端返回的异常数据 + self.process_client_data(json_data) + + except socket.timeout: + # 超时继续循环 + continue + except json.JSONDecodeError as e: + print(f"解析JSON数据时出错: {e}") + except Exception as e: + print(f"处理客户端数据时出错: {e}") + + except Exception as e: + print(f"客户端处理线程出错: {e}") + finally: + self.disconnect_client() + + def process_client_data(self, json_data): + """处理客户端返回的数据""" + try: + cmd = json_data.get("cmd") + timestamp = json_data.get("timestamp") + + if cmd == "production_error": + print(f"收到异常生产通知 - 时间: {timestamp}") + # 调用update_custom_table_status更新数据库状态 + try: + update_custom_table_status("异常生产") + print("数据库状态已更新为: 异常生产") + except Exception as e: + print(f"更新数据库状态时出错: {e}") + elif cmd == "interrupt_error": + print(f"收到中断生产通知 - 时间: {timestamp}") + # 调用update_custom_table_status更新数据库状态 + try: + update_custom_table_status("中断生产") + print("数据库状态已更新为: 中断生产") + except Exception as e: + print(f"更新数据库状态时出错: {e}") + else: + print(f"收到未知命令: {cmd}") + + except Exception as e: + print(f"处理客户端数据时出错: {e}") + + def send_data(self, data): + """向客户端发送数据""" + try: + if self.client_socket and self.running: + json_data = json.dumps(data, ensure_ascii=False) + self.client_socket.send((json_data + "\n").encode('utf-8')) + print(f"已发送数据: {json_data}") + return True + except Exception as e: + print(f"发送数据时出错: {e}") + return False + + def disconnect_client(self): + """断开客户端连接""" + if self.client_socket: + self.client_socket.close() + self.client_socket = None + print("客户端已断开连接") + + def stop_server(self): + """停止服务端""" + self.running = False + if self.client_socket: + self.client_socket.close() + if self.server_socket: + self.server_socket.close() + print("TCP服务端已停止")