Files
Feeding_control_system/service/opcua_angle_uploader.py

128 lines
5.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from opcua import Client, ua
import traceback
import threading
import time
class OPCAngleUploader:
"""下料斗夹爪角度上传OPC"""
def __init__(self, opc_server_url="opc.tcp://localhost:4840/zjsh_feed/server/",
angle_node_id = "ns=2;i=13", upload_interval=0.5,
angle_threshold=5.0, angle_min = 3.0):
# OPC服务器配置
self.opc_server_url = opc_server_url
self.upload_interval = upload_interval # 固定0.5秒上传
self.angle_threshold = angle_threshold # 角度变化阈值(超过阈值才更新角度)
self.angle_min = angle_min # 角度最小值(低于该值为误判的角度,不更新)
# 配置 lower_angle 节点IDns=2;i=13
self.angle_node_id = angle_node_id # lower_angle节点ID
self.angle_node = None # 缓存节点对象,避免重复获取
# 线程安全相关
self.latest_angle = None # 最新角度缓存
self.last_uploaded_angle = None # 上一次成功上传opc的角度
self.angle_lock = threading.Lock() # 角度读写锁
self.exit_event = threading.Event() # 线程退出信号
self.upload_thread = None # 上传线程
# OPC客户端状态
self.opc_client = None
self.is_connected = False
def connect_opc(self):
"""连接OPC服务器, 获取lower_angle节点对象"""
try:
# 关闭旧连接
if self.opc_client:
try:
self.opc_client.disconnect()
except:
pass
# 初始化客户端并连接
self.opc_client = Client(self.opc_server_url)
self.opc_client.connect()
# 获取目标节点(仅获取一次,提升效率)
self.angle_node = self.opc_client.get_node(self.angle_node_id)
self.is_connected = True
# print(f"[OPC] 连接成功,已获取节点:{self.angle_node_id}lower_angle")
return True
except Exception as e:
print(f"[OPC] 连接/获取节点失败:{e}")
self.is_connected = False
self.angle_node = None
return False
def update_angle(self, angle):
"""线程安全更新最新角度值"""
with self.angle_lock:
self.latest_angle = angle
def _upload_worker(self):
"""上传线程核心: 每隔0.5秒写入角度节点"""
# 初始连接失败则5秒重试
while not self.exit_event.is_set() and not self.connect_opc():
# print(f"[OPC] 初始连接失败5秒后重试...")
time.sleep(5)
# 循环上传
while not self.exit_event.is_set():
try:
# 连接失效则重连
if not self.is_connected or not self.angle_node:
self.connect_opc()
time.sleep(1)
continue
# 线程安全读取角度
with self.angle_lock:
current_angle = self.latest_angle
last_angle = self.last_uploaded_angle
if current_angle is None:
time.sleep(self.upload_interval)
continue
need_upload = False
if last_angle is None:
need_upload = True
else:
angle_diff = abs(float(current_angle) - float(last_angle))
if angle_diff >= self.angle_threshold:
need_upload = True
# 写入角度到OPC节点
if need_upload and current_angle > self.angle_min:
angle_variant = ua.Variant(float(current_angle), ua.VariantType.Float)
self.angle_node.set_value(angle_variant)
# 更新上一次上传的角度
with self.angle_lock:
self.last_uploaded_angle = current_angle
# 0.5秒间隔
time.sleep(self.upload_interval)
except Exception as e:
print(f"[OPC] 上传异常:{e}")
traceback.print_exc()
self.is_connected = False # 标记连接失效,触发重连
time.sleep(1)
def start_upload(self):
"""启动上传线程(守护线程,随进程退出)"""
if not self.upload_thread or not self.upload_thread.is_alive():
self.exit_event.clear()
self.upload_thread = threading.Thread(target=self._upload_worker, daemon=True)
self.upload_thread.start()
def stop_upload(self):
"""停止线程并关闭OPC连接"""
self.exit_event.set()
if self.upload_thread and self.upload_thread.is_alive():
self.upload_thread.join(timeout=2)
if self.opc_client:
try:
self.opc_client.disconnect()
except:
pass
self.is_connected = False