use std::time::Duration; use anyhow::anyhow; use axum::{ extract::{Multipart, State}, Json, }; use bollard::{ container::{ Config as ContainerConfig, CreateContainerOptions, StartContainerOptions, }, image::CreateImageOptions, network::CreateNetworkOptions, Docker, }; use entity::{process, service}; use futures::{FutureExt, TryStreamExt}; use migration::OnConflict; use sea_orm::{ActiveModelTrait, DatabaseConnection, EntityTrait, Set}; use serde_json::{json, Value}; use tokio::time::sleep; use uuid::Uuid; use crate::error::Result; use crate::{ config::{Config, SourceConfig}, error::AppError, jobs::{Job, Schedule}, AppState, }; pub async fn handle_upload_configuration( state: State, form: Multipart, ) -> Result, AppError> { let upload_form = extract_multipart(form).await.unwrap(); let config_string = serde_json::to_string(&upload_form.config).unwrap(); let mut result = service::ActiveModel { name: Set(upload_form.name), config: Set(config_string), status: Set("New".into()), ..Default::default() } .save(&state.conn) .await .unwrap(); let id = result.id.take().unwrap(); let conn = state.conn.clone(); let job = Job { code: async move { spawn_docker(SpawnDockerOpts { service_id: id, conn, config: &upload_form.config, }) .await .unwrap(); println!("done with job"); } .boxed(), schedule: Schedule::ASAP, }; state.queue.push(job).await; println!("spawned job"); Ok(Json(json! ({"id": id}))) } struct UploadForm { name: String, config: Config, } async fn extract_multipart(mut form: Multipart) -> Result { let mut name = None; let mut config = None::; while let Some(field) = form.next_field().await.unwrap() { let key = field.name().unwrap().to_owned(); let value = String::from_utf8(field.bytes().await.unwrap().to_vec()).unwrap(); match key.as_ref() { "name" => name = Some(value), "config" => config = Some(serde_yaml::from_str(&value).unwrap()), _ => return Err(anyhow!("error").into()), } } Ok(UploadForm { name: name.unwrap(), config: config.unwrap(), }) } struct SpawnDockerOpts<'a> { conn: DatabaseConnection, config: &'a Config, service_id: i32, } async fn spawn_docker(opts: SpawnDockerOpts<'_>) -> Result<()> { println!("Loading config: {:?}", opts.config); let docker = Docker::connect_with_local_defaults().unwrap(); println!("connected to docker {docker:?}"); sleep(Duration::from_secs(1)).await; // Create a bridge network let network_id = format!("aah-network-{}", Uuid::new_v4()); docker .create_network(CreateNetworkOptions { name: network_id.clone(), ..Default::default() }) .await?; println!("Created network."); // Create all the services for (service_name, service_config) in opts.config.service.iter() { // Insert metadata into the db let new_process = process::ActiveModel { service_id: Set(Some(opts.service_id)), name: Set(service_name.clone()), status: Set("New".to_owned()), ..Default::default() }; process::Entity::insert(new_process) .on_conflict( OnConflict::columns([ process::Column::ServiceId, process::Column::Name, ]) .update_column(process::Column::Status) .to_owned(), ) .exec(&opts.conn) .await?; println!("preparing config for {}", service_name); let SourceConfig::Image { image: from_image } = &service_config.source; println!("creating image {}", from_image); let mut result_stream = docker.create_image( Some(CreateImageOptions { from_image: from_image.to_owned(), ..Default::default() }), None, None, ); println!("result stream: "); while let Ok(Some(v)) = result_stream.try_next().await { println!("v: {v:?}"); } println!("completed pulling."); // create a container let result = docker .create_container( Some(CreateContainerOptions:: { ..Default::default() }), ContainerConfig { image: Some(from_image.to_owned()), ..Default::default() }, ) .await .unwrap(); // Start the container docker .start_container(&result.id, None::>) .await .unwrap(); println!("result: {result:?}"); } Ok(()) }