
Lightweight Queue System

A SQLite-based task queue for Val Town with priority scheduling, delayed execution, automatic retries, and dead letter queue support. Simple, reliable, and production-ready.

Features

  • ✅ SQLite-backed queue with ACID guarantees
  • ✅ Priority-based task processing
  • ✅ Delayed/scheduled task execution
  • ✅ Automatic retry logic with configurable max attempts
  • ✅ Dead letter queue for failed tasks
  • ✅ Task status tracking (pending, processing, completed, failed)
  • ✅ Built-in monitoring API
  • ✅ Type-safe with TypeScript and Zod validation
  • ✅ Worker processes up to 10 tasks per minute

Quick Start

1. Initialize the Database

curl https://lqw.web.val.run/schema

2. Add Your First Task

curl -X POST https://lqw.web.val.run/enqueue \
  -H "Content-Type: application/json" \
  -d '{
    "task_type": "log_message",
    "payload": { "message": "Hello queue!", "level": "info" }
  }'

3. Check Queue Status

curl https://lqw.web.val.run/monitor

API Reference

POST /enqueue

Enqueue a new task.

Request Body:

{ "task_type": "log_message", "payload": { "message": "Hello", "level": "info" }, "priority": 0, "max_attempts": 3, "scheduled_for": null }

Parameters:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| task_type | string | required | One of: log_message, send_email, webhook, http_request |
| payload | object | required | Task-specific data (structure depends on task_type) |
| priority | number | 0 | Priority level (higher = processed first) |
| max_attempts | number | 3 | Max retry attempts before moving to DLQ |
| scheduled_for | number | null | Unix timestamp for delayed execution |

Response:

{ "success": true, "task_id": "1", "status": "queued", "scheduled_for": null }
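For callers working from TypeScript rather than curl, the request body can be assembled with a small helper. This is a minimal sketch and not part of this project: `EnqueueInput` and `buildEnqueueInit` are hypothetical names, and the defaults are filled in on the client only to mirror the parameter table above.

```typescript
// Hypothetical client-side helper; field names and defaults mirror the parameter table.
type EnqueueTaskType = "log_message" | "send_email" | "webhook" | "http_request";

interface EnqueueInput {
  task_type: EnqueueTaskType;
  payload: Record<string, unknown>;
  priority?: number;
  max_attempts?: number;
  scheduled_for?: number | null;
}

interface EnqueueInit {
  method: string;
  headers: Record<string, string>;
  body: string;
}

// Builds the second argument for fetch(); it performs no network I/O itself.
function buildEnqueueInit(input: EnqueueInput): EnqueueInit {
  const body = {
    task_type: input.task_type,
    payload: input.payload,
    priority: input.priority ?? 0,
    max_attempts: input.max_attempts ?? 3,
    scheduled_for: input.scheduled_for ?? null,
  };
  return {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(body),
  };
}

// Usage, assuming the queue is reachable at the URL used in this README:
// const res = await fetch("https://lqw.web.val.run/enqueue", buildEnqueueInit({
//   task_type: "log_message",
//   payload: { message: "Hello", level: "info" },
// }));
```

Keeping the builder free of network calls makes it easy to unit test separately from the HTTP request.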

GET /schema

Initialize or check the database schema.

Response:

{ "success": true, "message": "Queue schema initialized successfully!", "tables": ["queue", "dead_letter_queue"] }

GET /monitor

Get queue statistics and monitoring data.

Response:

{
  "timestamp": "2024-11-23T10:00:00.000Z",
  "stats": {
    "pending": { "count": 5, "avg_duration_seconds": null },
    "processing": { "count": 1, "avg_duration_seconds": 2.3 },
    "completed": { "count": 42, "avg_duration_seconds": 1.5 },
    "failed": { "count": 0, "avg_duration_seconds": null }
  },
  "task_types": {
    "log_message": { "count": 30 },
    "send_email": { "count": 15 },
    "webhook": { "count": 2 },
    "http_request": { "count": 1 }
  },
  "dead_letter_queue": {
    "count": 2,
    "recent": [
      {
        "id": 1,
        "task_type": "send_email",
        "error": "Invalid email address",
        "attempts": 3,
        "created_at": "2024-11-23T09:00:00.000Z"
      }
    ]
  },
  "scheduled_tasks": [
    {
      "id": 5,
      "task_type": "log_message",
      "scheduled_for": "2024-11-23T11:00:00.000Z",
      "priority": 0
    }
  ]
}
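A monitoring script might turn this response into alerts. The sketch below types only the fields it reads, inferred from the example response rather than from api.ts; `MonitorSnapshot`, `checkQueueHealth`, and both thresholds are hypothetical.

```typescript
// Partial shape of the /monitor response, inferred from the example above.
interface StatusStats {
  count: number;
  avg_duration_seconds: number | null;
}

interface MonitorSnapshot {
  stats: Record<"pending" | "processing" | "completed" | "failed", StatusStats>;
  dead_letter_queue: { count: number };
}

// Returns human-readable warnings; the default thresholds are arbitrary examples.
function checkQueueHealth(
  snapshot: MonitorSnapshot,
  maxPending = 100,
  maxDeadLetters = 0,
): string[] {
  const warnings: string[] = [];
  if (snapshot.stats.pending.count > maxPending) {
    warnings.push(`backlog: ${snapshot.stats.pending.count} pending tasks`);
  }
  if (snapshot.dead_letter_queue.count > maxDeadLetters) {
    warnings.push(`dead letter queue holds ${snapshot.dead_letter_queue.count} task(s)`);
  }
  return warnings;
}
```

With the example response above (5 pending, 2 in the dead letter queue) and the default thresholds, only the dead letter queue warning is produced.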

Task Types

log_message

Log a message to the console.

{ "task_type": "log_message", "payload": { "message": "Something happened", "level": "info" } }

Payload:

| Field | Type | Options | Description |
| --- | --- | --- | --- |
| message | string | required | Message to log |
| level | string | "info", "warning", "error" | Log level |

send_email

Send an email via Val Town's email service.

{ "task_type": "send_email", "payload": { "to": "user@example.com", "subject": "Hello", "text": "Email body" } }

Payload:

| Field | Type | Description |
| --- | --- | --- |
| to | string | Recipient email address |
| subject | string | Email subject |
| text | string | Email body text |

webhook

Call an external webhook endpoint.

{ "task_type": "webhook", "payload": { "url": "https://example.com/webhook", "method": "POST", "body": { "event": "task_complete" }, "headers": { "Authorization": "Bearer token123" } } }

Payload:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| url | string | required | Webhook URL |
| method | string | "POST" | HTTP method |
| body | object | null | Request body (JSON) |
| headers | object | {} | HTTP headers |

http_request

Make an HTTP request to any endpoint.

{ "task_type": "http_request", "payload": { "url": "https://api.example.com/data", "method": "GET", "headers": { "Authorization": "Bearer token" } } }

Payload:

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| url | string | required | Request URL |
| method | string | "GET" | HTTP method |
| body | object | null | Request body |
| headers | object | {} | HTTP headers |

Examples

High Priority Task

curl -X POST https://lqw.web.val.run/enqueue \
  -H "Content-Type: application/json" \
  -d '{
    "task_type": "log_message",
    "payload": { "message": "Urgent!", "level": "error" },
    "priority": 100
  }'

Tasks with higher priority values are processed first.

Schedule for Later

# Schedule for 1 hour from now
SCHEDULED_TIME=$(($(date +%s) + 3600))

curl -X POST https://lqw.web.val.run/enqueue \
  -H "Content-Type: application/json" \
  -d "{
    \"task_type\": \"log_message\",
    \"payload\": { \"message\": \"Scheduled message\", \"level\": \"info\" },
    \"scheduled_for\": $SCHEDULED_TIME
  }"
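The shell example passes `date +%s`, which is a Unix timestamp in seconds. JavaScript's `Date` works in milliseconds, so a TypeScript caller needs to convert. A minimal sketch, with `scheduledForIn` as a hypothetical helper name:

```typescript
// Converts "delaySeconds from now" into a Unix timestamp in seconds,
// the same unit that `date +%s` produces in the shell example.
function scheduledForIn(delaySeconds: number, now: Date = new Date()): number {
  // getTime() returns milliseconds, so divide by 1000 before adding the delay.
  return Math.floor(now.getTime() / 1000) + delaySeconds;
}

// One hour from now, ready to use as the `scheduled_for` field.
const oneHourFromNow = scheduledForIn(3600);
```

Passing `now` explicitly keeps the helper deterministic in tests.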

Configure Retries

curl -X POST https://lqw.web.val.run/enqueue \
  -H "Content-Type: application/json" \
  -d '{
    "task_type": "webhook",
    "payload": {
      "url": "https://example.com/webhook",
      "method": "POST",
      "body": { "data": "value" }
    },
    "max_attempts": 5
  }'

This task will be retried up to 5 times before moving to the dead letter queue.

Send Email with Queue

curl -X POST https://lqw.web.val.run/enqueue \
  -H "Content-Type: application/json" \
  -d '{
    "task_type": "send_email",
    "payload": {
      "to": "user@example.com",
      "subject": "Welcome!",
      "text": "Thanks for signing up"
    }
  }'

Adding Custom Task Types

Adding a new task type requires changes to three files: types.ts, processors.ts, and api.ts. Follow this step-by-step guide:

Step 1: Define the Payload Type in types.ts

Add a Zod schema to validate your task payload. This ensures type safety and runtime validation:

export const slackNotificationPayloadSchema = z.object({
  webhook_url: z.string().url(),
  channel: z.string(),
  message: z.string(),
  emoji: z.string().optional().default(":robot_face:"),
});

export type SlackNotificationPayload = z.infer<
  typeof slackNotificationPayloadSchema
>;

Key points:

  • Use .url(), .email(), etc. for common patterns
  • Use .optional() for optional fields and .default() for defaults
  • Export both the schema and the inferred type

Step 2: Create the Processor in processors.ts

Create an async function that processes the task. This function is called by the worker:

async function processSlackNotification(payload: SlackNotificationPayload) {
  const { webhook_url, channel, message, emoji } = payload;

  const response = await fetch(webhook_url, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      channel,
      text: message,
      icon_emoji: emoji,
    }),
  });

  if (!response.ok) {
    throw new Error(
      `Slack webhook failed: ${response.status} ${response.statusText}`
    );
  }

  console.log(`Slack message sent to ${channel}`);
}

Best practices:

  • Always validate external responses with if (!response.ok)
  • Throw descriptive errors - they'll be logged and trigger retries
  • Log success for monitoring
  • Handle timeouts explicitly. fetch has no timeout option, so pass an abort signal instead: { signal: AbortSignal.timeout(10000) }
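Standard `fetch` ignores a `timeout` property; a request is cancelled through an abort signal. The sketch below shows one way to apply that in a processor. `withTimeout` and `postJson` are hypothetical helpers, not functions from processors.ts.

```typescript
// Returns a copy of `init` carrying a signal that aborts after `ms` milliseconds.
// AbortSignal.timeout is a standard web API (Deno, Node 18+, modern browsers).
function withTimeout(init: RequestInit, ms: number): RequestInit {
  return { ...init, signal: AbortSignal.timeout(ms) };
}

// Hypothetical processor body showing where the helper fits.
async function postJson(url: string, body: unknown, ms = 10_000): Promise<void> {
  const response = await fetch(
    url,
    withTimeout(
      {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify(body),
      },
      ms,
    ),
  );
  // A non-2xx status does not reject the promise, so check it explicitly.
  if (!response.ok) {
    throw new Error(`Request failed: ${response.status} ${response.statusText}`);
  }
}
```

When the timeout fires, `fetch` rejects, which the worker would see as a thrown error like any other failure.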

Step 3: Register in the ProcessorMap

Update the ProcessorMap type and processors object in processors.ts:

type ProcessorMap = {
  send_email: EmailPayload;
  webhook: WebhookPayload;
  log_message: LogMessagePayload;
  http_request: HttpRequestPayload;
  slack_notification: SlackNotificationPayload; // Add here
};

const payloadSchemas = {
  send_email: emailPayloadSchema,
  webhook: webhookPayloadSchema,
  log_message: logMessagePayloadSchema,
  http_request: httpRequestPayloadSchema,
  slack_notification: slackNotificationPayloadSchema, // Add here
} as const;

const processors: {
  [K in TaskType]: Processor<K>;
} = {
  send_email: processSendEmail,
  webhook: processWebhook,
  log_message: processLogMessage,
  http_request: processHttpRequest,
  slack_notification: processSlackNotification, // Add here
};

Step 4: Update VALID_TASK_TYPES in api.ts

Add your task type to the VALID_TASK_TYPES array:

export const VALID_TASK_TYPES = [
  "send_email",
  "webhook",
  "log_message",
  "http_request",
  "slack_notification", // Add here
] as const;
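Declaring the array `as const` lets TypeScript derive a union type from it, so the list of names only has to be maintained in one place. The sketch below illustrates the pattern; whether api.ts derives `TaskType` this way is an assumption, and `isValidTaskType` is a hypothetical helper.

```typescript
// Same shape as the array above, repeated here so the sketch is self-contained.
const VALID_TASK_TYPES = [
  "send_email",
  "webhook",
  "log_message",
  "http_request",
  "slack_notification",
] as const;

// Union of the literal strings: "send_email" | "webhook" | ...
type TaskType = (typeof VALID_TASK_TYPES)[number];

// Runtime check that also narrows the type for the compiler.
function isValidTaskType(value: unknown): value is TaskType {
  return (
    typeof value === "string" &&
    (VALID_TASK_TYPES as readonly string[]).includes(value)
  );
}
```

A handler can call `isValidTaskType(body.task_type)` before looking up the matching schema and processor.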

Testing Your Task Type

Once added, test by enqueueing a task:

curl -X POST https://your-val.web.val.run/enqueue \
  -H "Content-Type: application/json" \
  -d '{
    "task_type": "slack_notification",
    "payload": {
      "webhook_url": "https://hooks.slack.com/services/YOUR/WEBHOOK/URL",
      "channel": "#notifications",
      "message": "Test message",
      "emoji": ":bell:"
    }
  }'

Then check the monitor to see if it succeeded:

curl https://your-val.web.val.run/monitor | jq '.task_types'

Common Patterns

Task with retryable external API:

async function processDataSync(payload: DataSyncPayload) {
  const response = await fetch(payload.api_url, {
    // fetch has no `timeout` option; abort through a signal instead
    signal: AbortSignal.timeout(30000),
    headers: payload.headers,
  });

  // Any thrown error counts as a failed attempt and is retried until
  // max_attempts is reached, so these messages only label the likely cause
  if (response.status >= 500) {
    throw new Error(`API error (likely transient): ${response.status}`);
  }
  if (!response.ok) {
    throw new Error(`Data sync failed (likely permanent): ${response.status}`);
  }

  const data = await response.json();
  console.log("Data synced successfully");
}

Task that updates local state:

async function processDataAggregation(payload: DataAggregationPayload) {
  // Fetch data
  const results = await Promise.all(
    payload.sources.map((url) => fetch(url).then((r) => r.json()))
  );

  // Store aggregated result (e.g., in blob storage)
  // await blob.setJSON(`aggregation_${Date.now()}`, results);

  console.log("Aggregation complete");
}

Task with retry-limiting logic:

async function processExpensiveOperation(payload: ExpensiveOpPayload) {
  // Some tasks you only want to retry a few times
  if (payload.max_retries === 0) {
    // Handle no-retry case
    console.log("One-shot operation");
  }

  // Your expensive logic here
}

Validation and Error Messages

The Zod schemas automatically validate payloads. Invalid payloads get rejected at enqueue time:

# This will fail with a validation error (webhook_url and message are missing)
curl -X POST https://your-val.web.val.run/enqueue \
  -H "Content-Type: application/json" \
  -d '{
    "task_type": "slack_notification",
    "payload": {
      "channel": "#notifications"
    }
  }'

Response:

{ "error": "Validation error: webhook_url is required" }

Debugging Task Types

If your task fails:

  1. Check dead letter queue via /monitor endpoint
  2. Look for error messages in the recent failures section
  3. Check Val Town logs for console output
  4. Test manually by calling your processor function directly
  5. Use type-safety - TypeScript will catch most issues at dev time

Checklist for New Task Types

  • Zod schema defined in types.ts with all required fields
  • Type exported from types.ts
  • Processor function added to processors.ts
  • Error handling: throws descriptive errors on failure
  • Type added to ProcessorMap
  • Schema added to payloadSchemas
  • Processor added to processors object
  • Task type added to VALID_TASK_TYPES in api.ts
  • Tested with /enqueue endpoint
  • Verified in /monitor endpoint stats

Architecture

| File | Purpose |
| --- | --- |
| api.ts | HTTP handler with endpoints: /schema, /enqueue, /monitor |
| schema.ts | Database schema initialization and validation |
| types.ts | TypeScript types and Zod validation schemas |
| processors.ts | Task execution logic for each task type |
| worker.ts | Worker that processes queued tasks (runs every minute) |

Database Schema

queue Table

| Column | Type | Description |
| --- | --- | --- |
| id | INTEGER | Primary key |
| task_type | TEXT | Task type (e.g., "send_email") |
| payload | TEXT | JSON-encoded task data |
| status | TEXT | Task status: pending, processing, completed, failed |
| created_at | INTEGER | Unix timestamp |
| started_at | INTEGER | Unix timestamp when processing started |
| completed_at | INTEGER | Unix timestamp when task completed |
| scheduled_for | INTEGER | Unix timestamp for delayed execution |
| attempts | INTEGER | Number of attempts made |
| max_attempts | INTEGER | Maximum attempts before DLQ |
| priority | INTEGER | Task priority (higher = first) |
| error | TEXT | Error message from last failed attempt |

dead_letter_queue Table

Contains tasks that failed after exhausting all retry attempts. Has the same columns as queue plus:

| Column | Type | Description |
| --- | --- | --- |
| moved_to_dlq_at | INTEGER | Unix timestamp when moved to DLQ |
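The two tables can be mirrored as TypeScript row types when reading them from application code. This is a sketch based on the column lists above, not on schema.ts: which columns are nullable is an assumption, and `QueueRow`, `DeadLetterRow`, and `toDeadLetterRow` are hypothetical names.

```typescript
// Row shape inferred from the queue table; INTEGER maps to number, TEXT to string.
interface QueueRow {
  id: number;
  task_type: string;
  payload: string; // JSON-encoded task data
  status: "pending" | "processing" | "completed" | "failed";
  created_at: number;
  started_at: number | null; // assumed null until processing starts
  completed_at: number | null; // assumed null until the task completes
  scheduled_for: number | null; // assumed null for immediate tasks
  attempts: number;
  max_attempts: number;
  priority: number;
  error: string | null; // assumed null until an attempt fails
}

// The dead letter table has the same columns plus one timestamp.
interface DeadLetterRow extends QueueRow {
  moved_to_dlq_at: number;
}

// Copies a queue row and stamps the time it was moved.
function toDeadLetterRow(row: QueueRow, nowSeconds: number): DeadLetterRow {
  return { ...row, moved_to_dlq_at: nowSeconds };
}
```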

Worker Behavior

The worker runs every minute and:

  1. Selects up to 10 pending tasks sorted by priority and creation time
  2. Skips tasks with scheduled_for in the future
  3. Marks task as "processing"
  4. Executes the task processor
  5. On success: marks task as "completed"
  6. On failure:
    • If attempts < max_attempts: resets status to "pending" for retry
    • If attempts >= max_attempts: moves task to dead letter queue
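The selection and failure rules above can be modeled in memory to make the ordering concrete. This is an illustrative sketch, not the code in worker.ts, which runs against SQLite and may differ. It assumes ties in priority are broken by oldest `created_at` first, and that `attempts` already counts the attempt that just failed.

```typescript
// Minimal task shape for the sketch; timestamps are Unix seconds.
interface Task {
  id: number;
  priority: number;
  created_at: number;
  scheduled_for: number | null; // null means "run as soon as possible"
  attempts: number;
  max_attempts: number;
}

// Steps 1-2: pick up to `limit` due tasks, highest priority first, then oldest first.
function selectBatch(pending: Task[], now: number, limit = 10): Task[] {
  return pending
    .filter((t) => t.scheduled_for === null || t.scheduled_for <= now)
    .sort((a, b) => b.priority - a.priority || a.created_at - b.created_at)
    .slice(0, limit);
}

// Step 6: decide where a task goes after a failed attempt.
function afterFailure(task: Task): "pending" | "dead_letter_queue" {
  return task.attempts < task.max_attempts ? "pending" : "dead_letter_queue";
}
```

Because `filter` returns a new array, sorting does not reorder the caller's list.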

Error Handling

  • Task failures are logged with error messages
  • Failed tasks are automatically retried based on max_attempts
  • After max retries, tasks are moved to the dead letter queue
  • The /monitor endpoint shows recent failures for debugging

Best Practices

  1. Use meaningful task types - makes monitoring easier
  2. Set reasonable priorities - avoid inversions
  3. Configure max_attempts - based on your tolerance for retries
  4. Monitor the dead letter queue - investigate failures
  5. Use scheduled_for - to delay tasks or spread them out over time
  6. Keep payloads small - SQLite isn't a document DB
  7. Validate payloads - Zod schemas catch errors early

Limits

  • Maximum 10 tasks processed per minute
  • SQLite row size limit ~1GB (payloads should be much smaller)
  • Task payloads are JSON-serialized strings
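The per-minute limit puts a floor on how quickly a backlog can clear. A rough estimate, assuming one worker run per minute, no new tasks arriving, and no retries (`minutesToDrain` is a hypothetical helper):

```typescript
// Documented batch limit: at most 10 tasks per worker run.
const TASKS_PER_RUN = 10;

// Best-case number of one-minute worker runs needed to clear a backlog.
function minutesToDrain(pendingCount: number): number {
  return Math.ceil(pendingCount / TASKS_PER_RUN);
}

// At this rate the queue handles at most 600 tasks per hour.
const maxTasksPerHour = TASKS_PER_RUN * 60;
```

For example, 25 pending tasks need at least 3 runs, and retried tasks add to the total because they return to the pending state.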