Files
sdgb-utils-rs/utils/flat_user_detail.py
2025-08-04 13:44:51 +08:00

93 lines
2.7 KiB
Python

from sys import stderr, stdin
from decimal import Decimal
from functools import reduce
from typing import Literal
import polars as pl
import orjson as json
import pyecharts.options as opts
from pyecharts.charts import Scatter
from loguru import logger
from helpers import query_music_db, find_level, dx_rating
def calculate_dxrating(music: dict) -> dict:
    """Return a copy of a play record with a ``dxRating`` entry added.

    The chart is resolved with ``query_music_db`` and ``find_level`` from the
    record's ``musicId`` and ``level``; the rating is ``dx_rating`` applied to
    the chart's ``difficulty`` (converted to ``Decimal``) and the record's
    ``achievement``.

    Args:
        music: One play record containing ``musicId``, ``level`` and
            ``achievement``.

    Returns:
        ``music`` merged with ``{"dxRating": <rating>}``. The rating is ``0``
        when the music id is not found or when resolving the level raises
        ``IndexError``, so callers can filter such records out.
    """
    music_id = music["musicId"]
    level_id = music["level"]
    ach = music["achievement"]

    music_info = query_music_db(music_id)
    if not music_info:
        logger.error(f"music: {music_id} not found")
        return music | {"dxRating": 0}

    level = find_level(music_info, level_id)
    try:
        # NOTE(review): pop() removes the entry from the list returned by
        # find_level -- confirm that list is not shared/cached by the helper.
        rating = dx_rating(Decimal(level.pop()["difficulty"]), ach)
    except IndexError:
        # No ``file=`` keyword here: loguru treats keyword arguments as
        # str.format() values for the message, not as an output stream.
        logger.warning(f"unknown level: {music_id} - {level_id}")
        return music | {"dxRating": 0}
    return music | {"dxRating": rating}
# Parse the whole JSON payload from stdin (read as bytes).
data = json.loads(stdin.buffer.read())
user_id = data["userId"]
music_list: list[dict[str, list[dict]]] = data["userMusicList"]

# Flatten every "userMusicDetailList" into one list of play records.
# A comprehension (unlike reduce without an initial value) accepts an empty
# "userMusicList" and does not re-copy the accumulated list on every step.
musics = [
    detail for music in music_list for detail in music["userMusicDetailList"]
]
musics = list(map(calculate_dxrating, musics))
if not musics:
    # Without any record the frame below would have no columns to select.
    logger.error(f"no play records found for user: {user_id}")
    raise SystemExit(1)

df = (
    pl.LazyFrame(musics)
    .filter(pl.col("dxRating") > 0)  # filter out invalid play
    .select(["playCount", "dxRating"])
    .sort("dxRating", descending=False)
    # After the ascending sort, playCount becomes the running total of plays.
    .with_columns(pl.col("playCount").cum_sum())
    .collect()
)
x_data = df["playCount"].to_list()
y_data = df["dxRating"].to_list()
def init_chart(
    x_type: Literal["value", "log"], x_min: int = 1, x_max: int = 5000
) -> Scatter:
    """Build the scatter chart of the module-level ``x_data`` / ``y_data``.

    Args:
        x_type: Scale of the x axis, either ``"value"`` or ``"log"``.
        x_min: Lower bound of the x axis.
        x_max: Upper bound of the x axis.

    Returns:
        A configured ``Scatter`` chart; the caller is responsible for
        rendering it.
    """
    canvas = opts.InitOpts(
        width="1600px",  # chart width
        height="1000px",  # chart height
    )
    horizontal_axis = opts.AxisOpts(type_=x_type, min_=x_min, max_=x_max)
    vertical_axis = opts.AxisOpts(
        type_="value",
        axistick_opts=opts.AxisTickOpts(is_show=True),
        splitline_opts=opts.SplitLineOpts(is_show=True),
        max_=330,
    )

    chart = Scatter(init_opts=canvas)
    chart.set_series_opts()
    chart.set_global_opts(
        xaxis_opts=horizontal_axis,
        yaxis_opts=vertical_axis,
        tooltip_opts=opts.TooltipOpts(is_show=False),
        visualmap_opts=opts.VisualMapOpts(max_=16400),
    )
    chart.add_xaxis(xaxis_data=x_data)
    chart.add_yaxis(
        series_name="",
        y_axis=y_data,
        symbol_size=5,
        label_opts=opts.LabelOpts(is_show=False),
    )
    return chart
# Upper bound of the x axis: the largest x value (the last entry of x_data,
# a running total) rounded up to the next multiple of 50. It must come from
# x_data, not y_data, or points beyond the highest rating value are cut off.
# Falls back to 50 when no valid play remains, instead of raising IndexError.
x_max = (x_data[-1] // 50 * 50) + 50 if x_data else 50
init_chart("value", 1, x_max).render(f"{user_id}-pc-rating-linear.html")
init_chart("log", 1, x_max).render(f"{user_id}-pc-rating-log.html")