Files
Feeding_control_system/service/api_http_client.py

285 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from logging import exception
from typing import Optional, Dict, Any
import time
import threading
import requests
from busisness.models import LoginResponse
from config.ini_manager import ini_manager
class BaseHttpClient:
"""基础HTTP客户端 - 纯网络请求功能"""
def __init__(self):
"""初始化基础HTTP客户端"""
self._session = requests.Session()
self._settings = None
def request(self, method: str, url: str, data: Dict[str, Any] = None,
headers: Dict[str, str] = None, timeout: int = None,
retries: int = 0, retry_interval: float = 1.0,
**kwargs) -> Dict[str, Any]:
"""
发送HTTP请求支持重试机制
Args:
method: HTTP方法
url: 请求URL
data: 请求数据
headers: 请求头
timeout: 超时时间
retries: 重试次数(可选,默认使用配置中的重试次数)
retry_interval: 重试间隔默认1秒
**kwargs: 其他参数
Returns:
Dict[str, Any]: 响应数据
"""
if headers is None:
headers = {}
# 重试逻辑
for attempt in range(retries + 1):
try:
if method.upper() == 'GET':
response = self._session.get(
url, headers=headers, timeout=timeout, **kwargs
)
elif method.upper() == 'POST':
response = self._session.post(
url, json=data, headers=headers, timeout=timeout, **kwargs
)
else:
raise ValueError(f"不支持的HTTP方法: {method}")
# 检查响应状态
response.raise_for_status()
# 解析JSON响应
return response.json()
except Exception as e:
# 如果是最后一次尝试,直接抛出异常
if attempt == retries:
print(f"请求失败(第{attempt + 1}次尝试): {e}")
raise
# 打印重试信息
print(f"请求失败(第{attempt + 1}次尝试),{retry_interval}秒后重试: {e}")
# 等待重试间隔
time.sleep(retry_interval)
def get(self, url: str, headers: Dict[str, str] = None,
timeout: int = None, retries: int = 0, retry_interval: float = 1.0,
**kwargs) -> Dict[str, Any]:
"""
GET请求
Args:
url: 请求URL
headers: 请求头
timeout: 超时时间
**kwargs: 其他参数
Returns:
Dict[str, Any]: 响应数据
"""
return self.request('GET', url, headers=headers, timeout=timeout, retries=retries, retry_interval=retry_interval, **kwargs)
def post(self, url: str, data: Dict[str, Any] = None,
headers: Dict[str, str] = None, timeout: int = None,
retries: int = 0, retry_interval: float = 1.0,
**kwargs) -> Dict[str, Any]:
"""
POST请求
Args:
url: 请求URL
data: 请求数据
headers: 请求头
timeout: 超时时间
retries: 重试次数(可选,默认使用配置中的重试次数)
retry_interval: 重试间隔默认1秒
**kwargs: 其他参数
Returns:
Dict[str, Any]: 响应数据
"""
return self.request('POST', url, data=data, headers=headers, timeout=timeout, retries=retries, retry_interval=retry_interval, **kwargs)
class ApiHttpClient(BaseHttpClient):
"""API客户端 - 业务API调用整合认证和单例功能"""
def __init__(self):
"""初始化API客户端"""
"""初始化API客户端"""
super().__init__()
# 认证缓存
self._auth_cache = {
'app_id': None,
'expire_time': None,
'sign_token': None,
'zr_jwt': None
}
self._cache_lock = threading.RLock()
@property
def settings(self):
"""获取配置对象,由子类实现"""
if self._settings is None:
self._settings = ini_manager
return self._settings
def login(self) -> bool:
"""
用户登录获取AppID
Args:
url: 登录URL可选默认使用配置中的URL
login_model: 登录请求模型(可选,默认使用配置中的模型)
Returns:
bool: 登录是否成功
"""
url = self.settings.api_login_url
login_model = self.settings.api_login_model
print("开始登录...")
try:
# 发送登录请求
response_data = self.request(
method='POST',
url=url,
data=login_model,
timeout=self.settings.api_timeout
)
# 解析登录响应
login_response = LoginResponse(**response_data)
if login_response.Code != 200:
error_msg = login_response.Message or "登录失败"
print(f"获取AppID失败: {error_msg}")
return False
# 更新认证缓存
with self._cache_lock:
self._auth_cache.update({
'app_id': login_response.app_id,
'expire_time': login_response.expire_time,
'sign_token': login_response.sign_token,
'zr_jwt': login_response.zr_jwt
})
print(f"成功获取AppID: {self._auth_cache['app_id']}")
print(f"过期时间: {self._auth_cache['expire_time']}")
return True
except Exception as e:
print(f"登录过程中出现异常: {e}")
self._clear_auth_cache()
return False
def is_app_id_valid(self) -> bool:
"""检查AppID是否有效"""
with self._cache_lock:
expire_time = self._auth_cache.get('expire_time')
if not expire_time:
return False
# 检查是否过期提前12小时过期避免临界情况
try:
expire_timestamp = time.mktime(time.strptime(expire_time, '%Y-%m-%d %H:%M:%S'))
is_valid = time.time() < expire_timestamp - self.settings.api_auth_timeout
if not is_valid:
print("认证信息已过期")
return is_valid
except (ValueError, TypeError):
print("日期格式不正确")
return False
def _get_auth_headers(self) -> Dict[str, str]:
"""获取认证头信息"""
with self._cache_lock:
app_id = self._auth_cache.get('app_id')
headers = {
'AppID': app_id,
'Content-Type': 'application/json'
}
return headers
def get(self, url: str, auth: bool = True, **kwargs) -> Dict[str, Any]:
"""
GET请求支持认证检查
Args:
url: 请求URL
auth: 是否需要认证
timeout: 超时时间
retries: 重试次数
retry_interval: 重试间隔
**kwargs: 其他参数
Returns:
Dict[str, Any]: 响应数据
"""
if auth:
if not self.is_app_id_valid():
self.login()
if not self.is_app_id_valid():
raise Exception("登录失败无法获取有效AppID")
auth_headers = self._get_auth_headers()
return self.request(method='GET', url=url, headers=auth_headers, timeout=self.settings.api_timeout,
retries=self.settings.api_max_retries, retry_interval=self.settings.api_retry_interval, **kwargs)
else:
return self.request(method='GET', url=url, timeout=self.settings.api_timeout,
retries=self.settings.api_max_retries, retry_interval=self.settings.api_retry_interval, **kwargs)
def post(self, url: str, data: Dict[str, Any] = None, auth: bool = True,**kwargs) -> Dict[str, Any]:
"""
POST请求支持认证检查
Args:
url: 请求URL
data: 请求数据
auth: 是否需要认证
timeout: 超时时间
retries: 重试次数
retry_interval: 重试间隔
**kwargs: 其他参数
Returns:
Dict[str, Any]: 响应数据
"""
if auth:
if not self.is_app_id_valid():
self.login()
if not self.is_app_id_valid():
raise Exception("登录失败无法获取有效AppID")
auth_headers = self._get_auth_headers()
return self.request(method='POST', url=url, data=data, headers=auth_headers, timeout=self.settings.api_timeout,
retries=self.settings.api_max_retries, retry_interval=self.settings.api_retry_interval, **kwargs)
else:
return self.request(method='POST', url=url, data=data, timeout=self.settings.api_timeout,
retries=self.settings.api_max_retries, retry_interval=self.settings.api_retry_interval, **kwargs)
def _clear_auth_cache(self):
"""清除认证缓存"""
with self._cache_lock:
self._auth_cache.clear()
# 重新初始化必要的键
self._auth_cache.update({
'app_id': None,
'expire_time': None,
'sign_token': None,
'zr_jwt': None
})
print("认证缓存已清除")
api_http_client = ApiHttpClient()