Use zero copy deserialization

This commit is contained in:
Ajeet D'Souza 2021-01-08 20:04:54 +05:30
parent ff16bf140c
commit 917d85f048
10 changed files with 388 additions and 313 deletions

View File

@ -1,8 +1,8 @@
use super::Cmd;
use crate::config;
use crate::store::StoreBuilder;
use crate::util;
use crate::store::Store;
use anyhow::Result;
use clap::Clap;
@ -40,7 +40,8 @@ impl Cmd for Add {
let data_dir = config::zo_data_dir()?;
let max_age = config::zo_maxage()?;
let mut store = Store::open(&data_dir)?;
let mut store = StoreBuilder::new(data_dir);
let mut store = store.build()?;
store.add(path, now);
store.age(max_age);

View File

@ -3,7 +3,7 @@ use crate::config;
use crate::import::{Autojump, Import as _, Z};
use crate::util;
use crate::store::Store;
use crate::store::StoreBuilder;
use anyhow::{bail, Result};
use clap::{ArgEnum, Clap};
@ -27,7 +27,8 @@ impl Cmd for Import {
fn run(&self) -> Result<()> {
let data_dir = config::zo_data_dir()?;
let mut store = Store::open(&data_dir)?;
let mut store = StoreBuilder::new(data_dir);
let mut store = store.build()?;
if !self.merge && !store.dirs.is_empty() {
bail!("zoxide database is not empty, specify --merge to continue anyway")
}

View File

@ -3,7 +3,7 @@ use crate::config;
use crate::fzf::Fzf;
use crate::util;
use crate::store::{self, Store};
use crate::store::{self, StoreBuilder};
use anyhow::{Context, Result};
use clap::Clap;
@ -30,7 +30,8 @@ pub struct Query {
impl Cmd for Query {
fn run(&self) -> Result<()> {
let data_dir = config::zo_data_dir()?;
let mut store = Store::open(&data_dir)?;
let mut store = StoreBuilder::new(data_dir);
let mut store = store.build()?;
let query = store::Query::new(&self.keywords);
let now = util::current_time()?;

View File

@ -1,8 +1,7 @@
use super::Cmd;
use crate::config;
use crate::fzf::Fzf;
use crate::store::Query;
use crate::store::Store;
use crate::store::{Query, StoreBuilder};
use crate::util;
use anyhow::{bail, Context, Result};
@ -25,7 +24,8 @@ pub struct Remove {
impl Cmd for Remove {
fn run(&self) -> Result<()> {
let data_dir = config::zo_data_dir()?;
let mut store = Store::open(&data_dir)?;
let mut store = StoreBuilder::new(data_dir);
let mut store = store.build()?;
let selection;
let path = match &self.interactive {

View File

@ -1,4 +1,5 @@
use crate::store::Rank;
use anyhow::{bail, Context, Result};
use dirs_next as dirs;

View File

@ -3,6 +3,7 @@ use super::Import;
use crate::store::{Dir, Epoch, Store};
use anyhow::{Context, Result};
use std::borrow::Cow;
use std::fs;
use std::path::Path;
@ -43,7 +44,7 @@ impl Import for Autojump {
}
let rank_sum = entries.iter().map(|(_, rank)| rank).sum::<f64>();
for (path, rank) in entries.iter() {
for &(path, rank) in entries.iter() {
if store
.dirs
.iter_mut()
@ -51,7 +52,7 @@ impl Import for Autojump {
.is_none()
{
store.dirs.push(Dir {
path: path.to_string(),
path: Cow::Owned(path.into()),
rank: rank / rank_sum,
last_accessed: self.now,
});

View File

@ -3,6 +3,7 @@ use super::Import;
use crate::store::{Dir, Store};
use anyhow::{Context, Result};
use std::borrow::Cow;
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;
@ -46,7 +47,7 @@ impl Import for Z {
dir.last_accessed = dir.last_accessed.max(last_accessed);
}
None => store.dirs.push(Dir {
path: path.to_string(),
path: Cow::Owned(path.into()),
rank,
last_accessed,
}),

View File

@ -1,27 +1,110 @@
use super::{Epoch, Query, Rank};
use super::Query;
use anyhow::{bail, Context, Result};
use bincode::Options as _;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::fmt::{self, Display, Formatter};
use std::ops::{Deref, DerefMut};
use std::path::Path;
#[derive(Debug, Deserialize, Serialize)]
pub struct Dir {
pub path: String,
pub struct DirList<'a>(#[serde(borrow)] Vec<Dir<'a>>);
impl DirList<'_> {
const VERSION: u32 = 3;
pub fn new() -> DirList<'static> {
DirList(Vec::new())
}
pub fn from_bytes<'a>(bytes: &'a [u8]) -> Result<DirList<'a>> {
// Assume a maximum size for the store. This prevents bincode from throwing strange
// errors when it encounters invalid data.
const MAX_SIZE: u64 = 8 << 20; // 8 MiB
let deserializer = &mut bincode::options()
.with_fixint_encoding()
.with_limit(MAX_SIZE);
// Split bytes into sections.
let version_size = deserializer.serialized_size(&Self::VERSION).unwrap() as _;
if bytes.len() < version_size {
bail!("could not deserialize store: corrupted data");
}
let (bytes_version, bytes_dirs) = bytes.split_at(version_size);
// Deserialize sections.
(|| {
let version = deserializer.deserialize(bytes_version)?;
match version {
Self::VERSION => Ok(deserializer.deserialize(bytes_dirs)?),
version => bail!(
"unsupported version (got {}, supports {})",
version,
Self::VERSION,
),
}
})()
.context("could not deserialize store")
}
pub fn to_bytes(&self) -> Result<Vec<u8>> {
(|| -> bincode::Result<_> {
// Preallocate buffer with combined size of sections.
let version_size = bincode::serialized_size(&Self::VERSION)?;
let dirs_size = bincode::serialized_size(&self)?;
let buffer_size = version_size + dirs_size;
let mut buffer = Vec::with_capacity(buffer_size as _);
// Serialize sections into buffer.
bincode::serialize_into(&mut buffer, &Self::VERSION)?;
bincode::serialize_into(&mut buffer, &self)?;
Ok(buffer)
})()
.context("could not serialize store")
}
}
impl<'a> Deref for DirList<'a> {
type Target = Vec<Dir<'a>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<'a> DerefMut for DirList<'a> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<'a> From<Vec<Dir<'a>>> for DirList<'a> {
fn from(dirs: Vec<Dir<'a>>) -> Self {
DirList(dirs)
}
}
#[derive(Debug, Deserialize, Serialize)]
pub struct Dir<'a> {
#[serde(borrow)]
pub path: Cow<'a, str>,
pub rank: Rank,
pub last_accessed: Epoch,
}
impl Dir {
impl Dir<'_> {
pub fn is_match(&self, query: &Query) -> bool {
query.matches(&self.path) && Path::new(&self.path).is_dir()
query.matches(&self.path) && Path::new(self.path.as_ref()).is_dir()
}
pub fn get_score(&self, now: Epoch) -> Rank {
pub fn score(&self, now: Epoch) -> Rank {
const HOUR: Epoch = 60 * 60;
const DAY: Epoch = 24 * HOUR;
const WEEK: Epoch = 7 * DAY;
// The older the entry, the lesser its importance.
let duration = now.saturating_sub(self.last_accessed);
if duration < HOUR {
self.rank * 4.0
@ -44,7 +127,7 @@ impl Dir {
}
pub struct DirDisplay<'a> {
dir: &'a Dir,
dir: &'a Dir<'a>,
}
impl Display for DirDisplay<'_> {
@ -54,13 +137,13 @@ impl Display for DirDisplay<'_> {
}
pub struct DirDisplayScore<'a> {
dir: &'a Dir,
dir: &'a Dir<'a>,
now: Epoch,
}
impl Display for DirDisplayScore<'_> {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let score = self.dir.get_score(self.now);
let score = self.dir.score(self.now);
let score = if score > 9999.0 {
9999
} else if score > 0.0 {
@ -71,3 +154,6 @@ impl Display for DirDisplayScore<'_> {
write!(f, "{:>4} {}", score, self.dir.path)
}
}
pub type Rank = f64;
pub type Epoch = u64;

View File

@ -1,296 +1,7 @@
mod dir;
mod query;
mod store;
use anyhow::{bail, Context, Result};
use bincode::Options;
use ordered_float::OrderedFloat;
use serde::{Deserialize, Serialize};
use tempfile::{NamedTempFile, PersistError};
use std::cmp::Reverse;
use std::fs;
use std::io::{self, Write};
use std::path::{Path, PathBuf};
pub use dir::Dir;
pub use dir::{Dir, DirList, Epoch, Rank};
pub use query::Query;
pub type Rank = f64;
pub type Epoch = u64;
#[derive(Debug)]
pub struct Store {
pub dirs: Vec<Dir>,
pub modified: bool,
data_dir: PathBuf,
}
impl Store {
pub const CURRENT_VERSION: StoreVersion = StoreVersion(3);
const MAX_SIZE: u64 = 8 * 1024 * 1024; // 8 MiB
pub fn open<P: Into<PathBuf>>(data_dir: P) -> Result<Store> {
let data_dir = data_dir.into();
let path = Self::get_path(&data_dir);
let buffer = match fs::read(&path) {
Ok(buffer) => buffer,
Err(e) if e.kind() == io::ErrorKind::NotFound => {
fs::create_dir_all(&data_dir).with_context(|| {
format!("unable to create data directory: {}", path.display())
})?;
return Ok(Store {
dirs: Vec::new(),
modified: false,
data_dir,
});
}
Err(e) => {
Err(e).with_context(|| format!("could not read from store: {}", path.display()))?
}
};
let deserializer = &mut bincode::options()
.with_fixint_encoding()
.with_limit(Self::MAX_SIZE);
let version_size = deserializer
.serialized_size(&Self::CURRENT_VERSION)
.unwrap() as _;
if buffer.len() < version_size {
bail!("data store may be corrupted: {}", path.display());
}
let (buffer_version, buffer_dirs) = buffer.split_at(version_size);
let version = deserializer
.deserialize(buffer_version)
.with_context(|| format!("could not deserialize store version: {}", path.display()))?;
let dirs = match version {
Self::CURRENT_VERSION => deserializer
.deserialize(buffer_dirs)
.with_context(|| format!("could not deserialize store: {}", path.display()))?,
version => bail!(
"unsupported store version, got={}, supported={}: {}",
version.0,
Self::CURRENT_VERSION.0,
path.display()
),
};
Ok(Store {
dirs,
modified: false,
data_dir,
})
}
pub fn save(&mut self) -> Result<()> {
if !self.modified {
return Ok(());
}
let (buffer, buffer_size) = (|| -> bincode::Result<_> {
let version_size = bincode::serialized_size(&Self::CURRENT_VERSION)?;
let dirs_size = bincode::serialized_size(&self.dirs)?;
let buffer_size = version_size + dirs_size;
let mut buffer = Vec::with_capacity(buffer_size as _);
bincode::serialize_into(&mut buffer, &Self::CURRENT_VERSION)?;
bincode::serialize_into(&mut buffer, &self.dirs)?;
Ok((buffer, buffer_size))
})()
.context("could not serialize store")?;
let mut file = NamedTempFile::new_in(&self.data_dir).with_context(|| {
format!(
"could not create temporary store in: {}",
self.data_dir.display()
)
})?;
let _ = file.as_file().set_len(buffer_size);
file.write_all(&buffer).with_context(|| {
format!(
"could not write to temporary store: {}",
file.path().display()
)
})?;
let path = Self::get_path(&self.data_dir);
persist(file, &path)
.with_context(|| format!("could not replace store: {}", path.display()))?;
self.modified = false;
Ok(())
}
pub fn add<S: AsRef<str>>(&mut self, path: S, now: Epoch) {
let path = path.as_ref();
debug_assert!(Path::new(path).is_absolute());
match self.dirs.iter_mut().find(|dir| dir.path == path) {
None => self.dirs.push(Dir {
path: path.into(),
last_accessed: now,
rank: 1.0,
}),
Some(dir) => {
dir.last_accessed = now;
dir.rank += 1.0;
}
};
self.modified = true;
}
pub fn iter_matches<'a>(
&'a mut self,
query: &'a Query,
now: Epoch,
) -> impl DoubleEndedIterator<Item = &'a Dir> {
self.dirs
.sort_unstable_by_key(|dir| Reverse(OrderedFloat(dir.get_score(now))));
self.dirs.iter().filter(move |dir| dir.is_match(&query))
}
pub fn remove<S: AsRef<str>>(&mut self, path: S) -> bool {
let path = path.as_ref();
if let Some(idx) = self.dirs.iter().position(|dir| dir.path == path) {
self.dirs.swap_remove(idx);
self.modified = true;
return true;
}
false
}
pub fn age(&mut self, max_age: Rank) {
let sum_age = self.dirs.iter().map(|dir| dir.rank).sum::<Rank>();
if sum_age > max_age {
let factor = 0.9 * max_age / sum_age;
for idx in (0..self.dirs.len()).rev() {
let dir = &mut self.dirs[idx];
dir.rank *= factor;
if dir.rank < 1.0 {
self.dirs.swap_remove(idx);
}
}
self.modified = true;
}
}
fn get_path<P: AsRef<Path>>(data_dir: P) -> PathBuf {
data_dir.as_ref().join("db.zo")
}
}
impl Drop for Store {
fn drop(&mut self) {
if let Err(e) = self.save() {
println!("Error: {}", e)
}
}
}
#[derive(Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct StoreVersion(pub u32);
#[cfg(windows)]
fn persist<P: AsRef<Path>>(mut file: NamedTempFile, path: P) -> Result<(), PersistError> {
use rand::distributions::{Distribution, Uniform};
use std::thread;
use std::time::Duration;
// File renames on Windows are not atomic and sometimes fail with `PermissionDenied`.
// This is extremely unlikely unless it's running in a loop on multiple threads.
// Nevertheless, we guard against it by retrying the rename a fixed number of times.
const MAX_TRIES: usize = 10;
let mut rng = None;
for _ in 0..MAX_TRIES {
match file.persist(&path) {
Ok(_) => break,
Err(e) if e.error.kind() == io::ErrorKind::PermissionDenied => {
let mut rng = rng.get_or_insert_with(rand::thread_rng);
let between = Uniform::from(50..150);
let duration = Duration::from_millis(between.sample(&mut rng));
thread::sleep(duration);
file = e.file;
}
Err(e) => return Err(e),
}
}
Ok(())
}
#[cfg(unix)]
fn persist<P: AsRef<Path>>(file: NamedTempFile, path: P) -> Result<(), PersistError> {
file.persist(&path)?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn add() {
let path = if cfg!(windows) {
r"C:\foo\bar"
} else {
"/foo/bar"
};
let now = 946684800;
let data_dir = tempfile::tempdir().unwrap();
{
let mut store = Store::open(data_dir.path()).unwrap();
store.add(path, now);
store.add(path, now);
}
{
let store = Store::open(data_dir.path()).unwrap();
assert_eq!(store.dirs.len(), 1);
let dir = &store.dirs[0];
assert_eq!(dir.path, path);
assert_eq!(dir.last_accessed, now);
}
}
#[test]
fn remove() {
let path = if cfg!(windows) {
r"C:\foo\bar"
} else {
"/foo/bar"
};
let now = 946684800;
let data_dir = tempfile::tempdir().unwrap();
{
let mut store = Store::open(data_dir.path()).unwrap();
store.add(path, now);
}
{
let mut store = Store::open(data_dir.path()).unwrap();
assert!(store.remove(path));
}
{
let mut store = Store::open(data_dir.path()).unwrap();
assert!(store.dirs.is_empty());
assert!(!store.remove(path));
}
}
}
pub use store::{Store, StoreBuilder};

272
src/store/store.rs Normal file
View File

@ -0,0 +1,272 @@
use super::{Dir, DirList, Epoch, Query, Rank};
use anyhow::{Context, Result};
use ordered_float::OrderedFloat;
use tempfile::{NamedTempFile, PersistError};
use std::borrow::Cow;
use std::cmp::Reverse;
use std::fs;
use std::io::{self, Write};
use std::path::{Path, PathBuf};
pub struct Store<'a> {
pub dirs: DirList<'a>,
pub modified: bool,
data_dir: &'a Path,
}
impl<'a> Store<'a> {
pub fn save(&mut self) -> Result<()> {
if !self.modified {
return Ok(());
}
let buffer = self.dirs.to_bytes()?;
let mut file = NamedTempFile::new_in(&self.data_dir).with_context(|| {
format!(
"could not create temporary store in: {}",
self.data_dir.display()
)
})?;
// Preallocate enough space on the file, preventing copying later on.
// This optimization may fail on some filesystems, but it is safe to
// ignore it and proceed.
let _ = file.as_file().set_len(buffer.len() as _);
file.write_all(&buffer).with_context(|| {
format!(
"could not write to temporary store: {}",
file.path().display()
)
})?;
let path = store_path(&self.data_dir);
persist(file, &path)
.with_context(|| format!("could not replace store: {}", path.display()))?;
self.modified = false;
Ok(())
}
pub fn add<S: AsRef<str>>(&mut self, path: S, now: Epoch) {
let path = path.as_ref();
debug_assert!(Path::new(path).is_absolute());
match self.dirs.iter_mut().find(|dir| dir.path == path) {
None => self.dirs.push(Dir {
path: Cow::Owned(path.into()),
last_accessed: now,
rank: 1.0,
}),
Some(dir) => {
dir.last_accessed = now;
dir.rank += 1.0;
}
};
self.modified = true;
}
pub fn iter_matches<'b>(
&'b mut self,
query: &'b Query,
now: Epoch,
) -> impl DoubleEndedIterator<Item = &'b Dir> {
self.dirs
.sort_unstable_by_key(|dir| Reverse(OrderedFloat(dir.score(now))));
self.dirs.iter().filter(move |dir| dir.is_match(&query))
}
pub fn remove<S: AsRef<str>>(&mut self, path: S) -> bool {
let path = path.as_ref();
if let Some(idx) = self.dirs.iter().position(|dir| dir.path == path) {
self.dirs.swap_remove(idx);
self.modified = true;
return true;
}
false
}
pub fn age(&mut self, max_age: Rank) {
let sum_age = self.dirs.iter().map(|dir| dir.rank).sum::<Rank>();
if sum_age > max_age {
let factor = 0.9 * max_age / sum_age;
for idx in (0..self.dirs.len()).rev() {
let dir = &mut self.dirs[idx];
dir.rank *= factor;
if dir.rank < 1.0 {
self.dirs.swap_remove(idx);
}
}
self.modified = true;
}
}
}
impl Drop for Store<'_> {
fn drop(&mut self) {
// Since the error can't be properly handled here,
// pretty-print it instead.
if let Err(e) = self.save() {
println!("Error: {}", e)
}
}
}
#[cfg(windows)]
fn persist<P: AsRef<Path>>(mut file: NamedTempFile, path: P) -> Result<(), PersistError> {
use rand::distributions::{Distribution, Uniform};
use std::thread;
use std::time::Duration;
// File renames on Windows are not atomic and sometimes fail with `PermissionDenied`.
// This is extremely unlikely unless it's running in a loop on multiple threads.
// Nevertheless, we guard against it by retrying the rename a fixed number of times.
const MAX_TRIES: usize = 10;
let mut rng = None;
for _ in 0..MAX_TRIES {
match file.persist(&path) {
Ok(_) => break,
Err(e) if e.error.kind() == io::ErrorKind::PermissionDenied => {
let mut rng = rng.get_or_insert_with(rand::thread_rng);
let between = Uniform::from(50..150);
let duration = Duration::from_millis(between.sample(&mut rng));
thread::sleep(duration);
file = e.file;
}
Err(e) => return Err(e),
}
}
Ok(())
}
#[cfg(unix)]
fn persist<P: AsRef<Path>>(file: NamedTempFile, path: P) -> Result<(), PersistError> {
file.persist(&path)?;
Ok(())
}
pub struct StoreBuilder {
data_dir: PathBuf,
buffer: Vec<u8>,
}
impl StoreBuilder {
pub fn new<P: Into<PathBuf>>(data_dir: P) -> StoreBuilder {
StoreBuilder {
data_dir: data_dir.into(),
buffer: Vec::new(),
}
}
pub fn build<'a>(&'a mut self) -> Result<Store<'a>> {
// Read the entire store to memory. For smaller files, this is faster
// than mmap / streaming, and allows for zero-copy deserialization.
let path = store_path(&self.data_dir);
match fs::read(&path) {
Ok(buffer) => {
self.buffer = buffer;
let dirs = DirList::from_bytes(&self.buffer)
.with_context(|| format!("could not deserialize store: {}", path.display()))?;
Ok(Store {
dirs,
modified: false,
data_dir: &self.data_dir,
})
}
Err(e) if e.kind() == io::ErrorKind::NotFound => {
// Create data directory, but don't create any file yet.
// The file will be created later by [`Store::save`]
// if any data is modified.
fs::create_dir_all(&self.data_dir).with_context(|| {
format!(
"unable to create data directory: {}",
self.data_dir.display()
)
})?;
Ok(Store {
dirs: DirList::new(),
modified: false,
data_dir: &self.data_dir,
})
}
Err(e) => {
Err(e).with_context(|| format!("could not read from store: {}", path.display()))
}
}
}
}
fn store_path<P: AsRef<Path>>(data_dir: P) -> PathBuf {
const STORE_FILENAME: &str = "db.zo";
data_dir.as_ref().join(STORE_FILENAME)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn add() {
let path = if cfg!(windows) {
r"C:\foo\bar"
} else {
"/foo/bar"
};
let now = 946684800;
let data_dir = tempfile::tempdir().unwrap();
{
let mut store = StoreBuilder::new(data_dir.path());
let mut store = store.build().unwrap();
store.add(path, now);
store.add(path, now);
}
{
let mut store = StoreBuilder::new(data_dir.path());
let store = store.build().unwrap();
assert_eq!(store.dirs.len(), 1);
let dir = &store.dirs[0];
assert_eq!(dir.path, path);
assert_eq!(dir.last_accessed, now);
}
}
#[test]
fn remove() {
let path = if cfg!(windows) {
r"C:\foo\bar"
} else {
"/foo/bar"
};
let now = 946684800;
let data_dir = tempfile::tempdir().unwrap();
{
let mut store = StoreBuilder::new(data_dir.path());
let mut store = store.build().unwrap();
store.add(path, now);
}
{
let mut store = StoreBuilder::new(data_dir.path());
let mut store = store.build().unwrap();
assert!(store.remove(path));
}
{
let mut store = StoreBuilder::new(data_dir.path());
let mut store = store.build().unwrap();
assert!(store.dirs.is_empty());
assert!(!store.remove(path));
}
}
}