panorama/crates/panorama-daemon/src/migrations.rs
2024-05-25 05:04:05 -05:00

150 lines
3.6 KiB
Rust

use anyhow::{bail, Result};
use cozo::{DbInstance, ScriptMutability};
use futures::Future;
use serde_json::Value;
use crate::ensure_ok;
/// Brings the database up to date by running every pending migration.
///
/// The stored version is read with `check_migration_status`. If the
/// `migrations` relation does not exist yet, it is created and seeded with a
/// single row recording version 0. Each migration whose index is greater than
/// the stored version is then run in order, and the stored version is
/// overwritten with that index once the migration has succeeded.
///
/// # Errors
///
/// Returns an error if the status check fails, if a migration returns an
/// error, or if `ensure_ok` rejects the result of a bookkeeping script.
pub async fn run_migrations(db: &DbInstance) -> Result<()> {
    // Creates the bookkeeping relation and seeds it with version 0.
    const CREATE_MIGRATIONS_RELATION: &str = "
{ :create migrations { yeah: Int default 0 => version: Int default 0 } }
{
?[yeah, version] <- [[0, 0]]
:put migrations { yeah, version }
}
";
    // Overwrites the single bookkeeping row with the `$version` parameter.
    const RECORD_VERSION: &str = "
?[yeah, version] <- [[0, $version]]
:put migrations { yeah => version }
";

    let migration_status = check_migration_status(db).await?;
    println!("migration status: {:?}", migration_status);

    // A migration's index in this list is the version recorded after it runs.
    // Slot 0 is a placeholder that is always skipped, so real migrations
    // start at version 1.
    let migrations: &[fn(&DbInstance) -> Result<()>] = &[no_op, migration_01, migration_02];

    let start_at_migration = match migration_status {
        MigrationStatus::NoMigrations => {
            let created = db.run_script_str(CREATE_MIGRATIONS_RELATION, "", false);
            ensure_ok(&created)?;
            0
        }
        MigrationStatus::HasVersion(n) => n,
    };

    // Everything up to and including the stored version has already run.
    let first_pending = start_at_migration as usize + 1;

    //TODO: This should all be done in a transaction
    for (idx, migration) in migrations.iter().enumerate().skip(first_pending) {
        println!("running migration {idx}...");
        migration(db)?;

        let params = format!("{{\"version\":{}}}", idx);
        let recorded = db.run_script_str(RECORD_VERSION, &params, false);
        ensure_ok(&recorded)?;

        println!("succeeded migration {idx}!");
    }

    Ok(())
}
/// Migration state of a database, as reported by `check_migration_status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MigrationStatus {
    /// The `migrations` relation does not exist.
    NoMigrations,
    /// The `migrations` relation exists and records this version number.
    HasVersion(u64),
}
async fn check_migration_status(db: &DbInstance) -> Result<MigrationStatus> {
let status = db.run_script_str(
"
?[yeah, version] := *migrations[yeah, version]
",
"",
true,
);
println!("Status: {}", status);
let status: Value = serde_json::from_str(&status)?;
let status = status.as_object().unwrap();
let ok = status.get("ok").unwrap().as_bool().unwrap_or(false);
if !ok {
let status_code = status.get("code").unwrap().as_str().unwrap();
if status_code == "query::relation_not_found" {
return Ok(MigrationStatus::NoMigrations);
}
}
let rows = status.get("rows").unwrap().as_array().unwrap();
let row = rows[0].as_array().unwrap();
let version = row[1].as_number().unwrap().as_u64().unwrap();
println!("row: {row:?}");
Ok(MigrationStatus::HasVersion(version))
}
/// Migration that does nothing and always succeeds.
///
/// It occupies index 0 of the migration list in `run_migrations`, which is
/// always skipped, so that the first real migration gets index 1.
fn no_op(_db: &DbInstance) -> Result<()> {
    Ok(())
}
/// Migration 1: creates the `node`, `has_key` and `node_managed_by_app`
/// relations.
///
/// # Errors
///
/// Returns an error if `ensure_ok` rejects the result of the script.
fn migration_01(db: &DbInstance) -> Result<()> {
    const SCHEMA: &str = "
# Primary node type
{
:create node {
id: String
=>
created_at: Float default now(),
updated_at: Float default now(),
extra_data: Json default {},
}
}
# Inverse mapping from keys to nodes
{ :create has_key { key: String => id: String } }
{ :create node_managed_by_app { node_id: String => app: String } }
";

    let outcome = db.run_script_str(SCHEMA, "", false);
    ensure_ok(&outcome)?;

    Ok(())
}
/// Migration 2: creates the `journal` and `journal_days` relations and a
/// full-text search index named `text_index` over `journal`'s `plaintext`
/// column.
///
/// # Errors
///
/// Returns an error if `ensure_ok` rejects the result of the script.
fn migration_02(db: &DbInstance) -> Result<()> {
    const SCHEMA: &str = "
# Create journal type
{ :create journal { node_id: String => plaintext: String } }
{ :create journal_days { day: String => node_id: String } }
{
::fts create journal:text_index {
extractor: plaintext,
extract_filter: !is_null(plaintext),
tokenizer: Simple,
filters: [Lowercase, Stemmer('english'), Stopwords('en')],
}
}
";

    let outcome = db.run_script_str(SCHEMA, "", false);
    ensure_ok(&outcome)?;

    Ok(())
}