From b38ff130b33079f96c86606d339862f5c65328fa Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Mon, 5 Jan 2026 08:43:20 +0100 Subject: [PATCH] feat: add infrastructure schema for step skipping and cascading Introduce new columns and constraints to support skip logic in steps, including condition patterns, skip reasons, and cascade skip functionality. Extend runtime tables with skip-related fields and indexes. Update functions to handle new skip parameters and cascade logic, ensuring proper validation and data integrity. --- .changeset/skip-infrastructure-schema.md | 5 + pkgs/core/schemas/0050_tables_definitions.sql | 7 +- pkgs/core/schemas/0060_tables_runtime.sql | 23 ++- pkgs/core/schemas/0100_function_add_step.sql | 19 +- .../0100_function_cascade_skip_steps.sql | 105 +++++++++++ pkgs/core/src/database-types.ts | 25 +++ ...074725_pgflow_temp_skip_infrastructure.sql | 173 ++++++++++++++++++ pkgs/core/supabase/migrations/atlas.sum | 3 +- .../condition_invalid_values.test.sql | 24 +++ .../add_step/condition_parameters.test.sql | 125 +++++++++++++ .../broadcast_order.test.sql | 64 +++++++ .../cascade_through_multiple_levels.test.sql | 95 ++++++++++ .../cascade_to_single_dependent.test.sql | 86 +++++++++ .../multi_dependency_partial_skip.test.sql | 80 ++++++++ .../single_step_skip.test.sql | 69 +++++++ .../skipped_event_payload.test.sql | 88 +++++++++ 16 files changed, 979 insertions(+), 12 deletions(-) create mode 100644 .changeset/skip-infrastructure-schema.md create mode 100644 pkgs/core/schemas/0100_function_cascade_skip_steps.sql create mode 100644 pkgs/core/supabase/migrations/20260105074725_pgflow_temp_skip_infrastructure.sql create mode 100644 pkgs/core/supabase/tests/add_step/condition_invalid_values.test.sql create mode 100644 pkgs/core/supabase/tests/add_step/condition_parameters.test.sql create mode 100644 pkgs/core/supabase/tests/cascade_skip_steps/broadcast_order.test.sql create mode 100644 pkgs/core/supabase/tests/cascade_skip_steps/cascade_through_multiple_levels.test.sql create mode 100644 pkgs/core/supabase/tests/cascade_skip_steps/cascade_to_single_dependent.test.sql create mode 100644 pkgs/core/supabase/tests/cascade_skip_steps/multi_dependency_partial_skip.test.sql create mode 100644 pkgs/core/supabase/tests/cascade_skip_steps/single_step_skip.test.sql create mode 100644 pkgs/core/supabase/tests/cascade_skip_steps/skipped_event_payload.test.sql diff --git a/.changeset/skip-infrastructure-schema.md b/.changeset/skip-infrastructure-schema.md new file mode 100644 index 000000000..5fd952320 --- /dev/null +++ b/.changeset/skip-infrastructure-schema.md @@ -0,0 +1,5 @@ +--- +'@pgflow/core': patch +--- + +Add skip infrastructure schema for conditional execution - new columns (condition_pattern, when_unmet, when_failed, skip_reason, skipped_at), 'skipped' status, and cascade_skip_steps function diff --git a/pkgs/core/schemas/0050_tables_definitions.sql b/pkgs/core/schemas/0050_tables_definitions.sql index 42367280c..74c3d8b57 100644 --- a/pkgs/core/schemas/0050_tables_definitions.sql +++ b/pkgs/core/schemas/0050_tables_definitions.sql @@ -24,6 +24,9 @@ create table pgflow.steps ( opt_base_delay int, opt_timeout int, opt_start_delay int, + condition_pattern jsonb, -- JSON pattern for @> containment check + when_unmet text not null default 'skip', -- What to do when condition not met (skip is natural default) + when_failed text not null default 'fail', -- What to do when handler fails after retries created_at timestamptz not null default now(), primary key (flow_slug, step_slug), unique (flow_slug, step_index), -- Ensure step_index is unique within a flow @@ -32,7 +35,9 @@ create table pgflow.steps ( constraint opt_max_attempts_is_nonnegative check (opt_max_attempts is null or opt_max_attempts >= 0), constraint opt_base_delay_is_nonnegative check (opt_base_delay is null or opt_base_delay >= 0), constraint opt_timeout_is_positive check (opt_timeout is null or opt_timeout > 0), - constraint opt_start_delay_is_nonnegative check (opt_start_delay is null or opt_start_delay >= 0) + constraint opt_start_delay_is_nonnegative check (opt_start_delay is null or opt_start_delay >= 0), + constraint when_unmet_is_valid check (when_unmet in ('fail', 'skip', 'skip-cascade')), + constraint when_failed_is_valid check (when_failed in ('fail', 'skip', 'skip-cascade')) ); -- Dependencies table - stores relationships between steps diff --git a/pkgs/core/schemas/0060_tables_runtime.sql b/pkgs/core/schemas/0060_tables_runtime.sql index 7a408410c..33eac47eb 100644 --- a/pkgs/core/schemas/0060_tables_runtime.sql +++ b/pkgs/core/schemas/0060_tables_runtime.sql @@ -31,18 +31,20 @@ create table pgflow.step_states ( remaining_deps int not null default 0 check (remaining_deps >= 0), output jsonb, -- Step output: stored atomically with status=completed transition error_message text, + skip_reason text, -- Why step was skipped: condition_unmet, handler_failed, dependency_skipped created_at timestamptz not null default now(), started_at timestamptz, completed_at timestamptz, failed_at timestamptz, + skipped_at timestamptz, primary key (run_id, step_slug), foreign key (flow_slug, step_slug) references pgflow.steps (flow_slug, step_slug), - constraint status_is_valid check (status in ('created', 'started', 'completed', 'failed')), + constraint status_is_valid check (status in ('created', 'started', 'completed', 'failed', 'skipped')), constraint status_and_remaining_tasks_match check (status != 'completed' or remaining_tasks = 0), -- Add constraint to ensure remaining_tasks is only set when step has started constraint remaining_tasks_state_consistency check ( - remaining_tasks is null or status != 'created' + remaining_tasks is null or status not in ('created', 'skipped') ), constraint initial_tasks_known_when_started check ( status != 'started' or initial_tasks is not null @@ -52,16 +54,29 @@ create table pgflow.step_states ( constraint output_only_for_completed_or_null check ( output is null or status = 'completed' ), - constraint completed_at_or_failed_at check (not (completed_at is not null and failed_at is not null)), + -- skip_reason is required for skipped status and forbidden for other statuses + constraint skip_reason_matches_status check ( + (status = 'skipped' and skip_reason is not null) or + (status != 'skipped' and skip_reason is null) + ), + constraint completed_at_or_failed_at_or_skipped_at check ( + ( + case when completed_at is not null then 1 else 0 end + + case when failed_at is not null then 1 else 0 end + + case when skipped_at is not null then 1 else 0 end + ) <= 1 + ), constraint started_at_is_after_created_at check (started_at is null or started_at >= created_at), constraint completed_at_is_after_started_at check (completed_at is null or completed_at >= started_at), - constraint failed_at_is_after_started_at check (failed_at is null or failed_at >= started_at) + constraint failed_at_is_after_started_at check (failed_at is null or failed_at >= started_at), + constraint skipped_at_is_after_created_at check (skipped_at is null or skipped_at >= created_at) ); create index if not exists idx_step_states_ready on pgflow.step_states (run_id, status, remaining_deps) where status = 'created' and remaining_deps = 0; create index if not exists idx_step_states_failed on pgflow.step_states (run_id, step_slug) where status = 'failed'; +create index if not exists idx_step_states_skipped on pgflow.step_states (run_id, step_slug) where status = 'skipped'; create index if not exists idx_step_states_flow_slug on pgflow.step_states (flow_slug); create index if not exists idx_step_states_run_id on pgflow.step_states (run_id); diff --git a/pkgs/core/schemas/0100_function_add_step.sql b/pkgs/core/schemas/0100_function_add_step.sql index 3fb8fbc54..0eecdcd7c 100644 --- a/pkgs/core/schemas/0100_function_add_step.sql +++ b/pkgs/core/schemas/0100_function_add_step.sql @@ -6,7 +6,10 @@ create or replace function pgflow.add_step( base_delay int default null, timeout int default null, start_delay int default null, - step_type text default 'single' + step_type text default 'single', + condition_pattern jsonb default null, + when_unmet text default 'skip', + when_failed text default 'fail' ) returns pgflow.steps language plpgsql @@ -22,7 +25,7 @@ BEGIN -- 0 dependencies (root map - maps over flow input array) -- 1 dependency (dependent map - maps over dependency output array) IF COALESCE(add_step.step_type, 'single') = 'map' AND COALESCE(array_length(add_step.deps_slugs, 1), 0) > 1 THEN - RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %', + RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %', add_step.step_slug, COALESCE(array_length(add_step.deps_slugs, 1), 0), array_to_string(add_step.deps_slugs, ', '); @@ -36,18 +39,22 @@ BEGIN -- Create the step INSERT INTO pgflow.steps ( flow_slug, step_slug, step_type, step_index, deps_count, - opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay + opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay, + condition_pattern, when_unmet, when_failed ) VALUES ( add_step.flow_slug, add_step.step_slug, COALESCE(add_step.step_type, 'single'), - next_idx, + next_idx, COALESCE(array_length(add_step.deps_slugs, 1), 0), add_step.max_attempts, add_step.base_delay, add_step.timeout, - add_step.start_delay + add_step.start_delay, + add_step.condition_pattern, + add_step.when_unmet, + add_step.when_failed ) ON CONFLICT ON CONSTRAINT steps_pkey DO UPDATE SET step_slug = EXCLUDED.step_slug @@ -59,7 +66,7 @@ BEGIN FROM unnest(COALESCE(add_step.deps_slugs, '{}')) AS d(dep_slug) WHERE add_step.deps_slugs IS NOT NULL AND array_length(add_step.deps_slugs, 1) > 0 ON CONFLICT ON CONSTRAINT deps_pkey DO NOTHING; - + RETURN result_step; END; $$; diff --git a/pkgs/core/schemas/0100_function_cascade_skip_steps.sql b/pkgs/core/schemas/0100_function_cascade_skip_steps.sql new file mode 100644 index 000000000..62be9f53d --- /dev/null +++ b/pkgs/core/schemas/0100_function_cascade_skip_steps.sql @@ -0,0 +1,105 @@ +-- cascade_skip_steps: Skip a step and cascade to all downstream dependents +-- Used when a condition is unmet (whenUnmet: skip-cascade) or handler fails (whenFailed: skip-cascade) +create or replace function pgflow.cascade_skip_steps( + run_id uuid, + step_slug text, + skip_reason text +) +returns int +language plpgsql +as $$ +DECLARE + v_flow_slug text; + v_total_skipped int := 0; +BEGIN + -- Get flow_slug for this run + SELECT r.flow_slug INTO v_flow_slug + FROM pgflow.runs r + WHERE r.run_id = cascade_skip_steps.run_id; + + IF v_flow_slug IS NULL THEN + RAISE EXCEPTION 'Run not found: %', cascade_skip_steps.run_id; + END IF; + + -- ========================================== + -- SKIP STEPS IN TOPOLOGICAL ORDER + -- ========================================== + -- Use recursive CTE to find all downstream dependents, + -- then skip them in topological order (by step_index) + WITH RECURSIVE + -- ---------- Find all downstream steps ---------- + downstream_steps AS ( + -- Base case: the trigger step + SELECT + s.flow_slug, + s.step_slug, + s.step_index, + cascade_skip_steps.skip_reason AS reason -- Original reason for trigger step + FROM pgflow.steps s + WHERE s.flow_slug = v_flow_slug + AND s.step_slug = cascade_skip_steps.step_slug + + UNION ALL + + -- Recursive case: steps that depend on already-found steps + SELECT + s.flow_slug, + s.step_slug, + s.step_index, + 'dependency_skipped'::text AS reason -- Downstream steps get this reason + FROM pgflow.steps s + JOIN pgflow.deps d ON d.flow_slug = s.flow_slug AND d.step_slug = s.step_slug + JOIN downstream_steps ds ON ds.flow_slug = d.flow_slug AND ds.step_slug = d.dep_slug + ), + -- ---------- Deduplicate and order by step_index ---------- + steps_to_skip AS ( + SELECT DISTINCT ON (ds.step_slug) + ds.flow_slug, + ds.step_slug, + ds.step_index, + ds.reason + FROM downstream_steps ds + ORDER BY ds.step_slug, ds.step_index -- Keep first occurrence (trigger step has original reason) + ), + -- ---------- Skip the steps ---------- + skipped AS ( + UPDATE pgflow.step_states ss + SET status = 'skipped', + skip_reason = sts.reason, + skipped_at = now(), + remaining_tasks = NULL -- Clear remaining_tasks for skipped steps + FROM steps_to_skip sts + WHERE ss.run_id = cascade_skip_steps.run_id + AND ss.step_slug = sts.step_slug + AND ss.status IN ('created', 'started') -- Only skip non-terminal steps + RETURNING + ss.*, + -- Broadcast step:skipped event + realtime.send( + jsonb_build_object( + 'event_type', 'step:skipped', + 'run_id', ss.run_id, + 'flow_slug', ss.flow_slug, + 'step_slug', ss.step_slug, + 'status', 'skipped', + 'skip_reason', ss.skip_reason, + 'skipped_at', ss.skipped_at + ), + concat('step:', ss.step_slug, ':skipped'), + concat('pgflow:run:', ss.run_id), + false + ) as _broadcast_result + ), + -- ---------- Update run counters ---------- + run_updates AS ( + UPDATE pgflow.runs r + SET remaining_steps = r.remaining_steps - skipped_count.count + FROM (SELECT COUNT(*) AS count FROM skipped) skipped_count + WHERE r.run_id = cascade_skip_steps.run_id + AND skipped_count.count > 0 + ) + SELECT COUNT(*) INTO v_total_skipped FROM skipped; + + RETURN v_total_skipped; +END; +$$; diff --git a/pkgs/core/src/database-types.ts b/pkgs/core/src/database-types.ts index 510d7e144..9a4fe7c7d 100644 --- a/pkgs/core/src/database-types.ts +++ b/pkgs/core/src/database-types.ts @@ -132,6 +132,8 @@ export type Database = { remaining_deps: number remaining_tasks: number | null run_id: string + skip_reason: string | null + skipped_at: string | null started_at: string | null status: string step_slug: string @@ -147,6 +149,8 @@ export type Database = { remaining_deps?: number remaining_tasks?: number | null run_id: string + skip_reason?: string | null + skipped_at?: string | null started_at?: string | null status?: string step_slug: string @@ -162,6 +166,8 @@ export type Database = { remaining_deps?: number remaining_tasks?: number | null run_id?: string + skip_reason?: string | null + skipped_at?: string | null started_at?: string | null status?: string step_slug?: string @@ -272,6 +278,7 @@ export type Database = { } steps: { Row: { + condition_pattern: Json | null created_at: string deps_count: number flow_slug: string @@ -282,8 +289,11 @@ export type Database = { step_index: number step_slug: string step_type: string + when_failed: string + when_unmet: string } Insert: { + condition_pattern?: Json | null created_at?: string deps_count?: number flow_slug: string @@ -294,8 +304,11 @@ export type Database = { step_index?: number step_slug: string step_type?: string + when_failed?: string + when_unmet?: string } Update: { + condition_pattern?: Json | null created_at?: string deps_count?: number flow_slug?: string @@ -306,6 +319,8 @@ export type Database = { step_index?: number step_slug?: string step_type?: string + when_failed?: string + when_unmet?: string } Relationships: [ { @@ -391,6 +406,7 @@ export type Database = { add_step: { Args: { base_delay?: number + condition_pattern?: Json deps_slugs?: string[] flow_slug: string max_attempts?: number @@ -398,8 +414,11 @@ export type Database = { step_slug: string step_type?: string timeout?: number + when_failed?: string + when_unmet?: string } Returns: { + condition_pattern: Json | null created_at: string deps_count: number flow_slug: string @@ -410,6 +429,8 @@ export type Database = { step_index: number step_slug: string step_type: string + when_failed: string + when_unmet: string } SetofOptions: { from: "*" @@ -426,6 +447,10 @@ export type Database = { Args: { run_id: string } Returns: number } + cascade_skip_steps: { + Args: { run_id: string; skip_reason: string; step_slug: string } + Returns: number + } cleanup_ensure_workers_logs: { Args: { retention_hours?: number } Returns: { diff --git a/pkgs/core/supabase/migrations/20260105074725_pgflow_temp_skip_infrastructure.sql b/pkgs/core/supabase/migrations/20260105074725_pgflow_temp_skip_infrastructure.sql new file mode 100644 index 000000000..178f74ac3 --- /dev/null +++ b/pkgs/core/supabase/migrations/20260105074725_pgflow_temp_skip_infrastructure.sql @@ -0,0 +1,173 @@ +-- Modify "step_states" table +ALTER TABLE "pgflow"."step_states" DROP CONSTRAINT "completed_at_or_failed_at", DROP CONSTRAINT "remaining_tasks_state_consistency", ADD CONSTRAINT "remaining_tasks_state_consistency" CHECK ((remaining_tasks IS NULL) OR (status <> ALL (ARRAY['created'::text, 'skipped'::text]))), DROP CONSTRAINT "status_is_valid", ADD CONSTRAINT "status_is_valid" CHECK (status = ANY (ARRAY['created'::text, 'started'::text, 'completed'::text, 'failed'::text, 'skipped'::text])), ADD CONSTRAINT "completed_at_or_failed_at_or_skipped_at" CHECK ((( +CASE + WHEN (completed_at IS NOT NULL) THEN 1 + ELSE 0 +END + +CASE + WHEN (failed_at IS NOT NULL) THEN 1 + ELSE 0 +END) + +CASE + WHEN (skipped_at IS NOT NULL) THEN 1 + ELSE 0 +END) <= 1), ADD CONSTRAINT "skip_reason_matches_status" CHECK (((status = 'skipped'::text) AND (skip_reason IS NOT NULL)) OR ((status <> 'skipped'::text) AND (skip_reason IS NULL))), ADD CONSTRAINT "skipped_at_is_after_created_at" CHECK ((skipped_at IS NULL) OR (skipped_at >= created_at)), ADD COLUMN "skip_reason" text NULL, ADD COLUMN "skipped_at" timestamptz NULL; +-- Create index "idx_step_states_skipped" to table: "step_states" +CREATE INDEX "idx_step_states_skipped" ON "pgflow"."step_states" ("run_id", "step_slug") WHERE (status = 'skipped'::text); +-- Modify "steps" table +ALTER TABLE "pgflow"."steps" ADD CONSTRAINT "when_failed_is_valid" CHECK (when_failed = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD CONSTRAINT "when_unmet_is_valid" CHECK (when_unmet = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD COLUMN "condition_pattern" jsonb NULL, ADD COLUMN "when_unmet" text NOT NULL DEFAULT 'skip', ADD COLUMN "when_failed" text NOT NULL DEFAULT 'fail'; +-- Create "add_step" function +CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[] DEFAULT '{}', "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer, "step_type" text DEFAULT 'single', "condition_pattern" jsonb DEFAULT NULL::jsonb, "when_unmet" text DEFAULT 'skip', "when_failed" text DEFAULT 'fail') RETURNS "pgflow"."steps" LANGUAGE plpgsql SET "search_path" = '' AS $$ +DECLARE + result_step pgflow.steps; + next_idx int; +BEGIN + -- Validate map step constraints + -- Map steps can have either: + -- 0 dependencies (root map - maps over flow input array) + -- 1 dependency (dependent map - maps over dependency output array) + IF COALESCE(add_step.step_type, 'single') = 'map' AND COALESCE(array_length(add_step.deps_slugs, 1), 0) > 1 THEN + RAISE EXCEPTION 'Map step "%" can have at most one dependency, but % were provided: %', + add_step.step_slug, + COALESCE(array_length(add_step.deps_slugs, 1), 0), + array_to_string(add_step.deps_slugs, ', '); + END IF; + + -- Get next step index + SELECT COALESCE(MAX(s.step_index) + 1, 0) INTO next_idx + FROM pgflow.steps s + WHERE s.flow_slug = add_step.flow_slug; + + -- Create the step + INSERT INTO pgflow.steps ( + flow_slug, step_slug, step_type, step_index, deps_count, + opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay, + condition_pattern, when_unmet, when_failed + ) + VALUES ( + add_step.flow_slug, + add_step.step_slug, + COALESCE(add_step.step_type, 'single'), + next_idx, + COALESCE(array_length(add_step.deps_slugs, 1), 0), + add_step.max_attempts, + add_step.base_delay, + add_step.timeout, + add_step.start_delay, + add_step.condition_pattern, + add_step.when_unmet, + add_step.when_failed + ) + ON CONFLICT ON CONSTRAINT steps_pkey + DO UPDATE SET step_slug = EXCLUDED.step_slug + RETURNING * INTO result_step; + + -- Insert dependencies + INSERT INTO pgflow.deps (flow_slug, dep_slug, step_slug) + SELECT add_step.flow_slug, d.dep_slug, add_step.step_slug + FROM unnest(COALESCE(add_step.deps_slugs, '{}')) AS d(dep_slug) + WHERE add_step.deps_slugs IS NOT NULL AND array_length(add_step.deps_slugs, 1) > 0 + ON CONFLICT ON CONSTRAINT deps_pkey DO NOTHING; + + RETURN result_step; +END; +$$; +-- Create "cascade_skip_steps" function +CREATE FUNCTION "pgflow"."cascade_skip_steps" ("run_id" uuid, "step_slug" text, "skip_reason" text) RETURNS integer LANGUAGE plpgsql AS $$ +DECLARE + v_flow_slug text; + v_total_skipped int := 0; +BEGIN + -- Get flow_slug for this run + SELECT r.flow_slug INTO v_flow_slug + FROM pgflow.runs r + WHERE r.run_id = cascade_skip_steps.run_id; + + IF v_flow_slug IS NULL THEN + RAISE EXCEPTION 'Run not found: %', cascade_skip_steps.run_id; + END IF; + + -- ========================================== + -- SKIP STEPS IN TOPOLOGICAL ORDER + -- ========================================== + -- Use recursive CTE to find all downstream dependents, + -- then skip them in topological order (by step_index) + WITH RECURSIVE + -- ---------- Find all downstream steps ---------- + downstream_steps AS ( + -- Base case: the trigger step + SELECT + s.flow_slug, + s.step_slug, + s.step_index, + cascade_skip_steps.skip_reason AS reason -- Original reason for trigger step + FROM pgflow.steps s + WHERE s.flow_slug = v_flow_slug + AND s.step_slug = cascade_skip_steps.step_slug + + UNION ALL + + -- Recursive case: steps that depend on already-found steps + SELECT + s.flow_slug, + s.step_slug, + s.step_index, + 'dependency_skipped'::text AS reason -- Downstream steps get this reason + FROM pgflow.steps s + JOIN pgflow.deps d ON d.flow_slug = s.flow_slug AND d.step_slug = s.step_slug + JOIN downstream_steps ds ON ds.flow_slug = d.flow_slug AND ds.step_slug = d.dep_slug + ), + -- ---------- Deduplicate and order by step_index ---------- + steps_to_skip AS ( + SELECT DISTINCT ON (ds.step_slug) + ds.flow_slug, + ds.step_slug, + ds.step_index, + ds.reason + FROM downstream_steps ds + ORDER BY ds.step_slug, ds.step_index -- Keep first occurrence (trigger step has original reason) + ), + -- ---------- Skip the steps ---------- + skipped AS ( + UPDATE pgflow.step_states ss + SET status = 'skipped', + skip_reason = sts.reason, + skipped_at = now(), + remaining_tasks = NULL -- Clear remaining_tasks for skipped steps + FROM steps_to_skip sts + WHERE ss.run_id = cascade_skip_steps.run_id + AND ss.step_slug = sts.step_slug + AND ss.status IN ('created', 'started') -- Only skip non-terminal steps + RETURNING + ss.*, + -- Broadcast step:skipped event + realtime.send( + jsonb_build_object( + 'event_type', 'step:skipped', + 'run_id', ss.run_id, + 'flow_slug', ss.flow_slug, + 'step_slug', ss.step_slug, + 'status', 'skipped', + 'skip_reason', ss.skip_reason, + 'skipped_at', ss.skipped_at + ), + concat('step:', ss.step_slug, ':skipped'), + concat('pgflow:run:', ss.run_id), + false + ) as _broadcast_result + ), + -- ---------- Update run counters ---------- + run_updates AS ( + UPDATE pgflow.runs r + SET remaining_steps = r.remaining_steps - skipped_count.count + FROM (SELECT COUNT(*) AS count FROM skipped) skipped_count + WHERE r.run_id = cascade_skip_steps.run_id + AND skipped_count.count > 0 + ) + SELECT COUNT(*) INTO v_total_skipped FROM skipped; + + RETURN v_total_skipped; +END; +$$; +-- Drop "add_step" function +DROP FUNCTION "pgflow"."add_step" (text, text, text[], integer, integer, integer, integer, text); diff --git a/pkgs/core/supabase/migrations/atlas.sum b/pkgs/core/supabase/migrations/atlas.sum index c0881d482..e23d991f8 100644 --- a/pkgs/core/supabase/migrations/atlas.sum +++ b/pkgs/core/supabase/migrations/atlas.sum @@ -1,4 +1,4 @@ -h1:sIw3ylBXnDTOY5woU5hCoL+eT87Nb0XyctIIQl3Aq2g= +h1:95pJcIaIV04WBvPgFpjULl/TWBCArYhQTMB4IG69phs= 20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s= 20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY= 20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg= @@ -16,3 +16,4 @@ h1:sIw3ylBXnDTOY5woU5hCoL+eT87Nb0XyctIIQl3Aq2g= 20251212100113_pgflow_allow_data_loss_parameter.sql h1:Fg3RHj51STNHS4epQ2J4AFMj7NwG0XfyDTSA/9dcBIQ= 20251225163110_pgflow_add_flow_input_column.sql h1:734uCbTgKmPhTK3TY56uNYZ31T8u59yll9ea7nwtEoc= 20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o= +20260105074725_pgflow_temp_skip_infrastructure.sql h1:tjele0FyNwcK0DLlr7I8QxiAueTC36r7KYK27Mkbi2s= diff --git a/pkgs/core/supabase/tests/add_step/condition_invalid_values.test.sql b/pkgs/core/supabase/tests/add_step/condition_invalid_values.test.sql new file mode 100644 index 000000000..17458192e --- /dev/null +++ b/pkgs/core/supabase/tests/add_step/condition_invalid_values.test.sql @@ -0,0 +1,24 @@ +-- Test: add_step - Invalid condition parameter values +-- Verifies CHECK constraints reject invalid when_unmet and when_failed values +begin; +select plan(2); + +select pgflow_tests.reset_db(); +select pgflow.create_flow('invalid_test'); + +-- Test 1: Invalid when_unmet value should fail +select throws_ok( + $$ SELECT pgflow.add_step('invalid_test', 'bad_step', when_unmet => 'invalid_value') $$, + 'new row for relation "steps" violates check constraint "when_unmet_is_valid"', + 'Invalid when_unmet value should be rejected' +); + +-- Test 2: Invalid when_failed value should fail +select throws_ok( + $$ SELECT pgflow.add_step('invalid_test', 'bad_step2', when_failed => 'invalid_value') $$, + 'new row for relation "steps" violates check constraint "when_failed_is_valid"', + 'Invalid when_failed value should be rejected' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/add_step/condition_parameters.test.sql b/pkgs/core/supabase/tests/add_step/condition_parameters.test.sql new file mode 100644 index 000000000..324aae2bf --- /dev/null +++ b/pkgs/core/supabase/tests/add_step/condition_parameters.test.sql @@ -0,0 +1,125 @@ +-- Test: add_step - New condition parameters +-- Verifies condition_pattern, when_unmet, when_failed parameters work correctly +begin; +select plan(9); + +select pgflow_tests.reset_db(); +select pgflow.create_flow('condition_test'); + +-- Test 1: Add step with condition_pattern +select pgflow.add_step( + 'condition_test', + 'step_with_condition', + condition_pattern => '{"type": "premium"}'::jsonb +); + +select is( + (select condition_pattern from pgflow.steps + where flow_slug = 'condition_test' and step_slug = 'step_with_condition'), + '{"type": "premium"}'::jsonb, + 'condition_pattern should be stored correctly' +); + +-- Test 2: Add step with when_unmet = skip +select pgflow.add_step( + 'condition_test', + 'step_skip_unmet', + when_unmet => 'skip' +); + +select is( + (select when_unmet from pgflow.steps + where flow_slug = 'condition_test' and step_slug = 'step_skip_unmet'), + 'skip', + 'when_unmet should be skip' +); + +-- Test 3: Add step with when_unmet = skip-cascade +select pgflow.add_step( + 'condition_test', + 'step_skip_cascade_unmet', + when_unmet => 'skip-cascade' +); + +select is( + (select when_unmet from pgflow.steps + where flow_slug = 'condition_test' and step_slug = 'step_skip_cascade_unmet'), + 'skip-cascade', + 'when_unmet should be skip-cascade' +); + +-- Test 4: Add step with when_failed = skip +select pgflow.add_step( + 'condition_test', + 'step_skip_failed', + when_failed => 'skip' +); + +select is( + (select when_failed from pgflow.steps + where flow_slug = 'condition_test' and step_slug = 'step_skip_failed'), + 'skip', + 'when_failed should be skip' +); + +-- Test 5: Add step with when_failed = skip-cascade +select pgflow.add_step( + 'condition_test', + 'step_skip_cascade_failed', + when_failed => 'skip-cascade' +); + +select is( + (select when_failed from pgflow.steps + where flow_slug = 'condition_test' and step_slug = 'step_skip_cascade_failed'), + 'skip-cascade', + 'when_failed should be skip-cascade' +); + +-- Test 6: Default when_unmet should be skip (natural default for conditions) +select pgflow.add_step('condition_test', 'step_default_unmet'); + +select is( + (select when_unmet from pgflow.steps + where flow_slug = 'condition_test' and step_slug = 'step_default_unmet'), + 'skip', + 'Default when_unmet should be skip' +); + +-- Test 7: Default when_failed should be fail +select is( + (select when_failed from pgflow.steps + where flow_slug = 'condition_test' and step_slug = 'step_default_unmet'), + 'fail', + 'Default when_failed should be fail' +); + +-- Test 8: Default condition_pattern should be NULL +select is( + (select condition_pattern from pgflow.steps + where flow_slug = 'condition_test' and step_slug = 'step_default_unmet'), + NULL::jsonb, + 'Default condition_pattern should be NULL' +); + +-- Test 9: Add step with all condition parameters +select pgflow.add_step( + 'condition_test', + 'step_all_params', + condition_pattern => '{"status": "active"}'::jsonb, + when_unmet => 'skip', + when_failed => 'skip-cascade' +); + +select ok( + (select + condition_pattern = '{"status": "active"}'::jsonb + AND when_unmet = 'skip' + AND when_failed = 'skip-cascade' + from pgflow.steps + where flow_slug = 'condition_test' and step_slug = 'step_all_params'), + 'All condition parameters should be stored correctly together' +); + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/cascade_skip_steps/broadcast_order.test.sql b/pkgs/core/supabase/tests/cascade_skip_steps/broadcast_order.test.sql new file mode 100644 index 000000000..44e04d649 --- /dev/null +++ b/pkgs/core/supabase/tests/cascade_skip_steps/broadcast_order.test.sql @@ -0,0 +1,64 @@ +-- Test: cascade_skip_steps - Broadcast order respects dependency graph +-- Verifies step:skipped events are sent in topological order +begin; +select plan(2); + +-- Reset database and create a chain: A -> B -> C +select pgflow_tests.reset_db(); +select pgflow.create_flow('order_test'); +select pgflow.add_step('order_test', 'step_a'); +select pgflow.add_step('order_test', 'step_b', ARRAY['step_a']); +select pgflow.add_step('order_test', 'step_c', ARRAY['step_b']); + +-- Start flow +with flow as ( + select * from pgflow.start_flow('order_test', '{}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Skip step_a (cascades to B and C) +select pgflow.cascade_skip_steps( + (select run_id from run_ids), + 'step_a', + 'condition_unmet' +); + +-- Test 1: All 3 step:skipped events should exist +select is( + (select count(*) from realtime.messages + where payload->>'event_type' = 'step:skipped' + and payload->>'run_id' = (select run_id::text from run_ids)), + 3::bigint, + 'Should have 3 step:skipped events' +); + +-- Test 2: Events should be in dependency order (A before B before C) +with ordered_events as ( + select + inserted_at, + payload->>'step_slug' as step_slug, + row_number() over (order by inserted_at) as event_order + from realtime.messages + where payload->>'event_type' = 'step:skipped' + and payload->>'run_id' = (select run_id::text from run_ids) +), +step_a_event as ( + select event_order from ordered_events where step_slug = 'step_a' +), +step_b_event as ( + select event_order from ordered_events where step_slug = 'step_b' +), +step_c_event as ( + select event_order from ordered_events where step_slug = 'step_c' +) +select ok( + (select event_order from step_a_event) < (select event_order from step_b_event) + AND (select event_order from step_b_event) < (select event_order from step_c_event), + 'Events must be in dependency order (A -> B -> C)' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/cascade_skip_steps/cascade_through_multiple_levels.test.sql b/pkgs/core/supabase/tests/cascade_skip_steps/cascade_through_multiple_levels.test.sql new file mode 100644 index 000000000..888a8d6f8 --- /dev/null +++ b/pkgs/core/supabase/tests/cascade_skip_steps/cascade_through_multiple_levels.test.sql @@ -0,0 +1,95 @@ +-- Test: cascade_skip_steps - Cascade through multiple DAG levels +-- Verifies skipping A cascades through A -> B -> C chain +begin; +select plan(8); + +-- Reset database and create a flow: A -> B -> C +select pgflow_tests.reset_db(); +select pgflow.create_flow('deep_cascade'); +select pgflow.add_step('deep_cascade', 'step_a'); +select pgflow.add_step('deep_cascade', 'step_b', ARRAY['step_a']); +select pgflow.add_step('deep_cascade', 'step_c', ARRAY['step_b']); + +-- Start flow +with flow as ( + select * from pgflow.start_flow('deep_cascade', '{}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Skip step_a (should cascade to step_b and step_c) +select pgflow.cascade_skip_steps( + (select run_id from run_ids), + 'step_a', + 'handler_failed' +); + +-- Test 1: step_a should be skipped with handler_failed reason +select is( + (select skip_reason from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_a'), + 'handler_failed', + 'step_a skip_reason should be handler_failed' +); + +-- Test 2: step_b should be skipped +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_b'), + 'skipped', + 'step_b should be skipped (direct dependent of step_a)' +); + +-- Test 3: step_b should have dependency_skipped reason +select is( + (select skip_reason from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_b'), + 'dependency_skipped', + 'step_b skip_reason should be dependency_skipped' +); + +-- Test 4: step_c should also be skipped (transitive) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_c'), + 'skipped', + 'step_c should be skipped (transitive cascade)' +); + +-- Test 5: step_c should have dependency_skipped reason +select is( + (select skip_reason from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_c'), + 'dependency_skipped', + 'step_c skip_reason should be dependency_skipped' +); + +-- Test 6: All three steps should be skipped +select is( + (select count(*) from pgflow.step_states + where run_id = (select run_id from run_ids) and status = 'skipped'), + 3::bigint, + 'All 3 steps should be skipped' +); + +-- Test 7: remaining_steps should be 0 +select is( + (select remaining_steps from pgflow.runs + where run_id = (select run_id from run_ids)), + 0::int, + 'remaining_steps should be 0' +); + +-- Test 8: step:skipped events should be sent for all 3 steps +select is( + (select count(*) from realtime.messages + where payload->>'event_type' = 'step:skipped' + and payload->>'run_id' = (select run_id::text from run_ids)), + 3::bigint, + 'Should send 3 step:skipped events' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/cascade_skip_steps/cascade_to_single_dependent.test.sql b/pkgs/core/supabase/tests/cascade_skip_steps/cascade_to_single_dependent.test.sql new file mode 100644 index 000000000..a6b086b41 --- /dev/null +++ b/pkgs/core/supabase/tests/cascade_skip_steps/cascade_to_single_dependent.test.sql @@ -0,0 +1,86 @@ +-- Test: cascade_skip_steps - Cascade to single dependent +-- Verifies skipping a step cascades to its direct dependent +begin; +select plan(7); + +-- Reset database and create a flow: A -> B +select pgflow_tests.reset_db(); +select pgflow.create_flow('cascade_flow'); +select pgflow.add_step('cascade_flow', 'step_a'); +select pgflow.add_step('cascade_flow', 'step_b', ARRAY['step_a']); + +-- Start flow +with flow as ( + select * from pgflow.start_flow('cascade_flow', '{}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Skip step_a (should cascade to step_b) +select pgflow.cascade_skip_steps( + (select run_id from run_ids), + 'step_a', + 'condition_unmet' +); + +-- Test 1: step_a should be skipped +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_a'), + 'skipped', + 'step_a should be skipped' +); + +-- Test 2: step_a should have skip_reason = condition_unmet +select is( + (select skip_reason from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_a'), + 'condition_unmet', + 'step_a skip_reason should be condition_unmet' +); + +-- Test 3: step_b should also be skipped (cascade) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_b'), + 'skipped', + 'step_b should be skipped due to cascade' +); + +-- Test 4: step_b should have skip_reason = dependency_skipped +select is( + (select skip_reason from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_b'), + 'dependency_skipped', + 'step_b skip_reason should be dependency_skipped' +); + +-- Test 5: Both steps should have skipped_at timestamp set +select ok( + (select count(*) = 2 from pgflow.step_states + where run_id = (select run_id from run_ids) + and skipped_at is not null), + 'Both steps should have skipped_at timestamp' +); + +-- Test 6: remaining_steps should be 0 (both skipped) +select is( + (select remaining_steps from pgflow.runs + where run_id = (select run_id from run_ids)), + 0::int, + 'remaining_steps should be 0 (both steps skipped)' +); + +-- Test 7: step:skipped events should be sent for both steps +select is( + (select count(*) from realtime.messages + where payload->>'event_type' = 'step:skipped' + and payload->>'run_id' = (select run_id::text from run_ids)), + 2::bigint, + 'Should send step:skipped events for both steps' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/cascade_skip_steps/multi_dependency_partial_skip.test.sql b/pkgs/core/supabase/tests/cascade_skip_steps/multi_dependency_partial_skip.test.sql new file mode 100644 index 000000000..c7a1f2327 --- /dev/null +++ b/pkgs/core/supabase/tests/cascade_skip_steps/multi_dependency_partial_skip.test.sql @@ -0,0 +1,80 @@ +-- Test: cascade_skip_steps - Multi-dependency scenario +-- Flow: A -> C, B -> C (C depends on both A and B) +-- Skipping A should cascade to C, even though B is still runnable +begin; +select plan(6); + +-- Reset database and create a diamond-ish flow +select pgflow_tests.reset_db(); +select pgflow.create_flow('multi_dep'); +select pgflow.add_step('multi_dep', 'step_a'); +select pgflow.add_step('multi_dep', 'step_b'); +select pgflow.add_step('multi_dep', 'step_c', ARRAY['step_a', 'step_b']); + +-- Start flow +with flow as ( + select * from pgflow.start_flow('multi_dep', '{}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Skip step_a (should cascade to step_c) +select pgflow.cascade_skip_steps( + (select run_id from run_ids), + 'step_a', + 'condition_unmet' +); + +-- Test 1: step_a should be skipped +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_a'), + 'skipped', + 'step_a should be skipped' +); + +-- Test 2: step_b should NOT be skipped (independent of step_a, root step so started) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_b'), + 'started', + 'step_b should remain in started status (independent root step)' +); + +-- Test 3: step_c should be skipped (depends on skipped step_a) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_c'), + 'skipped', + 'step_c should be skipped (one of its deps was skipped)' +); + +-- Test 4: step_c skip_reason should be dependency_skipped +select is( + (select skip_reason from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_c'), + 'dependency_skipped', + 'step_c skip_reason should be dependency_skipped' +); + +-- Test 5: remaining_steps should be 1 (only step_b) +select is( + (select remaining_steps from pgflow.runs + where run_id = (select run_id from run_ids)), + 1::int, + 'remaining_steps should be 1 (only step_b remains)' +); + +-- Test 6: 2 step:skipped events (step_a and step_c) +select is( + (select count(*) from realtime.messages + where payload->>'event_type' = 'step:skipped' + and payload->>'run_id' = (select run_id::text from run_ids)), + 2::bigint, + 'Should send 2 step:skipped events (step_a and step_c)' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/cascade_skip_steps/single_step_skip.test.sql b/pkgs/core/supabase/tests/cascade_skip_steps/single_step_skip.test.sql new file mode 100644 index 000000000..2df25cbd5 --- /dev/null +++ b/pkgs/core/supabase/tests/cascade_skip_steps/single_step_skip.test.sql @@ -0,0 +1,69 @@ +-- Test: cascade_skip_steps - Single step skip (base case) +-- Verifies the function can skip a single step without dependencies +begin; +select plan(5); + +-- Reset database and create a simple flow with no dependencies +select pgflow_tests.reset_db(); +select pgflow.create_flow('simple_flow'); +select pgflow.add_step('simple_flow', 'step_a'); +select pgflow.add_step('simple_flow', 'step_b'); + +-- Start flow +with flow as ( + select * from pgflow.start_flow('simple_flow', '{}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Test 1: Verify step_a starts in 'started' status (root steps auto-start) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_a'), + 'started', + 'step_a should start in started status (root step auto-starts)' +); + +-- Skip step_a +select pgflow.cascade_skip_steps( + (select run_id from run_ids), + 'step_a', + 'condition_unmet' +); + +-- Test 2: step_a should now have status 'skipped' +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_a'), + 'skipped', + 'step_a should be skipped after cascade_skip_steps' +); + +-- Test 3: step_a should have skip_reason set +select is( + (select skip_reason from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_a'), + 'condition_unmet', + 'step_a should have skip_reason = condition_unmet' +); + +-- Test 4: step_b should remain unaffected (still started, independent root step) +select is( + (select status from pgflow.step_states + where run_id = (select run_id from run_ids) and step_slug = 'step_b'), + 'started', + 'step_b (independent step) should remain in started status' +); + +-- Test 5: remaining_steps on run should be decremented by 1 +select is( + (select remaining_steps from pgflow.runs + where run_id = (select run_id from run_ids)), + 1::int, + 'remaining_steps should be decremented by 1 (was 2, now 1)' +); + +-- Clean up +drop table if exists run_ids; + +select finish(); +rollback; diff --git a/pkgs/core/supabase/tests/cascade_skip_steps/skipped_event_payload.test.sql b/pkgs/core/supabase/tests/cascade_skip_steps/skipped_event_payload.test.sql new file mode 100644 index 000000000..6b546928f --- /dev/null +++ b/pkgs/core/supabase/tests/cascade_skip_steps/skipped_event_payload.test.sql @@ -0,0 +1,88 @@ +-- Test: cascade_skip_steps - step:skipped event payload format +-- Verifies the realtime event contains all required fields +begin; +select plan(8); + +-- Reset database and create a simple flow +select pgflow_tests.reset_db(); +select pgflow.create_flow('event_test'); +select pgflow.add_step('event_test', 'step_a'); + +-- Start flow +with flow as ( + select * from pgflow.start_flow('event_test', '{}'::jsonb) +) +select run_id into temporary run_ids from flow; + +-- Skip step_a +select pgflow.cascade_skip_steps( + (select run_id from run_ids), + 'step_a', + 'condition_unmet' +); + +-- Get the event for assertions +select * into temporary skip_event +from pgflow_tests.get_realtime_message('step:skipped', (select run_id from run_ids), 'step_a'); + +-- Test 1: Event type should be step:skipped +select is( + (select payload->>'event_type' from skip_event), + 'step:skipped', + 'Event type should be step:skipped' +); + +-- Test 2: step_slug should be in payload +select is( + (select payload->>'step_slug' from skip_event), + 'step_a', + 'Payload should contain step_slug' +); + +-- Test 3: flow_slug should be in payload +select is( + (select payload->>'flow_slug' from skip_event), + 'event_test', + 'Payload should contain flow_slug' +); + +-- Test 4: run_id should be in payload +select is( + (select payload->>'run_id' from skip_event), + (select run_id::text from run_ids), + 'Payload should contain run_id' +); + +-- Test 5: status should be skipped +select is( + (select payload->>'status' from skip_event), + 'skipped', + 'Payload status should be skipped' +); + +-- Test 6: skip_reason should be in payload +select is( + (select payload->>'skip_reason' from skip_event), + 'condition_unmet', + 'Payload should contain skip_reason' +); + +-- Test 7: skipped_at timestamp should be present +select ok( + (select (payload->>'skipped_at')::timestamptz is not null from skip_event), + 'Payload should include skipped_at timestamp' +); + +-- Test 8: Event name format should be step::skipped +select is( + (select event from skip_event), + 'step:step_a:skipped', + 'Event name should be step::skipped' +); + +-- Clean up +drop table if exists run_ids; +drop table if exists skip_event; + +select finish(); +rollback;