Huge Rewrite!

This commit is contained in:
Remik1r3n 2025-02-02 03:17:13 +08:00
parent 3638de178d
commit 90d8b74c45
28 changed files with 1038 additions and 420 deletions

1
.gitignore vendored
View File

@ -1,5 +1,6 @@
# Anti-leak # Anti-leak
Private_Static_Settings.py Private_Static_Settings.py
MyConfig.py
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/

View File

@ -5,35 +5,39 @@ import json
import re import re
# 计算 SHA256 # 计算 SHA256
def compute_sha256(input_str): def getSHA256(input_str):
"""SHA256计算""" """SHA256计算"""
return hashlib.sha256(input_str.encode('utf-8')).hexdigest().upper() return hashlib.sha256(input_str.encode('utf-8')).hexdigest().upper()
# 生成时间戳 # 生成时间戳
def get_timestamp(): def generateSEGATimestamp():
"""SEGA格式的 YYMMDDHHMMSS 时间戳sb玩意""" """SEGA格式的 YYMMDDHHMMSS 时间戳sb玩意"""
return time.strftime("%y%m%d%H%M%S", time.localtime()) return time.strftime("%y%m%d%H%M%S", time.localtime())
# 计算认证 key # 计算认证 key
def calculate_auth_key(time_stamp: str, chip_id: str, auth_key_param: str) -> str: def calcSEGAAimeDBAuthKey(varString:str, timestamp:str, commonKey:str="XcW5FW4cPArBXEk4vzKz3CIrMuA5EVVW") -> str:
"""计算 Key""" """计算 SEGA AimeDB 的认证 key"""
return hashlib.sha256((chip_id + time_stamp + auth_key_param).encode("utf-8")).hexdigest().upper() return hashlib.sha256((varString + timestamp + commonKey).encode("utf-8")).hexdigest().upper()
def apiAimeDB(qrCode):
"""AimeDB 扫码 API 实现"""
CHIP_ID = "A63E-01E68606624"
COMMON_KEY = "XcW5FW4cPArBXEk4vzKz3CIrMuA5EVVW"
API_URL = "http://ai.sys-allnet.cn/wc_aime/api/get_data"
def apiAimeDB(qr_code, chip_id, auth_key_param, game_id, api_url):
"""AimeDB API 实现"""
# 生成一个时间戳 # 生成一个时间戳
time_stamp = get_timestamp() timestamp = generateSEGATimestamp()
# 使用时间戳计算 key # 使用时间戳计算 key
auth_key = calculate_auth_key(time_stamp, chip_id, auth_key_param) currentKey = calcSEGAAimeDBAuthKey(qrCode, timestamp, COMMON_KEY)
# 构造请求数据 # 构造请求数据
payload = { payload = {
"chipID": chip_id, "chipID": CHIP_ID,
"openGameID": game_id, "openGameID": "MAID",
"key": auth_key, "key": currentKey,
"qrCode": qr_code, "qrCode": qrCode,
"timestamp": time_stamp "timestamp": timestamp
} }
# 输出准备好的请求数据 # 输出准备好的请求数据
@ -42,11 +46,11 @@ def apiAimeDB(qr_code, chip_id, auth_key_param, game_id, api_url):
# 发送 POST 请求 # 发送 POST 请求
headers = { headers = {
"Connection": "Keep-Alive", "Connection": "Keep-Alive",
"Host": api_url.split("//")[-1].split("/")[0], "Host": API_URL.split("//")[-1].split("/")[0],
"User-Agent": "WC_AIME_LIB", "User-Agent": "WC_AIME_LIB",
"Content-Type": "application/json", "Content-Type": "application/json",
} }
response = requests.post(api_url, data=json.dumps(payload, separators=(',', ':')), headers=headers) response = requests.post(API_URL, data=json.dumps(payload, separators=(',', ':')), headers=headers)
# 返回服务器的响应 # 返回服务器的响应
return response return response
@ -64,20 +68,19 @@ def isSGWCFormat(input_string: str) -> bool:
return True return True
def implAimeDB(qrcode_content_full:str) -> str: def implAimeDB(qrCode:str, isAlreadyFinal:bool=False) -> str:
''' '''
Aime DB 的请求的参考实现 Aime DB 的请求的参考实现
提供完整 QRCode 内容返回响应的字符串Json格式 提供完整 QRCode 内容返回响应的字符串Json格式
''' '''
CHIP_ID = "A63E-01E68606624" if isAlreadyFinal:
AUTH_KEY_PARAM = "XcW5FW4cPArBXEk4vzKz3CIrMuA5EVVW" qr_code_final = qrCode
GAME_ID = "MAID" else:
API_URL = "http://ai.sys-allnet.cn/wc_aime/api/get_data" # 提取有效部分(Hash)
qr_code_final = qrCode[20:]
qr_code_final = qrcode_content_full[20:]
# 发送请求 # 发送请求
response = apiAimeDB(qr_code_final, CHIP_ID, AUTH_KEY_PARAM, GAME_ID, API_URL) response = apiAimeDB(qr_code_final)
# 获得结果 # 获得结果
print("implAimeDB: StatusCode is ", response.status_code) print("implAimeDB: StatusCode is ", response.status_code)
@ -97,13 +100,12 @@ def implGetUID(qr_content:str) -> dict:
# 发送请求并处理响应 # 发送请求并处理响应
try: try:
result_string = implAimeDB(qr_content) result = json.loads(implAimeDB(qr_content))
result_dict = json.loads(result_string)
except: except:
return {'errorID': 60002} # 无法解码 Response 的内容 return {'errorID': 60002} # 无法解码 Response 的内容
# 返回结果 # 返回结果
return result_dict return result
if __name__ == "__main__": if __name__ == "__main__":
userInputQR = input("QRCode: ") userInputQR = input("QRCode: ")

View File

@ -13,7 +13,7 @@ from ctypes import c_int32
from Crypto.Cipher import AES from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad from Crypto.Util.Padding import unpad
from Static_Settings import * from Config import *
# 舞萌DX 2024 # 舞萌DX 2024
AesKey = "n7bx6:@Fg_:2;5E89Phy7AyIcpxEQ:R@" AesKey = "n7bx6:@Fg_:2;5E89Phy7AyIcpxEQ:R@"
@ -59,14 +59,15 @@ class aes_pkcs7(object):
def SDGBApiHash(api): def SDGBApiHash(api):
return hashlib.md5((api+"MaimaiChn"+ObfuscateParam).encode()).hexdigest() return hashlib.md5((api+"MaimaiChn"+ObfuscateParam).encode()).hexdigest()
def apiSDGB(data:str, useApi, agentExtraData, maxRetries=3): def apiSDGB(data:str, useApi, agentExtraData, noLog=False):
''' '''
舞萌DX 2024 API 通讯用函数 舞萌DX 2024 API 通讯用函数
:param data: 请求数据 :param data: 请求数据
:param useApi: 使用的 API :param useApi: 使用的 API
:param agentExtraData: UA 附加信息机台相关则为狗号如A63E01E9564用户相关则为 UID :param agentExtraData: UA 附加信息机台相关则为狗号如A63E01E9564用户相关则为 UID
:param maxRetry: 最大重试次数, :param noLog: 是否不记录日志
''' '''
maxRetries = 3
# 历史遗留代码有时候会传入 int故先全部转 str # 历史遗留代码有时候会传入 int故先全部转 str
agentExtra = str(agentExtraData) agentExtra = str(agentExtraData)
@ -79,7 +80,8 @@ def apiSDGB(data:str, useApi, agentExtraData, maxRetries=3):
requests.packages.urllib3.disable_warnings() requests.packages.urllib3.disable_warnings()
endpoint = "https://maimai-gm.wahlap.com:42081/Maimai2Servlet/" endpoint = "https://maimai-gm.wahlap.com:42081/Maimai2Servlet/"
logger.debug("TitleServer Request Start: "+ str(useApi)+" , Data: "+str(data)) if not noLog:
logger.debug("TitleServer Request Start: "+ str(useApi)+" , Data: "+str(data))
retries = 0 retries = 0
while retries < maxRetries: while retries < maxRetries:
@ -124,7 +126,8 @@ def apiSDGB(data:str, useApi, agentExtraData, maxRetries=3):
# 解压成功,解密请求并返回 # 解压成功,解密请求并返回
resultResponse = unpad(aes.decrypt(responseDecompressed), 16).decode() resultResponse = unpad(aes.decrypt(responseDecompressed), 16).decode()
logger.info("TitleServer:" + useApi + " Response: " + str(responseRaw.status_code)) logger.info("TitleServer:" + useApi + " Response: " + str(responseRaw.status_code))
logger.debug("TitleServer Response: " + str(resultResponse)) if not noLog:
logger.debug("TitleServer Response: " + str(resultResponse))
return resultResponse return resultResponse
# 除了 404 和 500 之外的错误重试 # 除了 404 和 500 之外的错误重试

View File

@ -1,13 +1,13 @@
# 改变版本号,实现伪封号和解封号 # 改变版本号,实现伪封号和解封号之类
from loguru import logger from loguru import logger
from Static_Settings import * from Config import *
from HelperLogInOut import apiLogin, apiLogout, generateTimestamp from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from HelperFullPlay import implFullPlayAction from HelperFullPlay import implFullPlayAction
def implChangeVersionNumber(userId: int, currentLoginTimestamp:int, currentLoginResult, dataVersion="1.40.09", romVersion="1.41.00") -> str: def implChangeVersionNumber(userId: int, currentLoginTimestamp:int, currentLoginResult, dataVersion="1.40.09", romVersion="1.41.00") -> str:
musicData= ({ musicData= ({
"musicId": 834, "musicId": 834, # PANDORA PARADOXXX
"level": 4, "level": 4,
"playCount": 1, "playCount": 1,
"achievement": 0, "achievement": 0,
@ -16,7 +16,7 @@ def implChangeVersionNumber(userId: int, currentLoginTimestamp:int, currentLogin
"deluxscoreMax": 0, "deluxscoreMax": 0,
"scoreRank": 0, "scoreRank": 0,
"extNum1": 0 "extNum1": 0
}) })
userAllPatches = { userAllPatches = {
"upsertUserAll": { "upsertUserAll": {
"userData": [{ "userData": [{
@ -27,7 +27,7 @@ def implChangeVersionNumber(userId: int, currentLoginTimestamp:int, currentLogin
"isNewMusicDetailList": "1" #1避免覆盖 "isNewMusicDetailList": "1" #1避免覆盖
}} }}
logger.info("Changing version number to " + dataVersion + " and " + romVersion) logger.info("Changing version number to " + dataVersion + " and " + romVersion)
result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches, True) result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches)
return result return result
if __name__ == "__main__": if __name__ == "__main__":
@ -35,15 +35,11 @@ if __name__ == "__main__":
currentLoginTimestamp = generateTimestamp() currentLoginTimestamp = generateTimestamp()
loginResult = apiLogin(currentLoginTimestamp, userId) loginResult = apiLogin(currentLoginTimestamp, userId)
musicId = 852 #229 is guruguru wash
levelId = 3 #3 is MASTER
if loginResult['returnCode'] != 1: if loginResult['returnCode'] != 1:
logger.info("登录失败") logger.info("登录失败")
exit() exit()
try: try:
#logger.info(implDeleteMusicRecord(userId, currentLoginTimestamp, loginResult, musicId, levelId)) logger.info(implChangeVersionNumber(userId, currentLoginTimestamp, loginResult, "1.00.00", "1.00.00"))
logger.info(implChangeVersionNumber(userId, currentLoginTimestamp, loginResult, "1.30.00", "1.30.00"))
logger.info(apiLogout(currentLoginTimestamp, userId)) logger.info(apiLogout(currentLoginTimestamp, userId))
finally: finally:
logger.info(apiLogout(currentLoginTimestamp, userId)) logger.info(apiLogout(currentLoginTimestamp, userId))

View File

@ -4,13 +4,13 @@
import json import json
from loguru import logger from loguru import logger
from Static_Settings import * from Config import *
from API_TitleServer import apiSDGB from API_TitleServer import apiSDGB
from HelperLogInOut import apiLogin, apiLogout, generateTimestamp from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from HelperFullPlay import implFullPlayAction from HelperFullPlay import implFullPlayAction
def apiQueryLoginBonus(userId:int) -> str: def apiQueryLoginBonus(userId:int) -> str:
'''ログインボーナスを取得する API Requestor''' '''ログインボーナスを取得する API'''
data = json.dumps({ data = json.dumps({
"userId": int(userId), "userId": int(userId),
"nextIndex": 0, "nextIndex": 0,
@ -35,7 +35,7 @@ def implLoginBonus(userId: int, currentLoginTimestamp:int, currentLoginResult, b
"deluxscoreMax": 0, "deluxscoreMax": 0,
"scoreRank": 0, "scoreRank": 0,
"extNum1": 0 "extNum1": 0
} }
# サーバーからログインボーナスデータを取得 # サーバーからログインボーナスデータを取得
data = json.dumps({ data = json.dumps({
"userId": int(userId), "userId": int(userId),
@ -70,7 +70,7 @@ def generateLoginBonusList(UserLoginBonusList, generateMode=1):
# HDDから、ログインボーナスデータを読み込む # HDDから、ログインボーナスデータを読み込む
# アップデートがある場合、このファイルを更新する必要があります # アップデートがある場合、このファイルを更新する必要があります
# 必ず最新のデータを使用してください # 必ず最新のデータを使用してください
with open('loginBonus.json', encoding='utf-8') as file: with open('./Data/loginBonus.json', encoding='utf-8') as file:
cache = json.load(file) cache = json.load(file)
loginBonusIdList = [item['id'] for item in cache] loginBonusIdList = [item['id'] for item in cache]
logger.debug(f"ログインボーナスIDリスト: {loginBonusIdList}") logger.debug(f"ログインボーナスIDリスト: {loginBonusIdList}")

View File

@ -1,7 +1,7 @@
# 删除和上传成绩 # 删除和上传成绩
from loguru import logger from loguru import logger
from Static_Settings import * from Config import *
from HelperLogInOut import apiLogin, apiLogout, generateTimestamp from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from HelperFullPlay import implFullPlayAction from HelperFullPlay import implFullPlayAction
@ -26,8 +26,12 @@ def implDeleteMusicRecord(userId: int, currentLoginTimestamp:int, currentLoginRe
return result return result
def implUploadMusicRecord(userId: int, currentLoginTimestamp:int, currentLoginResult, musicId:int, levelId:int, achievement:int, dxScore:int) -> str: def implUploadMusicRecord(userId: int, currentLoginTimestamp:int, currentLoginResult, musicId:int, levelId:int, achievement:int, dxScore:int) -> str:
'''VERY EARLY STAGE OF UPLOADING SCORES. DO NOT USE THIS FUNCTION.''' '''
VERY EARLY STAGE OF UPLOADING SCORES!!!! DO NOT USE THIS!!!!
上传成绩的参考实现
'''
# 要上传的数据
musicData= ({ musicData= ({
"musicId": musicId, "musicId": musicId,
"level": levelId, "level": levelId,
@ -42,7 +46,7 @@ def implUploadMusicRecord(userId: int, currentLoginTimestamp:int, currentLoginRe
userAllPatches = { userAllPatches = {
"upsertUserAll": { "upsertUserAll": {
"userMusicDetailList": [musicData], "userMusicDetailList": [musicData],
"isNewMusicDetailList": "0" # 0为编辑即可删除掉成绩 "isNewMusicDetailList": "1" # 0编辑 1插入
}} }}
result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches) result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches)
return result return result
@ -52,15 +56,15 @@ if __name__ == "__main__":
currentLoginTimestamp = generateTimestamp() currentLoginTimestamp = generateTimestamp()
loginResult = apiLogin(currentLoginTimestamp, userId) loginResult = apiLogin(currentLoginTimestamp, userId)
musicId = 852 #229 is guruguru wash musicId = 852 #852 is tiamat
levelId = 3 #3 is MASTER levelId = 3 #3 is MASTER
if loginResult['returnCode'] != 1: if loginResult['returnCode'] != 1:
logger.info("登录失败") logger.info("登录失败")
exit() exit()
try: try:
#logger.info(implDeleteMusicRecord(userId, currentLoginTimestamp, loginResult, musicId, levelId)) logger.info(implDeleteMusicRecord(userId, currentLoginTimestamp, loginResult, musicId, levelId))
logger.info(implUploadMusicRecord(userId, currentLoginTimestamp, loginResult, musicId, levelId, 1000000, 100)) #logger.info(implUploadMusicRecord(userId, currentLoginTimestamp, loginResult, musicId, levelId, 1000000, 100))
logger.info(apiLogout(currentLoginTimestamp, userId)) logger.info(apiLogout(currentLoginTimestamp, userId))
finally: finally:
logger.info(apiLogout(currentLoginTimestamp, userId)) logger.info(apiLogout(currentLoginTimestamp, userId))

View File

@ -2,7 +2,7 @@
from loguru import logger from loguru import logger
from Static_Settings import * from Config import *
from HelperLogInOut import apiLogin, apiLogout, generateTimestamp from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from HelperUnlockThing import implUnlockThing from HelperUnlockThing import implUnlockThing
@ -48,7 +48,7 @@ if __name__ == "__main__":
loginResult = apiLogin(currentLoginTimestamp, userId) loginResult = apiLogin(currentLoginTimestamp, userId)
# Change you want item ID # Change you want item ID
wantToUnlockItemId = 11624 wantToUnlockItemId = 11538
if loginResult['returnCode'] != 1: if loginResult['returnCode'] != 1:
logger.info("登录失败") logger.info("登录失败")

View File

@ -1,82 +1,112 @@
# 非常 All-in Boom 的 B50 更新实现。
from API_TitleServer import * from API_TitleServer import *
from HelperLogInOut import apiLogin, apiLogout, generateTimestamp from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from Static_Settings import * from Config import *
import json
from MusicDB import musicDB
from loguru import logger from loguru import logger
from HelperGetUserMusicDetail import getUserFullMusicDetail
from HelperMusicDB import getMusicTitle
import requests
def getMusicTitle(musicId: int) -> str:
'''从数据库获取音乐的标题'''
logger.debug(f"查询歌名: {musicId}")
musicInfo = musicDB.get(musicId)
if not musicInfo:
logger.warning(f"数据库里未找到此歌曲: {musicId}")
return "ERR_R_MUSIC_ID_NOT_IN_DATABASE"
musicName = musicInfo.get("name")
logger.debug(f"成功查询到歌名: {musicName}")
return musicName
def getUserMusicDetail(userId:int, nextIndex:int=0, maxCount:int=50) -> dict: # 日志设置
'''获取用户的成绩的API''' if False:
data = json.dumps({ import sys
"userId": int(userId), log_level = "DEBUG"
"nextIndex": nextIndex, log_format = "<green>{time:YYYY-MM-DD HH:mm:ss.SSS zz}</green> | <level>{level: <8}</level> | <yellow>Line {line: >4} ({file}):</yellow> <b>{message}</b>"
"maxCount": maxCount logger.add(sys.stderr, level=log_level, format=log_format, colorize=True, backtrace=True, diagnose=True)
}) logger.add("file.log", level=log_level, format=log_format, colorize=False, backtrace=True, diagnose=True)
return json.loads(apiSDGB(data, "GetUserMusicApi", userId))
# 水鱼查分器的 API 地址
BASE_URL = 'https://www.diving-fish.com/api/maimaidxprober'
# 水鱼查分器的成绩状态转换
COMBO_ID_TO_NAME = ['', 'fc', 'fcp', 'ap', 'app']
SYNC_ID_TO_NAME = ['', 'fs', 'fsp', 'fsd', 'fsdp', 'sync']
def apiDivingFish(method:str, apiPath:str, importToken:str, data:dict=None) -> dict:
'''水鱼查分器的 API 通讯实现'''
headers = {
"Import-Token": importToken
}
if method == 'POST':
headers['Content-Type'] = 'application/json'
logger.info(f'水鱼查分器 API 请求:{method} {BASE_URL + apiPath}')
if method == 'POST':
response = requests.post(
url=BASE_URL + apiPath,
json=data,
headers=headers,
)
elif method == 'GET':
response = requests.get(
url=BASE_URL + apiPath,
headers=headers,
)
else:
raise NotImplementedError
logger.info(f'水鱼查分器请求结果:{response.status_code} {response.text}')
if response.status_code != 200:
return False
return response.json()
def getFishRecords(importToken: str) -> dict:
return apiDivingFish('GET', '/player/records', importToken)
def updateFishRecords(importToken: str, records: list[dict]) -> dict:
return apiDivingFish('POST', '/player/update_records', importToken, records)
def maimaiUserMusicDetailToDivingFish(userMusicDetailList: list) -> list: def maimaiUserMusicDetailToDivingFish(userMusicDetailList: list) -> list:
'''舞萌的 UserMusicDetail 成绩格式转换成水鱼的格式''' '''舞萌的 UserMusicDetail 成绩格式转换成水鱼的格式'''
divingFishList = [] divingFishList = []
for currentMusicDetail in userMusicDetailList: for currentMusicDetail in userMusicDetailList:
# musicId 大于 100000 属于宴谱,不计入
if currentMusicDetail['musicId'] >= 100000:
continue
# 获得歌名
currentMusicTitle = getMusicTitle(currentMusicDetail['musicId'])
# 如果数据库里未找到此歌曲
if currentMusicTitle == "R_ERR_MUSIC_ID_NOT_IN_DATABASE":
logger.warning(f"数据库无此歌曲 跳过: {currentMusicDetail['musicId']}")
continue
# 每一个乐曲都判断下是 DX 还是标准 # 每一个乐曲都判断下是 DX 还是标准
if currentMusicDetail['musicId'] >= 10000: if currentMusicDetail['musicId'] >= 10000:
notesType = 'DX' notesType = 'DX'
else: else:
notesType = 'SD' notesType = 'SD'
# 追加进列表
divingFishList.append({ try:
'achievements': (currentMusicDetail['achievement'] / 10000), # 水鱼的成绩是 float 而非舞萌的 int divingFishList.append({
'title': (getMusicTitle(currentMusicDetail['musicId'])), # 水鱼用的是歌名而不是 ID导致不得不用数据库处理转换 'achievements': (currentMusicDetail['achievement'] / 10000), # 水鱼的成绩是 float 而非舞萌的 int
'type': notesType, # 我不理解这为什么不能在后端判断 'title': currentMusicTitle,
'level_index': currentMusicDetail['level'], 'type': notesType, # 我不理解这为什么不能在后端判断
'fc': currentMusicDetail['comboStatus'], 'level_index': currentMusicDetail['level'],
'fs': currentMusicDetail['syncStatus'], 'fc': COMBO_ID_TO_NAME[currentMusicDetail['comboStatus']],
'dxScore': currentMusicDetail['deluxscoreMax'], 'fs': SYNC_ID_TO_NAME[currentMusicDetail['syncStatus']],
}) 'dxScore': currentMusicDetail['deluxscoreMax'],
})
except:
print(currentMusicDetail)
logger.error(f"Error: {currentMusicDetail}")
return divingFishList return divingFishList
if __name__ == '__main__': if __name__ == '__main__':
userId = testUid if True:
currentLoginTimestamp = generateTimestamp() userId = testUid
loginResult = apiLogin(currentLoginTimestamp, userId) importToken = testImportToken
currentLoginTimestamp = generateTimestamp()
loginResult = apiLogin(currentLoginTimestamp, userId)
if loginResult['returnCode'] != 1: if loginResult['returnCode'] != 1:
logger.info("登录失败") logger.info("登录失败")
exit() exit()
try: try:
## Begin userFullMusicDetailList = getUserFullMusicDetail(userId)
userMusicDetailList_current = [] logger.warning("Now We Begin To Build DivingFish Data")
divingFishData = maimaiUserMusicDetailToDivingFish(userFullMusicDetailList)
logger.debug(divingFishData)
logger.warning("Now We Begin To Update DivingFish Data")
updateFishRecords(importToken, divingFishData)
finally:
#logger.error(f"Error: {e}")
logger.info(apiLogout(currentLoginTimestamp, userId))
nextIndex:int|None = None # 初始化 nextIndex
while nextIndex != 0 or nextIndex is None: #只要还有nextIndex就一直获取获取
userMusicResponse = getUserMusicDetail(userId, nextIndex or 0)
nextIndex = userMusicResponse['nextIndex']
logger.info(f"NextIndex: {nextIndex}")
for currentMusic in userMusicResponse['userMusicList']:
for currentMusicDetail in currentMusic['userMusicDetailList']:
if not currentMusicDetail['playCount'] > 0:
continue
userMusicDetailList_current.append(currentMusicDetail)
print("---------------")
print(str(userMusicDetailList_current))
## End
#logger.info(apiLogout(currentLoginTimestamp, userId))
logger.warning("Now We Begin To Build DiveFish Data")
divingFishData = maimaiUserMusicDetailToDivingFish(userMusicDetailList_current)
logger.info(divingFishData)
finally:
#logger.error(f"Error: {e}")
logger.info(apiLogout(currentLoginTimestamp, userId))

View File

@ -3,7 +3,7 @@ import json
import pytz import pytz
from datetime import datetime, timedelta from datetime import datetime, timedelta
from Static_Settings import * from Config import *
from API_TitleServer import apiSDGB from API_TitleServer import apiSDGB
from HelperGetUserThing import apiGetUserData from HelperGetUserThing import apiGetUserData

View File

@ -5,4 +5,4 @@ placeName = "赛博时空枣庄市中店"
clientId = "A63E01E9564" clientId = "A63E01E9564"
# 日本精工,安全防漏 # 日本精工,安全防漏
#from Private_Static_Settings import * #from MyConfig import *

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,4 @@
from Static_Settings import * from Config import *
from API_TitleServer import * from API_TitleServer import *
from GetPreview import apiGetUserPreview from GetPreview import apiGetUserPreview
from HelperLogInOut import apiLogout from HelperLogInOut import apiLogout

View File

View File

@ -2,6 +2,7 @@
import json import json
from API_TitleServer import apiSDGB from API_TitleServer import apiSDGB
from Config import *
def apiGetUserPreview(userId) -> str: def apiGetUserPreview(userId) -> str:
data = json.dumps({ data = json.dumps({
@ -12,5 +13,6 @@ def apiGetUserPreview(userId) -> str:
# CLI 示例 # CLI 示例
if __name__ == "__main__": if __name__ == "__main__":
userId = input("请输入用户 ID") #userId = input("请输入用户 ID")
userId = testUid
print(apiGetUserPreview(userId)) print(apiGetUserPreview(userId))

View File

@ -1,7 +1,7 @@
import json import json
from loguru import logger from loguru import logger
from Static_Settings import * from Config import *
from API_TitleServer import apiSDGB, calcSpecialNumber, WahlapServerBoomedError, Request500Error from API_TitleServer import apiSDGB, calcSpecialNumber, WahlapServerBoomedError, Request500Error
from HelperGetUserThing import implGetUser_ from HelperGetUserThing import implGetUser_
from HelperUploadUserPlayLog import apiUploadUserPlaylog from HelperUploadUserPlayLog import apiUploadUserPlaylog
@ -39,10 +39,19 @@ def applyUserAllPatches(userAll, patches):
if isinstance(value, dict) and key in userAll and isinstance(userAll[key], dict): if isinstance(value, dict) and key in userAll and isinstance(userAll[key], dict):
# 如果patch的值是字典并且userAll中对应的key也是字典递归处理 # 如果patch的值是字典并且userAll中对应的key也是字典递归处理
applyUserAllPatches(userAll[key], value) applyUserAllPatches(userAll[key], value)
elif isinstance(value, list) and key in userAll and isinstance(userAll[key], list):
# 如果值是列表,进行详细的更新处理
for i, patch_item in enumerate(value):
if i < len(userAll[key]) and isinstance(patch_item, dict) and isinstance(userAll[key][i], dict):
# 如果列表项是字典,更新字典中的字段
applyUserAllPatches(userAll[key][i], patch_item)
elif i >= len(userAll[key]):
# 如果patch的列表比userAll的列表长追加新的元素
userAll[key].append(patch_item)
else: else:
# 否则直接更新或添加key # 否则直接更新或添加key
userAll[key] = value userAll[key] = value
def implFullPlayAction(userId: int, currentLoginTimestamp:int, currentLoginResult, musicData, userAllPatches, debugMode=False) -> str: def implFullPlayAction(userId: int, currentLoginTimestamp:int, currentLoginResult, musicData, userAllPatches, debugMode=False) -> str:
''' '''
一份完整的上机实现可以打 patch 来实现各种功能 一份完整的上机实现可以打 patch 来实现各种功能
@ -81,7 +90,8 @@ def implFullPlayAction(userId: int, currentLoginTimestamp:int, currentLoginResul
# 调试模式下直接输出数据 # 调试模式下直接输出数据
if debugMode: if debugMode:
logger.debug("调试模式:当前 UserAll 数据:" + json.dumps(currentUserAll, indent=4)) logger.debug("调试模式:构建出的 UserAll 数据:" + json.dumps(currentUserAll, indent=4))
logger.info("Bye!")
return return
# 建构 Json 数据 # 建构 Json 数据
@ -90,7 +100,7 @@ def implFullPlayAction(userId: int, currentLoginTimestamp:int, currentLoginResul
try: try:
currentUserAllResult = json.loads(apiSDGB(data, "UpsertUserAllApi", userId)) currentUserAllResult = json.loads(apiSDGB(data, "UpsertUserAllApi", userId))
except Request500Error: except Request500Error:
logger.warning("500 Error Triggered. Rebuilding data.") logger.warning("上传 UserAll 出现 500. 重建数据.")
retries += 1 retries += 1
continue continue
except Exception: except Exception:

View File

@ -0,0 +1,66 @@
# 获取用户成绩的各种实现
from API_TitleServer import *
from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from Config import *
import json
from HelperMusicDB import getMusicTitle
from loguru import logger
import sys
# 日志设置
#log_level = "DEBUG"
#log_format = "<green>{time:YYYY-MM-DD HH:mm:ss.SSS zz}</green> | <level>{level: <8}</level> | <yellow>Line {line: >4} ({file}):</yellow> <b>{message}</b>"
#logger.add(sys.stderr, level=log_level, format=log_format, colorize=True, backtrace=True, diagnose=True)
#logger.add("file.log", level=log_level, format=log_format, colorize=False, backtrace=True, diagnose=True)
def getUserMusicDetail(userId:int, nextIndex:int=0, maxCount:int=50) -> dict:
'''获取用户的成绩的API'''
data = json.dumps({
"userId": int(userId),
"nextIndex": nextIndex,
"maxCount": maxCount
})
return json.loads(apiSDGB(data, "GetUserMusicApi", userId))
def getUserFullMusicDetail(userId: int) -> dict:
'''获取用户的全部成绩'''
userMusicDetailList_current = []
nextIndex:int|None = None # 初始化 nextIndex
while nextIndex != 0 or nextIndex is None: #只要还有nextIndex就一直获取获取
userMusicResponse = getUserMusicDetail(userId, nextIndex or 0)
nextIndex = userMusicResponse['nextIndex']
logger.info(f"NextIndex: {nextIndex}")
for currentMusic in userMusicResponse['userMusicList']:
for currentMusicDetail in currentMusic['userMusicDetailList']:
if not currentMusicDetail['playCount'] > 0:
continue
userMusicDetailList_current.append(currentMusicDetail)
return userMusicDetailList_current
def parseUserFullMusicDetail(userFullMusicDetailList: list) -> dict:
'''解析用户的全部成绩'''
musicDetailList = []
for currentMusicDetail in userFullMusicDetailList:
musicDetailList.append({
'歌名': getMusicTitle(currentMusicDetail['musicId']),
'难度': currentMusicDetail['level'],
'分数': currentMusicDetail['achievement'] / 10000,
'DX分数': currentMusicDetail['deluxscoreMax']
})
return musicDetailList
if __name__ == '__main__':
userId = testUid5
currentLoginTimestamp = generateTimestamp()
loginResult = apiLogin(currentLoginTimestamp, userId)
if loginResult['returnCode'] != 1:
logger.info("登录失败")
exit()
try:
userFullMusicDetailList = getUserFullMusicDetail(userId)
parsedUserFullMusicDetail = parseUserFullMusicDetail(userFullMusicDetailList)
logger.info(parsedUserFullMusicDetail)
finally:
#logger.error(f"Error: {e}")
logger.info(apiLogout(currentLoginTimestamp, userId))

View File

@ -5,7 +5,7 @@ from API_TitleServer import apiSDGB
def apiGetUserData(userId:int) -> str: def apiGetUserData(userId:int) -> str:
'''已弃用,将逐步淘汰''' '''已弃用,将逐步淘汰'''
logger.warning("apiGetUserData 已弃用,将逐步淘汰。") logger.info("apiGetUserData 已弃用,将逐步淘汰。")
# 构建 Payload # 构建 Payload
data = json.dumps({ data = json.dumps({
"userId": userId "userId": userId
@ -15,21 +15,21 @@ def apiGetUserData(userId:int) -> str:
# 返回响应 # 返回响应
return userdata_result return userdata_result
def apiGetUserThing(userId:int, thing:str) -> str: def apiGetUserThing(userId:int, thing:str, noLog=False) -> str:
'''获取用户数据的 API 请求器,返回 Json String''' '''获取用户数据的 API 请求器,返回 Json String'''
# 构建 Payload # 构建 Payload
data = json.dumps({ data = json.dumps({
"userId": userId "userId": userId
}) })
# 发送请求 # 发送请求
userthing_result = apiSDGB(data, "GetUser" + thing + "Api", userId) userthing_result = apiSDGB(data, "GetUser" + thing + "Api", userId, noLog)
# 返回响应 # 返回响应
return userthing_result return userthing_result
def implGetUser_(thing:str, userId:int) -> dict: def implGetUser_(thing:str, userId:int, noLog=False) -> dict:
'''获取用户数据的 API 实现,返回 Dict''' '''获取用户某些数据的 API 实现,返回 Dict'''
# 获取 Json String # 获取 Json String
userthing_result = apiGetUserThing(userId, thing) userthing_result = apiGetUserThing(userId, thing, noLog)
# 转换为 Dict # 转换为 Dict
userthing_dict = json.loads(userthing_result) userthing_dict = json.loads(userthing_result)
# 返回 Dict # 返回 Dict

View File

@ -5,7 +5,7 @@ import json
import time import time
from loguru import logger from loguru import logger
from Static_Settings import * from Config import *
from API_TitleServer import apiSDGB from API_TitleServer import apiSDGB
def apiLogin(timestamp:int, userId:int) -> dict: def apiLogin(timestamp:int, userId:int) -> dict:

14
HelperMusicDB.py Normal file
View File

@ -0,0 +1,14 @@
from MusicDB import musicDB
from loguru import logger
def getMusicTitle(musicId: int) -> str:
'''从数据库获取音乐的标题'''
#logger.debug(f"查询歌名: {musicId}")
musicInfo = musicDB.get(musicId)
if not musicInfo:
logger.warning(f"数据库里未找到此歌曲: {musicId}")
return "R_ERR_MUSIC_ID_NOT_IN_DATABASE"
musicName = musicInfo.get("name")
#logger.debug(f"成功查询到歌名: {musicName}")
return musicName

View File

@ -1,11 +1,11 @@
# 解锁东西的一个通用的助手,不可独立使用 # 解锁东西的一个通用的助手,不可独立使用
from loguru import logger from loguru import logger
from Static_Settings import * from Config import *
from HelperFullPlay import implFullPlayAction from HelperFullPlay import implFullPlayAction
def implUnlockThing(newUserItemList, userId: int, currentLoginTimestamp:int, currentLoginResult) -> str: def implUnlockThing(newUserItemList, userId: int, currentLoginTimestamp:int, currentLoginResult) -> str:
musicData= ({ musicData= ({
"musicId": 566, #天火明命 "musicId": 11538, # Amber Chronicle
"level": 0, "level": 0,
"playCount": 1, "playCount": 1,
"achievement": 0, "achievement": 0,
@ -14,7 +14,7 @@ def implUnlockThing(newUserItemList, userId: int, currentLoginTimestamp:int, cur
"deluxscoreMax": 0, "deluxscoreMax": 0,
"scoreRank": 0, "scoreRank": 0,
"extNum1": 0 "extNum1": 0
}) })
userAllPatches = { userAllPatches = {
"upsertUserAll": { "upsertUserAll": {
"userMusicDetailList": [musicData], "userMusicDetailList": [musicData],

View File

@ -8,7 +8,7 @@ from datetime import datetime
from loguru import logger from loguru import logger
from API_TitleServer import apiSDGB from API_TitleServer import apiSDGB
from Static_Settings import * from Config import *
def apiUploadUserPlaylog(userId:int, musicDataToBeUploaded, currentUserData2, loginId:int) -> str: def apiUploadUserPlaylog(userId:int, musicDataToBeUploaded, currentUserData2, loginId:int) -> str:
'''返回 Json String。''' '''返回 Json String。'''

View File

@ -2,7 +2,7 @@
import pytz import pytz
from datetime import datetime from datetime import datetime
from Static_Settings import * from Config import *
from HelperGetUserThing import implGetUser_ from HelperGetUserThing import implGetUser_
def generateFullUserAll(userId, currentLoginResult, currentLoginTimestamp, currentUserData2, currentSpecialNumber): def generateFullUserAll(userId, currentLoginResult, currentLoginTimestamp, currentUserData2, currentSpecialNumber):
@ -12,11 +12,11 @@ def generateFullUserAll(userId, currentLoginResult, currentLoginTimestamp, curre
currentUserAll = generateUserAllData(userId, currentLoginResult, currentLoginTimestamp, currentUserData2, currentSpecialNumber) currentUserAll = generateUserAllData(userId, currentLoginResult, currentLoginTimestamp, currentUserData2, currentSpecialNumber)
# 然后从服务器取得必要的数据 # 然后从服务器取得必要的数据
currentUserExtend = implGetUser_("Extend", userId) currentUserExtend = implGetUser_("Extend", userId, True)
currentUserOption = implGetUser_("Option", userId) currentUserOption = implGetUser_("Option", userId, True)
currentUserRating = implGetUser_("Rating", userId) currentUserRating = implGetUser_("Rating", userId, True)
currentUserActivity = implGetUser_("Activity", userId) currentUserActivity = implGetUser_("Activity", userId, True)
currentUserCharge = implGetUser_("Charge", userId) currentUserCharge = implGetUser_("Charge", userId, True)
# 把这些数据都追加进去 # 把这些数据都追加进去
currentUserAll['upsertUserAll']['userExtend'] = [currentUserExtend['userExtend']] currentUserAll['upsertUserAll']['userExtend'] = [currentUserExtend['userExtend']]

View File

@ -1,7 +1,3 @@
# 感谢伟大的 Diving-Fish 让我被迫直面恐惧写这个逼玩意
import xml.dom.minidom as minidom
from pathlib import Path
import rapidjson as json import rapidjson as json
from typing import Dict, Union from typing import Dict, Union
@ -12,7 +8,7 @@ MusicDBType = Dict[int, Dict[str, Union[int, str]]]
__all__ = ['musicDB'] __all__ = ['musicDB']
# 读取并解析 JSON 文件 # 读取并解析 JSON 文件
with open('musicDB.json', 'r', encoding='utf-8') as f: with open('./Data/musicDB.json', 'r', encoding='utf-8') as f:
# 使用 json.load 直接从文件对象读取 JSON 数据 # 使用 json.load 直接从文件对象读取 JSON 数据
data = json.load(f) data = json.load(f)

View File

@ -1,6 +1,7 @@
# 解密从 HDD 抓包得到的数据 # 解密从 HDD 抓包得到的数据
# 兼容 PRiSM 和 CN 2024 # 兼容 PRiSM 和 CN 2024
# 仅用于分析
# 完全 Standalone不依赖于其他文件
import base64 import base64
import zlib import zlib

View File

@ -12,11 +12,11 @@ def makeMusicDBJson():
免得国服每次更新还要重新生成太麻烦 免得国服每次更新还要重新生成太麻烦
''' '''
# 记得改 # 记得改
A000_DIR = Path('/run/media/remik1r3n/软件/maimaiDX_SDGB/Sinmai_Data/StreamingAssets/A000') A000_DIR = Path('H:\PRiSM\Package\Sinmai_Data\StreamingAssets\A000')
OPTION_DIR = Path('/run/media/remik1r3n/软件/maimaiDX_SDGB/Sinmai_Data/StreamingAssets') OPTION_DIR = Path('H:\PRiSM\Package\Sinmai_Data\StreamingAssets')
music_db: dict[str, dict[str, str | int]] = {} music_db: dict[str, dict[str, str | int]] = {}
DEST_PATH = Path('./musicDB.json') DEST_PATH = Path('../Data/musicDB.json')
music_folders = [f for f in (A000_DIR / 'music').iterdir() if f.is_dir()] music_folders = [f for f in (A000_DIR / 'music').iterdir() if f.is_dir()]
for option_dir in OPTION_DIR.iterdir(): for option_dir in OPTION_DIR.iterdir():
@ -43,9 +43,10 @@ def makeMusicDBJson():
serialized += f' "{key}": {json.dumps(value, ensure_ascii=False)},\n' serialized += f' "{key}": {json.dumps(value, ensure_ascii=False)},\n'
serialized = serialized[:-2] + '\n}' serialized = serialized[:-2] + '\n}'
with open(DEST_PATH, 'w') as f: with open(DEST_PATH, 'w', encoding='utf-8') as f:
f.write(serialized) f.write(serialized)
if __name__ == '__main__': if __name__ == '__main__':
makeMusicDBJson() makeMusicDBJson()
print('Done.') print('Done.')

View File

@ -6,6 +6,12 @@ from PyQt6.QtWidgets import (
) )
from PyQt6.QtCore import Qt from PyQt6.QtCore import Qt
# 将当前目录的父目录加入到 sys.path 中
from pathlib import Path
current_dir = Path(__file__).resolve().parent
parent_dir = current_dir.parent
sys.path.append(str(parent_dir))
from API_TitleServer import * from API_TitleServer import *
def sendRequest(requestText:str, apiNameText:str, uid:int) -> str: def sendRequest(requestText:str, apiNameText:str, uid:int) -> str:
@ -14,8 +20,10 @@ def sendRequest(requestText:str, apiNameText:str, uid:int) -> str:
data = json.dumps(data) data = json.dumps(data)
except: except:
return "给出的输入不是有效的 JSON" return "给出的输入不是有效的 JSON"
try:
result = apiSDGB(data, apiNameText, uid) result = apiSDGB(data, apiNameText, uid)
except Exception as e:
return "请求失败:" + str(e)
return result return result

View File