Files
eaquira/sdgb/UpsertMusic.py
91c0e59d-6161-45ab-8aa4-2371574db28f 8da425391a update to 1.53
2026-01-22 23:35:46 +08:00

68 lines
2.1 KiB
Python

import json
import asyncio
import httpx
import time
import logging
from sdgb import MaimaiClient
from settings import userId, musicData
from chime import *
from payload import *
# Client instance that the script entry point hands to run_workflow.
maimai = MaimaiClient()

# Root logging setup: INFO and above, each record rendered as
# "<timestamp> - <level> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
async def run_workflow(self):
    """Run one preview -> login -> fetch -> upsert -> logout cycle.

    Args:
        self: Client object exposing an awaitable
            ``call_api(client, api_name, payload, user_id)`` whose result
            can be parsed by ``json.loads``. The script entry point passes
            the module-level ``maimai`` instance.

    Returns:
        None. The function returns early, after logging an error, when the
        preview response reports ``isLogin`` or when the login response's
        ``returnCode`` is not 1.

    Raises:
        Whatever ``call_api``, ``json.loads`` or ``UserAll_payload`` raise.
        Once login has succeeded, ``UserLogoutApi`` is still called before
        such an exception propagates.
    """
    # NOTE(review): verify=False disables TLS certificate verification for
    # every request made through this client. Kept as in the original;
    # confirm that this is intentional.
    async with httpx.AsyncClient(verify=False) as client:
        # Preview probe: stop before logging in if the preview response
        # already reports a login.
        preview = json.loads(
            await self.call_api(client, "GetUserPreviewApi", requestData_UserPreview, userId)
        )
        # Explicit comparison kept so the set of values treated as
        # "logged in" is unchanged.
        if preview["isLogin"] == True:  # noqa: E712
            logger.error("已在他处登录。")
            return

        # UserLogin
        login = json.loads(
            await self.call_api(client, "UserLoginApi", requestData_UserLogin, userId)
        )
        if login["returnCode"] != 1:
            logger.error("login failed.")
            return

        # Everything after a successful login runs under try/finally so the
        # logout request is sent even if a later step raises or the task is
        # cancelled during the sleep.
        try:
            login_id = login['loginId']
            login_date = login['lastLoginDate']

            # UserData and related endpoints, all sent with the same payload.
            # asyncio.gather returns the results in this order.
            user_info_apis = (
                "GetUserDataApi",
                "GetUserExtendApi",
                "GetUserOptionApi",
                "GetUserRatingApi",
                "GetUserChargeApi",
                "GetUserActivityApi",
                "GetUserMissionDataApi",
            )
            general_user_info = await asyncio.gather(
                *(
                    self.call_api(client, api_name, requestData_UserData, userId)
                    for api_name in user_info_apis
                )
            )

            await asyncio.sleep(60)  # simulate play time

            # UserAll
            upsert_payload = UserAll_payload(
                login_id, login_date, musicData, general_user_info
            )
            await self.call_api(client, "UpsertUserAllApi", upsert_payload, userId)
        finally:
            # UserLogout
            await self.call_api(client, "UserLogoutApi", requestData_UserLogout, userId)
if __name__ == "__main__":
    # Script entry point: drive the workflow coroutine to completion with
    # the module-level client.
    workflow = run_workflow(maimai)
    asyncio.run(workflow)