Initial commit: Add maimaiDX API web application with AimeDB scanning and logging features
This commit is contained in:
33
backend/Standalone/DecryptAuthLiteTraffic.py
Normal file
33
backend/Standalone/DecryptAuthLiteTraffic.py
Normal file
@@ -0,0 +1,33 @@
|
||||
# 解密国服 Auth Lite 的包
|
||||
# 仅测试过 PowerOn 请求,其他包不确定是否适用
|
||||
# 仅适用国服 DX2024,其他未测试
|
||||
|
||||
# 完全 Standalone,不依赖于其他文件
|
||||
|
||||
import base64
|
||||
from Crypto.Cipher import AES
|
||||
from urllib.parse import unquote
|
||||
from Crypto.Util.Padding import unpad
|
||||
|
||||
# 密钥从 HDD 提取
|
||||
LITE_AUTH_KEY = bytes([47, 63, 106, 111, 43, 34, 76, 38, 92, 67, 114, 57, 40, 61, 107, 71])
|
||||
LITE_AUTH_IV = bytes.fromhex('00000000000000000000000000000000')
|
||||
|
||||
# 填入你抓包得到的想解密的数据的 base64 编码
|
||||
base64String = "N0PyrjawH/7qA8A28y++3txHDsKuAs5+nib751QiNlJYigvwldPaG7xd0WYXvgqlWY16JIy38GQ8+M4ttaWRNfpWy9l29pC2h2abd4VGhIeWGLbOjc2Bthqhibui76vi4dW+05TsPiyXbOsqHFzScvdByKUtZUobZgrnr/WW+YqRIUdw/ZHBmKBY81JivnVH9AkEyCCP9xubYMjDqi65WhDpcrdMk5nUjHq/O7R1eXr12Es9gXDUruy/H4M7eMt+4kFSDCGpLSFwAEDhba6rpOz0n588nfvXXFlZ+a3ZsZSBYAJPBZ795Ck8ZDIYnEMWMV5nk6qPc2HiBF9ZZw88FlATGC8NqsTSjGX6JJXWDApUaSF5obXMu4LTmMMr0KDt2fQ6VQPkLnTgJ6tsJv1iAQvtcZ9ymn3I4S0XWXGmEq8r7XE7D+pnSJyjUn7bSXb6HOzCQtQc9XYmIbylS2sNkiDXywrxVgmiAXc4Ta8M9aNUb+81QrKj6qqC06DzdYQNBFRxb78X3nGYECEmKBsxIOR7Mf/ZqWYtA28Ob9H5CCUmjLvUaoQ+htJMHfQ9fvvevfu+FxtlJXfw+3UQDiQ1xvSZe2NMCgkLOuwqZ5/5PyoAV9MKzRXT4hBzDoiAIt7bzOH9JcNJkjUtLAjXbnwN6M6zUKpgMK4WYeCwUffNy21GbLVtfIxZZbVhK8A6Ni7j"
|
||||
|
||||
def auth_lite_decrypt(ciphertext: bytes) -> str:
|
||||
# 解密并去除填充
|
||||
cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV)
|
||||
decrypted_data = unpad(cipher.decrypt(ciphertext), AES.block_size)
|
||||
# 提取内容并解码
|
||||
content = decrypted_data[16:] # 去除头部的16字节
|
||||
return content.decode('utf-8').strip()
|
||||
|
||||
# 解码 base64
|
||||
decodedData = base64.b64decode(base64String)
|
||||
# 解密数据
|
||||
decryptedData = auth_lite_decrypt(decodedData)
|
||||
print(decryptedData)
|
||||
# 解码 URL 编码
|
||||
print(unquote(decryptedData))
|
||||
74
backend/Standalone/DecryptTitleServerTraffic.py
Normal file
74
backend/Standalone/DecryptTitleServerTraffic.py
Normal file
@@ -0,0 +1,74 @@
|
||||
# 解密从 HDD 抓包得到的数据
|
||||
# 兼容 PRiSM 和 CN 2024
|
||||
|
||||
# 完全 Standalone,不依赖于其他文件
|
||||
|
||||
import base64
|
||||
import zlib
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.Util.Padding import unpad, pad
|
||||
|
||||
# CN 2024
|
||||
AES_KEY_SDGB_1_40 = "n7bx6:@Fg_:2;5E89Phy7AyIcpxEQ:R@"
|
||||
AES_IV_SDGB_1_40 = ";;KjR1C3hgB1ovXa"
|
||||
|
||||
# 国际服 PRiSM
|
||||
AES_KEY_SDGA_1_50 = "A;mv5YUpHBK3YxTy5KB^[;5]C2AL50Bq"
|
||||
AES_IV_SDGA_1_50 = "9FM:sd9xA91X14v]"
|
||||
|
||||
AES_KEY_SDGB_1_50 = "a>32bVP7v<63BVLkY[xM>daZ1s9MBP<R"
|
||||
AES_IV_SDGB_1_50 = "d6xHIKq]1J]Dt^ue"
|
||||
|
||||
class AESPKCS7:
|
||||
# 实现了 maimai 通讯所用的 AES 加密的类
|
||||
def __init__(self, key: str, iv: str):
|
||||
self.key = key.encode('utf-8')
|
||||
self.iv = iv.encode('utf-8')
|
||||
self.mode = AES.MODE_CBC
|
||||
# 加密
|
||||
def encrypt(self, content: str) -> bytes:
|
||||
cipher = AES.new(self.key, self.mode, self.iv)
|
||||
content_padded = pad(content.encode(), AES.block_size)
|
||||
encrypted_bytes = cipher.encrypt(content_padded)
|
||||
return encrypted_bytes
|
||||
# 解密
|
||||
def decrypt(self, encrypted_content: bytes) -> str:
|
||||
cipher = AES.new(self.key, self.mode, self.iv)
|
||||
decrypted_padded = cipher.decrypt(encrypted_content)
|
||||
decrypted = unpad(decrypted_padded, AES.block_size)
|
||||
return decrypted
|
||||
|
||||
def main_sdga():
|
||||
# 填入你的想解密的数据的 base64 编码
|
||||
base64_encoded_data = "KSGm2qo7qVHz1wrK15PckYC5/kLjKcTtEXOgHeHt1Xn6DPdo3pltoPLADHpe8+Wq"
|
||||
|
||||
aes = AESPKCS7(AES_KEY_SDGA_1_50, AES_IV_SDGA_1_50)
|
||||
|
||||
# 首先解码 base64
|
||||
decodedData = base64.b64decode(base64_encoded_data)
|
||||
# 然后解密数据,PRiSM 是先压缩再加密(which is 正确做法)
|
||||
decryptedData = aes.decrypt(decodedData)
|
||||
# 解压数据
|
||||
decompressedData = zlib.decompress(decryptedData)
|
||||
|
||||
print(str(decompressedData))
|
||||
|
||||
def main_sdgb140():
|
||||
# 填入你的想解密的数据的 base64 编码
|
||||
base64_encoded_data = "eJyrTVvpuGwCR32OdodwtVXZ7/Ofmfhin7k/K61q3XNoad1rAPGwECU="
|
||||
|
||||
aes = AESPKCS7(AES_KEY_SDGB_1_40, AES_IV_SDGB_1_40)
|
||||
|
||||
# 首先解码 base64
|
||||
decodedData = base64.b64decode(base64_encoded_data)
|
||||
# 然后解压数据,CN 2024 是加密后再压缩(纯傻逼
|
||||
decompressedData = zlib.decompress(decodedData)
|
||||
# 最后解密数据
|
||||
decryptedData = aes.decrypt(decompressedData)
|
||||
|
||||
print(str(decryptedData))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main_sdga()
|
||||
main_sdgb140()
|
||||
78
backend/Standalone/DummyAimeDBServer.py
Normal file
78
backend/Standalone/DummyAimeDBServer.py
Normal file
@@ -0,0 +1,78 @@
|
||||
# 舞萌DX AimeDB 服务器模拟实现
|
||||
# 适用于舞萌DX 2024
|
||||
# 理论可用于 HDD 登号等(这种情况下自行修改 hosts
|
||||
|
||||
# SGWCMAID111111111111AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
|
||||
|
||||
## 配置
|
||||
# 0 返回本地生成的假结果
|
||||
# 1 原样返回官方服务器的结果
|
||||
useOfficialServer = 0
|
||||
|
||||
from loguru import logger
|
||||
# 解析命令行参数,作为用户 ID
|
||||
import argparse
|
||||
try:
|
||||
parser = argparse.ArgumentParser(description="舞萌DX AimeDB 服务器模拟实现")
|
||||
#parser.add_argument("userId", type=int, help="用户 ID")
|
||||
#args = parser.parse_args()
|
||||
#DUMMY_USER_ID = args.userId
|
||||
raise Exception("未传入用户 ID")
|
||||
except Exception as e:
|
||||
logger.warning(f"{e},未传入用户 ID,使用默认值")
|
||||
DUMMY_USER_ID = 1
|
||||
|
||||
from fastapi import (
|
||||
FastAPI,
|
||||
Request
|
||||
)
|
||||
from fastapi.responses import (
|
||||
PlainTextResponse,
|
||||
JSONResponse
|
||||
)
|
||||
import rapidjson as json
|
||||
import uvicorn
|
||||
from loguru import logger
|
||||
|
||||
# 将当前目录的父目录加入到 sys.path 中
|
||||
import sys
|
||||
from pathlib import Path
|
||||
current_dir = Path(__file__).resolve().parent
|
||||
parent_dir = current_dir.parent
|
||||
sys.path.append(str(parent_dir))
|
||||
|
||||
from API_AimeDB import implAimeDB, calcSEGAAimeDBAuthKey
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
@app.post('/qrcode/api/alive_check')
|
||||
async def qrcode_alive_check_api():
|
||||
return PlainTextResponse('alive')
|
||||
|
||||
@app.post('/wc_aime/api/alive_check')
|
||||
async def wc_aime_alive_check_api():
|
||||
return PlainTextResponse('alive')
|
||||
|
||||
@app.post('/wc_aime/api/get_data')
|
||||
async def get_data_dummy_api(request: Request):
|
||||
gotRequest = json.loads((await request.body()).decode())
|
||||
|
||||
if useOfficialServer == 0:
|
||||
fakeTimestamp = str(int(gotRequest['timestamp'])+3)
|
||||
currentResponse = {
|
||||
'errorID': 0,
|
||||
'key': calcSEGAAimeDBAuthKey(str(DUMMY_USER_ID), fakeTimestamp),
|
||||
'timestamp': fakeTimestamp,
|
||||
'userID': DUMMY_USER_ID
|
||||
}
|
||||
logger.info(f"返回假结果: {currentResponse}")
|
||||
return JSONResponse(currentResponse)
|
||||
elif useOfficialServer == 1:
|
||||
# 发给真正的 AimeDB
|
||||
realAimeDBResponse = implAimeDB(gotRequest['qrCode'], True)
|
||||
return JSONResponse(json.loads(realAimeDBResponse))
|
||||
else:
|
||||
pass
|
||||
|
||||
if __name__ == '__main__':
|
||||
uvicorn.run(app, host="0.0.0.0", port=80)
|
||||
69
backend/Standalone/DummyAuthLiteServer.py
Normal file
69
backend/Standalone/DummyAuthLiteServer.py
Normal file
@@ -0,0 +1,69 @@
|
||||
# 舞萌DX Auth-Lite 服务器模拟实现
|
||||
# 仅实现了 /net/initialize 接口,用来处理 PowerOn
|
||||
|
||||
# NOT FINISHED: ONLY A DUMMY IMPLEMENTATION FOR TESTING
|
||||
# Contact me if you have more information about this
|
||||
|
||||
from fastapi import (
|
||||
FastAPI,
|
||||
Request
|
||||
)
|
||||
from fastapi.responses import (
|
||||
HTMLResponse
|
||||
)
|
||||
import uvicorn
|
||||
import httpx
|
||||
from loguru import logger
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.Util.Padding import pad, unpad
|
||||
|
||||
# 从 HDD 提取
|
||||
LITE_AUTH_KEY = bytes([47, 63, 106, 111, 43, 34, 76, 38, 92, 67, 114, 57, 40, 61, 107, 71])
|
||||
LITE_AUTH_IV = bytes.fromhex('00000000000000000000000000000000')
|
||||
|
||||
def auth_lite_encrypt(plaintext: str) -> bytes:
|
||||
# 构造数据:16字节头 + 16字节0前缀 + 明文
|
||||
header = bytes(16)
|
||||
content = bytes(16) + plaintext.encode('utf-8')
|
||||
data = header + content
|
||||
# 填充并加密
|
||||
padded_data = pad(data, AES.block_size)
|
||||
cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV)
|
||||
return cipher.encrypt(padded_data)
|
||||
|
||||
def auth_lite_decrypt(ciphertext: bytes) -> str:
|
||||
# 解密并去除填充
|
||||
cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV)
|
||||
decrypted_data = unpad(cipher.decrypt(ciphertext), AES.block_size)
|
||||
# 提取内容并解码
|
||||
content = decrypted_data[16:] # 去除头部的16字节
|
||||
return content.decode('utf-8').strip()
|
||||
|
||||
def apiOfficialServer(encryptedString: str):
|
||||
url = "http://at.sys-allnet.cn/net/initialize"
|
||||
headers = {
|
||||
"Content-Type": "application/x-www-form-urlencoded",
|
||||
"User-Agent": "SDGB;Windows/Lite"
|
||||
}
|
||||
data = encryptedString
|
||||
response = httpx.post(url, headers=headers, data=data)
|
||||
return response.content
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
USE_OFFICIAL_SERVER = 1
|
||||
|
||||
@app.post('/net/initialize')
|
||||
async def get_data_dummy_api(request: Request):
|
||||
gotRequest = (await request.body())
|
||||
if USE_OFFICIAL_SERVER == 1:
|
||||
decrypted = auth_lite_decrypt(gotRequest)
|
||||
officialResponse = apiOfficialServer(auth_lite_encrypt(decrypted))
|
||||
logger.info(auth_lite_decrypt(officialResponse))
|
||||
return HTMLResponse(officialResponse)
|
||||
else:
|
||||
# todo
|
||||
pass
|
||||
|
||||
if __name__ == '__main__':
|
||||
uvicorn.run(app, host="0.0.0.0", port=80)
|
||||
6
backend/Standalone/Script_GenerateLoginBonusDB.py
Normal file
6
backend/Standalone/Script_GenerateLoginBonusDB.py
Normal file
@@ -0,0 +1,6 @@
|
||||
# Never Gonna Give You Up
|
||||
# Never Gonna Let You Down
|
||||
# Never Gonna Run Around And Desert You
|
||||
# Never Gonna Make You Cry
|
||||
# Never Gonna Say Goodbye
|
||||
# Never Gonna Tell A Lie And Hurt You
|
||||
51
backend/Standalone/Script_GenerateMusicDB.py
Normal file
51
backend/Standalone/Script_GenerateMusicDB.py
Normal file
@@ -0,0 +1,51 @@
|
||||
# 感谢伟大的 Diving-Fish 让我被迫直面恐惧
|
||||
import xml.dom.minidom as minidom
|
||||
from pathlib import Path
|
||||
import rapidjson as json
|
||||
from loguru import logger
|
||||
|
||||
def makeMusicDBJson():
|
||||
'''
|
||||
从 HDD 的文件来生成 music_db.json
|
||||
推荐的是如果要国服用 那就用国际服的文件来生成
|
||||
免得国服每次更新还要重新生成太麻烦
|
||||
'''
|
||||
# 记得改
|
||||
A000_DIR = Path('H:\PRiSM\Package\Sinmai_Data\StreamingAssets\A000')
|
||||
OPTION_DIR = Path('H:\PRiSM\Package\Sinmai_Data\StreamingAssets')
|
||||
|
||||
music_db: dict[str, dict[str, str | int]] = {}
|
||||
DEST_PATH = Path('../Data/musicDB.json')
|
||||
|
||||
music_folders = [f for f in (A000_DIR / 'music').iterdir() if f.is_dir()]
|
||||
for option_dir in OPTION_DIR.iterdir():
|
||||
if (option_dir / 'music').exists():
|
||||
music_folders.extend([f for f in (option_dir / 'music').iterdir() if f.is_dir()])
|
||||
|
||||
for folder in music_folders:
|
||||
xml_path = (folder / 'Music.xml')
|
||||
if xml_path.exists():
|
||||
xml = minidom.parse(xml_path.as_posix())
|
||||
data = xml.getElementsByTagName('MusicData')[0]
|
||||
music_id = data.getElementsByTagName('name')[0].getElementsByTagName('id')[0].firstChild.data
|
||||
music_name = data.getElementsByTagName('name')[0].getElementsByTagName('str')[0].firstChild.data
|
||||
music_version = data.getElementsByTagName('AddVersion')[0].getElementsByTagName('id')[0].firstChild.data
|
||||
music_db[music_id] = {
|
||||
"name": music_name,
|
||||
"version": int(music_version)
|
||||
}
|
||||
logger.debug(f'Found {len(music_db)} music data')
|
||||
serialized = '{\n'
|
||||
sorted_keys = sorted(music_db.keys(), key=lambda x: int(x))
|
||||
for key in sorted_keys:
|
||||
value = music_db[key]
|
||||
serialized += f' "{key}": {json.dumps(value, ensure_ascii=False)},\n'
|
||||
serialized = serialized[:-2] + '\n}'
|
||||
|
||||
with open(DEST_PATH, 'w', encoding='utf-8') as f:
|
||||
f.write(serialized)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
makeMusicDBJson()
|
||||
print('Done.')
|
||||
113
backend/Standalone/UI.py
Normal file
113
backend/Standalone/UI.py
Normal file
@@ -0,0 +1,113 @@
|
||||
import sys
|
||||
import rapidjson as json
|
||||
|
||||
from PyQt6.QtWidgets import (
|
||||
QApplication,
|
||||
QMainWindow,
|
||||
QWidget,
|
||||
QVBoxLayout,
|
||||
QLineEdit,
|
||||
QTextEdit,
|
||||
QPushButton,
|
||||
QLabel,
|
||||
QHBoxLayout,
|
||||
)
|
||||
from PyQt6.QtCore import Qt
|
||||
|
||||
# 将当前目录的父目录加入到 sys.path 中
|
||||
from pathlib import Path
|
||||
|
||||
current_dir = Path(__file__).resolve().parent
|
||||
parent_dir = current_dir.parent
|
||||
sys.path.append(str(parent_dir))
|
||||
|
||||
from API_TitleServer import *
|
||||
|
||||
|
||||
def sendRequest(requestText: str, apiNameText: str, uid: int) -> str:
|
||||
try:
|
||||
data = json.loads(requestText)
|
||||
data = json.dumps(data)
|
||||
except Exception:
|
||||
return "给出的输入不是有效的 JSON"
|
||||
try:
|
||||
result = apiSDGB(data, apiNameText, uid)
|
||||
except Exception as e:
|
||||
return "请求失败:" + str(e)
|
||||
return result
|
||||
|
||||
|
||||
class ApiTester(QMainWindow):
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
# 主窗口设定
|
||||
self.setWindowTitle("舞萌DX 2024 API 测试器")
|
||||
self.resize(640, 400)
|
||||
# 布局
|
||||
mainWidget = QWidget()
|
||||
self.setCentralWidget(mainWidget)
|
||||
MainLayout = QVBoxLayout(mainWidget)
|
||||
|
||||
# 目标 API 输入框布局
|
||||
TargetAPILayout = QHBoxLayout()
|
||||
# API 输入框
|
||||
self.TargetAPIInputBox = QLineEdit()
|
||||
self.TargetAPIInputBox.setPlaceholderText("指定 API")
|
||||
TargetAPILayout.addWidget(self.TargetAPIInputBox)
|
||||
# API 后缀标签
|
||||
TargetAPILabel = QLabel("MaimaiChn")
|
||||
TargetAPILayout.addWidget(TargetAPILabel)
|
||||
# 添加到主布局
|
||||
MainLayout.addLayout(TargetAPILayout)
|
||||
|
||||
# UA额外信息输入框
|
||||
self.AgentExtraInputBox = QLineEdit()
|
||||
self.AgentExtraInputBox.setPlaceholderText("指定附加信息(UID或狗号)")
|
||||
MainLayout.addWidget(self.AgentExtraInputBox)
|
||||
|
||||
# 请求输入框
|
||||
self.RequestInputBox = QTextEdit()
|
||||
self.RequestInputBox.setPlaceholderText("此处填入请求")
|
||||
MainLayout.addWidget(self.RequestInputBox)
|
||||
# 发送按钮
|
||||
SendRequestButton = QPushButton("发送!")
|
||||
SendRequestButton.clicked.connect(self.prepareRequest)
|
||||
MainLayout.addWidget(SendRequestButton)
|
||||
# 响应输出框
|
||||
self.ResponseTextBox = QTextEdit()
|
||||
self.ResponseTextBox.setPlaceholderText("此处显示输出")
|
||||
self.ResponseTextBox.setReadOnly(True)
|
||||
MainLayout.addWidget(self.ResponseTextBox)
|
||||
|
||||
# 布局设定
|
||||
MainLayout.setContentsMargins(5, 5, 5, 5)
|
||||
MainLayout.setSpacing(5)
|
||||
MainLayout.setAlignment(Qt.AlignmentFlag.AlignTop)
|
||||
|
||||
def prepareRequest(self):
|
||||
# 发送请求用
|
||||
try:
|
||||
RequestDataString = self.RequestInputBox.toPlainText()
|
||||
TargetAPIString = self.TargetAPIInputBox.text()
|
||||
AgentExtraString = int(self.AgentExtraInputBox.text())
|
||||
except Exception:
|
||||
self.ResponseTextBox.setPlainText("输入无效")
|
||||
return
|
||||
|
||||
Result = sendRequest(RequestDataString, TargetAPIString, AgentExtraString)
|
||||
|
||||
# 显示出输出
|
||||
self.ResponseTextBox.setPlainText(Result)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app = QApplication(sys.argv)
|
||||
# Set proper style for each OS
|
||||
# if sys.platform == "win32":
|
||||
# app.setStyle("windowsvista")
|
||||
# else:
|
||||
# app.setStyle("Fusion")
|
||||
window = ApiTester()
|
||||
window.show()
|
||||
sys.exit(app.exec())
|
||||
Reference in New Issue
Block a user