lightweightQueue
A SQLite-based queue system for Val Town with support for priorities, delayed execution, retries, and dead letter queue.
- Initialize the database schema (run once):
# Visit the schema.ts val and run it https://www.val.town/v/nbbaier/lightweightQueue/schema.ts
- The worker will automatically run every minute to process queued tasks
POST to the enqueue.ts endpoint:
curl -X POST https://lightweightqueue-enqueue.val.run/ \
  -H "Content-Type: application/json" \
  -d '{
    "task_type": "log_message",
    "payload": {
      "message": "Hello from the queue!",
      "level": "info"
    }
  }'
- task_type (required): Type of task to run
- payload (required): Data for the task
- priority (optional, default: 0): Higher numbers = higher priority
- max_attempts (optional, default: 3): Max retry attempts
- scheduled_for (optional): Unix timestamp for delayed execution
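The field list above can be sketched as a TypeScript type plus a small helper that checks the required fields and fills in the documented defaults. This is only an illustration: the names `EnqueueRequest`, `NormalizedTask`, and `normalizeEnqueueRequest` are hypothetical and are not taken from enqueue.ts, which may validate input differently.

```typescript
// Hypothetical request shape mirroring the documented fields.
interface EnqueueRequest {
  task_type: string;
  payload: Record<string, unknown>;
  priority?: number; // documented default: 0
  max_attempts?: number; // documented default: 3
  scheduled_for?: number; // Unix timestamp for delayed execution
}

// Hypothetical shape after defaults have been applied.
interface NormalizedTask {
  task_type: string;
  payload: Record<string, unknown>;
  priority: number;
  max_attempts: number;
  scheduled_for?: number;
}

// Reject requests missing a required field, then apply the documented defaults.
function normalizeEnqueueRequest(input: Partial<EnqueueRequest>): NormalizedTask {
  if (typeof input.task_type !== "string" || input.task_type.length === 0) {
    throw new Error("task_type is required");
  }
  if (input.payload == null) {
    throw new Error("payload is required");
  }
  const task: NormalizedTask = {
    task_type: input.task_type,
    payload: input.payload,
    priority: input.priority ?? 0,
    max_attempts: input.max_attempts ?? 3,
  };
  // Only carry scheduled_for through when the caller supplied it.
  if (input.scheduled_for !== undefined) {
    task.scheduled_for = input.scheduled_for;
  }
  return task;
}

console.log(
  normalizeEnqueueRequest({
    task_type: "log_message",
    payload: { message: "Hello from the queue!", level: "info" },
  }),
);
```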
Built-in task types in processors.ts:
- send_email
{ "task_type": "send_email", "payload": { "to": "user@example.com", "subject": "Test Email", "text": "This is a test" } }
- webhook
{ "task_type": "webhook", "payload": { "url": "https://example.com/webhook", "method": "POST", "body": { "data": "value" }, "headers": { "Authorization": "Bearer token" } } }
- log_message
{ "task_type": "log_message", "payload": { "message": "Something happened", "level": "info" } }
- http_request
{ "task_type": "http_request", "payload": { "url": "https://api.example.com/data", "method": "GET" } }
curl -X POST https://lightweightqueue-enqueue.val.run/ \
  -H "Content-Type: application/json" \
  -d '{
    "task_type": "log_message",
    "payload": { "message": "High priority task!" },
    "priority": 10
  }'
# Schedule task for 1 hour from now
SCHEDULED_TIME=$(($(date +%s) + 3600))
curl -X POST https://lightweightqueue-enqueue.val.run/ \
  -H "Content-Type: application/json" \
  -d "{
    \"task_type\": \"log_message\",
    \"payload\": { \"message\": \"This is delayed!\" },
    \"scheduled_for\": $SCHEDULED_TIME
  }"
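The same delayed enqueue can be done from TypeScript, for example from another val. The sketch below assumes `scheduled_for` is in Unix seconds, matching the `date +%s` arithmetic above. The helper names `scheduledForIn` and `enqueueDelayed` are hypothetical, and the response format of the endpoint is not documented here, so the raw `Response` is returned as-is.

```typescript
const ENQUEUE_URL = "https://lightweightqueue-enqueue.val.run/";

// Unix timestamp (seconds) that lies delaySeconds after nowMs.
// nowMs is a parameter so the calculation can be checked deterministically.
function scheduledForIn(delaySeconds: number, nowMs: number = Date.now()): number {
  return Math.floor(nowMs / 1000) + delaySeconds;
}

// POST a log_message task that should not run before the delay has passed.
async function enqueueDelayed(message: string, delaySeconds: number): Promise<Response> {
  return fetch(ENQUEUE_URL, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
      task_type: "log_message",
      payload: { message },
      scheduled_for: scheduledForIn(delaySeconds),
    }),
  });
}
```

Calling `enqueueDelayed("This is delayed!", 3600)` would send the same request as the shell example.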
Visit the monitoring endpoint:
https://lightweightqueue-monitor.val.run/
This shows:
- Task counts by status
- Average processing duration
- Task type breakdown
- Dead letter queue count
- Recent failures
- Upcoming scheduled tasks
Edit processors.ts and add a new case to the processTask function:
case "my_custom_task":
  await processMyCustomTask(payload);
  break;
Then implement your processor function:
async function processMyCustomTask(payload: any) {
  // Your custom logic here
  console.log("Processing custom task:", payload);
}
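To show how the new case and its processor fit together, here is a minimal sketch of a switch-based dispatcher. The real `processTask` in processors.ts may have a different signature and more cases. `processLogMessage` is a hypothetical stand-in for a built-in processor, and throwing on an unknown task type is an assumption about how failures reach the retry logic.

```typescript
type Payload = Record<string, unknown>;

// Hypothetical stand-in for one of the built-in processors.
async function processLogMessage(payload: Payload): Promise<void> {
  console.log(`[${String(payload.level ?? "info")}]`, payload.message);
}

// The custom processor added by the user.
async function processMyCustomTask(payload: Payload): Promise<void> {
  console.log("Processing custom task:", payload);
}

// Route each task to its processor by task type.
async function processTask(taskType: string, payload: Payload): Promise<void> {
  switch (taskType) {
    case "log_message":
      await processLogMessage(payload);
      break;
    case "my_custom_task":
      await processMyCustomTask(payload);
      break;
    default:
      // Assumption: an error here counts as a failed attempt for the task.
      throw new Error(`Unknown task type: ${taskType}`);
  }
}
```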
- schema.ts: Database initialization
- enqueue.ts: HTTP endpoint to add tasks
- worker.ts: Interval val that processes tasks every minute
- processors.ts: Task processing logic
- monitor.ts: HTTP endpoint for queue statistics
- ✅ Priority-based processing
- ✅ Delayed/scheduled execution
- ✅ Automatic retries with configurable max attempts
- ✅ Dead letter queue for failed tasks
- ✅ Task status tracking (pending, processing, completed, failed)
- ✅ Built-in monitoring endpoint
- ✅ Processing duration tracking
- ✅ Extensible task processors
- Worker processes up to 10 tasks per minute
- Failed tasks are retried up to max_attempts times
- Tasks exceeding max attempts move to dead letter queue
- SQLite provides ACID guarantees for queue operations
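The batch and retry rules above can be modeled in memory as follows. This is a sketch under stated assumptions, not the code in worker.ts: the real worker queries SQLite, and the `Task` shape, the tie-break by `id`, and the exact point at which a task counts as exhausted are all hypothetical.

```typescript
type TaskStatus = "pending" | "processing" | "completed" | "failed";

// Hypothetical in-memory view of a queued task.
interface Task {
  id: number;
  priority: number;
  scheduled_for: number | null; // Unix seconds, null means run as soon as possible
  attempts: number;
  max_attempts: number;
  status: TaskStatus;
}

// Pick pending tasks that are due, highest priority first, capped at the batch size.
function selectBatch(tasks: Task[], nowSeconds: number, batchSize = 10): Task[] {
  return tasks
    .filter((t) =>
      t.status === "pending" &&
      (t.scheduled_for === null || t.scheduled_for <= nowSeconds)
    )
    // Assumption: ties in priority are broken by insertion order (lower id first).
    .sort((a, b) => b.priority - a.priority || a.id - b.id)
    .slice(0, batchSize);
}

// Decide what happens to a task after one failed attempt.
function afterFailure(task: Task): { status: TaskStatus; attempts: number; deadLetter: boolean } {
  const attempts = task.attempts + 1;
  // Assumption: the task is exhausted once attempts reaches max_attempts.
  const exhausted = attempts >= task.max_attempts;
  return {
    status: exhausted ? "failed" : "pending",
    attempts,
    deadLetter: exhausted,
  };
}
```

Keeping the selection and retry decisions as pure functions makes them easy to check in isolation. In the actual queue the equivalent logic would run inside SQLite queries so that the ACID guarantees apply.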