use std::sync::Arc; use std::{fs::OpenOptions, io::BufWriter}; use std::{path::Path, sync::atomic::Ordering}; use futures_util::StreamExt; use nyquest_preset::nyquest::AsyncClient; use parquet::basic::BrotliLevel; use parquet::file::properties::WriterProperties; use parquet::file::writer::SerializedFileWriter; use parquet::record::RecordWriter; use redb::ReadableTable; use redb::TableDefinition; use serde::Serialize; use spdlog::{error, info}; use sdgb_api::title::MaiVersionExt; use sdgb_api::title::{Sdgb1_50, methods::APIExt}; use sdgb_api::{ApiError, bincode}; use bincode::{BorrowDecode, Encode, borrow_decode_from_slice}; use crate::{EARLY_QUIT, cache}; #[allow(unused)] pub fn read_cache_keys( definition: TableDefinition<'_, u32, Vec>, ) -> Result, Box> { let txn = cache::read_txn()?; let table = cache::open_table_ro(&txn, definition)?; Ok(table .iter()? .flatten() .map(|(value, _)| value.value()) .collect::>()) } pub fn read_cache( definition: TableDefinition<'_, u32, Vec>, ) -> Result, Box> where D: for<'d> BorrowDecode<'d, ()>, { let txn = cache::read_txn()?; let table = cache::open_table_ro(&txn, definition)?; let config = bincode::config::Configuration::::default().with_no_limit(); Ok(table .iter()? .flatten() .map(|d| borrow_decode_from_slice(&d.1.value(), config)) .flatten() .map(|(value, _)| value) .collect::>()) } pub fn dump_parquet( data: impl Into>, output_path: impl AsRef, ) -> Result<(), Box> where for<'a> &'a [D]: RecordWriter, { let data = data.into(); let file = OpenOptions::new() .create(true) .truncate(true) .write(true) .open(output_path)?; #[cfg(file_lock_ready)] file.try_lock()?; let writer = BufWriter::new(file); let schema = data.as_slice().schema()?; let props = Arc::new( WriterProperties::builder() .set_compression(parquet::basic::Compression::BROTLI(BrotliLevel::try_new( 6, )?)) .build(), ); let mut writer = SerializedFileWriter::new(writer, schema, props).unwrap(); let mut row_group = writer.next_row_group().unwrap(); data.as_slice().write_to_row_group(&mut row_group)?; row_group.close()?; writer.close().unwrap(); info!("dumped {} records", data.len()); Ok(()) } pub fn dump_json( output_path: impl AsRef, definition: TableDefinition<'_, u32, Vec>, ) -> Result<(), Box> where D: for<'d> BorrowDecode<'d, ()> + Serialize, { let file = OpenOptions::new() .create(true) .truncate(true) .write(true) .open(output_path)?; #[cfg(file_lock_ready)] file.try_lock()?; let data = read_cache::(definition)?; let writer = BufWriter::new(file); serde_json::to_writer_pretty(writer, &data)?; info!("dumped {} records", data.len()); Ok(()) } pub async fn cached_concurrent_fetch( user_ids: impl Into>, client: &AsyncClient, concurrency: usize, definition: TableDefinition<'_, u32, Vec>, ) -> Result<(), Box> where A::Payload: From, A::Response: Encode + for<'a> BorrowDecode<'a, ()>, { cached_concurrent_fetch_userfn( user_ids, client, concurrency, definition, async |client, user_id| { Sdgb1_50::request_ext::(client, A::Payload::from(user_id), user_id).await }, ) .await } pub async fn cached_concurrent_fetch_userfn( user_ids: impl Into>, client: &AsyncClient, concurrency: usize, definition: TableDefinition<'_, u32, Vec>, scrape: impl AsyncFn(&AsyncClient, u32) -> Result, ) -> Result<(), Box> where R: Encode + for<'a> BorrowDecode<'a, ()>, { let _ = cache::init_db(); let user_ids = user_ids.into(); let read = cache::read_txn()?; let write = cache::write_txn()?; let config = sdgb_api::bincode::config::Configuration::< sdgb_api::bincode::config::LittleEndian, >::default() .with_no_limit(); info!("number of user_id: {}", user_ids.len()); let collect = futures_util::stream::iter(user_ids) .map(async |user_id| { { let cache_table = cache::open_table_ro(&read, definition)?; let data = cache_table.get(user_id)?; if data.is_some() { return Ok(()); } } if EARLY_QUIT.load(Ordering::Relaxed) { return Err("early skip due to ctrl-c")?; } let resp = scrape(&client, user_id).await; match &resp { Ok(resp) => { use sdgb_api::bincode::encode_to_vec; if let Ok(mut table) = cache::open_table(&write, definition) && let Ok(encoded) = encode_to_vec(resp, config) { info!("encode length for {user_id}: {}", encoded.len()); _ = table.insert(user_id, encoded); } } Err(sdgb_api::ApiError::JSON { .. }) => {} Err(e) => { error!("fetch failed: {e}"); } } Result::<_, Box>::Ok(()) }) .buffer_unordered(concurrency) // slower to avoid being banned .collect::>() .await; drop(collect); let _ = write.commit(); Ok(()) }