diff --git a/glean/glass/Glean/Glass/Snapshot.hs b/glean/glass/Glean/Glass/Snapshot.hs index 85fe6eb5f..4a4f0d79e 100644 --- a/glean/glass/Glean/Glass/Snapshot.hs +++ b/glean/glass/Glean/Glass/Snapshot.hs @@ -17,20 +17,24 @@ module Glean.Glass.Snapshot ( main ) where -import Control.Concurrent.Stream (forConcurrently_unordered) +import Control.Concurrent.Async (withAsync, wait) import Control.Concurrent (getNumCapabilities) +import Control.Concurrent.STM + (atomically, writeTBQueue, readTBQueue, newTBQueueIO) +import Control.Concurrent.Stream (stream) import Control.Exception (SomeException, try) -import Control.Monad (forM, when) +import Control.Monad (forM_, when) import qualified Data.ByteString as BS import Data.Default (def) import Data.Int (Int64) import Data.Foldable (asum) -import Data.Maybe (fromMaybe, catMaybes) +import Data.Maybe (fromMaybe) import Data.Proxy (Proxy (..)) import Data.Text (Text, pack, unpack) import Data.ByteString.Lazy (fromStrict, toStrict) import qualified Data.Text.Encoding as TE import qualified Database.MySQL.Simple as DB +import Numeric.Natural import Options.Applicative ( Parser, ParserInfo, @@ -79,6 +83,10 @@ import Glean.Init (withOptions) import qualified Glean.Snapshot.Types as Types import Glean.Glass.Types (GlassException(glassException_reasons)) +-- Capacity (in items) of the bounded upload queue; a snapshot is usually <100KB +mAX_UPLOAD_QUEUE :: Natural +mAX_UPLOAD_QUEUE = 10000 + type FileToSnapshot = (Types.RepoName, Types.Path) splitPath :: Text -> FileToSnapshot @@ -182,10 +190,10 @@ data BuildSnapshot = BuildSnapshot bytes :: !BS.ByteString, revision :: Types.Revision, sizeKB :: !Int, - symbolList :: Types.DocumentSymbolListXResult, defs :: !Int, - refs :: !Int - } + refs :: !Int, + isEmptySnapshot :: !Bool +} isEmptySymbols :: Types.DocumentSymbolListXResult -> Bool isEmptySymbols Types.DocumentSymbolListXResult{..} = @@ -200,6 +208,10 @@ data SnapshotError | UploadError SomeException | AlreadyPresent +data UploadQueueItem + = UploadQueueItem (FileToSnapshot, 
Either SnapshotError BuildSnapshot) + | UploadQueueDone + showError :: SnapshotError -> Text showError EmptySymbolList = "empty symbol list" showError SizeAboveThreshold{..} = "size above threshold (" <> showt kb <> "kB)" @@ -244,10 +256,11 @@ buildSnapshot env rev (repo, path) doCompress = do let ser = transform $ serializeGen (Proxy :: Proxy Compact) snapshots let snapshotSizeKB = BS.length ser `div` 1000 let logCompress :: String = if doCompress then "(compressed)" else "" + let isEmpty = isEmptySymbols symList logInfo $ printf "Building snapshot %s %s: %d defs %d refs %dKB %s" (Types.unPath path) (Types.unRevision revision) defs refs snapshotSizeKB logCompress - return $! BuildSnapshot ser revision snapshotSizeKB symbolList defs refs + return $! BuildSnapshot ser revision snapshotSizeKB defs refs isEmpty uploadToXdb :: SnapshotTier @@ -261,8 +274,8 @@ uploadToXdb (Types.RepoName repo) (Types.Revision rev) (Types.Path path) - BuildSnapshot{bytes, sizeKB, symbolList} - | isEmptySymbols symbolList = do + BuildSnapshot{bytes, sizeKB, isEmptySnapshot} + | isEmptySnapshot = do logWarning $ printf "%s: Refusing to upload empty snapshot" path return $ Just EmptySymbolList | otherwise = do @@ -309,61 +322,73 @@ realMain _config@Config{..} = foldMap (Logger.setWorkflowId . 
pack) skycastleJobId <> foldMap Logger.setDiffVersion phabricatorVersionNumber - snaps <- forConcurrently_unordered numCores files $ \file -> do - let (_, Types.Path p) = file - snapOrError <- try $ buildSnapshot env rev file doCompress - case snapOrError of - Left e@Types.GlassException{..} -> do - let reason = printf "%s: %s" p (show $ head glassException_reasons) - logError reason - return (file, Left $ GlassException e) - Right snap -> - return (file, Right snap) - - mbErrors <- forM snaps $ \((repo, path), snapOrError) -> do - result <- case snapOrError of - Left e -> return (Left e) - Right snap -> do - let BuildSnapshot{bytes, revision, sizeKB} = snap - if - | Nothing <- output - , sizeKB < fromMaybe maxBound threshold - -> do - mbError <- uploadToXdb tier repo revision path snap - return (maybe (Right snap) Left mbError) - | Nothing <- output - -> do - logError $ - printf - "%s: Snapshot size above threshold (%d kB), not uploading" - (Types.unPath path) - sizeKB - return (Left $ SizeAboveThreshold sizeKB) - | Just output_ <- output - -> do - let out = output_ unpack (Types.unPath path) - createDirectoryIfMissing True (takeDirectory out) - BS.writeFile out bytes - return (Right snap) - - Logger.runLog logger $ - logJobProperties <> - Logger.setPath (Types.unPath path) <> - Logger.setRepo (Types.unRepoName repo) <> - case result of - Left e -> - Logger.setError (showError e) - Right BuildSnapshot{..} -> - Logger.setDefinitions defs <> - Logger.setReferences refs - - return (either Just (const Nothing) result) - - -- Exit with error code = number of failed snapshots - let errors = catMaybes mbErrors - let errorsCount = length $ filter isFatalError errors - when (errorsCount > 0) $ - exitWith (ExitFailure errorsCount) + -- Uploads to XDB are serialised on one consumer thread to avoid issues + uploadQueue <- newTBQueueIO mAX_UPLOAD_QUEUE + let + uploadLoop accErrors = do + next <- atomically $ readTBQueue uploadQueue + case next of + UploadQueueDone -> return accErrors + 
UploadQueueItem ((repo,path), snapOrError) -> do + result <- case snapOrError of + Left _ -> return snapOrError + Right snap -> do + let BuildSnapshot{bytes, revision, sizeKB} = snap + if + | Nothing <- output + , sizeKB < fromMaybe maxBound threshold + -> do + mbError <- uploadToXdb tier repo revision path snap + return (maybe (Right snap) Left mbError) + | Nothing <- output + -> do + logError $ + printf + "%s: Snapshot size above threshold (%d kB), not uploading" + (Types.unPath path) + sizeKB + return (Left $ SizeAboveThreshold sizeKB) + | Just output_ <- output + -> do + let out = output_ unpack (Types.unPath path) + createDirectoryIfMissing True (takeDirectory out) + BS.writeFile out bytes + return (Right snap) + Logger.runLog logger $ + logJobProperties <> + Logger.setPath (Types.unPath path) <> + Logger.setRepo (Types.unRepoName repo) <> + case result of + Left e -> + Logger.setError (showError e) + Right BuildSnapshot{defs, refs} -> + Logger.setDefinitions defs <> + Logger.setReferences refs + let accErrors' = either pure (const []) result <> accErrors + uploadLoop accErrors' + + withAsync (uploadLoop []) $ \uploadThread -> do + -- Concurrent computation of Glass snapshots + stream numCores (forM_ files) $ \file -> do + let (_, Types.Path p) = file + snapOrError <- try $ buildSnapshot env rev file doCompress + result <- case snapOrError of + Left e@Types.GlassException{..} -> do + let reason = printf "%s: %s" p (show $ head glassException_reasons) + logError reason + return (file, Left $ GlassException e) + Right snap -> + return (file, Right snap) + atomically $ writeTBQueue uploadQueue $ UploadQueueItem result + + -- Signal end of input, then wait for the uploader to drain the queue + atomically $ writeTBQueue uploadQueue $ UploadQueueDone + errors <- wait uploadThread + + -- Exit with error code = number of fatal snapshot errors + let errorsCount = length $ filter isFatalError errors + when (errorsCount > 0) $ + exitWith (ExitFailure errorsCount) isFatalError :: SnapshotError -> Bool isFatalError = 
\case