Files
zjsh_yolov11/image/transport_client.py
2025-08-13 14:49:06 +08:00

194 lines
6.8 KiB
Python

import os
import time
import hashlib
import requests
import json
import shutil
from pathlib import Path
from typing import Dict, List, Tuple, Optional
STATE_FILE = ".transfer_state.json"
MAX_CHUNK_SIZE = 1024 * 1024 # 1MB chunks
VALID_IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.bmp', '.gif'}
MAX_RETRIES = 5
BACKOFF_BASE = 2 # Exponential backoff base
def send_images(
folder_path: str,
server_url: str,
site_name: str,
line_id: str,
purpose: str,
max_retries: int = MAX_RETRIES
) -> None:
"""
发送图像文件到服务器,支持断点续传和重试机制
Args:
folder_path: 本地图像文件夹路径
server_url: 服务器URL (e.g., "http://localhost:5000/upload")
site_name: 现场名称
line_id: 现场线号
purpose: 图像用途 (DET/SEG等)
max_retries: 最大重试次数
"""
folder = Path(folder_path)
if not folder.is_dir():
raise ValueError(f"Invalid folder path: {folder_path}")
# 初始化状态文件
state_path = folder / STATE_FILE
transfer_state = _load_transfer_state(state_path)
# 获取待发送文件列表 (过滤状态文件和非图像文件)
files_to_send = [
f for f in folder.iterdir()
if f.is_file()
and f.name != STATE_FILE
and f.suffix.lower() in VALID_IMAGE_EXTENSIONS
]
for file_path in files_to_send:
file_id = _get_file_id(file_path)
file_size = file_path.stat().st_size
# 初始化文件传输状态
if file_id not in transfer_state:
transfer_state[file_id] = {
"file_path": str(file_path),
"total_size": file_size,
"sent": 0,
"retry_count": 0
}
file_state = transfer_state[file_id]
retry_count = 0
last_error = None
while file_state["sent"] < file_size and retry_count < max_retries:
try:
# 发送当前块
chunk_start = file_state["sent"]
chunk_end = min(chunk_start + MAX_CHUNK_SIZE, file_size)
chunk_size = chunk_end - chunk_start
with open(file_path, 'rb') as f:
f.seek(chunk_start)
chunk_data = f.read(chunk_size)
# 准备请求
params = {
"site_name": site_name,
"line_id": line_id,
"purpose": purpose,
"file_id": file_id,
"start_byte": chunk_start,
"total_size": file_size if chunk_start == 0 else None
}
files = {"chunk": (file_path.name, chunk_data, "application/octet-stream")}
# 发送请求
response = requests.post(
server_url,
params=params,
files=files,
timeout=30
)
# 处理响应
if response.status_code == 200:
# 更新已发送字节数
file_state["sent"] = chunk_end
transfer_state[file_id] = file_state
_save_transfer_state(state_path, transfer_state)
# 传输完成
if file_state["sent"] >= file_size:
_cleanup_after_success(file_path, state_path, transfer_state, file_id)
retry_count = 0 # 重置重试计数器
else:
raise Exception(f"Server error: {response.status_code}, {response.text}")
except Exception as e:
last_error = str(e)
retry_count += 1
file_state["retry_count"] = retry_count
transfer_state[file_id] = file_state
_save_transfer_state(state_path, transfer_state)
# 指数退避重试
wait_time = BACKOFF_BASE ** retry_count
print(f"Retry {retry_count}/{max_retries} for {file_path.name} in {wait_time}s: {last_error}")
time.sleep(wait_time)
# 处理传输失败
if file_state["sent"] < file_size:
print(f"Failed to send {file_path.name} after {max_retries} attempts")
# 验证服务器连接
if _check_server_health(server_url):
print("Server is reachable - skipping this file")
else:
print("Server unreachable - will retry later")
# 重置重试计数器以便下次尝试
file_state["retry_count"] = 0
transfer_state[file_id] = file_state
_save_transfer_state(state_path, transfer_state)
def _load_transfer_state(state_path: Path) -> Dict:
"""加载传输状态"""
if state_path.exists():
try:
with open(state_path, 'r') as f:
return json.load(f)
except:
return {}
return {}
def _save_transfer_state(state_path: Path, state: Dict) -> None:
"""原子化保存传输状态"""
temp_path = state_path.with_suffix('.tmp')
with open(temp_path, 'w') as f:
json.dump(state, f, indent=2)
shutil.move(str(temp_path), str(state_path))
def _get_file_id(file_path: Path) -> str:
"""生成文件唯一ID (路径+修改时间+大小)"""
stat = file_path.stat()
unique_str = f"{file_path.resolve()}|{stat.st_mtime}|{stat.st_size}"
return hashlib.sha256(unique_str.encode()).hexdigest()[:16]
def _cleanup_after_success(
file_path: Path,
state_path: Path,
transfer_state: Dict,
file_id: str
) -> None:
"""传输成功后清理"""
# 删除本地文件
file_path.unlink()
# 清除状态记录
if file_id in transfer_state:
del transfer_state[file_id]
_save_transfer_state(state_path, transfer_state)
print(f"Successfully sent and deleted {file_path.name}")
def _check_server_health(server_url: str) -> bool:
"""检查服务器健康状态"""
try:
# 提取基础URL (移除/upload部分)
base_url = server_url.rsplit('/', 1)[0]
response = requests.get(f"{base_url}/health", timeout=5)
return response.status_code == 200
except:
return False
if __name__ == "__main__":
# 示例用法
send_images(
folder_path=r"C:\Users\chuyi\Pictures\test",
server_url="http://www.xj-robot.com:6000/upload",
site_name="FactoryA",
line_id="Line1",
purpose="DET"
)