This commit is contained in:
2025-10-24 14:55:58 +08:00
parent afdefbaca6
commit 93d412b8fe
11 changed files with 7486 additions and 0 deletions

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,536 @@
from .crc16 import crc16
class command_hex:
"""
命令十六进制转换模块
提供将十六进制和RFID参数的转换
"""
#region RFID 工作模式(可设置主动模式推送信息)
@staticmethod
def build_set_mode_command(address, read_mode=0, mode_state=0, mem_inven=1,
first_adr=0, word_num=1, tag_filt_time=0)->bytes:
"""
构建设置读写器工作模式的指令命令0x35
工作模式中可设置主动模式下的起始地址,推送的存储区,以及标签输出间隔时间
参数:
address: 读写器地址0xXX格式
read_mode: 工作模式选择 (0=应答模式, 1=主动模式, 2=触发模式低电平, 3=触发模式高电平)
mode_state: 输出方式和其他设置
Bit1: 0=韦根输出, 1=RS232输出
Bit2: 0=开蜂鸣器, 1=关蜂鸣器
Bit3: 0=First_Adr为字地址, 1=First_Adr为字节地址
mem_inven: 存储区选择 (0=保留区, 1=EPC, 2=TID, 3=用户, 4=多张查询, 5=单张查询, 6=多张TID询查)
first_adr: 起始地址
word_num: 读取字数量 (1-32)
tag_filt_time: 主动模式下标签输出间隔时间 (0-255)*1s
返回:
bytes: 完整的命令字节串包含CRC校验
"""
# 验证参数
if word_num < 1 or word_num > 32:
raise ValueError("Word_Num必须在1-32范围内")
if mem_inven not in [0, 1, 2, 3, 4, 5, 6]:
raise ValueError("Mem_Inven参数无效")
# 构建命令帧
length = 0x0A # 命令长度
cmd = 0x35 # 命令代码
# 构建数据部分
data = [
read_mode & 0xFF,
mode_state & 0xFF,
mem_inven & 0xFF,
first_adr & 0xFF,
word_num & 0xFF,
tag_filt_time & 0xFF
]
# 构建命令帧
frame = [length, address & 0xFF, cmd] + data
# 计算CRC16
crc_value = crc16.cal(frame)
# 添加CRCLSB在前MSB在后
frame.append(crc_value & 0xFF) # LSB
frame.append((crc_value >> 8) & 0xFF) # MSB
return bytes(frame)
@staticmethod
def parse_set_mode_hex(response)->bool:
"""
解析设置工作模式命令的应答数据
参数:
response: 读写器返回的完整应答数据
返回:
bool: True表示设置成功False表示失败
"""
if len(response)<=1 or len(response) != response[0]+1: # 0x05 + 2字节CRC = 7字节
raise ValueError("应答数据长度不正确")
if response[2] != 0x35: # 确认是0x35命令的应答
raise ValueError("应答命令不匹配")
if not crc16.verify(response):
raise ValueError("CRC校验失败")
return response[3] == 0x00 # 0x00表示成功
@staticmethod
def build_read_mode_command(address)->bytes:
"""
读取读写器工作模式参数的指令
参数:
address: 读写器地址0xXX格式
返回:
bytes: 完整的命令字节串包含CRC校验
"""
# 构建命令帧
length = 0x04 # 命令长度
cmd = 0x36 # 命令代码
# 构建命令帧
frame = [length, address & 0xFF, cmd]
# 计算CRC16
crc_value = crc16.cal(frame)
# 添加CRCLSB在前MSB在后
frame.append(crc_value & 0xFF) # LSB
frame.append((crc_value >> 8) & 0xFF) # MSB
return bytes(frame)
@staticmethod
def parse_read_mode_hex(response)->dict:
"""
解析读取工作模式参数命令的应答数据
参数:
response: 读写器返回的完整应答数据包含CRC校验
返回:
dict: 解析后的参数字典
"""
if len(response)<=1 or len(response) != response[0]+1: # 0x11 = 19字节
raise ValueError("应答数据长度不正确")
if response[2] != 0x36: # 确认是0x36命令的应答
raise ValueError("应答命令不匹配")
if response[3] != 0x00: # 检查状态是否成功
raise ValueError(f"命令执行失败,状态码: {response[3]:02X}")
if not crc16.verify(response):
raise ValueError("CRC校验失败")
# 解析参数
params = {
# 韦根参数
'wg_mode': response[4],
'wg_data_interval': response[5],
'wg_pulse_width': response[6],
'wg_pulse_interval': response[7],
# 工作模式参数
'read_mode': response[8],
'mode_state': response[9],
'mem_inven': response[10],
'first_adr': response[11],
'word_num': response[12],
'tag_filt_time': response[13],
'reserved1': response[14],
'reserved2': response[15]
}
return params
#endregion RFID 工作模式
#region RFID 功率设置(距离远近)
@staticmethod
def build_set_power_command(address, power_value)->bytes:
"""
构建设置读写器功率的命令命令0x2F
参数:
address: 读写器地址0xXX格式
power_value: 功率值十进制范围0~26单位dBm
返回:
bytes: 完整的命令字节串包含CRC校验
"""
# 验证功率参数范围
if power_value < 0 or power_value > 26:
raise ValueError("功率值必须在0~26范围内")
# 构建命令帧
length = 0x05 # 命令长度
cmd = 0x2F # 命令代码
# 构建命令帧
frame = [length, address & 0xFF, cmd, power_value & 0xFF]
# 计算CRC16
crc_value = crc16.cal(frame)
# 添加CRCLSB在前MSB在后
frame.append(crc_value & 0xFF) # LSB
frame.append((crc_value >> 8) & 0xFF) # MSB
return bytes(frame)
@staticmethod
def parse_set_power_hex(response)->bool:
"""
解析设置功率命令的应答数据
参数:
response: 读写器返回的完整应答数据
返回:
bool: True表示设置成功False表示失败
"""
if len(response)<=1 or len(response) != response[0]+1: # 0x05 + 2字节CRC = 7字节
raise ValueError("应答数据长度不正确")
if response[2] != 0x2F: # 确认是0x2F命令的应答
raise ValueError("应答命令不匹配")
if not crc16.verify(response):
raise ValueError("CRC校验失败")
return response[3] == 0x00 # 0x00表示成功
#endregion RFID 功率设置
#region 接触模式中的读卡间隔设置
@staticmethod
def build_trigger_delay_command(address, trigger_time):
"""
构建主动模式触发延时设置命令命令0x3B
参数:
address: 读写器地址0xXX格式
trigger_time: 触发有效时间范围0~254单位秒255表示获取当前设置
返回:
bytes: 完整的命令字节串包含CRC校验
"""
# 验证触发时间参数范围
if trigger_time < 0 or trigger_time > 255:
raise ValueError("触发时间必须在0~255范围内")
# 构建命令帧
length = 0x05 # 命令长度
cmd = 0x3B # 命令代码
# 构建命令帧
frame = [length, address & 0xFF, cmd, trigger_time & 0xFF]
# 计算CRC16
crc_value = crc16.cal(frame)
# 添加CRCLSB在前MSB在后
frame.append(crc_value & 0xFF) # LSB
frame.append((crc_value >> 8) & 0xFF) # MSB
return bytes(frame)
@staticmethod
def parse_trigger_delay_hex(response):
"""
解析触发延时设置命令的应答数据
参数:
response: 读写器返回的完整应答数据
返回:
int: 当前触发有效时间值0~254表示秒数
"""
if len(response)<=1 or len(response) != response[0]+1: # 0x06 + 2字节CRC = 8字节
raise ValueError("应答数据长度不正确")
if response[2] != 0x3B: # 确认是0x3B命令的应答
raise ValueError("应答命令不匹配")
if not crc16.verify(response):
raise ValueError("CRC校验失败")
if response[3] != 0x00: # 检查状态是否成功
raise ValueError(f"命令执行失败,状态码: {response[3]:02X}")
# 返回触发时间值
return response[4]
#endregion 接触模式中的读卡间隔设置
#region 接触模式中的读卡间隔设置
@staticmethod
def build_set_readinterval_command(address, delay_time)->bytes:
"""
构建设置主动模式读卡间隔命令命令0x42
参数:
address: 读写器地址0xXX格式
delay_time: 读卡间隔时间范围0~254单位毫秒255表示读取参数
返回:
bytes: 完整的命令字节串包含CRC校验
"""
# 验证间隔时间参数范围
if delay_time < 0 or delay_time > 255:
raise ValueError("间隔时间必须在0~255范围内")
# 构建命令帧
length = 0x05 # 命令长度
cmd = 0x42 # 命令代码
# 构建命令帧
frame = [length, address & 0xFF, cmd, delay_time & 0xFF]
# 计算CRC16
crc_value = crc16.cal(frame)
# 添加CRCLSB在前MSB在后
frame.append(crc_value & 0xFF) # LSB
frame.append((crc_value >> 8) & 0xFF) # MSB
return bytes(frame)
@staticmethod
def parse_set_readinterval_hex(response)->bool:
"""
解析设置主动模式读卡间隔命令的应答数据
参数:
response: 读写器返回的完整应答数据
返回:
bool: True表示设置成功False表示失败
"""
if len(response)<=1 or len(response) != response[0]+1: # 0x05 + 2字节CRC = 7字节
raise ValueError("应答数据长度不正确")
if response[2] != 0x42: # 确认是0x42命令的应答
raise ValueError("应答命令不匹配")
if not crc16.verify(response):
raise ValueError("CRC校验失败")
return response[3] == 0x00 # 0x00表示成功
#endregion 接触模式中的读卡间隔设置
#region 获取读写器信息
@staticmethod
def build_reader_info_command(address):
"""
构建读取读写器信息命令命令0x21
参数:
address: 读写器地址0xXX格式
返回:
bytes: 完整的命令字节串包含CRC校验
"""
# 构建命令帧
length = 0x04 # 命令长度
cmd = 0x21 # 命令代码
# 构建命令帧
frame = [length, address & 0xFF, cmd]
# 计算CRC16
crc_value = crc16.cal(frame)
# 添加CRCLSB在前MSB在后
frame.append(crc_value & 0xFF) # LSB
frame.append((crc_value >> 8) & 0xFF) # MSB
return bytes(frame)
@staticmethod
def parse_reader_info_hex(response):
"""
解析读取读写器信息命令的应答数据
参数:
response: 读写器返回的完整应答数据包含CRC校验
返回:
dict: 解析后的读写器信息字典
"""
# 根据文档和实际测试,有效应答应至少包含基本信息部分
if len(response)<=1 or len(response) != response[0]+1: # 至少需要包含基本部分
raise ValueError(f"应答数据长度不正确预期至少13字节实际收到{len(response)}字节")
if response[2] != 0x21: # 确认是0x21命令的应答
raise ValueError("应答命令不匹配")
if response[3] != 0x00: # 检查状态是否成功
raise ValueError(f"命令执行失败,状态码: {response[3]:02X}")
if not crc16.verify(response):
raise ValueError("CRC校验失败")
# 解析版本号2字节
version_main = response[4] # 主版本号
version_sub = response[5] # 子版本号
version_str = f"{version_main}.{version_sub}"
# 解析读写器类型
reader_type = response[6]
reader_type_desc = "UHFReader2001" if reader_type == 0x89 else f"未知类型(0x{reader_type:02X})"
# 解析协议支持信息
protocol_support = response[7]
support_6c = bool(protocol_support & 0x02) # Bit1
support_6b = bool(protocol_support & 0x01) # Bit0
# 解析频率信息
max_freq = response[8]
min_freq = response[9]
# 解析频段设置
max_freq_band_bits = (max_freq >> 6) & 0x03 # Bit7-Bit6
min_freq_band_bits = (min_freq >> 6) & 0x03 # Bit7-Bit6
# 根据频段设置位确定频段
freq_band = "未知"
if max_freq_band_bits == 0x00:
if min_freq_band_bits == 0x00:
freq_band = "保留"
elif min_freq_band_bits == 0x01:
freq_band = "Chinese band2"
elif min_freq_band_bits == 0x02:
freq_band = "US band"
elif min_freq_band_bits == 0x03:
freq_band = "Korean band"
elif max_freq_band_bits == 0x01 and min_freq_band_bits == 0x00:
freq_band = "EU band"
# 解析实际频率值(忽略频段设置位)
actual_max_freq = max_freq & 0x3F # 清除Bit7-Bit6
actual_min_freq = min_freq & 0x3F # 清除Bit7-Bit6
# 解析功率
power = response[10]
# 解析询查时间(已作废)
scntm = response[11]
# 构建返回数据
reader_info = {
'version': version_str,
'version_main': version_main,
'version_sub': version_sub,
'reader_type': reader_type,
'reader_type_desc': reader_type_desc,
'protocol_support': protocol_support,
'support_18000_6c': support_6c,
'support_18000_6b': support_6b,
'max_freq_raw': max_freq,
'min_freq_raw': min_freq,
'max_freq_value': actual_max_freq,
'min_freq_value': actual_min_freq,
'freq_band': freq_band,
'power': power,
'scntm': scntm,
'reserved1': response[11],
'reserved2': response[12]
}
return reader_info
#endregion 获取读写器信息
#region 解析18000-6C协议,用户区数据
@staticmethod
def parse_user_data_hex(response)->str:
"""
解析18000-6C协议的读取数据应答命令0xEE
当读写器支持18000-6C协议Mem_Inven为0x00~0x03时使用
数据格式:
Len Adr reCmd Status Data[] CRC-16
0xXX 0xXX 0xEE 0x00 Word1Word2,… LSB MSB
参数:
response: 读写器返回的完整应答数据包含CRC校验
返回:
- loc_string: 将数据转换为ASCII字符串
"""
# 寻找连续三个0x2C的情况
for idx in range(len(response) - 2):
if response[idx] == 0x2C and response[idx+1] == 0x2C and response[idx+2] == 0x2C:
response=response[:idx]
# 验证响应长度
if len(response) <= 1 or len(response) != response[0] + 1:
raise ValueError("应答数据长度不正确")
# 验证是否为0xEE命令的应答
if response[2] != 0xEE:
raise ValueError("应答命令不匹配预期0xEE")
# 检查执行状态
status = response[3]
if status != 0x00:
raise ValueError(f"命令执行失败,状态码: {status:02X}")
# 验证CRC校验
if not crc16.verify(response):
raise ValueError("CRC校验失败")
# 提取Data[]部分不包含CRC字节
# 数据部分从第4个字节开始索引3直到倒数第2个字节不含CRC
data_part = response[4:-2]
# 过滤掉null字符(0x00)这些字符在ASCII解码后通常不可见但会占用字符串长度
data_part = bytes([b for b in data_part if b != 0x00])
loc_string = ''
if data_part:
try:
loc_string = data_part.decode('ascii')
except UnicodeDecodeError:
print(f"无法将数据转换为ASCII字符串: {data_part}")
return loc_string
#endregion 解析18000-6C协议,用户区数据
if __name__ == "__main__":
print("\n测试解析18000-6C协议数据响应示例:")
# 假设的18000-6C数据响应长度为0x0C包含4个字节的Data部分2个字
# 示例数据Len=0x0C, Adr=0x00, reCmd=0xEE, Status=0x00, Data=[0x12,0x34,0x56,0x78], CRC=计算值
# 构建测试数据
test_data_6c = [
0x21, # 长度
0x00, # 地址
0xEE, # 命令
0x00, # 状态成功
0x53, 0x48, 0x52, 0x32, 0x42, 0x32, 0x2D, 0x31, 0x32, 0x2C, 0x42, 0x32, 0x2C, 0x36, 0x36, 0x30, 0x30, 0x2A, 0x31, 0x35, 0x30, 0x30, 0x2C, 0x31, 0x2E, 0x39, 0x31, 0x30
]
# 计算CRC并添加到数据末尾
crc_value = crc16.cal(test_data_6c)
test_data_6c.append(crc_value & 0xFF) # LSB
test_data_6c.append((crc_value >> 8) & 0xFF) # MSB
test_data_6c_bytes = bytes(test_data_6c)
print(f"测试数据: {' '.join(f'{b:02X}' for b in test_data_6c_bytes)}")
test_data_6c.append(0x2C)
test_data_6c.append(0x2C)
test_data_6c.append(0x2C)
try:
parsed_6c_data =command_hex.parse_user_data_hex(test_data_6c_bytes)
print("解析结果:")
print(f" ASCII字符串: {parsed_6c_data}")
except Exception as e:
print(f"解析错误: {e}")

69
hardware/RFID/crc16.py Normal file
View File

@ -0,0 +1,69 @@
class crc16:
"""
CRC16校验函数
"""
@staticmethod
def cal(data_bytes):
"""
计算CRC16校验值不包含最后两个字节的CRC
参数:
data_bytes: 要计算CRC16的字节数据可以是bytes、bytearray或整数列表
返回:
int: 计算得到的CRC16校验值
"""
PRESET_VALUE = 0xFFFF
POLYNOMIAL = 0x8408
# 初始化CRC值
crc_value = PRESET_VALUE
length = len(data_bytes)
# 处理每个字节
for i in range(length):
# 获取当前字节确保是0-255范围内的整数
current_byte = data_bytes[i] & 0xFF
# 与当前CRC值异或
crc_value ^= current_byte
# 处理每个位
for j in range(8):
# 检查最低位
if crc_value & 0x0001:
# 最低位为1右移后与多项式异或
crc_value = (crc_value >> 1) ^ POLYNOMIAL
else:
# 最低位为0仅右移
crc_value = crc_value >> 1
# 确保crc_value是16位无符号整数
crc_value &= 0xFFFF
return crc_value
@staticmethod
def verify(data_bytes):
"""
验证数据后两位的CRC MSB和LSB是否正确
参数:
data_bytes: 要验证CRC16的字节数据可以是bytes、bytearray或整数列表
返回:
bool: True表示CRC验证通过False表示验证失败
"""
if len(data_bytes) < 2:
raise ValueError("数据长度至少需要2字节以包含CRC校验值")
# 提取数据部分除去最后两个CRC字节
data_part = data_bytes[:-2]
# 提取接收到的CRC值LSB在前MSB在后
received_crc_lsb = data_bytes[-2]
received_crc_msb = data_bytes[-1]
received_crc = (received_crc_msb << 8) | received_crc_lsb
# 计算数据部分的CRC值
calculated_crc = crc16.cal(data_part)
# 比较计算得到的CRC值和接收到的CRC值
return calculated_crc == received_crc

View File

@ -0,0 +1,513 @@
import socket
import threading
import time
import binascii
from typing import Optional, Callable, Dict, Any, Set
from collections import Counter
from .crc16 import crc16
from .command_hex import command_hex
class rfid_service:
"""
RFID读写器服务
"""
def __init__(self, host='192.168.1.190', port=6000):
"""
初始化RFID控制器
参数:
host: RFID读写器IP地址
port: RFID读写器端口号
"""
self.host = host
self.port = port
self._thread_signal = False
self._receive_thread = None
self._callback = None
self._pause_receive = False
self._data_buffer = [] # 收到推送的数据
self._last_collect_time = None
self._data_lock = threading.Lock() # 用于保护数据缓冲区的锁
# 需要过滤掉的数据(字符串)
self._filter_value = None
self.check_time_seconds = 60.0 # 采集数据时间(秒)
# 超时设置
self._connect_timeout = 5.0 # 连接超时时间(秒)
self.client_socket = None
self.connected = False
#链接失败次数
self._error_count = 0
# #30个字(读卡器工作模式中设定)+2字节CRC+4字节 len+0x00+oxEE+0x00
self._buffer_length=66
#设置读取命令设置时连接最大次数
self._max_command_count=3
def CreatConnect(self)->bool:
try:
if self.client_socket:
self.client_socket.close()
self.connected = False
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.client_socket.settimeout(self._connect_timeout)
print(f"准备连接RFID读写器")
self.client_socket.connect((self.host, self.port))
self.connected = True
print(f"成功连接到RFID读写器: {self.host}:{self.port}")
self._error_count = 0 # 重置错误计数
return True
except Exception as e:
print(f"连接RFID读写器失败: {e}")
self.connected = False
self._error_count += 1
return False
def connect(self) -> bool:
"""
连接到RFID读写器
返回:
bool: 连接成功返回True失败返回False
"""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(self._connect_timeout)
sock.connect((self.host, self.port))
print(f"成功连接到RFID读写器: {self.host}:{self.port}")
return True
except Exception as e:
print(f"连接RFID读写器失败: {e}")
return False
def _send_command(self, command_data: bytes) -> Optional[bytes]:
"""
发送命令到RFID读写器并等待响应
参数:
command_data: 要发送的命令字节数据
返回:
bytes: 读写器响应数据如果失败返回None
"""
for i in range(self._max_command_count):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(self._connect_timeout) # 设置连接超时
sock.connect((self.host, self.port))
sock.send(command_data)
response = sock.recv(1024)
print(f"收到RFID响应: {binascii.hexlify(response).decode()}")
return response
except socket.timeout:
if i==self._max_command_count-1:
print(f"RFID命令响应超时,已达最大重试次数: {self._max_command_count}")
return None
else:
print(f"RFID命令响应超时 (超时设置: {self._connect_timeout}秒,)重连中.")
except Exception as e:
print(f"RFID命令发送错误: {e}")
if i==self._max_command_count-1:
print(f"RFID命令发送错误,已达最大重试次数: {self._max_command_count}")
return None
#region ======= 配置命令封装 =======
def set_working_mode(self, address: int, mode_params: Dict[str, Any]) -> bool:
"""
设置工作模式
参数:
address: 读写器地址
mode_params: 工作模式参数字典,包含以下键:
- read_mode: 读取模式 (0x00-0x03),0应答模式,1为主动模式,2触发模式(低电平有效),3触发模式(高电平有效)
- mode_state: Bit1=0时韦根输出Bit1=1时RS232输出,Bit2=0时开蜂鸣器提示Bit2=1时关蜂鸣器提示.
- mem_inven: 0x00保留区0x01EPC存储器0x02TID存储器0x03用户存储器0x04多张查询0x05单张查询。0x06多张TID询查
- first_adr: 第一个地址 (0x00-0xFF)
- word_num: 字数量Word_Num不能超过3
- tag_time: 主动模式下标签输出间隔时间(0~255)*1s对同一张标签在间隔时间内只输出一次
返回:
bool: 设置成功返回True失败返回False
"""
try:
# 构建命令
command = command_hex.build_set_mode_command(
address=address,
# wg_mode=mode_params.get('wg_mode', 0),
# wg_data_interval=mode_params.get('wg_data_interval', 0),
# wg_pulse_width=mode_params.get('wg_pulse_width', 0),
# wg_pulse_interval=mode_params.get('wg_pulse_interval', 0),
read_mode=mode_params.get('read_mode', 1),
mode_state=mode_params.get('mode_state', 2), #开蜂鸣器提示1为不开
mem_inven=mode_params.get('mem_inven', 3),
first_adr=mode_params.get('first_adr', 0),
word_num=mode_params.get('word_num', 30),
tag_filt_time=mode_params.get('tag_filt_time', 0)
)
# 发送命令
response = self._send_command(command)
if response:
# 解析响应
try:
return command_hex.parse_set_mode_hex(response)
except Exception as e:
print(f"解析设置响应错误: {e}")
return False
return False
except Exception as e:
print(f"设置工作模式错误: {e}")
return False
def read_working_mode(self, address: int) -> Optional[Dict[str, Any]]:
"""
读取工作模式
参数:
address: 读写器地址
返回:
dict: 工作模式参数如果失败返回None
"""
try:
# 构建命令
command = command_hex.build_read_mode_command(address=address)
# 发送命令
response = self._send_command(command)
if response:
# 解析响应
try:
return command_hex.parse_read_mode_hex(response)
except Exception as e:
print(f"解析读取响应错误: {e}")
return None
return None
except Exception as e:
print(f"读取工作模式错误: {e}")
return None
def set_power(self, address: int, power_value: int) -> bool:
"""
设置读写器功率
参数:
address: 读写器地址
power_value: 功率值 (0-26 dBm)
返回:
bool: 设置成功返回True失败返回False
"""
try:
# 构建命令
command =command_hex.build_set_power_command(address=address, power_value=power_value)
# 发送命令
response = self._send_command(command)
if response:
# 解析响应
try:
return command_hex.parse_set_power_hex(response)
except Exception as e:
print(f"解析功率设置响应错误: {e}")
return False
return False
except Exception as e:
print(f"设置功率错误: {e}")
return False
def set_trigger_delay(self, address: int, delay_time: int) -> bool:
"""
设置触发延时
参数:
address: 读写器地址
delay_time: 延时时间 (0-254秒255表示读取参数)
返回:
bool: 设置成功返回True失败返回False
"""
try:
# 构建命令
command = command_hex.build_trigger_delay_command(address=address, delay_time=delay_time)
# 发送命令
response = self._send_command(command)
if response:
# 解析响应
try:
if delay_time == 255:
# 读取模式
current_delay = command_hex.parse_trigger_delay_hex(response)
print(f"当前触发延时: {current_delay}")
return True
else:
# 设置模式
return True # 如果发送成功且没有异常,认为设置成功
except Exception as e:
print(f"解析触发延时响应错误: {e}")
return False
return False
except Exception as e:
print(f"设置触发延时错误: {e}")
return False
def set_read_interval(self, address: int, interval_time: int) -> bool:
"""
设置主动模式读卡间隔
参数:
address: 读写器地址
interval_time: 读卡间隔 (0-254毫秒255表示读取参数)
返回:
bool: 设置成功返回True失败返回False
"""
try:
# 构建命令
command = command_hex.build_set_readinterval_command(address=address, delay_time=interval_time)
# 发送命令
response = self._send_command(command)
if response:
# 解析响应
try:
return command_hex.parse_set_readinterval_hex(response)
except Exception as e:
print(f"解析读卡间隔响应错误: {e}")
return False
return False
except Exception as e:
print(f"设置读卡间隔错误: {e}")
return False
def get_read_interval(self, address: int) -> int:
"""
设置主动模式读卡间隔
参数:
address: 读写器地址
返回:
int: 当前读卡间隔0-254毫秒如果失败返回-1
"""
try:
# 构建命令
command = command_hex.build_set_readinterval_command(address=address, delay_time=255)
# 发送命令
response = self._send_command(command)
if response:
# 解析响应
try:
loc_suc= command_hex.parse_set_readinterval_hex(response)
if loc_suc:
return response[4]
else:
return -1
except Exception as e:
print(f"解析读卡间隔响应错误: {e}")
return False
return False
except Exception as e:
print(f"设置读卡间隔错误: {e}")
return False
def read_reader_info(self, address: int) -> Optional[Dict[str, Any]]:
"""
读取读写器信息
参数:
address: 读写器地址
返回:
dict: 读写器信息如果失败返回None
"""
try:
# 构建读取读写器信息的命令
command = command_hex.build_reader_info_command(address=address)
# 发送命令
response = self._send_command(command)
if response:
# 解析响应
try:
return command_hex.parse_reader_info_hex(response)
except Exception as e:
print(f"解析读写器信息响应错误: {e}")
return None
return None
except Exception as e:
print(f"读取读写器信息错误: {e}")
return None
#endregion
def start_receiver(self, callback: Optional[Callable[[str], None]] = None)->bool:
"""
开始接收RFID推送的数据
参数:
callback: 接收数据的回调函数参数为收集到的RFID字符串数据
"""
if self._thread_signal:
print("RFID数据接收已经在运行")
return True
self._callback = callback
if self.CreatConnect():
self._thread_signal = True
# 清空数据缓冲区
with self._data_lock:
self._data_buffer.clear()
self._receive_thread = threading.Thread(target=self._run, daemon=True,name="RFID_Receive")
self._receive_thread.start()
print("RFID数据接收已启动")
return True
else:
print("RFID数据接收启动失败")
return False
def _run(self):
"""
接收线程的主循环用于接收RFID推送的数据
"""
while self._thread_signal:
if self._pause_receive:
time.sleep(1)
continue
# 重置数据收集时间
self._last_collect_time = time.time()
while time.time() - self._last_collect_time < self.check_time_seconds:
# 持续接收数据
try:
# 使用缓冲区累积接收数据,正确处理固定长度数据包
received_data = b""
remaining_bytes = self._buffer_length
while remaining_bytes > 0:
chunk = self.client_socket.recv(remaining_bytes)
# 检查连接是否关闭
if not chunk:
print(f"[连接关闭] 接收过程中连接已关闭,已接收 {len(received_data)} 字节,预期 {self._buffer_length} 字节")
if not self.CreatConnect():
print(f"[连接断开] 重连失败,已累计失败 {self._error_count}等待1秒后重试")
time.sleep(1)
break
received_data += chunk
remaining_bytes -= len(chunk)
print(f"[数据接收] 已接收 {len(received_data)}/{self._buffer_length} 字节")
# 只有接收到完整的数据才算成功
if remaining_bytes == 0:
print(f"[数据接收] 成功接收完整数据包 ({self._buffer_length} 字节)")
data = received_data # 保存完整的数据包
self._error_count=0
else:
continue # 没有接收到任何数据,继续下一次循环
except (ConnectionResetError, ConnectionAbortedError, BrokenPipeError) as e:
# 明确区分连接断开情况
print(f"[连接断开] 网络连接已断开,错误类型: {type(e).__name__},错误详情: {e}")
# 尝试重新连接,但避免频繁重连
if not self.CreatConnect():
print(f"[连接断开] 重连失败,已累计失败 {self._error_count}等待1秒后重试")
time.sleep(1)
continue # 继续下一次循环
except socket.timeout:
# 明确区分超时情况
print(f"[超时] 接收数据超时,当前时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
# 超时不重置连接,继续等待
continue # 超时继续下一次循环
except Exception as e:
# 记录其他类型的异常
print(f"[其他错误] RFID数据接收错误错误类型: {type(e).__name__},错误详情: {e}")
# 判断是否可能是连接问题
if isinstance(e, (OSError, IOError)) and '远程主机强迫关闭了一个现有的连接' in str(e):
print("[其他错误] 错误原因疑似连接断开,将尝试重新连接")
self.CreatConnect()
time.sleep(1)
continue # 其他错误继续下一次循环
if data:
loc_str = command_hex.parse_user_data_hex(data)
print(f"收到RFID推送数据: {binascii.hexlify(data).decode()}")
if loc_str:
# 将数据添加到缓冲区
with self._data_lock:
if self._filter_value == loc_str:
continue
self._data_buffer.append(loc_str)
self._pause_receive = True
self._process_collected_data()
def _process_collected_data(self):
"""
处理收集到的数据,过滤掉指定值,并调用回调函数
"""
with self._data_lock:
# 如果缓冲区有数据
if self._data_buffer and self._callback:
try:
# 统计数据缓冲区中各字符串出现的次数
# 这里self._data_buffer是字符串列表Counter会统计每个完整字符串出现的次数
counter = Counter(self._data_buffer)
# 处理空缓冲区情况避免IndexError
if counter:
# 获取出现次数最多的字符串及其出现次数
# most_common(1)返回[(most_common_string, count)]
most_common_string, count = counter.most_common(1)[0]
print(f"出现次数最多的字符串是: '{most_common_string}', 出现次数: {count}")
# 使用出现次数最多的字符串作为结果传递给回调
self._callback(most_common_string)
else:
# 空缓冲区情况
print("数据缓冲区为空")
self._callback(None)
print(f"收集了{len(self._data_buffer)}条RFID数据过滤后数据为{most_common_string}")
except Exception as e:
print(f"回调函数执行错误: {e}")
finally:
# 清空缓冲区
self._data_buffer.clear()
else:
print("未接收到数据")
# self._callback(None)
def __del__(self):
"""
析构函数,确保在对象销毁时关闭连接
"""
if self.client_socket:
self.client_socket.close()
self.client_socket = None
def stop_receiver(self):
"""
停止接收RFID推送的数据
"""
self._thread_signal = False
if self._receive_thread:
self._receive_thread.join(timeout=2.0)
print("RFID数据接收已停止")
# 关闭连接
if self.client_socket:
self.client_socket.close()
self.client_socket = None

6258
resources/resources_rc.py Normal file

File diff suppressed because it is too large Load Diff

110
tests/test_rfid.py Normal file
View File

@ -0,0 +1,110 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
RFID
"""
import sys
import os
import time
# 添加项目根目录到Python路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from hardware.RFID.rfid_service import rfid_service
def test_data_callback(raw_data):
"""
测试用的数据接收回调函数
"""
print(f"[回调] 收到RFID数据: {raw_data}")
def test_rfid_functions():
"""
测试RFIDHardware的主要功能
"""
# 初始化RFID控制器
rfid = rfid_service(host='192.168.1.190', port=6000)
# print("=== RFID硬件测试开始 ===")
# # 测试连接
print("\n1. 测试连接:")
# connected = rfid.connect()
# print(f"连接状态: {'成功' if connected else '失败'}")
# 如果连接成功,继续测试其他功能
if True:
# 测试读取工作模式
print("\n2. 测试读取工作模式:")
# mode_data = rfid.read_working_mode(address=0x00)
# if mode_data:
# print("读取到工作模式参数:")
# for key, value in mode_data.items():
# print(f" {key}: {value:02X} ({value})")
# rfid.set_working_mode(address=0x00, mode_params={
# 'word_num': 0x1E
# })
# # 测试读取读写器信息
# print("\n3. 测试读取读写器信息:")
# reader_info = rfid.read_reader_info(address=0x00)
# if reader_info:
# print("读取到读写器信息:")
# for key, value in reader_info.items():
# print(f" {key}: {value}")
# else:
# print("读取读写器信息失败")
# 测试设置功率 (仅演示,实际使用时根据需要调整)
# print("\n3. 测试设置功率 (演示,不实际执行):")
# power_success = rfid.set_power(address=0x00, power_value=6)
# print(f"功率设置{'成功' if power_success else '失败'}")
# # 测试设置读卡间隔 (仅演示)
# print("\n4. 测试设置读卡间隔:")
# interval_success = rfid.set_read_interval(address=0x00, interval_time=200)
# print(f"读卡间隔设置{'成功' if interval_success else '失败'}")
# time.sleep(2)
# interval = rfid.read_interval(address=0x00)
# print(f"当前读卡间隔: {interval}毫秒")
# 测试数据接收功能 (注意:这会启动一个后台线程)
# print("\n5. 测试数据接收功能 (启动接收线程5秒后停止):")
try:
rfid._callback = test_data_callback
rfid._data_buffer = []
rfid._data_buffer.append('SHR2B2-12,B2,6600 * 1500,1.910')
rfid._data_buffer.append('SHR2B2-12,B2,6600 * 1500,1.910')
rfid._data_buffer.append('SHR2B2-12,B2,6600 * 1500,1.910')
rfid._data_buffer.append('SHR2B2-12,B2,6600 * 1500,1.910')
rfid._data_buffer.append('SHR2B2-12,B2,6600 * 1500,1.910')
rfid._data_buffer.append('SHR2B2-12,B2,6600 * 1500,1.910')
rfid._data_buffer.append('THR B1-12,B1,6600 * 1500,1.900')
rfid._data_buffer.append('THR B1-12,B1,6600 * 1500,1.900')
rfid._data_buffer.append('THR B1-12,B1,6600 * 1500,1.900')
rfid._data_buffer.append('THR B1-12,B1,6600 * 1500,1.900')
rfid._data_buffer.append('THR B1-12,B1,6600 * 1500,1.900')
rfid._data_buffer.append('THR B1-12,B1,6600 * 1500,1.900')
rfid._data_buffer.append('THR B1-12,B1,6600 * 1500,1.900')
rfid._data_buffer.append('THR B1-12,B1,6600 * 1500,1.900')
rfid._data_buffer.append('THR B1-12,B1,6600 * 1500,1.900')
rfid._process_collected_data()
# rfid.start_receiver(callback=test_data_callback)
# print("接收线程已启动,等待接收数据...")
# 等待5秒模拟接收过程
time.sleep(60*60)
finally:
# 确保停止接收线程
# rfid.stop_receiver()
print("\n=== RFID硬件测试结束 ===")
if __name__ == "__main__":
try:
test_rfid_functions()
except KeyboardInterrupt:
print("\n测试被用户中断")
except Exception as e:
print(f"测试过程中发生错误: {e}")