Some more pool stuff

This commit is contained in:
Michael Zhang 2021-11-03 20:14:12 -05:00
parent 449b71d41b
commit 7c36d9ef7d
Signed by: michael
GPG key ID: BDA47A31A3C8EE6B
8 changed files with 110 additions and 35 deletions

View file

@ -24,7 +24,7 @@ use tokio::{
use xdg::BaseDirectories; use xdg::BaseDirectories;
use crate::config::{Config, MailAccountConfig}; use crate::config::{Config, MailAccountConfig};
use crate::mail::{sync_main, MailStore}; use crate::mail::{mail_main, MailStore};
type ExitListener = oneshot::Receiver<()>; type ExitListener = oneshot::Receiver<()>;
@ -155,7 +155,7 @@ async fn run_single_mail_account(
let (tx, mut rx) = mpsc::unbounded_channel(); let (tx, mut rx) = mpsc::unbounded_channel();
let sync_fut = sync_main(&account_name, account, tx, store).fuse(); let sync_fut = mail_main(&account_name, account, tx, store).fuse();
pin_mut!(sync_fut); pin_mut!(sync_fut);
debug!("Mail account loop for {}.", account_name); debug!("Mail account loop for {}.", account_name);
@ -164,7 +164,7 @@ async fn run_single_mail_account(
res = sync_fut => match res { res = sync_fut => match res {
Ok(_) => {}, Ok(_) => {},
Err(err) => { Err(err) => {
error!("sync_main died with: {:?}", err); error!("mail_main died with: {:?}", err);
break; break;
} }
}, },

View file

@ -7,6 +7,7 @@ use anyhow::{Context, Result};
use futures::stream::StreamExt; use futures::stream::StreamExt;
use panorama_imap::{ use panorama_imap::{
client::{auth::Login, ConfigBuilder}, client::{auth::Login, ConfigBuilder},
pool::{ImapPool, PoolConfig},
proto::{ proto::{
command::FetchItems, command::FetchItems,
response::{MailboxData, Response}, response::{MailboxData, Response},
@ -23,14 +24,37 @@ pub use self::store::MailStore;
static MIGRATOR: Migrator = sqlx::migrate!(); static MIGRATOR: Migrator = sqlx::migrate!();
/// The main function for the IMAP syncing thread /// The main function for the IMAP syncing thread
pub async fn sync_main( pub async fn mail_main(
acct_name: impl AsRef<str>, acct_name: impl AsRef<str>,
acct: MailAccountConfig, acct: MailAccountConfig,
_mail2ui_tx: UnboundedSender<MailEvent>, _mail2ui_tx: UnboundedSender<MailEvent>,
_mail_store: MailStore, _mail_store: MailStore,
) -> Result<()> { ) -> Result<()> {
let acct_name = acct_name.as_ref(); let acct_name = acct_name.as_ref();
debug!("Starting main synchronization procedure for {}", acct_name); debug!(
"Starting main synchronization procedure for account '{}'",
acct_name
);
// create a connection pool
let client_config = ConfigBuilder::default()
.hostname(acct.imap.server.clone())
.port(acct.imap.port)
.tls(matches!(acct.imap.tls, TlsMethod::On))
.build()?;
let pool_config = PoolConfig {
max_connections: 10,
};
let pool = ImapPool::new(client_config, pool_config);
// grab one connection from that pool and start running a background
// synchronization thread
let sync_conn = pool.acquire().await?;
// let the rest of the pool respond to requests coming from the channel
// TODO:
return Ok(());
// loop ensures that the connection is retried after it dies // loop ensures that the connection is retried after it dies
loop { loop {

View file

@ -29,7 +29,6 @@ use super::tls::wrap_tls;
/// An IMAP client that hasn't been connected yet. /// An IMAP client that hasn't been connected yet.
#[derive(Builder, Clone, Debug)] #[derive(Builder, Clone, Debug)]
#[builder(build_fn(private))]
pub struct Config { pub struct Config {
/// The hostname of the IMAP server. If using TLS, must be an address /// The hostname of the IMAP server. If using TLS, must be an address
pub hostname: String, pub hostname: String,

View file

@ -21,14 +21,12 @@ pub mod response_stream;
mod client; mod client;
mod codec; mod codec;
mod inner; mod inner;
mod pool;
mod tls; mod tls;
pub use self::client::{ pub use self::client::{
ClientAuthenticated, ClientUnauthenticated, Config, ConfigBuilder, ClientAuthenticated, ClientUnauthenticated, Config, ConfigBuilder,
}; };
pub use self::codec::{ImapCodec, TaggedCommand}; pub use self::codec::{ImapCodec, TaggedCommand};
pub use self::pool::ImapPool;
#[cfg(feature = "low-level")] #[cfg(feature = "low-level")]
pub use self::inner::Inner; pub use self::inner::Inner;

View file

@ -1,27 +0,0 @@
use std::sync::Arc;
use crossbeam::queue::ArrayQueue;
use super::{ClientAuthenticated, Config};
#[derive(Clone)]
pub struct ImapPool(Arc<InnerPool>);
impl ImapPool {
pub fn new(config: Config) -> Self {
let inner = InnerPool {
config,
connections: ArrayQueue::new(10),
};
ImapPool(Arc::new(inner))
}
}
struct InnerPool {
config: Config,
connections: ArrayQueue<ClientAuthenticated>,
}
impl InnerPool {
pub fn connect() {}
}

10
imap/src/interface.rs Normal file
View file

@ -0,0 +1,10 @@
use anyhow::Result;
use crate::proto::response::Envelope;
#[async_trait]
pub trait ImapClient {
async fn list_messages(&self) -> Result<Vec<Envelope>>;
}
pub struct ListMessagesOptions {}

View file

@ -20,4 +20,8 @@ extern crate panorama_proto_common;
extern crate maplit; extern crate maplit;
pub mod client; pub mod client;
pub mod interface;
pub mod proto; pub mod proto;
#[cfg(feature = "pool")]
pub mod pool;

67
imap/src/pool/mod.rs Normal file
View file

@ -0,0 +1,67 @@
use std::{borrow::Borrow, ops::Deref, sync::Arc};
use anyhow::Result;
use crossbeam::queue::ArrayQueue;
use tokio::sync::Semaphore;
use crate::{interface::ImapClient, proto::response::Envelope};
use super::client::{ClientAuthenticated, Config};
pub struct PoolConfig {
pub max_connections: usize,
}
/// A pool of IMAP connections.
#[derive(Clone)]
pub struct ImapPool(Arc<InnerPool>);
impl Deref for ImapPool {
type Target = InnerPool;
fn deref(&self) -> &Self::Target { self.0.borrow() }
}
#[async_trait]
impl ImapClient for ImapPool {
async fn list_messages(&self) -> Result<Vec<Envelope>> {
let client = self.acquire();
todo!()
}
}
impl ImapPool {
pub fn new(config: Config, pool_config: PoolConfig) -> Self {
let inner = InnerPool::init(config, pool_config);
ImapPool(Arc::new(inner))
}
}
pub struct InnerPool {
config: Config,
semaphore: Semaphore,
connections: ArrayQueue<ClientAuthenticated>,
}
impl InnerPool {
pub fn init(config: Config, pool_config: PoolConfig) -> Self {
InnerPool {
config,
semaphore: Semaphore::new(pool_config.max_connections),
connections: ArrayQueue::new(pool_config.max_connections),
}
}
pub async fn acquire(&self) -> Result<ClientAuthenticated> {
let guard = match self.connections.pop() {
// we can reuse
Some(conn) => {}
// no existing connection, time to make a new one
None => {}
};
todo!()
}
}