# metric_device.py import socket import threading import time import statistics from abc import ABC, abstractmethod from typing import Optional, Dict, Any, List from dataclasses import dataclass, asdict @dataclass class DeviceReading: """标准化设备读数""" weight: Optional[int] # 重量(None 表示 OL/ER) raw_data: bytes # 原始字节 status: str # 'ST', 'US', 'OL', 'ER' timestamp: float # 本地接收时间(秒,time.time()) rtt_ms: float # 网络往返延迟(毫秒) is_valid: bool # 是否含有效重量 def to_dict(self) -> Dict[str, Any]: return { **asdict(self), "timestamp_iso": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(self.timestamp)), "weight_kg": self.weight / 1000.0 if self.weight is not None else None, } class MetricDevice(ABC): """抽象计量设备基类 —— 支持 get() + RTT 统计""" def __init__(self, ip: str, port: int, name: str = ""): self.ip = ip self.port = port self.name = name or f"{ip}:{port}" self._latest_reading: Optional[DeviceReading] = None self._lock = threading.Lock() # 📊 RTT 统计(滑动窗口) self._rtt_window: List[float] = [] self._rtt_max_len = 100 # 保留最近 100 次 RTT self._rtt_stats = { "min_ms": float('inf'), "max_ms": 0.0, "avg_ms": 0.0, "count": 0, } def _update_rtt(self, rtt_ms: float): """更新 RTT 统计""" self._rtt_window.append(rtt_ms) if len(self._rtt_window) > self._rtt_max_len: self._rtt_window.pop(0) rtt_list = self._rtt_window self._rtt_stats.update({ "min_ms": min(rtt_list) if rtt_list else 0, "max_ms": max(rtt_list) if rtt_list else 0, "avg_ms": statistics.mean(rtt_list) if rtt_list else 0, "count": len(rtt_list), }) def get_rtt_stats(self) -> Dict[str, float]: """获取 RTT 统计信息""" with self._lock: return self._rtt_stats.copy() def get(self) -> Optional[Dict[str, Any]]: """✅ 统一 get() 接口:返回最新读数(含 RTT)""" with self._lock: if self._latest_reading is None: return None return self._latest_reading.to_dict() @abstractmethod def _connect_and_run(self): """子类实现连接与数据接收循环""" pass def start(self): """启动后台线程""" thread = threading.Thread(target=self._connect_and_run, daemon=True, name=f"Dev-{self.name}") thread.start() def _update_reading(self, reading: DeviceReading): """线程安全更新最新读数""" with self._lock: self._latest_reading = reading class TCPScaleDevice(MetricDevice): """TCP ASCII 协议称重设备(支持您的两种设备)""" def __init__(self, ip: str, port: int, name: str = ""): super().__init__(ip, port, name) self._buffer = b"" self._socket: Optional[socket.socket] = None self._running = True def _parse_line(self, line: bytes) -> Optional[DeviceReading]: try: clean = line.strip() if not clean: return None parts = clean.split(b',') if len(parts) < 3: return DeviceReading( weight=None, raw_data=clean, status="??", timestamp=time.time(), rtt_ms=0.0, is_valid=False ) status = parts[0].decode('ascii', errors='replace').upper() mode = parts[1].decode('ascii', errors='replace').upper() weight_str = parts[2].decode('ascii', errors='replace') weight = None is_valid = False if weight_str.replace('+', '').replace('-', '').isdigit(): try: weight = int(weight_str) is_valid = True except ValueError: pass return DeviceReading( weight=weight, raw_data=clean, status=status, timestamp=time.time(), rtt_ms=0.0, # 稍后更新 is_valid=is_valid, ) except Exception: return None def _extract_lines(self, data: bytes) -> List[bytes]: """自适应行分割""" self._buffer += data lines = [] if b'\r\n' in self._buffer or b'\n' in self._buffer: norm = self._buffer.replace(b'\r\n', b'\n') parts = norm.split(b'\n') *complete, self._buffer = parts lines = [line for line in complete if line] else: # 无换行符:整包作为单行 if self._buffer: lines = [self._buffer] self._buffer = b"" return lines def _connect_and_run(self): while self._running: try: # 🔁 重连逻辑 if self._socket is None: self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self._socket.settimeout(5.0) # ⏱️ 测量连接 RTT t0 = time.perf_counter() self._socket.connect((self.ip, self.port)) rtt_connect = (time.perf_counter() - t0) * 1000 # ms self._update_rtt(rtt_connect) print(f"[{self.name}] Connected (RTT={rtt_connect:.1f}ms)") # 接收数据 t0 = time.perf_counter() data = self._socket.recv(1024) recv_rtt = (time.perf_counter() - t0) * 1000 if not data: break # 更新 RTT(粗略:recv 耗时 ≈ 半 RTT) self._update_rtt(recv_rtt * 2) # 解析 lines = self._extract_lines(data) for line in lines: reading = self._parse_line(line) if reading: reading.rtt_ms = recv_rtt * 2 # 近似 RTT self._update_reading(reading) except socket.timeout: continue except (OSError, socket.error) as e: print(f"[{self.name}] Error: {e}") if self._socket: self._socket.close() self._socket = None time.sleep(2) except Exception as e: print(f"[{self.name}] Fatal: {e}") break if self._socket: self._socket.close() # ===== 设计模式:设备工厂 + 管理器 ===== class DeviceManager: """设备工厂 + 单例管理(避免重复创建同IP:Port设备)""" _instances: Dict[str, MetricDevice] = {} _lock = threading.Lock() @classmethod def get_device(cls, ip: str, port: int, name: str = "") -> MetricDevice: """✅ 工厂方法:按 (ip, port) 单例返回设备""" key = f"{ip}:{port}" with cls._lock: if key not in cls._instances: # 🔧 策略模式:未来可扩展 ModbusDevice、HTTPDevice 等 if port in (502, 8234): # 您的设备端口 device = TCPScaleDevice(ip, port, name) device.start() cls._instances[key] = device else: raise ValueError(f"Unsupported port {port} for auto-detect") return cls._instances[key] @classmethod def get_all_devices(cls) -> List[MetricDevice]: return list(cls._instances.values())