chore: totally fix lint issues

This commit is contained in:
mokurin000
2025-07-29 17:07:34 +08:00
parent ae6a4d1cf4
commit cb599c5f35
11 changed files with 358 additions and 245 deletions

View File

@@ -8,25 +8,36 @@ import requests
import json
import re
from loguru import logger
# 常量
CHIP_ID = "A63E-01E68606624"
COMMON_KEY = "XcW5FW4cPArBXEk4vzKz3CIrMuA5EVVW"
API_URL = "http://ai.sys-allnet.cn/wc_aime/api/get_data"
# 计算 SHA256
def getSHA256(input_str):
"""SHA256计算"""
return hashlib.sha256(input_str.encode('utf-8')).hexdigest().upper()
return hashlib.sha256(input_str.encode("utf-8")).hexdigest().upper()
# 生成时间戳
def generateSEGATimestamp():
"""SEGA格式的 YYMMDDHHMMSS 时间戳sb玩意"""
return time.strftime("%y%m%d%H%M%S", time.localtime())
# 计算认证 key
def calcSEGAAimeDBAuthKey(varString:str, timestamp:str, commonKey:str="XcW5FW4cPArBXEk4vzKz3CIrMuA5EVVW") -> str:
def calcSEGAAimeDBAuthKey(
varString: str, timestamp: str, commonKey: str = "XcW5FW4cPArBXEk4vzKz3CIrMuA5EVVW"
) -> str:
"""计算 SEGA AimeDB 的认证 key"""
return hashlib.sha256((varString + timestamp + commonKey).encode("utf-8")).hexdigest().upper()
return (
hashlib.sha256((varString + timestamp + commonKey).encode("utf-8"))
.hexdigest()
.upper()
)
def apiAimeDB(qrCode):
"""AimeDB 扫码 API 实现"""
@@ -42,11 +53,11 @@ def apiAimeDB(qrCode):
"openGameID": "MAID",
"key": currentKey,
"qrCode": qrCode,
"timestamp": timestamp
"timestamp": timestamp,
}
# 输出准备好的请求数据
print("Payload:", json.dumps(payload, separators=(',', ':')))
print("Payload:", json.dumps(payload, separators=(",", ":")))
# 发送 POST 请求
headers = {
@@ -55,7 +66,9 @@ def apiAimeDB(qrCode):
"User-Agent": "WC_AIME_LIB",
"Content-Type": "application/json",
}
response = requests.post(API_URL, data=json.dumps(payload, separators=(',', ':')), headers=headers)
response = requests.post(
API_URL, data=json.dumps(payload, separators=(",", ":")), headers=headers
)
# 返回服务器的响应
return response
@@ -101,18 +114,19 @@ def implGetUID(qr_content:str) -> dict:
"""
# 检查格式
if not isSGWCFormat(qr_content):
return {'errorID': 60001} # 二维码内容明显无效
return {"errorID": 60001} # 二维码内容明显无效
# 发送请求并处理响应
try:
result = json.loads(implAimeDB(qr_content))
logger.info(f"QRScan Got Response {result}")
except:
return {'errorID': 60002} # 无法解码 Response 的内容
except Exception:
return {"errorID": 60002} # 无法解码 Response 的内容
# 返回结果
return result
if __name__ == "__main__":
userInputQR = input("QRCode: ")
print(implAimeDB(userInputQR))

View File

@@ -7,71 +7,77 @@ from loguru import logger
from urllib.parse import parse_qs
import configparser as ini
LITE_AUTH_KEY = bytes([47, 63, 106, 111, 43, 34, 76, 38, 92, 67, 114, 57, 40, 61, 107, 71])
LITE_AUTH_IV = bytes.fromhex('00000000000000000000000000000000')
LITE_AUTH_KEY = bytes(
[47, 63, 106, 111, 43, 34, 76, 38, 92, 67, 114, 57, 40, 61, 107, 71]
)
LITE_AUTH_IV = bytes.fromhex("00000000000000000000000000000000")
def auth_lite_encrypt(plaintext: str) -> bytes:
# 构造数据16字节头 + 16字节0前缀 + 明文
header = bytes(16)
content = bytes(16) + plaintext.encode('utf-8')
content = bytes(16) + plaintext.encode("utf-8")
data = header + content
# 填充并加密
padded_data = pad(data, AES.block_size)
cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV)
return cipher.encrypt(padded_data)
def auth_lite_decrypt(ciphertext: bytes) -> str:
# 解密并去除填充
cipher = AES.new(LITE_AUTH_KEY, AES.MODE_CBC, LITE_AUTH_IV)
decrypted_data = unpad(cipher.decrypt(ciphertext), AES.block_size)
# 提取内容并解码
content = decrypted_data[16:] # 去除头部的16字节
return content.decode('utf-8').strip()
return content.decode("utf-8").strip()
def getRawDelivery(title_ver: str = "1.51"):
encrypted = auth_lite_encrypt(f'title_id=SDGB&title_ver={title_ver}&client_id=A63E01C2805')
encrypted = auth_lite_encrypt(
f"title_id=SDGB&title_ver={title_ver}&client_id=A63E01C2805"
)
r = httpx.post(
'http://at.sys-allnet.cn/net/delivery/instruction',
"http://at.sys-allnet.cn/net/delivery/instruction",
data=encrypted,
headers = {
'User-Agent': "SDGB;Windows/Lite",
'Pragma': 'DFI'
}
headers={"User-Agent": "SDGB;Windows/Lite", "Pragma": "DFI"},
)
resp_data = r.content
decrypted_str = auth_lite_decrypt(resp_data)
# 过滤所有控制字符
decrypted_str = ''.join([i for i in decrypted_str if 31 < ord(i) < 127])
decrypted_str = "".join([i for i in decrypted_str if 31 < ord(i) < 127])
logger.info(f"RAW Response: {decrypted_str}")
return decrypted_str
def parseRawDelivery(deliveryStr):
"""解析 RAW 的 Delivery 字符串,返回其中的有效的 instruction URL 的列表"""
parsedResponseDict = {key: value[0] for key, value in parse_qs(deliveryStr).items()}
urlList = parsedResponseDict['uri'].split('|')
urlList = parsedResponseDict["uri"].split("|")
# 过滤掉空字符串和内容为 null 的情况
urlList = [url for url in urlList if url and url != 'null']
urlList = [url for url in urlList if url and url != "null"]
logger.info(f"Parsed URL List: {urlList}")
validURLs = []
for url in urlList:
# 检查是否是 HTTPS 的 URL以及是否是 txt 文件,否则忽略
if not url.startswith('https://') or not url.endswith('.txt'):
if not url.startswith("https://") or not url.endswith(".txt"):
logger.warning(f"Invalid URL will be ignored: {url}")
continue
validURLs.append(url)
logger.info(f"Verified Valid URLs: {validURLs}")
return validURLs
def getUpdateIniFromURL(url):
# 发送请求
response = httpx.get(url, headers={
'User-Agent': 'SDGB;Windows/Lite',
'Pragma': 'DFI'
})
response = httpx.get(
url, headers={"User-Agent": "SDGB;Windows/Lite", "Pragma": "DFI"}
)
logger.info(f"成功自 {url} 获取更新信息")
return response.text
def parseUpdateIni(iniText):
# 解析配置
config = ini.ConfigParser(allow_no_value=True)
@@ -79,47 +85,50 @@ def parseUpdateIni(iniText):
logger.info(f"成功解析配置文件,包含的节有:{config.sections()}")
# 获取 COMMON 节的配置
common = config['COMMON']
common = config["COMMON"]
# 初始化消息列表
message = []
# 获取游戏描述并去除引号
game_desc = common['GAME_DESC'].strip('"')
game_desc = common["GAME_DESC"].strip('"')
# 根据前缀选择消息模板和图标
prefix_icons = {
'PATCH': ('💾 游戏程序更新 (.app) ', 'PATCH_'),
'OPTION': ('📚 游戏内容更新 (.opt) ', 'OPTION_')
"PATCH": ("💾 游戏程序更新 (.app) ", "PATCH_"),
"OPTION": ("📚 游戏内容更新 (.opt) ", "OPTION_"),
}
icon, prefix = prefix_icons.get(game_desc.split('_')[0], ('📦 游戏更新 ', ''))
icon, prefix = prefix_icons.get(game_desc.split("_")[0], ("📦 游戏更新 ", ""))
# 构建消息标题
game_title = game_desc.replace(prefix, '', 1)
game_title = game_desc.replace(prefix, "", 1)
message.append(f"{icon}{game_title}")
# 添加可选文件的下载链接(如果有)
if 'OPTIONAL' in config:
if "OPTIONAL" in config:
message.append("往期更新包:")
optional_files = [f"{url.split('/')[-1]} {url}" for _, url in config.items('OPTIONAL')]
optional_files = [
f"{url.split('/')[-1]} {url}" for _, url in config.items("OPTIONAL")
]
message.extend(optional_files)
# 添加主文件的下载链接
main_file = common['INSTALL1']
main_file_name = main_file.split('/')[-1]
main_file = common["INSTALL1"]
main_file_name = main_file.split("/")[-1]
message.append(f"此次更新包: \n{main_file_name} {main_file}")
# 添加发布时间信息
release_time = common['RELEASE_TIME'].replace('T', ' ')
release_time = common["RELEASE_TIME"].replace("T", " ")
message.append(f"正式发布时间:{release_time}\n")
# 构建最终的消息字符串
final_message = '\n'.join(message)
final_message = "\n".join(message)
logger.info(f"消息构建完成,最终的消息为:\n{final_message}")
return final_message
if __name__ == '__main__':
if __name__ == "__main__":
urlList = parseRawDelivery(getRawDelivery("1.51"))
for url in urlList:
iniText = getUpdateIniFromURL(url)

View File

@@ -1,18 +1,20 @@
# 舞萌DX
# 标题服务器通讯实现
import zlib
import hashlib
import httpx
from loguru import logger
import random
import time
import httpx
from loguru import logger
from ctypes import c_int32
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from Config import *
from typing import Optional
import certifi
from Config import (
useProxy,
proxyUrl,
)
use2024Api = False # 是否使用 2024 API
@@ -25,19 +27,23 @@ else:
AesIV = "d6xHIKq]1J]Dt^ue"
ObfuscateParam = "B44df8yT"
class SDGBApiError(Exception):
pass
class SDGBRequestError(SDGBApiError):
pass
class SDGBResponseError(SDGBApiError):
pass
class aes_pkcs7(object):
def __init__(self, key: str, iv: str):
self.key = key.encode('utf-8')
self.iv = iv.encode('utf-8')
self.key = key.encode("utf-8")
self.iv = iv.encode("utf-8")
self.mode = AES.MODE_CBC
def encrypt(self, content: bytes) -> bytes:
@@ -60,17 +66,19 @@ class aes_pkcs7(object):
def pkcs7padding(self, text):
bs = 16
length = len(text)
bytes_length = len(text.encode('utf-8'))
bytes_length = len(text.encode("utf-8"))
padding_size = length if (bytes_length == length) else bytes_length
padding = bs - padding_size % bs
padding_text = chr(padding) * padding
return text + padding_text
def getSDGBApiHash(api):
# API 的 Hash 的生成
# 有空做一下 Hash 的彩虹表?
return hashlib.md5((api + "MaimaiChn" + ObfuscateParam).encode()).hexdigest()
def apiSDGB(
data: str,
targetApi: str,
@@ -94,7 +102,7 @@ def apiSDGB(
endpoint = "https://maimai-gm.wahlap.com:42081/Maimai2Servlet/"
# 准备好请求数据
requestDataFinal = aes.encrypt(zlib.compress(data.encode('utf-8')))
requestDataFinal = aes.encrypt(zlib.compress(data.encode("utf-8")))
if not noLog:
logger.debug(f"[Stage 1] 准备开始请求 {targetApi},以 {data}")
@@ -119,10 +127,10 @@ def apiSDGB(
"Accept-Encoding": "",
"Charset": "UTF-8",
"Content-Encoding": "deflate",
"Expect": "100-continue"
"Expect": "100-continue",
},
content=requestDataFinal, # 数据
timeout=timeout
timeout=timeout,
)
if not noLog:
@@ -142,30 +150,42 @@ def apiSDGB(
if not noLog:
logger.debug("[Stage 3] Decryption SUCCESS.")
except Exception as e:
logger.warning(f"[Stage 3] Decryption FAILED. Raw Content: {responseContentRaw}, Error: {e}")
logger.warning(
f"[Stage 3] Decryption FAILED. Raw Content: {responseContentRaw}, Error: {e}"
)
raise SDGBResponseError("Decryption failed")
# 然后尝试解压
try:
# 看看文件头是否是压缩过的
if responseContentDecrypted.startswith(b'\x78\x9c'):
if responseContentDecrypted.startswith(b"\x78\x9c"):
logger.debug("[Stage 4] Zlib detected, decompressing...")
responseContentFinal = zlib.decompress(responseContentDecrypted).decode('utf-8')
responseContentFinal = zlib.decompress(
responseContentDecrypted
).decode("utf-8")
else:
logger.warning(f"[Stage 4] Not Zlib Format!! using raw content: {responseContentDecrypted}")
responseContentFinal = responseContentDecrypted.decode('utf-8')
logger.warning(
f"[Stage 4] Not Zlib Format!! using raw content: {responseContentDecrypted}"
)
responseContentFinal = responseContentDecrypted.decode("utf-8")
# 完成解压
if not noLog:
logger.debug(f"[Stage 4] Process OK, Content: {responseContentFinal}")
logger.debug(
f"[Stage 4] Process OK, Content: {responseContentFinal}"
)
# 最终处理,检查是否是 JSON 格式
if responseContentFinal.startswith('{') and responseContentFinal.endswith('}'):
if responseContentFinal.startswith(
"{"
) and responseContentFinal.endswith("}"):
# 如果是 JSON 格式,直接返回
logger.debug("[Stage 5] Response is JSON, returning.")
return responseContentFinal
else:
# 如果不是 JSON 格式,直接返回但是警告
logger.warning("[Stage 5] Response is not JSON, returning as is, take care!")
logger.warning(
"[Stage 5] Response is not JSON, returning as is, take care!"
)
return responseContentFinal
except:
except Exception:
logger.warning(f"解压失败,原始响应: {responseContentDecrypted}")
raise SDGBResponseError("解压失败")
except SDGBRequestError as e:
@@ -181,11 +201,12 @@ def apiSDGB(
time.sleep(2)
finally:
if 'httpClient' in locals():
if "httpClient" in locals():
httpClient.close()
raise SDGBApiError("重试多次仍然无法成功请求服务器")
def calcPlaySpecial():
"""使用 c_int32 实现的 SpecialNumber 算法"""
rng = random.SystemRandom()
@@ -199,21 +220,24 @@ def calcPlaySpecial():
num2 >>= 1
return c_int32(result.value).value
class AESPKCS7_2024:
# 实现了 maimai 通讯所用的 AES 加密的类
def __init__(self, key: str, iv: str):
self.key = key.encode('utf-8')
self.iv = iv.encode('utf-8')
self.key = key.encode("utf-8")
self.iv = iv.encode("utf-8")
self.mode = AES.MODE_CBC
# 加密
def encrypt(self, content) -> bytes:
# if content is str, convert to bytes
if isinstance(content, str):
encodedData = content.encode('utf-8')
encodedData = content.encode("utf-8")
cipher = AES.new(self.key, self.mode, self.iv)
content_padded = pad(encodedData, AES.block_size)
encrypted_bytes = cipher.encrypt(content_padded)
return encrypted_bytes
# 解密
def decrypt(self, encrypted_content: bytes) -> str:
cipher = AES.new(self.key, self.mode, self.iv)
@@ -221,7 +245,14 @@ class AESPKCS7_2024:
decrypted = unpad(decrypted_padded, AES.block_size)
return decrypted
def apiSDGB_2024(data:str, targetApi:str, userAgentExtraData:str, noLog:bool=False, timeout:int=5):
def apiSDGB_2024(
data: str,
targetApi: str,
userAgentExtraData: str,
noLog: bool = False,
timeout: int = 5,
):
"""
舞萌DX 2024 API 通讯用函数
:param data: 请求数据
@@ -258,13 +289,13 @@ def apiSDGB_2024(data:str, targetApi:str, userAgentExtraData:str, noLog:bool=Fal
"Accept-Encoding": "",
"Charset": "UTF-8",
"Content-Encoding": "deflate",
"Expect": "100-continue"
"Expect": "100-continue",
},
content=reqData_deflated,
# 经测试,加 Verify 之后速度慢好多,因此建议选择性开
# verify=certifi.where(),
# verify=False,
timeout=timeout
timeout=timeout,
)
if not noLog:
@@ -282,13 +313,13 @@ def apiSDGB_2024(data:str, targetApi:str, userAgentExtraData:str, noLog:bool=Fal
try:
responseDecompressed = zlib.decompress(responseRAWContent)
logger.debug("成功解压响应!")
except:
except Exception:
logger.warning(f"无法解压,得到的原始响应: {responseRAWContent}")
raise SDGBResponseError("解压失败")
try:
resultResponse = aes.decrypt(responseDecompressed)
logger.debug(f"成功解密响应!")
except:
logger.debug("成功解密响应!")
except Exception:
logger.warning(f"解密失败,得到的原始响应: {responseDecompressed}")
raise SDGBResponseError("解密失败")
@@ -297,7 +328,7 @@ def apiSDGB_2024(data:str, targetApi:str, userAgentExtraData:str, noLog:bool=Fal
return resultResponse
# 异常处理
except SDGBRequestError as e:
except SDGBRequestError:
# 请求格式错误,不需要重试
raise SDGBRequestError("请求格式错误")
except SDGBResponseError as e:

View File

@@ -7,19 +7,19 @@ from HelperFullPlay import implFullPlayAction, generateMusicData
from HelperGetUserThing import implGetUser_
from MyConfig import testUid8
def implWipeTickets(userId: int, currentLoginTimestamp: int, currentLoginResult) -> str:
# Get User Charge
currentUserCharge = implGetUser_("Charge", userId)
currentUserChargeList = currentUserCharge['userChargeList']
currentUserChargeList = currentUserCharge["userChargeList"]
# All stock set to 0
for charge in currentUserChargeList:
charge['stock'] = 0
charge["stock"] = 0
# example format
# {"userId":11088995,"length":16,"userChargeList":[{"chargeId":1,"stock":0,"purchaseDate":"2025-02-04 00:51:50","validDate":"2025-02-04 00:51:50","extNum1":0},{"chargeId":2,"stock":0,"purchaseDate":"2025-06-11 17:19:42","validDate":"2025-09-09 04:00:00","extNum1":0},{"chargeId":3,"stock":0,"purchaseDate":"2025-06-11 17:19:40","validDate":"2025-09-09 04:00:00","extNum1":0},{"chargeId":4,"stock":0,"purchaseDate":"2025-06-11 09:34:51","validDate":"2025-09-09 04:00:00","extNum1":0},{"chargeId":5,"stock":0,"purchaseDate":"2025-01-30 12:31:16","validDate":"2025-04-30 04:00:00","extNum1":0},{"chargeId":6,"stock":0,"purchaseDate":"2025-02-17 20:01:42","validDate":"2025-02-17 20:01:42","extNum1":0},{"chargeId":7,"stock":0,"purchaseDate":"2025-02-06 16:17:41","validDate":"2025-02-06 16:17:41","extNum1":0},{"chargeId":8,"stock":0,"purchaseDate":"2025-02-06 16:17:49","validDate":"2025-02-06 16:17:49","extNum1":0},{"chargeId":9,"stock":0,"purchaseDate":"2025-02-06 16:18:00","validDate":"2025-02-06 16:18:00","extNum1":0},{"chargeId":10001,"stock":1,"purchaseDate":"2025-06-11 17:19:51","validDate":"2025-09-09 04:00:00","extNum1":0},{"chargeId":10005,"stock":0,"purchaseDate":"2025-04-25 15:45:55","validDate":"2025-07-24 04:00:00","extNum1":0},{"chargeId":10105,"stock":0,"purchaseDate":"2025-04-25 15:46:00","validDate":"2025-07-24 04:00:00","extNum1":0},{"chargeId":10205,"stock":0,"purchaseDate":"2025-04-25 15:46:03","validDate":"2025-07-24 04:00:00","extNum1":0},{"chargeId":11001,"stock":0,"purchaseDate":"2025-01-08 20:43:05","validDate":"2025-04-08 04:00:00","extNum1":0},{"chargeId":30001,"stock":0,"purchaseDate":"2025-04-25 15:46:17","validDate":"2025-07-24 04:00:00","extNum1":0},{"chargeId":999999,"stock":0,"purchaseDate":"2025-02-06 23:03:14","validDate":"2025-02-06 23:03:14","extNum1":0}]}
musicData = generateMusicData()
userAllPatches = {
"upsertUserAll": {
@@ -29,18 +29,22 @@ def implWipeTickets(userId: int, currentLoginTimestamp:int, currentLoginResult)
# }],
"userChargeList": currentUserChargeList,
"userMusicDetailList": [musicData],
"isNewMusicDetailList": "1" #1避免覆盖
}}
"isNewMusicDetailList": "1", # 1避免覆盖
}
}
result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches)
result = implFullPlayAction(
userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches
)
return result
if __name__ == "__main__":
userId = testUid8
currentLoginTimestamp = generateTimestamp()
loginResult = apiLogin(currentLoginTimestamp, userId)
if loginResult['returnCode'] != 1:
if loginResult["returnCode"] != 1:
logger.info("登录失败")
exit()
try:

View File

@@ -5,23 +5,29 @@ import rapidjson as json
from loguru import logger
import xml.etree.ElementTree as ET
from Config import *
from Config import (
loginBonusDBPath,
loginBonusDBPathFallback,
)
from MyConfig import testUid
from API_TitleServer import apiSDGB
from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from HelperFullPlay import implFullPlayAction, generateMusicData
class NoSelectedBonusError(Exception):
pass
def apiQueryLoginBonus(userId: int) -> str:
"""ログインボーナスを取得する API"""
data = json.dumps({
"userId": int(userId),
"nextIndex": 0,
"maxCount": 2000
})
data = json.dumps({"userId": int(userId), "nextIndex": 0, "maxCount": 2000})
return apiSDGB(data, "GetUserLoginBonusApi", userId)
def implLoginBonus(userId: int, currentLoginTimestamp:int, currentLoginResult, bonusGenerateMode=1):
def implLoginBonus(
userId: int, currentLoginTimestamp: int, currentLoginResult, bonusGenerateMode=1
):
"""
ログインボーナスデータをアップロードする
bonusGenerateMode は、ログインボーナスを生成する方法を指定します。
@@ -30,14 +36,10 @@ def implLoginBonus(userId: int, currentLoginTimestamp:int, currentLoginResult, b
"""
musicData = generateMusicData()
# サーバーからログインボーナスデータを取得
data = json.dumps({
"userId": int(userId),
"nextIndex": 0,
"maxCount": 2000
})
data = json.dumps({"userId": int(userId), "nextIndex": 0, "maxCount": 2000})
UserLoginBonusResponse = json.loads(apiSDGB(data, "GetUserLoginBonusApi", userId))
# ログインボーナスリストを生成、それから処理してアップロード
UserLoginBonusList = UserLoginBonusResponse['userLoginBonusList']
UserLoginBonusList = UserLoginBonusResponse["userLoginBonusList"]
finalBonusList = generateLoginBonusList(UserLoginBonusList, bonusGenerateMode)
if not finalBonusList:
return False # ログインボーナスを選択していないから失敗
@@ -47,11 +49,15 @@ def implLoginBonus(userId: int, currentLoginTimestamp:int, currentLoginResult, b
"userMusicDetailList": [musicData],
"isNewMusicDetailList": "1", # 上書きしない
"userLoginBonusList": finalBonusList,
"isNewLoginBonusList": "0" * len(finalBonusList)
}}
result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches)
"isNewLoginBonusList": "0" * len(finalBonusList),
}
}
result = implFullPlayAction(
userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches
)
return result
def generateLoginBonusList(UserLoginBonusList, generateMode=1):
"""
ログインボーナスリストを生成します。
@@ -65,15 +71,19 @@ def generateLoginBonusList(UserLoginBonusList, generateMode=1):
try:
tree = ET.parse(loginBonusDBPath)
root = tree.getroot()
loginBonusIdList = [int(item.find('id').text) for item in root.findall('.//StringID')]
loginBonusIdList = [
int(item.find("id").text) for item in root.findall(".//StringID")
]
logger.debug(f"ログインボーナスIDリスト: {loginBonusIdList}")
except FileNotFoundError:
try:
tree = ET.parse(loginBonusDBPathFallback)
root = tree.getroot()
loginBonusIdList = [int(item.find('id').text) for item in root.findall('.//StringID')]
loginBonusIdList = [
int(item.find("id").text) for item in root.findall(".//StringID")
]
logger.debug(f"ログインボーナスIDリスト: {loginBonusIdList}")
except:
except Exception:
raise FileNotFoundError("ログインボーナスデータベースを読み込めません")
# ログインボーナスの MAX POINT は5の場合があります
@@ -82,7 +92,7 @@ def generateLoginBonusList(UserLoginBonusList, generateMode=1):
Bonus5Id = [12, 29, 30, 38, 43, 604, 611]
# UserBonusList から bonusId を取得
UserLoginBonusIdList = [item['bonusId'] for item in UserLoginBonusList]
UserLoginBonusIdList = [item["bonusId"] for item in UserLoginBonusList]
# 存在しないボーナス
NonExistingBonuses = list(set(loginBonusIdList) - set(UserLoginBonusIdList))
@@ -91,13 +101,13 @@ def generateLoginBonusList(UserLoginBonusList, generateMode=1):
bonusList = []
if generateMode == 1: # 選択したボーナスのみ MAX にする
for item in UserLoginBonusList:
if item['isCurrent'] and not item['isComplete']:
point = 4 if item['bonusId'] in Bonus5Id else 9
if item["isCurrent"] and not item["isComplete"]:
point = 4 if item["bonusId"] in Bonus5Id else 9
data = {
"bonusId": item['bonusId'],
"bonusId": item["bonusId"],
"point": point,
"isCurrent": True,
"isComplete": False
"isComplete": False,
}
bonusList.append(data)
if len(bonusList) == 0:
@@ -105,20 +115,20 @@ def generateLoginBonusList(UserLoginBonusList, generateMode=1):
elif generateMode == 2: # 全部 MAX にする
# 存在しているボーナスを追加
for item in UserLoginBonusList:
if not item['isComplete']:
if not item["isComplete"]:
data = {
"bonusId": item['bonusId'],
"point": 4 if item['bonusId'] in Bonus5Id else 9,
"isCurrent": item['isCurrent'],
"isComplete": False
"bonusId": item["bonusId"],
"point": 4 if item["bonusId"] in Bonus5Id else 9,
"isCurrent": item["isCurrent"],
"isComplete": False,
}
bonusList.append(data)
elif item['bonusId'] == 999:
elif item["bonusId"] == 999:
data = {
"bonusId": 999,
"point": (item['point'] // 10) * 10 + 9,
"isCurrent": item['isCurrent'],
"isComplete": False
"point": (item["point"] // 10) * 10 + 9,
"isCurrent": item["isCurrent"],
"isComplete": False,
}
bonusList.append(data)
# 存在しないボーナスを追加
@@ -127,7 +137,7 @@ def generateLoginBonusList(UserLoginBonusList, generateMode=1):
"bonusId": bonusId,
"point": 4 if bonusId in Bonus5Id else 9,
"isCurrent": False,
"isComplete": False
"isComplete": False,
}
bonusList.append(data)
else:
@@ -136,6 +146,7 @@ def generateLoginBonusList(UserLoginBonusList, generateMode=1):
logger.debug(f"ログインボーナスリスト: {bonusList}")
return bonusList
if __name__ == "__main__":
# ログインボーナスデータをアップロードする
userId = testUid

View File

@@ -1,12 +1,20 @@
# 删除和上传成绩
from loguru import logger
from Config import *
from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from HelperFullPlay import implFullPlayAction
from MyConfig import testUid
def implDeleteMusicRecord(userId: int, currentLoginTimestamp:int, currentLoginResult, musicId:int, levelId:int) -> str:
musicData= ({
def implDeleteMusicRecord(
userId: int,
currentLoginTimestamp: int,
currentLoginResult,
musicId: int,
levelId: int,
) -> str:
musicData = {
"musicId": musicId,
"level": levelId,
"playCount": 1,
@@ -15,24 +23,36 @@ def implDeleteMusicRecord(userId: int, currentLoginTimestamp:int, currentLoginRe
"syncStatus": 0,
"deluxscoreMax": 0,
"scoreRank": 0,
"extNum1": 0
})
"extNum1": 0,
}
userAllPatches = {
"upsertUserAll": {
"userMusicDetailList": [musicData],
"isNewMusicDetailList": "0" # 0为编辑即可删除掉成绩
}}
result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches)
"isNewMusicDetailList": "0", # 0为编辑即可删除掉成绩
}
}
result = implFullPlayAction(
userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches
)
return result
def implUploadMusicRecord(userId: int, currentLoginTimestamp:int, currentLoginResult, musicId:int, levelId:int, achievement:int, dxScore:int) -> str:
def implUploadMusicRecord(
userId: int,
currentLoginTimestamp: int,
currentLoginResult,
musicId: int,
levelId: int,
achievement: int,
dxScore: int,
) -> str:
"""
VERY EARLY STAGE OF UPLOADING SCORES!!!! DO NOT USE THIS!!!!
上传成绩的参考实现。
"""
# 要上传的数据
musicData= ({
musicData = {
"musicId": musicId,
"level": levelId,
"playCount": 1,
@@ -41,16 +61,20 @@ def implUploadMusicRecord(userId: int, currentLoginTimestamp:int, currentLoginRe
"syncStatus": 0,
"deluxscoreMax": dxScore,
"scoreRank": 0,
"extNum1": 0
})
"extNum1": 0,
}
userAllPatches = {
"upsertUserAll": {
"userMusicDetailList": [musicData],
"isNewMusicDetailList": "1" # 0编辑 1插入
}}
result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches)
"isNewMusicDetailList": "1", # 0编辑 1插入
}
}
result = implFullPlayAction(
userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches
)
return result
if __name__ == "__main__":
userId = testUid
currentLoginTimestamp = generateTimestamp()
@@ -59,11 +83,15 @@ if __name__ == "__main__":
musicId = 852 # 852 is tiamat
levelId = 3 # 3 is MASTER
if loginResult['returnCode'] != 1:
if loginResult["returnCode"] != 1:
logger.info("登录失败")
exit()
try:
logger.info(implDeleteMusicRecord(userId, currentLoginTimestamp, loginResult, musicId, levelId))
logger.info(
implDeleteMusicRecord(
userId, currentLoginTimestamp, loginResult, musicId, levelId
)
)
# logger.info(implUploadMusicRecord(userId, currentLoginTimestamp, loginResult, musicId, levelId, 1000000, 100))
logger.info(apiLogout(currentLoginTimestamp, userId))
finally:

View File

@@ -113,7 +113,7 @@ def maimaiUserMusicDetailToDivingFishFormat(userMusicDetailList) -> list:
"dxScore": currentMusicDetail["deluxscoreMax"],
}
)
except:
except Exception:
logger.error(f"无法将 UserMusic 翻译成水鱼格式: {currentMusicDetail}")
return divingFishList

View File

@@ -1,26 +1,29 @@
# 解小黑屋实现
# 仍十分不完善,不建议使用
from Config import *
from API_TitleServer import *
from GetPreview import apiGetUserPreview
from HelperLogInOut import apiLogout
import rapidjson as json
from loguru import logger
import time
from datetime import datetime
import asyncio
import rapidjson as json
from GetPreview import apiGetUserPreview
from HelperLogInOut import apiLogout
from loguru import logger
from MyConfig import testUid2
def isUserLoggedIn(userId):
isLogin = json.loads(apiGetUserPreview(userId, True))['isLogin']
isLogin = json.loads(apiGetUserPreview(userId, True))["isLogin"]
logger.debug(f"用户 {userId} 是否登录: {isLogin}")
return isLogin
def getHumanReadableTime(unixTime):
'''将 Unix 时间戳转换为人类可读的时间'''
"""将 Unix 时间戳转换为人类可读的时间"""
# 减一个小时,因为舞萌貌似是 UTC+9
timestamp = int(unixTime) - 3600
return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
return datetime.fromtimestamp(timestamp).strftime("%Y-%m-%d %H:%M:%S")
def getMaimaiUNIXTime(mmddhhmmss, year=2025):
"""
@@ -29,8 +32,10 @@ def getMaimaiUNIXTime(mmddhhmmss, year=2025):
"""
# date_time_str = f"{year}{mmddhhmmss}"
# 加上一个小时
date_time_str = f"{year}{mmddhhmmss[:4]}{int(mmddhhmmss[4:6])+1:02}{mmddhhmmss[6:]}"
date_time_obj = datetime.strptime(date_time_str, '%Y%m%d%H%M%S')
date_time_str = (
f"{year}{mmddhhmmss[:4]}{int(mmddhhmmss[4:6]) + 1:02}{mmddhhmmss[6:]}"
)
date_time_obj = datetime.strptime(date_time_str, "%Y%m%d%H%M%S")
# 将 datetime 对象转换为 Unix 时间戳
unix_timestamp = int(time.mktime(date_time_obj.timetuple()))
logger.info(f"转换出了时间戳: {unix_timestamp}")
@@ -41,19 +46,20 @@ def logOut(userId, Timestamp):
"""极其简单的登出实现,成功返回 True失败返回 False
注意:不会检查用户是否真的登出了,只会尝试登出"""
try:
if apiLogout(Timestamp, userId, True)['returnCode'] == 1:
if apiLogout(Timestamp, userId, True)["returnCode"] == 1:
# 成功送出了登出请求
logger.debug(f"已成功尝试登出用户 {userId}")
return True
except:
except Exception:
logger.error(f"登出用户 {userId} 的时候发生了错误")
return False
def isCorrectTimestamp(timestamp, userId):
'''
"""
动作:给定一个时间戳,用它尝试登出用户,然后检查用户是否成功登出。
如果用户成功登出,返回 True否则返回 False。
'''
"""
if not logOut(userId, timestamp):
logger.error(f"用时间戳 {timestamp} 登出用户 {userId} 的时候发生了错误")
return False
@@ -61,6 +67,7 @@ def isCorrectTimestamp(timestamp, userId):
logger.debug(f"时间戳 {timestamp} 是否正确: {isLoggedOut}")
return isLoggedOut
def findCorrectTimestamp(timestamp, userId, max_attempts=600):
# 初始化偏移量
offset = 1
@@ -92,6 +99,7 @@ def findCorrectTimestamp(timestamp, userId, max_attempts=600):
logger.error(f"无法找到正确的时间戳,尝试次数超过了 {max_attempts}")
return None
if __name__ == "__main__":
human_time = "0207155500"
beginTimestamp = getMaimaiUNIXTime(human_time)

View File

@@ -35,7 +35,7 @@ def isNewMusicType(userId, musicId, level) -> str:
):
logger.info(f"We think {musicId} Level {level} should use EDIT.")
return "0"
except:
except Exception:
return "1"

View File

@@ -6,17 +6,17 @@ from typing import Dict, Union
MusicDBType = Dict[int, Dict[str, Union[int, str]]]
# 将 '__all__' 用于模块导出声明
__all__ = ['musicDB']
__all__ = ["musicDB"]
# 读取并解析 JSON 文件
try:
with open(musicDBPath, 'r', encoding='utf-8') as f:
with open(musicDBPath, "r", encoding="utf-8") as f:
data = json.load(f)
except FileNotFoundError:
try:
with open(musicDBPathFallback, 'r', encoding='utf-8') as f:
with open(musicDBPathFallback, "r", encoding="utf-8") as f:
data = json.load(f)
except:
except Exception:
raise FileNotFoundError("musicDB.json 文件不存在!")
# 将 JSON 数据转换为指定格式的字典

View File

@@ -2,23 +2,33 @@ import sys
import rapidjson as json
from PyQt6.QtWidgets import (
QApplication, QMainWindow, QWidget, QVBoxLayout, QLineEdit, QTextEdit, QPushButton, QLabel, QHBoxLayout
QApplication,
QMainWindow,
QWidget,
QVBoxLayout,
QLineEdit,
QTextEdit,
QPushButton,
QLabel,
QHBoxLayout,
)
from PyQt6.QtCore import Qt
# 将当前目录的父目录加入到 sys.path 中
from pathlib import Path
current_dir = Path(__file__).resolve().parent
parent_dir = current_dir.parent
sys.path.append(str(parent_dir))
from API_TitleServer import *
def sendRequest(requestText: str, apiNameText: str, uid: int) -> str:
try:
data = json.loads(requestText)
data = json.dumps(data)
except:
except Exception:
return "给出的输入不是有效的 JSON"
try:
result = apiSDGB(data, apiNameText, uid)
@@ -70,21 +80,18 @@ class ApiTester(QMainWindow):
self.ResponseTextBox.setReadOnly(True)
MainLayout.addWidget(self.ResponseTextBox)
# 布局设定
MainLayout.setContentsMargins(5, 5, 5, 5)
MainLayout.setSpacing(5)
MainLayout.setAlignment(Qt.AlignmentFlag.AlignTop)
def prepareRequest(self):
# 发送请求用
try:
RequestDataString = self.RequestInputBox.toPlainText()
TargetAPIString = self.TargetAPIInputBox.text()
AgentExtraString = int(self.AgentExtraInputBox.text())
except:
except Exception:
self.ResponseTextBox.setPlainText("输入无效")
return
@@ -93,6 +100,7 @@ class ApiTester(QMainWindow):
# 显示出输出
self.ResponseTextBox.setPlainText(Result)
if __name__ == "__main__":
app = QApplication(sys.argv)
# Set proper style for each OS