Compare commits

...

2 Commits

Author SHA1 Message Date
Remik1r3n
36af0f5dd6
Merge pull request #1 from Remik1r3n/project-matsuri
AuthLite PowerOn初步实现和其他改进
2025-02-21 23:40:18 +08:00
Remik1r3n
55bd19717a AuthLite PowerOn初步实现和其他改进 2025-02-21 23:34:49 +08:00
10 changed files with 177 additions and 57 deletions

View File

@ -1,31 +1,35 @@
# All.Net AuthLite 更新获取
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
from Crypto.Util.Padding import pad, unpad
import httpx
from loguru import logger
from urllib.parse import parse_qs
import configparser as ini
def enc(key, iv, data):
cipher = AES.new(key, AES.MODE_CBC, iv)
encrypted = cipher.encrypt(data)
return encrypted
LITE_AUTH_KEY = bytes([47, 63, 106, 111, 43, 34, 76, 38, 92, 67, 114, 57, 40, 61, 107, 71])
LITE_AUTH_IV = bytes.fromhex('00000000000000000000000000000000')
def dec(key, iv ,data):
de_cipher = AES.new(key, AES.MODE_CBC, iv)
decrypted = de_cipher.decrypt(data)
return decrypted
def auth_lite_encrypt(plaintext: str) -> bytes:
# 构造数据16字节头 + 16字节0前缀 + 明文
header = bytes(16)
content = bytes(16) + plaintext.encode('utf-8')
data = header + content
# 填充并加密
padded_data = pad(data, AES.block_size)
cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV)
return cipher.encrypt(padded_data)
def auth_lite_decrypt(ciphertext: bytes) -> str:
# 解密并去除填充
cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV)
decrypted_data = unpad(cipher.decrypt(ciphertext), AES.block_size)
# 提取内容并解码
content = decrypted_data[16:] # 去除头部的16字节
return content.decode('utf-8').strip()
def getRawDelivery():
key = bytes([47, 63, 106, 111, 43, 34, 76, 38, 92, 67, 114, 57, 40, 61, 107, 71])
iv = bytes.fromhex('00000000000000000000000000000000')
content = bytes([0] * 16) + b'title_id=SDGB&title_ver=1.40&client_id=A63E01C2805'
header = bytes.fromhex('00000000000000000000000000000000')
bytes_data = pad(header + content, 16)
encrypted = enc(key, iv, bytes_data)
encrypted = auth_lite_encrypt('title_id=SDGB&title_ver=1.40&client_id=A63E01C2805')
r = httpx.post(
'http://at.sys-allnet.cn/net/delivery/instruction',
data = encrypted,
@ -34,10 +38,8 @@ def getRawDelivery():
'Pragma': 'DFI'
}
)
resp_data = r.content
decrypted = dec(key, resp_data[:16], resp_data)
decrypted_str = decrypted[16:].decode('UTF-8').strip()
decrypted_str = auth_lite_decrypt(resp_data)
# 过滤所有控制字符
decrypted_str = ''.join([i for i in decrypted_str if 31 < ord(i) < 127])
logger.info(f"RAW Response: {decrypted_str}")
@ -117,7 +119,6 @@ def parseUpdateIni(iniText):
return final_message
if __name__ == '__main__':
urlList = parseRawDelivery(getRawDelivery())
for url in urlList:

View File

@ -33,8 +33,14 @@ def apiDivingFish(method:str, apiPath:str, importToken:str, data=None):
url=BASE_URL + apiPath,
headers=headers,
)
elif method == 'DELETE':
response = requests.delete(
url=BASE_URL + apiPath,
headers=headers,
)
else:
raise NotImplementedError
logger.error(f'未知的请求方法:{method}')
raise ValueError(f'未知的请求方法:{method}')
logger.info(f'水鱼查分器请求结果:{response.status_code}')
logger.debug(f'水鱼查分器回应:{response.text}')
@ -104,9 +110,41 @@ def implUserMusicToDivingFish(userId:int, fishImportToken:str):
return False
return len(divingFishData)
def generateDebugTestScore():
'''生成测试成绩'''
return [
{
"achievement": 1010000,
"comboStatus": 4,
"deluxscoreMax": 4026,
"level": 4,
"musicId": 834,
"syncStatus": 4
},
{
"achievement": 1010000,
"comboStatus": 4,
"deluxscoreMax": 4200,
"level": 4,
"musicId": 11663,
"syncStatus": 4
}
]
def implResetFishUser(fishImportToken:str):
'''重置水鱼查分器的用户数据'''
logger.info("开始重置水鱼查分器的用户数据..")
result = apiDivingFish('DELETE', '/player/delete_records', fishImportToken)
if result:
logger.info("重置成功!")
return True
logger.error("重置失败!")
return False
if __name__ == '__main__':
if True:
userId = testUid2
importToken = testImportToken
#currentLoginTimestamp = generateTimestamp()
implUserMusicToDivingFish(userId, importToken)
#implUserMusicToDivingFish(userId, importToken)
implResetFishUser(importToken)

View File

@ -11,4 +11,4 @@ loginBonusDBPathFallback = "./maimaiDX-Api/Data/loginBonusDB.json"
musicDBPathFallback = "./maimaiDX-Api/Data/musicDB.json"
# 日本精工,安全防漏
#from MyConfig import *
from MyConfig import *

View File

@ -1,5 +1,5 @@
# 解小黑屋实现
# 未完工
# 仍十分不完善,不建议使用
from Config import *
from API_TitleServer import *

View File

@ -5,6 +5,7 @@ from API_TitleServer import apiSDGB
from Config import *
import time
import random
from loguru import logger
def apiGetUserPreview(userId, noLog:bool=False) -> str:
data = json.dumps({
@ -26,7 +27,7 @@ if __name__ == "__main__":
def crawlAllUserPreview():
"""omg it's a evil crawler"""
# 这里设置开始和结束的 UserId
BeginUserId = 11000000
BeginUserId = 10200000
EndUserId = 12599999
# 打开文件,准备写入
@ -35,11 +36,12 @@ def crawlAllUserPreview():
for userId in range(BeginUserId, EndUserId + 1):
# 调用 API
try:
userPreview = apiGetUserPreview(userId)
userPreview = apiGetUserPreview(userId, True)
currentUser = json.loads(userPreview)
if currentUser["userId"] is not None:
# 每爬到一个就把它存到一个文件里面,每个一行
f.write(userPreview + "\n")
logger.info(f"{userId}: {currentUser['userName']}, RATING: {currentUser['playerRating']}")
else:
f.write("\n")
except:

View File

@ -1,28 +0,0 @@
import os
import re
pattern = re.compile(r'Maimai2Servlet/(.*?)MaimaiChn')
# 获取目录
dir = 'C:/Users/remik1r3n/Workspace/maimaiDX-Api/HashEntertainment'
known_hashes = []
for filename in os.listdir(dir):
# 只处理.txt文件
if filename.endswith('.txt'):
file_path = os.path.join(dir, filename)
with open(file_path, 'r', encoding='utf-8') as file:
content = file.read()
# 搜索匹配的模式
matches = pattern.findall(content)
# 输出每个匹配中的不定内容
for match in matches:
known_hashes.append(match)
# 去重
known_hashes = list(set(known_hashes))
# 输出
for hash in known_hashes:
print(hash)

View File

@ -1,2 +1,10 @@
# genshin-impact
Genshin Impact Helpers and Tools
## Project Matsuri
A project aim to create a compatible forwarder server, that allow Genshin Impact 2023 version to connect to the OFFICIAL server or other server very easily.
It includes:
- A very simple Auth-Lite server, processes /poweron request.
- A title server implementation. It will decrypt the request, and forward it to the official server.
- AimeDB is still in research and more info will be updated soon(tm).

View File

@ -0,0 +1,33 @@
# 解密国服 Auth Lite 的包
# 仅测试过 PowerOn 请求,其他包不确定是否适用
# 仅适用国服 DX2024其他未测试
# 完全 Standalone不依赖于其他文件
import base64
from Crypto.Cipher import AES
from urllib.parse import unquote
from Crypto.Util.Padding import unpad
# 密钥从 HDD 提取
LITE_AUTH_KEY = bytes([47, 63, 106, 111, 43, 34, 76, 38, 92, 67, 114, 57, 40, 61, 107, 71])
LITE_AUTH_IV = bytes.fromhex('00000000000000000000000000000000')
# 填入你抓包得到的想解密的数据的 base64 编码
base64String = "N0PyrjawH/7qA8A28y++3txHDsKuAs5+nib751QiNlJYigvwldPaG7xd0WYXvgqlWY16JIy38GQ8+M4ttaWRNfpWy9l29pC2h2abd4VGhIeWGLbOjc2Bthqhibui76vi4dW+05TsPiyXbOsqHFzScvdByKUtZUobZgrnr/WW+YqRIUdw/ZHBmKBY81JivnVH9AkEyCCP9xubYMjDqi65WhDpcrdMk5nUjHq/O7R1eXr12Es9gXDUruy/H4M7eMt+4kFSDCGpLSFwAEDhba6rpOz0n588nfvXXFlZ+a3ZsZSBYAJPBZ795Ck8ZDIYnEMWMV5nk6qPc2HiBF9ZZw88FlATGC8NqsTSjGX6JJXWDApUaSF5obXMu4LTmMMr0KDt2fQ6VQPkLnTgJ6tsJv1iAQvtcZ9ymn3I4S0XWXGmEq8r7XE7D+pnSJyjUn7bSXb6HOzCQtQc9XYmIbylS2sNkiDXywrxVgmiAXc4Ta8M9aNUb+81QrKj6qqC06DzdYQNBFRxb78X3nGYECEmKBsxIOR7Mf/ZqWYtA28Ob9H5CCUmjLvUaoQ+htJMHfQ9fvvevfu+FxtlJXfw+3UQDiQ1xvSZe2NMCgkLOuwqZ5/5PyoAV9MKzRXT4hBzDoiAIt7bzOH9JcNJkjUtLAjXbnwN6M6zUKpgMK4WYeCwUffNy21GbLVtfIxZZbVhK8A6Ni7j"
def auth_lite_decrypt(ciphertext: bytes) -> str:
# 解密并去除填充
cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV)
decrypted_data = unpad(cipher.decrypt(ciphertext), AES.block_size)
# 提取内容并解码
content = decrypted_data[16:] # 去除头部的16字节
return content.decode('utf-8').strip()
# 解码 base64
decodedData = base64.b64decode(base64String)
# 解密数据
decryptedData = auth_lite_decrypt(decodedData)
print(decryptedData)
# 解码 URL 编码
print(unquote(decryptedData))

View File

@ -0,0 +1,66 @@
# 舞萌DX Auth-Lite 服务器模拟实现
# 仅实现了 /net/initialize 接口,用来处理 PowerOn
from fastapi import (
FastAPI,
Request
)
from fastapi.responses import (
HTMLResponse
)
import uvicorn
import httpx
from loguru import logger
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
# 从 HDD 提取
LITE_AUTH_KEY = bytes([47, 63, 106, 111, 43, 34, 76, 38, 92, 67, 114, 57, 40, 61, 107, 71])
LITE_AUTH_IV = bytes.fromhex('00000000000000000000000000000000')
def auth_lite_encrypt(plaintext: str) -> bytes:
# 构造数据16字节头 + 16字节0前缀 + 明文
header = bytes(16)
content = bytes(16) + plaintext.encode('utf-8')
data = header + content
# 填充并加密
padded_data = pad(data, AES.block_size)
cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV)
return cipher.encrypt(padded_data)
def auth_lite_decrypt(ciphertext: bytes) -> str:
# 解密并去除填充
cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV)
decrypted_data = unpad(cipher.decrypt(ciphertext), AES.block_size)
# 提取内容并解码
content = decrypted_data[16:] # 去除头部的16字节
return content.decode('utf-8').strip()
def apiOfficialServer(encryptedString: str):
url = "http://at.sys-allnet.cn/net/initialize"
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": "SDGB;Windows/Lite"
}
data = encryptedString
response = httpx.post(url, headers=headers, data=data)
return response.content
app = FastAPI()
USE_OFFICIAL_SERVER = 1
@app.post('/net/initialize')
async def get_data_dummy_api(request: Request):
gotRequest = (await request.body())
if USE_OFFICIAL_SERVER == 1:
decrypted = auth_lite_decrypt(gotRequest)
officialResponse = apiOfficialServer(auth_lite_encrypt(decrypted))
logger.info(auth_lite_decrypt(officialResponse))
return HTMLResponse(officialResponse)
else:
# todo
pass
if __name__ == '__main__':
uvicorn.run(app, host="0.0.0.0", port=80)

View File

@ -10,12 +10,12 @@ def implChangeVersionNumber(userId: int, currentLoginTimestamp:int, currentLogin
userAllPatches = {
"upsertUserAll": {
"userData": [{
"playerRating": 114514,
"lastRomVersion": romVersion,
"lastDataVersion": dataVersion,
}],
"userMusicDetailList": [musicData],
"isNewMusicDetailList": "1" #1避免覆盖
}}
logger.info("Changing version number to " + dataVersion + " and " + romVersion)
result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches)
return result