#!/usr/bin/env python # -*- coding: utf-8 -*- ''' # @Time : 2025/6/18 10:27 # @Author : reenrr # @File : send_target-test.py ''' import socket import json import time target_weight = 200 set_vibrate_time = 10 cmd_set_target = { # 称量 "command": "set_target", "payload": { "target_weight": target_weight, "algorithm": "pid", "direction_control": False } } cmd_get_weight = { # 获取重量 "command": "get_weight" } cmd_set_zero = { # 去皮 "command": "set_zero", "payload": { } } cmd_set_vibrate = { # 振动控制 "command": "set_vibrate", "payload": { "time": set_vibrate_time # 单位S } } def start_weight(host='127.0.0.1', port=5000): with socket.socket() as s: try: s.connect((host, port)) s.sendall(json.dumps(cmd_set_zero).encode()) print("已发送去皮指令") time.sleep(1) # 等待去皮完成 s.sendall(json.dumps(cmd_get_weight).encode()) print("请求当前重量...") data = s.recv(1024) response = json.loads(data.decode()) current_weight = response.get("current_weight") print(f"当前重量:{current_weight}") except Exception as e: print(f"操作过程中发生异常:{e}") with socket.socket() as s: try: if isinstance(current_weight, (int, float)) and abs(current_weight) < 0.01: print("重量为0,开始称量流程") s.sendall(json.dumps(cmd_set_target).encode()) print("已发送目标值设定指令") while True: try: data = s.recv(1024) if not data: print("连接已关闭") break response = json.loads(data.decode()) current_weight = response.get("current_weight") if current_weight is not None: print(f"当前重量:{current_weight}") if current_weight >= target_weight: print("目标已达到") break except Exception as e: print(f"接收或解析重量数据失败:{e}") break else: print("当前重量不为0,中止称量流程,请检查设备") except Exception as e: print(f"操作过程中发生异常:{e}") if __name__ == "__main__": if start_weight(host='127.0.0.1', port=5000): print("称重完毕") else: print("称重发生错误") exit(-1)