import os
import time
import hashlib
import requests
import json
import shutil
from pathlib import Path
from typing import Dict, List, Tuple, Optional
from urllib.parse import urlsplit, urlunsplit

STATE_FILE = ".transfer_state.json"
MAX_CHUNK_SIZE = 1024 * 1024  # 1MB chunks
VALID_IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.bmp', '.gif'}
MAX_RETRIES = 5
BACKOFF_BASE = 2  # Exponential backoff base


def send_images(
    folder_path: str,
    server_url: str,
    site_name: str,
    line_id: str,
    purpose: str,
    max_retries: int = MAX_RETRIES
) -> None:
    """Upload every image in a folder to the server in resumable chunks.

    Progress is persisted per file in ``STATE_FILE`` inside the folder, so an
    interrupted run resumes from the last acknowledged byte. A file is deleted
    locally once the server has acknowledged all of its chunks.

    Args:
        folder_path: Local folder containing the images.
        server_url: Upload endpoint (e.g. "http://localhost:5000/upload").
        site_name: Site name, sent as a query parameter.
        line_id: Line identifier, sent as a query parameter.
        purpose: Image purpose (DET/SEG etc.), sent as a query parameter.
        max_retries: Maximum consecutive failed attempts per file.

    Raises:
        ValueError: If ``folder_path`` is not an existing directory.
        OSError: May propagate if the state file cannot be written while
            recording a failed attempt or a permanent failure.
    """
    folder = Path(folder_path)
    if not folder.is_dir():
        raise ValueError(f"Invalid folder path: {folder_path}")

    state_path = folder / STATE_FILE
    transfer_state = _load_transfer_state(state_path)

    # Only image files are candidates; the state file itself is excluded.
    files_to_send = [
        f for f in folder.iterdir()
        if f.is_file()
        and f.name != STATE_FILE
        and f.suffix.lower() in VALID_IMAGE_EXTENSIONS
    ]

    for file_path in files_to_send:
        file_id = _get_file_id(file_path)
        file_size = file_path.stat().st_size

        if file_size == 0:
            # Nothing to upload, and it must not reach the post-loop cleanup
            # below (which would delete a file that was never sent).
            print(f"Skipping empty file {file_path.name}")
            continue

        # Reuse persisted progress only when it is well-formed and in range;
        # otherwise start this file from scratch.
        file_state = transfer_state.get(file_id)
        sent = file_state.get("sent") if isinstance(file_state, dict) else None
        if not isinstance(sent, int) or not 0 <= sent <= file_size:
            file_state = {
                "file_path": str(file_path),
                "total_size": file_size,
                "sent": 0,
                "retry_count": 0
            }
            transfer_state[file_id] = file_state

        retry_count = 0
        while file_state["sent"] < file_size and retry_count < max_retries:
            chunk_start = file_state["sent"]
            chunk_end = min(chunk_start + MAX_CHUNK_SIZE, file_size)
            try:
                _upload_chunk(
                    file_path, server_url, site_name, line_id, purpose,
                    file_id, chunk_start, chunk_end, file_size
                )
                # Record progress in memory first so that a failed state write
                # does not cause the acknowledged chunk to be re-sent.
                file_state["sent"] = chunk_end
                _save_transfer_state(state_path, transfer_state)
                retry_count = 0  # a successful chunk resets the retry budget
            except (requests.RequestException, OSError, RuntimeError) as exc:
                last_error = str(exc)
                retry_count += 1
                file_state["retry_count"] = retry_count
                _save_transfer_state(state_path, transfer_state)

                if retry_count >= max_retries:
                    # No further attempt will follow, so do not back off.
                    print(f"Giving up on {file_path.name}: {last_error}")
                    break

                # Exponential backoff before the next attempt.
                wait_time = BACKOFF_BASE ** retry_count
                print(f"Retry {retry_count}/{max_retries} for {file_path.name} in {wait_time}s: {last_error}")
                time.sleep(wait_time)

        if file_state["sent"] >= file_size:
            # Cleanup runs outside the upload retry loop: a failed delete is
            # not an upload error, and because the state entry is kept the
            # cleanup is attempted again on the next run.
            try:
                _cleanup_after_success(file_path, state_path, transfer_state, file_id)
            except OSError as exc:
                print(f"Uploaded {file_path.name} but cleanup failed: {exc}")
            continue

        print(f"Failed to send {file_path.name} after {max_retries} attempts")

        if _check_server_health(server_url):
            print("Server is reachable - skipping this file")
        else:
            print("Server unreachable - will retry later")

        # Reset the persisted counter so the next run starts with a full budget.
        file_state["retry_count"] = 0
        _save_transfer_state(state_path, transfer_state)


def _upload_chunk(
    file_path: Path,
    server_url: str,
    site_name: str,
    line_id: str,
    purpose: str,
    file_id: str,
    chunk_start: int,
    chunk_end: int,
    file_size: int
) -> None:
    """POST bytes [chunk_start, chunk_end) of file_path to the server.

    Raises:
        OSError: If the file cannot be read or yields fewer bytes than expected.
        requests.RequestException: On transport-level failure.
        RuntimeError: If the server answers with a status other than 200.
    """
    expected = chunk_end - chunk_start
    with open(file_path, 'rb') as f:
        f.seek(chunk_start)
        chunk_data = f.read(expected)

    if len(chunk_data) != expected:
        # The file shrank after it was measured; do not record a short chunk
        # as fully sent.
        raise OSError(
            f"Short read on {file_path.name}: expected {expected} bytes, got {len(chunk_data)}"
        )

    params = {
        "site_name": site_name,
        "line_id": line_id,
        "purpose": purpose,
        "file_id": file_id,
        "start_byte": chunk_start,
        # Only the first chunk announces the total size; requests omits
        # parameters whose value is None.
        "total_size": file_size if chunk_start == 0 else None
    }
    files = {"chunk": (file_path.name, chunk_data, "application/octet-stream")}

    response = requests.post(
        server_url,
        params=params,
        files=files,
        timeout=30
    )
    if response.status_code != 200:
        raise RuntimeError(f"Server error: {response.status_code}, {response.text}")


def _load_transfer_state(state_path: Path) -> Dict:
    """Load the persisted transfer state.

    Returns an empty dict when the file is missing, unreadable, not valid
    JSON, or does not contain a JSON object.
    """
    if not state_path.exists():
        return {}
    try:
        with open(state_path, 'r', encoding='utf-8') as f:
            state = json.load(f)
    except (OSError, ValueError):
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        return {}
    return state if isinstance(state, dict) else {}


def _save_transfer_state(state_path: Path, state: Dict) -> None:
    """Persist the transfer state by writing a temp file and renaming it."""
    temp_path = state_path.with_suffix('.tmp')
    with open(temp_path, 'w', encoding='utf-8') as f:
        json.dump(state, f, indent=2)
    # os.replace overwrites an existing destination on every platform, whereas
    # a plain rename fails on Windows when the target already exists.
    os.replace(str(temp_path), str(state_path))


def _get_file_id(file_path: Path) -> str:
    """Build a file ID from the resolved path, modification time and size."""
    stat = file_path.stat()
    unique_str = f"{file_path.resolve()}|{stat.st_mtime}|{stat.st_size}"
    return hashlib.sha256(unique_str.encode()).hexdigest()[:16]


def _cleanup_after_success(
    file_path: Path,
    state_path: Path,
    transfer_state: Dict,
    file_id: str
) -> None:
    """Delete the uploaded local file and drop its entry from the state."""
    try:
        file_path.unlink()
    except FileNotFoundError:
        # Already gone; the state entry still has to be removed.
        pass

    if file_id in transfer_state:
        del transfer_state[file_id]
        _save_transfer_state(state_path, transfer_state)

    print(f"Successfully sent and deleted {file_path.name}")


def _check_server_health(server_url: str) -> bool:
    """Return True if GET <parent of upload path>/health answers with 200."""
    try:
        parts = urlsplit(server_url)
        # Replace the last path segment (e.g. "/upload") with "/health".
        # Working on the path component keeps URLs without any path intact.
        parent_path = parts.path.rsplit('/', 1)[0]
        health_url = urlunsplit(
            (parts.scheme, parts.netloc, f"{parent_path}/health", '', '')
        )
        response = requests.get(health_url, timeout=5)
    except (requests.RequestException, ValueError):
        return False
    return response.status_code == 200


if __name__ == "__main__":
    # Example usage
    send_images(
        folder_path=r"C:\Users\chuyi\Pictures\test",
        server_url="http://www.xj-robot.com:6000/upload",
        site_name="FactoryA",
        line_id="Line1",
        purpose="DET"
    )