begin splitting out into lib crate

This commit is contained in:
Michael Zhang 2024-06-02 20:40:54 -04:00
parent de465aafb1
commit ae41b32313
15 changed files with 408 additions and 290 deletions

34
Cargo.lock generated
View file

@ -3508,6 +3508,23 @@ dependencies = [
"tauri-plugin-window-state",
]
[[package]]
name = "panorama-core"
version = "0.1.0"
dependencies = [
"async-imap",
"chrono",
"cozo",
"futures",
"miette",
"serde",
"serde_json",
"sugars",
"tantivy",
"tokio",
"uuid",
]
[[package]]
name = "panorama-daemon"
version = "0.1.0"
@ -3522,6 +3539,7 @@ dependencies = [
"futures",
"itertools 0.13.0",
"miette",
"panorama-core",
"serde",
"serde_json",
"sugars",
@ -4544,9 +4562,9 @@ dependencies = [
[[package]]
name = "serde"
version = "1.0.202"
version = "1.0.203"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "226b61a0d411b2ba5ff6d7f73a476ac4f8bb900373459cd00fab8512828ba395"
checksum = "7253ab4de971e72fb7be983802300c30b5a7f0c2e56fab8abfc6a214307c0094"
dependencies = [
"serde_derive",
]
@ -4562,9 +4580,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.202"
version = "1.0.203"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6048858004bcff69094cd972ed40a32500f153bd3be9f716b2eed2e8217c4838"
checksum = "500cbc0ebeb6f46627f50f3f5811ccf6bf00643be300b4c3eabc0ef55dc5b5ba"
dependencies = [
"proc-macro2",
"quote",
@ -5752,9 +5770,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.37.0"
version = "1.38.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787"
checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a"
dependencies = [
"backtrace",
"bytes",
@ -5771,9 +5789,9 @@ dependencies = [
[[package]]
name = "tokio-macros"
version = "2.2.0"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a"
dependencies = [
"proc-macro2",
"quote",

View file

@ -0,0 +1,21 @@
[package]
name = "panorama-core"
version = "0.1.0"
edition = "2021"
[dependencies]
chrono = { version = "0.4.38", features = ["serde"] }
cozo = { version = "0.7.6", features = ["storage-rocksdb"] }
futures = "0.3.30"
miette = "5.5.0"
serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.117"
sugars = "3.0.1"
tantivy = { version = "0.22.0", features = ["zstd"] }
tokio = { version = "1.38.0", features = ["full"] }
uuid = { version = "1.8.0", features = ["v7"] }
[dependencies.async-imap]
version = "0.9.7"
default-features = false
features = ["runtime-tokio"]

View file

@ -0,0 +1,26 @@
#[macro_use]
extern crate serde;
#[macro_use]
extern crate sugars;
pub mod migrations;
pub mod state;
#[cfg(test)]
mod tests;
pub use crate::state::AppState;
use miette::{bail, IntoDiagnostic, Result};
use serde_json::Value;
pub fn ensure_ok(s: &str) -> Result<()> {
let status: Value = serde_json::from_str(&s).into_diagnostic()?;
let status = status.as_object().unwrap();
let ok = status.get("ok").unwrap().as_bool().unwrap_or(false);
if !ok {
let display = status.get("display").unwrap().as_str().unwrap();
bail!("shit (error: {display})")
}
Ok(())
}

View file

@ -1,5 +1,5 @@
use anyhow::Result;
use cozo::DbInstance;
use miette::{IntoDiagnostic, Result};
use serde_json::Value;
@ -76,7 +76,7 @@ async fn check_migration_status(db: &DbInstance) -> Result<MigrationStatus> {
);
println!("Status: {}", status);
let status: Value = serde_json::from_str(&status)?;
let status: Value = serde_json::from_str(&status).into_diagnostic()?;
let status = status.as_object().unwrap();
let ok = status.get("ok").unwrap().as_bool().unwrap_or(false);
if !ok {

View file

@ -0,0 +1,200 @@
use std::{collections::HashMap, default, time::Duration};
use cozo::{DataValue, DbInstance, JsonData, ScriptMutability};
use futures::TryStreamExt;
use miette::{IntoDiagnostic, Result};
use serde_json::Value;
use tokio::{net::TcpStream, time::sleep};
use uuid::Uuid;
use crate::AppState;
#[derive(Debug, Serialize)]
pub struct MailConfig {
node_id: String,
imap_hostname: String,
imap_port: u16,
imap_username: String,
imap_password: String,
}
impl AppState {
/// Fetch the list of mail configs in the database
pub fn fetch_mail_configs(&self) -> Result<Vec<MailConfig>> {
let result = self.db.run_script(
"
?[node_id, imap_hostname, imap_port, imap_username, imap_password] :=
*node{ id: node_id },
*mail_config{ node_id, imap_hostname, imap_port, imap_username, imap_password }
",
Default::default(),
ScriptMutability::Immutable,
)?;
let result = result
.rows
.into_iter()
.map(|row| MailConfig {
node_id: row[0].get_str().unwrap().to_owned(),
imap_hostname: row[1].get_str().unwrap().to_owned(),
imap_port: row[2].get_int().unwrap() as u16,
imap_username: row[3].get_str().unwrap().to_owned(),
imap_password: row[4].get_str().unwrap().to_owned(),
})
.collect::<Vec<_>>();
Ok(result)
}
pub async fn mail_loop(&self) {
loop {
match self.mail_loop_inner().await {
Ok(_) => {
// For now, just sleep 30 seconds and then fetch again
// TODO: Run a bunch of connections at once and do IDLE over them (if possible)
sleep(Duration::from_secs(30)).await;
}
Err(err) => {
eprintln!("Fetch config error: {err:?}");
// Back off, retry
// TODO: Exponential backoff
sleep(Duration::from_secs(5)).await;
continue;
}
}
}
}
async fn mail_loop_inner(&self) -> Result<()> {
// Fetch the mail configs
let configs = self.fetch_mail_configs()?;
if configs.len() == 0 {
return Ok(());
}
// TODO: Do all configs instead of just the first
let config = &configs[0];
let stream =
TcpStream::connect((config.imap_hostname.as_str(), config.imap_port))
.await
.into_diagnostic()?;
let client = async_imap::Client::new(stream);
let mut session = client
.login(&config.imap_username, &config.imap_password)
.await
.map_err(|(err, _)| err)
.into_diagnostic()?;
// println!("Session: {:?}", session);
let mailboxes = session
.list(None, Some("*"))
.await
.into_diagnostic()?
.try_collect::<Vec<_>>()
.await
.into_diagnostic()?;
let mailbox_names =
mailboxes.iter().map(|name| name.name()).collect::<Vec<_>>();
println!("mailboxes: {mailbox_names:?}");
// Get the mailbox with INBOX
let inbox_node_id = {
let result = self.db.run_script("
?[node_id] :=
*mailbox{node_id, account_node_id, mailbox_name},
account_node_id = $account_node_id,
mailbox_name = 'INBOX'
", btmap! {"account_node_id".to_owned()=>DataValue::from(config.node_id.to_owned())}, ScriptMutability::Immutable)?;
if result.rows.len() == 0 {
let new_node_id = Uuid::now_v7();
let new_node_id = new_node_id.to_string();
self.db.run_script("
?[node_id, account_node_id, mailbox_name] <-
[[$new_node_id, $account_node_id, 'INBOX']]
:put mailbox { node_id, account_node_id, mailbox_name }
",
btmap! {
"new_node_id".to_owned() => DataValue::from(new_node_id.clone()),
"account_node_id".to_owned() => DataValue::from(config.node_id.to_owned()),
},
ScriptMutability::Mutable)?;
new_node_id
} else {
result.rows[0][0].get_str().unwrap().to_owned()
}
};
println!("INBOX: {:?}", inbox_node_id);
let inbox = session.select("INBOX").await.into_diagnostic()?;
println!("last unseen: {:?}", inbox.unseen);
let messages = session
.fetch(
"1:4",
"(FLAGS ENVELOPE BODY[HEADER] BODY[TEXT] INTERNALDATE)",
)
.await
.into_diagnostic()?
.try_collect::<Vec<_>>()
.await
.into_diagnostic()?;
println!(
"messages {:?}",
messages.iter().map(|f| f.body()).collect::<Vec<_>>()
);
let input_data = DataValue::List(
messages
.iter()
.map(|msg| {
let message_id = Uuid::now_v7();
let headers =
String::from_utf8(msg.header().unwrap().to_vec()).unwrap();
let headers = headers
.split("\r\n")
.filter_map(|s| {
let p = s.split(": ").collect::<Vec<_>>();
if p.len() < 2 {
None
} else {
Some((p[0], p[1]))
}
})
.collect::<HashMap<_, _>>();
DataValue::List(vec![
DataValue::from(message_id.to_string()),
DataValue::from(config.node_id.clone()),
DataValue::from(inbox_node_id.clone()),
DataValue::from(
headers
.get("Subject")
.map(|s| (*s).to_owned())
.unwrap_or("Subject".to_owned()),
),
DataValue::Json(JsonData(serde_json::to_value(headers).unwrap())),
DataValue::Bytes(msg.text().unwrap().to_vec()),
DataValue::from(msg.internal_date().unwrap().to_rfc3339()),
])
})
.collect(),
);
self.db.run_script(
"
?[node_id, account_node_id, mailbox_node_id, subject, headers, body, internal_date] <- $input_data
:put message { node_id, account_node_id, mailbox_node_id, subject, headers, body, internal_date }
",
btmap! {
"input_data".to_owned() => input_data,
},
ScriptMutability::Mutable,
)?;
session.logout().await.into_diagnostic()?;
Ok(())
}
}

View file

@ -0,0 +1,68 @@
pub mod mail;
pub mod node;
use std::{fs, path::Path};
use cozo::DbInstance;
use miette::{IntoDiagnostic, Result};
use tantivy::{
directory::MmapDirectory,
schema::{self, Schema, STORED, STRING, TEXT},
Index,
};
use crate::migrations::run_migrations;
pub fn tantivy_schema() -> Schema {
let mut schema_builder = Schema::builder();
let node_id = schema_builder.add_text_field("node_id", STRING | STORED);
let title = schema_builder.add_text_field("title", TEXT | STORED);
let body = schema_builder.add_text_field("body", TEXT);
schema_builder.build()
}
#[derive(Clone)]
pub struct AppState {
pub db: DbInstance,
pub tantivy_index: Index,
}
impl AppState {
pub async fn new(panorama_dir: impl AsRef<Path>) -> Result<Self> {
let panorama_dir = panorama_dir.as_ref().to_path_buf();
println!("Panorama dir: {}", panorama_dir.display());
let tantivy_index = {
let schema = tantivy_schema();
let tantivy_path = panorama_dir.join("tantivy-index");
fs::create_dir_all(&tantivy_path).into_diagnostic()?;
let dir = MmapDirectory::open(&tantivy_path).into_diagnostic()?;
Index::builder()
.schema(schema)
.open_or_create(dir)
.into_diagnostic()?
};
let db_path = panorama_dir.join("db.sqlite");
let db = DbInstance::new(
"sqlite",
db_path.display().to_string(),
Default::default(),
)
.unwrap();
let state = AppState { db, tantivy_index };
state.init().await?;
Ok(state)
}
async fn init(&self) -> Result<()> {
run_migrations(&self.db).await?;
let state = self.clone();
tokio::spawn(async move { state.mail_loop().await });
Ok(())
}
}

View file

@ -0,0 +1,34 @@
use std::collections::HashMap;
use chrono::{DateTime, Utc};
use cozo::{DataValue, ScriptMutability};
use miette::Result;
use crate::AppState;
pub struct NodeInfo {
pub node_id: String,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub fields: HashMap<String, DataValue>,
}
impl AppState {
/// Get all properties of a node
pub async fn get_node(&self, node_id: impl AsRef<str>) -> Result<NodeInfo> {
let node_id = node_id.as_ref().to_owned();
let result = self.db.run_script(
"
?[relation, field_name, type, fts_enabled] :=
*node_has_key { key, id },
*fqkey_to_dbkey { key, relation, field_name, type, fts_enabled },
id = $node_id
",
btmap! {"node_id".to_owned() => node_id.clone().into()},
ScriptMutability::Immutable,
)?;
todo!()
}
}

View file

@ -0,0 +1,19 @@
use cozo::DbInstance;
use miette::Result;
use tantivy::Index;
use crate::{state::tantivy_schema, AppState};
pub fn test_state() -> Result<AppState> {
let db = DbInstance::new("mem", "", "")?;
let schema = tantivy_schema();
let tantivy_index = Index::create_in_ram(schema);
Ok(AppState { db, tantivy_index })
}
#[test]
pub fn test_create_node() -> Result<()> {
let state = test_state()?;
Ok(())
}

View file

@ -15,6 +15,7 @@ dirs = "5.0.1"
futures = "0.3.30"
itertools = "0.13.0"
miette = "5.5.0"
panorama-core = { path = "../panorama-core" }
serde = { version = "1.0.202", features = ["derive"] }
serde_json = "1.0.117"
sugars = "3.0.1"
@ -36,7 +37,6 @@ features = ["axum"]
git = "https://github.com/juhaku/utoipa"
features = ["axum"]
[dependencies.async-imap]
version = "0.9.7"
default-features = false

View file

@ -1,19 +1,14 @@
use std::{collections::HashMap, default, time::Duration};
use axum::{extract::State, routing::head, Json};
use cozo::{DataValue, DbInstance, JsonData, ScriptMutability};
use futures::TryStreamExt;
use miette::IntoDiagnostic;
use axum::{extract::State, Json};
use cozo::ScriptMutability;
use panorama_core::AppState;
use serde_json::Value;
use tokio::{net::TcpStream, time::sleep};
use uuid::Uuid;
use crate::{error::AppResult, AppState};
use crate::error::AppResult;
pub async fn get_mail_config(
State(state): State<AppState>,
) -> AppResult<Json<Value>> {
let configs = fetch_mail_configs(&state.db)?;
let configs = state.fetch_mail_configs()?;
Ok(Json(json!({ "configs": configs })))
}
@ -57,190 +52,3 @@ pub async fn get_mail(State(state): State<AppState>) -> AppResult<Json<Value>> {
"messages": messages,
})))
}
pub async fn mail_loop(db: DbInstance) {
loop {
match mail_loop_inner(&db).await {
Ok(_) => {
// For now, just sleep 30 seconds and then fetch again
// TODO: Run a bunch of connections at once and do IDLE over them (if possible)
sleep(Duration::from_secs(30)).await;
}
Err(err) => {
eprintln!("Fetch config error: {err:?}");
// Back off, retry
// TODO: Exponential backoff
sleep(Duration::from_secs(5)).await;
continue;
}
}
}
}
async fn mail_loop_inner(db: &DbInstance) -> AppResult<()> {
// Fetch the mail configs
let configs = fetch_mail_configs(&db)?;
if configs.len() == 0 {
return Ok(());
}
// TODO: Do all configs instead of just the first
let config = &configs[0];
let stream =
TcpStream::connect((config.imap_hostname.as_str(), config.imap_port))
.await
.into_diagnostic()?;
let client = async_imap::Client::new(stream);
let mut session = client
.login(&config.imap_username, &config.imap_password)
.await
.map_err(|(err, _)| err)
.into_diagnostic()?;
// println!("Session: {:?}", session);
let mailboxes = session
.list(None, Some("*"))
.await
.into_diagnostic()?
.try_collect::<Vec<_>>()
.await
.into_diagnostic()?;
let mailbox_names =
mailboxes.iter().map(|name| name.name()).collect::<Vec<_>>();
println!("mailboxes: {mailbox_names:?}");
// Get the mailbox with INBOX
let inbox_node_id = {
let result = db.run_script("
?[node_id] :=
*mailbox{node_id, account_node_id, mailbox_name},
account_node_id = $account_node_id,
mailbox_name = 'INBOX'
", btmap! {"account_node_id".to_owned()=>DataValue::from(config.node_id.to_owned())}, ScriptMutability::Immutable)?;
if result.rows.len() == 0 {
let new_node_id = Uuid::now_v7();
let new_node_id = new_node_id.to_string();
db.run_script("
?[node_id, account_node_id, mailbox_name] <-
[[$new_node_id, $account_node_id, 'INBOX']]
:put mailbox { node_id, account_node_id, mailbox_name }
",
btmap! {
"new_node_id".to_owned() => DataValue::from(new_node_id.clone()),
"account_node_id".to_owned() => DataValue::from(config.node_id.to_owned()),
},
ScriptMutability::Mutable)?;
new_node_id
} else {
result.rows[0][0].get_str().unwrap().to_owned()
}
};
println!("INBOX: {:?}", inbox_node_id);
let inbox = session.select("INBOX").await.into_diagnostic()?;
println!("last unseen: {:?}", inbox.unseen);
let messages = session
.fetch(
"1:4",
"(FLAGS ENVELOPE BODY[HEADER] BODY[TEXT] INTERNALDATE)",
)
.await
.into_diagnostic()?
.try_collect::<Vec<_>>()
.await
.into_diagnostic()?;
println!(
"messages {:?}",
messages.iter().map(|f| f.body()).collect::<Vec<_>>()
);
let input_data = DataValue::List(
messages
.iter()
.map(|msg| {
let message_id = Uuid::now_v7();
let headers =
String::from_utf8(msg.header().unwrap().to_vec()).unwrap();
let headers = headers
.split("\r\n")
.filter_map(|s| {
let p = s.split(": ").collect::<Vec<_>>();
if p.len() < 2 {
None
} else {
Some((p[0], p[1]))
}
})
.collect::<HashMap<_, _>>();
DataValue::List(vec![
DataValue::from(message_id.to_string()),
DataValue::from(config.node_id.clone()),
DataValue::from(inbox_node_id.clone()),
DataValue::from(
headers
.get("Subject")
.map(|s| (*s).to_owned())
.unwrap_or("Subject".to_owned()),
),
DataValue::Json(JsonData(serde_json::to_value(headers).unwrap())),
DataValue::Bytes(msg.text().unwrap().to_vec()),
DataValue::from(msg.internal_date().unwrap().to_rfc3339()),
])
})
.collect(),
);
db.run_script(
"
?[node_id, account_node_id, mailbox_node_id, subject, headers, body, internal_date] <- $input_data
:put message { node_id, account_node_id, mailbox_node_id, subject, headers, body, internal_date }
",
btmap! {
"input_data".to_owned() => input_data,
},
ScriptMutability::Mutable,
)?;
session.logout().await.into_diagnostic()?;
Ok(())
}
#[derive(Debug, Serialize)]
struct MailConfig {
node_id: String,
imap_hostname: String,
imap_port: u16,
imap_username: String,
imap_password: String,
}
fn fetch_mail_configs(db: &DbInstance) -> AppResult<Vec<MailConfig>> {
let result = db.run_script(
"
?[node_id, imap_hostname, imap_port, imap_username, imap_password] :=
*node{ id: node_id },
*mail_config{ node_id, imap_hostname, imap_port, imap_username, imap_password }
",
Default::default(),
ScriptMutability::Immutable,
)?;
let result = result
.rows
.into_iter()
.map(|row| MailConfig {
node_id: row[0].get_str().unwrap().to_owned(),
imap_hostname: row[1].get_str().unwrap().to_owned(),
imap_port: row[2].get_int().unwrap() as u16,
imap_username: row[3].get_str().unwrap().to_owned(),
imap_password: row[4].get_str().unwrap().to_owned(),
})
.collect::<Vec<_>>();
Ok(result)
}

View file

@ -10,20 +10,19 @@ extern crate sugars;
mod error;
mod export;
mod journal;
mod mail;
mod migrations;
pub mod mail;
mod node;
mod query_builder;
pub mod state;
use std::fs;
use anyhow::Result;
use axum::{
http::Method,
routing::{get, post, put},
Router,
};
use miette::{IntoDiagnostic, Result};
use panorama_core::AppState;
use serde_json::Value;
use tokio::net::TcpListener;
use tower::ServiceBuilder;
@ -36,7 +35,6 @@ use crate::{
journal::get_todays_journal_id,
mail::{get_mail, get_mail_config},
node::{create_node, node_types, search_nodes, update_node},
state::AppState,
};
#[tokio::main]
@ -50,7 +48,7 @@ async fn main() -> Result<()> {
let data_dir = dirs::data_dir().unwrap();
let panorama_dir = data_dir.join("panorama");
fs::create_dir_all(&panorama_dir)?;
fs::create_dir_all(&panorama_dir).into_diagnostic()?;
let state = AppState::new(&panorama_dir).await?;
@ -80,20 +78,9 @@ async fn main() -> Result<()> {
.layer(ServiceBuilder::new().layer(cors))
.with_state(state.clone());
let listener = TcpListener::bind("0.0.0.0:5195").await?;
let listener = TcpListener::bind("0.0.0.0:5195").await.into_diagnostic()?;
println!("Listening... {:?}", listener);
axum::serve(listener, app).await?;
axum::serve(listener, app).await.into_diagnostic()?;
Ok(())
}
pub fn ensure_ok(s: &str) -> Result<()> {
let status: Value = serde_json::from_str(&s)?;
let status = status.as_object().unwrap();
let ok = status.get("ok").unwrap().as_bool().unwrap_or(false);
if !ok {
let display = status.get("display").unwrap().as_str().unwrap();
bail!("shit (error: {display})")
}
Ok(())
}

View file

@ -53,19 +53,6 @@ pub async fn get_node(
State(state): State<AppState>,
Path(node_id): Path<String>,
) -> AppResult<(StatusCode, Json<Value>)> {
let result = state.db.run_script(
"
?[relation, field_name, type, fts_enabled] :=
*node_has_key { key, id },
*fqkey_to_dbkey { key, relation, field_name, type, fts_enabled },
id = $node_id
",
btmap! {"node_id".to_owned() => node_id.clone().into()},
ScriptMutability::Immutable,
)?;
println!("FIRST RESULT: {:?}", result);
let result = state.db.run_script(
"
j[content] := *journal{ node_id, content }, node_id = $node_id

View file

@ -1,50 +0,0 @@
use std::{fs, path::Path};
use anyhow::Result;
use cozo::DbInstance;
use tantivy::{
directory::MmapDirectory,
schema::{self, Schema, STORED, STRING, TEXT},
Index,
};
use crate::{mail::mail_loop, migrations::run_migrations};
#[derive(Clone)]
pub struct AppState {
pub db: DbInstance,
pub tantivy_index: Index,
}
impl AppState {
pub async fn new(panorama_dir: impl AsRef<Path>) -> Result<Self> {
let panorama_dir = panorama_dir.as_ref().to_path_buf();
println!("Panorama dir: {}", panorama_dir.display());
let tantivy_index = {
let mut schema_builder = Schema::builder();
let node_id = schema_builder.add_text_field("node_id", STRING | STORED);
let title = schema_builder.add_text_field("title", TEXT | STORED);
let body = schema_builder.add_text_field("body", TEXT);
let schema = schema_builder.build();
let tantivy_path = panorama_dir.join("tantivy-index");
fs::create_dir_all(&tantivy_path)?;
let dir = MmapDirectory::open(&tantivy_path)?;
Index::builder().schema(schema).open_or_create(dir)?
};
let db_path = panorama_dir.join("db.sqlite");
let db = DbInstance::new(
"sqlite",
db_path.display().to_string(),
Default::default(),
)
.unwrap();
run_migrations(&db).await?;
tokio::spawn(mail_loop(db.clone()));
Ok(AppState { db, tantivy_index })
}
}