refactor: more flexible cached scrape
This commit is contained in:
@@ -9,12 +9,12 @@ use redb::TableDefinition;
|
||||
use serde::Serialize;
|
||||
use spdlog::{error, info};
|
||||
|
||||
use sdgb_api::bincode;
|
||||
use sdgb_api::title::MaiVersionExt as _;
|
||||
use sdgb_api::title::MaiVersionExt;
|
||||
use sdgb_api::title::{
|
||||
Sdgb1_50,
|
||||
methods::{APIExt, HasUid},
|
||||
};
|
||||
use sdgb_api::{ApiError, bincode};
|
||||
|
||||
use bincode::{BorrowDecode, Encode, borrow_decode_from_slice};
|
||||
|
||||
@@ -88,6 +88,28 @@ pub async fn cached_concurrent_fetch<A: APIExt>(
|
||||
where
|
||||
A::Payload: From<u32>,
|
||||
A::Response: Encode + for<'a> BorrowDecode<'a, ()> + HasUid,
|
||||
{
|
||||
cached_concurrent_fetch_userfn(
|
||||
user_ids,
|
||||
client,
|
||||
concurrency,
|
||||
definition,
|
||||
async |client, user_id| {
|
||||
Sdgb1_50::request_ext::<A>(client, A::Payload::from(user_id), user_id).await
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn cached_concurrent_fetch_userfn<R>(
|
||||
user_ids: impl Into<Vec<u32>>,
|
||||
client: &AsyncClient,
|
||||
concurrency: usize,
|
||||
definition: TableDefinition<'_, u32, Vec<u8>>,
|
||||
scrape: impl AsyncFn(&AsyncClient, u32) -> Result<R, ApiError>,
|
||||
) -> Result<(), Box<dyn snafu::Error>>
|
||||
where
|
||||
R: Encode + for<'a> BorrowDecode<'a, ()> + HasUid,
|
||||
{
|
||||
let _ = cache::init_db();
|
||||
|
||||
@@ -95,9 +117,9 @@ where
|
||||
let read = cache::read_txn()?;
|
||||
let write = cache::write_txn()?;
|
||||
let config = sdgb_api::bincode::config::Configuration::<
|
||||
sdgb_api::bincode::config::LittleEndian,
|
||||
>::default()
|
||||
.with_no_limit();
|
||||
sdgb_api::bincode::config::LittleEndian,
|
||||
>::default()
|
||||
.with_no_limit();
|
||||
|
||||
info!("number of user_id: {}", user_ids.len());
|
||||
|
||||
@@ -107,8 +129,7 @@ where
|
||||
let cache_table = cache::open_table_ro(&read, definition)?;
|
||||
let data = cache_table.get(user_id)?;
|
||||
if let Some(data) = data {
|
||||
let decoded: (A::Response, _) =
|
||||
borrow_decode_from_slice(&data.value(), config)?;
|
||||
let decoded: (R, _) = borrow_decode_from_slice(&data.value(), config)?;
|
||||
|
||||
return Ok(decoded.0);
|
||||
}
|
||||
@@ -118,9 +139,7 @@ where
|
||||
return Err("early skip due to ctrl-c")?;
|
||||
}
|
||||
|
||||
let resp =
|
||||
Sdgb1_50::request_ext::<A>(&client, <A as APIExt>::Payload::from(user_id), user_id)
|
||||
.await;
|
||||
let resp = scrape(&client, user_id).await;
|
||||
|
||||
match &resp {
|
||||
Ok(resp) => {
|
||||
|
||||
Reference in New Issue
Block a user