refactor: implement fetchall with generic type
This commit is contained in:
150
sdgb-cli/src/utils/helpers/mod.rs
Normal file
150
sdgb-cli/src/utils/helpers/mod.rs
Normal file
@@ -0,0 +1,150 @@
|
||||
use std::{fs::OpenOptions, io::BufWriter};
|
||||
use std::{
|
||||
io::{self, BufRead},
|
||||
path::Path,
|
||||
sync::atomic::Ordering,
|
||||
};
|
||||
|
||||
use futures_util::StreamExt;
|
||||
use nyquest_preset::nyquest::AsyncClient;
|
||||
|
||||
use redb::ReadableTable;
|
||||
use redb::TableDefinition;
|
||||
use serde::Serialize;
|
||||
use spdlog::{error, info};
|
||||
|
||||
use sdgb_api::bincode;
|
||||
use sdgb_api::title::MaiVersionExt as _;
|
||||
use sdgb_api::title::{
|
||||
Sdgb1_50,
|
||||
methods::{APIExt, HasUid},
|
||||
};
|
||||
|
||||
use bincode::{BorrowDecode, Encode, borrow_decode_from_slice};
|
||||
|
||||
use crate::{EARLY_QUIT, cache};
|
||||
|
||||
pub fn dump_cache<D>(
|
||||
output_path: impl AsRef<Path>,
|
||||
definition: TableDefinition<'_, u32, Vec<u8>>,
|
||||
) -> Result<(), Box<dyn snafu::Error>>
|
||||
where
|
||||
D: for<'d> BorrowDecode<'d, ()> + Serialize,
|
||||
{
|
||||
let file = OpenOptions::new()
|
||||
.create(true)
|
||||
.truncate(true)
|
||||
.write(true)
|
||||
.open(output_path)?;
|
||||
|
||||
#[cfg(file_lock_ready)]
|
||||
file.try_lock()?;
|
||||
|
||||
let txn = cache::read_txn()?;
|
||||
let table = cache::open_table_ro(&txn, definition)?;
|
||||
|
||||
let config =
|
||||
bincode::config::Configuration::<bincode::config::LittleEndian>::default().with_no_limit();
|
||||
|
||||
let user_ids = table
|
||||
.iter()?
|
||||
.flatten()
|
||||
.map(|d| borrow_decode_from_slice(&d.1.value(), config))
|
||||
.flatten()
|
||||
.map(|(value, _)| value)
|
||||
.collect::<Vec<D>>();
|
||||
|
||||
let writer = BufWriter::new(file);
|
||||
serde_json::to_writer(writer, &user_ids)?;
|
||||
info!("dumped {} user id", user_ids.len());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn cached_concurrent_fetch<A: APIExt>(
|
||||
client: &AsyncClient,
|
||||
concurrency: usize,
|
||||
definition: TableDefinition<'_, u32, Vec<u8>>,
|
||||
) -> Result<(), Box<dyn snafu::Error>>
|
||||
where
|
||||
A::Payload: From<u32>,
|
||||
A::Response: Encode + for<'a> BorrowDecode<'a, ()> + HasUid,
|
||||
{
|
||||
let mut user_ids = Vec::new();
|
||||
{
|
||||
let mut stdin = io::stdin().lock();
|
||||
let mut buf = String::new();
|
||||
|
||||
while stdin.read_line(&mut buf).is_ok_and(|size| size != 0) {
|
||||
if buf.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let user_id: u32 = buf.trim().parse()?;
|
||||
buf.clear();
|
||||
user_ids.push(user_id);
|
||||
}
|
||||
}
|
||||
|
||||
let _ = cache::init_db();
|
||||
let read = cache::read_txn()?;
|
||||
let write = cache::write_txn()?;
|
||||
let config = sdgb_api::bincode::config::Configuration::<
|
||||
sdgb_api::bincode::config::LittleEndian,
|
||||
>::default()
|
||||
.with_no_limit();
|
||||
|
||||
info!("number of user_id: {}", user_ids.len());
|
||||
|
||||
let collect = futures_util::stream::iter(user_ids)
|
||||
.map(async |user_id| {
|
||||
{
|
||||
let cache_table = cache::open_table_ro(&read, definition)?;
|
||||
let data = cache_table.get(user_id)?;
|
||||
if let Some(data) = data {
|
||||
let decoded: (A::Response, _) =
|
||||
borrow_decode_from_slice(&data.value(), config)?;
|
||||
|
||||
return Ok(decoded.0);
|
||||
}
|
||||
}
|
||||
|
||||
if EARLY_QUIT.load(Ordering::Relaxed) {
|
||||
return Err("early skip due to ctrl-c")?;
|
||||
}
|
||||
|
||||
let resp =
|
||||
Sdgb1_50::request_ext::<A>(&client, <A as APIExt>::Payload::from(user_id), user_id)
|
||||
.await;
|
||||
|
||||
match &resp {
|
||||
Ok(resp) => {
|
||||
use sdgb_api::bincode::encode_to_vec;
|
||||
|
||||
use crate::cache::PLAYERS;
|
||||
|
||||
info!("found: {user_id}");
|
||||
|
||||
if let Ok(mut table) = cache::open_table(&write, PLAYERS)
|
||||
&& let Ok(encoded) = encode_to_vec(resp, config)
|
||||
{
|
||||
_ = table.insert(resp.get_uid(), encoded);
|
||||
}
|
||||
}
|
||||
Err(sdgb_api::ApiError::JSON { .. }) => {}
|
||||
Err(e) => {
|
||||
error!("preview failed: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
Result::<_, Box<dyn snafu::Error>>::Ok(resp?)
|
||||
})
|
||||
.buffer_unordered(concurrency) // slower to avoid being banned
|
||||
.filter_map(async |r| r.ok())
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
drop(collect);
|
||||
|
||||
let _ = write.commit();
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user