From 67cce2411ea872dcafb7b40537b3ec36d6c097a9 Mon Sep 17 00:00:00 2001 From: xiongyi <827523911@qq.com> Date: Tue, 18 Nov 2025 11:23:03 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=86TCP=E4=BF=AE=E6=94=B9=E4=B8=BAOPC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config/settings.py | 4 +-- main.py | 19 ++++++----- services/task_service.py | 24 ++++++-------- tcp/client.py | 70 ---------------------------------------- 4 files changed, 23 insertions(+), 94 deletions(-) delete mode 100644 tcp/client.py diff --git a/config/settings.py b/config/settings.py index 16053e7..1fe9642 100644 --- a/config/settings.py +++ b/config/settings.py @@ -26,9 +26,9 @@ SQL_SERVER_CONFIG = { TCP_HOST = '127.0.0.1' TCP_PORT = 8888 -# 新增TCP客户端配置 +# 新增OPC UA客户端配置 TCP_CLIENT_HOST = '10.6.242.150' -TCP_CLIENT_PORT = 8889 +TCP_CLIENT_PORT = 4840 # 其他配置 MAX_AGE_HOURS = 24 diff --git a/main.py b/main.py index 81cfdcc..f9fd434 100644 --- a/main.py +++ b/main.py @@ -8,7 +8,7 @@ from services.monitoring_service import MonitoringService from database.access_db import AccessDB from database.sql_server import SQLServerDB from tcp.server import TCPServer -from tcp.client import TCPClient +from opc.client import OPCClient from config.settings import ( ACCESS_DB_PATH, ACCESS_DB_PASSWORD, TCP_HOST, TCP_PORT, TCP_CLIENT_HOST, TCP_CLIENT_PORT, @@ -16,6 +16,10 @@ from config.settings import ( ) from utils.helpers import cleanup_old_timestamps +# 假设同事提供的函数 +def save_to_custom_table(misid, flag, task_id, produce_mix_id, project_name, beton_grade, adjusted_volume, artifact_id): + """保存到自定义数据表的函数""" + print(f"保存到自定义数据表: MISID={misid}, Flag={flag}, TaskID={task_id}, 调整后方量={adjusted_volume}") def start_api_service(): """启动配比重量API服务""" @@ -34,8 +38,8 @@ def main(): tcp_server_thread.daemon = True tcp_server_thread.start() - # 初始化发送数据的TCP客户端 - data_client = TCPClient(host=TCP_CLIENT_HOST, port=TCP_CLIENT_PORT) + # 初始化发送数据的OPC客户端 + data_client = OPCClient(url=f'opc.tcp://{TCP_CLIENT_HOST}:{TCP_CLIENT_PORT}') data_client.start() # 等待服务端启动 @@ -121,14 +125,14 @@ def main(): if task["block_number"] == "补方": print(f"任务 {task['artifact_id']} 的 block_number 为 '补方',跳过派单") task_service.insert_into_produce_table(sql_db, task_info, task["beton_volume"], erp_id, - task["artifact_id"], 0) + task["artifact_id"], half_volume, 0) continue print(f"处理新任务: {task['artifact_id']}") - # 步骤3:获取任务单信息 + # 获取任务单信息 task_service.insert_into_produce_table(sql_db, task_info, task["beton_volume"], erp_id, - task["artifact_id"], 1) + task["artifact_id"], half_volume, 1) with monitoring_service.tasks_lock: monitoring_service.monitored_tasks.add(erp_id) monitoring_service.inserted_tasks[erp_id] = task["artifact_id"] @@ -148,7 +152,6 @@ def main(): except Exception as e: print(f"发生错误: {e}") - # 继续循环,避免程序退出 time.sleep(2) except KeyboardInterrupt: @@ -156,7 +159,7 @@ def main(): # 停止TCP服务端 if 'tcp_server' in locals(): tcp_server.stop() - # 停止TCP客户端 + # 停止OPC客户端 if 'data_client' in locals(): data_client.stop() diff --git a/services/task_service.py b/services/task_service.py index bb27a65..e72aa6b 100644 --- a/services/task_service.py +++ b/services/task_service.py @@ -1,12 +1,9 @@ """任务处理服务""" from datetime import datetime from API.client import APIClient -from database.access_db import AccessDB from database.sql_server import SQLServerDB -from config.settings import ACCESS_DB_PATH, ACCESS_DB_PASSWORD from utils.helpers import get_f_block_positions -from tcp.client import TCPClient -import threading +from opc.client import OPCClient import time @@ -18,7 +15,7 @@ class TaskService: self.artifact_timestamps = {} self.tcp_server = tcp_server from config.settings import TCP_CLIENT_HOST, TCP_CLIENT_PORT - self.data_client = TCPClient(host=TCP_CLIENT_HOST, port=TCP_CLIENT_PORT) + self.data_client = OPCClient(url=f'opc.tcp://{TCP_CLIENT_HOST}:{TCP_CLIENT_PORT}') self.data_client.start() def process_not_pour_info(self): @@ -334,7 +331,7 @@ class TaskService: if artifact_id not in self.artifact_timestamps: self.artifact_timestamps[artifact_id] = datetime.now() - def insert_into_produce_table(self, connection, task_info, beton_volume, erp_id, artifact_id, status): + def insert_into_produce_table(self, connection, task_info, beton_volume, erp_id, artifact_id, half_volume, status): """插入数据到Produce表""" sql_db = SQLServerDB() if status == 1: @@ -359,9 +356,7 @@ class TaskService: sql_db.insert_produce_data(insert_data) print(f"数据已成功插入到Produce表中,ERP ID: {erp_id}") - # 调用同事提供的保存函数,将数据保存到自定义数据表 try: - # 假设同事提供的函数名为 save_to_custom_table # 参数包括: MISID(即erp_id), Flag, TaskID, ProduceMixID, ProjectName, BetonGrade, 调整后的方量 self.save_to_custom_table( misid=erp_id, @@ -371,14 +366,14 @@ class TaskService: project_name=task_info["ProjectName"], beton_grade=task_info["BetonGrade"], adjusted_volume=round(beton_volume,2), - artifact_id=artifact_id + artifact_id=artifact_id, + half_volume=half_volume, # 已经调整后的方量 ) print(f"任务 {erp_id} 的数据已保存到自定义数据表") except Exception as e: print(f"调用保存函数时出错: {e}") - # 发送数据给TCP客户端 if self.tcp_server: try: time.sleep(2) @@ -410,7 +405,8 @@ class TaskService: project_name=task_info["ProjectName"], beton_grade=task_info["BetonGrade"], adjusted_volume=round(beton_volume,2), - artifact_id=artifact_id + artifact_id=artifact_id, + half_volume=half_volume, ) print(f"任务 {erp_id} 的数据已保存到自定义数据表") except Exception as e: @@ -419,7 +415,7 @@ class TaskService: return erp_id - def save_to_custom_table(self, misid, flag, task_id, produce_mix_id, project_name, beton_grade, adjusted_volume, artifact_id): + def save_to_custom_table(self, misid, flag, task_id, produce_mix_id, project_name, beton_grade, adjusted_volume, artifact_id,half_volume): try: task_data = { "erp_id": misid, @@ -430,6 +426,7 @@ class TaskService: "beton_grade": beton_grade, "adjusted_volume": adjusted_volume, "flag": flag, + "half_volume":half_volume, "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S") } self.data_client.send_data(task_data) @@ -439,7 +436,6 @@ class TaskService: print(f"原计划保存到自定义数据表: MISID={misid}, Flag={flag}, TaskID={task_id}, 调整后方量={adjusted_volume}") def update_custom_table_status(self, erp_id, status): - # 通过专用TCP客户端发送状态更新到另一台电脑 try: status_data = { "cmd": "update_status", @@ -451,4 +447,4 @@ class TaskService: print(f"任务状态更新已发送到另一台电脑: ERP ID={erp_id}, 状态={status}") except Exception as e: print(f"发送状态更新到另一台电脑时出错: {e}") - print(f"原计划更新自定义数据表状态: ERP ID={erp_id}, 状态={status}") + print(f"原计划更新自定义数据表状态: ERP ID={erp_id}, 状态={status}") \ No newline at end of file diff --git a/tcp/client.py b/tcp/client.py deleted file mode 100644 index e4bc758..0000000 --- a/tcp/client.py +++ /dev/null @@ -1,70 +0,0 @@ -import socket -import json -import threading -import time - - -class TCPClient: - def __init__(self, host='127.0.0.1', port=8889): - self.host = host - self.port = port - self.client_socket = None - self.is_connected = False - self.reconnect_interval = 5 # 重连间隔(秒) - self.is_running = False - - def connect(self): - """连接到TCP服务端""" - try: - self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.client_socket.connect((self.host, self.port)) - self.is_connected = True - print(f"已连接到 {self.host}:{self.port}") - return True - except Exception as e: - print(f"连接到服务端失败: {e}") - self.is_connected = False - return False - - def start(self): - """启动客户端并维持连接""" - self.is_running = True - reconnect_thread = threading.Thread(target=self._reconnect_worker, daemon=True) - reconnect_thread.start() - - def _reconnect_worker(self): - """重连工作线程""" - while self.is_running: - if not self.is_connected: - print(f"尝试重新连接到 {self.host}:{self.port}...") - self.connect() - time.sleep(self.reconnect_interval) - - def send_data(self, data): - """发送数据到服务端""" - if not self.is_connected or not self.client_socket: - print("客户端未连接到服务端") - return False - - try: - json_data = json.dumps(data, ensure_ascii=False) - # 添加换行符作为结束标记 - self.client_socket.sendall((json_data + "\n").encode('utf-8')) - print(f"已发送数据: {json_data}") - return True - except Exception as e: - print(f"发送数据时出错: {e}") - self.is_connected = False - return False - - def stop(self): - """停止客户端""" - self.is_running = False - self.is_connected = False - if self.client_socket: - try: - self.client_socket.close() - except Exception as e: - print(f"关闭客户端套接字时出错: {e}") - print("TCP客户端已停止") -