测试模块

This commit is contained in:
cdeyw
2025-09-29 09:19:30 +08:00
parent dbb66972b6
commit bd7820df76
7 changed files with 1609 additions and 21 deletions

1153
Fedding.py Normal file

File diff suppressed because it is too large Load Diff

View File

@ -9,6 +9,7 @@ class SystemState:
self.lower_feeding_stage = 0 # 0:未下料, 1:第一阶段, 2:第二阶段, 3:第三阶段, 4:等待模具车对齐
self.lower_feeding_cycle = 0 # 下料斗下料循环次数
self.upper_feeding_count = 0 # 上料斗已下料次数
self.upper_feeding_max = 2 #上料斗最大下料次数
# 重量相关
self.last_upper_weight = 0

View File

@ -4,17 +4,17 @@ from unittest.mock import patch, MagicMock
import sys
import os
# 添加src目录到Python路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', 'src'))
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from src.control.feeding_controller import FeedingControlSystem
from feeding.process import FeedingProcess
class TestFeedingProcess(unittest.TestCase):
@patch('src.control.feeding_process.RelayController')
@patch('src.control.feeding_process.InverterController')
@patch('src.control.feeding_process.TransmitterController')
@patch('feeding.process.RelayController')
@patch('feeding.process.InverterController')
@patch('feeding.process.TransmitterController')
def test_initialization(self, mock_transmitter, mock_inverter, mock_relay):
"""测试初始化"""
# 创建模拟对象
@ -28,28 +28,27 @@ class TestFeedingProcess(unittest.TestCase):
mock_transmitter.return_value = mock_transmitter_instance
# 创建系统实例
system = FeedingControlSystem()
system = FeedingProcess()
# 验证初始化
self.assertIsNotNone(system)
self.assertFalse(system._running)
self.assertFalse(system.state.running)
def test_set_feeding_parameters(self):
"""测试设置下料参数"""
with patch('src.control.feeding_process.RelayController'), \
patch('src.control.feeding_process.InverterController'), \
patch('src.control.feeding_process.TransmitterController'):
system = FeedingControlSystem()
system.set_feeding_parameters(
target_vehicle_weight=3000,
upper_buffer_weight=300,
single_batch_weight=1500
)
with patch('feeding.process.RelayController'), \
patch('feeding.process.InverterController'), \
patch('feeding.process.TransmitterController'):
system = FeedingProcess()
# 通过settings修改参数
system.settings.single_batch_weight = 1500
system.settings.min_required_weight = 300
system.settings.target_vehicle_weight = 3000
self.assertEqual(system.target_vehicle_weight, 3000)
self.assertEqual(system.upper_buffer_weight, 300)
self.assertEqual(system.single_batch_weight, 1500)
self.assertEqual(system.settings.target_vehicle_weight, 3000)
self.assertEqual(system.settings.min_required_weight, 300)
self.assertEqual(system.settings.single_batch_weight, 1500)
if __name__ == '__main__':
unittest.main()
unittest.main()

View File

@ -0,0 +1,161 @@
import unittest
from unittest.mock import patch, MagicMock
import sys
import os
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from hardware.inverter import InverterController
from pymodbus.exceptions import ModbusException
class TestInverterController(unittest.TestCase):
def setUp(self):
"""测试前的准备工作"""
# 创建模拟的继电器控制器
self.mock_relay = MagicMock()
self.mock_relay.modbus_client = MagicMock()
# 创建变频器控制器实例
self.inverter = InverterController(relay_controller=self.mock_relay)
def test_inverter_initialization(self):
"""测试变频器控制器初始化"""
self.assertEqual(self.inverter.relay_controller, self.mock_relay)
self.assertEqual(self.inverter.max_frequency, 400.0)
# 检查配置
self.assertIn('slave_id', self.inverter.config)
self.assertIn('frequency_register', self.inverter.config)
self.assertIn('start_register', self.inverter.config)
self.assertIn('stop_register', self.inverter.config)
self.assertIn('start_command', self.inverter.config)
self.assertIn('stop_command', self.inverter.config)
def test_set_frequency_success(self):
"""测试设置频率成功"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
mock_result = MagicMock()
self.mock_relay.modbus_client.write_register.return_value = mock_result
# 设置频率
result = self.inverter.set_frequency(200.0)
# 验证调用
self.mock_relay.modbus_client.connect.assert_called_once()
self.mock_relay.modbus_client.write_register.assert_called_once()
self.assertTrue(result)
def test_set_frequency_connection_failed(self):
"""测试设置频率时连接失败"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = False
# 设置频率
result = self.inverter.set_frequency(200.0)
# 验证结果
self.assertFalse(result)
self.mock_relay.modbus_client.connect.assert_called_once()
self.mock_relay.modbus_client.write_register.assert_not_called()
def test_set_frequency_modbus_exception(self):
"""测试设置频率时Modbus异常"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
self.mock_relay.modbus_client.write_register.side_effect = ModbusException("Modbus错误")
# 设置频率
result = self.inverter.set_frequency(200.0)
# 验证结果
self.assertFalse(result)
def test_set_frequency_value_clamping(self):
"""测试频率值限制"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
mock_result = MagicMock()
self.mock_relay.modbus_client.write_register.return_value = mock_result
# 测试超过最大值的频率
result = self.inverter.set_frequency(500.0) # 超过400.0最大值
# 验证调用
self.assertTrue(result)
self.mock_relay.modbus_client.write_register.assert_called_once()
def test_control_start_success(self):
"""测试启动变频器成功"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
mock_result = MagicMock()
self.mock_relay.modbus_client.write_register.return_value = mock_result
# 启动变频器
result = self.inverter.control('start')
# 验证调用
self.assertTrue(result)
self.mock_relay.modbus_client.connect.assert_called_once()
self.mock_relay.modbus_client.write_register.assert_called_once()
def test_control_stop_success(self):
"""测试停止变频器成功"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
mock_result = MagicMock()
self.mock_relay.modbus_client.write_register.return_value = mock_result
# 停止变频器
result = self.inverter.control('stop')
# 验证调用
self.assertTrue(result)
self.mock_relay.modbus_client.connect.assert_called_once()
self.mock_relay.modbus_client.write_register.assert_called_once()
def test_control_invalid_action(self):
"""测试无效的控制动作"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
# 使用无效动作
result = self.inverter.control('invalid_action')
# 验证结果
self.assertFalse(result)
self.mock_relay.modbus_client.connect.assert_not_called()
self.mock_relay.modbus_client.write_register.assert_not_called()
def test_control_connection_failed(self):
"""测试控制时连接失败"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = False
# 启动变频器
result = self.inverter.control('start')
# 验证结果
self.assertFalse(result)
self.mock_relay.modbus_client.connect.assert_called_once()
self.mock_relay.modbus_client.write_register.assert_not_called()
def test_control_modbus_exception(self):
"""测试控制时Modbus异常"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
self.mock_relay.modbus_client.write_register.side_effect = ModbusException("Modbus错误")
# 启动变频器
result = self.inverter.control('start')
# 验证结果
self.assertFalse(result)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,109 @@
import unittest
from unittest.mock import patch, MagicMock
import sys
import os
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from hardware.relay import RelayController
class TestRelayController(unittest.TestCase):
def setUp(self):
"""测试前的准备工作"""
self.relay_host = '192.168.0.18'
self.relay_port = 50000
self.relay = RelayController(host=self.relay_host, port=self.relay_port)
def test_relay_initialization(self):
"""测试继电器控制器初始化"""
self.assertEqual(self.relay.host, self.relay_host)
self.assertEqual(self.relay.port, self.relay_port)
self.assertIsNotNone(self.relay.modbus_client)
# 检查设备映射
self.assertIn(RelayController.DOOR_UPPER, self.relay.device_bit_map)
self.assertIn(RelayController.DOOR_LOWER_1, self.relay.device_bit_map)
self.assertIn(RelayController.DOOR_LOWER_2, self.relay.device_bit_map)
self.assertIn(RelayController.BREAK_ARCH_UPPER, self.relay.device_bit_map)
self.assertIn(RelayController.BREAK_ARCH_LOWER, self.relay.device_bit_map)
@patch('hardware.relay.socket.socket')
def test_send_command_success(self, mock_socket_class):
"""测试发送命令成功"""
# 配置模拟对象
mock_socket_instance = MagicMock()
mock_socket_class.return_value.__enter__.return_value = mock_socket_instance
mock_socket_instance.recv.return_value = b'\x00\x00\x00\x00\x00\x06\x01\x01\x00\x00'
# 发送命令
response = self.relay.send_command(self.relay.read_status_command)
# 验证调用
mock_socket_instance.connect.assert_called_once_with((self.relay_host, self.relay_port))
mock_socket_instance.send.assert_called_once()
self.assertIsNotNone(response)
@patch('hardware.relay.socket.socket')
def test_send_command_exception(self, mock_socket_class):
"""测试发送命令异常处理"""
# 配置模拟对象以引发异常
mock_socket_class.return_value.__enter__.side_effect = Exception("网络错误")
# 发送命令
response = self.relay.send_command(self.relay.read_status_command)
# 验证结果
self.assertIsNone(response)
def test_control_valid_device_action(self):
"""测试控制有效设备和动作"""
with patch.object(self.relay, 'send_command') as mock_send:
# 测试打开上料斗门
self.relay.control(RelayController.DOOR_UPPER, 'open')
mock_send.assert_called_once()
def test_control_invalid_device(self):
"""测试控制无效设备"""
with patch.object(self.relay, 'send_command') as mock_send:
# 测试无效设备
self.relay.control('invalid_device', 'open')
mock_send.assert_not_called()
def test_control_invalid_action(self):
"""测试控制无效动作"""
with patch.object(self.relay, 'send_command') as mock_send:
# 测试无效动作
self.relay.control(RelayController.DOOR_UPPER, 'invalid_action')
mock_send.assert_not_called()
@patch.object(RelayController, 'send_command')
def test_get_status_success(self, mock_send_command):
"""测试获取状态成功"""
# 模拟返回的有效响应
mock_send_command.return_value = b'\x00\x00\x00\x00\x00\x06\x01\x01\x00\x1F'
status = self.relay.get_status()
# 验证返回的状态字典
self.assertIsInstance(status, dict)
self.assertIn(RelayController.DOOR_UPPER, status)
mock_send_command.assert_called_once_with(self.relay.read_status_command)
@patch.object(RelayController, 'send_command')
def test_get_status_failure(self, mock_send_command):
"""测试获取状态失败"""
# 模拟返回的无效响应
mock_send_command.return_value = None
status = self.relay.get_status()
# 验证返回空状态
self.assertIsInstance(status, dict)
self.assertEqual(len(status), 0)
if __name__ == '__main__':
unittest.main()

View File

@ -0,0 +1,165 @@
import unittest
from unittest.mock import patch, MagicMock
import sys
import os
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from hardware.transmitter import TransmitterController
from pymodbus.exceptions import ModbusException
class TestTransmitterController(unittest.TestCase):
def setUp(self):
"""测试前的准备工作"""
# 创建模拟的继电器控制器
self.mock_relay = MagicMock()
self.mock_relay.modbus_client = MagicMock()
# 创建变送器控制器实例
self.transmitter = TransmitterController(relay_controller=self.mock_relay)
def test_transmitter_initialization(self):
"""测试变送器控制器初始化"""
self.assertEqual(self.transmitter.relay_controller, self.mock_relay)
# 检查配置
self.assertIn(1, self.transmitter.config) # 上料斗
self.assertIn(2, self.transmitter.config) # 下料斗
# 检查上料斗配置
upper_config = self.transmitter.config[1]
self.assertEqual(upper_config['slave_id'], 1)
self.assertEqual(upper_config['weight_register'], 0x01)
self.assertEqual(upper_config['register_count'], 2)
# 检查下料斗配置
lower_config = self.transmitter.config[2]
self.assertEqual(lower_config['slave_id'], 2)
self.assertEqual(lower_config['weight_register'], 0x01)
self.assertEqual(lower_config['register_count'], 2)
def test_read_data_valid_transmitter_id(self):
"""测试读取有效变送器ID的数据"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
# 模拟读取结果
mock_result = MagicMock()
mock_result.registers = [0, 1500] # 表示1500kg
self.mock_relay.modbus_client.read_holding_registers.return_value = mock_result
# 读取上料斗数据
weight = self.transmitter.read_data(1)
# 验证调用
self.mock_relay.modbus_client.connect.assert_called_once()
self.mock_relay.modbus_client.read_holding_registers.assert_called_once()
self.assertEqual(weight, 1.5) # 1500kg / 1000 = 1.5kg
def test_read_data_invalid_transmitter_id(self):
"""测试读取无效变送器ID的数据"""
# 读取无效ID的数据
weight = self.transmitter.read_data(99)
# 验证结果
self.assertIsNone(weight)
self.mock_relay.modbus_client.connect.assert_not_called()
self.mock_relay.modbus_client.read_holding_registers.assert_not_called()
def test_read_data_connection_failed(self):
"""测试读取数据时连接失败"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = False
# 读取数据
weight = self.transmitter.read_data(1)
# 验证结果
self.assertIsNone(weight)
self.mock_relay.modbus_client.connect.assert_called_once()
self.mock_relay.modbus_client.read_holding_registers.assert_not_called()
def test_read_data_modbus_exception(self):
"""测试读取数据时Modbus异常"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
self.mock_relay.modbus_client.read_holding_registers.side_effect = ModbusException("Modbus错误")
# 读取数据
weight = self.transmitter.read_data(1)
# 验证结果
self.assertIsNone(weight)
def test_read_data_register_count_2(self):
"""测试读取2个寄存器的数据"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
# 模拟读取结果 (高位寄存器=1低位寄存器=0xE848表示128000)
mock_result = MagicMock()
mock_result.registers = [1, 0xE848] # 1 * 65536 + 59464 = 125000
self.mock_relay.modbus_client.read_holding_registers.return_value = mock_result
# 读取数据
weight = self.transmitter.read_data(1)
# 验证结果 (125000 / 1000 = 125kg)
self.assertEqual(weight, 125.0)
def test_read_data_register_count_1(self):
"""测试读取1个寄存器的数据模拟配置变更"""
# 临时修改配置以测试单寄存器读取
self.transmitter.config[1]['register_count'] = 1
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
# 模拟读取结果
mock_result = MagicMock()
mock_result.registers = [1500] # 表示1500kg
self.mock_relay.modbus_client.read_holding_registers.return_value = mock_result
# 读取数据
weight = self.transmitter.read_data(1)
# 验证结果
self.assertEqual(weight, 1500.0) # 1500kg
def test_read_data_invalid_register_count(self):
"""测试不支持的寄存器数量"""
# 临时修改配置以测试无效寄存器数量
self.transmitter.config[1]['register_count'] = 3
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
# 模拟读取结果
mock_result = MagicMock()
mock_result.registers = [0, 0, 0]
self.mock_relay.modbus_client.read_holding_registers.return_value = mock_result
# 读取数据
weight = self.transmitter.read_data(1)
# 验证结果
self.assertIsNone(weight)
def test_read_data_result_is_exception(self):
"""测试读取结果为异常对象"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
self.mock_relay.modbus_client.read_holding_registers.return_value = Exception("读取错误")
# 读取数据
weight = self.transmitter.read_data(1)
# 验证结果
self.assertIsNone(weight)
if __name__ == '__main__':
unittest.main()