From 12093c9b9b763fb9b511c08bfa4ecc9956894b87 Mon Sep 17 00:00:00 2001 From: mokurin000 <1348292515a@gmail.com> Date: Tue, 29 Jul 2025 02:27:10 +0800 Subject: [PATCH] refactor: module imports --- ActionChangeVersion.py | 5 +- ActionUnlockItem.py | 2 +- Best50_To_Diving_Fish.py | 147 +++++++++++---------- ChargeTicket.py | 117 ++++++++++------- HelperGetUserMusicDetail.py | 61 ++++----- HelperGetUserThing.py | 12 +- HelperLogInOut.py | 79 +++++++----- HelperMisc.py | 159 ++++++++++++----------- HelperMusicDB.py | 6 +- HelperUnlockThing.py | 81 ++++++------ HelperUploadUserPlayLog.py | 247 +++++++++++++++++++----------------- HelperUserAll.py | 247 ++++++++++++++++++++++-------------- 12 files changed, 651 insertions(+), 512 deletions(-) diff --git a/ActionChangeVersion.py b/ActionChangeVersion.py index 1f367d0..5ee4a1c 100644 --- a/ActionChangeVersion.py +++ b/ActionChangeVersion.py @@ -1,10 +1,11 @@ # 改变版本号,实现伪封号和解封号之类 from loguru import logger -from Config import * + from HelperLogInOut import apiLogin, apiLogout, generateTimestamp from HelperFullPlay import implFullPlayAction, generateMusicData from HelperGetUserThing import implGetUser_ +from MyConfig import testUid8 def implWipeTickets(userId: int, currentLoginTimestamp:int, currentLoginResult) -> str: # Get User Charge @@ -35,7 +36,7 @@ def implWipeTickets(userId: int, currentLoginTimestamp:int, currentLoginResult) return result if __name__ == "__main__": - userId = testUid2 + userId = testUid8 currentLoginTimestamp = generateTimestamp() loginResult = apiLogin(currentLoginTimestamp, userId) diff --git a/ActionUnlockItem.py b/ActionUnlockItem.py index 532113f..09f5377 100644 --- a/ActionUnlockItem.py +++ b/ActionUnlockItem.py @@ -52,7 +52,7 @@ if __name__ == "__main__": exit() try: logger.info( - implUnlockSingleItem(10, 14, userId, currentLoginTimestamp, loginResult) + implUnlockSingleItem(14, 10, userId, currentLoginTimestamp, loginResult) ) logger.info(apiLogout(currentLoginTimestamp, userId)) finally: diff --git a/Best50_To_Diving_Fish.py b/Best50_To_Diving_Fish.py index 91c5191..a35652b 100644 --- a/Best50_To_Diving_Fish.py +++ b/Best50_To_Diving_Fish.py @@ -1,57 +1,56 @@ -from API_TitleServer import * -from HelperLogInOut import apiLogin, apiLogout, generateTimestamp -from Config import * +import requests from loguru import logger + from HelperGetUserMusicDetail import getUserFullMusicDetail from HelperMusicDB import getMusicTitle -import requests -import rapidjson as json + class divingFishAuthFailError(Exception): pass + class divingFishCommError(Exception): pass + # 水鱼查分器的 API 地址 -BASE_URL = 'https://www.diving-fish.com/api/maimaidxprober' +BASE_URL = "https://www.diving-fish.com/api/maimaidxprober" # 水鱼查分器的成绩状态转换 -COMBO_ID_TO_NAME = ['', 'fc', 'fcp', 'ap', 'app'] -SYNC_ID_TO_NAME = ['', 'fs', 'fsp', 'fsd', 'fsdp', 'sync'] +COMBO_ID_TO_NAME = ["", "fc", "fcp", "ap", "app"] +SYNC_ID_TO_NAME = ["", "fs", "fsp", "fsd", "fsdp", "sync"] -def apiDivingFish(method:str, apiPath:str, importToken:str, data=None): - '''水鱼查分器的 API 通讯实现''' - headers = { - "Import-Token": importToken - } - if method == 'POST': - headers['Content-Type'] = 'application/json' - logger.info(f'水鱼查分器 API 请求:{method} {BASE_URL + apiPath}') - if method == 'POST': + +def apiDivingFish(method: str, apiPath: str, importToken: str, data=None): + """水鱼查分器的 API 通讯实现""" + headers = {"Import-Token": importToken} + if method == "POST": + headers["Content-Type"] = "application/json" + logger.info(f"水鱼查分器 API 请求:{method} {BASE_URL + apiPath}") + if method == "POST": response = requests.post( url=BASE_URL + apiPath, json=data, headers=headers, ) - elif method == 'GET': + elif method == "GET": response = requests.get( url=BASE_URL + apiPath, headers=headers, ) - elif method == 'DELETE': + elif method == "DELETE": response = requests.delete( url=BASE_URL + apiPath, headers=headers, ) else: - logger.error(f'未知的请求方法:{method}') - raise ValueError(f'未知的请求方法:{method}') - - logger.info(f'水鱼查分器请求结果:{response.status_code}') - logger.debug(f'水鱼查分器回应:{response.text}') - finalResponseTextDecode = response.text.encode('utf-8').decode('unicode_escape') - logger.debug(f'水鱼查分器回应解码后:{finalResponseTextDecode}') + logger.error(f"未知的请求方法:{method}") + raise ValueError(f"未知的请求方法:{method}") + + logger.info(f"水鱼查分器请求结果:{response.status_code}") + logger.debug(f"水鱼查分器回应:{response.text}") + finalResponseTextDecode = response.text.encode("utf-8").decode("unicode_escape") + logger.debug(f"水鱼查分器回应解码后:{finalResponseTextDecode}") match response.status_code: case 200: return response.json() @@ -60,89 +59,103 @@ def apiDivingFish(method:str, apiPath:str, importToken:str, data=None): case _: raise divingFishCommError + def getFishRecords(importToken: str) -> dict: - '''获取水鱼查分器的成绩''' - return apiDivingFish('GET', '/player/records', importToken) + """获取水鱼查分器的成绩""" + return apiDivingFish("GET", "/player/records", importToken) + def updateFishRecords(importToken: str, records: list[dict]) -> dict: - '''上传成绩到水鱼查分器''' - return apiDivingFish('POST', '/player/update_records', importToken, records) + """上传成绩到水鱼查分器""" + return apiDivingFish("POST", "/player/update_records", importToken, records) -def resetFishRecords(fishImportToken:str): - '''重置水鱼查分器的用户数据''' - return apiDivingFish('DELETE', '/player/delete_records', fishImportToken) -def getFishUserInfo(userQQ:int): - '''按QQ获取水鱼查分器的用户信息''' - return apiDivingFish('POST', '/query/player', "", {"qq": userQQ}) +def resetFishRecords(fishImportToken: str): + """重置水鱼查分器的用户数据""" + return apiDivingFish("DELETE", "/player/delete_records", fishImportToken) + + +def getFishUserInfo(userQQ: int): + """按QQ获取水鱼查分器的用户信息""" + return apiDivingFish("POST", "/query/player", "", {"qq": userQQ}) + def maimaiUserMusicDetailToDivingFishFormat(userMusicDetailList) -> list: - '''舞萌的 UserMusicDetail 成绩格式转换成水鱼的格式''' + """舞萌的 UserMusicDetail 成绩格式转换成水鱼的格式""" divingFishList = [] for currentMusicDetail in userMusicDetailList: # musicId 大于 100000 属于宴谱,不计入 - if currentMusicDetail['musicId'] >= 100000: + if currentMusicDetail["musicId"] >= 100000: continue # 获得歌名 - currentMusicTitle = getMusicTitle(currentMusicDetail['musicId']) + currentMusicTitle = getMusicTitle(currentMusicDetail["musicId"]) # 如果数据库里未找到此歌曲 if currentMusicTitle == "R_ERR_MUSIC_ID_NOT_IN_DATABASE": logger.warning(f"数据库无此歌曲 跳过: {currentMusicDetail['musicId']}") continue # 每一个乐曲都判断下是 DX 还是标准 - if currentMusicDetail['musicId'] >= 10000: - notesType = 'DX' + if currentMusicDetail["musicId"] >= 10000: + notesType = "DX" else: - notesType = 'SD' + notesType = "SD" # 追加进列表 try: - divingFishList.append({ - 'achievements': (currentMusicDetail['achievement'] / 10000), # 水鱼的成绩是 float 而非舞萌的 int - 'title': currentMusicTitle, - 'type': notesType, - 'level_index': currentMusicDetail['level'], - 'fc': COMBO_ID_TO_NAME[currentMusicDetail['comboStatus']], - 'fs': SYNC_ID_TO_NAME[currentMusicDetail['syncStatus']], - 'dxScore': currentMusicDetail['deluxscoreMax'], - }) + divingFishList.append( + { + "achievements": ( + currentMusicDetail["achievement"] / 10000 + ), # 水鱼的成绩是 float 而非舞萌的 int + "title": currentMusicTitle, + "type": notesType, + "level_index": currentMusicDetail["level"], + "fc": COMBO_ID_TO_NAME[currentMusicDetail["comboStatus"]], + "fs": SYNC_ID_TO_NAME[currentMusicDetail["syncStatus"]], + "dxScore": currentMusicDetail["deluxscoreMax"], + } + ) except: logger.error(f"无法将 UserMusic 翻译成水鱼格式: {currentMusicDetail}") return divingFishList -def isVaildFishToken(importToken:str): - '''通过尝试获取一次成绩,检查水鱼查分器的 Token 是否有效 - 有效返回 True,无效返回 False''' - result = apiDivingFish('GET', '/player/records', importToken) + +def isVaildFishToken(importToken: str): + """通过尝试获取一次成绩,检查水鱼查分器的 Token 是否有效 + 有效返回 True,无效返回 False""" + result = apiDivingFish("GET", "/player/records", importToken) logger.debug(f"水鱼查分器 Token 检查结果:{result}") if result: return True return False -def implGetUserCurrentDXRating(userQQ:int): - '''获取用户当前的 DX RATING''' + +def implGetUserCurrentDXRating(userQQ: int): + """获取用户当前的 DX RATING""" try: playerData = getFishUserInfo(userQQ) - playerRating = playerData['rating'] + playerRating = playerData["rating"] logger.info(f"用户 {userQQ} 的 DX RATING 是 {playerRating}") except Exception as e: logger.warning(f"无法获取用户 {userQQ} 的 DX RATING: {e}") return False return playerRating -def implUserMusicToDivingFish(userId:int, fishImportToken:str): - '''上传所有成绩到水鱼的参考实现。 + +def implUserMusicToDivingFish(userId: int, fishImportToken: str): + """上传所有成绩到水鱼的参考实现。 返回一个 int 的 ErrorCode。 0: Success 1: Get User Music Fail 2: Auth Fail 3: Comm Error - ''' + """ logger.info("开始尝试上传舞萌成绩到水鱼查分器!") try: userFullMusicDetailList = getUserFullMusicDetail(userId) logger.info("成功得到成绩!转换成水鱼格式..") - divingFishData = maimaiUserMusicDetailToDivingFishFormat(userFullMusicDetailList) + divingFishData = maimaiUserMusicDetailToDivingFishFormat( + userFullMusicDetailList + ) logger.info("转换成功!开始上传水鱼..") except Exception as e: logger.error(f"获取成绩失败!{e}") @@ -156,8 +169,9 @@ def implUserMusicToDivingFish(userId:int, fishImportToken:str): logger.error("水鱼查分器通讯失败!") return 3 + def generateDebugTestScore(): - '''生成测试成绩''' + """生成测试成绩""" return [ { "achievement": 1010000, @@ -165,7 +179,7 @@ def generateDebugTestScore(): "deluxscoreMax": 4026, "level": 4, "musicId": 834, - "syncStatus": 4 + "syncStatus": 4, }, { "achievement": 1010000, @@ -173,7 +187,6 @@ def generateDebugTestScore(): "deluxscoreMax": 4200, "level": 4, "musicId": 11663, - "syncStatus": 4 - } + "syncStatus": 4, + }, ] - diff --git a/ChargeTicket.py b/ChargeTicket.py index db5e372..2ce7f49 100644 --- a/ChargeTicket.py +++ b/ChargeTicket.py @@ -1,106 +1,127 @@ +from datetime import datetime, timedelta + # 倍票相关 API 的实现 import rapidjson as json import pytz -from datetime import datetime, timedelta +from loguru import logger -from Config import * from API_TitleServer import apiSDGB from HelperGetUserThing import implGetUser_ -from loguru import logger +from Config import ( + clientId, + placeId, + regionId, +) +from MyConfig import testUid2 + from HelperLogInOut import apiLogin, apiLogout, generateTimestamp from HelperFullPlay import implFullPlayAction, generateMusicData from HelperGetUserThing import implGetUser_ -def implWipeTickets(userId: int, currentLoginTimestamp:int, currentLoginResult) -> str: - '''清空用户所有票的 API 请求器,返回 Json String。''' + +def implWipeTickets(userId: int, currentLoginTimestamp: int, currentLoginResult) -> str: + """清空用户所有票的 API 请求器,返回 Json String。""" # 先得到当前用户的 Charge 数据 currentUserCharge = implGetUser_("Charge", userId) # 取得 List - currentUserChargeList = currentUserCharge['userChargeList'] + currentUserChargeList = currentUserCharge["userChargeList"] # 所有 stock 都置为 0 for charge in currentUserChargeList: - charge['stock'] = 0 + charge["stock"] = 0 musicData = generateMusicData() userAllPatches = { - "upsertUserAll": { - "userChargeList": currentUserChargeList, - "userMusicDetailList": [musicData], - "isNewMusicDetailList": "1" #1避免覆盖 - }} + "upsertUserAll": { + "userChargeList": currentUserChargeList, + "userMusicDetailList": [musicData], + "isNewMusicDetailList": "1", # 1避免覆盖 + } + } - result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches) + result = implFullPlayAction( + userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches + ) return result -def apiQueryTicket(userId:int) -> str: - '''查询已有票的 API 请求器,返回 Json String。''' + +def apiQueryTicket(userId: int) -> str: + """查询已有票的 API 请求器,返回 Json String。""" # 构建 Payload - data = json.dumps({ - "userId": userId - }) + data = json.dumps({"userId": userId}) # 发送请求 userdata_result = apiSDGB(data, "GetUserChargeApi", userId) # 返回响应 return userdata_result -def apiBuyTicket(userId:int, ticketType:int, price:int, playerRating:int, playCount:int) -> str: - '''倍票购买 API 的请求器''' - nowTime = datetime.now(pytz.timezone('Asia/Shanghai')) +def apiBuyTicket( + userId: int, ticketType: int, price: int, playerRating: int, playCount: int +) -> str: + """倍票购买 API 的请求器""" + + nowTime = datetime.now(pytz.timezone("Asia/Shanghai")) # 构造请求数据 Payload - data = json.dumps({ - "userId": userId, - "userChargelog": { - "chargeId": ticketType, - "price": price, - "purchaseDate": nowTime.strftime("%Y-%m-%d %H:%M:%S.0"), - "playCount": playCount, - "playerRating": playerRating, - "placeId": placeId, - "regionId": regionId, - "clientId": clientId - }, - "userCharge": { - "chargeId": ticketType, - "stock": 1, - "purchaseDate": nowTime.strftime("%Y-%m-%d %H:%M:%S.0"), - "validDate": (nowTime + timedelta(days=90)).replace(hour=4, minute=0, second=0).strftime("%Y-%m-%d %H:%M:%S") + data = json.dumps( + { + "userId": userId, + "userChargelog": { + "chargeId": ticketType, + "price": price, + "purchaseDate": nowTime.strftime("%Y-%m-%d %H:%M:%S.0"), + "playCount": playCount, + "playerRating": playerRating, + "placeId": placeId, + "regionId": regionId, + "clientId": clientId, + }, + "userCharge": { + "chargeId": ticketType, + "stock": 1, + "purchaseDate": nowTime.strftime("%Y-%m-%d %H:%M:%S.0"), + "validDate": (nowTime + timedelta(days=90)) + .replace(hour=4, minute=0, second=0) + .strftime("%Y-%m-%d %H:%M:%S"), + }, } - }) + ) # 发送请求,返回最终得到的 Json String 回执 return apiSDGB(data, "UpsertUserChargelogApi", userId) -def implBuyTicket(userId:int, ticketType:int): - ''' + +def implBuyTicket(userId: int, ticketType: int): + """ 购买倍票 API 的参考实现。 需要事先登录. 返回服务器响应的 Json string。 - ''' + """ # 先使用 GetUserData API 请求器,取得 rating 和 pc 数 currentUserData = implGetUser_("Data", userId) if currentUserData: - playerRating = currentUserData['userData']['playerRating'] - playCount = currentUserData['userData'].get('playCount', 0) + playerRating = currentUserData["userData"]["playerRating"] + playCount = currentUserData["userData"].get("playCount", 0) else: return False # 正式买票 - getTicketResponseStr = apiBuyTicket(userId, ticketType, ticketType-1, playerRating, playCount) + getTicketResponseStr = apiBuyTicket( + userId, ticketType, ticketType - 1, playerRating, playCount + ) # 返回结果 return getTicketResponseStr + if __name__ == "__main__": userId = testUid2 currentLoginTimestamp = generateTimestamp() loginResult = apiLogin(currentLoginTimestamp, userId) - if loginResult['returnCode'] != 1: + if loginResult["returnCode"] != 1: logger.info("登录失败") exit() try: logger.info(implBuyTicket(userId, 2)) # 购买倍票 - #logger.info(apiQueryTicket(userId)) + # logger.info(apiQueryTicket(userId)) finally: logger.info(apiLogout(currentLoginTimestamp, userId)) - #logger.warning("Error") + # logger.warning("Error") diff --git a/HelperGetUserMusicDetail.py b/HelperGetUserMusicDetail.py index 835de76..0e86d03 100644 --- a/HelperGetUserMusicDetail.py +++ b/HelperGetUserMusicDetail.py @@ -1,59 +1,62 @@ # 获取用户成绩的各种实现 -from API_TitleServer import * -from HelperLogInOut import apiLogin, apiLogout, generateTimestamp -from Config import * import rapidjson as json -from HelperMusicDB import getMusicTitle from loguru import logger -import sys + +from HelperMusicDB import getMusicTitle +from API_TitleServer import apiSDGB +from MyConfig import testUid # 日志设置 -#log_level = "DEBUG" -#log_format = "{time:YYYY-MM-DD HH:mm:ss.SSS zz} | {level: <8} | Line {line: >4} ({file}): {message}" -#logger.add(sys.stderr, level=log_level, format=log_format, colorize=True, backtrace=True, diagnose=True) -#logger.add("file.log", level=log_level, format=log_format, colorize=False, backtrace=True, diagnose=True) +# log_level = "DEBUG" +# log_format = "{time:YYYY-MM-DD HH:mm:ss.SSS zz} | {level: <8} | Line {line: >4} ({file}): {message}" +# logger.add(sys.stderr, level=log_level, format=log_format, colorize=True, backtrace=True, diagnose=True) +# logger.add("file.log", level=log_level, format=log_format, colorize=False, backtrace=True, diagnose=True) -def getUserMusicDetail(userId:int, nextIndex:int=0, maxCount:int=50) -> dict: + +def getUserMusicDetail(userId: int, nextIndex: int = 0, maxCount: int = 50) -> dict: """获取用户的成绩的API""" - data = json.dumps({ - "userId": int(userId), - "nextIndex": nextIndex, - "maxCount": maxCount - }) + data = json.dumps( + {"userId": int(userId), "nextIndex": nextIndex, "maxCount": maxCount} + ) return json.loads(apiSDGB(data, "GetUserMusicApi", userId)) + def getUserFullMusicDetail(userId: int): """获取用户的全部成绩""" currentUserMusicDetailList = [] - nextIndex:int|None = None # 初始化 nextIndex - while nextIndex != 0 or nextIndex is None: #只要还有nextIndex就一直获取获取 + nextIndex: int | None = None # 初始化 nextIndex + while nextIndex != 0 or nextIndex is None: # 只要还有nextIndex就一直获取获取 userMusicResponse = getUserMusicDetail(userId, nextIndex or 0) - nextIndex = userMusicResponse['nextIndex'] + nextIndex = userMusicResponse["nextIndex"] logger.info(f"NextIndex: {nextIndex}") # 处理已经没有 userMusicList 的情况 - if not userMusicResponse['userMusicList']: + if not userMusicResponse["userMusicList"]: break # 只要还有 userMusicList 就一直加进去,直到全部获取完毕 - for currentMusic in userMusicResponse['userMusicList']: - for currentMusicDetail in currentMusic['userMusicDetailList']: - if not currentMusicDetail['playCount'] > 0: + for currentMusic in userMusicResponse["userMusicList"]: + for currentMusicDetail in currentMusic["userMusicDetailList"]: + if not currentMusicDetail["playCount"] > 0: continue currentUserMusicDetailList.append(currentMusicDetail) return currentUserMusicDetailList + def parseUserFullMusicDetail(userFullMusicDetailList: list): """解析用户的全部成绩,给出一个迫真人类可读 list 套 dict""" musicDetailList = [] for currentMusicDetail in userFullMusicDetailList: - musicDetailList.append({ - '歌名': getMusicTitle(currentMusicDetail['musicId']), - '难度': currentMusicDetail['level'], - '分数': currentMusicDetail['achievement'] / 10000, - 'DX分数': currentMusicDetail['deluxscoreMax'] - }) + musicDetailList.append( + { + "歌名": getMusicTitle(currentMusicDetail["musicId"]), + "难度": currentMusicDetail["level"], + "分数": currentMusicDetail["achievement"] / 10000, + "DX分数": currentMusicDetail["deluxscoreMax"], + } + ) return musicDetailList -if __name__ == '__main__': + +if __name__ == "__main__": userId = testUid userFullMusicDetailList = getUserFullMusicDetail(userId) parsedUserFullMusicDetail = parseUserFullMusicDetail(userFullMusicDetailList) diff --git a/HelperGetUserThing.py b/HelperGetUserThing.py index a98f992..83fd5c2 100644 --- a/HelperGetUserThing.py +++ b/HelperGetUserThing.py @@ -1,9 +1,9 @@ # 获取用户数据的 API 实现 -from loguru import logger import rapidjson as json from API_TitleServer import apiSDGB -def implGetUser_(thing:str, userId:int, noLog=False) -> dict: + +def implGetUser_(thing: str, userId: int, noLog=False) -> dict: """获取用户某些数据的 API 实现,返回 Dict""" # 获取 Json String result = apiGetUserThing(userId, thing, noLog) @@ -12,14 +12,12 @@ def implGetUser_(thing:str, userId:int, noLog=False) -> dict: # 返回 Dict return userthingDict -def apiGetUserThing(userId:int, thing:str, noLog=False) -> str: + +def apiGetUserThing(userId: int, thing: str, noLog=False) -> str: """获取用户数据的 API 请求器,返回 Json String""" # 构建 Payload - data = json.dumps({ - "userId": userId - }) + data = json.dumps({"userId": userId}) # 发送请求 userthing_result = apiSDGB(data, "GetUser" + thing + "Api", userId, noLog) # 返回响应 return userthing_result - diff --git a/HelperLogInOut.py b/HelperLogInOut.py index aab428e..c8f7fdb 100644 --- a/HelperLogInOut.py +++ b/HelperLogInOut.py @@ -1,60 +1,81 @@ # 登录·登出实现 # 一般作为模块使用,但也可以作为 CLI 程序运行以强制登出账号。 -import rapidjson as json import time -from loguru import logger import random -from Config import * -from API_TitleServer import apiSDGB +import rapidjson as json +from loguru import logger -def apiLogin(timestamp:int, userId:int, noLog:bool=False) -> dict: +from API_TitleServer import apiSDGB +from Config import ( + clientId, + placeId, + regionId, +) +from MyConfig import testUid + + +def apiLogin(timestamp: int, userId: int, noLog: bool = False) -> dict: """登录,返回 dict""" - data = json.dumps({ - "userId": userId, - "accessCode": "", - "regionId": regionId, - "placeId": placeId, - "clientId": clientId, - "dateTime": timestamp, - "isContinue": False, - "genericFlag": 0, - }) + data = json.dumps( + { + "userId": userId, + "accessCode": "", + "regionId": regionId, + "placeId": placeId, + "clientId": clientId, + "dateTime": timestamp, + "isContinue": False, + "genericFlag": 0, + } + ) login_result = json.loads(apiSDGB(data, "UserLoginApi", userId, noLog)) if not noLog: - logger.info("登录:结果:"+ str(login_result)) + logger.info("登录:结果:" + str(login_result)) return login_result -def apiLogout(timestamp:int, userId:int, noLog:bool=False) -> dict: + +def apiLogout(timestamp: int, userId: int, noLog: bool = False) -> dict: """登出,返回 dict""" - data = json.dumps({ - "userId": userId, - "accessCode": "", - "regionId": regionId, - "placeId": placeId, - "clientId": clientId, - "dateTime": timestamp, - "type": 1 - }) + data = json.dumps( + { + "userId": userId, + "accessCode": "", + "regionId": regionId, + "placeId": placeId, + "clientId": clientId, + "dateTime": timestamp, + "type": 1, + } + ) logout_result = json.loads(apiSDGB(data, "UserLogoutApi", userId, noLog)) if not noLog: - logger.info("登出:结果:"+ str(logout_result)) + logger.info("登出:结果:" + str(logout_result)) return logout_result + def generateTimestampLegacy() -> int: """生成一个凑合用的时间戳""" timestamp = int(time.time()) - 60 logger.info(f"生成时间戳: {timestamp}") return timestamp + def generateTimestamp() -> int: """生成一个今天早上 10:00 随机偏移的时间戳""" - timestamp = int(time.mktime(time.strptime(time.strftime("%Y-%m-%d 10:00:00"), "%Y-%m-%d %H:%M:%S"))) + random.randint(-600, 600) + timestamp = int( + time.mktime( + time.strptime(time.strftime("%Y-%m-%d 10:00:00"), "%Y-%m-%d %H:%M:%S") + ) + ) + random.randint(-600, 600) logger.info(f"生成时间戳: {timestamp}") - logger.info(f"此时间戳对应的时间为: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))}") + logger.info( + f"此时间戳对应的时间为: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))}" + ) return timestamp + if __name__ == "__main__": print("强制登出 CLI") uid = testUid diff --git a/HelperMisc.py b/HelperMisc.py index c6abdbe..f6a7cce 100644 --- a/HelperMisc.py +++ b/HelperMisc.py @@ -4,9 +4,10 @@ import rapidjson as json from loguru import logger from HelperGetUserThing import implGetUser_ import unicodedata -from Config import * from HelperLogInOut import apiLogin, apiLogout, generateTimestamp +from MyConfig import testUid + def numberToLetter(number): """ @@ -17,10 +18,11 @@ def numberToLetter(number): else: return None + def maimaiVersionToHumanReadable(romVersion: str, dataVersion: str) -> str: try: - romVersionList = romVersion.split('.') - dataVersionList = dataVersion.split('.') + romVersionList = romVersion.split(".") + dataVersionList = dataVersion.split(".") except Exception as e: logger.warning(f"无法解析版本号: {romVersion} {dataVersion},错误:{e}") return "无效版本号:无法解析" @@ -52,24 +54,19 @@ def maimaiVersionToHumanReadable(romVersion: str, dataVersion: str) -> str: finalVersionString = f"{versionStringPrefix}{finalVersionList[0]}.{finalVersionList[1]}{finalVersionList[2]}" return finalVersionString - -levelIdDict = { - "绿": 0, - "黄": 1, - "红": 2, - "紫": 3, - "白": 4, - "宴": 5 -} + +levelIdDict = {"绿": 0, "黄": 1, "红": 2, "紫": 3, "白": 4, "宴": 5} + def getHalfWidthString(s): """全角转半角,舞萌ID用""" - return unicodedata.normalize('NFKC', s) + return unicodedata.normalize("NFKC", s) + def getHumanReadableLoginErrorCode(loginResult) -> str: - '''解析登录结果并且给出中文的报错解释''' - match loginResult['returnCode']: + """解析登录结果并且给出中文的报错解释""" + match loginResult["returnCode"]: case 1: return False case 100: @@ -79,10 +76,11 @@ def getHumanReadableLoginErrorCode(loginResult) -> str: case 103: return "❌ 试图登录的账号 UID 无效,请检查账号是否正确。" case _: - return "❌ 登录失败!这不应该发生,请反馈此问题。错误详情:"+ loginResult + return "❌ 登录失败!这不应该发生,请反馈此问题。错误详情:" + loginResult + def checkTechnologyUseCount(userId: int) -> int: - '''猜测账号是否用了科技,0没用过,其他为用过''' + """猜测账号是否用了科技,0没用过,其他为用过""" userData1 = implGetUser_("Data", userId) userData = userData1.get("userData", {}) userRegion = implGetUser_("Region", userId) @@ -92,13 +90,16 @@ def checkTechnologyUseCount(userId: int) -> int: allRegionPlayCount = 0 for region in userRegionList: allRegionPlayCount += region.get("playCount", 0) - logger.info(f"用户 {userId} 的总游玩次数: {playCount}, 各地区游玩次数: {allRegionPlayCount}") + logger.info( + f"用户 {userId} 的总游玩次数: {playCount}, 各地区游玩次数: {allRegionPlayCount}" + ) # 计算全部的 Region 加起来的游玩次数是否和 playCount 对不上,对不上就是用了科技 # 返回差值 return playCount - allRegionPlayCount -def getFriendlyUserData(userId:int) -> str: - '''生成一个(相对)友好的UserData的人话''' + +def getFriendlyUserData(userId: int) -> str: + """生成一个(相对)友好的UserData的人话""" userData1 = implGetUser_("Data", userId) userData = userData1.get("userData", {}) userRegion = implGetUser_("Region", userId) @@ -114,7 +115,7 @@ def getFriendlyUserData(userId:int) -> str: result += f"最近登录版本: {maimaiVersionToHumanReadable(userData.get('lastRomVersion'), userData.get('lastDataVersion'))} " result += f"最近登录地区: {userData.get('lastRegionName', '未知')}\n" result += f"注册日期: {userData.get('firstPlayDate')} " - result += f"注册版本: {maimaiVersionToHumanReadable(userData.get('firstRomVersion'),userData.get('firstDataVersion'))}\n" + result += f"注册版本: {maimaiVersionToHumanReadable(userData.get('firstRomVersion'), userData.get('firstDataVersion'))}\n" result += f"封号状态(banState): {banState}\n" try: logger.info(userRegion) @@ -124,28 +125,31 @@ def getFriendlyUserData(userId:int) -> str: return result -def getHumanReadableRegionData(userRegion:str) -> str: - '''生成一个人类可读的地区数据''' + +def getHumanReadableRegionData(userRegion: str) -> str: + """生成一个人类可读的地区数据""" userRegionList = userRegion.get("userRegionList") logger.info(userRegionList) result = "" for region in userRegionList: - regionName = WAHLAP_REGIONS.get(region['regionId'], '未知') - playCount = region['playCount'] - created = region['created'] + regionName = WAHLAP_REGIONS.get(region["regionId"], "未知") + playCount = region["playCount"] + created = region["created"] result += f"\n{regionName} 游玩次数: {playCount} 首次游玩: {created}" return result -def getHumanReadablePreview(preview_json_content:str) -> str: - '''简单,粗略地解释 Preview 的 Json String 为人话。''' + +def getHumanReadablePreview(preview_json_content: str) -> str: + """简单,粗略地解释 Preview 的 Json String 为人话。""" previewData = json.loads(preview_json_content) - userName = getHalfWidthString(previewData['userName']) - playerRating = previewData['playerRating'] + userName = getHalfWidthString(previewData["userName"]) + playerRating = previewData["playerRating"] finalString = f"用户名:{userName}\nDX RATING:{playerRating}\n" return finalString + def getHumanReadableLoginBonusList(jsonString: str): - '''生成一个人类可读的 Login Bonus 的列表''' + """生成一个人类可读的 Login Bonus 的列表""" data = json.loads(jsonString) result = [] @@ -157,32 +161,34 @@ def getHumanReadableLoginBonusList(jsonString: str): result.append(line) resultString = "" - for line in result: # 转成字符串 + for line in result: # 转成字符串 resultString += line + "\n" return resultString + def getHumanReadableTicketList(jsonString: str): - '''生成一个人类可读的 UserCharge 的列表''' + """生成一个人类可读的 UserCharge 的列表""" data = json.loads(jsonString) - userId = data['userId'] - length = data['length'] - userChargeList = data['userChargeList'] + userId = data["userId"] + length = data["length"] + userChargeList = data["userChargeList"] result = f"UID: {userId} 票槽大小: {length} 所有记录:" for currentItem in userChargeList: - chargeId = currentItem['chargeId'] - stock = currentItem['stock'] - purchaseDate = currentItem['purchaseDate'] - validDate = currentItem['validDate'] + chargeId = currentItem["chargeId"] + stock = currentItem["stock"] + purchaseDate = currentItem["purchaseDate"] + validDate = currentItem["validDate"] result += f"\nID: {chargeId} 持有: {stock}, 购买日期: {purchaseDate}, 有效期限: {validDate}" return result + def getHumanReadableUserData(userData) -> str: - '''生成一个人类可读的 UserData 的数据(比较详细)''' + """生成一个人类可读的 UserData 的数据(比较详细)""" userId = userData.get("userId") userData = userData.get("userData", {}) banState = userData.get("banState") @@ -239,39 +245,40 @@ def getHumanReadableUserData(userData) -> str: result += f"封号状态: {banState}\n" return result + WAHLAP_REGIONS = { - 1: '北京', - 2: '重庆', - 3: '上海', - 4: '天津', - 5: '安徽', - 6: '福建', - 7: '甘肃', - 8: '广东', - 9: '贵州', - 10: '海南', - 11: '河北', - 12: '黑龙江', - 13: '河南', - 14: '湖北', - 15: '湖南', - 16: '江苏', - 17: '江西', - 18: '吉林', - 19: '辽宁', - 20: '青海', - 21: '陕西', - 22: '山东', - 23: '山西', - 24: '四川', - 25: '(未知25)', - 26: '云南', - 27: '浙江', - 28: '广西', - 29: '内蒙古', - 30: '宁夏', - 31: '新疆', - 32: '西藏', + 1: "北京", + 2: "重庆", + 3: "上海", + 4: "天津", + 5: "安徽", + 6: "福建", + 7: "甘肃", + 8: "广东", + 9: "贵州", + 10: "海南", + 11: "河北", + 12: "黑龙江", + 13: "河南", + 14: "湖北", + 15: "湖南", + 16: "江苏", + 17: "江西", + 18: "吉林", + 19: "辽宁", + 20: "青海", + 21: "陕西", + 22: "山东", + 23: "山西", + 24: "四川", + 25: "(未知25)", + 26: "云南", + 27: "浙江", + 28: "广西", + 29: "内蒙古", + 30: "宁夏", + 31: "新疆", + 32: "西藏", } if __name__ == "__main__": @@ -284,12 +291,12 @@ if __name__ == "__main__": currentLoginTimestamp = generateTimestamp() loginResult = apiLogin(currentLoginTimestamp, userId) - if loginResult['returnCode'] != 1: + if loginResult["returnCode"] != 1: logger.info("登录失败") exit() try: logger.info(checkTechnologyUseCount(userId)) - #logger.info(apiQueryTicket(userId)) + # logger.info(apiQueryTicket(userId)) finally: logger.info(apiLogout(currentLoginTimestamp, userId)) - #logger.warning("Error") + # logger.warning("Error") diff --git a/HelperMusicDB.py b/HelperMusicDB.py index f089aab..d65f284 100644 --- a/HelperMusicDB.py +++ b/HelperMusicDB.py @@ -4,11 +4,11 @@ from loguru import logger def getMusicTitle(musicId: int) -> str: """从数据库获取音乐的标题""" - #logger.debug(f"查询歌名: {musicId}") + # logger.debug(f"查询歌名: {musicId}") musicInfo = musicDB.get(musicId) if not musicInfo: logger.warning(f"数据库里未找到此歌曲: {musicId}") return "R_ERR_MUSIC_ID_NOT_IN_DATABASE" musicName = musicInfo.get("name") - #logger.debug(f"成功查询到歌名: {musicName}") - return musicName \ No newline at end of file + # logger.debug(f"成功查询到歌名: {musicName}") + return musicName diff --git a/HelperUnlockThing.py b/HelperUnlockThing.py index 6ca4e35..eaaef89 100644 --- a/HelperUnlockThing.py +++ b/HelperUnlockThing.py @@ -1,43 +1,48 @@ # 解锁东西的一个通用的助手,不可独立使用 -from loguru import logger -from Config import * from HelperFullPlay import implFullPlayAction -def implUnlockThing(newUserItemList, userId: int, currentLoginTimestamp:int, currentLoginResult) -> str: - musicData= ({ - "musicId": 11538, # Amber Chronicle - "level": 0, - "playCount": 1, - "achievement": 0, - "comboStatus": 0, - "syncStatus": 0, - "deluxscoreMax": 0, - "scoreRank": 0, - "extNum1": 0 - }) + +def implUnlockThing( + newUserItemList, userId: int, currentLoginTimestamp: int, currentLoginResult +) -> str: + musicData = { + "musicId": 11538, # Amber Chronicle + "level": 0, + "playCount": 1, + "achievement": 0, + "comboStatus": 0, + "syncStatus": 0, + "deluxscoreMax": 0, + "scoreRank": 0, + "extNum1": 0, + } userAllPatches = { - "upsertUserAll": { - "userMusicDetailList": [musicData], - "isNewMusicDetailList": "1", - "userItemList": newUserItemList, - "isNewItemList": "1" * len(newUserItemList) - }} - result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches) + "upsertUserAll": { + "userMusicDetailList": [musicData], + "isNewMusicDetailList": "1", + "userItemList": newUserItemList, + "isNewItemList": "1" * len(newUserItemList), + } + } + result = implFullPlayAction( + userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches + ) return result + itemKindDict = { - "PLATE": 1, # 姓名框 - "TITLE": 2, # 称号 - "ICON": 3, # 头像 - "MUSIC": 5, # 歌 - "MUSIC_MASTER": 6, # 紫谱 - "MUSIC_RE_MASTER": 7,# 白谱 - "CHARACTER": 9, # 旅行伙伴 - "PARTNER": 10, # 搭档 - "FRAME": 11, # 背景板 - "TICKET": 12 # 功能票 -# "PRESENT": 4, # ? -# "MUSIC_STRONG": 8, # ? + "PLATE": 1, # 姓名框 + "TITLE": 2, # 称号 + "ICON": 3, # 头像 + "MUSIC": 5, # 歌 + "MUSIC_MASTER": 6, # 紫谱 + "MUSIC_RE_MASTER": 7, # 白谱 + "CHARACTER": 9, # 旅行伙伴 + "PARTNER": 10, # 搭档 + "FRAME": 11, # 背景板 + "TICKET": 12, # 功能票 + # "PRESENT": 4, # ? + # "MUSIC_STRONG": 8, # ? } itemKindzhCNDict = { @@ -50,9 +55,9 @@ itemKindzhCNDict = { "旅行伙伴": "CHARACTER", "搭档": "PARTNER", "背景板": "FRAME", - "功能票": "TICKET" -# "礼物": "PRESENT", -# "STRONG": "MUSIC_STRONG", + "功能票": "TICKET", + # "礼物": "PRESENT", + # "STRONG": "MUSIC_STRONG", } partnerList = { @@ -75,5 +80,5 @@ partnerList = { "26": "黒姫", "27": "俊达萌", "28": "乙姫(2024)", - "29": "青柠熊&柠檬熊(2024)" -} \ No newline at end of file + "29": "青柠熊&柠檬熊(2024)", +} diff --git a/HelperUploadUserPlayLog.py b/HelperUploadUserPlayLog.py index 8ab526e..44dadb5 100644 --- a/HelperUploadUserPlayLog.py +++ b/HelperUploadUserPlayLog.py @@ -8,133 +8,146 @@ from datetime import datetime from loguru import logger from API_TitleServer import apiSDGB -from Config import * +from Config import ( + placeId, + placeName, +) -def apiUploadUserPlaylog(userId:int, musicDataToBeUploaded, currentUserData2, loginId:int) -> str: + +def apiUploadUserPlaylog( + userId: int, musicDataToBeUploaded, currentUserData2, loginId: int +) -> str: """ 上传一个 UserPlayLog。 注意:成绩为随机的空成绩,只用作占位 返回 Json String。""" # 构建一个 PlayLog - data = json.dumps({ - "userId": int(userId), - "userPlaylogList": [ + data = json.dumps( { - "userId": 0, - "orderId": 0, - "playlogId": loginId, - "version": 1051000, - "placeId": placeId, - "placeName": placeName, - "loginDate": int(time.time()), #似乎和登录timestamp不同 - "playDate": datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d'), - "userPlayDate": datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S') + '.0', - "type": 0, - "musicId": int(musicDataToBeUploaded['musicId']), - "level": int(musicDataToBeUploaded['level']), - "trackNo": 1, - "vsMode": 0, - "vsUserName": "", - "vsStatus": 0, - "vsUserRating": 0, - "vsUserAchievement": 0, - "vsUserGradeRank": 0, - "vsRank": 0, - "playerNum": 1, - "playedUserId1": 0, - "playedUserName1": "", - "playedMusicLevel1": 0, - "playedUserId2": 0, - "playedUserName2": "", - "playedMusicLevel2": 0, - "playedUserId3": 0, - "playedUserName3": "", - "playedMusicLevel3": 0, - "characterId1": currentUserData2['charaSlot'][0], - "characterLevel1": random.randint(1000,6500), - "characterAwakening1": 5, - "characterId2": currentUserData2['charaSlot'][1], - "characterLevel2": random.randint(1000,6500), - "characterAwakening2": 5, - "characterId3": currentUserData2['charaSlot'][2], - "characterLevel3": random.randint(1000,6500), - "characterAwakening3": 5, - "characterId4": currentUserData2['charaSlot'][3], - "characterLevel4": random.randint(1000,6500), - "characterAwakening4": 5, - "characterId5": currentUserData2['charaSlot'][4], - "characterLevel5": random.randint(1000,6500), - "characterAwakening5": 5, - "achievement": int(musicDataToBeUploaded['achievement']), - "deluxscore": int(musicDataToBeUploaded['deluxscoreMax']), - "scoreRank": int(musicDataToBeUploaded['scoreRank']), - "maxCombo": 0, - "totalCombo": random.randint(700,900), - "maxSync": 0, - "totalSync": 0, - "tapCriticalPerfect": 0, - "tapPerfect": 0, - "tapGreat": 0, - "tapGood": 0, - "tapMiss": random.randint(1,10), - "holdCriticalPerfect": 0, - "holdPerfect": 0, - "holdGreat": 0, - "holdGood": 0, - "holdMiss": random.randint(1,15), - "slideCriticalPerfect": 0, - "slidePerfect": 0, - "slideGreat": 0, - "slideGood": 0, - "slideMiss": random.randint(1,15), - "touchCriticalPerfect": 0, - "touchPerfect": 0, - "touchGreat": 0, - "touchGood": 0, - "touchMiss": random.randint(1,15), - "breakCriticalPerfect": 0, - "breakPerfect": 0, - "breakGreat": 0, - "breakGood": 0, - "breakMiss": random.randint(1,15), - "isTap": True, - "isHold": True, - "isSlide": True, - "isTouch": True, - "isBreak": True, - "isCriticalDisp": True, - "isFastLateDisp": True, - "fastCount": 0, - "lateCount": 0, - "isAchieveNewRecord": True, - "isDeluxscoreNewRecord": True, - "comboStatus": 0, - "syncStatus": 0, - "isClear": False, - "beforeRating": currentUserData2['playerRating'], - "afterRating": currentUserData2['playerRating'], - "beforeGrade": 0, - "afterGrade": 0, - "afterGradeRank": 1, - "beforeDeluxRating": currentUserData2['playerRating'], - "afterDeluxRating": currentUserData2['playerRating'], - "isPlayTutorial": False, - "isEventMode": False, - "isFreedomMode": False, - "playMode": 0, - "isNewFree": False, - "trialPlayAchievement": -1, - "extNum1": 0, - "extNum2": 0, - "extNum4": 3020, - "extBool1": False, - "extBool2": False + "userId": int(userId), + "userPlaylogList": [ + { + "userId": 0, + "orderId": 0, + "playlogId": loginId, + "version": 1051000, + "placeId": placeId, + "placeName": placeName, + "loginDate": int(time.time()), # 似乎和登录timestamp不同 + "playDate": datetime.now(pytz.timezone("Asia/Shanghai")).strftime( + "%Y-%m-%d" + ), + "userPlayDate": datetime.now( + pytz.timezone("Asia/Shanghai") + ).strftime("%Y-%m-%d %H:%M:%S") + + ".0", + "type": 0, + "musicId": int(musicDataToBeUploaded["musicId"]), + "level": int(musicDataToBeUploaded["level"]), + "trackNo": 1, + "vsMode": 0, + "vsUserName": "", + "vsStatus": 0, + "vsUserRating": 0, + "vsUserAchievement": 0, + "vsUserGradeRank": 0, + "vsRank": 0, + "playerNum": 1, + "playedUserId1": 0, + "playedUserName1": "", + "playedMusicLevel1": 0, + "playedUserId2": 0, + "playedUserName2": "", + "playedMusicLevel2": 0, + "playedUserId3": 0, + "playedUserName3": "", + "playedMusicLevel3": 0, + "characterId1": currentUserData2["charaSlot"][0], + "characterLevel1": random.randint(1000, 6500), + "characterAwakening1": 5, + "characterId2": currentUserData2["charaSlot"][1], + "characterLevel2": random.randint(1000, 6500), + "characterAwakening2": 5, + "characterId3": currentUserData2["charaSlot"][2], + "characterLevel3": random.randint(1000, 6500), + "characterAwakening3": 5, + "characterId4": currentUserData2["charaSlot"][3], + "characterLevel4": random.randint(1000, 6500), + "characterAwakening4": 5, + "characterId5": currentUserData2["charaSlot"][4], + "characterLevel5": random.randint(1000, 6500), + "characterAwakening5": 5, + "achievement": int(musicDataToBeUploaded["achievement"]), + "deluxscore": int(musicDataToBeUploaded["deluxscoreMax"]), + "scoreRank": int(musicDataToBeUploaded["scoreRank"]), + "maxCombo": 0, + "totalCombo": random.randint(700, 900), + "maxSync": 0, + "totalSync": 0, + "tapCriticalPerfect": 0, + "tapPerfect": 0, + "tapGreat": 0, + "tapGood": 0, + "tapMiss": random.randint(1, 10), + "holdCriticalPerfect": 0, + "holdPerfect": 0, + "holdGreat": 0, + "holdGood": 0, + "holdMiss": random.randint(1, 15), + "slideCriticalPerfect": 0, + "slidePerfect": 0, + "slideGreat": 0, + "slideGood": 0, + "slideMiss": random.randint(1, 15), + "touchCriticalPerfect": 0, + "touchPerfect": 0, + "touchGreat": 0, + "touchGood": 0, + "touchMiss": random.randint(1, 15), + "breakCriticalPerfect": 0, + "breakPerfect": 0, + "breakGreat": 0, + "breakGood": 0, + "breakMiss": random.randint(1, 15), + "isTap": True, + "isHold": True, + "isSlide": True, + "isTouch": True, + "isBreak": True, + "isCriticalDisp": True, + "isFastLateDisp": True, + "fastCount": 0, + "lateCount": 0, + "isAchieveNewRecord": True, + "isDeluxscoreNewRecord": True, + "comboStatus": 0, + "syncStatus": 0, + "isClear": False, + "beforeRating": currentUserData2["playerRating"], + "afterRating": currentUserData2["playerRating"], + "beforeGrade": 0, + "afterGrade": 0, + "afterGradeRank": 1, + "beforeDeluxRating": currentUserData2["playerRating"], + "afterDeluxRating": currentUserData2["playerRating"], + "isPlayTutorial": False, + "isEventMode": False, + "isFreedomMode": False, + "playMode": 0, + "isNewFree": False, + "trialPlayAchievement": -1, + "extNum1": 0, + "extNum2": 0, + "extNum4": 3020, + "extBool1": False, + "extBool2": False, + } + ], } - ] -}) + ) # 发送请求 result = apiSDGB(data, "UploadUserPlaylogListApi", userId) - logger.info("上传游玩记录:结果:"+ str(result)) + logger.info("上传游玩记录:结果:" + str(result)) # 返回响应 return result diff --git a/HelperUserAll.py b/HelperUserAll.py index 49b65b5..c597344 100644 --- a/HelperUserAll.py +++ b/HelperUserAll.py @@ -2,12 +2,20 @@ import pytz from datetime import datetime -from Config import * -from HelperGetUserThing import implGetUser_ - -from HelperGetUserMusicDetail import getUserMusicDetail from loguru import logger +from HelperGetUserThing import implGetUser_ +from HelperGetUserMusicDetail import getUserMusicDetail + +from Config import ( + clientId, + placeName, + placeId, + regionId, + regionName, +) + + def isNewMusicType(userId, musicId, level) -> str: """判断这首 musicId 在 isNewMusicDetailList 应该填什么 0: Edit @@ -16,21 +24,38 @@ def isNewMusicType(userId, musicId, level) -> str: 未完工,仅供测试 """ - userMusicDetailList = getUserMusicDetail(userId, musicId, 1)['userMusicList'][0]['userMusicDetailList'] + userMusicDetailList = getUserMusicDetail(userId, musicId, 1)["userMusicList"][0][ + "userMusicDetailList" + ] logger.info(userMusicDetailList) try: - if userMusicDetailList[0]['musicId'] == musicId and userMusicDetailList[0]['level'] == level: + if ( + userMusicDetailList[0]["musicId"] == musicId + and userMusicDetailList[0]["level"] == level + ): logger.info(f"We think {musicId} Level {level} should use EDIT.") return "0" except: return "1" -def generateFullUserAll(userId, currentLoginResult, currentLoginTimestamp, currentUserData2, currentPlaySpecial): +def generateFullUserAll( + userId, + currentLoginResult, + currentLoginTimestamp, + currentUserData2, + currentPlaySpecial, +): """从服务器取得必要的数据并构建一个比较完整的 UserAll""" # 先构建一个基础 UserAll - currentUserAll = generateUserAllData(userId, currentLoginResult, currentLoginTimestamp, currentUserData2, currentPlaySpecial) + currentUserAll = generateUserAllData( + userId, + currentLoginResult, + currentLoginTimestamp, + currentUserData2, + currentPlaySpecial, + ) # 然后从服务器取得必要的数据 currentUserExtend = implGetUser_("Extend", userId, True) @@ -41,61 +66,78 @@ def generateFullUserAll(userId, currentLoginResult, currentLoginTimestamp, curre currentUserMissionData = implGetUser_("MissionData", userId, True) # 把这些数据都追加进去 - currentUserAll['upsertUserAll']['userExtend'] = [currentUserExtend['userExtend']] - currentUserAll['upsertUserAll']['userOption'] = [currentUserOption['userOption']] - currentUserAll['upsertUserAll']['userRatingList'] = [currentUserRating['userRating']] - currentUserAll['upsertUserAll']['userActivityList'] = [currentUserActivity['userActivity']] - currentUserAll['upsertUserAll']['userChargeList'] = currentUserCharge['userChargeList'] - currentUserAll['upsertUserAll']['userWeeklyData'] = currentUserMissionData['userWeeklyData'] + currentUserAll["upsertUserAll"]["userExtend"] = [currentUserExtend["userExtend"]] + currentUserAll["upsertUserAll"]["userOption"] = [currentUserOption["userOption"]] + currentUserAll["upsertUserAll"]["userRatingList"] = [ + currentUserRating["userRating"] + ] + currentUserAll["upsertUserAll"]["userActivityList"] = [ + currentUserActivity["userActivity"] + ] + currentUserAll["upsertUserAll"]["userChargeList"] = currentUserCharge[ + "userChargeList" + ] + currentUserAll["upsertUserAll"]["userWeeklyData"] = currentUserMissionData[ + "userWeeklyData" + ] # 完事 return currentUserAll -def generateUserAllData(userId, currentLoginResult, currentLoginTimestamp, currentUserData2, currentPlaySpecial): +def generateUserAllData( + userId, + currentLoginResult, + currentLoginTimestamp, + currentUserData2, + currentPlaySpecial, +): """构建一个非常基础的 UserAll 数据,必须手动填充一些数据""" - + data = { "userId": userId, - "playlogId": currentLoginResult['loginId'], + "playlogId": currentLoginResult["loginId"], "isEventMode": False, "isFreePlay": False, "upsertUserAll": { "userData": [ { "accessCode": "", - "userName": currentUserData2['userName'], + "userName": currentUserData2["userName"], "isNetMember": 1, - "point": currentUserData2['point'], - "totalPoint": currentUserData2['totalPoint'], - "iconId": currentUserData2['iconId'], - "plateId": currentUserData2['plateId'], - "titleId": currentUserData2['titleId'], - "partnerId": currentUserData2['partnerId'], - "frameId": currentUserData2['frameId'], - "selectMapId": currentUserData2['selectMapId'], - "totalAwake": currentUserData2['totalAwake'], - "gradeRating": currentUserData2['gradeRating'], - "musicRating": currentUserData2['musicRating'], - "playerRating": currentUserData2['playerRating'], - "highestRating": currentUserData2['highestRating'], - "gradeRank": currentUserData2['gradeRank'], - "classRank": currentUserData2['classRank'], - "courseRank": currentUserData2['courseRank'], - "charaSlot": currentUserData2['charaSlot'], - "charaLockSlot": currentUserData2['charaLockSlot'], - "contentBit": currentUserData2['contentBit'], - "playCount": currentUserData2['playCount'], - "currentPlayCount": currentUserData2['currentPlayCount'], + "point": currentUserData2["point"], + "totalPoint": currentUserData2["totalPoint"], + "iconId": currentUserData2["iconId"], + "plateId": currentUserData2["plateId"], + "titleId": currentUserData2["titleId"], + "partnerId": currentUserData2["partnerId"], + "frameId": currentUserData2["frameId"], + "selectMapId": currentUserData2["selectMapId"], + "totalAwake": currentUserData2["totalAwake"], + "gradeRating": currentUserData2["gradeRating"], + "musicRating": currentUserData2["musicRating"], + "playerRating": currentUserData2["playerRating"], + "highestRating": currentUserData2["highestRating"], + "gradeRank": currentUserData2["gradeRank"], + "classRank": currentUserData2["classRank"], + "courseRank": currentUserData2["courseRank"], + "charaSlot": currentUserData2["charaSlot"], + "charaLockSlot": currentUserData2["charaLockSlot"], + "contentBit": currentUserData2["contentBit"], + "playCount": currentUserData2["playCount"], + "currentPlayCount": currentUserData2["currentPlayCount"], "renameCredit": 0, - "mapStock": currentUserData2['mapStock'], - "eventWatchedDate": currentUserData2['eventWatchedDate'], + "mapStock": currentUserData2["mapStock"], + "eventWatchedDate": currentUserData2["eventWatchedDate"], "lastGameId": "SDGB", - "lastRomVersion": currentUserData2['lastRomVersion'], - "lastDataVersion": currentUserData2['lastDataVersion'], - #"lastLoginDate": currentLoginResult['lastLoginDate'], # sb - "lastLoginDate": currentUserData2['lastLoginDate'], # 等待测试 - "lastPlayDate": datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S') + '.0', + "lastRomVersion": currentUserData2["lastRomVersion"], + "lastDataVersion": currentUserData2["lastDataVersion"], + # "lastLoginDate": currentLoginResult['lastLoginDate'], # sb + "lastLoginDate": currentUserData2["lastLoginDate"], # 等待测试 + "lastPlayDate": datetime.now( + pytz.timezone("Asia/Shanghai") + ).strftime("%Y-%m-%d %H:%M:%S") + + ".0", "lastPlayCredit": 1, "lastPlayMode": 0, "lastPlaceId": placeId, @@ -107,68 +149,83 @@ def generateUserAllData(userId, currentLoginResult, currentLoginTimestamp, curre "lastCountryCode": "CHN", "lastSelectEMoney": 0, "lastSelectTicket": 0, - "lastSelectCourse": currentUserData2['lastSelectCourse'], + "lastSelectCourse": currentUserData2["lastSelectCourse"], "lastCountCourse": 0, "firstGameId": "SDGB", - "firstRomVersion": currentUserData2['firstRomVersion'], - "firstDataVersion": currentUserData2['firstDataVersion'], - "firstPlayDate": currentUserData2['firstPlayDate'], - "compatibleCmVersion": currentUserData2['compatibleCmVersion'], - "dailyBonusDate": currentUserData2['dailyBonusDate'], - "dailyCourseBonusDate": currentUserData2['dailyCourseBonusDate'], - "lastPairLoginDate": currentUserData2['lastPairLoginDate'], - "lastTrialPlayDate": currentUserData2['lastTrialPlayDate'], + "firstRomVersion": currentUserData2["firstRomVersion"], + "firstDataVersion": currentUserData2["firstDataVersion"], + "firstPlayDate": currentUserData2["firstPlayDate"], + "compatibleCmVersion": currentUserData2["compatibleCmVersion"], + "dailyBonusDate": currentUserData2["dailyBonusDate"], + "dailyCourseBonusDate": currentUserData2["dailyCourseBonusDate"], + "lastPairLoginDate": currentUserData2["lastPairLoginDate"], + "lastTrialPlayDate": currentUserData2["lastTrialPlayDate"], "playVsCount": 0, "playSyncCount": 0, "winCount": 0, "helpCount": 0, "comboCount": 0, - "totalDeluxscore": currentUserData2['totalDeluxscore'], - "totalBasicDeluxscore": currentUserData2['totalBasicDeluxscore'], - "totalAdvancedDeluxscore": currentUserData2['totalAdvancedDeluxscore'], - "totalExpertDeluxscore": currentUserData2['totalExpertDeluxscore'], - "totalMasterDeluxscore": currentUserData2['totalMasterDeluxscore'], - "totalReMasterDeluxscore": currentUserData2['totalReMasterDeluxscore'], - "totalSync": currentUserData2['totalSync'], - "totalBasicSync": currentUserData2['totalBasicSync'], - "totalAdvancedSync": currentUserData2['totalAdvancedSync'], - "totalExpertSync": currentUserData2['totalExpertSync'], - "totalMasterSync": currentUserData2['totalMasterSync'], - "totalReMasterSync": currentUserData2['totalReMasterSync'], - "totalAchievement": currentUserData2['totalAchievement'], - "totalBasicAchievement": currentUserData2['totalBasicAchievement'], - "totalAdvancedAchievement": currentUserData2['totalAdvancedAchievement'], - "totalExpertAchievement": currentUserData2['totalExpertAchievement'], - "totalMasterAchievement": currentUserData2['totalMasterAchievement'], - "totalReMasterAchievement": currentUserData2['totalReMasterAchievement'], - "playerOldRating": currentUserData2['playerOldRating'], - "playerNewRating": currentUserData2['playerNewRating'], + "totalDeluxscore": currentUserData2["totalDeluxscore"], + "totalBasicDeluxscore": currentUserData2["totalBasicDeluxscore"], + "totalAdvancedDeluxscore": currentUserData2[ + "totalAdvancedDeluxscore" + ], + "totalExpertDeluxscore": currentUserData2["totalExpertDeluxscore"], + "totalMasterDeluxscore": currentUserData2["totalMasterDeluxscore"], + "totalReMasterDeluxscore": currentUserData2[ + "totalReMasterDeluxscore" + ], + "totalSync": currentUserData2["totalSync"], + "totalBasicSync": currentUserData2["totalBasicSync"], + "totalAdvancedSync": currentUserData2["totalAdvancedSync"], + "totalExpertSync": currentUserData2["totalExpertSync"], + "totalMasterSync": currentUserData2["totalMasterSync"], + "totalReMasterSync": currentUserData2["totalReMasterSync"], + "totalAchievement": currentUserData2["totalAchievement"], + "totalBasicAchievement": currentUserData2["totalBasicAchievement"], + "totalAdvancedAchievement": currentUserData2[ + "totalAdvancedAchievement" + ], + "totalExpertAchievement": currentUserData2[ + "totalExpertAchievement" + ], + "totalMasterAchievement": currentUserData2[ + "totalMasterAchievement" + ], + "totalReMasterAchievement": currentUserData2[ + "totalReMasterAchievement" + ], + "playerOldRating": currentUserData2["playerOldRating"], + "playerNewRating": currentUserData2["playerNewRating"], "banState": 0, - "friendRegistSkip": currentUserData2['friendRegistSkip'], - "dateTime": currentLoginTimestamp + "friendRegistSkip": currentUserData2["friendRegistSkip"], + "dateTime": currentLoginTimestamp, } ], - "userExtend": [], #需要填上 - "userOption": [], #需要填上 + "userExtend": [], # 需要填上 + "userOption": [], # 需要填上 "userGhost": [], "userCharacterList": [], "userMapList": [], "userLoginBonusList": [], - "userRatingList": [], #需要填上 - "userItemList": [], #可选,但经常要填上 - "userMusicDetailList": [],#需要填上 + "userRatingList": [], # 需要填上 + "userItemList": [], # 可选,但经常要填上 + "userMusicDetailList": [], # 需要填上 "userCourseList": [], "userFriendSeasonRankingList": [], - "userChargeList": [], #需要填上 + "userChargeList": [], # 需要填上 "userFavoriteList": [], - "userActivityList": [], #需要填上 + "userActivityList": [], # 需要填上 "userMissionDataList": [], - "userWeeklyData": [],#应该需要填上 + "userWeeklyData": [], # 应该需要填上 "userGamePlaylogList": [ { - "playlogId": currentLoginResult['loginId'], + "playlogId": currentLoginResult["loginId"], "version": "1.51.00", - "playDate": datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S') + '.0', + "playDate": datetime.now(pytz.timezone("Asia/Shanghai")).strftime( + "%Y-%m-%d %H:%M:%S" + ) + + ".0", "playMode": 0, "useTicketId": -1, "playCredit": 1, @@ -177,9 +234,9 @@ def generateUserAllData(userId, currentLoginResult, currentLoginTimestamp, curre "isPlayTutorial": False, "isEventMode": False, "isNewFree": False, - "playCount": currentUserData2['playCount'], + "playCount": currentUserData2["playCount"], "playSpecial": currentPlaySpecial, - "playOtherUserId": 0 + "playOtherUserId": 0, } ], "user2pPlaylog": { @@ -189,9 +246,9 @@ def generateUserAllData(userId, currentLoginResult, currentLoginTimestamp, curre "userName2": "", "regionId": 0, "placeId": 0, - "user2pPlaylogDetailList": [] + "user2pPlaylogDetailList": [], }, - "userIntimateList": [], + "userIntimateList": [], "userShopItemStockList": [], "userGetPointList": [], "userTradeItemList": [], @@ -201,13 +258,13 @@ def generateUserAllData(userId, currentLoginResult, currentLoginTimestamp, curre "isNewMapList": "", "isNewLoginBonusList": "", "isNewItemList": "", - "isNewMusicDetailList": "", #可选但经常要填上 + "isNewMusicDetailList": "", # 可选但经常要填上 "isNewCourseList": "0", "isNewFavoriteList": "", "isNewFriendSeasonRankingList": "", "isNewUserIntimateList": "", "isNewFavoritemusicList": "", - "isNewKaleidxScopeList": "" - } - } + "isNewKaleidxScopeList": "", + }, + } return data