Files
Feeding_control_system/database/sql_server.py

62 lines
1.9 KiB
Python
Raw Normal View History

2025-11-12 14:01:26 +08:00
"""SQL Server数据库操作模块"""
import pyodbc
from config.settings import SQL_SERVER_CONFIG
class SQLServerDB:
def __init__(self):
self.connection = None
self.config = SQL_SERVER_CONFIG
def connect(self):
"""连接SQL Server数据库"""
connection_string = (
"DRIVER={SQL Server};"
f"SERVER={self.config['server']};"
f"DATABASE={self.config['database']};"
f"UID={self.config['username']};"
f"PWD={self.config['password']};"
)
self.connection = pyodbc.connect(connection_string)
return self.connection
def insert_produce_data(self, insert_data):
"""插入数据到Produce表"""
if not self.connection:
self.connect()
cursor = self.connection.cursor()
columns = ", ".join(insert_data.keys())
placeholders = ", ".join(["?" for _ in insert_data.values()])
sql = f"INSERT INTO Produce ({columns}) VALUES ({placeholders})"
cursor.execute(sql, list(insert_data.values()))
self.connection.commit()
return cursor.rowcount
def check_existing_tasks(self, erp_ids):
"""检查SQL Server中任务是否还存在"""
if not self.connection:
self.connect()
if not erp_ids:
return set()
erp_ids_str = [str(erp_id) for erp_id in erp_ids]
placeholders = ','.join('?' * len(erp_ids_str))
check_query = f"SELECT ErpID FROM Produce WHERE ErpID IN ({placeholders})"
cursor = self.connection.cursor()
cursor.execute(check_query, erp_ids_str)
results = cursor.fetchall()
existing_tasks = {str(row[0]) for row in results}
return existing_tasks
def close(self):
"""关闭数据库连接"""
if self.connection:
self.connection.close()
self.connection = None