#!/usr/bin/env python # -*- coding: utf-8 -*- """ # @Time : 2026/3/6 16:28 # @Author : reenrr # @File : clsRecipeTb.py # @Desc : 配方数据库控制器 """ import copy from common.dbORMType import stuTbRecipe from common.generalFunc import CCommon from sqliteModule.sqlite_db import g_dbInst class CTbRecipeMgt: # public def __init__(self): self._m_tbName = 't_proc_info' self._m_record = [] # 存储表中记录,每条记录是一个stuTbRecipe类型 self._m_error: str = '' # 存储反馈的错误信息 self._getAllRecipeInfo() # 读取配方所有信息 pass # 创建配方 def addNewRecipe(self, recipeName: str, creator: str) -> bool: """ 创建新配方 :param recipeName: 配方名称 :param creator: 创建人 :return: 是否创建成功,操作成功返回True,失败返回False """ key_list = list(stuTbRecipe.keys()) strTime = CCommon.fnGetCurrentTime() update_time = strTime strSqlCmd = ('INSERT INTO ' + self._m_tbName + '(' + key_list[0] + ',' + key_list[1] + ',' + key_list[2] + ',' + key_list[3] + ') VALUES (?, ?, ?, ?)') bRes = g_dbInst.execute_sql(strSqlCmd, (recipeName, strTime, update_time, creator)) if bRes: # 更新缓存 stuTbRecipe[key_list[0]] = recipeName stuTbRecipe[key_list[1]] = strTime stuTbRecipe[key_list[2]] = update_time stuTbRecipe[key_list[3]] = creator other = copy.deepcopy(stuTbRecipe) self._m_record.append(other) return bRes # 删除配方 def delRecipe(self, recipeNames: list[str]) -> bool: """ 删除配方 :param recipeName: 配方名称列表 :return: 是否删除成功,操作成功返回True,失败返回False """ if not recipeNames: return False # 支持批量删除 key_list = list(stuTbRecipe.keys()) placeholders = ','.join(['?'] * len(recipeNames)) strSqlCmd = ('DELETE FROM ' + self._m_tbName + ' WHERE ' + key_list[0] + ' IN (' + placeholders + ')') bRes = g_dbInst.execute_sql(strSqlCmd, tuple(recipeNames)) if bRes and self._m_record: # 更新缓存 nTotal = len(self._m_record) nScan = 0 while nScan < nTotal: recipe_name = list(self._m_record[nScan].items())[0][1] if recipe_name in recipeNames: self._m_record.pop(nScan) nTotal -= 1 continue nScan += 1 return bRes # 读取所有配方名称、创建时间、更新时间、创建者 def readRecipeID_time_creator(self) -> list[tuple[str, str, str, str]]: lstRet = [] if self._m_record: for item in self._m_record: lstSingle = list(item.items()) singleItem = (lstSingle[0][1], lstSingle[1][1], lstSingle[2][1], lstSingle[3][1]) lstRet.append(singleItem) return lstRet # 读取所有配方名称 def readAllRecipeNames(self) -> list: lstRet = [] if self._m_record: for item in self._m_record: lstRet.append(list(item.items())[0][1]) return lstRet # 更新配方 def updateRecipe(self) -> bool: pass # ------------------------------------------- # 获取错误信息 def getLastError(self) -> str: return self._m_error # private # 读取所有配方信息到缓存 def _getAllRecipeInfo(self) -> None: strSqlCmd = 'SELECT * FROM ' + self._m_tbName allRecipes = g_dbInst.fetch_all(strSqlCmd) if allRecipes: theField = list(stuTbRecipe.keys()) for recipe in allRecipes: stuTbRecipe[theField[0]] = recipe[0] stuTbRecipe[theField[1]] = recipe[1] stuTbRecipe[theField[2]] = recipe[2] stuTbRecipe[theField[3]] = recipe[3] other = copy.deepcopy(stuTbRecipe) self._m_record.append(other) # ************************************************************** g_recipeCtrl = CTbRecipeMgt() # 全局对象实例