#!/usr/bin/env python # -*- coding: utf-8 -*- ''' # @Time : 2025/9/19 09:48 # @Author : reenrr # @File : mock_server.py ''' import socket import json import threading import time from datetime import datetime import os class TCPServerSimulator: def __init__(self, host='127.0.0.1', port=8888, config_file='config.json'): self.host = host self.port = port self.server_socket = None self.is_running = False self.client_sockets = [] self.config_file = config_file # 初始状态为None self.data_template = None # 从配置文件中加载固定数据 self.load_config_data() # 模拟数据模板 if self.data_template is None: self.data_template = { "hopper_up_weight": 0.0, # 上料斗重量 "hopper_down_weight": 0.0 # 下料斗重量 } def load_config_data(self): """从配置文件中加载固定数据""" try: if os.path.exists(self.config_file): with open(self.config_file, 'r', encoding='utf-8') as f: self.data_template = json.load(f) print(f"成功从 {self.config_file} 加载配置数据") else: print(f"配置文件 {self.config_file} 不存在") except Exception as e: print(f"加载配置文件时发生错误:{e},将使用默认数据") self.data_template = None def start(self): """启动服务器""" self.is_running = True self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.server_socket.bind((self.host, self.port)) self.server_socket.listen(5) print(f"服务器已启动,监听 {self.host}:{self.port}...") # 启动接受连接的线程 accept_thread = threading.Thread(target=self.accept_connections, daemon=True) accept_thread.start() try: while self.is_running: time.sleep(1) except KeyboardInterrupt: print("\n服务器正在关闭...") self.stop() def accept_connections(self): """接受客户端连接""" while self.is_running: try: client_socket, client_address = self.server_socket.accept() self.client_sockets.append(client_socket) print(f"客户端 {client_address} 已连接") # 发送数据 data = self.generate_simulated_data() self.send_data(client_socket, data) print(f"已向客户端 {client_address} 发送数据:{data}") # 启动一个线程监听客户端发送的指令 threading.Thread( target=self.listen_client_commands, args=(client_socket,client_address), daemon=True ).start() except Exception as e: if self.is_running: print(f"接受连接时发生错误: {e}") break def listen_client_commands(self, client_socket, client_address): """监听客户端发送的指令""" while self.is_running and client_socket in self.client_sockets: try: # 接收客户端发送的指令 data = client_socket.recv(1024).decode('utf-8').strip() if data: print(f"客户端 {client_address} 发送指令: {data}") else: print(f"客户端 {client_address} 已断开连接") self.client_sockets.remove(client_socket) client_socket.close() break except Exception as e: print(f"监听客户端 {client_address} 指令时发生错误: {e}") self.client_sockets.remove(client_socket) client_socket.close() break def generate_simulated_data(self): """生成模拟的状态数据""" if self.data_template is None: return None data = self.data_template.copy() data["timestamp"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') return data def send_data(self, client_socket, data): """向客户端发送数据""" try: # 转换为JSON字符串并添加换行符作为结束标记 if data is None: data_str = json.dumps(None) + "\n" else: data_str = json.dumps(data) + "\n" client_socket.sendall(data_str.encode('utf-8')) except Exception as e: print(f"向客户端 {client_socket.getpeername()} 发送数据时发生错误: {e}") def stop(self): """停止服务器""" self.is_running = False # 关闭所有客户端连接 for sock in self.client_sockets: try: sock.close() except Exception as e: print(f"关闭客户端连接时发生错误: {e}") # 关闭服务器套接字 if self.server_socket: try: self.server_socket.close() except Exception as e: print(f"关闭服务器套接字时发生错误: {e}") print("服务器已关闭") if __name__ == '__main__': server = TCPServerSimulator(host='127.0.0.1', port=8888) server.start()