From f771a7d20f2f00c2d4a110495252b0eb4f06c09a Mon Sep 17 00:00:00 2001 From: Michael Zhang Date: Tue, 18 Jun 2024 15:20:42 -0500 Subject: [PATCH] compiles --- .../migrations/00001_initial.sql | 4 +- crates/panorama-core/src/state/mod.rs | 1 + crates/panorama-core/src/state/node.rs | 673 +++++++++--------- crates/panorama-core/src/state/node_raw.rs | 42 ++ 4 files changed, 377 insertions(+), 343 deletions(-) create mode 100644 crates/panorama-core/src/state/node_raw.rs diff --git a/crates/panorama-core/migrations/00001_initial.sql b/crates/panorama-core/migrations/00001_initial.sql index 0d706a2..534db2a 100644 --- a/crates/panorama-core/migrations/00001_initial.sql +++ b/crates/panorama-core/migrations/00001_initial.sql @@ -25,13 +25,13 @@ CREATE TABLE app ( app_license TEXT ); -CREATE TABLE app_table ( +CREATE TABLE app_table_mapping ( app_id INTEGER NOT NULL, app_table_name TEXT NOT NULL, db_table_name TEXT NOT NULL ); -CREATE TABLE full_key_to_db_key ( +CREATE TABLE key_mapping ( full_key TEXT NOT NULL, app_id INTEGER NOT NULL, app_table_name TEXT NOT NULL, diff --git a/crates/panorama-core/src/state/mod.rs b/crates/panorama-core/src/state/mod.rs index 77810df..0e8896b 100644 --- a/crates/panorama-core/src/state/mod.rs +++ b/crates/panorama-core/src/state/mod.rs @@ -4,6 +4,7 @@ pub mod apps; // pub mod journal; // pub mod mail; pub mod node; +pub mod node_raw; // pub mod utils; use std::{collections::HashMap, fs, path::Path}; diff --git a/crates/panorama-core/src/state/node.rs b/crates/panorama-core/src/state/node.rs index 64b5a38..d1e5caf 100644 --- a/crates/panorama-core/src/state/node.rs +++ b/crates/panorama-core/src/state/node.rs @@ -1,17 +1,17 @@ -use std::{ - collections::{BTreeMap, HashMap}, - str::FromStr, -}; +use std::collections::{BTreeMap, HashMap}; use chrono::{DateTime, Utc}; use itertools::Itertools; -use miette::{bail, IntoDiagnostic, Result}; +use miette::{bail, Context, Error, IntoDiagnostic, Report, Result}; use serde_json::Value; -use sqlx::{Acquire, Connection, FromRow}; +use sqlx::{ + query::Query, sqlite::SqliteArguments, Acquire, Connection, Executor, + FromRow, QueryBuilder, Sqlite, +}; use tantivy::schema::{OwnedValue, Value as _}; use uuid::Uuid; -use crate::{AppState, NodeId}; +use crate::{state::node_raw::FieldMappingRow, AppState, NodeId}; // use super::utils::owned_value_to_json_value; @@ -25,366 +25,357 @@ pub struct NodeInfo { pub fields: Option>, } -#[derive(Debug)] -pub struct FieldInfo { - pub relation_name: String, - pub relation_field: String, - pub r#type: String, - pub is_fts_enabled: bool, -} - -pub type FieldMapping = HashMap; - impl AppState { /// Get all properties of a node pub async fn get_node(&self, node_id: impl AsRef) -> Result { let node_id = node_id.as_ref().to_owned(); - let conn = self.conn().await?; - - #[derive(FromRow)] - struct FieldMappingRow { - full_key: String, - app_id: i64, - app_table_name: String, - app_table_field: String, - } + let mut conn = self.conn().await?; conn - .transaction(|tx| { + .transaction::<_, _, sqlx::Error>(|tx| { Box::pin(async move { - let result = sqlx::query_as!( - FieldMappingRow, - " - SELECT - node_has_key.full_key, app_id, app_table_name, app_table_field - FROM node_has_key - INNER JOIN full_key_to_db_key - ON node_has_key.full_key = full_key_to_db_key.full_key - WHERE node_id = $1 - ", - node_id - ) - .fetch_all(&mut **tx) - .await - .into_diagnostic()?; - - let field_mapping = result - .into_iter() - .map(|row| (row.full_key.clone(), row)) - .collect::>(); + let node_id = node_id.clone(); + let field_mapping = + AppState::get_related_field_list_for_node_id(&mut **tx, &node_id) + .await?; // Group the keys by which relation they're in - let result_by_relation = field_mapping.iter().into_group_map_by( - |( - _, - FieldMappingRow { - app_id, - app_table_name, - .. - }, - )| (app_id, app_table_name), + let fields_by_table = field_mapping.iter().into_group_map_by( + |FieldMappingRow { + app_id, + app_table_name, + .. + }| (app_id, app_table_name), ); - let mut all_relation_queries = vec![]; - let mut all_relation_constraints = vec![]; - let mut all_fields = vec![]; - let mut field_counter = 0; - for (i, (relation, fields)) in result_by_relation.iter().enumerate() { - let constraint_name = format!("c{i}"); + // Run the query that grabs all of the relevant fields, and coalesce + // the fields back + let related_fields = + AppState::query_related_fields(&mut **tx, &fields_by_table).await?; - let mut keys = vec![]; - let mut constraints = vec![]; - for (key, field_info) in fields.iter() { - let counted_field_name = format!("f{field_counter}"); - field_counter += 1; + println!("Related fields: {:?}", related_fields); - keys.push(counted_field_name.clone()); - constraints.push(format!( - "{}: {}", - field_info.relation_field.to_owned(), - counted_field_name, - )); - all_fields.push(( - counted_field_name, - field_info.relation_field.to_owned(), - key, - )) - } + // let created_at = DateTime::from_timestamp_millis( + // (result.rows[0][2].get_float().unwrap() * 1000.0) as i64, + // ) + // .unwrap(); - let keys = keys.join(", "); - let constraints = constraints.join(", "); - all_relation_queries.push(format!( - " - {constraint_name}[{keys}] := - *{relation}{{ node_id, {constraints} }}, - node_id = $node_id - " - )); - all_relation_constraints.push(format!("{constraint_name}[{keys}],")) - } + // let updated_at = DateTime::from_timestamp_millis( + // (result.rows[0][3].get_float().unwrap() * 1000.0) as i64, + // ) + // .unwrap(); - let all_relation_constraints = all_relation_constraints.join("\n"); - let all_relation_queries = all_relation_queries.join("\n\n"); - let all_field_names = all_fields - .iter() - .map(|(field_name, _, _)| field_name) - .join(", "); - let query = format!( - " - {all_relation_queries} + // let mut fields = HashMap::new(); - ?[type, extra_data, created_at, updated_at, {all_field_names}] := - *node {{ id, type, created_at, updated_at, extra_data }}, - {all_relation_constraints} - id = $node_id - " - ); + // for row in result + // .rows + // .into_iter() + // .map(|row| row.into_iter().skip(4).zip(all_fields.iter())) + // { + // for (value, (_, _, field_name)) in row { + // fields.insert( + // field_name.to_string(), + // data_value_to_json_value(&value), + // ); + // } + // } - let result = tx.run_script( - &query, - btmap! { "node_id".to_owned() => node_id.to_string().into(), }, - )?; + todo!() - if result.rows.is_empty() { - bail!("Not found") - } - - let created_at = DateTime::from_timestamp_millis( - (result.rows[0][2].get_float().unwrap() * 1000.0) as i64, - ) - .unwrap(); - - let updated_at = DateTime::from_timestamp_millis( - (result.rows[0][3].get_float().unwrap() * 1000.0) as i64, - ) - .unwrap(); - - let mut fields = HashMap::new(); - - for row in result - .rows - .into_iter() - .map(|row| row.into_iter().skip(4).zip(all_fields.iter())) - { - for (value, (_, _, field_name)) in row { - fields.insert( - field_name.to_string(), - data_value_to_json_value(&value), - ); - } - } - - Ok(NodeInfo { - node_id: NodeId(Uuid::from_str(&node_id).unwrap()), - created_at, - updated_at, - fields: Some(fields), - }) + // Ok(NodeInfo { + // node_id: NodeId(Uuid::from_str(&node_id).unwrap()), + // created_at, + // updated_at, + // fields: Some(fields), + // }) }) }) - .await?; + .await + .into_diagnostic()?; - Ok(()) + todo!() + // Ok(()) + } + + async fn query_related_fields<'e, 'c: 'e, X>( + x: X, + fields_by_table: &HashMap<(&i64, &String), Vec<&FieldMappingRow>>, + ) -> sqlx::Result> + where + X: 'e + Executor<'c, Database = Sqlite>, + { + let mut query = QueryBuilder::new(""); + let mut mapping = HashMap::new(); + let mut ctr = 0; + + let mut selected_fields = vec![]; + for ((app_id, app_table_name), fields) in fields_by_table.iter() { + let table_gen_name = format!("c{ctr}"); + ctr += 1; + + let mut keys = vec![]; + for field_info in fields.iter() { + let field_gen_name = format!("f{ctr}"); + ctr += 1; + mapping.insert(&field_info.full_key, field_gen_name.clone()); + + keys.push(field_gen_name.clone()); + + selected_fields.push(format!( + "{}.{} as {}", + table_gen_name, field_info.app_table_field, field_gen_name + )); + + // constraints.push(format!( + // "{}: {}", + // field_info.relation_field.to_owned(), + // field_gen_name, + // )); + // all_fields.push(( + // field_gen_name, + // field_info.relation_field.to_owned(), + // key, + // )) + } + + // let keys = keys.join(", "); + // let constraints = constraints.join(", "); + // all_relation_queries.push(format!( + // " + // {table_gen_name}[{keys}] := + // *{relation}{{ node_id, {constraints} }}, + // node_id = $node_id + // " + // )); + // all_relation_constraints.push(format!("{table_gen_name}[{keys}],")) + } + + query.push("SELECT"); + query.push(selected_fields.join(", ")); + query.push("FROM"); + println!("Query: {:?}", query.sql()); + + // let all_relation_constraints = all_relation_constraints.join("\n"); + // let all_relation_queries = all_relation_queries.join("\n\n"); + // let all_field_names = all_fields + // .iter() + // .map(|(field_name, _, _)| field_name) + // .join(", "); + // let _query = format!( + // " + // {all_relation_queries} + + // ?[type, extra_data, created_at, updated_at, {all_field_names}] := + // *node {{ id, type, created_at, updated_at, extra_data }}, + // {all_relation_constraints} + // id = $node_id + // " + // ); + + let rows = query.build().fetch_all(x).await.into_diagnostic(); + + todo!() } } -// #[derive(Debug)] -// pub enum CreateOrUpdate { -// Create { r#type: String }, -// Update { node_id: NodeId }, -// } +#[derive(Debug)] +pub enum CreateOrUpdate { + Create { r#type: String }, + Update { node_id: NodeId }, +} + +impl AppState { + // TODO: Split this out into create and update + // pub async fn create_or_update_node( + // &self, + // opts: CreateOrUpdate, + // extra_data: Option, + // ) -> Result { + // let node_id = match opts { + // CreateOrUpdate::Create { .. } => NodeId(Uuid::now_v7()), + // CreateOrUpdate::Update { ref node_id } => node_id.clone(), + // }; + // let node_id = node_id.to_string(); + + // let action = match opts { + // CreateOrUpdate::Create { .. } => "put", + // CreateOrUpdate::Update { .. } => "update", + // }; + + // println!("Request: {opts:?} {extra_data:?}"); + + // let (created_at, updated_at) = match opts { + // CreateOrUpdate::Create { ref r#type } => { + // let node_result = tx.run_script( + // " + // ?[id, type] <- [[$node_id, $type]] + // :put node { id, type } + // :returning + // ", + // btmap! { + // "node_id".to_owned() => DataValue::from(node_id.clone()), + // "type".to_owned() => DataValue::from(r#type.to_owned()), + // }, + // )?; + // let created_at = DateTime::from_timestamp_millis( + // (node_result.rows[0][3].get_float().unwrap() * 1000.0) as i64, + // ) + // .unwrap(); + // let updated_at = DateTime::from_timestamp_millis( + // (node_result.rows[0][4].get_float().unwrap() * 1000.0) as i64, + // ) + // .unwrap(); + // (created_at, updated_at) + // } + // CreateOrUpdate::Update { .. } => { + // let node_result = tx.run_script( + // " + // ?[id, type, created_at, updated_at] := *node { id, type, created_at, updated_at }, + // id = $node_id + // ", + // btmap! { + // "node_id".to_owned() => DataValue::from(node_id.clone()), + // }, + // )?; + // let created_at = DateTime::from_timestamp_millis( + // (node_result.rows[0][2].get_float().unwrap() * 1000.0) as i64, + // ) + // .unwrap(); + // let updated_at = DateTime::from_timestamp_millis( + // (node_result.rows[0][3].get_float().unwrap() * 1000.0) as i64, + // ) + // .unwrap(); + // (created_at, updated_at) + // } + // }; + + // if let Some(extra_data) = extra_data { + // let node_id_field = self + // .tantivy_field_map + // .get_by_left("node_id") + // .unwrap() + // .clone(); + + // if !extra_data.is_empty() { + // let keys = extra_data.keys().map(|s| s.to_owned()).collect::>(); + // let field_mapping = + // self.get_rows_for_extra_keys(&tx, keys.as_slice())?; + + // // Group the keys by which relation they're in + // let result_by_relation = field_mapping.iter().into_group_map_by( + // |(_, FieldInfo { relation_name, .. })| relation_name, + // ); + + // for (relation, fields) in result_by_relation.iter() { + // let mut doc = btmap! { node_id_field.clone() => OwnedValue::Str(node_id.to_owned()) }; + // let fields_mapping = fields + // .into_iter() + // .map( + // |( + // key, + // FieldInfo { + // relation_field, + // r#type, + // is_fts_enabled, + // .. + // }, + // )| { + // let new_value = extra_data.get(*key).unwrap(); + + // // TODO: Make this more generic + // let new_value = match r#type.as_str() { + // "int" => DataValue::from(new_value.as_i64().unwrap()), + // _ => DataValue::from(new_value.as_str().unwrap()), + // }; + + // if *is_fts_enabled { + // if let Some(field) = self.tantivy_field_map.get_by_left(*key) + // { + // doc.insert( + // field.clone(), + // OwnedValue::Str(new_value.get_str().unwrap().to_owned()), + // ); + // } + // } + + // (relation_field.to_owned(), new_value) + // }, + // ) + // .collect::>(); + + // let mut writer = + // self.tantivy_index.writer(15_000_000).into_diagnostic()?; + + // let delete_term = + // Term::from_field_text(node_id_field.clone(), &node_id); + // writer.delete_term(delete_term); + + // writer.add_document(doc).into_diagnostic()?; + // writer.commit().into_diagnostic()?; + // drop(writer); + + // let keys = fields_mapping.keys().collect::>(); + // let keys_joined = keys.iter().join(", "); + + // if !keys.is_empty() { + // let query = format!( + // " + // ?[ node_id, {keys_joined} ] <- [$input_data] + // :{action} {relation} {{ node_id, {keys_joined} }} + // " + // ); + + // let mut params = vec![]; + // params.push(DataValue::from(node_id.clone())); + // for key in keys { + // params.push(fields_mapping[key].clone()); + // } + + // let result = tx.run_script( + // &query, + // btmap! { + // "input_data".to_owned() => DataValue::List(params), + // }, + // ); + // } + // } + + // let input = DataValue::List( + // keys + // .iter() + // .map(|s| { + // DataValue::List(vec![ + // DataValue::from(s.to_owned()), + // DataValue::from(node_id.clone()), + // ]) + // }) + // .collect_vec(), + // ); + + // tx.run_script( + // " + // ?[key, id] <- $input_data + // :put node_has_key { key, id } + // ", + // btmap! { + // "input_data".to_owned() => input + // }, + // )?; + // } + // } + + // tx.commit()?; + + // Ok(NodeInfo { + // node_id: NodeId(Uuid::from_str(&node_id).unwrap()), + // created_at, + // updated_at, + // fields: None, + // }) + // } +} // impl AppState { -// // TODO: Split this out into create and update -// pub async fn create_or_update_node( -// &self, -// opts: CreateOrUpdate, -// extra_data: Option, -// ) -> Result { -// let node_id = match opts { -// CreateOrUpdate::Create { .. } => NodeId(Uuid::now_v7()), -// CreateOrUpdate::Update { ref node_id } => node_id.clone(), -// }; -// let node_id = node_id.to_string(); - -// let action = match opts { -// CreateOrUpdate::Create { .. } => "put", -// CreateOrUpdate::Update { .. } => "update", -// }; - -// println!("Request: {opts:?} {extra_data:?}"); - -// let tx = self.db.multi_transaction(true); - -// let (created_at, updated_at) = match opts { -// CreateOrUpdate::Create { ref r#type } => { -// let node_result = tx.run_script( -// " -// ?[id, type] <- [[$node_id, $type]] -// :put node { id, type } -// :returning -// ", -// btmap! { -// "node_id".to_owned() => DataValue::from(node_id.clone()), -// "type".to_owned() => DataValue::from(r#type.to_owned()), -// }, -// )?; -// let created_at = DateTime::from_timestamp_millis( -// (node_result.rows[0][3].get_float().unwrap() * 1000.0) as i64, -// ) -// .unwrap(); -// let updated_at = DateTime::from_timestamp_millis( -// (node_result.rows[0][4].get_float().unwrap() * 1000.0) as i64, -// ) -// .unwrap(); -// (created_at, updated_at) -// } -// CreateOrUpdate::Update { .. } => { -// let node_result = tx.run_script( -// " -// ?[id, type, created_at, updated_at] := *node { id, type, created_at, updated_at }, -// id = $node_id -// ", -// btmap! { -// "node_id".to_owned() => DataValue::from(node_id.clone()), -// }, -// )?; -// let created_at = DateTime::from_timestamp_millis( -// (node_result.rows[0][2].get_float().unwrap() * 1000.0) as i64, -// ) -// .unwrap(); -// let updated_at = DateTime::from_timestamp_millis( -// (node_result.rows[0][3].get_float().unwrap() * 1000.0) as i64, -// ) -// .unwrap(); -// (created_at, updated_at) -// } -// }; - -// if let Some(extra_data) = extra_data { -// let node_id_field = self -// .tantivy_field_map -// .get_by_left("node_id") -// .unwrap() -// .clone(); - -// if !extra_data.is_empty() { -// let keys = extra_data.keys().map(|s| s.to_owned()).collect::>(); -// let field_mapping = -// self.get_rows_for_extra_keys(&tx, keys.as_slice())?; - -// // Group the keys by which relation they're in -// let result_by_relation = field_mapping.iter().into_group_map_by( -// |(_, FieldInfo { relation_name, .. })| relation_name, -// ); - -// for (relation, fields) in result_by_relation.iter() { -// let mut doc = btmap! { node_id_field.clone() => OwnedValue::Str(node_id.to_owned()) }; -// let fields_mapping = fields -// .into_iter() -// .map( -// |( -// key, -// FieldInfo { -// relation_field, -// r#type, -// is_fts_enabled, -// .. -// }, -// )| { -// let new_value = extra_data.get(*key).unwrap(); - -// // TODO: Make this more generic -// let new_value = match r#type.as_str() { -// "int" => DataValue::from(new_value.as_i64().unwrap()), -// _ => DataValue::from(new_value.as_str().unwrap()), -// }; - -// if *is_fts_enabled { -// if let Some(field) = self.tantivy_field_map.get_by_left(*key) -// { -// doc.insert( -// field.clone(), -// OwnedValue::Str(new_value.get_str().unwrap().to_owned()), -// ); -// } -// } - -// (relation_field.to_owned(), new_value) -// }, -// ) -// .collect::>(); - -// let mut writer = -// self.tantivy_index.writer(15_000_000).into_diagnostic()?; - -// let delete_term = -// Term::from_field_text(node_id_field.clone(), &node_id); -// writer.delete_term(delete_term); - -// writer.add_document(doc).into_diagnostic()?; -// writer.commit().into_diagnostic()?; -// drop(writer); - -// let keys = fields_mapping.keys().collect::>(); -// let keys_joined = keys.iter().join(", "); - -// if !keys.is_empty() { -// let query = format!( -// " -// ?[ node_id, {keys_joined} ] <- [$input_data] -// :{action} {relation} {{ node_id, {keys_joined} }} -// " -// ); - -// let mut params = vec![]; -// params.push(DataValue::from(node_id.clone())); -// for key in keys { -// params.push(fields_mapping[key].clone()); -// } - -// let result = tx.run_script( -// &query, -// btmap! { -// "input_data".to_owned() => DataValue::List(params), -// }, -// ); -// } -// } - -// let input = DataValue::List( -// keys -// .iter() -// .map(|s| { -// DataValue::List(vec![ -// DataValue::from(s.to_owned()), -// DataValue::from(node_id.clone()), -// ]) -// }) -// .collect_vec(), -// ); - -// tx.run_script( -// " -// ?[key, id] <- $input_data -// :put node_has_key { key, id } -// ", -// btmap! { -// "input_data".to_owned() => input -// }, -// )?; -// } -// } - -// tx.commit()?; - -// Ok(NodeInfo { -// node_id: NodeId(Uuid::from_str(&node_id).unwrap()), -// created_at, -// updated_at, -// fields: None, -// }) -// } // pub async fn update_node() {} diff --git a/crates/panorama-core/src/state/node_raw.rs b/crates/panorama-core/src/state/node_raw.rs new file mode 100644 index 0000000..e97b8fd --- /dev/null +++ b/crates/panorama-core/src/state/node_raw.rs @@ -0,0 +1,42 @@ +use sqlx::{Executor, FromRow, Sqlite}; + +use crate::AppState; + +#[derive(FromRow)] +pub struct FieldMappingRow { + pub(crate) full_key: String, + pub(crate) app_id: i64, + pub(crate) app_table_name: String, + pub(crate) app_table_field: String, + pub(crate) db_table_name: Option, +} + +impl AppState { + pub async fn get_related_field_list_for_node_id<'e, 'c: 'e, X>( + x: X, + node_id: &str, + ) -> sqlx::Result> + where + X: 'e + Executor<'c, Database = Sqlite>, + { + sqlx::query_as!( + FieldMappingRow, + " + SELECT + node_has_key.full_key, key_mapping.app_id, + key_mapping.app_table_name, app_table_field, + app_table_mapping.db_table_name + FROM node_has_key + INNER JOIN key_mapping + ON node_has_key.full_key = key_mapping.full_key + INNER JOIN app_table_mapping + ON key_mapping.app_id = app_table_mapping.app_id + AND key_mapping.app_table_name = app_table_mapping.app_table_name + WHERE node_id = $1 + ", + node_id + ) + .fetch_all(x) + .await + } +}