From f072c96c86e8588abd9ea36394bd9851fe3f433e Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Sun, 1 Feb 2026 17:40:11 -0800 Subject: [PATCH 1/2] feat(debounce): add maxDelay option to limit total debounce time Add a per-trigger maxDelay option that limits how long a debounced run can be delayed. This ensures execution happens within a specified window even with continuous triggers. Use case: Summarizing AI conversation threads that need to stay relatively up to date while still debouncing rapid message triggers. refs TRI-7234 --- .changeset/add-debounce-maxdelay.md | 16 + .../src/engine/systems/debounceSystem.ts | 30 +- .../src/engine/tests/debounce.test.ts | 327 ++++++++++++++++++ packages/core/src/v3/isomorphic/duration.ts | 23 +- packages/core/src/v3/schemas/api.ts | 2 + packages/core/src/v3/types/tasks.ts | 16 + .../hello-world/src/trigger/debounce.ts | 156 +++++++++ 7 files changed, 561 insertions(+), 9 deletions(-) create mode 100644 .changeset/add-debounce-maxdelay.md diff --git a/.changeset/add-debounce-maxdelay.md b/.changeset/add-debounce-maxdelay.md new file mode 100644 index 0000000000..a70b95d476 --- /dev/null +++ b/.changeset/add-debounce-maxdelay.md @@ -0,0 +1,16 @@ +--- +"@trigger.dev/core": patch +"@trigger.dev/sdk": patch +--- + +Add `maxDelay` option to debounce feature. This allows setting a maximum time limit for how long a debounced run can be delayed, ensuring execution happens within a specified window even with continuous triggers. + +```typescript +await myTask.trigger(payload, { + debounce: { + key: "my-key", + delay: "5s", + maxDelay: "30m", // Execute within 30 minutes regardless of continuous triggers + }, +}); +``` diff --git a/internal-packages/run-engine/src/engine/systems/debounceSystem.ts b/internal-packages/run-engine/src/engine/systems/debounceSystem.ts index af25a31552..8cd06d0773 100644 --- a/internal-packages/run-engine/src/engine/systems/debounceSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/debounceSystem.ts @@ -6,7 +6,10 @@ import { type Result, } from "@internal/redis"; import { startSpan } from "@internal/tracing"; -import { parseNaturalLanguageDuration } from "@trigger.dev/core/v3/isomorphic"; +import { + parseNaturalLanguageDuration, + parseNaturalLanguageDurationInMs, +} from "@trigger.dev/core/v3/isomorphic"; import { PrismaClientOrTransaction, TaskRun, Waitpoint } from "@trigger.dev/database"; import { nanoid } from "nanoid"; import { SystemResources } from "./systems.js"; @@ -17,6 +20,12 @@ export type DebounceOptions = { key: string; delay: string; mode?: "leading" | "trailing"; + /** + * Maximum total delay before the run must execute, regardless of subsequent triggers. + * This prevents indefinite delays when continuous triggers keep pushing the execution time. + * If not specified, falls back to the server's maxDebounceDurationMs config. + */ + maxDelay?: string; /** When mode: "trailing", these fields will be used to update the existing run */ updateData?: { payload: string; @@ -521,8 +530,22 @@ return 0 } // Check if max debounce duration would be exceeded + // Use per-trigger maxDelay if provided, otherwise use global config + let maxDurationMs = this.maxDebounceDurationMs; + if (debounce.maxDelay) { + const parsedMaxDelay = parseNaturalLanguageDurationInMs(debounce.maxDelay); + if (parsedMaxDelay !== undefined) { + maxDurationMs = parsedMaxDelay; + } else { + this.$.logger.warn("handleExistingRun: invalid maxDelay duration, using global config", { + maxDelay: debounce.maxDelay, + fallbackMs: this.maxDebounceDurationMs, + }); + } + } + const runCreatedAt = existingRun.createdAt; - const maxDelayUntil = new Date(runCreatedAt.getTime() + this.maxDebounceDurationMs); + const maxDelayUntil = new Date(runCreatedAt.getTime() + maxDurationMs); if (newDelayUntil > maxDelayUntil) { this.$.logger.debug("handleExistingRun: max debounce duration would be exceeded", { @@ -531,7 +554,8 @@ return 0 runCreatedAt, newDelayUntil, maxDelayUntil, - maxDebounceDurationMs: this.maxDebounceDurationMs, + maxDurationMs, + maxDelayProvided: debounce.maxDelay, }); // Clean up Redis key since this debounce window is closed await this.redis.del(redisKey); diff --git a/internal-packages/run-engine/src/engine/tests/debounce.test.ts b/internal-packages/run-engine/src/engine/tests/debounce.test.ts index 0c3d09d887..1c201c4b4c 100644 --- a/internal-packages/run-engine/src/engine/tests/debounce.test.ts +++ b/internal-packages/run-engine/src/engine/tests/debounce.test.ts @@ -2170,5 +2170,332 @@ describe("RunEngine debounce", () => { } } ); + + containerTest( + "Debounce: per-trigger maxDelay overrides global maxDebounceDuration", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + // Set a long global max debounce duration (1 minute) + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + debounce: { + maxDebounceDurationMs: 60_000, // 1 minute global max + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + // First trigger with a very short per-trigger maxDelay (1 second) + const run1 = await engine.trigger( + { + number: 1, + friendlyId: "run_maxwait1", + environment: authenticatedEnvironment, + taskIdentifier, + payload: '{"data": "first"}', + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + delayUntil: new Date(Date.now() + 5000), + debounce: { + key: "maxwait-key", + delay: "5s", + maxDelay: "1s", // Very short per-trigger maxDelay (1 second) + }, + }, + prisma + ); + + expect(run1.friendlyId).toBe("run_maxwait1"); + + // Wait for the per-trigger maxDelay to be exceeded (1.5s > 1s) + await setTimeout(1500); + + // Second trigger should create a new run because per-trigger maxDelay exceeded + // (even though global maxDebounceDurationMs is 60 seconds) + const run2 = await engine.trigger( + { + number: 2, + friendlyId: "run_maxwait2", + environment: authenticatedEnvironment, + taskIdentifier, + payload: '{"data": "second"}', + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12346", + spanId: "s12346", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + delayUntil: new Date(Date.now() + 5000), + debounce: { + key: "maxwait-key", + delay: "5s", + maxDelay: "1s", + }, + }, + prisma + ); + + // Should be a different run because per-trigger maxDelay was exceeded + expect(run2.id).not.toBe(run1.id); + expect(run2.friendlyId).toBe("run_maxwait2"); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "Debounce: falls back to global config when maxDelay not specified", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + // Set a very short global max debounce duration (1 second) + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + debounce: { + maxDebounceDurationMs: 1000, // 1 second global max + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + // First trigger without maxDelay - should use global config + const run1 = await engine.trigger( + { + number: 1, + friendlyId: "run_noglobal1", + environment: authenticatedEnvironment, + taskIdentifier, + payload: '{"data": "first"}', + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + delayUntil: new Date(Date.now() + 5000), + debounce: { + key: "global-fallback-key", + delay: "5s", + // No maxDelay specified - should use global maxDebounceDurationMs + }, + }, + prisma + ); + + // Wait for global maxDebounceDurationMs to be exceeded (1.5s > 1s) + await setTimeout(1500); + + // Second trigger should create a new run because global max exceeded + const run2 = await engine.trigger( + { + number: 2, + friendlyId: "run_noglobal2", + environment: authenticatedEnvironment, + taskIdentifier, + payload: '{"data": "second"}', + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12346", + spanId: "s12346", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + delayUntil: new Date(Date.now() + 5000), + debounce: { + key: "global-fallback-key", + delay: "5s", + }, + }, + prisma + ); + + // Should be a different run because global max exceeded + expect(run2.id).not.toBe(run1.id); + } finally { + await engine.quit(); + } + } + ); + + containerTest( + "Debounce: long maxDelay allows more debounce time than global config", + async ({ prisma, redisOptions }) => { + const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION"); + + // Set a short global max debounce duration (1 second) + const engine = new RunEngine({ + prisma, + worker: { + redis: redisOptions, + workers: 1, + tasksPerWorker: 10, + pollIntervalMs: 100, + }, + queue: { + redis: redisOptions, + }, + runLock: { + redis: redisOptions, + }, + machines: { + defaultMachine: "small-1x", + machines: { + "small-1x": { + name: "small-1x" as const, + cpu: 0.5, + memory: 0.5, + centsPerMs: 0.0001, + }, + }, + baseCostInCents: 0.0001, + }, + debounce: { + maxDebounceDurationMs: 1000, // 1 second global max + }, + tracer: trace.getTracer("test", "0.0.0"), + }); + + try { + const taskIdentifier = "test-task"; + + await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier); + + // First trigger with long maxDelay that overrides the short global config + const run1 = await engine.trigger( + { + number: 1, + friendlyId: "run_longmax1", + environment: authenticatedEnvironment, + taskIdentifier, + payload: '{"data": "first"}', + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12345", + spanId: "s12345", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + delayUntil: new Date(Date.now() + 2000), + debounce: { + key: "long-maxwait-key", + delay: "2s", + maxDelay: "60s", // Long per-trigger maxDelay overrides short global config + }, + }, + prisma + ); + + // Wait past the global maxDebounceDurationMs (1s) but within our per-trigger maxDelay (60s) + await setTimeout(1500); + + // Second trigger should return SAME run because per-trigger maxDelay is 60s + const run2 = await engine.trigger( + { + number: 2, + friendlyId: "run_longmax2", + environment: authenticatedEnvironment, + taskIdentifier, + payload: '{"data": "second"}', + payloadType: "application/json", + context: {}, + traceContext: {}, + traceId: "t12346", + spanId: "s12346", + workerQueue: "main", + queue: "task/test-task", + isTest: false, + tags: [], + delayUntil: new Date(Date.now() + 2000), + debounce: { + key: "long-maxwait-key", + delay: "2s", + maxDelay: "60s", + }, + }, + prisma + ); + + // Should be the SAME run because per-trigger maxDelay allows it + expect(run2.id).toBe(run1.id); + } finally { + await engine.quit(); + } + } + ); }); diff --git a/packages/core/src/v3/isomorphic/duration.ts b/packages/core/src/v3/isomorphic/duration.ts index b4c5cd20d3..9315f79683 100644 --- a/packages/core/src/v3/isomorphic/duration.ts +++ b/packages/core/src/v3/isomorphic/duration.ts @@ -1,5 +1,15 @@ -export function parseNaturalLanguageDuration(duration: string): Date | undefined { - // Handle Code scanning alert #44 (https://github.com/triggerdotdev/trigger.dev/security/code-scanning/44) by limiting the length of the input string +/** + * Parses a natural language duration string into milliseconds. + * + * @param duration - Duration string like "1s", "5m", "2h", "1d", "1w" + * @returns The duration in milliseconds, or undefined if invalid + * + * @example + * parseNaturalLanguageDurationInMs("30m") // 1800000 + * parseNaturalLanguageDurationInMs("2h") // 7200000 + */ +export function parseNaturalLanguageDurationInMs(duration: string): number | undefined { + // Handle Code scanning alert #44 by limiting the length of the input string if (duration.length > 100) { return undefined; } @@ -60,11 +70,12 @@ export function parseNaturalLanguageDuration(duration: string): Date | undefined } } - if (hasMatch) { - return new Date(Date.now() + totalMilliseconds); - } + return hasMatch ? totalMilliseconds : undefined; +} - return undefined; +export function parseNaturalLanguageDuration(duration: string): Date | undefined { + const ms = parseNaturalLanguageDurationInMs(duration); + return ms !== undefined ? new Date(Date.now() + ms) : undefined; } export function safeParseNaturalLanguageDuration(duration: string): Date | undefined { diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index 9080a7f596..0291d2a05c 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -218,6 +218,7 @@ export const TriggerTaskRequestBody = z.object({ key: z.string().max(512), delay: z.string(), mode: z.enum(["leading", "trailing"]).optional(), + maxDelay: z.string().optional(), }) .optional(), }) @@ -275,6 +276,7 @@ export const BatchTriggerTaskItem = z.object({ key: z.string().max(512), delay: z.string(), mode: z.enum(["leading", "trailing"]).optional(), + maxDelay: z.string().optional(), }) .optional(), }) diff --git a/packages/core/src/v3/types/tasks.ts b/packages/core/src/v3/types/tasks.ts index f463b20f49..3b8b2e9ecd 100644 --- a/packages/core/src/v3/types/tasks.ts +++ b/packages/core/src/v3/types/tasks.ts @@ -945,6 +945,22 @@ export type TriggerOptions = { * @default "leading" */ mode?: "leading" | "trailing"; + /** + * Maximum total delay before the run must execute, regardless of subsequent triggers. + * This prevents indefinite delays when continuous triggers keep pushing the execution time. + * + * When specified, if a new trigger would push the execution time beyond this limit + * (measured from the first trigger), the current debounced run will be allowed to execute + * and a new run will be created for subsequent triggers. + * + * If not specified, falls back to the server's default maximum (typically 1 hour). + * + * Supported formats: `{number}s` (seconds), `{number}m` (minutes), `{number}h` (hours), + * `{number}d` (days), `{number}w` (weeks). + * + * @example "30m", "2h", "1d" + */ + maxDelay?: string; }; }; diff --git a/references/hello-world/src/trigger/debounce.ts b/references/hello-world/src/trigger/debounce.ts index e396714eb8..f0a7e8b638 100644 --- a/references/hello-world/src/trigger/debounce.ts +++ b/references/hello-world/src/trigger/debounce.ts @@ -1056,3 +1056,159 @@ export const demonstrateTrailingWithMetadata = task({ }; }, }); + +/** + * Example 12: Debounce with maxDelay + * + * The maxDelay option limits how long a debounced run can be delayed. + * Even if triggers keep coming, the run will eventually execute after maxDelay + * from the first trigger. This is useful for scenarios where you want to + * debounce but also guarantee execution within a certain time window. + * + * Use case: Summarizing AI conversation threads that need to stay relatively + * up to date. You want to debounce as messages come in, but also guarantee + * the summary runs at least every 30 minutes. + */ +export const processConversationSummary = task({ + id: "process-conversation-summary", + run: async (payload: { conversationId: string; messageCount: number }) => { + logger.info("Generating conversation summary", { payload }); + + // Simulate AI summarization work + await wait.for({ seconds: 2 }); + + logger.info("Conversation summary generated", { + conversationId: payload.conversationId, + messageCount: payload.messageCount, + }); + + return { + summarized: true, + conversationId: payload.conversationId, + messageCount: payload.messageCount, + summarizedAt: new Date().toISOString(), + }; + }, +}); + +/** + * Demonstrates maxDelay in action. + * + * This simulates a chat application where messages come in continuously. + * With just debounce, the summary task would keep getting delayed forever. + * With maxDelay: "30s", the summary will run at most 30 seconds after the first trigger, + * even if messages keep coming. + * + * Run this task and observe: + * - Messages trigger the summary task with debounce + * - Each trigger extends the delay by 5s + * - But maxDelay ensures execution happens within 30s of the first trigger + */ +export const simulateChatWithMaxWait = task({ + id: "simulate-chat-with-max-wait", + run: async (payload: { conversationId?: string; simulateDelay?: number }) => { + const conversationId = payload.conversationId ?? "conv-123"; + const delayBetweenMessages = payload.simulateDelay ?? 3000; // 3 seconds + + logger.info("Starting chat simulation with maxDelay", { + conversationId, + delayBetweenMessages, + }); + + logger.info( + "Debounce delay is 5s, maxDelay is 30s. Messages arrive every 3s, so debounce would normally keep extending. But maxDelay ensures execution within 30s." + ); + + const handles: string[] = []; + + // Simulate 15 messages over ~45 seconds + // Without maxDelay, the task would never run because each trigger resets the 5s delay + // With maxDelay: "30s", the task will run after 30 seconds from the first trigger + for (let i = 1; i <= 15; i++) { + logger.info(`Message ${i}/15 received`, { messageNumber: i }); + + const handle = await processConversationSummary.trigger( + { + conversationId, + messageCount: i, + }, + { + debounce: { + key: `conversation-${conversationId}`, + delay: "5s", + mode: "trailing", // Use latest message count + maxDelay: "30s", // Ensure execution within 30s of first trigger + }, + } + ); + + handles.push(handle.id); + logger.info(`Message ${i} triggered, run ID: ${handle.id}`, { + messageNumber: i, + runId: handle.id, + }); + + // Wait between messages (simulating real chat) + if (i < 15) { + await new Promise((resolve) => setTimeout(resolve, delayBetweenMessages)); + } + } + + const uniqueHandles = [...new Set(handles)]; + + logger.info("Chat simulation complete", { + totalMessages: 15, + uniqueRuns: uniqueHandles.length, + note: + "With maxDelay, runs should have been created periodically despite continuous triggering", + }); + + return { + conversationId, + totalMessages: 15, + uniqueRunsCreated: uniqueHandles.length, + runIds: uniqueHandles, + message: + "Due to maxDelay: '30s', the summary task runs periodically even with continuous triggers", + }; + }, +}); + +/** + * A simpler maxDelay example showing the basic usage pattern. + * + * This is the recommended pattern for using maxDelay: + * - delay: How long to wait after each trigger before executing + * - maxDelay: Maximum total wait time from the first trigger + */ +export const onNewMessage = task({ + id: "on-new-message", + run: async (payload: { conversationId: string; message: string }) => { + logger.info("New message received", { + conversationId: payload.conversationId, + messagePreview: payload.message.substring(0, 50), + }); + + // Trigger summarization with debounce and maxDelay + const handle = await processConversationSummary.trigger( + { + conversationId: payload.conversationId, + messageCount: 1, // In real code, you'd track actual count + }, + { + debounce: { + key: `summary-${payload.conversationId}`, + delay: "10s", // Wait 10s after last message before summarizing + mode: "trailing", // Use latest state + maxDelay: "5m", // But always summarize within 5 minutes + }, + } + ); + + logger.info("Summary task triggered (debounced with maxDelay)", { + runId: handle.id, + }); + + return { summaryRunId: handle.id }; + }, +}); From 2576a8e15f7d6180fb2df75f4838f189f32af1d1 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Mon, 2 Feb 2026 07:35:17 -0800 Subject: [PATCH 2/2] Add the maxDelay to the engine trigger types --- internal-packages/run-engine/src/engine/types.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index ee5176c2fa..2adc63415f 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -180,6 +180,7 @@ export type TriggerParams = { key: string; delay: string; mode?: "leading" | "trailing"; + maxDelay?: string; }; /** * Called when a run is debounced (existing delayed run found with triggerAndWait).