Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 26 additions & 12 deletions share-api/src/Share/BackgroundJobs/Diffs/CausalDiffs.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import Share.Web.Authorization qualified as AuthZ
import Share.Web.Errors (EntityMissing (..))
import Share.Web.Share.Diffs.Impl qualified as Diffs
import System.Clock qualified as Clock
import UnliftIO qualified

-- | Check every 10 minutes if we haven't heard on the notifications channel.
-- Just in case we missed a notification.
Expand All @@ -50,19 +51,32 @@ processDiffs authZReceipt unisonRuntime = do

-- | Process a diff, then return whether or not we did any work.
processDiff :: AuthZ.AuthZReceipt -> CR.UnisonRuntime -> Background Bool
processDiff authZReceipt unisonRuntime = do
result <- Trace.withSpan "background:causal-diffs:process-diff" mempty $
PG.runTransactionMode PG.RepeatableRead PG.ReadWrite do
DQ.claimCausalDiff >>= \case
Nothing -> pure Nothing
Just causalDiffInfo -> withTags (causalDiffTags causalDiffInfo) do
startTime <- PG.transactionUnsafeIO (Clock.getTime Clock.Monotonic)
result <- PG.catchTransaction (maybeComputeAndStoreCausalDiff authZReceipt unisonRuntime causalDiffInfo)
DQ.deleteClaimedCausalDiff causalDiffInfo
pure (Just (causalDiffInfo, startTime, result))
processDiff authZReceipt unisonRuntime = Trace.withSpan "background:causal-diffs:process-diff" mempty $ do
pendingCausalDiffVar <- liftIO $ UnliftIO.newEmptyMVar
result <- UnliftIO.tryAny $ PG.runTransactionMode PG.RepeatableRead PG.ReadWrite do
DQ.claimCausalDiff >>= \case
Nothing -> pure Nothing
Just causalDiffInfo -> withTags (causalDiffTags causalDiffInfo) do
PG.transactionUnsafeIO $ UnliftIO.tryPutMVar pendingCausalDiffVar causalDiffInfo
startTime <- PG.transactionUnsafeIO (Clock.getTime Clock.Monotonic)
result <- PG.catchTransaction (maybeComputeAndStoreCausalDiff authZReceipt unisonRuntime causalDiffInfo)
DQ.deleteClaimedCausalDiff causalDiffInfo
pure (Just (causalDiffInfo, startTime, result))
case result of
Nothing -> pure False
Just (cdi, startTime, result) -> do
-- The transaction failed with an exception.
-- One possible cause is an unknown builtin.
-- We should report it and mark it as invalid so we don't keep retrying it.
Left err -> do
mCausalDiffInfo <- liftIO $ UnliftIO.tryTakeMVar pendingCausalDiffVar
case mCausalDiffInfo of
Nothing -> pure ()
Just cdi -> withTags (causalDiffTags cdi) do
reportError err
PG.runTransaction $ DQ.markCausalDiffInvalid (tShow err) cdi
-- Continue processing other diffs.
pure True
Right Nothing -> pure False
Right (Just (cdi, startTime, result)) -> do
let tags = causalDiffTags cdi
withTags tags do
case result of
Expand Down
14 changes: 14 additions & 0 deletions share-api/src/Share/BackgroundJobs/Diffs/Queries.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module Share.BackgroundJobs.Diffs.Queries
submitCausalsToBeDiffed,
claimCausalDiff,
deleteClaimedCausalDiff,
markCausalDiffInvalid,
)
where

Expand Down Expand Up @@ -84,12 +85,25 @@ claimCausalDiff = do
[sql|
SELECT from_causal_id, to_causal_id, from_codebase_owner, to_codebase_owner
FROM causal_diff_queue
WHERE error IS NULL
ORDER BY created_at ASC
LIMIT 1
-- Skip any that are being synced by other workers.
FOR UPDATE SKIP LOCKED
|]

markCausalDiffInvalid :: Text -> CausalDiffInfo -> Transaction e ()
markCausalDiffInvalid err CausalDiffInfo {fromCausalId, toCausalId, fromCodebaseOwner, toCodebaseOwner} =
execute_
[sql|
UPDATE causal_diff_queue
SET error = #{err}
WHERE causal_diff_queue.from_causal_id = #{fromCausalId}
AND causal_diff_queue.to_causal_id = #{toCausalId}
AND causal_diff_queue.from_codebase_owner = #{fromCodebaseOwner}
AND causal_diff_queue.to_codebase_owner = #{toCodebaseOwner}
|]

deleteClaimedCausalDiff :: CausalDiffInfo -> Transaction e ()
deleteClaimedCausalDiff CausalDiffInfo {fromCausalId, toCausalId, fromCodebaseOwner, toCodebaseOwner} =
execute_
Expand Down
7 changes: 7 additions & 0 deletions sql/2026-01-09_causal-diff-queue-deadletter.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Add nullable error column to causal_diff_queue table
ALTER TABLE causal_diff_queue
ADD COLUMN error TEXT;

CREATE INDEX IF NOT EXISTS idx_causal_diff_queue_processing_order
ON causal_diff_queue (created_at ASC)
WHERE error IS NULL;
Loading