From 3a2eda20cb92e25a7e913c7d6af99304c1bb770b Mon Sep 17 00:00:00 2001 From: Michael Zhang Date: Tue, 18 Jun 2024 16:28:15 -0500 Subject: [PATCH] compiles again --- .gitignore | 2 +- .../migrations/00001_initial.sql | 2 +- crates/panorama-core/src/state/node.rs | 371 ++++++++++-------- crates/panorama-core/src/state/node_raw.rs | 12 +- 4 files changed, 209 insertions(+), 178 deletions(-) diff --git a/.gitignore b/.gitignore index 0beeccc..6a3b694 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,4 @@ dist target .DS_Store **/export/export.json -test.db \ No newline at end of file +test.db* \ No newline at end of file diff --git a/crates/panorama-core/migrations/00001_initial.sql b/crates/panorama-core/migrations/00001_initial.sql index 534db2a..a1ce6a2 100644 --- a/crates/panorama-core/migrations/00001_initial.sql +++ b/crates/panorama-core/migrations/00001_initial.sql @@ -1,7 +1,7 @@ CREATE TABLE node ( node_id TEXT PRIMARY KEY, node_type TEXT NOT NULL, - updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at INTEGER NOT NULL DEFAULT CURRENT_TIMESTAMP, extra_data JSON ); diff --git a/crates/panorama-core/src/state/node.rs b/crates/panorama-core/src/state/node.rs index d1e5caf..99c9965 100644 --- a/crates/panorama-core/src/state/node.rs +++ b/crates/panorama-core/src/state/node.rs @@ -1,6 +1,9 @@ -use std::collections::{BTreeMap, HashMap}; +use std::{ + collections::{BTreeMap, HashMap}, + str::FromStr, +}; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, NaiveDateTime, TimeZone, Utc}; use itertools::Itertools; use miette::{bail, Context, Error, IntoDiagnostic, Report, Result}; use serde_json::Value; @@ -8,7 +11,11 @@ use sqlx::{ query::Query, sqlite::SqliteArguments, Acquire, Connection, Executor, FromRow, QueryBuilder, Sqlite, }; -use tantivy::schema::{OwnedValue, Value as _}; +use tantivy::{ + schema::{OwnedValue, Value as _}, + time::Date, + Term, +}; use uuid::Uuid; use crate::{state::node_raw::FieldMappingRow, AppState, NodeId}; @@ -16,6 +23,8 @@ use crate::{state::node_raw::FieldMappingRow, AppState, NodeId}; // use super::utils::owned_value_to_json_value; pub type ExtraData = BTreeMap; +pub type FieldsByTable<'a> = + HashMap<(&'a i64, &'a String), Vec<&'a FieldMappingRow>>; #[derive(Debug)] pub struct NodeInfo { @@ -99,7 +108,7 @@ impl AppState { async fn query_related_fields<'e, 'c: 'e, X>( x: X, - fields_by_table: &HashMap<(&i64, &String), Vec<&FieldMappingRow>>, + fields_by_table: &FieldsByTable<'_>, ) -> sqlx::Result> where X: 'e + Executor<'c, Database = Sqlite>, @@ -186,193 +195,215 @@ pub enum CreateOrUpdate { impl AppState { // TODO: Split this out into create and update - // pub async fn create_or_update_node( - // &self, - // opts: CreateOrUpdate, - // extra_data: Option, - // ) -> Result { - // let node_id = match opts { - // CreateOrUpdate::Create { .. } => NodeId(Uuid::now_v7()), - // CreateOrUpdate::Update { ref node_id } => node_id.clone(), - // }; - // let node_id = node_id.to_string(); + pub async fn create_or_update_node( + &self, + opts: CreateOrUpdate, + extra_data: Option, + ) -> Result { + let node_id = match opts { + CreateOrUpdate::Create { .. } => NodeId(Uuid::now_v7()), + CreateOrUpdate::Update { ref node_id } => node_id.clone(), + }; + let node_id = node_id.to_string(); - // let action = match opts { - // CreateOrUpdate::Create { .. } => "put", - // CreateOrUpdate::Update { .. } => "update", - // }; + let action = match opts { + CreateOrUpdate::Create { .. } => "put", + CreateOrUpdate::Update { .. } => "update", + }; - // println!("Request: {opts:?} {extra_data:?}"); + println!("Request: {opts:?} {extra_data:?}"); - // let (created_at, updated_at) = match opts { - // CreateOrUpdate::Create { ref r#type } => { - // let node_result = tx.run_script( - // " - // ?[id, type] <- [[$node_id, $type]] - // :put node { id, type } - // :returning - // ", - // btmap! { - // "node_id".to_owned() => DataValue::from(node_id.clone()), - // "type".to_owned() => DataValue::from(r#type.to_owned()), - // }, - // )?; - // let created_at = DateTime::from_timestamp_millis( - // (node_result.rows[0][3].get_float().unwrap() * 1000.0) as i64, - // ) - // .unwrap(); - // let updated_at = DateTime::from_timestamp_millis( - // (node_result.rows[0][4].get_float().unwrap() * 1000.0) as i64, - // ) - // .unwrap(); - // (created_at, updated_at) - // } - // CreateOrUpdate::Update { .. } => { - // let node_result = tx.run_script( - // " - // ?[id, type, created_at, updated_at] := *node { id, type, created_at, updated_at }, - // id = $node_id - // ", - // btmap! { - // "node_id".to_owned() => DataValue::from(node_id.clone()), - // }, - // )?; - // let created_at = DateTime::from_timestamp_millis( - // (node_result.rows[0][2].get_float().unwrap() * 1000.0) as i64, - // ) - // .unwrap(); - // let updated_at = DateTime::from_timestamp_millis( - // (node_result.rows[0][3].get_float().unwrap() * 1000.0) as i64, - // ) - // .unwrap(); - // (created_at, updated_at) - // } - // }; + let mut conn = self.conn().await?; - // if let Some(extra_data) = extra_data { - // let node_id_field = self - // .tantivy_field_map - // .get_by_left("node_id") - // .unwrap() - // .clone(); + conn + .transaction::<_, _, sqlx::Error>(|tx| { + Box::pin(async move { + let node_info = match opts { + CreateOrUpdate::Create { r#type } => { + AppState::create_node_raw(&mut **tx, &r#type).await? + } + CreateOrUpdate::Update { node_id } => todo!(), + }; - // if !extra_data.is_empty() { - // let keys = extra_data.keys().map(|s| s.to_owned()).collect::>(); - // let field_mapping = - // self.get_rows_for_extra_keys(&tx, keys.as_slice())?; + if let Some(extra_data) = extra_data { + if !extra_data.is_empty() { + let node_id_str = node_id.to_string(); + let field_mapping = AppState::get_related_field_list_for_node_id( + &mut **tx, + &node_id_str, + ) + .await?; - // // Group the keys by which relation they're in - // let result_by_relation = field_mapping.iter().into_group_map_by( - // |(_, FieldInfo { relation_name, .. })| relation_name, - // ); + // Group the keys by which relation they're in + let fields_by_table = field_mapping.iter().into_group_map_by( + |FieldMappingRow { + app_id, + app_table_name, + .. + }| (app_id, app_table_name), + ); - // for (relation, fields) in result_by_relation.iter() { - // let mut doc = btmap! { node_id_field.clone() => OwnedValue::Str(node_id.to_owned()) }; - // let fields_mapping = fields - // .into_iter() - // .map( - // |( - // key, - // FieldInfo { - // relation_field, - // r#type, - // is_fts_enabled, - // .. - // }, - // )| { - // let new_value = extra_data.get(*key).unwrap(); + AppState::write_extra_data( + &mut **tx, + &node_id_str, + &fields_by_table, + extra_data, + ) + .await?; + } + } - // // TODO: Make this more generic - // let new_value = match r#type.as_str() { - // "int" => DataValue::from(new_value.as_i64().unwrap()), - // _ => DataValue::from(new_value.as_str().unwrap()), - // }; + Ok(node_info) + }) + }) + .await + .into_diagnostic() + } - // if *is_fts_enabled { - // if let Some(field) = self.tantivy_field_map.get_by_left(*key) - // { - // doc.insert( - // field.clone(), - // OwnedValue::Str(new_value.get_str().unwrap().to_owned()), - // ); - // } - // } + async fn create_node_raw<'e, 'c: 'e, X>( + x: X, + r#type: &str, + ) -> sqlx::Result + where + X: 'e + Executor<'c, Database = Sqlite>, + { + let node_id = Uuid::now_v7(); + let node_id_str = node_id.to_string(); - // (relation_field.to_owned(), new_value) - // }, - // ) - // .collect::>(); + #[derive(FromRow)] + struct Result { + updated_at: i64, + } - // let mut writer = - // self.tantivy_index.writer(15_000_000).into_diagnostic()?; + let result = sqlx::query_as!( + Result, + r#" + INSERT INTO node (node_id, node_type, extra_data) + VALUES (?, ?, "{}") + RETURNING updated_at + "#, + node_id_str, + r#type, + ) + .fetch_one(x) + .await?; - // let delete_term = - // Term::from_field_text(node_id_field.clone(), &node_id); - // writer.delete_term(delete_term); + let updated_at = + DateTime::from_timestamp_millis(result.updated_at * 1000).unwrap(); + let created_at = DateTime::from_timestamp_millis( + node_id.get_timestamp().unwrap().to_unix().0 as i64 * 1000, + ) + .unwrap(); - // writer.add_document(doc).into_diagnostic()?; - // writer.commit().into_diagnostic()?; - // drop(writer); + Ok(NodeInfo { + node_id: NodeId(node_id), + created_at, + updated_at, + fields: None, + }) + } - // let keys = fields_mapping.keys().collect::>(); - // let keys_joined = keys.iter().join(", "); + async fn write_extra_data<'e, 'c: 'e, X>( + x: X, + node_id: &str, + fields_by_table: &FieldsByTable<'_>, + extra_data: ExtraData, + ) -> sqlx::Result<()> + where + X: 'e + Executor<'c, Database = Sqlite>, + { + // Update Tantivy indexes + // for ((app_id, app_table_name), fields) in fields_by_table.iter() { + // let mut writer = + // self.tantivy_index.writer(15_000_000).into_diagnostic()?; - // if !keys.is_empty() { - // let query = format!( - // " - // ?[ node_id, {keys_joined} ] <- [$input_data] - // :{action} {relation} {{ node_id, {keys_joined} }} - // " - // ); + // let delete_term = Term::from_field_text(node_id_field.clone(), &node_id); + // writer.delete_term(delete_term); - // let mut params = vec![]; - // params.push(DataValue::from(node_id.clone())); - // for key in keys { - // params.push(fields_mapping[key].clone()); - // } + // writer.add_document(doc).into_diagnostic()?; + // writer.commit().into_diagnostic()?; + // drop(writer); + // } - // let result = tx.run_script( - // &query, - // btmap! { - // "input_data".to_owned() => DataValue::List(params), - // }, - // ); - // } - // } + // Update database + let mut node_has_keys = Vec::new(); + for ((app_id, app_table_name), fields) in fields_by_table.iter() { + for field_info in fields { + node_has_keys.push(&field_info.full_key); + } - // let input = DataValue::List( - // keys - // .iter() - // .map(|s| { - // DataValue::List(vec![ - // DataValue::from(s.to_owned()), - // DataValue::from(node_id.clone()), - // ]) - // }) - // .collect_vec(), - // ); + // let mut doc = + // btmap! { node_id_field.clone() => OwnedValue::Str(node_id.to_owned()) }; + // let fields_mapping = fields + // .into_iter() + // .map( + // |( + // key, + // FieldInfo { + // relation_field, + // r#type, + // is_fts_enabled, + // .. + // }, + // )| { + // let new_value = extra_data.get(*key).unwrap(); - // tx.run_script( - // " - // ?[key, id] <- $input_data - // :put node_has_key { key, id } - // ", - // btmap! { - // "input_data".to_owned() => input - // }, - // )?; - // } - // } + // // TODO: Make this more generic + // let new_value = match r#type.as_str() { + // "int" => DataValue::from(new_value.as_i64().unwrap()), + // _ => DataValue::from(new_value.as_str().unwrap()), + // }; - // tx.commit()?; + // if *is_fts_enabled { + // if let Some(field) = self.tantivy_field_map.get_by_left(*key) { + // doc.insert( + // field.clone(), + // OwnedValue::Str(new_value.get_str().unwrap().to_owned()), + // ); + // } + // } - // Ok(NodeInfo { - // node_id: NodeId(Uuid::from_str(&node_id).unwrap()), - // created_at, - // updated_at, - // fields: None, - // }) - // } + // (relation_field.to_owned(), new_value) + // }, + // ) + // .collect::>(); + + // let keys = fields_mapping.keys().collect::>(); + // let keys_joined = keys.iter().join(", "); + + // if !keys.is_empty() { + // let query = format!( + // " + // ?[ node_id, {keys_joined} ] <- [$input_data] + // :{action} {relation} {{ node_id, {keys_joined} }} + // " + // ); + + // let mut params = vec![]; + // params.push(DataValue::from(node_id.clone())); + // for key in keys { + // params.push(fields_mapping[key].clone()); + // } + + // let result = tx.run_script( + // &query, + // btmap! { + // "input_data".to_owned() => DataValue::List(params), + // }, + // ); + // } + } + + let mut query = + QueryBuilder::new("INSERT INTO node_has_key (node_id, full_key) VALUES "); + query.push_values(node_has_keys, |mut b, key| { + b.push_bind(node_id).push_bind(key); + }); + println!("Query: {:?}", query.sql()); + query.build().execute(x).await?; + + Ok(()) + } } // impl AppState { diff --git a/crates/panorama-core/src/state/node_raw.rs b/crates/panorama-core/src/state/node_raw.rs index e97b8fd..610ed9b 100644 --- a/crates/panorama-core/src/state/node_raw.rs +++ b/crates/panorama-core/src/state/node_raw.rs @@ -4,15 +4,15 @@ use crate::AppState; #[derive(FromRow)] pub struct FieldMappingRow { - pub(crate) full_key: String, - pub(crate) app_id: i64, - pub(crate) app_table_name: String, - pub(crate) app_table_field: String, - pub(crate) db_table_name: Option, + pub full_key: String, + pub app_id: i64, + pub app_table_name: String, + pub app_table_field: String, + pub db_table_name: Option, } impl AppState { - pub async fn get_related_field_list_for_node_id<'e, 'c: 'e, X>( + pub(crate) async fn get_related_field_list_for_node_id<'e, 'c: 'e, X>( x: X, node_id: &str, ) -> sqlx::Result>