diff --git a/share-api/src/Share/BackgroundJobs/Diffs/CausalDiffs.hs b/share-api/src/Share/BackgroundJobs/Diffs/CausalDiffs.hs index 25439b2d..54cd7e39 100644 --- a/share-api/src/Share/BackgroundJobs/Diffs/CausalDiffs.hs +++ b/share-api/src/Share/BackgroundJobs/Diffs/CausalDiffs.hs @@ -25,6 +25,7 @@ import Share.Web.Authorization qualified as AuthZ import Share.Web.Errors (EntityMissing (..)) import Share.Web.Share.Diffs.Impl qualified as Diffs import System.Clock qualified as Clock +import UnliftIO qualified -- | Check every 10 minutes if we haven't heard on the notifications channel. -- Just in case we missed a notification. @@ -50,19 +51,32 @@ processDiffs authZReceipt unisonRuntime = do -- | Process a diff, then return whether or not we did any work. processDiff :: AuthZ.AuthZReceipt -> CR.UnisonRuntime -> Background Bool -processDiff authZReceipt unisonRuntime = do - result <- Trace.withSpan "background:causal-diffs:process-diff" mempty $ - PG.runTransactionMode PG.RepeatableRead PG.ReadWrite do - DQ.claimCausalDiff >>= \case - Nothing -> pure Nothing - Just causalDiffInfo -> withTags (causalDiffTags causalDiffInfo) do - startTime <- PG.transactionUnsafeIO (Clock.getTime Clock.Monotonic) - result <- PG.catchTransaction (maybeComputeAndStoreCausalDiff authZReceipt unisonRuntime causalDiffInfo) - DQ.deleteClaimedCausalDiff causalDiffInfo - pure (Just (causalDiffInfo, startTime, result)) +processDiff authZReceipt unisonRuntime = Trace.withSpan "background:causal-diffs:process-diff" mempty $ do + pendingCausalDiffVar <- liftIO $ UnliftIO.newEmptyMVar + result <- UnliftIO.tryAny $ PG.runTransactionMode PG.RepeatableRead PG.ReadWrite do + DQ.claimCausalDiff >>= \case + Nothing -> pure Nothing + Just causalDiffInfo -> withTags (causalDiffTags causalDiffInfo) do + PG.transactionUnsafeIO $ UnliftIO.tryPutMVar pendingCausalDiffVar causalDiffInfo + startTime <- PG.transactionUnsafeIO (Clock.getTime Clock.Monotonic) + result <- PG.catchTransaction (maybeComputeAndStoreCausalDiff authZReceipt unisonRuntime causalDiffInfo) + DQ.deleteClaimedCausalDiff causalDiffInfo + pure (Just (causalDiffInfo, startTime, result)) case result of - Nothing -> pure False - Just (cdi, startTime, result) -> do + -- The transaction failed with an exception. + -- One possible cause is an unknown builtin. + -- We should report it and mark it as invalid so we don't keep retrying it. + Left err -> do + mCausalDiffInfo <- liftIO $ UnliftIO.tryTakeMVar pendingCausalDiffVar + case mCausalDiffInfo of + Nothing -> pure () + Just cdi -> withTags (causalDiffTags cdi) do + reportError err + PG.runTransaction $ DQ.markCausalDiffInvalid (tShow err) cdi + -- Continue processing other diffs. + pure True + Right Nothing -> pure False + Right (Just (cdi, startTime, result)) -> do let tags = causalDiffTags cdi withTags tags do case result of diff --git a/share-api/src/Share/BackgroundJobs/Diffs/Queries.hs b/share-api/src/Share/BackgroundJobs/Diffs/Queries.hs index be30dc35..0ffbb325 100644 --- a/share-api/src/Share/BackgroundJobs/Diffs/Queries.hs +++ b/share-api/src/Share/BackgroundJobs/Diffs/Queries.hs @@ -3,6 +3,7 @@ module Share.BackgroundJobs.Diffs.Queries submitCausalsToBeDiffed, claimCausalDiff, deleteClaimedCausalDiff, + markCausalDiffInvalid, ) where @@ -84,12 +85,25 @@ claimCausalDiff = do [sql| SELECT from_causal_id, to_causal_id, from_codebase_owner, to_codebase_owner FROM causal_diff_queue + WHERE error IS NULL ORDER BY created_at ASC LIMIT 1 -- Skip any that are being synced by other workers. FOR UPDATE SKIP LOCKED |] +markCausalDiffInvalid :: Text -> CausalDiffInfo -> Transaction e () +markCausalDiffInvalid err CausalDiffInfo {fromCausalId, toCausalId, fromCodebaseOwner, toCodebaseOwner} = + execute_ + [sql| + UPDATE causal_diff_queue + SET error = #{err} + WHERE causal_diff_queue.from_causal_id = #{fromCausalId} + AND causal_diff_queue.to_causal_id = #{toCausalId} + AND causal_diff_queue.from_codebase_owner = #{fromCodebaseOwner} + AND causal_diff_queue.to_codebase_owner = #{toCodebaseOwner} + |] + deleteClaimedCausalDiff :: CausalDiffInfo -> Transaction e () deleteClaimedCausalDiff CausalDiffInfo {fromCausalId, toCausalId, fromCodebaseOwner, toCodebaseOwner} = execute_ diff --git a/sql/2026-01-09_causal-diff-queue-deadletter.sql b/sql/2026-01-09_causal-diff-queue-deadletter.sql new file mode 100644 index 00000000..6d357d52 --- /dev/null +++ b/sql/2026-01-09_causal-diff-queue-deadletter.sql @@ -0,0 +1,7 @@ +-- Add nullable error column to causal_diff_queue table +ALTER TABLE causal_diff_queue + ADD COLUMN error TEXT; + +CREATE INDEX IF NOT EXISTS idx_causal_diff_queue_processing_order + ON causal_diff_queue (created_at ASC) + WHERE error IS NULL;