AuthLite PowerOn初步实现和其他改进

This commit is contained in:
Remik1r3n 2025-02-21 23:34:49 +08:00
parent 47cbd5b09a
commit 55bd19717a
10 changed files with 177 additions and 57 deletions

View File

@ -1,31 +1,35 @@
# All.Net AuthLite 更新获取 # All.Net AuthLite 更新获取
from Crypto.Cipher import AES from Crypto.Cipher import AES
from Crypto.Util.Padding import pad from Crypto.Util.Padding import pad, unpad
import httpx import httpx
from loguru import logger from loguru import logger
from urllib.parse import parse_qs from urllib.parse import parse_qs
import configparser as ini import configparser as ini
def enc(key, iv, data): LITE_AUTH_KEY = bytes([47, 63, 106, 111, 43, 34, 76, 38, 92, 67, 114, 57, 40, 61, 107, 71])
cipher = AES.new(key, AES.MODE_CBC, iv) LITE_AUTH_IV = bytes.fromhex('00000000000000000000000000000000')
encrypted = cipher.encrypt(data)
return encrypted
def dec(key, iv ,data): def auth_lite_encrypt(plaintext: str) -> bytes:
de_cipher = AES.new(key, AES.MODE_CBC, iv) # 构造数据16字节头 + 16字节0前缀 + 明文
decrypted = de_cipher.decrypt(data) header = bytes(16)
return decrypted content = bytes(16) + plaintext.encode('utf-8')
data = header + content
# 填充并加密
padded_data = pad(data, AES.block_size)
cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV)
return cipher.encrypt(padded_data)
def auth_lite_decrypt(ciphertext: bytes) -> str:
# 解密并去除填充
cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV)
decrypted_data = unpad(cipher.decrypt(ciphertext), AES.block_size)
# 提取内容并解码
content = decrypted_data[16:] # 去除头部的16字节
return content.decode('utf-8').strip()
def getRawDelivery(): def getRawDelivery():
key = bytes([47, 63, 106, 111, 43, 34, 76, 38, 92, 67, 114, 57, 40, 61, 107, 71]) encrypted = auth_lite_encrypt('title_id=SDGB&title_ver=1.40&client_id=A63E01C2805')
iv = bytes.fromhex('00000000000000000000000000000000')
content = bytes([0] * 16) + b'title_id=SDGB&title_ver=1.40&client_id=A63E01C2805'
header = bytes.fromhex('00000000000000000000000000000000')
bytes_data = pad(header + content, 16)
encrypted = enc(key, iv, bytes_data)
r = httpx.post( r = httpx.post(
'http://at.sys-allnet.cn/net/delivery/instruction', 'http://at.sys-allnet.cn/net/delivery/instruction',
data = encrypted, data = encrypted,
@ -34,10 +38,8 @@ def getRawDelivery():
'Pragma': 'DFI' 'Pragma': 'DFI'
} }
) )
resp_data = r.content resp_data = r.content
decrypted = dec(key, resp_data[:16], resp_data) decrypted_str = auth_lite_decrypt(resp_data)
decrypted_str = decrypted[16:].decode('UTF-8').strip()
# 过滤所有控制字符 # 过滤所有控制字符
decrypted_str = ''.join([i for i in decrypted_str if 31 < ord(i) < 127]) decrypted_str = ''.join([i for i in decrypted_str if 31 < ord(i) < 127])
logger.info(f"RAW Response: {decrypted_str}") logger.info(f"RAW Response: {decrypted_str}")
@ -117,7 +119,6 @@ def parseUpdateIni(iniText):
return final_message return final_message
if __name__ == '__main__': if __name__ == '__main__':
urlList = parseRawDelivery(getRawDelivery()) urlList = parseRawDelivery(getRawDelivery())
for url in urlList: for url in urlList:

View File

@ -33,8 +33,14 @@ def apiDivingFish(method:str, apiPath:str, importToken:str, data=None):
url=BASE_URL + apiPath, url=BASE_URL + apiPath,
headers=headers, headers=headers,
) )
elif method == 'DELETE':
response = requests.delete(
url=BASE_URL + apiPath,
headers=headers,
)
else: else:
raise NotImplementedError logger.error(f'未知的请求方法:{method}')
raise ValueError(f'未知的请求方法:{method}')
logger.info(f'水鱼查分器请求结果:{response.status_code}') logger.info(f'水鱼查分器请求结果:{response.status_code}')
logger.debug(f'水鱼查分器回应:{response.text}') logger.debug(f'水鱼查分器回应:{response.text}')
@ -104,9 +110,41 @@ def implUserMusicToDivingFish(userId:int, fishImportToken:str):
return False return False
return len(divingFishData) return len(divingFishData)
def generateDebugTestScore():
'''生成测试成绩'''
return [
{
"achievement": 1010000,
"comboStatus": 4,
"deluxscoreMax": 4026,
"level": 4,
"musicId": 834,
"syncStatus": 4
},
{
"achievement": 1010000,
"comboStatus": 4,
"deluxscoreMax": 4200,
"level": 4,
"musicId": 11663,
"syncStatus": 4
}
]
def implResetFishUser(fishImportToken:str):
'''重置水鱼查分器的用户数据'''
logger.info("开始重置水鱼查分器的用户数据..")
result = apiDivingFish('DELETE', '/player/delete_records', fishImportToken)
if result:
logger.info("重置成功!")
return True
logger.error("重置失败!")
return False
if __name__ == '__main__': if __name__ == '__main__':
if True: if True:
userId = testUid2 userId = testUid2
importToken = testImportToken importToken = testImportToken
#currentLoginTimestamp = generateTimestamp() #currentLoginTimestamp = generateTimestamp()
implUserMusicToDivingFish(userId, importToken) #implUserMusicToDivingFish(userId, importToken)
implResetFishUser(importToken)

View File

@ -11,4 +11,4 @@ loginBonusDBPathFallback = "./maimaiDX-Api/Data/loginBonusDB.json"
musicDBPathFallback = "./maimaiDX-Api/Data/musicDB.json" musicDBPathFallback = "./maimaiDX-Api/Data/musicDB.json"
# 日本精工,安全防漏 # 日本精工,安全防漏
#from MyConfig import * from MyConfig import *

View File

@ -1,5 +1,5 @@
# 解小黑屋实现 # 解小黑屋实现
# 未完工 # 仍十分不完善,不建议使用
from Config import * from Config import *
from API_TitleServer import * from API_TitleServer import *

View File

@ -5,6 +5,7 @@ from API_TitleServer import apiSDGB
from Config import * from Config import *
import time import time
import random import random
from loguru import logger
def apiGetUserPreview(userId, noLog:bool=False) -> str: def apiGetUserPreview(userId, noLog:bool=False) -> str:
data = json.dumps({ data = json.dumps({
@ -26,7 +27,7 @@ if __name__ == "__main__":
def crawlAllUserPreview(): def crawlAllUserPreview():
"""omg it's a evil crawler""" """omg it's a evil crawler"""
# 这里设置开始和结束的 UserId # 这里设置开始和结束的 UserId
BeginUserId = 11000000 BeginUserId = 10200000
EndUserId = 12599999 EndUserId = 12599999
# 打开文件,准备写入 # 打开文件,准备写入
@ -35,11 +36,12 @@ def crawlAllUserPreview():
for userId in range(BeginUserId, EndUserId + 1): for userId in range(BeginUserId, EndUserId + 1):
# 调用 API # 调用 API
try: try:
userPreview = apiGetUserPreview(userId) userPreview = apiGetUserPreview(userId, True)
currentUser = json.loads(userPreview) currentUser = json.loads(userPreview)
if currentUser["userId"] is not None: if currentUser["userId"] is not None:
# 每爬到一个就把它存到一个文件里面,每个一行 # 每爬到一个就把它存到一个文件里面,每个一行
f.write(userPreview + "\n") f.write(userPreview + "\n")
logger.info(f"{userId}: {currentUser['userName']}, RATING: {currentUser['playerRating']}")
else: else:
f.write("\n") f.write("\n")
except: except:

View File

@ -1,28 +0,0 @@
import os
import re
pattern = re.compile(r'Maimai2Servlet/(.*?)MaimaiChn')
# 获取目录
dir = 'C:/Users/remik1r3n/Workspace/maimaiDX-Api/HashEntertainment'
known_hashes = []
for filename in os.listdir(dir):
# 只处理.txt文件
if filename.endswith('.txt'):
file_path = os.path.join(dir, filename)
with open(file_path, 'r', encoding='utf-8') as file:
content = file.read()
# 搜索匹配的模式
matches = pattern.findall(content)
# 输出每个匹配中的不定内容
for match in matches:
known_hashes.append(match)
# 去重
known_hashes = list(set(known_hashes))
# 输出
for hash in known_hashes:
print(hash)

View File

@ -1,2 +1,10 @@
# genshin-impact # genshin-impact
Genshin Impact Helpers and Tools Genshin Impact Helpers and Tools
## Project Matsuri
A project aim to create a compatible forwarder server, that allow Genshin Impact 2023 version to connect to the OFFICIAL server or other server very easily.
It includes:
- A very simple Auth-Lite server, processes /poweron request.
- A title server implementation. It will decrypt the request, and forward it to the official server.
- AimeDB is still in research and more info will be updated soon(tm).

View File

@ -0,0 +1,33 @@
# 解密国服 Auth Lite 的包
# 仅测试过 PowerOn 请求,其他包不确定是否适用
# 仅适用国服 DX2024其他未测试
# 完全 Standalone不依赖于其他文件
import base64
from Crypto.Cipher import AES
from urllib.parse import unquote
from Crypto.Util.Padding import unpad
# 密钥从 HDD 提取
LITE_AUTH_KEY = bytes([47, 63, 106, 111, 43, 34, 76, 38, 92, 67, 114, 57, 40, 61, 107, 71])
LITE_AUTH_IV = bytes.fromhex('00000000000000000000000000000000')
# 填入你抓包得到的想解密的数据的 base64 编码
base64String = "N0PyrjawH/7qA8A28y++3txHDsKuAs5+nib751QiNlJYigvwldPaG7xd0WYXvgqlWY16JIy38GQ8+M4ttaWRNfpWy9l29pC2h2abd4VGhIeWGLbOjc2Bthqhibui76vi4dW+05TsPiyXbOsqHFzScvdByKUtZUobZgrnr/WW+YqRIUdw/ZHBmKBY81JivnVH9AkEyCCP9xubYMjDqi65WhDpcrdMk5nUjHq/O7R1eXr12Es9gXDUruy/H4M7eMt+4kFSDCGpLSFwAEDhba6rpOz0n588nfvXXFlZ+a3ZsZSBYAJPBZ795Ck8ZDIYnEMWMV5nk6qPc2HiBF9ZZw88FlATGC8NqsTSjGX6JJXWDApUaSF5obXMu4LTmMMr0KDt2fQ6VQPkLnTgJ6tsJv1iAQvtcZ9ymn3I4S0XWXGmEq8r7XE7D+pnSJyjUn7bSXb6HOzCQtQc9XYmIbylS2sNkiDXywrxVgmiAXc4Ta8M9aNUb+81QrKj6qqC06DzdYQNBFRxb78X3nGYECEmKBsxIOR7Mf/ZqWYtA28Ob9H5CCUmjLvUaoQ+htJMHfQ9fvvevfu+FxtlJXfw+3UQDiQ1xvSZe2NMCgkLOuwqZ5/5PyoAV9MKzRXT4hBzDoiAIt7bzOH9JcNJkjUtLAjXbnwN6M6zUKpgMK4WYeCwUffNy21GbLVtfIxZZbVhK8A6Ni7j"
def auth_lite_decrypt(ciphertext: bytes) -> str:
# 解密并去除填充
cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV)
decrypted_data = unpad(cipher.decrypt(ciphertext), AES.block_size)
# 提取内容并解码
content = decrypted_data[16:] # 去除头部的16字节
return content.decode('utf-8').strip()
# 解码 base64
decodedData = base64.b64decode(base64String)
# 解密数据
decryptedData = auth_lite_decrypt(decodedData)
print(decryptedData)
# 解码 URL 编码
print(unquote(decryptedData))

View File

@ -0,0 +1,66 @@
# 舞萌DX Auth-Lite 服务器模拟实现
# 仅实现了 /net/initialize 接口,用来处理 PowerOn
from fastapi import (
FastAPI,
Request
)
from fastapi.responses import (
HTMLResponse
)
import uvicorn
import httpx
from loguru import logger
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
# 从 HDD 提取
LITE_AUTH_KEY = bytes([47, 63, 106, 111, 43, 34, 76, 38, 92, 67, 114, 57, 40, 61, 107, 71])
LITE_AUTH_IV = bytes.fromhex('00000000000000000000000000000000')
def auth_lite_encrypt(plaintext: str) -> bytes:
# 构造数据16字节头 + 16字节0前缀 + 明文
header = bytes(16)
content = bytes(16) + plaintext.encode('utf-8')
data = header + content
# 填充并加密
padded_data = pad(data, AES.block_size)
cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV)
return cipher.encrypt(padded_data)
def auth_lite_decrypt(ciphertext: bytes) -> str:
# 解密并去除填充
cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV)
decrypted_data = unpad(cipher.decrypt(ciphertext), AES.block_size)
# 提取内容并解码
content = decrypted_data[16:] # 去除头部的16字节
return content.decode('utf-8').strip()
def apiOfficialServer(encryptedString: str):
url = "http://at.sys-allnet.cn/net/initialize"
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "SDGB;Windows/Lite"
}
data = encryptedString
response = httpx.post(url, headers=headers, data=data)
return response.content
app = FastAPI()
USE_OFFICIAL_SERVER = 1
@app.post('/net/initialize')
async def get_data_dummy_api(request: Request):
gotRequest = (await request.body())
if USE_OFFICIAL_SERVER == 1:
decrypted = auth_lite_decrypt(gotRequest)
officialResponse = apiOfficialServer(auth_lite_encrypt(decrypted))
logger.info(auth_lite_decrypt(officialResponse))
return HTMLResponse(officialResponse)
else:
# todo
pass
if __name__ == '__main__':
uvicorn.run(app, host="0.0.0.0", port=80)

View File

@ -10,12 +10,12 @@ def implChangeVersionNumber(userId: int, currentLoginTimestamp:int, currentLogin
userAllPatches = { userAllPatches = {
"upsertUserAll": { "upsertUserAll": {
"userData": [{ "userData": [{
"playerRating": 114514, "lastRomVersion": romVersion,
"lastDataVersion": dataVersion,
}], }],
"userMusicDetailList": [musicData], "userMusicDetailList": [musicData],
"isNewMusicDetailList": "1" #1避免覆盖 "isNewMusicDetailList": "1" #1避免覆盖
}} }}
logger.info("Changing version number to " + dataVersion + " and " + romVersion)
result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches) result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches)
return result return result