chore: 更新最新代码
This commit is contained in:
194
image/transport_client.py
Normal file
194
image/transport_client.py
Normal file
@ -0,0 +1,194 @@
|
||||
import os
|
||||
import time
|
||||
import hashlib
|
||||
import requests
|
||||
import json
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Tuple, Optional
|
||||
|
||||
STATE_FILE = ".transfer_state.json"
|
||||
MAX_CHUNK_SIZE = 1024 * 1024 # 1MB chunks
|
||||
VALID_IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.bmp', '.gif'}
|
||||
MAX_RETRIES = 5
|
||||
BACKOFF_BASE = 2 # Exponential backoff base
|
||||
|
||||
def send_images(
|
||||
folder_path: str,
|
||||
server_url: str,
|
||||
site_name: str,
|
||||
line_id: str,
|
||||
purpose: str,
|
||||
max_retries: int = MAX_RETRIES
|
||||
) -> None:
|
||||
"""
|
||||
发送图像文件到服务器,支持断点续传和重试机制
|
||||
|
||||
Args:
|
||||
folder_path: 本地图像文件夹路径
|
||||
server_url: 服务器URL (e.g., "http://localhost:5000/upload")
|
||||
site_name: 现场名称
|
||||
line_id: 现场线号
|
||||
purpose: 图像用途 (DET/SEG等)
|
||||
max_retries: 最大重试次数
|
||||
"""
|
||||
folder = Path(folder_path)
|
||||
if not folder.is_dir():
|
||||
raise ValueError(f"Invalid folder path: {folder_path}")
|
||||
|
||||
# 初始化状态文件
|
||||
state_path = folder / STATE_FILE
|
||||
transfer_state = _load_transfer_state(state_path)
|
||||
|
||||
# 获取待发送文件列表 (过滤状态文件和非图像文件)
|
||||
files_to_send = [
|
||||
f for f in folder.iterdir()
|
||||
if f.is_file()
|
||||
and f.name != STATE_FILE
|
||||
and f.suffix.lower() in VALID_IMAGE_EXTENSIONS
|
||||
]
|
||||
|
||||
for file_path in files_to_send:
|
||||
file_id = _get_file_id(file_path)
|
||||
file_size = file_path.stat().st_size
|
||||
|
||||
# 初始化文件传输状态
|
||||
if file_id not in transfer_state:
|
||||
transfer_state[file_id] = {
|
||||
"file_path": str(file_path),
|
||||
"total_size": file_size,
|
||||
"sent": 0,
|
||||
"retry_count": 0
|
||||
}
|
||||
|
||||
file_state = transfer_state[file_id]
|
||||
retry_count = 0
|
||||
last_error = None
|
||||
|
||||
while file_state["sent"] < file_size and retry_count < max_retries:
|
||||
try:
|
||||
# 发送当前块
|
||||
chunk_start = file_state["sent"]
|
||||
chunk_end = min(chunk_start + MAX_CHUNK_SIZE, file_size)
|
||||
chunk_size = chunk_end - chunk_start
|
||||
|
||||
with open(file_path, 'rb') as f:
|
||||
f.seek(chunk_start)
|
||||
chunk_data = f.read(chunk_size)
|
||||
|
||||
# 准备请求
|
||||
params = {
|
||||
"site_name": site_name,
|
||||
"line_id": line_id,
|
||||
"purpose": purpose,
|
||||
"file_id": file_id,
|
||||
"start_byte": chunk_start,
|
||||
"total_size": file_size if chunk_start == 0 else None
|
||||
}
|
||||
|
||||
files = {"chunk": (file_path.name, chunk_data, "application/octet-stream")}
|
||||
|
||||
# 发送请求
|
||||
response = requests.post(
|
||||
server_url,
|
||||
params=params,
|
||||
files=files,
|
||||
timeout=30
|
||||
)
|
||||
|
||||
# 处理响应
|
||||
if response.status_code == 200:
|
||||
# 更新已发送字节数
|
||||
file_state["sent"] = chunk_end
|
||||
transfer_state[file_id] = file_state
|
||||
_save_transfer_state(state_path, transfer_state)
|
||||
|
||||
# 传输完成
|
||||
if file_state["sent"] >= file_size:
|
||||
_cleanup_after_success(file_path, state_path, transfer_state, file_id)
|
||||
retry_count = 0 # 重置重试计数器
|
||||
else:
|
||||
raise Exception(f"Server error: {response.status_code}, {response.text}")
|
||||
|
||||
except Exception as e:
|
||||
last_error = str(e)
|
||||
retry_count += 1
|
||||
file_state["retry_count"] = retry_count
|
||||
transfer_state[file_id] = file_state
|
||||
_save_transfer_state(state_path, transfer_state)
|
||||
|
||||
# 指数退避重试
|
||||
wait_time = BACKOFF_BASE ** retry_count
|
||||
print(f"Retry {retry_count}/{max_retries} for {file_path.name} in {wait_time}s: {last_error}")
|
||||
time.sleep(wait_time)
|
||||
|
||||
# 处理传输失败
|
||||
if file_state["sent"] < file_size:
|
||||
print(f"Failed to send {file_path.name} after {max_retries} attempts")
|
||||
# 验证服务器连接
|
||||
if _check_server_health(server_url):
|
||||
print("Server is reachable - skipping this file")
|
||||
else:
|
||||
print("Server unreachable - will retry later")
|
||||
# 重置重试计数器以便下次尝试
|
||||
file_state["retry_count"] = 0
|
||||
transfer_state[file_id] = file_state
|
||||
_save_transfer_state(state_path, transfer_state)
|
||||
|
||||
def _load_transfer_state(state_path: Path) -> Dict:
|
||||
"""加载传输状态"""
|
||||
if state_path.exists():
|
||||
try:
|
||||
with open(state_path, 'r') as f:
|
||||
return json.load(f)
|
||||
except:
|
||||
return {}
|
||||
return {}
|
||||
|
||||
def _save_transfer_state(state_path: Path, state: Dict) -> None:
|
||||
"""原子化保存传输状态"""
|
||||
temp_path = state_path.with_suffix('.tmp')
|
||||
with open(temp_path, 'w') as f:
|
||||
json.dump(state, f, indent=2)
|
||||
shutil.move(str(temp_path), str(state_path))
|
||||
|
||||
def _get_file_id(file_path: Path) -> str:
|
||||
"""生成文件唯一ID (路径+修改时间+大小)"""
|
||||
stat = file_path.stat()
|
||||
unique_str = f"{file_path.resolve()}|{stat.st_mtime}|{stat.st_size}"
|
||||
return hashlib.sha256(unique_str.encode()).hexdigest()[:16]
|
||||
|
||||
def _cleanup_after_success(
|
||||
file_path: Path,
|
||||
state_path: Path,
|
||||
transfer_state: Dict,
|
||||
file_id: str
|
||||
) -> None:
|
||||
"""传输成功后清理"""
|
||||
# 删除本地文件
|
||||
file_path.unlink()
|
||||
# 清除状态记录
|
||||
if file_id in transfer_state:
|
||||
del transfer_state[file_id]
|
||||
_save_transfer_state(state_path, transfer_state)
|
||||
print(f"Successfully sent and deleted {file_path.name}")
|
||||
|
||||
def _check_server_health(server_url: str) -> bool:
|
||||
"""检查服务器健康状态"""
|
||||
try:
|
||||
# 提取基础URL (移除/upload部分)
|
||||
base_url = server_url.rsplit('/', 1)[0]
|
||||
response = requests.get(f"{base_url}/health", timeout=5)
|
||||
return response.status_code == 200
|
||||
except:
|
||||
return False
|
||||
|
||||
if __name__ == "__main__":
|
||||
# 示例用法
|
||||
send_images(
|
||||
folder_path=r"C:\Users\chuyi\Pictures\test",
|
||||
server_url="http://www.xj-robot.com:6000/upload",
|
||||
site_name="FactoryA",
|
||||
line_id="Line1",
|
||||
purpose="DET"
|
||||
)
|
||||
Reference in New Issue
Block a user