From b04d41fba851d66e736da0db5983fc5fef5b8648 Mon Sep 17 00:00:00 2001 From: cdeyw <827523911@qq.com> Date: Sat, 13 Sep 2025 15:49:17 +0800 Subject: [PATCH] script --- Fedding.py | 523 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 523 insertions(+) create mode 100644 Fedding.py diff --git a/Fedding.py b/Fedding.py new file mode 100644 index 0000000..2f8f138 --- /dev/null +++ b/Fedding.py @@ -0,0 +1,523 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +import socket +import binascii +import time +import threading +import struct +from pymodbus.client import ModbusTcpClient +from pymodbus.exceptions import ModbusException + + +class FeedingControlSystem: + def __init__(self, relay_host='192.168.0.18', relay_port=50000): + # 网络继电器配置 (作为Modbus TCP转RS485网关) + self.relay_host = relay_host + self.relay_port = relay_port + + # 创建网络继电器的Modbus TCP客户端 + self.relay_modbus_client = ModbusTcpClient(relay_host, port=relay_port) + + # 定义继电器DO端口映射 + self.DOOR_UPPER = 'door_upper' # DO0 - 上料斗滑动门 + self.DOOR_LOWER_1 = 'door_lower_1' # DO1 - 出砼门控制1 + self.DOOR_LOWER_2 = 'door_lower_2' # DO2 - 出砼门控制2 + self.BREAK_ARCH_UPPER = 'break_arch_upper' # DO3 - 上料斗破拱 + self.BREAK_ARCH_LOWER = 'break_arch_lower' # DO4 - 下料斗破拱 + + # 继电器控制命令 (原始Socket方式) + self.relay_commands = { + self.DOOR_UPPER: {'open': '00000000000601050000FF00', 'close': '000000000006010500000000'}, + self.DOOR_LOWER_1: {'open': '00000000000601050001FF00', 'close': '000000000006010500010000'}, + self.DOOR_LOWER_2: {'open': '00000000000601050002FF00', 'close': '000000000006010500020000'}, + self.BREAK_ARCH_UPPER: {'open': '00000000000601050003FF00', 'close': '000000000006010500030000'}, + self.BREAK_ARCH_LOWER: {'open': '00000000000601050004FF00', 'close': '000000000006010500040000'} + } + + # 读取状态命令 + self.read_status_command = '000000000006010100000008' + + # 设备位映射 + self.device_bit_map = { + self.DOOR_UPPER: 0, + self.DOOR_LOWER_1: 1, + self.DOOR_LOWER_2: 2, + self.BREAK_ARCH_UPPER: 3, + self.BREAK_ARCH_LOWER: 4 + } + + # 变频器配置 (通过网络继电器转发) + self.inverter_config = { + 'slave_id': 1, # 变频器从机地址 + 'frequency_register': 0x01, # 频率设置寄存器 + 'start_register': 0x00, # 启动命令寄存器 + 'stop_register': 0x01 # 停止命令寄存器 + } + + # 变送器配置 (通过网络继电器转发) + self.transmitter_config = { + 1: { # 上料斗变送器 + 'slave_id': 1, # 从机地址 + 'weight_register': 0x00, # 重量寄存器地址 + 'register_count': 2 # 读取寄存器数量 + }, + 2: { # 下料斗变送器 + 'slave_id': 2, # 从机地址 + 'weight_register': 0x00, # 重量寄存器地址 + 'register_count': 2 # 读取寄存器数量 + } + } + + # 系统状态 + self._running = False + self._monitor_thread = None + self.min_required_weight = 50 # 最小需要重量(kg) + + # 状态标志 + self.upper_door_position = 'default' # default, to_mixer, returning + self.lower_feeding_stage = 0 # 0:未下料, 1:第一阶段, 2:第二阶段, 3:第三阶段 + self.last_upper_weight = 0 + self.last_lower_weight = 0 + self.last_weight_time = time.time() + + # 添加权重读取错误计数 + self.upper_weight_error_count = 0 + self.lower_weight_error_count = 0 + self.max_error_count = 3 # 最大连续错误次数 + + def send_relay_command(self, command_hex): + """发送命令到网络继电器 (原始Socket方式)""" + try: + byte_data = binascii.unhexlify(command_hex) + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: + sock.connect((self.relay_host, self.relay_port)) + sock.send(byte_data) + response = sock.recv(1024) + print(f"收到继电器响应: {binascii.hexlify(response)}") + return response + except Exception as e: + print(f"继电器通信错误: {e}") + return None + + def get_relay_status(self): + """获取继电器状态""" + response = self.send_relay_command(self.read_status_command) + status_dict = {} + + if response and len(response) >= 10: + status_byte = response[9] + status_bin = f"{status_byte:08b}"[::-1] + + for key, bit_index in self.device_bit_map.items(): + status_dict[key] = status_bin[bit_index] == '1' + else: + print("读取继电器状态失败或响应无效") + + return status_dict + + def control_relay(self, device, action): + """控制继电器开关""" + if device in self.relay_commands and action in self.relay_commands[device]: + print(f"控制继电器 {device} {action}") + self.send_relay_command(self.relay_commands[device][action]) + time.sleep(0.1) + else: + print(f"无效的继电器设备或动作: {device}, {action}") + + def read_transmitter_data_via_relay(self, transmitter_id): + """通过网络继电器读取变送器数据 (Modbus TCP转RS485)""" + try: + # 获取变送器配置 + if transmitter_id not in self.transmitter_config: + print(f"无效的变送器ID: {transmitter_id}") + return None + + config = self.transmitter_config[transmitter_id] + + # 连接网络继电器的Modbus服务 + if not self.relay_modbus_client.connect(): + print("无法连接到网络继电器Modbus服务") + return None + + # 通过网络继电器读取变送器数据 + result = self.relay_modbus_client.read_holding_registers( + address=config['weight_register'], + count=config['register_count'], + slave=config['slave_id'] + ) + + if isinstance(result, Exception): + print(f"读取变送器 {transmitter_id} 数据失败: {result}") + return None + + # 解析重量数据 (假设是32位浮点数,由两个16位寄存器组成) + if config['register_count'] == 2: + # 组合两个寄存器为32位浮点数 (大端序) + weight_bytes = struct.pack('>HH', result.registers[0], result.registers[1]) + weight = struct.unpack('>f', weight_bytes)[0] + elif config['register_count'] == 1: + # 单个寄存器直接作为整数 + weight = float(result.registers[0]) + else: + print(f"不支持的寄存器数量: {config['register_count']}") + return None + + print(f"变送器 {transmitter_id} 读取重量: {weight}kg") + + # 重置错误计数 + if transmitter_id == 1: + self.upper_weight_error_count = 0 + else: + self.lower_weight_error_count = 0 + + return weight + + except ModbusException as e: + print(f"变送器 {transmitter_id} Modbus通信错误: {e}") + return None + except Exception as e: + print(f"变送器 {transmitter_id} 数据解析错误: {e}") + return None + finally: + self.relay_modbus_client.close() + + def set_inverter_frequency_via_relay(self, frequency): + """通过网络继电器设置变频器频率""" + try: + # 连接网络继电器的Modbus服务 + if not self.relay_modbus_client.connect(): + print("无法连接到网络继电器Modbus服务") + return False + + # 写入频率值 (假设频率值需要转换为Hz*10) + result = self.relay_modbus_client.write_register( + self.inverter_config['frequency_register'], + int(frequency * 10), + slave=self.inverter_config['slave_id'] + ) + + if isinstance(result, Exception): + print(f"设置变频器频率失败: {result}") + return False + + print(f"设置变频器频率为 {frequency}Hz") + return True + except ModbusException as e: + print(f"变频器Modbus通信错误: {e}") + return False + finally: + self.relay_modbus_client.close() + + def control_inverter_via_relay(self, action): + """通过网络继电器控制变频器启停""" + try: + if not self.relay_modbus_client.connect(): + print("无法连接到网络继电器Modbus服务") + return False + + if action == 'start': + result = self.relay_modbus_client.write_register( + self.inverter_config['start_register'], + 1, + slave=self.inverter_config['slave_id'] + ) + print("启动变频器") + elif action == 'stop': + result = self.relay_modbus_client.write_register( + self.inverter_config['stop_register'], + 1, + slave=self.inverter_config['slave_id'] + ) + print("停止变频器") + else: + print(f"无效的变频器操作: {action}") + return False + + if isinstance(result, Exception): + print(f"控制变频器失败: {result}") + return False + + return True + except ModbusException as e: + print(f"变频器控制错误: {e}") + return False + finally: + self.relay_modbus_client.close() + + def read_transmitter_data(self, transmitter_id): + """读取变送器数据 (按需读取,仅在下料时)""" + return self.read_transmitter_data_via_relay(transmitter_id) + + def set_inverter_frequency(self, frequency): + """设置变频器频率""" + return self.set_inverter_frequency_via_relay(frequency) + + def control_inverter(self, action): + """控制变频器启停""" + return self.control_inverter_via_relay(action) + + def check_upper_material_request(self): + """检查是否需要向上料斗要料""" + current_weight = self.read_transmitter_data(1) + + # 如果读取失败,增加错误计数 + if current_weight is None: + self.upper_weight_error_count += 1 + print(f"上料斗重量读取失败,错误计数: {self.upper_weight_error_count}") + + # 如果连续错误次数超过阈值,触发报警但不自动要料 + if self.upper_weight_error_count >= self.max_error_count: + print("警告:上料斗重量传感器连续读取失败,请检查设备连接") + return False # 读取失败时不触发要料 + + # 重置错误计数 + self.upper_weight_error_count = 0 + + if current_weight < self.min_required_weight: + print("上料斗重量不足,通知搅拌楼要料") + self.move_upper_door_to_mixer() + return True + return False + + def move_upper_door_to_mixer(self): + """移动上料斗到搅拌楼出砼口""" + print("移动上料斗到搅拌楼出砼口") + self.control_relay(self.DOOR_UPPER, 'open') + self.upper_door_position = 'to_mixer' + + def return_upper_door(self): + """返回上料斗""" + print("上料斗返回原位") + self.control_relay(self.DOOR_UPPER, 'close') + self.upper_door_position = 'default' + + def start_lower_feeding(self): + """开始下料过程""" + if self.lower_feeding_stage != 0: + print("下料已在进行中") + return + + print("开始三阶段下料过程") + self.lower_feeding_stage = 1 + self.feeding_stage_one() + + def feeding_stage_one(self): + """第一阶段下料""" + print("开始第一阶段下料 (1/3)") + # 设置变频器频率 (假设第一阶段频率为30Hz) + self.set_inverter_frequency(30.0) + self.control_inverter('start') + + # 控制出砼门 + self.control_relay(self.DOOR_LOWER_1, 'open') + self.control_relay(self.DOOR_LOWER_2, 'open') + + # 监控下料过程 + start_time = time.time() + initial_weight = self.read_transmitter_data(2) + + # 如果初始重量读取失败,使用默认值或取消操作 + if initial_weight is None: + print("无法获取初始重量,取消下料操作") + self.finish_feeding() + return + + target_weight = initial_weight + 33.3 # 假设每阶段下料1/3 + + while self.lower_feeding_stage == 1: + current_weight = self.read_transmitter_data(2) + # 如果读取失败,增加错误计数并检查是否超过阈值 + if current_weight is None: + self.lower_weight_error_count += 1 + if self.lower_weight_error_count >= self.max_error_count: + print("下料斗重量传感器连续读取失败,停止下料") + self.finish_feeding() + return + else: + self.lower_weight_error_count = 0 # 重置错误计数 + + # 检查是否达到目标重量或超时 + if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30: + self.lower_feeding_stage = 2 + self.feeding_stage_two() + break + time.sleep(2) # 每2秒读取一次 + + def feeding_stage_two(self): + """第二阶段下料""" + print("开始第二阶段下料 (2/3)") + # 调整变频器频率 (假设第二阶段频率为40Hz) + self.set_inverter_frequency(40.0) + + start_time = time.time() + initial_weight = self.read_transmitter_data(2) + + # 如果初始重量读取失败,使用默认值或取消操作 + if initial_weight is None: + print("无法获取初始重量,取消下料操作") + self.finish_feeding() + return + + target_weight = initial_weight + 33.3 + + while self.lower_feeding_stage == 2: + current_weight = self.read_transmitter_data(2) + # 如果读取失败,增加错误计数并检查是否超过阈值 + if current_weight is None: + self.lower_weight_error_count += 1 + if self.lower_weight_error_count >= self.max_error_count: + print("下料斗重量传感器连续读取失败,停止下料") + self.finish_feeding() + return + else: + self.lower_weight_error_count = 0 # 重置错误计数 + + if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30: + self.lower_feeding_stage = 3 + self.feeding_stage_three() + break + time.sleep(2) # 每2秒读取一次 + + def feeding_stage_three(self): + """第三阶段下料""" + print("开始第三阶段下料 (3/3)") + # 调整变频器频率 (假设第三阶段频率为50Hz) + self.set_inverter_frequency(50.0) + + start_time = time.time() + initial_weight = self.read_transmitter_data(2) + + # 如果初始重量读取失败,使用默认值或取消操作 + if initial_weight is None: + print("无法获取初始重量,取消下料操作") + self.finish_feeding() + return + + target_weight = initial_weight + 33.3 + + while self.lower_feeding_stage == 3: + current_weight = self.read_transmitter_data(2) + # 如果读取失败,增加错误计数并检查是否超过阈值 + if current_weight is None: + self.lower_weight_error_count += 1 + if self.lower_weight_error_count >= self.max_error_count: + print("下料斗重量传感器连续读取失败,停止下料") + self.finish_feeding() + return + else: + self.lower_weight_error_count = 0 # 重置错误计数 + + if (current_weight is not None and current_weight >= target_weight) or (time.time() - start_time) > 30: + self.lower_feeding_stage = 0 + self.finish_feeding() + break + time.sleep(2) # 每2秒读取一次 + + def finish_feeding(self): + """完成下料""" + print("下料完成,关闭出砼门") + self.control_inverter('stop') + self.control_relay(self.DOOR_LOWER_1, 'close') + self.control_relay(self.DOOR_LOWER_2, 'close') + + def handle_overflow_control(self, overflow_detected, door_opening_large): + """处理溢料控制""" + if overflow_detected and door_opening_large: + print("检测到溢料且出砼门开口较大,调小出砼门") + # 通过重复开关控制角度 + self.control_relay(self.DOOR_LOWER_1, 'close') + time.sleep(0.1) + self.control_relay(self.DOOR_LOWER_1, 'open') + time.sleep(0.1) + + def check_arch_blocking(self): + """检查是否需要破拱 (按需读取)""" + current_time = time.time() + + # 检查上料斗 (按需读取) + upper_weight = self.read_transmitter_data(1) + if upper_weight is not None: # 只有在成功读取重量时才检查堵塞 + if (abs(upper_weight - self.last_upper_weight) < 0.1) and \ + (current_time - self.last_weight_time) > 10: + print("上料斗可能堵塞,启动破拱") + self.control_relay(self.BREAK_ARCH_UPPER, 'open') + time.sleep(2) + self.control_relay(self.BREAK_ARCH_UPPER, 'close') + + # 检查下料斗 (按需读取) + lower_weight = self.read_transmitter_data(2) + if lower_weight is not None: # 只有在成功读取重量时才检查堵塞 + if (abs(lower_weight - self.last_lower_weight) < 0.1) and \ + (current_time - self.last_weight_time) > 10: + print("下料斗可能堵塞,启动破拱") + self.control_relay(self.BREAK_ARCH_LOWER, 'open') + time.sleep(2) + self.control_relay(self.BREAK_ARCH_LOWER, 'close') + + # 更新重量记录(只有在成功读取时才更新) + if upper_weight is not None: + self.last_upper_weight = upper_weight + if lower_weight is not None: + self.last_lower_weight = lower_weight + + if upper_weight is not None or lower_weight is not None: + self.last_weight_time = current_time + + def monitor_system(self): + """监控系统状态""" + while self._running: + try: + # 检查是否需要要料 + self.check_upper_material_request() + + # 检查破拱 (按需读取) + self.check_arch_blocking() + + time.sleep(1) + except Exception as e: + print(f"监控线程错误: {e}") + + def start(self): + """启动系统""" + if self._running: + print("系统已在运行") + return + print("启动控制系统") + self._running = True + self._monitor_thread = threading.Thread(target=self.monitor_system, daemon=True) + self._monitor_thread.start() + + def stop(self): + """停止系统""" + if not self._running: + print("系统未在运行") + return + print("停止控制系统") + self._running = False + if self._monitor_thread is not None: + self._monitor_thread.join() + print("控制系统已停止") + + +# 使用示例 +if __name__ == "__main__": + system = FeedingControlSystem(relay_host='192.168.0.18', relay_port=50000) + + # 启动系统监控 + system.start() + + # 模拟操作 + try: + # 检查是否需要要料 + system.check_upper_material_request() + + # 开始下料 + system.start_lower_feeding() + + # 运行一段时间 + time.sleep(60) + + except KeyboardInterrupt: + print("程序被中断") + finally: + system.stop()