Compare commits

...

4 Commits

Author SHA1 Message Date
Remik1r3n
486da8a4c7 Merge branch 'master' of github.com:Remik1r3n/maimaiDX-Api 2025-02-07 14:47:39 +08:00
Remik1r3n
49b575d036 解小黑屋概念测试 2025-02-07 14:47:33 +08:00
Remik1r3n
394a15b812 Evil 爬虫 XD 2025-02-07 14:46:15 +08:00
Remik1r3n
c5d408fff0 B50更新小改进 2025-02-07 14:07:14 +08:00
3 changed files with 109 additions and 42 deletions

View File

@ -86,8 +86,12 @@ def maimaiUserMusicDetailToDivingFishFormat(userMusicDetailList) -> list:
'dxScore': currentMusicDetail['deluxscoreMax'], 'dxScore': currentMusicDetail['deluxscoreMax'],
}) })
except: except:
print(currentMusicDetail) logger.error(f"Fish Format Translate Error: {currentMusicDetail}")
logger.error(f"Error: {currentMusicDetail}")
# debug output fish list to file
#with open("fishList.txt", "w", encoding="utf-8") as f:
# f.write(str(divingFishList))
return divingFishList return divingFishList
def isVaildFishToken(importToken:str): def isVaildFishToken(importToken:str):
@ -101,23 +105,20 @@ def isVaildFishToken(importToken:str):
return True return True
def implUserMusicToDivingFish(userId:int, fishImportToken:str): def implUserMusicToDivingFish(userId:int, fishImportToken:str):
'''上传所有成绩到水鱼的参考实现''' '''上传所有成绩到水鱼的参考实现返回成绩的数量或者False'''
logger.info("开始上传舞萌成绩到水鱼查分器!") logger.info("开始上传舞萌成绩到水鱼查分器!")
userFullMusicDetailList = getUserFullMusicDetail(userId) userFullMusicDetailList = getUserFullMusicDetail(userId)
logger.info("成功得到成绩!转换成水鱼格式..") logger.info("成功得到成绩!转换成水鱼格式..")
divingFishData = maimaiUserMusicDetailToDivingFishFormat(userFullMusicDetailList) divingFishData = maimaiUserMusicDetailToDivingFishFormat(userFullMusicDetailList)
logger.info("转换成功!开始上传水鱼..") logger.info("转换成功!开始上传水鱼..")
return updateFishRecords(fishImportToken, divingFishData) if not updateFishRecords(fishImportToken, divingFishData)
logger.error("上传失败!")
return False
return len(divingFishData)
if __name__ == '__main__': if __name__ == '__main__':
if True: if True:
userId = None userId = testUid2
importToken = None importToken = testImportToken
#currentLoginTimestamp = generateTimestamp() #currentLoginTimestamp = generateTimestamp()
implUserMusicToDivingFish(userId, importToken)
userFullMusicDetailList = getUserFullMusicDetail(userId)
logger.warning("Now We Begin To Build DivingFish Data")
divingFishData = maimaiUserMusicDetailToDivingFishFormat(userFullMusicDetailList)
logger.debug(divingFishData)
logger.warning("Now We Begin To Update DivingFish Data")
updateFishRecords(importToken, divingFishData)

View File

@ -8,12 +8,34 @@ from HelperLogInOut import apiLogout
import json import json
from loguru import logger from loguru import logger
import time import time
import datetime from datetime import datetime
def isUserLoggedIn(userId): def isUserLoggedIn(userId):
return True
return json.loads(apiGetUserPreview(userId))['isLogin'] return json.loads(apiGetUserPreview(userId))['isLogin']
def getUNIXTime(mmddhhmmss, year=2025):
# 解析 MMDDHHMMSS 格式的时间,返回 Unix 时间戳
try:
date_time_str = f"{year}{mmddhhmmss}"
date_time_obj = datetime.strptime(date_time_str, '%Y%m%d%H%M%S')
# 将 datetime 对象转换为 Unix 时间戳
unix_timestamp = int(time.mktime(date_time_obj.timetuple()))
logger.info(f"转换出了时间戳: {unix_timestamp}")
return unix_timestamp
except ValueError as e:
print(f"时间格式错误: {e}")
return None
def logOut(userId, Timestamp): def logOut(userId, Timestamp):
DEBUGMODE = True
if DEBUGMODE:
# 用于调试的时间戳
debugRealTimestamp = 1738663945
logger.info(f"调试模式: 传入的时间戳和调试时间戳之差: {Timestamp - debugRealTimestamp}")
time.sleep(0.2)
return True
try: try:
result = apiLogout(Timestamp, userId) result = apiLogout(Timestamp, userId)
loadedResult = json.loads(result) loadedResult = json.loads(result)
@ -22,32 +44,43 @@ def logOut(userId, Timestamp):
except: except:
return False return False
def forceLogout(userId, beginTimestamp): def isCorrectTimestamp(timestamp, userId):
# 将人类可读的时间转换为 Unix 时间戳 '''暴力的检查时间戳是否正确'''
original_timestamp = beginTimestamp logoutResult = logOut(userId, timestamp)
if not logoutResult:
# 定义时间范围(前后 20 分钟) return False
time_range = 20 * 60 # 20 分钟,以秒为单位 isLoggedOut = not isUserLoggedIn(userId)
return isLoggedOut
# 尝试从原始时间戳开始,逐步向两边扩展
for offset in range(0, time_range + 1):
# 尝试往前 offset 秒
timestamp = original_timestamp - offset
logger.info(f"尝试时间戳: {timestamp}")
if logout_user(timestamp, userid):
if not is_user_logged_in(userid):
print(f"用户 {userid} 已成功登出,使用的时间戳: {timestamp}")
return
# 尝试往后 offset 秒
timestamp = original_timestamp + offset
if logout_user(timestamp, userid):
if not is_user_logged_in(userid):
print(f"用户 {userid} 已成功登出,使用的时间戳: {timestamp}")
return
print(f"无法在前后 20 分钟内登出用户 {userid}")
# 示例使用 def findCorrectTimestamp(timestamp, userId, max_attempts=1200):
userId = testUid # 初始化偏移量
human_time = "2023-10-01 12:00:00" # 用户输入的人类可读时间 offset = 1
forceLogout(userId, human_time) attempts = 0
while attempts < max_attempts:
# 尝试当前时间戳
if isCorrectTimestamp(timestamp, userId):
print(f"Found correct timestamp: {timestamp}")
return timestamp
# 增加偏移量尝试
if isCorrectTimestamp(timestamp + offset, userId):
print(f"Found correct timestamp: {timestamp + offset}")
return timestamp + offset
# 减少偏移量尝试
if isCorrectTimestamp(timestamp - offset, userId):
print(f"Found correct timestamp: {timestamp - offset}")
return timestamp - offset
# 增加尝试次数和偏移量
attempts += 2
offset += 1
print("尝试多次后未找到正确时间戳")
return None
human_time = "0204061500"
beginTimestamp = getUNIXTime(human_time)
print(f"我们将开始用这个时间戳开始尝试: {beginTimestamp}")
correctTimestamp = findCorrectTimestamp(beginTimestamp, testUid)

View File

@ -3,6 +3,8 @@
import json import json
from API_TitleServer import apiSDGB from API_TitleServer import apiSDGB
from Config import * from Config import *
import time
import random
def apiGetUserPreview(userId) -> str: def apiGetUserPreview(userId) -> str:
data = json.dumps({ data = json.dumps({
@ -16,3 +18,34 @@ if __name__ == "__main__":
#userId = input("请输入用户 ID") #userId = input("请输入用户 ID")
userId = testUid userId = testUid
print(apiGetUserPreview(userId)) print(apiGetUserPreview(userId))
def crawlAllUserPreview():
# 这里设置开始和结束的 UserId
BeginUserId = 11000000
EndUserId = 12599999
# 打开文件,准备写入
with open('Remi_UserID_DB_Output.txt', 'w', encoding="utf-8") as f:
# 遍历 UserId
for userId in range(BeginUserId, EndUserId + 1):
# 调用 API
try:
userPreview = apiGetUserPreview(userId)
currentUser = json.loads(userPreview)
if currentUser["userId"] is not None:
# 每爬到一个就把它存到一个文件里面,每个一行
f.write(userPreview + "\n")
else:
f.write("\n")
except:
f.write("ERROR\n")
time.sleep(4)
f.flush()
# 随机等待0.2-0.5秒
time.sleep(random.uniform(0.2, 0.5))
print('Finished!')
if __name__ == "__main__":
crawlAllUserPreview()