Files
Feeding_control_system/test2.py

106 lines
4.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# import socket
# # 设备信息
# IP = "192.168.250.63"
# PORT = 502
# TIMEOUT = 10 # 超时时间(秒)
# def parse_weight(raw_data):
# """解析原始数据,提取重量(如从 b'ST,NT, +0000175\r\n' 中提取 175"""
# try:
# # 1. 字节串解码为字符串(去除 b'' 包裹)
# data_str = raw_data.decode('utf-8').strip() # strip() 去除首尾的换行符\r\n和空格
# # 此时 data_str 应为 'ST,NT, +0000175'
# # 2. 按逗号分割字符串,取第三个字段(包含重量的部分)
# parts = data_str.split(',') # 分割后为 ['ST', 'NT', '+0000175']
# weight_part = parts[2].strip() # 得到 '+0000175'
# # 3. 提取纯数字(去除可能的正负号和前导零)
# # 方法:替换掉非数字字符,再转换为整数
# weight_num = int(''.join(filter(str.isdigit, weight_part)))
# return weight_num
# except (IndexError, ValueError, UnicodeDecodeError) as e:
# print(f"解析失败:{e},原始数据:{raw_data}")
# return None
# # 创建TCP连接并持续读取
# with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
# try:
# s.settimeout(TIMEOUT)
# s.connect((IP, PORT))
# print(f"✅ 已连接到 {IP}:{PORT},开始持续读取重量...\n")
# while True: # 循环读取
# data = s.recv(1024) # 接收数据
# if data:
# print(f"原始数据:{data}")
# weight = parse_weight(data)
# if weight is not None:
# print(f"解析出重量:{weight}\n")
# else:
# print("❌ 未收到数据,等待下一次...\n")
# except ConnectionRefusedError:
# print(f"❌ 连接失败:{IP}:{PORT} 拒绝连接")
# except socket.timeout:
# print(f"❌ 超时:{TIMEOUT}秒内未收到数据")
# except Exception as e:
# print(f"❌ 发生错误:{e}")
import socket
import time # 导入时间模块,用于记录时间戳
# 设备信息
IP = "192.168.250.63"
PORT = 502
TIMEOUT = 10 # 超时时间(秒)
def parse_weight(raw_data):
"""解析原始数据,提取重量(如从 b'ST,NT, +0000175\r\n' 中提取 175"""
try:
data_str = raw_data.decode("utf-8").strip()
parts = data_str.split(",")
weight_part = parts[2].strip()
weight_num = int("".join(filter(str.isdigit, weight_part)))
return weight_num
except (IndexError, ValueError, UnicodeDecodeError) as e:
print(f"解析失败:{e},原始数据:{raw_data}")
return None
# 创建TCP连接并持续读取
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.settimeout(TIMEOUT)
s.connect((IP, PORT))
print(f"✅ 已连接到 {IP}:{PORT},开始持续读取重量...\n")
last_receive_time = None # 记录上一次收到有效数据的时间戳初始为None
while True: # 循环读取
data = s.recv(1024) # 接收数据
if data:
print(f"原始数据:{data}")
weight = parse_weight(data)
if weight is not None:
current_time = time.time() # 当前时间戳(秒,带小数)
print(f"解析出重量:{weight}")
# 计算与上一次有效数据的时间间隔
if last_receive_time is not None:
interval = current_time - last_receive_time # 间隔时间(秒)
print(f"与上一次数据的间隔:{interval:.3f}\n") # 保留3位小数
else:
print("(首次收到数据,无间隔记录)\n")
last_receive_time = current_time # 更新上一次时间戳
else:
print("❌ 未收到数据,等待下一次...\n")
except ConnectionRefusedError:
print(f"❌ 连接失败:{IP}:{PORT} 拒绝连接")
except socket.timeout:
print(f"❌ 超时:{TIMEOUT}秒内未收到数据")
except Exception as e:
print(f"❌ 发生错误:{e}")