mirror of
https://github.com/Remik1r3n/maimaiDX-Api.git
synced 2025-05-20 01:07:28 +08:00
Merge pull request #1 from Remik1r3n/project-matsuri
AuthLite PowerOn初步实现和其他改进
This commit is contained in:
commit
36af0f5dd6
@ -1,31 +1,35 @@
|
||||
# All.Net AuthLite 更新获取
|
||||
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.Util.Padding import pad
|
||||
from Crypto.Util.Padding import pad, unpad
|
||||
import httpx
|
||||
from loguru import logger
|
||||
from urllib.parse import parse_qs
|
||||
import configparser as ini
|
||||
|
||||
def enc(key, iv, data):
|
||||
cipher = AES.new(key, AES.MODE_CBC, iv)
|
||||
encrypted = cipher.encrypt(data)
|
||||
return encrypted
|
||||
LITE_AUTH_KEY = bytes([47, 63, 106, 111, 43, 34, 76, 38, 92, 67, 114, 57, 40, 61, 107, 71])
|
||||
LITE_AUTH_IV = bytes.fromhex('00000000000000000000000000000000')
|
||||
|
||||
def dec(key, iv ,data):
|
||||
de_cipher = AES.new(key, AES.MODE_CBC, iv)
|
||||
decrypted = de_cipher.decrypt(data)
|
||||
return decrypted
|
||||
def auth_lite_encrypt(plaintext: str) -> bytes:
|
||||
# 构造数据:16字节头 + 16字节0前缀 + 明文
|
||||
header = bytes(16)
|
||||
content = bytes(16) + plaintext.encode('utf-8')
|
||||
data = header + content
|
||||
# 填充并加密
|
||||
padded_data = pad(data, AES.block_size)
|
||||
cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV)
|
||||
return cipher.encrypt(padded_data)
|
||||
|
||||
def auth_lite_decrypt(ciphertext: bytes) -> str:
|
||||
# 解密并去除填充
|
||||
cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV)
|
||||
decrypted_data = unpad(cipher.decrypt(ciphertext), AES.block_size)
|
||||
# 提取内容并解码
|
||||
content = decrypted_data[16:] # 去除头部的16字节
|
||||
return content.decode('utf-8').strip()
|
||||
|
||||
def getRawDelivery():
|
||||
key = bytes([47, 63, 106, 111, 43, 34, 76, 38, 92, 67, 114, 57, 40, 61, 107, 71])
|
||||
iv = bytes.fromhex('00000000000000000000000000000000')
|
||||
content = bytes([0] * 16) + b'title_id=SDGB&title_ver=1.40&client_id=A63E01C2805'
|
||||
header = bytes.fromhex('00000000000000000000000000000000')
|
||||
bytes_data = pad(header + content, 16)
|
||||
|
||||
encrypted = enc(key, iv, bytes_data)
|
||||
|
||||
encrypted = auth_lite_encrypt('title_id=SDGB&title_ver=1.40&client_id=A63E01C2805')
|
||||
r = httpx.post(
|
||||
'http://at.sys-allnet.cn/net/delivery/instruction',
|
||||
data = encrypted,
|
||||
@ -34,10 +38,8 @@ def getRawDelivery():
|
||||
'Pragma': 'DFI'
|
||||
}
|
||||
)
|
||||
|
||||
resp_data = r.content
|
||||
decrypted = dec(key, resp_data[:16], resp_data)
|
||||
decrypted_str = decrypted[16:].decode('UTF-8').strip()
|
||||
decrypted_str = auth_lite_decrypt(resp_data)
|
||||
# 过滤所有控制字符
|
||||
decrypted_str = ''.join([i for i in decrypted_str if 31 < ord(i) < 127])
|
||||
logger.info(f"RAW Response: {decrypted_str}")
|
||||
@ -117,7 +119,6 @@ def parseUpdateIni(iniText):
|
||||
|
||||
return final_message
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
urlList = parseRawDelivery(getRawDelivery())
|
||||
for url in urlList:
|
@ -33,8 +33,14 @@ def apiDivingFish(method:str, apiPath:str, importToken:str, data=None):
|
||||
url=BASE_URL + apiPath,
|
||||
headers=headers,
|
||||
)
|
||||
elif method == 'DELETE':
|
||||
response = requests.delete(
|
||||
url=BASE_URL + apiPath,
|
||||
headers=headers,
|
||||
)
|
||||
else:
|
||||
raise NotImplementedError
|
||||
logger.error(f'未知的请求方法:{method}')
|
||||
raise ValueError(f'未知的请求方法:{method}')
|
||||
|
||||
logger.info(f'水鱼查分器请求结果:{response.status_code}')
|
||||
logger.debug(f'水鱼查分器回应:{response.text}')
|
||||
@ -104,9 +110,41 @@ def implUserMusicToDivingFish(userId:int, fishImportToken:str):
|
||||
return False
|
||||
return len(divingFishData)
|
||||
|
||||
def generateDebugTestScore():
|
||||
'''生成测试成绩'''
|
||||
return [
|
||||
{
|
||||
"achievement": 1010000,
|
||||
"comboStatus": 4,
|
||||
"deluxscoreMax": 4026,
|
||||
"level": 4,
|
||||
"musicId": 834,
|
||||
"syncStatus": 4
|
||||
},
|
||||
{
|
||||
"achievement": 1010000,
|
||||
"comboStatus": 4,
|
||||
"deluxscoreMax": 4200,
|
||||
"level": 4,
|
||||
"musicId": 11663,
|
||||
"syncStatus": 4
|
||||
}
|
||||
]
|
||||
|
||||
def implResetFishUser(fishImportToken:str):
|
||||
'''重置水鱼查分器的用户数据'''
|
||||
logger.info("开始重置水鱼查分器的用户数据..")
|
||||
result = apiDivingFish('DELETE', '/player/delete_records', fishImportToken)
|
||||
if result:
|
||||
logger.info("重置成功!")
|
||||
return True
|
||||
logger.error("重置失败!")
|
||||
return False
|
||||
|
||||
if __name__ == '__main__':
|
||||
if True:
|
||||
userId = testUid2
|
||||
importToken = testImportToken
|
||||
#currentLoginTimestamp = generateTimestamp()
|
||||
implUserMusicToDivingFish(userId, importToken)
|
||||
#implUserMusicToDivingFish(userId, importToken)
|
||||
implResetFishUser(importToken)
|
||||
|
@ -11,4 +11,4 @@ loginBonusDBPathFallback = "./maimaiDX-Api/Data/loginBonusDB.json"
|
||||
musicDBPathFallback = "./maimaiDX-Api/Data/musicDB.json"
|
||||
|
||||
# 日本精工,安全防漏
|
||||
#from MyConfig import *
|
||||
from MyConfig import *
|
||||
|
@ -1,5 +1,5 @@
|
||||
# 解小黑屋实现
|
||||
# 未完工
|
||||
# 仍十分不完善,不建议使用
|
||||
|
||||
from Config import *
|
||||
from API_TitleServer import *
|
||||
|
@ -5,6 +5,7 @@ from API_TitleServer import apiSDGB
|
||||
from Config import *
|
||||
import time
|
||||
import random
|
||||
from loguru import logger
|
||||
|
||||
def apiGetUserPreview(userId, noLog:bool=False) -> str:
|
||||
data = json.dumps({
|
||||
@ -26,7 +27,7 @@ if __name__ == "__main__":
|
||||
def crawlAllUserPreview():
|
||||
"""omg it's a evil crawler"""
|
||||
# 这里设置开始和结束的 UserId
|
||||
BeginUserId = 11000000
|
||||
BeginUserId = 10200000
|
||||
EndUserId = 12599999
|
||||
|
||||
# 打开文件,准备写入
|
||||
@ -35,11 +36,12 @@ def crawlAllUserPreview():
|
||||
for userId in range(BeginUserId, EndUserId + 1):
|
||||
# 调用 API
|
||||
try:
|
||||
userPreview = apiGetUserPreview(userId)
|
||||
userPreview = apiGetUserPreview(userId, True)
|
||||
currentUser = json.loads(userPreview)
|
||||
if currentUser["userId"] is not None:
|
||||
# 每爬到一个就把它存到一个文件里面,每个一行
|
||||
f.write(userPreview + "\n")
|
||||
logger.info(f"{userId}: {currentUser['userName']}, RATING: {currentUser['playerRating']}")
|
||||
else:
|
||||
f.write("\n")
|
||||
except:
|
||||
|
@ -1,28 +0,0 @@
|
||||
import os
|
||||
import re
|
||||
|
||||
pattern = re.compile(r'Maimai2Servlet/(.*?)MaimaiChn')
|
||||
|
||||
# 获取目录
|
||||
dir = 'C:/Users/remik1r3n/Workspace/maimaiDX-Api/HashEntertainment'
|
||||
|
||||
known_hashes = []
|
||||
|
||||
for filename in os.listdir(dir):
|
||||
# 只处理.txt文件
|
||||
if filename.endswith('.txt'):
|
||||
file_path = os.path.join(dir, filename)
|
||||
with open(file_path, 'r', encoding='utf-8') as file:
|
||||
content = file.read()
|
||||
# 搜索匹配的模式
|
||||
matches = pattern.findall(content)
|
||||
# 输出每个匹配中的不定内容
|
||||
for match in matches:
|
||||
known_hashes.append(match)
|
||||
|
||||
# 去重
|
||||
known_hashes = list(set(known_hashes))
|
||||
|
||||
# 输出
|
||||
for hash in known_hashes:
|
||||
print(hash)
|
@ -1,2 +1,10 @@
|
||||
# genshin-impact
|
||||
Genshin Impact Helpers and Tools
|
||||
|
||||
## Project Matsuri
|
||||
A project aim to create a compatible forwarder server, that allow Genshin Impact 2023 version to connect to the OFFICIAL server or other server very easily.
|
||||
|
||||
It includes:
|
||||
- A very simple Auth-Lite server, processes /poweron request.
|
||||
- A title server implementation. It will decrypt the request, and forward it to the official server.
|
||||
- AimeDB is still in research and more info will be updated soon(tm).
|
||||
|
33
Standalone/DecryptAuthLite.py
Normal file
33
Standalone/DecryptAuthLite.py
Normal file
@ -0,0 +1,33 @@
|
||||
# 解密国服 Auth Lite 的包
|
||||
# 仅测试过 PowerOn 请求,其他包不确定是否适用
|
||||
# 仅适用国服 DX2024,其他未测试
|
||||
|
||||
# 完全 Standalone,不依赖于其他文件
|
||||
|
||||
import base64
|
||||
from Crypto.Cipher import AES
|
||||
from urllib.parse import unquote
|
||||
from Crypto.Util.Padding import unpad
|
||||
|
||||
# 密钥从 HDD 提取
|
||||
LITE_AUTH_KEY = bytes([47, 63, 106, 111, 43, 34, 76, 38, 92, 67, 114, 57, 40, 61, 107, 71])
|
||||
LITE_AUTH_IV = bytes.fromhex('00000000000000000000000000000000')
|
||||
|
||||
# 填入你抓包得到的想解密的数据的 base64 编码
|
||||
base64String = "N0PyrjawH/7qA8A28y++3txHDsKuAs5+nib751QiNlJYigvwldPaG7xd0WYXvgqlWY16JIy38GQ8+M4ttaWRNfpWy9l29pC2h2abd4VGhIeWGLbOjc2Bthqhibui76vi4dW+05TsPiyXbOsqHFzScvdByKUtZUobZgrnr/WW+YqRIUdw/ZHBmKBY81JivnVH9AkEyCCP9xubYMjDqi65WhDpcrdMk5nUjHq/O7R1eXr12Es9gXDUruy/H4M7eMt+4kFSDCGpLSFwAEDhba6rpOz0n588nfvXXFlZ+a3ZsZSBYAJPBZ795Ck8ZDIYnEMWMV5nk6qPc2HiBF9ZZw88FlATGC8NqsTSjGX6JJXWDApUaSF5obXMu4LTmMMr0KDt2fQ6VQPkLnTgJ6tsJv1iAQvtcZ9ymn3I4S0XWXGmEq8r7XE7D+pnSJyjUn7bSXb6HOzCQtQc9XYmIbylS2sNkiDXywrxVgmiAXc4Ta8M9aNUb+81QrKj6qqC06DzdYQNBFRxb78X3nGYECEmKBsxIOR7Mf/ZqWYtA28Ob9H5CCUmjLvUaoQ+htJMHfQ9fvvevfu+FxtlJXfw+3UQDiQ1xvSZe2NMCgkLOuwqZ5/5PyoAV9MKzRXT4hBzDoiAIt7bzOH9JcNJkjUtLAjXbnwN6M6zUKpgMK4WYeCwUffNy21GbLVtfIxZZbVhK8A6Ni7j"
|
||||
|
||||
def auth_lite_decrypt(ciphertext: bytes) -> str:
|
||||
# 解密并去除填充
|
||||
cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV)
|
||||
decrypted_data = unpad(cipher.decrypt(ciphertext), AES.block_size)
|
||||
# 提取内容并解码
|
||||
content = decrypted_data[16:] # 去除头部的16字节
|
||||
return content.decode('utf-8').strip()
|
||||
|
||||
# 解码 base64
|
||||
decodedData = base64.b64decode(base64String)
|
||||
# 解密数据
|
||||
decryptedData = auth_lite_decrypt(decodedData)
|
||||
print(decryptedData)
|
||||
# 解码 URL 编码
|
||||
print(unquote(decryptedData))
|
66
Standalone/DummyAuthLiteServer.py
Normal file
66
Standalone/DummyAuthLiteServer.py
Normal file
@ -0,0 +1,66 @@
|
||||
# 舞萌DX Auth-Lite 服务器模拟实现
|
||||
# 仅实现了 /net/initialize 接口,用来处理 PowerOn
|
||||
|
||||
from fastapi import (
|
||||
FastAPI,
|
||||
Request
|
||||
)
|
||||
from fastapi.responses import (
|
||||
HTMLResponse
|
||||
)
|
||||
import uvicorn
|
||||
import httpx
|
||||
from loguru import logger
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.Util.Padding import pad, unpad
|
||||
|
||||
# 从 HDD 提取
|
||||
LITE_AUTH_KEY = bytes([47, 63, 106, 111, 43, 34, 76, 38, 92, 67, 114, 57, 40, 61, 107, 71])
|
||||
LITE_AUTH_IV = bytes.fromhex('00000000000000000000000000000000')
|
||||
|
||||
def auth_lite_encrypt(plaintext: str) -> bytes:
|
||||
# 构造数据:16字节头 + 16字节0前缀 + 明文
|
||||
header = bytes(16)
|
||||
content = bytes(16) + plaintext.encode('utf-8')
|
||||
data = header + content
|
||||
# 填充并加密
|
||||
padded_data = pad(data, AES.block_size)
|
||||
cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV)
|
||||
return cipher.encrypt(padded_data)
|
||||
|
||||
def auth_lite_decrypt(ciphertext: bytes) -> str:
|
||||
# 解密并去除填充
|
||||
cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV)
|
||||
decrypted_data = unpad(cipher.decrypt(ciphertext), AES.block_size)
|
||||
# 提取内容并解码
|
||||
content = decrypted_data[16:] # 去除头部的16字节
|
||||
return content.decode('utf-8').strip()
|
||||
|
||||
def apiOfficialServer(encryptedString: str):
|
||||
url = "http://at.sys-allnet.cn/net/initialize"
|
||||
headers = {
|
||||
"Content-Type": "application/x-www-form-urlencoded",
|
||||
"User-Agent": "SDGB;Windows/Lite"
|
||||
}
|
||||
data = encryptedString
|
||||
response = httpx.post(url, headers=headers, data=data)
|
||||
return response.content
|
||||
|
||||
app = FastAPI()
|
||||
|
||||
USE_OFFICIAL_SERVER = 1
|
||||
|
||||
@app.post('/net/initialize')
|
||||
async def get_data_dummy_api(request: Request):
|
||||
gotRequest = (await request.body())
|
||||
if USE_OFFICIAL_SERVER == 1:
|
||||
decrypted = auth_lite_decrypt(gotRequest)
|
||||
officialResponse = apiOfficialServer(auth_lite_encrypt(decrypted))
|
||||
logger.info(auth_lite_decrypt(officialResponse))
|
||||
return HTMLResponse(officialResponse)
|
||||
else:
|
||||
# todo
|
||||
pass
|
||||
|
||||
if __name__ == '__main__':
|
||||
uvicorn.run(app, host="0.0.0.0", port=80)
|
@ -10,12 +10,12 @@ def implChangeVersionNumber(userId: int, currentLoginTimestamp:int, currentLogin
|
||||
userAllPatches = {
|
||||
"upsertUserAll": {
|
||||
"userData": [{
|
||||
"playerRating": 114514,
|
||||
"lastRomVersion": romVersion,
|
||||
"lastDataVersion": dataVersion,
|
||||
}],
|
||||
"userMusicDetailList": [musicData],
|
||||
"isNewMusicDetailList": "1" #1避免覆盖
|
||||
}}
|
||||
logger.info("Changing version number to " + dataVersion + " and " + romVersion)
|
||||
result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches)
|
||||
return result
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user