Files
maimaiDX-Api/ChargeTicket.py
2025-07-29 15:20:28 +08:00

127 lines
3.8 KiB
Python

from datetime import datetime, timedelta
# 倍票相关 API 的实现
import rapidjson as json
import pytz
from loguru import logger
from API_TitleServer import apiSDGB
from HelperGetUserThing import implGetUser_
from Config import (
clientId,
placeId,
regionId,
)
from MyConfig import testUid2
from HelperLogInOut import apiLogin, apiLogout, generateTimestamp
from HelperFullPlay import implFullPlayAction, generateMusicData
def implWipeTickets(userId: int, currentLoginTimestamp: int, currentLoginResult) -> str:
    """Set the stock of every ticket the user holds to zero.

    Fetches the user's "Charge" data through ``implGetUser_``, zeroes the
    ``stock`` field of each entry in its ``userChargeList`` (in place), and
    submits that list via ``implFullPlayAction`` together with a freshly
    generated music record.

    Args:
        userId: Id of the user whose tickets are wiped.
        currentLoginTimestamp: Timestamp of the active login session,
            forwarded unchanged to ``implFullPlayAction``.
        currentLoginResult: Result of the login call, forwarded unchanged
            to ``implFullPlayAction``.

    Returns:
        Whatever ``implFullPlayAction`` returns (annotated as a JSON string).
    """
    # Fetch the current charge entries and zero them where they live.
    chargeEntries = implGetUser_("Charge", userId)["userChargeList"]
    for entry in chargeEntries:
        entry["stock"] = 0

    playedMusic = generateMusicData()
    return implFullPlayAction(
        userId,
        currentLoginTimestamp,
        currentLoginResult,
        playedMusic,
        {
            "upsertUserAll": {
                "userChargeList": chargeEntries,
                "userMusicDetailList": [playedMusic],
                # "1" avoids overwriting (per the original author's note).
                "isNewMusicDetailList": "1",
            }
        },
    )
def apiQueryTicket(userId: int) -> str:
    """Query the tickets the user currently holds.

    Serializes ``{"userId": userId}`` and sends it to the
    ``GetUserChargeApi`` endpoint through ``apiSDGB``.

    Args:
        userId: Id of the user to query.

    Returns:
        The response from ``apiSDGB``, unmodified (annotated as a JSON string).
    """
    requestBody = json.dumps({"userId": userId})
    return apiSDGB(requestBody, "GetUserChargeApi", userId)
def apiBuyTicket(
    userId: int, ticketType: int, price: int, playerRating: int, playCount: int
) -> str:
    """Send a ticket purchase request to ``UpsertUserChargelogApi``.

    The payload carries a purchase log entry (``userChargelog``) and the
    resulting ticket record (``userCharge``) with a stock of 1. Both are
    stamped with the current time in the Asia/Shanghai timezone; the ticket's
    ``validDate`` is 90 days later with the clock set to 04:00:00.

    Args:
        userId: Id of the purchasing user.
        ticketType: Sent as ``chargeId`` in both records.
        price: Sent as ``price`` in the purchase log.
        playerRating: Sent as ``playerRating`` in the purchase log.
        playCount: Sent as ``playCount`` in the purchase log.

    Returns:
        The response from ``apiSDGB`` (annotated as a JSON string).
    """
    purchasedAt = datetime.now(pytz.timezone("Asia/Shanghai"))
    # Both records share the same purchase timestamp string.
    purchaseStamp = purchasedAt.strftime("%Y-%m-%d %H:%M:%S.0")
    expiresAt = (purchasedAt + timedelta(days=90)).replace(
        hour=4, minute=0, second=0
    )

    purchaseLog = {
        "chargeId": ticketType,
        "price": price,
        "purchaseDate": purchaseStamp,
        "playCount": playCount,
        "playerRating": playerRating,
        "placeId": placeId,
        "regionId": regionId,
        "clientId": clientId,
    }
    ticketRecord = {
        "chargeId": ticketType,
        "stock": 1,
        "purchaseDate": purchaseStamp,
        "validDate": expiresAt.strftime("%Y-%m-%d %H:%M:%S"),
    }
    requestBody = json.dumps(
        {
            "userId": userId,
            "userChargelog": purchaseLog,
            "userCharge": ticketRecord,
        }
    )
    return apiSDGB(requestBody, "UpsertUserChargelogApi", userId)
def implBuyTicket(userId: int, ticketType: int):
    """Reference implementation of a ticket purchase.

    The user must already be logged in (per the original author's note).
    Looks up the user's "Data" record through ``implGetUser_`` to obtain the
    rating and play count, then calls ``apiBuyTicket`` with a price of
    ``ticketType - 1``.

    Args:
        userId: Id of the purchasing user.
        ticketType: Ticket type to buy; forwarded to ``apiBuyTicket``.

    Returns:
        The response from ``apiBuyTicket``, or ``False`` when the user data
        lookup returns a falsy value.
    """
    userDataResponse = implGetUser_("Data", userId)
    if not userDataResponse:
        return False

    profile = userDataResponse["userData"]
    rating = profile["playerRating"]
    # playCount may be absent from the record; fall back to 0.
    plays = profile.get("playCount", 0)

    return apiBuyTicket(userId, ticketType, ticketType - 1, rating, plays)
if __name__ == "__main__":
    # Manual run: log in as the account configured as testUid2, buy one
    # ticket, and always log out afterwards.
    userId = testUid2
    currentLoginTimestamp = generateTimestamp()
    loginResult = apiLogin(currentLoginTimestamp, userId)
    if loginResult["returnCode"] != 1:
        logger.info("登录失败")
        # The bare `exit()` helper is injected by the `site` module for
        # interactive use and is not guaranteed to exist (e.g. `python -S`,
        # frozen builds). Raise SystemExit directly, with a non-zero status
        # so a failed login is visible to whoever invoked the script.
        raise SystemExit(1)
    try:
        # Buy a ticket with ticketType 2.
        logger.info(implBuyTicket(userId, 2))
        # To inspect the outcome, additionally log apiQueryTicket(userId).
    finally:
        # Log out even if the purchase raised.
        logger.info(apiLogout(currentLoginTimestamp, userId))