62 lines
1.9 KiB
Python
62 lines
1.9 KiB
Python
"""SQL Server数据库操作模块"""
|
|
import pyodbc
|
|
from config.settings import SQL_SERVER_CONFIG
|
|
|
|
|
|
class SQLServerDB:
|
|
def __init__(self):
|
|
self.connection = None
|
|
self.config = SQL_SERVER_CONFIG
|
|
|
|
def connect(self):
|
|
"""连接SQL Server数据库"""
|
|
connection_string = (
|
|
"DRIVER={SQL Server};"
|
|
f"SERVER={self.config['server']};"
|
|
f"DATABASE={self.config['database']};"
|
|
f"UID={self.config['username']};"
|
|
f"PWD={self.config['password']};"
|
|
)
|
|
self.connection = pyodbc.connect(connection_string)
|
|
return self.connection
|
|
|
|
def insert_produce_data(self, insert_data):
|
|
"""插入数据到Produce表"""
|
|
if not self.connection:
|
|
self.connect()
|
|
|
|
cursor = self.connection.cursor()
|
|
columns = ", ".join(insert_data.keys())
|
|
placeholders = ", ".join(["?" for _ in insert_data.values()])
|
|
sql = f"INSERT INTO Produce ({columns}) VALUES ({placeholders})"
|
|
|
|
cursor.execute(sql, list(insert_data.values()))
|
|
self.connection.commit()
|
|
|
|
return cursor.rowcount
|
|
|
|
def check_existing_tasks(self, erp_ids):
|
|
"""检查SQL Server中任务是否还存在"""
|
|
if not self.connection:
|
|
self.connect()
|
|
|
|
if not erp_ids:
|
|
return set()
|
|
|
|
erp_ids_str = [str(erp_id) for erp_id in erp_ids]
|
|
placeholders = ','.join('?' * len(erp_ids_str))
|
|
check_query = f"SELECT ErpID FROM Produce WHERE ErpID IN ({placeholders})"
|
|
|
|
cursor = self.connection.cursor()
|
|
cursor.execute(check_query, erp_ids_str)
|
|
results = cursor.fetchall()
|
|
|
|
existing_tasks = {str(row[0]) for row in results}
|
|
return existing_tasks
|
|
|
|
def close(self):
|
|
"""关闭数据库连接"""
|
|
if self.connection:
|
|
self.connection.close()
|
|
self.connection = None
|