chore: move version and persist to lazyitem
a-rustacean committed Aug 17, 2024
1 parent f01fef3 commit 0c9799b
Showing 12 changed files with 509 additions and 545 deletions.
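Taken together, the hunks below move the version_id and persist_flag fields off MergedNode and onto the LazyItem::Valid variant itself. A rough sketch of the resulting enum, inferred only from the construction sites in cache_loader.rs; the real definition lives in src/models/lazy_load.rs (one of the 12 changed files, not rendered on this page), and the integer widths are guesses:

use arcshift::ArcShift;
use std::sync::{atomic::AtomicBool, Arc};

// Inferred sketch only, not the verbatim definition. FileIndex is also
// defined in lazy_load.rs; a sketch of it follows the cache_loader.rs diff.
pub enum LazyItem<T: Clone> {
    Valid {
        data: Option<ArcShift<T>>,
        file_index: ArcShift<Option<FileIndex>>,
        decay_counter: usize,
        persist_flag: Arc<AtomicBool>, // moved here from MergedNode
        version_id: u16,               // moved here from MergedNode; width assumed
    },
    Invalid,
}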
7 changes: 3 additions & 4 deletions src/api_service.rs
@@ -83,18 +83,17 @@ pub async fn init_vector_store(
             location: Some((0, 0)),
         });
         let mut current_node = ArcShift::new(MergedNode {
-            version_id: 0, // Initialize with appropriate version ID
             hnsw_level: l as u8,
             prop: ArcShift::new(PropState::Ready(prop.clone())),
             neighbors: EagerLazyItemSet::new(),
             parent: LazyItemRef::new_invalid(),
             child: LazyItemRef::new_invalid(),
             versions: LazyItemMap::new(),
-            persist_flag: Arc::new(AtomicBool::new(true)),
         });

-        let lazy_node = LazyItem::from_arcshift(current_node.clone());
-        let nn = LazyItemRef::from_arcshift(current_node.clone());
+        // TODO: Initialize with appropriate version ID
+        let lazy_node = LazyItem::from_arcshift(0, current_node.clone());
+        let nn = LazyItemRef::from_arcshift(0, current_node.clone());

         if let Some(prev_node) = prev.item.get().get_data() {
             current_node
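The two rewritten call sites above suggest that LazyItem::from_arcshift (and LazyItemRef::from_arcshift) now take the version id as a leading parameter instead of reading it from MergedNode. A minimal sketch of what the constructor plausibly does under the field layout sketched earlier; the actual implementation in src/models/lazy_load.rs is not rendered here:

impl<T: Clone> LazyItem<T> {
    // Hypothetical reconstruction; only the signature is implied by the call sites.
    pub fn from_arcshift(version_id: u16, item: ArcShift<T>) -> Self {
        LazyItem::Valid {
            data: Some(item),
            file_index: ArcShift::new(None),
            decay_counter: 0,
            persist_flag: Arc::new(AtomicBool::new(true)),
            version_id,
        }
    }
}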
39 changes: 24 additions & 15 deletions src/models/cache_loader.rs
@@ -6,10 +6,8 @@ use arcshift::ArcShift;
 use dashmap::DashMap;
 use probabilistic_collections::cuckoo::CuckooFilter;
 use std::collections::HashSet;
-use std::io::Read;
-use std::io::Seek;
-use std::sync::Arc;
-use std::sync::RwLock;
+use std::io::{Read, Seek};
+use std::sync::{atomic::AtomicBool, Arc, RwLock};

 pub struct NodeRegistry<R: Read + Seek> {
     cuckoo_filter: RwLock<CuckooFilter<u64>>,
@@ -64,12 +62,20 @@ impl<R: Read + Seek> NodeRegistry<R> {
         }
         println!("Released read lock on cuckoo_filter");

+        let version_id = if let FileIndex::Valid { version, .. } = &file_index {
+            *version
+        } else {
+            0
+        };
+
         if max_loads == 0 || !skipm.insert(combined_index) {
             println!("Either max_loads hit 0 or loop detected, returning LazyItem with no data");
             return Ok(LazyItem::Valid {
                 data: None,
                 file_index: ArcShift::new(Some(file_index)),
                 decay_counter: 0,
+                persist_flag: Arc::new(AtomicBool::new(true)),
+                version_id,
             });
         }

@@ -95,6 +101,8 @@ impl<R: Read + Seek> NodeRegistry<R> {
                 data: Some(ArcShift::new(node)),
                 file_index: ArcShift::new(Some(file_index)),
                 decay_counter: 0,
+                persist_flag: Arc::new(AtomicBool::new(true)),
+                version_id,
             };

         println!("Inserting item into registry");
@@ -111,19 +119,20 @@ impl<R: Read + Seek> NodeRegistry<R> {
         let mut reader_lock = self.reader.write().unwrap();
         let mut skipm: HashSet<u64> = HashSet::new();

-        match file_index {
-            FileIndex::Valid { offset, .. } => T::deserialize(
-                &mut *reader_lock,
-                file_index,
-                self.clone(),
-                1000,
-                &mut skipm,
-            ),
-            FileIndex::Invalid => Err(std::io::Error::new(
+        if file_index == FileIndex::Invalid {
+            return Err(std::io::Error::new(
                 std::io::ErrorKind::InvalidInput,
                 "Cannot deserialize with an invalid FileIndex",
-            )),
-        }
+            ));
+        };
+
+        T::deserialize(
+            &mut *reader_lock,
+            file_index,
+            self.clone(),
+            1000,
+            &mut skipm,
+        )
     }

     pub fn combine_index(file_index: &FileIndex) -> u64 {
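Two details worth noting in the hunks above: the version id extracted from a FileIndex::Valid falls back to 0 when the index is invalid, and the refactored load path compares file_index against FileIndex::Invalid with ==, which requires FileIndex to implement PartialEq. For reference, here is a FileIndex shape consistent with every use in this commit; the derives and integer widths are assumptions, since the definition is not rendered:

#[derive(Clone, Debug, PartialEq)] // PartialEq is needed for the == comparison above
pub enum FileIndex {
    Valid { offset: u32, version: u16 }, // widths assumed
    Invalid,
}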
35 changes: 13 additions & 22 deletions src/models/file_persist.rs
@@ -1,6 +1,6 @@
 use super::cache_loader::NodeRegistry;
 use super::common::WaCustomError;
-use super::lazy_load::{FileIndex, LazyItem};
+use super::lazy_load::{FileIndex, LazyItem, SyncPersist};
 use super::types::{HNSWLevel, MergedNode, NodeProp, VectorId};
 use crate::models::custom_buffered_writer::*;
 use crate::models::serializer::CustomSerialize;
@@ -36,32 +36,34 @@ pub fn read_node_from_file<R: Read + Seek>(
     Ok(node)
 }
 pub fn write_node_to_file(
-    mut node: ArcShift<MergedNode>,
+    lazy_item: &LazyItem<MergedNode>,
     writer: &mut CustomBufferedWriter,
     file_index: Option<FileIndex>,
 ) -> Result<FileIndex, WaCustomError> {
+    let mut node_arc = lazy_item
+        .get_data()
+        .ok_or(WaCustomError::LazyLoadingError("node in null".to_string()))?;
+    let node = node_arc.get();
+
     match file_index {
         Some(FileIndex::Valid { offset, version }) => {
             println!(
                 "About to write at offset {}, version {}, node: {:#?}",
-                offset,
-                version,
-                node.get()
+                offset, version, node
             );
             writer
                 .seek(SeekFrom::Start(offset as u64))
                 .map_err(|e| WaCustomError::FsError(e.to_string()))?;
         }
         Some(FileIndex::Invalid) | None => {
-            println!("About to write node at the end of file: {:#?}", node.get());
+            println!("About to write node at the end of file: {:#?}", node);
             writer
                 .seek(SeekFrom::End(0))
                 .map_err(|e| WaCustomError::FsError(e.to_string()))?;
         }
     }

     let new_offset = node
-        .get()
         .serialize(writer)
         .map_err(|e| WaCustomError::SerializationError(e.to_string()))?;

@@ -70,7 +72,7 @@ pub fn write_node_to_file(
         offset: new_offset,
         version: match file_index {
             Some(FileIndex::Valid { version, .. }) => version,
-            _ => node.get().get_current_version(),
+            _ => lazy_item.get_current_version(),
         },
     };

@@ -81,22 +83,11 @@ pub fn persist_node_update_loc(
     ver_file: &mut CustomBufferedWriter,
     node: &mut ArcShift<LazyItem<MergedNode>>,
 ) -> Result<(), WaCustomError> {
-    // Extract the necessary information from the node
-    let (data, current_file_index) = {
-        let lazy_item = node.get();
-        match lazy_item {
-            LazyItem::Valid {
-                data: Some(data), ..
-            } => {
-                let current_file_index = lazy_item.get_file_index();
-                (data.clone(), current_file_index)
-            }
-            _ => return Err(WaCustomError::LazyLoadingError("Data is None".to_string())),
-        }
-    };
+    let lazy_item = node.get();
+    let current_file_index = lazy_item.get_file_index();

     // Write the node to file
-    let new_file_index = write_node_to_file(data, ver_file, current_file_index)?;
+    let new_file_index = write_node_to_file(node.get(), ver_file, current_file_index)?;

     // Update the file index in the lazy item
     node.rcu(|lazy_item| {
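The SyncPersist import added above pairs with the persist_flag and version_id fields this commit moves onto LazyItem; write_node_to_file now reads the version through lazy_item.get_current_version(). Only that one method is visible in this diff, so the following trait sketch is speculative, with the flag accessors assumed from the evident role of persist_flag:

pub trait SyncPersist {
    fn get_current_version(&self) -> u16; // the only method visible in this diff
    // Assumed companions, suggested by the persist_flag: Arc<AtomicBool> field:
    fn set_persistence(&self, flag: bool);
    fn needs_persistence(&self) -> bool;
}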
