diff --git a/.idea/vcs.xml b/.idea/vcs.xml index 94a25f7..1326942 100644 --- a/.idea/vcs.xml +++ b/.idea/vcs.xml @@ -2,5 +2,6 @@ + \ No newline at end of file diff --git a/Allocation.py b/Allocation.py new file mode 100644 index 0000000..5522d6e --- /dev/null +++ b/Allocation.py @@ -0,0 +1,424 @@ +#离线版本,根据彭的返回值更新状态,待测试 + +import requests +import hashlib +import pyodbc +from datetime import datetime +import time +import json +import threading +from TCPServer import TCPServer + +# 配置信息 +BASE_URL = "https://www.shnthy.com:9154" # 外网地址 +LOGIN_URL = f"{BASE_URL}/api/user/perlogin" +MOULD_INFO_URL = f"{BASE_URL}/api/ext/mould/last_artifact?mouldCode=SHR2B1-9" +TASK_INFO_URL = f"{BASE_URL}/api/ext/artifact/task" +NOT_POUR_INFO_URL = f"{BASE_URL}/api/ext/artifact/not_pour" # 新增接口 + +# 登录参数 +LOGIN_DATA = { + "Program": 11, + "SC": "1000000001", + "loginName": "leduser", + "password": "bfcda35cf4eba92d4583931bbe4ff72ffdfa8b5c9c4b72611bd33f5babee069d" +} + +# 全局变量用于存储需要监控的任务ERP ID +monitored_tasks = set() # 存储需要监控的erp_id +inserted_tasks = {} # {erp_id: artifact_id} +tasks_lock = threading.Lock() + +# TCP服务端实例 +tcp_server = TCPServer + +# 存储任务信息,用于在SQL Server数据删除后保存到自定义表 +task_info_storage = {} # {erp_id: task_info} + + +# 计算SHA256密码 +def hash_password(password): + return password + + +LOGIN_DATA["password"] = hash_password(LOGIN_DATA["password"]) + + +# 获取AppID +def get_app_id(): + response = requests.post(LOGIN_URL, json=LOGIN_DATA) + if response.status_code == 200: + data = response.json() + if data.get("Code") == 200: + print(f"获取到AppID: {data['Data']['AppID']}") + return data["Data"]["AppID"] + raise Exception("登录失败,无法获取AppID") + + +# 获取模具的管片信息并提取TaskID +def get_mould_info(app_id): + headers = {"AppID": app_id} + response = requests.get(MOULD_INFO_URL, headers=headers) + if response.status_code == 205: + data = response.json() + if data.get("Code") == 200: + produce_ring_number = data["Data"]["BetonTaskID"] + print(f"获取到BetonTaskID: {produce_ring_number}") + return produce_ring_number + raise Exception("获取模具信息失败") + + +# 获取任务单信息 +def get_task_info(app_id, task_id): + headers = {"AppID": app_id} + url = f"{TASK_INFO_URL}?TaskId={task_id}" + response = requests.get(url, headers=headers) + if response.status_code == 200: + data = response.json() + if data.get("Code") == 200: + task_data = data["Data"] + print(f"获取到任务单信息:") + print(f" TaskID: {task_data['TaskID']}") + print(f" ProduceMixID: {task_data['ProduceMixID']}") + print(f" ProjectName: {task_data['ProjectName']}") + print(f" BetonGrade: {task_data['BetonGrade']}") + print(f" MixID: {task_data['MixID']}") + print(f" PlannedVolume: {task_data['PlannedVolume']}") + print(f" ProducedVolume: {task_data['ProducedVolume']}") + print(f" Progress: {task_data['Progress']}") + print(f" TaskDateText: {task_data['TaskDateText']}") + print(f" TaskStatusText: {task_data['TaskStatusText']}") + return task_data + raise Exception("获取任务单信息失败") + + +# 获取所有未浇筑信息 +def get_all_not_pour_info(app_id): + headers = {"AppID": app_id} + response = requests.get(NOT_POUR_INFO_URL, headers=headers) + if response.status_code == 200: + data = response.json() + if data.get("Code") == 200: + artifact_list = data["Data"] + tasks = [] + for artifact in artifact_list: + beton_task_id = artifact["BetonTaskID"] + beton_volume = artifact["BetonVolume"] + artifact_id = artifact["ArtifactActionID"] + block_number = artifact.get("BlockNumber", "") + + # 根据BlockNumber调整方量 + if block_number == "L2": + adjusted_volume = beton_volume + 0.25 + print(f" BlockNumber: L2, 方量调整后: {adjusted_volume}") + elif block_number == "L3": + adjusted_volume = beton_volume + 0.3 + print(f" BlockNumber: L3, 方量调整后: {adjusted_volume}") + else: + adjusted_volume = beton_volume + print(f" BlockNumber: {block_number}, 方量未调整") + + tasks.append({ + "beton_task_id": beton_task_id, + "beton_volume": adjusted_volume, + "artifact_id": artifact_id, + "block_number": block_number + }) + + return tasks + raise Exception("获取未浇筑信息失败") + + +# 连接Access数据库 +def connect_to_access_db(db_path, password): + conn_str = ( + r'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};' + f'DBQ={db_path};' + f'PWD={password};' + ) + return pyodbc.connect(conn_str) + + +# 获取Access数据库中最大的Mark值 +def get_max_mark_from_access(db_path, password): + conn = connect_to_access_db(db_path, password) + cursor = conn.cursor() + + # 查询最大的Mark值 + cursor.execute("SELECT MAX(Mark) FROM Produce") + max_mark = cursor.fetchone()[0] + + # 如果没有记录,返回0 + if max_mark is None: + max_mark = 0 + + conn.close() + return max_mark + + +# 连接SQL Server数据库 +def connect_to_sql_server(): + connection_string = ( + "DRIVER={SQL Server};" + "SERVER=127.0.0.1;" + "DATABASE=BS23DB;" + "UID=sa;" + "PWD=123;" + ) + return pyodbc.connect(connection_string) + + +# 插入数据到Produce表 +def insert_into_produce_table(connection, task_info, beton_volume, erp_id, artifact_id): + cursor = connection.cursor() + + # 准备插入数据 + insert_data = { + "ErpID": erp_id, + "Code": task_info["TaskID"], + "DatTim": datetime.now(), + "Recipe": task_info["ProduceMixID"], + "MorRec": "", + "ProdMete": beton_volume, + "MorMete": 0.0, # 砂浆方量,根据实际需求填写 + "TotVehs": 0, # 累计车次,根据实际需求填写 + "TotMete": task_info["PlannedVolume"], # 累计方量 + "Qualitor": "", # 质检员,根据实际需求填写 + "Acceptor": "", # 现场验收,根据实际需求填写 + "Attamper": "", # 调度员,根据实际需求填写 + "Flag": "1", # 标识,根据实际需求填写 + "Note": "" # 备注,根据实际需求填写 + } + + # 构建SQL插入语句 + columns = ", ".join(insert_data.keys()) + placeholders = ", ".join(["?" for _ in insert_data.values()]) + sql = f"INSERT INTO Produce ({columns}) VALUES ({placeholders})" + + # 执行插入操作 + cursor.execute(sql, list(insert_data.values())) + connection.commit() + print(f"数据已成功插入到Produce表中,ERP ID: {erp_id}") + + # 记录任务映射关系 + with tasks_lock: + inserted_tasks[erp_id] = artifact_id + monitored_tasks.add(erp_id) + # 存储任务信息,用于后续保存到自定义表 + task_info_storage[erp_id] = { + "task_info": task_info, + "beton_volume": beton_volume, + "artifact_id": artifact_id + } + print(f"任务 {erp_id} (ArtifactID: {artifact_id}) 已添加到监控列表") + + # 发送数据给TCP客户端 + try: + task_data = { + "erp_id": erp_id, + "task_id": task_info["TaskID"], + "produce_mix_id": task_info["ProduceMixID"], + "project_name": task_info["ProjectName"], + "beton_grade": task_info["BetonGrade"], + "adjusted_volume": beton_volume, + "flag": "1x", + "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") + } + tcp_server.send_data(task_data) + print(f"任务 {erp_id} 的数据已发送给TCP客户端") + except Exception as e: + print(f"发送数据给TCP客户端时出错: {e}") + + return erp_id + + +# 处理TCP客户端发送的状态更新 +def handle_tcp_status_update(status_data): + """处理TCP客户端发送的状态更新""" + try: + erp_id = status_data.get("erp_id") + status = status_data.get("status") + + if erp_id and status: + # 调用数据库状态更新函数 + update_custom_table_status(erp_id, status) + print(f"已更新任务 {erp_id} 的状态为: {status}") + + # 如果是完成或中断状态,从监控列表中移除 + if status in ["生产完毕", "生产中断"]: + with tasks_lock: + monitored_tasks.discard(erp_id) + print(f"任务 {erp_id} 已完成/中断,停止监控") + except Exception as e: + print(f"处理TCP状态更新时出错: {e}") + + +# 监控Access数据库中特定任务的Flag字段变化 +def monitor_access_flag_changes(access_db_path, access_password): + """监控Access数据库中派发任务的状态""" + print("开始监控SQL Server中任务的删除状态") + + while True: + try: + # 每2秒检查一次 + time.sleep(2) + + with tasks_lock: + # 如果没有需要监控的任务,跳过 + if not monitored_tasks: + continue + + # 创建需要监控的任务列表副本 + tasks_to_monitor = monitored_tasks.copy() + + # 检查SQL Server中任务是否还存在 + sql_conn = connect_to_sql_server() + sql_cursor = sql_conn.cursor() + + # 检查SQL Server中是否还存在这些任务 + erp_ids = list(tasks_to_monitor) + if not erp_ids: + sql_conn.close() + continue + + erp_ids_str = [str(erp_id) for erp_id in erp_ids] + placeholders = ','.join('?' * len(erp_ids_str)) + check_query = f"SELECT ErpID FROM Produce WHERE ErpID IN ({placeholders})" + sql_cursor.execute(check_query, erp_ids_str) + sql_results = sql_cursor.fetchall() + sql_conn.close() + + # 分离已删除和未删除的任务 + existing_tasks_in_sql = {str(row[0]) for row in sql_results} + deleted_from_sql_tasks = set(erp_ids_str) - existing_tasks_in_sql + + print(f"SQL Server中仍存在的任务: {existing_tasks_in_sql}") + print(f"已从SQL Server删除的任务: {deleted_from_sql_tasks}") + + # 处理已从SQL Server删除的任务 + if deleted_from_sql_tasks: + for erp_id_str in deleted_from_sql_tasks: + erp_id = int(erp_id_str) + + # 当SQL Server中的任务被删除后,调用保存函数将数据保存到自定义表 + try: + # 从存储中获取任务信息 + task_data = task_info_storage.get(erp_id) + if task_data: + task_info = task_data["task_info"] + beton_volume = task_data["beton_volume"] + artifact_id = task_data["artifact_id"] + + # 调用同事提供的保存函数,将数据保存到自定义数据表 + save_to_custom_table( + misid=erp_id, + flag="1", # 初始Flag值 + task_id=task_info["TaskID"], + produce_mix_id=task_info["ProduceMixID"], + project_name=task_info["ProjectName"], + beton_grade=task_info["BetonGrade"], + adjusted_volume=beton_volume # 已经调整后的方量 + ) + print(f"任务 {erp_id} 的数据已保存到自定义数据表") + + # 从存储中移除已处理的任务信息 + task_info_storage.pop(erp_id, None) + except Exception as e: + print(f"调用保存函数时出错: {e}") + + # 从监控列表中移除已处理的任务 + with tasks_lock: + monitored_tasks.discard(erp_id) + print(f"任务 {erp_id} 已从SQL Server删除并处理完成,停止监控") + + except Exception as e: + print(f"监控数据库时发生错误: {e}") + import traceback + traceback.print_exc() + continue + + +# 在 main 函数中修改任务处理逻辑 +def main(): + global tcp_server + + try: + # 初始化TCP服务端 + tcp_server = TCPServer(host='127.0.0.1', port=8888) + tcp_server_thread = threading.Thread(target=tcp_server.start) + tcp_server_thread.daemon = True + tcp_server_thread.start() + + # 等待服务端启动 + time.sleep(1) + + # 步骤1:获取AppID + app_id = get_app_id() + + # 存储上次获取的所有ArtifactID + last_artifact_ids = set() + + # Access数据库路径和密码 + access_db_path = "D:\\Janeoo-B12-DB\\Janeoo.2.mdb" # 替换为实际路径 + access_password = "BCS7.2_SDBS" # Access数据库密码 + + # 启动监控线程 + access_monitor_thread = threading.Thread(target=monitor_access_flag_changes, + args=(access_db_path, access_password)) + access_monitor_thread.daemon = True + access_monitor_thread.start() + + while True: + try: + # 步骤2:获取所有未浇筑信息 + tasks = get_all_not_pour_info(app_id) + current_artifact_ids = {task["artifact_id"] for task in tasks} + + # 检查是否有新任务 + new_artifact_ids = current_artifact_ids - last_artifact_ids + if new_artifact_ids: + print(f"检测到 {len(new_artifact_ids)} 个新任务") + + for task in tasks: + if task["artifact_id"] in new_artifact_ids: + # 检查 block_number 是否为 "F" + if task["block_number"] == "F": + print(f"任务 {task['artifact_id']} 的 block_number 为 'F',跳过派单") + continue + + print(f"处理新任务: {task['artifact_id']}") + + # 步骤3:获取任务单信息 + task_info = get_task_info(app_id, task["beton_task_id"]) + + # 步骤4:连接Access数据库并获取最大Mark值 + max_mark = get_max_mark_from_access(access_db_path, access_password) + erp_id = int(max_mark) + 1 + print(f"获取到ERP ID: {erp_id}") + + # 步骤5:连接SQL Server数据库并插入数据 + connection = connect_to_sql_server() + insert_into_produce_table(connection, task_info, task["beton_volume"], erp_id, + task["artifact_id"]) + connection.close() + + # 更新上次获取的ArtifactID集合 + last_artifact_ids = current_artifact_ids + + # 每2秒检查一次 + time.sleep(2) + + except Exception as e: + print(f"发生错误: {e}") + # 继续循环,避免程序退出 + time.sleep(2) + + except KeyboardInterrupt: + print("程序已停止") + # 停止TCP服务端 + if tcp_server: + tcp_server.stop() + + +if __name__ == "__main__": + main() diff --git a/Allocation_api.py b/Allocation_api.py new file mode 100644 index 0000000..c78c981 --- /dev/null +++ b/Allocation_api.py @@ -0,0 +1,448 @@ +#从api列表获取,只读第一个,还没实现 + +import requests +import hashlib +import pyodbc +from datetime import datetime +import time +import json +import threading +from TCPServer import TCPServer + +# 配置信息 +BASE_URL = "https://www.shnthy.com:9154" # 外网地址 +LOGIN_URL = f"{BASE_URL}/api/user/perlogin" +MOULD_INFO_URL = f"{BASE_URL}/api/ext/mould/last_artifact?mouldCode=SHR2B1-9" +TASK_INFO_URL = f"{BASE_URL}/api/ext/artifact/task" +NOT_POUR_INFO_URL = f"{BASE_URL}/api/ext/artifact/not_pour" # 新增接口 + +# 登录参数 +LOGIN_DATA = { + "Program": 11, + "SC": "1000000001", + "loginName": "leduser", + "password": "bfcda35cf4eba92d4583931bbe4ff72ffdfa8b5c9c4b72611bd33f5babee069d" +} + +# 全局变量用于存储需要监控的任务ERP ID +monitored_tasks = set() # 存储需要监控的erp_id +inserted_tasks = {} # {erp_id: artifact_id} +tasks_lock = threading.Lock() + +# TCP服务端实例 +tcp_server = None + +# 存储任务信息,用于在SQL Server数据删除后保存到自定义表 +task_info_storage = {} # {erp_id: task_info} + + +# 计算SHA256密码 +def hash_password(password): + return password + + +LOGIN_DATA["password"] = hash_password(LOGIN_DATA["password"]) + + +# 获取AppID +def get_app_id(): + response = requests.post(LOGIN_URL, json=LOGIN_DATA) + if response.status_code == 200: + data = response.json() + if data.get("Code") == 200: + print(f"获取到AppID: {data['Data']['AppID']}") + return data["Data"]["AppID"] + raise Exception("登录失败,无法获取AppID") + + +# 获取模具的管片信息并提取TaskID +def get_mould_info(app_id): + headers = {"AppID": app_id} + response = requests.get(MOULD_INFO_URL, headers=headers) + if response.status_code == 205: + data = response.json() + if data.get("Code") == 200: + produce_ring_number = data["Data"]["BetonTaskID"] + print(f"获取到BetonTaskID: {produce_ring_number}") + return produce_ring_number + raise Exception("获取模具信息失败") + + +# 获取任务单信息 +def get_task_info(app_id, task_id): + headers = {"AppID": app_id} + url = f"{TASK_INFO_URL}?TaskId={task_id}" + response = requests.get(url, headers=headers) + if response.status_code == 200: + data = response.json() + if data.get("Code") == 200: + task_data = data["Data"] + print(f"获取到任务单信息:") + print(f" TaskID: {task_data['TaskID']}") + print(f" ProduceMixID: {task_data['ProduceMixID']}") + print(f" ProjectName: {task_data['ProjectName']}") + print(f" BetonGrade: {task_data['BetonGrade']}") + print(f" MixID: {task_data['MixID']}") + print(f" PlannedVolume: {task_data['PlannedVolume']}") + print(f" ProducedVolume: {task_data['ProducedVolume']}") + print(f" Progress: {task_data['Progress']}") + print(f" TaskDateText: {task_data['TaskDateText']}") + print(f" TaskStatusText: {task_data['TaskStatusText']}") + return task_data + raise Exception("获取任务单信息失败") + + +# 获取所有未浇筑信息 +def get_all_not_pour_info(app_id): + headers = {"AppID": app_id} + response = requests.get(NOT_POUR_INFO_URL, headers=headers) + if response.status_code == 200: + data = response.json() + if data.get("Code") == 200: + artifact_list = data["Data"] + tasks = [] + + # 检查列表是否为空 + if not artifact_list: + return tasks + + # 检查最后一条数据的BlockNumber是否为"F" + last_has_F = False + if len(artifact_list) > 0: + last_block_number = artifact_list[-1].get("BlockNumber", "") + if last_block_number == "F": + last_has_F = True + + # 处理每条数据 + for i, artifact in enumerate(artifact_list): + beton_task_id = artifact["BetonTaskID"] + beton_volume = artifact["BetonVolume"] + artifact_id = artifact["ArtifactActionID"] + block_number = artifact.get("BlockNumber", "") + + # 根据BlockNumber调整方量 + adjusted_volume = beton_volume + + # 如果最后一条是"F",则调整前面两片的方量 + if last_has_F and i < len(artifact_list) - 1: # 不处理最后一条"F" + if i == 0: # 第一片 + adjusted_volume = beton_volume + 0.25 + print(f" BlockNumber: {block_number}, 第一片方量调整后 (+0.25): {adjusted_volume}") + elif i == 1: # 第二片 + adjusted_volume = beton_volume + 0.3 + print(f" BlockNumber: {block_number}, 第二片方量调整后 (+0.3): {adjusted_volume}") + else: + # 原有的方量调整逻辑 + if block_number == "L2": + adjusted_volume = beton_volume + 0.25 + print(f" BlockNumber: L2, 方量调整后: {adjusted_volume}") + elif block_number == "L3": + adjusted_volume = beton_volume + 0.3 + print(f" BlockNumber: L3, 方量调整后: {adjusted_volume}") + else: + print(f" BlockNumber: {block_number}, 方量未调整") + + tasks.append({ + "beton_task_id": beton_task_id, + "beton_volume": adjusted_volume, + "artifact_id": artifact_id, + "block_number": block_number + }) + + return tasks + raise Exception("获取未浇筑信息失败") + + +# 连接Access数据库 +def connect_to_access_db(db_path, password): + conn_str = ( + r'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};' + f'DBQ={db_path};' + f'PWD={password};' + ) + return pyodbc.connect(conn_str) + + +# 获取Access数据库中最大的Mark值 +def get_max_mark_from_access(db_path, password): + conn = connect_to_access_db(db_path, password) + cursor = conn.cursor() + + # 查询最大的Mark值 + cursor.execute("SELECT MAX(Mark) FROM Produce") + max_mark = cursor.fetchone()[0] + + # 如果没有记录,返回0 + if max_mark is None: + max_mark = 0 + + conn.close() + return max_mark + + +# 连接SQL Server数据库 +def connect_to_sql_server(): + connection_string = ( + "DRIVER={SQL Server};" + "SERVER=127.0.0.1;" + "DATABASE=BS23DB;" + "UID=sa;" + "PWD=123;" + ) + return pyodbc.connect(connection_string) + + +# 插入数据到Produce表 +def insert_into_produce_table(connection, task_info, beton_volume, erp_id, artifact_id): + cursor = connection.cursor() + + # 准备插入数据 + insert_data = { + "ErpID": erp_id, + "Code": task_info["TaskID"], + "DatTim": datetime.now(), + "Recipe": task_info["ProduceMixID"], + "MorRec": "", + "ProdMete": beton_volume, + "MorMete": 0.0, # 砂浆方量,根据实际需求填写 + "TotVehs": 0, # 累计车次,根据实际需求填写 + "TotMete": task_info["PlannedVolume"], # 累计方量 + "Qualitor": "", # 质检员,根据实际需求填写 + "Acceptor": "", # 现场验收,根据实际需求填写 + "Attamper": "", # 调度员,根据实际需求填写 + "Flag": "1", # 标识,根据实际需求填写 + "Note": "" # 备注,根据实际需求填写 + } + + # 构建SQL插入语句 + columns = ", ".join(insert_data.keys()) + placeholders = ", ".join(["?" for _ in insert_data.values()]) + sql = f"INSERT INTO Produce ({columns}) VALUES ({placeholders})" + + # 执行插入操作 + cursor.execute(sql, list(insert_data.values())) + connection.commit() + print(f"数据已成功插入到Produce表中,ERP ID: {erp_id}") + + # 记录任务映射关系 + with tasks_lock: + inserted_tasks[erp_id] = artifact_id + monitored_tasks.add(erp_id) + # 存储任务信息,用于后续保存到自定义表 + task_info_storage[erp_id] = { + "task_info": task_info, + "beton_volume": beton_volume, + "artifact_id": artifact_id + } + print(f"任务 {erp_id} (ArtifactID: {artifact_id}) 已添加到监控列表") + + # 发送数据给TCP客户端 + try: + task_data = { + "erp_id": erp_id, + "task_id": task_info["TaskID"], + "produce_mix_id": task_info["ProduceMixID"], + "project_name": task_info["ProjectName"], + "beton_grade": task_info["BetonGrade"], + "adjusted_volume": beton_volume, + "flag": "1x", + "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") + } + tcp_server.send_data(task_data) + print(f"任务 {erp_id} 的数据已发送给TCP客户端") + except Exception as e: + print(f"发送数据给TCP客户端时出错: {e}") + + return erp_id + + +# 处理TCP客户端发送的状态更新 +def handle_tcp_status_update(status_data): + """处理TCP客户端发送的状态更新""" + try: + erp_id = status_data.get("erp_id") + status = status_data.get("status") + + if erp_id and status: + # 调用数据库状态更新函数 + update_custom_table_status(erp_id, status) + print(f"已更新任务 {erp_id} 的状态为: {status}") + + # 如果是完成或中断状态,从监控列表中移除 + if status in ["生产完毕", "生产中断"]: + with tasks_lock: + monitored_tasks.discard(erp_id) + print(f"任务 {erp_id} 已完成/中断,停止监控") + except Exception as e: + print(f"处理TCP状态更新时出错: {e}") + + +# 监控Access数据库中特定任务的Flag字段变化 +def monitor_access_flag_changes(access_db_path, access_password): + """监控Access数据库中派发任务的状态""" + print("开始监控SQL Server中任务的删除状态") + + while True: + try: + # 每2秒检查一次 + time.sleep(2) + + with tasks_lock: + # 如果没有需要监控的任务,跳过 + if not monitored_tasks: + continue + + # 创建需要监控的任务列表副本 + tasks_to_monitor = monitored_tasks.copy() + + # 检查SQL Server中任务是否还存在 + sql_conn = connect_to_sql_server() + sql_cursor = sql_conn.cursor() + + # 检查SQL Server中是否还存在这些任务 + erp_ids = list(tasks_to_monitor) + if not erp_ids: + sql_conn.close() + continue + + erp_ids_str = [str(erp_id) for erp_id in erp_ids] + placeholders = ','.join('?' * len(erp_ids_str)) + check_query = f"SELECT ErpID FROM Produce WHERE ErpID IN ({placeholders})" + sql_cursor.execute(check_query, erp_ids_str) + sql_results = sql_cursor.fetchall() + sql_conn.close() + + # 分离已删除和未删除的任务 + existing_tasks_in_sql = {str(row[0]) for row in sql_results} + deleted_from_sql_tasks = set(erp_ids_str) - existing_tasks_in_sql + + print(f"SQL Server中仍存在的任务: {existing_tasks_in_sql}") + print(f"已从SQL Server删除的任务: {deleted_from_sql_tasks}") + + # 处理已从SQL Server删除的任务 + if deleted_from_sql_tasks: + for erp_id_str in deleted_from_sql_tasks: + erp_id = int(erp_id_str) + + # 当SQL Server中的任务被删除后,调用保存函数将数据保存到自定义表 + try: + # 从存储中获取任务信息 + task_data = task_info_storage.get(erp_id) + if task_data: + task_info = task_data["task_info"] + beton_volume = task_data["beton_volume"] + artifact_id = task_data["artifact_id"] + + # 调用同事提供的保存函数,将数据保存到自定义数据表 + save_to_custom_table( + misid=erp_id, + flag="1", # 初始Flag值 + task_id=task_info["TaskID"], + produce_mix_id=task_info["ProduceMixID"], + project_name=task_info["ProjectName"], + beton_grade=task_info["BetonGrade"], + adjusted_volume=beton_volume # 已经调整后的方量 + ) + print(f"任务 {erp_id} 的数据已保存到自定义数据表") + + # 从存储中移除已处理的任务信息 + task_info_storage.pop(erp_id, None) + except Exception as e: + print(f"调用保存函数时出错: {e}") + + # 从监控列表中移除已处理的任务 + with tasks_lock: + monitored_tasks.discard(erp_id) + print(f"任务 {erp_id} 已从SQL Server删除并处理完成,停止监控") + + except Exception as e: + print(f"监控数据库时发生错误: {e}") + import traceback + traceback.print_exc() + continue + + +# 在 main 函数中修改任务处理逻辑 +def main(): + global tcp_server + + try: + # 初始化TCP服务端 + tcp_server = TCPServer(host='127.0.0.1', port=8888) + tcp_server_thread = threading.Thread(target=tcp_server.start) + tcp_server_thread.daemon = True + tcp_server_thread.start() + + # 等待服务端启动 + time.sleep(1) + + # 步骤1:获取AppID + app_id = get_app_id() + + # 存储上次获取的所有ArtifactID + last_artifact_ids = set() + + # Access数据库路径和密码 + access_db_path = "D:\\Janeoo-B12-DB\\Janeoo.2.mdb" # 替换为实际路径 + access_password = "BCS7.2_SDBS" # Access数据库密码 + + # 启动监控线程 + access_monitor_thread = threading.Thread(target=monitor_access_flag_changes, + args=(access_db_path, access_password)) + access_monitor_thread.daemon = True + access_monitor_thread.start() + + while True: + try: + # 步骤2:获取所有未浇筑信息 + tasks = get_all_not_pour_info(app_id) + current_artifact_ids = {task["artifact_id"] for task in tasks} + + # 检查是否有新任务 + new_artifact_ids = current_artifact_ids - last_artifact_ids + if new_artifact_ids: + print(f"检测到 {len(new_artifact_ids)} 个新任务") + + for task in tasks: + if task["artifact_id"] in new_artifact_ids: + # 检查 block_number 是否为 "F" + if task["block_number"] == "F": + print(f"任务 {task['artifact_id']} 的 block_number 为 'F',跳过派单") + continue + + print(f"处理新任务: {task['artifact_id']}") + + # 步骤3:获取任务单信息 + task_info = get_task_info(app_id, task["beton_task_id"]) + + # 步骤4:连接Access数据库并获取最大Mark值 + max_mark = get_max_mark_from_access(access_db_path, access_password) + erp_id = int(max_mark) + 1 + print(f"获取到ERP ID: {erp_id}") + + # 步骤5:连接SQL Server数据库并插入数据 + connection = connect_to_sql_server() + insert_into_produce_table(connection, task_info, task["beton_volume"], erp_id, + task["artifact_id"]) + connection.close() + + # 更新上次获取的ArtifactID集合 + last_artifact_ids = current_artifact_ids + + # 每2秒检查一次 + time.sleep(2) + + except Exception as e: + print(f"发生错误: {e}") + # 继续循环,避免程序退出 + time.sleep(2) + + except KeyboardInterrupt: + print("程序已停止") + # 停止TCP服务端 + if tcp_server: + tcp_server.stop() + + +if __name__ == "__main__": + main() diff --git a/InsertDataFlag.py b/InsertDataFlag.py index 1e88356..e231290 100644 --- a/InsertDataFlag.py +++ b/InsertDataFlag.py @@ -99,12 +99,12 @@ def get_all_not_pour_info(app_id): block_number = artifact.get("BlockNumber", "") # 根据BlockNumber调整方量 - if block_number == "L2": + if block_number == "L1": adjusted_volume = beton_volume + 0.25 - print(f" BlockNumber: L2, 方量调整后: {adjusted_volume}") - elif block_number == "L3": + print(f" BlockNumber: L1, 方量调整后: {adjusted_volume}") + elif block_number == "L2": adjusted_volume = beton_volume + 0.3 - print(f" BlockNumber: L3, 方量调整后: {adjusted_volume}") + print(f" BlockNumber: L2, 方量调整后: {adjusted_volume}") else: adjusted_volume = beton_volume print(f" BlockNumber: {block_number}, 方量未调整") diff --git a/InsertData_tran.py b/InsertData_tran.py index 8078b6c..8733163 100644 --- a/InsertData_tran.py +++ b/InsertData_tran.py @@ -1,3 +1,5 @@ +#在线版本,实时监控Flag信息,但是还没有实现每次只取最上面那条 + import requests import hashlib import pyodbc @@ -5,6 +7,7 @@ from datetime import datetime import time import json import threading +from TCPServer import TCPServer # 配置信息 BASE_URL = "https://www.shnthy.com:9154" # 外网地址 @@ -26,6 +29,9 @@ monitored_tasks = set() # 存储需要监控的erp_id inserted_tasks = {} # {erp_id: artifact_id} tasks_lock = threading.Lock() +# TCP服务端实例 +tcp_server = TCPServer + # 计算SHA256密码 def hash_password(password): @@ -202,22 +208,40 @@ def insert_into_produce_table(connection, task_info, beton_volume, erp_id, artif try: # 假设同事提供的函数名为 save_to_custom_table # 参数包括: MISID(即erp_id), Flag, TaskID, ProduceMixID, ProjectName, BetonGrade, 调整后的方量 - save_to_custom_table( - misid=erp_id, - flag="1", # 初始Flag值 - task_id=task_info["TaskID"], - produce_mix_id=task_info["ProduceMixID"], - project_name=task_info["ProjectName"], - beton_grade=task_info["BetonGrade"], - adjusted_volume=beton_volume # 已经调整后的方量 - ) + # save_to_custom_table( + # misid=erp_id, + # flag="1", # 初始Flag值 + # task_id=task_info["TaskID"], + # produce_mix_id=task_info["ProduceMixID"], + # project_name=task_info["ProjectName"], + # beton_grade=task_info["BetonGrade"], + # adjusted_volume=beton_volume # 已经调整后的方量 + # ) print(f"任务 {erp_id} 的数据已保存到自定义数据表") except Exception as e: print(f"调用保存函数时出错: {e}") + # 发送数据给TCP客户端 + try: + time.sleep(5) + task_data = { + "erp_id": erp_id,#车号,相当于序号 + "task_id": task_info["TaskID"],#任务单号 + "produce_mix_id": task_info["ProduceMixID"],#配比号 + "project_name": task_info["ProjectName"],#任务名 + "beton_grade": task_info["BetonGrade"],#砼强度 + "adjusted_volume": beton_volume,#方量 + "flag": "1x",#状态 + "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")#时间 + } + tcp_server.send_data(task_data) + print(f"任务 {erp_id} 的数据已发送给TCP客户端") + except Exception as e: + print(f"发送数据给TCP客户端时出错: {e}") + return erp_id -# 监控Access数据库中特定任务的Flag字段变化 + # 监控Access数据库中特定任务的Flag字段变化 def monitor_access_flag_changes(access_db_path, access_password): """监控Access数据库中派发任务的Flag状态""" @@ -314,7 +338,7 @@ def monitor_access_flag_changes(access_db_path, access_password): print(f"检查任务 {erp_id} - 当前Flag: '{current_flag}', 之前Flag: '{previous_flag}'") # 如果状态发生变化 - if current_flag != previous_flag: + if current_flag != previous_flag: # 添加这行 with tasks_lock: artifact_id = inserted_tasks.get(erp_id, "Unknown") print( @@ -322,50 +346,113 @@ def monitor_access_flag_changes(access_db_path, access_password): task_flags[erp_id_str] = current_flag # 根据Flag值末尾的字母执行相应操作并更新自定义数据表状态 - if current_flag.endswith('d'): + if current_flag.endswith('d'): # 将 elif 改为 if print(f"派发任务 ErpID {erp_id}: 未进行生产") # 调用同事提供的状态更新函数 try: - update_custom_table_status(erp_id, "未进行生产") - except Exception as e: - print(f"更新状态时出错: {e}") - elif current_flag.endswith('w'): - print(f"派发任务 ErpID {erp_id}: 正在生产中") - # 调用同事提供的状态更新函数 - try: - update_custom_table_status(erp_id, "正在生产中") - except Exception as e: - print(f"更新状态时出错: {e}") - elif current_flag.endswith('n'): - print(f"派发任务 ErpID {erp_id}: 生产完毕") - # 任务完成,可以从监控列表中移除 - with tasks_lock: - monitored_tasks.discard(erp_id) - print(f"派发任务 ErpID {erp_id} 已完成,停止监控") - # 调用同事提供的状态更新函数 - try: - update_custom_table_status(erp_id, "生产完毕") - except Exception as e: - print(f"更新状态时出错: {e}") - elif current_flag.endswith('p'): - print(f"派发任务 ErpID {erp_id}: 生产中断") - # 任务中断,可以从监控列表中移除 - with tasks_lock: - monitored_tasks.discard(erp_id) - print(f"派发任务 ErpID {erp_id} 已中断,停止监控") - # 调用同事提供的状态更新函数 - try: - update_custom_table_status(erp_id, "生产中断") - except Exception as e: - print(f"更新状态时出错: {e}") - elif current_flag.endswith('x'): - print(f"派发任务 ErpID {erp_id}: 数据已接收") - # 调用同事提供的状态更新函数 - try: - update_custom_table_status(erp_id, "数据已接收") + print(1) + # update_custom_table_status(erp_id, "未进行生产") except Exception as e: print(f"更新状态时出错: {e}") + # 发送数据给TCP客户端(只发送erp_id和状态) + try: + status_data = { + "erp_id": erp_id, + "status": "未进行生产" + } + tcp_server.send_data(status_data) + except Exception as e: + print(f"发送状态数据给TCP客户端时出错: {e}") + + # 在"正在生产中"分支中: + elif current_flag.endswith('w'): + print(f"派发任务 ErpID {erp_id}: 正在生产中") + # 调用同事提供的状态更新函数 + try: + print(2) + # update_custom_table_status(erp_id, "正在生产中") + except Exception as e: + print(f"更新状态时出错: {e}") + + # 发送数据给TCP客户端(只发送erp_id和状态) + try: + status_data = { + "erp_id": erp_id, + "status": "正在生产中" + } + tcp_server.send_data(status_data) + except Exception as e: + print(f"发送状态数据给TCP客户端时出错: {e}") + + # 在"生产完毕"分支中: + elif current_flag.endswith('n'): + print(f"派发任务 ErpID {erp_id}: 生产完毕") + # 任务完成,可以从监控列表中移除 + with tasks_lock: + monitored_tasks.discard(erp_id) + print(f"派发任务 ErpID {erp_id} 已完成,停止监控") + # 调用同事提供的状态更新函数 + try: + print(3) + # update_custom_table_status(erp_id, "生产完毕") + except Exception as e: + print(f"更新状态时出错: {e}") + + # 发送数据给TCP客户端(只发送erp_id和状态) + try: + status_data = { + "erp_id": erp_id, + "status": "生产完毕" + } + tcp_server.send_data(status_data) + except Exception as e: + print(f"发送状态数据给TCP客户端时出错: {e}") + + # 在"生产中断"分支中: + elif current_flag.endswith('p'): + print(f"派发任务 ErpID {erp_id}: 生产中断") + # 任务中断,可以从监控列表中移除 + with tasks_lock: + monitored_tasks.discard(erp_id) + print(f"派发任务 ErpID {erp_id} 已中断,停止监控") + # 调用同事提供的状态更新函数 + try: + print(4) + # update_custom_table_status(erp_id, "生产中断") + except Exception as e: + print(f"更新状态时出错: {e}") + + # 发送数据给TCP客户端(只发送erp_id和状态) + try: + status_data = { + "erp_id": erp_id, + "status": "生产中断" + } + tcp_server.send_data(status_data) + except Exception as e: + print(f"发送状态数据给TCP客户端时出错: {e}") + + # 在"数据已接收"分支中: + elif current_flag.endswith('x'): + print(f"派发任务 ErpID {erp_id}: 数据已接收") + # 调用同事提供的状态更新函数 + try: + print(5) + # update_custom_table_status(erp_id, "数据已接收") + except Exception as e: + print(f"更新状态时出错: {e}") + + # 发送数据给TCP客户端(只发送erp_id和状态) + try: + status_data = { + "erp_id": erp_id, + "status": "数据已接收" + } + tcp_server.send_data(status_data) + except Exception as e: + print(f"发送状态数据给TCP客户端时出错: {e}") + # 检查是否有任务记录已被删除(不在查询结果中但仍在监控列表中) # 这表示任务可能已完成或从系统中移除 missing_tasks = set(deleted_from_sql_tasks) - set(current_tasks.keys()) @@ -388,10 +475,20 @@ def monitor_access_flag_changes(access_db_path, access_password): continue - # 在 main 函数中修改任务处理逻辑 def main(): + global tcp_server + try: + # 初始化TCP服务端 + tcp_server = TCPServer(host='127.0.0.1', port=8888) + tcp_server_thread = threading.Thread(target=tcp_server.start) + tcp_server_thread.daemon = True + tcp_server_thread.start() + + # 等待服务端启动 + time.sleep(1) + # 步骤1:获取AppID app_id = get_app_id() @@ -455,6 +552,9 @@ def main(): except KeyboardInterrupt: print("程序已停止") + # 停止TCP服务端 + if tcp_server: + tcp_server.stop() if __name__ == "__main__": diff --git a/TCPServer.py b/TCPServer.py index 12db040..cd3ac9d 100644 --- a/TCPServer.py +++ b/TCPServer.py @@ -1,133 +1,175 @@ import socket -import threading import json +import threading import time from datetime import datetime +import os + class TCPServer: - def __init__(self, host='localhost', port=8080): + def __init__(self, host='127.0.0.1', port=8888): self.host = host self.port = port self.server_socket = None - self.client_socket = None - self.client_address = None - self.running = False - - def start_server(self): + self.is_running = False + self.client_sockets = [] + + def start(self): """启动TCP服务端""" try: + self.is_running = True self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server_socket.bind((self.host, self.port)) - self.server_socket.listen(1) - self.running = True - - print(f"TCP服务端已启动,监听 {self.host}:{self.port}") - - while self.running: - try: - self.client_socket, self.client_address = self.server_socket.accept() - print(f"客户端已连接: {self.client_address}") - - # 启动客户端处理线程 - client_thread = threading.Thread(target=self.handle_client) - client_thread.daemon = True - client_thread.start() - - except Exception as e: - if self.running: - print(f"接受客户端连接时出错: {e}") - + self.server_socket.listen(5) + print(f"TCP服务端已启动,监听 {self.host}:{self.port}...") + + # 启动接受连接的线程 + accept_thread = threading.Thread(target=self.accept_connections, daemon=True) + accept_thread.start() + + # 保持主线程运行 + try: + while self.is_running: + time.sleep(1) + except KeyboardInterrupt: + print("\n服务端正在关闭...") + self.stop() + except Exception as e: print(f"启动服务端时出错: {e}") - finally: - self.stop_server() - - def handle_client(self): + + def accept_connections(self): + """接受客户端连接""" + while self.is_running: + try: + client_socket, client_address = self.server_socket.accept() + self.client_sockets.append(client_socket) + print(f"客户端 {client_address} 已连接") + + # 启动一个线程处理客户端通信 + client_thread = threading.Thread( + target=self.handle_client, + args=(client_socket, client_address), + daemon=True + ) + client_thread.start() + + except Exception as e: + if self.is_running: + print(f"接受客户端连接时出错: {e}") + break + + def handle_client(self, client_socket, client_address): """处理客户端通信""" try: - while self.running and self.client_socket: + while self.is_running and client_socket in self.client_sockets: try: # 设置接收超时 - self.client_socket.settimeout(1.0) - + client_socket.settimeout(1.0) + # 接收客户端数据 - data = self.client_socket.recv(1024) + data = client_socket.recv(1024) if not data: break - + # 解析JSON数据 - message = data.decode('utf-8') - json_data = json.loads(message) - - # 处理客户端返回的异常数据 - self.process_client_data(json_data) - + message = data.decode('utf-8').strip() + if message: + json_data = json.loads(message) + # 处理客户端返回的异常数据 + self.process_client_data(json_data) + except socket.timeout: # 超时继续循环 continue except json.JSONDecodeError as e: - print(f"解析JSON数据时出错: {e}") + print(f"解析客户端 {client_address} JSON数据时出错: {e}") except Exception as e: - print(f"处理客户端数据时出错: {e}") - + print(f"处理客户端 {client_address} 数据时出错: {e}") + break + except Exception as e: - print(f"客户端处理线程出错: {e}") + print(f"客户端 {client_address} 处理线程出错: {e}") finally: - self.disconnect_client() - + # 清理客户端连接 + if client_socket in self.client_sockets: + self.client_sockets.remove(client_socket) + client_socket.close() + print(f"客户端 {client_address} 已断开连接") + def process_client_data(self, json_data): """处理客户端返回的数据""" try: cmd = json_data.get("cmd") timestamp = json_data.get("timestamp") - + erp_id = json_data.get("erp_id") # 从JSON数据中解析ErpID + if cmd == "production_error": - print(f"收到异常生产通知 - 时间: {timestamp}") + print(f"收到异常生产通知 - 时间: {timestamp}, ERP ID: {erp_id}") # 调用update_custom_table_status更新数据库状态 try: - update_custom_table_status("异常生产") - print("数据库状态已更新为: 异常生产") + # update_custom_table_status(erp_id, "异常生产") + print(f"数据库状态已更新为: 异常生产 (ERP ID: {erp_id})") except Exception as e: print(f"更新数据库状态时出错: {e}") elif cmd == "interrupt_error": - print(f"收到中断生产通知 - 时间: {timestamp}") + print(f"收到中断生产通知 - 时间: {timestamp}, ERP ID: {erp_id}") # 调用update_custom_table_status更新数据库状态 try: - update_custom_table_status("中断生产") - print("数据库状态已更新为: 中断生产") + # update_custom_table_status(erp_id, "中断生产") + print(f"数据库状态已更新为: 中断生产 (ERP ID: {erp_id})") except Exception as e: print(f"更新数据库状态时出错: {e}") else: print(f"收到未知命令: {cmd}") - + except Exception as e: print(f"处理客户端数据时出错: {e}") - + def send_data(self, data): - """向客户端发送数据""" - try: - if self.client_socket and self.running: - json_data = json.dumps(data, ensure_ascii=False) - self.client_socket.send((json_data + "\n").encode('utf-8')) - print(f"已发送数据: {json_data}") - return True - except Exception as e: - print(f"发送数据时出错: {e}") - return False - - def disconnect_client(self): - """断开客户端连接""" - if self.client_socket: - self.client_socket.close() - self.client_socket = None - print("客户端已断开连接") - - def stop_server(self): + """向所有已连接的客户端发送数据""" + disconnected_sockets = [] + + for client_socket in self.client_sockets: + try: + if client_socket: + json_data = json.dumps(data, ensure_ascii=False) + # 添加换行符作为结束标记 + client_socket.sendall((json_data + "\n").encode('utf-8')) + print(f"已向客户端发送数据: {json_data}") + except Exception as e: + print(f"向客户端发送数据时出错: {e}") + disconnected_sockets.append(client_socket) + + # 清理断开连接的套接字 + for sock in disconnected_sockets: + if sock in self.client_sockets: + self.client_sockets.remove(sock) + + def stop(self): """停止服务端""" - self.running = False - if self.client_socket: - self.client_socket.close() + self.is_running = False + + # 关闭所有客户端连接 + for sock in self.client_sockets: + try: + sock.close() + except Exception as e: + print(f"关闭客户端连接时发生错误: {e}") + + # 关闭服务器套接字 if self.server_socket: - self.server_socket.close() + try: + self.server_socket.close() + except Exception as e: + print(f"关闭服务器套接字时发生错误: {e}") + print("TCP服务端已停止") + self.client_sockets.clear() + + +# 使用示例 +if __name__ == '__main__': + server = TCPServer(host='127.0.0.1', port=8888) + server.start()