# 舞萌DX 2024 # 标题服务器通讯实现 import zlib import hashlib import httpx import certifi from loguru import logger import random import time from ctypes import c_int32 from Crypto.Cipher import AES from Crypto.Util.Padding import unpad from Config import * # 舞萌DX 2024 AesKey = "n7bx6:@Fg_:2;5E89Phy7AyIcpxEQ:R@" AesIV = ";;KjR1C3hgB1ovXa" ObfuscateParam = "BEs2D5vW" class SDGBApiError(Exception): pass class SDGBRequestError(SDGBApiError): pass class SDGBResponseError(SDGBApiError): pass class AES_PKCS7(object): def __init__(self, key: str, iv: str): self.key = key.encode('utf-8') self.iv = iv.encode('utf-8') self.mode = AES.MODE_CBC def encrypt(self, content): cipher = AES.new(self.key, AES.MODE_CBC, self.iv) content_padding = self.pkcs7padding(content) encrypt_bytes = cipher.encrypt(content_padding.encode('utf-8')) return encrypt_bytes def decrypt(self, content): cipher = AES.new(self.key, AES.MODE_CBC, self.iv) return cipher.decrypt(content) def pkcs7unpadding(self, text): length = len(text) unpadding = ord(text[length - 1]) return text[0:length - unpadding] def pkcs7padding(self, text): bs = 16 length = len(text) bytes_length = len(text.encode('utf-8')) padding_size = length if (bytes_length == length) else bytes_length padding = bs - padding_size % bs padding_text = chr(padding) * padding return text + padding_text def getSDGBApiHash(api): return hashlib.md5((api+"MaimaiChn"+ObfuscateParam).encode()).hexdigest() def apiSDGB(data:str, targetApi:str, userAgentExtraData:str, noLog:bool=False, timeout:int=5): """ 舞萌DX 2024 API 通讯用函数 :param data: 请求数据 :param targetApi: 使用的 API :param userAgentExtraData: UA 附加信息,机台相关则为狗号(如A63E01E9564),用户相关则为 UID :param noLog: 是否不记录日志 """ maxRetries = 3 agentExtra = str(userAgentExtraData) aes = AES_PKCS7(AesKey, AesIV) reqData_encrypted = aes.encrypt(data) reqData_deflated = zlib.compress(reqData_encrypted) endpoint = "https://maimai-gm.wahlap.com:42081/Maimai2Servlet/" if not noLog: logger.debug(f"开始请求 {targetApi},以 {data}") retries = 0 while retries < maxRetries: try: response = httpx.post( url=endpoint + getSDGBApiHash(targetApi), headers={ "User-Agent": f"{getSDGBApiHash(targetApi)}#{agentExtra}", "Content-Type": "application/json", "Mai-Encoding": "1.40", "Accept-Encoding": "", "Charset": "UTF-8", "Content-Encoding": "deflate", "Expect": "100-continue" }, content=reqData_deflated, verify=certifi.where(), timeout=timeout ) if not noLog: logger.info(f"{targetApi} 请求结果: {response.status_code}") if response.status_code == 200: logger.debug("200 OK!") else: errorMessage = f"请求失败: {response.status_code}" logger.error(errorMessage) raise SDGBRequestError(errorMessage) responseRAWContent = response.content try: responseDecompressed = zlib.decompress(responseRAWContent) logger.debug("成功解压响应!") except: logger.warning(f"无法解压,得到的原始响应: {responseRAWContent}") raise SDGBResponseError("解压失败") try: resultResponse = unpad(aes.decrypt(responseDecompressed), 16).decode() logger.debug(f"成功解密响应!") except: logger.warning(f"解密失败,得到的原始响应: {responseDecompressed}") raise SDGBResponseError("解密失败") if not noLog: logger.debug(f"响应: {resultResponse}") return resultResponse # 异常处理 except SDGBRequestError as e: # 请求格式错误,不需要重试 raise SDGBRequestError("请求格式错误") except SDGBResponseError as e: # 响应解析错误,重试但是只一次 logger.warning(f"Will now retry. {e}") retries += 2 time.sleep(2) except Exception as e: # 其他错误,重试 logger.warning(f"Will now retry. {e}") retries += 1 time.sleep(2) raise SDGBApiError("Multiple retries failed to make a successful request") def calcSpecialNumber(): """使用 c_int32 实现的 SpecialNumber 算法""" rng = random.SystemRandom() num2 = rng.randint(1, 1037933) * 2069 num2 += 0x400 num2 = c_int32(num2).value result = c_int32(0) for _ in range(32): result.value <<= 1 result.value += num2 & 1 num2 >>= 1 return c_int32(result.value).value def calcSpecialNumber2(): """实验性替代 SpecialNumber 算法""" max = 1037933 num2 = random.randint(1, max) * 2069 num2 += 1024 # specialnum num3 = 0 for i in range(0, 32): num3 <<= 1 num3 += num2 % 2 num2 >>= 1 return num3