perf: parquet-based data export
This commit is contained in:
@@ -42,5 +42,7 @@ ctrlc = { version = "3.4.7", features = ["termination"] }
|
||||
# magic macro
|
||||
crabtime = { workspace = true }
|
||||
|
||||
parquet = { workspace = true }
|
||||
|
||||
[build-dependencies]
|
||||
version_check = "0.9.5"
|
||||
|
||||
@@ -311,23 +311,35 @@ async fn main() -> Result<(), Box<dyn snafu::Error>> {
|
||||
|
||||
#[cfg(feature = "fetchall")]
|
||||
Commands::ListAllUserDump {} => {
|
||||
use crate::{cache::PLAYERS, utils::helpers::dump_cache};
|
||||
use crate::{
|
||||
cache::PLAYERS,
|
||||
utils::helpers::{dump_parquet, read_cache},
|
||||
};
|
||||
|
||||
dump_cache::<GetUserPreviewApiResp>("players.json", PLAYERS)?;
|
||||
}
|
||||
#[cfg(feature = "fetchall")]
|
||||
Commands::ScrapeAllB50Dump {} => {
|
||||
use crate::{cache::B50, utils::helpers::dump_cache};
|
||||
|
||||
dump_cache::<GetUserRatingApiResp>("b50.json", B50)?;
|
||||
let players: Vec<GetUserPreviewApiResp> = read_cache(PLAYERS)?;
|
||||
dump_parquet(players, "players.parquet")?;
|
||||
}
|
||||
#[cfg(feature = "fetchall")]
|
||||
Commands::ScrapeAllRegionDump {} => {
|
||||
use sdgb_api::title::model::GetUserRegionApiResp;
|
||||
use crate::{
|
||||
cache::REGIONS,
|
||||
utils::helpers::{dump_parquet, read_cache},
|
||||
};
|
||||
use sdgb_api::title::model::{GetUserRegionApiResp, UserRegionFlatten};
|
||||
|
||||
use crate::{cache::REGIONS, utils::helpers::dump_cache};
|
||||
let regions: Vec<GetUserRegionApiResp> = read_cache(REGIONS)?;
|
||||
let regions_flat = regions
|
||||
.into_iter()
|
||||
.map(Vec::<UserRegionFlatten>::from)
|
||||
.flatten()
|
||||
.collect::<Vec<_>>();
|
||||
dump_parquet(regions_flat, "regions.parquet")?;
|
||||
}
|
||||
#[cfg(feature = "fetchall")]
|
||||
Commands::ScrapeAllB50Dump {} => {
|
||||
use crate::{cache::B50, utils::helpers::dump_json};
|
||||
|
||||
dump_cache::<GetUserRegionApiResp>("region.json", REGIONS)?;
|
||||
dump_json::<GetUserRatingApiResp>("b50.json", B50)?;
|
||||
}
|
||||
|
||||
Commands::Userdata { user_id } => {
|
||||
|
||||
@@ -1,9 +1,14 @@
|
||||
use std::sync::Arc;
|
||||
use std::{fs::OpenOptions, io::BufWriter};
|
||||
use std::{path::Path, sync::atomic::Ordering};
|
||||
|
||||
use futures_util::StreamExt;
|
||||
use nyquest_preset::nyquest::AsyncClient;
|
||||
|
||||
use parquet::basic::BrotliLevel;
|
||||
use parquet::file::properties::WriterProperties;
|
||||
use parquet::file::writer::SerializedFileWriter;
|
||||
use parquet::record::RecordWriter;
|
||||
use redb::ReadableTable;
|
||||
use redb::TableDefinition;
|
||||
use serde::Serialize;
|
||||
@@ -52,7 +57,46 @@ where
|
||||
.collect::<Vec<D>>())
|
||||
}
|
||||
|
||||
pub fn dump_cache<D>(
|
||||
pub fn dump_parquet<D>(
|
||||
data: impl Into<Vec<D>>,
|
||||
output_path: impl AsRef<Path>,
|
||||
) -> Result<(), Box<dyn snafu::Error>>
|
||||
where
|
||||
for<'a> &'a [D]: RecordWriter<D>,
|
||||
{
|
||||
let data = data.into();
|
||||
let file = OpenOptions::new()
|
||||
.create(true)
|
||||
.truncate(true)
|
||||
.write(true)
|
||||
.open(output_path)?;
|
||||
|
||||
#[cfg(file_lock_ready)]
|
||||
file.try_lock()?;
|
||||
|
||||
let writer = BufWriter::new(file);
|
||||
let schema = data.as_slice().schema()?;
|
||||
let props = Arc::new(
|
||||
WriterProperties::builder()
|
||||
.set_compression(parquet::basic::Compression::BROTLI(BrotliLevel::try_new(
|
||||
6,
|
||||
)?))
|
||||
.build(),
|
||||
);
|
||||
|
||||
let mut writer = SerializedFileWriter::new(writer, schema, props).unwrap();
|
||||
let mut row_group = writer.next_row_group().unwrap();
|
||||
|
||||
data.as_slice().write_to_row_group(&mut row_group)?;
|
||||
row_group.close()?;
|
||||
|
||||
writer.close().unwrap();
|
||||
info!("dumped {} user id", data.len());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn dump_json<D>(
|
||||
output_path: impl AsRef<Path>,
|
||||
definition: TableDefinition<'_, u32, Vec<u8>>,
|
||||
) -> Result<(), Box<dyn snafu::Error>>
|
||||
|
||||
Reference in New Issue
Block a user