from API_TitleServer import *
from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from Config import *
from loguru import logger
from HelperGetUserMusicDetail import getUserFullMusicDetail
from HelperMusicDB import getMusicTitle
import requests
# 日志设置
if False:
import sys
log_level = "DEBUG"
log_format = "{time:YYYY-MM-DD HH:mm:ss.SSS zz} | {level: <8} | Line {line: >4} ({file}): {message}"
logger.add(sys.stderr, level=log_level, format=log_format, colorize=True, backtrace=True, diagnose=True)
logger.add("file.log", level=log_level, format=log_format, colorize=False, backtrace=True, diagnose=True)
# 水鱼查分器的 API 地址
BASE_URL = 'https://www.diving-fish.com/api/maimaidxprober'
# 水鱼查分器的成绩状态转换
COMBO_ID_TO_NAME = ['', 'fc', 'fcp', 'ap', 'app']
SYNC_ID_TO_NAME = ['', 'fs', 'fsp', 'fsd', 'fsdp', 'sync']
def apiDivingFish(method:str, apiPath:str, importToken:str, data=None):
'''水鱼查分器的 API 通讯实现'''
headers = {
"Import-Token": importToken
}
if method == 'POST':
headers['Content-Type'] = 'application/json'
logger.info(f'水鱼查分器 API 请求:{method} {BASE_URL + apiPath}')
if method == 'POST':
response = requests.post(
url=BASE_URL + apiPath,
json=data,
headers=headers,
)
elif method == 'GET':
response = requests.get(
url=BASE_URL + apiPath,
headers=headers,
)
else:
raise NotImplementedError
logger.info(f'水鱼查分器请求结果:{response.status_code}')
logger.debug(f'水鱼查分器回应:{response.text}')
if response.status_code != 200:
return False
return response.json()
def getFishRecords(importToken: str) -> dict:
return apiDivingFish('GET', '/player/records', importToken)
def updateFishRecords(importToken: str, records: list[dict]) -> dict:
return apiDivingFish('POST', '/player/update_records', importToken, records)
def maimaiUserMusicDetailToDivingFishFormat(userMusicDetailList) -> list:
'''舞萌的 UserMusicDetail 成绩格式转换成水鱼的格式'''
divingFishList = []
for currentMusicDetail in userMusicDetailList:
# musicId 大于 100000 属于宴谱,不计入
if currentMusicDetail['musicId'] >= 100000:
continue
# 获得歌名
currentMusicTitle = getMusicTitle(currentMusicDetail['musicId'])
# 如果数据库里未找到此歌曲
if currentMusicTitle == "R_ERR_MUSIC_ID_NOT_IN_DATABASE":
logger.warning(f"数据库无此歌曲 跳过: {currentMusicDetail['musicId']}")
continue
# 每一个乐曲都判断下是 DX 还是标准
if currentMusicDetail['musicId'] >= 10000:
notesType = 'DX'
else:
notesType = 'SD'
# 追加进列表
try:
divingFishList.append({
'achievements': (currentMusicDetail['achievement'] / 10000), # 水鱼的成绩是 float 而非舞萌的 int
'title': currentMusicTitle,
'type': notesType, # 我不理解这为什么不能在后端判断
'level_index': currentMusicDetail['level'],
'fc': COMBO_ID_TO_NAME[currentMusicDetail['comboStatus']],
'fs': SYNC_ID_TO_NAME[currentMusicDetail['syncStatus']],
'dxScore': currentMusicDetail['deluxscoreMax'],
})
except:
logger.error(f"Fish Format Translate Error: {currentMusicDetail}")
# debug output fish list to file
#with open("fishList.txt", "w", encoding="utf-8") as f:
# f.write(str(divingFishList))
return divingFishList
def isVaildFishToken(importToken:str):
'''通过尝试获取一次成绩,检查水鱼查分器的 Token 是否有效
有效返回 True,无效返回 False'''
result = apiDivingFish('GET', '/player/records', importToken)
logger.debug(f"水鱼查分器 Token 检查结果:{result}")
if result == False:
logger.info("检查出了一个无效的水鱼token")
return False
return True
def implUserMusicToDivingFish(userId:int, fishImportToken:str):
'''上传所有成绩到水鱼的参考实现,返回成绩的数量或者False'''
logger.info("开始上传舞萌成绩到水鱼查分器!")
userFullMusicDetailList = getUserFullMusicDetail(userId)
logger.info("成功得到成绩!转换成水鱼格式..")
divingFishData = maimaiUserMusicDetailToDivingFishFormat(userFullMusicDetailList)
logger.info("转换成功!开始上传水鱼..")
if not updateFishRecords(fishImportToken, divingFishData)
logger.error("上传失败!")
return False
return len(divingFishData)
if __name__ == '__main__':
if True:
userId = testUid2
importToken = testImportToken
#currentLoginTimestamp = generateTimestamp()
implUserMusicToDivingFish(userId, importToken)