A SQLite-based task queue for Val Town with priority scheduling, delayed execution, automatic retries, and dead letter queue support. Simple, reliable, and production-ready.
```bash
curl https://lqw.web.val.run/schema
```
```bash
curl -X POST https://lqw.web.val.run/enqueue \
  -H "Content-Type: application/json" \
  -d '{
    "task_type": "log_message",
    "payload": { "message": "Hello queue!", "level": "info" }
  }'
```
```bash
curl https://lqw.web.val.run/monitor
```
Enqueue a new task.
Request Body:
```json
{
  "task_type": "log_message",
  "payload": { "message": "Hello", "level": "info" },
  "priority": 0,
  "max_attempts": 3,
  "scheduled_for": null
}
```
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| `task_type` | string | required | One of: `log_message`, `send_email`, `webhook`, `http_request` |
| `payload` | object | required | Task-specific data (structure depends on `task_type`) |
| `priority` | number | 0 | Priority level (higher = processed first) |
| `max_attempts` | number | 3 | Max retry attempts before moving to the DLQ |
| `scheduled_for` | number | null | Unix timestamp for delayed execution |
Response:
```json
{ "success": true, "task_id": "1", "status": "queued", "scheduled_for": null }
```
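The request and response above can be wrapped in a small typed client. This is a minimal sketch, not part of the project: `buildEnqueueBody` mirrors the parameter table (including the documented defaults), and `BASE_URL` is assumed to be the demo val, so point it at your own deployment.

```typescript
const BASE_URL = "https://lqw.web.val.run"; // assumption: the demo val

type EnqueueOptions = {
  priority?: number;      // higher = processed first (default 0)
  max_attempts?: number;  // retries before DLQ (default 3)
  scheduled_for?: number; // Unix timestamp for delayed execution
};

// Pure helper: build the JSON body exactly as /enqueue expects.
export function buildEnqueueBody(
  task_type: string,
  payload: Record<string, unknown>,
  opts: EnqueueOptions = {},
) {
  return {
    task_type,
    payload,
    priority: opts.priority ?? 0,
    max_attempts: opts.max_attempts ?? 3,
    scheduled_for: opts.scheduled_for ?? null,
  };
}

// POST the body and return the parsed response.
export async function enqueueTask(
  task_type: string,
  payload: Record<string, unknown>,
  opts?: EnqueueOptions,
) {
  const res = await fetch(`${BASE_URL}/enqueue`, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(buildEnqueueBody(task_type, payload, opts)),
  });
  if (!res.ok) throw new Error(`enqueue failed: ${res.status}`);
  return res.json(); // { success, task_id, status, scheduled_for }
}
```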
Initialize or check the database schema.
Response:
```json
{
  "success": true,
  "message": "Queue schema initialized successfully!",
  "tables": ["queue", "dead_letter_queue"]
}
```
Get queue statistics and monitoring data.
Response:
```json
{
  "timestamp": "2024-11-23T10:00:00.000Z",
  "stats": {
    "pending": { "count": 5, "avg_duration_seconds": null },
    "processing": { "count": 1, "avg_duration_seconds": 2.3 },
    "completed": { "count": 42, "avg_duration_seconds": 1.5 },
    "failed": { "count": 0, "avg_duration_seconds": null }
  },
  "task_types": {
    "log_message": { "count": 30 },
    "send_email": { "count": 15 },
    "webhook": { "count": 2 },
    "http_request": { "count": 1 }
  },
  "dead_letter_queue": {
    "count": 2,
    "recent": [
      {
        "id": 1,
        "task_type": "send_email",
        "error": "Invalid email address",
        "attempts": 3,
        "created_at": "2024-11-23T09:00:00.000Z"
      }
    ]
  },
  "scheduled_tasks": [
    {
      "id": 5,
      "task_type": "log_message",
      "scheduled_for": "2024-11-23T11:00:00.000Z",
      "priority": 0
    }
  ]
}
```
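For illustration, the per-status stats in this response can be reproduced with a simple in-memory aggregation. This is a sketch, not the actual `/monitor` implementation; the `Task` shape is an assumption based on the queue table documented later.

```typescript
// Assumed task shape; field names follow the queue table schema.
type Task = {
  task_type: string;
  status: "pending" | "processing" | "completed" | "failed";
  started_at?: number;   // Unix seconds
  completed_at?: number; // Unix seconds
};

// Group tasks by status, computing count and average duration.
export function summarize(tasks: Task[]) {
  const stats: Record<
    string,
    { count: number; avg_duration_seconds: number | null }
  > = {};
  for (const status of ["pending", "processing", "completed", "failed"]) {
    const group = tasks.filter((t) => t.status === status);
    const durations = group
      .filter((t) => t.started_at != null && t.completed_at != null)
      .map((t) => t.completed_at! - t.started_at!);
    stats[status] = {
      count: group.length,
      avg_duration_seconds: durations.length
        ? durations.reduce((a, b) => a + b, 0) / durations.length
        : null, // null when no finished tasks, as in the response above
    };
  }
  return stats;
}
```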
Log a message to the console.
```json
{
  "task_type": "log_message",
  "payload": { "message": "Something happened", "level": "info" }
}
```
Payload:
| Field | Type | Options | Description |
|---|---|---|---|
| `message` | string | required | Message to log |
| `level` | string | `"info"`, `"warning"`, `"error"` | Log level |
Send an email via Val Town's email service.
```json
{
  "task_type": "send_email",
  "payload": { "to": "user@example.com", "subject": "Hello", "text": "Email body" }
}
```
Payload:
| Field | Type | Description |
|---|---|---|
| `to` | string | Recipient email address |
| `subject` | string | Email subject |
| `text` | string | Email body text |
Call an external webhook endpoint.
```json
{
  "task_type": "webhook",
  "payload": {
    "url": "https://example.com/webhook",
    "method": "POST",
    "body": { "event": "task_complete" },
    "headers": { "Authorization": "Bearer token123" }
  }
}
```
Payload:
| Field | Type | Default | Description |
|---|---|---|---|
| `url` | string | required | Webhook URL |
| `method` | string | `"POST"` | HTTP method |
| `body` | object | null | Request body (JSON) |
| `headers` | object | `{}` | HTTP headers |
Make an HTTP request to any endpoint.
```json
{
  "task_type": "http_request",
  "payload": {
    "url": "https://api.example.com/data",
    "method": "GET",
    "headers": { "Authorization": "Bearer token" }
  }
}
```
Payload:
| Field | Type | Default | Description |
|---|---|---|---|
| `url` | string | required | Request URL |
| `method` | string | `"GET"` | HTTP method |
| `body` | object | null | Request body |
| `headers` | object | `{}` | HTTP headers |
```bash
curl -X POST https://lqw.web.val.run/enqueue \
  -H "Content-Type: application/json" \
  -d '{
    "task_type": "log_message",
    "payload": { "message": "Urgent!", "level": "error" },
    "priority": 100
  }'
```
Tasks with higher priority values are processed first.
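A sketch of that ordering rule, in memory rather than SQL. Breaking ties by enqueue time (oldest first) is an assumption for illustration:

```typescript
type QueuedTask = { id: number; priority: number; created_at: number };

// Higher priority first; within equal priority, oldest first (assumed tie-break).
export function nextUp(tasks: QueuedTask[]): QueuedTask[] {
  return [...tasks].sort(
    (a, b) => b.priority - a.priority || a.created_at - b.created_at,
  );
}
```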
```bash
# Schedule for 1 hour from now
SCHEDULED_TIME=$(($(date +%s) + 3600))

curl -X POST https://lqw.web.val.run/enqueue \
  -H "Content-Type: application/json" \
  -d "{
    \"task_type\": \"log_message\",
    \"payload\": { \"message\": \"Scheduled message\", \"level\": \"info\" },
    \"scheduled_for\": $SCHEDULED_TIME
  }"
```
```bash
curl -X POST https://lqw.web.val.run/enqueue \
  -H "Content-Type: application/json" \
  -d '{
    "task_type": "webhook",
    "payload": {
      "url": "https://example.com/webhook",
      "method": "POST",
      "body": { "data": "value" }
    },
    "max_attempts": 5
  }'
```
This task will be retried up to 5 times before moving to the dead letter queue.
```bash
curl -X POST https://lqw.web.val.run/enqueue \
  -H "Content-Type: application/json" \
  -d '{
    "task_type": "send_email",
    "payload": {
      "to": "user@example.com",
      "subject": "Welcome!",
      "text": "Thanks for signing up"
    }
  }'
```
Adding a new task type requires changes to three files: types.ts, processors.ts, and api.ts. Follow this step-by-step guide:
Add a Zod schema to validate your task payload. This ensures type safety and runtime validation:
```typescript
import { z } from "npm:zod";

export const slackNotificationPayloadSchema = z.object({
  webhook_url: z.string().url(),
  channel: z.string(),
  message: z.string(),
  emoji: z.string().optional().default(":robot_face:"),
});

export type SlackNotificationPayload = z.infer<
  typeof slackNotificationPayloadSchema
>;
```
Key points:
- Use Zod's built-in validators like `.url()`, `.email()`, etc. for common patterns
- Use `.optional()` for optional fields and `.default()` for defaults

Create an async function that processes the task. This function is called by the worker:
```typescript
async function processSlackNotification(payload: SlackNotificationPayload) {
  const { webhook_url, channel, message, emoji } = payload;

  const response = await fetch(webhook_url, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      channel,
      text: message,
      icon_emoji: emoji,
    }),
  });

  if (!response.ok) {
    throw new Error(
      `Slack webhook failed: ${response.status} ${response.statusText}`
    );
  }

  console.log(`Slack message sent to ${channel}`);
}
```
Best practices:
- Throw an error when the task fails so the queue can retry it (e.g. check `if (!response.ok)`)
- Set timeouts on external calls (e.g. `{ timeout: 10000 }`)

Update the `ProcessorMap` type and `processors` object in `processors.ts`:
```typescript
type ProcessorMap = {
  send_email: EmailPayload;
  webhook: WebhookPayload;
  log_message: LogMessagePayload;
  http_request: HttpRequestPayload;
  slack_notification: SlackNotificationPayload; // Add here
};

const payloadSchemas = {
  send_email: emailPayloadSchema,
  webhook: webhookPayloadSchema,
  log_message: logMessagePayloadSchema,
  http_request: httpRequestPayloadSchema,
  slack_notification: slackNotificationPayloadSchema, // Add here
} as const;

const processors: {
  [K in TaskType]: Processor<K>;
} = {
  send_email: processSendEmail,
  webhook: processWebhook,
  log_message: processLogMessage,
  http_request: processHttpRequest,
  slack_notification: processSlackNotification, // Add here
};
```
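Together, the schema map and processor map let the worker validate and dispatch generically. The sketch below shows the idea with plain validator functions standing in for Zod so it is self-contained; the real code uses the Zod schemas above.

```typescript
type Validator<T> = (raw: unknown) => T;
type Processor<T> = (payload: T) => Promise<void> | void;

// Build a dispatcher that validates the raw payload, then calls
// the matching processor — the core of what the worker does per task.
function makeDispatcher<M extends Record<string, unknown>>(
  schemas: { [K in keyof M]: Validator<M[K]> },
  processors: { [K in keyof M]: Processor<M[K]> },
) {
  return async function dispatch<K extends keyof M>(type: K, raw: unknown) {
    const payload = schemas[type](raw); // throws on invalid payload
    await processors[type](payload);
  };
}

// Example registration with a single task type:
type LogPayload = { message: string; level: string };

const dispatch = makeDispatcher<{ log_message: LogPayload }>(
  {
    log_message: (raw) => {
      const p = raw as Partial<LogPayload>;
      if (typeof p?.message !== "string") throw new Error("message required");
      return { message: p.message, level: p.level ?? "info" };
    },
  },
  {
    log_message: (p) => console.log(`[${p.level}] ${p.message}`),
  },
);
```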
Add your task type to the VALID_TASK_TYPES array:
```typescript
export const VALID_TASK_TYPES = [
  "send_email",
  "webhook",
  "log_message",
  "http_request",
  "slack_notification", // Add here
] as const;
```
Once added, test by enqueueing a task:
```bash
curl -X POST https://your-val.web.val.run/enqueue \
  -H "Content-Type: application/json" \
  -d '{
    "task_type": "slack_notification",
    "payload": {
      "webhook_url": "https://hooks.slack.com/services/YOUR/WEBHOOK/URL",
      "channel": "#notifications",
      "message": "Test message",
      "emoji": ":bell:"
    }
  }'
```
Then check the monitor to see if it succeeded:
```bash
curl https://your-val.web.val.run/monitor | jq '.task_types'
```
Task with retryable external API:
```typescript
async function processDataSync(payload: DataSyncPayload) {
  const response = await fetch(payload.api_url, {
    // fetch has no `timeout` option; abort via a signal instead
    signal: AbortSignal.timeout(30_000),
    headers: payload.headers,
  });

  // 5xx errors should retry; 4xx should fail permanently
  if (response.status >= 500) {
    throw new Error(`API error: ${response.status}`);
  }
  if (!response.ok) {
    throw new Error(`Data sync failed: ${response.status} - won't retry`);
  }

  const data = await response.json();
  console.log("Data synced successfully");
}
```
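The 5xx/4xx split in the example above can be factored into a small predicate. A minimal sketch; extending it (e.g. treating 429 Too Many Requests as retryable) is a common variation but an assumption, not something this queue does:

```typescript
// Server-side failures (5xx) are transient and worth retrying;
// client errors (4xx) indicate a bad request that retries won't fix.
export function isRetryableStatus(status: number): boolean {
  return status >= 500 && status < 600;
}
```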
Task that updates local state:
```typescript
async function processDataAggregation(payload: DataAggregationPayload) {
  // Fetch data from every source in parallel
  const results = await Promise.all(
    payload.sources.map((url) => fetch(url).then((r) => r.json()))
  );

  // Store aggregated result (e.g., in blob storage)
  // await blob.setJSON(`aggregation_${Date.now()}`, results);

  console.log("Aggregation complete");
}
```
Task with retry-limiting logic:
```typescript
async function processExpensiveOperation(payload: ExpensiveOpPayload) {
  // Some tasks you only want to retry a few times
  if (payload.max_retries === 0) {
    // Handle no-retry case
    console.log("One-shot operation");
  }

  // Your expensive logic here
}
```
The Zod schemas automatically validate payloads. Invalid payloads get rejected at enqueue time:
```bash
# This will fail with a validation error
# (payload is missing webhook_url and message)
curl -X POST https://your-val.web.val.run/enqueue \
  -H "Content-Type: application/json" \
  -d '{
    "task_type": "slack_notification",
    "payload": { "channel": "#notifications" }
  }'
```
Response:
```json
{ "error": "Validation error: webhook_url is required" }
```
If your task fails:

- Check the error message in the `/monitor` endpoint
- Verify the payload matches the schema in `types.ts` with all required fields

Checklist for adding a new task type:

1. Define the payload schema in `types.ts`
2. Write the processor function in `processors.ts`
3. Add the type to `ProcessorMap`
4. Add the schema to `payloadSchemas`
5. Add the processor to the `processors` object
6. Add the type to `VALID_TASK_TYPES` in `api.ts`
7. Enqueue a test task via the `/enqueue` endpoint
8. Confirm it in the `/monitor` endpoint stats

| File | Purpose |
|---|---|
| `api.ts` | HTTP handler with endpoints: `/schema`, `/enqueue`, `/monitor` |
| `schema.ts` | Database schema initialization and validation |
| `types.ts` | TypeScript types and Zod validation schemas |
| `processors.ts` | Task execution logic for each task type |
| `worker.ts` | Worker that processes queued tasks (runs every minute) |
| Column | Type | Description |
|---|---|---|
| `id` | INTEGER | Primary key |
| `task_type` | TEXT | Task type (e.g., `"send_email"`) |
| `payload` | TEXT | JSON-encoded task data |
| `status` | TEXT | Task status: `pending`, `processing`, `completed`, `failed` |
| `created_at` | INTEGER | Unix timestamp |
| `started_at` | INTEGER | Unix timestamp when processing started |
| `completed_at` | INTEGER | Unix timestamp when task completed |
| `scheduled_for` | INTEGER | Unix timestamp for delayed execution |
| `attempts` | INTEGER | Number of attempts made |
| `max_attempts` | INTEGER | Maximum attempts before DLQ |
| `priority` | INTEGER | Task priority (higher = first) |
| `error` | TEXT | Error message from last failed attempt |
Contains tasks that failed after exhausting all retry attempts. Has the same columns as queue plus:
| Column | Type | Description |
|---|---|---|
| `moved_to_dlq_at` | INTEGER | Unix timestamp when moved to DLQ |
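A sketch of what initializing these two tables might look like. The exact DDL (defaults, NOT NULL constraints) is an assumption based on the column tables above, and the executor callback stands in for Val Town's SQLite client so the sketch runs anywhere:

```typescript
const QUEUE_SQL = `
CREATE TABLE IF NOT EXISTS queue (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  task_type TEXT NOT NULL,
  payload TEXT NOT NULL,
  status TEXT NOT NULL DEFAULT 'pending',
  created_at INTEGER NOT NULL,
  started_at INTEGER,
  completed_at INTEGER,
  scheduled_for INTEGER,
  attempts INTEGER NOT NULL DEFAULT 0,
  max_attempts INTEGER NOT NULL DEFAULT 3,
  priority INTEGER NOT NULL DEFAULT 0,
  error TEXT
)`;

const DLQ_SQL = `
CREATE TABLE IF NOT EXISTS dead_letter_queue (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  task_type TEXT NOT NULL,
  payload TEXT NOT NULL,
  status TEXT NOT NULL,
  created_at INTEGER NOT NULL,
  started_at INTEGER,
  completed_at INTEGER,
  scheduled_for INTEGER,
  attempts INTEGER NOT NULL,
  max_attempts INTEGER NOT NULL,
  priority INTEGER NOT NULL,
  error TEXT,
  moved_to_dlq_at INTEGER NOT NULL
)`;

// Run both statements through whatever SQLite binding is available.
export async function initSchema(execute: (sql: string) => Promise<unknown>) {
  await execute(QUEUE_SQL);
  await execute(DLQ_SQL);
  return { success: true, tables: ["queue", "dead_letter_queue"] };
}
```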
The worker runs every minute and:

1. Selects pending tasks, skipping any with `scheduled_for` in the future
2. Processes them in priority order, highest first
3. Retries failures until `max_attempts` is exhausted, then moves the task to the dead letter queue

The `/monitor` endpoint shows recent failures for debugging.
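The worker's behavior can be sketched as a single in-memory tick: claim due pending tasks, run them, and retry or dead-letter failures. This is an illustration, not the actual `worker.ts`; details such as when `attempts` is incremented are assumptions, and a real implementation would also delete dead-lettered rows from the queue table.

```typescript
type Row = {
  id: number;
  status: "pending" | "processing" | "completed" | "failed";
  priority: number;
  created_at: number;
  scheduled_for: number | null;
  attempts: number;
  max_attempts: number;
  error?: string;
};

export async function workerTick(
  rows: Row[],
  dlq: Row[],
  run: (row: Row) => Promise<void>,
  now: number,
) {
  // Due = pending and not scheduled for the future; highest priority first.
  const due = rows
    .filter((r) => r.status === "pending")
    .filter((r) => r.scheduled_for == null || r.scheduled_for <= now)
    .sort((a, b) => b.priority - a.priority || a.created_at - b.created_at);

  for (const row of due) {
    row.status = "processing";
    row.attempts += 1;
    try {
      await run(row);
      row.status = "completed";
    } catch (err) {
      row.error = String(err);
      if (row.attempts >= row.max_attempts) {
        row.status = "failed";
        dlq.push(row); // real code would also set moved_to_dlq_at
      } else {
        row.status = "pending"; // picked up again on the next tick
      }
    }
  }
}
```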