Files
Feeding_control_system/Fedding.py
2025-09-13 15:49:17 +08:00

524 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import socket
import binascii
import time
import threading
import struct
from pymodbus.client import ModbusTcpClient
from pymodbus.exceptions import ModbusException
class FeedingControlSystem:
def __init__(self, relay_host='192.168.0.18', relay_port=50000):
# 网络继电器配置 (作为Modbus TCP转RS485网关)
self.relay_host = relay_host
self.relay_port = relay_port
# 创建网络继电器的Modbus TCP客户端
self.relay_modbus_client = ModbusTcpClient(relay_host, port=relay_port)
# 定义继电器DO端口映射
self.DOOR_UPPER = 'door_upper' # DO0 - 上料斗滑动门
self.DOOR_LOWER_1 = 'door_lower_1' # DO1 - 出砼门控制1
self.DOOR_LOWER_2 = 'door_lower_2' # DO2 - 出砼门控制2
self.BREAK_ARCH_UPPER = 'break_arch_upper' # DO3 - 上料斗破拱
self.BREAK_ARCH_LOWER = 'break_arch_lower' # DO4 - 下料斗破拱
# 继电器控制命令 (原始Socket方式)
self.relay_commands = {
self.DOOR_UPPER: {'open': '00000000000601050000FF00', 'close': '000000000006010500000000'},
self.DOOR_LOWER_1: {'open': '00000000000601050001FF00', 'close': '000000000006010500010000'},
self.DOOR_LOWER_2: {'open': '00000000000601050002FF00', 'close': '000000000006010500020000'},
self.BREAK_ARCH_UPPER: {'open': '00000000000601050003FF00', 'close': '000000000006010500030000'},
self.BREAK_ARCH_LOWER: {'open': '00000000000601050004FF00', 'close': '000000000006010500040000'}
}
# 读取状态命令
self.read_status_command = '000000000006010100000008'
# 设备位映射
self.device_bit_map = {
self.DOOR_UPPER: 0,
self.DOOR_LOWER_1: 1,
self.DOOR_LOWER_2: 2,
self.BREAK_ARCH_UPPER: 3,
self.BREAK_ARCH_LOWER: 4
}
# 变频器配置 (通过网络继电器转发)
self.inverter_config = {
'slave_id': 1, # 变频器从机地址
'frequency_register': 0x01, # 频率设置寄存器
'start_register': 0x00, # 启动命令寄存器
'stop_register': 0x01 # 停止命令寄存器
}
# 变送器配置 (通过网络继电器转发)
self.transmitter_config = {
1: { # 上料斗变送器
'slave_id': 1, # 从机地址
'weight_register': 0x00, # 重量寄存器地址
'register_count': 2 # 读取寄存器数量
},
2: { # 下料斗变送器
'slave_id': 2, # 从机地址
'weight_register': 0x00, # 重量寄存器地址
'register_count': 2 # 读取寄存器数量
}
}
# 系统状态
self._running = False
self._monitor_thread = None
self.min_required_weight = 50 # 最小需要重量(kg)
# 状态标志
self.upper_door_position = 'default' # default, to_mixer, returning
self.lower_feeding_stage = 0 # 0:未下料, 1:第一阶段, 2:第二阶段, 3:第三阶段
self.last_upper_weight = 0
self.last_lower_weight = 0
self.last_weight_time = time.time()
# 添加权重读取错误计数
self.upper_weight_error_count = 0
self.lower_weight_error_count = 0
self.max_error_count = 3 # 最大连续错误次数
def send_relay_command(self, command_hex):
"""发送命令到网络继电器 (原始Socket方式)"""
try:
byte_data = binascii.unhexlify(command_hex)
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.connect((self.relay_host, self.relay_port))
sock.send(byte_data)
response = sock.recv(1024)
print(f"收到继电器响应: {binascii.hexlify(response)}")
return response
except Exception as e:
print(f"继电器通信错误: {e}")
return None
def get_relay_status(self):
"""获取继电器状态"""
response = self.send_relay_command(self.read_status_command)
status_dict = {}
if response and len(response) >= 10:
status_byte = response[9]
status_bin = f"{status_byte:08b}"[::-1]
for key, bit_index in self.device_bit_map.items():
status_dict[key] = status_bin[bit_index] == '1'
else:
print("读取继电器状态失败或响应无效")
return status_dict
def control_relay(self, device, action):
"""控制继电器开关"""
if device in self.relay_commands and action in self.relay_commands[device]:
print(f"控制继电器 {device} {action}")
self.send_relay_command(self.relay_commands[device][action])
time.sleep(0.1)
else:
print(f"无效的继电器设备或动作: {device}, {action}")
def read_transmitter_data_via_relay(self, transmitter_id):
"""通过网络继电器读取变送器数据 (Modbus TCP转RS485)"""
try:
# 获取变送器配置
if transmitter_id not in self.transmitter_config:
print(f"无效的变送器ID: {transmitter_id}")
return None
config = self.transmitter_config[transmitter_id]
# 连接网络继电器的Modbus服务
if not self.relay_modbus_client.connect():
print("无法连接到网络继电器Modbus服务")
return None
# 通过网络继电器读取变送器数据
result = self.relay_modbus_client.read_holding_registers(
address=config['weight_register'],
count=config['register_count'],
slave=config['slave_id']
)
if isinstance(result, Exception):
print(f"读取变送器 {transmitter_id} 数据失败: {result}")
return None
# 解析重量数据 (假设是32位浮点数由两个16位寄存器组成)
if config['register_count'] == 2:
# 组合两个寄存器为32位浮点数 (大端序)
weight_bytes = struct.pack('>HH', result.registers[0], result.registers[1])
weight = struct.unpack('>f', weight_bytes)[0]
elif config['register_count'] == 1:
# 单个寄存器直接作为整数
weight = float(result.registers[0])
else:
print(f"不支持的寄存器数量: {config['register_count']}")
return None
print(f"变送器 {transmitter_id} 读取重量: {weight}kg")
# 重置错误计数
if transmitter_id == 1:
self.upper_weight_error_count = 0
else:
self.lower_weight_error_count = 0
return weight
except ModbusException as e:
print(f"变送器 {transmitter_id} Modbus通信错误: {e}")
return None
except Exception as e:
print(f"变送器 {transmitter_id} 数据解析错误: {e}")
return None
finally:
self.relay_modbus_client.close()
def set_inverter_frequency_via_relay(self, frequency):
"""通过网络继电器设置变频器频率"""
try:
# 连接网络继电器的Modbus服务
if not self.relay_modbus_client.connect():
print("无法连接到网络继电器Modbus服务")
return False
# 写入频率值 (假设频率值需要转换为Hz*10)
result = self.relay_modbus_client.write_register(
self.inverter_config['frequency_register'],
int(frequency * 10),
slave=self.inverter_config['slave_id']
)
if isinstance(result, Exception):
print(f"设置变频器频率失败: {result}")
return False
print(f"设置变频器频率为 {frequency}Hz")
return True
except ModbusException as e:
print(f"变频器Modbus通信错误: {e}")
return False
finally:
self.relay_modbus_client.close()
def control_inverter_via_relay(self, action):
"""通过网络继电器控制变频器启停"""
try:
if not self.relay_modbus_client.connect():
print("无法连接到网络继电器Modbus服务")
return False
if action == 'start':
result = self.relay_modbus_client.write_register(
self.inverter_config['start_register'],
1,
slave=self.inverter_config['slave_id']
)
print("启动变频器")
elif action == 'stop':
result = self.relay_modbus_client.write_register(
self.inverter_config['stop_register'],
1,
slave=self.inverter_config['slave_id']
)
print("停止变频器")
else:
print(f"无效的变频器操作: {action}")
return False
if isinstance(result, Exception):
print(f"控制变频器失败: {result}")
return False
return True
except ModbusException as e:
print(f"变频器控制错误: {e}")
return False
finally:
self.relay_modbus_client.close()
def read_transmitter_data(self, transmitter_id):
"""读取变送器数据 (按需读取,仅在下料时)"""
return self.read_transmitter_data_via_relay(transmitter_id)
def set_inverter_frequency(self, frequency):
"""设置变频器频率"""
return self.set_inverter_frequency_via_relay(frequency)
def control_inverter(self, action):
"""控制变频器启停"""
return self.control_inverter_via_relay(action)
def check_upper_material_request(self):
"""检查是否需要向上料斗要料"""
current_weight = self.read_transmitter_data(1)
# 如果读取失败,增加错误计数
if current_weight is None:
self.upper_weight_error_count += 1
print(f"上料斗重量读取失败,错误计数: {self.upper_weight_error_count}")
# 如果连续错误次数超过阈值,触发报警但不自动要料
if self.upper_weight_error_count >= self.max_error_count:
print("警告:上料斗重量传感器连续读取失败,请检查设备连接")
return False # 读取失败时不触发要料
# 重置错误计数
self.upper_weight_error_count = 0
if current_weight < self.min_required_weight:
print("上料斗重量不足,通知搅拌楼要料")
self.move_upper_door_to_mixer()
return True
return False
def move_upper_door_to_mixer(self):
"""移动上料斗到搅拌楼出砼口"""
print("移动上料斗到搅拌楼出砼口")
self.control_relay(self.DOOR_UPPER, 'open')
self.upper_door_position = 'to_mixer'
def return_upper_door(self):
"""返回上料斗"""
print("上料斗返回原位")
self.control_relay(self.DOOR_UPPER, 'close')
self.upper_door_position = 'default'
def start_lower_feeding(self):
"""开始下料过程"""
if self.lower_feeding_stage != 0:
print("下料已在进行中")
return
print("开始三阶段下料过程")
self.lower_feeding_stage = 1
self.feeding_stage_one()
def feeding_stage_one(self):
"""第一阶段下料"""
print("开始第一阶段下料 (1/3)")
# 设置变频器频率 (假设第一阶段频率为30Hz)
self.set_inverter_frequency(30.0)
self.control_inverter('start')
# 控制出砼门
self.control_relay(self.DOOR_LOWER_1, 'open')
self.control_relay(self.DOOR_LOWER_2, 'open')
# 监控下料过程
start_time = time.time()
initial_weight = self.read_transmitter_data(2)
# 如果初始重量读取失败,使用默认值或取消操作
if initial_weight is None:
print("无法获取初始重量,取消下料操作")
self.finish_feeding()
return
target_weight = initial_weight + 33.3 # 假设每阶段下料1/3
while self.lower_feeding_stage == 1:
current_weight = self.read_transmitter_data(2)
# 如果读取失败,增加错误计数并检查是否超过阈值
if current_weight is None:
self.lower_weight_error_count += 1
if self.lower_weight_error_count >= self.max_error_count:
print("下料斗重量传感器连续读取失败,停止下料")
self.finish_feeding()
return
else:
self.lower_weight_error_count = 0 # 重置错误计数
# 检查是否达到目标重量或超时
if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30:
self.lower_feeding_stage = 2
self.feeding_stage_two()
break
time.sleep(2) # 每2秒读取一次
def feeding_stage_two(self):
"""第二阶段下料"""
print("开始第二阶段下料 (2/3)")
# 调整变频器频率 (假设第二阶段频率为40Hz)
self.set_inverter_frequency(40.0)
start_time = time.time()
initial_weight = self.read_transmitter_data(2)
# 如果初始重量读取失败,使用默认值或取消操作
if initial_weight is None:
print("无法获取初始重量,取消下料操作")
self.finish_feeding()
return
target_weight = initial_weight + 33.3
while self.lower_feeding_stage == 2:
current_weight = self.read_transmitter_data(2)
# 如果读取失败,增加错误计数并检查是否超过阈值
if current_weight is None:
self.lower_weight_error_count += 1
if self.lower_weight_error_count >= self.max_error_count:
print("下料斗重量传感器连续读取失败,停止下料")
self.finish_feeding()
return
else:
self.lower_weight_error_count = 0 # 重置错误计数
if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30:
self.lower_feeding_stage = 3
self.feeding_stage_three()
break
time.sleep(2) # 每2秒读取一次
def feeding_stage_three(self):
"""第三阶段下料"""
print("开始第三阶段下料 (3/3)")
# 调整变频器频率 (假设第三阶段频率为50Hz)
self.set_inverter_frequency(50.0)
start_time = time.time()
initial_weight = self.read_transmitter_data(2)
# 如果初始重量读取失败,使用默认值或取消操作
if initial_weight is None:
print("无法获取初始重量,取消下料操作")
self.finish_feeding()
return
target_weight = initial_weight + 33.3
while self.lower_feeding_stage == 3:
current_weight = self.read_transmitter_data(2)
# 如果读取失败,增加错误计数并检查是否超过阈值
if current_weight is None:
self.lower_weight_error_count += 1
if self.lower_weight_error_count >= self.max_error_count:
print("下料斗重量传感器连续读取失败,停止下料")
self.finish_feeding()
return
else:
self.lower_weight_error_count = 0 # 重置错误计数
if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30:
self.lower_feeding_stage = 0
self.finish_feeding()
break
time.sleep(2) # 每2秒读取一次
def finish_feeding(self):
"""完成下料"""
print("下料完成,关闭出砼门")
self.control_inverter('stop')
self.control_relay(self.DOOR_LOWER_1, 'close')
self.control_relay(self.DOOR_LOWER_2, 'close')
def handle_overflow_control(self, overflow_detected, door_opening_large):
"""处理溢料控制"""
if overflow_detected and door_opening_large:
print("检测到溢料且出砼门开口较大,调小出砼门")
# 通过重复开关控制角度
self.control_relay(self.DOOR_LOWER_1, 'close')
time.sleep(0.1)
self.control_relay(self.DOOR_LOWER_1, 'open')
time.sleep(0.1)
def check_arch_blocking(self):
"""检查是否需要破拱 (按需读取)"""
current_time = time.time()
# 检查上料斗 (按需读取)
upper_weight = self.read_transmitter_data(1)
if upper_weight is not None: # 只有在成功读取重量时才检查堵塞
if (abs(upper_weight - self.last_upper_weight) < 0.1) and \
(current_time - self.last_weight_time) > 10:
print("上料斗可能堵塞,启动破拱")
self.control_relay(self.BREAK_ARCH_UPPER, 'open')
time.sleep(2)
self.control_relay(self.BREAK_ARCH_UPPER, 'close')
# 检查下料斗 (按需读取)
lower_weight = self.read_transmitter_data(2)
if lower_weight is not None: # 只有在成功读取重量时才检查堵塞
if (abs(lower_weight - self.last_lower_weight) < 0.1) and \
(current_time - self.last_weight_time) > 10:
print("下料斗可能堵塞,启动破拱")
self.control_relay(self.BREAK_ARCH_LOWER, 'open')
time.sleep(2)
self.control_relay(self.BREAK_ARCH_LOWER, 'close')
# 更新重量记录(只有在成功读取时才更新)
if upper_weight is not None:
self.last_upper_weight = upper_weight
if lower_weight is not None:
self.last_lower_weight = lower_weight
if upper_weight is not None or lower_weight is not None:
self.last_weight_time = current_time
def monitor_system(self):
"""监控系统状态"""
while self._running:
try:
# 检查是否需要要料
self.check_upper_material_request()
# 检查破拱 (按需读取)
self.check_arch_blocking()
time.sleep(1)
except Exception as e:
print(f"监控线程错误: {e}")
def start(self):
"""启动系统"""
if self._running:
print("系统已在运行")
return
print("启动控制系统")
self._running = True
self._monitor_thread = threading.Thread(target=self.monitor_system, daemon=True)
self._monitor_thread.start()
def stop(self):
"""停止系统"""
if not self._running:
print("系统未在运行")
return
print("停止控制系统")
self._running = False
if self._monitor_thread is not None:
self._monitor_thread.join()
print("控制系统已停止")
# 使用示例
if __name__ == "__main__":
system = FeedingControlSystem(relay_host='192.168.0.18', relay_port=50000)
# 启动系统监控
system.start()
# 模拟操作
try:
# 检查是否需要要料
system.check_upper_material_request()
# 开始下料
system.start_lower_feeding()
# 运行一段时间
time.sleep(60)
except KeyboardInterrupt:
print("程序被中断")
finally:
system.stop()