From 17df13c08e4c47fab721e6108e9546b4396073f0 Mon Sep 17 00:00:00 2001 From: pengqi Date: Thu, 29 Jan 2026 18:30:22 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E4=BA=865=E4=B8=AA=E7=94=B5?= =?UTF-8?q?=E7=A3=81=E9=98=80+=E4=BF=AE=E6=94=B9=E4=BA=86=E4=BC=A0?= =?UTF-8?q?=E9=80=81=E5=B8=A6=E4=BC=A0=E6=84=9F=E5=99=A8=E7=9A=84=E9=80=BB?= =?UTF-8?q?=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- EMV/EMV.py | 142 ++++-- EMV/EMV_test.py | 148 ++++-- RK1106/readme.md | 8 +- .../conveyor_master_controller1.py | 2 +- .../conveyor_master_controller2.py | 8 +- .../conveyor_master_controller2_test.py | 431 ------------------ readme.md | 13 + robot/drop.py | 365 +++++++++++++++ robot/util_time.py | 63 ++- servo/servo.py | 2 +- 10 files changed, 670 insertions(+), 512 deletions(-) delete mode 100644 conveyor_controller/conveyor_master_controller2_test.py create mode 100644 robot/drop.py diff --git a/EMV/EMV.py b/EMV/EMV.py index 06a0e03..7cb607d 100644 --- a/EMV/EMV.py +++ b/EMV/EMV.py @@ -18,16 +18,21 @@ HOST = '192.168.5.18' PORT = 50000 # 控件命名映射 -SOLENOID_VALVE1 = 'solenoid_valve1' # 控制排料机构NG时的电磁阀1 -SOLENOID_VALVE2 = 'solenoid_valve2' # 控制排料机构NG时的电磁阀2 -SOLENOID_VALVE3 = 'solenoid_valve3' # 控制吸取设备的电磁阀3 +SOLENOID_VALVE1 = 'solenoid_valve1' # 控制排料机构NG时的电磁阀1 +SOLENOID_VALVE2 = 'solenoid_valve2' # 控制排料机构NG时的电磁阀2 +ABSORB_SOLENOID_VALVE1 = 'absorb_solenoid_valve1' # 控制吸取设备的电磁阀1 +ABSORB_SOLENOID_VALVE2 = 'absorb_solenoid_valve2' # 控制吸取设备的电磁阀2 +ABSORB_SOLENOID_VALVE3 = 'absorb_solenoid_valve3' # 控制吸取设备的电磁阀3 +ABSORB_SOLENOID_VALVE4 = 'absorb_solenoid_valve4' # 控制吸取设备的电磁阀4 +ABSORB_SOLENOID_VALVE5 = 'absorb_solenoid_valve5' # 控制吸取设备的电磁阀5 + # 传感器命名映射 -CONVEYOR1_SENSOR = 'conveyor1_sensor' # 传送带1的行程开关 -CONVEYOR2_SENSOR = 'conveyor2_sensor' # 传送带2的行程开关 -PRESS_SENSOR1 = 'press_sensor1' # 传送带1旁边的按压开关1 -PRESS_SENSOR2 = 'press_sensor2' # 传送带1旁边的按压开关2 -FIBER_SENSOR = 'fiber_sensor' # 传送带1旁边的光纤传感器 +CONVEYOR1_SENSOR = 'conveyor1_sensor' # 传送带1的行程开关 +CONVEYOR2_SENSOR = 'conveyor2_sensor' # 传送带2的行程开关 +PRESS_SENSOR1 = 'press_sensor1' # 传送带1旁边的按压开关1 +PRESS_SENSOR2 = 'press_sensor2' # 传送带1旁边的按压开关2 +FIBER_SENSOR = 'fiber_sensor' # 传送带1旁边的光纤传感器 # 控件控制报文 valve_commands = { @@ -39,9 +44,25 @@ valve_commands = { 'open': '00000000000601050001FF00', 'close': '000000000006010500010000', }, - SOLENOID_VALVE3: { + ABSORB_SOLENOID_VALVE1: { 'open': '00000000000601050002FF00', 'close': '000000000006010500020000', + }, + ABSORB_SOLENOID_VALVE2: { + 'open': '00000000000601050002FF00', + 'close': '000000000006010500030000', + }, + ABSORB_SOLENOID_VALVE3: { + 'open': '00000000000601050002FF00', + 'close': '000000000006010500040000', + }, + ABSORB_SOLENOID_VALVE4: { + 'open': '00000000000601050002FF00', + 'close': '000000000006010500050000', + }, + ABSORB_SOLENOID_VALVE5: { + 'open': '00000000000601050002FF00', + 'close': '000000000006010500060000', } } @@ -55,31 +76,39 @@ read_status_command = { device_bit_map = { SOLENOID_VALVE1: 0, SOLENOID_VALVE2: 1, - SOLENOID_VALVE3: 2, + ABSORB_SOLENOID_VALVE1: 2, + ABSORB_SOLENOID_VALVE2: 3, + ABSORB_SOLENOID_VALVE3: 4, + ABSORB_SOLENOID_VALVE4: 5, + ABSORB_SOLENOID_VALVE5: 6 } device_name_map = { SOLENOID_VALVE1: "电磁阀1", SOLENOID_VALVE2: "电磁阀2", - SOLENOID_VALVE3: "电磁阀3", + ABSORB_SOLENOID_VALVE1: "吸取装置电磁阀1", + ABSORB_SOLENOID_VALVE2: "吸取装置电磁阀2", + ABSORB_SOLENOID_VALVE3: "吸取装置电磁阀3", + ABSORB_SOLENOID_VALVE4: "吸取装置电磁阀4", + ABSORB_SOLENOID_VALVE5: "吸取装置电磁阀5", } # 传感器对应位(从低到高) sensor_bit_map = { - CONVEYOR1_SENSOR: 0, - CONVEYOR2_SENSOR: 1, - PRESS_SENSOR1: 2, - PRESS_SENSOR2: 3, - FIBER_SENSOR: 6 + FIBER_SENSOR: 0, + PRESS_SENSOR1: 1, + PRESS_SENSOR2: 2, + CONVEYOR1_SENSOR: 4, + CONVEYOR2_SENSOR: 3, # 根据你继电器的配置,继续添加更多传感器 } sensor_name_map = { - CONVEYOR1_SENSOR: '传送带1开关', - CONVEYOR2_SENSOR: '传送带2开关', + FIBER_SENSOR: '光纤传感器', PRESS_SENSOR1: '按压开关1', PRESS_SENSOR2: '按压开关2', - FIBER_SENSOR: '光纤传感器' + CONVEYOR1_SENSOR: '传送带1开关', + CONVEYOR2_SENSOR: '传送带2开关' } # -------------全局事件------------- @@ -165,12 +194,18 @@ class RelayController: status = self.get_all_device_status(command_type) return status.get(device_name, None) - def open(self, solenoid_valve1=False, solenoid_valve2=False, solenoid_valve3=False): + def open(self, solenoid_valve1=False, solenoid_valve2=False, absorb_solenoid_valve1=False, + absorb_solenoid_valve2=False, absorb_solenoid_valve3=False, absorb_solenoid_valve4=False, + absorb_solenoid_valve5=False): """ 根据状态决定是否执行开操作 :param solenoid_valve1:是否打开电磁阀1 :param solenoid_valve2:是否打开电磁阀2 - :param solenoid_valve3:是否打开电磁阀3 + :param absorb_solenoid_valve1:是否打开吸取装置电磁阀1 + :param absorb_solenoid_valve2:是否打开吸取装置电磁阀2 + :param absorb_solenoid_valve3:是否打开吸取装置电磁阀3 + :param absorb_solenoid_valve4:是否打开吸取装置电磁阀4 + :param absorb_solenoid_valve5:是否打开吸取装置电磁阀5 :return: """ global valve1_open_time, valve1_open_flag @@ -188,18 +223,45 @@ class RelayController: logging.info("打开电磁阀2") self.send_command(valve_commands[SOLENOID_VALVE2]['open']) - if solenoid_valve3 and not status.get(SOLENOID_VALVE3, False): - logging.info("打开电磁阀3") - self.send_command(valve_commands[SOLENOID_VALVE3]['open']) + if absorb_solenoid_valve1 and not status.get(ABSORB_SOLENOID_VALVE1, False): + logging.info("打开吸取装置电磁阀1") + self.send_command(valve_commands[ABSORB_SOLENOID_VALVE1]['open']) + time.sleep(1) # 实际测试需要考虑这个延时是否合适 + + if absorb_solenoid_valve2 and not status.get(ABSORB_SOLENOID_VALVE2, False): + logging.info("打开吸取装置电磁阀2") + self.send_command(valve_commands[ABSORB_SOLENOID_VALVE2]['open']) + time.sleep(1) # 实际测试需要考虑这个延时是否合适 + + if absorb_solenoid_valve3 and not status.get(ABSORB_SOLENOID_VALVE3, False): + logging.info("打开吸取装置电磁阀3") + self.send_command(valve_commands[ABSORB_SOLENOID_VALVE3]['open']) + time.sleep(1) # 实际测试需要考虑这个延时是否合适 + + if absorb_solenoid_valve4 and not status.get(ABSORB_SOLENOID_VALVE4, False): + logging.info("打开吸取装置电磁阀4") + self.send_command(valve_commands[ABSORB_SOLENOID_VALVE4]['open']) + time.sleep(1) # 实际测试需要考虑这个延时是否合适 + + if absorb_solenoid_valve5 and not status.get(ABSORB_SOLENOID_VALVE5, False): + logging.info("打开吸取装置电磁阀5") + self.send_command(valve_commands[ABSORB_SOLENOID_VALVE5]['open']) time.sleep(1) # 实际测试需要考虑这个延时是否合适 # 根据状态决定是否执行关操作 - def close(self, solenoid_valve1=False, solenoid_valve2=False, solenoid_valve3=False): + def close(self, solenoid_valve1=False, solenoid_valve2=False, absorb_solenoid_valve1=False, + absorb_solenoid_valve2=False, absorb_solenoid_valve3=False, absorb_solenoid_valve4=False, + absorb_solenoid_valve5=False): """ 根据状态决定是否执行关操作 :param solenoid_valve1:是否关闭电磁阀1 :param solenoid_valve2:是否关闭电磁阀2 - :param solenoid_valve3:是否关闭电磁阀3 + :param absorb_solenoid_valve1:是否关闭吸取电磁阀1 + :param absorb_solenoid_valve2:是否关闭吸取电磁阀2 + :param absorb_solenoid_valve3:是否关闭吸取电磁阀3 + :param absorb_solenoid_valve4:是否关闭吸取电磁阀4 + :param absorb_solenoid_valve5:是否关闭吸取电磁阀5 + :return: """ global valve1_open_flag @@ -217,9 +279,29 @@ class RelayController: logging.info("关闭电磁阀2") self.send_command(valve_commands[SOLENOID_VALVE2]['close']) - if solenoid_valve2 and status.get(SOLENOID_VALVE3, True): - logging.info("关闭电磁阀3") - self.send_command(valve_commands[SOLENOID_VALVE3]['close']) + if absorb_solenoid_valve1 and status.get(ABSORB_SOLENOID_VALVE1, True): + logging.info("关闭吸取装置电磁阀1") + self.send_command(valve_commands[ABSORB_SOLENOID_VALVE1]['close']) + time.sleep(1) # 实际测试需要考虑这个延时是否合适 + + if absorb_solenoid_valve2 and status.get(ABSORB_SOLENOID_VALVE2, True): + logging.info("关闭吸取装置电磁阀2") + self.send_command(valve_commands[ABSORB_SOLENOID_VALVE2]['close']) + time.sleep(1) # 实际测试需要考虑这个延时是否合适 + + if absorb_solenoid_valve3 and status.get(ABSORB_SOLENOID_VALVE3, True): + logging.info("关闭吸取装置电磁阀3") + self.send_command(valve_commands[ABSORB_SOLENOID_VALVE3]['close']) + time.sleep(1) # 实际测试需要考虑这个延时是否合适 + + if absorb_solenoid_valve4 and status.get(ABSORB_SOLENOID_VALVE4, True): + logging.info("关闭吸取装置电磁阀4") + self.send_command(valve_commands[ABSORB_SOLENOID_VALVE4]['close']) + time.sleep(1) # 实际测试需要考虑这个延时是否合适 + + if absorb_solenoid_valve5 and status.get(ABSORB_SOLENOID_VALVE5, True): + logging.info("关闭吸取装置电磁阀5") + self.send_command(valve_commands[ABSORB_SOLENOID_VALVE5]['close']) time.sleep(1) # 实际测试需要考虑这个延时是否合适 def control_solenoid(self): @@ -339,7 +421,7 @@ def control_solenoid(): global_relay.open(solenoid_valve1=True, solenoid_valve2=True) logging.info("电磁阀1、2已打开") # 等待线条掉落(最多等待1秒) - timeout = 2.0 + timeout = 3.0 start_time = time.time() fiber_detected = False # 等待光纤传感器触发或超时 diff --git a/EMV/EMV_test.py b/EMV/EMV_test.py index 688b967..00ca765 100644 --- a/EMV/EMV_test.py +++ b/EMV/EMV_test.py @@ -3,7 +3,7 @@ ''' # @Time : 2026/1/8 16:52 # @Author : reenrr -# @File : test.py +# @File : EMV_test.py # @Desc : 网络继电器控制输入、输出设备测试程序 ''' import socket @@ -18,9 +18,13 @@ HOST = '192.168.5.18' PORT = 50000 # 控件命名映射 -SOLENOID_VALVE1 = 'solenoid_valve1' # 控制排料机构NG时的电磁阀1 -SOLENOID_VALVE2 = 'solenoid_valve2' # 控制排料机构NG时的电磁阀2 -SOLENOID_VALVE3 = 'solenoid_valve3' # 控制吸取设备的电磁阀3 +SOLENOID_VALVE1 = 'solenoid_valve1' # 控制排料机构NG时的电磁阀1 +SOLENOID_VALVE2 = 'solenoid_valve2' # 控制排料机构NG时的电磁阀2 +ABSORB_SOLENOID_VALVE1 = 'absorb_solenoid_valve1' # 控制吸取设备的电磁阀1 +ABSORB_SOLENOID_VALVE2 = 'absorb_solenoid_valve2' # 控制吸取设备的电磁阀2 +ABSORB_SOLENOID_VALVE3 = 'absorb_solenoid_valve3' # 控制吸取设备的电磁阀3 +ABSORB_SOLENOID_VALVE4 = 'absorb_solenoid_valve4' # 控制吸取设备的电磁阀4 +ABSORB_SOLENOID_VALVE5 = 'absorb_solenoid_valve5' # 控制吸取设备的电磁阀5 # 传感器命名映射 CONVEYOR1_SENSOR = 'conveyor1_sensor' # 传送带1的行程开关 @@ -39,9 +43,25 @@ valve_commands = { 'open': '00000000000601050001FF00', 'close': '000000000006010500010000', }, - SOLENOID_VALVE3: { + ABSORB_SOLENOID_VALVE1: { 'open': '00000000000601050002FF00', 'close': '000000000006010500020000', + }, + ABSORB_SOLENOID_VALVE2: { + 'open': '00000000000601050002FF00', + 'close': '000000000006010500030000', + }, + ABSORB_SOLENOID_VALVE3: { + 'open': '00000000000601050002FF00', + 'close': '000000000006010500040000', + }, + ABSORB_SOLENOID_VALVE4: { + 'open': '00000000000601050002FF00', + 'close': '000000000006010500050000', + }, + ABSORB_SOLENOID_VALVE5: { + 'open': '00000000000601050002FF00', + 'close': '000000000006010500060000', } } @@ -55,33 +75,43 @@ read_status_command = { device_bit_map = { SOLENOID_VALVE1: 0, SOLENOID_VALVE2: 1, - SOLENOID_VALVE3: 2, + ABSORB_SOLENOID_VALVE1: 2, + ABSORB_SOLENOID_VALVE2: 3, + ABSORB_SOLENOID_VALVE3: 4, + ABSORB_SOLENOID_VALVE4: 5, + ABSORB_SOLENOID_VALVE5: 6 + } device_name_map = { SOLENOID_VALVE1: "电磁阀1", SOLENOID_VALVE2: "电磁阀2", - SOLENOID_VALVE3: "电磁阀3", + ABSORB_SOLENOID_VALVE1: "吸取装置电磁阀1", + ABSORB_SOLENOID_VALVE2: "吸取装置电磁阀2", + ABSORB_SOLENOID_VALVE3: "吸取装置电磁阀3", + ABSORB_SOLENOID_VALVE4: "吸取装置电磁阀4", + ABSORB_SOLENOID_VALVE5: "吸取装置电磁阀5", } # 传感器对应位(从低到高) sensor_bit_map = { - CONVEYOR1_SENSOR: 0, - CONVEYOR2_SENSOR: 1, - PRESS_SENSOR1: 2, - PRESS_SENSOR2: 3, - FIBER_SENSOR: 6 + FIBER_SENSOR: 0, + PRESS_SENSOR1: 1, + PRESS_SENSOR2: 2, + CONVEYOR1_SENSOR: 4, + CONVEYOR2_SENSOR: 3, # 根据你继电器的配置,继续添加更多传感器 } sensor_name_map = { - CONVEYOR1_SENSOR: '传送带1开关', - CONVEYOR2_SENSOR: '传送带2开关', + FIBER_SENSOR: '光纤传感器', PRESS_SENSOR1: '按压开关1', PRESS_SENSOR2: '按压开关2', - FIBER_SENSOR: '光纤传感器' + CONVEYOR1_SENSOR: '传送带1开关', + CONVEYOR2_SENSOR: '传送带2开关' } + # -------------全局事件------------- press_sensors_triggered = Event() fiber_triggered = Event() # 光纤传感器触发事件 @@ -172,39 +202,75 @@ class RelayController: status = self.get_all_device_status(command_type) return status.get(device_name, None) - def open(self, solenoid_valve1=False, solenoid_valve2=False, solenoid_valve3=False): + def open(self, solenoid_valve1=False, solenoid_valve2=False, absorb_solenoid_valve1=False, + absorb_solenoid_valve2=False, absorb_solenoid_valve3=False, absorb_solenoid_valve4=False, + absorb_solenoid_valve5=False): """ 根据状态决定是否执行开操作 :param solenoid_valve1:是否打开电磁阀1 :param solenoid_valve2:是否打开电磁阀2 - :param solenoid_valve3:是否打开电磁阀3 + :param absorb_solenoid_valve1:是否打开吸取装置电磁阀1 + :param absorb_solenoid_valve2:是否打开吸取装置电磁阀2 + :param absorb_solenoid_valve3:是否打开吸取装置电磁阀3 + :param absorb_solenoid_valve4:是否打开吸取装置电磁阀4 + :param absorb_solenoid_valve5:是否打开吸取装置电磁阀5 + :return: """ - global valve1_open_flag + global valve1_open_time, valve1_open_flag status = self.get_all_device_status() if solenoid_valve1 and not status.get(SOLENOID_VALVE1, False): - logging.info("打开电磁阀1") + print("打开电磁阀1") self.send_command(valve_commands[SOLENOID_VALVE1]['open']) # 记录电磁阀1打开时的时间戳和标志 with fiber_lock: + valve1_open_time = time.time() valve1_open_flag = True if solenoid_valve2 and not status.get(SOLENOID_VALVE2, False): - logging.info("打开电磁阀2") + print("打开电磁阀2") self.send_command(valve_commands[SOLENOID_VALVE2]['open']) - if solenoid_valve3 and not status.get(SOLENOID_VALVE3, False): - logging.info("打开电磁阀3") - self.send_command(valve_commands[SOLENOID_VALVE3]['open']) + if absorb_solenoid_valve1 and not status.get(ABSORB_SOLENOID_VALVE1, False): + print("打开吸取装置电磁阀1") + self.send_command(valve_commands[ABSORB_SOLENOID_VALVE1]['open']) time.sleep(1) # 实际测试需要考虑这个延时是否合适 - def close(self, solenoid_valve1=False, solenoid_valve2=False, solenoid_valve3=False): + if absorb_solenoid_valve2 and not status.get(ABSORB_SOLENOID_VALVE2, False): + print("打开吸取装置电磁阀2") + self.send_command(valve_commands[ABSORB_SOLENOID_VALVE2]['open']) + time.sleep(1) # 实际测试需要考虑这个延时是否合适 + + if absorb_solenoid_valve3 and not status.get(ABSORB_SOLENOID_VALVE3, False): + print("打开吸取装置电磁阀3") + self.send_command(valve_commands[ABSORB_SOLENOID_VALVE3]['open']) + time.sleep(1) # 实际测试需要考虑这个延时是否合适 + + if absorb_solenoid_valve4 and not status.get(ABSORB_SOLENOID_VALVE4, False): + print("打开吸取装置电磁阀4") + self.send_command(valve_commands[ABSORB_SOLENOID_VALVE4]['open']) + time.sleep(1) # 实际测试需要考虑这个延时是否合适 + + if absorb_solenoid_valve5 and not status.get(ABSORB_SOLENOID_VALVE5, False): + print("打开吸取装置电磁阀5") + self.send_command(valve_commands[ABSORB_SOLENOID_VALVE5]['open']) + time.sleep(1) # 实际测试需要考虑这个延时是否合适 + + # 根据状态决定是否执行关操作 + def close(self, solenoid_valve1=False, solenoid_valve2=False, absorb_solenoid_valve1=False, + absorb_solenoid_valve2=False, absorb_solenoid_valve3=False, absorb_solenoid_valve4=False, + absorb_solenoid_valve5=False): """ 根据状态决定是否执行关操作 :param solenoid_valve1:是否关闭电磁阀1 :param solenoid_valve2:是否关闭电磁阀2 - :param solenoid_valve3:是否关闭电磁阀3 + :param absorb_solenoid_valve1:是否关闭吸取电磁阀1 + :param absorb_solenoid_valve2:是否关闭吸取电磁阀2 + :param absorb_solenoid_valve3:是否关闭吸取电磁阀3 + :param absorb_solenoid_valve4:是否关闭吸取电磁阀4 + :param absorb_solenoid_valve5:是否关闭吸取电磁阀5 + :return: """ global valve1_open_flag @@ -212,19 +278,39 @@ class RelayController: status = self.get_all_device_status() if solenoid_valve1 and status.get(SOLENOID_VALVE1, True): - logging.info("关闭电磁阀1") + print("关闭电磁阀1") self.send_command(valve_commands[SOLENOID_VALVE1]['close']) # 重置电磁阀1打开标志 with fiber_lock: valve1_open_flag = False if solenoid_valve2 and status.get(SOLENOID_VALVE2, True): - logging.info("关闭电磁阀2") + print("关闭电磁阀2") self.send_command(valve_commands[SOLENOID_VALVE2]['close']) - if solenoid_valve2 and status.get(SOLENOID_VALVE3, True): - logging.info("关闭电磁阀3") - self.send_command(valve_commands[SOLENOID_VALVE3]['close']) + if absorb_solenoid_valve1 and status.get(ABSORB_SOLENOID_VALVE1, True): + print("关闭吸取装置电磁阀1") + self.send_command(valve_commands[ABSORB_SOLENOID_VALVE1]['close']) + time.sleep(1) # 实际测试需要考虑这个延时是否合适 + + if absorb_solenoid_valve2 and status.get(ABSORB_SOLENOID_VALVE2, True): + print("关闭吸取装置电磁阀2") + self.send_command(valve_commands[ABSORB_SOLENOID_VALVE2]['close']) + time.sleep(1) # 实际测试需要考虑这个延时是否合适 + + if absorb_solenoid_valve3 and status.get(ABSORB_SOLENOID_VALVE3, True): + print("关闭吸取装置电磁阀3") + self.send_command(valve_commands[ABSORB_SOLENOID_VALVE3]['close']) + time.sleep(1) # 实际测试需要考虑这个延时是否合适 + + if absorb_solenoid_valve4 and status.get(ABSORB_SOLENOID_VALVE4, True): + print("关闭吸取装置电磁阀4") + self.send_command(valve_commands[ABSORB_SOLENOID_VALVE4]['close']) + time.sleep(1) # 实际测试需要考虑这个延时是否合适 + + if absorb_solenoid_valve5 and status.get(ABSORB_SOLENOID_VALVE5, True): + print("关闭吸取装置电磁阀5") + self.send_command(valve_commands[ABSORB_SOLENOID_VALVE5]['close']) time.sleep(1) # 实际测试需要考虑这个延时是否合适 def fiber_sensor_monitor(self): @@ -355,7 +441,7 @@ def control_solenoid(): logging.info("电磁阀1、2已打开") # 等待线条掉落(最多等待2秒) - timeout = 2.0 + timeout = 3.0 start_time = time.time() fiber_detected = False # 等待光纤传感器触发或超时 diff --git a/RK1106/readme.md b/RK1106/readme.md index 30a7bb0..4a59aa5 100644 --- a/RK1106/readme.md +++ b/RK1106/readme.md @@ -2,4 +2,10 @@ 192.168.5.100,自启动设置了IP # 测试: -先使用stepper_motor_test1进行单独测试,看会不会掉步 \ No newline at end of file +先使用stepper_motor_test1进行单独测试,看会不会掉步 + +# 相关引脚 +![img.png](img.png) +GPIO32 --脉冲引脚 +GPIO33 --方向引脚 +剩下两个引脚为地线 \ No newline at end of file diff --git a/conveyor_controller/conveyor_master_controller1.py b/conveyor_controller/conveyor_master_controller1.py index b10ea76..82f36d2 100644 --- a/conveyor_controller/conveyor_master_controller1.py +++ b/conveyor_controller/conveyor_master_controller1.py @@ -228,7 +228,7 @@ class SingleMotorController: current_time = time.time() # 4. 检测到挡板且满足防抖条件 - if (not sensor_status) and (not self.sensor_locked) and \ + if sensor_status and (not self.sensor_locked) and \ (current_time - self.last_sensor_trigger) > SENSOR_DEBOUNCE_TIME: logging.info(f"\n[传送带{self.conveyor_id}] [{datetime.now().strftime('%H:%M:%S')}] 检测到挡板!立即响应") self.last_sensor_trigger = current_time diff --git a/conveyor_controller/conveyor_master_controller2.py b/conveyor_controller/conveyor_master_controller2.py index 8e63462..daf4196 100644 --- a/conveyor_controller/conveyor_master_controller2.py +++ b/conveyor_controller/conveyor_master_controller2.py @@ -204,8 +204,8 @@ class SingleMotorController: current_time = time.time() # 3. 检测到挡板且满足防抖条件 - if (not sensor_status) and (not self.sensor_locked) and \ - (current_time - self.last_sensor_trigger) > SENSOR_DEBOUNCE_TIME: + if sensor_status and (not self.sensor_locked) and \ + (current_time - self.last_sensor_trigger) > SENSOR_DEBOUNCE_TIME: print( f"\n[传送带{self.conveyor_id}] [{datetime.now().strftime('%H:%M:%S')}] 检测到挡板!立即停止") self.last_sensor_trigger = current_time @@ -298,7 +298,7 @@ class MasterConveyorController: except Exception as e: raise RuntimeError(f"485串口初始化失败: {e}") - def on_sensor_triggered(self): + def on_sensor_triggered(self, *args): """传感器触发回调(同步两个传送带停止)""" with self.sync_lock: # 检查是否两个传送带都已触发 @@ -319,6 +319,8 @@ class MasterConveyorController: self.conveyor1.start_motor_once() self.conveyor2.start_motor_once() + time.sleep(0.5) # 等待挡板过去,后续看还需不需要修改参数 + # 2. 启动传感器检测线程 self.conveyor1.start_sensor_thread(self) self.conveyor2.start_sensor_thread(self) diff --git a/conveyor_controller/conveyor_master_controller2_test.py b/conveyor_controller/conveyor_master_controller2_test.py deleted file mode 100644 index 111c06e..0000000 --- a/conveyor_controller/conveyor_master_controller2_test.py +++ /dev/null @@ -1,431 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -''' -# @Time : 2026/1/6 10:41 -# @Author : reenrr -# @File : conveyor_master_controller2_test.py -# @Desc : 传送带1和2协同控制 两个传送带同步走一个挡板的距离--测试代码 -''' -import logging -import threading -import time -from datetime import datetime -import serial -from EMV.EMV_test import RelayController - -logging.getLogger("pymodbus").setLevel(logging.CRITICAL) - -# --- 全局参数配置 --- -SERIAL_PORT = '/dev/ttyUSB0' # 单串口(485总线) -BAUD_RATE = 115200 -ACTION_DELAY = 5 -SLAVE_ID_1 = 1 # 传送带1轴地址 -SLAVE_ID_2 = 2 # 传送带2轴地址 -SERIAL_TIMEOUT = 0.05 # 串口超时50ms -SENSOR_DEBOUNCE_TIME = 0.2 # 传感器防抖时间(200ms) -DETECTION_INTERVAL = 0.05 # 传感器检测间隔(50ms) -WAIT_INIT_RELEASE_TIMEOUT = 3.0 # 等待初始挡板离开超时(10秒) - -# 全局串口锁(485总线必须单指令发送,避免冲突) -GLOBAL_SERIAL_LOCK = threading.Lock() - -class SingleMotorController: - """单个电机控制器(按轴地址自动匹配指令集,复用全局串口)""" - def __init__(self, slave_id, conveyor_id, action_delay, serial_obj, global_serial_lock): - self.slave_id = slave_id # 轴地址(1/2) - self.conveyor_id = conveyor_id # 传送带编号(1/2) - self.action_delay = action_delay - self.ser = serial_obj # 复用主控制器的串口实例 - self.global_serial_lock = global_serial_lock # 全局串口锁 - - # 初始化指令 - self._init_commands() - - # 核心状态标志 - self.status_thread_is_running = False # 传感器线程运行标志 - self.is_running = False # 电机是否已启动 - self.is_stopped = False # 电机是否已停止 - self.sensor_triggered = False # 传感器检测到挡板(无信号) - self.sensor_locked = False # 传感器锁定(防抖) - self.last_sensor_trigger = 0 # 上次触发时间(防抖) - self.init_release_done = False # 初始挡板是否已离开 - - # 线程对象 - self.monitor_thread = None - self.relay_controller = RelayController() - - # 锁 - self.sensor_lock = threading.Lock() - self.state_lock = threading.Lock() - - def _init_commands(self): - """根据轴地址初始化指令集""" - if self.slave_id == 1: - # --------传送带1(轴地址1)指令集-------- - self.start_command = bytes([0x01, 0x06, 0x60, 0x02, 0x00, 0x10, 0x37, 0xC6]) # 启动指令 - self.stop_command = bytes([0x01, 0x06, 0x60, 0x02, 0x00, 0x40, 0x37, 0xFA]) # 停止指令 - self.speed_commands = [ - bytes([0x01, 0x06, 0x62, 0x00, 0x00, 0x02, 0x17, 0xB3]), # 设定PR0为速度模式 - bytes([0x01, 0x06, 0x62, 0x03, 0xFF, 0xE2, 0xA7, 0xCB]), # 设定PR0速度 -30 - bytes([0x01, 0x06, 0x62, 0x04, 0x00, 0x32, 0x56, 0x66]), # 设定PR0加速度 - bytes([0x01, 0x06, 0x62, 0x05, 0x00, 0x32, 0x07, 0xA6]), # 设定PR0减速度 - ] - elif self.slave_id == 2: - # --------传送带2(轴地址2)指令集-------- - self.start_command = bytes([0x02, 0x06, 0x60, 0x02, 0x00, 0x10, 0x37, 0xF5]) # 启动指令 - self.stop_command = bytes([0x02, 0x06, 0x60, 0x02, 0x00, 0x40, 0x37, 0xC9]) # 停止指令 - self.speed_commands = [ - bytes([0x02, 0x06, 0x62, 0x00, 0x00, 0x02, 0x17, 0x80]), # 设定PR0为速度模式 - bytes([0x02, 0x06, 0x62, 0x03, 0xFF, 0xE2, 0xA7, 0xF8]), # 设定PR0速度 -30 - bytes([0x02, 0x06, 0x62, 0x04, 0x00, 0x32, 0x56, 0x55]), # 设定PR0加速度 - bytes([0x02, 0x06, 0x62, 0x05, 0x00, 0x32, 0x07, 0x95]), # 设定PR0减速度 - ] - else: - raise ValueError(f"不支持的轴地址:{self.slave_id},仅支持1/2") - - # 打印指令(调试用) - print(f"[传送带{self.conveyor_id}] 加载轴地址{self.slave_id}指令集:") - print(f" 启动指令: {self.start_command.hex(' ')}") - print(f" 停止指令: {self.stop_command.hex(' ')}") - - def send_command_list(self, command_list, delay=0.05): - """ - 批量发送指令列表(加全局锁,指令间延时避免总线冲突) - :param command_list: 待发送的指令列表 - :param delay: 指令间延时(默认0.05s) - """ - if not (self.ser and self.ser.is_open): - print(f"传送带{self.conveyor_id}串口未打开,跳过指令列表发送") - return - for idx, cmd in enumerate(command_list): - self.send_and_receive_raw(cmd, f"指令{idx + 1}") - time.sleep(delay) # 485总线指令间必须加延时 - - def clear_buffer(self): - """清空串口缓冲区(加全局锁)""" - if self.ser and self.ser.is_open: - with self.global_serial_lock: - self.ser.reset_input_buffer() - self.ser.reset_output_buffer() - while self.ser.in_waiting > 0: - self.ser.read(self.ser.in_waiting) - time.sleep(0.001) - - def send_and_receive_raw(self, command, description): - """ - 底层串口通信(核心:使用全局锁保证单指令发送) - :param command: 待发送的指令 - :param description: 指令描述 - :return: 接收到的响应 - """ - if not (self.ser and self.ser.is_open): - print(f"传送带{self.conveyor_id}串口未打开,跳过发送") - return None - - try: - self.clear_buffer() - # 全局锁:同一时间仅一个设备发送指令 - with self.global_serial_lock: - send_start = time.perf_counter() - self.ser.write(command) - self.ser.flush() - send_cost = (time.perf_counter() - send_start) * 1000 - - # 接收响应(仅对应轴地址的设备会回复) - recv_start = time.perf_counter() - response = b"" - while (time.perf_counter() - recv_start) < SERIAL_TIMEOUT: - if self.ser.in_waiting > 0: - chunk = self.ser.read(8) - response += chunk - if len(response) >= 8: - break - time.sleep(0.001) - recv_cost = (time.perf_counter() - recv_start) * 1000 - - # 处理响应 - valid_resp = response[:8] if len(response) >= 8 else response - print(f"[传送带{self.conveyor_id}] [{datetime.now().strftime('%H:%M:%S.%f')[:-3]}]") - print(f" 发送 {description}: {command.hex(' ')} (耗时: {send_cost:.2f}ms)") - print(f" 接收响应: {valid_resp.hex(' ')} (长度: {len(valid_resp)}, 耗时: {recv_cost:.2f}ms)") - - return valid_resp - - except Exception as e: - print(f"传送带{self.conveyor_id}通信异常 ({description}): {e}") - return None - - def start_motor(self): - """启动电机""" - with self.state_lock: - if self.is_running or self.is_stopped: - print(f"[传送带{self.conveyor_id}] 电机已启动/停止,无需重复启动") - return - - # 1. 发送速度模式配置指令 - print(f"[传送带{self.conveyor_id}] 配置速度模式...") - self.send_command_list(self.speed_commands[:4]) - - # 2. 发送启动指令 - print(f"[传送带{self.conveyor_id}] 发送启动指令") - self.send_and_receive_raw(self.start_command, "启动传送带") - - with self.state_lock: - self.is_running = True - - def stop_motor(self): - """停止电机""" - with self.state_lock: - if self.is_stopped: - return - self.is_stopped = True - self.is_running = False - - # 发送停止指令 - print(f"[传送带{self.conveyor_id}] 发送停止指令") - self.send_and_receive_raw(self.stop_command, "停止传送带") - - def wait_init_sensor_release(self): - """等待初始挡板离开传感器(传感器从无信号→有信号)""" - print(f"[传送带{self.conveyor_id}] 等待初始挡板离开传感器(超时{WAIT_INIT_RELEASE_TIMEOUT}秒)") - start_time = time.time() - - while time.time() - start_time < WAIT_INIT_RELEASE_TIMEOUT: - # 读取传感器状态:True=有信号(无遮挡),False=无信号(遮挡) - sensor_status = self.get_sensor_status() - - if sensor_status: # 挡板离开,传感器有信号 - print(f"[传送带{self.conveyor_id}] 初始挡板已离开传感器") - with self.state_lock: - self.init_release_done = True - return True - - time.sleep(DETECTION_INTERVAL) - - # 超时处理 - print(f"[传送带{self.conveyor_id}] 等待初始挡板离开超时!强制标记为已离开") - with self.state_lock: - self.init_release_done = True - return False - - def get_sensor_status(self): - """读取传感器状态""" - if self.conveyor_id == 1: - return self.relay_controller.get_device_status('conveyor1_sensor', 'sensors') - else: - return self.relay_controller.get_device_status('conveyor2_sensor', 'sensors') - - def monitor_conveyors_sensor_status(self, master_controller): - """ - 传感器检测线程(仅检测运行中挡板遮挡) - :param master_controller: 主控制器对象 - """ - print(f"[传送带{self.conveyor_id}] 传感器检测线程已启动(检测间隔:{DETECTION_INTERVAL}s)") - - while self.status_thread_is_running and not master_controller.global_stop_flag: - try: - with self.sensor_lock: - # 1. 全局停止/电机已停止时跳过检测 - if master_controller.global_stop_flag or self.is_stopped: - time.sleep(DETECTION_INTERVAL) - continue - - # 2. 等待初始挡板离开后再开始检测 - if not self.init_release_done: - time.sleep(DETECTION_INTERVAL) - continue - - # 3. 读取传感器状态 - sensor_status = self.get_sensor_status() - current_time = time.time() - - # 4. 检测到挡板遮挡(无信号)且满足防抖条件 → 触发停止 - if (not sensor_status) and (not self.sensor_locked) and \ - (current_time - self.last_sensor_trigger) > SENSOR_DEBOUNCE_TIME: - print( - f"\n[传送带{self.conveyor_id}] [{datetime.now().strftime('%H:%M:%S')}] 检测到挡板遮挡!准备停止") - self.last_sensor_trigger = current_time - self.sensor_triggered = True - self.sensor_locked = True - - # 立即停止当前传送带 - self.stop_motor() - - # 通知主控制器同步状态 - master_controller.on_sensor_triggered() - - time.sleep(DETECTION_INTERVAL) - - except Exception as e: - print(f"[传送带{self.conveyor_id}] 传感器检测异常: {e}") - time.sleep(0.1) - - print(f"[传送带{self.conveyor_id}] 传感器检测线程已停止") - - def start_sensor_thread(self, master_controller): - """ - 启动传感器检测线程 - :param master_controller: 主控制器对象 - """ - if self.monitor_thread and self.monitor_thread.is_alive(): - return - - self.status_thread_is_running = True - self.monitor_thread = threading.Thread( - target=self.monitor_conveyors_sensor_status, - args=(master_controller,), - daemon=True - ) - self.monitor_thread.start() - -class MasterConveyorController: - """主控制器 - 单串口管理两个传送带(485总线)""" - def __init__(self): - self.global_stop_flag = False - self.both_stopped = False # 两个传送带是否都已停止 - - # 1. 初始化单串口(485总线) - self.ser = None - self._init_serial() - - # 2. 初始化两个电机控制器(复用同一个串口) - self.conveyor1 = SingleMotorController( - slave_id=SLAVE_ID_1, - conveyor_id=1, - action_delay=ACTION_DELAY, - serial_obj=self.ser, - global_serial_lock=GLOBAL_SERIAL_LOCK - ) - self.conveyor2 = SingleMotorController( - slave_id=SLAVE_ID_2, - conveyor_id=2, - action_delay=ACTION_DELAY, - serial_obj=self.ser, - global_serial_lock=GLOBAL_SERIAL_LOCK - ) - - # 同步锁 - self.sync_lock = threading.Lock() - - def _init_serial(self): - """初始化485总线串口(主控制器统一管理)""" - try: - self.ser = serial.Serial( - port=SERIAL_PORT, - baudrate=BAUD_RATE, - bytesize=serial.EIGHTBITS, - parity=serial.PARITY_NONE, - stopbits=serial.STOPBITS_ONE, - timeout=SERIAL_TIMEOUT, - write_timeout=SERIAL_TIMEOUT, - xonxoff=False, - rtscts=False, - dsrdtr=False - ) - if self.ser.is_open: - print(f"成功初始化485总线串口 {SERIAL_PORT}(波特率{BAUD_RATE})") - # 初始化时清空缓冲区 - with GLOBAL_SERIAL_LOCK: - self.ser.reset_input_buffer() - self.ser.reset_output_buffer() - else: - raise RuntimeError("串口初始化失败:无法打开串口") - except Exception as e: - raise RuntimeError(f"485串口初始化失败: {e}") - - def on_sensor_triggered(self): - """传感器触发回调(同步两个传送带停止)""" - with self.sync_lock: - # 检查是否两个传送带都检测到挡板遮挡 - if self.conveyor1.sensor_triggered and self.conveyor2.sensor_triggered: - # 停止未停止的传送带 - if not self.conveyor1.is_stopped: - self.conveyor1.stop_motor() - if not self.conveyor2.is_stopped: - self.conveyor2.stop_motor() - - self.both_stopped = True - print(f"\n[主控制器] 两个传送带都已检测到挡板并停止!任务完成") - self.global_stop_flag = True # 标记全局停止 - - def start_all_conveyors(self): - """启动所有传送带(按需求顺序:启动电机→等待初始挡板离开→开启传感器检测)""" - print("\n=== 主控制器:启动所有传送带===") - - # 检查串口是否正常 - if not (self.ser and self.ser.is_open): - print("[主控制器] 485串口未打开,无法启动") - return False - - # 1. 第一步:同步启动两个电机 - print("[主控制器] 同步启动两个传送带(初始挡板遮挡传感器)") - self.conveyor1.start_motor() - self.conveyor2.start_motor() - - # 2. 第二步:等待两个传送带的初始挡板都离开传感器 - print("[主控制器] 等待两个传送带的初始挡板离开传感器...") - self.conveyor1.wait_init_sensor_release() - self.conveyor2.wait_init_sensor_release() - - # 3. 第三步:初始挡板都离开后,启动传感器检测线程 - print("[主控制器] 初始挡板已全部离开,启动传感器检测线程") - self.conveyor1.start_sensor_thread(self) - self.conveyor2.start_sensor_thread(self) - - print("[主控制器] 传送带启动流程完成,等待检测挡板遮挡...") - return True - - def stop_all_conveyors(self): - """停止所有传送带并关闭串口""" - print("\n=== 主控制器:停止所有传送带 ===") - self.global_stop_flag = True - - # 强制停止两个电机 - self.conveyor1.stop_motor() - self.conveyor2.stop_motor() - - # 关闭串口 - time.sleep(1) - with GLOBAL_SERIAL_LOCK: - if self.ser and self.ser.is_open: - self.ser.close() - print(f"485总线串口 {SERIAL_PORT} 已关闭") - - print("[主控制器] 所有传送带已停止并关闭串口") - - def run(self): - """主运行函数""" - try: - if not self.start_all_conveyors(): - return - - # 主线程等待:直到两个传送带都停止或手动退出 - while not self.global_stop_flag: - if self.both_stopped: - break - time.sleep(0.5) - - except KeyboardInterrupt: - print("\n\n[主控制器] 检测到退出指令,正在停止系统...") - except Exception as e: - print(f"\n[主控制器] 程序异常: {e}") - finally: - self.stop_all_conveyors() - print("\n=== 主控制器:程序执行完毕 ===") - -# -----------传送带对外接口-------------- -def conveyor_control(): - """主函数""" - try: - master = MasterConveyorController() - master.run() - except RuntimeError as e: - print(f"系统启动失败: {e}") - except Exception as e: - print(f"未知异常: {e}") - -# ------------测试接口-------------- -if __name__ == '__main__': - conveyor_control() - - diff --git a/readme.md b/readme.md index b2764b6..2a2b05a 100644 --- a/readme.md +++ b/readme.md @@ -16,4 +16,17 @@ sudo chmod 666 /dev/ttyACM0 # 过年之后调试,需要注意 ## 1.修改网络继电器的IP,将1网段改成5网段 +# 各设备IP地址 +工控机:192.168.5.50 +笔记本:192.168.5.105 +网络继电器:192.168.5.18 +RK1106:192.168.5.100 + +# 网络继电器传感器: +1、传感器1:DI5 +2、传感器2:DI4 +3、光纤传感器:DI1 +4、双按压传感器:DI2、3 +5、吸取装置电磁阀:DO4、5、6、7、8 + diff --git a/robot/drop.py b/robot/drop.py new file mode 100644 index 0000000..73f9d41 --- /dev/null +++ b/robot/drop.py @@ -0,0 +1,365 @@ +""" +# @Time : 2025/12/12 11:05 +# @Author : reenrr +# @File : drop.py +# @Desc : +""" + +from ast import mod +import configparser +import os +from typing import Optional +from Position import Real_Position +from FeedModel import LineModel, PositionModel +import Constant + + +class DropPositionManager: + """ + 负责读取/写入ini格式的码垛点位配置文件 + """ + def __init__(self, config_path=Constant.dropLine_set_file): + """ + :param config_path: 配置文件路径 + """ + self.config_path = config_path + self.config = configparser.ConfigParser() # 配置文件解析器实例,用于读取/写入ini文件 + self._load_config() + + # --- 本地缓存 --- + self._current_lineid: Optional[int] = None # 当前缓存的路径组ID(对应DropLineX) + self._current_point_id: Optional[int] = None # 当前缓存的投料点ID(对应DropPoints{point}) + self._current_path: list = [] # 当前缓存的完整路径点列表(PositionModel对象列表) + self._current_index: int = 0 # 当前路径中的索引(用于遍历路径点,返回下一个点位) + self._current_drop_section: dict = {} # 存储每个主配置节(DropLineX)对应的最大码垛点编号,用于新增点位时的编号自增 + + def _load_config(self): + """加载配置文件""" + if not os.path.exists(self.config_path): + raise FileNotFoundError(f"配置文件不存在: {self.config_path}") + self.config.read(self.config_path, encoding='utf-8') + + def get_next_drop_position(self, lineid: int, point: int) -> Optional[PositionModel]: + """ + 获取指定 lineid 和 point 的下一个路径点。 + + :param lineid: 路径组 ID(对应 DropLineX) + :param point: 投料点 ID(对应 DropPoints{point}) + :return: 下一个路径点,若无则返回 None + """ + # 如果 lineid 或 point 改变,重新加载路径 + if self._current_lineid != lineid or self._current_point_id != point: + self._load_point_path(lineid, point) + self._current_lineid = lineid + self._current_point_id = point + self._current_index = 0 + + # 如果索引未超过路径长度,返回当前索引的点 + if self._current_index < len(self._current_path): + next_point = self._current_path[self._current_index] + self._current_index += 1 + pos = next_point.get_position() + print(f"🎯 返回点: status={next_point.status}, linetype={next_point.lineType}, " + f"position=({pos.X:.3f}, {pos.Y:.3f}, {pos.Z:.3f})") + return next_point + + # 路径结束 + print("✅ 当前点集合路径已结束") + return None + + def _load_point_path(self, lineid: int, point_id: int): + """加载指定 lineid 和 point_id 的完整路径 + (点集合dropmidpoint、droppoint、resetpoint) + :param lineid: 路径组 ID(对应 DropLineX) + param point_id: 投料点 ID(对应 DropPoints{point}) + """ + self._current_path = [] + + # 检查是否存在 DropPoints{point_id} + drop_point_sec = f"DropPoints{point_id}" + if not self.config.has_section(drop_point_sec): + print(f"❌ 配置错误:不存在 {drop_point_sec}") + return + + if self.config.getint(drop_point_sec, "lineid") != lineid: + print(f"❌ {drop_point_sec} 不属于 lineid={lineid}") + return + + try: + drop_pos = self._read_position_from_section(drop_point_sec) + except Exception as e: + print(f"❌ 读取 {drop_point_sec} 失败: {e}") + return + + # 1. 加载 DropMidPoint{point_id}-*(按 level 升序) + mid_points = [] + # _mid_section_start=f"DropMidPoint{point_id}-" + _mid_section_start="DropMidPoint1-" + for sec in self.config.sections(): + if sec.startswith(_mid_section_start) and self.config.getint(sec, "lineid") == lineid: + try: + level = int(sec.split('-')[1]) + pos = self._read_position_from_section(sec) + mid_points.append((level, pos)) + break + except Exception as e: + print(f"❌ 解析 {sec} 失败: {e}") + + mid_points.sort(key=lambda x: x[0]) + + # 2. 加载 ResetPoint{point_id}-*(按 level 升序) + reset_points = [] + # _reset_section_start=f"ResetPoint{point_id}-" + _reset_section_start="ResetPoint1-" + for sec in self.config.sections(): + if sec.startswith(_reset_section_start) and self.config.getint(sec, "lineid") == lineid: + try: + level = int(sec.split('-')[1]) + pos = self._read_position_from_section(sec) + reset_points.append((level, pos)) + break + except Exception as e: + print(f"❌ 解析 {sec} 失败: {e}") + reset_points.sort(key=lambda x: x[0]) + + # 3. 组装路径 + # a. DropMidPoint + for _, pos in mid_points: + self._current_path.append(pos) + + # b. DropPoints + self._current_path.append(drop_pos) + + # c. ResetPoint + for _, pos in reset_points: + # model = PositionModel() + # model.init_position(pos) + # model.status = 10 # FReverse + # model.lineType = 4 + self._current_path.append(pos) + + print(f"✅ 已加载 DropLine{lineid} 中 DropPoints{point_id} 的路径,共 {len(self._current_path)} 个点") + + def _read_position_from_section(self, section: str) -> PositionModel: + """从配置文件的 section 中读取位置信息""" + model = PositionModel() + + pos = Real_Position() + pos.X = self.config.getfloat(section, "x") + pos.Y = self.config.getfloat(section, "y") + pos.Z = self.config.getfloat(section, "z") + pos.U = self.config.getfloat(section, "u") + pos.V = self.config.getfloat(section, "v") + pos.W = self.config.getfloat(section, "w") + model.init_position(pos) + + model.lineType=self.config.getint(section, "linetype") + model.status=self.config.getint(section, "status") + model.id=self.config.getint(section, "id") + model.order=self.config.getint(section, "order") + model.lineId=self.config.getint(section, "lineid") + # 保存section名称,用于排序 + model.section = section + + return model + + def _read_position_return_leveL(self,config_reader,section,lineid)->Optional[tuple]: + """ + 读取指定配置节的点位信息,并返回元组 + :param config_reader: 配置文件解析器 + :param section: 配置节名称 + :param lineid: 路径组ID + """ + try: + if config_reader.getint(section, "lineid") == lineid: + # 提取 - 后的数字 + level = int(section.split('-')[1]) + position_model = self._read_position_from_section(section) + return (level, position_model) + except Exception as e: + print(f"{__name__}:_read_position_return_leveL:{e}") + return None + + + + def load_path_points(self,lineid: int) ->Optional[LineModel]: + """根据lineid加载所有码垛的路径信息""" + #默认码垛的lineid从10开始 + _lineid=lineid+10 + line_model = LineModel(_lineid) + line_model.line_category = 2 + line_model.id = _lineid + + # 查找主表 DropLineX + main_section = f"{Constant.dropLine_set_section}{lineid}" + if self.config.has_section(main_section): + line_model.name = self.config.get(main_section, "name", fallback=f"路径{lineid}") + else: + return None + + # 先收集所有 DropPoints 开头的 section + drop_points_sections = [] + for sec in self.config.sections(): + if sec.startswith("DropPoints"): + try: + if self.config.getint(sec, "lineid") == lineid: + # 提取数字部分 + num_part = sec.replace("DropPoints", "") + if num_part.isdigit(): + drop_points_sections.append((sec, int(num_part))) + except Exception as e: + print(f"{__name__}:_load_path_points异常1:{e}") + continue + + # 按数字排序 DropPoints sections + drop_points_sections.sort(key=lambda x: x[1]) + # 使用main_section作为键存储当前最大的drop-section编号 + self._current_drop_section[main_section] = drop_points_sections[-1][1] if drop_points_sections else 1 + mid_points_other=[] #最后点没有匹配的droppoints + # 遍历每个 DropPoints,按照_load_point_path的逻辑加载对应的中间点和复位点 + for sec, point_num in drop_points_sections: + try: + # 1. 加载 DropMidPoint{point_num}-*(按 level 升序) + mid_points = [] + for s in self.config.sections(): + if s.startswith(f"DropMidPoint{point_num}-"): + _mid=self._read_position_return_leveL(self.config,s,lineid) + if _mid: + mid_points.append(_mid) + if point_num==(len(drop_points_sections)): + if s.startswith(f"DropMidPoint{point_num+1}-"): + _mid=self._read_position_return_leveL(self.config,s,lineid) + if _mid: + mid_points_other.append(_mid) + + + # 按level升序排序 + mid_points.sort(key=lambda x: x[0]) + # 添加中间点到路径 + for _, pos in mid_points: + line_model.positions.append(pos) + + # 加载 ResetPoint{point_num}-*(按 level 升序) + reset_points = [] + for s in self.config.sections(): + if s.startswith(f"ResetPoint{point_num}-"): + _reset=self._read_position_return_leveL(self.config,s,lineid) + if _reset: + reset_points.append(_reset) + # 按level升序排序 + reset_points.sort(key=lambda x: x[0]) + # 添加复位点到路径 + for _, pos in reset_points: + line_model.positions.append(pos) + #添加当前 DropPoints + try: + position_model = self._read_position_from_section(sec) + line_model.positions.append(position_model) + except Exception as e: + print(f"{__name__}:_load_path_points异常3:{e}") + except Exception as e: + print(f"{__name__}:_load_path_points异常4:{e}") + + if mid_points_other: + mid_points_other.sort(key=lambda x: x[0]) + for _, pos in mid_points_other: + line_model.positions.append(pos) + return line_model + + def save_path_points(self, line_model: LineModel): + """根据lineid保存所有码垛的路径信息""" + #默认码垛的lineid从10开始,保存的时候减一 + _lineid=line_model.id-10 + if _lineid<=0: + return + self.config.read(Constant.dropLine_set_file, encoding='utf-8') + # 查找主表 DropLineX + main_section = f"{Constant.dropLine_set_section}{_lineid}" + if not self.config.has_section(main_section): + self.config.add_section(main_section) + self.config.set(main_section, "name", line_model.name) + self.config.set(main_section, "id", str(_lineid)) + _current_reset_index=1 + _current_mid_index=1 + _current_drop_section_val=self._current_drop_section[main_section] + # 保存每个DropPoints + for i, pos in enumerate(line_model.positions): + if pos.lineId == _lineid or pos.lineId == line_model.id: + #最后一个扔包点 + if pos.section.startswith(f"DropMidPoint{_current_drop_section_val+1}"): + _current_mid_index=int(pos.section.split('-')[-1])+1 + if pos.section.startswith(f"ResetPoint{_current_drop_section_val}"): + _current_reset_index=int(pos.section.split('-')[-1])+1 + #新增的数据,如果是前点,需要获取后点的数据 + if pos.section.startswith("Position"): + pos.lineId = _lineid + + if pos.status is None: + continue + # FDropMid = 7 + elif pos.status==7: + #只有一个 + pos.section = f"DropMidPoint{_current_drop_section_val+1}-{_current_mid_index}" + _current_mid_index+=1 + + elif pos.status==9: + pos.section = f"DropPoints{_current_drop_section_val+1}" + _current_drop_section_val+=1 + _current_mid_index=1 + _current_reset_index=1 + + elif pos.status==10: + pos.section = f"ResetPoint{_current_drop_section_val}-{_current_reset_index}" + _current_reset_index+=1 + #保存数据 + pos.save_position_model(self.config) + + with open(Constant.dropLine_set_file, 'w', encoding='utf-8') as f: + self.config.write(f) + + def del_drop_point(self,section): + self.config.read(Constant.dropLine_set_file, encoding = 'utf-8') + self.config.remove_section(section) + with open(Constant.dropLine_set_file, 'w', encoding='utf-8') as f: + self.config.write(f) + +def _get_point_debug_info(manager, pos, model): + config = manager.config + for sec in config.sections(): + if sec.startswith("DropPoints"): + try: + x, y, z = config.getfloat(sec, "x"), config.getfloat(sec, "y"), config.getfloat(sec, "z") + if abs(x - pos.X) < 0.001 and abs(y - pos.Y) < 0.001 and abs(z - pos.Z) < 0.001: + point_id = config.getint(sec, "id") + return f"📌 DropPoints{point_id} | id={point_id}" + except: pass + + elif sec.startswith("DropMidPoint"): + try: + parts = sec.split('-') + if len(parts) != 2: continue + point_id = int(''.join(filter(str.isdigit, parts[0]))) + level = int(parts[1]) + x, y, z = config.getfloat(sec, "x"), config.getfloat(sec, "y"), config.getfloat(sec, "z") + if abs(x - pos.X) < 0.001 and abs(y - pos.Y) < 0.001 and abs(z - pos.Z) < 0.001: + return f"📍 DropMidPoint{point_id}-{level} | id={point_id}, level={level}" + except: pass + + elif sec.startswith("ResetPoint"): + try: + parts = sec.split('-') + if len(parts) != 2: continue + point_id = int(''.join(filter(str.isdigit, parts[0]))) + level = int(parts[1]) + x, y, z = config.getfloat(sec, "x"), config.getfloat(sec, "y"), config.getfloat(sec, "z") + if abs(x - pos.X) < 0.001 and abs(y - pos.Y) < 0.001 and abs(z - pos.Z) < 0.001: + return f"🔙 ResetPoint{point_id}-{level} | id={point_id}, level={level}" + except: pass + return "❓ 未知点位" + +# 测试 +if __name__ == "__main__": + # manager = DropPositionManager("drop.ini") + manager = DropPositionManager() + lineid = 1 + manager.load_path_points(1) diff --git a/robot/util_time.py b/robot/util_time.py index 2074adb..3bda200 100644 --- a/robot/util_time.py +++ b/robot/util_time.py @@ -16,7 +16,7 @@ class MyTimer: ts = time.time() return int(ts * 1000) # Convert to milliseconds -# CTon class equivalent in Python +# 通断延时器类 class CTon: def __init__(self): self.m_unET = 0 @@ -47,18 +47,34 @@ class CTon: self.m_bPause = False def SetPause(self, value): - if self.m_bIn: + """ + 设置延时暂停/恢复状态,实现延时过程的暂停与继续 + :param value: 暂停标志 True表示暂停,False表示恢复 + """ + if self.m_bIn: # 仅当输入有效时,运行设置暂停 self.m_bPause = value - if self.m_bPause: + if self.m_bPause: # 暂停时,记录当前已流逝的延时时间,用于恢复记时补偿 self.m_unPauseET = MyTimer.gMyGetTickCount() - self.m_unStartTime def SetOver(self, value): + """ + 手动设置延时完成标记 + """ self.m_bOver = value def GetStartTime(self): + """ + 获取延时启动时的时间戳 + """ return self.m_unStartTime def Q(self, value_i, value_pt): + """ + 触发延时逻辑,判断延时是否完成 + :param value_i: 当前输入信号,True表示输入有效,False表示无效 + :param value_pt: 延时时间,单位毫秒 + :return: True表示延时完成,False表示延时未完成 + """ self.m_bIn = value_i self.m_unPT = value_pt un_tick = MyTimer.gMyGetTickCount() @@ -78,36 +94,52 @@ class CTon: return self.m_bIn and (un_tick >= (self.m_unStartTime + self.m_unPT)) -# CClockPulse class equivalent in Python +# 时钟脉冲类 class CClockPulse: def __init__(self): - self.m_bFirstOut = True - self.m_bTonAOut = False - self.m_bTonBOut = False - self.m_cTonA = CTon() - self.m_cTonB = CTon() + self.m_bFirstOut = True # 首次输出标记 + self.m_bTonAOut = False # 延时器A的输出状态 + self.m_bTonBOut = False # 延时器B的输出状态 + self.m_cTonA = CTon() # 延时器A + self.m_cTonB = CTon() # 延时器B def Q(self, value_i, run_time, stop_time): - if self.m_bFirstOut: + """ + 生成周期性脉冲信号,返回当前脉冲输出状态 + :param value_i: 输入信号,True表示输入有效,False表示无效 + :param run_time: 运行时间,单位毫秒 + :param stop_time: 停止时间,单位毫秒 + :return: True表示脉冲输出有效,False表示脉冲输出无效 + """ + if self.m_bFirstOut: # 首次输出:按照[运行时间->停止时间]的顺序生成脉冲 self.m_bTonAOut = self.m_cTonA.Q(not self.m_bTonBOut and value_i, run_time) self.m_bTonBOut = self.m_cTonB.Q(self.m_bTonAOut and value_i, stop_time) return not self.m_bTonAOut and value_i - else: + else: # 非首次输出:循环交替[停止时间->运行时间],生成周期性脉冲 self.m_bTonAOut = self.m_cTonA.Q(not self.m_bTonBOut and value_i, stop_time) self.m_bTonBOut = self.m_cTonB.Q(self.m_bTonAOut and value_i, run_time) return self.m_bTonAOut and value_i -# CDelayOut class equivalent in Python +# 延时输出类,显示先等待,后输出,输出完成后自动复位的逻辑 class CDelayOut: def __init__(self): self.m_cOutTon = CTon() self.m_cmWaitTon = CTon() def Reset(self): + """ + 复位两个延时器的所有状态 + """ self.m_cOutTon.SetReset() self.m_cmWaitTon.SetReset() def Q(self, value_i, wait_time, out_time): + """ + 实现等待->输出->复位的闭环逻辑 + :param value_i: 输入信号,True表示输入有效,False表示无效 + :param wait_time: 等待时间,单位毫秒 + :param out_time: 输出时间,单位毫秒 + """ if self.m_cmWaitTon.Q(value_i, wait_time): if self.m_cOutTon.Q(True, out_time): self.m_cOutTon.SetReset() @@ -117,12 +149,15 @@ class CDelayOut: return True return False -# CRisOrFall class equivalent in Python +# 边沿检测类,上升沿或下降沿触发 class CRisOrFall: def __init__(self): self.m_bTemp = False def Q(self, value_i, ris_or_fall): + """ + 边沿检测方法:检测输入信号的上升沿或下降沿,返回检测结果 + """ result = False if value_i != self.m_bTemp: if ris_or_fall and value_i: # Rising edge @@ -132,7 +167,7 @@ class CRisOrFall: self.m_bTemp = value_i return result -# CTof class equivalent in Python +# 断通电延时器类 class CTof: def __init__(self): self.m_cDelayTon = CTon() diff --git a/servo/servo.py b/servo/servo.py index 30436fc..8046fa6 100644 --- a/servo/servo.py +++ b/servo/servo.py @@ -116,7 +116,7 @@ class ServoController: def cleanup(self): """清理资源""" self.disable_all_servos() - if self.port_handler.is_open(): + if self.port_handler.is_open: self.port_handler.closePort() logging.info("串口已关闭")