maimaiDX-Api/API_TitleServer.py
2025-01-23 18:52:09 +08:00

170 lines
5.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 舞萌DX 2024
# 标题服务器通讯实现
import zlib
import hashlib
import requests
from loguru import logger
import random
import time
from ctypes import c_int32
from Crypto.Cipher import AES
from Crypto.Util.Padding import unpad
from Static_Settings import *
# 舞萌DX 2024
AesKey = "n7bx6:@Fg_:2;5E89Phy7AyIcpxEQ:R@"
AesIV = ";;KjR1C3hgB1ovXa"
ObfuscateParam = "BEs2D5vW"
class WahlapServerBoomedError(Exception):
pass
class Request500Error(Exception):
pass
class aes_pkcs7(object):
def __init__(self, key: str, iv: str):
self.key = key.encode('utf-8')
self.iv = iv.encode('utf-8')
self.mode = AES.MODE_CBC
def encrypt(self, content):
cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
content_padding = self.pkcs7padding(content)
encrypt_bytes = cipher.encrypt(content_padding.encode('utf-8'))
return encrypt_bytes
def decrypt(self, content):
cipher = AES.new(self.key, AES.MODE_CBC, self.iv)
return cipher.decrypt(content)
def pkcs7unpadding(self, text):
length = len(text)
unpadding = ord(text[length - 1])
return text[0:length - unpadding]
def pkcs7padding(self, text):
bs = 16
length = len(text)
bytes_length = len(text.encode('utf-8'))
padding_size = length if (bytes_length == length) else bytes_length
padding = bs - padding_size % bs
padding_text = chr(padding) * padding
return text + padding_text
def SDGBApiHash(api):
return hashlib.md5((api+"MaimaiChn"+ObfuscateParam).encode()).hexdigest()
def apiSDGB(data:str, useApi, agentExtraData, maxRetries=5):
'''
舞萌DX 2024 API 通讯用函数
:param data: 请求数据
:param useApi: 使用的 API
:param agentExtraData: UA 附加信息机台相关则为狗号如A63E01E9564用户相关则为 UID
:param maxRetry: 最大重试次数, 默认为 3
'''
# 历史遗留代码有时候会传入 int故先全部转 str
agentExtra = str(agentExtraData)
# 编码好请求,准备发送
aes = aes_pkcs7(AesKey,AesIV)
data = data
data_enc = aes.encrypt(data)
data_def = zlib.compress(data_enc)
requests.packages.urllib3.disable_warnings()
endpoint = "https://maimai-gm.wahlap.com:42081/Maimai2Servlet/"
logger.debug("TitleServer Request Start: "+ str(useApi)+" , Data: "+str(data))
retries = 0
while retries < maxRetries:
try:
# 发送请求
responseRaw = requests.post(endpoint + SDGBApiHash(useApi), headers={
"User-Agent": f"{SDGBApiHash(useApi)}#{agentExtra}",
"Content-Type": "application/json",
"Mai-Encoding": "1.40",
"Accept-Encoding": "",
"Charset": "UTF-8",
"Content-Encoding": "deflate",
"Expect": "100-continue"
}, data=data_def, verify=False)
logger.debug("TitleServer Request Sent.")
logger.debug("TitleServer Response Code: " + str(responseRaw.status_code))
# 如果是 404 或 500直接抛出异常不再继续重试
match responseRaw.status_code:
case 200:
logger.debug("Request 200 OK!")
case 404:
logger.error(f"Request 404! ")
raise NotImplementedError
case 500:
logger.error(f"Request Failed! 500!!!! ")
raise Request500Error
case _:
logger.error(f"Request Failed! {responseRaw.status_code}")
raise NotImplementedError
responseContent = responseRaw.content
# 尝试解压请求
try:
responseDecompressed = zlib.decompress(responseContent)
logger.debug("Successfully decompressed response.")
except zlib.error as e:
logger.warning(f"RAW Response: {responseContent}")
logger.warning(f"Wahlap Server Boomed! Will now retry.{e}")
retries += 1
time.sleep(4) # 休眠4秒后重试
continue
# 解压成功,解密请求并返回
resultResponse = unpad(aes.decrypt(responseDecompressed), 16).decode()
logger.info("TitleServer Response OK!")
logger.debug("TitleServer Response: " + str(resultResponse))
return resultResponse
# 除了 404 和 500 之外的错误重试
except Request500Error:
raise Request500Error("500请求格式错误")
except NotImplementedError:
raise NotImplementedError("请求未知错误")
except Exception as e:
logger.warning(f"Request Failed! Will now retry.. {e}")
retries += 1
time.sleep(4)
else:
# 重试次数用尽WahlapServerBoomedError
raise WahlapServerBoomedError("重试多次仍然不能成功请求")
def calcSpecialNumber():
'''使用 c_int32 实现的 SpecialNumber 算法'''
rng = random.SystemRandom()
num2 = rng.randint(1, 1037933) * 2069
num2 += 0x400
num2 = c_int32(num2).value
result = c_int32(0)
for _ in range(32):
result.value <<= 1
result.value += num2 & 1
num2 >>= 1
return c_int32(result.value).value
def calcSpecialNumber2():
'''实验性替代 SpecialNumber 算法'''
max = 1037933
num2 = random.randint(1, max) * 2069
num2 += 1024 # specialnum
num3 = 0
for i in range(0, 32):
num3 <<= 1
num3 += num2 % 2
num2 >>= 1
return num3