0209整理

This commit is contained in:
2026-02-10 10:18:17 +08:00
parent d6ad01274a
commit 6e74eaf206
62 changed files with 838 additions and 9136 deletions

View File

@ -1,259 +0,0 @@
import serial
import time
import struct
class InovanceMD520:
def __init__(self, port='COM4', baudrate=9600, timeout=1):
"""
初始化汇川MD520变频器通信
:param port: 串口名称Windows为COMxLinux为/dev/ttyUSBx
:param baudrate: 波特率默认9600
:param timeout: 超时时间,秒
"""
self.port = port
self.baudrate = baudrate
self.timeout = timeout
self.ser = None
def connect(self):
"""连接串口"""
try:
self.ser = serial.Serial(
port=self.port,
baudrate=self.baudrate,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=self.timeout
)
print(f"成功连接到串口 {self.port}")
return True
except serial.SerialException as e:
print(f"连接串口失败: {e}")
return False
def disconnect(self):
"""断开串口连接"""
if self.ser and self.ser.is_open:
self.ser.close()
print("串口连接已关闭")
def calculate_crc(self, data):
"""
计算Modbus CRC16校验码
:param data: 字节数据
:return: CRC校验码低位在前高位在后
"""
crc = 0xFFFF
for byte in data:
crc ^= byte
for _ in range(8):
if crc & 0x0001:
crc = (crc >> 1) ^ 0xA001
else:
crc = crc >> 1
return struct.pack('<H', crc) # 小端序,低位在前
def query_frequency(self, slave_addr=0x01):
"""
查询变频器运行频率
:param slave_addr: 从站地址默认1
:return: 频率值Hz如查询失败返回None
"""
if not self.ser or not self.ser.is_open:
print("串口未连接")
return None
# 构建查询指令读取运行频率地址1001H
# 01 03 10 01 00 01
cmd_data = bytes([
slave_addr, # 从站地址
0x03, # 功能码:读取保持寄存器
0x10, 0x01, # 寄存器地址1001H运行频率
0x00, 0x01 # 读取1个寄存器
])
# 计算CRC
crc = self.calculate_crc(cmd_data)
# 完整指令
full_cmd = cmd_data + crc
print(f"发送查询指令: {full_cmd.hex().upper()}")
try:
# 清空输入缓冲区
self.ser.reset_input_buffer()
# 发送指令
self.ser.write(full_cmd)
# 等待响应根据文档需要等待3.5个字节时间)
time.sleep(0.01) # 约10ms延迟
# 读取响应
# 正常响应格式:地址(1) + 功能码(1) + 字节数(1) + 数据(2) + CRC(2) = 7字节
response = self.ser.read(7)
if len(response) < 7:
print(f"响应数据长度不足: {len(response)} 字节")
return None
print(f"收到响应: {response.hex().upper()}")
# 验证响应
if response[0] != slave_addr:
print(f"从站地址不匹配: 期望{slave_addr:02X}, 收到{response[0]:02X}")
return None
if response[1] != 0x03:
print(f"功能码错误: 期望03, 收到{response[1]:02X}")
return None
if response[2] != 0x02:
print(f"数据长度错误: 期望02, 收到{response[2]:02X}")
return None
# 验证CRC
received_crc = response[-2:]
calculated_crc = self.calculate_crc(response[:-2])
if received_crc != calculated_crc:
print("CRC校验失败")
return None
# 解析频率数据(高字节在前)
frequency_raw = (response[3] << 8) | response[4]
frequency_hz = frequency_raw / 100.0 # 转换为Hz单位0.01Hz
print(f"原始频率值: {frequency_raw} (0x{frequency_raw:04X})")
print(f"实际频率: {frequency_hz:.2f} Hz")
return frequency_hz
except Exception as e:
print(f"通信错误: {e}")
return None
def read_register(self, slave_addr, register_addr, register_count=1):
"""
通用读取寄存器方法
:param slave_addr: 从站地址
:param register_addr: 寄存器地址16位
:param register_count: 读取的寄存器数量
:return: 读取的数据列表
"""
if not self.ser or not self.ser.is_open:
print("串口未连接")
return None
# 构建读取指令
cmd_data = bytes([
slave_addr, # 从站地址
0x03, # 功能码:读取保持寄存器
(register_addr >> 8) & 0xFF, # 寄存器地址高字节
register_addr & 0xFF, # 寄存器地址低字节
(register_count >> 8) & 0xFF, # 寄存器数量高字节
register_count & 0xFF # 寄存器数量低字节
])
# 计算CRC
crc = self.calculate_crc(cmd_data)
full_cmd = cmd_data + crc
print(f"发送读取指令: {full_cmd.hex().upper()}")
try:
self.ser.reset_input_buffer()
self.ser.write(full_cmd)
time.sleep(0.01)
# 计算预期响应长度
expected_length = 5 + 2 * register_count # 地址1 + 功能码1 + 字节数1 + 数据2*N + CRC2
response = self.ser.read(expected_length)
if len(response) < expected_length:
print(f"响应数据长度不足: {len(response)} 字节,期望 {expected_length} 字节")
return None
print(f"收到响应: {response.hex().upper()}")
# 验证CRC
received_crc = response[-2:]
calculated_crc = self.calculate_crc(response[:-2])
if received_crc != calculated_crc:
print("CRC校验失败")
return None
# 解析数据
data_length = response[2]
data_bytes = response[3:3 + data_length]
results = []
for i in range(0, len(data_bytes), 2):
value = (data_bytes[i] << 8) | data_bytes[i + 1]
results.append(value)
return results
except Exception as e:
print(f"通信错误: {e}")
return None
def main():
# 创建变频器对象
inverter = InovanceMD520(port='COM3', baudrate=9600)
# 连接串口
if not inverter.connect():
return
try:
while True:
print("\n" + "=" * 50)
print("汇川MD520变频器频率查询")
print("=" * 50)
# 查询运行频率
frequency = inverter.query_frequency(slave_addr=0x01)
if frequency is not None:
print(f"✅ 当前运行频率: {frequency:.2f} Hz")
else:
print("❌ 频率查询失败")
# 可选:读取其他监控参数
print("\n--- 其他监控参数 ---")
# 读取母线电压 (地址1002H)
voltage_data = inverter.read_register(0x01, 0x1002)
if voltage_data:
voltage = voltage_data[0] / 10.0 # 单位0.1V
print(f"母线电压: {voltage:.1f} V")
# 读取输出电压 (地址1003H)
output_voltage_data = inverter.read_register(0x01, 0x1003)
if output_voltage_data:
output_voltage = output_voltage_data[0] # 单位1V
print(f"输出电压: {output_voltage} V")
# 读取输出电流 (地址1004H)
current_data = inverter.read_register(0x01, 0x1004)
if current_data:
current = current_data[0] / 100.0 # 单位0.01A
print(f"输出电流: {current:.2f} A")
# 等待5秒后再次查询
print("\n等待5秒后继续查询...")
time.sleep(5)
except KeyboardInterrupt:
print("\n用户中断查询")
finally:
# 断开连接
inverter.disconnect()
if __name__ == "__main__":
main()

16
tests/close_all_relay.py Normal file
View File

@ -0,0 +1,16 @@
# main.py
import time
from config.settings import app_set_config
from hardware.relay import RelayController
from hardware.inverter import InverterController
from hardware.transmitter import TransmitterController
import time
import vision.visual_callback_1203 as angle_visual
def main():
replay_controller=RelayController()
replay_controller.close_all()
if __name__ == "__main__":
main()

39
tests/led-app.py Normal file
View File

@ -0,0 +1,39 @@
# main.py
import time
from hardware.relay import RelayController
from hardware.transmitter import TransmitterController
from service.mould_service import app_web_service
import time
# from LED_send.led_send import send_led_data
def main():
relay_c = RelayController()
transmitter_c = TransmitterController(relay_c)
while True:
led_info = app_web_service.get_pouring_led()
upper_weight=transmitter_c.read_data(1)
lower_weight=transmitter_c.read_data(2)
if led_info:
# 提取RingTypeCode从第3个字符开始到"-"之前的部分
if "-" in led_info.MouldCode:
# 找到"-"的位置
dash_index = led_info.MouldCode.index("-")
# 从索引2开始提取到"-"之前的部分
led_info.RingTypeCode = led_info.MouldCode[2:dash_index]
if led_info.MouldCode.find("F")>=0:
led_info.VibrationFrequency="4min/"+ led_info.VibrationFrequency
else:
led_info.VibrationFrequency="5min/"+ led_info.VibrationFrequency
led_info.UpperWeight=upper_weight
led_info.LowerWeight=lower_weight
# led_info.VibrationFrequency="5min/220hz"
send_led_data(led_info)
time.sleep(1)
if __name__ == "__main__":
main()

97
tests/pd_system.py Normal file
View File

@ -0,0 +1,97 @@
import sys
import os
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import threading
import time
import queue
from core.system_state import SystemState,FeedStatus
from hardware.relay import RelayController
from hardware.inverter import InverterController
from hardware.transmitter import TransmitterController
from config.ini_manager import ini_manager
from hardware.upper_plc import OmronFinsPollingService
from vision.visual_callback_dq import VisualCallback
from opc.opcua_client_feed import OpcuaClientFeed
from busisness.blls import ArtifactBll,PDRecordBll
from busisness.models import ArtifactInfoModel,PDRecordModel
class FeedingControlSystem:
def __init__(self):
print('FeedingControlSystem初始化')
self.pd_record_bll=PDRecordBll()
def send_pd_data(self):
"""
发送PD数据到OPC队列
"""
# 构建PD数据
_cur_mould='SHR2B1-13'
'F块L1块需要设置重量'
_weight=0
_pdrecords = self.pd_record_bll.get_last_pds(_cur_mould)
if _pdrecords:
_pdrecord=_pdrecords[0]
if _pdrecord.TaskID:
if _pdrecord.BlockNumber=='F':
print(f'{_pdrecord.MouldCode} F块不发送派单数据')
print(f'{_pdrecord.MouldCode} F块不发送派单数据')
print(f'{_pdrecord.MouldCode} F块不发送派单数据')
return True
_fact_volumn=self.get_fact_volumn(_pdrecord.MouldCode,_pdrecord.BlockNumber,_weight)
if _fact_volumn>0:
_pdrecord.FBetonVolume=_fact_volumn
print(f'{_pdrecord.MouldCode}-{_pdrecord.BlockNumber} 实际派单方量:{_fact_volumn},{_fact_volumn},{_fact_volumn}')
print(f'{_pdrecord.MouldCode}-{_pdrecord.BlockNumber} 实际派单方量:{_fact_volumn},{_fact_volumn},{_fact_volumn}')
print(f'{_pdrecord.MouldCode}-{_pdrecord.BlockNumber} 实际派单方量:{_fact_volumn},{_fact_volumn},{_fact_volumn}')
# self.state._pd_data=_pdrecord
return True
else:
return False
else:
print(f'{_pdrecord.MouldCode} 未获取到数据-(等待扫码)')
return False
else:
print(f'接口数据异常')
return False
def get_fact_volumn(self,mould_code:str,block_number:str='',_weight:float=0) -> float:
"""获取实际派单发量"""
_now_volumn=0
_pd_volumn=0
print(f'get_fact_volumn当前重量{_weight}')
_now_volumn=_weight/2500
if not block_number and '-' in mould_code:
block_number = mould_code.split('-')[0][-2:]
if block_number=='B1':
_pd_volumn=1.9
if _weight>750:
#留0.3
_pd_volumn=_pd_volumn-_now_volumn+0.3
if _pd_volumn<1:
_pd_volumn=1
if block_number in ['B2','B3']:
_pd_volumn=1.9
elif block_number=='L1':
_pd_volumn=2.0
elif block_number=='L2':
#多F块后面剩下的大约500那下完F后多的就可以减去
_pd_volumn=2
# if _weight>1300:
_pd_volumn=_pd_volumn-_now_volumn+0.5
if _pd_volumn>2.1:
_pd_volumn=2.1
elif _pd_volumn<0.8:
_pd_volumn=0.8
return round(_pd_volumn,1)
if __name__ == "__main__":
system = FeedingControlSystem()
system.send_pd_data()

View File

@ -1,54 +0,0 @@
# tests/test_feeding_process.py
import unittest
from unittest.mock import patch, MagicMock
import sys
import os
from config.settings import app_set_config
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from feeding.process import FeedingProcess
class TestFeedingProcess(unittest.TestCase):
@patch('feeding.process.RelayController')
@patch('feeding.process.InverterController')
@patch('feeding.process.TransmitterController')
def test_initialization(self, mock_transmitter, mock_inverter, mock_relay):
"""测试初始化"""
# 创建模拟对象
mock_relay_instance = MagicMock()
mock_relay.return_value = mock_relay_instance
mock_inverter_instance = MagicMock()
mock_inverter.return_value = mock_inverter_instance
mock_transmitter_instance = MagicMock()
mock_transmitter.return_value = mock_transmitter_instance
# 创建系统实例
system = FeedingProcess()
# 验证初始化
self.assertIsNotNone(system)
self.assertFalse(system.state.running)
def test_set_feeding_parameters(self):
"""测试设置下料参数"""
with patch('feeding.process.RelayController'), \
patch('feeding.process.InverterController'), \
patch('feeding.process.TransmitterController'):
system = FeedingProcess()
#修改参数 app_set_config.single_batch_weight = 1500
app_set_config.min_required_weight = 300
app_set_config.target_vehicle_weight = 3000
self.assertEqual(app_set_config.target_vehicle_weight, 3000)
self.assertEqual(app_set_config.min_required_weight, 300)
self.assertEqual(app_set_config.single_batch_weight, 1500)
if __name__ == '__main__':
unittest.main()

View File

@ -1,161 +0,0 @@
import unittest
from unittest.mock import patch, MagicMock
import sys
import os
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from hardware.inverter import InverterController
from pymodbus.exceptions import ModbusException
class TestInverterController(unittest.TestCase):
def setUp(self):
"""测试前的准备工作"""
# 创建模拟的继电器控制器
self.mock_relay = MagicMock()
self.mock_relay.modbus_client = MagicMock()
# 创建变频器控制器实例
self.inverter = InverterController(relay_controller=self.mock_relay)
def test_inverter_initialization(self):
"""测试变频器控制器初始化"""
self.assertEqual(self.inverter.relay_controller, self.mock_relay)
self.assertEqual(self.inverter.max_frequency, 400.0)
# 检查配置
self.assertIn('slave_id', self.inverter.config)
self.assertIn('frequency_register', self.inverter.config)
self.assertIn('start_register', self.inverter.config)
self.assertIn('stop_register', self.inverter.config)
self.assertIn('start_command', self.inverter.config)
self.assertIn('stop_command', self.inverter.config)
def test_set_frequency_success(self):
"""测试设置频率成功"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
mock_result = MagicMock()
self.mock_relay.modbus_client.write_register.return_value = mock_result
# 设置频率
result = self.inverter.set_frequency(200.0)
# 验证调用
self.mock_relay.modbus_client.connect.assert_called_once()
self.mock_relay.modbus_client.write_register.assert_called_once()
self.assertTrue(result)
def test_set_frequency_connection_failed(self):
"""测试设置频率时连接失败"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = False
# 设置频率
result = self.inverter.set_frequency(200.0)
# 验证结果
self.assertFalse(result)
self.mock_relay.modbus_client.connect.assert_called_once()
self.mock_relay.modbus_client.write_register.assert_not_called()
def test_set_frequency_modbus_exception(self):
"""测试设置频率时Modbus异常"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
self.mock_relay.modbus_client.write_register.side_effect = ModbusException("Modbus错误")
# 设置频率
result = self.inverter.set_frequency(200.0)
# 验证结果
self.assertFalse(result)
def test_set_frequency_value_clamping(self):
"""测试频率值限制"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
mock_result = MagicMock()
self.mock_relay.modbus_client.write_register.return_value = mock_result
# 测试超过最大值的频率
result = self.inverter.set_frequency(500.0) # 超过400.0最大值
# 验证调用
self.assertTrue(result)
self.mock_relay.modbus_client.write_register.assert_called_once()
def test_control_start_success(self):
"""测试启动变频器成功"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
mock_result = MagicMock()
self.mock_relay.modbus_client.write_register.return_value = mock_result
# 启动变频器
result = self.inverter.control('start')
# 验证调用
self.assertTrue(result)
self.mock_relay.modbus_client.connect.assert_called_once()
self.mock_relay.modbus_client.write_register.assert_called_once()
def test_control_stop_success(self):
"""测试停止变频器成功"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
mock_result = MagicMock()
self.mock_relay.modbus_client.write_register.return_value = mock_result
# 停止变频器
result = self.inverter.control('stop')
# 验证调用
self.assertTrue(result)
self.mock_relay.modbus_client.connect.assert_called_once()
self.mock_relay.modbus_client.write_register.assert_called_once()
def test_control_invalid_action(self):
"""测试无效的控制动作"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
# 使用无效动作
result = self.inverter.control('invalid_action')
# 验证结果
self.assertFalse(result)
self.mock_relay.modbus_client.connect.assert_not_called()
self.mock_relay.modbus_client.write_register.assert_not_called()
def test_control_connection_failed(self):
"""测试控制时连接失败"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = False
# 启动变频器
result = self.inverter.control('start')
# 验证结果
self.assertFalse(result)
self.mock_relay.modbus_client.connect.assert_called_once()
self.mock_relay.modbus_client.write_register.assert_not_called()
def test_control_modbus_exception(self):
"""测试控制时Modbus异常"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
self.mock_relay.modbus_client.write_register.side_effect = ModbusException("Modbus错误")
# 启动变频器
result = self.inverter.control('start')
# 验证结果
self.assertFalse(result)
if __name__ == '__main__':
unittest.main()

View File

@ -1,165 +0,0 @@
import unittest
from unittest.mock import patch, MagicMock
import sys
import os
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from hardware.transmitter import TransmitterController
from pymodbus.exceptions import ModbusException
class TestTransmitterController(unittest.TestCase):
def setUp(self):
"""测试前的准备工作"""
# 创建模拟的继电器控制器
self.mock_relay = MagicMock()
self.mock_relay.modbus_client = MagicMock()
# 创建变送器控制器实例
self.transmitter = TransmitterController(relay_controller=self.mock_relay)
def test_transmitter_initialization(self):
"""测试变送器控制器初始化"""
self.assertEqual(self.transmitter.relay_controller, self.mock_relay)
# 检查配置
self.assertIn(1, self.transmitter.config) # 上料斗
self.assertIn(2, self.transmitter.config) # 下料斗
# 检查上料斗配置
upper_config = self.transmitter.config[1]
self.assertEqual(upper_config['slave_id'], 1)
self.assertEqual(upper_config['weight_register'], 0x01)
self.assertEqual(upper_config['register_count'], 2)
# 检查下料斗配置
lower_config = self.transmitter.config[2]
self.assertEqual(lower_config['slave_id'], 2)
self.assertEqual(lower_config['weight_register'], 0x01)
self.assertEqual(lower_config['register_count'], 2)
def test_read_data_valid_transmitter_id(self):
"""测试读取有效变送器ID的数据"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
# 模拟读取结果
mock_result = MagicMock()
mock_result.registers = [0, 1500] # 表示1500kg
self.mock_relay.modbus_client.read_holding_registers.return_value = mock_result
# 读取上料斗数据
weight = self.transmitter.read_data(1)
# 验证调用
self.mock_relay.modbus_client.connect.assert_called_once()
self.mock_relay.modbus_client.read_holding_registers.assert_called_once()
self.assertEqual(weight, 1.5) # 1500kg / 1000 = 1.5kg
def test_read_data_invalid_transmitter_id(self):
"""测试读取无效变送器ID的数据"""
# 读取无效ID的数据
weight = self.transmitter.read_data(99)
# 验证结果
self.assertIsNone(weight)
self.mock_relay.modbus_client.connect.assert_not_called()
self.mock_relay.modbus_client.read_holding_registers.assert_not_called()
def test_read_data_connection_failed(self):
"""测试读取数据时连接失败"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = False
# 读取数据
weight = self.transmitter.read_data(1)
# 验证结果
self.assertIsNone(weight)
self.mock_relay.modbus_client.connect.assert_called_once()
self.mock_relay.modbus_client.read_holding_registers.assert_not_called()
def test_read_data_modbus_exception(self):
"""测试读取数据时Modbus异常"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
self.mock_relay.modbus_client.read_holding_registers.side_effect = ModbusException("Modbus错误")
# 读取数据
weight = self.transmitter.read_data(1)
# 验证结果
self.assertIsNone(weight)
def test_read_data_register_count_2(self):
"""测试读取2个寄存器的数据"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
# 模拟读取结果 (高位寄存器=1低位寄存器=0xE848表示128000)
mock_result = MagicMock()
mock_result.registers = [1, 0xE848] # 1 * 65536 + 59464 = 125000
self.mock_relay.modbus_client.read_holding_registers.return_value = mock_result
# 读取数据
weight = self.transmitter.read_data(1)
# 验证结果 (125000 / 1000 = 125kg)
self.assertEqual(weight, 125.0)
def test_read_data_register_count_1(self):
"""测试读取1个寄存器的数据模拟配置变更"""
# 临时修改配置以测试单寄存器读取
self.transmitter.config[1]['register_count'] = 1
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
# 模拟读取结果
mock_result = MagicMock()
mock_result.registers = [1500] # 表示1500kg
self.mock_relay.modbus_client.read_holding_registers.return_value = mock_result
# 读取数据
weight = self.transmitter.read_data(1)
# 验证结果
self.assertEqual(weight, 1500.0) # 1500kg
def test_read_data_invalid_register_count(self):
"""测试不支持的寄存器数量"""
# 临时修改配置以测试无效寄存器数量
self.transmitter.config[1]['register_count'] = 3
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
# 模拟读取结果
mock_result = MagicMock()
mock_result.registers = [0, 0, 0]
self.mock_relay.modbus_client.read_holding_registers.return_value = mock_result
# 读取数据
weight = self.transmitter.read_data(1)
# 验证结果
self.assertIsNone(weight)
def test_read_data_result_is_exception(self):
"""测试读取结果为异常对象"""
# 配置模拟对象
self.mock_relay.modbus_client.connect.return_value = True
self.mock_relay.modbus_client.read_holding_registers.return_value = Exception("读取错误")
# 读取数据
weight = self.transmitter.read_data(1)
# 验证结果
self.assertIsNone(weight)
if __name__ == '__main__':
unittest.main()

View File

@ -1,65 +0,0 @@
import os
import sys
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
import unittest
from unittest.mock import MagicMock
from vision.camera import CameraController
# from core.vision import Vision
class TestVision(unittest.TestCase):
def setUp(self):
self.jj=2
# self.testclass2 = MagicMock()
# self.testclass = TestClass(self.testclass2)
def test_capture_frame(self):
# 测试capture_frame方法
camera=CameraController()
result = camera.capture_frame()
self.assertIsNone(result, msg="capture_frame方法测试失败")
camera.capture_frame_exec.assert_called_once()
# def test_first(self):
# 测试TestClass的add方法
# mock_testclass2 = MagicMock()
# # mock_testclass2.i = 1
# # mock_testclass2.j = 2
# test_class = TestClass(mock_testclass2)
# result = test_class.add()
# # 验证结果
# self.assertEqual(result, 3, msg="add方法测试失败")
# def test_second(self):
# 测试TestClass2的mock行为
# mock_testclass2 = MagicMock(spec=TestClass2)
# mock_testclass2.sub.return_value = 1
# result = mock_testclass2.sub()
# # 测试返回值
# self.assertEqual(result, 1, msg="sub方法测试失败")
# mock_testclass2.sub.assert_called_once()
class TestClass:
def __init__(self,testclass2):
self.testclass2 = testclass2
pass
def add(self):
return self.testclass2.i + self.testclass2.j
class TestClass2:
def __init__(self):
self.i = 1
self.j = 2
pass
def sub(self):
return self.j - self.i
if __name__ == '__main__':
unittest.main()

405
tests/upper_plc.py Normal file
View File

@ -0,0 +1,405 @@
import socket
import struct
import time
import threading
import logging
from enum import Enum
from typing import Optional, Callable
class FinsServiceStatus(Enum):
"""服务状态枚举"""
DISCONNECTED = "未连接"
CONNECTING = "连接中"
CONNECTED = "已连接"
POLLING = "轮询中"
ERROR = "错误"
STOPPED = "已停止"
class FinsPoolFullError(Exception):
"""连接池已满异常"""
pass
class OmronFinsPollingService:
"""欧姆龙FINS协议数据轮询服务严格三指令版本"""
def __init__(self, plc_ip: str, plc_port: int = 9600):
"""
初始化FINS服务
Args:
plc_ip: PLC IP地址
plc_port: PLC端口默认9600
"""
self.plc_ip = plc_ip
self.plc_port = plc_port
# 服务状态
self._status = FinsServiceStatus.DISCONNECTED
self._socket: Optional[socket.socket] = None
# 轮询控制
self._polling_thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self._polling_interval = 1.0
# 回调函数
self._data_callbacks = []
self._status_callbacks = []
# 最新数据
self._latest_data: Optional[int] = None
self._last_update_time: Optional[float] = None
# 配置日志
self._setup_logging()
def _setup_logging(self):
"""配置日志"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger("FinsPollingService")
def _update_status(self, new_status: FinsServiceStatus, message: str = ""):
"""更新状态并触发回调"""
old_status = self._status
if old_status != new_status:
self._status = new_status
self.logger.info(f"状态变更: {old_status.value} -> {new_status.value} {message}")
for callback in self._status_callbacks:
try:
callback(old_status, new_status, message)
except Exception as e:
self.logger.error(f"状态回调执行失败: {e}")
def _send_and_receive(self, data: bytes, expected_length: int = 1024) -> bytes:
"""发送数据并接收响应"""
if not self._socket:
raise ConnectionError("Socket未连接")
print("_send_and_receive发送:",data)
self._socket.send(data)
response = self._socket.recv(expected_length)
return response
def _check_handshake_response(self, response: bytes):
"""检查握手响应"""
if len(response) < 24:
raise ConnectionError("握手响应数据不完整")
# 检查响应头
if response[0:4] != b'FINS':
raise ConnectionError("无效的FINS响应头")
# 检查命令代码
command_code = struct.unpack('>I', response[8:12])[0]
if command_code != 0x01:
raise ConnectionError(f"握手命令代码错误: 0x{command_code:08X}")
# 检查错误代码
error_code = struct.unpack('>I', response[12:16])[0]
if error_code == 0x20:
raise FinsPoolFullError("FINS连接池已满")
elif error_code != 0x00:
raise ConnectionError(f"握手错误代码: 0x{error_code:08X}")
self.logger.info("握手成功")
def _check_query_response(self, response: bytes) -> int:
"""检查查询响应并返回数据"""
if len(response) < 30:
raise ConnectionError("查询响应数据不完整")
# 检查响应头
if response[0:4] != b'FINS':
raise ConnectionError("无效的FINS响应头")
# 检查命令代码
command_code = struct.unpack('>I', response[8:12])[0]
if command_code != 0x02:
raise ConnectionError(f"查询命令代码错误: 0x{command_code:08X}")
# 检查错误代码
error_code = struct.unpack('>I', response[12:16])[0]
if error_code != 0x00:
raise ConnectionError(f"查询错误代码: 0x{error_code:08X}")
# 提取数据字节(最后一个字节)
data_byte = response[-1]
return data_byte
def _check_logout_response(self, response: bytes):
"""检查注销响应"""
if len(response) < 16:
raise ConnectionError("注销响应数据不完整")
# 检查响应头
if response[0:4] != b'FINS':
raise ConnectionError("无效的FINS响应头")
# 检查命令代码
command_code = struct.unpack('>I', response[8:12])[0]
if command_code != 0x03:
raise ConnectionError(f"注销命令代码错误: 0x{command_code:08X}")
# 检查错误代码
error_code = struct.unpack('>I', response[12:16])[0]
if error_code != 0x02:
raise ConnectionError(f"注销错误代码: 0x{error_code:08X}")
self.logger.info("注销成功")
def connect(self) -> bool:
"""
连接到PLC并完成握手
Returns:
bool: 连接是否成功
"""
if self._status == FinsServiceStatus.CONNECTED:
self.logger.warning("已经连接到PLC")
return True
self._update_status(FinsServiceStatus.CONNECTING, "开始连接PLC")
try:
# 创建socket连接
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.settimeout(10.0)
self._socket.connect((self.plc_ip, self.plc_port))
self.logger.info(f"TCP连接已建立: {self.plc_ip}:{self.plc_port}")
# 指令1: 握手
# 46 49 4E 53 00 00 00 0C 00 00 00 00 00 00 00 00 00 00 00 DC
handshake_cmd = bytes.fromhex("46 49 4E 53 00 00 00 0C 00 00 00 00 00 00 00 00 00 00 00 DC")
self.logger.debug("发送握手指令")
response = self._send_and_receive(handshake_cmd, 24)
# 检查握手响应
self._check_handshake_response(response)
self._update_status(FinsServiceStatus.CONNECTED, "握手成功")
return True
except FinsPoolFullError:
self._update_status(FinsServiceStatus.ERROR, "连接池已满")
raise
except Exception as e:
self._update_status(FinsServiceStatus.ERROR, f"连接失败: {e}")
if self._socket:
self._socket.close()
self._socket = None
raise
def query_data(self) -> Optional[int]:
"""
查询PLC数据
Returns:
int: 数据值(0-255)
"""
if self._status != FinsServiceStatus.POLLING:
raise ConnectionError("未连接到PLC无法查询")
try:
# 指令2: 查询
# 46 49 4E 53 00 00 00 1A 00 00 00 02 00 00 00 00 80 00 30 00 E9 00 00 DC 00 00 01 01 B0 00 00 00 00 01
query_cmd = bytes.fromhex("46 49 4E 53 00 00 00 1A 00 00 00 02 00 00 00 00 80 00 30 00 E9 00 00 DC 00 00 01 01 B0 00 00 00 00 01")
self.logger.debug("发送查询指令")
response = self._send_and_receive(query_cmd, 1024)
# 检查查询响应并提取数据
data_byte = self._check_query_response(response)
self._latest_data = data_byte
self._last_update_time = time.time()
# 触发数据回调
binary_str = bin(data_byte)
for callback in self._data_callbacks:
try:
callback(data_byte, binary_str)
except Exception as e:
self.logger.error(f"数据回调执行失败: {e}")
self.logger.debug(f"查询成功: 数据=0x{data_byte:02X} ({binary_str})")
return data_byte
except Exception as e:
self.logger.error(f"查询失败: {e}")
raise
def disconnect(self):
"""断开连接"""
if self._socket:
try:
# 指令3: 注销
# 46 49 4E 53 00 00 00 10 00 00 00 02 00 00 00 00 DC E9
logout_cmd = bytes.fromhex("46 49 4E 53 00 00 00 10 00 00 00 02 00 00 00 00 00 00 00 DC 00 00 00 E9")
self.logger.debug("发送注销指令")
response = self._send_and_receive(logout_cmd, 24)
# 检查注销响应
self._check_logout_response(response)
except Exception as e:
self.logger.error(f"注销过程中出错: {e}")
finally:
self._socket.close()
self._socket = None
self._update_status(FinsServiceStatus.DISCONNECTED, "连接已关闭")
def _polling_loop(self):
"""轮询循环"""
self.logger.info("数据轮询循环启动")
while not self._stop_event.is_set():
try:
if self._status == FinsServiceStatus.CONNECTED:
self._update_status(FinsServiceStatus.POLLING, "正在查询数据")
self.query_data()
self._update_status(FinsServiceStatus.CONNECTED, "查询完成")
else:
# 尝试重新连接
try:
self.connect()
except FinsPoolFullError:
self.logger.error("连接池已满,等待后重试...")
time.sleep(5)
except Exception as e:
self.logger.error(f"连接失败: {e}, 等待后重试...")
time.sleep(2)
except Exception as e:
self.logger.error(f"轮询查询失败: {e}")
self._update_status(FinsServiceStatus.ERROR, f"查询错误: {e}")
# 查询失败不影响连接状态保持CONNECTED状态
self._update_status(FinsServiceStatus.CONNECTED, "准备下一次查询")
time.sleep(1)
# 等待轮询间隔
self._stop_event.wait(self._polling_interval)
self.logger.info("数据轮询循环停止")
def start_polling(self, interval: float = 1.0):
"""
启动数据轮询服务
Args:
interval: 轮询间隔(秒)
"""
if self._polling_thread and self._polling_thread.is_alive():
self.logger.warning("轮询服务已在运行")
return
self._polling_interval = interval
self._stop_event.clear()
# 先建立连接
try:
self.connect()
except Exception as e:
self.logger.error(f"初始连接失败: {e}")
# 继续启动轮询,轮询循环会尝试重连
# 启动轮询线程
self._polling_thread = threading.Thread(target=self._polling_loop, daemon=True)
self._polling_thread.start()
self.logger.info(f"数据轮询服务已启动,间隔: {interval}")
def stop_polling(self):
"""停止数据轮询服务"""
self.logger.info("正在停止数据轮询服务...")
self._stop_event.set()
if self._polling_thread and self._polling_thread.is_alive():
self._polling_thread.join(timeout=5.0)
self.disconnect()
self._update_status(FinsServiceStatus.STOPPED, "服务已停止")
self.logger.info("数据轮询服务已停止")
# === 公共接口 ===
def get_service_status(self) -> dict:
"""获取服务状态"""
return {
'status': self._status.value,
'is_connected': self._status == FinsServiceStatus.CONNECTED,
'is_polling': self._polling_thread and self._polling_thread.is_alive(),
'latest_data': self._latest_data,
'latest_data_binary': bin(self._latest_data) if self._latest_data is not None else None,
'last_update_time': self._last_update_time,
'plc_ip': self.plc_ip,
'plc_port': self.plc_port,
'polling_interval': self._polling_interval
}
def get_latest_data(self) -> Optional[int]:
"""获取最新数据"""
return self._latest_data
def get_latest_data_binary(self) -> Optional[str]:
"""获取最新数据的二进制表示"""
return bin(self._latest_data) if self._latest_data is not None else None
# === 回调注册接口 ===
def register_data_callback(self, callback: Callable[[int, str], None]):
"""注册数据更新回调"""
self._data_callbacks.append(callback)
def register_status_callback(self, callback: Callable[[FinsServiceStatus, FinsServiceStatus, str], None]):
"""注册状态变化回调"""
self._status_callbacks.append(callback)
def __enter__(self):
"""上下文管理器入口"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""上下文管理器出口,确保资源释放"""
self.stop_polling()
# 使用示例和测试
if __name__ == "__main__":
def on_data_update(data: int, binary: str):
#4即将振捣室5振捣室 64即将搅拌楼 66到达搅拌楼
print(f"[数据回调] 数值: 0x{data:02X} | 十进制: {data:3d} | 二进制: {binary}")
def on_status_change(old_status: FinsServiceStatus, new_status: FinsServiceStatus, message: str):
print(f"[状态回调] {old_status.value} -> {new_status.value} : {message}")
# 创建服务实例
service = OmronFinsPollingService("192.168.250.233") # 替换为实际PLC IP
# 注册回调
service.register_data_callback(on_data_update)
service.register_status_callback(on_status_change)
print("欧姆龙FINS数据轮询服务")
print("=" * 50)
try:
# 启动轮询服务每2秒查询一次
service.start_polling(interval=2.0)
# 主循环,定期显示服务状态
counter = 0
while True:
status = service.get_service_status()
counter += 1
time.sleep(1)
except KeyboardInterrupt:
print("\n\n接收到Ctrl+C正在停止服务...")
finally:
# 确保服务正确停止
service.stop_polling()
print("服务已安全停止")

62
tests/weight-app.py Normal file
View File

@ -0,0 +1,62 @@
# 读取原始数据文件
import datetime
input_file = r"C:\Users\fujin\Desktop\fsdownload\weight.txt" # 原始数据文件路径
output_file = r"C:\Users\fujin\Desktop\fsdownload\filtered_B_records.txt" # 输出结果文件路径 (使用原始字符串避免Unicode转义错误)
# 存储包含"B"标记的记录
filtered_records = []
# 读取并过滤数据
with open(input_file, "r", encoding="utf-8") as f:
lines = f.readlines()
for line in lines:
# 筛选包含"B,"的行避免误匹配其他含B的字符
if "B," in line or "F," in line:
# 移除等号
cleaned_line = line.strip().replace('=', '')
# 分割行数据
parts = cleaned_line.split(',')
if len(parts) >= 2:
try:
# 解析前两个日期假设格式为YYYY-MM-DD或YYYY/MM/DD
date1_str = parts[0].strip()
date2_str = parts[1].strip()
# 尝试不同的日期格式
date_formats = ['%Y-%m-%d %H:%M:%S', '%Y-%m-%d', '%Y/%m/%d']
date1 = None
date2 = None
for fmt in date_formats:
try:
if date1 is None:
date1 = datetime.datetime.strptime(date1_str, fmt)
if date2 is None:
date2 = datetime.datetime.strptime(date2_str, fmt)
except ValueError:
continue
if date1 and date2:
# 计算时间差(秒数)
total_seconds = (date2 - date1).total_seconds()
# 转换为分:秒格式
minutes = int(total_seconds // 60)
seconds = int(total_seconds % 60)
time_diff = f"{minutes:02d}:{seconds:02d}"
# 在行末尾添加时间差
cleaned_line = f"{cleaned_line},{time_diff}"
except Exception as e:
# 如果日期解析失败,保持原始行不变
pass
filtered_records.append(cleaned_line)
# 保存过滤后的数据
with open(output_file, "w", encoding="utf-8") as f:
f.write("\n".join(filtered_records))
print(f"过滤完成!共提取到 {len(filtered_records)} 条含'B'标记的记录")
print(f"结果已保存至:{output_file}")