Files
Feeding_control_system/common/msg_db_helper.py

122 lines
4.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import sqlite3
from datetime import datetime
from PySide6.QtCore import QMutex
"""
sqlite消息数据库管理: db/messages.db
"""
class DBHelper:
_instance = None # 单例模式
_mutex = QMutex()
def __new__(cls, db_name="db/messages.db"):
"""消息数据库连接管理器"""
cls._mutex.lock()
try:
if not cls._instance:
cls._instance = super().__new__(cls)
cls._instance.db_name = db_name
cls._instance._init_db()
finally:
cls._mutex.unlock()
return cls._instance
def _init_db(self):
"""创建消息表"""
conn = sqlite3.connect(self.db_name)
cursor = conn.cursor()
# 创建消息表id(主键)、内容、类型(1=系统消息2=预警)、是否处理、创建时间
cursor.execute('''
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT NOT NULL,
message_type INTEGER NOT NULL, -- 1: normal, 2: warning
is_processed INTEGER NOT NULL DEFAULT 0, -- 为1表示已经解决/已经处理
create_time TEXT NOT NULL, -- 创建时间
last_modified TEXT NOT NULL -- 最后修改时间(新增/更新时都会变)
)
''')
conn.commit()
conn.close()
def insert_message(self, content, message_type, is_processed=0):
"""插入消息: create_time和last_modified一开始都设为当前时间"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
conn = sqlite3.connect(self.db_name)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO messages (content, message_type, is_processed, create_time, last_modified)
VALUES (?, ?, ?, ?, ?)
''', (content, message_type, is_processed, current_time, current_time))
conn.commit()
conn.close()
def update_processed_status(self, content, message_type, is_processed=1):
"""更新is_processed状态, 同时更新last_modified为当前时间"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
conn = sqlite3.connect(self.db_name)
cursor = conn.cursor()
cursor.execute('''
UPDATE messages
SET is_processed = ?, last_modified = ?
WHERE content = ? AND message_type = ?
''', (is_processed, current_time, content, message_type)) # 更新last_modified
conn.commit()
conn.close()
def get_latest_messages(self, message_type, limit=16):
"""查询最新limit条消息, 包含last_modified字段"""
conn = sqlite3.connect(self.db_name)
cursor = conn.cursor()
# 按创建时间倒序确保取最新的消息返回时包含last_modified
cursor.execute('''
SELECT content, is_processed, create_time, last_modified
FROM messages
WHERE message_type = ?
ORDER BY create_time DESC
LIMIT ?
''', (message_type, limit))
messages = cursor.fetchall() # 结果:[(content, is_processed, create_time, last_modified), ...]
conn.close()
return messages[::-1] # 反转后按时间正序排列
if __name__ == "__main__":
"""测试用例:向数据库插入测试消息"""
import time
# 创建DBHelper实例会自动创建数据库和表
db = DBHelper()
# # 插入系统状态消息message_type=1
# db.insert_message(
# content="开始自动智能浇筑系统",
# message_type=1, # 系统消息
# is_processed=False
# )
# time.sleep(1)
# db.insert_message(
# content="智能浇筑系统启动成功",
# message_type=1,
# is_processed=False
# )
# time.sleep(1)
# db.insert_message(
# content="上料斗载荷达到80%",
# message_type=1, # 系统消息
# is_processed=True # 已处理
# )
# 插入预警消息message_type=2
db.insert_message(
content="振动频率异常",
message_type=2, # 预警消息
is_processed=False # 未处理
)
time.sleep(1)
db.insert_message(
content="料斗重量超出阈值",
message_type=2, # 预警消息
is_processed=True # 已处理
)