diff --git a/share-api/src/Share/Postgres/Orphans.hs b/share-api/src/Share/Postgres/Orphans.hs index d9e38bd0..a549d79d 100644 --- a/share-api/src/Share/Postgres/Orphans.hs +++ b/share-api/src/Share/Postgres/Orphans.hs @@ -33,10 +33,11 @@ import U.Codebase.TermEdit qualified as TermEdit import U.Util.Base32Hex qualified as Base32Hex import Unison.Hash (Hash) import Unison.Hash qualified as Hash -import Unison.Hash32 (Hash32) +import Unison.Hash32 (Hash32 (..)) import Unison.Hash32 qualified as Hash32 import Unison.Name (Name) import Unison.NameSegment.Internal (NameSegment (..)) +import Unison.Server.HistoryComments.Types import Unison.SyncV2.Types (CBORBytes (..)) import Unison.Syntax.Name qualified as Name import UnliftIO (MonadUnliftIO (..)) @@ -103,6 +104,14 @@ deriving via Hash instance FromHttpApiData ComponentHash deriving via Hash instance ToHttpApiData ComponentHash +deriving via Hash32 instance Hasql.DecodeValue HistoryCommentHash32 + +deriving via Hash32 instance Hasql.EncodeValue HistoryCommentHash32 + +deriving via Hash32 instance Hasql.DecodeValue HistoryCommentRevisionHash32 + +deriving via Hash32 instance Hasql.EncodeValue HistoryCommentRevisionHash32 + deriving via Text instance Hasql.DecodeValue NameSegment deriving via Text instance Hasql.EncodeValue NameSegment diff --git a/share-api/src/Share/Utils/Logging.hs b/share-api/src/Share/Utils/Logging.hs index e13d0770..0d98cf39 100644 --- a/share-api/src/Share/Utils/Logging.hs +++ b/share-api/src/Share/Utils/Logging.hs @@ -57,7 +57,7 @@ import Share.Utils.Logging.Types as X import Share.Utils.Tags (MonadTags) import System.Log.FastLogger qualified as FL import Unison.Server.Backend qualified as Backend -import Unison.Server.HistoryComments.Types (UploadCommentsResponse (..)) +import Unison.Server.HistoryComments.Types (DownloadCommentsResponse (..), UploadCommentsResponse (..)) import Unison.Server.Types (BranchRef (..)) import Unison.Sync.Types qualified as Sync import Unison.Util.Monoid 
(intercalateMap) @@ -285,3 +285,15 @@ instance Loggable UploadCommentsResponse where instance Loggable WS.ConnectionException where toLog = withSeverity Error . showLog + +instance Loggable DownloadCommentsResponse where + toLog = \case + DownloadCommentsProjectBranchNotFound (BranchRef branchRef) -> + textLog ("Project branch not found: " <> branchRef) + & withSeverity UserFault + DownloadCommentsNotAuthorized (BranchRef branchRef) -> + textLog ("Not authorized to download comments from branch: " <> branchRef) + & withSeverity UserFault + DownloadCommentsGenericFailure errMsg -> + textLog ("Download comments generic failure: " <> errMsg) + & withSeverity Error diff --git a/share-api/src/Share/Web/Authorization.hs b/share-api/src/Share/Web/Authorization.hs index fd489671..7ec7c426 100644 --- a/share-api/src/Share/Web/Authorization.hs +++ b/share-api/src/Share/Web/Authorization.hs @@ -43,7 +43,7 @@ module Share.Web.Authorization checkUploadToUserCodebase, checkUploadToProjectBranchCodebase, checkUserUpdate, - checkDownloadFromUserCodebase, + hashJWTAuthOverride, checkDownloadFromProjectBranchCodebase, checkCreateOrg, checkReadOrgRolesList, @@ -389,17 +389,20 @@ checkUploadToUserCodebase reqUserId codebaseOwnerUserId = maybePermissionFailure assertUsersEqual reqUserId codebaseOwnerUserId pure $ AuthZ.UnsafeAuthZReceipt Nothing --- | The download endpoint currently does all of its own auth using HashJWTs, +-- | The download endpoints currently do all of their own auth using HashJWTs, -- So we don't add any other authz checks here, the HashJWT check is sufficient. -checkDownloadFromUserCodebase :: WebApp (Either AuthZFailure AuthZ.AuthZReceipt) -checkDownloadFromUserCodebase = +hashJWTAuthOverride :: WebApp (Either AuthZFailure AuthZ.AuthZReceipt) +hashJWTAuthOverride = pure . 
Right $ AuthZ.UnsafeAuthZReceipt Nothing -- | The download endpoint currently does all of its own auth using HashJWTs, -- So we don't add any other authz checks here, the HashJWT check is sufficient. -checkDownloadFromProjectBranchCodebase :: WebApp (Either AuthZFailure AuthZ.AuthZReceipt) -checkDownloadFromProjectBranchCodebase = - pure . Right $ AuthZ.UnsafeAuthZReceipt Nothing +checkDownloadFromProjectBranchCodebase :: Maybe UserId -> ProjectId -> WebApp (Either AuthZFailure AuthZ.AuthZReceipt) +checkDownloadFromProjectBranchCodebase reqUserId projectId = + mapLeft (const authzError) <$> do + checkProjectGet reqUserId projectId + where + authzError = AuthZFailure $ (ProjectPermission (ProjectBranchBrowse projectId)) checkProjectCreate :: UserId -> UserId -> WebApp (Either AuthZFailure AuthZ.AuthZReceipt) checkProjectCreate reqUserId targetUserId = maybePermissionFailure (ProjectPermission (ProjectCreate targetUserId)) $ do diff --git a/share-api/src/Share/Web/Errors.hs b/share-api/src/Share/Web/Errors.hs index 9d5594f4..7024a64b 100644 --- a/share-api/src/Share/Web/Errors.hs +++ b/share-api/src/Share/Web/Errors.hs @@ -68,7 +68,7 @@ import Share.Utils.URI (URIParam (..), addQueryParam) import Share.Web.App import Unison.Server.Backend qualified as Backend import Unison.Server.Errors qualified as Backend -import Unison.Server.HistoryComments.Types (UploadCommentsResponse (..)) +import Unison.Server.HistoryComments.Types (DownloadCommentsResponse (..), UploadCommentsResponse (..)) import Unison.Server.Types (BranchRef (..)) import Unison.Sync.Types qualified as Sync import UnliftIO qualified @@ -446,3 +446,12 @@ instance ToServerError WS.ConnectionException where (ErrorID "websocket:unicode-exception", err400 {errBody = BL.fromStrict $ Text.encodeUtf8 $ "Unicode decoding exception: " <> Text.pack msg}) WS.ConnectionClosed -> (ErrorID "websocket:connection-closed", err400 {errBody = "WebSocket connection closed"}) + +instance ToServerError DownloadCommentsResponse 
where + toServerError = \case + DownloadCommentsProjectBranchNotFound (BranchRef branchRef) -> + (ErrorID "download-comments:project-branch-not-found", err404 {errBody = BL.fromStrict $ Text.encodeUtf8 $ "Project branch not found: " <> branchRef}) + DownloadCommentsNotAuthorized (BranchRef branchRef) -> + (ErrorID "download-comments:not-authorized", err403 {errBody = BL.fromStrict $ Text.encodeUtf8 $ "Not authorized to download comments from branch: " <> branchRef}) + DownloadCommentsGenericFailure errMsg -> + (ErrorID "download-comments:generic-failure", err500 {errBody = BL.fromStrict $ Text.encodeUtf8 $ "Download comments failure: " <> errMsg}) diff --git a/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs b/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs index 4a86f7f4..af234168 100644 --- a/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs +++ b/share-api/src/Share/Web/UCM/HistoryComments/Impl.hs @@ -1,15 +1,20 @@ module Share.Web.UCM.HistoryComments.Impl (server) where import Control.Concurrent.STM.TBMQueue (TBMQueue, closeTBMQueue, newTBMQueueIO, readTBMQueue, writeTBMQueue) +import Control.Lens import Control.Monad.Except import Control.Monad.Trans.Maybe +import Data.List.NonEmpty qualified as NEL +import Data.Monoid (All (..)) import Data.Set qualified as Set import Data.Set.NonEmpty qualified as NESet import Ki.Unlifted qualified as Ki import Network.WebSockets.Connection +import Share.Branch import Share.IDs import Share.IDs qualified as IDs import Share.Postgres qualified as PG +import Share.Postgres.Cursors qualified as Cursor import Share.Postgres.Queries qualified as PGQ import Share.Postgres.Users.Queries qualified as UserQ import Share.Prelude @@ -18,10 +23,9 @@ import Share.User import Share.Web.App (WebApp, WebAppServer) import Share.Web.Authentication qualified as AuthN import Share.Web.Authorization qualified as AuthZ -import Share.Web.Errors (Unimplemented (Unimplemented), reportError, respondError) +import Share.Web.Errors 
(reportError) import Share.Web.UCM.HistoryComments.Queries qualified as Q import Unison.Debug qualified as Debug -import Unison.Hash32 (Hash32) import Unison.Server.HistoryComments.API qualified as HistoryComments import Unison.Server.HistoryComments.Types (HistoryCommentDownloaderChunk (..), HistoryCommentUploaderChunk (..), UploadCommentsResponse (..)) import Unison.Server.HistoryComments.Types qualified as Sync @@ -39,14 +43,132 @@ server mayCaller = wsMessageBufferSize :: Int wsMessageBufferSize = 100 -downloadHistoryCommentsStreamImpl :: Maybe UserId -> Connection -> WebApp () -downloadHistoryCommentsStreamImpl _mayUserId _conn = do - _ <- error "AUTH CHECK HERE" - respondError Unimplemented +downloadHistoryCommentsStreamImpl :: Maybe UserId -> BranchRef -> Connection -> WebApp () +downloadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn = do + result <- withQueues @(MsgOrError Sync.DownloadCommentsResponse HistoryCommentUploaderChunk) @(MsgOrError Void HistoryCommentDownloaderChunk) wsMessageBufferSize wsMessageBufferSize conn \q@(Queues {receive, send}) -> Ki.scoped \scope -> runExceptT $ do + projectBranchSH@ProjectBranchShortHand {userHandle, projectSlug, contributorHandle} <- case IDs.fromText @ProjectBranchShortHand branchRef of + Left err -> handleErrInQueue q (Sync.DownloadCommentsGenericFailure $ IDs.toText err) + Right pbsh -> pure pbsh + let projectSH = ProjectShortHand {userHandle, projectSlug} + mayInfo <- lift . runMaybeT $ mapMaybeT PG.runTransaction $ do + project <- MaybeT $ PGQ.projectByShortHand projectSH + branch <- MaybeT $ PGQ.branchByProjectBranchShortHand projectBranchSH + contributorUser <- for contributorHandle (MaybeT . 
UserQ.userByHandle) + pure (project, branch, contributorUser) + (project, branch, _contributorUser) <- maybe (handleErrInQueue q $ Sync.DownloadCommentsProjectBranchNotFound br) pure $ mayInfo + !authZ <- + lift (AuthZ.checkDownloadFromProjectBranchCodebase mayCallerUserId project.projectId) >>= \case + Left _authErr -> handleErrInQueue q (Sync.DownloadCommentsNotAuthorized br) + Right authZ -> pure authZ + + downloadableCommentsVar <- + liftIO $ newTVarIO @_ @(Set (Either Sync.HistoryCommentHash32 Sync.HistoryCommentRevisionHash32)) Set.empty + commentHashesToSendQ <- liftIO $ newTBMQueueIO @(Sync.HistoryCommentHash32, [Sync.HistoryCommentRevisionHash32]) 100 + commentsToSendQ <- liftIO $ newTBMQueueIO @(Either Sync.HistoryCommentHash32 Sync.HistoryCommentRevisionHash32) 100 + errMVar <- newEmptyTMVarIO + _ <- lift $ Ki.fork scope (hashNotifyWorker send commentHashesToSendQ) + senderThread <- lift $ Ki.fork scope (senderWorker send commentsToSendQ) + _ <- lift $ Ki.fork scope (receiverWorker receive commentsToSendQ errMVar downloadableCommentsVar) + lift $ PG.runTransaction $ do + cursor <- Q.projectBranchCommentsCursor authZ branch.causal + Cursor.foldBatched cursor 100 \hashes -> do + let (newHashes, chunks) = + hashes + & foldMap + ( \(commentHash, revisionHash) -> + ([Left commentHash, Right revisionHash], [(commentHash, [revisionHash])]) + ) + & first Set.fromList + PG.transactionUnsafeIO $ atomically $ do + modifyTVar downloadableCommentsVar (Set.union newHashes) + for chunks \chunk -> writeTBMQueue commentHashesToSendQ (chunk) + -- Close the hashes queue to signal we don't have any more, then wait for the notifier to finish + atomically $ closeTBMQueue commentHashesToSendQ + -- Now we just have to wait for the sender to finish sending all the comments we have queued up. + -- Once we've uploaded everything we can safely exit and the connection will be closed. 
+ atomically $ Ki.await senderThread + case result of + Left err -> do + reportError err + Right (Left err, _leftovers {- Messages sent by server after we finished. -}) -> do + reportError err + Right (Right (), _leftovers {- Messages sent by server after we finished. -}) -> do + pure () + where + senderWorker :: + ( MsgOrError err HistoryCommentUploaderChunk -> + STM Bool + ) -> + TBMQueue (Either Sync.HistoryCommentHash32 Sync.HistoryCommentRevisionHash32) -> + WebApp () + senderWorker send commentsToSendQ = do + let loop = do + (hashesToSend, isClosed) <- atomically $ flushTBMQueue commentsToSendQ + withCommentsAndRevisions <- lift . PG.runTransaction $ do + -- Send comments first, then revisions + withComments <- Q.historyCommentsByHashOf (traversed . _Left) hashesToSend + Q.historyCommentRevisionsByHashOf (traversed . _Right) withComments + for_ withCommentsAndRevisions \commentOrRevision -> atomically . send . Msg $ intoChunk commentOrRevision + guard (not isClosed) + loop + void . runMaybeT $ loop + + receiverWorker :: + STM (Maybe (MsgOrError Void HistoryCommentDownloaderChunk)) -> + TBMQueue + ( Either + Sync.HistoryCommentHash32 + Sync.HistoryCommentRevisionHash32 + ) -> + TMVar Text -> + (TVar (Set (Either Sync.HistoryCommentHash32 Sync.HistoryCommentRevisionHash32))) -> + WebApp () + receiverWorker receive commentsToSendQ errMVar downloadableCommentsVar = do + let loop = do + msgOrError <- atomically receive + case msgOrError of + -- Channel closed, shut down + Nothing -> pure () + Just (Msg msg) -> case msg of + DoneCheckingHashesChunk -> do + -- The downloader is done checking hashes, and has issued all requests for + -- comments. + -- We can close the relevant queues now, we won't get any more requests. 
+ atomically $ closeTBMQueue commentsToSendQ + loop + RequestCommentsChunk comments -> do + atomically $ do + downloadableComments <- readTVar downloadableCommentsVar + let validComments = Set.intersection (NESet.toSet comments) downloadableComments + for_ validComments $ writeTBMQueue commentsToSendQ + loop + Just (DeserialiseFailure msg) -> do + atomically $ putTMVar errMVar $ "uploadHistoryComments: deserialisation failure: " <> msg + loop + + hashNotifyWorker :: + (MsgOrError Sync.DownloadCommentsResponse HistoryCommentUploaderChunk -> STM Bool) -> + TBMQueue (Sync.HistoryCommentHash32, [Sync.HistoryCommentRevisionHash32]) -> + WebApp () + hashNotifyWorker send q = do + let loop = do + isClosed <- atomically $ do + (hashesToCheck, isClosed) <- flushTBMQueue q + All sendSuccess <- + NEL.nonEmpty hashesToCheck & foldMapM \possiblyNewHashes -> do + All <$> (send $ Msg $ PossiblyNewHashesChunk possiblyNewHashes) + pure (isClosed || not sendSuccess) + if isClosed + then do + -- If the queue is closed (or a send failed), send a DoneSendingHashesChunk to notify the downloader we're done. + void . atomically $ send (Msg DoneSendingHashesChunk) + else loop + loop + intoChunk = either HistoryCommentChunk HistoryCommentRevisionChunk -- Re-run the given STM action at most n times, collecting the results into a list. -- If the action returns Nothing, stop and return what has been collected so far, along with a Bool indicating whether the action was exhausted. 
-fetchChunk :: (Show a) => Int -> STM (Maybe a) -> STM ([a], Bool) +fetchChunk :: Int -> STM (Maybe a) -> STM ([a], Bool) fetchChunk size action = do let go 0 = pure ([], False) go n = do @@ -58,7 +180,6 @@ fetchChunk size action = do -- Queue is closed pure ([], True) Just (Just val) -> do - Debug.debugM Debug.Temp "Fetched value from queue" val (rest, exhausted) <- go (n - 1) <|> pure ([], False) pure (val : rest, exhausted) go size @@ -81,15 +202,16 @@ uploadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn = lift (AuthZ.checkUploadToProjectBranchCodebase callerUserId project.projectId (user_id <$> contributorUser)) >>= \case Left _authErr -> handleErrInQueue q (UploadCommentsNotAuthorized br) Right authZ -> pure authZ - hashesToCheckQ <- liftIO $ newTBMQueueIO 100 + hashesToCheckQ <- liftIO $ newTBMQueueIO @(Sync.HistoryCommentHash32, [Sync.HistoryCommentRevisionHash32]) 100 commentsQ <- liftIO $ newTBMQueueIO 100 errMVar <- liftIO newEmptyTMVarIO _receiverThread <- lift $ Ki.fork scope $ receiverWorker receive errMVar hashesToCheckQ commentsQ inserterThread <- lift $ Ki.fork scope $ inserterWorker authZ commentsQ project.projectId - _hashCheckingThread <- lift $ Ki.fork scope $ hashCheckingWorker send hashesToCheckQ - Debug.debugLogM Debug.Temp "Upload history comments: waiting for inserter thread to finish" + _hashCheckingThread <- lift $ Ki.fork scope $ hashCheckingWorker project.projectId send hashesToCheckQ + Debug.debugLogM Debug.HistoryComments "Upload history comments: waiting for inserter thread to finish" -- The inserter thread will finish when the client closes the connection. atomically $ Ki.await inserterThread + Debug.debugLogM Debug.HistoryComments "Done. Closing connection." 
case result of Left err -> reportError err Right (Left err, _leftovers) -> reportError err @@ -104,40 +226,62 @@ uploadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn = let loop = do (chunk, closed) <- atomically (fetchChunk insertCommentBatchSize (readTBMQueue commentsQ)) PG.whenNonEmpty chunk do - Debug.debugM Debug.Temp "Inserting comments chunk of size" (length chunk) + Debug.debugM Debug.HistoryComments "Inserting comments chunk of size" (length chunk) PG.runTransaction $ Q.insertHistoryComments authZ projectId chunk + when closed $ Debug.debugLogM Debug.HistoryComments "Inserter worker: comments queue closed" when (not closed) loop loop - Debug.debugLogM Debug.Temp "Inserter worker finished" + Debug.debugLogM Debug.HistoryComments "Inserter worker finished" hashCheckingWorker :: + ProjectId -> (MsgOrError err HistoryCommentDownloaderChunk -> STM Bool) -> - TBMQueue Hash32 -> + TBMQueue (Sync.HistoryCommentHash32, [Sync.HistoryCommentRevisionHash32]) -> WebApp () - hashCheckingWorker send hashesToCheckQ = do + hashCheckingWorker projectId send hashesToCheckQ = do let loop = do (hashes, closed) <- atomically (fetchChunk insertCommentBatchSize (readTBMQueue hashesToCheckQ)) - Debug.debugM Debug.Temp "Checking hashes chunk of size" (length hashes) + Debug.debugM Debug.HistoryComments "Checking hashes chunk of size" (length hashes) PG.whenNonEmpty hashes $ do - unknownHashes <- PG.runTransaction $ do Q.filterForUnknownHistoryCommentHashes hashes - case NESet.nonEmptySet (Set.fromList unknownHashes) of + unknownCommentHashes <- fmap Set.fromList $ PG.runTransaction $ do + Q.filterForUnknownHistoryCommentHashes (Sync.unHistoryCommentHash32 . fst <$> hashes) + let (revisionHashesWeDefinitelyNeed, revisionHashesToCheck) = + hashes + -- Only check revisions for comments that are already known + & foldMap \case + (commentHash, revisionHashes) + -- If the comment hash is unknown, we need _all_ its revisions, we + -- don't need to check them. 
+ -- Otherwise, we need to check which revisions are unknown. + | Set.member (Sync.unHistoryCommentHash32 commentHash) unknownCommentHashes -> (revisionHashes, []) + | otherwise -> ([], revisionHashes) + unknownRevsFiltered <- PG.runTransaction $ Q.filterForUnknownHistoryCommentRevisionHashes projectId (Sync.unHistoryCommentRevisionHash32 <$> revisionHashesToCheck) + let allNeededHashes = + (Set.map (Left . Sync.HistoryCommentHash32) unknownCommentHashes) + <> (Set.fromList . fmap Right $ revisionHashesWeDefinitelyNeed) + <> (Set.fromList (Right . Sync.HistoryCommentRevisionHash32 <$> unknownRevsFiltered)) + case NESet.nonEmptySet allNeededHashes of Nothing -> pure () Just unknownHashesSet -> do + Debug.debugM Debug.HistoryComments "Requesting unknown hashes" unknownHashesSet void . atomically $ send $ Msg $ RequestCommentsChunk unknownHashesSet + when closed $ Debug.debugLogM Debug.HistoryComments "Hash checking worker: hashes queue closed" when (not closed) loop loop void . atomically $ send $ Msg $ DoneCheckingHashesChunk - Debug.debugLogM Debug.Temp "Hash checking worker finished" - receiverWorker :: STM (Maybe (MsgOrError Void HistoryCommentUploaderChunk)) -> TMVar Text -> TBMQueue Hash32 -> TBMQueue (Either Sync.HistoryComment Sync.HistoryCommentRevision) -> WebApp () + Debug.debugLogM Debug.HistoryComments "Hash checking worker finished" + receiverWorker :: STM (Maybe (MsgOrError Void HistoryCommentUploaderChunk)) -> TMVar Text -> TBMQueue (Sync.HistoryCommentHash32, [Sync.HistoryCommentRevisionHash32]) -> TBMQueue (Either Sync.HistoryComment Sync.HistoryCommentRevision) -> WebApp () receiverWorker recv errMVar hashesToCheckQ commentsQ = do let loop = do next <- atomically do recv >>= \case Nothing -> do + Debug.debugLogM Debug.HistoryComments "Receiver worker: connection closed" closeTBMQueue hashesToCheckQ closeTBMQueue commentsQ pure (pure ()) Just (DeserialiseFailure err) -> do + Debug.debugM Debug.HistoryComments "Receiver worker: deserialisation 
failure" err putTMVar errMVar err pure (pure ()) Just (Msg msg) -> do @@ -153,9 +297,23 @@ uploadHistoryCommentsStreamImpl mayCallerUserId br@(BranchRef branchRef) conn = pure loop next loop - Debug.debugLogM Debug.Temp "Receiver worker finished" + Debug.debugLogM Debug.HistoryComments "Receiver worker finished" insertCommentBatchSize = 100 - handleErrInQueue :: forall o x e a. Queues (MsgOrError e a) o -> e -> ExceptT e WebApp x - handleErrInQueue Queues {send} e = do - _ <- atomically $ send $ UserErr e - throwError e + +handleErrInQueue :: forall o x e a. Queues (MsgOrError e a) o -> e -> ExceptT e WebApp x +handleErrInQueue Queues {send} e = do + _ <- atomically $ send $ UserErr e + throwError e + +-- Read all available values from a TBMQueue, returning them and whether the queue is closed. +flushTBMQueue :: TBMQueue a -> STM ([a], Bool) +flushTBMQueue q = do + optional (readTBMQueue q) >>= \case + -- No values available + Nothing -> empty + Just Nothing -> do + -- Queue closed + pure ([], True) + Just (Just v) -> do + (vs, closed) <- flushTBMQueue q <|> pure ([], False) + pure (v : vs, closed) diff --git a/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs b/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs index 9c648985..5e04dff9 100644 --- a/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs +++ b/share-api/src/Share/Web/UCM/HistoryComments/Queries.hs @@ -2,9 +2,12 @@ {-# LANGUAGE RecordWildCards #-} module Share.Web.UCM.HistoryComments.Queries - ( fetchProjectBranchCommentsSince, + ( projectBranchCommentsCursor, insertHistoryComments, + historyCommentsByHashOf, + historyCommentRevisionsByHashOf, filterForUnknownHistoryCommentHashes, + filterForUnknownHistoryCommentRevisionHashes, ) where @@ -22,52 +25,103 @@ import Share.Postgres.Cursors (PGCursor) import Share.Postgres.Cursors qualified as PG import Share.Postgres.IDs import Share.Prelude +import Share.Utils.Postgres (ordered) import Share.Web.Authorization (AuthZReceipt) import Unison.Hash32 
(Hash32) import Unison.Server.HistoryComments.Types -fetchProjectBranchCommentsSince :: AuthZReceipt -> ProjectId -> CausalId -> UTCTime -> PG.Transaction e (PGCursor HistoryCommentUploaderChunk) -fetchProjectBranchCommentsSince !_authZ projectId causalId sinceTime = do - PG.newRowCursor @(Bool, Maybe Text, Maybe Text, Maybe Int64, Maybe Bool, Maybe ByteString, Maybe Hash32, Maybe Hash32, Maybe Text, Maybe Int64, Maybe Text, Maybe Hash32, Maybe Hash32) - "fetchProjectBranchCommentsSince" +projectBranchCommentsCursor :: AuthZReceipt -> CausalId -> PG.Transaction e (PGCursor (HistoryCommentHash32, HistoryCommentRevisionHash32)) +projectBranchCommentsCursor !_authZ causalId = do + PG.newRowCursor @(HistoryCommentHash32, HistoryCommentRevisionHash32) + "projectBranchCommentsCursor" [PG.sql| - WITH revisions(id, comment_id, subject, contents, created_at_ms, hidden, revision_hash, comment_hash, author_signature) AS ( - SELECT hcr.id, hc.id, hcr.subject, hcr.content, hcr.created_at_ms, hcr.is_hidden, hcr.revision_hash, hc.comment_hash, hcr.author_signature - history_comment_revisions_project_discovery pd - JOIN history_comment_revisions hcr - ON pd.history_comment_revision_id = hcr.id - JOIN history_comments hc - ON hcr.history_comment_id = hc.id - WHERE - pd.project_id = #{projectId} - AND pd.discovered_at > #{sinceTime} - AND hc.causal_id IN (SELECT causal_id FROM causal_history(#{causalId})) - ) (SELECT true, NULL, NULL, NULL, NULL, NULL, NULL, NULL, hc.author, hc.created_at_ms, key.thumbprint, causal.hash, hc.comment_hash - FROM revisions rev + WITH history(causal_id) AS ( + SELECT ch.causal_id FROM causal_history(#{causalId}) AS ch(causal_id) + ) SELECT hc.comment_hash, hcr.revision_hash + FROM history JOIN history_comments hc - ON revisions.comment_id = hc.id + ON hc.causal_id = history.causal_id JOIN causals causal ON hc.causal_id = causal.id JOIN personal_keys key ON hc.author_key_id = key.id - ) - UNION ALL - -- Include ALL the base comments regardless of time, - 
-- the vast majority of the time we'll need them, it simplifies logic, - -- and the client can just ignore them if they already have them. - (SELECT DISTINCT ON (rev.comment_id) - false, rev.subject, rev.content, rev.created_at_ms, rev.is_hidden, rev.author_signature, rev.revision_hash, rev.comment_hash, NULL, NULL, NULL, NULL, NULL - FROM revisions rev - ) + JOIN LATERAL ( + SELECT rev.revision_hash + FROM history_comment_revisions rev + WHERE rev.comment_id = hc.id + ORDER BY rev.created_at_ms DESC + LIMIT 1 + ) hcr + ON TRUE |] - <&> fmap \case - (True, Nothing, Nothing, Nothing, Nothing, Nothing, Nothing, Nothing, Just author, Just createdAtMs, Just authorThumbprint, Just causalHash, Just commentHash) -> - let createdAt = millisToUTCTime createdAtMs - in HistoryCommentChunk $ HistoryComment {..} - (False, Just subject, Just content, Just createdAtMs, Just isHidden, Just authorSignature, Just revisionHash, Just commentHash, Nothing, Nothing, Nothing, Nothing, Nothing) -> - let createdAt = millisToUTCTime createdAtMs - in HistoryCommentRevisionChunk $ HistoryCommentRevision {..} - row -> error $ "fetchProjectBranchCommentsSince: Unexpected row format: " <> show row + +historyCommentsByHashOf :: (PG.QueryA m) => Traversal s t HistoryCommentHash32 HistoryComment -> s -> m t +historyCommentsByHashOf trav s = do + s + & asListOf trav %%~ \hashes -> + PG.queryListRows + [PG.sql| + WITH hashes (ord, hash) AS ( + SELECT * FROM ^{PG.toTable $ ordered hashes} + ) SELECT hc.author, hc.created_at_ms, key.thumbprint, causal.hash AS causal_hash, hc.comment_hash + FROM hashes + JOIN history_comments hc + ON hc.comment_hash = hashes.hash + JOIN causals causal + ON hc.causal_id = causal.id + JOIN personal_keys key + ON hc.author_key_id = key.id + ORDER BY hashes.ord ASC + |] + <&> fmap + \( author, + createdAt, + authorThumbprint, + causalHash, + commentHash + ) -> + HistoryComment + { author, + createdAt, + authorThumbprint, + causalHash, + commentHash + } + 
+historyCommentRevisionsByHashOf :: (PG.QueryA m) => Traversal s t HistoryCommentRevisionHash32 HistoryCommentRevision -> s -> m t +historyCommentRevisionsByHashOf trav s = do + s + & asListOf trav %%~ \hashes -> do + PG.queryListRows + [PG.sql| + WITH hashes (ord, hash) AS ( + SELECT * FROM ^{PG.toTable $ ordered hashes} + ) SELECT hcr.subject, hcr.contents, hcr.created_at_ms, hcr.hidden, hcr.author_signature, hcr.revision_hash, hc.comment_hash + FROM hashes + JOIN history_comment_revisions hcr + ON hcr.revision_hash = hashes.hash + JOIN history_comments hc + ON hcr.comment_id = hc.id + ORDER BY hashes.ord ASC + |] + <&> fmap + \( subject, + content, + createdAt, + isHidden, + authorSignature, + revisionHash, + commentHash + ) -> + HistoryCommentRevision + { subject, + content, + createdAt, + isHidden, + authorSignature, + revisionHash, + commentHash + } insertThumbprints :: (PG.QueryA m) => NESet Text -> m () insertThumbprints thumbprints = do @@ -83,8 +137,8 @@ insertThumbprints thumbprints = do -- Convert milliseconds since epoch to UTCTime _exactly_. -- UTCTime has picosecond precision so this is lossless. 
-millisToUTCTime :: Int64 -> UTCTime -millisToUTCTime ms = +_millisToUTCTime :: Int64 -> UTCTime +_millisToUTCTime ms = toRational ms & (/ (1_000 :: Rational)) & fromRational @@ -98,7 +152,7 @@ utcTimeToMillis utcTime = & round insertHistoryComments :: AuthZReceipt -> ProjectId -> [Either HistoryComment HistoryCommentRevision] -> PG.Transaction e () -insertHistoryComments !_authZ projectId chunks = PG.pipelined $ do +insertHistoryComments !_authZ projectId chunks = do let thumbprints = NESet.nonEmptySet $ Set.fromList (comments <&> \HistoryComment {authorThumbprint} -> authorThumbprint) for thumbprints insertThumbprints whenNonEmpty comments $ insertHistoryComments comments @@ -110,7 +164,7 @@ insertHistoryComments !_authZ projectId chunks = PG.pipelined $ do chunks & foldMap \case Left comment -> ([comment], []) Right revision -> ([], [revision]) - insertHistoryComments :: [HistoryComment] -> PG.Pipeline e () + insertHistoryComments :: (PG.QueryA m) => [HistoryComment] -> m () insertHistoryComments comments = do PG.execute_ [PG.sql| @@ -137,13 +191,13 @@ insertHistoryComments !_authZ projectId chunks = PG.pipelined $ do commentHash ) - insertRevisions :: [HistoryCommentRevision] -> PG.Pipeline e () + insertRevisions :: (PG.QueryA m) => [HistoryCommentRevision] -> m () insertRevisions revs = do let doRevs = PG.execute_ [PG.sql| - WITH new_revisions(subject, content, created_at_ms, hidden, author_signature, revision_hash, comment_hash) AS ( - VALUES ^{PG.toTable revsTable} + WITH new_revisions(subject, contents, created_at_ms, hidden, author_signature, revision_hash, comment_hash) AS ( + SELECT * FROM ^{PG.toTable revsTable} ) INSERT INTO history_comment_revisions(comment_id, subject, contents, created_at_ms, hidden, author_signature, revision_hash) SELECT hc.id, nr.subject, nr.contents, nr.created_at_ms, nr.hidden, nr.author_signature, nr.revision_hash @@ -156,12 +210,12 @@ insertHistoryComments !_authZ projectId chunks = PG.pipelined $ do PG.execute_ [PG.sql| WITH 
new_discoveries(revision_hash) AS ( - VALUES ^{PG.singleColumnTable revHashTable} + SELECT * FROM ^{PG.singleColumnTable revHashTable} ) - INSERT INTO history_comment_revisions_project_discovery(project_id, history_comment_revision_id) + INSERT INTO history_comment_revisions_project_discovery(project_id, comment_revision_id) SELECT #{projectId}, hcr.id FROM new_discoveries nd - JOIN history_comments hcr + JOIN history_comment_revisions hcr ON hcr.revision_hash = nd.revision_hash ON CONFLICT DO NOTHING |] @@ -178,35 +232,51 @@ insertHistoryComments !_authZ projectId chunks = PG.pipelined $ do commentHash ) revHashTable = revs <&> \HistoryCommentRevision {..} -> (revisionHash) - insertDiscoveryInfo :: [HistoryCommentRevision] -> PG.Pipeline e () + insertDiscoveryInfo :: (PG.QueryA m) => [HistoryCommentRevision] -> m () insertDiscoveryInfo revs = do PG.execute_ [PG.sql| - WITH new_discoveries(project_id, history_comment_hash, discovered_at) AS ( - VALUES ^{PG.toTable discoveryTable} + WITH new_discoveries(project_id, history_comment_hash) AS ( + SELECT * FROM ^{PG.toTable discoveryTable} ) - INSERT INTO history_comment_revisions_project_discovery(project_id, history_comment_revision_id, discovered_at) - SELECT #{projectId}, hcr.id, nd.discovered_at + INSERT INTO history_comment_revisions_project_discovery(project_id, comment_revision_id) + SELECT #{projectId}, hcr.id FROM new_discoveries nd JOIN history_comments hc ON hc.comment_hash = nd.history_comment_hash JOIN history_comment_revisions hcr - ON hcr.history_comment_id = hc.id + ON hcr.comment_id = hc.id ON CONFLICT DO NOTHING |] where + discoveryTable :: [(ProjectId, Hash32)] discoveryTable = revs <&> \HistoryCommentRevision {..} -> ( projectId, - commentHash, - utcTimeToMillis createdAt + commentHash ) filterForUnknownHistoryCommentHashes :: (PG.QueryA m) => [Hash32] -> m [Hash32] -filterForUnknownHistoryCommentHashes hashes = do - -- error "TODO: Check whether they're in the project as well." 
+filterForUnknownHistoryCommentHashes commentHashes = do PG.queryListCol [PG.sql| - SELECT hash FROM ^{PG.singleColumnTable hashes} AS t(hash) - WHERE hash NOT IN (SELECT comment_hash FROM history_comments) + SELECT hash FROM ^{PG.singleColumnTable commentHashes} AS t(hash) + WHERE NOT EXISTS ( + SELECT FROM history_comments hc + WHERE hc.comment_hash = t.hash + ) + |] + +filterForUnknownHistoryCommentRevisionHashes :: (PG.QueryA m) => ProjectId -> [Hash32] -> m [Hash32] +filterForUnknownHistoryCommentRevisionHashes projectId revisionHashes = do + PG.queryListCol + [PG.sql| + SELECT hash FROM ^{PG.singleColumnTable revisionHashes} AS t(hash) + WHERE NOT EXISTS ( + SELECT FROM history_comment_revisions_project_discovery hcrpd + JOIN history_comment_revisions hcr + ON hcrpd.comment_revision_id = hcr.id + WHERE hcrpd.project_id = #{projectId} + AND hcr.revision_hash = t.hash + ) |] diff --git a/share-api/src/Share/Web/UCM/Sync/Impl.hs b/share-api/src/Share/Web/UCM/Sync/Impl.hs index e214134c..2bbedf1a 100644 --- a/share-api/src/Share/Web/UCM/Sync/Impl.hs +++ b/share-api/src/Share/Web/UCM/Sync/Impl.hs @@ -149,7 +149,7 @@ downloadEntitiesEndpoint mayUserId DownloadEntitiesRequest {repoInfo, hashes = h Left err -> throwError (DownloadEntitiesFailure $ DownloadEntitiesInvalidRepoInfo err repoInfo) Right (RepoInfoUser userHandle) -> do User {user_id = repoOwnerUserId} <- lift (PG.runTransaction (UserQ.userByHandle userHandle)) `whenNothingM` throwError (DownloadEntitiesFailure . 
DownloadEntitiesUserNotFound $ IDs.toText @UserHandle userHandle) - authZToken <- lift AuthZ.checkDownloadFromUserCodebase `whenLeftM` \_err -> throwError (DownloadEntitiesFailure $ DownloadEntitiesNoReadPermission repoInfo) + authZToken <- lift AuthZ.hashJWTAuthOverride `whenLeftM` \_err -> throwError (DownloadEntitiesFailure $ DownloadEntitiesNoReadPermission repoInfo) let codebaseLoc = Codebase.codebaseLocationForUserCodebase repoOwnerUserId pure $ Codebase.codebaseEnv authZToken codebaseLoc Right (RepoInfoProjectBranch ProjectBranchShortHand {userHandle, projectSlug, contributorHandle}) -> do @@ -158,7 +158,7 @@ downloadEntitiesEndpoint mayUserId DownloadEntitiesRequest {repoInfo, hashes = h project <- (PGQ.projectByShortHand projectShortHand) `whenNothingM` throwError (DownloadEntitiesFailure . DownloadEntitiesProjectNotFound $ IDs.toText @ProjectShortHand projectShortHand) mayContributorUserId <- for contributorHandle \ch -> fmap user_id $ (UserQ.userByHandle ch) `whenNothingM` throwError (DownloadEntitiesFailure . DownloadEntitiesUserNotFound $ IDs.toText @UserHandle ch) pure (project, mayContributorUserId) - authZToken <- lift AuthZ.checkDownloadFromProjectBranchCodebase `whenLeftM` \_err -> throwError (DownloadEntitiesFailure $ DownloadEntitiesNoReadPermission repoInfo) + authZToken <- lift AuthZ.hashJWTAuthOverride `whenLeftM` \_err -> throwError (DownloadEntitiesFailure $ DownloadEntitiesNoReadPermission repoInfo) let codebaseLoc = Codebase.codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId pure $ Codebase.codebaseEnv authZToken codebaseLoc Right (RepoInfoProjectRelease ProjectReleaseShortHand {userHandle, projectSlug}) -> do @@ -166,7 +166,7 @@ downloadEntitiesEndpoint mayUserId DownloadEntitiesRequest {repoInfo, hashes = h (Project {ownerUserId = projectOwnerUserId}, contributorId) <- ExceptT . PG.tryRunTransaction $ do project <- PGQ.projectByShortHand projectShortHand `whenNothingM` throwError (DownloadEntitiesFailure . 
DownloadEntitiesProjectNotFound $ IDs.toText @ProjectShortHand projectShortHand) pure (project, Nothing) - authZToken <- lift AuthZ.checkDownloadFromProjectBranchCodebase `whenLeftM` \_err -> throwError (DownloadEntitiesFailure $ DownloadEntitiesNoReadPermission repoInfo) + authZToken <- lift AuthZ.hashJWTAuthOverride `whenLeftM` \_err -> throwError (DownloadEntitiesFailure $ DownloadEntitiesNoReadPermission repoInfo) let codebaseLoc = Codebase.codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId pure $ Codebase.codebaseEnv authZToken codebaseLoc Env.Env {maxParallelismPerDownloadRequest} <- ask diff --git a/share-api/src/Share/Web/UCM/SyncV2/Impl.hs b/share-api/src/Share/Web/UCM/SyncV2/Impl.hs index 161376c4..1fba40ce 100644 --- a/share-api/src/Share/Web/UCM/SyncV2/Impl.hs +++ b/share-api/src/Share/Web/UCM/SyncV2/Impl.hs @@ -146,7 +146,7 @@ codebaseForBranchRef branchRef = do (Project {ownerUserId = projectOwnerUserId}, contributorId) <- ExceptT . PG.tryRunTransaction $ do project <- PGQ.projectByShortHand projectShortHand `whenNothingM` throwError (CodebaseLoadingErrorProjectNotFound $ projectShortHand) pure (project, Nothing) - authZToken <- lift AuthZ.checkDownloadFromProjectBranchCodebase `whenLeftM` \_err -> throwError (CodebaseLoadingErrorNoReadPermission branchRef) + authZToken <- lift AuthZ.hashJWTAuthOverride `whenLeftM` \_err -> throwError (CodebaseLoadingErrorNoReadPermission branchRef) let codebaseLoc = Codebase.codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId pure $ Codebase.codebaseEnv authZToken codebaseLoc Right (Right (ProjectBranchShortHand {userHandle, projectSlug, contributorHandle})) -> do @@ -155,6 +155,6 @@ codebaseForBranchRef branchRef = do project <- (PGQ.projectByShortHand projectShortHand) `whenNothingM` throwError (CodebaseLoadingErrorProjectNotFound projectShortHand) mayContributorUserId <- for contributorHandle \ch -> fmap user_id $ (UserQ.userByHandle ch) `whenNothingM` throwError 
(CodebaseLoadingErrorUserNotFound ch) pure (project, mayContributorUserId) - authZToken <- lift AuthZ.checkDownloadFromProjectBranchCodebase `whenLeftM` \_err -> throwError (CodebaseLoadingErrorNoReadPermission branchRef) + authZToken <- lift AuthZ.hashJWTAuthOverride `whenLeftM` \_err -> throwError (CodebaseLoadingErrorNoReadPermission branchRef) let codebaseLoc = Codebase.codebaseLocationForProjectBranchCodebase projectOwnerUserId contributorId pure $ Codebase.codebaseEnv authZToken codebaseLoc diff --git a/sql/2025-11-20_history-comments.sql b/sql/2025-11-20_history-comments.sql index a70d50ed..12e76d08 100644 --- a/sql/2025-11-20_history-comments.sql +++ b/sql/2025-11-20_history-comments.sql @@ -40,7 +40,7 @@ CREATE TABLE history_comments ( CREATE INDEX idx_history_comments_causal_id ON history_comments(causal_id); CREATE TABLE history_comment_revisions ( - id INTEGER PRIMARY KEY, + id SERIAL PRIMARY KEY, comment_id INTEGER NOT NULL REFERENCES history_comments(id), subject TEXT NOT NULL, contents TEXT NOT NULL, diff --git a/transcripts/run-transcripts.zsh b/transcripts/run-transcripts.zsh index 2a5f0c2d..5dd27bb0 100755 --- a/transcripts/run-transcripts.zsh +++ b/transcripts/run-transcripts.zsh @@ -21,7 +21,7 @@ transcripts_location="transcripts/share-apis" for dir in "$transcripts_location"/*(/); do # Extract the directory name (transcript name) transcript="${dir:t}" - + # If the first argument is missing, run all transcripts, otherwise run only transcripts which match a prefix of the argument if [ -z "${1:-}" ] || [[ "$transcript" == "$1"* ]]; then pg_reset_fixtures diff --git a/transcripts/share-apis/history-comments/comment-pull.md b/transcripts/share-apis/history-comments/comment-pull.md new file mode 100644 index 00000000..76a4421b --- /dev/null +++ b/transcripts/share-apis/history-comments/comment-pull.md @@ -0,0 +1,4 @@ +```ucm +scratch/main> pull @transcripts/history-comments/main +scratch/main> history +``` diff --git 
a/transcripts/share-apis/history-comments/prelude.md b/transcripts/share-apis/history-comments/prelude.md new file mode 100644 index 00000000..225b9f69 --- /dev/null +++ b/transcripts/share-apis/history-comments/prelude.md @@ -0,0 +1,19 @@ +```ucm:hide +history-comments/main> builtins.mergeio lib.builtins +``` + +Add some history, then set comments on it. + +```unison:hide +x = 1 +``` + +```ucm +scratch/main> update +scratch/main> config.set author.name Unison +scratch/main> history.comment /main: "Initial commit with variable x set to 1" +scratch/main> alias.term x y +scratch/main> history.comment /main: "Renamed x to y" +scratch/main> history +scratch/main> push @transcripts/history-comments/main +``` diff --git a/transcripts/share-apis/history-comments/run.zsh b/transcripts/share-apis/history-comments/run.zsh new file mode 100755 index 00000000..fd189a38 --- /dev/null +++ b/transcripts/share-apis/history-comments/run.zsh @@ -0,0 +1,11 @@ +#!/usr/bin/env zsh + +set -e + +source "../../transcript_helpers.sh" + +# Create some history +transcript_ucm transcript prelude.md + +# Pull the history +transcript_ucm transcript comment-pull.md diff --git a/transcripts/transcript_functions.sh b/transcripts/transcript_functions.sh index 146c17a2..5b3e4baa 100644 --- a/transcripts/transcript_functions.sh +++ b/transcripts/transcript_functions.sh @@ -11,7 +11,7 @@ mkdir -p "${ucm_xdg_data_dir}/unisonlanguage" ucm_credentials_file="${ucm_xdg_data_dir}/unisonlanguage/credentials.json" # Executable to use when running unison transcripts -export UCM_PATH="${UCM_PATH:-"$(which ucm)"}" +export UCM_PATH="${UCM_PATH:-"$(which unison-history-comment-message)"}" export empty_causal_hash='sg60bvjo91fsoo7pkh9gejbn0qgc95vra87ap6l5d35ri0lkaudl7bs12d71sf3fh6p23teemuor7mk1i9n567m50ibakcghjec5ajg' export echo_server_port=9999 export echo_server="http://localhost:${echo_server_port}" diff --git a/unison b/unison index 23b831f6..fabc45fd 160000 --- a/unison +++ b/unison @@ -1 +1 @@ -Subproject commit
23b831f6e3736d54a0516940dec3f56d143b9015 +Subproject commit fabc45fd6462e00b020a4b2e47906c70e5600722