From a7c418103c4e9f362dcf4fac12d025d1d68918a8 Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Thu, 8 Jan 2026 11:56:05 -0800 Subject: [PATCH 01/14] Update sync impl to handle history comments and revisions separately --- .../src/Share/Web/UCM/HistoryComments/Impl.hs | 36 +++++++++++++------ .../Share/Web/UCM/HistoryComments/Queries.hs | 25 ++++++++++--- 2 files changed, 47 insertions(+), 14 deletions(-) diff --git a/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs b/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs index 4a86f7f4..e904b031 100644 --- a/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs +++ b/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs @@ -21,7 +21,6 @@ import Share.Web.Authorization qualified as AuthZ import Share.Web.Errors (Unimplemented (Unimplemented), reportError, respondError) import Share.Web.UCM.HistoryComments.Queries qualified as Q import Unison.Debug qualified as Debug -import Unison.Hash32 (Hash32) import Unison.Server.HistoryComments.API qualified as HistoryComments import Unison.Server.HistoryComments.Types (HistoryCommentDownloaderChunk (..), HistoryCommentUploaderChunk (..), UploadCommentsResponse (..)) import Unison.Server.HistoryComments.Types qualified as Sync @@ -39,8 +38,8 @@ server mayCaller = wsMessageBufferSize :: Int wsMessageBufferSize = 100 -downloadHistoryCommentsStreamImpl :: Maybe UserId -> Connection -> WebApp () -downloadHistoryCommentsStreamImpl _mayUserId _conn = do +downloadHistoryCommentsStreamImpl :: Maybe UserId -> BranchRef -> Connection -> WebApp () +downloadHistoryCommentsStreamImpl _mayUserId _branchRef _conn = do _ <- error "AUTH CHECK HERE" respondError Unimplemented @@ -81,12 +80,12 @@ uploadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn = lift (AuthZ.checkUploadToProjectBranchCodebase callerUserId project.projectId (user_id <$> contributorUser)) >>= \case Left _authErr -> handleErrInQueue q (UploadCommentsNotAuthorized br) Right authZ -> pure authZ - 
hashesToCheckQ <- liftIO $ newTBMQueueIO 100 + hashesToCheckQ <- liftIO $ newTBMQueueIO @(Sync.HistoryCommentHash32, [Sync.HistoryCommentRevisionHash32]) 100 commentsQ <- liftIO $ newTBMQueueIO 100 errMVar <- liftIO newEmptyTMVarIO _receiverThread <- lift $ Ki.fork scope $ receiverWorker receive errMVar hashesToCheckQ commentsQ inserterThread <- lift $ Ki.fork scope $ inserterWorker authZ commentsQ project.projectId - _hashCheckingThread <- lift $ Ki.fork scope $ hashCheckingWorker send hashesToCheckQ + _hashCheckingThread <- lift $ Ki.fork scope $ hashCheckingWorker project.projectId send hashesToCheckQ Debug.debugLogM Debug.Temp "Upload history comments: waiting for inserter thread to finish" -- The inserter thread will finish when the client closes the connection. atomically $ Ki.await inserterThread @@ -111,16 +110,33 @@ uploadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn = Debug.debugLogM Debug.Temp "Inserter worker finished" hashCheckingWorker :: + ProjectId -> (MsgOrError err HistoryCommentDownloaderChunk -> STM Bool) -> - TBMQueue Hash32 -> + TBMQueue (Sync.HistoryCommentHash32, [Sync.HistoryCommentRevisionHash32]) -> WebApp () - hashCheckingWorker send hashesToCheckQ = do + hashCheckingWorker projectId send hashesToCheckQ = do let loop = do (hashes, closed) <- atomically (fetchChunk insertCommentBatchSize (readTBMQueue hashesToCheckQ)) Debug.debugM Debug.Temp "Checking hashes chunk of size" (length hashes) PG.whenNonEmpty hashes $ do - unknownHashes <- PG.runTransaction $ do Q.filterForUnknownHistoryCommentHashes hashes - case NESet.nonEmptySet (Set.fromList unknownHashes) of + unknownCommentHashes <- fmap Set.fromList $ PG.runTransaction $ do + Q.filterForUnknownHistoryCommentHashes (Sync.unHistoryCommentHash32 . 
fst <$> hashes) + let (revisionHashesWeDefinitelyNeed, revisionHashesToCheck) = + hashes + -- Only check revisions for comments that are unknown + & foldMap \case + (commentHash, revisionHashes) + -- If the comment hash is unknown, we need _all_ its revisions, we + -- don't need to check them. + -- Otherwise, we need to check which revisions are unknown. + | Set.member (Sync.unHistoryCommentHash32 commentHash) unknownCommentHashes -> (revisionHashes, []) + | otherwise -> ([], revisionHashes) + unknownRevsFiltered <- PG.runTransaction $ Q.filterForUnknownHistoryCommentRevisionHashes projectId (Sync.unHistoryCommentRevisionHash32 <$> revisionHashesToCheck) + let allNeededHashes = + (Set.map (Left . Sync.HistoryCommentHash32) unknownCommentHashes) + <> (Set.fromList . fmap Right $ revisionHashesWeDefinitelyNeed) + <> (Set.fromList (Right . Sync.HistoryCommentRevisionHash32 <$> unknownRevsFiltered)) + case NESet.nonEmptySet allNeededHashes of Nothing -> pure () Just unknownHashesSet -> do void . atomically $ send $ Msg $ RequestCommentsChunk unknownHashesSet @@ -128,7 +144,7 @@ uploadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn = loop void . 
atomically $ send $ Msg $ DoneCheckingHashesChunk Debug.debugLogM Debug.Temp "Hash checking worker finished" - receiverWorker :: STM (Maybe (MsgOrError Void HistoryCommentUploaderChunk)) -> TMVar Text -> TBMQueue Hash32 -> TBMQueue (Either Sync.HistoryComment Sync.HistoryCommentRevision) -> WebApp () + receiverWorker :: STM (Maybe (MsgOrError Void HistoryCommentUploaderChunk)) -> TMVar Text -> TBMQueue (Sync.HistoryCommentHash32, [Sync.HistoryCommentRevisionHash32]) -> TBMQueue (Either Sync.HistoryComment Sync.HistoryCommentRevision) -> WebApp () receiverWorker recv errMVar hashesToCheckQ commentsQ = do let loop = do next <- atomically do diff --git a/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs b/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs index 9c648985..7ff3ecd4 100644 --- a/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs +++ b/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs @@ -5,6 +5,7 @@ module Share.Web.UCM.HistoryComments.Queries ( fetchProjectBranchCommentsSince, insertHistoryComments, filterForUnknownHistoryCommentHashes, + filterForUnknownHistoryCommentRevisionHashes, ) where @@ -203,10 +204,26 @@ insertHistoryComments !_authZ projectId chunks = PG.pipelined $ do ) filterForUnknownHistoryCommentHashes :: (PG.QueryA m) => [Hash32] -> m [Hash32] -filterForUnknownHistoryCommentHashes hashes = do - -- error "TODO: Check whether they're in the project as well." 
+filterForUnknownHistoryCommentHashes commentHashes = do PG.queryListCol [PG.sql| - SELECT hash FROM ^{PG.singleColumnTable hashes} AS t(hash) - WHERE hash NOT IN (SELECT comment_hash FROM history_comments) + SELECT hash FROM ^{PG.singleColumnTable commentHashes} AS t(hash) + WHERE NOT EXISTS ( + SELECT FROM history_comments hc + WHERE hc.comment_hash = t.hash + ) + |] + +filterForUnknownHistoryCommentRevisionHashes :: (PG.QueryA m) => ProjectId -> [Hash32] -> m [Hash32] +filterForUnknownHistoryCommentRevisionHashes projectId revisionHashes = do + PG.queryListCol + [PG.sql| + SELECT hash FROM ^{PG.singleColumnTable revisionHashes} AS t(hash) + WHERE NOT EXISTS ( + SELECT FROM history_comment_revisions_project_discovery hcrpd + JOIN history_comment_revisions hcr + ON hcrpd.history_comment_revision_id = hcr.id + WHERE hcrpd.project_id = #{projectId} + AND hcr.revision_hash = t.hash + ) |] From d7493637fa92e0876acd3a87b6bf6cbe13a12257 Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Tue, 13 Jan 2026 13:56:42 -0800 Subject: [PATCH 02/14] Fix a bunch of comment insertion errors --- .../src/Share/Web/UCM/HistoryComments/Impl.hs | 1 + .../Share/Web/UCM/HistoryComments/Queries.hs | 38 +++++++++---------- sql/2025-11-20_history-comments.sql | 2 +- 3 files changed, 21 insertions(+), 20 deletions(-) diff --git a/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs b/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs index e904b031..9691138d 100644 --- a/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs +++ b/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs @@ -89,6 +89,7 @@ uploadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn = Debug.debugLogM Debug.Temp "Upload history comments: waiting for inserter thread to finish" -- The inserter thread will finish when the client closes the connection. atomically $ Ki.await inserterThread + Debug.debugLogM Debug.Temp "Done. Closing connection." 
case result of Left err -> reportError err Right (Left err, _leftovers) -> reportError err diff --git a/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs b/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs index 7ff3ecd4..b9bcbb77 100644 --- a/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs +++ b/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs @@ -36,9 +36,9 @@ fetchProjectBranchCommentsSince !_authZ projectId causalId sinceTime = do SELECT hcr.id, hc.id, hcr.subject, hcr.content, hcr.created_at_ms, hcr.is_hidden, hcr.revision_hash, hc.comment_hash, hcr.author_signature history_comment_revisions_project_discovery pd JOIN history_comment_revisions hcr - ON pd.history_comment_revision_id = hcr.id + ON pd.comment_revision_id = hcr.id JOIN history_comments hc - ON hcr.history_comment_id = hc.id + ON hcr.comment_id = hc.id WHERE pd.project_id = #{projectId} AND pd.discovered_at > #{sinceTime} @@ -99,7 +99,7 @@ utcTimeToMillis utcTime = & round insertHistoryComments :: AuthZReceipt -> ProjectId -> [Either HistoryComment HistoryCommentRevision] -> PG.Transaction e () -insertHistoryComments !_authZ projectId chunks = PG.pipelined $ do +insertHistoryComments !_authZ projectId chunks = do let thumbprints = NESet.nonEmptySet $ Set.fromList (comments <&> \HistoryComment {authorThumbprint} -> authorThumbprint) for thumbprints insertThumbprints whenNonEmpty comments $ insertHistoryComments comments @@ -111,7 +111,7 @@ insertHistoryComments !_authZ projectId chunks = PG.pipelined $ do chunks & foldMap \case Left comment -> ([comment], []) Right revision -> ([], [revision]) - insertHistoryComments :: [HistoryComment] -> PG.Pipeline e () + insertHistoryComments :: (PG.QueryA m) => [HistoryComment] -> m () insertHistoryComments comments = do PG.execute_ [PG.sql| @@ -138,13 +138,13 @@ insertHistoryComments !_authZ projectId chunks = PG.pipelined $ do commentHash ) - insertRevisions :: [HistoryCommentRevision] -> PG.Pipeline e () + insertRevisions :: (PG.QueryA 
m) => [HistoryCommentRevision] -> m () insertRevisions revs = do let doRevs = PG.execute_ [PG.sql| - WITH new_revisions(subject, content, created_at_ms, hidden, author_signature, revision_hash, comment_hash) AS ( - VALUES ^{PG.toTable revsTable} + WITH new_revisions(subject, contents, created_at_ms, hidden, author_signature, revision_hash, comment_hash) AS ( + SELECT * FROM ^{PG.toTable revsTable} ) INSERT INTO history_comment_revisions(comment_id, subject, contents, created_at_ms, hidden, author_signature, revision_hash) SELECT hc.id, nr.subject, nr.contents, nr.created_at_ms, nr.hidden, nr.author_signature, nr.revision_hash @@ -157,12 +157,12 @@ insertHistoryComments !_authZ projectId chunks = PG.pipelined $ do PG.execute_ [PG.sql| WITH new_discoveries(revision_hash) AS ( - VALUES ^{PG.singleColumnTable revHashTable} + SELECT * FROM ^{PG.singleColumnTable revHashTable} ) - INSERT INTO history_comment_revisions_project_discovery(project_id, history_comment_revision_id) + INSERT INTO history_comment_revisions_project_discovery(project_id, comment_revision_id) SELECT #{projectId}, hcr.id FROM new_discoveries nd - JOIN history_comments hcr + JOIN history_comment_revisions hcr ON hcr.revision_hash = nd.revision_hash ON CONFLICT DO NOTHING |] @@ -179,28 +179,28 @@ insertHistoryComments !_authZ projectId chunks = PG.pipelined $ do commentHash ) revHashTable = revs <&> \HistoryCommentRevision {..} -> (revisionHash) - insertDiscoveryInfo :: [HistoryCommentRevision] -> PG.Pipeline e () + insertDiscoveryInfo :: (PG.QueryA m) => [HistoryCommentRevision] -> m () insertDiscoveryInfo revs = do PG.execute_ [PG.sql| - WITH new_discoveries(project_id, history_comment_hash, discovered_at) AS ( - VALUES ^{PG.toTable discoveryTable} + WITH new_discoveries(project_id, history_comment_hash) AS ( + SELECT * FROM ^{PG.toTable discoveryTable} ) - INSERT INTO history_comment_revisions_project_discovery(project_id, history_comment_revision_id, discovered_at) - SELECT #{projectId}, hcr.id, 
nd.discovered_at + INSERT INTO history_comment_revisions_project_discovery(project_id, comment_revision_id) + SELECT #{projectId}, hcr.id FROM new_discoveries nd JOIN history_comments hc ON hc.comment_hash = nd.history_comment_hash JOIN history_comment_revisions hcr - ON hcr.history_comment_id = hc.id + ON hcr.comment_id = hc.id ON CONFLICT DO NOTHING |] where + discoveryTable :: [(ProjectId, Hash32)] discoveryTable = revs <&> \HistoryCommentRevision {..} -> ( projectId, - commentHash, - utcTimeToMillis createdAt + commentHash ) filterForUnknownHistoryCommentHashes :: (PG.QueryA m) => [Hash32] -> m [Hash32] @@ -222,7 +222,7 @@ filterForUnknownHistoryCommentRevisionHashes projectId revisionHashes = do WHERE NOT EXISTS ( SELECT FROM history_comment_revisions_project_discovery hcrpd JOIN history_comment_revisions hcr - ON hcrpd.history_comment_revision_id = hcr.id + ON hcrpd.comment_revision_id = hcr.id WHERE hcrpd.project_id = #{projectId} AND hcr.revision_hash = t.hash ) diff --git a/sql/2025-11-20_history-comments.sql b/sql/2025-11-20_history-comments.sql index a70d50ed..12e76d08 100644 --- a/sql/2025-11-20_history-comments.sql +++ b/sql/2025-11-20_history-comments.sql @@ -40,7 +40,7 @@ CREATE TABLE history_comments ( CREATE INDEX idx_history_comments_causal_id ON history_comments(causal_id); CREATE TABLE history_comment_revisions ( - id INTEGER PRIMARY KEY, + id SERIAL PRIMARY KEY, comment_id INTEGER NOT NULL REFERENCES history_comments(id), subject TEXT NOT NULL, contents TEXT NOT NULL, From 42732ef005428ee1a9b21e522a9d089f7129a4d8 Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Tue, 13 Jan 2026 14:29:15 -0800 Subject: [PATCH 03/14] Replace old download auth checks with specific HashJWT overrides --- share-api/src/Share/Web/Authorization.hs | 17 ++++++++++------- share-api/src/Share/Web/UCM/Sync/Impl.hs | 6 +++--- share-api/src/Share/Web/UCM/SyncV2/Impl.hs | 4 ++-- 3 files changed, 15 insertions(+), 12 deletions(-) diff --git 
a/share-api/src/Share/Web/Authorization.hs b/share-api/src/Share/Web/Authorization.hs index fd489671..7ec7c426 100644 --- a/share-api/src/Share/Web/Authorization.hs +++ b/share-api/src/Share/Web/Authorization.hs @@ -43,7 +43,7 @@ module Share.Web.Authorization checkUploadToUserCodebase, checkUploadToProjectBranchCodebase, checkUserUpdate, - checkDownloadFromUserCodebase, + hashJWTAuthOverride, checkDownloadFromProjectBranchCodebase, checkCreateOrg, checkReadOrgRolesList, @@ -389,17 +389,20 @@ checkUploadToUserCodebase reqUserId codebaseOwnerUserId = maybePermissionFailure assertUsersEqual reqUserId codebaseOwnerUserId pure $ AuthZ.UnsafeAuthZReceipt Nothing --- | The download endpoint currently does all of its own auth using HashJWTs, +-- | The download endpoints currently do all of its own auth using HashJWTs, -- So we don't add any other authz checks here, the HashJWT check is sufficient. -checkDownloadFromUserCodebase :: WebApp (Either AuthZFailure AuthZ.AuthZReceipt) -checkDownloadFromUserCodebase = +hashJWTAuthOverride :: WebApp (Either AuthZFailure AuthZ.AuthZReceipt) +hashJWTAuthOverride = pure . Right $ AuthZ.UnsafeAuthZReceipt Nothing -- | The download endpoint currently does all of its own auth using HashJWTs, -- So we don't add any other authz checks here, the HashJWT check is sufficient. -checkDownloadFromProjectBranchCodebase :: WebApp (Either AuthZFailure AuthZ.AuthZReceipt) -checkDownloadFromProjectBranchCodebase = - pure . 
Right $ AuthZ.UnsafeAuthZReceipt Nothing +checkDownloadFromProjectBranchCodebase :: Maybe UserId -> ProjectId -> WebApp (Either AuthZFailure AuthZ.AuthZReceipt) +checkDownloadFromProjectBranchCodebase reqUserId projectId = + mapLeft (const authzError) <$> do + checkProjectGet reqUserId projectId + where + authzError = AuthZFailure $ (ProjectPermission (ProjectBranchBrowse projectId)) checkProjectCreate :: UserId -> UserId -> WebApp (Either AuthZFailure AuthZ.AuthZReceipt) checkProjectCreate reqUserId targetUserId = maybePermissionFailure (ProjectPermission (ProjectCreate targetUserId)) $ do diff --git a/share-api/src/Share/Web/UCM/Sync/Impl.hs b/share-api/src/Share/Web/UCM/Sync/Impl.hs index e214134c..2bbedf1a 100644 --- a/share-api/src/Share/Web/UCM/Sync/Impl.hs +++ b/share-api/src/Share/Web/UCM/Sync/Impl.hs @@ -149,7 +149,7 @@ downloadEntitiesEndpoint mayUserId DownloadEntitiesRequest {repoInfo, hashes = h Left err -> throwError (DownloadEntitiesFailure $ DownloadEntitiesInvalidRepoInfo err repoInfo) Right (RepoInfoUser userHandle) -> do User {user_id = repoOwnerUserId} <- lift (PG.runTransaction (UserQ.userByHandle userHandle)) `whenNothingM` throwError (DownloadEntitiesFailure . 
DownloadEntitiesUserNotFound $ IDs.toText @UserHandle userHandle) - authZToken <- lift AuthZ.checkDownloadFromUserCodebase `whenLeftM` \_err -> throwError (DownloadEntitiesFailure $ DownloadEntitiesNoReadPermission repoInfo) + authZToken <- lift AuthZ.hashJWTAuthOverride `whenLeftM` \_err -> throwError (DownloadEntitiesFailure $ DownloadEntitiesNoReadPermission repoInfo) let codebaseLoc = Codebase.codebaseLocationForUserCodebase repoOwnerUserId pure $ Codebase.codebaseEnv authZToken codebaseLoc Right (RepoInfoProjectBranch ProjectBranchShortHand {userHandle, projectSlug, contributorHandle}) -> do @@ -158,7 +158,7 @@ downloadEntitiesEndpoint mayUserId DownloadEntitiesRequest {repoInfo, hashes = h project <- (PGQ.projectByShortHand projectShortHand) `whenNothingM` throwError (DownloadEntitiesFailure . DownloadEntitiesProjectNotFound $ IDs.toText @ProjectShortHand projectShortHand) mayContributorUserId <- for contributorHandle \ch -> fmap user_id $ (UserQ.userByHandle ch) `whenNothingM` throwError (DownloadEntitiesFailure . DownloadEntitiesUserNotFound $ IDs.toText @UserHandle ch) pure (project, mayContributorUserId) - authZToken <- lift AuthZ.checkDownloadFromProjectBranchCodebase `whenLeftM` \_err -> throwError (DownloadEntitiesFailure $ DownloadEntitiesNoReadPermission repoInfo) + authZToken <- lift AuthZ.hashJWTAuthOverride `whenLeftM` \_err -> throwError (DownloadEntitiesFailure $ DownloadEntitiesNoReadPermission repoInfo) let codebaseLoc = Codebase.codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId pure $ Codebase.codebaseEnv authZToken codebaseLoc Right (RepoInfoProjectRelease ProjectReleaseShortHand {userHandle, projectSlug}) -> do @@ -166,7 +166,7 @@ downloadEntitiesEndpoint mayUserId DownloadEntitiesRequest {repoInfo, hashes = h (Project {ownerUserId = projectOwnerUserId}, contributorId) <- ExceptT . PG.tryRunTransaction $ do project <- PGQ.projectByShortHand projectShortHand `whenNothingM` throwError (DownloadEntitiesFailure . 
DownloadEntitiesProjectNotFound $ IDs.toText @ProjectShortHand projectShortHand) pure (project, Nothing) - authZToken <- lift AuthZ.checkDownloadFromProjectBranchCodebase `whenLeftM` \_err -> throwError (DownloadEntitiesFailure $ DownloadEntitiesNoReadPermission repoInfo) + authZToken <- lift AuthZ.hashJWTAuthOverride `whenLeftM` \_err -> throwError (DownloadEntitiesFailure $ DownloadEntitiesNoReadPermission repoInfo) let codebaseLoc = Codebase.codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId pure $ Codebase.codebaseEnv authZToken codebaseLoc Env.Env {maxParallelismPerDownloadRequest} <- ask diff --git a/share-api/src/Share/Web/UCM/SyncV2/Impl.hs b/share-api/src/Share/Web/UCM/SyncV2/Impl.hs index 161376c4..1fba40ce 100644 --- a/share-api/src/Share/Web/UCM/SyncV2/Impl.hs +++ b/share-api/src/Share/Web/UCM/SyncV2/Impl.hs @@ -146,7 +146,7 @@ codebaseForBranchRef branchRef = do (Project {ownerUserId = projectOwnerUserId}, contributorId) <- ExceptT . PG.tryRunTransaction $ do project <- PGQ.projectByShortHand projectShortHand `whenNothingM` throwError (CodebaseLoadingErrorProjectNotFound $ projectShortHand) pure (project, Nothing) - authZToken <- lift AuthZ.checkDownloadFromProjectBranchCodebase `whenLeftM` \_err -> throwError (CodebaseLoadingErrorNoReadPermission branchRef) + authZToken <- lift AuthZ.hashJWTAuthOverride `whenLeftM` \_err -> throwError (CodebaseLoadingErrorNoReadPermission branchRef) let codebaseLoc = Codebase.codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId pure $ Codebase.codebaseEnv authZToken codebaseLoc Right (Right (ProjectBranchShortHand {userHandle, projectSlug, contributorHandle})) -> do @@ -155,6 +155,6 @@ codebaseForBranchRef branchRef = do project <- (PGQ.projectByShortHand projectShortHand) `whenNothingM` throwError (CodebaseLoadingErrorProjectNotFound projectShortHand) mayContributorUserId <- for contributorHandle \ch -> fmap user_id $ (UserQ.userByHandle ch) `whenNothingM` throwError 
(CodebaseLoadingErrorUserNotFound ch) pure (project, mayContributorUserId) - authZToken <- lift AuthZ.checkDownloadFromProjectBranchCodebase `whenLeftM` \_err -> throwError (CodebaseLoadingErrorNoReadPermission branchRef) + authZToken <- lift AuthZ.hashJWTAuthOverride `whenLeftM` \_err -> throwError (CodebaseLoadingErrorNoReadPermission branchRef) let codebaseLoc = Codebase.codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId pure $ Codebase.codebaseEnv authZToken codebaseLoc From 575e1d67e4cf468eb7f5f0a64f2747141e4594ae Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Tue, 13 Jan 2026 14:29:15 -0800 Subject: [PATCH 04/14] Add cursor over history comments query functions --- share-api/src/Share/Postgres/Orphans.hs | 7 ++- .../Share/Web/UCM/HistoryComments/Queries.hs | 55 ++++++++----------- 2 files changed, 28 insertions(+), 34 deletions(-) diff --git a/share-api/src/Share/Postgres/Orphans.hs b/share-api/src/Share/Postgres/Orphans.hs index d9e38bd0..0f427908 100644 --- a/share-api/src/Share/Postgres/Orphans.hs +++ b/share-api/src/Share/Postgres/Orphans.hs @@ -33,10 +33,11 @@ import U.Codebase.TermEdit qualified as TermEdit import U.Util.Base32Hex qualified as Base32Hex import Unison.Hash (Hash) import Unison.Hash qualified as Hash -import Unison.Hash32 (Hash32) +import Unison.Hash32 (Hash32 (..)) import Unison.Hash32 qualified as Hash32 import Unison.Name (Name) import Unison.NameSegment.Internal (NameSegment (..)) +import Unison.Server.HistoryComments.Types import Unison.SyncV2.Types (CBORBytes (..)) import Unison.Syntax.Name qualified as Name import UnliftIO (MonadUnliftIO (..)) @@ -103,6 +104,10 @@ deriving via Hash instance FromHttpApiData ComponentHash deriving via Hash instance ToHttpApiData ComponentHash +deriving via Hash32 instance Hasql.DecodeValue HistoryCommentHash32 + +deriving via Hash32 instance Hasql.DecodeValue HistoryCommentRevisionHash32 + deriving via Text instance Hasql.DecodeValue NameSegment deriving via Text instance 
Hasql.EncodeValue NameSegment diff --git a/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs b/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs index b9bcbb77..7ecba667 100644 --- a/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs +++ b/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs @@ -2,7 +2,7 @@ {-# LANGUAGE RecordWildCards #-} module Share.Web.UCM.HistoryComments.Queries - ( fetchProjectBranchCommentsSince, + ( projectBranchCommentsCursor, insertHistoryComments, filterForUnknownHistoryCommentHashes, filterForUnknownHistoryCommentRevisionHashes, @@ -27,48 +27,37 @@ import Share.Web.Authorization (AuthZReceipt) import Unison.Hash32 (Hash32) import Unison.Server.HistoryComments.Types -fetchProjectBranchCommentsSince :: AuthZReceipt -> ProjectId -> CausalId -> UTCTime -> PG.Transaction e (PGCursor HistoryCommentUploaderChunk) -fetchProjectBranchCommentsSince !_authZ projectId causalId sinceTime = do - PG.newRowCursor @(Bool, Maybe Text, Maybe Text, Maybe Int64, Maybe Bool, Maybe ByteString, Maybe Hash32, Maybe Hash32, Maybe Text, Maybe Int64, Maybe Text, Maybe Hash32, Maybe Hash32) - "fetchProjectBranchCommentsSince" +projectBranchCommentsCursor :: AuthZReceipt -> CausalId -> PG.Transaction e (PGCursor (HistoryCommentHash32, HistoryCommentRevisionHash32)) +projectBranchCommentsCursor !_authZ causalId = do + PG.newRowCursor @(HistoryCommentHash32, HistoryCommentRevisionHash32) + "projectBranchCommentsCursor" [PG.sql| - WITH revisions(id, comment_id, subject, contents, created_at_ms, hidden, revision_hash, comment_hash, author_signature) AS ( + WITH history(causal_Id) AS ( + SELECT causal_id FROM causal_history(#{causalId}) + ), revisions(id, comment_id, subject, contents, created_at_ms, hidden, revision_hash, comment_hash, author_signature) AS ( SELECT hcr.id, hc.id, hcr.subject, hcr.content, hcr.created_at_ms, hcr.is_hidden, hcr.revision_hash, hc.comment_hash, hcr.author_signature - history_comment_revisions_project_discovery pd - JOIN 
history_comment_revisions hcr - ON pd.comment_revision_id = hcr.id + FROM history JOIN history_comments hc + ON hc.causal_id = history.causal_id + JOIN history_comment_revisions hcr ON hcr.comment_id = hc.id WHERE - pd.project_id = #{projectId} - AND pd.discovered_at > #{sinceTime} - AND hc.causal_id IN (SELECT causal_id FROM causal_history(#{causalId})) - ) (SELECT true, NULL, NULL, NULL, NULL, NULL, NULL, NULL, hc.author, hc.created_at_ms, key.thumbprint, causal.hash, hc.comment_hash - FROM revisions rev + hc.causal_id IN () + ) SELECT hc.comment_hash, hcr.revision_hash + FROM history JOIN history_comments hc - ON revisions.comment_id = hc.id + ON hc.causal_id = history.causal_id JOIN causals causal ON hc.causal_id = causal.id JOIN personal_keys key ON hc.author_key_id = key.id - ) - UNION ALL - -- Include ALL the base comments regardless of time, - -- the vast majority of the time we'll need them, it simplifies logic, - -- and the client can just ignore them if they already have them. 
- (SELECT DISTINCT ON (rev.comment_id) - false, rev.subject, rev.content, rev.created_at_ms, rev.is_hidden, rev.author_signature, rev.revision_hash, rev.comment_hash, NULL, NULL, NULL, NULL, NULL - FROM revisions rev + JOIN LATERAL ( + SELECT * FROM revisions rev + WHERE rev.comment_id = hc.id + ORDER BY rev.created_at_ms DESC + LIMIT 1 ) |] - <&> fmap \case - (True, Nothing, Nothing, Nothing, Nothing, Nothing, Nothing, Nothing, Just author, Just createdAtMs, Just authorThumbprint, Just causalHash, Just commentHash) -> - let createdAt = millisToUTCTime createdAtMs - in HistoryCommentChunk $ HistoryComment {..} - (False, Just subject, Just content, Just createdAtMs, Just isHidden, Just authorSignature, Just revisionHash, Just commentHash, Nothing, Nothing, Nothing, Nothing, Nothing) -> - let createdAt = millisToUTCTime createdAtMs - in HistoryCommentRevisionChunk $ HistoryCommentRevision {..} - row -> error $ "fetchProjectBranchCommentsSince: Unexpected row format: " <> show row insertThumbprints :: (PG.QueryA m) => NESet Text -> m () insertThumbprints thumbprints = do @@ -84,8 +73,8 @@ insertThumbprints thumbprints = do -- Convert milliseconds since epoch to UTCTime _exactly_. -- UTCTime has picosecond precision so this is lossless. 
-millisToUTCTime :: Int64 -> UTCTime -millisToUTCTime ms = +_millisToUTCTime :: Int64 -> UTCTime +_millisToUTCTime ms = toRational ms & (/ (1_000 :: Rational)) & fromRational From 64264a3b66e9bb3fb4d96dbb8c4345d69248d20d Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Tue, 13 Jan 2026 14:29:15 -0800 Subject: [PATCH 05/14] Write downloading queries --- share-api/src/Share/Postgres/Orphans.hs | 4 + .../src/Share/Web/UCM/HistoryComments/Impl.hs | 161 +++++++++++++++++- .../Share/Web/UCM/HistoryComments/Queries.hs | 71 ++++++++ 3 files changed, 228 insertions(+), 8 deletions(-) diff --git a/share-api/src/Share/Postgres/Orphans.hs b/share-api/src/Share/Postgres/Orphans.hs index 0f427908..a549d79d 100644 --- a/share-api/src/Share/Postgres/Orphans.hs +++ b/share-api/src/Share/Postgres/Orphans.hs @@ -106,8 +106,12 @@ deriving via Hash instance ToHttpApiData ComponentHash deriving via Hash32 instance Hasql.DecodeValue HistoryCommentHash32 +deriving via Hash32 instance Hasql.EncodeValue HistoryCommentHash32 + deriving via Hash32 instance Hasql.DecodeValue HistoryCommentRevisionHash32 +deriving via Hash32 instance Hasql.EncodeValue HistoryCommentRevisionHash32 + deriving via Text instance Hasql.DecodeValue NameSegment deriving via Text instance Hasql.EncodeValue NameSegment diff --git a/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs b/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs index 9691138d..5204cc9b 100644 --- a/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs +++ b/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs @@ -1,8 +1,11 @@ module Share.Web.UCM.HistoryComments.Impl (server) where import Control.Concurrent.STM.TBMQueue (TBMQueue, closeTBMQueue, newTBMQueueIO, readTBMQueue, writeTBMQueue) +import Control.Lens import Control.Monad.Except import Control.Monad.Trans.Maybe +import Data.List.NonEmpty qualified as NEL +import Data.Monoid (Any (..)) import Data.Set qualified as Set import Data.Set.NonEmpty qualified as NESet import Ki.Unlifted qualified 
as Ki @@ -10,6 +13,7 @@ import Network.WebSockets.Connection import Share.IDs import Share.IDs qualified as IDs import Share.Postgres qualified as PG +import Share.Postgres.Cursors qualified as Cursor import Share.Postgres.Queries qualified as PGQ import Share.Postgres.Users.Queries qualified as UserQ import Share.Prelude @@ -18,7 +22,7 @@ import Share.User import Share.Web.App (WebApp, WebAppServer) import Share.Web.Authentication qualified as AuthN import Share.Web.Authorization qualified as AuthZ -import Share.Web.Errors (Unimplemented (Unimplemented), reportError, respondError) +import Share.Web.Errors (reportError) import Share.Web.UCM.HistoryComments.Queries qualified as Q import Unison.Debug qualified as Debug import Unison.Server.HistoryComments.API qualified as HistoryComments @@ -39,9 +43,131 @@ wsMessageBufferSize :: Int wsMessageBufferSize = 100 downloadHistoryCommentsStreamImpl :: Maybe UserId -> BranchRef -> Connection -> WebApp () -downloadHistoryCommentsStreamImpl _mayUserId _branchRef _conn = do - _ <- error "AUTH CHECK HERE" - respondError Unimplemented +downloadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn = do + result <- withQueues @(MsgOrError Sync.DownloadCommentsResponse HistoryCommentUploaderChunk) @(MsgOrError Void HistoryCommentDownloaderChunk) wsMessageBufferSize wsMessageBufferSize conn \q@(Queues {receive, send}) -> Ki.scoped \scope -> runExceptT $ do + projectBranchSH@ProjectBranchShortHand {userHandle, projectSlug, contributorHandle} <- case IDs.fromText @ProjectBranchShortHand branchRef of + Left err -> handleErrInQueue q (Sync.DownloadCommentsGenericFailure $ IDs.toText err) + Right pbsh -> pure pbsh + let projectSH = ProjectShortHand {userHandle, projectSlug} + mayInfo <- lift . runMaybeT $ mapMaybeT PG.runTransaction $ do + project <- MaybeT $ PGQ.projectByShortHand projectSH + branch <- MaybeT $ PGQ.branchByProjectBranchShortHand projectBranchSH + contributorUser <- for contributorHandle (MaybeT . 
UserQ.userByHandle) + pure (project, branch, contributorUser) + (project, branch, _contributorUser) <- maybe (handleErrInQueue q $ Sync.DownloadCommentsProjectBranchNotFound br) pure $ mayInfo + !authZ <- + lift (AuthZ.checkDownloadFromProjectBranchCodebase mayCallerUserId project.projectId) >>= \case + Left _authErr -> handleErrInQueue q (Sync.DownloadCommentsNotAuthorized br) + Right authZ -> pure authZ + commentHashesToSendQ <- liftIO $ newTBMQueueIO @(Sync.HistoryCommentHash32, [Sync.HistoryCommentRevisionHash32]) 100 + commentsToSendQ <- liftIO $ newTBMQueueIO @(Either Sync.HistoryCommentHash32 Sync.HistoryCommentRevisionHash32) 100 + -- Is filled when the server notifies us it's done requesting comments + doneRequestingCommentsMVar <- newEmptyTMVarIO + errMVar <- newEmptyTMVarIO + _ <- liftIO $ Ki.fork scope (hashNotifyWorker send commentHashesToSendQ) + senderThread <- liftIO $ Ki.fork scope (senderWorker send commentsToSendQ) + _ <- liftIO $ Ki.fork scope (receiverWorker receive commentsToSendQ errMVar doneRequestingCommentsMVar downloadableCommentsVar) + downloadableCommentsVar <- + liftIO $ newTVarIO @_ @(Set (Either Sync.HistoryCommentHash32 Sync.HistoryCommentRevisionHash32)) Set.empty + PG.runTransaction $ do + cursor <- Q.projectBranchCommentsCursor authZ branch.causal + Cursor.foldBatched cursor 100 \hashes -> do + let (newHashes, chunks) = + hashes + & foldMap + ( \(commentHash, revisionHash) -> + ([Left commentHash, Right revisionHash], [(commentHash, [revisionHash])]) + ) + & first Set.fromList + atomically $ do + modifyTVar downloadableCommentsVar (Set.union newHashes) + for chunks \chunk -> writeTBMQueue commentHashesToSendQ (chunk) + -- Close the hashes queue to signal we don't have any more, then wait for the notifier to finish + atomically $ closeTBMQueue commentHashesToSendQ + -- Once the comment Hash queue is closed, eventually we'll send a DoneSendingHashesChunk message, + -- the server will respond with a DoneCheckingHashesChunk message 
after it's made all necessary + -- requests. + -- + -- Then we can close the comment upload hash queue to signal we won't get any more upload requests. + atomically $ readTMVar doneRequestingCommentsMVar >> closeTBMQueue commentHashesToSendQ + -- Now we just have to wait for the sender to finish sending all the comments we have queued up. + -- Once we've uploaded everything we can safely exit and the connection will be closed. + atomically $ Ki.await senderThread + case result of + Left err -> do + reportError err + Right (Left err, _leftovers {- Messages sent by server after we finished. -}) -> do + reportError err + Right (Right (), _leftovers {- Messages sent by server after we finished. -}) -> do + pure () + where + senderWorker :: + ( MsgOrError err HistoryCommentUploaderChunk -> + STM Bool + ) -> + TBMQueue (Either Sync.HistoryCommentHash32 Sync.HistoryCommentRevisionHash32) -> + IO () + senderWorker send commentsToSendQ = do + let loop = do + (hashesToSend, isClosed) <- atomically $ flushTBMQueue commentsToSendQ + -- Send comments first, then revisions + withComments <- Q.historyCommentsByHashOf (traversed . _Left) hashesToSend + withCommentsAndRevisions <- Q.historyCommentRevisionsByHashOf (traversed . _Right) withComments + for withCommentsAndRevisions \commentOrRevision -> atomically . send . Msg $ intoChunk commentOrRevision + guard (not isClosed) + loop + void . 
runMaybeT $ loop + + receiverWorker :: + STM (Maybe (MsgOrError err HistoryCommentDownloaderChunk)) -> + TBMQueue + ( Either + Sync.HistoryCommentHash32 + Sync.HistoryCommentRevisionHash32 + ) -> + TMVar Text -> + TMVar () -> + (TVar (Set (Either Sync.HistoryCommentHash32 Sync.HistoryCommentRevisionHash32))) -> + IO () + receiverWorker receive toUploadQ errMVar doneRequestingCommentsMVar downloadableCommentsVar = do + let loop = do + msgOrError <- atomically receive + case msgOrError of + -- Channel closed, shut down + Nothing -> pure () + Just (Msg msg) -> case msg of + DoneCheckingHashesChunk -> do + -- Notify that the server is done requesting comments + atomically $ putTMVar doneRequestingCommentsMVar () + loop + RequestCommentsChunk comments -> do + atomically $ do + downloadableComments <- readTVar downloadableCommentsVar + let validComments = Set.intersection (NESet.toSet comments) downloadableComments + for_ validComments $ writeTBMQueue toUploadQ + loop + Just (DeserialiseFailure msg) -> do + atomically $ putTMVar errMVar $ "uploadHistoryComments: deserialisation failure: " <> msg + Just (UserErr err) -> do + atomically $ putTMVar errMVar $ "uploadHistoryComments: server error: " <> tShow err + loop + + hashNotifyWorker :: (MsgOrError Sync.DownloadCommentsResponse HistoryCommentUploaderChunk -> STM Bool) -> TBMQueue (Sync.HistoryCommentHash32, [Sync.HistoryCommentRevisionHash32]) -> IO () + hashNotifyWorker send q = do + let loop = do + isClosed <- atomically $ do + (hashesToCheck, isClosed) <- flushTBMQueue q + Any serverClosed <- + NEL.nonEmpty hashesToCheck & foldMapM \possiblyNewHashes -> do + Any <$> (send $ Msg $ PossiblyNewHashesChunk possiblyNewHashes) + pure (isClosed || serverClosed) + if isClosed + then do + -- If the queue is closed, send a DoneCheckingHashesChunk to notify the server we're done. + void . 
atomically $ send (Msg DoneSendingHashesChunk) + else loop + loop + intoChunk = either HistoryCommentChunk HistoryCommentRevisionChunk -- Re-run the given STM action at most n times, collecting the results into a list. -- If the action returns Nothing, stop and return what has been collected so far, along with a Bool indicating whether the action was exhausted. @@ -106,6 +232,7 @@ uploadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn = PG.whenNonEmpty chunk do Debug.debugM Debug.Temp "Inserting comments chunk of size" (length chunk) PG.runTransaction $ Q.insertHistoryComments authZ projectId chunk + when closed $ Debug.debugLogM Debug.Temp "Inserter worker: comments queue closed" when (not closed) loop loop Debug.debugLogM Debug.Temp "Inserter worker finished" @@ -140,7 +267,9 @@ uploadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn = case NESet.nonEmptySet allNeededHashes of Nothing -> pure () Just unknownHashesSet -> do + Debug.debugM Debug.Temp "Requesting unknown hashes" unknownHashesSet void . atomically $ send $ Msg $ RequestCommentsChunk unknownHashesSet + when closed $ Debug.debugLogM Debug.Temp "Hash checking worker: hashes queue closed" when (not closed) loop loop void . atomically $ send $ Msg $ DoneCheckingHashesChunk @@ -151,10 +280,12 @@ uploadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn = next <- atomically do recv >>= \case Nothing -> do + Debug.debugLogM Debug.Temp "Receiver worker: connection closed" closeTBMQueue hashesToCheckQ closeTBMQueue commentsQ pure (pure ()) Just (DeserialiseFailure err) -> do + Debug.debugM Debug.Temp "Receiver worker: deserialisation failure" err putTMVar errMVar err pure (pure ()) Just (Msg msg) -> do @@ -172,7 +303,21 @@ uploadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn = loop Debug.debugLogM Debug.Temp "Receiver worker finished" insertCommentBatchSize = 100 - handleErrInQueue :: forall o x e a. 
Queues (MsgOrError e a) o -> e -> ExceptT e WebApp x - handleErrInQueue Queues {send} e = do - _ <- atomically $ send $ UserErr e - throwError e + +handleErrInQueue :: forall o x e a. Queues (MsgOrError e a) o -> e -> ExceptT e WebApp x +handleErrInQueue Queues {send} e = do + _ <- atomically $ send $ UserErr e + throwError e + +-- Read all available values from a TBMQueue, returning them and whether the queue is closed. +flushTBMQueue :: TBMQueue a -> STM ([a], Bool) +flushTBMQueue q = do + optional (readTBMQueue q) >>= \case + -- No values available + Nothing -> empty + Just Nothing -> do + -- Queue closed + pure ([], True) + Just (Just v) -> do + (vs, closed) <- flushTBMQueue q <|> pure ([], False) + pure (v : vs, closed) diff --git a/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs b/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs index 7ecba667..eb76a6be 100644 --- a/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs +++ b/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs @@ -4,6 +4,8 @@ module Share.Web.UCM.HistoryComments.Queries ( projectBranchCommentsCursor, insertHistoryComments, + historyCommentsByHashOf, + historyCommentRevisionsByHashOf, filterForUnknownHistoryCommentHashes, filterForUnknownHistoryCommentRevisionHashes, ) @@ -23,6 +25,7 @@ import Share.Postgres.Cursors (PGCursor) import Share.Postgres.Cursors qualified as PG import Share.Postgres.IDs import Share.Prelude +import Share.Utils.Postgres (ordered) import Share.Web.Authorization (AuthZReceipt) import Unison.Hash32 (Hash32) import Unison.Server.HistoryComments.Types @@ -59,6 +62,74 @@ projectBranchCommentsCursor !_authZ causalId = do ) |] +historyCommentsByHashOf :: (PG.QueryA m) => Traversal s t HistoryCommentHash32 HistoryComment -> s -> m t +historyCommentsByHashOf trav s = do + s + & asListOf trav %%~ \hashes -> + PG.queryListRows + [PG.sql| + WITH hashes (hash, ord) AS ( + SELECT * FROM ^{PG.toTable $ ordered hashes} + ) SELECT hc.author, hc.created_at_ms, 
key.thumbprint, causal.hash AS causal_hash, hc.comment_hash + FROM hashes + JOIN history_comments hc + ON hc.comment_hash = hashes.hash + JOIN causals causal + ON hc.causal_id = causal.id + JOIN personal_keys key + ON hc.author_key_id = key.id + ORDER BY hashes.ord ASC + |] + <&> fmap + \( author, + createdAt, + authorThumbprint, + causalHash, + commentHash + ) -> + HistoryComment + { author, + createdAt, + authorThumbprint, + causalHash, + commentHash + } + +historyCommentRevisionsByHashOf :: (PG.QueryA m) => Traversal s t HistoryCommentRevisionHash32 HistoryCommentRevision -> s -> m t +historyCommentRevisionsByHashOf trav s = do + s + & asListOf trav %%~ \hashes -> do + PG.queryListRows + [PG.sql| + WITH hashes (hash, ord) AS ( + SELECT * FROM ^{PG.toTable $ ordered hashes} + ) SELECT hcr.subject, hcr.content, hcr.created_at_ms, hcr.is_hidden, hcr.author_signature, hcr.revision_hash, hc.comment_hash + FROM hashes + JOIN history_comment_revisions hcr + ON hcr.revision_hash = hashes.hash + JOIN history_comments hc + ON hcr.comment_id = hc.id + ORDER BY hashes.ord ASC + |] + <&> fmap + \( subject, + content, + createdAt, + isHidden, + authorSignature, + revisionHash, + commentHash + ) -> + HistoryCommentRevision + { subject, + content, + createdAt, + isHidden, + authorSignature, + revisionHash, + commentHash + } + insertThumbprints :: (PG.QueryA m) => NESet Text -> m () insertThumbprints thumbprints = do PG.execute_ From 2f34d47e15b77594eca80012a3d0140dc15ad5c2 Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Wed, 14 Jan 2026 14:33:08 -0800 Subject: [PATCH 06/14] More clean up --- share-api/src/Share/Utils/Logging.hs | 14 +++++++- share-api/src/Share/Web/Errors.hs | 11 +++++- .../src/Share/Web/UCM/HistoryComments/Impl.hs | 36 ++++++++++--------- 3 files changed, 43 insertions(+), 18 deletions(-) diff --git a/share-api/src/Share/Utils/Logging.hs b/share-api/src/Share/Utils/Logging.hs index e13d0770..0d98cf39 100644 --- a/share-api/src/Share/Utils/Logging.hs +++ 
b/share-api/src/Share/Utils/Logging.hs @@ -57,7 +57,7 @@ import Share.Utils.Logging.Types as X import Share.Utils.Tags (MonadTags) import System.Log.FastLogger qualified as FL import Unison.Server.Backend qualified as Backend -import Unison.Server.HistoryComments.Types (UploadCommentsResponse (..)) +import Unison.Server.HistoryComments.Types (DownloadCommentsResponse (..), UploadCommentsResponse (..)) import Unison.Server.Types (BranchRef (..)) import Unison.Sync.Types qualified as Sync import Unison.Util.Monoid (intercalateMap) @@ -285,3 +285,15 @@ instance Loggable UploadCommentsResponse where instance Loggable WS.ConnectionException where toLog = withSeverity Error . showLog + +instance Loggable DownloadCommentsResponse where + toLog = \case + DownloadCommentsProjectBranchNotFound (BranchRef branchRef) -> + textLog ("Project branch not found: " <> branchRef) + & withSeverity UserFault + DownloadCommentsNotAuthorized (BranchRef branchRef) -> + textLog ("Not authorized to download comments from branch: " <> branchRef) + & withSeverity UserFault + DownloadCommentsGenericFailure errMsg -> + textLog ("Download comments generic failure: " <> errMsg) + & withSeverity Error diff --git a/share-api/src/Share/Web/Errors.hs b/share-api/src/Share/Web/Errors.hs index 9d5594f4..7024a64b 100644 --- a/share-api/src/Share/Web/Errors.hs +++ b/share-api/src/Share/Web/Errors.hs @@ -68,7 +68,7 @@ import Share.Utils.URI (URIParam (..), addQueryParam) import Share.Web.App import Unison.Server.Backend qualified as Backend import Unison.Server.Errors qualified as Backend -import Unison.Server.HistoryComments.Types (UploadCommentsResponse (..)) +import Unison.Server.HistoryComments.Types (DownloadCommentsResponse (..), UploadCommentsResponse (..)) import Unison.Server.Types (BranchRef (..)) import Unison.Sync.Types qualified as Sync import UnliftIO qualified @@ -446,3 +446,12 @@ instance ToServerError WS.ConnectionException where (ErrorID "websocket:unicode-exception", err400 {errBody = 
BL.fromStrict $ Text.encodeUtf8 $ "Unicode decoding exception: " <> Text.pack msg}) WS.ConnectionClosed -> (ErrorID "websocket:connection-closed", err400 {errBody = "WebSocket connection closed"}) + +instance ToServerError DownloadCommentsResponse where + toServerError = \case + DownloadCommentsProjectBranchNotFound (BranchRef branchRef) -> + (ErrorID "download-comments:project-branch-not-found", err404 {errBody = BL.fromStrict $ Text.encodeUtf8 $ "Project branch not found: " <> branchRef}) + DownloadCommentsNotAuthorized (BranchRef branchRef) -> + (ErrorID "download-comments:not-authorized", err403 {errBody = BL.fromStrict $ Text.encodeUtf8 $ "Not authorized to download comments from branch: " <> branchRef}) + DownloadCommentsGenericFailure errMsg -> + (ErrorID "download-comments:generic-failure", err500 {errBody = BL.fromStrict $ Text.encodeUtf8 $ "Download comments failure: " <> errMsg}) diff --git a/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs b/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs index 5204cc9b..ba93300b 100644 --- a/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs +++ b/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs @@ -10,6 +10,7 @@ import Data.Set qualified as Set import Data.Set.NonEmpty qualified as NESet import Ki.Unlifted qualified as Ki import Network.WebSockets.Connection +import Share.Branch import Share.IDs import Share.IDs qualified as IDs import Share.Postgres qualified as PG @@ -59,17 +60,18 @@ downloadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn lift (AuthZ.checkDownloadFromProjectBranchCodebase mayCallerUserId project.projectId) >>= \case Left _authErr -> handleErrInQueue q (Sync.DownloadCommentsNotAuthorized br) Right authZ -> pure authZ + + downloadableCommentsVar <- + liftIO $ newTVarIO @_ @(Set (Either Sync.HistoryCommentHash32 Sync.HistoryCommentRevisionHash32)) Set.empty commentHashesToSendQ <- liftIO $ newTBMQueueIO @(Sync.HistoryCommentHash32, [Sync.HistoryCommentRevisionHash32]) 
100 commentsToSendQ <- liftIO $ newTBMQueueIO @(Either Sync.HistoryCommentHash32 Sync.HistoryCommentRevisionHash32) 100 -- Is filled when the server notifies us it's done requesting comments doneRequestingCommentsMVar <- newEmptyTMVarIO errMVar <- newEmptyTMVarIO - _ <- liftIO $ Ki.fork scope (hashNotifyWorker send commentHashesToSendQ) - senderThread <- liftIO $ Ki.fork scope (senderWorker send commentsToSendQ) - _ <- liftIO $ Ki.fork scope (receiverWorker receive commentsToSendQ errMVar doneRequestingCommentsMVar downloadableCommentsVar) - downloadableCommentsVar <- - liftIO $ newTVarIO @_ @(Set (Either Sync.HistoryCommentHash32 Sync.HistoryCommentRevisionHash32)) Set.empty - PG.runTransaction $ do + _ <- lift $ Ki.fork scope (hashNotifyWorker send commentHashesToSendQ) + senderThread <- lift $ Ki.fork scope (senderWorker send commentsToSendQ) + _ <- lift $ Ki.fork scope (receiverWorker receive commentsToSendQ errMVar doneRequestingCommentsMVar downloadableCommentsVar) + lift $ PG.runTransaction $ do cursor <- Q.projectBranchCommentsCursor authZ branch.causal Cursor.foldBatched cursor 100 \hashes -> do let (newHashes, chunks) = @@ -79,7 +81,7 @@ downloadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn ([Left commentHash, Right revisionHash], [(commentHash, [revisionHash])]) ) & first Set.fromList - atomically $ do + PG.transactionUnsafeIO $ atomically $ do modifyTVar downloadableCommentsVar (Set.union newHashes) for chunks \chunk -> writeTBMQueue commentHashesToSendQ (chunk) -- Close the hashes queue to signal we don't have any more, then wait for the notifier to finish @@ -106,20 +108,21 @@ downloadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn STM Bool ) -> TBMQueue (Either Sync.HistoryCommentHash32 Sync.HistoryCommentRevisionHash32) -> - IO () + WebApp () senderWorker send commentsToSendQ = do let loop = do (hashesToSend, isClosed) <- atomically $ flushTBMQueue commentsToSendQ - -- Send comments first, then 
revisions - withComments <- Q.historyCommentsByHashOf (traversed . _Left) hashesToSend - withCommentsAndRevisions <- Q.historyCommentRevisionsByHashOf (traversed . _Right) withComments + withCommentsAndRevisions <- lift . PG.runTransaction $ do + -- Send comments first, then revisions + withComments <- Q.historyCommentsByHashOf (traversed . _Left) hashesToSend + Q.historyCommentRevisionsByHashOf (traversed . _Right) withComments for withCommentsAndRevisions \commentOrRevision -> atomically . send . Msg $ intoChunk commentOrRevision guard (not isClosed) loop void . runMaybeT $ loop receiverWorker :: - STM (Maybe (MsgOrError err HistoryCommentDownloaderChunk)) -> + STM (Maybe (MsgOrError Void HistoryCommentDownloaderChunk)) -> TBMQueue ( Either Sync.HistoryCommentHash32 @@ -128,7 +131,7 @@ downloadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn TMVar Text -> TMVar () -> (TVar (Set (Either Sync.HistoryCommentHash32 Sync.HistoryCommentRevisionHash32))) -> - IO () + WebApp () receiverWorker receive toUploadQ errMVar doneRequestingCommentsMVar downloadableCommentsVar = do let loop = do msgOrError <- atomically receive @@ -148,11 +151,12 @@ downloadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn loop Just (DeserialiseFailure msg) -> do atomically $ putTMVar errMVar $ "uploadHistoryComments: deserialisation failure: " <> msg - Just (UserErr err) -> do - atomically $ putTMVar errMVar $ "uploadHistoryComments: server error: " <> tShow err loop - hashNotifyWorker :: (MsgOrError Sync.DownloadCommentsResponse HistoryCommentUploaderChunk -> STM Bool) -> TBMQueue (Sync.HistoryCommentHash32, [Sync.HistoryCommentRevisionHash32]) -> IO () + hashNotifyWorker :: + (MsgOrError Sync.DownloadCommentsResponse HistoryCommentUploaderChunk -> STM Bool) -> + TBMQueue (Sync.HistoryCommentHash32, [Sync.HistoryCommentRevisionHash32]) -> + WebApp () hashNotifyWorker send q = do let loop = do isClosed <- atomically $ do From 
00d5e4f656f82daec3419f87ad2a5f362ee7a0a5 Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Wed, 14 Jan 2026 14:43:28 -0800 Subject: [PATCH 07/14] SQL issues --- .../Share/Web/UCM/HistoryComments/Queries.hs | 27 +++++++------------ 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs b/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs index eb76a6be..62fb052e 100644 --- a/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs +++ b/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs @@ -35,17 +35,8 @@ projectBranchCommentsCursor !_authZ causalId = do PG.newRowCursor @(HistoryCommentHash32, HistoryCommentRevisionHash32) "projectBranchCommentsCursor" [PG.sql| - WITH history(causal_Id) AS ( - SELECT causal_id FROM causal_history(#{causalId}) - ), revisions(id, comment_id, subject, contents, created_at_ms, hidden, revision_hash, comment_hash, author_signature) AS ( - SELECT hcr.id, hc.id, hcr.subject, hcr.content, hcr.created_at_ms, hcr.is_hidden, hcr.revision_hash, hc.comment_hash, hcr.author_signature - FROM history - JOIN history_comments hc - ON hc.causal_id = history.causal_id - JOIN history_comment_revisions hcr - ON hcr.comment_id = hc.id - WHERE - hc.causal_id IN () + WITH history(causal_id) AS ( + SELECT ch.causal_id FROM causal_history(#{causalId}) AS ch(causal_id) ) SELECT hc.comment_hash, hcr.revision_hash FROM history JOIN history_comments hc @@ -55,11 +46,13 @@ projectBranchCommentsCursor !_authZ causalId = do JOIN personal_keys key ON hc.author_key_id = key.id JOIN LATERAL ( - SELECT * FROM revisions rev - WHERE rev.comment_id = hc.id - ORDER BY rev.created_at_ms DESC - LIMIT 1 - ) + SELECT rev.revision_hash + FROM history_comment_revisions rev + WHERE rev.comment_id = hc.id + ORDER BY rev.created_at_ms DESC + LIMIT 1 + ) hcr + ON TRUE |] historyCommentsByHashOf :: (PG.QueryA m) => Traversal s t HistoryCommentHash32 HistoryComment -> s -> m t @@ -68,7 +61,7 @@ historyCommentsByHashOf 
trav s = do & asListOf trav %%~ \hashes -> PG.queryListRows [PG.sql| - WITH hashes (hash, ord) AS ( + WITH hashes (ord, hash) AS ( SELECT * FROM ^{PG.toTable $ ordered hashes} ) SELECT hc.author, hc.created_at_ms, key.thumbprint, causal.hash AS causal_hash, hc.comment_hash FROM hashes From 111fce1104358bb6708500f94fcac90549265420 Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Wed, 14 Jan 2026 15:04:49 -0800 Subject: [PATCH 08/14] Cleanup --- .../src/Share/Web/UCM/HistoryComments/Impl.hs | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs b/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs index ba93300b..471a9482 100644 --- a/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs +++ b/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs @@ -70,7 +70,7 @@ downloadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn errMVar <- newEmptyTMVarIO _ <- lift $ Ki.fork scope (hashNotifyWorker send commentHashesToSendQ) senderThread <- lift $ Ki.fork scope (senderWorker send commentsToSendQ) - _ <- lift $ Ki.fork scope (receiverWorker receive commentsToSendQ errMVar doneRequestingCommentsMVar downloadableCommentsVar) + _ <- lift $ Ki.fork scope (receiverWorker receive commentsToSendQ commentHashesToSendQ errMVar downloadableCommentsVar) lift $ PG.runTransaction $ do cursor <- Q.projectBranchCommentsCursor authZ branch.causal Cursor.foldBatched cursor 100 \hashes -> do @@ -86,12 +86,6 @@ downloadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn for chunks \chunk -> writeTBMQueue commentHashesToSendQ (chunk) -- Close the hashes queue to signal we don't have any more, then wait for the notifier to finish atomically $ closeTBMQueue commentHashesToSendQ - -- Once the comment Hash queue is closed, eventually we'll send a DoneSendingHashesChunk message, - -- the server will respond with a DoneCheckingHashesChunk message after it's made all necessary - -- 
requests. - -- - -- Then we can close the comment upload hash queue to signal we won't get any more upload requests. - atomically $ readTMVar doneRequestingCommentsMVar >> closeTBMQueue commentHashesToSendQ -- Now we just have to wait for the sender to finish sending all the comments we have queued up. -- Once we've uploaded everything we can safely exit and the connection will be closed. atomically $ Ki.await senderThread @@ -128,11 +122,11 @@ downloadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn Sync.HistoryCommentHash32 Sync.HistoryCommentRevisionHash32 ) -> + (TBMQueue (Sync.HistoryCommentHash32, [Sync.HistoryCommentRevisionHash32])) -> TMVar Text -> - TMVar () -> (TVar (Set (Either Sync.HistoryCommentHash32 Sync.HistoryCommentRevisionHash32))) -> WebApp () - receiverWorker receive toUploadQ errMVar doneRequestingCommentsMVar downloadableCommentsVar = do + receiverWorker receive commentsToSendQ commentHashesToSendQ errMVar downloadableCommentsVar = do let loop = do msgOrError <- atomically receive case msgOrError of @@ -141,13 +135,13 @@ downloadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn Just (Msg msg) -> case msg of DoneCheckingHashesChunk -> do -- Notify that the server is done requesting comments - atomically $ putTMVar doneRequestingCommentsMVar () + atomically $ closeTBMQueue commentHashesToSendQ loop RequestCommentsChunk comments -> do atomically $ do downloadableComments <- readTVar downloadableCommentsVar let validComments = Set.intersection (NESet.toSet comments) downloadableComments - for_ validComments $ writeTBMQueue toUploadQ + for_ validComments $ writeTBMQueue commentsToSendQ loop Just (DeserialiseFailure msg) -> do atomically $ putTMVar errMVar $ "uploadHistoryComments: deserialisation failure: " <> msg From b4db036d15602a484ace747bca978f8653c0658c Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Wed, 14 Jan 2026 15:06:13 -0800 Subject: [PATCH 09/14] Bump Unison --- unison | 2 +- 1 file 
changed, 1 insertion(+), 1 deletion(-) diff --git a/unison b/unison index 23b831f6..fabc45fd 160000 --- a/unison +++ b/unison @@ -1 +1 @@ -Subproject commit 23b831f6e3736d54a0516940dec3f56d143b9015 +Subproject commit fabc45fd6462e00b020a4b2e47906c70e5600722 From 7fb745e9f3134ba6c6012b33f6486d1eff11cd01 Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Wed, 14 Jan 2026 15:07:51 -0800 Subject: [PATCH 10/14] Working! --- .../src/Share/Web/UCM/HistoryComments/Impl.hs | 23 +++++++++---------- .../Share/Web/UCM/HistoryComments/Queries.hs | 4 ++-- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs b/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs index 471a9482..fe614a07 100644 --- a/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs +++ b/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs @@ -5,7 +5,7 @@ import Control.Lens import Control.Monad.Except import Control.Monad.Trans.Maybe import Data.List.NonEmpty qualified as NEL -import Data.Monoid (Any (..)) +import Data.Monoid (All (..)) import Data.Set qualified as Set import Data.Set.NonEmpty qualified as NESet import Ki.Unlifted qualified as Ki @@ -65,12 +65,10 @@ downloadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn liftIO $ newTVarIO @_ @(Set (Either Sync.HistoryCommentHash32 Sync.HistoryCommentRevisionHash32)) Set.empty commentHashesToSendQ <- liftIO $ newTBMQueueIO @(Sync.HistoryCommentHash32, [Sync.HistoryCommentRevisionHash32]) 100 commentsToSendQ <- liftIO $ newTBMQueueIO @(Either Sync.HistoryCommentHash32 Sync.HistoryCommentRevisionHash32) 100 - -- Is filled when the server notifies us it's done requesting comments - doneRequestingCommentsMVar <- newEmptyTMVarIO errMVar <- newEmptyTMVarIO _ <- lift $ Ki.fork scope (hashNotifyWorker send commentHashesToSendQ) senderThread <- lift $ Ki.fork scope (senderWorker send commentsToSendQ) - _ <- lift $ Ki.fork scope (receiverWorker receive commentsToSendQ 
commentHashesToSendQ errMVar downloadableCommentsVar) + _ <- lift $ Ki.fork scope (receiverWorker receive commentsToSendQ errMVar downloadableCommentsVar) lift $ PG.runTransaction $ do cursor <- Q.projectBranchCommentsCursor authZ branch.causal Cursor.foldBatched cursor 100 \hashes -> do @@ -110,7 +108,7 @@ downloadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn -- Send comments first, then revisions withComments <- Q.historyCommentsByHashOf (traversed . _Left) hashesToSend Q.historyCommentRevisionsByHashOf (traversed . _Right) withComments - for withCommentsAndRevisions \commentOrRevision -> atomically . send . Msg $ intoChunk commentOrRevision + for_ withCommentsAndRevisions \commentOrRevision -> atomically . send . Msg $ intoChunk commentOrRevision guard (not isClosed) loop void . runMaybeT $ loop @@ -122,11 +120,10 @@ downloadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn Sync.HistoryCommentHash32 Sync.HistoryCommentRevisionHash32 ) -> - (TBMQueue (Sync.HistoryCommentHash32, [Sync.HistoryCommentRevisionHash32])) -> TMVar Text -> (TVar (Set (Either Sync.HistoryCommentHash32 Sync.HistoryCommentRevisionHash32))) -> WebApp () - receiverWorker receive commentsToSendQ commentHashesToSendQ errMVar downloadableCommentsVar = do + receiverWorker receive commentsToSendQ errMVar downloadableCommentsVar = do let loop = do msgOrError <- atomically receive case msgOrError of @@ -134,8 +131,10 @@ downloadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn Nothing -> pure () Just (Msg msg) -> case msg of DoneCheckingHashesChunk -> do - -- Notify that the server is done requesting comments - atomically $ closeTBMQueue commentHashesToSendQ + -- The downloader is done checking hashes, and has issued all requests for + -- comments. + -- We can close the relevant queues now, we won't get any more requests. 
+ atomically $ closeTBMQueue commentsToSendQ loop RequestCommentsChunk comments -> do atomically $ do @@ -155,10 +154,10 @@ downloadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn let loop = do isClosed <- atomically $ do (hashesToCheck, isClosed) <- flushTBMQueue q - Any serverClosed <- + All sendSuccess <- NEL.nonEmpty hashesToCheck & foldMapM \possiblyNewHashes -> do - Any <$> (send $ Msg $ PossiblyNewHashesChunk possiblyNewHashes) - pure (isClosed || serverClosed) + All <$> (send $ Msg $ PossiblyNewHashesChunk possiblyNewHashes) + pure (isClosed || not sendSuccess) if isClosed then do -- If the queue is closed, send a DoneCheckingHashesChunk to notify the server we're done. diff --git a/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs b/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs index 62fb052e..5e04dff9 100644 --- a/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs +++ b/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs @@ -94,9 +94,9 @@ historyCommentRevisionsByHashOf trav s = do & asListOf trav %%~ \hashes -> do PG.queryListRows [PG.sql| - WITH hashes (hash, ord) AS ( + WITH hashes (ord, hash) AS ( SELECT * FROM ^{PG.toTable $ ordered hashes} - ) SELECT hcr.subject, hcr.content, hcr.created_at_ms, hcr.is_hidden, hcr.author_signature, hcr.revision_hash, hc.comment_hash + ) SELECT hcr.subject, hcr.contents, hcr.created_at_ms, hcr.hidden, hcr.author_signature, hcr.revision_hash, hc.comment_hash FROM hashes JOIN history_comment_revisions hcr ON hcr.revision_hash = hashes.hash From 445a695454e9bacfe38bba7cad51579a543b7daa Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Wed, 14 Jan 2026 15:30:45 -0800 Subject: [PATCH 11/14] Make debugging more specific --- .../src/Share/Web/UCM/HistoryComments/Impl.hs | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs b/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs index 
fe614a07..7bb919fc 100644 --- a/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs +++ b/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs @@ -180,7 +180,6 @@ fetchChunk size action = do -- Queue is closed pure ([], True) Just (Just val) -> do - Debug.debugM Debug.Temp "Fetched value from queue" val (rest, exhausted) <- go (n - 1) <|> pure ([], False) pure (val : rest, exhausted) go size @@ -209,10 +208,10 @@ uploadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn = _receiverThread <- lift $ Ki.fork scope $ receiverWorker receive errMVar hashesToCheckQ commentsQ inserterThread <- lift $ Ki.fork scope $ inserterWorker authZ commentsQ project.projectId _hashCheckingThread <- lift $ Ki.fork scope $ hashCheckingWorker project.projectId send hashesToCheckQ - Debug.debugLogM Debug.Temp "Upload history comments: waiting for inserter thread to finish" + Debug.debugLogM Debug.HistoryComments "Upload history comments: waiting for inserter thread to finish" -- The inserter thread will finish when the client closes the connection. atomically $ Ki.await inserterThread - Debug.debugLogM Debug.Temp "Done. Closing connection." + Debug.debugLogM Debug.HistoryComments "Done. Closing connection." 
case result of Left err -> reportError err Right (Left err, _leftovers) -> reportError err @@ -227,12 +226,12 @@ uploadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn = let loop = do (chunk, closed) <- atomically (fetchChunk insertCommentBatchSize (readTBMQueue commentsQ)) PG.whenNonEmpty chunk do - Debug.debugM Debug.Temp "Inserting comments chunk of size" (length chunk) + Debug.debugM Debug.HistoryComments "Inserting comments chunk of size" (length chunk) PG.runTransaction $ Q.insertHistoryComments authZ projectId chunk - when closed $ Debug.debugLogM Debug.Temp "Inserter worker: comments queue closed" + when closed $ Debug.debugLogM Debug.HistoryComments "Inserter worker: comments queue closed" when (not closed) loop loop - Debug.debugLogM Debug.Temp "Inserter worker finished" + Debug.debugLogM Debug.HistoryComments "Inserter worker finished" hashCheckingWorker :: ProjectId -> @@ -242,7 +241,7 @@ uploadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn = hashCheckingWorker projectId send hashesToCheckQ = do let loop = do (hashes, closed) <- atomically (fetchChunk insertCommentBatchSize (readTBMQueue hashesToCheckQ)) - Debug.debugM Debug.Temp "Checking hashes chunk of size" (length hashes) + Debug.debugM Debug.HistoryComments "Checking hashes chunk of size" (length hashes) PG.whenNonEmpty hashes $ do unknownCommentHashes <- fmap Set.fromList $ PG.runTransaction $ do Q.filterForUnknownHistoryCommentHashes (Sync.unHistoryCommentHash32 . fst <$> hashes) @@ -264,25 +263,25 @@ uploadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn = case NESet.nonEmptySet allNeededHashes of Nothing -> pure () Just unknownHashesSet -> do - Debug.debugM Debug.Temp "Requesting unknown hashes" unknownHashesSet + Debug.debugM Debug.HistoryComments "Requesting unknown hashes" unknownHashesSet void . 
atomically $ send $ Msg $ RequestCommentsChunk unknownHashesSet - when closed $ Debug.debugLogM Debug.Temp "Hash checking worker: hashes queue closed" + when closed $ Debug.debugLogM Debug.HistoryComments "Hash checking worker: hashes queue closed" when (not closed) loop loop void . atomically $ send $ Msg $ DoneCheckingHashesChunk - Debug.debugLogM Debug.Temp "Hash checking worker finished" + Debug.debugLogM Debug.HistoryComments "Hash checking worker finished" receiverWorker :: STM (Maybe (MsgOrError Void HistoryCommentUploaderChunk)) -> TMVar Text -> TBMQueue (Sync.HistoryCommentHash32, [Sync.HistoryCommentRevisionHash32]) -> TBMQueue (Either Sync.HistoryComment Sync.HistoryCommentRevision) -> WebApp () receiverWorker recv errMVar hashesToCheckQ commentsQ = do let loop = do next <- atomically do recv >>= \case Nothing -> do - Debug.debugLogM Debug.Temp "Receiver worker: connection closed" + Debug.debugLogM Debug.HistoryComments "Receiver worker: connection closed" closeTBMQueue hashesToCheckQ closeTBMQueue commentsQ pure (pure ()) Just (DeserialiseFailure err) -> do - Debug.debugM Debug.Temp "Receiver worker: deserialisation failure" err + Debug.debugM Debug.HistoryComments "Receiver worker: deserialisation failure" err putTMVar errMVar err pure (pure ()) Just (Msg msg) -> do @@ -298,7 +297,7 @@ uploadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn = pure loop next loop - Debug.debugLogM Debug.Temp "Receiver worker finished" + Debug.debugLogM Debug.HistoryComments "Receiver worker finished" insertCommentBatchSize = 100 handleErrInQueue :: forall o x e a. 
Queues (MsgOrError e a) o -> e -> ExceptT e WebApp x From c0ef4c3f6b898a2ef3389d668a8958beaaa7c35c Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Thu, 15 Jan 2026 10:44:43 -0800 Subject: [PATCH 12/14] Set up history-comments transcripts --- transcripts/run-transcripts.zsh | 2 +- .../history-comments/comment-pull.md | 4 ++++ .../share-apis/history-comments/prelude.md | 18 ++++++++++++++++++ .../share-apis/history-comments/run.zsh | 11 +++++++++++ 4 files changed, 34 insertions(+), 1 deletion(-) create mode 100644 transcripts/share-apis/history-comments/comment-pull.md create mode 100644 transcripts/share-apis/history-comments/prelude.md create mode 100755 transcripts/share-apis/history-comments/run.zsh diff --git a/transcripts/run-transcripts.zsh b/transcripts/run-transcripts.zsh index 2a5f0c2d..5dd27bb0 100755 --- a/transcripts/run-transcripts.zsh +++ b/transcripts/run-transcripts.zsh @@ -21,7 +21,7 @@ transcripts_location="transcripts/share-apis" for dir in "$transcripts_location"/*(/); do # Extract the directory name (transcript name) transcript="${dir:t}" - + # If the first argument is missing, run all transcripts, otherwise run only transcripts which match a prefix of the argument if [ -z "${1:-}" ] || [[ "$transcript" == "$1"* ]]; then pg_reset_fixtures diff --git a/transcripts/share-apis/history-comments/comment-pull.md b/transcripts/share-apis/history-comments/comment-pull.md new file mode 100644 index 00000000..f50a2a1f --- /dev/null +++ b/transcripts/share-apis/history-comments/comment-pull.md @@ -0,0 +1,4 @@ +```ucm:hide +scratch/main> pull @test/history-comments/main +scratch/main> history +``` diff --git a/transcripts/share-apis/history-comments/prelude.md b/transcripts/share-apis/history-comments/prelude.md new file mode 100644 index 00000000..2ad425cb --- /dev/null +++ b/transcripts/share-apis/history-comments/prelude.md @@ -0,0 +1,18 @@ +```ucm:hide +history-comments/main> builtins.mergeio lib.builtins +``` + +Add some history, then set comments 
on it. + +```unison:hide +x = 1 +``` + +```ucm +scratch/main> config.set author.name Unison +scratch/main> history.comment /main: "Initial commit with variable x set to 1" +scratch/main> alias.term x y +scratch/main> history.comment /main: "Renamed x to y" +scratch/main> history +scratch/main> push @test/history-comments/main +``` diff --git a/transcripts/share-apis/history-comments/run.zsh b/transcripts/share-apis/history-comments/run.zsh new file mode 100755 index 00000000..fd189a38 --- /dev/null +++ b/transcripts/share-apis/history-comments/run.zsh @@ -0,0 +1,11 @@ +#!/usr/bin/env zsh + +set -e + +source "../../transcript_helpers.sh" + +# Create some history +transcript_ucm transcript prelude.md + +# Pull the history +transcript_ucm transcript comment-pull.md From 14e07420c766684329b6ed2475b0ed4b30e2f8c1 Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Thu, 15 Jan 2026 14:52:44 -0800 Subject: [PATCH 13/14] Remove redundant constraint --- share-api/src/Share/Web/UCM/HistoryComments/Impl.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs b/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs index 7bb919fc..af234168 100644 --- a/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs +++ b/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs @@ -168,7 +168,7 @@ downloadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn -- Re-run the given STM action at most n times, collecting the results into a list. -- If the action returns Nothing, stop and return what has been collected so far, along with a Bool indicating whether the action was exhausted. 
-fetchChunk :: (Show a) => Int -> STM (Maybe a) -> STM ([a], Bool) +fetchChunk :: Int -> STM (Maybe a) -> STM ([a], Bool) fetchChunk size action = do let go 0 = pure ([], False) go n = do From 25be58a91a663eca4a2cd912ab64c91eb34cf9cf Mon Sep 17 00:00:00 2001 From: Chris Penner Date: Thu, 15 Jan 2026 14:54:50 -0800 Subject: [PATCH 14/14] Add comment pull transcripts. Need to update ucm after PRs there are merged --- transcripts/share-apis/history-comments/comment-pull.md | 4 ++-- transcripts/share-apis/history-comments/prelude.md | 3 ++- transcripts/transcript_functions.sh | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/transcripts/share-apis/history-comments/comment-pull.md b/transcripts/share-apis/history-comments/comment-pull.md index f50a2a1f..76a4421b 100644 --- a/transcripts/share-apis/history-comments/comment-pull.md +++ b/transcripts/share-apis/history-comments/comment-pull.md @@ -1,4 +1,4 @@ -```ucm:hide -scratch/main> pull @test/history-comments/main +```ucm +scratch/main> pull @transcripts/history-comments/main scratch/main> history ``` diff --git a/transcripts/share-apis/history-comments/prelude.md b/transcripts/share-apis/history-comments/prelude.md index 2ad425cb..225b9f69 100644 --- a/transcripts/share-apis/history-comments/prelude.md +++ b/transcripts/share-apis/history-comments/prelude.md @@ -9,10 +9,11 @@ x = 1 ``` ```ucm +scratch/main> update scratch/main> config.set author.name Unison scratch/main> history.comment /main: "Initial commit with variable x set to 1" scratch/main> alias.term x y scratch/main> history.comment /main: "Renamed x to y" scratch/main> history -scratch/main> push @test/history-comments/main +scratch/main> push @transcripts/history-comments/main ``` diff --git a/transcripts/transcript_functions.sh b/transcripts/transcript_functions.sh index 146c17a2..5b3e4baa 100644 --- a/transcripts/transcript_functions.sh +++ b/transcripts/transcript_functions.sh @@ -11,7 +11,7 @@ mkdir -p 
"${ucm_xdg_data_dir}/unisonlanguage" ucm_credentials_file="${ucm_xdg_data_dir}/unisonlanguage/credentials.json" # Executable to use when running unison transcripts -export UCM_PATH="${UCM_PATH:-"$(which ucm)"}" +export UCM_PATH="${UCM_PATH:-"$(which unison-history-comment-message)"}" export empty_causal_hash='sg60bvjo91fsoo7pkh9gejbn0qgc95vra87ap6l5d35ri0lkaudl7bs12d71sf3fh6p23teemuor7mk1i9n567m50ibakcghjec5ajg' export echo_server_port=9999 export echo_server="http://localhost:${echo_server_port}"