Files
Feeding_control_system/insertData.py

225 lines
7.6 KiB
Python
Raw Normal View History

2025-10-28 19:51:36 +08:00
import requests
import pyodbc
from datetime import datetime
import time
# 配置信息
BASE_URL = "https://www.shnthy.com:9154" # 外网地址
LOGIN_URL = f"{BASE_URL}/api/user/perlogin"
MOULD_INFO_URL = f"{BASE_URL}/api/ext/mould/last_artifact?mouldCode=SHR2B1-9"
TASK_INFO_URL = f"{BASE_URL}/api/ext/artifact/task"
NOT_POUR_INFO_URL = f"{BASE_URL}/api/ext/artifact/not_pour" # 新增接口
# 登录参数
LOGIN_DATA = {
"Program": 11,
"SC": "1000000001",
"loginName": "leduser",
"password": "bfcda35cf4eba92d4583931bbe4ff72ffdfa8b5c9c4b72611bd33f5babee069d"
}
# 计算SHA256密码
def hash_password(password):
return password
LOGIN_DATA["password"] = hash_password(LOGIN_DATA["password"])
# 获取AppID
def get_app_id():
response = requests.post(LOGIN_URL, json=LOGIN_DATA)
if response.status_code == 200:
data = response.json()
if data.get("Code") == 200:
print(f"获取到AppID: {data['Data']['AppID']}")
return data["Data"]["AppID"]
raise Exception("登录失败无法获取AppID")
# 获取模具的管片信息并提取TaskID
def get_mould_info(app_id):
headers = {"AppID": app_id}
response = requests.get(MOULD_INFO_URL, headers=headers)
if response.status_code == 205:
data = response.json()
if data.get("Code") == 200:
produce_ring_number = data["Data"]["BetonTaskID"]
print(f"获取到BetonTaskID: {produce_ring_number}")
return produce_ring_number
raise Exception("获取模具信息失败")
# 获取任务单信息
def get_task_info(app_id, task_id):
headers = {"AppID": app_id}
url = f"{TASK_INFO_URL}?TaskId={task_id}"
response = requests.get(url, headers=headers)
if response.status_code == 200:
data = response.json()
if data.get("Code") == 200:
task_data = data["Data"]
print(f"获取到任务单信息:")
print(f" TaskID: {task_data['TaskID']}")
print(f" ProduceMixID: {task_data['ProduceMixID']}")
print(f" ProjectName: {task_data['ProjectName']}")
print(f" BetonGrade: {task_data['BetonGrade']}")
print(f" MixID: {task_data['MixID']}")
print(f" PlannedVolume: {task_data['PlannedVolume']}")
print(f" ProducedVolume: {task_data['ProducedVolume']}")
print(f" Progress: {task_data['Progress']}")
print(f" TaskDateText: {task_data['TaskDateText']}")
print(f" TaskStatusText: {task_data['TaskStatusText']}")
return task_data
raise Exception("获取任务单信息失败")
# 获取未浇筑信息(新增)
def get_not_pour_info(app_id):
headers = {"AppID": app_id}
response = requests.get(NOT_POUR_INFO_URL, headers=headers)
if response.status_code == 200:
data = response.json()
if data.get("Code") == 200:
# 处理列表数据
artifact_list = data["Data"]
if len(artifact_list) > 0:
first_artifact = artifact_list[0] # 获取第一个元素
beton_task_id = first_artifact["BetonTaskID"]
beton_volume = first_artifact["BetonVolume"]
artifact_id = first_artifact["ArtifactActionID"] # 获取ArtifactID
print(f"获取到BetonTaskID: {beton_task_id}")
print(f"获取到BetonVolume: {beton_volume}")
print(f"获取到ArtifactActionID: {artifact_id}")
return beton_task_id, beton_volume, artifact_id
else:
raise Exception("未找到未浇筑信息")
raise Exception("获取未浇筑信息失败")
# 连接Access数据库
def connect_to_access_db(db_path, password):
conn_str = (
r'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};'
f'DBQ={db_path};'
f'PWD={password};'
)
return pyodbc.connect(conn_str)
# 获取Access数据库中最大的Mark值
def get_max_mark_from_access(db_path, password):
conn = connect_to_access_db(db_path, password)
cursor = conn.cursor()
# 查询最大的Mark值
cursor.execute("SELECT MAX(Mark) FROM Produce")
max_mark = cursor.fetchone()[0]
# 如果没有记录返回0
if max_mark is None:
max_mark = 0
conn.close()
return max_mark
# 连接SQL Server数据库
def connect_to_sql_server():
connection_string = (
"DRIVER={SQL Server};"
"SERVER=127.0.0.1;"
"DATABASE=BS23DB;"
"UID=sa;"
"PWD=123;"
)
return pyodbc.connect(connection_string)
# 插入数据到Produce表
def insert_into_produce_table(connection, task_info, beton_volume, erp_id):
cursor = connection.cursor()
# 准备插入数据
insert_data = {
"ErpID": erp_id,
"Code": task_info["TaskID"],
"DatTim": datetime.now(),
"Recipe": task_info["ProduceMixID"],
"MorRec": "",
"ProdMete": beton_volume,
"MorMete": 0.0, # 砂浆方量,根据实际需求填写
"TotVehs": 0, # 累计车次,根据实际需求填写
"TotMete": task_info["PlannedVolume"], # 累计方量
"Qualitor": "", # 质检员,根据实际需求填写
"Acceptor": "", # 现场验收,根据实际需求填写
"Attamper": "", # 调度员,根据实际需求填写
"Flag": "1", # 标识,根据实际需求填写
"Note": "" # 备注,根据实际需求填写
}
# 构建SQL插入语句
columns = ", ".join(insert_data.keys())
placeholders = ", ".join(["?" for _ in insert_data.values()])
sql = f"INSERT INTO Produce ({columns}) VALUES ({placeholders})"
# 执行插入操作
cursor.execute(sql, list(insert_data.values()))
connection.commit()
print(f"数据已成功插入到Produce表中")
# 主函数
def main():
try:
# 步骤1获取AppID
app_id = get_app_id()
# 上次获取的ArtifactID用于检测变化
last_artifact_id = None
# Access数据库路径和密码
access_db_path = "D:\\Janeoo-B12-DB\\Janeoo.2.mdb" # 替换为实际路径
access_password = "BCS7.2_SDBS" # Access数据库密码
while True:
try:
# 步骤2获取未浇筑信息中的BetonTaskID、BetonVolume和ArtifactID
beton_task_id, beton_volume, artifact_id = get_not_pour_info(app_id)
# beton_task_id = "20251016-01"
# 检查ArtifactID是否发生变化
if artifact_id != last_artifact_id:
print(f"检测到新任务: {artifact_id}")
# 步骤3使用BetonTaskID获取任务单信息
task_info = get_task_info(app_id, beton_task_id)
# 步骤4连接Access数据库并获取最大Mark值
max_mark = get_max_mark_from_access(access_db_path, access_password)
erp_id = int(max_mark) + 1 # 在最大Mark值基础上加1
print(f"获取到ERP ID: {erp_id}")
# 步骤5连接SQL Server数据库并插入数据
connection = connect_to_sql_server()
insert_into_produce_table(connection, task_info, beton_volume, erp_id)
connection.close()
# 更新上次获取的ArtifactID
last_artifact_id = artifact_id
# 每2秒检查一次
time.sleep(2)
except Exception as e:
print(f"发生错误: {e}")
# 继续循环,避免程序退出
time.sleep(2)
except KeyboardInterrupt:
print("程序已停止")
if __name__ == "__main__":
main()