commit d92e97e45da44d727e59a89824034dac248a09ce Author: chuyiwen Date: Sun Nov 16 16:30:11 2025 +0800 V1.0 diff --git a/main.py b/main.py new file mode 100644 index 0000000..bc99afc --- /dev/null +++ b/main.py @@ -0,0 +1,27 @@ +# main.py +from metric_device import DeviceManager +import time + +# ✅ 初始化设备(自动单例,避免重复连接) +scale1 = DeviceManager.get_device("192.168.250.66", 8234, "Line1") +scale2 = DeviceManager.get_device("192.168.250.63", 502, "Line2") + +# ✅ 统一 get() 接口获取最新数据 +try: + for i in range(10000): + time.sleep(0.01) + + data1 = scale1.get() # ← 您要的 get 接口! + data2 = scale2.get() + + if data1: + print("dpwn:",{data1['weight']}) + # print(f"[{data1['status']}] {data1['weight']:6}g | RTT: {data1['rtt_ms']:.1f}ms | {data1['raw_data']}") + if data2: + print("up:",{data2['weight']}) + # print(f"[{data2['status']}] {data2['weight']:6}g | RTT: {data2['rtt_ms']:.1f}ms | {data2['raw_data']}") + + + +except KeyboardInterrupt: + print("\nStopped.") \ No newline at end of file diff --git a/metric_device.py b/metric_device.py new file mode 100644 index 0000000..634741d --- /dev/null +++ b/metric_device.py @@ -0,0 +1,223 @@ +# metric_device.py +import socket +import threading +import time +import statistics +from abc import ABC, abstractmethod +from typing import Optional, Dict, Any, List +from dataclasses import dataclass, asdict + +@dataclass +class DeviceReading: + """标准化设备读数""" + weight: Optional[int] # 重量(None 表示 OL/ER) + raw_data: bytes # 原始字节 + status: str # 'ST', 'US', 'OL', 'ER' + timestamp: float # 本地接收时间(秒,time.time()) + rtt_ms: float # 网络往返延迟(毫秒) + is_valid: bool # 是否含有效重量 + + def to_dict(self) -> Dict[str, Any]: + return { + **asdict(self), + "timestamp_iso": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(self.timestamp)), + "weight_kg": self.weight / 1000.0 if self.weight is not None else None, + } + + +class MetricDevice(ABC): + """抽象计量设备基类 —— 支持 get() + RTT 统计""" + def __init__(self, ip: str, port: int, name: str = ""): + self.ip = ip + self.port = port + self.name = name or f"{ip}:{port}" + self._latest_reading: Optional[DeviceReading] = None + self._lock = threading.Lock() + + # 📊 RTT 统计(滑动窗口) + self._rtt_window: List[float] = [] + self._rtt_max_len = 100 # 保留最近 100 次 RTT + self._rtt_stats = { + "min_ms": float('inf'), + "max_ms": 0.0, + "avg_ms": 0.0, + "count": 0, + } + + def _update_rtt(self, rtt_ms: float): + """更新 RTT 统计""" + self._rtt_window.append(rtt_ms) + if len(self._rtt_window) > self._rtt_max_len: + self._rtt_window.pop(0) + + rtt_list = self._rtt_window + self._rtt_stats.update({ + "min_ms": min(rtt_list) if rtt_list else 0, + "max_ms": max(rtt_list) if rtt_list else 0, + "avg_ms": statistics.mean(rtt_list) if rtt_list else 0, + "count": len(rtt_list), + }) + + def get_rtt_stats(self) -> Dict[str, float]: + """获取 RTT 统计信息""" + with self._lock: + return self._rtt_stats.copy() + + def get(self) -> Optional[Dict[str, Any]]: + """✅ 统一 get() 接口:返回最新读数(含 RTT)""" + with self._lock: + if self._latest_reading is None: + return None + return self._latest_reading.to_dict() + + @abstractmethod + def _connect_and_run(self): + """子类实现连接与数据接收循环""" + pass + + def start(self): + """启动后台线程""" + thread = threading.Thread(target=self._connect_and_run, daemon=True, name=f"Dev-{self.name}") + thread.start() + + def _update_reading(self, reading: DeviceReading): + """线程安全更新最新读数""" + with self._lock: + self._latest_reading = reading + + +class TCPScaleDevice(MetricDevice): + """TCP ASCII 协议称重设备(支持您的两种设备)""" + def __init__(self, ip: str, port: int, name: str = ""): + super().__init__(ip, port, name) + self._buffer = b"" + self._socket: Optional[socket.socket] = None + self._running = True + + def _parse_line(self, line: bytes) -> Optional[DeviceReading]: + try: + clean = line.strip() + if not clean: + return None + parts = clean.split(b',') + if len(parts) < 3: + return DeviceReading( + weight=None, raw_data=clean, status="??", + timestamp=time.time(), rtt_ms=0.0, is_valid=False + ) + + status = parts[0].decode('ascii', errors='replace').upper() + mode = parts[1].decode('ascii', errors='replace').upper() + weight_str = parts[2].decode('ascii', errors='replace') + + weight = None + is_valid = False + + if weight_str.replace('+', '').replace('-', '').isdigit(): + try: + weight = int(weight_str) + is_valid = True + except ValueError: + pass + + return DeviceReading( + weight=weight, + raw_data=clean, + status=status, + timestamp=time.time(), + rtt_ms=0.0, # 稍后更新 + is_valid=is_valid, + ) + except Exception: + return None + + def _extract_lines(self, data: bytes) -> List[bytes]: + """自适应行分割""" + self._buffer += data + lines = [] + if b'\r\n' in self._buffer or b'\n' in self._buffer: + norm = self._buffer.replace(b'\r\n', b'\n') + parts = norm.split(b'\n') + *complete, self._buffer = parts + lines = [line for line in complete if line] + else: + # 无换行符:整包作为单行 + if self._buffer: + lines = [self._buffer] + self._buffer = b"" + return lines + + def _connect_and_run(self): + while self._running: + try: + # 🔁 重连逻辑 + if self._socket is None: + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.settimeout(5.0) + + # ⏱️ 测量连接 RTT + t0 = time.perf_counter() + self._socket.connect((self.ip, self.port)) + rtt_connect = (time.perf_counter() - t0) * 1000 # ms + self._update_rtt(rtt_connect) + print(f"[{self.name}] Connected (RTT={rtt_connect:.1f}ms)") + + # 接收数据 + t0 = time.perf_counter() + data = self._socket.recv(1024) + recv_rtt = (time.perf_counter() - t0) * 1000 + + if not data: + break + + # 更新 RTT(粗略:recv 耗时 ≈ 半 RTT) + self._update_rtt(recv_rtt * 2) + + # 解析 + lines = self._extract_lines(data) + for line in lines: + reading = self._parse_line(line) + if reading: + reading.rtt_ms = recv_rtt * 2 # 近似 RTT + self._update_reading(reading) + + except socket.timeout: + continue + except (OSError, socket.error) as e: + print(f"[{self.name}] Error: {e}") + if self._socket: + self._socket.close() + self._socket = None + time.sleep(2) + except Exception as e: + print(f"[{self.name}] Fatal: {e}") + break + + if self._socket: + self._socket.close() + + +# ===== 设计模式:设备工厂 + 管理器 ===== +class DeviceManager: + """设备工厂 + 单例管理(避免重复创建同IP:Port设备)""" + _instances: Dict[str, MetricDevice] = {} + _lock = threading.Lock() + + @classmethod + def get_device(cls, ip: str, port: int, name: str = "") -> MetricDevice: + """✅ 工厂方法:按 (ip, port) 单例返回设备""" + key = f"{ip}:{port}" + with cls._lock: + if key not in cls._instances: + # 🔧 策略模式:未来可扩展 ModbusDevice、HTTPDevice 等 + if port in (502, 8234): # 您的设备端口 + device = TCPScaleDevice(ip, port, name) + device.start() + cls._instances[key] = device + else: + raise ValueError(f"Unsupported port {port} for auto-detect") + return cls._instances[key] + + @classmethod + def get_all_devices(cls) -> List[MetricDevice]: + return list(cls._instances.values()) \ No newline at end of file