refactor: module imports

This commit is contained in:
mokurin000
2025-07-29 02:27:10 +08:00
parent e99f04c416
commit 12093c9b9b
12 changed files with 651 additions and 512 deletions

View File

@@ -1,10 +1,11 @@
# 改变版本号,实现伪封号和解封号之类
from loguru import logger
from Config import *
from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from HelperFullPlay import implFullPlayAction, generateMusicData
from HelperGetUserThing import implGetUser_
from MyConfig import testUid8
def implWipeTickets(userId: int, currentLoginTimestamp:int, currentLoginResult) -> str:
# Get User Charge
@@ -35,7 +36,7 @@ def implWipeTickets(userId: int, currentLoginTimestamp:int, currentLoginResult)
return result
if __name__ == "__main__":
userId = testUid2
userId = testUid8
currentLoginTimestamp = generateTimestamp()
loginResult = apiLogin(currentLoginTimestamp, userId)

View File

@@ -52,7 +52,7 @@ if __name__ == "__main__":
exit()
try:
logger.info(
implUnlockSingleItem(10, 14, userId, currentLoginTimestamp, loginResult)
implUnlockSingleItem(14, 10, userId, currentLoginTimestamp, loginResult)
)
logger.info(apiLogout(currentLoginTimestamp, userId))
finally:

View File

@@ -1,57 +1,56 @@
from API_TitleServer import *
from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from Config import *
import requests
from loguru import logger
from HelperGetUserMusicDetail import getUserFullMusicDetail
from HelperMusicDB import getMusicTitle
import requests
import rapidjson as json
class divingFishAuthFailError(Exception):
pass
class divingFishCommError(Exception):
pass
# 水鱼查分器的 API 地址
BASE_URL = 'https://www.diving-fish.com/api/maimaidxprober'
BASE_URL = "https://www.diving-fish.com/api/maimaidxprober"
# 水鱼查分器的成绩状态转换
COMBO_ID_TO_NAME = ['', 'fc', 'fcp', 'ap', 'app']
SYNC_ID_TO_NAME = ['', 'fs', 'fsp', 'fsd', 'fsdp', 'sync']
COMBO_ID_TO_NAME = ["", "fc", "fcp", "ap", "app"]
SYNC_ID_TO_NAME = ["", "fs", "fsp", "fsd", "fsdp", "sync"]
def apiDivingFish(method:str, apiPath:str, importToken:str, data=None):
'''水鱼查分器的 API 通讯实现'''
headers = {
"Import-Token": importToken
}
if method == 'POST':
headers['Content-Type'] = 'application/json'
logger.info(f'水鱼查分器 API 请求:{method} {BASE_URL + apiPath}')
if method == 'POST':
def apiDivingFish(method: str, apiPath: str, importToken: str, data=None):
"""水鱼查分器的 API 通讯实现"""
headers = {"Import-Token": importToken}
if method == "POST":
headers["Content-Type"] = "application/json"
logger.info(f"水鱼查分器 API 请求:{method} {BASE_URL + apiPath}")
if method == "POST":
response = requests.post(
url=BASE_URL + apiPath,
json=data,
headers=headers,
)
elif method == 'GET':
elif method == "GET":
response = requests.get(
url=BASE_URL + apiPath,
headers=headers,
)
elif method == 'DELETE':
elif method == "DELETE":
response = requests.delete(
url=BASE_URL + apiPath,
headers=headers,
)
else:
logger.error(f'未知的请求方法:{method}')
raise ValueError(f'未知的请求方法:{method}')
logger.error(f"未知的请求方法:{method}")
raise ValueError(f"未知的请求方法:{method}")
logger.info(f'水鱼查分器请求结果:{response.status_code}')
logger.debug(f'水鱼查分器回应:{response.text}')
finalResponseTextDecode = response.text.encode('utf-8').decode('unicode_escape')
logger.debug(f'水鱼查分器回应解码后:{finalResponseTextDecode}')
logger.info(f"水鱼查分器请求结果:{response.status_code}")
logger.debug(f"水鱼查分器回应:{response.text}")
finalResponseTextDecode = response.text.encode("utf-8").decode("unicode_escape")
logger.debug(f"水鱼查分器回应解码后:{finalResponseTextDecode}")
match response.status_code:
case 200:
return response.json()
@@ -60,89 +59,103 @@ def apiDivingFish(method:str, apiPath:str, importToken:str, data=None):
case _:
raise divingFishCommError
def getFishRecords(importToken: str) -> dict:
'''获取水鱼查分器的成绩'''
return apiDivingFish('GET', '/player/records', importToken)
"""获取水鱼查分器的成绩"""
return apiDivingFish("GET", "/player/records", importToken)
def updateFishRecords(importToken: str, records: list[dict]) -> dict:
'''上传成绩到水鱼查分器'''
return apiDivingFish('POST', '/player/update_records', importToken, records)
"""上传成绩到水鱼查分器"""
return apiDivingFish("POST", "/player/update_records", importToken, records)
def resetFishRecords(fishImportToken:str):
'''重置水鱼查分器的用户数据'''
return apiDivingFish('DELETE', '/player/delete_records', fishImportToken)
def getFishUserInfo(userQQ:int):
'''按QQ获取水鱼查分器的用户信息'''
return apiDivingFish('POST', '/query/player', "", {"qq": userQQ})
def resetFishRecords(fishImportToken: str):
"""重置水鱼查分器的用户数据"""
return apiDivingFish("DELETE", "/player/delete_records", fishImportToken)
def getFishUserInfo(userQQ: int):
"""按QQ获取水鱼查分器的用户信息"""
return apiDivingFish("POST", "/query/player", "", {"qq": userQQ})
def maimaiUserMusicDetailToDivingFishFormat(userMusicDetailList) -> list:
'''舞萌的 UserMusicDetail 成绩格式转换成水鱼的格式'''
"""舞萌的 UserMusicDetail 成绩格式转换成水鱼的格式"""
divingFishList = []
for currentMusicDetail in userMusicDetailList:
# musicId 大于 100000 属于宴谱,不计入
if currentMusicDetail['musicId'] >= 100000:
if currentMusicDetail["musicId"] >= 100000:
continue
# 获得歌名
currentMusicTitle = getMusicTitle(currentMusicDetail['musicId'])
currentMusicTitle = getMusicTitle(currentMusicDetail["musicId"])
# 如果数据库里未找到此歌曲
if currentMusicTitle == "R_ERR_MUSIC_ID_NOT_IN_DATABASE":
logger.warning(f"数据库无此歌曲 跳过: {currentMusicDetail['musicId']}")
continue
# 每一个乐曲都判断下是 DX 还是标准
if currentMusicDetail['musicId'] >= 10000:
notesType = 'DX'
if currentMusicDetail["musicId"] >= 10000:
notesType = "DX"
else:
notesType = 'SD'
notesType = "SD"
# 追加进列表
try:
divingFishList.append({
'achievements': (currentMusicDetail['achievement'] / 10000), # 水鱼的成绩是 float 而非舞萌的 int
'title': currentMusicTitle,
'type': notesType,
'level_index': currentMusicDetail['level'],
'fc': COMBO_ID_TO_NAME[currentMusicDetail['comboStatus']],
'fs': SYNC_ID_TO_NAME[currentMusicDetail['syncStatus']],
'dxScore': currentMusicDetail['deluxscoreMax'],
})
divingFishList.append(
{
"achievements": (
currentMusicDetail["achievement"] / 10000
), # 水鱼的成绩是 float 而非舞萌的 int
"title": currentMusicTitle,
"type": notesType,
"level_index": currentMusicDetail["level"],
"fc": COMBO_ID_TO_NAME[currentMusicDetail["comboStatus"]],
"fs": SYNC_ID_TO_NAME[currentMusicDetail["syncStatus"]],
"dxScore": currentMusicDetail["deluxscoreMax"],
}
)
except:
logger.error(f"无法将 UserMusic 翻译成水鱼格式: {currentMusicDetail}")
return divingFishList
def isVaildFishToken(importToken:str):
'''通过尝试获取一次成绩,检查水鱼查分器的 Token 是否有效
有效返回 True无效返回 False'''
result = apiDivingFish('GET', '/player/records', importToken)
def isVaildFishToken(importToken: str):
"""通过尝试获取一次成绩,检查水鱼查分器的 Token 是否有效
有效返回 True无效返回 False"""
result = apiDivingFish("GET", "/player/records", importToken)
logger.debug(f"水鱼查分器 Token 检查结果:{result}")
if result:
return True
return False
def implGetUserCurrentDXRating(userQQ:int):
'''获取用户当前的 DX RATING'''
def implGetUserCurrentDXRating(userQQ: int):
"""获取用户当前的 DX RATING"""
try:
playerData = getFishUserInfo(userQQ)
playerRating = playerData['rating']
playerRating = playerData["rating"]
logger.info(f"用户 {userQQ} 的 DX RATING 是 {playerRating}")
except Exception as e:
logger.warning(f"无法获取用户 {userQQ} 的 DX RATING: {e}")
return False
return playerRating
def implUserMusicToDivingFish(userId:int, fishImportToken:str):
'''上传所有成绩到水鱼的参考实现。
def implUserMusicToDivingFish(userId: int, fishImportToken: str):
"""上传所有成绩到水鱼的参考实现。
返回一个 int 的 ErrorCode。
0: Success
1: Get User Music Fail
2: Auth Fail
3: Comm Error
'''
"""
logger.info("开始尝试上传舞萌成绩到水鱼查分器!")
try:
userFullMusicDetailList = getUserFullMusicDetail(userId)
logger.info("成功得到成绩!转换成水鱼格式..")
divingFishData = maimaiUserMusicDetailToDivingFishFormat(userFullMusicDetailList)
divingFishData = maimaiUserMusicDetailToDivingFishFormat(
userFullMusicDetailList
)
logger.info("转换成功!开始上传水鱼..")
except Exception as e:
logger.error(f"获取成绩失败!{e}")
@@ -156,8 +169,9 @@ def implUserMusicToDivingFish(userId:int, fishImportToken:str):
logger.error("水鱼查分器通讯失败!")
return 3
def generateDebugTestScore():
'''生成测试成绩'''
"""生成测试成绩"""
return [
{
"achievement": 1010000,
@@ -165,7 +179,7 @@ def generateDebugTestScore():
"deluxscoreMax": 4026,
"level": 4,
"musicId": 834,
"syncStatus": 4
"syncStatus": 4,
},
{
"achievement": 1010000,
@@ -173,7 +187,6 @@ def generateDebugTestScore():
"deluxscoreMax": 4200,
"level": 4,
"musicId": 11663,
"syncStatus": 4
}
"syncStatus": 4,
},
]

View File

@@ -1,106 +1,127 @@
from datetime import datetime, timedelta
# 倍票相关 API 的实现
import rapidjson as json
import pytz
from datetime import datetime, timedelta
from loguru import logger
from Config import *
from API_TitleServer import apiSDGB
from HelperGetUserThing import implGetUser_
from loguru import logger
from Config import (
clientId,
placeId,
regionId,
)
from MyConfig import testUid2
from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from HelperFullPlay import implFullPlayAction, generateMusicData
from HelperGetUserThing import implGetUser_
def implWipeTickets(userId: int, currentLoginTimestamp:int, currentLoginResult) -> str:
'''清空用户所有票的 API 请求器,返回 Json String。'''
def implWipeTickets(userId: int, currentLoginTimestamp: int, currentLoginResult) -> str:
"""清空用户所有票的 API 请求器,返回 Json String。"""
# 先得到当前用户的 Charge 数据
currentUserCharge = implGetUser_("Charge", userId)
# 取得 List
currentUserChargeList = currentUserCharge['userChargeList']
currentUserChargeList = currentUserCharge["userChargeList"]
# 所有 stock 都置为 0
for charge in currentUserChargeList:
charge['stock'] = 0
charge["stock"] = 0
musicData = generateMusicData()
userAllPatches = {
"upsertUserAll": {
"userChargeList": currentUserChargeList,
"userMusicDetailList": [musicData],
"isNewMusicDetailList": "1" #1避免覆盖
}}
"upsertUserAll": {
"userChargeList": currentUserChargeList,
"userMusicDetailList": [musicData],
"isNewMusicDetailList": "1", # 1避免覆盖
}
}
result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches)
result = implFullPlayAction(
userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches
)
return result
def apiQueryTicket(userId:int) -> str:
'''查询已有票的 API 请求器,返回 Json String。'''
def apiQueryTicket(userId: int) -> str:
"""查询已有票的 API 请求器,返回 Json String。"""
# 构建 Payload
data = json.dumps({
"userId": userId
})
data = json.dumps({"userId": userId})
# 发送请求
userdata_result = apiSDGB(data, "GetUserChargeApi", userId)
# 返回响应
return userdata_result
def apiBuyTicket(userId:int, ticketType:int, price:int, playerRating:int, playCount:int) -> str:
'''倍票购买 API 的请求器'''
nowTime = datetime.now(pytz.timezone('Asia/Shanghai'))
def apiBuyTicket(
userId: int, ticketType: int, price: int, playerRating: int, playCount: int
) -> str:
"""倍票购买 API 的请求器"""
nowTime = datetime.now(pytz.timezone("Asia/Shanghai"))
# 构造请求数据 Payload
data = json.dumps({
"userId": userId,
"userChargelog": {
"chargeId": ticketType,
"price": price,
"purchaseDate": nowTime.strftime("%Y-%m-%d %H:%M:%S.0"),
"playCount": playCount,
"playerRating": playerRating,
"placeId": placeId,
"regionId": regionId,
"clientId": clientId
},
"userCharge": {
"chargeId": ticketType,
"stock": 1,
"purchaseDate": nowTime.strftime("%Y-%m-%d %H:%M:%S.0"),
"validDate": (nowTime + timedelta(days=90)).replace(hour=4, minute=0, second=0).strftime("%Y-%m-%d %H:%M:%S")
data = json.dumps(
{
"userId": userId,
"userChargelog": {
"chargeId": ticketType,
"price": price,
"purchaseDate": nowTime.strftime("%Y-%m-%d %H:%M:%S.0"),
"playCount": playCount,
"playerRating": playerRating,
"placeId": placeId,
"regionId": regionId,
"clientId": clientId,
},
"userCharge": {
"chargeId": ticketType,
"stock": 1,
"purchaseDate": nowTime.strftime("%Y-%m-%d %H:%M:%S.0"),
"validDate": (nowTime + timedelta(days=90))
.replace(hour=4, minute=0, second=0)
.strftime("%Y-%m-%d %H:%M:%S"),
},
}
})
)
# 发送请求,返回最终得到的 Json String 回执
return apiSDGB(data, "UpsertUserChargelogApi", userId)
def implBuyTicket(userId:int, ticketType:int):
'''
def implBuyTicket(userId: int, ticketType: int):
"""
购买倍票 API 的参考实现。
需要事先登录.
返回服务器响应的 Json string。
'''
"""
# 先使用 GetUserData API 请求器,取得 rating 和 pc 数
currentUserData = implGetUser_("Data", userId)
if currentUserData:
playerRating = currentUserData['userData']['playerRating']
playCount = currentUserData['userData'].get('playCount', 0)
playerRating = currentUserData["userData"]["playerRating"]
playCount = currentUserData["userData"].get("playCount", 0)
else:
return False
# 正式买票
getTicketResponseStr = apiBuyTicket(userId, ticketType, ticketType-1, playerRating, playCount)
getTicketResponseStr = apiBuyTicket(
userId, ticketType, ticketType - 1, playerRating, playCount
)
# 返回结果
return getTicketResponseStr
if __name__ == "__main__":
userId = testUid2
currentLoginTimestamp = generateTimestamp()
loginResult = apiLogin(currentLoginTimestamp, userId)
if loginResult['returnCode'] != 1:
if loginResult["returnCode"] != 1:
logger.info("登录失败")
exit()
try:
logger.info(implBuyTicket(userId, 2)) # 购买倍票
#logger.info(apiQueryTicket(userId))
# logger.info(apiQueryTicket(userId))
finally:
logger.info(apiLogout(currentLoginTimestamp, userId))
#logger.warning("Error")
# logger.warning("Error")

View File

@@ -1,59 +1,62 @@
# 获取用户成绩的各种实现
from API_TitleServer import *
from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from Config import *
import rapidjson as json
from HelperMusicDB import getMusicTitle
from loguru import logger
import sys
from HelperMusicDB import getMusicTitle
from API_TitleServer import apiSDGB
from MyConfig import testUid
# 日志设置
#log_level = "DEBUG"
#log_format = "<green>{time:YYYY-MM-DD HH:mm:ss.SSS zz}</green> | <level>{level: <8}</level> | <yellow>Line {line: >4} ({file}):</yellow> <b>{message}</b>"
#logger.add(sys.stderr, level=log_level, format=log_format, colorize=True, backtrace=True, diagnose=True)
#logger.add("file.log", level=log_level, format=log_format, colorize=False, backtrace=True, diagnose=True)
# log_level = "DEBUG"
# log_format = "<green>{time:YYYY-MM-DD HH:mm:ss.SSS zz}</green> | <level>{level: <8}</level> | <yellow>Line {line: >4} ({file}):</yellow> <b>{message}</b>"
# logger.add(sys.stderr, level=log_level, format=log_format, colorize=True, backtrace=True, diagnose=True)
# logger.add("file.log", level=log_level, format=log_format, colorize=False, backtrace=True, diagnose=True)
def getUserMusicDetail(userId:int, nextIndex:int=0, maxCount:int=50) -> dict:
def getUserMusicDetail(userId: int, nextIndex: int = 0, maxCount: int = 50) -> dict:
"""获取用户的成绩的API"""
data = json.dumps({
"userId": int(userId),
"nextIndex": nextIndex,
"maxCount": maxCount
})
data = json.dumps(
{"userId": int(userId), "nextIndex": nextIndex, "maxCount": maxCount}
)
return json.loads(apiSDGB(data, "GetUserMusicApi", userId))
def getUserFullMusicDetail(userId: int):
"""获取用户的全部成绩"""
currentUserMusicDetailList = []
nextIndex:int|None = None # 初始化 nextIndex
while nextIndex != 0 or nextIndex is None: #只要还有nextIndex就一直获取获取
nextIndex: int | None = None # 初始化 nextIndex
while nextIndex != 0 or nextIndex is None: # 只要还有nextIndex就一直获取获取
userMusicResponse = getUserMusicDetail(userId, nextIndex or 0)
nextIndex = userMusicResponse['nextIndex']
nextIndex = userMusicResponse["nextIndex"]
logger.info(f"NextIndex: {nextIndex}")
# 处理已经没有 userMusicList 的情况
if not userMusicResponse['userMusicList']:
if not userMusicResponse["userMusicList"]:
break
# 只要还有 userMusicList 就一直加进去,直到全部获取完毕
for currentMusic in userMusicResponse['userMusicList']:
for currentMusicDetail in currentMusic['userMusicDetailList']:
if not currentMusicDetail['playCount'] > 0:
for currentMusic in userMusicResponse["userMusicList"]:
for currentMusicDetail in currentMusic["userMusicDetailList"]:
if not currentMusicDetail["playCount"] > 0:
continue
currentUserMusicDetailList.append(currentMusicDetail)
return currentUserMusicDetailList
def parseUserFullMusicDetail(userFullMusicDetailList: list):
"""解析用户的全部成绩,给出一个迫真人类可读 list 套 dict"""
musicDetailList = []
for currentMusicDetail in userFullMusicDetailList:
musicDetailList.append({
'歌名': getMusicTitle(currentMusicDetail['musicId']),
'难度': currentMusicDetail['level'],
'分数': currentMusicDetail['achievement'] / 10000,
'DX分数': currentMusicDetail['deluxscoreMax']
})
musicDetailList.append(
{
"歌名": getMusicTitle(currentMusicDetail["musicId"]),
"难度": currentMusicDetail["level"],
"分数": currentMusicDetail["achievement"] / 10000,
"DX分数": currentMusicDetail["deluxscoreMax"],
}
)
return musicDetailList
if __name__ == '__main__':
if __name__ == "__main__":
userId = testUid
userFullMusicDetailList = getUserFullMusicDetail(userId)
parsedUserFullMusicDetail = parseUserFullMusicDetail(userFullMusicDetailList)

View File

@@ -1,9 +1,9 @@
# 获取用户数据的 API 实现
from loguru import logger
import rapidjson as json
from API_TitleServer import apiSDGB
def implGetUser_(thing:str, userId:int, noLog=False) -> dict:
def implGetUser_(thing: str, userId: int, noLog=False) -> dict:
"""获取用户某些数据的 API 实现,返回 Dict"""
# 获取 Json String
result = apiGetUserThing(userId, thing, noLog)
@@ -12,14 +12,12 @@ def implGetUser_(thing:str, userId:int, noLog=False) -> dict:
# 返回 Dict
return userthingDict
def apiGetUserThing(userId:int, thing:str, noLog=False) -> str:
def apiGetUserThing(userId: int, thing: str, noLog=False) -> str:
"""获取用户数据的 API 请求器,返回 Json String"""
# 构建 Payload
data = json.dumps({
"userId": userId
})
data = json.dumps({"userId": userId})
# 发送请求
userthing_result = apiSDGB(data, "GetUser" + thing + "Api", userId, noLog)
# 返回响应
return userthing_result

View File

@@ -1,60 +1,81 @@
# 登录·登出实现
# 一般作为模块使用,但也可以作为 CLI 程序运行以强制登出账号。
import rapidjson as json
import time
from loguru import logger
import random
from Config import *
from API_TitleServer import apiSDGB
import rapidjson as json
from loguru import logger
def apiLogin(timestamp:int, userId:int, noLog:bool=False) -> dict:
from API_TitleServer import apiSDGB
from Config import (
clientId,
placeId,
regionId,
)
from MyConfig import testUid
def apiLogin(timestamp: int, userId: int, noLog: bool = False) -> dict:
"""登录,返回 dict"""
data = json.dumps({
"userId": userId,
"accessCode": "",
"regionId": regionId,
"placeId": placeId,
"clientId": clientId,
"dateTime": timestamp,
"isContinue": False,
"genericFlag": 0,
})
data = json.dumps(
{
"userId": userId,
"accessCode": "",
"regionId": regionId,
"placeId": placeId,
"clientId": clientId,
"dateTime": timestamp,
"isContinue": False,
"genericFlag": 0,
}
)
login_result = json.loads(apiSDGB(data, "UserLoginApi", userId, noLog))
if not noLog:
logger.info("登录:结果:"+ str(login_result))
logger.info("登录:结果:" + str(login_result))
return login_result
def apiLogout(timestamp:int, userId:int, noLog:bool=False) -> dict:
def apiLogout(timestamp: int, userId: int, noLog: bool = False) -> dict:
"""登出,返回 dict"""
data = json.dumps({
"userId": userId,
"accessCode": "",
"regionId": regionId,
"placeId": placeId,
"clientId": clientId,
"dateTime": timestamp,
"type": 1
})
data = json.dumps(
{
"userId": userId,
"accessCode": "",
"regionId": regionId,
"placeId": placeId,
"clientId": clientId,
"dateTime": timestamp,
"type": 1,
}
)
logout_result = json.loads(apiSDGB(data, "UserLogoutApi", userId, noLog))
if not noLog:
logger.info("登出:结果:"+ str(logout_result))
logger.info("登出:结果:" + str(logout_result))
return logout_result
def generateTimestampLegacy() -> int:
"""生成一个凑合用的时间戳"""
timestamp = int(time.time()) - 60
logger.info(f"生成时间戳: {timestamp}")
return timestamp
def generateTimestamp() -> int:
"""生成一个今天早上 10:00 随机偏移的时间戳"""
timestamp = int(time.mktime(time.strptime(time.strftime("%Y-%m-%d 10:00:00"), "%Y-%m-%d %H:%M:%S"))) + random.randint(-600, 600)
timestamp = int(
time.mktime(
time.strptime(time.strftime("%Y-%m-%d 10:00:00"), "%Y-%m-%d %H:%M:%S")
)
) + random.randint(-600, 600)
logger.info(f"生成时间戳: {timestamp}")
logger.info(f"此时间戳对应的时间为: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))}")
logger.info(
f"此时间戳对应的时间为: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))}"
)
return timestamp
if __name__ == "__main__":
print("强制登出 CLI")
uid = testUid

View File

@@ -4,9 +4,10 @@ import rapidjson as json
from loguru import logger
from HelperGetUserThing import implGetUser_
import unicodedata
from Config import *
from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from MyConfig import testUid
def numberToLetter(number):
"""
@@ -17,10 +18,11 @@ def numberToLetter(number):
else:
return None
def maimaiVersionToHumanReadable(romVersion: str, dataVersion: str) -> str:
try:
romVersionList = romVersion.split('.')
dataVersionList = dataVersion.split('.')
romVersionList = romVersion.split(".")
dataVersionList = dataVersion.split(".")
except Exception as e:
logger.warning(f"无法解析版本号: {romVersion} {dataVersion},错误:{e}")
return "无效版本号:无法解析"
@@ -54,22 +56,17 @@ def maimaiVersionToHumanReadable(romVersion: str, dataVersion: str) -> str:
return finalVersionString
levelIdDict = {
"绿": 0,
"": 1,
"": 2,
"": 3,
"": 4,
"": 5
}
levelIdDict = {"绿": 0, "": 1, "": 2, "": 3, "": 4, "": 5}
def getHalfWidthString(s):
"""全角转半角舞萌ID用"""
return unicodedata.normalize('NFKC', s)
return unicodedata.normalize("NFKC", s)
def getHumanReadableLoginErrorCode(loginResult) -> str:
'''解析登录结果并且给出中文的报错解释'''
match loginResult['returnCode']:
"""解析登录结果并且给出中文的报错解释"""
match loginResult["returnCode"]:
case 1:
return False
case 100:
@@ -79,10 +76,11 @@ def getHumanReadableLoginErrorCode(loginResult) -> str:
case 103:
return "❌ 试图登录的账号 UID 无效,请检查账号是否正确。"
case _:
return "❌ 登录失败!这不应该发生,请反馈此问题。错误详情:"+ loginResult
return "❌ 登录失败!这不应该发生,请反馈此问题。错误详情:" + loginResult
def checkTechnologyUseCount(userId: int) -> int:
'''猜测账号是否用了科技0没用过其他为用过'''
"""猜测账号是否用了科技0没用过其他为用过"""
userData1 = implGetUser_("Data", userId)
userData = userData1.get("userData", {})
userRegion = implGetUser_("Region", userId)
@@ -92,13 +90,16 @@ def checkTechnologyUseCount(userId: int) -> int:
allRegionPlayCount = 0
for region in userRegionList:
allRegionPlayCount += region.get("playCount", 0)
logger.info(f"用户 {userId} 的总游玩次数: {playCount}, 各地区游玩次数: {allRegionPlayCount}")
logger.info(
f"用户 {userId} 的总游玩次数: {playCount}, 各地区游玩次数: {allRegionPlayCount}"
)
# 计算全部的 Region 加起来的游玩次数是否和 playCount 对不上,对不上就是用了科技
# 返回差值
return playCount - allRegionPlayCount
def getFriendlyUserData(userId:int) -> str:
'''生成一个(相对)友好的UserData的人话'''
def getFriendlyUserData(userId: int) -> str:
"""生成一个(相对)友好的UserData的人话"""
userData1 = implGetUser_("Data", userId)
userData = userData1.get("userData", {})
userRegion = implGetUser_("Region", userId)
@@ -114,7 +115,7 @@ def getFriendlyUserData(userId:int) -> str:
result += f"最近登录版本: {maimaiVersionToHumanReadable(userData.get('lastRomVersion'), userData.get('lastDataVersion'))} "
result += f"最近登录地区: {userData.get('lastRegionName', '未知')}\n"
result += f"注册日期: {userData.get('firstPlayDate')} "
result += f"注册版本: {maimaiVersionToHumanReadable(userData.get('firstRomVersion'),userData.get('firstDataVersion'))}\n"
result += f"注册版本: {maimaiVersionToHumanReadable(userData.get('firstRomVersion'), userData.get('firstDataVersion'))}\n"
result += f"封号状态(banState): {banState}\n"
try:
logger.info(userRegion)
@@ -124,28 +125,31 @@ def getFriendlyUserData(userId:int) -> str:
return result
def getHumanReadableRegionData(userRegion:str) -> str:
'''生成一个人类可读的地区数据'''
def getHumanReadableRegionData(userRegion: str) -> str:
"""生成一个人类可读的地区数据"""
userRegionList = userRegion.get("userRegionList")
logger.info(userRegionList)
result = ""
for region in userRegionList:
regionName = WAHLAP_REGIONS.get(region['regionId'], '未知')
playCount = region['playCount']
created = region['created']
regionName = WAHLAP_REGIONS.get(region["regionId"], "未知")
playCount = region["playCount"]
created = region["created"]
result += f"\n{regionName} 游玩次数: {playCount} 首次游玩: {created}"
return result
def getHumanReadablePreview(preview_json_content:str) -> str:
'''简单,粗略地解释 Preview 的 Json String 为人话。'''
def getHumanReadablePreview(preview_json_content: str) -> str:
"""简单,粗略地解释 Preview 的 Json String 为人话。"""
previewData = json.loads(preview_json_content)
userName = getHalfWidthString(previewData['userName'])
playerRating = previewData['playerRating']
userName = getHalfWidthString(previewData["userName"])
playerRating = previewData["playerRating"]
finalString = f"用户名:{userName}\nDX RATING{playerRating}\n"
return finalString
def getHumanReadableLoginBonusList(jsonString: str):
'''生成一个人类可读的 Login Bonus 的列表'''
"""生成一个人类可读的 Login Bonus 的列表"""
data = json.loads(jsonString)
result = []
@@ -157,32 +161,34 @@ def getHumanReadableLoginBonusList(jsonString: str):
result.append(line)
resultString = ""
for line in result: # 转成字符串
for line in result: # 转成字符串
resultString += line + "\n"
return resultString
def getHumanReadableTicketList(jsonString: str):
'''生成一个人类可读的 UserCharge 的列表'''
"""生成一个人类可读的 UserCharge 的列表"""
data = json.loads(jsonString)
userId = data['userId']
length = data['length']
userChargeList = data['userChargeList']
userId = data["userId"]
length = data["length"]
userChargeList = data["userChargeList"]
result = f"UID: {userId} 票槽大小: {length} 所有记录:"
for currentItem in userChargeList:
chargeId = currentItem['chargeId']
stock = currentItem['stock']
purchaseDate = currentItem['purchaseDate']
validDate = currentItem['validDate']
chargeId = currentItem["chargeId"]
stock = currentItem["stock"]
purchaseDate = currentItem["purchaseDate"]
validDate = currentItem["validDate"]
result += f"\nID: {chargeId} 持有: {stock}, 购买日期: {purchaseDate}, 有效期限: {validDate}"
return result
def getHumanReadableUserData(userData) -> str:
'''生成一个人类可读的 UserData 的数据(比较详细)'''
"""生成一个人类可读的 UserData 的数据(比较详细)"""
userId = userData.get("userId")
userData = userData.get("userData", {})
banState = userData.get("banState")
@@ -239,39 +245,40 @@ def getHumanReadableUserData(userData) -> str:
result += f"封号状态: {banState}\n"
return result
WAHLAP_REGIONS = {
1: '北京',
2: '重庆',
3: '上海',
4: '天津',
5: '安徽',
6: '福建',
7: '甘肃',
8: '广东',
9: '贵州',
10: '海南',
11: '河北',
12: '黑龙江',
13: '河南',
14: '湖北',
15: '湖南',
16: '江苏',
17: '江西',
18: '吉林',
19: '辽宁',
20: '青海',
21: '陕西',
22: '山东',
23: '山西',
24: '四川',
25: '未知25',
26: '云南',
27: '浙江',
28: '广西',
29: '内蒙古',
30: '宁夏',
31: '新疆',
32: '西藏',
1: "北京",
2: "重庆",
3: "上海",
4: "天津",
5: "安徽",
6: "福建",
7: "甘肃",
8: "广东",
9: "贵州",
10: "海南",
11: "河北",
12: "黑龙江",
13: "河南",
14: "湖北",
15: "湖南",
16: "江苏",
17: "江西",
18: "吉林",
19: "辽宁",
20: "青海",
21: "陕西",
22: "山东",
23: "山西",
24: "四川",
25: "未知25",
26: "云南",
27: "浙江",
28: "广西",
29: "内蒙古",
30: "宁夏",
31: "新疆",
32: "西藏",
}
if __name__ == "__main__":
@@ -284,12 +291,12 @@ if __name__ == "__main__":
currentLoginTimestamp = generateTimestamp()
loginResult = apiLogin(currentLoginTimestamp, userId)
if loginResult['returnCode'] != 1:
if loginResult["returnCode"] != 1:
logger.info("登录失败")
exit()
try:
logger.info(checkTechnologyUseCount(userId))
#logger.info(apiQueryTicket(userId))
# logger.info(apiQueryTicket(userId))
finally:
logger.info(apiLogout(currentLoginTimestamp, userId))
#logger.warning("Error")
# logger.warning("Error")

View File

@@ -4,11 +4,11 @@ from loguru import logger
def getMusicTitle(musicId: int) -> str:
"""从数据库获取音乐的标题"""
#logger.debug(f"查询歌名: {musicId}")
# logger.debug(f"查询歌名: {musicId}")
musicInfo = musicDB.get(musicId)
if not musicInfo:
logger.warning(f"数据库里未找到此歌曲: {musicId}")
return "R_ERR_MUSIC_ID_NOT_IN_DATABASE"
musicName = musicInfo.get("name")
#logger.debug(f"成功查询到歌名: {musicName}")
# logger.debug(f"成功查询到歌名: {musicName}")
return musicName

View File

@@ -1,43 +1,48 @@
# 解锁东西的一个通用的助手,不可独立使用
from loguru import logger
from Config import *
from HelperFullPlay import implFullPlayAction
def implUnlockThing(newUserItemList, userId: int, currentLoginTimestamp:int, currentLoginResult) -> str:
musicData= ({
"musicId": 11538, # Amber Chronicle
"level": 0,
"playCount": 1,
"achievement": 0,
"comboStatus": 0,
"syncStatus": 0,
"deluxscoreMax": 0,
"scoreRank": 0,
"extNum1": 0
})
def implUnlockThing(
newUserItemList, userId: int, currentLoginTimestamp: int, currentLoginResult
) -> str:
musicData = {
"musicId": 11538, # Amber Chronicle
"level": 0,
"playCount": 1,
"achievement": 0,
"comboStatus": 0,
"syncStatus": 0,
"deluxscoreMax": 0,
"scoreRank": 0,
"extNum1": 0,
}
userAllPatches = {
"upsertUserAll": {
"userMusicDetailList": [musicData],
"isNewMusicDetailList": "1",
"userItemList": newUserItemList,
"isNewItemList": "1" * len(newUserItemList)
}}
result = implFullPlayAction(userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches)
"upsertUserAll": {
"userMusicDetailList": [musicData],
"isNewMusicDetailList": "1",
"userItemList": newUserItemList,
"isNewItemList": "1" * len(newUserItemList),
}
}
result = implFullPlayAction(
userId, currentLoginTimestamp, currentLoginResult, musicData, userAllPatches
)
return result
itemKindDict = {
"PLATE": 1, # 姓名框
"TITLE": 2, # 称号
"ICON": 3, # 头像
"MUSIC": 5, # 歌
"MUSIC_MASTER": 6, # 紫谱
"MUSIC_RE_MASTER": 7,# 白谱
"CHARACTER": 9, # 旅行伙伴
"PARTNER": 10, # 搭档
"FRAME": 11, # 背景板
"TICKET": 12 # 功能票
# "PRESENT": 4, #
# "MUSIC_STRONG": 8, #
"PLATE": 1, # 姓名框
"TITLE": 2, # 称号
"ICON": 3, # 头像
"MUSIC": 5, # 歌
"MUSIC_MASTER": 6, # 紫谱
"MUSIC_RE_MASTER": 7, # 白谱
"CHARACTER": 9, # 旅行伙伴
"PARTNER": 10, # 搭档
"FRAME": 11, # 背景板
"TICKET": 12, # 功能票
# "PRESENT": 4, #
# "MUSIC_STRONG": 8, #
}
itemKindzhCNDict = {
@@ -50,9 +55,9 @@ itemKindzhCNDict = {
"旅行伙伴": "CHARACTER",
"搭档": "PARTNER",
"背景板": "FRAME",
"功能票": "TICKET"
# "礼物": "PRESENT",
# "STRONG": "MUSIC_STRONG",
"功能票": "TICKET",
# "礼物": "PRESENT",
# "STRONG": "MUSIC_STRONG",
}
partnerList = {
@@ -75,5 +80,5 @@ partnerList = {
"26": "黒姫",
"27": "俊达萌",
"28": "乙姫2024",
"29": "青柠熊柠檬熊2024"
"29": "青柠熊柠檬熊2024",
}

View File

@@ -8,133 +8,146 @@ from datetime import datetime
from loguru import logger
from API_TitleServer import apiSDGB
from Config import *
from Config import (
placeId,
placeName,
)
def apiUploadUserPlaylog(userId:int, musicDataToBeUploaded, currentUserData2, loginId:int) -> str:
def apiUploadUserPlaylog(
userId: int, musicDataToBeUploaded, currentUserData2, loginId: int
) -> str:
"""
上传一个 UserPlayLog。
注意:成绩为随机的空成绩,只用作占位
返回 Json String。"""
# 构建一个 PlayLog
data = json.dumps({
"userId": int(userId),
"userPlaylogList": [
data = json.dumps(
{
"userId": 0,
"orderId": 0,
"playlogId": loginId,
"version": 1051000,
"placeId": placeId,
"placeName": placeName,
"loginDate": int(time.time()), #似乎和登录timestamp不同
"playDate": datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d'),
"userPlayDate": datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S') + '.0',
"type": 0,
"musicId": int(musicDataToBeUploaded['musicId']),
"level": int(musicDataToBeUploaded['level']),
"trackNo": 1,
"vsMode": 0,
"vsUserName": "",
"vsStatus": 0,
"vsUserRating": 0,
"vsUserAchievement": 0,
"vsUserGradeRank": 0,
"vsRank": 0,
"playerNum": 1,
"playedUserId1": 0,
"playedUserName1": "",
"playedMusicLevel1": 0,
"playedUserId2": 0,
"playedUserName2": "",
"playedMusicLevel2": 0,
"playedUserId3": 0,
"playedUserName3": "",
"playedMusicLevel3": 0,
"characterId1": currentUserData2['charaSlot'][0],
"characterLevel1": random.randint(1000,6500),
"characterAwakening1": 5,
"characterId2": currentUserData2['charaSlot'][1],
"characterLevel2": random.randint(1000,6500),
"characterAwakening2": 5,
"characterId3": currentUserData2['charaSlot'][2],
"characterLevel3": random.randint(1000,6500),
"characterAwakening3": 5,
"characterId4": currentUserData2['charaSlot'][3],
"characterLevel4": random.randint(1000,6500),
"characterAwakening4": 5,
"characterId5": currentUserData2['charaSlot'][4],
"characterLevel5": random.randint(1000,6500),
"characterAwakening5": 5,
"achievement": int(musicDataToBeUploaded['achievement']),
"deluxscore": int(musicDataToBeUploaded['deluxscoreMax']),
"scoreRank": int(musicDataToBeUploaded['scoreRank']),
"maxCombo": 0,
"totalCombo": random.randint(700,900),
"maxSync": 0,
"totalSync": 0,
"tapCriticalPerfect": 0,
"tapPerfect": 0,
"tapGreat": 0,
"tapGood": 0,
"tapMiss": random.randint(1,10),
"holdCriticalPerfect": 0,
"holdPerfect": 0,
"holdGreat": 0,
"holdGood": 0,
"holdMiss": random.randint(1,15),
"slideCriticalPerfect": 0,
"slidePerfect": 0,
"slideGreat": 0,
"slideGood": 0,
"slideMiss": random.randint(1,15),
"touchCriticalPerfect": 0,
"touchPerfect": 0,
"touchGreat": 0,
"touchGood": 0,
"touchMiss": random.randint(1,15),
"breakCriticalPerfect": 0,
"breakPerfect": 0,
"breakGreat": 0,
"breakGood": 0,
"breakMiss": random.randint(1,15),
"isTap": True,
"isHold": True,
"isSlide": True,
"isTouch": True,
"isBreak": True,
"isCriticalDisp": True,
"isFastLateDisp": True,
"fastCount": 0,
"lateCount": 0,
"isAchieveNewRecord": True,
"isDeluxscoreNewRecord": True,
"comboStatus": 0,
"syncStatus": 0,
"isClear": False,
"beforeRating": currentUserData2['playerRating'],
"afterRating": currentUserData2['playerRating'],
"beforeGrade": 0,
"afterGrade": 0,
"afterGradeRank": 1,
"beforeDeluxRating": currentUserData2['playerRating'],
"afterDeluxRating": currentUserData2['playerRating'],
"isPlayTutorial": False,
"isEventMode": False,
"isFreedomMode": False,
"playMode": 0,
"isNewFree": False,
"trialPlayAchievement": -1,
"extNum1": 0,
"extNum2": 0,
"extNum4": 3020,
"extBool1": False,
"extBool2": False
"userId": int(userId),
"userPlaylogList": [
{
"userId": 0,
"orderId": 0,
"playlogId": loginId,
"version": 1051000,
"placeId": placeId,
"placeName": placeName,
"loginDate": int(time.time()), # 似乎和登录timestamp不同
"playDate": datetime.now(pytz.timezone("Asia/Shanghai")).strftime(
"%Y-%m-%d"
),
"userPlayDate": datetime.now(
pytz.timezone("Asia/Shanghai")
).strftime("%Y-%m-%d %H:%M:%S")
+ ".0",
"type": 0,
"musicId": int(musicDataToBeUploaded["musicId"]),
"level": int(musicDataToBeUploaded["level"]),
"trackNo": 1,
"vsMode": 0,
"vsUserName": "",
"vsStatus": 0,
"vsUserRating": 0,
"vsUserAchievement": 0,
"vsUserGradeRank": 0,
"vsRank": 0,
"playerNum": 1,
"playedUserId1": 0,
"playedUserName1": "",
"playedMusicLevel1": 0,
"playedUserId2": 0,
"playedUserName2": "",
"playedMusicLevel2": 0,
"playedUserId3": 0,
"playedUserName3": "",
"playedMusicLevel3": 0,
"characterId1": currentUserData2["charaSlot"][0],
"characterLevel1": random.randint(1000, 6500),
"characterAwakening1": 5,
"characterId2": currentUserData2["charaSlot"][1],
"characterLevel2": random.randint(1000, 6500),
"characterAwakening2": 5,
"characterId3": currentUserData2["charaSlot"][2],
"characterLevel3": random.randint(1000, 6500),
"characterAwakening3": 5,
"characterId4": currentUserData2["charaSlot"][3],
"characterLevel4": random.randint(1000, 6500),
"characterAwakening4": 5,
"characterId5": currentUserData2["charaSlot"][4],
"characterLevel5": random.randint(1000, 6500),
"characterAwakening5": 5,
"achievement": int(musicDataToBeUploaded["achievement"]),
"deluxscore": int(musicDataToBeUploaded["deluxscoreMax"]),
"scoreRank": int(musicDataToBeUploaded["scoreRank"]),
"maxCombo": 0,
"totalCombo": random.randint(700, 900),
"maxSync": 0,
"totalSync": 0,
"tapCriticalPerfect": 0,
"tapPerfect": 0,
"tapGreat": 0,
"tapGood": 0,
"tapMiss": random.randint(1, 10),
"holdCriticalPerfect": 0,
"holdPerfect": 0,
"holdGreat": 0,
"holdGood": 0,
"holdMiss": random.randint(1, 15),
"slideCriticalPerfect": 0,
"slidePerfect": 0,
"slideGreat": 0,
"slideGood": 0,
"slideMiss": random.randint(1, 15),
"touchCriticalPerfect": 0,
"touchPerfect": 0,
"touchGreat": 0,
"touchGood": 0,
"touchMiss": random.randint(1, 15),
"breakCriticalPerfect": 0,
"breakPerfect": 0,
"breakGreat": 0,
"breakGood": 0,
"breakMiss": random.randint(1, 15),
"isTap": True,
"isHold": True,
"isSlide": True,
"isTouch": True,
"isBreak": True,
"isCriticalDisp": True,
"isFastLateDisp": True,
"fastCount": 0,
"lateCount": 0,
"isAchieveNewRecord": True,
"isDeluxscoreNewRecord": True,
"comboStatus": 0,
"syncStatus": 0,
"isClear": False,
"beforeRating": currentUserData2["playerRating"],
"afterRating": currentUserData2["playerRating"],
"beforeGrade": 0,
"afterGrade": 0,
"afterGradeRank": 1,
"beforeDeluxRating": currentUserData2["playerRating"],
"afterDeluxRating": currentUserData2["playerRating"],
"isPlayTutorial": False,
"isEventMode": False,
"isFreedomMode": False,
"playMode": 0,
"isNewFree": False,
"trialPlayAchievement": -1,
"extNum1": 0,
"extNum2": 0,
"extNum4": 3020,
"extBool1": False,
"extBool2": False,
}
],
}
]
})
)
# 发送请求
result = apiSDGB(data, "UploadUserPlaylogListApi", userId)
logger.info("上传游玩记录:结果:"+ str(result))
logger.info("上传游玩记录:结果:" + str(result))
# 返回响应
return result

View File

@@ -2,12 +2,20 @@
import pytz
from datetime import datetime
from Config import *
from HelperGetUserThing import implGetUser_
from HelperGetUserMusicDetail import getUserMusicDetail
from loguru import logger
from HelperGetUserThing import implGetUser_
from HelperGetUserMusicDetail import getUserMusicDetail
from Config import (
clientId,
placeName,
placeId,
regionId,
regionName,
)
def isNewMusicType(userId, musicId, level) -> str:
"""判断这首 musicId 在 isNewMusicDetailList 应该填什么
0: Edit
@@ -16,21 +24,38 @@ def isNewMusicType(userId, musicId, level) -> str:
未完工,仅供测试
"""
userMusicDetailList = getUserMusicDetail(userId, musicId, 1)['userMusicList'][0]['userMusicDetailList']
userMusicDetailList = getUserMusicDetail(userId, musicId, 1)["userMusicList"][0][
"userMusicDetailList"
]
logger.info(userMusicDetailList)
try:
if userMusicDetailList[0]['musicId'] == musicId and userMusicDetailList[0]['level'] == level:
if (
userMusicDetailList[0]["musicId"] == musicId
and userMusicDetailList[0]["level"] == level
):
logger.info(f"We think {musicId} Level {level} should use EDIT.")
return "0"
except:
return "1"
def generateFullUserAll(userId, currentLoginResult, currentLoginTimestamp, currentUserData2, currentPlaySpecial):
def generateFullUserAll(
userId,
currentLoginResult,
currentLoginTimestamp,
currentUserData2,
currentPlaySpecial,
):
"""从服务器取得必要的数据并构建一个比较完整的 UserAll"""
# 先构建一个基础 UserAll
currentUserAll = generateUserAllData(userId, currentLoginResult, currentLoginTimestamp, currentUserData2, currentPlaySpecial)
currentUserAll = generateUserAllData(
userId,
currentLoginResult,
currentLoginTimestamp,
currentUserData2,
currentPlaySpecial,
)
# 然后从服务器取得必要的数据
currentUserExtend = implGetUser_("Extend", userId, True)
@@ -41,61 +66,78 @@ def generateFullUserAll(userId, currentLoginResult, currentLoginTimestamp, curre
currentUserMissionData = implGetUser_("MissionData", userId, True)
# 把这些数据都追加进去
currentUserAll['upsertUserAll']['userExtend'] = [currentUserExtend['userExtend']]
currentUserAll['upsertUserAll']['userOption'] = [currentUserOption['userOption']]
currentUserAll['upsertUserAll']['userRatingList'] = [currentUserRating['userRating']]
currentUserAll['upsertUserAll']['userActivityList'] = [currentUserActivity['userActivity']]
currentUserAll['upsertUserAll']['userChargeList'] = currentUserCharge['userChargeList']
currentUserAll['upsertUserAll']['userWeeklyData'] = currentUserMissionData['userWeeklyData']
currentUserAll["upsertUserAll"]["userExtend"] = [currentUserExtend["userExtend"]]
currentUserAll["upsertUserAll"]["userOption"] = [currentUserOption["userOption"]]
currentUserAll["upsertUserAll"]["userRatingList"] = [
currentUserRating["userRating"]
]
currentUserAll["upsertUserAll"]["userActivityList"] = [
currentUserActivity["userActivity"]
]
currentUserAll["upsertUserAll"]["userChargeList"] = currentUserCharge[
"userChargeList"
]
currentUserAll["upsertUserAll"]["userWeeklyData"] = currentUserMissionData[
"userWeeklyData"
]
# 完事
return currentUserAll
def generateUserAllData(userId, currentLoginResult, currentLoginTimestamp, currentUserData2, currentPlaySpecial):
def generateUserAllData(
userId,
currentLoginResult,
currentLoginTimestamp,
currentUserData2,
currentPlaySpecial,
):
"""构建一个非常基础的 UserAll 数据,必须手动填充一些数据"""
data = {
"userId": userId,
"playlogId": currentLoginResult['loginId'],
"playlogId": currentLoginResult["loginId"],
"isEventMode": False,
"isFreePlay": False,
"upsertUserAll": {
"userData": [
{
"accessCode": "",
"userName": currentUserData2['userName'],
"userName": currentUserData2["userName"],
"isNetMember": 1,
"point": currentUserData2['point'],
"totalPoint": currentUserData2['totalPoint'],
"iconId": currentUserData2['iconId'],
"plateId": currentUserData2['plateId'],
"titleId": currentUserData2['titleId'],
"partnerId": currentUserData2['partnerId'],
"frameId": currentUserData2['frameId'],
"selectMapId": currentUserData2['selectMapId'],
"totalAwake": currentUserData2['totalAwake'],
"gradeRating": currentUserData2['gradeRating'],
"musicRating": currentUserData2['musicRating'],
"playerRating": currentUserData2['playerRating'],
"highestRating": currentUserData2['highestRating'],
"gradeRank": currentUserData2['gradeRank'],
"classRank": currentUserData2['classRank'],
"courseRank": currentUserData2['courseRank'],
"charaSlot": currentUserData2['charaSlot'],
"charaLockSlot": currentUserData2['charaLockSlot'],
"contentBit": currentUserData2['contentBit'],
"playCount": currentUserData2['playCount'],
"currentPlayCount": currentUserData2['currentPlayCount'],
"point": currentUserData2["point"],
"totalPoint": currentUserData2["totalPoint"],
"iconId": currentUserData2["iconId"],
"plateId": currentUserData2["plateId"],
"titleId": currentUserData2["titleId"],
"partnerId": currentUserData2["partnerId"],
"frameId": currentUserData2["frameId"],
"selectMapId": currentUserData2["selectMapId"],
"totalAwake": currentUserData2["totalAwake"],
"gradeRating": currentUserData2["gradeRating"],
"musicRating": currentUserData2["musicRating"],
"playerRating": currentUserData2["playerRating"],
"highestRating": currentUserData2["highestRating"],
"gradeRank": currentUserData2["gradeRank"],
"classRank": currentUserData2["classRank"],
"courseRank": currentUserData2["courseRank"],
"charaSlot": currentUserData2["charaSlot"],
"charaLockSlot": currentUserData2["charaLockSlot"],
"contentBit": currentUserData2["contentBit"],
"playCount": currentUserData2["playCount"],
"currentPlayCount": currentUserData2["currentPlayCount"],
"renameCredit": 0,
"mapStock": currentUserData2['mapStock'],
"eventWatchedDate": currentUserData2['eventWatchedDate'],
"mapStock": currentUserData2["mapStock"],
"eventWatchedDate": currentUserData2["eventWatchedDate"],
"lastGameId": "SDGB",
"lastRomVersion": currentUserData2['lastRomVersion'],
"lastDataVersion": currentUserData2['lastDataVersion'],
#"lastLoginDate": currentLoginResult['lastLoginDate'], # sb
"lastLoginDate": currentUserData2['lastLoginDate'], # 等待测试
"lastPlayDate": datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S') + '.0',
"lastRomVersion": currentUserData2["lastRomVersion"],
"lastDataVersion": currentUserData2["lastDataVersion"],
# "lastLoginDate": currentLoginResult['lastLoginDate'], # sb
"lastLoginDate": currentUserData2["lastLoginDate"], # 等待测试
"lastPlayDate": datetime.now(
pytz.timezone("Asia/Shanghai")
).strftime("%Y-%m-%d %H:%M:%S")
+ ".0",
"lastPlayCredit": 1,
"lastPlayMode": 0,
"lastPlaceId": placeId,
@@ -107,68 +149,83 @@ def generateUserAllData(userId, currentLoginResult, currentLoginTimestamp, curre
"lastCountryCode": "CHN",
"lastSelectEMoney": 0,
"lastSelectTicket": 0,
"lastSelectCourse": currentUserData2['lastSelectCourse'],
"lastSelectCourse": currentUserData2["lastSelectCourse"],
"lastCountCourse": 0,
"firstGameId": "SDGB",
"firstRomVersion": currentUserData2['firstRomVersion'],
"firstDataVersion": currentUserData2['firstDataVersion'],
"firstPlayDate": currentUserData2['firstPlayDate'],
"compatibleCmVersion": currentUserData2['compatibleCmVersion'],
"dailyBonusDate": currentUserData2['dailyBonusDate'],
"dailyCourseBonusDate": currentUserData2['dailyCourseBonusDate'],
"lastPairLoginDate": currentUserData2['lastPairLoginDate'],
"lastTrialPlayDate": currentUserData2['lastTrialPlayDate'],
"firstRomVersion": currentUserData2["firstRomVersion"],
"firstDataVersion": currentUserData2["firstDataVersion"],
"firstPlayDate": currentUserData2["firstPlayDate"],
"compatibleCmVersion": currentUserData2["compatibleCmVersion"],
"dailyBonusDate": currentUserData2["dailyBonusDate"],
"dailyCourseBonusDate": currentUserData2["dailyCourseBonusDate"],
"lastPairLoginDate": currentUserData2["lastPairLoginDate"],
"lastTrialPlayDate": currentUserData2["lastTrialPlayDate"],
"playVsCount": 0,
"playSyncCount": 0,
"winCount": 0,
"helpCount": 0,
"comboCount": 0,
"totalDeluxscore": currentUserData2['totalDeluxscore'],
"totalBasicDeluxscore": currentUserData2['totalBasicDeluxscore'],
"totalAdvancedDeluxscore": currentUserData2['totalAdvancedDeluxscore'],
"totalExpertDeluxscore": currentUserData2['totalExpertDeluxscore'],
"totalMasterDeluxscore": currentUserData2['totalMasterDeluxscore'],
"totalReMasterDeluxscore": currentUserData2['totalReMasterDeluxscore'],
"totalSync": currentUserData2['totalSync'],
"totalBasicSync": currentUserData2['totalBasicSync'],
"totalAdvancedSync": currentUserData2['totalAdvancedSync'],
"totalExpertSync": currentUserData2['totalExpertSync'],
"totalMasterSync": currentUserData2['totalMasterSync'],
"totalReMasterSync": currentUserData2['totalReMasterSync'],
"totalAchievement": currentUserData2['totalAchievement'],
"totalBasicAchievement": currentUserData2['totalBasicAchievement'],
"totalAdvancedAchievement": currentUserData2['totalAdvancedAchievement'],
"totalExpertAchievement": currentUserData2['totalExpertAchievement'],
"totalMasterAchievement": currentUserData2['totalMasterAchievement'],
"totalReMasterAchievement": currentUserData2['totalReMasterAchievement'],
"playerOldRating": currentUserData2['playerOldRating'],
"playerNewRating": currentUserData2['playerNewRating'],
"totalDeluxscore": currentUserData2["totalDeluxscore"],
"totalBasicDeluxscore": currentUserData2["totalBasicDeluxscore"],
"totalAdvancedDeluxscore": currentUserData2[
"totalAdvancedDeluxscore"
],
"totalExpertDeluxscore": currentUserData2["totalExpertDeluxscore"],
"totalMasterDeluxscore": currentUserData2["totalMasterDeluxscore"],
"totalReMasterDeluxscore": currentUserData2[
"totalReMasterDeluxscore"
],
"totalSync": currentUserData2["totalSync"],
"totalBasicSync": currentUserData2["totalBasicSync"],
"totalAdvancedSync": currentUserData2["totalAdvancedSync"],
"totalExpertSync": currentUserData2["totalExpertSync"],
"totalMasterSync": currentUserData2["totalMasterSync"],
"totalReMasterSync": currentUserData2["totalReMasterSync"],
"totalAchievement": currentUserData2["totalAchievement"],
"totalBasicAchievement": currentUserData2["totalBasicAchievement"],
"totalAdvancedAchievement": currentUserData2[
"totalAdvancedAchievement"
],
"totalExpertAchievement": currentUserData2[
"totalExpertAchievement"
],
"totalMasterAchievement": currentUserData2[
"totalMasterAchievement"
],
"totalReMasterAchievement": currentUserData2[
"totalReMasterAchievement"
],
"playerOldRating": currentUserData2["playerOldRating"],
"playerNewRating": currentUserData2["playerNewRating"],
"banState": 0,
"friendRegistSkip": currentUserData2['friendRegistSkip'],
"dateTime": currentLoginTimestamp
"friendRegistSkip": currentUserData2["friendRegistSkip"],
"dateTime": currentLoginTimestamp,
}
],
"userExtend": [], #需要填上
"userOption": [], #需要填上
"userExtend": [], # 需要填上
"userOption": [], # 需要填上
"userGhost": [],
"userCharacterList": [],
"userMapList": [],
"userLoginBonusList": [],
"userRatingList": [], #需要填上
"userItemList": [], #可选,但经常要填上
"userMusicDetailList": [],#需要填上
"userRatingList": [], # 需要填上
"userItemList": [], # 可选,但经常要填上
"userMusicDetailList": [], # 需要填上
"userCourseList": [],
"userFriendSeasonRankingList": [],
"userChargeList": [], #需要填上
"userChargeList": [], # 需要填上
"userFavoriteList": [],
"userActivityList": [], #需要填上
"userActivityList": [], # 需要填上
"userMissionDataList": [],
"userWeeklyData": [],#应该需要填上
"userWeeklyData": [], # 应该需要填上
"userGamePlaylogList": [
{
"playlogId": currentLoginResult['loginId'],
"playlogId": currentLoginResult["loginId"],
"version": "1.51.00",
"playDate": datetime.now(pytz.timezone('Asia/Shanghai')).strftime('%Y-%m-%d %H:%M:%S') + '.0',
"playDate": datetime.now(pytz.timezone("Asia/Shanghai")).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ ".0",
"playMode": 0,
"useTicketId": -1,
"playCredit": 1,
@@ -177,9 +234,9 @@ def generateUserAllData(userId, currentLoginResult, currentLoginTimestamp, curre
"isPlayTutorial": False,
"isEventMode": False,
"isNewFree": False,
"playCount": currentUserData2['playCount'],
"playCount": currentUserData2["playCount"],
"playSpecial": currentPlaySpecial,
"playOtherUserId": 0
"playOtherUserId": 0,
}
],
"user2pPlaylog": {
@@ -189,7 +246,7 @@ def generateUserAllData(userId, currentLoginResult, currentLoginTimestamp, curre
"userName2": "",
"regionId": 0,
"placeId": 0,
"user2pPlaylogDetailList": []
"user2pPlaylogDetailList": [],
},
"userIntimateList": [],
"userShopItemStockList": [],
@@ -201,13 +258,13 @@ def generateUserAllData(userId, currentLoginResult, currentLoginTimestamp, curre
"isNewMapList": "",
"isNewLoginBonusList": "",
"isNewItemList": "",
"isNewMusicDetailList": "", #可选但经常要填上
"isNewMusicDetailList": "", # 可选但经常要填上
"isNewCourseList": "0",
"isNewFavoriteList": "",
"isNewFriendSeasonRankingList": "",
"isNewUserIntimateList": "",
"isNewFavoritemusicList": "",
"isNewKaleidxScopeList": ""
}
}
"isNewKaleidxScopeList": "",
},
}
return data