| name: | Issue Tracker Pipeline Refactor |
|---|---|
| overview: | Refactor the cron jobs into a composable pipeline architecture where sources, filters, and sinks are decoupled, allowing arbitrary combinations like "stalest issues with specific labels". |
| todos: | |

The three cron jobs duplicate the same core loop (fetch, filter, dedup, send, record), varying only in where the issues come from, which filters are applied, and which goal each issue is sent to.
Adding a "stalest + labeled" layer requires merging two separate files manually.
interface GitHubIssue {
  number: number;
  title: string;
  updated_at: string;
  created_at: string;
  labels?: Array<{ name: string } | string>;
}

interface IssueSource {
  name: string;
  getIssuesSinceLastRun(ctx: PipelineContext): AsyncIterable<GitHubIssue[]>;
  updateState(ctx: PipelineContext): Promise<void>;
}

interface IssueFilter {
  name: string;
  filter(issues: GitHubIssue[], ctx: PipelineContext): Promise<GitHubIssue[]>;
}

interface IssueSink {
  name: string;
  getGoalForIssue(issue: GitHubIssue): string | null;
}

interface LayerConfig {
  name: string; // e.g., "tagged-stalest"
  source: IssueSource;
  filters: IssueFilter[]; // Applied in order
  sink: IssueSink;
}
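
As an illustration of the `IssueFilter` interface above, here is a minimal sketch of what a "skip issues created today" filter might look like. The shape of `PipelineContext` (in particular the `now` field), the helper `createdToday`, and the choice to compare calendar days in UTC are all assumptions made for this example; the plan does not specify them.

```typescript
// Local copies of the plan's types so the sketch is self-contained.
interface GitHubIssue {
  number: number;
  title: string;
  updated_at: string;
  created_at: string;
  labels?: Array<{ name: string } | string>;
}

// Hypothetical context shape: the plan does not define PipelineContext.
interface PipelineContext {
  layerName: string;
  now: Date;
}

interface IssueFilter {
  name: string;
  filter(issues: GitHubIssue[], ctx: PipelineContext): Promise<GitHubIssue[]>;
}

// True when the issue was created on the same UTC calendar day as `now`.
// Assumes `created_at` is a valid ISO 8601 timestamp.
function createdToday(issue: GitHubIssue, now: Date): boolean {
  const createdDay = new Date(issue.created_at).toISOString().slice(0, 10);
  const currentDay = now.toISOString().slice(0, 10);
  return createdDay === currentDay;
}

// Factory in the style of the `byCreatedAge()` call used by the cron configs.
function byCreatedAge(): IssueFilter {
  return {
    name: "byCreatedAge",
    async filter(issues, ctx) {
      // Keep only issues that were NOT created today.
      return issues.filter((issue) => !createdToday(issue, ctx.now));
    },
  };
}
```

Taking the current time from the context rather than calling `new Date()` inside the filter keeps the filter deterministic, which makes it straightforward to test with a fixed date.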
Sources:
- allSinceLastRun.ts - Uses lastRunState.ts to track a timestamp per layer
- stalestSnapshot.ts - Uses database/stalestSnapshot.ts for stalest-first tracking

Filters:
- byLabel.ts - Filter issues by label rules (extracted from third-layer.cron.tsx lines 27-67)
- byStaleness.ts - Filter to only the stalest N issues (extracted from only-oldest-issues.cron.tsx lines 118-151)
- byCreatedAge.ts - Skip issues created today (from main.cron.tsx lines 81-93)

Sinks:
- singleGoal.ts - Always routes to one configured goal
- labelRouted.ts - Routes based on label matching rules

The pipeline runner replaces the duplicated loop in all three cron files:
export async function runPipeline(layer: LayerConfig): Promise<void> {
  const ctx = createContext(layer);

  // 1. Get issues from source
  const rawIssues = await collectIssues(layer.source, ctx);

  // 2. Apply filters in sequence
  let issues = rawIssues;
  for (const filter of layer.filters) {
    issues = await filter.filter(issues, ctx);
  }

  // 3. Dedup and send to sink
  for (const issue of issues) {
    const goal = layer.sink.getGoalForIssue(issue);
    if (!goal) continue;
    if (await alreadySentToday(issue.number, goal)) continue;
    await sendToBeeminder(issue, goal);
    await recordSent(issue.number, goal);
  }

  // 4. Update source state for next run
  await layer.source.updateState(ctx);
}
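
The runner calls a `collectIssues` helper that the plan does not spell out. Since `getIssuesSinceLastRun` returns an `AsyncIterable` of pages, one plausible reading is that the helper simply drains those pages into a single array. The sketch below assumes exactly that, and the `PipelineContext` shape shown is hypothetical.

```typescript
// Local copies of the plan's types so the sketch is self-contained.
interface GitHubIssue {
  number: number;
  title: string;
  updated_at: string;
  created_at: string;
  labels?: Array<{ name: string } | string>;
}

// Hypothetical context shape: the plan does not define PipelineContext.
interface PipelineContext {
  layerName: string;
}

interface IssueSource {
  name: string;
  getIssuesSinceLastRun(ctx: PipelineContext): AsyncIterable<GitHubIssue[]>;
  updateState(ctx: PipelineContext): Promise<void>;
}

// Drain every page yielded by the source into one flat array,
// preserving the order in which the source produced the issues.
async function collectIssues(
  source: IssueSource,
  ctx: PipelineContext,
): Promise<GitHubIssue[]> {
  const all: GitHubIssue[] = [];
  for await (const page of source.getIssuesSinceLastRun(ctx)) {
    all.push(...page);
  }
  return all;
}
```

Collecting everything up front keeps the filter interface simple (each filter sees the full list, which a "stalest N" filter needs), at the cost of holding all fetched issues in memory at once.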
Each cron file becomes a thin configuration layer:
// main.cron.tsx
import { runPipeline } from "./backend/pipeline/runner.ts";
import { allSinceLastRun } from "./backend/pipeline/sources/allSinceLastRun.ts";
import { byCreatedAge } from "./backend/pipeline/filters/byCreatedAge.ts";
import { singleGoal } from "./backend/pipeline/sinks/singleGoal.ts";

export default () => runPipeline({
  name: "updatedIssues",
  source: allSinceLastRun("main"),
  filters: [byCreatedAge()],
  sink: singleGoal(config.beeminder.updatedIssues),
});
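
// fourth-layer.cron.tsx (the new "stalest + tagged" layer)
import { runPipeline } from "./backend/pipeline/runner.ts";
import { stalestSnapshot } from "./backend/pipeline/sources/stalestSnapshot.ts";
import { byStaleness } from "./backend/pipeline/filters/byStaleness.ts";
import { byLabel } from "./backend/pipeline/filters/byLabel.ts";
import { labelRouted } from "./backend/pipeline/sinks/labelRouted.ts";
// taggedRules (the label matching rules) is defined elsewhere; this plan does not specify where.

export default () => runPipeline({
  name: "tagged-stalest",
  source: stalestSnapshot(),
  filters: [byStaleness(), byLabel(taggedRules)],
  sink: labelRouted(taggedRules),
});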
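
To make the `labelRouted(taggedRules)` sink more concrete, here is a rough sketch of how label-based routing could work. The `LabelRule` shape, the first-match-wins policy, and the case-insensitive comparison are assumptions for illustration only; the actual rule format lives in third-layer.cron.tsx and is not shown in this plan.

```typescript
// Local copies of the plan's types so the sketch is self-contained.
interface GitHubIssue {
  number: number;
  title: string;
  updated_at: string;
  created_at: string;
  labels?: Array<{ name: string } | string>;
}

interface IssueSink {
  name: string;
  getGoalForIssue(issue: GitHubIssue): string | null;
}

// Hypothetical rule shape: one label mapped to one goal name.
interface LabelRule {
  label: string;
  goal: string;
}

// Labels may be objects or plain strings; normalise to lowercase names.
function labelNames(issue: GitHubIssue): string[] {
  return (issue.labels ?? []).map((label) =>
    (typeof label === "string" ? label : label.name).toLowerCase(),
  );
}

// Factory in the style of the `labelRouted(taggedRules)` call above.
function labelRouted(rules: LabelRule[]): IssueSink {
  return {
    name: "labelRouted",
    getGoalForIssue(issue) {
      const names = labelNames(issue);
      // Assumed policy: the first rule (in rule order) whose label is present wins.
      const match = rules.find((rule) => names.includes(rule.label.toLowerCase()));
      return match ? match.goal : null;
    },
  };
}
```

Returning `null` for unmatched issues lines up with the runner's `if (!goal) continue;` check, so issues without a matching label are skipped rather than sent anywhere.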
main.cron.tsx as simplest)

backend/
  pipeline/
    types.ts
    runner.ts
    context.ts
    sources/
      allSinceLastRun.ts
      stalestSnapshot.ts
    filters/
      byLabel.ts
      byStaleness.ts
      byCreatedAge.ts
    sinks/
      singleGoal.ts
      labelRouted.ts
  database/
    dailyTracking.ts (unchanged)
    lastRunState.ts (unchanged)
    stalestSnapshot.ts (unchanged)