mirror of
https://github.com/Remik1r3n/maimaiDX-Api.git
synced 2025-06-16 02:27:27 +08:00
225 lines
7.4 KiB
Python
225 lines
7.4 KiB
Python
# 舞萌DX
|
||
# 标题服务器通讯实现
|
||
|
||
import zlib
|
||
import hashlib
|
||
import httpx
|
||
from loguru import logger
|
||
import random
|
||
import time
|
||
from ctypes import c_int32
|
||
from Crypto.Cipher import AES
|
||
from Crypto.Util.Padding import pad, unpad
|
||
from Config import *
|
||
from typing import Optional
|
||
import certifi
|
||
|
||
# 舞萌DX 2024
|
||
# omg it's leaking
|
||
#AesKey = "n7bx6:@Fg_:2;5E89Phy7AyIcpxEQ:R@"
|
||
#AesIV = ";;KjR1C3hgB1ovXa"
|
||
#ObfuscateParam = "BEs2D5vW"
|
||
|
||
# 2025
|
||
AesKey = "a>32bVP7v<63BVLkY[xM>daZ1s9MBP<R"
|
||
AesIV = "d6xHIKq]1J]Dt^ue"
|
||
ObfuscateParam = "B44df8yT"
|
||
|
||
class SDGBApiError(Exception):
|
||
pass
|
||
|
||
class SDGBRequestError(SDGBApiError):
|
||
pass
|
||
|
||
class SDGBResponseError(SDGBApiError):
|
||
pass
|
||
|
||
class aes_pkcs7(object):
|
||
def __init__(self, key: str, iv: str):
|
||
self.key = key.encode('utf-8')
|
||
self.iv = iv.encode('utf-8')
|
||
self.mode = AES.MODE_CBC
|
||
|
||
def encrypt(self, content: bytes) -> bytes:
|
||
cipher = AES.new(self.key, self.mode, self.iv)
|
||
content_padded = pad(content, AES.block_size)
|
||
encrypted_bytes = cipher.encrypt(content_padded)
|
||
return encrypted_bytes
|
||
|
||
def decrypt(self, content):
|
||
cipher = AES.new(self.key, self.mode, self.iv)
|
||
decrypted_padded = cipher.decrypt(content)
|
||
decrypted = unpad(decrypted_padded, AES.block_size)
|
||
return decrypted
|
||
|
||
def pkcs7unpadding(self, text):
|
||
length = len(text)
|
||
unpadding = ord(text[length - 1])
|
||
return text[0:length - unpadding]
|
||
|
||
def pkcs7padding(self, text):
|
||
bs = 16
|
||
length = len(text)
|
||
bytes_length = len(text.encode('utf-8'))
|
||
padding_size = length if (bytes_length == length) else bytes_length
|
||
padding = bs - padding_size % bs
|
||
padding_text = chr(padding) * padding
|
||
return text + padding_text
|
||
|
||
def getSDGBApiHash(api):
|
||
# API 的 Hash 的生成
|
||
# 有空做一下 Hash 的彩虹表?
|
||
return hashlib.md5((api+"MaimaiChn"+ObfuscateParam).encode()).hexdigest()
|
||
|
||
def apiSDGB(
|
||
data: str,
|
||
targetApi: str,
|
||
userAgentExtraData: str,
|
||
noLog: bool = False,
|
||
timeout: int = 5,
|
||
useProxy: bool = False,
|
||
proxyUrl: Optional[str] = None
|
||
) -> str:
|
||
"""
|
||
舞萌DX API 通讯用函数(增强版)
|
||
:param data: 请求数据
|
||
:param targetApi: 使用的 API
|
||
:param userAgentExtraData: UA 附加信息,机台相关则为狗号(如A63E01E9564),用户相关则为 UID
|
||
:param noLog: 是否不记录日志
|
||
:param timeout: 请求超时时间(秒)
|
||
:param useProxy: 是否使用代理
|
||
:param proxyUrl: 代理地址(如果使用代理)
|
||
:return: 解码后的响应数据
|
||
"""
|
||
maxRetries = 3
|
||
agentExtra = str(userAgentExtraData)
|
||
aes = aes_pkcs7(AesKey, AesIV) # Assuming aes_pkcs7, AesKey, AesIV are defined elsewhere
|
||
endpoint = "https://maimai-gm.wahlap.com:42081/Maimai2Servlet/"
|
||
|
||
# Prepare request data
|
||
requestDataFinal = aes.encrypt(zlib.compress(data.encode('utf-8')))
|
||
|
||
if not noLog:
|
||
logger.debug(f"开始请求 {targetApi},以 {data}")
|
||
|
||
retries = 0
|
||
while retries < maxRetries:
|
||
try:
|
||
# Configure HTTP client
|
||
if useProxy and proxyUrl:
|
||
if not noLog:
|
||
logger.debug("使用代理")
|
||
httpClient = httpx.Client(proxy=proxyUrl, verify=False)
|
||
else:
|
||
if not noLog:
|
||
logger.debug("不使用代理")
|
||
httpClient = httpx.Client(verify=False)
|
||
|
||
# Send request
|
||
response = httpClient.post(
|
||
url=endpoint + getSDGBApiHash(targetApi), # Assuming getSDGBApiHash is defined
|
||
headers={
|
||
"User-Agent": f"{getSDGBApiHash(targetApi)}#{agentExtra}",
|
||
"Content-Type": "application/json",
|
||
"Mai-Encoding": "1.50",
|
||
"Accept-Encoding": "",
|
||
"Charset": "UTF-8",
|
||
"Content-Encoding": "deflate",
|
||
"Expect": "100-continue"
|
||
},
|
||
content=requestDataFinal,
|
||
timeout=timeout
|
||
)
|
||
|
||
if not noLog:
|
||
logger.info(f"{targetApi} 请求结果: {response.status_code}")
|
||
|
||
if response.status_code != 200:
|
||
errorMessage = f"请求失败: {response.status_code}"
|
||
logger.error(errorMessage)
|
||
raise SDGBRequestError(errorMessage)
|
||
|
||
# Process response
|
||
responseContentRaw = response.content
|
||
|
||
try:
|
||
responseContentDecrypted = aes.decrypt(responseContentRaw)
|
||
if not noLog:
|
||
logger.debug("成功解密响应!")
|
||
except Exception as e:
|
||
logger.warning(f"解密失败,原始响应: {responseContentRaw}, 错误: {e}")
|
||
raise SDGBResponseError("解密失败")
|
||
|
||
|
||
try:
|
||
# 检查 ResponseContentDecrypted 是否为 zlib 压缩格式
|
||
if not responseContentDecrypted.startswith(b'\x78\x9c'):
|
||
logger.warning("Not Zlib. Not decompressed.")
|
||
raise Exception(f"响应内容不是 zlib 压缩格式, 内容: {responseContentDecrypted}")
|
||
responseContentFinal = zlib.decompress(responseContentDecrypted).decode('utf-8')
|
||
if not noLog:
|
||
logger.debug("成功解压响应!")
|
||
logger.debug(f"响应: {responseContentFinal}")
|
||
return responseContentFinal
|
||
except zlib.error as e:
|
||
logger.warning(f"解压失败,原始响应: {responseContentDecrypted}, 错误: {e}")
|
||
raise SDGBResponseError("解压失败")
|
||
|
||
# If decompression fails after attempts, trigger a retry of the entire request
|
||
retries += 1
|
||
if retries < maxRetries:
|
||
logger.warning(f"解压失败,将重试请求 (第 {retries + 1}/{maxRetries} 次)")
|
||
time.sleep(2)
|
||
continue
|
||
raise SDGBResponseError("多次尝试后仍无法解压响应")
|
||
|
||
except SDGBRequestError as e:
|
||
# Request format error, no retry
|
||
logger.error(f"请求格式错误: {e}")
|
||
raise
|
||
except SDGBResponseError as e:
|
||
# Response parsing error, retry once more
|
||
logger.warning(f"响应错误,将重试: {e}")
|
||
retries += 1
|
||
time.sleep(2)
|
||
except Exception as e:
|
||
# Other errors, retry
|
||
logger.warning(f"请求失败,将重试: {e}")
|
||
retries += 1
|
||
time.sleep(2)
|
||
|
||
finally:
|
||
if 'httpClient' in locals():
|
||
httpClient.close()
|
||
|
||
raise SDGBApiError("重试多次仍然无法成功请求服务器")
|
||
|
||
def calcPlaySpecial():
|
||
"""使用 c_int32 实现的 SpecialNumber 算法"""
|
||
rng = random.SystemRandom()
|
||
num2 = rng.randint(1, 1037933) * 2069
|
||
num2 += 1024 #GameManager.CalcSpecialNum()
|
||
num2 = c_int32(num2).value
|
||
result = c_int32(0)
|
||
for _ in range(32):
|
||
result.value <<= 1
|
||
result.value += num2 & 1
|
||
num2 >>= 1
|
||
return c_int32(result.value).value
|
||
|
||
"""
|
||
DEPRECATED: 旧的 SpecialNumber 算法
|
||
def calcSpecialNumber2():
|
||
max = 1037933
|
||
num2 = random.randint(1, max) * 2069
|
||
|
||
num2 += 1024 # specialnum
|
||
num3 = 0
|
||
for i in range(0, 32):
|
||
num3 <<= 1
|
||
num3 += num2 % 2
|
||
num2 >>= 1
|
||
|
||
return num3
|
||
"""
|