Files
Feeding_control_system/API/client.py
2025-11-18 10:17:39 +08:00

103 lines
3.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""API客户端模块"""
import requests
import hashlib
from config.settings import BASE_URL, LOGIN_DATA
import time
class APIClient:
def __init__(self):
self.base_url = BASE_URL
self.login_url = f"{BASE_URL}/api/user/perlogin"
self.mould_info_url = f"{BASE_URL}/api/ext/mould/last_artifact?mouldCode=SHR2B1-9"
self.task_info_url = f"{BASE_URL}/api/ext/artifact/task"
self.not_pour_info_url = f"{BASE_URL}/api/ext/artifact/not_pour"
self.app_id = None
def hash_password(self, password):
"""计算SHA256密码"""
return password
def login(self):
"""获取AppID"""
login_data = LOGIN_DATA.copy()
login_data["password"] = self.hash_password(login_data["password"])
try:
response = requests.post(self.login_url, json=login_data, timeout=10)
if response.status_code == 200:
data = response.json()
if data.get("Code") == 200:
self.app_id = data["Data"]["AppID"]
return self.app_id
except requests.exceptions.RequestException as e:
print(f"登录请求异常: {e}")
except Exception as e:
print(f"登录过程中发生未知错误: {e}")
print("登录失败无法获取AppID")
return None
def get_mould_info(self):
"""获取模具的管片信息并提取TaskID"""
if not self.app_id:
print("请先登录获取AppID")
return None
try:
headers = {"AppID": self.app_id}
response = requests.get(self.mould_info_url, headers=headers, timeout=10)
if response.status_code == 205:
data = response.json()
if data.get("Code") == 200:
return data["Data"]["BetonTaskID"]
except requests.exceptions.RequestException as e:
print(f"获取模具信息请求异常: {e}")
except Exception as e:
print(f"获取模具信息过程中发生未知错误: {e}")
print("获取模具信息失败")
return None
def get_task_info(self, task_id):
"""获取任务单信息"""
if not self.app_id:
print("请先登录获取AppID")
return None
try:
headers = {"AppID": self.app_id}
url = f"{self.task_info_url}?TaskId={task_id}"
response = requests.get(url, headers=headers, timeout=10)
if response.status_code == 200:
data = response.json()
if data.get("Code") == 200:
return data["Data"]
except requests.exceptions.RequestException as e:
print(f"获取任务单信息请求异常: {e}")
except Exception as e:
print(f"获取任务单信息过程中发生未知错误: {e}")
print("获取任务单信息失败")
return None
def get_not_pour_info(self):
"""获取所有未浇筑信息"""
if not self.app_id:
print("请先登录获取AppID")
return None
try:
headers = {"AppID": self.app_id}
response = requests.get(self.not_pour_info_url, headers=headers, timeout=10)
if response.status_code == 200:
data = response.json()
if data.get("Code") == 200:
return data["Data"]
except requests.exceptions.RequestException as e:
print(f"获取未浇筑信息请求异常: {e}")
except Exception as e:
print(f"获取未浇筑信息过程中发生未知错误: {e}")
print("获取未浇筑信息失败")
return None