Worker SDK

The Worker SDK lets you embed a job executor in your own Rust application with full control over configuration.

Setup

Add the dependency:

[dependencies]
workflow-graph-worker-sdk = { path = "crates/worker-sdk" }
tokio = { version = "1", features = ["full"] }
uuid = { version = "1", features = ["v4"] } # for Uuid::new_v4() in the WorkerConfig example

WorkerConfig

use std::time::Duration;
use workflow_graph_worker_sdk::{Worker, WorkerConfig};

let config = WorkerConfig {
    server_url: "http://localhost:3000".into(),
    worker_id: uuid::Uuid::new_v4().to_string(),
    labels: vec!["docker".into(), "linux".into()],
    lease_ttl: Duration::from_secs(30),
    poll_interval: Duration::from_secs(2),
    heartbeat_interval: Duration::from_secs(10), // must be < lease_ttl
    cancellation_check_interval: Duration::from_secs(2),
    log_batch_interval: Duration::from_millis(500),
};

// Call this from an async context (e.g. #[tokio::main]) whose return type supports `?`.
let worker = Worker::new(config);
worker.run().await?;

| Field | Default | Description |
| --- | --- | --- |
| server_url | http://localhost:3000 | Server base URL |
| worker_id | Auto-generated UUID | Unique worker identifier |
| labels | [] | Capability labels |
| lease_ttl | 30s | How long the server waits before reclaiming |
| poll_interval | 2s | Delay between polls when idle |
| heartbeat_interval | 10s | Must be less than lease_ttl |
| cancellation_check_interval | 2s | How often to check for cancellation |
| log_batch_interval | 500ms | How often to flush log batches |
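
The only cross-field constraint listed above is that heartbeat_interval must be less than lease_ttl. As a minimal sketch, that rule can be checked before a worker is constructed. The helper name `check_heartbeat` is hypothetical and not part of the SDK; whether the SDK validates this itself is not stated here.

```rust
use std::time::Duration;

/// Hypothetical helper (not an SDK function): enforces the documented rule
/// that heartbeat_interval must be strictly less than lease_ttl.
fn check_heartbeat(lease_ttl: Duration, heartbeat_interval: Duration) -> Result<(), String> {
    if heartbeat_interval < lease_ttl {
        Ok(())
    } else {
        Err(format!(
            "heartbeat_interval ({heartbeat_interval:?}) must be less than lease_ttl ({lease_ttl:?})"
        ))
    }
}
```

With the defaults (30s lease, 10s heartbeat) the check passes. Setting both to the same value fails, because the comparison is strict.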

Running Multiple Workers

Spawn workers with different labels in the same process:

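use workflow_graph_worker_sdk::{Worker, WorkerConfig};

#[tokio::main]
async fn main() {
    let docker_worker = Worker::new(WorkerConfig {
        labels: vec!["docker".into()],
        ..Default::default()
    });
    let gpu_worker = Worker::new(WorkerConfig {
        labels: vec!["gpu".into(), "linux".into()],
        ..Default::default()
    });

    // Keep the join handles so main can wait for both workers.
    let docker_task = tokio::spawn(async move { docker_worker.run().await });
    let gpu_task = tokio::spawn(async move { gpu_worker.run().await });

    // Block until Ctrl+C, then let each worker finish its current job
    // before the process exits.
    tokio::signal::ctrl_c().await.ok();
    let _ = tokio::join!(docker_task, gpu_task);
}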

Graceful Shutdown

The worker handles SIGTERM and SIGINT automatically: it finishes the current job before exiting.

// worker.run() handles Ctrl+C / SIGTERM internally:
// 1. Receives signal
// 2. Finishes executing the current job (if any)
// 3. Reports result to server
// 4. Returns Ok(())
worker.run().await?;

For custom shutdown logic, use tokio::select!:

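tokio::select! {
    result = worker.run() => {
        if let Err(e) = result {
            eprintln!("Worker failed: {e}");
        }
    }
    // If this branch completes first, select! drops the worker.run() future,
    // which may interrupt a job that is still executing.
    _ = custom_shutdown_signal() => {
        println!("Custom shutdown...");
    }
}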

Retry Backoff

Jobs support configurable backoff strategies when retries are enabled:

| Strategy | Config | Behavior |
| --- | --- | --- |
| None | BackoffStrategy::None | Retry immediately (default) |
| Fixed | BackoffStrategy::Fixed { delay_secs: 5 } | Wait 5s between each retry |
| Exponential | BackoffStrategy::Exponential { base_secs: 2, max_secs: 60 } | Wait 2s, 4s, 8s, 16s… capped at 60s |

Backoff is configured per job via RetryPolicy and enforced by the queue: a job with a pending backoff delay cannot be claimed until the delay elapses.
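
To make the delay schedule concrete, here is a sketch of how the three strategies could map a retry number to a wait time. The enum is a local stand-in that mirrors the variants named above, and `backoff_delay` is a hypothetical function. The formula base_secs * 2^(retry - 1), capped at max_secs, is an assumption chosen to reproduce the 2s, 4s, 8s, 16s sequence; the queue's real implementation may differ.

```rust
use std::time::Duration;

/// Local stand-in mirroring the documented variants; the SDK's own
/// BackoffStrategy type may differ in detail.
#[derive(Debug, Clone, Copy)]
enum BackoffStrategy {
    None,
    Fixed { delay_secs: u64 },
    Exponential { base_secs: u64, max_secs: u64 },
}

/// Delay before retry number `retry` (1 = first retry).
/// Assumption: exponential delay is base_secs * 2^(retry - 1), capped at max_secs.
fn backoff_delay(strategy: BackoffStrategy, retry: u32) -> Duration {
    let secs = match strategy {
        BackoffStrategy::None => 0,
        BackoffStrategy::Fixed { delay_secs } => delay_secs,
        BackoffStrategy::Exponential { base_secs, max_secs } => {
            let exponent = retry.saturating_sub(1);
            // checked_pow returns None on overflow; treat that as "past the cap".
            let factor = 2u64.checked_pow(exponent).unwrap_or(u64::MAX);
            base_secs.saturating_mul(factor).min(max_secs)
        }
    };
    Duration::from_secs(secs)
}
```

Under these assumptions, Exponential { base_secs: 2, max_secs: 60 } yields 2s, 4s, 8s, 16s, 32s for retries 1 through 5 and stays at 60s from retry 6 onward. The saturating arithmetic keeps very large retry counts from overflowing.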

Custom Executors

The built-in executor runs shell commands via sh -c. To execute Docker containers, WASM modules, or API calls, fork the executor:

use workflow_graph_worker_sdk::executor::{JobOutput, JobError};

pub async fn execute_docker(
    command: &str,
    client: &reqwest::Client,
    logs_url: &str,
    workflow_id: &str,
    job_id: &str,
    batch_interval: std::time::Duration,
    cancel_token: tokio_util::sync::CancellationToken,
) -> Result<JobOutput, JobError> {
    // Treat the first word as the image and the remainder as the shell command.
    let parts: Vec<&str> = command.splitn(2, ' ').collect();
    let image = parts[0];
    let args = parts.get(1).copied().unwrap_or("");

    let mut child = tokio::process::Command::new("docker")
        .args(["run", "--rm", image, "sh", "-c", args])
        .stdout(std::process::Stdio::piped())
        .stderr(std::process::Stdio::piped())
        .spawn()
        .map_err(|e| JobError {
            message: format!("docker spawn failed: {e}"),
            exit_code: None,
        })?;

    // Stream stdout/stderr as LogChunks (same pattern as executor.rs)
    // Handle cancellation via cancel_token
    // Return JobOutput with extracted outputs
    todo!("wire up log streaming — see executor.rs for the pattern")
}

Extracting Structured Outputs

Parse special lines from stdout (GitHub Actions-style) to pass data between jobs:

use std::collections::HashMap;

let mut outputs = HashMap::new();
for line in stdout_lines {
    if let Some(rest) = line.strip_prefix("::set-output ") {
        // Split at the first '=' only, so values may themselves contain '='.
        if let Some((name, value)) = rest.split_once('=') {
            outputs.insert(name.to_string(), value.to_string());
        }
    }
}
Ok(JobOutput { outputs })

Then in your workflow YAML:

jobs:
  build:
    run: |
      cargo build --release
      echo "::set-output artifact_url=s3://bucket/build-123.tar.gz"
  deploy:
    needs: [build]
    run: deploy.sh
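
To check what the build job's echo line would produce, the parsing rule can be wrapped in a standalone function and run against sample stdout. This is a sketch: `parse_outputs` is a hypothetical name, and it takes the captured stdout as a single string rather than the SDK's streamed lines.

```rust
use std::collections::HashMap;

/// Collects `::set-output name=value` lines from captured stdout.
/// Standalone sketch; the SDK's own executor may parse these differently.
fn parse_outputs(stdout: &str) -> HashMap<String, String> {
    let mut outputs = HashMap::new();
    for line in stdout.lines() {
        if let Some(rest) = line.strip_prefix("::set-output ") {
            // split_once splits at the first '=', so values may contain '='.
            if let Some((name, value)) = rest.split_once('=') {
                outputs.insert(name.to_string(), value.to_string());
            }
        }
    }
    outputs
}
```

Fed the build job's stdout, this returns a map with the single key artifact_url and the value s3://bucket/build-123.tar.gz. Lines without the `::set-output ` prefix, such as ordinary cargo output, are ignored.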