A TypeScript-first workflow engine for Val Town, inspired by Mastra.
Build strongly-typed, composable workflows that run on Val Town's serverless platform.
- Strongly Typed: Full TypeScript inference across workflow definitions
- Composable: Build workflows from reusable steps
- Trigger Agnostic: Works with HTTP, Cron, or Email Val Town triggers
- Durable State: SQLite-backed persistence with ACID guarantees
- Runtime Validation: Zod schema validation for inputs and outputs
- Error Handling: Structured error tracking and recovery
- Visual Workflows: Generate Mermaid diagrams of workflow structure and execution state
import { createStep } from "./backend/index.ts";
import { z } from "npm:zod@^3.23";
const fetchUser = createStep({
id: 'fetch-user',
inputSchema: z.object({ userId: z.string() }),
outputSchema: z.object({ name: z.string(), email: z.string() }),
execute: async ({ inputData }) => {
const user = await db.getUser(inputData.userId);
return { name: user.name, email: user.email };
},
});
import { createWorkflow } from "./backend/index.ts";
const emailWorkflow = createWorkflow({
id: 'email-campaign',
inputSchema: z.object({ userId: z.string() }),
outputSchema: z.object({ sent: z.boolean() }),
})
.then(fetchUser) // Type: { userId: string } -> { name, email }
.then(generateEmail) // Type: { name, email } -> { subject, body }
.then(sendEmail) // Type: { subject, body } -> { sent }
.commit();
// Create a run
const run = await emailWorkflow.createRun();
// Execute
const result = await run.start({
inputData: { userId: 'user-123' },
});
console.log(result); // { sent: true }
wrkflw includes a collection of ready-to-use steps for common tasks:
httpGet - Make GET requests to APIs
import { httpGet, createWorkflow } from "./backend/index.ts";
const workflow = createWorkflow({
id: 'fetch-api-data',
inputSchema: z.object({
url: z.string().url(),
headers: z.record(z.string()).optional(),
queryParams: z.record(z.string()).optional(),
}),
outputSchema: z.object({
status: z.number(),
data: z.unknown(),
headers: z.record(z.string()),
}),
})
.then(httpGet)
.commit();
httpPost - POST JSON data to APIs
import { httpPost } from "./backend/index.ts";
// Use in workflow with inputData: { url, body, headers? }
template - String interpolation with variables
import { template } from "./backend/index.ts";
const workflow = createWorkflow({
inputSchema: z.object({
template: z.string(),
variables: z.record(z.union([z.string(), z.number(), z.boolean()])),
}),
outputSchema: z.object({ result: z.string() }),
})
.then(template)
.commit();
// Input: { template: "Hello {{name}}!", variables: { name: "Alice" } }
// Output: { result: "Hello Alice!" }
filterArray - Filter array items by condition
import { filterArray } from "./backend/index.ts";
// Input: {
// array: [1, 2, 3, 4, 5],
// filterFn: "(n) => n % 2 === 0"
// }
// Output: {
// filtered: [2, 4],
// originalCount: 5,
// filteredCount: 2
// }
mapArray - Transform array items
import { mapArray } from "./backend/index.ts";
// Input: {
// array: [1, 2, 3],
// mapFn: "(n) => n * 2"
// }
// Output: { mapped: [2, 4, 6], count: 3 }
pickFields - Extract specific fields from objects
import { pickFields } from "./backend/index.ts";
// Input: {
// object: { id: 1, name: "Alice", secret: "xyz" },
// fields: ["id", "name"]
// }
// Output: { result: { id: 1, name: "Alice" } }
delay - Wait for a specified duration
import { delay } from "./backend/index.ts";
// Input: { durationMs: 2000 }
// Waits 2 seconds, then outputs: { durationMs: 2000, completedAt: "..." }
logger - Structured logging at different levels
import { logger } from "./backend/index.ts";
// Input: {
// level: "info",
// message: "Processing data",
// data: { count: 10 }
// }
// Logs to console and passes data through
import {
createWorkflow,
createStep,
httpGet,
filterArray,
mapArray,
template
} from "./backend/index.ts";
// Helper step to prepare API request
const prepareRequest = createStep({
id: "prepare-request",
inputSchema: z.object({ userId: z.number() }),
outputSchema: z.object({
url: z.string(),
queryParams: z.record(z.string()).optional(),
}),
execute: async ({ inputData }) => ({
url: "https://api.example.com/posts",
queryParams: { userId: String(inputData.userId) },
}),
});
// Build workflow using prebuilt steps
const workflow = createWorkflow({
id: 'process-user-posts',
inputSchema: z.object({ userId: z.number() }),
outputSchema: z.object({ titles: z.array(z.string()) }),
})
.then(prepareRequest) // Convert input to API request
.then(httpGet) // Fetch from API (prebuilt)
// Add more steps as needed
.commit();
See examples/prebuilt-steps-simple.ts for more examples.
- Create a new val or project in Val Town
- Copy the
backend/directory into your val - Import and use:
import { createStep, createWorkflow } from "./backend/index.ts";
git clone <your-repo> cd wrkflw deno run --allow-all examples/simple-workflow.ts
Create a new HTTP val in Val Town:
import { emailWorkflow } from "./workflows/email-campaign.ts";
export default async function(req: Request) {
const url = new URL(req.url);
const userId = url.searchParams.get("userId");
if (!userId) {
return Response.json({ error: "Missing userId" }, { status: 400 });
}
const run = await emailWorkflow.createRun();
const result = await run.start({ inputData: { userId } });
return Response.json({ success: true, result });
}
Create a new Cron val in Val Town:
import { welcomeWorkflow } from "./workflows/welcome.ts";
export default async function(interval: Interval) {
// Get users who signed up in the last hour
const newUsers = await getRecentUsers();
for (const user of newUsers) {
const run = await welcomeWorkflow.createRun();
await run.start({ inputData: { userId: user.id } });
console.log(`Sent welcome email to ${user.id}`);
}
}
Create a new Email val in Val Town:
import { supportWorkflow } from "./workflows/support.ts";
export default async function(email: Email) {
const run = await supportWorkflow.createRun();
const result = await run.start({
inputData: {
from: email.from,
subject: email.subject,
body: email.text,
},
});
console.log(`Processed support email: ${result.ticketId}`);
}
Create a typed workflow step.
createStep({
id: 'step-id',
description: 'Optional description',
inputSchema: z.object({ /* input type */ }),
outputSchema: z.object({ /* output type */ }),
execute: async ({ inputData, getStepResult, getInitData }) => {
// Your logic here
return { /* matches outputSchema */ };
},
});
Execute Context:
inputData- Validated input for this stepgetStepResult(step)- Get output from a previous stepgetInitData()- Get the workflow's initial inputstate- Workflow-level state (if defined)setState(newState)- Update workflow staterunId- Current run IDworkflowId- Current workflow ID
Create a typed workflow builder.
createWorkflow({
id: 'workflow-id',
description: 'Optional description',
inputSchema: z.object({ /* input type */ }),
outputSchema: z.object({ /* output type */ }),
stateSchema: z.object({ /* optional state */ }),
})
.then(step1)
.then(step2)
.commit();
Builder Methods:
.then(step)- Add a sequential step.commit()- Finalize the workflow
Create a new workflow run instance.
const run = await workflow.createRun(); // Auto-generated ID
// or
const run = await workflow.createRun('custom-run-id');
Execute the workflow.
const result = await run.start({
inputData: { /* matches workflow.inputSchema */ },
initialState: { /* optional, matches workflow.stateSchema */ },
});
Get the current state snapshot.
const snapshot = await run.getSnapshot();
console.log({
status: snapshot.status, // 'running' | 'success' | 'failed'
stepResults: snapshot.stepResults, // Results from each step
executionPath: snapshot.executionPath, // Steps executed
});
The workflow builder automatically infers types through the chain:
const step1 = createStep({
inputSchema: z.object({ a: z.string() }),
outputSchema: z.object({ b: z.number() }),
execute: async ({ inputData }) => ({ b: 42 }),
});
const step2 = createStep({
inputSchema: z.object({ b: z.number() }), // Must match step1's output
outputSchema: z.object({ c: z.boolean() }),
execute: async ({ inputData }) => ({ c: true }),
});
const workflow = createWorkflow({
inputSchema: z.object({ a: z.string() }),
outputSchema: z.object({ c: z.boolean() }),
})
.then(step1) // Accepts { a: string }
.then(step2) // Accepts { b: number } from step1
.commit();
// TypeScript knows the output is { c: boolean }
If step types don't match, TypeScript will show a compile error:
const incompatibleStep = createStep({
inputSchema: z.object({ x: z.string() }), // Error: doesn't match step1 output
outputSchema: z.object({ y: z.string() }),
execute: async ({ inputData }) => ({ y: "value" }),
});
workflow.then(step1).then(incompatibleStep); // TypeScript error
Errors are automatically captured and persisted:
try {
const result = await run.start({ inputData });
} catch (error) {
// Workflow failed - snapshot saved with error
const snapshot = await run.getSnapshot();
console.log(snapshot.status); // 'failed'
console.log(snapshot.error); // Error message
}
Define workflow-level state that persists across steps:
const workflow = createWorkflow({
id: 'stateful-workflow',
inputSchema: z.object({ start: z.number() }),
outputSchema: z.object({ total: z.number() }),
stateSchema: z.object({ counter: z.number() }),
});
const incrementStep = createStep({
id: 'increment',
inputSchema: z.object({ start: z.number() }),
outputSchema: z.object({ value: z.number() }),
execute: async ({ inputData, state, setState }) => {
const newCounter = (state?.counter || 0) + inputData.start;
setState({ counter: newCounter });
return { value: newCounter };
},
});
All workflow runs are persisted to Val Town's SQLite database:
import { WorkflowStorage } from "./backend/index.ts";
const storage = new WorkflowStorage();
await storage.init();
// List all runs
const runs = await storage.listRuns();
// List runs for specific workflow
const workflowRuns = await storage.listRuns('my-workflow-id');
// Get runs by status
const failedRuns = await storage.getRunsByStatus('failed');
wrkflw includes built-in visualization capabilities to help you understand and debug your workflows. You can generate visual diagrams in multiple formats (currently Mermaid, with more formats planned).
Generate a diagram showing the static structure of your workflow:
const workflow = createWorkflow({
id: 'data-processing',
inputSchema: z.object({ url: z.string() }),
outputSchema: z.object({ saved: z.boolean() }),
})
.then(fetchData)
.then(processData)
.then(saveResults)
.commit();
// Generate Mermaid diagram
const diagram = workflow.visualize("mermaid");
console.log(diagram);
Output:
graph TD Start([data-processing]) fetch_data[fetch-data<br/><small>Fetch data from API</small>] Start --> fetch_data process_data[process-data<br/><small>Process and transform data</small>] fetch_data --> process_data save_results[save-results<br/><small>Save to database</small>] process_data --> save_results End([End]) save_results --> End
Generate a diagram showing the current state of a workflow run:
const run = await workflow.createRun();
await run.start({ inputData: { url: "https://api.example.com/data" } });
// Visualize execution state
const executionDiagram = await run.visualize("mermaid", {
highlightCurrentStep: true,
showStatus: true,
});
console.log(executionDiagram);
The execution visualization includes:
- ✓ Checkmarks for completed steps
- ✗ X marks for failed steps
- ● Dots for currently running steps
- Color-coded step states (success, failed, running, pending)
- Error messages for failed steps
Both workflow and execution visualizations support various options:
// Workflow structure options
workflow.visualize("mermaid", {
includeDescriptions: true, // Show step descriptions (default: true)
includeSchemas: false, // Show schema info (not yet implemented)
});
// Execution state options
await run.visualize("mermaid", {
includeDescriptions: true, // Show step descriptions (default: true)
highlightCurrentStep: true, // Highlight the current/last step (default: true)
showStatus: true, // Show status icons (default: true)
showResults: false, // Show step results (default: false)
});
Mermaid diagrams can be rendered in multiple ways:
- GitHub/GitLab: Paste into markdown files (rendered automatically)
- Mermaid Live Editor: Copy to mermaid.live
- VS Code: Use the Mermaid Preview extension
- Documentation sites: Most support Mermaid natively
You can also use the visualization functions directly without calling methods on workflow/run instances:
import { visualizeWorkflow, visualizeExecution } from "./backend/index.ts";
// Visualize workflow structure
const diagram = visualizeWorkflow(workflow, "mermaid", {
includeDescriptions: true,
});
// Visualize execution with snapshot
const snapshot = await run.getSnapshot();
if (snapshot) {
const executionDiagram = visualizeExecution(
workflow,
snapshot,
"mermaid",
{ showStatus: true }
);
}
The visualization API is designed to be extensible. Planned formats include:
- JSON: Graph data for custom visualization libraries (D3, Cytoscape)
- ASCII: Text-based diagrams for CLI/terminal output
- DOT: Graphviz format for advanced layout algorithms
See examples/workflow-visualization.ts for a complete example.
wrkflw includes a powerful React-based canvas visualizer that provides real-time, interactive workflow visualization. Unlike the text-based Mermaid diagrams, the canvas visualizer offers a dynamic, web-based UI for monitoring and debugging workflows.
- Interactive Canvas: Pan, zoom, and navigate workflow diagrams with smooth animations
- Real-time Updates: Watch workflow execution live with auto-refresh
- Step Details: Click on nodes to view outputs, errors, and execution status
- Execution History: Browse past workflow runs and their results
- Multiple Workflows: Switch between different workflows seamlessly
- Status Indicators: Visual feedback for running, success, and failed steps
Option 1: Embedded Visualizer (Easiest)
# Single server with embedded React visualizer deno run --allow-all examples/api-server-example-local.ts # Open http://localhost:8000
Option 2: Full ReactFlow Canvas (Most Features)
- Start the API Server:
# From the root directory deno run --allow-all examples/api-server-example-local.ts
- Start the Frontend (in a separate terminal):
cd frontend npm install npm run dev
- Open the Visualizer:
Visit
http://localhost:3000in your browser
import { createApiServer, WorkflowStorage } from "./backend/index.ts";
// Initialize storage and API server
const storage = new WorkflowStorage();
await storage.init();
const apiServer = createApiServer(storage);
// Register your workflows
apiServer.registerWorkflow(dataProcessingWorkflow, {
url: "https://api.example.com/data" // Example input
});
// Start the server
await apiServer.serve(8000);
The canvas visualizer will automatically:
- Display all registered workflows in the sidebar
- Show workflow structure with connected nodes
- Update in real-time as workflows execute
- Highlight running steps with animations
- Show success/failure states with color coding
- Display step outputs and error messages
The API server exposes the following endpoints for the visualizer:
GET /api/workflows - List all registered workflows
GET /api/workflows/:id - Get workflow details
GET /api/workflows/:id/runs - List runs for a workflow
POST /api/workflows/:id/runs - Create and start a new run
GET /api/runs/:runId - Get detailed run status
┌──────────────────┐ HTTP ┌──────────────────┐
│ React Frontend │ ◄─────────► │ Deno API Server │
│ (Port 3000) │ /api/* │ (Port 8000) │
└──────────────────┘ └──────────────────┘
│ │
│ React Flow │ Workflow Engine
│ Canvas Rendering │ SQLite Storage
▼ ▼
Visualizer UI Execution State
See frontend/README.md for detailed documentation and customization options.
wrkflw/
├── backend/
│ ├── index.ts # Main exports
│ ├── types.ts # TypeScript types
│ ├── step.ts # Step creation
│ ├── workflow.ts # Workflow builder
│ ├── engine.ts # Execution engine
│ ├── storage.ts # SQLite persistence
│ ├── run.ts # Run management
│ ├── visualize.ts # Workflow visualization
│ ├── api-server.ts # API server for canvas visualizer
│ └── prebuilt-steps.ts # Ready-to-use steps
├── frontend/
│ ├── src/
│ │ ├── components/
│ │ │ ├── WorkflowVisualizer.tsx # Main canvas component
│ │ │ ├── StepNode.tsx # Custom step node
│ │ │ ├── WorkflowVisualizer.css
│ │ │ └── StepNode.css
│ │ ├── App.tsx # Main app component
│ │ ├── types.ts # Frontend types
│ │ └── main.tsx # Entry point
│ ├── package.json
│ ├── vite.config.ts
│ └── README.md
├── examples/
│ ├── simple-workflow.ts
│ ├── http-trigger.ts
│ ├── cron-trigger.ts
│ ├── prebuilt-steps-simple.ts
│ ├── workflow-visualization.ts
│ └── api-server-example.ts # Canvas visualizer demo
├── PLAN.md
└── README.md
Here's a full example of a workflow that fetches data, processes it, and sends a notification:
import { createWorkflow, createStep } from "./backend/index.ts";
import { z } from "npm:zod@^3.23";
// Step 1: Fetch data from an API
const fetchData = createStep({
id: 'fetch-data',
inputSchema: z.object({ url: z.string() }),
outputSchema: z.object({ data: z.array(z.any()) }),
execute: async ({ inputData }) => {
const response = await fetch(inputData.url);
const data = await response.json();
return { data };
},
});
// Step 2: Process the data
const processData = createStep({
id: 'process-data',
inputSchema: z.object({ data: z.array(z.any()) }),
outputSchema: z.object({
count: z.number(),
summary: z.string()
}),
execute: async ({ inputData }) => {
return {
count: inputData.data.length,
summary: `Processed ${inputData.data.length} items`,
};
},
});
// Step 3: Send notification
const sendNotification = createStep({
id: 'send-notification',
inputSchema: z.object({ count: z.number(), summary: z.string() }),
outputSchema: z.object({ sent: z.boolean() }),
execute: async ({ inputData, getStepResult }) => {
// Access original data from first step
const originalData = getStepResult(fetchData);
console.log(`Sending notification: ${inputData.summary}`);
// Send to Slack, Discord, etc.
return { sent: true };
},
});
// Build the workflow
export const dataProcessingWorkflow = createWorkflow({
id: 'data-processing',
inputSchema: z.object({ url: z.string() }),
outputSchema: z.object({ sent: z.boolean() }),
})
.then(fetchData)
.then(processData)
.then(sendNotification)
.commit();
// Use in an HTTP val
export default async function(req: Request) {
const { url } = await req.json();
const run = await dataProcessingWorkflow.createRun();
const result = await run.start({ inputData: { url } });
return Response.json(result);
}
┌─────────────────────────────────────────────────────────┐
│ Val Town Trigger │
│ (HTTP / Cron / Email val) │
└─────────────────┬───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Workflow Instance │
│ - createRun() creates a new execution │
│ - start() begins execution │
└─────────────────┬───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Execution Engine │
│ - Validates inputs with Zod schemas │
│ - Executes steps sequentially │
│ - Captures errors and results │
└─────────────────┬───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Step Execution Context │
│ - inputData: validated input │
│ - getStepResult(): access previous steps │
│ - getInitData(): access workflow input │
│ - state/setState: workflow-level state │
└─────────────────┬───────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ SQLite Storage │
│ - Saves snapshot after each step │
│ - Tracks execution path and results │
│ - Enables debugging and recovery │
└─────────────────────────────────────────────────────────┘
Phase 1 (Complete): Core engine with sequential execution
Phase 2: Advanced control flow (parallel, branching, loops)
Phase 3: Durability (suspend/resume, sleep)
Phase 4: Enhanced features (foreach, nested workflows, retries)
Phase 5: Observability (streaming, debugging UI)
This is an experimental project for learning and personal automation. Contributions, ideas, and feedback are welcome.
This project is heavily inspired by Mastra, adapted for Val Town's serverless environment and optimized for personal automation use cases.
MIT