use std::{fs::OpenOptions, io::BufWriter}; use std::{ io::{self, BufRead}, path::Path, sync::atomic::Ordering, }; use futures_util::StreamExt; use nyquest_preset::nyquest::AsyncClient; use redb::ReadableTable; use redb::TableDefinition; use serde::Serialize; use spdlog::{error, info}; use sdgb_api::bincode; use sdgb_api::title::MaiVersionExt as _; use sdgb_api::title::{ Sdgb1_50, methods::{APIExt, HasUid}, }; use bincode::{BorrowDecode, Encode, borrow_decode_from_slice}; use crate::{EARLY_QUIT, cache}; pub fn read_cache( definition: TableDefinition<'_, u32, Vec>, ) -> Result, Box> where D: for<'d> BorrowDecode<'d, ()>, { let txn = cache::read_txn()?; let table = cache::open_table_ro(&txn, definition)?; let config = bincode::config::Configuration::::default().with_no_limit(); Ok(table .iter()? .flatten() .map(|d| borrow_decode_from_slice(&d.1.value(), config)) .flatten() .map(|(value, _)| value) .collect::>()) } pub fn dump_cache( output_path: impl AsRef, definition: TableDefinition<'_, u32, Vec>, ) -> Result<(), Box> where D: for<'d> BorrowDecode<'d, ()> + Serialize, { let file = OpenOptions::new() .create(true) .truncate(true) .write(true) .open(output_path)?; #[cfg(file_lock_ready)] file.try_lock()?; let data = read_cache::(definition)?; let writer = BufWriter::new(file); serde_json::to_writer(writer, &data)?; info!("dumped {} user id", data.len()); Ok(()) } pub async fn cached_concurrent_fetch( client: &AsyncClient, concurrency: usize, definition: TableDefinition<'_, u32, Vec>, ) -> Result<(), Box> where A::Payload: From, A::Response: Encode + for<'a> BorrowDecode<'a, ()> + HasUid, { let mut user_ids = Vec::new(); { let mut stdin = io::stdin().lock(); let mut buf = String::new(); while stdin.read_line(&mut buf).is_ok_and(|size| size != 0) { if buf.is_empty() { continue; } let user_id: u32 = buf.trim().parse()?; buf.clear(); user_ids.push(user_id); } } let _ = cache::init_db(); let read = cache::read_txn()?; let write = cache::write_txn()?; let config = sdgb_api::bincode::config::Configuration::< sdgb_api::bincode::config::LittleEndian, >::default() .with_no_limit(); info!("number of user_id: {}", user_ids.len()); let collect = futures_util::stream::iter(user_ids) .map(async |user_id| { { let cache_table = cache::open_table_ro(&read, definition)?; let data = cache_table.get(user_id)?; if let Some(data) = data { let decoded: (A::Response, _) = borrow_decode_from_slice(&data.value(), config)?; return Ok(decoded.0); } } if EARLY_QUIT.load(Ordering::Relaxed) { return Err("early skip due to ctrl-c")?; } let resp = Sdgb1_50::request_ext::(&client, ::Payload::from(user_id), user_id) .await; match &resp { Ok(resp) => { use sdgb_api::bincode::encode_to_vec; use crate::cache::PLAYERS; info!("found: {user_id}"); if let Ok(mut table) = cache::open_table(&write, PLAYERS) && let Ok(encoded) = encode_to_vec(resp, config) { _ = table.insert(resp.get_uid(), encoded); } } Err(sdgb_api::ApiError::JSON { .. }) => {} Err(e) => { error!("preview failed: {e}"); } } Result::<_, Box>::Ok(resp?) }) .buffer_unordered(concurrency) // slower to avoid being banned .filter_map(async |r| r.ok()) .collect::>() .await; drop(collect); let _ = write.commit(); Ok(()) }