This commit is contained in:
Your Name 2025-02-04 15:40:51 +08:00
commit b41b185b2e
18 changed files with 124 additions and 90 deletions

View File

@ -1,4 +1,6 @@
# 100% Standalone 的国服 AimeDB 通讯实现
# 100% Standalone 的舞萌国服 AimeDB 通讯实现
# Ver.CN1.41
# Mainline 20250203
import hashlib
import time
@ -60,7 +62,7 @@ def apiAimeDB(qrCode):
def isSGWCFormat(input_string: str) -> bool:
'''简单检查二维码字符串是否符合格式'''
"""简单检查二维码字符串是否符合格式"""
if (
len(input_string) != 84 #长度
or not input_string.startswith("SGWCMAID") #识别字
@ -72,10 +74,10 @@ def isSGWCFormat(input_string: str) -> bool:
def implAimeDB(qrCode:str, isAlreadyFinal:bool=False) -> str:
'''
"""
Aime DB 的请求的参考实现
提供完整 QRCode 内容返回响应的字符串Json格式
'''
"""
if isAlreadyFinal:
qr_code_final = qrCode
else:
@ -88,15 +90,15 @@ def implAimeDB(qrCode:str, isAlreadyFinal:bool=False) -> str:
# 获得结果
print("implAimeDB: StatusCode is ", response.status_code)
print("implAimeDB: Response Body is:", response.text)
return(response.text)
return response.text
def implGetUID(qr_content:str) -> dict:
'''
"""
包装后的 UID 扫码器实现
此函数会返回 AimeDB 传回的 Json 转成 Python 字典的结果
主要特点是添加了几个新的错误码(6000x)用来应对程序的错误
'''
"""
# 检查格式
if not isSGWCFormat(qr_content):
return {'errorID': 60001} # 二维码内容明显无效

View File

@ -26,7 +26,7 @@ class WahlapServerBoomedError(Exception):
class Request500Error(Exception):
pass
class aes_pkcs7(object):
class AES_PKCS7(object):
def __init__(self, key: str, iv: str):
self.key = key.encode('utf-8')
self.iv = iv.encode('utf-8')
@ -60,20 +60,20 @@ def SDGBApiHash(api):
return hashlib.md5((api+"MaimaiChn"+ObfuscateParam).encode()).hexdigest()
def apiSDGB(data:str, useApi, agentExtraData, noLog=False):
'''
"""
舞萌DX 2024 API 通讯用函数
:param data: 请求数据
:param useApi: 使用的 API
:param agentExtraData: UA 附加信息机台相关则为狗号如A63E01E9564用户相关则为 UID
:param noLog: 是否不记录日志
'''
"""
maxRetries = 3
# 历史遗留代码有时候会传入 int故先全部转 str
agentExtra = str(agentExtraData)
# 编码好请求,准备发送
aes = aes_pkcs7(AesKey,AesIV)
aes = AES_PKCS7(AesKey, AesIV)
data = data
data_enc = aes.encrypt(data)
data_def = zlib.compress(data_enc)
@ -145,7 +145,7 @@ def apiSDGB(data:str, useApi, agentExtraData, noLog=False):
def calcSpecialNumber():
'''使用 c_int32 实现的 SpecialNumber 算法'''
"""使用 c_int32 实现的 SpecialNumber 算法"""
rng = random.SystemRandom()
num2 = rng.randint(1, 1037933) * 2069
num2 += 0x400
@ -159,7 +159,7 @@ def calcSpecialNumber():
def calcSpecialNumber2():
'''实验性替代 SpecialNumber 算法'''
"""实验性替代 SpecialNumber 算法"""
max = 1037933
num2 = random.randint(1, max) * 2069

View File

@ -14,7 +14,7 @@ class NoSelectedBonusError(Exception):
def apiQueryLoginBonus(userId:int) -> str:
'''ログインボーナスを取得する API'''
"""ログインボーナスを取得する API"""
data = json.dumps({
"userId": int(userId),
"nextIndex": 0,
@ -23,12 +23,12 @@ def apiQueryLoginBonus(userId:int) -> str:
return apiSDGB(data, "GetUserLoginBonusApi", userId)
def implLoginBonus(userId: int, currentLoginTimestamp:int, currentLoginResult, bonusGenerateMode=1):
'''
"""
ログインボーナスデータをアップロードする
bonusGenerateMode ログインボーナスを生成する方法を指定します
1: 選択したボーナスのみ MAX にする選択したボーナスはないの場合は False を返す
2: 全部 MAX にする
'''
"""
musicData = {
"musicId": 674, # Magical Flavor
"level": 0,
@ -64,12 +64,12 @@ def implLoginBonus(userId: int, currentLoginTimestamp:int, currentLoginResult, b
return result
def generateLoginBonusList(UserLoginBonusList, generateMode=1):
'''
"""
ログインボーナスリストを生成します
generateMode ログインボーナスを生成する方法を指定します
1: 選択したボーナスのみ MAX にする選択したボーナスはないの場合は False を返す
2: 全部 MAX にする
'''
"""
# HDDから、ログインボーナスデータを読み込む
# アップデートがある場合、このファイルを更新する必要があります
# 必ず最新のデータを使用してください
@ -145,19 +145,3 @@ def generateLoginBonusList(UserLoginBonusList, generateMode=1):
logger.debug(f"ログインボーナスリスト: {bonusList}")
return bonusList
if __name__ == "__main__":
userId = testUid
currentLoginTimestamp = generateTimestamp()
loginResult = apiLogin(currentLoginTimestamp, userId)
if loginResult['returnCode'] != 1:
logger.info("登录失败")
exit()
try:
# Remember to change the mode!!!
logger.info(implLoginBonus(userId, currentLoginTimestamp, loginResult, 1))
logger.info(apiLogout(currentLoginTimestamp, userId))
except:
logger.info(apiLogout(currentLoginTimestamp, userId))
logger.warning("Error")

View File

@ -26,10 +26,10 @@ def implDeleteMusicRecord(userId: int, currentLoginTimestamp:int, currentLoginRe
return result
def implUploadMusicRecord(userId: int, currentLoginTimestamp:int, currentLoginResult, musicId:int, levelId:int, achievement:int, dxScore:int) -> str:
'''
"""
VERY EARLY STAGE OF UPLOADING SCORES!!!! DO NOT USE THIS!!!!
上传成绩的参考实现
'''
"""
# 要上传的数据
musicData= ({

View File

@ -7,9 +7,9 @@ from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from HelperUnlockThing import implUnlockThing
def implUnlockSingleItem(itemId: int, itemKind: int, userId: int, currentLoginTimestamp:int, currentLoginResult) -> str:
'''
"""
发单个东西比如搭档 10
'''
"""
userItemList = [
{
"itemKind": itemKind,
@ -22,9 +22,9 @@ def implUnlockSingleItem(itemId: int, itemKind: int, userId: int, currentLoginTi
return unlockThingResult
def implUnlockMusic(musicToBeUnlocked: int, userId: int, currentLoginTimestamp:int, currentLoginResult) -> str:
'''
"""
解锁乐曲
'''
"""
userItemList = [
{
"itemKind": 5,
@ -41,22 +41,3 @@ def implUnlockMusic(musicToBeUnlocked: int, userId: int, currentLoginTimestamp:i
]
unlockThingResult = implUnlockThing(userItemList, userId, currentLoginTimestamp, currentLoginResult)
return unlockThingResult
if __name__ == "__main__":
userId = testUid
currentLoginTimestamp = generateTimestamp()
loginResult = apiLogin(currentLoginTimestamp, userId)
# Change you want item ID
wantToUnlockItemId = 11538
if loginResult['returnCode'] != 1:
logger.info("登录失败")
exit()
try:
# Change you want to unlock music or something
logger.info(implUnlockMusic(wantToUnlockItemId, userId, currentLoginTimestamp, loginResult))
logger.info(apiLogout(currentLoginTimestamp, userId))
except:
logger.info(apiLogout(currentLoginTimestamp, userId))
logger.warning("Error")

View File

@ -22,7 +22,7 @@ BASE_URL = 'https://www.diving-fish.com/api/maimaidxprober'
COMBO_ID_TO_NAME = ['', 'fc', 'fcp', 'ap', 'app']
SYNC_ID_TO_NAME = ['', 'fs', 'fsp', 'fsd', 'fsdp', 'sync']
def apiDivingFish(method:str, apiPath:str, importToken:str, data:dict=None) -> dict:
def apiDivingFish(method:str, apiPath:str, importToken:str, data=None):
'''水鱼查分器的 API 通讯实现'''
headers = {
"Import-Token": importToken
@ -56,7 +56,7 @@ def getFishRecords(importToken: str) -> dict:
def updateFishRecords(importToken: str, records: list[dict]) -> dict:
return apiDivingFish('POST', '/player/update_records', importToken, records)
def maimaiUserMusicDetailToDivingFishFormat(userMusicDetailList: list) -> list:
def maimaiUserMusicDetailToDivingFishFormat(userMusicDetailList) -> list:
'''舞萌的 UserMusicDetail 成绩格式转换成水鱼的格式'''
divingFishList = []
for currentMusicDetail in userMusicDetailList:

View File

@ -43,7 +43,7 @@ def apiBuyTicket(userId:int, ticketType:int, price:int, playerRating:int, playCo
# 发送请求,返回最终得到的 Json String 回执
return apiSDGB(data, "UpsertUserChargelogApi", userId)
def implBuyTicket(userId:int, ticketType:int) -> str:
def implBuyTicket(userId:int, ticketType:int):
'''
购买倍票 API 的参考实现
需要事先登录.

View File

@ -11,4 +11,4 @@ loginBonusDBPathFallback = "./maimaiDX-Api/Data/loginBonusDB.json"
musicDBPathFallback = "./maimaiDX-Api/Data/musicDB.json"
# 日本精工,安全防漏
#from MyConfig import *
from MyConfig import *

View File

@ -1,3 +1,6 @@
# 解小黑屋实现
# 未完工
from Config import *
from API_TitleServer import *
from GetPreview import apiGetUserPreview
@ -47,4 +50,4 @@ def forceLogout(userId, beginTimestamp):
# 示例使用
userId = testUid
human_time = "2023-10-01 12:00:00" # 用户输入的人类可读时间
force_logout(userId, human_time)
forceLogout(userId, human_time)

View File

@ -8,7 +8,7 @@ from HelperUploadUserPlayLog import apiUploadUserPlaylog
from HelperUserAll import generateFullUserAll
def generateMusicData():
'''生成一份占位的音乐数据'''
"""生成一份占位的音乐数据"""
return {
"musicId": 834, # PANDORA PARADOXXX
"level": 4,
@ -25,7 +25,7 @@ def applyUserAllPatches(userAll, patches):
"""
递归地将给定的补丁应用到用户数据的各个层次
:param user_all: 原始用户数据
:param userAll: 原始用户数据
:param patches: 包含所有patch的字典
"""
for key, value in patches.items():
@ -45,11 +45,11 @@ def applyUserAllPatches(userAll, patches):
# 否则直接更新或添加key
userAll[key] = value
def implFullPlayAction(userId: int, currentLoginTimestamp:int, currentLoginResult, musicData, userAllPatches, debugMode=False) -> str:
'''
def implFullPlayAction(userId: int, currentLoginTimestamp:int, currentLoginResult, musicData, userAllPatches, debugMode=False):
"""
一份完整的上机实现可以打 patch 来实现各种功能
需要在外部先登录并传入登录结果
'''
"""
# 取得 UserData
currentUserData = implGetUser_("Data", userId)

View File

@ -14,7 +14,7 @@ import sys
#logger.add("file.log", level=log_level, format=log_format, colorize=False, backtrace=True, diagnose=True)
def getUserMusicDetail(userId:int, nextIndex:int=0, maxCount:int=50) -> dict:
'''获取用户的成绩的API'''
"""获取用户的成绩的API"""
data = json.dumps({
"userId": int(userId),
"nextIndex": nextIndex,
@ -22,9 +22,9 @@ def getUserMusicDetail(userId:int, nextIndex:int=0, maxCount:int=50) -> dict:
})
return json.loads(apiSDGB(data, "GetUserMusicApi", userId))
def getUserFullMusicDetail(userId: int) -> dict:
'''获取用户的全部成绩'''
userMusicDetailList_current = []
def getUserFullMusicDetail(userId: int):
"""获取用户的全部成绩"""
currentUserMusicDetailList = []
nextIndex:int|None = None # 初始化 nextIndex
while nextIndex != 0 or nextIndex is None: #只要还有nextIndex就一直获取获取
userMusicResponse = getUserMusicDetail(userId, nextIndex or 0)
@ -34,11 +34,11 @@ def getUserFullMusicDetail(userId: int) -> dict:
for currentMusicDetail in currentMusic['userMusicDetailList']:
if not currentMusicDetail['playCount'] > 0:
continue
userMusicDetailList_current.append(currentMusicDetail)
return userMusicDetailList_current
currentUserMusicDetailList.append(currentMusicDetail)
return currentUserMusicDetailList
def parseUserFullMusicDetail(userFullMusicDetailList: list) -> dict:
'''解析用户的全部成绩'''
def parseUserFullMusicDetail(userFullMusicDetailList: list):
"""解析用户的全部成绩,给出一个迫真人类可读 list 套 dict"""
musicDetailList = []
for currentMusicDetail in userFullMusicDetailList:
musicDetailList.append({
@ -50,7 +50,7 @@ def parseUserFullMusicDetail(userFullMusicDetailList: list) -> dict:
return musicDetailList
if __name__ == '__main__':
userId = 11088995
userId = testUid
currentLoginTimestamp = generateTimestamp()
#loginResult = apiLogin(currentLoginTimestamp, userId)

View File

@ -4,7 +4,7 @@ import json
from API_TitleServer import apiSDGB
def apiGetUserData(userId:int) -> str:
'''已弃用,将逐步淘汰'''
"""已弃用,将逐步淘汰"""
logger.info("apiGetUserData 已弃用,将逐步淘汰。")
# 构建 Payload
data = json.dumps({
@ -16,7 +16,7 @@ def apiGetUserData(userId:int) -> str:
return userdata_result
def apiGetUserThing(userId:int, thing:str, noLog=False) -> str:
'''获取用户数据的 API 请求器,返回 Json String'''
"""获取用户数据的 API 请求器,返回 Json String"""
# 构建 Payload
data = json.dumps({
"userId": userId
@ -27,7 +27,7 @@ def apiGetUserThing(userId:int, thing:str, noLog=False) -> str:
return userthing_result
def implGetUser_(thing:str, userId:int, noLog=False) -> dict:
'''获取用户某些数据的 API 实现,返回 Dict'''
"""获取用户某些数据的 API 实现,返回 Dict"""
# 获取 Json String
userthing_result = apiGetUserThing(userId, thing, noLog)
# 转换为 Dict

View File

@ -9,7 +9,7 @@ from Config import *
from API_TitleServer import apiSDGB
def apiLogin(timestamp:int, userId:int) -> dict:
'''登录,返回服务器给的 Json 的 dict'''
"""登录,返回服务器给的 Json 的 dict"""
data = json.dumps({
"userId": userId,
"accessCode": "",
@ -25,7 +25,7 @@ def apiLogin(timestamp:int, userId:int) -> dict:
return login_result
def apiLogout(timestamp:int, userId:int) -> dict:
'''登出,返回 Json dict'''
"""登出,返回 Json dict"""
data = json.dumps({
"userId": userId,
"accessCode": "",
@ -41,7 +41,7 @@ def apiLogout(timestamp:int, userId:int) -> dict:
def generateTimestamp() -> int:
'''生成时间戳'''
"""生成时间戳"""
timestamp = int(time.time()) - 60
logger.info(f"生成时间戳: {timestamp}")
return timestamp

View File

@ -3,7 +3,7 @@ from loguru import logger
def getMusicTitle(musicId: int) -> str:
'''从数据库获取音乐的标题'''
"""从数据库获取音乐的标题"""
#logger.debug(f"查询歌名: {musicId}")
musicInfo = musicDB.get(musicId)
if not musicInfo:

View File

@ -11,7 +11,7 @@ from API_TitleServer import apiSDGB
from Config import *
def apiUploadUserPlaylog(userId:int, musicDataToBeUploaded, currentUserData2, loginId:int) -> str:
'''返回 Json String。'''
"""返回 Json String。"""
# 构建一个 PlayLog
data = json.dumps({

View File

@ -6,7 +6,7 @@ from Config import *
from HelperGetUserThing import implGetUser_
def generateFullUserAll(userId, currentLoginResult, currentLoginTimestamp, currentUserData2, currentSpecialNumber):
'''从服务器取得必要的数据并构建一个比较完整的 UserAll'''
"""从服务器取得必要的数据并构建一个比较完整的 UserAll"""
# 先构建一个基础 UserAll
currentUserAll = generateUserAllData(userId, currentLoginResult, currentLoginTimestamp, currentUserData2, currentSpecialNumber)
@ -30,7 +30,7 @@ def generateFullUserAll(userId, currentLoginResult, currentLoginTimestamp, curre
def generateUserAllData(userId, currentLoginResult, currentLoginTimestamp, currentUserData2, currentSpecialNumber):
'''构建一个非常基础的 UserAll 数据,必须手动填充一些数据'''
"""构建一个非常基础的 UserAll 数据,必须手动填充一些数据"""
data = {
"userId": userId,

View File

@ -0,0 +1,65 @@
# 舞萌DX AimeDB 服务器模拟实现
# 适用于舞萌DX 2024
# 理论可用于 HDD 登号等(这种情况下自行修改 hosts
## 配置
# 0 返回本地生成的假结果
# 1 原样返回官方服务器的结果
useOfficialServer = 0
# 本地生成假结果时候总是返回这个UID
DUMMY_USER_ID = 10086
from fastapi import (
FastAPI,
Request
)
from fastapi.responses import (
PlainTextResponse,
JSONResponse
)
import json
import uvicorn
from loguru import logger
# 将当前目录的父目录加入到 sys.path 中
import sys
from pathlib import Path
current_dir = Path(__file__).resolve().parent
parent_dir = current_dir.parent
sys.path.append(str(parent_dir))
from API_AimeDB import implAimeDB, calcSEGAAimeDBAuthKey
app = FastAPI()
@app.post('/qrcode/api/alive_check')
async def qrcode_alive_check_api():
return PlainTextResponse('alive')
@app.post('/wc_aime/api/alive_check')
async def wc_aime_alive_check_api():
return PlainTextResponse('alive')
@app.post('/wc_aime/api/get_data')
async def get_data_dummy_api(request: Request):
gotRequest = json.loads((await request.body()).decode())
if useOfficialServer == 0:
fakeTimestamp = str(int(gotRequest['timestamp'])+3)
currentResponse = {
'errorID': 0,
'key': calcSEGAAimeDBAuthKey(str(DUMMY_USER_ID), fakeTimestamp),
'timestamp': fakeTimestamp,
'userID': DUMMY_USER_ID
}
logger.info(f"返回假结果: {currentResponse}")
return JSONResponse(currentResponse)
elif useOfficialServer == 1:
# 发给真正的 AimeDB
realAimeDBResponse = implAimeDB(gotRequest['qrCode'], True)
return JSONResponse(json.loads(realAimeDBResponse))
else:
pass
if __name__ == '__main__':
uvicorn.run(app, host="0.0.0.0", port=80)

View File

@ -1,5 +1,4 @@
# 感谢伟大的 Diving-Fish 让我被迫直面恐惧写这个逼玩意
# 感谢伟大的 Diving-Fish 让我被迫直面恐惧
import xml.dom.minidom as minidom
from pathlib import Path
import json