import requests import pyodbc from datetime import datetime import time # 配置信息 BASE_URL = "https://www.shnthy.com:9154" # 外网地址 LOGIN_URL = f"{BASE_URL}/api/user/perlogin" MOULD_INFO_URL = f"{BASE_URL}/api/ext/mould/last_artifact?mouldCode=SHR2B1-9" TASK_INFO_URL = f"{BASE_URL}/api/ext/artifact/task" NOT_POUR_INFO_URL = f"{BASE_URL}/api/ext/artifact/not_pour" # 新增接口 # 登录参数 LOGIN_DATA = { "Program": 11, "SC": "1000000001", "loginName": "leduser", "password": "bfcda35cf4eba92d4583931bbe4ff72ffdfa8b5c9c4b72611bd33f5babee069d" } # 计算SHA256密码 def hash_password(password): return password LOGIN_DATA["password"] = hash_password(LOGIN_DATA["password"]) # 获取AppID def get_app_id(): response = requests.post(LOGIN_URL, json=LOGIN_DATA) if response.status_code == 200: data = response.json() if data.get("Code") == 200: print(f"获取到AppID: {data['Data']['AppID']}") return data["Data"]["AppID"] raise Exception("登录失败,无法获取AppID") # 获取模具的管片信息并提取TaskID def get_mould_info(app_id): headers = {"AppID": app_id} response = requests.get(MOULD_INFO_URL, headers=headers) if response.status_code == 205: data = response.json() if data.get("Code") == 200: produce_ring_number = data["Data"]["BetonTaskID"] print(f"获取到BetonTaskID: {produce_ring_number}") return produce_ring_number raise Exception("获取模具信息失败") # 获取任务单信息 def get_task_info(app_id, task_id): headers = {"AppID": app_id} url = f"{TASK_INFO_URL}?TaskId={task_id}" response = requests.get(url, headers=headers) if response.status_code == 200: data = response.json() if data.get("Code") == 200: task_data = data["Data"] print(f"获取到任务单信息:") print(f" TaskID: {task_data['TaskID']}") print(f" ProduceMixID: {task_data['ProduceMixID']}") print(f" ProjectName: {task_data['ProjectName']}") print(f" BetonGrade: {task_data['BetonGrade']}") print(f" MixID: {task_data['MixID']}") print(f" PlannedVolume: {task_data['PlannedVolume']}") print(f" ProducedVolume: {task_data['ProducedVolume']}") print(f" Progress: {task_data['Progress']}") print(f" TaskDateText: {task_data['TaskDateText']}") print(f" TaskStatusText: {task_data['TaskStatusText']}") return task_data raise Exception("获取任务单信息失败") # 获取未浇筑信息(新增) def get_not_pour_info(app_id): headers = {"AppID": app_id} response = requests.get(NOT_POUR_INFO_URL, headers=headers) if response.status_code == 200: data = response.json() if data.get("Code") == 200: # 处理列表数据 artifact_list = data["Data"] if len(artifact_list) > 0: first_artifact = artifact_list[0] # 获取第一个元素 beton_task_id = first_artifact["BetonTaskID"] beton_volume = first_artifact["BetonVolume"] artifact_id = first_artifact["ArtifactActionID"] # 获取ArtifactID print(f"获取到BetonTaskID: {beton_task_id}") print(f"获取到BetonVolume: {beton_volume}") print(f"获取到ArtifactActionID: {artifact_id}") return beton_task_id, beton_volume, artifact_id else: raise Exception("未找到未浇筑信息") raise Exception("获取未浇筑信息失败") # 连接Access数据库 def connect_to_access_db(db_path, password): conn_str = ( r'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};' f'DBQ={db_path};' f'PWD={password};' ) return pyodbc.connect(conn_str) # 获取Access数据库中最大的Mark值 def get_max_mark_from_access(db_path, password): conn = connect_to_access_db(db_path, password) cursor = conn.cursor() # 查询最大的Mark值 cursor.execute("SELECT MAX(Mark) FROM Produce") max_mark = cursor.fetchone()[0] # 如果没有记录,返回0 if max_mark is None: max_mark = 0 conn.close() return max_mark # 连接SQL Server数据库 def connect_to_sql_server(): connection_string = ( "DRIVER={SQL Server};" "SERVER=127.0.0.1;" "DATABASE=BS23DB;" "UID=sa;" "PWD=123;" ) return pyodbc.connect(connection_string) # 插入数据到Produce表 def insert_into_produce_table(connection, task_info, beton_volume, erp_id): cursor = connection.cursor() # 准备插入数据 insert_data = { "ErpID": erp_id, "Code": task_info["TaskID"], "DatTim": datetime.now(), "Recipe": task_info["ProduceMixID"], "MorRec": "", "ProdMete": beton_volume, "MorMete": 0.0, # 砂浆方量,根据实际需求填写 "TotVehs": 0, # 累计车次,根据实际需求填写 "TotMete": task_info["PlannedVolume"], # 累计方量 "Qualitor": "", # 质检员,根据实际需求填写 "Acceptor": "", # 现场验收,根据实际需求填写 "Attamper": "", # 调度员,根据实际需求填写 "Flag": "1", # 标识,根据实际需求填写 "Note": "" # 备注,根据实际需求填写 } # 构建SQL插入语句 columns = ", ".join(insert_data.keys()) placeholders = ", ".join(["?" for _ in insert_data.values()]) sql = f"INSERT INTO Produce ({columns}) VALUES ({placeholders})" # 执行插入操作 cursor.execute(sql, list(insert_data.values())) connection.commit() print(f"数据已成功插入到Produce表中") # 主函数 def main(): try: # 步骤1:获取AppID app_id = get_app_id() # 上次获取的ArtifactID,用于检测变化 last_artifact_id = None # Access数据库路径和密码 access_db_path = "D:\\Janeoo-B12-DB\\Janeoo.2.mdb" # 替换为实际路径 access_password = "BCS7.2_SDBS" # Access数据库密码 while True: try: # 步骤2:获取未浇筑信息中的BetonTaskID、BetonVolume和ArtifactID beton_task_id, beton_volume, artifact_id = get_not_pour_info(app_id) # beton_task_id = "20251016-01" # 检查ArtifactID是否发生变化 if artifact_id != last_artifact_id: print(f"检测到新任务: {artifact_id}") # 步骤3:使用BetonTaskID获取任务单信息 task_info = get_task_info(app_id, beton_task_id) # 步骤4:连接Access数据库并获取最大Mark值 max_mark = get_max_mark_from_access(access_db_path, access_password) erp_id = int(max_mark) + 1 # 在最大Mark值基础上加1 print(f"获取到ERP ID: {erp_id}") # 步骤5:连接SQL Server数据库并插入数据 connection = connect_to_sql_server() insert_into_produce_table(connection, task_info, beton_volume, erp_id) connection.close() # 更新上次获取的ArtifactID last_artifact_id = artifact_id # 每2秒检查一次 time.sleep(2) except Exception as e: print(f"发生错误: {e}") # 继续循环,避免程序退出 time.sleep(2) except KeyboardInterrupt: print("程序已停止") if __name__ == "__main__": main()