diff --git a/API_AuthLite.py b/API_AuthLiteDelivery.py similarity index 76% rename from API_AuthLite.py rename to API_AuthLiteDelivery.py index 45422c9..31ea6e1 100644 --- a/API_AuthLite.py +++ b/API_AuthLiteDelivery.py @@ -1,31 +1,35 @@ # All.Net AuthLite 更新获取 from Crypto.Cipher import AES -from Crypto.Util.Padding import pad +from Crypto.Util.Padding import pad, unpad import httpx from loguru import logger from urllib.parse import parse_qs import configparser as ini -def enc(key, iv, data): - cipher = AES.new(key, AES.MODE_CBC, iv) - encrypted = cipher.encrypt(data) - return encrypted +LITE_AUTH_KEY = bytes([47, 63, 106, 111, 43, 34, 76, 38, 92, 67, 114, 57, 40, 61, 107, 71]) +LITE_AUTH_IV = bytes.fromhex('00000000000000000000000000000000') -def dec(key, iv ,data): - de_cipher = AES.new(key, AES.MODE_CBC, iv) - decrypted = de_cipher.decrypt(data) - return decrypted +def auth_lite_encrypt(plaintext: str) -> bytes: + # 构造数据:16字节头 + 16字节0前缀 + 明文 + header = bytes(16) + content = bytes(16) + plaintext.encode('utf-8') + data = header + content + # 填充并加密 + padded_data = pad(data, AES.block_size) + cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV) + return cipher.encrypt(padded_data) + +def auth_lite_decrypt(ciphertext: bytes) -> str: + # 解密并去除填充 + cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV) + decrypted_data = unpad(cipher.decrypt(ciphertext), AES.block_size) + # 提取内容并解码 + content = decrypted_data[16:] # 去除头部的16字节 + return content.decode('utf-8').strip() def getRawDelivery(): - key = bytes([47, 63, 106, 111, 43, 34, 76, 38, 92, 67, 114, 57, 40, 61, 107, 71]) - iv = bytes.fromhex('00000000000000000000000000000000') - content = bytes([0] * 16) + b'title_id=SDGB&title_ver=1.40&client_id=A63E01C2805' - header = bytes.fromhex('00000000000000000000000000000000') - bytes_data = pad(header + content, 16) - - encrypted = enc(key, iv, bytes_data) - + encrypted = auth_lite_encrypt('title_id=SDGB&title_ver=1.40&client_id=A63E01C2805') r = httpx.post( 'http://at.sys-allnet.cn/net/delivery/instruction', data = encrypted, @@ -34,10 +38,8 @@ def getRawDelivery(): 'Pragma': 'DFI' } ) - resp_data = r.content - decrypted = dec(key, resp_data[:16], resp_data) - decrypted_str = decrypted[16:].decode('UTF-8').strip() + decrypted_str = auth_lite_decrypt(resp_data) # 过滤所有控制字符 decrypted_str = ''.join([i for i in decrypted_str if 31 < ord(i) < 127]) logger.info(f"RAW Response: {decrypted_str}") @@ -117,7 +119,6 @@ def parseUpdateIni(iniText): return final_message - if __name__ == '__main__': urlList = parseRawDelivery(getRawDelivery()) for url in urlList: diff --git a/Best50_To_Diving_Fish.py b/Best50_To_Diving_Fish.py index c6cf921..48de752 100644 --- a/Best50_To_Diving_Fish.py +++ b/Best50_To_Diving_Fish.py @@ -33,8 +33,14 @@ def apiDivingFish(method:str, apiPath:str, importToken:str, data=None): url=BASE_URL + apiPath, headers=headers, ) + elif method == 'DELETE': + response = requests.delete( + url=BASE_URL + apiPath, + headers=headers, + ) else: - raise NotImplementedError + logger.error(f'未知的请求方法:{method}') + raise ValueError(f'未知的请求方法:{method}') logger.info(f'水鱼查分器请求结果:{response.status_code}') logger.debug(f'水鱼查分器回应:{response.text}') @@ -104,9 +110,41 @@ def implUserMusicToDivingFish(userId:int, fishImportToken:str): return False return len(divingFishData) +def generateDebugTestScore(): + '''生成测试成绩''' + return [ + { + "achievement": 1010000, + "comboStatus": 4, + "deluxscoreMax": 4026, + "level": 4, + "musicId": 834, + "syncStatus": 4 + }, + { + "achievement": 1010000, + "comboStatus": 4, + "deluxscoreMax": 4200, + "level": 4, + "musicId": 11663, + "syncStatus": 4 + } + ] + +def implResetFishUser(fishImportToken:str): + '''重置水鱼查分器的用户数据''' + logger.info("开始重置水鱼查分器的用户数据..") + result = apiDivingFish('DELETE', '/player/delete_records', fishImportToken) + if result: + logger.info("重置成功!") + return True + logger.error("重置失败!") + return False + if __name__ == '__main__': if True: userId = testUid2 importToken = testImportToken #currentLoginTimestamp = generateTimestamp() - implUserMusicToDivingFish(userId, importToken) \ No newline at end of file + #implUserMusicToDivingFish(userId, importToken) + implResetFishUser(importToken) diff --git a/Config.py b/Config.py index f4e779e..ebb56ed 100644 --- a/Config.py +++ b/Config.py @@ -11,4 +11,4 @@ loginBonusDBPathFallback = "./maimaiDX-Api/Data/loginBonusDB.json" musicDBPathFallback = "./maimaiDX-Api/Data/musicDB.json" # 日本精工,安全防漏 -#from MyConfig import * +from MyConfig import * diff --git a/ForceLogout.py b/ForceLogout.py index 9a4f8e9..73f5415 100644 --- a/ForceLogout.py +++ b/ForceLogout.py @@ -1,5 +1,5 @@ # 解小黑屋实现 -# 未完工 +# 仍十分不完善,不建议使用 from Config import * from API_TitleServer import * diff --git a/GetPreview.py b/GetPreview.py index 074265a..d0fe6cf 100644 --- a/GetPreview.py +++ b/GetPreview.py @@ -5,6 +5,7 @@ from API_TitleServer import apiSDGB from Config import * import time import random +from loguru import logger def apiGetUserPreview(userId, noLog:bool=False) -> str: data = json.dumps({ @@ -26,7 +27,7 @@ if __name__ == "__main__": def crawlAllUserPreview(): """omg it's a evil crawler""" # 这里设置开始和结束的 UserId - BeginUserId = 11000000 + BeginUserId = 10200000 EndUserId = 12599999 # 打开文件,准备写入 @@ -35,11 +36,12 @@ def crawlAllUserPreview(): for userId in range(BeginUserId, EndUserId + 1): # 调用 API try: - userPreview = apiGetUserPreview(userId) + userPreview = apiGetUserPreview(userId, True) currentUser = json.loads(userPreview) if currentUser["userId"] is not None: # 每爬到一个就把它存到一个文件里面,每个一行 f.write(userPreview + "\n") + logger.info(f"{userId}: {currentUser['userName']}, RATING: {currentUser['playerRating']}") else: f.write("\n") except: diff --git a/HashEntertainment/GetMyHash.py b/HashEntertainment/GetMyHash.py deleted file mode 100644 index 5b5d56c..0000000 --- a/HashEntertainment/GetMyHash.py +++ /dev/null @@ -1,28 +0,0 @@ -import os -import re - -pattern = re.compile(r'Maimai2Servlet/(.*?)MaimaiChn') - -# 获取目录 -dir = 'C:/Users/remik1r3n/Workspace/maimaiDX-Api/HashEntertainment' - -known_hashes = [] - -for filename in os.listdir(dir): - # 只处理.txt文件 - if filename.endswith('.txt'): - file_path = os.path.join(dir, filename) - with open(file_path, 'r', encoding='utf-8') as file: - content = file.read() - # 搜索匹配的模式 - matches = pattern.findall(content) - # 输出每个匹配中的不定内容 - for match in matches: - known_hashes.append(match) - -# 去重 -known_hashes = list(set(known_hashes)) - -# 输出 -for hash in known_hashes: - print(hash) \ No newline at end of file diff --git a/README.md b/README.md index 56587da..e06bbe5 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,10 @@ # genshin-impact Genshin Impact Helpers and Tools + +## Project Matsuri +A project aim to create a compatible forwarder server, that allow Genshin Impact 2023 version to connect to the OFFICIAL server or other server very easily. + +It includes: +- A very simple Auth-Lite server, processes /poweron request. +- A title server implementation. It will decrypt the request, and forward it to the official server. +- AimeDB is still in research and more info will be updated soon(tm). diff --git a/Standalone/DecryptAuthLite.py b/Standalone/DecryptAuthLite.py new file mode 100644 index 0000000..88967ab --- /dev/null +++ b/Standalone/DecryptAuthLite.py @@ -0,0 +1,33 @@ +# 解密国服 Auth Lite 的包 +# 仅测试过 PowerOn 请求,其他包不确定是否适用 +# 仅适用国服 DX2024,其他未测试 + +# 完全 Standalone,不依赖于其他文件 + +import base64 +from Crypto.Cipher import AES +from urllib.parse import unquote +from Crypto.Util.Padding import unpad + +# 密钥从 HDD 提取 +LITE_AUTH_KEY = bytes([47, 63, 106, 111, 43, 34, 76, 38, 92, 67, 114, 57, 40, 61, 107, 71]) +LITE_AUTH_IV = bytes.fromhex('00000000000000000000000000000000') + +# 填入你抓包得到的想解密的数据的 base64 编码 +base64String = "N0PyrjawH/7qA8A28y++3txHDsKuAs5+nib751QiNlJYigvwldPaG7xd0WYXvgqlWY16JIy38GQ8+M4ttaWRNfpWy9l29pC2h2abd4VGhIeWGLbOjc2Bthqhibui76vi4dW+05TsPiyXbOsqHFzScvdByKUtZUobZgrnr/WW+YqRIUdw/ZHBmKBY81JivnVH9AkEyCCP9xubYMjDqi65WhDpcrdMk5nUjHq/O7R1eXr12Es9gXDUruy/H4M7eMt+4kFSDCGpLSFwAEDhba6rpOz0n588nfvXXFlZ+a3ZsZSBYAJPBZ795Ck8ZDIYnEMWMV5nk6qPc2HiBF9ZZw88FlATGC8NqsTSjGX6JJXWDApUaSF5obXMu4LTmMMr0KDt2fQ6VQPkLnTgJ6tsJv1iAQvtcZ9ymn3I4S0XWXGmEq8r7XE7D+pnSJyjUn7bSXb6HOzCQtQc9XYmIbylS2sNkiDXywrxVgmiAXc4Ta8M9aNUb+81QrKj6qqC06DzdYQNBFRxb78X3nGYECEmKBsxIOR7Mf/ZqWYtA28Ob9H5CCUmjLvUaoQ+htJMHfQ9fvvevfu+FxtlJXfw+3UQDiQ1xvSZe2NMCgkLOuwqZ5/5PyoAV9MKzRXT4hBzDoiAIt7bzOH9JcNJkjUtLAjXbnwN6M6zUKpgMK4WYeCwUffNy21GbLVtfIxZZbVhK8A6Ni7j" + +def auth_lite_decrypt(ciphertext: bytes) -> str: + # 解密并去除填充 + cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV) + decrypted_data = unpad(cipher.decrypt(ciphertext), AES.block_size) + # 提取内容并解码 + content = decrypted_data[16:] # 去除头部的16字节 + return content.decode('utf-8').strip() + +# 解码 base64 +decodedData = base64.b64decode(base64String) +# 解密数据 +decryptedData = auth_lite_decrypt(decodedData) +print(decryptedData) +# 解码 URL 编码 +print(unquote(decryptedData)) diff --git a/Standalone/DummyAuthLiteServer.py b/Standalone/DummyAuthLiteServer.py new file mode 100644 index 0000000..09a517d --- /dev/null +++ b/Standalone/DummyAuthLiteServer.py @@ -0,0 +1,66 @@ +# 舞萌DX Auth-Lite 服务器模拟实现 +# 仅实现了 /net/initialize 接口,用来处理 PowerOn + +from fastapi import ( + FastAPI, + Request +) +from fastapi.responses import ( + HTMLResponse +) +import uvicorn +import httpx +from loguru import logger +from Crypto.Cipher import AES +from Crypto.Util.Padding import pad, unpad + +# 从 HDD 提取 +LITE_AUTH_KEY = bytes([47, 63, 106, 111, 43, 34, 76, 38, 92, 67, 114, 57, 40, 61, 107, 71]) +LITE_AUTH_IV = bytes.fromhex('00000000000000000000000000000000') + +def auth_lite_encrypt(plaintext: str) -> bytes: + # 构造数据:16字节头 + 16字节0前缀 + 明文 + header = bytes(16) + content = bytes(16) + plaintext.encode('utf-8') + data = header + content + # 填充并加密 + padded_data = pad(data, AES.block_size) + cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV) + return cipher.encrypt(padded_data) + +def auth_lite_decrypt(ciphertext: bytes) -> str: + # 解密并去除填充 + cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV) + decrypted_data = unpad(cipher.decrypt(ciphertext), AES.block_size) + # 提取内容并解码 + content = decrypted_data[16:] # 去除头部的16字节 + return content.decode('utf-8').strip() + +def apiOfficialServer(encryptedString: str): + url = "http://at.sys-allnet.cn/net/initialize" + headers = { + "Content-Type": "application/x-www-form-urlencoded", + "User-Agent": "SDGB;Windows/Lite" + } + data = encryptedString + response = httpx.post(url, headers=headers, data=data) + return response.content + +app = FastAPI() + +USE_OFFICIAL_SERVER = 1 + +@app.post('/net/initialize') +async def get_data_dummy_api(request: Request): + gotRequest = (await request.body()) + if USE_OFFICIAL_SERVER == 1: + decrypted = auth_lite_decrypt(gotRequest) + officialResponse = apiOfficialServer(auth_lite_encrypt(decrypted)) + logger.info(auth_lite_decrypt(officialResponse)) + return HTMLResponse(officialResponse) + else: + # todo + pass + +if __name__ == '__main__': + uvicorn.run(app, host="0.0.0.0", port=80) diff --git a/_Special.py b/_Special.py index 0061b97..34239d7 100644 --- a/_Special.py +++ b/_Special.py @@ -10,12 +10,12 @@ def implChangeVersionNumber(userId: int, currentLoginTimestamp:int, currentLogin userAllPatches = { "upsertUserAll": { "userData": [{ - "playerRating": 114514, + "lastRomVersion": romVersion, + "lastDataVersion": dataVersion, }], "userMusicDetailList": [musicData], "isNewMusicDetailList": "1" #1避免覆盖 }} - logger.info("Changing version number to " + dataVersion + " and " + romVersion) result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches) return result