mirror of
https://github.com/Remik1r3n/maimaiDX-Api.git
synced 2025-07-09 06:44:46 +08:00
Compare commits
No commits in common. "486da8a4c707c80cd2f887a5ff4998f212789cd4" and "a16525c52e3d04d1e4d1e0d892a89f8ffd4b1e20" have entirely different histories.
486da8a4c7
...
a16525c52e
@ -86,12 +86,8 @@ def maimaiUserMusicDetailToDivingFishFormat(userMusicDetailList) -> list:
|
|||||||
'dxScore': currentMusicDetail['deluxscoreMax'],
|
'dxScore': currentMusicDetail['deluxscoreMax'],
|
||||||
})
|
})
|
||||||
except:
|
except:
|
||||||
logger.error(f"Fish Format Translate Error: {currentMusicDetail}")
|
print(currentMusicDetail)
|
||||||
|
logger.error(f"Error: {currentMusicDetail}")
|
||||||
# debug output fish list to file
|
|
||||||
#with open("fishList.txt", "w", encoding="utf-8") as f:
|
|
||||||
# f.write(str(divingFishList))
|
|
||||||
|
|
||||||
return divingFishList
|
return divingFishList
|
||||||
|
|
||||||
def isVaildFishToken(importToken:str):
|
def isVaildFishToken(importToken:str):
|
||||||
@ -105,20 +101,23 @@ def isVaildFishToken(importToken:str):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
def implUserMusicToDivingFish(userId:int, fishImportToken:str):
|
def implUserMusicToDivingFish(userId:int, fishImportToken:str):
|
||||||
'''上传所有成绩到水鱼的参考实现,返回成绩的数量或者False'''
|
'''上传所有成绩到水鱼的参考实现'''
|
||||||
logger.info("开始上传舞萌成绩到水鱼查分器!")
|
logger.info("开始上传舞萌成绩到水鱼查分器!")
|
||||||
userFullMusicDetailList = getUserFullMusicDetail(userId)
|
userFullMusicDetailList = getUserFullMusicDetail(userId)
|
||||||
logger.info("成功得到成绩!转换成水鱼格式..")
|
logger.info("成功得到成绩!转换成水鱼格式..")
|
||||||
divingFishData = maimaiUserMusicDetailToDivingFishFormat(userFullMusicDetailList)
|
divingFishData = maimaiUserMusicDetailToDivingFishFormat(userFullMusicDetailList)
|
||||||
logger.info("转换成功!开始上传水鱼..")
|
logger.info("转换成功!开始上传水鱼..")
|
||||||
if not updateFishRecords(fishImportToken, divingFishData)
|
return updateFishRecords(fishImportToken, divingFishData)
|
||||||
logger.error("上传失败!")
|
|
||||||
return False
|
|
||||||
return len(divingFishData)
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
if True:
|
if True:
|
||||||
userId = testUid2
|
userId = None
|
||||||
importToken = testImportToken
|
importToken = None
|
||||||
#currentLoginTimestamp = generateTimestamp()
|
#currentLoginTimestamp = generateTimestamp()
|
||||||
implUserMusicToDivingFish(userId, importToken)
|
|
||||||
|
userFullMusicDetailList = getUserFullMusicDetail(userId)
|
||||||
|
logger.warning("Now We Begin To Build DivingFish Data")
|
||||||
|
divingFishData = maimaiUserMusicDetailToDivingFishFormat(userFullMusicDetailList)
|
||||||
|
logger.debug(divingFishData)
|
||||||
|
logger.warning("Now We Begin To Update DivingFish Data")
|
||||||
|
updateFishRecords(importToken, divingFishData)
|
||||||
|
@ -8,34 +8,12 @@ from HelperLogInOut import apiLogout
|
|||||||
import json
|
import json
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
import time
|
import time
|
||||||
from datetime import datetime
|
import datetime
|
||||||
|
|
||||||
|
|
||||||
def isUserLoggedIn(userId):
|
def isUserLoggedIn(userId):
|
||||||
return True
|
|
||||||
return json.loads(apiGetUserPreview(userId))['isLogin']
|
return json.loads(apiGetUserPreview(userId))['isLogin']
|
||||||
|
|
||||||
def getUNIXTime(mmddhhmmss, year=2025):
|
|
||||||
# 解析 MMDDHHMMSS 格式的时间,返回 Unix 时间戳
|
|
||||||
try:
|
|
||||||
date_time_str = f"{year}{mmddhhmmss}"
|
|
||||||
date_time_obj = datetime.strptime(date_time_str, '%Y%m%d%H%M%S')
|
|
||||||
# 将 datetime 对象转换为 Unix 时间戳
|
|
||||||
unix_timestamp = int(time.mktime(date_time_obj.timetuple()))
|
|
||||||
logger.info(f"转换出了时间戳: {unix_timestamp}")
|
|
||||||
return unix_timestamp
|
|
||||||
except ValueError as e:
|
|
||||||
print(f"时间格式错误: {e}")
|
|
||||||
return None
|
|
||||||
|
|
||||||
def logOut(userId, Timestamp):
|
def logOut(userId, Timestamp):
|
||||||
DEBUGMODE = True
|
|
||||||
if DEBUGMODE:
|
|
||||||
# 用于调试的时间戳
|
|
||||||
debugRealTimestamp = 1738663945
|
|
||||||
logger.info(f"调试模式: 传入的时间戳和调试时间戳之差: {Timestamp - debugRealTimestamp}")
|
|
||||||
time.sleep(0.2)
|
|
||||||
return True
|
|
||||||
try:
|
try:
|
||||||
result = apiLogout(Timestamp, userId)
|
result = apiLogout(Timestamp, userId)
|
||||||
loadedResult = json.loads(result)
|
loadedResult = json.loads(result)
|
||||||
@ -44,43 +22,32 @@ def logOut(userId, Timestamp):
|
|||||||
except:
|
except:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def isCorrectTimestamp(timestamp, userId):
|
def forceLogout(userId, beginTimestamp):
|
||||||
'''暴力的检查时间戳是否正确'''
|
# 将人类可读的时间转换为 Unix 时间戳
|
||||||
logoutResult = logOut(userId, timestamp)
|
original_timestamp = beginTimestamp
|
||||||
if not logoutResult:
|
|
||||||
return False
|
|
||||||
isLoggedOut = not isUserLoggedIn(userId)
|
|
||||||
return isLoggedOut
|
|
||||||
|
|
||||||
def findCorrectTimestamp(timestamp, userId, max_attempts=1200):
|
# 定义时间范围(前后 20 分钟)
|
||||||
# 初始化偏移量
|
time_range = 20 * 60 # 20 分钟,以秒为单位
|
||||||
offset = 1
|
|
||||||
attempts = 0
|
|
||||||
|
|
||||||
while attempts < max_attempts:
|
# 尝试从原始时间戳开始,逐步向两边扩展
|
||||||
# 尝试当前时间戳
|
for offset in range(0, time_range + 1):
|
||||||
if isCorrectTimestamp(timestamp, userId):
|
# 尝试往前 offset 秒
|
||||||
print(f"Found correct timestamp: {timestamp}")
|
timestamp = original_timestamp - offset
|
||||||
return timestamp
|
logger.info(f"尝试时间戳: {timestamp}")
|
||||||
|
if logout_user(timestamp, userid):
|
||||||
|
if not is_user_logged_in(userid):
|
||||||
|
print(f"用户 {userid} 已成功登出,使用的时间戳: {timestamp}")
|
||||||
|
return
|
||||||
|
# 尝试往后 offset 秒
|
||||||
|
timestamp = original_timestamp + offset
|
||||||
|
if logout_user(timestamp, userid):
|
||||||
|
if not is_user_logged_in(userid):
|
||||||
|
print(f"用户 {userid} 已成功登出,使用的时间戳: {timestamp}")
|
||||||
|
return
|
||||||
|
|
||||||
# 增加偏移量尝试
|
print(f"无法在前后 20 分钟内登出用户 {userid}")
|
||||||
if isCorrectTimestamp(timestamp + offset, userId):
|
|
||||||
print(f"Found correct timestamp: {timestamp + offset}")
|
|
||||||
return timestamp + offset
|
|
||||||
|
|
||||||
# 减少偏移量尝试
|
# 示例使用
|
||||||
if isCorrectTimestamp(timestamp - offset, userId):
|
userId = testUid
|
||||||
print(f"Found correct timestamp: {timestamp - offset}")
|
human_time = "2023-10-01 12:00:00" # 用户输入的人类可读时间
|
||||||
return timestamp - offset
|
forceLogout(userId, human_time)
|
||||||
|
|
||||||
# 增加尝试次数和偏移量
|
|
||||||
attempts += 2
|
|
||||||
offset += 1
|
|
||||||
|
|
||||||
print("尝试多次后未找到正确时间戳")
|
|
||||||
return None
|
|
||||||
|
|
||||||
human_time = "0204061500"
|
|
||||||
beginTimestamp = getUNIXTime(human_time)
|
|
||||||
print(f"我们将开始用这个时间戳开始尝试: {beginTimestamp}")
|
|
||||||
correctTimestamp = findCorrectTimestamp(beginTimestamp, testUid)
|
|
||||||
|
@ -3,8 +3,6 @@
|
|||||||
import json
|
import json
|
||||||
from API_TitleServer import apiSDGB
|
from API_TitleServer import apiSDGB
|
||||||
from Config import *
|
from Config import *
|
||||||
import time
|
|
||||||
import random
|
|
||||||
|
|
||||||
def apiGetUserPreview(userId) -> str:
|
def apiGetUserPreview(userId) -> str:
|
||||||
data = json.dumps({
|
data = json.dumps({
|
||||||
@ -18,34 +16,3 @@ if __name__ == "__main__":
|
|||||||
#userId = input("请输入用户 ID:")
|
#userId = input("请输入用户 ID:")
|
||||||
userId = testUid
|
userId = testUid
|
||||||
print(apiGetUserPreview(userId))
|
print(apiGetUserPreview(userId))
|
||||||
|
|
||||||
|
|
||||||
def crawlAllUserPreview():
|
|
||||||
# 这里设置开始和结束的 UserId
|
|
||||||
BeginUserId = 11000000
|
|
||||||
EndUserId = 12599999
|
|
||||||
|
|
||||||
# 打开文件,准备写入
|
|
||||||
with open('Remi_UserID_DB_Output.txt', 'w', encoding="utf-8") as f:
|
|
||||||
# 遍历 UserId
|
|
||||||
for userId in range(BeginUserId, EndUserId + 1):
|
|
||||||
# 调用 API
|
|
||||||
try:
|
|
||||||
userPreview = apiGetUserPreview(userId)
|
|
||||||
currentUser = json.loads(userPreview)
|
|
||||||
if currentUser["userId"] is not None:
|
|
||||||
# 每爬到一个就把它存到一个文件里面,每个一行
|
|
||||||
f.write(userPreview + "\n")
|
|
||||||
else:
|
|
||||||
f.write("\n")
|
|
||||||
except:
|
|
||||||
f.write("ERROR\n")
|
|
||||||
time.sleep(4)
|
|
||||||
f.flush()
|
|
||||||
# 随机等待0.2-0.5秒
|
|
||||||
time.sleep(random.uniform(0.2, 0.5))
|
|
||||||
|
|
||||||
print('Finished!')
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
crawlAllUserPreview()
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user