Merge branch 'master' of github.com:Remik1r3n/maimaiDX-Api

This commit is contained in:
Remik1r3n 2025-02-07 14:47:39 +08:00
commit 486da8a4c7
10 changed files with 165 additions and 96 deletions

View File

@ -3,16 +3,13 @@
import zlib
import hashlib
import requests
import httpx
from loguru import logger
import random
import time
from ctypes import c_int32
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from Config import *
# 舞萌DX 2024
@ -20,10 +17,16 @@ AesKey = "n7bx6:@Fg_:2;5E89Phy7AyIcpxEQ:R@"
AesIV = ";;KjR1C3hgB1ovXa"
ObfuscateParam = "BEs2D5vW"
class WahlapServerBoomedError(Exception):
class SDGBApiError(Exception):
pass
class Request500Error(Exception):
class SDGBMaxRetriesError(SDGBApiError):
pass
class SDGBRequestError(SDGBApiError):
pass
class SDGBResponseError(SDGBApiError):
pass
class AES_PKCS7(object):
@ -56,93 +59,84 @@ class AES_PKCS7(object):
padding_text = chr(padding) * padding
return text + padding_text
def SDGBApiHash(api):
def getSDGBApiHash(api):
return hashlib.md5((api+"MaimaiChn"+ObfuscateParam).encode()).hexdigest()
def apiSDGB(data:str, useApi, agentExtraData, noLog=False):
def apiSDGB(data:str, targetApi:str, userAgentExtraData:str, noLog:bool=False, timeout:int=5):
"""
舞萌DX 2024 API 通讯用函数
:param data: 请求数据
:param useApi: 使用的 API
:param agentExtraData: UA 附加信息机台相关则为狗号如A63E01E9564用户相关则为 UID
:param targetApi: 使用的 API
:param userAgentExtraData: UA 附加信息机台相关则为狗号如A63E01E9564用户相关则为 UID
:param noLog: 是否不记录日志
"""
maxRetries = 3
# 历史遗留代码有时候会传入 int故先全部转 str
agentExtra = str(agentExtraData)
# 编码好请求,准备发送
agentExtra = str(userAgentExtraData)
aes = AES_PKCS7(AesKey, AesIV)
data = data
data_enc = aes.encrypt(data)
data_def = zlib.compress(data_enc)
requests.packages.urllib3.disable_warnings()
reqData_encrypted = aes.encrypt(data)
reqData_deflated = zlib.compress(reqData_encrypted)
endpoint = "https://maimai-gm.wahlap.com:42081/Maimai2Servlet/"
if not noLog:
logger.debug("TitleServer Request Start: "+ str(useApi)+" , Data: "+str(data))
logger.debug(f"开始请求 {targetApi},以 {data}")
retries = 0
while retries < maxRetries:
try:
# 发送请求
responseRaw = requests.post(endpoint + SDGBApiHash(useApi), headers={
"User-Agent": f"{SDGBApiHash(useApi)}#{agentExtra}",
"Content-Type": "application/json",
"Mai-Encoding": "1.40",
"Accept-Encoding": "",
"Charset": "UTF-8",
"Content-Encoding": "deflate",
"Expect": "100-continue"
}, data=data_def, verify=False)
logger.debug("TitleServer Request Sent.")
logger.debug("TitleServer Response Code: " + str(responseRaw.status_code))
# 如果是 404 或 500直接抛出异常不再继续重试
match responseRaw.status_code:
case 200:
logger.debug("Request 200 OK!")
case 404:
logger.error(f"Request 404! ")
raise NotImplementedError
case 500:
logger.error(f"Request Failed! 500!!!! ")
raise Request500Error
case _:
logger.error(f"Request Failed! {responseRaw.status_code}")
raise NotImplementedError
responseContent = responseRaw.content
# 尝试解压请求
response = httpx.post(
url=endpoint + getSDGBApiHash(targetApi),
headers={
"User-Agent": f"{getSDGBApiHash(targetApi)}#{agentExtra}",
"Content-Type": "application/json",
"Mai-Encoding": "1.40",
"Accept-Encoding": "",
"Charset": "UTF-8",
"Content-Encoding": "deflate",
"Expect": "100-continue"
},
content=reqData_deflated,
verify=False,
timeout=timeout
)
logger.info(f"{targetApi} 请求结果: {response.status_code}")
if response.status_code == 200:
logger.debug("200 OK!")
else:
errorMessage = f"请求失败: {response.status_code}"
logger.error(errorMessage)
raise SDGBRequestError(errorMessage)
responseRAWContent = response.content
try:
responseDecompressed = zlib.decompress(responseContent)
logger.debug("Successfully decompressed response.")
except zlib.error as e:
logger.warning(f"RAW Response: {responseContent}")
logger.warning(f"Wahlap Server Boomed! Will now retry.{e}")
retries += 1
time.sleep(4) # 休眠4秒后重试
continue
# 解压成功,解密请求并返回
responseDecompressed = zlib.decompress(responseRAWContent)
logger.debug("成功解压响应!")
except zlib.error:
logger.warning(f"无法解压,得到的原始响应: {responseRAWContent}")
raise SDGBResponseError("Decompression failed")
resultResponse = unpad(aes.decrypt(responseDecompressed), 16).decode()
logger.info("TitleServer:" + useApi + " Response: " + str(responseRaw.status_code))
if not noLog:
logger.debug("TitleServer Response: " + str(resultResponse))
logger.debug(f"响应: {resultResponse}")
return resultResponse
# 除了 404 和 500 之外的错误重试
except Request500Error:
raise Request500Error("500请求格式错误")
except NotImplementedError:
raise NotImplementedError("请求未知错误")
# 异常处理
except SDGBRequestError as e:
# 请求格式错误,不需要重试
raise SDGBRequestError("请求格式错误")
except SDGBResponseError as e:
# 响应解析错误,重试但是只一次
logger.warning(f"Will now retry. {e}")
retries += 2
time.sleep(2)
except Exception as e:
logger.warning(f"Request Failed! Will now retry.. {e}")
# 其他错误,重试
logger.warning(f"Will now retry. {e}")
retries += 1
time.sleep(3)
else:
# 重试次数用尽WahlapServerBoomedError
raise WahlapServerBoomedError("重试多次仍然不能成功请求")
raise SDGBApiError("Multiple retries failed to make a successful request")
def calcSpecialNumber():
"""使用 c_int32 实现的 SpecialNumber 算法"""
@ -157,7 +151,6 @@ def calcSpecialNumber():
num2 >>= 1
return c_int32(result.value).value
def calcSpecialNumber2():
"""实验性替代 SpecialNumber 算法"""
max = 1037933

View File

@ -12,7 +12,6 @@ from HelperFullPlay import implFullPlayAction
class NoSelectedBonusError(Exception):
pass
def apiQueryLoginBonus(userId:int) -> str:
"""ログインボーナスを取得する API"""
data = json.dumps({

View File

@ -86,8 +86,12 @@ def maimaiUserMusicDetailToDivingFishFormat(userMusicDetailList) -> list:
'dxScore': currentMusicDetail['deluxscoreMax'],
})
except:
print(currentMusicDetail)
logger.error(f"Error: {currentMusicDetail}")
logger.error(f"Fish Format Translate Error: {currentMusicDetail}")
# debug output fish list to file
#with open("fishList.txt", "w", encoding="utf-8") as f:
# f.write(str(divingFishList))
return divingFishList
def isVaildFishToken(importToken:str):
@ -101,23 +105,20 @@ def isVaildFishToken(importToken:str):
return True
def implUserMusicToDivingFish(userId:int, fishImportToken:str):
'''上传所有成绩到水鱼的参考实现'''
logger.info("Start to upload user music detail to DivingFish")
'''上传所有成绩到水鱼的参考实现返回成绩的数量或者False'''
logger.info("开始上传舞萌成绩到水鱼查分器!")
userFullMusicDetailList = getUserFullMusicDetail(userId)
logger.info("Got UserData, Convert to Fish Format")
logger.info("成功得到成绩!转换成水鱼格式..")
divingFishData = maimaiUserMusicDetailToDivingFishFormat(userFullMusicDetailList)
logger.ionfo("Convert OK. Start to Update Fish Records")
updateFishRecords(fishImportToken, divingFishData)
logger.info("转换成功!开始上传水鱼..")
if not updateFishRecords(fishImportToken, divingFishData)
logger.error("上传失败!")
return False
return len(divingFishData)
if __name__ == '__main__':
if True:
userId = None
importToken = None
userId = testUid2
importToken = testImportToken
#currentLoginTimestamp = generateTimestamp()
userFullMusicDetailList = getUserFullMusicDetail(userId)
logger.warning("Now We Begin To Build DivingFish Data")
divingFishData = maimaiUserMusicDetailToDivingFishFormat(userFullMusicDetailList)
logger.debug(divingFishData)
logger.warning("Now We Begin To Update DivingFish Data")
updateFishRecords(importToken, divingFishData)
implUserMusicToDivingFish(userId, importToken)

View File

@ -60,3 +60,10 @@ def implBuyTicket(userId:int, ticketType:int):
getTicketResponseStr = apiBuyTicket(userId, ticketType, ticketType-1, playerRating, playCount)
# 返回结果
return getTicketResponseStr
if __name__ == "__main__":
userId = testUid2
ticketType = 3
print(implBuyTicket(userId, ticketType))
print(apiQueryTicket(userId))

View File

@ -3,6 +3,8 @@
import json
from API_TitleServer import apiSDGB
from Config import *
import time
import random
def apiGetUserPreview(userId) -> str:
data = json.dumps({
@ -16,3 +18,34 @@ if __name__ == "__main__":
#userId = input("请输入用户 ID")
userId = testUid
print(apiGetUserPreview(userId))
def crawlAllUserPreview():
# 这里设置开始和结束的 UserId
BeginUserId = 11000000
EndUserId = 12599999
# 打开文件,准备写入
with open('Remi_UserID_DB_Output.txt', 'w', encoding="utf-8") as f:
# 遍历 UserId
for userId in range(BeginUserId, EndUserId + 1):
# 调用 API
try:
userPreview = apiGetUserPreview(userId)
currentUser = json.loads(userPreview)
if currentUser["userId"] is not None:
# 每爬到一个就把它存到一个文件里面,每个一行
f.write(userPreview + "\n")
else:
f.write("\n")
except:
f.write("ERROR\n")
time.sleep(4)
f.flush()
# 随机等待0.2-0.5秒
time.sleep(random.uniform(0.2, 0.5))
print('Finished!')
if __name__ == "__main__":
crawlAllUserPreview()

View File

@ -2,7 +2,7 @@ import json
from loguru import logger
from Config import *
from API_TitleServer import apiSDGB, calcSpecialNumber, WahlapServerBoomedError, Request500Error
from API_TitleServer import *
from HelperGetUserThing import implGetUser_
from HelperUploadUserPlayLog import apiUploadUserPlaylog
from HelperUserAll import generateFullUserAll
@ -80,16 +80,16 @@ def implFullPlayAction(userId: int, currentLoginTimestamp:int, currentLoginResul
# 开始上传 UserAll
try:
currentUserAllResult = json.loads(apiSDGB(data, "UpsertUserAllApi", userId))
except Request500Error:
except SDGBRequestError:
logger.warning("上传 UserAll 出现 500. 重建数据.")
retries += 1
continue
except Exception:
raise WahlapServerBoomedError("邪门错误")
raise SDGBApiError("邪门错误")
# 成功上传后退出循环
break
else: # 重试次数超过3次
raise Request500Error("多次尝试后仍无法成功上传 UserAll")
raise SDGBRequestError
logger.info("上机:结果:"+ str(currentUserAllResult))
return currentUserAllResult

View File

@ -5,7 +5,7 @@ from API_TitleServer import apiSDGB
def apiGetUserData(userId:int) -> str:
"""已弃用,将逐步淘汰"""
logger.info("apiGetUserData 已弃用,将逐步淘汰。")
#logger.info("apiGetUserData 已弃用,将逐步淘汰。")
# 构建 Payload
data = json.dumps({
"userId": userId

View File

@ -10,7 +10,8 @@ levelIdDict = {
"": 1,
"": 2,
"": 3,
"": 4
"": 4,
"": 5
}
def getHalfWidthString(s):
@ -58,7 +59,6 @@ def getFriendlyUserData(userId:int) -> str:
def getHumanReadableRegionData(userRegion:str) -> str:
'''生成一个人类可读的地区数据'''
#Example Data: {"userId":11088995,"length":7,"userRegionList":[{"regionId":1,"playCount":1,"created":"2023-06-23 20:06:20"},{"regionId":8,"playCount":3,"created":"2024-06-05 22:57:41"},{"regionId":13,"playCount":7,"created":"2024-10-08 19:16:27"},{"regionId":16,"playCount":3,"created":"2024-08-02 11:54:29"},{"regionId":17,"playCount":46,"created":"2023-11-18 20:14:56"},{"regionId":22,"playCount":907,"created":"2022-08-18 20:19:08"},{"regionId":27,"playCount":18,"created":"2024-11-05 23:42:43"}]}
userRegionList = userRegion.get("userRegionList")
logger.info(userRegionList)
result = ""
@ -69,7 +69,6 @@ def getHumanReadableRegionData(userRegion:str) -> str:
result += f"\n{regionName} 游玩次数: {playCount} 首次游玩: {created}"
return result
def getHumanReadablePreview(preview_json_content:str) -> str:
'''简单,粗略地解释 Preview 的 Json String 为人话。'''
previewData = json.loads(preview_json_content)

View File

@ -2,6 +2,8 @@
# 适用于舞萌DX 2024
# 理论可用于 HDD 登号等(这种情况下自行修改 hosts
# SGWCMAID111111111111AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
## 配置
# 0 返回本地生成的假结果
# 1 原样返回官方服务器的结果

35
_Special.py Normal file
View File

@ -0,0 +1,35 @@
# 纯纯测试用
from loguru import logger
from Config import *
from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from HelperFullPlay import implFullPlayAction, generateMusicData
def implChangeVersionNumber(userId: int, currentLoginTimestamp:int, currentLoginResult, dataVersion="1.40.09", romVersion="1.41.00") -> str:
musicData = generateMusicData()
userAllPatches = {
"upsertUserAll": {
"userData": [{
"playerRating": 114514,
}],
"userMusicDetailList": [musicData],
"isNewMusicDetailList": "1" #1避免覆盖
}}
logger.info("Changing version number to " + dataVersion + " and " + romVersion)
result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches)
return result
if __name__ == "__main__":
userId = testUid
currentLoginTimestamp = generateTimestamp()
loginResult = apiLogin(currentLoginTimestamp, userId)
if loginResult['returnCode'] != 1:
logger.info("登录失败")
exit()
try:
logger.info(implChangeVersionNumber(userId, currentLoginTimestamp, loginResult, "1.00.00", "1.00.00"))
logger.info(apiLogout(currentLoginTimestamp, userId))
finally:
logger.info(apiLogout(currentLoginTimestamp, userId))
#logger.warning("Error")