From 791349a2c4b004fe1365956a4df0cc88232e2a81 Mon Sep 17 00:00:00 2001 From: Michael Zhang Date: Thu, 13 Jun 2024 14:42:45 -0400 Subject: [PATCH] save data --- .gitignore | 2 +- Cargo.lock | 132 ++++++++++++++++++ app/src-tauri/Cargo.toml | 5 +- app/src-tauri/src/main.rs | 33 ++++- app/src/components/Header.module.scss | 27 ++++ app/src/components/Header.tsx | 82 +++++++++++- app/src/components/SearchBar.tsx | 4 +- app/src/components/nodes/JournalPage.tsx | 7 +- crates/panorama-core/Cargo.toml | 2 + crates/panorama-core/src/lib.rs | 3 +- crates/panorama-core/src/mail.rs | 160 ++++++++++++++++++++++ crates/panorama-core/src/migrations.rs | 15 +-- crates/panorama-core/src/state/export.rs | 5 +- crates/panorama-core/src/state/mail.rs | 162 +---------------------- crates/panorama-core/src/state/mod.rs | 12 +- crates/panorama-core/src/state/node.rs | 115 +++++++++++----- crates/panorama-core/src/state/utils.rs | 57 ++++++++ crates/panorama-core/src/tests/mod.rs | 1 + crates/panorama-daemon/src/lib.rs | 72 ++++++++++ crates/panorama-daemon/src/main.rs | 70 +--------- crates/panorama-daemon/src/node.rs | 94 ++++--------- 21 files changed, 711 insertions(+), 349 deletions(-) create mode 100644 crates/panorama-core/src/mail.rs create mode 100644 crates/panorama-core/src/state/utils.rs create mode 100644 crates/panorama-daemon/src/lib.rs diff --git a/.gitignore b/.gitignore index a19807c..1e2d465 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,4 @@ node_modules dist target .DS_Store -/export \ No newline at end of file +**/export/export.json \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index b3c9f7c..25c2f64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -86,6 +86,55 @@ dependencies = [ "libc", ] +[[package]] +name = "anstream" +version = "0.6.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "418c75fa768af9c03be99d17643f93f79bbba589895012a80e3452a19ddda15b" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "038dfcf04a5feb68e9c60b21c9625a54c2c0616e79b72b0fd87075a056ae1d1b" + +[[package]] +name = "anstyle-parse" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c03a11a9034d92058ceb6ee011ce58af4a9bf61491aa7e1e59ecd24bd40d22d4" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad186efb764318d35165f1758e7dcef3b10628e26d41a44bc5550652e6804391" +dependencies = [ + "windows-sys 0.52.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61a38449feb7068f52bb06c12759005cf459ee52bb4adc1d5a7c4322d716fb19" +dependencies = [ + "anstyle", + "windows-sys 0.52.0", +] + [[package]] name = "anyhow" version = "1.0.86" @@ -410,6 +459,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "futures-core", + "getrandom 0.2.15", + "instant", + "pin-project-lite", + "rand 0.8.5", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.71" @@ -446,6 +509,12 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "bimap" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "230c5f1ca6a325a32553f8640d31ac9b49f2411e901e427570154868b46da4f7" + [[package]] name = "bincode" version = "1.3.3" @@ -748,6 +817,46 @@ dependencies = [ "phf_codegen 0.11.2", ] +[[package]] +name = "clap" +version = "4.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5db83dced34638ad474f39f250d7fea9598bdd239eaced1bdf45d597da0f433f" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7e204572485eb3fbf28f871612191521df159bc3e15a9f5064c66dba3a8c05f" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c780290ccf4fb26629baa7a1081e68ced113f1d3ec302fa5948f1c381ebf06c6" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "syn 2.0.66", +] + +[[package]] +name = "clap_lex" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b82cf0babdbd58558212896d1a4272303a57bdb245c2bf1147185fb45640e70" + [[package]] name = "cocoa" version = "0.25.0" @@ -788,6 +897,12 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "colorchoice" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422" + [[package]] name = "combine" version = "4.6.7" @@ -2454,6 +2569,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7655c9839580ee829dfacba1d1278c2b7883e50a277ff7541299489d6bdfdc45" +[[package]] +name = "is_terminal_polyfill" +version = "1.70.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8478577c03552c21db0e2724ffb8986a5ce7af88107e6be5d2ee6e158c12800" + [[package]] name = "itertools" version = "0.12.1" @@ -3498,6 +3619,8 @@ dependencies = [ name = "panorama" version = "0.1.0" dependencies = [ + "clap", + "panorama-daemon", "serde", "serde_json", "tauri", @@ -3506,6 +3629,7 @@ dependencies = [ "tauri-plugin-shell", "tauri-plugin-single-instance", "tauri-plugin-window-state", + "tokio", ] [[package]] @@ -3513,6 +3637,8 @@ name = "panorama-core" version = "0.1.0" dependencies = [ "async-imap", + "backoff", + "bimap", "chrono", "cozo", "futures", @@ -6186,6 +6312,12 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fcfc827f90e53a02eaef5e535ee14266c1d569214c6aa70133a624d8a3164ba" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "utoipa" version = "5.0.0-alpha.0" diff --git a/app/src-tauri/Cargo.toml b/app/src-tauri/Cargo.toml index e854ce3..5d1884b 100644 --- a/app/src-tauri/Cargo.toml +++ b/app/src-tauri/Cargo.toml @@ -18,14 +18,17 @@ crate-type = [ tauri-build = { version = "2.0.0-beta", features = [] } [dependencies] -tauri = { version = "2.0.0-beta", features = [] } +clap = { version = "4.5.7", features = ["derive"] } +panorama-daemon = { path = "../../crates/panorama-daemon" } serde = { version = "1", features = ["derive"] } serde_json = "1" +tauri = { version = "2.0.0-beta", features = [] } tauri-build = { version = "2.0.0-beta.17", features = ["config-toml"] } tauri-plugin-http = "2.0.0-beta.9" tauri-plugin-shell = "2.0.0-beta.7" tauri-plugin-single-instance = "2.0.0-beta.9" tauri-plugin-window-state = "2.0.0-beta" +tokio = { version = "1.38.0", features = ["full"] } [features] # This feature is used for production builds or when a dev server is not specified, DO NOT REMOVE!! diff --git a/app/src-tauri/src/main.rs b/app/src-tauri/src/main.rs index c3623a9..322f0b5 100644 --- a/app/src-tauri/src/main.rs +++ b/app/src-tauri/src/main.rs @@ -1,5 +1,34 @@ #![cfg_attr(not(debug_assertions), windows_subsystem = "windows")] -fn main() { - app_lib::run(); +use clap::{Parser, Subcommand}; + +#[derive(Parser, Debug)] +struct Opt { + #[clap(long = "no-embedded-daemon")] + no_embedded_daemon: bool, + + #[clap(subcommand)] + command: Option, +} + +#[derive(Subcommand, Debug)] +enum Command { + Daemon, +} + +#[tokio::main] +async fn main() { + let opt = Opt::parse(); + + match opt.command { + Some(Command::Daemon) => { + panorama_daemon::run().await; + } + None => { + if !opt.no_embedded_daemon { + tokio::spawn(panorama_daemon::run()); + } + app_lib::run(); + } + } } diff --git a/app/src/components/Header.module.scss b/app/src/components/Header.module.scss index 38f05fe..e38d263 100644 --- a/app/src/components/Header.module.scss +++ b/app/src/components/Header.module.scss @@ -29,4 +29,31 @@ color: rgb(0, 0, 0, 0.5); margin: 0; } +} + +.newNodeMenu { + background-color: rgba(255, 255, 255, 0.5); + backdrop-filter: blur(20px); + -webkit-backdrop-filter: blur(10px); + box-shadow: 0px 3px 6px rgba(0, 0, 0, 0.25); + border-bottom-left-radius: 4px; + border-bottom-right-radius: 4px; + + ul { + display: flex; + flex-direction: column; + padding: 0; + list-style-type: none; + + li { + display: flex; + flex-direction: column; + align-items: stretch; + + button { + cursor: pointer; + display: flex; + } + } + } } \ No newline at end of file diff --git a/app/src/components/Header.tsx b/app/src/components/Header.tsx index e577b74..1c10327 100644 --- a/app/src/components/Header.tsx +++ b/app/src/components/Header.tsx @@ -1,13 +1,24 @@ import styles from "./Header.module.scss"; import NoteAddIcon from "@mui/icons-material/NoteAdd"; import SearchBar from "./SearchBar"; -import { getVersion } from "@tauri-apps/api/app"; import ListIcon from "@mui/icons-material/List"; +import ArrowDropDownIcon from "@mui/icons-material/ArrowDropDown"; import { useSetAtom } from "jotai"; import { sidebarExpandedAtom } from "./Sidebar"; import { useNodeControls } from "../App"; -import { useCallback } from "react"; +import { useCallback, useState } from "react"; import { useQuery } from "@tanstack/react-query"; +import { getVersion } from "@tauri-apps/api/app"; +import { + FloatingOverlay, + FloatingPortal, + autoUpdate, + offset, + useDismiss, + useFloating, + useFocus, + useInteractions, +} from "@floating-ui/react"; export default function Header() { const { openNode } = useNodeControls(); @@ -17,6 +28,7 @@ export default function Header() { queryFn: getVersion, }); const { data: version } = versionData; + console.log("version", version); const createNewJournalPage = useCallback(() => { (async () => { @@ -50,12 +62,72 @@ export default function Header() { Panorama v{version} - +
+ + +
); } + +function NewNodeButton() { + const [showMenu, setShowMenu] = useState(false); + const { refs, context, floatingStyles } = useFloating({ + placement: "bottom-start", + open: showMenu, + onOpenChange: setShowMenu, + whileElementsMounted: autoUpdate, + // middleware: [offset(10)], + }); + const focus = useFocus(context); + const { getReferenceProps, getFloatingProps } = useInteractions([ + focus, + useDismiss(context), + ]); + + return ( + <> + + + {showMenu && ( + + +
+ +
+
+
+ )} + + ); +} + +function NewNodeMenu() { + return ( +
+
    +
  • + +
  • +
  • + +
  • +
+
+ ); +} diff --git a/app/src/components/SearchBar.tsx b/app/src/components/SearchBar.tsx index 843fa48..2f5f22a 100644 --- a/app/src/components/SearchBar.tsx +++ b/app/src/components/SearchBar.tsx @@ -105,8 +105,8 @@ function SearchMenu({ results }) { openNode(result.node_id); }} > -
{result.title}
-
{result.content}
+ {/*
{result.title}
*/} +
{JSON.stringify(result)}
); })} diff --git a/app/src/components/nodes/JournalPage.tsx b/app/src/components/nodes/JournalPage.tsx index a81e3a6..8e2ace0 100644 --- a/app/src/components/nodes/JournalPage.tsx +++ b/app/src/components/nodes/JournalPage.tsx @@ -8,19 +8,24 @@ import rehypeKatex from "rehype-katex"; import { parse as parseDate, format as formatDate } from "date-fns"; import { useDebounce } from "use-debounce"; +const JOURNAL_PAGE_CONTENT_FIELD_NAME = "panorama/journal/page/content"; + export interface JournalPageProps { id: string; data: { day?: string; title?: string; content: string; + fields: object; }; } export default function JournalPage({ id, data }: JournalPageProps) { const { day } = data; const queryClient = useQueryClient(); - const [value, setValue] = useState(() => data.content); + const [value, setValue] = useState( + () => data?.fields?.[JOURNAL_PAGE_CONTENT_FIELD_NAME], + ); const [valueToSave] = useDebounce(value, 1000, { leading: true, trailing: true, diff --git a/crates/panorama-core/Cargo.toml b/crates/panorama-core/Cargo.toml index f345704..44ca1eb 100644 --- a/crates/panorama-core/Cargo.toml +++ b/crates/panorama-core/Cargo.toml @@ -4,6 +4,8 @@ version = "0.1.0" edition = "2021" [dependencies] +backoff = { version = "0.4.0", features = ["tokio"] } +bimap = "0.6.3" chrono = { version = "0.4.38", features = ["serde"] } cozo = { version = "0.7.6", features = ["storage-rocksdb"] } futures = "0.3.30" diff --git a/crates/panorama-core/src/lib.rs b/crates/panorama-core/src/lib.rs index d5322b6..810f458 100644 --- a/crates/panorama-core/src/lib.rs +++ b/crates/panorama-core/src/lib.rs @@ -8,6 +8,7 @@ extern crate sugars; pub mod migrations; pub mod state; +pub mod mail; #[cfg(test)] mod tests; @@ -20,7 +21,7 @@ use serde_json::Value; use uuid::Uuid; #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] -pub struct NodeId(Uuid); +pub struct NodeId(pub Uuid); impl fmt::Display for NodeId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { diff --git a/crates/panorama-core/src/mail.rs b/crates/panorama-core/src/mail.rs new file mode 100644 index 0000000..8af9702 --- /dev/null +++ b/crates/panorama-core/src/mail.rs @@ -0,0 +1,160 @@ +use std::{collections::HashMap, time::Duration}; + +use backoff::{exponential::ExponentialBackoff, SystemClock}; +use cozo::{DataValue, JsonData, ScriptMutability}; +use futures::TryStreamExt; +use miette::{IntoDiagnostic, Result}; +use tokio::{net::TcpStream, time::sleep}; +use uuid::Uuid; + +use crate::AppState; + +pub async fn mail_loop(state: AppState) -> Result<()> { + backoff::future::retry( + ExponentialBackoff::::default(), + || async { + mail_loop_inner(&state).await?; + // For now, just sleep 30 seconds and then fetch again + // TODO: Run a bunch of connections at once and do IDLE over them (if possible) + sleep(Duration::from_secs(30)).await; + + Ok(()) + }, + ) + .await?; + + Ok(()) +} + +async fn mail_loop_inner(state: &AppState) -> Result<()> { + // Fetch the mail configs + let configs = state.fetch_mail_configs()?; + if configs.len() == 0 { + return Ok(()); + } + + // TODO: Do all configs instead of just the first + let config = &configs[0]; + + let stream = + TcpStream::connect((config.imap_hostname.as_str(), config.imap_port)) + .await + .into_diagnostic()?; + + let client = async_imap::Client::new(stream); + let mut session = client + .login(&config.imap_username, &config.imap_password) + .await + .map_err(|(err, _)| err) + .into_diagnostic()?; + + // println!("Session: {:?}", session); + let mailboxes = session + .list(None, Some("*")) + .await + .into_diagnostic()? + .try_collect::>() + .await + .into_diagnostic()?; + let mailbox_names = + mailboxes.iter().map(|name| name.name()).collect::>(); + println!("mailboxes: {mailbox_names:?}"); + + // Get the mailbox with INBOX + let inbox_node_id = { + let result = state.db.run_script(" + ?[node_id] := + *mailbox{node_id, account_node_id, mailbox_name}, + account_node_id = $account_node_id, + mailbox_name = 'INBOX' + ", btmap! {"account_node_id".to_owned()=>DataValue::from(config.node_id.to_string())}, ScriptMutability::Immutable)?; + + if result.rows.len() == 0 { + let new_node_id = Uuid::now_v7(); + let new_node_id = new_node_id.to_string(); + state.db.run_script(" + ?[node_id, account_node_id, mailbox_name] <- + [[$new_node_id, $account_node_id, 'INBOX']] + :put mailbox { node_id, account_node_id, mailbox_name } + ", + btmap! { + "new_node_id".to_owned() => DataValue::from(new_node_id.clone()), + "account_node_id".to_owned() => DataValue::from(config.node_id.to_string()), + }, + ScriptMutability::Mutable)?; + new_node_id + } else { + result.rows[0][0].get_str().unwrap().to_owned() + } + }; + println!("INBOX: {:?}", inbox_node_id); + + let inbox = session.select("INBOX").await.into_diagnostic()?; + println!("last unseen: {:?}", inbox.unseen); + + let messages = session + .fetch( + "1:4", + "(FLAGS ENVELOPE BODY[HEADER] BODY[TEXT] INTERNALDATE)", + ) + .await + .into_diagnostic()? + .try_collect::>() + .await + .into_diagnostic()?; + println!( + "messages {:?}", + messages.iter().map(|f| f.body()).collect::>() + ); + + let input_data = DataValue::List( + messages + .iter() + .map(|msg| { + let message_id = Uuid::now_v7(); + let headers = + String::from_utf8(msg.header().unwrap().to_vec()).unwrap(); + let headers = headers + .split("\r\n") + .filter_map(|s| { + let p = s.split(": ").collect::>(); + if p.len() < 2 { + None + } else { + Some((p[0], p[1])) + } + }) + .collect::>(); + DataValue::List(vec![ + DataValue::from(message_id.to_string()), + DataValue::from(config.node_id.to_string()), + DataValue::from(inbox_node_id.clone()), + DataValue::from( + headers + .get("Subject") + .map(|s| (*s).to_owned()) + .unwrap_or("Subject".to_owned()), + ), + DataValue::Json(JsonData(serde_json::to_value(headers).unwrap())), + DataValue::Bytes(msg.text().unwrap().to_vec()), + DataValue::from(msg.internal_date().unwrap().to_rfc3339()), + ]) + }) + .collect(), + ); + + state.db.run_script( + " + ?[node_id, account_node_id, mailbox_node_id, subject, headers, body, internal_date] <- $input_data + :put message { node_id, account_node_id, mailbox_node_id, subject, headers, body, internal_date } + ", + btmap! { + "input_data".to_owned() => input_data, + }, + ScriptMutability::Mutable, + )?; + + session.logout().await.into_diagnostic()?; + + Ok(()) +} diff --git a/crates/panorama-core/src/migrations.rs b/crates/panorama-core/src/migrations.rs index 33761a6..1566769 100644 --- a/crates/panorama-core/src/migrations.rs +++ b/crates/panorama-core/src/migrations.rs @@ -136,6 +136,9 @@ fn migration_01(db: &DbInstance) -> Result<()> { ['panorama/mail/config/imap_port', 'mail_config', 'imap_port', 'int', false], ['panorama/mail/config/imap_username', 'mail_config', 'imap_username', 'string', false], ['panorama/mail/config/imap_password', 'mail_config', 'imap_password', 'string', false], + ['panorama/mail/message/body', 'message', 'body', 'string', true], + ['panorama/mail/message/subject', 'message', 'subject', 'string', true], + ['panorama/mail/message/message_id', 'message', 'message_id', 'string', true], ] :put fqkey_to_dbkey { key, relation, field_name, type, is_fts_enabled } } @@ -143,14 +146,6 @@ fn migration_01(db: &DbInstance) -> Result<()> { # Create journal type { :create journal { node_id: String => content: String } } { :create journal_day { day: String => node_id: String } } - { - ::fts create journal:text_index { - extractor: content, - extract_filter: !is_null(content), - tokenizer: Simple, - filters: [Lowercase, Stemmer('english'), Stopwords('en')], - } - } # Mail { @@ -176,6 +171,7 @@ fn migration_01(db: &DbInstance) -> Result<()> { :create message { node_id: String => + message_id: String, account_node_id: String, mailbox_node_id: String, subject: String, @@ -184,6 +180,9 @@ fn migration_01(db: &DbInstance) -> Result<()> { internal_date: String, } } + { ::index create message:message_id { message_id } } + { ::index create message:date { internal_date } } + { ::index create message:by_mailbox_id { mailbox_node_id } } # Calendar ", diff --git a/crates/panorama-core/src/state/export.rs b/crates/panorama-core/src/state/export.rs index 5307692..fdd5607 100644 --- a/crates/panorama-core/src/state/export.rs +++ b/crates/panorama-core/src/state/export.rs @@ -6,6 +6,8 @@ use serde_json::Value; use crate::AppState; +use super::utils::data_value_to_json_value; + impl AppState { pub async fn export(&self) -> Result { let result = self.db.run_script( @@ -57,7 +59,8 @@ impl AppState { for row in result.rows.into_iter() { let mut object = hmap! {}; row.into_iter().enumerate().for_each(|(idx, col)| { - object.insert(columns[idx].to_owned(), col); + object + .insert(columns[idx].to_owned(), data_value_to_json_value(&col)); }); relation_info.push(object); } diff --git a/crates/panorama-core/src/state/mail.rs b/crates/panorama-core/src/state/mail.rs index 6a27ab8..ed4cf97 100644 --- a/crates/panorama-core/src/state/mail.rs +++ b/crates/panorama-core/src/state/mail.rs @@ -10,11 +10,11 @@ use crate::{AppState, NodeId}; #[derive(Debug, Serialize)] pub struct MailConfig { - node_id: NodeId, - imap_hostname: String, - imap_port: u16, - imap_username: String, - imap_password: String, + pub node_id: NodeId, + pub imap_hostname: String, + pub imap_port: u16, + pub imap_username: String, + pub imap_password: String, } impl AppState { @@ -44,156 +44,4 @@ impl AppState { Ok(result) } - - pub async fn mail_loop(&self) { - loop { - match self.mail_loop_inner().await { - Ok(_) => { - // For now, just sleep 30 seconds and then fetch again - // TODO: Run a bunch of connections at once and do IDLE over them (if possible) - sleep(Duration::from_secs(30)).await; - } - Err(err) => { - eprintln!("Fetch config error: {err:?}"); - // Back off, retry - // TODO: Exponential backoff - sleep(Duration::from_secs(5)).await; - continue; - } - } - } - } - - async fn mail_loop_inner(&self) -> Result<()> { - // Fetch the mail configs - let configs = self.fetch_mail_configs()?; - if configs.len() == 0 { - return Ok(()); - } - - // TODO: Do all configs instead of just the first - let config = &configs[0]; - - let stream = - TcpStream::connect((config.imap_hostname.as_str(), config.imap_port)) - .await - .into_diagnostic()?; - - let client = async_imap::Client::new(stream); - let mut session = client - .login(&config.imap_username, &config.imap_password) - .await - .map_err(|(err, _)| err) - .into_diagnostic()?; - - // println!("Session: {:?}", session); - let mailboxes = session - .list(None, Some("*")) - .await - .into_diagnostic()? - .try_collect::>() - .await - .into_diagnostic()?; - let mailbox_names = - mailboxes.iter().map(|name| name.name()).collect::>(); - println!("mailboxes: {mailbox_names:?}"); - - // Get the mailbox with INBOX - let inbox_node_id = { - let result = self.db.run_script(" - ?[node_id] := - *mailbox{node_id, account_node_id, mailbox_name}, - account_node_id = $account_node_id, - mailbox_name = 'INBOX' - ", btmap! {"account_node_id".to_owned()=>DataValue::from(config.node_id.to_string())}, ScriptMutability::Immutable)?; - - if result.rows.len() == 0 { - let new_node_id = Uuid::now_v7(); - let new_node_id = new_node_id.to_string(); - self.db.run_script(" - ?[node_id, account_node_id, mailbox_name] <- - [[$new_node_id, $account_node_id, 'INBOX']] - :put mailbox { node_id, account_node_id, mailbox_name } - ", - btmap! { - "new_node_id".to_owned() => DataValue::from(new_node_id.clone()), - "account_node_id".to_owned() => DataValue::from(config.node_id.to_string()), - }, - ScriptMutability::Mutable)?; - new_node_id - } else { - result.rows[0][0].get_str().unwrap().to_owned() - } - }; - println!("INBOX: {:?}", inbox_node_id); - - let inbox = session.select("INBOX").await.into_diagnostic()?; - println!("last unseen: {:?}", inbox.unseen); - - let messages = session - .fetch( - "1:4", - "(FLAGS ENVELOPE BODY[HEADER] BODY[TEXT] INTERNALDATE)", - ) - .await - .into_diagnostic()? - .try_collect::>() - .await - .into_diagnostic()?; - println!( - "messages {:?}", - messages.iter().map(|f| f.body()).collect::>() - ); - - let input_data = DataValue::List( - messages - .iter() - .map(|msg| { - let message_id = Uuid::now_v7(); - let headers = - String::from_utf8(msg.header().unwrap().to_vec()).unwrap(); - let headers = headers - .split("\r\n") - .filter_map(|s| { - let p = s.split(": ").collect::>(); - if p.len() < 2 { - None - } else { - Some((p[0], p[1])) - } - }) - .collect::>(); - DataValue::List(vec![ - DataValue::from(message_id.to_string()), - DataValue::from(config.node_id.to_string()), - DataValue::from(inbox_node_id.clone()), - DataValue::from( - headers - .get("Subject") - .map(|s| (*s).to_owned()) - .unwrap_or("Subject".to_owned()), - ), - DataValue::Json(JsonData(serde_json::to_value(headers).unwrap())), - DataValue::Bytes(msg.text().unwrap().to_vec()), - DataValue::from(msg.internal_date().unwrap().to_rfc3339()), - ]) - }) - .collect(), - ); - - self.db.run_script( - " - ?[node_id, account_node_id, mailbox_node_id, subject, headers, body, internal_date] <- $input_data - :put message { node_id, account_node_id, mailbox_node_id, subject, headers, body, internal_date } - ", - btmap! { - "input_data".to_owned() => input_data, - }, - ScriptMutability::Mutable, - )?; - - session.logout().await.into_diagnostic()?; - - Ok(()) - } } diff --git a/crates/panorama-core/src/state/mod.rs b/crates/panorama-core/src/state/mod.rs index 6fd763e..a117b4c 100644 --- a/crates/panorama-core/src/state/mod.rs +++ b/crates/panorama-core/src/state/mod.rs @@ -2,9 +2,11 @@ pub mod export; pub mod journal; pub mod mail; pub mod node; +pub mod utils; use std::{collections::HashMap, fs, path::Path}; +use bimap::BiMap; use cozo::DbInstance; use miette::{IntoDiagnostic, Result}; use tantivy::{ @@ -13,12 +15,12 @@ use tantivy::{ Index, }; -use crate::migrations::run_migrations; +use crate::{mail::mail_loop, migrations::run_migrations}; -pub fn tantivy_schema() -> (Schema, HashMap) { +pub fn tantivy_schema() -> (Schema, BiMap) { let mut schema_builder = Schema::builder(); - let mut field_map = HashMap::new(); + let mut field_map = BiMap::new(); let node_id = schema_builder.add_text_field("node_id", STRING | STORED); field_map.insert("node_id".to_owned(), node_id); @@ -33,7 +35,7 @@ pub fn tantivy_schema() -> (Schema, HashMap) { pub struct AppState { pub db: DbInstance, pub tantivy_index: Index, - pub tantivy_field_map: HashMap, + pub tantivy_field_map: BiMap, } impl AppState { @@ -75,7 +77,7 @@ impl AppState { run_migrations(&self.db).await?; let state = self.clone(); - tokio::spawn(async move { state.mail_loop().await }); + tokio::spawn(async move { mail_loop(state).await }); Ok(()) } diff --git a/crates/panorama-core/src/state/node.rs b/crates/panorama-core/src/state/node.rs index d931775..5067a75 100644 --- a/crates/panorama-core/src/state/node.rs +++ b/crates/panorama-core/src/state/node.rs @@ -18,6 +18,8 @@ use uuid::Uuid; use crate::{AppState, NodeId}; +use super::utils::{data_value_to_json_value, owned_value_to_json_value}; + pub type ExtraData = BTreeMap; #[derive(Debug)] @@ -25,7 +27,7 @@ pub struct NodeInfo { pub node_id: NodeId, pub created_at: DateTime, pub updated_at: DateTime, - pub fields: Option>, + pub fields: Option>, } pub struct FieldInfo { @@ -142,7 +144,7 @@ impl AppState { .map(|row| row.into_iter().skip(4).zip(all_fields.iter())) { for (value, (_, _, field_name)) in row { - fields.insert(field_name.to_string(), value); + fields.insert(field_name.to_string(), data_value_to_json_value(&value)); } } @@ -153,35 +155,81 @@ impl AppState { fields: Some(fields), }) } +} +pub enum CreateOrUpdate { + Create { r#type: String }, + Update { node_id: NodeId }, +} + +impl AppState { // TODO: Split this out into create and update pub async fn create_or_update_node( &self, - r#type: impl AsRef, + opts: CreateOrUpdate, extra_data: Option, ) -> Result { - let ty = r#type.as_ref(); - - let node_id = Uuid::now_v7(); + let node_id = match opts { + CreateOrUpdate::Create { .. } => NodeId(Uuid::now_v7()), + CreateOrUpdate::Update { ref node_id } => node_id.clone(), + }; let node_id = node_id.to_string(); let tx = self.db.multi_transaction(true); - let node_result = tx.run_script( - " + let (created_at, updated_at) = match opts { + CreateOrUpdate::Create { r#type } => { + let node_result = tx.run_script( + " ?[id, type] <- [[$node_id, $type]] :put node { id, type } :returning ", - btmap! { - "node_id".to_owned() => DataValue::from(node_id.clone()), - "type".to_owned() => DataValue::from(ty), - }, - )?; + btmap! { + "node_id".to_owned() => DataValue::from(node_id.clone()), + "type".to_owned() => DataValue::from(r#type), + }, + )?; + println!("ROWS(1): {:?}", node_result); + let created_at = DateTime::from_timestamp_millis( + (node_result.rows[0][4].get_float().unwrap() * 1000.0) as i64, + ) + .unwrap(); + let updated_at = DateTime::from_timestamp_millis( + (node_result.rows[0][5].get_float().unwrap() * 1000.0) as i64, + ) + .unwrap(); + (created_at, updated_at) + } + CreateOrUpdate::Update { .. } => { + let node_result = tx.run_script( + " + ?[id, type, created_at, updated_at] := *node { id, type, created_at, updated_at }, + id = $node_id + ", + btmap! { + "node_id".to_owned() => DataValue::from(node_id.clone()), + }, + )?; + println!("ROWS(2): {:?}", node_result); + let created_at = DateTime::from_timestamp_millis( + (node_result.rows[0][2].get_float().unwrap() * 1000.0) as i64, + ) + .unwrap(); + let updated_at = DateTime::from_timestamp_millis( + (node_result.rows[0][3].get_float().unwrap() * 1000.0) as i64, + ) + .unwrap(); + (created_at, updated_at) + } + }; if let Some(extra_data) = extra_data { - let node_id_field = - self.tantivy_field_map.get("node_id").unwrap().clone(); + let node_id_field = self + .tantivy_field_map + .get_by_left("node_id") + .unwrap() + .clone(); if !extra_data.is_empty() { let keys = extra_data.keys().map(|s| s.to_owned()).collect::>(); let field_mapping = @@ -215,7 +263,8 @@ impl AppState { }; if *is_fts_enabled { - if let Some(field) = self.tantivy_field_map.get(*key) { + if let Some(field) = self.tantivy_field_map.get_by_left(*key) + { doc.insert( field.clone(), OwnedValue::Str(new_value.get_str().unwrap().to_owned()), @@ -240,7 +289,7 @@ impl AppState { let query = format!( " ?[ node_id, {keys_joined} ] <- [$input_data] - :insert {relation} {{ node_id, {keys_joined} }} + :put {relation} {{ node_id, {keys_joined} }} " ); @@ -285,15 +334,6 @@ impl AppState { tx.commit()?; - let created_at = DateTime::from_timestamp_millis( - (node_result.rows[0][4].get_float().unwrap() * 1000.0) as i64, - ) - .unwrap(); - let updated_at = DateTime::from_timestamp_millis( - (node_result.rows[0][5].get_float().unwrap() * 1000.0) as i64, - ) - .unwrap(); - Ok(NodeInfo { node_id: NodeId(Uuid::from_str(&node_id).unwrap()), created_at, @@ -313,10 +353,14 @@ impl AppState { let reader = self.tantivy_index.reader().into_diagnostic()?; let searcher = reader.searcher(); - let node_id_field = self.tantivy_field_map.get("node_id").unwrap().clone(); + let node_id_field = self + .tantivy_field_map + .get_by_left("node_id") + .unwrap() + .clone(); let journal_page_field = self .tantivy_field_map - .get("panorama/journal/page/content") + .get_by_left("panorama/journal/page/content") .unwrap() .clone(); let query_parser = @@ -342,10 +386,19 @@ impl AppState { let node_id = NodeId(Uuid::from_str(node_id).unwrap()); let fields = all_fields .into_iter() - .map(|(field, value)| { + .map(|(field, values)| { ( - serde_json::to_string(&field).unwrap(), - serde_json::to_string(&value).unwrap(), + self.tantivy_field_map.get_by_right(&field).unwrap(), + if values.len() == 1 { + owned_value_to_json_value(values[0]) + } else { + Value::Array( + values + .into_iter() + .map(owned_value_to_json_value) + .collect_vec(), + ) + }, ) }) .collect::>(); diff --git a/crates/panorama-core/src/state/utils.rs b/crates/panorama-core/src/state/utils.rs new file mode 100644 index 0000000..959249a --- /dev/null +++ b/crates/panorama-core/src/state/utils.rs @@ -0,0 +1,57 @@ +use cozo::{DataValue, Num}; +use itertools::Itertools; +use serde_json::{Number, Value}; +use tantivy::schema::OwnedValue; + +pub fn owned_value_to_json_value(data_value: &OwnedValue) -> Value { + match data_value { + OwnedValue::Null => Value::Null, + OwnedValue::Str(s) => Value::String(s.to_string()), + OwnedValue::U64(u) => Value::Number(Number::from(*u)), + OwnedValue::I64(i) => Value::Number(Number::from(*i)), + OwnedValue::F64(f) => Value::Number(Number::from_f64(*f).unwrap()), + OwnedValue::Bool(b) => Value::Bool(*b), + OwnedValue::Array(a) => { + Value::Array(a.into_iter().map(owned_value_to_json_value).collect_vec()) + } + OwnedValue::Object(o) => Value::Object( + o.into_iter() + .map(|(k, v)| (k.to_owned(), owned_value_to_json_value(v))) + .collect(), + ), + _ => { + println!("Converting unknown {:?}", data_value); + serde_json::to_value(data_value).unwrap() + } // OwnedValue::Date(_) => todo!(), + // OwnedValue::Facet(_) => todo!(), + // OwnedValue::Bytes(_) => todo!(), + // OwnedValue::IpAddr(_) => todo!(), + // OwnedValue::PreTokStr(_) => todo!(), + } +} + +pub fn data_value_to_json_value(data_value: &DataValue) -> Value { + match data_value { + DataValue::Null => Value::Null, + DataValue::Bool(b) => Value::Bool(*b), + DataValue::Num(n) => Value::Number(match n { + Num::Int(i) => Number::from(*i), + Num::Float(f) => Number::from_f64(*f).unwrap(), + }), + DataValue::Str(s) => Value::String(s.to_string()), + DataValue::List(v) => { + Value::Array(v.into_iter().map(data_value_to_json_value).collect_vec()) + } + DataValue::Json(v) => v.0.clone(), + _ => { + println!("Converting unknown {:?}", data_value); + serde_json::to_value(data_value).unwrap() + } // DataValue::Bytes(s) => todo!(), + // DataValue::Uuid(_) => todo!(), + // DataValue::Regex(_) => todo!(), + // DataValue::Set(_) => todo!(), + // DataValue::Vec(_) => todo!(), + // DataValue::Validity(_) => todo!(), + // DataValue::Bot => todo!(), + } +} diff --git a/crates/panorama-core/src/tests/mod.rs b/crates/panorama-core/src/tests/mod.rs index fc02fdc..6ee166d 100644 --- a/crates/panorama-core/src/tests/mod.rs +++ b/crates/panorama-core/src/tests/mod.rs @@ -26,6 +26,7 @@ pub async fn test_create_node() -> Result<()> { let node_info = state .create_or_update_node( + None, "panorama/journal/page", Some(btmap! { "panorama/journal/page/content".to_owned() => json!("helloge"), diff --git a/crates/panorama-daemon/src/lib.rs b/crates/panorama-daemon/src/lib.rs new file mode 100644 index 0000000..0e067ba --- /dev/null +++ b/crates/panorama-daemon/src/lib.rs @@ -0,0 +1,72 @@ +#[macro_use] +extern crate anyhow; +#[macro_use] +extern crate serde; +#[macro_use] +extern crate serde_json; +#[macro_use] +extern crate sugars; + +mod error; +mod export; +mod journal; +pub mod mail; +mod node; + +use std::fs; + +use axum::{http::Method, routing::get, Router}; +use miette::{IntoDiagnostic, Result}; +use panorama_core::AppState; +use tokio::net::TcpListener; +use tower::ServiceBuilder; +use tower_http::cors::{self, CorsLayer}; +use utoipa::OpenApi; +use utoipa_scalar::{Scalar, Servable}; + +use crate::{ + export::export, + mail::{get_mail, get_mail_config}, + node::search_nodes, +}; + +pub async fn run() -> Result<()> { + #[derive(OpenApi)] + #[openapi( + modifiers(), + nest( + (path = "/journal", api = crate::journal::JournalApi), + (path = "/node", api = crate::node::NodeApi), + ), + )] + struct ApiDoc; + + let data_dir = dirs::data_dir().unwrap(); + let panorama_dir = data_dir.join("panorama"); + fs::create_dir_all(&panorama_dir).into_diagnostic()?; + + let state = AppState::new(&panorama_dir).await?; + + let cors = CorsLayer::new() + .allow_methods([Method::GET, Method::POST, Method::PUT]) + .allow_headers(cors::Any) + .allow_origin(cors::Any); + + // build our application with a single route + let app = Router::new() + .merge(Scalar::with_url("/api/docs", ApiDoc::openapi())) + .route("/", get(|| async { "Hello, World!" })) + .route("/export", get(export)) + .nest("/node", node::router().with_state(state.clone())) + .nest("/journal", journal::router().with_state(state.clone())) + .route("/mail/config", get(get_mail_config)) + .route("/mail", get(get_mail)) + .layer(ServiceBuilder::new().layer(cors)) + .with_state(state.clone()); + + let listener = TcpListener::bind("0.0.0.0:5195").await.into_diagnostic()?; + println!("Listening... {:?}", listener); + axum::serve(listener, app).await.into_diagnostic()?; + + Ok(()) +} diff --git a/crates/panorama-daemon/src/main.rs b/crates/panorama-daemon/src/main.rs index 3bbdb41..f2aadcf 100644 --- a/crates/panorama-daemon/src/main.rs +++ b/crates/panorama-daemon/src/main.rs @@ -1,73 +1,7 @@ -#[macro_use] -extern crate anyhow; -#[macro_use] -extern crate serde; -#[macro_use] -extern crate serde_json; -#[macro_use] -extern crate sugars; - -mod error; -mod export; -mod journal; -pub mod mail; -mod node; - -use std::fs; - -use axum::{http::Method, routing::get, Router}; -use miette::{IntoDiagnostic, Result}; -use panorama_core::AppState; -use tokio::net::TcpListener; -use tower::ServiceBuilder; -use tower_http::cors::{self, CorsLayer}; -use utoipa::OpenApi; -use utoipa_scalar::{Scalar, Servable}; - -use crate::{ - export::export, - mail::{get_mail, get_mail_config}, - node::search_nodes, -}; +use miette::Result; #[tokio::main] async fn main() -> Result<()> { - #[derive(OpenApi)] - #[openapi( - modifiers(), - nest( - (path = "/journal", api = crate::journal::JournalApi), - (path = "/node", api = crate::node::NodeApi), - ), - )] - struct ApiDoc; - - let data_dir = dirs::data_dir().unwrap(); - let panorama_dir = data_dir.join("panorama"); - fs::create_dir_all(&panorama_dir).into_diagnostic()?; - - let state = AppState::new(&panorama_dir).await?; - - let cors = CorsLayer::new() - .allow_methods([Method::GET, Method::POST, Method::PUT]) - .allow_headers(cors::Any) - .allow_origin(cors::Any); - - // build our application with a single route - let app = Router::new() - .merge(Scalar::with_url("/api/docs", ApiDoc::openapi())) - .route("/", get(|| async { "Hello, World!" })) - .route("/export", get(export)) - .nest("/node", node::router().with_state(state.clone())) - .nest("/journal", journal::router().with_state(state.clone())) - .route("/mail/config", get(get_mail_config)) - .route("/mail", get(get_mail)) - .layer(ServiceBuilder::new().layer(cors)) - .with_state(state.clone()); - - let listener = TcpListener::bind("0.0.0.0:5195").await.into_diagnostic()?; - println!("Listening... {:?}", listener); - axum::serve(listener, app).await.into_diagnostic()?; - + panorama_daemon::run().await?; Ok(()) } diff --git a/crates/panorama-daemon/src/node.rs b/crates/panorama-daemon/src/node.rs index d85b8ae..0347649 100644 --- a/crates/panorama-daemon/src/node.rs +++ b/crates/panorama-daemon/src/node.rs @@ -1,4 +1,7 @@ -use std::collections::{BTreeMap, HashMap}; +use std::{ + collections::{BTreeMap, HashMap}, + str::FromStr, +}; use axum::{ extract::{Path, Query, State}, @@ -8,9 +11,15 @@ use axum::{ }; use chrono::{DateTime, Utc}; use cozo::{DataValue, MultiTransaction}; -use panorama_core::state::node::ExtraData; +use itertools::Itertools; +use miette::IntoDiagnostic; +use panorama_core::{ + state::node::{CreateOrUpdate, ExtraData}, + NodeId, +}; use serde_json::Value; use utoipa::{OpenApi, ToSchema}; +use uuid::Uuid; use crate::{error::AppResult, AppState}; @@ -89,70 +98,16 @@ pub struct UpdateData { pub async fn update_node( State(state): State, Path(node_id): Path, - Json(update_data): Json, + Json(opts): Json, ) -> AppResult> { - let node_id_data = DataValue::from(node_id.clone()); + let node_id = NodeId(Uuid::from_str(&node_id).into_diagnostic()?); + let node_info = state + .create_or_update_node(CreateOrUpdate::Update { node_id }, opts.extra_data) + .await?; - // TODO: Combine these into the same script - - let tx = state.db.multi_transaction(true); - - if let Some(title) = update_data.title { - let title = DataValue::from(title); - - tx.run_script( - " - # Always update the time - ?[ id, title ] <- [[ $node_id, $title ]] - :update node { id, title } - ", - btmap! { - "node_id".to_owned() => node_id_data.clone(), - "title".to_owned() => title, - }, - )?; - } - - if let Some(extra_data) = update_data.extra_data { - let result = get_rows_for_extra_keys(&tx, &extra_data)?; - - for (key, (relation, field_name, ty)) in result.iter() { - let new_value = extra_data.get(key).unwrap(); - - // TODO: Make this more generic - let new_value = DataValue::from(new_value.as_str().unwrap()); - - let query = format!( - " - ?[ node_id, {field_name} ] <- [[$node_id, $input_data]] - :update {relation} {{ node_id, {field_name} }} - " - ); - - let result = tx.run_script( - &query, - btmap! { - "node_id".to_owned() => node_id_data.clone(), - "input_data".to_owned() => new_value, - }, - )?; - } - } - - tx.run_script( - " - # Always update the time - ?[ id, updated_at ] <- [[ $node_id, now() ]] - :update node { id, updated_at } - ", - btmap! { - "node_id".to_owned() => node_id_data, - }, - )?; - - tx.commit()?; - - Ok(Json(json!({}))) + Ok(Json(json!({ + "node_id": node_info.node_id.to_string(), + }))) } #[derive(Debug, Deserialize)] @@ -174,7 +129,10 @@ pub async fn create_node( Json(opts): Json, ) -> AppResult> { let node_info = state - .create_or_update_node(opts.ty, opts.extra_data) + .create_or_update_node( + CreateOrUpdate::Create { r#type: opts.ty }, + opts.extra_data, + ) .await?; Ok(Json(json!({ @@ -197,9 +155,13 @@ pub async fn search_nodes( Query(query): Query, ) -> AppResult> { let search_result = state.search_nodes(query.query).await?; + let search_result = search_result + .into_iter() + .map(|(id, value)| value["fields"].clone()) + .collect_vec(); Ok(Json(json!({ - "results": search_result + "results": search_result, }))) }