From cb599c5f35f743266ef4f9a0ca13c13b4f9fb943 Mon Sep 17 00:00:00 2001 From: mokurin000 <1348292515a@gmail.com> Date: Tue, 29 Jul 2025 17:07:34 +0800 Subject: [PATCH] chore: totally fix lint issues --- API_AimeDB.py | 48 ++++++++++------ API_AuthLiteDelivery.py | 95 +++++++++++++++++--------------- API_TitleServer.py | 115 +++++++++++++++++++++++++-------------- ActionChangeVersion.py | 40 ++++++++------ ActionLoginBonus.py | 105 +++++++++++++++++++---------------- ActionScoreRecord.py | 110 +++++++++++++++++++++++-------------- Best50_To_Diving_Fish.py | 2 +- ForceLogout.py | 50 ++++++++++------- HelperUserAll.py | 2 +- MusicDB.py | 8 +-- Standalone/UI.py | 28 ++++++---- 11 files changed, 358 insertions(+), 245 deletions(-) diff --git a/API_AimeDB.py b/API_AimeDB.py index 4ba7fa1..7f93952 100644 --- a/API_AimeDB.py +++ b/API_AimeDB.py @@ -8,25 +8,36 @@ import requests import json import re from loguru import logger + # 常量 CHIP_ID = "A63E-01E68606624" COMMON_KEY = "XcW5FW4cPArBXEk4vzKz3CIrMuA5EVVW" API_URL = "http://ai.sys-allnet.cn/wc_aime/api/get_data" + # 计算 SHA256 def getSHA256(input_str): """SHA256计算""" - return hashlib.sha256(input_str.encode('utf-8')).hexdigest().upper() + return hashlib.sha256(input_str.encode("utf-8")).hexdigest().upper() + # 生成时间戳 def generateSEGATimestamp(): """SEGA格式的 YYMMDDHHMMSS 时间戳(sb玩意)""" return time.strftime("%y%m%d%H%M%S", time.localtime()) + # 计算认证 key -def calcSEGAAimeDBAuthKey(varString:str, timestamp:str, commonKey:str="XcW5FW4cPArBXEk4vzKz3CIrMuA5EVVW") -> str: +def calcSEGAAimeDBAuthKey( + varString: str, timestamp: str, commonKey: str = "XcW5FW4cPArBXEk4vzKz3CIrMuA5EVVW" +) -> str: """计算 SEGA AimeDB 的认证 key""" - return hashlib.sha256((varString + timestamp + commonKey).encode("utf-8")).hexdigest().upper() + return ( + hashlib.sha256((varString + timestamp + commonKey).encode("utf-8")) + .hexdigest() + .upper() + ) + def apiAimeDB(qrCode): """AimeDB 扫码 API 实现""" @@ -42,11 +53,11 @@ def apiAimeDB(qrCode): "openGameID": "MAID", "key": currentKey, "qrCode": qrCode, - "timestamp": timestamp + "timestamp": timestamp, } # 输出准备好的请求数据 - print("Payload:", json.dumps(payload, separators=(',', ':'))) + print("Payload:", json.dumps(payload, separators=(",", ":"))) # 发送 POST 请求 headers = { @@ -55,7 +66,9 @@ def apiAimeDB(qrCode): "User-Agent": "WC_AIME_LIB", "Content-Type": "application/json", } - response = requests.post(API_URL, data=json.dumps(payload, separators=(',', ':')), headers=headers) + response = requests.post( + API_URL, data=json.dumps(payload, separators=(",", ":")), headers=headers + ) # 返回服务器的响应 return response @@ -64,16 +77,16 @@ def apiAimeDB(qrCode): def isSGWCFormat(input_string: str) -> bool: """简单检查二维码字符串是否符合格式""" if ( - len(input_string) != 84 #长度 - or not input_string.startswith("SGWCMAID") #识别字 - or re.match("^[0-9A-F]+$", input_string[20:]) is None #有效字符 + len(input_string) != 84 # 长度 + or not input_string.startswith("SGWCMAID") # 识别字 + or re.match("^[0-9A-F]+$", input_string[20:]) is None # 有效字符 ): return False else: return True - -def implAimeDB(qrCode:str, isAlreadyFinal:bool=False) -> str: + +def implAimeDB(qrCode: str, isAlreadyFinal: bool = False) -> str: """ Aime DB 的请求的参考实现。 提供完整 QRCode 内容,返回响应的字符串(Json格式) @@ -93,7 +106,7 @@ def implAimeDB(qrCode:str, isAlreadyFinal:bool=False) -> str: return response.text -def implGetUID(qr_content:str) -> dict: +def implGetUID(qr_content: str) -> dict: """ 包装后的 UID 扫码器实现。 此函数会返回 AimeDB 传回的 Json 转成 Python 字典的结果。 @@ -101,18 +114,19 @@ def implGetUID(qr_content:str) -> dict: """ # 检查格式 if not isSGWCFormat(qr_content): - return {'errorID': 60001} # 二维码内容明显无效 - + return {"errorID": 60001} # 二维码内容明显无效 + # 发送请求并处理响应 try: result = json.loads(implAimeDB(qr_content)) logger.info(f"QRScan Got Response {result}") - except: - return {'errorID': 60002} # 无法解码 Response 的内容 - + except Exception: + return {"errorID": 60002} # 无法解码 Response 的内容 + # 返回结果 return result + if __name__ == "__main__": userInputQR = input("QRCode: ") print(implAimeDB(userInputQR)) diff --git a/API_AuthLiteDelivery.py b/API_AuthLiteDelivery.py index 77798ca..f228117 100644 --- a/API_AuthLiteDelivery.py +++ b/API_AuthLiteDelivery.py @@ -7,119 +7,128 @@ from loguru import logger from urllib.parse import parse_qs import configparser as ini -LITE_AUTH_KEY = bytes([47, 63, 106, 111, 43, 34, 76, 38, 92, 67, 114, 57, 40, 61, 107, 71]) -LITE_AUTH_IV = bytes.fromhex('00000000000000000000000000000000') +LITE_AUTH_KEY = bytes( + [47, 63, 106, 111, 43, 34, 76, 38, 92, 67, 114, 57, 40, 61, 107, 71] +) +LITE_AUTH_IV = bytes.fromhex("00000000000000000000000000000000") + def auth_lite_encrypt(plaintext: str) -> bytes: # 构造数据:16字节头 + 16字节0前缀 + 明文 header = bytes(16) - content = bytes(16) + plaintext.encode('utf-8') + content = bytes(16) + plaintext.encode("utf-8") data = header + content # 填充并加密 padded_data = pad(data, AES.block_size) cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV) return cipher.encrypt(padded_data) + def auth_lite_decrypt(ciphertext: bytes) -> str: # 解密并去除填充 cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV) decrypted_data = unpad(cipher.decrypt(ciphertext), AES.block_size) # 提取内容并解码 content = decrypted_data[16:] # 去除头部的16字节 - return content.decode('utf-8').strip() + return content.decode("utf-8").strip() -def getRawDelivery(title_ver:str="1.51"): - encrypted = auth_lite_encrypt(f'title_id=SDGB&title_ver={title_ver}&client_id=A63E01C2805') + +def getRawDelivery(title_ver: str = "1.51"): + encrypted = auth_lite_encrypt( + f"title_id=SDGB&title_ver={title_ver}&client_id=A63E01C2805" + ) r = httpx.post( - 'http://at.sys-allnet.cn/net/delivery/instruction', - data = encrypted, - headers = { - 'User-Agent': "SDGB;Windows/Lite", - 'Pragma': 'DFI' - } + "http://at.sys-allnet.cn/net/delivery/instruction", + data=encrypted, + headers={"User-Agent": "SDGB;Windows/Lite", "Pragma": "DFI"}, ) resp_data = r.content decrypted_str = auth_lite_decrypt(resp_data) # 过滤所有控制字符 - decrypted_str = ''.join([i for i in decrypted_str if 31 < ord(i) < 127]) + decrypted_str = "".join([i for i in decrypted_str if 31 < ord(i) < 127]) logger.info(f"RAW Response: {decrypted_str}") - + return decrypted_str + def parseRawDelivery(deliveryStr): """解析 RAW 的 Delivery 字符串,返回其中的有效的 instruction URL 的列表""" parsedResponseDict = {key: value[0] for key, value in parse_qs(deliveryStr).items()} - urlList = parsedResponseDict['uri'].split('|') + urlList = parsedResponseDict["uri"].split("|") # 过滤掉空字符串和内容为 null 的情况 - urlList = [url for url in urlList if url and url != 'null'] + urlList = [url for url in urlList if url and url != "null"] logger.info(f"Parsed URL List: {urlList}") validURLs = [] for url in urlList: # 检查是否是 HTTPS 的 URL,以及是否是 txt 文件,否则忽略 - if not url.startswith('https://') or not url.endswith('.txt'): + if not url.startswith("https://") or not url.endswith(".txt"): logger.warning(f"Invalid URL will be ignored: {url}") continue validURLs.append(url) logger.info(f"Verified Valid URLs: {validURLs}") return validURLs + def getUpdateIniFromURL(url): # 发送请求 - response = httpx.get(url, headers={ - 'User-Agent': 'SDGB;Windows/Lite', - 'Pragma': 'DFI' - }) + response = httpx.get( + url, headers={"User-Agent": "SDGB;Windows/Lite", "Pragma": "DFI"} + ) logger.info(f"成功自 {url} 获取更新信息") return response.text + def parseUpdateIni(iniText): # 解析配置 config = ini.ConfigParser(allow_no_value=True) config.read_string(iniText) logger.info(f"成功解析配置文件,包含的节有:{config.sections()}") - + # 获取 COMMON 节的配置 - common = config['COMMON'] - + common = config["COMMON"] + # 初始化消息列表 message = [] - + # 获取游戏描述并去除引号 - game_desc = common['GAME_DESC'].strip('"') - + game_desc = common["GAME_DESC"].strip('"') + # 根据前缀选择消息模板和图标 prefix_icons = { - 'PATCH': ('💾 游戏程序更新 (.app) ', 'PATCH_'), - 'OPTION': ('📚 游戏内容更新 (.opt) ', 'OPTION_') + "PATCH": ("💾 游戏程序更新 (.app) ", "PATCH_"), + "OPTION": ("📚 游戏内容更新 (.opt) ", "OPTION_"), } - icon, prefix = prefix_icons.get(game_desc.split('_')[0], ('📦 游戏更新 ', '')) - + icon, prefix = prefix_icons.get(game_desc.split("_")[0], ("📦 游戏更新 ", "")) + # 构建消息标题 - game_title = game_desc.replace(prefix, '', 1) + game_title = game_desc.replace(prefix, "", 1) message.append(f"{icon}{game_title}") - + # 添加可选文件的下载链接(如果有) - if 'OPTIONAL' in config: + if "OPTIONAL" in config: message.append("往期更新包:") - optional_files = [f"{url.split('/')[-1]} {url}" for _, url in config.items('OPTIONAL')] + optional_files = [ + f"{url.split('/')[-1]} {url}" for _, url in config.items("OPTIONAL") + ] message.extend(optional_files) - + # 添加主文件的下载链接 - main_file = common['INSTALL1'] - main_file_name = main_file.split('/')[-1] + main_file = common["INSTALL1"] + main_file_name = main_file.split("/")[-1] message.append(f"此次更新包: \n{main_file_name} {main_file}") # 添加发布时间信息 - release_time = common['RELEASE_TIME'].replace('T', ' ') + release_time = common["RELEASE_TIME"].replace("T", " ") message.append(f"正式发布时间:{release_time}。\n") - + # 构建最终的消息字符串 - final_message = '\n'.join(message) + final_message = "\n".join(message) logger.info(f"消息构建完成,最终的消息为:\n{final_message}") - + return final_message -if __name__ == '__main__': + +if __name__ == "__main__": urlList = parseRawDelivery(getRawDelivery("1.51")) for url in urlList: iniText = getUpdateIniFromURL(url) diff --git a/API_TitleServer.py b/API_TitleServer.py index 3d97c31..5d9aa93 100644 --- a/API_TitleServer.py +++ b/API_TitleServer.py @@ -1,18 +1,20 @@ # 舞萌DX # 标题服务器通讯实现 - import zlib import hashlib -import httpx -from loguru import logger import random import time + +import httpx +from loguru import logger from ctypes import c_int32 from Crypto.Cipher import AES from Crypto.Util.Padding import pad, unpad -from Config import * -from typing import Optional -import certifi + +from Config import ( + useProxy, + proxyUrl, +) use2024Api = False # 是否使用 2024 API @@ -25,19 +27,23 @@ else: AesIV = "d6xHIKq]1J]Dt^ue" ObfuscateParam = "B44df8yT" + class SDGBApiError(Exception): pass - + + class SDGBRequestError(SDGBApiError): pass + class SDGBResponseError(SDGBApiError): pass + class aes_pkcs7(object): def __init__(self, key: str, iv: str): - self.key = key.encode('utf-8') - self.iv = iv.encode('utf-8') + self.key = key.encode("utf-8") + self.iv = iv.encode("utf-8") self.mode = AES.MODE_CBC def encrypt(self, content: bytes) -> bytes: @@ -55,21 +61,23 @@ class aes_pkcs7(object): def pkcs7unpadding(self, text): length = len(text) unpadding = ord(text[length - 1]) - return text[0:length - unpadding] + return text[0 : length - unpadding] def pkcs7padding(self, text): bs = 16 length = len(text) - bytes_length = len(text.encode('utf-8')) + bytes_length = len(text.encode("utf-8")) padding_size = length if (bytes_length == length) else bytes_length padding = bs - padding_size % bs padding_text = chr(padding) * padding return text + padding_text + def getSDGBApiHash(api): # API 的 Hash 的生成 # 有空做一下 Hash 的彩虹表? - return hashlib.md5((api+"MaimaiChn"+ObfuscateParam).encode()).hexdigest() + return hashlib.md5((api + "MaimaiChn" + ObfuscateParam).encode()).hexdigest() + def apiSDGB( data: str, @@ -94,7 +102,7 @@ def apiSDGB( endpoint = "https://maimai-gm.wahlap.com:42081/Maimai2Servlet/" # 准备好请求数据 - requestDataFinal = aes.encrypt(zlib.compress(data.encode('utf-8'))) + requestDataFinal = aes.encrypt(zlib.compress(data.encode("utf-8"))) if not noLog: logger.debug(f"[Stage 1] 准备开始请求 {targetApi},以 {data}") @@ -108,7 +116,7 @@ def apiSDGB( httpClient = httpx.Client(proxy=proxyUrl, verify=False) else: httpClient = httpx.Client(verify=False) - + # 发送请求 response = httpClient.post( url=endpoint + getSDGBApiHash(targetApi), @@ -119,10 +127,10 @@ def apiSDGB( "Accept-Encoding": "", "Charset": "UTF-8", "Content-Encoding": "deflate", - "Expect": "100-continue" + "Expect": "100-continue", }, - content=requestDataFinal, #数据 - timeout=timeout + content=requestDataFinal, # 数据 + timeout=timeout, ) if not noLog: @@ -142,30 +150,42 @@ def apiSDGB( if not noLog: logger.debug("[Stage 3] Decryption SUCCESS.") except Exception as e: - logger.warning(f"[Stage 3] Decryption FAILED. Raw Content: {responseContentRaw}, Error: {e}") + logger.warning( + f"[Stage 3] Decryption FAILED. Raw Content: {responseContentRaw}, Error: {e}" + ) raise SDGBResponseError("Decryption failed") # 然后尝试解压 try: # 看看文件头是否是压缩过的 - if responseContentDecrypted.startswith(b'\x78\x9c'): + if responseContentDecrypted.startswith(b"\x78\x9c"): logger.debug("[Stage 4] Zlib detected, decompressing...") - responseContentFinal = zlib.decompress(responseContentDecrypted).decode('utf-8') + responseContentFinal = zlib.decompress( + responseContentDecrypted + ).decode("utf-8") else: - logger.warning(f"[Stage 4] Not Zlib Format!! using raw content: {responseContentDecrypted}") - responseContentFinal = responseContentDecrypted.decode('utf-8') + logger.warning( + f"[Stage 4] Not Zlib Format!! using raw content: {responseContentDecrypted}" + ) + responseContentFinal = responseContentDecrypted.decode("utf-8") # 完成解压 if not noLog: - logger.debug(f"[Stage 4] Process OK, Content: {responseContentFinal}") + logger.debug( + f"[Stage 4] Process OK, Content: {responseContentFinal}" + ) # 最终处理,检查是否是 JSON 格式 - if responseContentFinal.startswith('{') and responseContentFinal.endswith('}'): + if responseContentFinal.startswith( + "{" + ) and responseContentFinal.endswith("}"): # 如果是 JSON 格式,直接返回 logger.debug("[Stage 5] Response is JSON, returning.") return responseContentFinal else: # 如果不是 JSON 格式,直接返回但是警告 - logger.warning("[Stage 5] Response is not JSON, returning as is, take care!") + logger.warning( + "[Stage 5] Response is not JSON, returning as is, take care!" + ) return responseContentFinal - except: + except Exception: logger.warning(f"解压失败,原始响应: {responseContentDecrypted}") raise SDGBResponseError("解压失败") except SDGBRequestError as e: @@ -181,16 +201,17 @@ def apiSDGB( time.sleep(2) finally: - if 'httpClient' in locals(): + if "httpClient" in locals(): httpClient.close() raise SDGBApiError("重试多次仍然无法成功请求服务器") + def calcPlaySpecial(): """使用 c_int32 实现的 SpecialNumber 算法""" rng = random.SystemRandom() num2 = rng.randint(1, 1037933) * 2069 - num2 += 1024 #GameManager.CalcSpecialNum() + num2 += 1024 # GameManager.CalcSpecialNum() num2 = c_int32(num2).value result = c_int32(0) for _ in range(32): @@ -199,21 +220,24 @@ def calcPlaySpecial(): num2 >>= 1 return c_int32(result.value).value + class AESPKCS7_2024: # 实现了 maimai 通讯所用的 AES 加密的类 def __init__(self, key: str, iv: str): - self.key = key.encode('utf-8') - self.iv = iv.encode('utf-8') + self.key = key.encode("utf-8") + self.iv = iv.encode("utf-8") self.mode = AES.MODE_CBC + # 加密 def encrypt(self, content) -> bytes: # if content is str, convert to bytes if isinstance(content, str): - encodedData = content.encode('utf-8') + encodedData = content.encode("utf-8") cipher = AES.new(self.key, self.mode, self.iv) content_padded = pad(encodedData, AES.block_size) encrypted_bytes = cipher.encrypt(content_padded) return encrypted_bytes + # 解密 def decrypt(self, encrypted_content: bytes) -> str: cipher = AES.new(self.key, self.mode, self.iv) @@ -221,7 +245,14 @@ class AESPKCS7_2024: decrypted = unpad(decrypted_padded, AES.block_size) return decrypted -def apiSDGB_2024(data:str, targetApi:str, userAgentExtraData:str, noLog:bool=False, timeout:int=5): + +def apiSDGB_2024( + data: str, + targetApi: str, + userAgentExtraData: str, + noLog: bool = False, + timeout: int = 5, +): """ 舞萌DX 2024 API 通讯用函数 :param data: 请求数据 @@ -258,13 +289,13 @@ def apiSDGB_2024(data:str, targetApi:str, userAgentExtraData:str, noLog:bool=Fal "Accept-Encoding": "", "Charset": "UTF-8", "Content-Encoding": "deflate", - "Expect": "100-continue" + "Expect": "100-continue", }, content=reqData_deflated, # 经测试,加 Verify 之后速度慢好多,因此建议选择性开 - #verify=certifi.where(), - #verify=False, - timeout=timeout + # verify=certifi.where(), + # verify=False, + timeout=timeout, ) if not noLog: @@ -282,22 +313,22 @@ def apiSDGB_2024(data:str, targetApi:str, userAgentExtraData:str, noLog:bool=Fal try: responseDecompressed = zlib.decompress(responseRAWContent) logger.debug("成功解压响应!") - except: + except Exception: logger.warning(f"无法解压,得到的原始响应: {responseRAWContent}") raise SDGBResponseError("解压失败") try: resultResponse = aes.decrypt(responseDecompressed) - logger.debug(f"成功解密响应!") - except: + logger.debug("成功解密响应!") + except Exception: logger.warning(f"解密失败,得到的原始响应: {responseDecompressed}") raise SDGBResponseError("解密失败") - + if not noLog: logger.debug(f"响应: {resultResponse}") return resultResponse - + # 异常处理 - except SDGBRequestError as e: + except SDGBRequestError: # 请求格式错误,不需要重试 raise SDGBRequestError("请求格式错误") except SDGBResponseError as e: diff --git a/ActionChangeVersion.py b/ActionChangeVersion.py index 5ee4a1c..764181a 100644 --- a/ActionChangeVersion.py +++ b/ActionChangeVersion.py @@ -7,40 +7,44 @@ from HelperFullPlay import implFullPlayAction, generateMusicData from HelperGetUserThing import implGetUser_ from MyConfig import testUid8 -def implWipeTickets(userId: int, currentLoginTimestamp:int, currentLoginResult) -> str: + +def implWipeTickets(userId: int, currentLoginTimestamp: int, currentLoginResult) -> str: # Get User Charge currentUserCharge = implGetUser_("Charge", userId) - currentUserChargeList = currentUserCharge['userChargeList'] + currentUserChargeList = currentUserCharge["userChargeList"] # All stock set to 0 for charge in currentUserChargeList: - charge['stock'] = 0 - -# example format -# {"userId":11088995,"length":16,"userChargeList":[{"chargeId":1,"stock":0,"purchaseDate":"2025-02-04 00:51:50","validDate":"2025-02-04 00:51:50","extNum1":0},{"chargeId":2,"stock":0,"purchaseDate":"2025-06-11 17:19:42","validDate":"2025-09-09 04:00:00","extNum1":0},{"chargeId":3,"stock":0,"purchaseDate":"2025-06-11 17:19:40","validDate":"2025-09-09 04:00:00","extNum1":0},{"chargeId":4,"stock":0,"purchaseDate":"2025-06-11 09:34:51","validDate":"2025-09-09 04:00:00","extNum1":0},{"chargeId":5,"stock":0,"purchaseDate":"2025-01-30 12:31:16","validDate":"2025-04-30 04:00:00","extNum1":0},{"chargeId":6,"stock":0,"purchaseDate":"2025-02-17 20:01:42","validDate":"2025-02-17 20:01:42","extNum1":0},{"chargeId":7,"stock":0,"purchaseDate":"2025-02-06 16:17:41","validDate":"2025-02-06 16:17:41","extNum1":0},{"chargeId":8,"stock":0,"purchaseDate":"2025-02-06 16:17:49","validDate":"2025-02-06 16:17:49","extNum1":0},{"chargeId":9,"stock":0,"purchaseDate":"2025-02-06 16:18:00","validDate":"2025-02-06 16:18:00","extNum1":0},{"chargeId":10001,"stock":1,"purchaseDate":"2025-06-11 17:19:51","validDate":"2025-09-09 04:00:00","extNum1":0},{"chargeId":10005,"stock":0,"purchaseDate":"2025-04-25 15:45:55","validDate":"2025-07-24 04:00:00","extNum1":0},{"chargeId":10105,"stock":0,"purchaseDate":"2025-04-25 15:46:00","validDate":"2025-07-24 04:00:00","extNum1":0},{"chargeId":10205,"stock":0,"purchaseDate":"2025-04-25 15:46:03","validDate":"2025-07-24 04:00:00","extNum1":0},{"chargeId":11001,"stock":0,"purchaseDate":"2025-01-08 20:43:05","validDate":"2025-04-08 04:00:00","extNum1":0},{"chargeId":30001,"stock":0,"purchaseDate":"2025-04-25 15:46:17","validDate":"2025-07-24 04:00:00","extNum1":0},{"chargeId":999999,"stock":0,"purchaseDate":"2025-02-06 23:03:14","validDate":"2025-02-06 23:03:14","extNum1":0}]} + charge["stock"] = 0 + # example format + # {"userId":11088995,"length":16,"userChargeList":[{"chargeId":1,"stock":0,"purchaseDate":"2025-02-04 00:51:50","validDate":"2025-02-04 00:51:50","extNum1":0},{"chargeId":2,"stock":0,"purchaseDate":"2025-06-11 17:19:42","validDate":"2025-09-09 04:00:00","extNum1":0},{"chargeId":3,"stock":0,"purchaseDate":"2025-06-11 17:19:40","validDate":"2025-09-09 04:00:00","extNum1":0},{"chargeId":4,"stock":0,"purchaseDate":"2025-06-11 09:34:51","validDate":"2025-09-09 04:00:00","extNum1":0},{"chargeId":5,"stock":0,"purchaseDate":"2025-01-30 12:31:16","validDate":"2025-04-30 04:00:00","extNum1":0},{"chargeId":6,"stock":0,"purchaseDate":"2025-02-17 20:01:42","validDate":"2025-02-17 20:01:42","extNum1":0},{"chargeId":7,"stock":0,"purchaseDate":"2025-02-06 16:17:41","validDate":"2025-02-06 16:17:41","extNum1":0},{"chargeId":8,"stock":0,"purchaseDate":"2025-02-06 16:17:49","validDate":"2025-02-06 16:17:49","extNum1":0},{"chargeId":9,"stock":0,"purchaseDate":"2025-02-06 16:18:00","validDate":"2025-02-06 16:18:00","extNum1":0},{"chargeId":10001,"stock":1,"purchaseDate":"2025-06-11 17:19:51","validDate":"2025-09-09 04:00:00","extNum1":0},{"chargeId":10005,"stock":0,"purchaseDate":"2025-04-25 15:45:55","validDate":"2025-07-24 04:00:00","extNum1":0},{"chargeId":10105,"stock":0,"purchaseDate":"2025-04-25 15:46:00","validDate":"2025-07-24 04:00:00","extNum1":0},{"chargeId":10205,"stock":0,"purchaseDate":"2025-04-25 15:46:03","validDate":"2025-07-24 04:00:00","extNum1":0},{"chargeId":11001,"stock":0,"purchaseDate":"2025-01-08 20:43:05","validDate":"2025-04-08 04:00:00","extNum1":0},{"chargeId":30001,"stock":0,"purchaseDate":"2025-04-25 15:46:17","validDate":"2025-07-24 04:00:00","extNum1":0},{"chargeId":999999,"stock":0,"purchaseDate":"2025-02-06 23:03:14","validDate":"2025-02-06 23:03:14","extNum1":0}]} musicData = generateMusicData() userAllPatches = { - "upsertUserAll": { -# "userData": [{ -# "lastRomVersion": romVersion, -# "lastDataVersion": dataVersion -# }], - "userChargeList": currentUserChargeList, - "userMusicDetailList": [musicData], - "isNewMusicDetailList": "1" #1避免覆盖 - }} + "upsertUserAll": { + # "userData": [{ + # "lastRomVersion": romVersion, + # "lastDataVersion": dataVersion + # }], + "userChargeList": currentUserChargeList, + "userMusicDetailList": [musicData], + "isNewMusicDetailList": "1", # 1避免覆盖 + } + } - result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches) + result = implFullPlayAction( + userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches + ) return result + if __name__ == "__main__": userId = testUid8 currentLoginTimestamp = generateTimestamp() loginResult = apiLogin(currentLoginTimestamp, userId) - if loginResult['returnCode'] != 1: + if loginResult["returnCode"] != 1: logger.info("登录失败") exit() try: @@ -48,4 +52,4 @@ if __name__ == "__main__": logger.info(apiLogout(currentLoginTimestamp, userId)) finally: logger.info(apiLogout(currentLoginTimestamp, userId)) - #logger.warning("Error") + # logger.warning("Error") diff --git a/ActionLoginBonus.py b/ActionLoginBonus.py index ffe7d3a..f8bdc0b 100644 --- a/ActionLoginBonus.py +++ b/ActionLoginBonus.py @@ -5,53 +5,59 @@ import rapidjson as json from loguru import logger import xml.etree.ElementTree as ET -from Config import * +from Config import ( + loginBonusDBPath, + loginBonusDBPathFallback, +) +from MyConfig import testUid from API_TitleServer import apiSDGB from HelperLogInOut import apiLogin, apiLogout, generateTimestamp from HelperFullPlay import implFullPlayAction, generateMusicData + + class NoSelectedBonusError(Exception): pass -def apiQueryLoginBonus(userId:int) -> str: + +def apiQueryLoginBonus(userId: int) -> str: """ログインボーナスを取得する API""" - data = json.dumps({ - "userId": int(userId), - "nextIndex": 0, - "maxCount": 2000 - }) + data = json.dumps({"userId": int(userId), "nextIndex": 0, "maxCount": 2000}) return apiSDGB(data, "GetUserLoginBonusApi", userId) -def implLoginBonus(userId: int, currentLoginTimestamp:int, currentLoginResult, bonusGenerateMode=1): + +def implLoginBonus( + userId: int, currentLoginTimestamp: int, currentLoginResult, bonusGenerateMode=1 +): """ ログインボーナスデータをアップロードする bonusGenerateMode は、ログインボーナスを生成する方法を指定します。 1: 選択したボーナスのみ MAX にする(選択したボーナスはないの場合は False を返す) 2: 全部 MAX にする """ - musicData = generateMusicData() + musicData = generateMusicData() # サーバーからログインボーナスデータを取得 - data = json.dumps({ - "userId": int(userId), - "nextIndex": 0, - "maxCount": 2000 - }) + data = json.dumps({"userId": int(userId), "nextIndex": 0, "maxCount": 2000}) UserLoginBonusResponse = json.loads(apiSDGB(data, "GetUserLoginBonusApi", userId)) # ログインボーナスリストを生成、それから処理してアップロード - UserLoginBonusList = UserLoginBonusResponse['userLoginBonusList'] + UserLoginBonusList = UserLoginBonusResponse["userLoginBonusList"] finalBonusList = generateLoginBonusList(UserLoginBonusList, bonusGenerateMode) if not finalBonusList: - return False #ログインボーナスを選択していないから失敗 + return False # ログインボーナスを選択していないから失敗 # UserAllのパッチ userAllPatches = { - "upsertUserAll": { - "userMusicDetailList": [musicData], - "isNewMusicDetailList": "1", #上書きしない - "userLoginBonusList": finalBonusList, - "isNewLoginBonusList": "0" * len(finalBonusList) - }} - result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches) + "upsertUserAll": { + "userMusicDetailList": [musicData], + "isNewMusicDetailList": "1", # 上書きしない + "userLoginBonusList": finalBonusList, + "isNewLoginBonusList": "0" * len(finalBonusList), + } + } + result = implFullPlayAction( + userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches + ) return result + def generateLoginBonusList(UserLoginBonusList, generateMode=1): """ ログインボーナスリストを生成します。 @@ -65,60 +71,64 @@ def generateLoginBonusList(UserLoginBonusList, generateMode=1): try: tree = ET.parse(loginBonusDBPath) root = tree.getroot() - loginBonusIdList = [int(item.find('id').text) for item in root.findall('.//StringID')] + loginBonusIdList = [ + int(item.find("id").text) for item in root.findall(".//StringID") + ] logger.debug(f"ログインボーナスIDリスト: {loginBonusIdList}") except FileNotFoundError: try: tree = ET.parse(loginBonusDBPathFallback) root = tree.getroot() - loginBonusIdList = [int(item.find('id').text) for item in root.findall('.//StringID')] + loginBonusIdList = [ + int(item.find("id").text) for item in root.findall(".//StringID") + ] logger.debug(f"ログインボーナスIDリスト: {loginBonusIdList}") - except: + except Exception: raise FileNotFoundError("ログインボーナスデータベースを読み込めません") # ログインボーナスの MAX POINT は5の場合があります # その全部のボーナスIDをこのリストに追加してください # 必ず最新のデータを使用してください Bonus5Id = [12, 29, 30, 38, 43, 604, 611] - + # UserBonusList から bonusId を取得 - UserLoginBonusIdList = [item['bonusId'] for item in UserLoginBonusList] - + UserLoginBonusIdList = [item["bonusId"] for item in UserLoginBonusList] + # 存在しないボーナス NonExistingBonuses = list(set(loginBonusIdList) - set(UserLoginBonusIdList)) logger.debug(f"存在しないボーナス: {NonExistingBonuses}") bonusList = [] - if generateMode == 1: #選択したボーナスのみ MAX にする + if generateMode == 1: # 選択したボーナスのみ MAX にする for item in UserLoginBonusList: - if item['isCurrent'] and not item['isComplete']: - point = 4 if item['bonusId'] in Bonus5Id else 9 + if item["isCurrent"] and not item["isComplete"]: + point = 4 if item["bonusId"] in Bonus5Id else 9 data = { - "bonusId": item['bonusId'], + "bonusId": item["bonusId"], "point": point, "isCurrent": True, - "isComplete": False + "isComplete": False, } bonusList.append(data) if len(bonusList) == 0: raise NoSelectedBonusError("選択したログインボーナスがありません") - elif generateMode == 2: #全部 MAX にする + elif generateMode == 2: # 全部 MAX にする # 存在しているボーナスを追加 for item in UserLoginBonusList: - if not item['isComplete']: + if not item["isComplete"]: data = { - "bonusId": item['bonusId'], - "point": 4 if item['bonusId'] in Bonus5Id else 9, - "isCurrent": item['isCurrent'], - "isComplete": False + "bonusId": item["bonusId"], + "point": 4 if item["bonusId"] in Bonus5Id else 9, + "isCurrent": item["isCurrent"], + "isComplete": False, } bonusList.append(data) - elif item['bonusId'] == 999: + elif item["bonusId"] == 999: data = { "bonusId": 999, - "point": (item['point'] // 10) * 10 + 9, - "isCurrent": item['isCurrent'], - "isComplete": False + "point": (item["point"] // 10) * 10 + 9, + "isCurrent": item["isCurrent"], + "isComplete": False, } bonusList.append(data) # 存在しないボーナスを追加 @@ -127,19 +137,20 @@ def generateLoginBonusList(UserLoginBonusList, generateMode=1): "bonusId": bonusId, "point": 4 if bonusId in Bonus5Id else 9, "isCurrent": False, - "isComplete": False + "isComplete": False, } bonusList.append(data) else: raise SyntaxError("generateMode は 1 または 2 でなければなりません") - + logger.debug(f"ログインボーナスリスト: {bonusList}") return bonusList + if __name__ == "__main__": # ログインボーナスデータをアップロードする userId = testUid currentLoginTimestamp = generateTimestamp() currentLoginResult = apiLogin(currentLoginTimestamp, userId) implLoginBonus(userId, currentLoginTimestamp, currentLoginResult, 2) - apiLogout(currentLoginTimestamp, userId) \ No newline at end of file + apiLogout(currentLoginTimestamp, userId) diff --git a/ActionScoreRecord.py b/ActionScoreRecord.py index bcafaaa..345804d 100644 --- a/ActionScoreRecord.py +++ b/ActionScoreRecord.py @@ -1,71 +1,99 @@ # 删除和上传成绩 from loguru import logger -from Config import * + from HelperLogInOut import apiLogin, apiLogout, generateTimestamp from HelperFullPlay import implFullPlayAction +from MyConfig import testUid -def implDeleteMusicRecord(userId: int, currentLoginTimestamp:int, currentLoginResult, musicId:int, levelId:int) -> str: - musicData= ({ - "musicId": musicId, - "level": levelId, - "playCount": 1, - "achievement": 0, - "comboStatus": 0, - "syncStatus": 0, - "deluxscoreMax": 0, - "scoreRank": 0, - "extNum1": 0 -}) + +def implDeleteMusicRecord( + userId: int, + currentLoginTimestamp: int, + currentLoginResult, + musicId: int, + levelId: int, +) -> str: + musicData = { + "musicId": musicId, + "level": levelId, + "playCount": 1, + "achievement": 0, + "comboStatus": 0, + "syncStatus": 0, + "deluxscoreMax": 0, + "scoreRank": 0, + "extNum1": 0, + } userAllPatches = { - "upsertUserAll": { - "userMusicDetailList": [musicData], - "isNewMusicDetailList": "0" # 0为编辑,即可删除掉成绩 - }} - result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches) + "upsertUserAll": { + "userMusicDetailList": [musicData], + "isNewMusicDetailList": "0", # 0为编辑,即可删除掉成绩 + } + } + result = implFullPlayAction( + userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches + ) return result -def implUploadMusicRecord(userId: int, currentLoginTimestamp:int, currentLoginResult, musicId:int, levelId:int, achievement:int, dxScore:int) -> str: + +def implUploadMusicRecord( + userId: int, + currentLoginTimestamp: int, + currentLoginResult, + musicId: int, + levelId: int, + achievement: int, + dxScore: int, +) -> str: """ VERY EARLY STAGE OF UPLOADING SCORES!!!! DO NOT USE THIS!!!! 上传成绩的参考实现。 """ # 要上传的数据 - musicData= ({ - "musicId": musicId, - "level": levelId, - "playCount": 1, - "achievement": achievement, - "comboStatus": 0, - "syncStatus": 0, - "deluxscoreMax": dxScore, - "scoreRank": 0, - "extNum1": 0 -}) + musicData = { + "musicId": musicId, + "level": levelId, + "playCount": 1, + "achievement": achievement, + "comboStatus": 0, + "syncStatus": 0, + "deluxscoreMax": dxScore, + "scoreRank": 0, + "extNum1": 0, + } userAllPatches = { - "upsertUserAll": { - "userMusicDetailList": [musicData], - "isNewMusicDetailList": "1" # 0编辑 1插入 - }} - result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches) + "upsertUserAll": { + "userMusicDetailList": [musicData], + "isNewMusicDetailList": "1", # 0编辑 1插入 + } + } + result = implFullPlayAction( + userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches + ) return result + if __name__ == "__main__": userId = testUid currentLoginTimestamp = generateTimestamp() loginResult = apiLogin(currentLoginTimestamp, userId) - musicId = 852 #852 is tiamat - levelId = 3 #3 is MASTER + musicId = 852 # 852 is tiamat + levelId = 3 # 3 is MASTER - if loginResult['returnCode'] != 1: + if loginResult["returnCode"] != 1: logger.info("登录失败") exit() try: - logger.info(implDeleteMusicRecord(userId, currentLoginTimestamp, loginResult, musicId, levelId)) - #logger.info(implUploadMusicRecord(userId, currentLoginTimestamp, loginResult, musicId, levelId, 1000000, 100)) + logger.info( + implDeleteMusicRecord( + userId, currentLoginTimestamp, loginResult, musicId, levelId + ) + ) + # logger.info(implUploadMusicRecord(userId, currentLoginTimestamp, loginResult, musicId, levelId, 1000000, 100)) logger.info(apiLogout(currentLoginTimestamp, userId)) finally: logger.info(apiLogout(currentLoginTimestamp, userId)) - #logger.warning("Error") + # logger.warning("Error") diff --git a/Best50_To_Diving_Fish.py b/Best50_To_Diving_Fish.py index a35652b..938060a 100644 --- a/Best50_To_Diving_Fish.py +++ b/Best50_To_Diving_Fish.py @@ -113,7 +113,7 @@ def maimaiUserMusicDetailToDivingFishFormat(userMusicDetailList) -> list: "dxScore": currentMusicDetail["deluxscoreMax"], } ) - except: + except Exception: logger.error(f"无法将 UserMusic 翻译成水鱼格式: {currentMusicDetail}") return divingFishList diff --git a/ForceLogout.py b/ForceLogout.py index 73f5415..1870460 100644 --- a/ForceLogout.py +++ b/ForceLogout.py @@ -1,36 +1,41 @@ # 解小黑屋实现 # 仍十分不完善,不建议使用 - -from Config import * -from API_TitleServer import * -from GetPreview import apiGetUserPreview -from HelperLogInOut import apiLogout -import rapidjson as json -from loguru import logger import time from datetime import datetime -import asyncio + +import rapidjson as json + +from GetPreview import apiGetUserPreview +from HelperLogInOut import apiLogout +from loguru import logger + +from MyConfig import testUid2 + def isUserLoggedIn(userId): - isLogin = json.loads(apiGetUserPreview(userId, True))['isLogin'] + isLogin = json.loads(apiGetUserPreview(userId, True))["isLogin"] logger.debug(f"用户 {userId} 是否登录: {isLogin}") return isLogin + def getHumanReadableTime(unixTime): - '''将 Unix 时间戳转换为人类可读的时间''' + """将 Unix 时间戳转换为人类可读的时间""" # 减一个小时,因为舞萌貌似是 UTC+9 timestamp = int(unixTime) - 3600 - return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S') + return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S") + def getMaimaiUNIXTime(mmddhhmmss, year=2025): """ 解析用户输入的 MMDDHHMMSS 格式的时间,返回 Unix 时间戳 时间会被推后一个小时,因为舞萌貌似是 UTC+9? """ - #date_time_str = f"{year}{mmddhhmmss}" + # date_time_str = f"{year}{mmddhhmmss}" # 加上一个小时 - date_time_str = f"{year}{mmddhhmmss[:4]}{int(mmddhhmmss[4:6])+1:02}{mmddhhmmss[6:]}" - date_time_obj = datetime.strptime(date_time_str, '%Y%m%d%H%M%S') + date_time_str = ( + f"{year}{mmddhhmmss[:4]}{int(mmddhhmmss[4:6]) + 1:02}{mmddhhmmss[6:]}" + ) + date_time_obj = datetime.strptime(date_time_str, "%Y%m%d%H%M%S") # 将 datetime 对象转换为 Unix 时间戳 unix_timestamp = int(time.mktime(date_time_obj.timetuple())) logger.info(f"转换出了时间戳: {unix_timestamp}") @@ -41,19 +46,20 @@ def logOut(userId, Timestamp): """极其简单的登出实现,成功返回 True,失败返回 False 注意:不会检查用户是否真的登出了,只会尝试登出""" try: - if apiLogout(Timestamp, userId, True)['returnCode'] == 1: + if apiLogout(Timestamp, userId, True)["returnCode"] == 1: # 成功送出了登出请求 logger.debug(f"已成功尝试登出用户 {userId}") return True - except: + except Exception: logger.error(f"登出用户 {userId} 的时候发生了错误") return False + def isCorrectTimestamp(timestamp, userId): - ''' + """ 动作:给定一个时间戳,用它尝试登出用户,然后检查用户是否成功登出。 如果用户成功登出,返回 True;否则返回 False。 - ''' + """ if not logOut(userId, timestamp): logger.error(f"用时间戳 {timestamp} 登出用户 {userId} 的时候发生了错误") return False @@ -61,6 +67,7 @@ def isCorrectTimestamp(timestamp, userId): logger.debug(f"时间戳 {timestamp} 是否正确: {isLoggedOut}") return isLoggedOut + def findCorrectTimestamp(timestamp, userId, max_attempts=600): # 初始化偏移量 offset = 1 @@ -72,19 +79,19 @@ def findCorrectTimestamp(timestamp, userId, max_attempts=600): if isCorrectTimestamp(currentTryTimestamp, userId): logger.info(f"正确的时间戳: {currentTryTimestamp}") return currentTryTimestamp - + # 增加偏移量尝试 currentTryTimestamp = timestamp + offset if isCorrectTimestamp(currentTryTimestamp, userId): logger.info(f"正确的时间戳(在给定时间以后): {currentTryTimestamp}") return currentTryTimestamp - + # 减少偏移量尝试 currentTryTimestamp = timestamp - offset if isCorrectTimestamp(currentTryTimestamp, userId): logger.info(f"正确的时间戳(在给定时间以前): {currentTryTimestamp}") return currentTryTimestamp - + # 增加尝试次数和偏移量 attempts += 2 offset += 1 @@ -92,6 +99,7 @@ def findCorrectTimestamp(timestamp, userId, max_attempts=600): logger.error(f"无法找到正确的时间戳,尝试次数超过了 {max_attempts}") return None + if __name__ == "__main__": human_time = "0207155500" beginTimestamp = getMaimaiUNIXTime(human_time) diff --git a/HelperUserAll.py b/HelperUserAll.py index c597344..bfce643 100644 --- a/HelperUserAll.py +++ b/HelperUserAll.py @@ -35,7 +35,7 @@ def isNewMusicType(userId, musicId, level) -> str: ): logger.info(f"We think {musicId} Level {level} should use EDIT.") return "0" - except: + except Exception: return "1" diff --git a/MusicDB.py b/MusicDB.py index b2e3047..24aec0e 100644 --- a/MusicDB.py +++ b/MusicDB.py @@ -6,17 +6,17 @@ from typing import Dict, Union MusicDBType = Dict[int, Dict[str, Union[int, str]]] # 将 '__all__' 用于模块导出声明 -__all__ = ['musicDB'] +__all__ = ["musicDB"] # 读取并解析 JSON 文件 try: - with open(musicDBPath, 'r', encoding='utf-8') as f: + with open(musicDBPath, "r", encoding="utf-8") as f: data = json.load(f) except FileNotFoundError: try: - with open(musicDBPathFallback, 'r', encoding='utf-8') as f: + with open(musicDBPathFallback, "r", encoding="utf-8") as f: data = json.load(f) - except: + except Exception: raise FileNotFoundError("musicDB.json 文件不存在!") # 将 JSON 数据转换为指定格式的字典 diff --git a/Standalone/UI.py b/Standalone/UI.py index c2c6bda..79fdd59 100644 --- a/Standalone/UI.py +++ b/Standalone/UI.py @@ -2,30 +2,40 @@ import sys import rapidjson as json from PyQt6.QtWidgets import ( - QApplication, QMainWindow, QWidget, QVBoxLayout, QLineEdit, QTextEdit, QPushButton, QLabel, QHBoxLayout + QApplication, + QMainWindow, + QWidget, + QVBoxLayout, + QLineEdit, + QTextEdit, + QPushButton, + QLabel, + QHBoxLayout, ) from PyQt6.QtCore import Qt # 将当前目录的父目录加入到 sys.path 中 from pathlib import Path + current_dir = Path(__file__).resolve().parent parent_dir = current_dir.parent sys.path.append(str(parent_dir)) from API_TitleServer import * -def sendRequest(requestText:str, apiNameText:str, uid:int) -> str: + +def sendRequest(requestText: str, apiNameText: str, uid: int) -> str: try: data = json.loads(requestText) data = json.dumps(data) - except: + except Exception: return "给出的输入不是有效的 JSON" try: result = apiSDGB(data, apiNameText, uid) except Exception as e: return "请求失败:" + str(e) return result - + class ApiTester(QMainWindow): def __init__(self): @@ -70,21 +80,18 @@ class ApiTester(QMainWindow): self.ResponseTextBox.setReadOnly(True) MainLayout.addWidget(self.ResponseTextBox) - - # 布局设定 MainLayout.setContentsMargins(5, 5, 5, 5) MainLayout.setSpacing(5) MainLayout.setAlignment(Qt.AlignmentFlag.AlignTop) - def prepareRequest(self): # 发送请求用 try: RequestDataString = self.RequestInputBox.toPlainText() TargetAPIString = self.TargetAPIInputBox.text() AgentExtraString = int(self.AgentExtraInputBox.text()) - except: + except Exception: self.ResponseTextBox.setPlainText("输入无效") return @@ -93,12 +100,13 @@ class ApiTester(QMainWindow): # 显示出输出 self.ResponseTextBox.setPlainText(Result) + if __name__ == "__main__": app = QApplication(sys.argv) # Set proper style for each OS - #if sys.platform == "win32": + # if sys.platform == "win32": # app.setStyle("windowsvista") - #else: + # else: # app.setStyle("Fusion") window = ApiTester() window.show()