Files
weight_test/metric_device.py
2025-11-16 16:30:11 +08:00

223 lines
7.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# metric_device.py
import socket
import threading
import time
import statistics
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any, List
from dataclasses import dataclass, asdict
@dataclass
class DeviceReading:
"""标准化设备读数"""
weight: Optional[int] # 重量None 表示 OL/ER
raw_data: bytes # 原始字节
status: str # 'ST', 'US', 'OL', 'ER'
timestamp: float # 本地接收时间time.time()
rtt_ms: float # 网络往返延迟(毫秒)
is_valid: bool # 是否含有效重量
def to_dict(self) -> Dict[str, Any]:
return {
**asdict(self),
"timestamp_iso": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(self.timestamp)),
"weight_kg": self.weight / 1000.0 if self.weight is not None else None,
}
class MetricDevice(ABC):
"""抽象计量设备基类 —— 支持 get() + RTT 统计"""
def __init__(self, ip: str, port: int, name: str = ""):
self.ip = ip
self.port = port
self.name = name or f"{ip}:{port}"
self._latest_reading: Optional[DeviceReading] = None
self._lock = threading.Lock()
# 📊 RTT 统计(滑动窗口)
self._rtt_window: List[float] = []
self._rtt_max_len = 100 # 保留最近 100 次 RTT
self._rtt_stats = {
"min_ms": float('inf'),
"max_ms": 0.0,
"avg_ms": 0.0,
"count": 0,
}
def _update_rtt(self, rtt_ms: float):
"""更新 RTT 统计"""
self._rtt_window.append(rtt_ms)
if len(self._rtt_window) > self._rtt_max_len:
self._rtt_window.pop(0)
rtt_list = self._rtt_window
self._rtt_stats.update({
"min_ms": min(rtt_list) if rtt_list else 0,
"max_ms": max(rtt_list) if rtt_list else 0,
"avg_ms": statistics.mean(rtt_list) if rtt_list else 0,
"count": len(rtt_list),
})
def get_rtt_stats(self) -> Dict[str, float]:
"""获取 RTT 统计信息"""
with self._lock:
return self._rtt_stats.copy()
def get(self) -> Optional[Dict[str, Any]]:
"""✅ 统一 get() 接口:返回最新读数(含 RTT"""
with self._lock:
if self._latest_reading is None:
return None
return self._latest_reading.to_dict()
@abstractmethod
def _connect_and_run(self):
"""子类实现连接与数据接收循环"""
pass
def start(self):
"""启动后台线程"""
thread = threading.Thread(target=self._connect_and_run, daemon=True, name=f"Dev-{self.name}")
thread.start()
def _update_reading(self, reading: DeviceReading):
"""线程安全更新最新读数"""
with self._lock:
self._latest_reading = reading
class TCPScaleDevice(MetricDevice):
"""TCP ASCII 协议称重设备(支持您的两种设备)"""
def __init__(self, ip: str, port: int, name: str = ""):
super().__init__(ip, port, name)
self._buffer = b""
self._socket: Optional[socket.socket] = None
self._running = True
def _parse_line(self, line: bytes) -> Optional[DeviceReading]:
try:
clean = line.strip()
if not clean:
return None
parts = clean.split(b',')
if len(parts) < 3:
return DeviceReading(
weight=None, raw_data=clean, status="??",
timestamp=time.time(), rtt_ms=0.0, is_valid=False
)
status = parts[0].decode('ascii', errors='replace').upper()
mode = parts[1].decode('ascii', errors='replace').upper()
weight_str = parts[2].decode('ascii', errors='replace')
weight = None
is_valid = False
if weight_str.replace('+', '').replace('-', '').isdigit():
try:
weight = int(weight_str)
is_valid = True
except ValueError:
pass
return DeviceReading(
weight=weight,
raw_data=clean,
status=status,
timestamp=time.time(),
rtt_ms=0.0, # 稍后更新
is_valid=is_valid,
)
except Exception:
return None
def _extract_lines(self, data: bytes) -> List[bytes]:
"""自适应行分割"""
self._buffer += data
lines = []
if b'\r\n' in self._buffer or b'\n' in self._buffer:
norm = self._buffer.replace(b'\r\n', b'\n')
parts = norm.split(b'\n')
*complete, self._buffer = parts
lines = [line for line in complete if line]
else:
# 无换行符:整包作为单行
if self._buffer:
lines = [self._buffer]
self._buffer = b""
return lines
def _connect_and_run(self):
while self._running:
try:
# 🔁 重连逻辑
if self._socket is None:
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.settimeout(5.0)
# ⏱️ 测量连接 RTT
t0 = time.perf_counter()
self._socket.connect((self.ip, self.port))
rtt_connect = (time.perf_counter() - t0) * 1000 # ms
self._update_rtt(rtt_connect)
print(f"[{self.name}] Connected (RTT={rtt_connect:.1f}ms)")
# 接收数据
t0 = time.perf_counter()
data = self._socket.recv(1024)
recv_rtt = (time.perf_counter() - t0) * 1000
if not data:
break
# 更新 RTT粗略recv 耗时 ≈ 半 RTT
self._update_rtt(recv_rtt * 2)
# 解析
lines = self._extract_lines(data)
for line in lines:
reading = self._parse_line(line)
if reading:
reading.rtt_ms = recv_rtt * 2 # 近似 RTT
self._update_reading(reading)
except socket.timeout:
continue
except (OSError, socket.error) as e:
print(f"[{self.name}] Error: {e}")
if self._socket:
self._socket.close()
self._socket = None
time.sleep(2)
except Exception as e:
print(f"[{self.name}] Fatal: {e}")
break
if self._socket:
self._socket.close()
# ===== 设计模式:设备工厂 + 管理器 =====
class DeviceManager:
"""设备工厂 + 单例管理避免重复创建同IP:Port设备"""
_instances: Dict[str, MetricDevice] = {}
_lock = threading.Lock()
@classmethod
def get_device(cls, ip: str, port: int, name: str = "") -> MetricDevice:
"""✅ 工厂方法:按 (ip, port) 单例返回设备"""
key = f"{ip}:{port}"
with cls._lock:
if key not in cls._instances:
# 🔧 策略模式:未来可扩展 ModbusDevice、HTTPDevice 等
if port in (502, 8234): # 您的设备端口
device = TCPScaleDevice(ip, port, name)
device.start()
cls._instances[key] = device
else:
raise ValueError(f"Unsupported port {port} for auto-detect")
return cls._instances[key]
@classmethod
def get_all_devices(cls) -> List[MetricDevice]:
return list(cls._instances.values())