icepick/crates/icepick-workflow/src/lib.rs

206 lines
7.4 KiB
Rust

use keyfork_derive_util::{request::DerivationAlgorithm, DerivationIndex, DerivationPath};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::{HashMap, HashSet};
#[derive(thiserror::Error, Debug)]
pub enum SimulationError {
#[error("Step not found: {0}")]
StepNotFound(String),
#[error("Expected input variable or static value not found in step {0}: {1}")]
ValueNotFound(String, String),
}
#[derive(thiserror::Error, Debug)]
pub enum WorkflowError {
#[error("Invocable operation could not be found: {0}")]
InvocableOperationNotFound(String),
#[error("Derivation configuration for operation not found: {0}")]
DerivationConfigurationNotFound(String),
#[error("An error was encountered while invoking an operation")]
InvocationError(String),
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Workflow {
pub name: String,
#[serde(default)]
pub inputs: Vec<String>,
#[serde(default)]
pub optional_inputs: Vec<String>,
#[serde(rename = "step")]
steps: Vec<WorkflowStep>,
}
pub type StringMap = HashMap<String, String>;
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct WorkflowStep {
r#type: String,
#[serde(default)]
values: StringMap,
#[serde(default)]
inputs: StringMap,
#[serde(default)]
outputs: StringMap,
}
// TODO: This should probably be migrated to an actual Result type, instead of
// currently just shoving everything in "blob". Probably done after derivation_accounts
// gets hoisted out of here.
#[derive(Serialize, Deserialize)]
pub struct OperationResult {
// All values returned from an operation.
blob: HashMap<String, Value>,
// Any requested accounts from an operation.
//
// TODO: Move this to its own step.
#[serde(default)]
derivation_accounts: Vec<DerivationIndex>,
}
type DeriveKeys<'a> = &'a dyn Fn(&DerivationAlgorithm, &DerivationPath, &[DerivationIndex]) -> Vec<Vec<u8>>;
impl Workflow {
pub fn simulate_workflow<T: InvocableOperation + Sized>(
&self,
mut data: HashSet<String>,
operations: &[T],
) -> Result<Vec<String>, SimulationError> {
let mut reports = vec![];
for step in self.steps.iter() {
let step_type = step.r#type.clone();
let Some(invocable) = operations.iter().find(|op| *op.name() == step_type) else {
return Err(SimulationError::StepNotFound(step_type));
};
// Check we have the values the module expects
for in_memory_name in step.inputs.values() {
if !data.contains(in_memory_name) && !step.values.contains_key(in_memory_name) {
return Err(SimulationError::ValueNotFound(
step_type,
in_memory_name.to_owned(),
));
}
}
// Check whether the module expects the keys as arguments, or if the
// keys will be passed as a "payload" variable.
let mut inputs = step.inputs.keys().collect::<HashSet<_>>();
for argument in invocable.argument_names() {
inputs.remove(argument);
}
for remaining_input in inputs {
reports.push(format!(
"Step {step_type}: Input {remaining_input} is not interpreted as a argument"
));
}
// Add the return values from the module into memory
data.extend(step.outputs.values().cloned());
}
Ok(reports)
}
pub fn run_workflow<T: InvocableOperation>(
&self,
mut data: HashMap<String, Value>,
operations: &[T],
derive_keys: DeriveKeys,
) -> Result<HashMap<String, Value>, WorkflowError> {
let mut derived_keys = vec![];
let mut derivation_accounts = vec![];
for step in &self.steps {
let step_type = step.r#type.clone();
let Some(operation) = operations.iter().find(|op| *op.name() == step_type) else {
return Err(WorkflowError::InvocableOperationNotFound(step_type));
};
// Add requested derivation keys and clear derivation account requests.
if !derivation_accounts.is_empty() {
let Some((algo, path_prefix)) = operation.derivation_configuration() else {
return Err(WorkflowError::DerivationConfigurationNotFound(step_type));
};
derived_keys.extend(derive_keys(algo, path_prefix, &derivation_accounts));
}
derivation_accounts.clear();
// Prepare all inputs for the operation invocation
let inputs: HashMap<String, Value> = data
.iter()
.map(|(k, v)| (k, v.clone()))
.filter_map(|(k, v)| {
// We have our stored name, `k`, which matches with this inner loop's `v`. We
// need to return our desired name, rather than our stored name, and the value
// in our storage, our current `v`.
let (desired, _stored) = step.inputs.iter().find(|(_, v)| k == *v)?;
Some((desired.clone(), v))
})
.chain(
step.values
.iter()
.map(|(k, v)| (k.clone(), Value::String(v.clone()))),
)
.collect();
let OperationResult {
blob,
derivation_accounts: new_accounts,
} = operation.invoke(&inputs, &derived_keys);
derived_keys.clear();
derivation_accounts.extend(new_accounts);
data.extend(blob.into_iter().filter_map(|(k, v)| {
// We have our stored name, `k`, which matches with this inner loop's `v`. We
// need to return our desired name, rather than our stored name, and the value
// in our storage, our current `v`.
let (_given, stored) = step.outputs.iter().find(|(k1, _)| k == **k1)?;
Some((stored.clone(), v))
}));
}
if let Some(last_step) = &self.steps.last() {
let values = last_step.outputs.values().collect::<HashSet<_>>();
data.retain(|stored_name, _| {
values.contains(stored_name)
});
}
Ok(data)
}
}
pub trait WorkflowHandler {
/// Load all inputs for the Workflow from some external source, such as CLI arguments or
/// JSON payloads. The inputs can then be used to simulate or perform a workflow.
fn load_inputs(&self) -> StringMap;
}
/// The configuration for an Icepick operation that can be invoked.
///
/// Implementors of this trait should include all necessary requirements to invoke the operation
/// within themselves.
pub trait InvocableOperation {
/// Invoke the operation with the supplied inputs and derived keys.
fn invoke(&self, input: &HashMap<String, Value>, derived_keys: &[Vec<u8>]) -> OperationResult;
/// The name of the operation.
fn name(&self) -> &String;
/// The names of arguments that can be passed to the function.
fn argument_names(&self) -> impl Iterator<Item = &String>;
/// The derivation algorithm and derivation path to be prefixed to all derivation requests.
fn derivation_configuration(&self) -> Option<(&DerivationAlgorithm, &DerivationPath)>;
}