icepick/crates/icepick/src/cli/workflow.rs

289 lines
10 KiB
Rust

use keyfork_derive_util::DerivationIndex;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{
collections::{HashMap, HashSet},
io::Write,
process::{Command, Stdio},
};
use super::{derive_keys, get_command, Commands, ModuleConfig, Operation};
/// A named, ordered list of steps, each of which invokes one module operation.
///
/// The type is (de)serializable with serde; the list of steps lives under the key `step`.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct Workflow {
/// Name of the workflow; also used as the name of the generated [`clap::Command`].
pub name: String,
/// Names of the inputs the workflow needs. Each one must be supplied either as a
/// command-line argument or through the JSON input file.
pub inputs: Vec<String>,
/// The steps to run, in order.
#[serde(rename = "step")]
steps: Vec<WorkflowStep>,
}
/// A map from `String` keys to `String` values, used for workflow inputs and for the
/// name mappings of a [`WorkflowStep`].
pub type StringMap = std::collections::HashMap<String, String>;
/// A single step of a [`Workflow`]: one operation invocation plus the mappings between
/// the operation's own names and the names under which values are stored in the workflow.
///
/// Every map defaults to empty when absent from the serialized form.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct WorkflowStep {
/// Name of the operation to invoke. It is compared against the `{module}-{operation}`
/// names that `Workflow::handle` builds for every available operation.
r#type: String,
// NOTE(review): `blob` is not read by any code visible in this file; confirm whether it
// is still needed.
#[serde(default)]
blob: StringMap,
/// Literal values handed to the operation, keyed by the operation's input name. They are
/// sent as JSON strings.
#[serde(default)]
values: StringMap,
/// Maps an operation input name (key) to the name under which the value is stored in the
/// workflow's data (value).
#[serde(default)]
inputs: StringMap,
/// Maps a key of the operation's returned blob (key) to the name under which that value
/// is stored in the workflow's data (value).
#[serde(default)]
outputs: StringMap,
}
/// An operation paired with everything needed to run it: the module it belongs to and the
/// binary that implements it.
#[derive(Clone, Debug)]
struct InvocableOperation {
/// Name of the module providing the operation; used to look up the module's configuration.
module: String,
/// Fully qualified name, built as `{module}-{operation}`; workflow step types are matched
/// against this.
name: String,
/// The module binary, resolved into a command and arguments by `get_command`.
binary: String,
/// The operation description; its `name` is sent to the binary and its `arguments` are
/// consulted when simulating a workflow.
operation: Operation,
}
// TODO: This should probably be migrated to an actual Result type, instead of
// currently just shoving everything in "blob". Probably done after derivation_accounts
// gets hoisted out of here.
/// The JSON document a module binary writes to its standard output after running an
/// operation.
#[derive(Serialize, Deserialize)]
struct OperationResult {
/// All values returned from an operation.
blob: HashMap<String, Value>,
/// Any requested accounts from an operation. Defaults to an empty list when the field is
/// absent.
//
// TODO: Move this to its own step.
#[serde(default)]
derivation_accounts: Vec<DerivationIndex>,
}
impl InvocableOperation {
    /// Run this operation in its module binary and return the parsed result.
    ///
    /// The binary is spawned with piped stdin and stdout. One JSON request containing the
    /// operation name, the `input` values and the `derived_keys` is written to its stdin,
    /// followed by an `exit` request on its own line. Whatever the process prints on stdout
    /// is parsed as an [`OperationResult`]. Standard error is inherited, so anything the
    /// module reports there goes straight to the caller's terminal.
    ///
    /// # Panics
    ///
    /// Panics if the process cannot be spawned or written to, if it exits unsuccessfully,
    /// or if its standard output is not a valid [`OperationResult`].
    fn invoke(&self, input: &HashMap<String, Value>, derived_keys: &[Vec<u8>]) -> OperationResult {
        let (command, args) = get_command(&self.binary);

        let request = serde_json::json!({
            "operation": self.operation.name,
            "values": input,
            "derived_keys": derived_keys,
        });

        let mut child = Command::new(command)
            .args(args)
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .spawn()
            .unwrap();

        {
            // Scope the handle so the pipe is closed before we wait: a module that keeps
            // reading until end-of-input would otherwise never terminate.
            let mut child_input = child.stdin.take().unwrap();
            serde_json::to_writer(&mut child_input, &request).unwrap();
            child_input
                .write_all(b"\n{\"operation\": \"exit\"}\n")
                .unwrap();
        }

        let result = child.wait_with_output().unwrap();
        if !result.status.success() {
            // stderr is inherited rather than captured, so `result.stderr` is always empty;
            // the exit status is the only diagnostic available here.
            panic!("Bad exit: {}", result.status);
        }

        serde_json::from_slice(&result.stdout).expect("valid json")
    }
}
impl Workflow {
    /// Generate a [`clap::Command`] for a [`Workflow`], where the inputs can be defined either by
    /// command-line arguments or via a JSON input file.
    ///
    /// Every workflow input becomes an optional `--long-flag` (underscores replaced by dashes)
    /// whose value name is the upper-cased input name. Inputs are optional on the command line
    /// because they may come from `--input-file` instead; presence is enforced when the inputs
    /// are loaded.
    pub fn generate_command(&self) -> clap::Command {
        // The argument is read back as a `PathBuf`, so it has to be parsed as one: clap panics
        // when the type requested from the matches differs from the argument's value parser,
        // and the default parser produces `String`.
        let input_file = clap::arg!(
            --"input-file" [FILE]
            "A file containing any inputs not passed on the command line"
        )
        .value_parser(clap::value_parser!(std::path::PathBuf));

        let mut command = clap::Command::new(&self.name).arg(input_file);
        for input in &self.inputs {
            let arg = clap::Arg::new(input)
                .required(false)
                .long(input.replace('_', "-"))
                .value_name(input.to_uppercase());
            command = command.arg(arg);
        }
        command
    }

    /// Collect the value of every workflow input.
    ///
    /// A value given on the command line wins; otherwise the value is taken from the JSON
    /// object in `--input-file`, if one was supplied. An input file that cannot be opened or
    /// parsed is reported on standard error and then ignored, so a run that provides every
    /// input on the command line still succeeds.
    ///
    /// # Panics
    ///
    /// Panics if an input is found neither on the command line nor in the input file.
    fn load_inputs(&self, matches: &clap::ArgMatches) -> StringMap {
        let input_file: Option<StringMap> = matches
            .get_one::<std::path::PathBuf>("input-file")
            .and_then(|path| {
                let file = match std::fs::File::open(path) {
                    Ok(file) => file,
                    Err(error) => {
                        eprintln!("Could not open input file {}: {error}", path.display());
                        return None;
                    }
                };
                match serde_json::from_reader::<_, StringMap>(std::io::BufReader::new(file)) {
                    Ok(map) => Some(map),
                    Err(error) => {
                        eprintln!("Could not parse input file {}: {error}", path.display());
                        None
                    }
                }
            });

        self.inputs
            .iter()
            .map(|input| {
                let value = matches
                    .get_one::<String>(input)
                    .or_else(|| input_file.as_ref().and_then(|file| file.get(input)))
                    .unwrap_or_else(|| panic!("Key was not found: {input}"));
                (input.clone(), value.clone())
            })
            .collect()
    }

    /// Dry-run the workflow by tracking only the *names* of the stored values.
    ///
    /// `data` starts as the set of available input names. For every step, in order:
    ///
    /// * the step's operation must exist in `operations`;
    /// * every stored name the step reads must already be in `data` or be a key of the step's
    ///   literal `values`;
    /// * an input name the operation does not declare as an argument is reported on standard
    ///   error, but is not an error;
    /// * the stored names the step writes are added to `data`.
    ///
    /// # Panics
    ///
    /// Panics if a step names an unknown operation or reads a value that is not available.
    fn simulate_workflow(&self, mut data: HashSet<String>, operations: &[InvocableOperation]) {
        for (i, step) in self.steps.iter().enumerate() {
            // NOTE: overflow possible but unlikely
            let step_index = i + 1;
            let step_type = &step.r#type;

            // Find the relevant Operation
            let Some(invocable) = operations.iter().find(|op| op.name == *step_type) else {
                panic!("Could not find operation: {step_type}");
            };

            // Check if we have the keys we want to pass into the module.
            for in_memory_name in step.inputs.values() {
                if !data.contains(in_memory_name) && !step.values.contains_key(in_memory_name) {
                    panic!("Failed simulation: step #{step_index} ({step_type}): missing value {in_memory_name}");
                }
            }

            // Check that the module accepts those keys.
            for module_input_name in step.inputs.keys() {
                if !invocable
                    .operation
                    .arguments
                    .iter()
                    .any(|arg| *module_input_name == arg.name)
                {
                    eprintln!("Simulation: step #{step_index} ({step_type}): input value {module_input_name} will be passed through as JSON input");
                }
            }

            // Add the keys we get from the module.
            data.extend(step.outputs.values().cloned());
        }
    }

    /// Execute every step in order and print the final step's outputs as JSON on stdout.
    ///
    /// `data` holds the stored values, initially the workflow inputs. For each step the
    /// operation is looked up by the step's type, the derivation accounts requested by the
    /// previous step are turned into keys using the current module's configuration, and the
    /// operation is invoked with the mapped inputs plus the step's literal values. Returned
    /// blob entries listed in the step's `outputs` are stored under their mapped names.
    ///
    /// After the last step, only values named by that step's `outputs` are kept and printed.
    /// A workflow without steps prints an empty JSON object.
    ///
    /// # Panics
    ///
    /// Panics if a step's operation or its module configuration cannot be found, if keys were
    /// requested but the module configuration lacks an algorithm or derivation prefix, or if
    /// the operation invocation itself panics.
    fn run_workflow(
        &self,
        mut data: HashMap<String, Value>,
        operations: &[InvocableOperation],
        config: &[ModuleConfig],
    ) {
        let mut derived_keys = vec![];
        let mut derivation_accounts = vec![];

        for step in &self.steps {
            let operation = operations
                .iter()
                .find(|op| op.name == step.r#type)
                .expect("operation matched step type");

            // Load keys from Keyfork, from previously requested workflow
            let module_config = config
                .iter()
                .find(|module| module.name == operation.module)
                .expect("could not find module config");
            let algo = &module_config.algorithm;
            let path_prefix = &module_config.derivation_prefix;
            if !derivation_accounts.is_empty() {
                derived_keys.extend(derive_keys(
                    algo.as_ref()
                        .expect("a module requested keys but didn't provide algorithm"),
                    path_prefix
                        .as_ref()
                        .expect("a module requested keys but didn't provide prefix"),
                    &derivation_accounts,
                ));
            }
            derivation_accounts.clear();

            // Prepare all inputs for the operation invocation.
            //
            // Walk the step's own mapping (operation input name -> stored name) and look each
            // stored value up, so only the values actually passed are cloned and every mapped
            // input is sent even when two inputs share a stored value. Literal `values` come
            // last and therefore win over a stored value with the same input name.
            let inputs: HashMap<String, Value> = step
                .inputs
                .iter()
                .filter_map(|(desired, stored)| {
                    data.get(stored)
                        .map(|value| (desired.clone(), value.clone()))
                })
                .chain(
                    step.values
                        .iter()
                        .map(|(k, v)| (k.clone(), Value::String(v.clone()))),
                )
                .collect();

            let OperationResult {
                blob,
                derivation_accounts: new_accounts,
            } = operation.invoke(&inputs, &derived_keys);

            // Keys are handed to exactly one invocation; accounts requested by this step are
            // derived at the start of the next one.
            derived_keys.clear();
            derivation_accounts.extend(new_accounts);

            // Store the returned values the step asked for, under their stored names; anything
            // else in the blob is dropped.
            data.extend(blob.into_iter().filter_map(|(given, value)| {
                step.outputs
                    .get(&given)
                    .map(|stored| (stored.clone(), value))
            }));
        }

        // Only the values produced by the final step are reported.
        match self.steps.last() {
            Some(last_step) => data.retain(|stored_name, _| {
                last_step
                    .outputs
                    .values()
                    .any(|storage_name| stored_name == storage_name)
            }),
            None => data.clear(),
        }

        let json_as_str = serde_json::to_string(&data).unwrap();
        println!("{json_as_str}");
    }

    /// Entry point for a matched workflow subcommand.
    ///
    /// Loads the inputs, builds one [`InvocableOperation`] named `{module}-{operation}` for
    /// every operation of every module, and then either simulates the workflow (when the
    /// `simulate-workflow` flag is set) or runs it.
    ///
    /// NOTE(review): `simulate-workflow` is not defined by [`Workflow::generate_command`];
    /// presumably it is declared by the caller as a global flag. Confirm against the caller.
    pub fn handle(&self, matches: &clap::ArgMatches, modules: Commands, config: &[ModuleConfig]) {
        let inputs = self.load_inputs(matches);
        let data: HashMap<String, Value> = inputs
            .into_iter()
            .map(|(k, v)| (k, Value::String(v)))
            .collect();

        let mut operations = vec![];
        for (module_name, module_binary, module_operations) in modules {
            for operation in module_operations {
                let operation_name = &operation.name;
                let io = InvocableOperation {
                    module: module_name.clone(),
                    name: format!("{module_name}-{operation_name}"),
                    binary: module_binary.clone(),
                    operation: operation.clone(),
                };
                operations.push(io);
            }
        }

        if matches.get_flag("simulate-workflow") {
            self.simulate_workflow(data.into_keys().collect(), &operations);
            return;
        }

        self.run_workflow(data, &operations, config);
    }
}