From 3da220586c024eefbf3027641df88034e3739b25 Mon Sep 17 00:00:00 2001 From: xiongyi <827523911@qq.com> Date: Wed, 19 Nov 2025 16:25:48 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4=E6=89=B9=E5=A4=84=E7=90=86?= =?UTF-8?q?=E5=BC=80=E6=9C=BA=E8=87=AA=E5=90=AF=EF=BC=8C=E4=BF=AE=E6=94=B9?= =?UTF-8?q?=E4=B8=BAwindows=E5=90=8E=E5=8F=B0=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .idea/InsertData.iml | 3 +- .idea/misc.xml | 2 +- API/send_data_test.py | 43 ++--------- insert_data_service.py | 168 +++++++++++++++++++++++++++++++++++++++++ logs/service.log | 3 + main.py | 23 +++--- 6 files changed, 190 insertions(+), 52 deletions(-) create mode 100644 insert_data_service.py create mode 100644 logs/service.log diff --git a/.idea/InsertData.iml b/.idea/InsertData.iml index 74d515a..de37856 100644 --- a/.idea/InsertData.iml +++ b/.idea/InsertData.iml @@ -2,9 +2,10 @@ + - + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml index df3bfff..0e06d19 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -3,5 +3,5 @@ - + \ No newline at end of file diff --git a/API/send_data_test.py b/API/send_data_test.py index a7d031c..346ab6f 100644 --- a/API/send_data_test.py +++ b/API/send_data_test.py @@ -208,56 +208,23 @@ def get_not_pour_info(): { "ArtifactID": "QR1B12000151AD", - "ArtifactActionID": 346482967298138, - "ArtifactIDVice1": "Q00001AD", - "ProduceRingNumber": 1, - "MouldCode": "SHR2B1-9", - "SkeletonID": "QR1B12000046A", - "RingTypeCode": "R22", - "SizeSpecification": "6900*1500", - "BuriedDepth": "中埋", - "BlockNumber": "F", - "HoleRingMarking": "否", - "GroutingPipeMarking": "否", - "PolypropyleneFiberMarking": "否", - "BetonVolume": 0.6, - "BetonTaskID": "20251020-01" - }, - { - "ArtifactID": "QR1B12000151AD", - "ArtifactActionID": 3464829672981339, - "ArtifactIDVice1": "Q00001AD", - "ProduceRingNumber": 1, - "MouldCode": "SHR2B1-9", - "SkeletonID": "QR1B12000046A", - "RingTypeCode": "R22", - "SizeSpecification": "6900*1500", - "BuriedDepth": "中埋", - "BlockNumber": "F", - "HoleRingMarking": "否", - "GroutingPipeMarking": "否", - "PolypropyleneFiberMarking": "否", - "BetonVolume": 0.6, - "BetonTaskID": "20251020-01" - }, - { - "ArtifactID": "QR1B32000153AD", "ArtifactActionID": 346482967298140, "ArtifactIDVice1": "Q00001AD", "ProduceRingNumber": 1, - "MouldCode": "SHR2B3-9", - "SkeletonID": "QR1B2000048A", + "MouldCode": "SHR2B1-9", + "SkeletonID": "QR1B12000046A", "RingTypeCode": "R22", "SizeSpecification": "6900*1500", "BuriedDepth": "中埋", - "BlockNumber": "F", + "BlockNumber": "L3", "HoleRingMarking": "否", "GroutingPipeMarking": "否", "PolypropyleneFiberMarking": "否", - "BetonVolume": 0.6, + "BetonVolume": 1.9, "BetonTaskID": "20251020-01" }, + ] } diff --git a/insert_data_service.py b/insert_data_service.py new file mode 100644 index 0000000..c337190 --- /dev/null +++ b/insert_data_service.py @@ -0,0 +1,168 @@ +import win32serviceutil +import win32service +import win32event +import servicemanager +import socket +import time +import threading +from opc.client import OPCClient +from API.client import APIClient +from API.mix_weight_api import MixWeightAPI +from services.task_service import TaskService +from services.monitoring_service import MonitoringService +from database.access_db import AccessDB +from database.sql_server import SQLServerDB +from tcp.server import TCPServer +from config.settings import ( + ACCESS_DB_PATH, ACCESS_DB_PASSWORD, + TCP_HOST, TCP_PORT, TCP_CLIENT_HOST, TCP_CLIENT_PORT, + CHECK_INTERVAL, MAX_AGE_HOURS +) +from utils.helpers import cleanup_old_timestamps + + +class InsertDataService(win32serviceutil.ServiceFramework): + _svc_name_ = "InsertDataService" + _svc_display_name_ = "Insert Data Service" + _svc_description_ = "Service for inserting data into database" + + def __init__(self, args): + win32serviceutil.ServiceFramework.__init__(self, args) + self.hWaitStop = win32event.CreateEvent(None, 0, 0, None) + self.running = True + self.tcp_server = None + self.data_client = None + + def SvcStop(self): + self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING) + win32event.SetEvent(self.hWaitStop) + self.running = False + + # 停止TCP服务端 + if self.tcp_server: + self.tcp_server.stop() + # 停止TCP客户端 + if self.data_client: + self.data_client.stop() + + def SvcDoRun(self): + servicemanager.LogMsg(servicemanager.EVENTLOG_INFORMATION_TYPE, + servicemanager.PYS_SERVICE_STARTED, + (self._svc_name_, '')) + self.main() + + def start_api_service(self): + """启动配比重量API服务""" + api = MixWeightAPI() + api.run(host='127.0.0.1', port=5001, debug=False, threaded=True) + + def main(self): + try: + api_thread = threading.Thread(target=self.start_api_service) + api_thread.daemon = True + api_thread.start() + + # 初始化TCP服务端 + self.tcp_server = TCPServer(host=TCP_HOST, port=TCP_PORT) + tcp_server_thread = threading.Thread(target=self.tcp_server.start) + tcp_server_thread.daemon = True + tcp_server_thread.start() + + # 初始化发送数据的OPC客户端 + data_client = OPCClient(url=f'opc.tcp://{TCP_CLIENT_HOST}:{TCP_CLIENT_PORT}') + data_client.start() + + # 等待服务端启动 + time.sleep(1) + + # 初始化服务 + api_client = APIClient() + task_service = TaskService(self.tcp_server) + monitoring_service = MonitoringService(self.tcp_server) + + # 步骤1:获取AppID + app_id = api_client.login() + task_service.api_client.app_id = app_id + + # 存储上次获取的所有ArtifactID + last_artifact_ids = set() + last_artifact_list = [] # 用于存储上一次的完整artifact_list + + # 启动Access数据库Flag监控线程 + access_monitor_thread = threading.Thread(target=monitoring_service.monitor_access_flag_changes) + access_monitor_thread.daemon = True + access_monitor_thread.start() + + while self.running: + try: + # 步骤2:获取所有未浇筑信息 + tasks, artifact_list, send_list, half_volume = task_service.process_not_pour_info() + current_artifact_ids = {task["artifact_id"] for task in tasks} + + # 检查artifact_list是否发生变化 + if artifact_list != last_artifact_list: + servicemanager.LogInfoMsg(f"检测到artifact_list更新: {artifact_list}") + + # 处理新出现的任务 + new_artifact_ids = current_artifact_ids - last_artifact_ids + if new_artifact_ids: + servicemanager.LogInfoMsg(f"检测到 {len(new_artifact_ids)} 个新任务") + + for task in tasks: + if task["artifact_id"] in new_artifact_ids: + task_info = api_client.get_task_info(task["beton_task_id"]) + + # 步骤4:连接Access数据库并获取最大Mark值 + access_db = AccessDB(ACCESS_DB_PATH, ACCESS_DB_PASSWORD) + try: + max_mark = access_db.get_max_mark() + finally: + access_db.close() + + erp_id = int(max_mark) + 1 + + # 步骤5:连接SQL Server数据库并插入数据 + sql_db = SQLServerDB() + try: + # 检查 block_number 是否为 "补方" + if task["block_number"] == "补方": + print(f"任务 {task['artifact_id']} 的 block_number 为 '补方',跳过派单") + task_service.insert_into_produce_table(sql_db, task_info, + task["beton_volume"], + erp_id, task["artifact_id"],half_volume, 0) + continue + + print(f"处理新任务: {task['artifact_id']}") + + # 获取任务单信息 + task_service.insert_into_produce_table(sql_db, task_info, task["beton_volume"], + erp_id, task["artifact_id"], half_volume,1) + with monitoring_service.tasks_lock: + monitoring_service.monitored_tasks.add(erp_id) + monitoring_service.inserted_tasks[erp_id] = task["artifact_id"] + servicemanager.LogInfoMsg( + f"任务 {erp_id} (ArtifactID: {task['artifact_id']}) 已添加到监控列表") + finally: + sql_db.close() + + # 更新上次获取的ArtifactID集合和artifact_list + last_artifact_ids = current_artifact_ids + last_artifact_list = artifact_list.copy() + + # 每10分钟清理一次过期的时间戳记录 + cleanup_old_timestamps(task_service.artifact_timestamps, current_artifact_ids, MAX_AGE_HOURS) + + # 每10秒检查一次 + time.sleep(CHECK_INTERVAL) + + except Exception as e: + servicemanager.LogErrorMsg(f"发生错误: {e}") + # 继续循环,避免程序退出 + time.sleep(2) + + except Exception as e: + servicemanager.LogErrorMsg(f"服务异常终止: {e}") + + +if __name__ == '__main__': + win32serviceutil.HandleCommandLine(InsertDataService) diff --git a/logs/service.log b/logs/service.log new file mode 100644 index 0000000..fea54fe --- /dev/null +++ b/logs/service.log @@ -0,0 +1,3 @@ +2025-11-19 16:00:55,695 - INFO - WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead. + * Running on http://127.0.0.1:5001 +2025-11-19 16:00:55,695 - INFO - Press CTRL+C to quit diff --git a/main.py b/main.py index f9fd434..e629ed1 100644 --- a/main.py +++ b/main.py @@ -11,21 +11,20 @@ from tcp.server import TCPServer from opc.client import OPCClient from config.settings import ( ACCESS_DB_PATH, ACCESS_DB_PASSWORD, - TCP_HOST, TCP_PORT, TCP_CLIENT_HOST, TCP_CLIENT_PORT, + TCP_HOST, TCP_PORT, TCP_CLIENT_HOST, TCP_CLIENT_PORT, CHECK_INTERVAL, MAX_AGE_HOURS ) from utils.helpers import cleanup_old_timestamps -# 假设同事提供的函数 -def save_to_custom_table(misid, flag, task_id, produce_mix_id, project_name, beton_grade, adjusted_volume, artifact_id): - """保存到自定义数据表的函数""" - print(f"保存到自定义数据表: MISID={misid}, Flag={flag}, TaskID={task_id}, 调整后方量={adjusted_volume}") + + def start_api_service(): """启动配比重量API服务""" api = MixWeightAPI() api.run(host='127.0.0.1', port=5001, debug=False, threaded=True) + def main(): global tcp_server, data_client api_thread = threading.Thread(target=start_api_service) @@ -82,13 +81,13 @@ def main(): # 获取所有未浇筑信息 tasks, artifact_list, send_list, half_volume = task_service.process_not_pour_info() - + # 如果API调用失败,等待一段时间再重试 if tasks is None or artifact_list is None: print("获取未浇筑信息失败,稍后重试...") time.sleep(10) continue - + current_artifact_ids = {task["artifact_id"] for task in tasks} # 检查artifact_list是否发生变化 @@ -103,7 +102,7 @@ def main(): for task in tasks: if task["artifact_id"] in new_artifact_ids: task_info = api_client.get_task_info(task["beton_task_id"]) - + # 如果获取任务信息失败,跳过该任务 if task_info is None: print(f"无法获取任务信息,跳过任务: {task['artifact_id']}") @@ -124,15 +123,15 @@ def main(): # 检查 block_number 是否为 "补方" if task["block_number"] == "补方": print(f"任务 {task['artifact_id']} 的 block_number 为 '补方',跳过派单") - task_service.insert_into_produce_table(sql_db, task_info, task["beton_volume"], erp_id, - task["artifact_id"], half_volume, 0) + task_service.insert_into_produce_table(sql_db, task_info, task["beton_volume"], + erp_id,task["artifact_id"], half_volume, 0) continue print(f"处理新任务: {task['artifact_id']}") # 获取任务单信息 - task_service.insert_into_produce_table(sql_db, task_info, task["beton_volume"], erp_id, - task["artifact_id"], half_volume, 1) + task_service.insert_into_produce_table(sql_db, task_info, task["beton_volume"], + erp_id,task["artifact_id"], half_volume, 1) with monitoring_service.tasks_lock: monitoring_service.monitored_tasks.add(erp_id) monitoring_service.inserted_tasks[erp_id] = task["artifact_id"]