from API_TitleServer import * from HelperLogInOut import apiLogin, apiLogout, generateTimestamp from Config import * from loguru import logger from HelperGetUserMusicDetail import getUserFullMusicDetail from HelperMusicDB import getMusicTitle import requests # 日志设置 if False: import sys log_level = "DEBUG" log_format = "{time:YYYY-MM-DD HH:mm:ss.SSS zz} | {level: <8} | Line {line: >4} ({file}): {message}" logger.add(sys.stderr, level=log_level, format=log_format, colorize=True, backtrace=True, diagnose=True) logger.add("file.log", level=log_level, format=log_format, colorize=False, backtrace=True, diagnose=True) # 水鱼查分器的 API 地址 BASE_URL = 'https://www.diving-fish.com/api/maimaidxprober' # 水鱼查分器的成绩状态转换 COMBO_ID_TO_NAME = ['', 'fc', 'fcp', 'ap', 'app'] SYNC_ID_TO_NAME = ['', 'fs', 'fsp', 'fsd', 'fsdp', 'sync'] def apiDivingFish(method:str, apiPath:str, importToken:str, data=None): '''水鱼查分器的 API 通讯实现''' headers = { "Import-Token": importToken } if method == 'POST': headers['Content-Type'] = 'application/json' logger.info(f'水鱼查分器 API 请求:{method} {BASE_URL + apiPath}') if method == 'POST': response = requests.post( url=BASE_URL + apiPath, json=data, headers=headers, ) elif method == 'GET': response = requests.get( url=BASE_URL + apiPath, headers=headers, ) else: raise NotImplementedError logger.info(f'水鱼查分器请求结果:{response.status_code}') logger.debug(f'水鱼查分器回应:{response.text}') if response.status_code != 200: return False return response.json() def getFishRecords(importToken: str) -> dict: return apiDivingFish('GET', '/player/records', importToken) def updateFishRecords(importToken: str, records: list[dict]) -> dict: return apiDivingFish('POST', '/player/update_records', importToken, records) def maimaiUserMusicDetailToDivingFishFormat(userMusicDetailList) -> list: '''舞萌的 UserMusicDetail 成绩格式转换成水鱼的格式''' divingFishList = [] for currentMusicDetail in userMusicDetailList: # musicId 大于 100000 属于宴谱,不计入 if currentMusicDetail['musicId'] >= 100000: continue # 获得歌名 currentMusicTitle = getMusicTitle(currentMusicDetail['musicId']) # 如果数据库里未找到此歌曲 if currentMusicTitle == "R_ERR_MUSIC_ID_NOT_IN_DATABASE": logger.warning(f"数据库无此歌曲 跳过: {currentMusicDetail['musicId']}") continue # 每一个乐曲都判断下是 DX 还是标准 if currentMusicDetail['musicId'] >= 10000: notesType = 'DX' else: notesType = 'SD' # 追加进列表 try: divingFishList.append({ 'achievements': (currentMusicDetail['achievement'] / 10000), # 水鱼的成绩是 float 而非舞萌的 int 'title': currentMusicTitle, 'type': notesType, # 我不理解这为什么不能在后端判断 'level_index': currentMusicDetail['level'], 'fc': COMBO_ID_TO_NAME[currentMusicDetail['comboStatus']], 'fs': SYNC_ID_TO_NAME[currentMusicDetail['syncStatus']], 'dxScore': currentMusicDetail['deluxscoreMax'], }) except: print(currentMusicDetail) logger.error(f"Error: {currentMusicDetail}") return divingFishList def isVaildFishToken(importToken:str): '''通过尝试获取一次成绩,检查水鱼查分器的 Token 是否有效 有效返回 True,无效返回 False''' result = apiDivingFish('GET', '/player/records', importToken) logger.debug(f"水鱼查分器 Token 检查结果:{result}") if result == False: logger.info("检查出了一个无效的水鱼token") return False return True def implUserMusicToDivingFish(userId:int, fishImportToken:str): '''上传所有成绩到水鱼的参考实现''' logger.info("开始上传舞萌成绩到水鱼查分器!") userFullMusicDetailList = getUserFullMusicDetail(userId) logger.info("成功得到成绩!转换成水鱼格式..") divingFishData = maimaiUserMusicDetailToDivingFishFormat(userFullMusicDetailList) logger.info("转换成功!开始上传水鱼..") return updateFishRecords(fishImportToken, divingFishData) if __name__ == '__main__': if True: userId = None importToken = None #currentLoginTimestamp = generateTimestamp() userFullMusicDetailList = getUserFullMusicDetail(userId) logger.warning("Now We Begin To Build DivingFish Data") divingFishData = maimaiUserMusicDetailToDivingFishFormat(userFullMusicDetailList) logger.debug(divingFishData) logger.warning("Now We Begin To Update DivingFish Data") updateFishRecords(importToken, divingFishData)