Files
Feeding_control_system/hardware/transmitter.py

119 lines
4.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# hardware/transmitter.py
from pymodbus.exceptions import ModbusException
class TransmitterController:
def __init__(self, relay_controller):
self.relay_controller = relay_controller
# 变送器配置
self.config = {
1: { # 上料斗
'slave_id': 1,
'weight_register': 0x01,
'register_count': 2
},
2: { # 下料斗
'slave_id': 2,
'weight_register': 0x01,
'register_count': 2
}
}
def read_data_back(self, transmitter_id):
"""读取变送器数据"""
try:
if transmitter_id not in self.config:
print(f"无效变送器ID: {transmitter_id}")
return None
config = self.config[transmitter_id]
if not self.relay_controller.modbus_client.connect():
print("无法连接网络继电器Modbus服务")
return None
result = self.relay_controller.modbus_client.read_holding_registers(
address=config['weight_register'],
count=config['register_count'],
slave=config['slave_id']
)
if isinstance(result, Exception):
print(f"读取变送器 {transmitter_id} 失败: {result}")
return None
# 根据图片示例,正确解析数据
if config['register_count'] == 2:
# 获取原始字节数组
raw_data = result.registers
# 组合成32位整数
weight = (raw_data[0] << 16) + raw_data[1]
weight = weight / 1000.0 # 单位转换为千克
elif config['register_count'] == 1:
weight = float(result.registers[0])
else:
print(f"不支持的寄存器数量: {config['register_count']}")
return None
print(f"变送器 {transmitter_id} 读取重量: {weight}kg")
return weight
except ModbusException as e:
print(f"Modbus通信错误: {e}")
return None
except Exception as e:
print(f"数据解析错误: {e}")
return None
finally:
self.relay_controller.modbus_client.close()
# 直接读取 变送器返回的数据
def read_data(self, transmitter_id):
"""
Args: transmitter_id 为1 表示上料斗, 为2 表示下料斗
return: 读取成功返回重量 weight: int, 失败返回 None
"""
if transmitter_id == 1:
# 上料斗变送器的信息:
IP = "192.168.250.63"
PORT = 502
TIMEOUT = 5 # 超时时间为 5秒
weight = None
import socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.settimeout(TIMEOUT)
s.connect((IP, PORT))
print(f"✅ 连接 {IP}:{PORT} 成功")
# 接收数据变送器主动推送单次recv即可获取一条完整数据
data = s.recv(1024)
if data:
# print(f"收到原始数据:{data}")
weight = self.parse_weight(data)
else:
print("❌ 未收到设备数据")
except ConnectionRefusedError:
print(f"❌ 连接失败:{IP}:{PORT} 拒绝连接(设备离线/端口错误)")
except socket.timeout:
print(f"❌ 超时:{TIMEOUT}秒内未收到数据")
except Exception as e:
print(f"❌ 读取异常:{e}")
# 成功返回重量int失败返回None
return weight
def parse_weight(raw_data):
"""解析函数:提取重量数值(如从 b'ST,NT,+0000175\r\n' 中提取 175)"""
try:
data_str = raw_data.decode('utf-8').strip()
parts = data_str.split(',')
weight_part = parts[2].strip()
return int(''.join(filter(str.isdigit, weight_part)))
except (IndexError, ValueError, UnicodeDecodeError) as e:
# print(f"数据解析失败:{e},原始数据:{raw_data}")
# 注意可能会出现解析失败的情况此时返回None界面不用更新数据就行
return None