This commit is contained in:
Michael Zhang 2024-06-18 15:20:42 -05:00
parent 21728e6de5
commit f771a7d20f
4 changed files with 377 additions and 343 deletions

View file

@ -25,13 +25,13 @@ CREATE TABLE app (
app_license TEXT app_license TEXT
); );
CREATE TABLE app_table ( CREATE TABLE app_table_mapping (
app_id INTEGER NOT NULL, app_id INTEGER NOT NULL,
app_table_name TEXT NOT NULL, app_table_name TEXT NOT NULL,
db_table_name TEXT NOT NULL db_table_name TEXT NOT NULL
); );
CREATE TABLE full_key_to_db_key ( CREATE TABLE key_mapping (
full_key TEXT NOT NULL, full_key TEXT NOT NULL,
app_id INTEGER NOT NULL, app_id INTEGER NOT NULL,
app_table_name TEXT NOT NULL, app_table_name TEXT NOT NULL,

View file

@ -4,6 +4,7 @@ pub mod apps;
// pub mod journal; // pub mod journal;
// pub mod mail; // pub mod mail;
pub mod node; pub mod node;
pub mod node_raw;
// pub mod utils; // pub mod utils;
use std::{collections::HashMap, fs, path::Path}; use std::{collections::HashMap, fs, path::Path};

View file

@ -1,17 +1,17 @@
use std::{ use std::collections::{BTreeMap, HashMap};
collections::{BTreeMap, HashMap},
str::FromStr,
};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use itertools::Itertools; use itertools::Itertools;
use miette::{bail, IntoDiagnostic, Result}; use miette::{bail, Context, Error, IntoDiagnostic, Report, Result};
use serde_json::Value; use serde_json::Value;
use sqlx::{Acquire, Connection, FromRow}; use sqlx::{
query::Query, sqlite::SqliteArguments, Acquire, Connection, Executor,
FromRow, QueryBuilder, Sqlite,
};
use tantivy::schema::{OwnedValue, Value as _}; use tantivy::schema::{OwnedValue, Value as _};
use uuid::Uuid; use uuid::Uuid;
use crate::{AppState, NodeId}; use crate::{state::node_raw::FieldMappingRow, AppState, NodeId};
// use super::utils::owned_value_to_json_value; // use super::utils::owned_value_to_json_value;
@ -25,366 +25,357 @@ pub struct NodeInfo {
pub fields: Option<HashMap<String, Value>>, pub fields: Option<HashMap<String, Value>>,
} }
#[derive(Debug)]
pub struct FieldInfo {
pub relation_name: String,
pub relation_field: String,
pub r#type: String,
pub is_fts_enabled: bool,
}
pub type FieldMapping = HashMap<String, FieldInfo>;
impl AppState { impl AppState {
/// Get all properties of a node /// Get all properties of a node
pub async fn get_node(&self, node_id: impl AsRef<str>) -> Result<NodeInfo> { pub async fn get_node(&self, node_id: impl AsRef<str>) -> Result<NodeInfo> {
let node_id = node_id.as_ref().to_owned(); let node_id = node_id.as_ref().to_owned();
let conn = self.conn().await?; let mut conn = self.conn().await?;
#[derive(FromRow)]
struct FieldMappingRow {
full_key: String,
app_id: i64,
app_table_name: String,
app_table_field: String,
}
conn conn
.transaction(|tx| { .transaction::<_, _, sqlx::Error>(|tx| {
Box::pin(async move { Box::pin(async move {
let result = sqlx::query_as!( let node_id = node_id.clone();
FieldMappingRow, let field_mapping =
" AppState::get_related_field_list_for_node_id(&mut **tx, &node_id)
SELECT .await?;
node_has_key.full_key, app_id, app_table_name, app_table_field
FROM node_has_key
INNER JOIN full_key_to_db_key
ON node_has_key.full_key = full_key_to_db_key.full_key
WHERE node_id = $1
",
node_id
)
.fetch_all(&mut **tx)
.await
.into_diagnostic()?;
let field_mapping = result
.into_iter()
.map(|row| (row.full_key.clone(), row))
.collect::<HashMap<_, _>>();
// Group the keys by which relation they're in // Group the keys by which relation they're in
let result_by_relation = field_mapping.iter().into_group_map_by( let fields_by_table = field_mapping.iter().into_group_map_by(
|( |FieldMappingRow {
_,
FieldMappingRow {
app_id, app_id,
app_table_name, app_table_name,
.. ..
}, }| (app_id, app_table_name),
)| (app_id, app_table_name),
); );
let mut all_relation_queries = vec![]; // Run the query that grabs all of the relevant fields, and coalesce
let mut all_relation_constraints = vec![]; // the fields back
let mut all_fields = vec![]; let related_fields =
let mut field_counter = 0; AppState::query_related_fields(&mut **tx, &fields_by_table).await?;
for (i, (relation, fields)) in result_by_relation.iter().enumerate() {
let constraint_name = format!("c{i}"); println!("Related fields: {:?}", related_fields);
// let created_at = DateTime::from_timestamp_millis(
// (result.rows[0][2].get_float().unwrap() * 1000.0) as i64,
// )
// .unwrap();
// let updated_at = DateTime::from_timestamp_millis(
// (result.rows[0][3].get_float().unwrap() * 1000.0) as i64,
// )
// .unwrap();
// let mut fields = HashMap::new();
// for row in result
// .rows
// .into_iter()
// .map(|row| row.into_iter().skip(4).zip(all_fields.iter()))
// {
// for (value, (_, _, field_name)) in row {
// fields.insert(
// field_name.to_string(),
// data_value_to_json_value(&value),
// );
// }
// }
todo!()
// Ok(NodeInfo {
// node_id: NodeId(Uuid::from_str(&node_id).unwrap()),
// created_at,
// updated_at,
// fields: Some(fields),
// })
})
})
.await
.into_diagnostic()?;
todo!()
// Ok(())
}
async fn query_related_fields<'e, 'c: 'e, X>(
x: X,
fields_by_table: &HashMap<(&i64, &String), Vec<&FieldMappingRow>>,
) -> sqlx::Result<HashMap<String, Value>>
where
X: 'e + Executor<'c, Database = Sqlite>,
{
let mut query = QueryBuilder::new("");
let mut mapping = HashMap::new();
let mut ctr = 0;
let mut selected_fields = vec![];
for ((app_id, app_table_name), fields) in fields_by_table.iter() {
let table_gen_name = format!("c{ctr}");
ctr += 1;
let mut keys = vec![]; let mut keys = vec![];
let mut constraints = vec![]; for field_info in fields.iter() {
for (key, field_info) in fields.iter() { let field_gen_name = format!("f{ctr}");
let counted_field_name = format!("f{field_counter}"); ctr += 1;
field_counter += 1; mapping.insert(&field_info.full_key, field_gen_name.clone());
keys.push(counted_field_name.clone()); keys.push(field_gen_name.clone());
constraints.push(format!(
"{}: {}", selected_fields.push(format!(
field_info.relation_field.to_owned(), "{}.{} as {}",
counted_field_name, table_gen_name, field_info.app_table_field, field_gen_name
)); ));
all_fields.push((
counted_field_name, // constraints.push(format!(
field_info.relation_field.to_owned(), // "{}: {}",
key, // field_info.relation_field.to_owned(),
)) // field_gen_name,
// ));
// all_fields.push((
// field_gen_name,
// field_info.relation_field.to_owned(),
// key,
// ))
} }
let keys = keys.join(", "); // let keys = keys.join(", ");
let constraints = constraints.join(", "); // let constraints = constraints.join(", ");
all_relation_queries.push(format!( // all_relation_queries.push(format!(
" // "
{constraint_name}[{keys}] := // {table_gen_name}[{keys}] :=
*{relation}{{ node_id, {constraints} }}, // *{relation}{{ node_id, {constraints} }},
node_id = $node_id // node_id = $node_id
" // "
)); // ));
all_relation_constraints.push(format!("{constraint_name}[{keys}],")) // all_relation_constraints.push(format!("{table_gen_name}[{keys}],"))
} }
let all_relation_constraints = all_relation_constraints.join("\n"); query.push("SELECT");
let all_relation_queries = all_relation_queries.join("\n\n"); query.push(selected_fields.join(", "));
let all_field_names = all_fields query.push("FROM");
.iter() println!("Query: {:?}", query.sql());
.map(|(field_name, _, _)| field_name)
.join(", ");
let query = format!(
"
{all_relation_queries}
?[type, extra_data, created_at, updated_at, {all_field_names}] := // let all_relation_constraints = all_relation_constraints.join("\n");
*node {{ id, type, created_at, updated_at, extra_data }}, // let all_relation_queries = all_relation_queries.join("\n\n");
{all_relation_constraints} // let all_field_names = all_fields
id = $node_id // .iter()
" // .map(|(field_name, _, _)| field_name)
); // .join(", ");
// let _query = format!(
// "
// {all_relation_queries}
let result = tx.run_script( // ?[type, extra_data, created_at, updated_at, {all_field_names}] :=
&query, // *node {{ id, type, created_at, updated_at, extra_data }},
btmap! { "node_id".to_owned() => node_id.to_string().into(), }, // {all_relation_constraints}
)?; // id = $node_id
// "
// );
if result.rows.is_empty() { let rows = query.build().fetch_all(x).await.into_diagnostic();
bail!("Not found")
}
let created_at = DateTime::from_timestamp_millis( todo!()
(result.rows[0][2].get_float().unwrap() * 1000.0) as i64,
)
.unwrap();
let updated_at = DateTime::from_timestamp_millis(
(result.rows[0][3].get_float().unwrap() * 1000.0) as i64,
)
.unwrap();
let mut fields = HashMap::new();
for row in result
.rows
.into_iter()
.map(|row| row.into_iter().skip(4).zip(all_fields.iter()))
{
for (value, (_, _, field_name)) in row {
fields.insert(
field_name.to_string(),
data_value_to_json_value(&value),
);
}
}
Ok(NodeInfo {
node_id: NodeId(Uuid::from_str(&node_id).unwrap()),
created_at,
updated_at,
fields: Some(fields),
})
})
})
.await?;
Ok(())
} }
} }
// #[derive(Debug)] #[derive(Debug)]
// pub enum CreateOrUpdate { pub enum CreateOrUpdate {
// Create { r#type: String }, Create { r#type: String },
// Update { node_id: NodeId }, Update { node_id: NodeId },
// } }
impl AppState {
// TODO: Split this out into create and update
// pub async fn create_or_update_node(
// &self,
// opts: CreateOrUpdate,
// extra_data: Option<ExtraData>,
// ) -> Result<NodeInfo> {
// let node_id = match opts {
// CreateOrUpdate::Create { .. } => NodeId(Uuid::now_v7()),
// CreateOrUpdate::Update { ref node_id } => node_id.clone(),
// };
// let node_id = node_id.to_string();
// let action = match opts {
// CreateOrUpdate::Create { .. } => "put",
// CreateOrUpdate::Update { .. } => "update",
// };
// println!("Request: {opts:?} {extra_data:?}");
// let (created_at, updated_at) = match opts {
// CreateOrUpdate::Create { ref r#type } => {
// let node_result = tx.run_script(
// "
// ?[id, type] <- [[$node_id, $type]]
// :put node { id, type }
// :returning
// ",
// btmap! {
// "node_id".to_owned() => DataValue::from(node_id.clone()),
// "type".to_owned() => DataValue::from(r#type.to_owned()),
// },
// )?;
// let created_at = DateTime::from_timestamp_millis(
// (node_result.rows[0][3].get_float().unwrap() * 1000.0) as i64,
// )
// .unwrap();
// let updated_at = DateTime::from_timestamp_millis(
// (node_result.rows[0][4].get_float().unwrap() * 1000.0) as i64,
// )
// .unwrap();
// (created_at, updated_at)
// }
// CreateOrUpdate::Update { .. } => {
// let node_result = tx.run_script(
// "
// ?[id, type, created_at, updated_at] := *node { id, type, created_at, updated_at },
// id = $node_id
// ",
// btmap! {
// "node_id".to_owned() => DataValue::from(node_id.clone()),
// },
// )?;
// let created_at = DateTime::from_timestamp_millis(
// (node_result.rows[0][2].get_float().unwrap() * 1000.0) as i64,
// )
// .unwrap();
// let updated_at = DateTime::from_timestamp_millis(
// (node_result.rows[0][3].get_float().unwrap() * 1000.0) as i64,
// )
// .unwrap();
// (created_at, updated_at)
// }
// };
// if let Some(extra_data) = extra_data {
// let node_id_field = self
// .tantivy_field_map
// .get_by_left("node_id")
// .unwrap()
// .clone();
// if !extra_data.is_empty() {
// let keys = extra_data.keys().map(|s| s.to_owned()).collect::<Vec<_>>();
// let field_mapping =
// self.get_rows_for_extra_keys(&tx, keys.as_slice())?;
// // Group the keys by which relation they're in
// let result_by_relation = field_mapping.iter().into_group_map_by(
// |(_, FieldInfo { relation_name, .. })| relation_name,
// );
// for (relation, fields) in result_by_relation.iter() {
// let mut doc = btmap! { node_id_field.clone() => OwnedValue::Str(node_id.to_owned()) };
// let fields_mapping = fields
// .into_iter()
// .map(
// |(
// key,
// FieldInfo {
// relation_field,
// r#type,
// is_fts_enabled,
// ..
// },
// )| {
// let new_value = extra_data.get(*key).unwrap();
// // TODO: Make this more generic
// let new_value = match r#type.as_str() {
// "int" => DataValue::from(new_value.as_i64().unwrap()),
// _ => DataValue::from(new_value.as_str().unwrap()),
// };
// if *is_fts_enabled {
// if let Some(field) = self.tantivy_field_map.get_by_left(*key)
// {
// doc.insert(
// field.clone(),
// OwnedValue::Str(new_value.get_str().unwrap().to_owned()),
// );
// }
// }
// (relation_field.to_owned(), new_value)
// },
// )
// .collect::<BTreeMap<_, _>>();
// let mut writer =
// self.tantivy_index.writer(15_000_000).into_diagnostic()?;
// let delete_term =
// Term::from_field_text(node_id_field.clone(), &node_id);
// writer.delete_term(delete_term);
// writer.add_document(doc).into_diagnostic()?;
// writer.commit().into_diagnostic()?;
// drop(writer);
// let keys = fields_mapping.keys().collect::<Vec<_>>();
// let keys_joined = keys.iter().join(", ");
// if !keys.is_empty() {
// let query = format!(
// "
// ?[ node_id, {keys_joined} ] <- [$input_data]
// :{action} {relation} {{ node_id, {keys_joined} }}
// "
// );
// let mut params = vec![];
// params.push(DataValue::from(node_id.clone()));
// for key in keys {
// params.push(fields_mapping[key].clone());
// }
// let result = tx.run_script(
// &query,
// btmap! {
// "input_data".to_owned() => DataValue::List(params),
// },
// );
// }
// }
// let input = DataValue::List(
// keys
// .iter()
// .map(|s| {
// DataValue::List(vec![
// DataValue::from(s.to_owned()),
// DataValue::from(node_id.clone()),
// ])
// })
// .collect_vec(),
// );
// tx.run_script(
// "
// ?[key, id] <- $input_data
// :put node_has_key { key, id }
// ",
// btmap! {
// "input_data".to_owned() => input
// },
// )?;
// }
// }
// tx.commit()?;
// Ok(NodeInfo {
// node_id: NodeId(Uuid::from_str(&node_id).unwrap()),
// created_at,
// updated_at,
// fields: None,
// })
// }
}
// impl AppState { // impl AppState {
// // TODO: Split this out into create and update
// pub async fn create_or_update_node(
// &self,
// opts: CreateOrUpdate,
// extra_data: Option<ExtraData>,
// ) -> Result<NodeInfo> {
// let node_id = match opts {
// CreateOrUpdate::Create { .. } => NodeId(Uuid::now_v7()),
// CreateOrUpdate::Update { ref node_id } => node_id.clone(),
// };
// let node_id = node_id.to_string();
// let action = match opts {
// CreateOrUpdate::Create { .. } => "put",
// CreateOrUpdate::Update { .. } => "update",
// };
// println!("Request: {opts:?} {extra_data:?}");
// let tx = self.db.multi_transaction(true);
// let (created_at, updated_at) = match opts {
// CreateOrUpdate::Create { ref r#type } => {
// let node_result = tx.run_script(
// "
// ?[id, type] <- [[$node_id, $type]]
// :put node { id, type }
// :returning
// ",
// btmap! {
// "node_id".to_owned() => DataValue::from(node_id.clone()),
// "type".to_owned() => DataValue::from(r#type.to_owned()),
// },
// )?;
// let created_at = DateTime::from_timestamp_millis(
// (node_result.rows[0][3].get_float().unwrap() * 1000.0) as i64,
// )
// .unwrap();
// let updated_at = DateTime::from_timestamp_millis(
// (node_result.rows[0][4].get_float().unwrap() * 1000.0) as i64,
// )
// .unwrap();
// (created_at, updated_at)
// }
// CreateOrUpdate::Update { .. } => {
// let node_result = tx.run_script(
// "
// ?[id, type, created_at, updated_at] := *node { id, type, created_at, updated_at },
// id = $node_id
// ",
// btmap! {
// "node_id".to_owned() => DataValue::from(node_id.clone()),
// },
// )?;
// let created_at = DateTime::from_timestamp_millis(
// (node_result.rows[0][2].get_float().unwrap() * 1000.0) as i64,
// )
// .unwrap();
// let updated_at = DateTime::from_timestamp_millis(
// (node_result.rows[0][3].get_float().unwrap() * 1000.0) as i64,
// )
// .unwrap();
// (created_at, updated_at)
// }
// };
// if let Some(extra_data) = extra_data {
// let node_id_field = self
// .tantivy_field_map
// .get_by_left("node_id")
// .unwrap()
// .clone();
// if !extra_data.is_empty() {
// let keys = extra_data.keys().map(|s| s.to_owned()).collect::<Vec<_>>();
// let field_mapping =
// self.get_rows_for_extra_keys(&tx, keys.as_slice())?;
// // Group the keys by which relation they're in
// let result_by_relation = field_mapping.iter().into_group_map_by(
// |(_, FieldInfo { relation_name, .. })| relation_name,
// );
// for (relation, fields) in result_by_relation.iter() {
// let mut doc = btmap! { node_id_field.clone() => OwnedValue::Str(node_id.to_owned()) };
// let fields_mapping = fields
// .into_iter()
// .map(
// |(
// key,
// FieldInfo {
// relation_field,
// r#type,
// is_fts_enabled,
// ..
// },
// )| {
// let new_value = extra_data.get(*key).unwrap();
// // TODO: Make this more generic
// let new_value = match r#type.as_str() {
// "int" => DataValue::from(new_value.as_i64().unwrap()),
// _ => DataValue::from(new_value.as_str().unwrap()),
// };
// if *is_fts_enabled {
// if let Some(field) = self.tantivy_field_map.get_by_left(*key)
// {
// doc.insert(
// field.clone(),
// OwnedValue::Str(new_value.get_str().unwrap().to_owned()),
// );
// }
// }
// (relation_field.to_owned(), new_value)
// },
// )
// .collect::<BTreeMap<_, _>>();
// let mut writer =
// self.tantivy_index.writer(15_000_000).into_diagnostic()?;
// let delete_term =
// Term::from_field_text(node_id_field.clone(), &node_id);
// writer.delete_term(delete_term);
// writer.add_document(doc).into_diagnostic()?;
// writer.commit().into_diagnostic()?;
// drop(writer);
// let keys = fields_mapping.keys().collect::<Vec<_>>();
// let keys_joined = keys.iter().join(", ");
// if !keys.is_empty() {
// let query = format!(
// "
// ?[ node_id, {keys_joined} ] <- [$input_data]
// :{action} {relation} {{ node_id, {keys_joined} }}
// "
// );
// let mut params = vec![];
// params.push(DataValue::from(node_id.clone()));
// for key in keys {
// params.push(fields_mapping[key].clone());
// }
// let result = tx.run_script(
// &query,
// btmap! {
// "input_data".to_owned() => DataValue::List(params),
// },
// );
// }
// }
// let input = DataValue::List(
// keys
// .iter()
// .map(|s| {
// DataValue::List(vec![
// DataValue::from(s.to_owned()),
// DataValue::from(node_id.clone()),
// ])
// })
// .collect_vec(),
// );
// tx.run_script(
// "
// ?[key, id] <- $input_data
// :put node_has_key { key, id }
// ",
// btmap! {
// "input_data".to_owned() => input
// },
// )?;
// }
// }
// tx.commit()?;
// Ok(NodeInfo {
// node_id: NodeId(Uuid::from_str(&node_id).unwrap()),
// created_at,
// updated_at,
// fields: None,
// })
// }
// pub async fn update_node() {} // pub async fn update_node() {}

View file

@ -0,0 +1,42 @@
use sqlx::{Executor, FromRow, Sqlite};
use crate::AppState;
#[derive(FromRow)]
pub struct FieldMappingRow {
pub(crate) full_key: String,
pub(crate) app_id: i64,
pub(crate) app_table_name: String,
pub(crate) app_table_field: String,
pub(crate) db_table_name: Option<String>,
}
impl AppState {
pub async fn get_related_field_list_for_node_id<'e, 'c: 'e, X>(
x: X,
node_id: &str,
) -> sqlx::Result<Vec<FieldMappingRow>>
where
X: 'e + Executor<'c, Database = Sqlite>,
{
sqlx::query_as!(
FieldMappingRow,
"
SELECT
node_has_key.full_key, key_mapping.app_id,
key_mapping.app_table_name, app_table_field,
app_table_mapping.db_table_name
FROM node_has_key
INNER JOIN key_mapping
ON node_has_key.full_key = key_mapping.full_key
INNER JOIN app_table_mapping
ON key_mapping.app_id = app_table_mapping.app_id
AND key_mapping.app_table_name = app_table_mapping.app_table_name
WHERE node_id = $1
",
node_id
)
.fetch_all(x)
.await
}
}