keyfork/crates/daemon/keyforkd/src/server.rs

113 lines
4.2 KiB
Rust

use keyfork_frame::asyncext::{try_decode_from, try_encode_to};
use std::{
io::Error,
path::{Path, PathBuf},
};
use tokio::net::UnixListener;
use tower::{Service, ServiceExt};
#[cfg(feature = "tracing")]
use tracing::debug;
#[allow(clippy::module_name_repetitions)]
pub struct UnixServer {
listener: UnixListener,
}
impl UnixServer {
pub fn bind(address: impl AsRef<Path>) -> Result<Self, Error> {
let mut path = PathBuf::new();
path.extend(address.as_ref().components());
tokio::spawn(async move {
#[cfg(feature = "tracing")]
debug!("Binding tokio ctrl-c handler");
let result = tokio::signal::ctrl_c().await;
#[cfg(feature = "tracing")]
debug!(
?result,
"encountered ctrl-c, performing cleanup and exiting"
);
let result = tokio::fs::remove_file(&path).await;
#[cfg(feature = "tracing")]
if let Err(error) = result {
debug!(%error, "unable to remove path: {}", path.display());
}
std::process::exit(0x80);
});
Ok(Self {
listener: UnixListener::bind(address)?,
})
}
pub async fn run<S, R>(&mut self, app: S) -> Result<(), Box<dyn std::error::Error>>
where
S: Service<R> + Clone + Send + 'static,
R: From<Vec<u8>> + Send,
<S as Service<R>>::Error: std::error::Error + Send,
<S as Service<R>>::Response: std::convert::Into<Vec<u8>> + Send,
<S as Service<R>>::Future: Send,
{
#[cfg(feature = "tracing")]
debug!("Listening for clients");
loop {
let mut app = app.clone();
let (mut socket, _) = self.listener.accept().await?;
#[cfg(feature = "tracing")]
debug!("new socket connected");
tokio::spawn(async move {
let bytes = match try_decode_from(&mut socket).await {
Ok(bytes) => bytes,
Err(e) => {
#[cfg(feature = "tracing")]
debug!(%e, "Error reading DerivationPath from socket");
let content = e.to_string().bytes().collect::<Vec<_>>();
let result = try_encode_to(&content[..], &mut socket).await;
#[cfg(feature = "tracing")]
if let Err(error) = result {
debug!(%error, "Error sending error to client");
}
return;
}
};
let app = match app.ready().await {
Ok(app) => app,
Err(e) => {
#[cfg(feature = "tracing")]
debug!(%e, "Could not poll ready");
let content = e.to_string().bytes().collect::<Vec<_>>();
let result = try_encode_to(&content[..], &mut socket).await;
#[cfg(feature = "tracing")]
if let Err(error) = result {
debug!(%error, "Error sending error to client");
}
return;
}
};
let response = match app.call(bytes.into()).await {
Ok(response) => response,
Err(e) => {
#[cfg(feature = "tracing")]
debug!(%e, "Error reading DerivationPath from socket");
let content = e.to_string().bytes().collect::<Vec<_>>();
let result = try_encode_to(&content[..], &mut socket).await;
#[cfg(feature = "tracing")]
if let Err(error) = result {
debug!(%error, "Error sending error to client");
}
return;
}
}
.into();
if let Err(e) = try_encode_to(&response[..], &mut socket).await {
#[cfg(feature = "tracing")]
debug!(%e, "Error sending response to client");
}
});
}
}
}