Compare commits

..

No commits in common. "2d424d763fdab99f8bd56af6e08f8ac5425d6e5e" and "e0b7ebedacb5874e93c467593414c71b1321d36d" have entirely different histories.

74 changed files with 1300 additions and 1244 deletions

1216
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

View file

@ -1,5 +1,5 @@
{
"name": "panorama",
"name": "panorama-app",
"version": "0.1.0",
"type": "module",
"scripts": {

View file

Before

Width:  |  Height:  |  Size: 2.5 KiB

After

Width:  |  Height:  |  Size: 2.5 KiB

View file

Before

Width:  |  Height:  |  Size: 1.5 KiB

After

Width:  |  Height:  |  Size: 1.5 KiB

View file

Before

Width:  |  Height:  |  Size: 5.9 KiB

After

Width:  |  Height:  |  Size: 5.9 KiB

View file

Before

Width:  |  Height:  |  Size: 17 KiB

After

Width:  |  Height:  |  Size: 17 KiB

View file

Before

Width:  |  Height:  |  Size: 1 KiB

After

Width:  |  Height:  |  Size: 1 KiB

View file

Before

Width:  |  Height:  |  Size: 4.5 KiB

After

Width:  |  Height:  |  Size: 4.5 KiB

View file

Before

Width:  |  Height:  |  Size: 6.8 KiB

After

Width:  |  Height:  |  Size: 6.8 KiB

View file

Before

Width:  |  Height:  |  Size: 6.9 KiB

After

Width:  |  Height:  |  Size: 6.9 KiB

View file

Before

Width:  |  Height:  |  Size: 20 KiB

After

Width:  |  Height:  |  Size: 20 KiB

View file

Before

Width:  |  Height:  |  Size: 965 B

After

Width:  |  Height:  |  Size: 965 B

View file

Before

Width:  |  Height:  |  Size: 22 KiB

After

Width:  |  Height:  |  Size: 22 KiB

View file

Before

Width:  |  Height:  |  Size: 1.4 KiB

After

Width:  |  Height:  |  Size: 1.4 KiB

View file

Before

Width:  |  Height:  |  Size: 2.6 KiB

After

Width:  |  Height:  |  Size: 2.6 KiB

View file

Before

Width:  |  Height:  |  Size: 3.4 KiB

After

Width:  |  Height:  |  Size: 3.4 KiB

View file

Before

Width:  |  Height:  |  Size: 1.6 KiB

After

Width:  |  Height:  |  Size: 1.6 KiB

View file

Before

Width:  |  Height:  |  Size: 24 KiB

After

Width:  |  Height:  |  Size: 24 KiB

View file

Before

Width:  |  Height:  |  Size: 50 KiB

After

Width:  |  Height:  |  Size: 50 KiB

View file

Before

Width:  |  Height:  |  Size: 4 KiB

After

Width:  |  Height:  |  Size: 4 KiB

View file

@ -1,18 +1,20 @@
import styles from "./SearchBar.module.scss";
import {
FloatingFocusManager,
FloatingOverlay,
FloatingPortal,
autoUpdate,
offset,
useClick,
useDismiss,
useFloating,
useFocus,
useInteractions,
} from "@floating-ui/react";
import { useCallback, useEffect, useState } from "react";
import { useDebounce } from "use-debounce";
import { useEffect, useState } from "react";
import { atom, useAtom, useSetAtom } from "jotai";
import { useNodeControls } from "../App";
import { useDebounce, useDebouncedCallback } from "use-debounce";
const searchQueryAtom = atom("");
const showMenuAtom = atom(false);
@ -35,7 +37,8 @@ export default function SearchBar() {
useDismiss(context),
]);
const performSearch = useCallback(() => {
useEffect(() => {
setSearchResults([]);
const trimmed = searchQuery.trim();
if (trimmed === "") return;
@ -60,11 +63,7 @@ export default function SearchBar() {
onFocus={() => setShowMenu(true)}
ref={refs.setReference}
value={searchQuery}
onChange={(evt) => {
setSearchQuery(evt.target.value);
if (evt.target.value) performSearch();
else setSearchResults([]);
}}
onChange={(evt) => setSearchQuery(evt.target.value)}
{...getReferenceProps()}
/>
</div>

View file

@ -7,15 +7,18 @@ import remarkMath from "remark-math";
import rehypeKatex from "rehype-katex";
import { parse as parseDate, format as formatDate } from "date-fns";
import { useDebounce } from "use-debounce";
import {
JOURNAL_PAGE_CONTENT_FIELD_NAME,
JOURNAL_PAGE_TITLE_FIELD_NAME,
NodeInfo,
} from "../../lib/data";
const JOURNAL_PAGE_CONTENT_FIELD_NAME = "panorama/journal/page/content";
const JOURNAL_PAGE_TITLE_FIELD_NAME = "panorama/journal/page/title";
export interface JournalPageProps {
id: string;
data: NodeInfo;
data: {
day?: string;
title?: string;
content: string;
fields: object;
};
}
export default function JournalPage({ id, data }: JournalPageProps) {

View file

@ -7,12 +7,12 @@ edition = "2021"
backoff = { version = "0.4.0", features = ["tokio"] }
bimap = "0.6.3"
chrono = { version = "0.4.38", features = ["serde"] }
cozo = { version = "0.7.6", features = ["storage-rocksdb"] }
futures = "0.3.30"
itertools = "0.13.0"
miette = { version = "5.5.0", features = ["fancy", "backtrace"] }
serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.117"
sqlx = { version = "0.7.4", features = ["runtime-tokio", "tls-rustls", "macros", "sqlite", "uuid", "chrono", "regexp"] }
sugars = "3.0.1"
tantivy = { version = "0.22.0", features = ["zstd"] }
tokio = { version = "1.38.0", features = ["full"] }

View file

@ -1,3 +0,0 @@
fn main() {
println!("cargo:rerun-if-changed=migrations");
}

View file

@ -1,6 +0,0 @@
CREATE TABLE "node" (
id TEXT PRIMARY KEY,
type TEXT,
updated_at DATETIME DEFAULT NOW(),
extra_data JSON
);

View file

@ -8,8 +8,7 @@ extern crate sugars;
pub mod migrations;
pub mod state;
// pub mod mail;
pub mod messaging;
pub mod mail;
#[cfg(test)]
mod tests;

View file

@ -1,57 +1,35 @@
use std::{
collections::{HashMap, HashSet},
time::Duration,
};
use std::{collections::HashMap, time::Duration};
use async_imap::Session;
use backoff::{exponential::ExponentialBackoff, SystemClock};
use cozo::{DataValue, JsonData, ScriptMutability};
use futures::TryStreamExt;
use itertools::Itertools;
use miette::{Context, IntoDiagnostic, Result};
use miette::{IntoDiagnostic, Result};
use tokio::{net::TcpStream, time::sleep};
use uuid::Uuid;
use crate::{mail, AppState};
use crate::AppState;
pub struct MailWorker {
state: AppState,
}
impl MailWorker {
pub fn new(state: AppState) -> MailWorker {
MailWorker { state }
}
pub async fn mail_loop(self) -> Result<()> {
loop {
let mut policy = ExponentialBackoff::<SystemClock>::default();
policy.current_interval = Duration::from_secs(5);
policy.initial_interval = Duration::from_secs(5);
backoff::future::retry(policy, || async {
match self.mail_loop_inner().await {
Ok(_) => {}
Err(err) => {
eprintln!("Mail error: {:?}", err);
Err(err)?;
}
}
pub async fn mail_loop(state: AppState) -> Result<()> {
backoff::future::retry(
ExponentialBackoff::<SystemClock>::default(),
|| async {
mail_loop_inner(&state).await?;
// For now, just sleep 30 seconds and then fetch again
// TODO: Run a bunch of connections at once and do IDLE over them (if possible)
sleep(Duration::from_secs(30)).await;
Ok(())
})
},
)
.await?;
}
Ok(())
}
}
async fn mail_loop_inner(&self) -> Result<()> {
async fn mail_loop_inner(state: &AppState) -> Result<()> {
// Fetch the mail configs
let configs = self.state.fetch_mail_configs()?;
if configs.is_empty() {
let configs = state.fetch_mail_configs()?;
if configs.len() == 0 {
return Ok(());
}
@ -70,31 +48,6 @@ impl MailWorker {
.map_err(|(err, _)| err)
.into_diagnostic()?;
let all_mailbox_ids = self
.fetch_and_store_all_mailboxes(config.node_id.to_string(), &mut session)
.await
.context("Could not fetch mailboxes")?;
self
.fetch_all_mail_from_single_mailbox(
&mut session,
&all_mailbox_ids,
config.node_id.to_string(),
"INBOX",
)
.await
.context("Could not fetch mail from INBOX")?;
session.logout().await.into_diagnostic()?;
Ok(())
}
async fn fetch_and_store_all_mailboxes(
&self,
config_node_id: String,
session: &mut Session<TcpStream>,
) -> Result<HashMap<String, String>> {
// println!("Session: {:?}", session);
let mailboxes = session
.list(None, Some("*"))
@ -103,189 +56,105 @@ impl MailWorker {
.try_collect::<Vec<_>>()
.await
.into_diagnostic()?;
let mailbox_names =
mailboxes.iter().map(|name| name.name()).collect::<Vec<_>>();
println!("mailboxes: {mailbox_names:?}");
let mut all_mailboxes = HashMap::new();
// TODO: Make this more efficient by using bulk in query
for mailbox in mailboxes {
let tx = self.state.db.multi_transaction(true);
let result = tx.run_script(
"
// Get the mailbox with INBOX
let inbox_node_id = {
let result = state.db.run_script("
?[node_id] :=
*mailbox{node_id, account_node_id, mailbox_name},
account_node_id = $account_node_id,
mailbox_name = $mailbox_name,
",
btmap! {
"account_node_id".to_owned()=>DataValue::from(config_node_id.clone()),
"mailbox_name".to_owned()=>DataValue::from(mailbox.name().to_string()),
},
)?;
mailbox_name = 'INBOX'
", btmap! {"account_node_id".to_owned()=>DataValue::from(config.node_id.to_string())}, ScriptMutability::Immutable)?;
let node_id = if result.rows.len() == 0 {
if result.rows.len() == 0 {
let new_node_id = Uuid::now_v7();
let new_node_id = new_node_id.to_string();
let extra_data = json!({
"name": mailbox.name(),
});
tx.run_script("
?[node_id, account_node_id, mailbox_name, extra_data] <-
[[$new_node_id, $account_node_id, $mailbox_name, $extra_data]]
:put mailbox { node_id, account_node_id, mailbox_name, extra_data }
state.db.run_script("
?[node_id, account_node_id, mailbox_name] <-
[[$new_node_id, $account_node_id, 'INBOX']]
:put mailbox { node_id, account_node_id, mailbox_name }
",
btmap! {
"new_node_id".to_owned() => DataValue::from(new_node_id.clone()),
"account_node_id".to_owned() => DataValue::from(config_node_id.clone()),
"mailbox_name".to_owned()=>DataValue::from(mailbox.name().to_string()),
"extra_data".to_owned()=>DataValue::Json(JsonData(extra_data)),
"account_node_id".to_owned() => DataValue::from(config.node_id.to_string()),
},
)?;
ScriptMutability::Mutable)?;
new_node_id
} else {
result.rows[0][0].get_str().unwrap().to_owned()
}
};
println!("INBOX: {:?}", inbox_node_id);
tx.commit()?;
all_mailboxes.insert(mailbox.name().to_owned(), node_id);
}
// println!("All mailboxes: {:?}", all_mailboxes);
Ok(all_mailboxes)
}
async fn fetch_all_mail_from_single_mailbox(
&self,
session: &mut Session<TcpStream>,
all_mailbox_ids: &HashMap<String, String>,
config_node_id: String,
mailbox_name: impl AsRef<str>,
) -> Result<()> {
let mailbox_name = mailbox_name.as_ref();
let mailbox = session.select(mailbox_name).await.into_diagnostic()?;
let mailbox_node_id = all_mailbox_ids.get(mailbox_name).unwrap();
let extra_data = json!({
"uid_validity": mailbox.uid_validity,
"last_seen": mailbox.unseen,
});
// TODO: Validate uid validity here
let all_uids = session
.uid_search("ALL")
.await
.into_diagnostic()
.context("Could not fetch all UIDs")?;
println!("All UIDs ({}): {:?}", all_uids.len(), all_uids);
let inbox = session.select("INBOX").await.into_diagnostic()?;
println!("last unseen: {:?}", inbox.unseen);
let messages = session
.uid_fetch(
all_uids.iter().join(","),
.fetch(
"1:4",
"(FLAGS ENVELOPE BODY[HEADER] BODY[TEXT] INTERNALDATE)",
)
.await
.into_diagnostic()?
.try_collect::<Vec<_>>()
.await
.into_diagnostic()
.context("Could not fetch messages")?;
.into_diagnostic()?;
println!(
"messages {:?}",
messages.iter().map(|f| f.body()).collect::<Vec<_>>()
);
let mut unique_message_ids = HashSet::new();
let data: Vec<_> = messages
let input_data = DataValue::List(
messages
.iter()
.map(|msg| {
let message_node_id = Uuid::now_v7();
let message_id = Uuid::now_v7();
let headers =
String::from_utf8(msg.header().unwrap().to_vec()).unwrap();
let headers = headers
.split("\r\n")
.filter_map(|s| {
// This is really bad lmao
let p = s.split(": ").collect::<Vec<_>>();
if p.len() < 2 {
None
} else {
Some((p[0], p[1..].join(": ")))
Some((p[0], p[1]))
}
})
.collect::<HashMap<_, _>>();
let message_id = headers
.get("Message-ID")
.map(|s| (*s).to_owned())
.unwrap_or(message_node_id.to_string());
unique_message_ids.insert(message_id.clone());
DataValue::List(vec![
DataValue::from(message_node_id.to_string()),
DataValue::from(config_node_id.to_string()),
DataValue::from(mailbox_node_id.clone()),
DataValue::from(message_id.to_string()),
DataValue::from(config.node_id.to_string()),
DataValue::from(inbox_node_id.clone()),
DataValue::from(
headers
.get("Subject")
.map(|s| (*s).to_owned())
.unwrap_or("Subject".to_owned()),
),
DataValue::Json(JsonData(serde_json::to_value(&headers).unwrap())),
DataValue::Json(JsonData(serde_json::to_value(headers).unwrap())),
DataValue::Bytes(msg.text().unwrap().to_vec()),
DataValue::from(msg.internal_date().unwrap().to_rfc3339()),
DataValue::from(message_id),
])
})
.collect();
println!("Adding {} messages to database...", data.len());
let input_data = DataValue::List(data);
// TODO: Can this be one query?
let tx = self.state.db.multi_transaction(true);
let unique_message_ids_data_value = DataValue::List(
unique_message_ids
.into_iter()
.map(|s| DataValue::from(s))
.collect_vec(),
.collect(),
);
let existing_ids = tx.run_script(
state.db.run_script(
"
?[node_id] := *message { node_id, message_id },
is_in(message_id, $message_ids)
",
btmap! { "message_ids".to_owned() => unique_message_ids_data_value },
)?;
println!("Existing ids: {:?}", existing_ids);
self
.state
.db
.run_script(
"
?[
node_id, account_node_id, mailbox_node_id, subject, headers, body,
internal_date, message_id
] <- $input_data
:put message {
node_id, account_node_id, mailbox_node_id, subject, headers, body,
internal_date, message_id
}
?[node_id, account_node_id, mailbox_node_id, subject, headers, body, internal_date] <- $input_data
:put message { node_id, account_node_id, mailbox_node_id, subject, headers, body, internal_date }
",
btmap! {
"input_data".to_owned() => input_data,
},
ScriptMutability::Mutable,
)
.context("Could not add message to database")?;
)?;
session.logout().await.into_diagnostic()?;
Ok(())
}
}

View file

@ -1,4 +0,0 @@
//! Panorama uses an internal messaging system to pass content around
//!
//! This implementation is dead simple, just passes all messages and filters on the other end
pub struct Messaging {}

View file

@ -1,200 +1,196 @@
use cozo::DbInstance;
use miette::{IntoDiagnostic, Result};
use sqlx::migrate::Migrator;
use serde_json::Value;
use crate::ensure_ok;
pub static MIGRATOR: Migrator = sqlx::migrate!();
pub async fn run_migrations(db: &DbInstance) -> Result<()> {
let migration_status = check_migration_status(db).await?;
println!("migration status: {:?}", migration_status);
// pub async fn run_migrations(db: &DbInstance) -> Result<()> {
// let migration_status = check_migration_status(db).await?;
// println!("migration status: {:?}", migration_status);
let migrations: Vec<Box<dyn for<'a> Fn(&'a DbInstance) -> Result<()>>> =
vec![Box::new(no_op), Box::new(migration_01)];
// let migrations: Vec<Box<dyn for<'a> Fn(&'a DbInstance) -> Result<()>>> =
// vec![Box::new(no_op), Box::new(migration_01)];
if let MigrationStatus::NoMigrations = migration_status {
let result = db.run_script_str(
"
{ :create migrations { yeah: Int default 0 => version: Int default 0 } }
{
?[yeah, version] <- [[0, 0]]
:put migrations { yeah, version }
}
",
"",
false,
);
ensure_ok(&result)?;
}
// if let MigrationStatus::NoMigrations = migration_status {
// let result = db.run_script_str(
// "
// { :create migrations { yeah: Int default 0 => version: Int default 0 } }
// {
// ?[yeah, version] <- [[0, 0]]
// :put migrations { yeah, version }
// }
// ",
// "",
// false,
// );
// ensure_ok(&result)?;
// }
let start_at_migration = match migration_status {
MigrationStatus::NoMigrations => 0,
MigrationStatus::HasVersion(n) => n,
};
let migrations_to_run = migrations
.iter()
.enumerate()
.skip(start_at_migration as usize + 1);
// println!("running {} migrations...", migrations_to_run.len());
// let start_at_migration = match migration_status {
// MigrationStatus::NoMigrations => 0,
// MigrationStatus::HasVersion(n) => n,
// };
// let migrations_to_run = migrations
// .iter()
// .enumerate()
// .skip(start_at_migration as usize + 1);
// // println!("running {} migrations...", migrations_to_run.len());
//TODO: This should all be done in a transaction
for (idx, migration) in migrations_to_run {
println!("running migration {idx}...");
// //TODO: This should all be done in a transaction
// for (idx, migration) in migrations_to_run {
// println!("running migration {idx}...");
migration(db)?;
// migration(db)?;
let result = db.run_script_str(
"
?[yeah, version] <- [[0, $version]]
:put migrations { yeah => version }
",
&format!("{{\"version\":{}}}", idx),
false,
);
// let result = db.run_script_str(
// "
// ?[yeah, version] <- [[0, $version]]
// :put migrations { yeah => version }
// ",
// &format!("{{\"version\":{}}}", idx),
// false,
// );
ensure_ok(&result)?;
// ensure_ok(&result)?;
println!("succeeded migration {idx}!");
}
// println!("succeeded migration {idx}!");
// }
Ok(())
}
// Ok(())
// }
#[derive(Debug)]
enum MigrationStatus {
NoMigrations,
HasVersion(u64),
}
// #[derive(Debug)]
// enum MigrationStatus {
// NoMigrations,
// HasVersion(u64),
// }
async fn check_migration_status(db: &DbInstance) -> Result<MigrationStatus> {
let status = db.run_script_str(
"
?[yeah, version] := *migrations[yeah, version]
",
"",
true,
);
println!("Status: {}", status);
// async fn check_migration_status(db: &DbInstance) -> Result<MigrationStatus> {
// let status = db.run_script_str(
// "
// ?[yeah, version] := *migrations[yeah, version]
// ",
// "",
// true,
// );
// println!("Status: {}", status);
let status: Value = serde_json::from_str(&status).into_diagnostic()?;
let status = status.as_object().unwrap();
let ok = status.get("ok").unwrap().as_bool().unwrap_or(false);
if !ok {
let status_code = status.get("code").unwrap().as_str().unwrap();
if status_code == "query::relation_not_found" {
return Ok(MigrationStatus::NoMigrations);
}
}
// let status: Value = serde_json::from_str(&status).into_diagnostic()?;
// let status = status.as_object().unwrap();
// let ok = status.get("ok").unwrap().as_bool().unwrap_or(false);
// if !ok {
// let status_code = status.get("code").unwrap().as_str().unwrap();
// if status_code == "query::relation_not_found" {
// return Ok(MigrationStatus::NoMigrations);
// }
// }
let rows = status.get("rows").unwrap().as_array().unwrap();
let row = rows[0].as_array().unwrap();
let version = row[1].as_number().unwrap().as_u64().unwrap();
println!("row: {row:?}");
// let rows = status.get("rows").unwrap().as_array().unwrap();
// let row = rows[0].as_array().unwrap();
// let version = row[1].as_number().unwrap().as_u64().unwrap();
// println!("row: {row:?}");
Ok(MigrationStatus::HasVersion(version))
}
// Ok(MigrationStatus::HasVersion(version))
// }
fn no_op(_: &DbInstance) -> Result<()> {
Ok(())
}
// fn no_op(_: &DbInstance) -> Result<()> {
// Ok(())
// }
fn migration_01(db: &DbInstance) -> Result<()> {
let result = db.run_script_str(
"
# Primary node type
{
:create node {
id: String
=>
type: String,
created_at: Float default now(),
updated_at: Float default now(),
extra_data: Json default {},
}
}
// fn migration_01(db: &DbInstance) -> Result<()> {
// let result = db.run_script_str(
// "
// # Primary node type
// {
// :create node {
// id: String
// =>
// type: String,
// created_at: Float default now(),
// updated_at: Float default now(),
// extra_data: Json default {},
// }
// }
# Inverse mappings for easy querying
{ :create node_has_key { key: String => id: String } }
{ ::index create node_has_key:inverse { id } }
{ :create node_managed_by_app { node_id: String => app: String } }
{ :create node_refers_to { node_id: String => other_node_id: String } }
{
:create fqkey_to_dbkey {
key: String
=>
relation: String,
field_name: String,
type: String,
is_fts_enabled: Bool,
}
}
{
?[key, relation, field_name, type, is_fts_enabled] <- [
['panorama/journal/page/day', 'journal_day', 'day', 'string', false],
['panorama/journal/page/title', 'journal', 'title', 'string', true],
['panorama/journal/page/content', 'journal', 'content', 'string', true],
['panorama/mail/config/imap_hostname', 'mail_config', 'imap_hostname', 'string', false],
['panorama/mail/config/imap_port', 'mail_config', 'imap_port', 'int', false],
['panorama/mail/config/imap_username', 'mail_config', 'imap_username', 'string', false],
['panorama/mail/config/imap_password', 'mail_config', 'imap_password', 'string', false],
['panorama/mail/message/body', 'message', 'body', 'string', true],
['panorama/mail/message/subject', 'message', 'subject', 'string', true],
['panorama/mail/message/message_id', 'message', 'message_id', 'string', false],
]
:put fqkey_to_dbkey { key, relation, field_name, type, is_fts_enabled }
}
// # Inverse mappings for easy querying
// { :create node_has_key { key: String => id: String } }
// { ::index create node_has_key:inverse { id } }
// { :create node_managed_by_app { node_id: String => app: String } }
// { :create node_refers_to { node_id: String => other_node_id: String } }
// {
// :create fqkey_to_dbkey {
// key: String
// =>
// relation: String,
// field_name: String,
// type: String,
// is_fts_enabled: Bool,
// }
// }
// {
// ?[key, relation, field_name, type, is_fts_enabled] <- [
// ['panorama/journal/page/day', 'journal_day', 'day', 'string', false],
// ['panorama/journal/page/title', 'journal', 'title', 'string', true],
// ['panorama/journal/page/content', 'journal', 'content', 'string', true],
// ['panorama/mail/config/imap_hostname', 'mail_config', 'imap_hostname', 'string', false],
// ['panorama/mail/config/imap_port', 'mail_config', 'imap_port', 'int', false],
// ['panorama/mail/config/imap_username', 'mail_config', 'imap_username', 'string', false],
// ['panorama/mail/config/imap_password', 'mail_config', 'imap_password', 'string', false],
// ['panorama/mail/message/body', 'message', 'body', 'string', true],
// ['panorama/mail/message/subject', 'message', 'subject', 'string', true],
// ['panorama/mail/message/message_id', 'message', 'message_id', 'string', false],
// ]
// :put fqkey_to_dbkey { key, relation, field_name, type, is_fts_enabled }
// }
# Create journal type
{ :create journal { node_id: String => title: String default '', content: String } }
{ :create journal_day { day: String => node_id: String } }
// # Create journal type
// { :create journal { node_id: String => title: String default '', content: String } }
// { :create journal_day { day: String => node_id: String } }
# Mail
{
:create mail_config {
node_id: String
=>
imap_hostname: String,
imap_port: Int,
imap_username: String,
imap_password: String,
}
}
{
:create mailbox {
node_id: String
=>
account_node_id: String,
mailbox_name: String,
}
}
{ ::index create mailbox:by_account_id_and_name { account_node_id, mailbox_name } }
{
:create message {
node_id: String
=>
message_id: String,
account_node_id: String,
mailbox_node_id: String,
subject: String,
headers: Json?,
body: Bytes,
internal_date: String,
}
}
{ ::index create message:message_id { message_id } }
{ ::index create message:date { internal_date } }
{ ::index create message:by_mailbox_id { mailbox_node_id } }
// # Mail
// {
// :create mail_config {
// node_id: String
// =>
// imap_hostname: String,
// imap_port: Int,
// imap_username: String,
// imap_password: String,
// }
// }
// {
// :create mailbox {
// node_id: String
// =>
// account_node_id: String,
// mailbox_name: String,
// uid_validity: Int? default null,
// extra_data: Json default {},
// }
// }
// { ::index create mailbox:by_account_id_and_name { account_node_id, mailbox_name } }
// {
// :create message {
// node_id: String
// =>
// message_id: String,
// account_node_id: String,
// mailbox_node_id: String,
// subject: String,
// headers: Json?,
// body: Bytes,
// internal_date: String,
// }
// }
// { ::index create message:message_id { message_id } }
// { ::index create message:date { internal_date } }
// { ::index create message:by_mailbox_id { mailbox_node_id } }
# Calendar
",
"",
false,
);
ensure_ok(&result)?;
// # Calendar
// ",
// "",
// false,
// );
// ensure_ok(&result)?;
// Ok(())
// }
Ok(())
}

View file

@ -1,3 +0,0 @@
use crate::AppState;
impl AppState {}

View file

@ -47,10 +47,6 @@ impl AppState {
let mut all_relations = hmap! {};
for relation_name in relation_names.iter() {
if relation_name.contains(":") {
continue;
}
let mut relation_info = vec![];
let columns = relation_columns.get(relation_name.as_str()).unwrap();

View file

@ -1,7 +1,7 @@
use std::str::FromStr;
use chrono::Local;
// use cozo::ScriptMutability;
use cozo::ScriptMutability;
use miette::{IntoDiagnostic, Result};
use uuid::Uuid;

View file

@ -1,28 +1,21 @@
// pub mod codetrack;
// pub mod export;
// pub mod journal;
// pub mod mail;
// pub mod node;
// pub mod utils;
pub mod export;
pub mod journal;
pub mod mail;
pub mod node;
pub mod utils;
use std::{collections::HashMap, fs, path::Path};
use bimap::BiMap;
use miette::{Context, IntoDiagnostic, Result};
use sqlx::{
sqlite::{SqliteConnectOptions, SqliteJournalMode, SqlitePoolOptions},
SqlitePool,
};
use cozo::DbInstance;
use miette::{IntoDiagnostic, Result};
use tantivy::{
directory::MmapDirectory,
schema::{Field, Schema, STORED, STRING, TEXT},
Index,
};
use crate::{
// mail::MailWorker,
migrations::{self, MIGRATOR},
};
use crate::{mail::mail_loop, migrations::run_migrations};
pub fn tantivy_schema() -> (Schema, BiMap<String, Field>) {
let mut schema_builder = Schema::builder();
@ -40,7 +33,7 @@ pub fn tantivy_schema() -> (Schema, BiMap<String, Field>) {
#[derive(Clone)]
pub struct AppState {
pub db: SqlitePool,
pub db: DbInstance,
pub tantivy_index: Index,
pub tantivy_field_map: BiMap<String, Field>,
}
@ -48,10 +41,6 @@ pub struct AppState {
impl AppState {
pub async fn new(panorama_dir: impl AsRef<Path>) -> Result<Self> {
let panorama_dir = panorama_dir.as_ref().to_path_buf();
fs::create_dir_all(&panorama_dir)
.into_diagnostic()
.context("Could not create panorama directory")?;
println!("Panorama dir: {}", panorama_dir.display());
let (tantivy_index, tantivy_field_map) = {
@ -67,14 +56,12 @@ impl AppState {
};
let db_path = panorama_dir.join("db.sqlite");
let sqlite_connect_options = SqliteConnectOptions::new()
.filename(db_path)
.journal_mode(SqliteJournalMode::Wal);
let db = SqlitePoolOptions::new()
.connect_with(sqlite_connect_options)
.await
.into_diagnostic()
.context("Could not connect to SQLite database")?;
let db = DbInstance::new(
"sqlite",
db_path.display().to_string(),
Default::default(),
)
.unwrap();
let state = AppState {
db,
@ -87,16 +74,10 @@ impl AppState {
}
async fn init(&self) -> Result<()> {
// run_migrations(&self.db).await?;
MIGRATOR
.run(&self.db)
.await
.into_diagnostic()
.context("Could not migrate database")?;
run_migrations(&self.db).await?;
// let state = self.clone();
// let mail_worker = MailWorker::new(state);
// tokio::spawn(mail_worker.mail_loop());
let state = self.clone();
tokio::spawn(async move { mail_loop(state).await });
Ok(())
}

View file

@ -4,6 +4,7 @@ use std::{
};
use chrono::{DateTime, Utc};
use cozo::{DataValue, MultiTransaction, NamedRows};
use itertools::Itertools;
use miette::{bail, IntoDiagnostic, Result};
use serde_json::Value;
@ -17,7 +18,7 @@ use uuid::Uuid;
use crate::{AppState, NodeId};
use super::utils::owned_value_to_json_value;
use super::utils::{data_value_to_json_value, owned_value_to_json_value};
pub type ExtraData = BTreeMap<String, Value>;
@ -376,9 +377,8 @@ impl AppState {
.get_by_left("panorama/journal/page/content")
.unwrap()
.clone();
let mut query_parser =
let query_parser =
QueryParser::for_index(&self.tantivy_index, vec![journal_page_field]);
query_parser.set_field_fuzzy(journal_page_field, true, 2, true);
let query = query_parser.parse_query(query).into_diagnostic()?;
let top_docs = searcher

View file

@ -1,3 +1,4 @@
use cozo::{DataValue, Num};
use itertools::Itertools;
use serde_json::{Number, Value};
use tantivy::schema::OwnedValue;
@ -29,31 +30,28 @@ pub fn owned_value_to_json_value(data_value: &OwnedValue) -> Value {
}
}
// pub fn data_value_to_json_value(data_value: &DataValue) -> Value {
// match data_value {
// DataValue::Null => Value::Null,
// DataValue::Bool(b) => Value::Bool(*b),
// DataValue::Num(n) => Value::Number(match n {
// Num::Int(i) => Number::from(*i),
// Num::Float(f) => Number::from_f64(*f).unwrap(),
// }),
// DataValue::Str(s) => Value::String(s.to_string()),
// DataValue::List(v) => {
// Value::Array(v.into_iter().map(data_value_to_json_value).collect_vec())
// }
// DataValue::Json(v) => v.0.clone(),
// DataValue::Bytes(s) => {
// Value::String(String::from_utf8_lossy(s).to_string())
// }
// _ => {
// println!("Converting unknown {:?}", data_value);
// serde_json::to_value(data_value).unwrap()
// } // DataValue::Bytes(s) => todo!(),
// // DataValue::Uuid(_) => todo!(),
// // DataValue::Regex(_) => todo!(),
// // DataValue::Set(_) => todo!(),
// // DataValue::Vec(_) => todo!(),
// // DataValue::Validity(_) => todo!(),
// // DataValue::Bot => todo!(),
// }
// }
pub fn data_value_to_json_value(data_value: &DataValue) -> Value {
match data_value {
DataValue::Null => Value::Null,
DataValue::Bool(b) => Value::Bool(*b),
DataValue::Num(n) => Value::Number(match n {
Num::Int(i) => Number::from(*i),
Num::Float(f) => Number::from_f64(*f).unwrap(),
}),
DataValue::Str(s) => Value::String(s.to_string()),
DataValue::List(v) => {
Value::Array(v.into_iter().map(data_value_to_json_value).collect_vec())
}
DataValue::Json(v) => v.0.clone(),
_ => {
println!("Converting unknown {:?}", data_value);
serde_json::to_value(data_value).unwrap()
} // DataValue::Bytes(s) => todo!(),
// DataValue::Uuid(_) => todo!(),
// DataValue::Regex(_) => todo!(),
// DataValue::Set(_) => todo!(),
// DataValue::Vec(_) => todo!(),
// DataValue::Validity(_) => todo!(),
// DataValue::Bot => todo!(),
}
}

View file

@ -9,7 +9,7 @@ edition = "2021"
anyhow = "1.0.86"
axum = "0.7.5"
chrono = { version = "0.4.38", features = ["serde"] }
# cozo = { version = "0.7.6", features = ["storage-rocksdb"] }
cozo = { version = "0.7.6", features = ["storage-rocksdb"] }
csv = "1.3.0"
dirs = "5.0.1"
futures = "0.3.30"

View file

@ -0,0 +1,25 @@
use std::{
fs::{self, File},
path::PathBuf,
};
use axum::{extract::State, Json};
use miette::IntoDiagnostic;
use serde_json::Value;
use crate::{error::AppResult, AppState};
// This code is really bad but gives me a quick way to look at all of the data
// in the data at once. Rip this out once there's any Real Security Mechanism.
pub async fn export(State(state): State<AppState>) -> AppResult<Json<Value>> {
let export = state.export().await?;
let base_dir = PathBuf::from("export");
fs::create_dir_all(&base_dir).into_diagnostic()?;
let file = File::create(base_dir.join("export.json")).into_diagnostic()?;
serde_json::to_writer_pretty(file, &export).into_diagnostic()?;
Ok(Json(export))
}

View file

@ -1,5 +1,6 @@
use axum::{extract::State, routing::get, Json, Router};
use chrono::Local;
use cozo::ScriptMutability;
use serde_json::Value;
use utoipa::OpenApi;
use uuid::Uuid;
@ -8,27 +9,26 @@ use crate::{error::AppResult, AppState};
/// Node API
#[derive(OpenApi)]
#[openapi(paths(), components(schemas()))]
#[openapi(paths(get_todays_journal_id), components(schemas()))]
pub(super) struct JournalApi;
pub(super) fn router() -> Router<AppState> {
Router::new()
// .route("/get_todays_journal_id", get(get_todays_journal_id))
Router::new().route("/get_todays_journal_id", get(get_todays_journal_id))
}
// #[utoipa::path(
// get,
// path = "/get_todays_journal_id",
// responses(
// (status = 200),
// ),
// )]
// pub async fn get_todays_journal_id(
// State(state): State<AppState>,
// ) -> AppResult<Json<Value>> {
// let node_id = state.get_todays_journal_id().await?;
#[utoipa::path(
get,
path = "/get_todays_journal_id",
responses(
(status = 200),
),
)]
pub async fn get_todays_journal_id(
State(state): State<AppState>,
) -> AppResult<Json<Value>> {
let node_id = state.get_todays_journal_id().await?;
// Ok(Json(json!({
// "node_id": node_id.to_string(),
// })))
// }
Ok(Json(json!({
"node_id": node_id.to_string(),
})))
}

View file

@ -8,6 +8,7 @@ extern crate serde_json;
extern crate sugars;
mod error;
mod export;
mod journal;
pub mod mail;
mod node;
@ -26,6 +27,12 @@ use tower_http::{
use utoipa::OpenApi;
use utoipa_scalar::{Scalar, Servable};
use crate::{
export::export,
mail::{get_mail, get_mail_config},
node::search_nodes,
};
pub async fn run() -> Result<()> {
#[derive(OpenApi)]
#[openapi(
@ -54,10 +61,11 @@ pub async fn run() -> Result<()> {
let app = Router::new()
.merge(Scalar::with_url("/api/docs", ApiDoc::openapi()))
.route("/", get(|| async { "Hello, World!" }))
.route("/export", get(export))
.nest("/node", node::router().with_state(state.clone()))
.nest("/journal", journal::router().with_state(state.clone()))
// .route("/mail/config", get(get_mail_config))
// .route("/mail", get(get_mail))
.route("/mail/config", get(get_mail_config))
.route("/mail", get(get_mail))
.layer(ServiceBuilder::new().layer(cors_layer))
.layer(ServiceBuilder::new().layer(trace_layer))
.with_state(state.clone());

View file

@ -1,53 +1,54 @@
use axum::{extract::State, Json};
use cozo::ScriptMutability;
use panorama_core::AppState;
use serde_json::Value;
use crate::error::AppResult;
// pub async fn get_mail_config(
// State(state): State<AppState>,
// ) -> AppResult<Json<Value>> {
// let configs = state.fetch_mail_configs()?;
// Ok(Json(json!({ "configs": configs })))
// }
pub async fn get_mail_config(
State(state): State<AppState>,
) -> AppResult<Json<Value>> {
let configs = state.fetch_mail_configs()?;
Ok(Json(json!({ "configs": configs })))
}
// pub async fn get_mail(State(state): State<AppState>) -> AppResult<Json<Value>> {
// let mailboxes = state.db.run_script("
// ?[node_id, account_node_id, mailbox_name] := *mailbox {node_id, account_node_id, mailbox_name}
// ", Default::default(), ScriptMutability::Immutable)?;
pub async fn get_mail(State(state): State<AppState>) -> AppResult<Json<Value>> {
let mailboxes = state.db.run_script("
?[node_id, account_node_id, mailbox_name] := *mailbox {node_id, account_node_id, mailbox_name}
", Default::default(), ScriptMutability::Immutable)?;
// let mailboxes = mailboxes
// .rows
// .iter()
// .map(|mb| {
// json!({
// "node_id": mb[0].get_str().unwrap(),
// "account_node_id": mb[1].get_str().unwrap(),
// "mailbox_name": mb[2].get_str().unwrap(),
// })
// })
// .collect::<Vec<_>>();
let mailboxes = mailboxes
.rows
.iter()
.map(|mb| {
json!({
"node_id": mb[0].get_str().unwrap(),
"account_node_id": mb[1].get_str().unwrap(),
"mailbox_name": mb[2].get_str().unwrap(),
})
})
.collect::<Vec<_>>();
// let messages = state.db.run_script("
// ?[node_id, subject, body, internal_date] := *message {node_id, subject, body, internal_date}
// :limit 10
// ", Default::default(), ScriptMutability::Immutable)?;
let messages = state.db.run_script("
?[node_id, subject, body, internal_date] := *message {node_id, subject, body, internal_date}
:limit 10
", Default::default(), ScriptMutability::Immutable)?;
// let messages = messages
// .rows
// .iter()
// .map(|m| {
// json!({
// "node_id": m[0].get_str().unwrap(),
// "subject": m[1].get_str().unwrap(),
// "body": m[2].get_str(),
// "internal_date": m[3].get_str().unwrap(),
// })
// })
// .collect::<Vec<_>>();
let messages = messages
.rows
.iter()
.map(|m| {
json!({
"node_id": m[0].get_str().unwrap(),
"subject": m[1].get_str().unwrap(),
"body": m[2].get_str(),
"internal_date": m[3].get_str().unwrap(),
})
})
.collect::<Vec<_>>();
// Ok(Json(json!({
// "mailboxes": mailboxes,
// "messages": messages,
// })))
// }
Ok(Json(json!({
"mailboxes": mailboxes,
"messages": messages,
})))
}

View file

@ -10,10 +10,11 @@ use axum::{
Json, Router,
};
use chrono::{DateTime, Utc};
use cozo::{DataValue, MultiTransaction};
use itertools::Itertools;
use miette::IntoDiagnostic;
use panorama_core::{
// state::node::{CreateOrUpdate, ExtraData},
state::node::{CreateOrUpdate, ExtraData},
NodeId,
};
use serde_json::Value;
@ -24,169 +25,172 @@ use crate::{error::AppResult, AppState};
/// Node API
#[derive(OpenApi)]
#[openapi(paths(), components(schemas()))]
#[openapi(
paths(get_node, update_node, create_node),
components(schemas(GetNodeResult))
)]
pub(super) struct NodeApi;
pub(super) fn router() -> Router<AppState> {
Router::new()
// .route("/", put(create_node))
// .route("/:id", get(get_node))
// .route("/:id", post(update_node))
// .route("/search", get(search_nodes))
.route("/", put(create_node))
.route("/:id", get(get_node))
.route("/:id", post(update_node))
.route("/search", get(search_nodes))
}
// #[derive(Serialize, Deserialize, ToSchema, Clone)]
// struct GetNodeResult {
// node_id: String,
// fields: HashMap<String, Value>,
// created_at: DateTime<Utc>,
// updated_at: DateTime<Utc>,
// }
#[derive(Serialize, Deserialize, ToSchema, Clone)]
struct GetNodeResult {
node_id: String,
fields: HashMap<String, Value>,
created_at: DateTime<Utc>,
updated_at: DateTime<Utc>,
}
// /// Get node info
// ///
// /// This endpoint retrieves all the fields for a particular node
// #[utoipa::path(
// get,
// path = "/{id}",
// responses(
// (status = 200, body = [GetNodeResult]),
// (status = 404, description = "the node ID provided was not found")
// ),
// params(
// ("id" = String, Path, description = "Node ID"),
// ),
// )]
// pub async fn get_node(
// State(state): State<AppState>,
// Path(node_id): Path<String>,
// ) -> AppResult<(StatusCode, Json<Value>)> {
// let node_info = state.get_node(&node_id).await?;
/// Get node info
///
/// This endpoint retrieves all the fields for a particular node
#[utoipa::path(
get,
path = "/{id}",
responses(
(status = 200, body = [GetNodeResult]),
(status = 404, description = "the node ID provided was not found")
),
params(
("id" = String, Path, description = "Node ID"),
),
)]
pub async fn get_node(
State(state): State<AppState>,
Path(node_id): Path<String>,
) -> AppResult<(StatusCode, Json<Value>)> {
let node_info = state.get_node(&node_id).await?;
// Ok((
// StatusCode::OK,
// Json(json!({
// "node_id": node_id,
// "fields": node_info.fields,
// "created_at": node_info.created_at,
// "updated_at": node_info.updated_at,
// })),
// ))
// }
Ok((
StatusCode::OK,
Json(json!({
"node_id": node_id,
"fields": node_info.fields,
"created_at": node_info.created_at,
"updated_at": node_info.updated_at,
})),
))
}
// #[derive(Deserialize, Debug)]
// pub struct UpdateData {
// extra_data: Option<ExtraData>,
// }
#[derive(Deserialize, Debug)]
pub struct UpdateData {
extra_data: Option<ExtraData>,
}
// /// Update node info
// #[utoipa::path(
// post,
// path = "/{id}",
// responses(
// (status = 200)
// ),
// params(
// ("id" = String, Path, description = "Node ID"),
// )
// )]
// pub async fn update_node(
// State(state): State<AppState>,
// Path(node_id): Path<String>,
// Json(opts): Json<UpdateData>,
// ) -> AppResult<Json<Value>> {
// let node_id = NodeId(Uuid::from_str(&node_id).into_diagnostic()?);
// let node_info = state
// .create_or_update_node(CreateOrUpdate::Update { node_id }, opts.extra_data)
// .await?;
/// Update node info
#[utoipa::path(
post,
path = "/{id}",
responses(
(status = 200)
),
params(
("id" = String, Path, description = "Node ID"),
)
)]
pub async fn update_node(
State(state): State<AppState>,
Path(node_id): Path<String>,
Json(opts): Json<UpdateData>,
) -> AppResult<Json<Value>> {
let node_id = NodeId(Uuid::from_str(&node_id).into_diagnostic()?);
let node_info = state
.create_or_update_node(CreateOrUpdate::Update { node_id }, opts.extra_data)
.await?;
// Ok(Json(json!({
// "node_id": node_info.node_id.to_string(),
// })))
// }
Ok(Json(json!({
"node_id": node_info.node_id.to_string(),
})))
}
// #[derive(Debug, Deserialize)]
// pub struct CreateNodeOpts {
// // TODO: Allow submitting a string
// // id: Option<String>,
// #[serde(rename = "type")]
// ty: String,
// extra_data: Option<ExtraData>,
// }
#[derive(Debug, Deserialize)]
pub struct CreateNodeOpts {
// TODO: Allow submitting a string
// id: Option<String>,
#[serde(rename = "type")]
ty: String,
extra_data: Option<ExtraData>,
}
// #[utoipa::path(
// put,
// path = "/",
// responses((status = 200)),
// )]
// pub async fn create_node(
// State(state): State<AppState>,
// Json(opts): Json<CreateNodeOpts>,
// ) -> AppResult<Json<Value>> {
// let node_info = state
// .create_or_update_node(
// CreateOrUpdate::Create { r#type: opts.ty },
// opts.extra_data,
// )
// .await?;
#[utoipa::path(
put,
path = "/",
responses((status = 200)),
)]
pub async fn create_node(
State(state): State<AppState>,
Json(opts): Json<CreateNodeOpts>,
) -> AppResult<Json<Value>> {
let node_info = state
.create_or_update_node(
CreateOrUpdate::Create { r#type: opts.ty },
opts.extra_data,
)
.await?;
// Ok(Json(json!({
// "node_id": node_info.node_id.to_string(),
// })))
// }
Ok(Json(json!({
"node_id": node_info.node_id.to_string(),
})))
}
// #[derive(Deserialize)]
// pub struct SearchQuery {
// query: String,
// }
#[derive(Deserialize)]
pub struct SearchQuery {
query: String,
}
// #[utoipa::path(
// get,
// path = "/search",
// responses((status = 200)),
// )]
// pub async fn search_nodes(
// State(state): State<AppState>,
// Query(query): Query<SearchQuery>,
// ) -> AppResult<Json<Value>> {
// let search_result = state.search_nodes(query.query).await?;
// let search_result = search_result
// .into_iter()
// .map(|(id, value)| value["fields"].clone())
// .collect_vec();
#[utoipa::path(
get,
path = "/search",
responses((status = 200)),
)]
pub async fn search_nodes(
State(state): State<AppState>,
Query(query): Query<SearchQuery>,
) -> AppResult<Json<Value>> {
let search_result = state.search_nodes(query.query).await?;
let search_result = search_result
.into_iter()
.map(|(id, value)| value["fields"].clone())
.collect_vec();
// Ok(Json(json!({
// "results": search_result,
// })))
// }
Ok(Json(json!({
"results": search_result,
})))
}
// fn get_rows_for_extra_keys(
// tx: &MultiTransaction,
// extra_data: &ExtraData,
// ) -> AppResult<HashMap<String, (String, String, String)>> {
// let result = tx.run_script(
// "
// ?[key, relation, field_name, type] :=
// *fqkey_to_dbkey{key, relation, field_name, type},
// is_in(key, $keys)
// ",
// btmap! {
// "keys".to_owned() => DataValue::List(
// extra_data
// .keys()
// .map(|s| DataValue::from(s.as_str()))
// .collect::<Vec<_>>()
// ),
// },
// )?;
fn get_rows_for_extra_keys(
tx: &MultiTransaction,
extra_data: &ExtraData,
) -> AppResult<HashMap<String, (String, String, String)>> {
let result = tx.run_script(
"
?[key, relation, field_name, type] :=
*fqkey_to_dbkey{key, relation, field_name, type},
is_in(key, $keys)
",
btmap! {
"keys".to_owned() => DataValue::List(
extra_data
.keys()
.map(|s| DataValue::from(s.as_str()))
.collect::<Vec<_>>()
),
},
)?;
// let s = |s: &DataValue| s.get_str().unwrap().to_owned();
let s = |s: &DataValue| s.get_str().unwrap().to_owned();
// Ok(
// result
// .rows
// .into_iter()
// .map(|row| (s(&row[0]), (s(&row[1]), s(&row[2]), s(&row[3]))))
// .collect::<HashMap<_, _>>(),
// )
// }
Ok(
result
.rows
.into_iter()
.map(|row| (s(&row[0]), (s(&row[1]), s(&row[2]), s(&row[3]))))
.collect::<HashMap<_, _>>(),
)
}

View file

@ -1,11 +0,0 @@
export interface NodeInfo {
fields: Partial<NodeFields>;
}
export const JOURNAL_PAGE_CONTENT_FIELD_NAME = "panorama/journal/page/content";
export const JOURNAL_PAGE_TITLE_FIELD_NAME = "panorama/journal/page/title";
export interface NodeFields {
[JOURNAL_PAGE_CONTENT_FIELD_NAME]: string;
[JOURNAL_PAGE_TITLE_FIELD_NAME]: string;
}