compiles again
This commit is contained in:
parent
f771a7d20f
commit
3a2eda20cb
4 changed files with 209 additions and 178 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
@ -3,4 +3,4 @@ dist
|
||||||
target
|
target
|
||||||
.DS_Store
|
.DS_Store
|
||||||
**/export/export.json
|
**/export/export.json
|
||||||
test.db
|
test.db*
|
|
@ -1,7 +1,7 @@
|
||||||
CREATE TABLE node (
|
CREATE TABLE node (
|
||||||
node_id TEXT PRIMARY KEY,
|
node_id TEXT PRIMARY KEY,
|
||||||
node_type TEXT NOT NULL,
|
node_type TEXT NOT NULL,
|
||||||
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
updated_at INTEGER NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||||
extra_data JSON
|
extra_data JSON
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,9 @@
|
||||||
use std::collections::{BTreeMap, HashMap};
|
use std::{
|
||||||
|
collections::{BTreeMap, HashMap},
|
||||||
|
str::FromStr,
|
||||||
|
};
|
||||||
|
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use miette::{bail, Context, Error, IntoDiagnostic, Report, Result};
|
use miette::{bail, Context, Error, IntoDiagnostic, Report, Result};
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
@ -8,7 +11,11 @@ use sqlx::{
|
||||||
query::Query, sqlite::SqliteArguments, Acquire, Connection, Executor,
|
query::Query, sqlite::SqliteArguments, Acquire, Connection, Executor,
|
||||||
FromRow, QueryBuilder, Sqlite,
|
FromRow, QueryBuilder, Sqlite,
|
||||||
};
|
};
|
||||||
use tantivy::schema::{OwnedValue, Value as _};
|
use tantivy::{
|
||||||
|
schema::{OwnedValue, Value as _},
|
||||||
|
time::Date,
|
||||||
|
Term,
|
||||||
|
};
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::{state::node_raw::FieldMappingRow, AppState, NodeId};
|
use crate::{state::node_raw::FieldMappingRow, AppState, NodeId};
|
||||||
|
@ -16,6 +23,8 @@ use crate::{state::node_raw::FieldMappingRow, AppState, NodeId};
|
||||||
// use super::utils::owned_value_to_json_value;
|
// use super::utils::owned_value_to_json_value;
|
||||||
|
|
||||||
pub type ExtraData = BTreeMap<String, Value>;
|
pub type ExtraData = BTreeMap<String, Value>;
|
||||||
|
pub type FieldsByTable<'a> =
|
||||||
|
HashMap<(&'a i64, &'a String), Vec<&'a FieldMappingRow>>;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct NodeInfo {
|
pub struct NodeInfo {
|
||||||
|
@ -99,7 +108,7 @@ impl AppState {
|
||||||
|
|
||||||
async fn query_related_fields<'e, 'c: 'e, X>(
|
async fn query_related_fields<'e, 'c: 'e, X>(
|
||||||
x: X,
|
x: X,
|
||||||
fields_by_table: &HashMap<(&i64, &String), Vec<&FieldMappingRow>>,
|
fields_by_table: &FieldsByTable<'_>,
|
||||||
) -> sqlx::Result<HashMap<String, Value>>
|
) -> sqlx::Result<HashMap<String, Value>>
|
||||||
where
|
where
|
||||||
X: 'e + Executor<'c, Database = Sqlite>,
|
X: 'e + Executor<'c, Database = Sqlite>,
|
||||||
|
@ -186,88 +195,145 @@ pub enum CreateOrUpdate {
|
||||||
|
|
||||||
impl AppState {
|
impl AppState {
|
||||||
// TODO: Split this out into create and update
|
// TODO: Split this out into create and update
|
||||||
// pub async fn create_or_update_node(
|
pub async fn create_or_update_node(
|
||||||
// &self,
|
&self,
|
||||||
// opts: CreateOrUpdate,
|
opts: CreateOrUpdate,
|
||||||
// extra_data: Option<ExtraData>,
|
extra_data: Option<ExtraData>,
|
||||||
// ) -> Result<NodeInfo> {
|
) -> Result<NodeInfo> {
|
||||||
// let node_id = match opts {
|
let node_id = match opts {
|
||||||
// CreateOrUpdate::Create { .. } => NodeId(Uuid::now_v7()),
|
CreateOrUpdate::Create { .. } => NodeId(Uuid::now_v7()),
|
||||||
// CreateOrUpdate::Update { ref node_id } => node_id.clone(),
|
CreateOrUpdate::Update { ref node_id } => node_id.clone(),
|
||||||
// };
|
};
|
||||||
// let node_id = node_id.to_string();
|
let node_id = node_id.to_string();
|
||||||
|
|
||||||
// let action = match opts {
|
let action = match opts {
|
||||||
// CreateOrUpdate::Create { .. } => "put",
|
CreateOrUpdate::Create { .. } => "put",
|
||||||
// CreateOrUpdate::Update { .. } => "update",
|
CreateOrUpdate::Update { .. } => "update",
|
||||||
// };
|
};
|
||||||
|
|
||||||
// println!("Request: {opts:?} {extra_data:?}");
|
println!("Request: {opts:?} {extra_data:?}");
|
||||||
|
|
||||||
// let (created_at, updated_at) = match opts {
|
let mut conn = self.conn().await?;
|
||||||
// CreateOrUpdate::Create { ref r#type } => {
|
|
||||||
// let node_result = tx.run_script(
|
conn
|
||||||
// "
|
.transaction::<_, _, sqlx::Error>(|tx| {
|
||||||
// ?[id, type] <- [[$node_id, $type]]
|
Box::pin(async move {
|
||||||
// :put node { id, type }
|
let node_info = match opts {
|
||||||
// :returning
|
CreateOrUpdate::Create { r#type } => {
|
||||||
// ",
|
AppState::create_node_raw(&mut **tx, &r#type).await?
|
||||||
// btmap! {
|
}
|
||||||
// "node_id".to_owned() => DataValue::from(node_id.clone()),
|
CreateOrUpdate::Update { node_id } => todo!(),
|
||||||
// "type".to_owned() => DataValue::from(r#type.to_owned()),
|
};
|
||||||
// },
|
|
||||||
// )?;
|
if let Some(extra_data) = extra_data {
|
||||||
// let created_at = DateTime::from_timestamp_millis(
|
if !extra_data.is_empty() {
|
||||||
// (node_result.rows[0][3].get_float().unwrap() * 1000.0) as i64,
|
let node_id_str = node_id.to_string();
|
||||||
// )
|
let field_mapping = AppState::get_related_field_list_for_node_id(
|
||||||
// .unwrap();
|
&mut **tx,
|
||||||
// let updated_at = DateTime::from_timestamp_millis(
|
&node_id_str,
|
||||||
// (node_result.rows[0][4].get_float().unwrap() * 1000.0) as i64,
|
)
|
||||||
// )
|
.await?;
|
||||||
// .unwrap();
|
|
||||||
// (created_at, updated_at)
|
// Group the keys by which relation they're in
|
||||||
|
let fields_by_table = field_mapping.iter().into_group_map_by(
|
||||||
|
|FieldMappingRow {
|
||||||
|
app_id,
|
||||||
|
app_table_name,
|
||||||
|
..
|
||||||
|
}| (app_id, app_table_name),
|
||||||
|
);
|
||||||
|
|
||||||
|
AppState::write_extra_data(
|
||||||
|
&mut **tx,
|
||||||
|
&node_id_str,
|
||||||
|
&fields_by_table,
|
||||||
|
extra_data,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(node_info)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.into_diagnostic()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn create_node_raw<'e, 'c: 'e, X>(
|
||||||
|
x: X,
|
||||||
|
r#type: &str,
|
||||||
|
) -> sqlx::Result<NodeInfo>
|
||||||
|
where
|
||||||
|
X: 'e + Executor<'c, Database = Sqlite>,
|
||||||
|
{
|
||||||
|
let node_id = Uuid::now_v7();
|
||||||
|
let node_id_str = node_id.to_string();
|
||||||
|
|
||||||
|
#[derive(FromRow)]
|
||||||
|
struct Result {
|
||||||
|
updated_at: i64,
|
||||||
|
}
|
||||||
|
|
||||||
|
let result = sqlx::query_as!(
|
||||||
|
Result,
|
||||||
|
r#"
|
||||||
|
INSERT INTO node (node_id, node_type, extra_data)
|
||||||
|
VALUES (?, ?, "{}")
|
||||||
|
RETURNING updated_at
|
||||||
|
"#,
|
||||||
|
node_id_str,
|
||||||
|
r#type,
|
||||||
|
)
|
||||||
|
.fetch_one(x)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let updated_at =
|
||||||
|
DateTime::from_timestamp_millis(result.updated_at * 1000).unwrap();
|
||||||
|
let created_at = DateTime::from_timestamp_millis(
|
||||||
|
node_id.get_timestamp().unwrap().to_unix().0 as i64 * 1000,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
Ok(NodeInfo {
|
||||||
|
node_id: NodeId(node_id),
|
||||||
|
created_at,
|
||||||
|
updated_at,
|
||||||
|
fields: None,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn write_extra_data<'e, 'c: 'e, X>(
|
||||||
|
x: X,
|
||||||
|
node_id: &str,
|
||||||
|
fields_by_table: &FieldsByTable<'_>,
|
||||||
|
extra_data: ExtraData,
|
||||||
|
) -> sqlx::Result<()>
|
||||||
|
where
|
||||||
|
X: 'e + Executor<'c, Database = Sqlite>,
|
||||||
|
{
|
||||||
|
// Update Tantivy indexes
|
||||||
|
// for ((app_id, app_table_name), fields) in fields_by_table.iter() {
|
||||||
|
// let mut writer =
|
||||||
|
// self.tantivy_index.writer(15_000_000).into_diagnostic()?;
|
||||||
|
|
||||||
|
// let delete_term = Term::from_field_text(node_id_field.clone(), &node_id);
|
||||||
|
// writer.delete_term(delete_term);
|
||||||
|
|
||||||
|
// writer.add_document(doc).into_diagnostic()?;
|
||||||
|
// writer.commit().into_diagnostic()?;
|
||||||
|
// drop(writer);
|
||||||
// }
|
// }
|
||||||
// CreateOrUpdate::Update { .. } => {
|
|
||||||
// let node_result = tx.run_script(
|
|
||||||
// "
|
|
||||||
// ?[id, type, created_at, updated_at] := *node { id, type, created_at, updated_at },
|
|
||||||
// id = $node_id
|
|
||||||
// ",
|
|
||||||
// btmap! {
|
|
||||||
// "node_id".to_owned() => DataValue::from(node_id.clone()),
|
|
||||||
// },
|
|
||||||
// )?;
|
|
||||||
// let created_at = DateTime::from_timestamp_millis(
|
|
||||||
// (node_result.rows[0][2].get_float().unwrap() * 1000.0) as i64,
|
|
||||||
// )
|
|
||||||
// .unwrap();
|
|
||||||
// let updated_at = DateTime::from_timestamp_millis(
|
|
||||||
// (node_result.rows[0][3].get_float().unwrap() * 1000.0) as i64,
|
|
||||||
// )
|
|
||||||
// .unwrap();
|
|
||||||
// (created_at, updated_at)
|
|
||||||
// }
|
|
||||||
// };
|
|
||||||
|
|
||||||
// if let Some(extra_data) = extra_data {
|
// Update database
|
||||||
// let node_id_field = self
|
let mut node_has_keys = Vec::new();
|
||||||
// .tantivy_field_map
|
for ((app_id, app_table_name), fields) in fields_by_table.iter() {
|
||||||
// .get_by_left("node_id")
|
for field_info in fields {
|
||||||
// .unwrap()
|
node_has_keys.push(&field_info.full_key);
|
||||||
// .clone();
|
}
|
||||||
|
|
||||||
// if !extra_data.is_empty() {
|
// let mut doc =
|
||||||
// let keys = extra_data.keys().map(|s| s.to_owned()).collect::<Vec<_>>();
|
// btmap! { node_id_field.clone() => OwnedValue::Str(node_id.to_owned()) };
|
||||||
// let field_mapping =
|
|
||||||
// self.get_rows_for_extra_keys(&tx, keys.as_slice())?;
|
|
||||||
|
|
||||||
// // Group the keys by which relation they're in
|
|
||||||
// let result_by_relation = field_mapping.iter().into_group_map_by(
|
|
||||||
// |(_, FieldInfo { relation_name, .. })| relation_name,
|
|
||||||
// );
|
|
||||||
|
|
||||||
// for (relation, fields) in result_by_relation.iter() {
|
|
||||||
// let mut doc = btmap! { node_id_field.clone() => OwnedValue::Str(node_id.to_owned()) };
|
|
||||||
// let fields_mapping = fields
|
// let fields_mapping = fields
|
||||||
// .into_iter()
|
// .into_iter()
|
||||||
// .map(
|
// .map(
|
||||||
|
@ -289,8 +355,7 @@ impl AppState {
|
||||||
// };
|
// };
|
||||||
|
|
||||||
// if *is_fts_enabled {
|
// if *is_fts_enabled {
|
||||||
// if let Some(field) = self.tantivy_field_map.get_by_left(*key)
|
// if let Some(field) = self.tantivy_field_map.get_by_left(*key) {
|
||||||
// {
|
|
||||||
// doc.insert(
|
// doc.insert(
|
||||||
// field.clone(),
|
// field.clone(),
|
||||||
// OwnedValue::Str(new_value.get_str().unwrap().to_owned()),
|
// OwnedValue::Str(new_value.get_str().unwrap().to_owned()),
|
||||||
|
@ -303,17 +368,6 @@ impl AppState {
|
||||||
// )
|
// )
|
||||||
// .collect::<BTreeMap<_, _>>();
|
// .collect::<BTreeMap<_, _>>();
|
||||||
|
|
||||||
// let mut writer =
|
|
||||||
// self.tantivy_index.writer(15_000_000).into_diagnostic()?;
|
|
||||||
|
|
||||||
// let delete_term =
|
|
||||||
// Term::from_field_text(node_id_field.clone(), &node_id);
|
|
||||||
// writer.delete_term(delete_term);
|
|
||||||
|
|
||||||
// writer.add_document(doc).into_diagnostic()?;
|
|
||||||
// writer.commit().into_diagnostic()?;
|
|
||||||
// drop(writer);
|
|
||||||
|
|
||||||
// let keys = fields_mapping.keys().collect::<Vec<_>>();
|
// let keys = fields_mapping.keys().collect::<Vec<_>>();
|
||||||
// let keys_joined = keys.iter().join(", ");
|
// let keys_joined = keys.iter().join(", ");
|
||||||
|
|
||||||
|
@ -338,41 +392,18 @@ impl AppState {
|
||||||
// },
|
// },
|
||||||
// );
|
// );
|
||||||
// }
|
// }
|
||||||
// }
|
}
|
||||||
|
|
||||||
// let input = DataValue::List(
|
let mut query =
|
||||||
// keys
|
QueryBuilder::new("INSERT INTO node_has_key (node_id, full_key) VALUES ");
|
||||||
// .iter()
|
query.push_values(node_has_keys, |mut b, key| {
|
||||||
// .map(|s| {
|
b.push_bind(node_id).push_bind(key);
|
||||||
// DataValue::List(vec![
|
});
|
||||||
// DataValue::from(s.to_owned()),
|
println!("Query: {:?}", query.sql());
|
||||||
// DataValue::from(node_id.clone()),
|
query.build().execute(x).await?;
|
||||||
// ])
|
|
||||||
// })
|
|
||||||
// .collect_vec(),
|
|
||||||
// );
|
|
||||||
|
|
||||||
// tx.run_script(
|
Ok(())
|
||||||
// "
|
}
|
||||||
// ?[key, id] <- $input_data
|
|
||||||
// :put node_has_key { key, id }
|
|
||||||
// ",
|
|
||||||
// btmap! {
|
|
||||||
// "input_data".to_owned() => input
|
|
||||||
// },
|
|
||||||
// )?;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// tx.commit()?;
|
|
||||||
|
|
||||||
// Ok(NodeInfo {
|
|
||||||
// node_id: NodeId(Uuid::from_str(&node_id).unwrap()),
|
|
||||||
// created_at,
|
|
||||||
// updated_at,
|
|
||||||
// fields: None,
|
|
||||||
// })
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// impl AppState {
|
// impl AppState {
|
||||||
|
|
|
@ -4,15 +4,15 @@ use crate::AppState;
|
||||||
|
|
||||||
#[derive(FromRow)]
|
#[derive(FromRow)]
|
||||||
pub struct FieldMappingRow {
|
pub struct FieldMappingRow {
|
||||||
pub(crate) full_key: String,
|
pub full_key: String,
|
||||||
pub(crate) app_id: i64,
|
pub app_id: i64,
|
||||||
pub(crate) app_table_name: String,
|
pub app_table_name: String,
|
||||||
pub(crate) app_table_field: String,
|
pub app_table_field: String,
|
||||||
pub(crate) db_table_name: Option<String>,
|
pub db_table_name: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AppState {
|
impl AppState {
|
||||||
pub async fn get_related_field_list_for_node_id<'e, 'c: 'e, X>(
|
pub(crate) async fn get_related_field_list_for_node_id<'e, 'c: 'e, X>(
|
||||||
x: X,
|
x: X,
|
||||||
node_id: &str,
|
node_id: &str,
|
||||||
) -> sqlx::Result<Vec<FieldMappingRow>>
|
) -> sqlx::Result<Vec<FieldMappingRow>>
|
||||||
|
|
Loading…
Reference in a new issue