This commit is contained in:
Michael Zhang 2024-06-10 15:03:40 -04:00
parent 07965faf1b
commit 9ea6ee5248
6 changed files with 158 additions and 151 deletions

View file

@ -19,7 +19,7 @@ use miette::{bail, IntoDiagnostic, Result};
use serde_json::Value;
use uuid::Uuid;
#[derive(Clone, Debug, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct NodeId(Uuid);
impl fmt::Display for NodeId {

View file

@ -3,30 +3,38 @@ pub mod journal;
pub mod mail;
pub mod node;
use std::{fs, path::Path};
use std::{collections::HashMap, fs, path::Path};
use cozo::DbInstance;
use miette::{IntoDiagnostic, Result};
use node::FieldMapping;
use tantivy::{
directory::MmapDirectory,
schema::{Schema, STORED, STRING, TEXT},
schema::{Field, Schema, STORED, STRING, TEXT},
Index,
};
use crate::migrations::run_migrations;
pub fn tantivy_schema() -> Schema {
pub fn tantivy_schema() -> (Schema, HashMap<String, Field>) {
let mut schema_builder = Schema::builder();
let mut field_map = HashMap::new();
let node_id = schema_builder.add_text_field("node_id", STRING | STORED);
let title = schema_builder.add_text_field("title", TEXT | STORED);
let body = schema_builder.add_text_field("body", TEXT);
schema_builder.build()
field_map.insert("node_id".to_owned(), node_id);
let journal_content = schema_builder.add_text_field("title", TEXT | STORED);
field_map.insert("panorama/journal/page/content".to_owned(), journal_content);
(schema_builder.build(), field_map)
}
/// Shared application state: the database handle together with the full-text
/// search index and the lookup table for that index's fields.
///
/// The struct derives `Clone`, so it can be handed to several owners.
#[derive(Clone)]
pub struct AppState {
/// Handle to the Cozo database instance.
pub db: DbInstance,
/// Tantivy index used for full-text search.
pub tantivy_index: Index,
/// Maps a string key to the tantivy `Field` handle registered for it, as
/// returned alongside the schema by `tantivy_schema` (which registers
/// `"node_id"` and `"panorama/journal/page/content"`).
pub tantivy_field_map: HashMap<String, Field>,
}
impl AppState {
@ -34,15 +42,16 @@ impl AppState {
let panorama_dir = panorama_dir.as_ref().to_path_buf();
println!("Panorama dir: {}", panorama_dir.display());
let tantivy_index = {
let schema = tantivy_schema();
let (tantivy_index, tantivy_field_map) = {
let (schema, field_map) = tantivy_schema();
let tantivy_path = panorama_dir.join("tantivy-index");
fs::create_dir_all(&tantivy_path).into_diagnostic()?;
let dir = MmapDirectory::open(&tantivy_path).into_diagnostic()?;
Index::builder()
let index = Index::builder()
.schema(schema)
.open_or_create(dir)
.into_diagnostic()?
.into_diagnostic()?;
(index, field_map)
};
let db_path = panorama_dir.join("db.sqlite");
@ -53,7 +62,11 @@ impl AppState {
)
.unwrap();
let state = AppState { db, tantivy_index };
let state = AppState {
db,
tantivy_index,
tantivy_field_map,
};
state.init().await?;
Ok(state)

View file

@ -1,13 +1,21 @@
use std::{
collections::{BTreeMap, HashMap},
fmt::write,
str::FromStr,
};
use chrono::{DateTime, Utc};
use cozo::{DataValue, MultiTransaction, NamedRows, ScriptMutability};
use itertools::Itertools;
use miette::Result;
use miette::{IntoDiagnostic, Result};
use serde_json::Value;
use tantivy::{
collector::TopDocs,
doc,
query::QueryParser,
schema::{OwnedValue, Value as _},
Document, TantivyDocument,
};
use uuid::Uuid;
use crate::{AppState, NodeId};
@ -33,7 +41,8 @@ pub type FieldMapping = HashMap<String, FieldInfo>;
impl AppState {
/// Get all properties of a node
pub async fn get_node(&self, node_id: &NodeId) -> Result<NodeInfo> {
pub async fn get_node(&self, node_id: impl AsRef<str>) -> Result<NodeInfo> {
let node_id = node_id.as_ref().to_owned();
let tx = self.db.multi_transaction(false);
let result = tx.run_script(
@ -46,8 +55,6 @@ impl AppState {
btmap! {"node_id".to_owned() => node_id.to_string().into()},
)?;
println!("FIELDS: {:?}", result);
let field_mapping = AppState::rows_to_field_mapping(result)?;
// Group the keys by which relation they're in
@ -109,19 +116,17 @@ impl AppState {
id = $node_id
"
);
println!("QUERY: {query}");
let result = tx.run_script(
&query,
btmap! { "node_id".to_owned() => node_id.to_string().into(), },
)?;
println!("RESULT: {result:?}");
let created_at = DateTime::from_timestamp_millis(
(result.rows[0][2].get_float().unwrap() * 1000.0) as i64,
)
.unwrap();
let updated_at = DateTime::from_timestamp_millis(
(result.rows[0][3].get_float().unwrap() * 1000.0) as i64,
)
@ -138,17 +143,17 @@ impl AppState {
fields.insert(field_name.to_string(), value);
}
}
println!("FIELDS: {:?}", fields);
Ok(NodeInfo {
node_id: node_id.clone(),
node_id: NodeId(Uuid::from_str(&node_id).unwrap()),
created_at,
updated_at,
fields: Some(fields),
})
}
pub async fn create_node(
// TODO: Split this out into create and update
pub async fn create_or_update_node(
&self,
r#type: impl AsRef<str>,
extra_data: Option<ExtraData>,
@ -173,6 +178,8 @@ impl AppState {
)?;
if let Some(extra_data) = extra_data {
let node_id_field =
self.tantivy_field_map.get("node_id").unwrap().clone();
if !extra_data.is_empty() {
let keys = extra_data.keys().map(|s| s.to_owned()).collect::<Vec<_>>();
let field_mapping =
@ -184,6 +191,7 @@ impl AppState {
);
for (relation, fields) in result_by_relation.iter() {
let mut doc = btmap! { node_id_field.clone() => OwnedValue::Str(node_id.to_owned()) };
let fields_mapping = fields
.into_iter()
.map(
@ -192,20 +200,38 @@ impl AppState {
FieldInfo {
relation_field,
r#type,
is_fts_enabled,
..
},
)| {
let new_value = extra_data.get(*key).unwrap();
// TODO: Make this more generic
let new_value = match r#type.as_str() {
"int" => DataValue::from(new_value.as_i64().unwrap()),
_ => DataValue::from(new_value.as_str().unwrap()),
};
if *is_fts_enabled {
if let Some(field) = self.tantivy_field_map.get(*key) {
doc.insert(
field.clone(),
OwnedValue::Str(new_value.get_str().unwrap().to_owned()),
);
}
}
(relation_field.to_owned(), new_value)
},
)
.collect::<BTreeMap<_, _>>();
let mut writer =
self.tantivy_index.writer(15_000_000).into_diagnostic()?;
writer.add_document(doc).into_diagnostic()?;
writer.commit().into_diagnostic()?;
drop(writer);
let keys = fields_mapping.keys().collect::<Vec<_>>();
let keys_joined = keys.iter().join(", ");
@ -213,7 +239,7 @@ impl AppState {
"
?[ node_id, {keys_joined} ] <- [$input_data]
:insert {relation} {{ node_id, {keys_joined} }}
"
"
);
let mut params = vec![];
@ -272,6 +298,54 @@ impl AppState {
})
}
pub async fn update_node() {}
pub async fn search_nodes(
&self,
query: impl AsRef<str>,
) -> Result<Vec<(NodeId, Value)>> {
let query = query.as_ref();
let reader = self.tantivy_index.reader().into_diagnostic()?;
let searcher = reader.searcher();
let node_id_field = self.tantivy_field_map.get("node_id").unwrap().clone();
let journal_page_field = self
.tantivy_field_map
.get("panorama/journal/page/content")
.unwrap()
.clone();
let query_parser =
QueryParser::for_index(&self.tantivy_index, vec![journal_page_field]);
let query = query_parser.parse_query(query).into_diagnostic()?;
let top_docs = searcher
.search(&query, &TopDocs::with_limit(10))
.into_diagnostic()?;
Ok(
top_docs
.into_iter()
.map(|(score, doc_address)| {
let retrieved_doc =
searcher.doc::<TantivyDocument>(doc_address).unwrap();
let node_id = retrieved_doc
.get_first(node_id_field.clone())
.unwrap()
.as_str()
.unwrap();
let node_id = NodeId(Uuid::from_str(node_id).unwrap());
(
node_id,
json!({
"score": score,
}),
)
})
.collect::<Vec<_>>(),
)
}
fn get_rows_for_extra_keys(
&self,
tx: &MultiTransaction,
@ -303,7 +377,6 @@ impl AppState {
.rows
.into_iter()
.map(|row| {
println!("ROW {:?}", row);
(
s(&row[0]),
FieldInfo {

View file

@ -1,6 +1,5 @@
use core::panic;
use cozo::DbInstance;
use itertools::Itertools;
use miette::Result;
use tantivy::Index;
@ -8,10 +7,14 @@ use crate::{migrations::run_migrations, state::tantivy_schema, AppState};
pub async fn test_state() -> Result<AppState> {
let db = DbInstance::new("mem", "", "")?;
let schema = tantivy_schema();
let (schema, tantivy_field_map) = tantivy_schema();
let tantivy_index = Index::create_in_ram(schema);
let state = AppState { db, tantivy_index };
let state = AppState {
db,
tantivy_index,
tantivy_field_map,
};
run_migrations(&state.db).await?;
Ok(state)
@ -22,7 +25,7 @@ pub async fn test_create_node() -> Result<()> {
let state = test_state().await?;
let node_info = state
.create_node(
.create_or_update_node(
"panorama/journal/page",
Some(btmap! {
"panorama/journal/page/content".to_owned() => json!("helloge"),
@ -30,20 +33,34 @@ pub async fn test_create_node() -> Result<()> {
)
.await?;
println!(
"{}",
serde_json::to_string_pretty(&state.export().await.unwrap()).unwrap()
);
let mut node = state.get_node(&node_info.node_id).await?;
println!("node: {:?}", node);
let mut node = state.get_node(node_info.node_id.to_string()).await?;
assert!(node.fields.is_some());
let fields = node.fields.take().unwrap();
assert!(fields.contains_key("panorama/journal/page/content"));
Ok(())
}
/// Creates one journal page, then checks that a full-text query for a word
/// from its content returns that page's node id among the hits.
#[tokio::test]
pub async fn test_full_text_search() -> Result<()> {
  let state = test_state().await?;

  let created = state
    .create_or_update_node(
      "panorama/journal/page",
      Some(btmap! {
        "panorama/journal/page/content".to_owned() => json!("Hello, world!"),
      }),
    )
    .await?;
  let expected_id = &created.node_id;

  // Only the node id of each hit matters here; the score payload is ignored.
  let hits = state.search_nodes("world").await?;
  let found = hits.iter().any(|(node_id, _)| node_id == expected_id);
  assert!(found);

  Ok(())
}

View file

@ -1,87 +1,24 @@
use std::{
collections::HashMap,
fs::{self},
io::{Write},
fs::{self, File},
path::PathBuf,
};
use axum::extract::State;
use cozo::ScriptMutability;
use csv::WriterBuilder;
use miette::IntoDiagnostic;
use crate::{error::AppResult, AppState};
// This code is really bad but gives me a quick way to look at all of the data
// in the database at once. Rip this out once there's any Real Security
// Mechanism.
pub async fn export(State(state): State<AppState>) -> AppResult<()> {
let result = state.db.run_script(
"::relations",
Default::default(),
ScriptMutability::Immutable,
)?;
let name_index = result.headers.iter().position(|x| x == "name").unwrap();
let relation_names = result
.rows
.into_iter()
.map(|row| row[name_index].get_str().unwrap().to_owned())
.collect::<Vec<_>>();
let mut relation_columns = HashMap::new();
for relation_name in relation_names.iter() {
let result = state.db.run_script(
&format!("::columns {relation_name}"),
Default::default(),
ScriptMutability::Immutable,
)?;
let column_index =
result.headers.iter().position(|x| x == "column").unwrap();
let columns = result
.rows
.into_iter()
.map(|row| row[column_index].get_str().unwrap().to_owned())
.collect::<Vec<_>>();
relation_columns.insert(relation_name.clone(), columns);
}
let export = state.export().await?;
let base_dir = PathBuf::from("export");
fs::create_dir_all(&base_dir);
fs::create_dir_all(&base_dir).into_diagnostic()?;
let tx = state.db.multi_transaction(false);
let file = File::create(base_dir.join("export.json")).into_diagnostic()?;
for relation_name in relation_names.iter() {
let relation_path = base_dir.join(format!("{relation_name}.csv"));
let mut writer = WriterBuilder::new()
.has_headers(true)
.from_path(relation_path)
.unwrap();
// let mut file = File::create(&relation_path).unwrap();
let columns = relation_columns
.get(relation_name.as_str())
.unwrap()
.join(", ");
let query = format!("?[{columns}] := *{relation_name} {{ {columns} }}");
let result = tx.run_script(&query, Default::default())?;
writer.write_record(result.headers).unwrap();
for row in result.rows.into_iter() {
// let serialized = serde_json::to_string(&object).unwrap();
writer
.write_record(
row.iter().map(|col| serde_json::to_string(&col).unwrap()),
)
.unwrap();
// file.write(b"\n");
}
writer.flush().unwrap();
}
serde_json::to_writer_pretty(file, &export).into_diagnostic()?;
Ok(())
}

View file

@ -6,6 +6,7 @@ use axum::{
routing::{get, post, put},
Json, Router,
};
use chrono::{DateTime, Utc};
use cozo::{DataValue, MultiTransaction, ScriptMutability};
use itertools::Itertools;
use panorama_core::state::node::ExtraData;
@ -32,14 +33,10 @@ pub(super) fn router() -> Router<AppState> {
#[derive(Serialize, Deserialize, ToSchema, Clone)]
struct GetNodeResult {
node: String,
extra_data: Value,
content: String,
day: Option<String>,
created_at: f64,
updated_at: f64,
r#type: String,
title: String,
node_id: String,
fields: HashMap<String, Value>,
created_at: DateTime<Utc>,
updated_at: DateTime<Utc>,
}
/// Get node info
@ -60,45 +57,15 @@ pub async fn get_node(
State(state): State<AppState>,
Path(node_id): Path<String>,
) -> AppResult<(StatusCode, Json<Value>)> {
let result = state.db.run_script(
"
j[content] := *journal{ node_id, content }, node_id = $node_id
j[content] := not *journal{ node_id }, node_id = $node_id, content = null
jd[day] := *journal_day{ node_id, day }, node_id = $node_id
jd[day] := not *journal_day{ node_id }, node_id = $node_id, day = null
?[
extra_data, content, day, created_at, updated_at, type, title
] := *node{ id, type, title, created_at, updated_at, extra_data },
j[content],
jd[day],
id = $node_id
:limit 1
",
btmap! {"node_id".to_owned() => node_id.clone().into()},
ScriptMutability::Immutable,
)?;
if result.rows.len() == 0 {
return Ok((StatusCode::NOT_FOUND, Json(json!(null))));
}
let row = &result.rows[0];
let extra_data = row[0].get_str();
let day = row[2].get_str();
let node_info = state.get_node(&node_id).await?;
Ok((
StatusCode::OK,
Json(json!({
"node": node_id,
"extra_data": extra_data,
"content": row[1].get_str(),
"day": day,
"created_at": row[3].get_float(),
"updated_at": row[4].get_float(),
"type": row[5].get_str(),
"title": row[6].get_str(),
"node_id": node_id,
"fields": node_info.fields,
"created_at": node_info.created_at,
"updated_at": node_info.updated_at,
})),
))
}