icepick/crates/icepick-workflow/src/lib.rs

235 lines
8.2 KiB
Rust

use keyfork_derive_util::{request::DerivationAlgorithm, DerivationIndex, DerivationPath};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::{BTreeMap, HashSet};
#[derive(thiserror::Error, Debug)]
pub enum SimulationError {
#[error("Step not found: {0}")]
StepNotFound(String),
#[error("Expected input variable or static value not found in step {0}: {1}")]
ValueNotFound(String, String),
}
#[derive(thiserror::Error, Debug)]
pub enum WorkflowError {
#[error("Invocable operation could not be found: {0}")]
InvocableOperationNotFound(String),
#[error("Derivation configuration for operation not found: {0}")]
DerivationConfigurationNotFound(String),
#[error("An error was encountered while invoking an operation")]
InvocationError(String),
}
/// An input for a workflow argument. When inputs are read, they should be referenced by the first
/// name. Additional names can be provided as aliases, to allow chaining workflows together when
/// names may not make sense - such as a Solana address then being used as an authorization
/// address.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Input {
/// An input with a single identifier.
/// The name of the input.
pub name: String,
/// A description of the input.
pub description: String,
/// Aliases used when loading inputs.
#[serde(default)]
pub aliases: Vec<String>,
/// Whether the workflow input is optional.
pub optional: Option<bool>,
}
impl Input {
pub fn identifiers(&self) -> impl Iterator<Item = &String> {
[&self.name].into_iter().chain(self.aliases.iter())
}
pub fn is_required(&self) -> bool {
self.optional.is_some_and(|o| o)
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Workflow {
pub name: String,
pub description: String,
#[serde(default)]
pub inputs: Vec<Input>,
#[serde(rename = "step")]
steps: Vec<WorkflowStep>,
}
pub type StringMap<T = String> = BTreeMap<String, T>;
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct WorkflowStep {
r#type: String,
#[serde(default)]
values: StringMap,
#[serde(default)]
inputs: StringMap,
#[serde(default)]
outputs: StringMap,
}
// TODO: This should probably be migrated to an actual Result type, instead of
// currently just shoving everything in "blob". Probably done after derivation_accounts
// gets hoisted out of here.
#[derive(Serialize, Deserialize)]
pub struct OperationResult {
// All values returned from an operation.
blob: StringMap<Value>,
// Any requested accounts from an operation.
//
// TODO: Move this to its own step.
#[serde(default)]
derivation_accounts: Vec<DerivationIndex>,
}
type DeriveKeys<'a> =
&'a dyn Fn(&DerivationAlgorithm, &DerivationPath, &[DerivationIndex]) -> Vec<Vec<u8>>;
impl Workflow {
pub fn simulate_workflow<T: InvocableOperation + Sized>(
&self,
mut data: HashSet<String>,
operations: &[T],
) -> Result<Vec<String>, SimulationError> {
let mut reports = vec![];
for step in self.steps.iter() {
let step_type = step.r#type.clone();
let Some(invocable) = operations.iter().find(|op| *op.name() == step_type) else {
return Err(SimulationError::StepNotFound(step_type));
};
// Check we have the values the module expects
for in_memory_name in step.inputs.values() {
if !data.contains(in_memory_name) && !step.values.contains_key(in_memory_name) {
return Err(SimulationError::ValueNotFound(
step_type,
in_memory_name.to_owned(),
));
}
}
// Check whether the module expects the keys as arguments, or if the
// keys will be passed as a "payload" variable.
let mut inputs = step.inputs.keys().collect::<HashSet<_>>();
for argument in invocable.argument_names() {
inputs.remove(argument);
}
for remaining_input in inputs {
reports.push(format!(
"Step {step_type}: Input {remaining_input} is not interpreted as a argument"
));
}
// Add the return values from the module into memory
data.extend(step.outputs.values().cloned());
}
Ok(reports)
}
pub fn run_workflow<T: InvocableOperation>(
&self,
mut data: StringMap<Value>,
operations: &[T],
derive_keys: DeriveKeys,
) -> Result<StringMap<Value>, WorkflowError> {
let mut derived_keys = vec![];
let mut derivation_accounts = vec![];
for step in &self.steps {
let step_type = step.r#type.clone();
let Some(operation) = operations.iter().find(|op| *op.name() == step_type) else {
return Err(WorkflowError::InvocableOperationNotFound(step_type));
};
// Prepare all inputs for the operation invocation
let inputs: StringMap<Value> = data
.iter()
.map(|(k, v)| (k, v.clone()))
.filter_map(|(k, v)| {
// We have our stored name, `k`, which matches with this inner loop's `v`. We
// need to return our desired name, rather than our stored name, and the value
// in our storage, our current `v`.
let (desired, _stored) = step.inputs.iter().find(|(_, v)| k == *v)?;
Some((desired.clone(), v))
})
.chain(
step.values
.iter()
.map(|(k, v)| (k.clone(), Value::String(v.clone()))),
)
.collect();
let OperationResult {
blob,
derivation_accounts: new_accounts,
} = operation.invoke(&inputs, &derived_keys);
derived_keys.clear();
derivation_accounts.extend(new_accounts);
data.extend(blob.into_iter().filter_map(|(k, v)| {
// We have our stored name, `k`, which matches with this inner loop's `v`. We
// need to return our desired name, rather than our stored name, and the value
// in our storage, our current `v`.
let (_given, stored) = step.outputs.iter().find(|(k1, _)| k == **k1)?;
Some((stored.clone(), v))
}));
// Add requested derivation keys and clear derivation account requests.
if !derivation_accounts.is_empty() {
let Some((algo, path_prefix)) = operation.derivation_configuration() else {
return Err(WorkflowError::DerivationConfigurationNotFound(step_type));
};
derived_keys.extend(derive_keys(algo, path_prefix, &derivation_accounts));
}
derivation_accounts.clear();
}
if let Some(last_step) = &self.steps.last() {
let values = last_step.outputs.values().collect::<HashSet<_>>();
data.retain(|stored_name, _| values.contains(stored_name));
}
Ok(data)
}
}
pub trait WorkflowHandler {
/// Load all inputs for the Workflow from some external source, such as CLI arguments or
/// JSON payloads. The inputs can then be used to simulate or perform a workflow.
fn load_inputs(&self) -> StringMap;
}
/// The configuration for an Icepick operation that can be invoked.
///
/// Implementors of this trait should include all necessary requirements to invoke the operation
/// within themselves.
pub trait InvocableOperation {
/// Invoke the operation with the supplied inputs and derived keys.
fn invoke(&self, input: &StringMap<Value>, derived_keys: &[Vec<u8>]) -> OperationResult;
/// The name of the operation.
fn name(&self) -> &String;
/// The names of arguments that can be passed to the function.
fn argument_names(&self) -> impl Iterator<Item = &String>;
/// The derivation algorithm and derivation path to be prefixed to all derivation requests.
fn derivation_configuration(&self) -> Option<(&DerivationAlgorithm, &DerivationPath)>;
}