104 lines
3.7 KiB
Python
104 lines
3.7 KiB
Python
"""API客户端模块"""
|
||
import requests
|
||
import hashlib
|
||
from config.settings import BASE_URL, LOGIN_DATA
|
||
import time
|
||
|
||
|
||
class APIClient:
|
||
def __init__(self):
|
||
self.base_url = BASE_URL
|
||
self.login_url = f"{BASE_URL}/api/user/perlogin"
|
||
self.mould_info_url = f"{BASE_URL}/api/ext/mould/last_artifact?mouldCode=SHR2B1-9"
|
||
self.task_info_url = f"{BASE_URL}/api/ext/artifact/task"
|
||
self.not_pour_info_url =f"{BASE_URL}/api/ext/mould/not_pour_rfid" #f"{BASE_URL}/api/ext/artifact/not_pour"#调试修改
|
||
self.app_id = None
|
||
|
||
|
||
def hash_password(self, password):
|
||
"""计算SHA256密码"""
|
||
return password
|
||
|
||
def login(self):
|
||
"""获取AppID"""
|
||
login_data = LOGIN_DATA.copy()
|
||
login_data["password"] = self.hash_password(login_data["password"])
|
||
|
||
try:
|
||
response = requests.post(self.login_url, json=login_data, timeout=10)
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
if data.get("Code") == 200:
|
||
self.app_id = data["Data"]["AppID"]
|
||
return self.app_id
|
||
except requests.exceptions.RequestException as e:
|
||
print(f"登录请求异常: {e}")
|
||
except Exception as e:
|
||
print(f"登录过程中发生未知错误: {e}")
|
||
|
||
print("登录失败,无法获取AppID")
|
||
return None
|
||
|
||
def get_mould_info(self):
|
||
"""获取模具的管片信息并提取TaskID"""
|
||
if not self.app_id:
|
||
print("请先登录获取AppID")
|
||
return None
|
||
|
||
try:
|
||
headers = {"AppID": self.app_id}
|
||
response = requests.get(self.mould_info_url, headers=headers, timeout=10)
|
||
if response.status_code == 205:
|
||
data = response.json()
|
||
if data.get("Code") == 200:
|
||
return data["Data"]["BetonTaskID"]
|
||
except requests.exceptions.RequestException as e:
|
||
print(f"获取模具信息请求异常: {e}")
|
||
except Exception as e:
|
||
print(f"获取模具信息过程中发生未知错误: {e}")
|
||
|
||
print("获取模具信息失败")
|
||
return None
|
||
|
||
def get_task_info(self, task_id):
|
||
"""获取任务单信息"""
|
||
if not self.app_id:
|
||
print("请先登录获取AppID")
|
||
return None
|
||
|
||
try:
|
||
headers = {"AppID": self.app_id}
|
||
url = f"{self.task_info_url}?TaskId={task_id}"
|
||
response = requests.get(url, headers=headers, timeout=10)
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
if data.get("Code") == 200:
|
||
return data["Data"]
|
||
except requests.exceptions.RequestException as e:
|
||
print(f"获取任务单信息请求异常: {e}")
|
||
except Exception as e:
|
||
print(f"获取任务单信息过程中发生未知错误: {e}")
|
||
|
||
print("获取任务单信息失败")
|
||
return None
|
||
|
||
def get_not_pour_info(self):
|
||
"""获取所有未浇筑信息"""
|
||
if not self.app_id:
|
||
print("请先登录获取AppID")
|
||
return None
|
||
|
||
try:
|
||
headers = {"AppID": self.app_id}
|
||
response = requests.get(self.not_pour_info_url, headers=headers, timeout=10)
|
||
if response.status_code == 200:
|
||
data = response.json()
|
||
if data.get("Code") == 200:
|
||
return data["Data"]
|
||
except requests.exceptions.RequestException as e:
|
||
print(f"获取未浇筑信息请求异常: {e}")
|
||
except Exception as e:
|
||
print(f"获取未浇筑信息过程中发生未知错误: {e}")
|
||
|
||
print("获取未浇筑信息失败")
|
||
return None |