Files
ElecScalesMeasur/send_target1.py

99 lines
2.9 KiB
Python
Raw Permalink Normal View History

2025-09-30 14:57:14 +08:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
# @Time : 2025/6/18 10:27
# @Author : reenrr
# @File : send_target-test.py
'''
import socket
import json
import time
target_weight = 200
set_vibrate_time = 10
cmd_set_target = {
# 称量
"command": "set_target",
"payload": {
"target_weight": target_weight,
"algorithm": "pid",
"direction_control": False
}
}
cmd_get_weight = {
# 获取重量
"command": "get_weight"
}
cmd_set_zero = {
# 去皮
"command": "set_zero",
"payload": {
}
}
cmd_set_vibrate = { # 振动控制
"command": "set_vibrate",
"payload": {
"time": set_vibrate_time # 单位S
}
}
def start_weight(host='127.0.0.1', port=5000):
with socket.socket() as s:
try:
s.connect((host, port))
s.sendall(json.dumps(cmd_set_zero).encode())
print("已发送去皮指令")
time.sleep(1) # 等待去皮完成
s.sendall(json.dumps(cmd_get_weight).encode())
print("请求当前重量...")
data = s.recv(1024)
response = json.loads(data.decode())
current_weight = response.get("current_weight")
print(f"当前重量:{current_weight}")
except Exception as e:
print(f"操作过程中发生异常:{e}")
with socket.socket() as s:
try:
if isinstance(current_weight, (int, float)) and abs(current_weight) < 0.01:
print("重量为0开始称量流程")
s.sendall(json.dumps(cmd_set_target).encode())
print("已发送目标值设定指令")
while True:
try:
data = s.recv(1024)
if not data:
print("连接已关闭")
break
response = json.loads(data.decode())
current_weight = response.get("current_weight")
if current_weight is not None:
print(f"当前重量:{current_weight}")
if current_weight >= target_weight:
print("目标已达到")
break
except Exception as e:
print(f"接收或解析重量数据失败:{e}")
break
else:
print("当前重量不为0中止称量流程请检查设备")
except Exception as e:
print(f"操作过程中发生异常:{e}")
if __name__ == "__main__":
if start_weight(host='127.0.0.1', port=5000):
print("称重完毕")
else:
print("称重发生错误")
exit(-1)