# This file wraps the data-table operations for the user-information table.
import copy
from typing import Optional

from common.dbORMType import stuTbUser
from common.generalFunc import CCommon
from sqliteModule.sqlite_db import g_dbInst

# User-type codes as they are compared against in this file.
_USER_TYPE_ADMIN = 0
_USER_TYPE_TECH = 1


def _int2str_usertype(userType: int) -> str:
    """Convert a numeric user type into its display string.

    0 maps to the admin label, 1 to the tech label, and any other value to
    the operator label; all labels are looked up in ``g_uiCtrlScript``.
    """
    # Imported at call time, exactly as the original did.
    # NOTE(review): presumably this avoids an import cycle -- confirm.
    from msg_dict import g_uiCtrlScript

    if userType == _USER_TYPE_ADMIN:
        return g_uiCtrlScript['CTRL_USER_TYPE_ADMIN']
    if userType == _USER_TYPE_TECH:
        return g_uiCtrlScript['CTRL_USER_TYPE_TECH']
    return g_uiCtrlScript['CTRL_USER_TYPE_OP']


def _userFields() -> list:
    """Return the first four key names of the ``stuTbUser`` template.

    This file uses them, in order, as: user id, user type, password and
    creation time. They double as column names in the SQL statements.
    """
    return list(stuTbUser)[:4]


class CTbUserMgt:
    """Manage the ``t_user`` table together with an in-memory copy of its rows.

    All rows are read once at construction time. Write operations go to the
    database first and are mirrored into the cache only when the database
    call reports success (a truthy result).
    """

    # public:
    def __init__(self):
        self._m_tbName = 't_user'
        # Cached table rows; each entry is a dict shaped like stuTbUser.
        self._m_record: list = []
        # Last error message. Nothing in this file ever assigns it, so
        # getLastError() currently always returns the empty string.
        self._m_error: str = ''
        self._getAllUserInfo()  # load every user into the cache

    def addNewUser(self, userID: str, userType: int) -> bool:
        """Create a user row and, on success, add it to the cache.

        Args:
            userID: id of the new user.
            userType: numeric user type stored in the table.

        Returns:
            The result of ``g_dbInst.execute_sql``; truthy means success.
        """
        fId, fType, _, fTime = _userFields()
        strSqlCmd = (f'INSERT INTO {self._m_tbName}'
                     f'({fId},{fType},{fTime}) VALUES (?, ?, ?)')
        # Current time, recorded as the user's creation time.
        strTime = CCommon.fnGetCurrentTime()
        bRes = g_dbInst.execute_sql(strSqlCmd, (userID, userType, strTime))
        if bRes:
            # Copy the template FIRST and fill in the copy. Writing into the
            # shared stuTbUser template (as was done before) left the previous
            # user's password in the new user's cached record.
            record = copy.deepcopy(stuTbUser)
            record[fId] = userID
            record[fType] = userType
            record[fTime] = strTime
            self._m_record.append(record)
        return bRes

    def changePasswd(self, userID: str, userPasswd: str) -> bool:
        """Change a user's password in the table and, on success, in the cache.

        Returns:
            The result of ``g_dbInst.execute_sql``; truthy means success.
        """
        fId, _, fPasswd, _ = _userFields()
        strSqlCmd = f'UPDATE {self._m_tbName} SET {fPasswd} = ? WHERE {fId} = ?'
        bRes = g_dbInst.execute_sql(strSqlCmd, (userPasswd, userID))
        if bRes:
            record = self._findUser(userID)
            if record is not None:
                record[fPasswd] = userPasswd
        return bRes

    def delUser(self, userID: list[str]) -> bool:
        """Delete one or more users in a single ``IN (...)`` statement.

        Args:
            userID: list of user ids to delete.

        Returns:
            False when ``userID`` is empty; otherwise the result of
            ``g_dbInst.execute_sql`` (truthy means success).
        """
        if not userID:
            return False

        fId = _userFields()[0]
        # One "?" placeholder per id so the ids are bound, not interpolated.
        placeholders = ','.join(['?'] * len(userID))
        strSqlCmd = f'DELETE FROM {self._m_tbName} WHERE {fId} IN ({placeholders})'
        bRes = g_dbInst.execute_sql(strSqlCmd, tuple(userID))

        # The delete succeeded, so drop the same users from the cache.
        if bRes and self._m_record:
            removed = set(userID)
            # Slice assignment keeps the same list object alive.
            self._m_record[:] = [rec for rec in self._m_record
                                 if rec[fId] not in removed]
        return bRes

    def getPasswdByUserID(self, userID: str) -> Optional[str]:
        """Return the cached password of ``userID``, or None if unknown."""
        record = self._findUser(userID)
        if record is None:
            return None
        return record[_userFields()[2]]

    def readAllUserIDs(self) -> list:
        """Return the ids of all cached users, in cache order."""
        fId = _userFields()[0]
        return [rec[fId] for rec in self._m_record]

    def readUserId_type_date(self) -> list[tuple[str, str, str]]:
        """Return (id, type label, creation time) for every non-admin user.

        Users whose type equals 0 (admin) are skipped; the numeric type of
        the others is converted with ``_int2str_usertype``.
        """
        fId, fType, _, fTime = _userFields()
        return [
            (rec[fId], _int2str_usertype(rec[fType]), rec[fTime])
            for rec in self._m_record
            if rec[fType] != _USER_TYPE_ADMIN
        ]

    def getUserTypeById(self, userID: str) -> Optional[str]:
        """Return the type label of ``userID``, or None if unknown."""
        record = self._findUser(userID)
        if record is None:
            return None
        return _int2str_usertype(record[_userFields()[1]])

    # -------------------------------------------
    def getLastError(self) -> str:
        """Return the stored error message."""
        return self._m_error

    # private:
    def _findUser(self, userID: str) -> Optional[dict]:
        """Return the first cached record whose id equals ``userID``, else None."""
        fId = _userFields()[0]
        return next((rec for rec in self._m_record if userID == rec[fId]), None)

    def _getAllUserInfo(self) -> None:
        """Read every row of the table and append it to the cache.

        The first four columns of each row are stored, in order, as id,
        type, password and creation time.
        """
        allUsers = g_dbInst.fetch_all('SELECT * FROM ' + self._m_tbName)
        if not allUsers:
            return

        fId, fType, fPasswd, fTime = _userFields()
        for row in allUsers:
            # Fill a private copy so the shared template is never modified.
            record = copy.deepcopy(stuTbUser)
            record[fId] = row[0]
            record[fType] = row[1]
            record[fPasswd] = row[2]
            record[fTime] = row[3]
            self._m_record.append(record)


# **************************************************************
g_userCtrl = CTbUserMgt()  # global object instance