2025适配 尚不稳定但是可能能用

This commit is contained in:
Remik1r3n 2025-06-12 22:23:52 +08:00
parent 465211ac96
commit f53dfef7ca
6 changed files with 89 additions and 108 deletions

View File

@ -71,32 +71,29 @@ def getSDGBApiHash(api):
# 有空做一下 Hash 的彩虹表?
return hashlib.md5((api+"MaimaiChn"+ObfuscateParam).encode()).hexdigest()
def apiSDGB2(
def apiSDGB(
data: str,
targetApi: str,
userAgentExtraData: str,
noLog: bool = False,
timeout: int = 5,
useProxy: bool = False,
proxyUrl: Optional[str] = None
maxRetries: int = 3,
) -> str:
"""
舞萌DX API 通讯用函数增强版
舞萌DX 2025 API 通讯用函数
:param data: 请求数据
:param targetApi: 使用的 API
:param userAgentExtraData: UA 附加信息机台相关则为狗号如A63E01E9564用户相关则为 UID
:param noLog: 是否不记录日志
:param timeout: 请求超时时间
:param useProxy: 是否使用代理
:param proxyUrl: 代理地址如果使用代理
:return: 解码后的响应数据
"""
maxRetries = 3
# 处理参数
agentExtra = str(userAgentExtraData)
aes = aes_pkcs7(AesKey, AesIV) # Assuming aes_pkcs7, AesKey, AesIV are defined elsewhere
aes = aes_pkcs7(AesKey, AesIV)
endpoint = "https://maimai-gm.wahlap.com:42081/Maimai2Servlet/"
# Prepare request data
# 准备好请求数据
requestDataFinal = aes.encrypt(zlib.compress(data.encode('utf-8')))
if not noLog:
@ -105,19 +102,17 @@ def apiSDGB2(
retries = 0
while retries < maxRetries:
try:
# Configure HTTP client
# 配置 HTTP 客户端
if useProxy and proxyUrl:
if not noLog:
logger.debug("使用代理")
httpClient = httpx.Client(proxy=proxyUrl, verify=False)
else:
if not noLog:
logger.debug("不使用代理")
httpClient = httpx.Client(verify=False)
# Send request
# 发送请求
response = httpClient.post(
url=endpoint + getSDGBApiHash(targetApi), # Assuming getSDGBApiHash is defined
url=endpoint + getSDGBApiHash(targetApi),
headers={
"User-Agent": f"{getSDGBApiHash(targetApi)}#{agentExtra}",
"Content-Type": "application/json",
@ -127,7 +122,7 @@ def apiSDGB2(
"Content-Encoding": "deflate",
"Expect": "100-continue"
},
content=requestDataFinal,
content=requestDataFinal, #数据
timeout=timeout
)
@ -139,9 +134,10 @@ def apiSDGB2(
logger.error(errorMessage)
raise SDGBRequestError(errorMessage)
# Process response
# 处理响应内容
responseContentRaw = response.content
# 先尝试解密
try:
responseContentDecrypted = aes.decrypt(responseContentRaw)
if not noLog:
@ -149,41 +145,28 @@ def apiSDGB2(
except Exception as e:
logger.warning(f"解密失败,原始响应: {responseContentRaw}, 错误: {e}")
raise SDGBResponseError("解密失败")
# 然后尝试解压
try:
# 检查 ResponseContentDecrypted 是否为 zlib 压缩格式
# 看看文件头是否正确
if not responseContentDecrypted.startswith(b'\x78\x9c'):
logger.warning("Not Zlib. Not decompressed.")
logger.warning("NOT ZLIB FORMAT")
raise Exception(f"响应内容不是 zlib 压缩格式, 内容: {responseContentDecrypted}")
responseContentFinal = zlib.decompress(responseContentDecrypted).decode('utf-8')
if not noLog:
logger.debug("成功解压响应!")
#logger.debug("成功解压响应!")
logger.debug(f"响应: {responseContentFinal}")
return responseContentFinal
except zlib.error as e:
logger.warning(f"解压失败,原始响应: {responseContentDecrypted}, 错误: {e}")
except:
logger.warning(f"解压失败,原始响应: {responseContentDecrypted}")
raise SDGBResponseError("解压失败")
# If decompression fails after attempts, trigger a retry of the entire request
retries += 1
if retries < maxRetries:
logger.warning(f"解压失败,将重试请求 (第 {retries + 1}/{maxRetries} 次)")
time.sleep(2)
continue
raise SDGBResponseError("多次尝试后仍无法解压响应")
except SDGBRequestError as e:
# Request format error, no retry
logger.error(f"请求格式错误: {e}")
raise
except SDGBResponseError as e:
# Response parsing error, retry once more
logger.warning(f"响应错误,将重试: {e}")
retries += 1
time.sleep(2)
except Exception as e:
# Other errors, retry
logger.warning(f"请求失败,将重试: {e}")
retries += 1
time.sleep(2)
@ -207,43 +190,8 @@ def calcPlaySpecial():
num2 >>= 1
return c_int32(result.value).value
"""
DEPRECATED: 旧的 SpecialNumber 算法
def calcSpecialNumber2():
max = 1037933
num2 = random.randint(1, max) * 2069
num2 += 1024 # specialnum
num3 = 0
for i in range(0, 32):
num3 <<= 1
num3 += num2 % 2
num2 >>= 1
return num3
"""
# 舞萌DX 2024
# omg it's leaking
AesKey = "n7bx6:@Fg_:2;5E89Phy7AyIcpxEQ:R@"
AesIV = ";;KjR1C3hgB1ovXa"
ObfuscateParam = "BEs2D5vW"
class SDGBApiError(Exception):
pass
class SDGBRequestError(SDGBApiError):
pass
class SDGBResponseError(SDGBApiError):
pass
class AESPKCS7:
class AESPKCS7_2024:
# 实现了 maimai 通讯所用的 AES 加密的类
def __init__(self, key: str, iv: str):
self.key = key.encode('utf-8')
@ -265,7 +213,7 @@ class AESPKCS7:
decrypted = unpad(decrypted_padded, AES.block_size)
return decrypted
def apiSDGB(data:str, targetApi:str, userAgentExtraData:str, noLog:bool=False, timeout:int=5):
def apiSDGB_2024(data:str, targetApi:str, userAgentExtraData:str, noLog:bool=False, timeout:int=5):
"""
舞萌DX 2024 API 通讯用函数
:param data: 请求数据
@ -275,7 +223,7 @@ def apiSDGB(data:str, targetApi:str, userAgentExtraData:str, noLog:bool=False, t
"""
maxRetries = 3
agentExtra = str(userAgentExtraData)
aes = AESPKCS7(AesKey, AesIV)
aes = AESPKCS7_2024(AesKey, AesIV)
reqData_encrypted = aes.encrypt(data)
reqData_deflated = zlib.compress(reqData_encrypted)
endpoint = "https://maimai-gm.wahlap.com:42081/Maimai2Servlet/"

View File

@ -4,24 +4,38 @@ from loguru import logger
from Config import *
from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from HelperFullPlay import implFullPlayAction, generateMusicData
from HelperGetUserThing import implGetUser_
def implWipeTickets(userId: int, currentLoginTimestamp:int, currentLoginResult) -> str:
# Get User Charge
currentUserCharge = implGetUser_("Charge", userId)
currentUserChargeList = currentUserCharge['userChargeList']
# All stock set to 0
for charge in currentUserChargeList:
charge['stock'] = 0
# example format
# {"userId":11088995,"length":16,"userChargeList":[{"chargeId":1,"stock":0,"purchaseDate":"2025-02-04 00:51:50","validDate":"2025-02-04 00:51:50","extNum1":0},{"chargeId":2,"stock":0,"purchaseDate":"2025-06-11 17:19:42","validDate":"2025-09-09 04:00:00","extNum1":0},{"chargeId":3,"stock":0,"purchaseDate":"2025-06-11 17:19:40","validDate":"2025-09-09 04:00:00","extNum1":0},{"chargeId":4,"stock":0,"purchaseDate":"2025-06-11 09:34:51","validDate":"2025-09-09 04:00:00","extNum1":0},{"chargeId":5,"stock":0,"purchaseDate":"2025-01-30 12:31:16","validDate":"2025-04-30 04:00:00","extNum1":0},{"chargeId":6,"stock":0,"purchaseDate":"2025-02-17 20:01:42","validDate":"2025-02-17 20:01:42","extNum1":0},{"chargeId":7,"stock":0,"purchaseDate":"2025-02-06 16:17:41","validDate":"2025-02-06 16:17:41","extNum1":0},{"chargeId":8,"stock":0,"purchaseDate":"2025-02-06 16:17:49","validDate":"2025-02-06 16:17:49","extNum1":0},{"chargeId":9,"stock":0,"purchaseDate":"2025-02-06 16:18:00","validDate":"2025-02-06 16:18:00","extNum1":0},{"chargeId":10001,"stock":1,"purchaseDate":"2025-06-11 17:19:51","validDate":"2025-09-09 04:00:00","extNum1":0},{"chargeId":10005,"stock":0,"purchaseDate":"2025-04-25 15:45:55","validDate":"2025-07-24 04:00:00","extNum1":0},{"chargeId":10105,"stock":0,"purchaseDate":"2025-04-25 15:46:00","validDate":"2025-07-24 04:00:00","extNum1":0},{"chargeId":10205,"stock":0,"purchaseDate":"2025-04-25 15:46:03","validDate":"2025-07-24 04:00:00","extNum1":0},{"chargeId":11001,"stock":0,"purchaseDate":"2025-01-08 20:43:05","validDate":"2025-04-08 04:00:00","extNum1":0},{"chargeId":30001,"stock":0,"purchaseDate":"2025-04-25 15:46:17","validDate":"2025-07-24 04:00:00","extNum1":0},{"chargeId":999999,"stock":0,"purchaseDate":"2025-02-06 23:03:14","validDate":"2025-02-06 23:03:14","extNum1":0}]}
def implChangeVersionNumber(userId: int, currentLoginTimestamp:int, currentLoginResult, dataVersion="1.40.09", romVersion="1.41.00") -> str:
musicData = generateMusicData()
userAllPatches = {
"upsertUserAll": {
"userData": [{
"lastRomVersion": romVersion,
"lastDataVersion": dataVersion
}],
# "userData": [{
# "lastRomVersion": romVersion,
# "lastDataVersion": dataVersion
# }],
"userChargeList": currentUserChargeList,
"userMusicDetailList": [musicData],
"isNewMusicDetailList": "1" #1避免覆盖
}}
logger.info("Changing version number to " + dataVersion + " and " + romVersion)
result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches)
return result
if __name__ == "__main__":
userId = testUid
userId = testUid2
currentLoginTimestamp = generateTimestamp()
loginResult = apiLogin(currentLoginTimestamp, userId)
@ -29,7 +43,7 @@ if __name__ == "__main__":
logger.info("登录失败")
exit()
try:
logger.info(implChangeVersionNumber(userId, currentLoginTimestamp, loginResult, "1.00.00", "1.00.00"))
logger.info(implWipeTickets(userId, currentLoginTimestamp, loginResult))
logger.info(apiLogout(currentLoginTimestamp, userId))
finally:
logger.info(apiLogout(currentLoginTimestamp, userId))

View File

@ -5,13 +5,10 @@ import rapidjson as json
from loguru import logger
import xml.etree.ElementTree as ET
from Config import *
from API_TitleServer import apiSDGB
from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from HelperFullPlay import implFullPlayAction
from HelperFullPlay import implFullPlayAction, generateMusicData
class NoSelectedBonusError(Exception):
pass
@ -31,17 +28,7 @@ def implLoginBonus(userId: int, currentLoginTimestamp:int, currentLoginResult, b
1: 選択したボーナスのみ MAX にする選択したボーナスはないの場合は False を返す
2: 全部 MAX にする
"""
musicData = {
"musicId": 674, # Magical Flavor
"level": 0,
"playCount": 2,
"achievement": 0,
"comboStatus": 0,
"syncStatus": 0,
"deluxscoreMax": 0,
"scoreRank": 0,
"extNum1": 0
}
musicData = generateMusicData()
# サーバーからログインボーナスデータを取得
data = json.dumps({
"userId": int(userId),
@ -151,7 +138,7 @@ def generateLoginBonusList(UserLoginBonusList, generateMode=1):
if __name__ == "__main__":
# ログインボーナスデータをアップロードする
userId = testUid8
userId = testUid
currentLoginTimestamp = generateTimestamp()
currentLoginResult = apiLogin(currentLoginTimestamp, userId)
implLoginBonus(userId, currentLoginTimestamp, currentLoginResult, 2)

View File

@ -41,3 +41,18 @@ def implUnlockMusic(musicToBeUnlocked: int, userId: int, currentLoginTimestamp:i
]
unlockThingResult = implUnlockThing(userItemList, userId, currentLoginTimestamp, currentLoginResult)
return unlockThingResult
if __name__ == "__main__":
userId = testUid2
currentLoginTimestamp = generateTimestamp()
loginResult = apiLogin(currentLoginTimestamp, userId)
if loginResult['returnCode'] != 1:
logger.info("登录失败")
exit()
try:
logger.info(implWipeTickets(userId, currentLoginTimestamp, loginResult))
logger.info(apiLogout(currentLoginTimestamp, userId))
finally:
logger.info(apiLogout(currentLoginTimestamp, userId))
#logger.warning("Error")

View File

@ -9,7 +9,29 @@ from HelperGetUserThing import implGetUser_
from loguru import logger
from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from HelperFullPlay import implFullPlayAction, generateMusicData
from HelperGetUserThing import implGetUser_
def implWipeTickets(userId: int, currentLoginTimestamp:int, currentLoginResult) -> str:
'''清空用户所有票的 API 请求器,返回 Json String。'''
# 先得到当前用户的 Charge 数据
currentUserCharge = implGetUser_("Charge", userId)
# 取得 List
currentUserChargeList = currentUserCharge['userChargeList']
# 所有 stock 都置为 0
for charge in currentUserChargeList:
charge['stock'] = 0
musicData = generateMusicData()
userAllPatches = {
"upsertUserAll": {
"userChargeList": currentUserChargeList,
"userMusicDetailList": [musicData],
"isNewMusicDetailList": "1" #1避免覆盖
}}
result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches)
return result
def apiQueryTicket(userId:int) -> str:
'''查询已有票的 API 请求器,返回 Json String。'''
@ -39,7 +61,7 @@ def apiBuyTicket(userId:int, ticketType:int, price:int, playerRating:int, playCo
},
"userCharge": {
"chargeId": ticketType,
"stock": 1,
"stock": 0,
"purchaseDate": (datetime.now(pytz.timezone('Asia/Shanghai')) - timedelta(hours=1)).strftime("%Y-%m-%d %H:%M:%S.0"),
"validDate": (datetime.now(pytz.timezone('Asia/Shanghai')) - timedelta(hours=1) + timedelta(days=90)).replace(hour=4, minute=0, second=0).strftime("%Y-%m-%d %H:%M:%S")
}
@ -74,8 +96,8 @@ if __name__ == "__main__":
logger.info("登录失败")
exit()
try:
logger.info(implBuyTicket(userId, 2))
logger.info(apiLogout(currentLoginTimestamp, userId))
logger.info(implWipeTickets(userId, currentLoginTimestamp, loginResult))
#logger.info(apiQueryTicket(userId))
finally:
logger.info(apiLogout(currentLoginTimestamp, userId))
#logger.warning("Error")

View File

@ -12,11 +12,6 @@ def implGetUser_(thing:str, userId:int, noLog=False) -> dict:
# 返回 Dict
return userthingDict
# 已弃用
#def apiGetUserData(userId:int) -> str:
# """Now aka of implGetUser_(Data)"""
# return implGetUser_("Data", userId)
def apiGetUserThing(userId:int, thing:str, noLog=False) -> str:
"""获取用户数据的 API 请求器,返回 Json String"""
# 构建 Payload