"""SQL Server数据库操作模块""" import pyodbc from config.settings import SQL_SERVER_CONFIG class SQLServerDB: def __init__(self): self.connection = None self.config = SQL_SERVER_CONFIG def connect(self): """连接SQL Server数据库""" connection_string = ( "DRIVER={SQL Server};" f"SERVER={self.config['server']};" f"DATABASE={self.config['database']};" f"UID={self.config['username']};" f"PWD={self.config['password']};" ) self.connection = pyodbc.connect(connection_string) return self.connection def insert_produce_data(self, insert_data): """插入数据到Produce表""" if not self.connection: self.connect() cursor = self.connection.cursor() columns = ", ".join(insert_data.keys()) placeholders = ", ".join(["?" for _ in insert_data.values()]) sql = f"INSERT INTO Produce ({columns}) VALUES ({placeholders})" cursor.execute(sql, list(insert_data.values())) self.connection.commit() return cursor.rowcount def check_existing_tasks(self, erp_ids): """检查SQL Server中任务是否还存在""" if not self.connection: self.connect() if not erp_ids: return set() erp_ids_str = [str(erp_id) for erp_id in erp_ids] placeholders = ','.join('?' * len(erp_ids_str)) check_query = f"SELECT ErpID FROM Produce WHERE ErpID IN ({placeholders})" cursor = self.connection.cursor() cursor.execute(check_query, erp_ids_str) results = cursor.fetchall() existing_tasks = {str(row[0]) for row in results} return existing_tasks def close(self): """关闭数据库连接""" if self.connection: self.connection.close() self.connection = None