You have a cron job that processes research questions against multiple transcripts using OpenAI, then updates a Notion report page. The original implementation had two problems, which the plan below addresses.
Rough notes: Does the OpenAI service first ask the question of each transcript, generate an answer per transcript, and then look for consensus across all 10 answers from the 10 transcripts? Do not write code, just answer the question.
Yes, exactly! The OpenAI service uses a two-stage approach:
Stage 1 (Individual Analysis):
- Takes each transcript individually
- Asks the same research question of each transcript separately
- Generates an individual analysis for each transcript that includes:
  - A stance (supports / mixed / contradicts / not enough evidence)
  - Key findings
  - Supporting quotes
  - Rationale
  - Confidence score

Stage 2 (Consensus Synthesis):
- Takes all the individual analyses from Stage 1
- Feeds them to a second OpenAI call that acts as a "senior research synthesizer"
- This second call looks across all the individual answers to determine:
  - Overall consensus level (high / medium / low)
  - A direct answer that synthesizes all individual responses
  - Key points most transcripts agree on
  - Notable disagreements or outliers
  - Overall confidence score

So if you have 10 transcripts, it makes 10 individual OpenAI calls in Stage 1, then 1 synthesis call in Stage 2 (11 API calls per question in total). The final answer is based on finding patterns and consensus across all 10 individual transcript analyses.
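In code form, the pattern is N individual calls followed by one synthesis call. A compact sketch (analyzeOne and synthesize are placeholder names; the real implementation below bounds concurrency rather than firing all calls at once):

```ts
// Stage 1: one OpenAI call per transcript (N calls)
const individual = await Promise.all(
  transcripts.map((t) => analyzeOne(question, t)),
);
// Stage 2: one synthesis call across all Stage 1 results (+1 call)
const consensus = await synthesize(question, individual);
```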
Implement a 2-stage OpenAI analysis with incremental Notion updates for real-time progress visibility and fault tolerance.
This implementation follows Val Town's layered architecture:

- findings-analysis.ts (cron): Simple trigger, calls the controller
- findings.controller.ts (controller): Business logic orchestration, environment variables, error handling
- openai.service.ts, notion.service.ts (services): Pure API calls with a consistent response format
- findings.types.ts (types): Shared interfaces

Key principle: Services only make API calls and return { success, data?, error? }. Controllers handle business logic and coordinate multiple service calls.
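That contract can be captured in a shared generic type (the generic wrapper is a sketch; the field set comes from the services described here):

```ts
// Every service function resolves to this shape, success or failure.
export interface ServiceResponse<T> {
  success: boolean;
  data?: T;          // present when success is true
  error?: string;    // present when success is false
  timestamp: string; // ISO timestamp of when the call finished
}
```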
Replace entire file with 2-stage implementation:
export interface ConsensusAnalysisRequest {
question: string;
transcripts: string[];
apiKey: string;
}
export interface ConsensusAnswer {
question: string;
consensusLevel: "high" | "medium" | "low";
directAnswer: string;
keyPoints: string[];
disagreements: string[];
confidence: number; // 0..1
methodologyNote: string;
perTranscriptFindings: PerTranscriptFinding[];
}
export interface ConsensusAnalysisResponse {
success: boolean;
data?: ConsensusAnswer;
error?: string;
timestamp: string;
}
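PerTranscriptFinding is referenced above but not defined in this excerpt; a plausible reconstruction from the Stage 1 JSON structure below (field names are inferred, so treat this as an assumption):

```ts
export interface PerTranscriptFinding {
  transcriptKey: string; // which transcript the finding came from
  stance: "supports" | "mixed" | "contradicts" | "not enough evidence";
  keyFindings: string[];
  supportingQuotes: string[];
  rationale: string;
  confidence: number; // 0..1
}
```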
Function: analyzeIndividualTranscript(openai, model, question, transcript, transcriptKey, maxChars)

Implementation details:
- Use openai.chat.completions.create() with the actual OpenAI client
- Model: gpt-4o-mini (cost efficient)
- System prompt: "You are a meticulous research analyst. Analyze the provided interview transcript to answer a specific research question."
- Response format: { type: "json_object" }
- Expected JSON structure:
{
"stance": "supports|mixed|contradicts|not enough evidence",
"keyFindings": ["finding 1", "finding 2"],
"supportingQuotes": ["quote 1", "quote 2"],
"rationale": "Brief explanation",
"confidence": 0.85
}
- Error handling: Return a fallback analysis with stance "not enough evidence" and confidence 0.0
- Truncate transcripts to maxChars (default 15,000) to avoid token limits
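A minimal sketch of this function, assuming the npm OpenAI client and reusing the PerTranscriptFinding interface sketched earlier (the user-prompt wording and the fallback rationale text are assumptions):

```ts
import type OpenAI from "npm:openai"; // client is passed in, per the signature

export async function analyzeIndividualTranscript(
  openai: OpenAI,
  model: string,
  question: string,
  transcript: string,
  transcriptKey: string,
  maxChars = 15_000,
): Promise<PerTranscriptFinding> {
  // Truncate to avoid blowing past token limits, per the spec
  const excerpt = transcript.slice(0, maxChars);
  try {
    const completion = await openai.chat.completions.create({
      model,
      response_format: { type: "json_object" },
      messages: [
        {
          role: "system",
          content:
            "You are a meticulous research analyst. Analyze the provided interview transcript to answer a specific research question.",
        },
        {
          role: "user",
          content:
            `Question: ${question}\n\nTranscript (${transcriptKey}):\n${excerpt}\n\n` +
            `Respond with JSON matching: {"stance": "supports|mixed|contradicts|not enough evidence", ` +
            `"keyFindings": [], "supportingQuotes": [], "rationale": "", "confidence": 0.0}`,
        },
      ],
    });
    const parsed = JSON.parse(completion.choices[0].message.content ?? "{}");
    return { transcriptKey, ...parsed };
  } catch {
    // Fallback analysis, per the spec: neutral stance, zero confidence
    return {
      transcriptKey,
      stance: "not enough evidence",
      keyFindings: [],
      supportingQuotes: [],
      rationale: "Analysis failed for this transcript.",
      confidence: 0,
    };
  }
}
```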
Function: synthesizeConsensus(openai, model, question, individualAnalyses)

Implementation details:
- Use openai.chat.completions.create() with the actual OpenAI client
- Model: gpt-4o (high-quality reasoning)
- System prompt: "You are a senior research synthesizer. You will receive multiple individual analyses of different transcripts for the same research question."
- Input: Formatted string of all individual analyses
- Response format: { type: "json_object" }
- Expected JSON structure:
{
"consensusLevel": "high|medium|low",
"directAnswer": "Clear, direct answer",
"keyPoints": ["point 1", "point 2"],
"disagreements": ["disagreement 1"],
"confidence": 0.85,
"methodologyNote": "Brief explanation"
}
- Consensus level logic: derive high/medium/low from how strongly the individual stances agree
- Error handling: Create a fallback consensus using the stance distribution from the Stage 1 results (see the sketch below)
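A sketch of the synthesis call, building on the types above; how the analyses are formatted into the prompt and the fallback thresholds (80% / 50% stance agreement) are assumptions:

```ts
export async function synthesizeConsensus(
  openai: OpenAI,
  model: string,
  question: string,
  individualAnalyses: PerTranscriptFinding[],
) {
  // Flatten the Stage 1 results into a readable prompt section
  const formatted = individualAnalyses
    .map(
      (a, i) =>
        `Analysis ${i + 1} (${a.transcriptKey}): stance=${a.stance}, confidence=${a.confidence}\n` +
        `Findings: ${a.keyFindings.join("; ")}\nRationale: ${a.rationale}`,
    )
    .join("\n\n");
  try {
    const completion = await openai.chat.completions.create({
      model,
      response_format: { type: "json_object" },
      messages: [
        {
          role: "system",
          content:
            "You are a senior research synthesizer. You will receive multiple individual analyses of different transcripts for the same research question.",
        },
        {
          role: "user",
          content:
            `Question: ${question}\n\n${formatted}\n\n` +
            `Respond with JSON matching: {"consensusLevel": "high|medium|low", "directAnswer": "", ` +
            `"keyPoints": [], "disagreements": [], "confidence": 0.0, "methodologyNote": ""}`,
        },
      ],
    });
    return JSON.parse(completion.choices[0].message.content ?? "{}");
  } catch {
    // Fallback: estimate consensus from the Stage 1 stance distribution
    const counts = new Map<string, number>();
    for (const a of individualAnalyses) counts.set(a.stance, (counts.get(a.stance) ?? 0) + 1);
    const share = individualAnalyses.length
      ? Math.max(...counts.values()) / individualAnalyses.length
      : 0;
    return {
      consensusLevel: share >= 0.8 ? "high" : share >= 0.5 ? "medium" : "low",
      directAnswer: "Synthesis call failed; consensus estimated from stance distribution only.",
      keyPoints: [],
      disagreements: [],
      confidence: 0,
      methodologyNote: "Fallback derived from Stage 1 stances after a synthesis error.",
    };
  }
}
```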
Function: mapWithLimit(items, limit, fn)
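The spec doesn't show mapWithLimit's body; a standard bounded-concurrency implementation (my sketch) would be:

```ts
// Map items through fn with at most `limit` promises in flight at once.
export async function mapWithLimit<T, R>(
  items: T[],
  limit: number,
  fn: (item: T, index: number) => Promise<R>,
): Promise<R[]> {
  const results = new Array<R>(items.length);
  let next = 0; // index of the next unclaimed item
  const workers = Array.from({ length: Math.min(limit, items.length) }, async () => {
    // Each worker repeatedly claims the next index until none remain
    while (next < items.length) {
      const i = next++;
      results[i] = await fn(items[i], i);
    }
  });
  await Promise.all(workers);
  return results;
}
```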
Function: analyzeTranscriptsWithConsensus(request, options?)

Flow: run Stage 1 across all transcripts (bounded by mapWithLimit), then the single Stage 2 synthesis call, and return the result in the ConsensusAnalysisResponse format.

Error handling: on failure, return { success: false, error } in the same ConsensusAnalysisResponse shape.
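Putting the pieces together, a sketch of the orchestrator (the model names follow the spec above; the concurrency default, transcript-key scheme, and OpenAI import are assumptions):

```ts
import OpenAI from "npm:openai"; // Val Town's Deno runtime resolves npm: specifiers

export async function analyzeTranscriptsWithConsensus(
  request: ConsensusAnalysisRequest,
  options?: { concurrency?: number; maxChars?: number },
): Promise<ConsensusAnalysisResponse> {
  const timestamp = new Date().toISOString();
  try {
    const openai = new OpenAI({ apiKey: request.apiKey });
    // Stage 1: one call per transcript, at most `concurrency` in flight
    const individual = await mapWithLimit(
      request.transcripts,
      options?.concurrency ?? 3, // default limit is an assumption
      (transcript, i) =>
        analyzeIndividualTranscript(
          openai,
          "gpt-4o-mini",
          request.question,
          transcript,
          `transcript-${i + 1}`, // key scheme is an assumption
          options?.maxChars,
        ),
    );
    // Stage 2: a single synthesis call over all Stage 1 results
    const consensus = await synthesizeConsensus(openai, "gpt-4o", request.question, individual);
    return {
      success: true,
      data: { question: request.question, ...consensus, perTranscriptFindings: individual },
      timestamp,
    };
  } catch (error) {
    return {
      success: false,
      error: error instanceof Error ? error.message : String(error),
      timestamp,
    };
  }
}
```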
Add these functions following service pattern:
export async function deletePageContent(pageId: string) {
  try {
    // Page through the block list: blocks.children.list returns at most 100 blocks per call
    let deletedBlocks = 0;
    let cursor: string | undefined;
    do {
      const existingBlocks = await notion.blocks.children.list({ block_id: pageId, start_cursor: cursor });
      for (const block of existingBlocks.results) {
        await notion.blocks.delete({ block_id: block.id });
        deletedBlocks++;
      }
      cursor = existingBlocks.has_more ? existingBlocks.next_cursor ?? undefined : undefined;
    } while (cursor);
    return { success: true, data: { deletedBlocks }, timestamp: new Date().toISOString() };
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error), timestamp: new Date().toISOString() };
  }
}

export async function appendPageContent(pageId: string, blocks: any[]) {
  try {
    // Note: the Notion API accepts at most 100 children per append call
    const response = await notion.blocks.children.append({ block_id: pageId, children: blocks });
    return { success: true, data: response, timestamp: new Date().toISOString() };
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error), timestamp: new Date().toISOString() };
  }
}
Service responsibilities:
- Pure API calls, returned in the consistent { success, data?, error?, timestamp } response format

Controller responsibilities:
- Business logic, coordination of service calls, environment variables (OPENAI_API_KEY, database IDs), error handling

1. Import new Notion functions:
import { deletePageContent, appendPageContent } from "../services/notion.service.ts";
2. Early initialization (after finding report page):
// Set status to "Working..."
const statusUpdateResult = await updatePageStatus(reportPageId, "Working...");
// Clear existing content (clean slate)
const clearContentResult = await deletePageContent(reportPageId);
3. Replace batch processing with incremental loop:
// OLD: Collect all results, then update once
const analyses: EnhancedQuestionAnalysis[] = [];
for (const question of questions) {
// analyze and push to analyses array
}
const contentBlocks = formatEnhancedReportContent(analyses);
await updatePageContent(reportPageId, contentBlocks);
// NEW: Process and immediately append each question
for (const question of questions) {
const analysisResult = await analyzeTranscriptsWithConsensus({...});
if (analysisResult.success) {
const questionAnalysis = { question: question.name, consensusAnswer: analysisResult.data };
const contentBlocks = formatEnhancedReportContent([questionAnalysis]);
await appendPageContent(reportPageId, contentBlocks);
} else {
const errorBlocks = createErrorBlock(question.name, analysisResult.error);
await appendPageContent(reportPageId, errorBlocks);
}
}
4. Add error block creation:
function createErrorBlock(questionName: string, errorMessage: string): any[] {
return [
{ object: "block", type: "heading_2", heading_2: { rich_text: [{ type: "text", text: { content: questionName } }] } },
{ object: "block", type: "callout", callout: {
icon: { emoji: "❌" },
rich_text: [{ type: "text", text: { content: `Error: ${errorMessage}` }, annotations: { bold: true, color: "red" } }]
}},
{ object: "block", type: "divider", divider: {} }
];
}
5. Track success metrics:
let questionsProcessed = 0;
let questionsSucceeded = 0;
// Increment in loop, return both in response
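Woven into the step 3 loop, the counters might look like this (a sketch; the append calls are elided, as shown above):

```ts
let questionsProcessed = 0;
let questionsSucceeded = 0;

for (const question of questions) {
  questionsProcessed++;
  const analysisResult = await analyzeTranscriptsWithConsensus({ /* ...as in step 3... */ });
  if (analysisResult.success) {
    questionsSucceeded++;
    // ...format and append the content blocks (step 3)...
  } else {
    // ...append the error block (step 4)...
  }
}
// Return both counters in the controller's response
```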
Business logic handled by controller:
Add to ProcessFindingsResponse:
questionsSucceeded?: number;
Type responsibilities:
Add logging for success metrics:
if (result.success) {
console.log(`✅ Questions succeeded: ${result.questionsSucceeded}`);
if (result.questionsSucceeded !== result.questionsProcessed) {
console.log(`⚠️ Some questions failed: ${result.questionsProcessed - result.questionsSucceeded} errors`);
}
}
Cron responsibilities:
Create temporary test endpoint:
// /run-findings-cron.ts (HTTP)
import findingsAnalysisCron from "./backend/crons/findings-analysis.ts";

export default async function (req: Request) {
  const result = await findingsAnalysisCron();
  return new Response(JSON.stringify({ message: "Cron execution completed", result }, null, 2), {
    headers: { "Content-Type": "application/json" },
  });
}
Test scenarios:
Remove test endpoint after verification
Separation of concerns achieved:
For 30 questions, users will see each answer appear on the report page as soon as it completes, rather than waiting for a single bulk update after all 30 finish.
Performance:
Add timing capture at the start of processing:
// After clearing page content, capture start time
const processingStartTime = new Date();
console.log(`⏱️ Starting question processing at ${processingStartTime.toISOString()}`);
Add timing capture after all questions are processed:
// After the incremental processing loop, before final status update
const processingEndTime = new Date();
const totalTimeMs = processingEndTime.getTime() - processingStartTime.getTime();
const totalMinutes = Math.floor(totalTimeMs / 60000);
const totalSeconds = Math.floor((totalTimeMs % 60000) / 1000);
console.log(`⏱️ Completed question processing at ${processingEndTime.toISOString()}`);
console.log(`⏱️ Total processing time: ${totalMinutes}m ${totalSeconds}s`);
// Create timing summary block
const timingSummaryBlocks = createTimingSummaryBlock(totalMinutes, totalSeconds, questionsProcessed, questionsSucceeded);
// Append timing summary as final block
const timingAppendResult = await appendPageContent(reportPageId, timingSummaryBlocks);
if (!timingAppendResult.success) {
console.error(`⚠️ Failed to append timing summary: ${timingAppendResult.error}`);
}
Add timing summary block creation function:
/**
* Create timing summary block for the end of the report
*/
function createTimingSummaryBlock(minutes: number, seconds: number, totalQuestions: number, successfulQuestions: number): any[] {
const timeDisplay = minutes > 0 ? `${minutes}m ${seconds}s` : `${seconds}s`;
// Guard against division by zero when no questions were processed
const successRate = totalQuestions > 0 ? Math.round((successfulQuestions / totalQuestions) * 100) : 0;
return [
{
object: "block",
type: "divider",
divider: {}
},
{
object: "block",
type: "heading_3",
heading_3: {
rich_text: [
{
type: "text",
text: {
content: "📊 Report Generation Summary"
}
}
]
}
},
{
object: "block",
type: "callout",
callout: {
icon: { emoji: "⏱️" },
rich_text: [
{
type: "text",
text: {
content: `Total time to generate report: ${timeDisplay}`
},
annotations: { bold: true }
}
]
}
},
{
object: "block",
type: "paragraph",
paragraph: {
rich_text: [
{
type: "text",
text: {
content: `Questions processed: ${successfulQuestions}/${totalQuestions} (${successRate}% success rate)`
}
}
]
}
},
{
object: "block",
type: "paragraph",
paragraph: {
rich_text: [
{
type: "text",
text: {
content: `Generated on: ${new Date().toLocaleString()}`
},
annotations: { italic: true, color: "gray" }
}
]
}
}
];
}
With the complete timing flow in place, users will see at the bottom of every report:
📊 Report Generation Summary
⏱️ Total time to generate report: 3m 45s
Questions processed: 28/30 (93% success rate)
Generated on: 8/15/2025, 3:45:23 PM
Benefits:
Implementation notes: