Skip to content

Commit

Permalink
Avoid storing all snapshots in memory
Browse files Browse the repository at this point in the history
Summary:
glass-snapshot could OOM if asked to process many files, because currently we compute all snapshots first, then upload them to XDB

To prevent issues, let's parallelize snapshot computation and upload, and use a bounded queue in between to ensure we don't end up with too many snapshots in memory

Reviewed By: nhawkes

Differential Revision: D51148352

fbshipit-source-id: 7a43aa9f229c7301d4ecb26e222a22db6553ab8a
  • Loading branch information
Pepe Iborra authored and facebook-github-bot committed Nov 13, 2023
1 parent a54f054 commit dbabb92
Showing 1 changed file with 89 additions and 64 deletions.
153 changes: 89 additions & 64 deletions glean/glass/Glean/Glass/Snapshot.hs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,24 @@
module Glean.Glass.Snapshot
( main ) where

import Control.Concurrent.Stream (forConcurrently_unordered)
import Control.Concurrent.Async (withAsync, wait)
import Control.Concurrent (getNumCapabilities)
import Control.Concurrent.STM
(atomically, writeTBQueue, readTBQueue, newTBQueueIO)
import Control.Concurrent.Stream (stream)
import Control.Exception (SomeException, try)
import Control.Monad (forM, when)
import Control.Monad (forM_, when)
import qualified Data.ByteString as BS
import Data.Default (def)
import Data.Int (Int64)
import Data.Foldable (asum)
import Data.Maybe (fromMaybe, catMaybes)
import Data.Maybe (fromMaybe)
import Data.Proxy (Proxy (..))
import Data.Text (Text, pack, unpack)
import Data.ByteString.Lazy (fromStrict, toStrict)
import qualified Data.Text.Encoding as TE
import qualified Database.MySQL.Simple as DB
import Numeric.Natural
import Options.Applicative (
Parser,
ParserInfo,
Expand Down Expand Up @@ -79,6 +83,10 @@ import Glean.Init (withOptions)
import qualified Glean.Snapshot.Types as Types
import Glean.Glass.Types (GlassException(glassException_reasons))

-- A snapshot is usually <100KB
mAX_UPLOAD_QUEUE :: Natural
mAX_UPLOAD_QUEUE = 10000

type FileToSnapshot = (Types.RepoName, Types.Path)

splitPath :: Text -> FileToSnapshot
Expand Down Expand Up @@ -182,10 +190,10 @@ data BuildSnapshot = BuildSnapshot
bytes :: !BS.ByteString,
revision :: Types.Revision,
sizeKB :: !Int,
symbolList :: Types.DocumentSymbolListXResult,
defs :: !Int,
refs :: !Int
}
refs :: !Int,
isEmptySnapshot :: !Bool
}

isEmptySymbols :: Types.DocumentSymbolListXResult -> Bool
isEmptySymbols Types.DocumentSymbolListXResult{..} =
Expand All @@ -200,6 +208,10 @@ data SnapshotError
| UploadError SomeException
| AlreadyPresent

data UploadQueueItem
= UploadQueueItem (FileToSnapshot, Either SnapshotError BuildSnapshot)
| UploadQueueDone

showError :: SnapshotError -> Text
showError EmptySymbolList = "empty symbol list"
showError SizeAboveThreshold{..} = "size above threshold (" <> showt kb <> "kB)"
Expand Down Expand Up @@ -244,10 +256,11 @@ buildSnapshot env rev (repo, path) doCompress = do
let ser = transform $ serializeGen (Proxy :: Proxy Compact) snapshots
let snapshotSizeKB = BS.length ser `div` 1000
let logCompress :: String = if doCompress then "(compressed)" else ""
let isEmpty = isEmptySymbols symList
logInfo $ printf "Building snapshot %s %s: %d defs %d refs %dKB %s"
(Types.unPath path) (Types.unRevision revision) defs refs
snapshotSizeKB logCompress
return $! BuildSnapshot ser revision snapshotSizeKB symbolList defs refs
return $! BuildSnapshot ser revision snapshotSizeKB defs refs isEmpty

uploadToXdb
:: SnapshotTier
Expand All @@ -261,8 +274,8 @@ uploadToXdb
(Types.RepoName repo)
(Types.Revision rev)
(Types.Path path)
BuildSnapshot{bytes, sizeKB, symbolList}
| isEmptySymbols symbolList = do
BuildSnapshot{bytes, sizeKB, isEmptySnapshot}
| isEmptySnapshot = do
logWarning $ printf "%s: Refusing to upload empty snapshot" path
return $ Just EmptySymbolList
| otherwise = do
Expand Down Expand Up @@ -309,61 +322,73 @@ realMain _config@Config{..} =
foldMap (Logger.setWorkflowId . pack) skycastleJobId <>
foldMap Logger.setDiffVersion phabricatorVersionNumber

snaps <- forConcurrently_unordered numCores files $ \file -> do
let (_, Types.Path p) = file
snapOrError <- try $ buildSnapshot env rev file doCompress
case snapOrError of
Left e@Types.GlassException{..} -> do
let reason = printf "%s: %s" p (show $ head glassException_reasons)
logError reason
return (file, Left $ GlassException e)
Right snap ->
return (file, Right snap)

mbErrors <- forM snaps $ \((repo, path), snapOrError) -> do
result <- case snapOrError of
Left e -> return (Left e)
Right snap -> do
let BuildSnapshot{bytes, revision, sizeKB} = snap
if
| Nothing <- output
, sizeKB < fromMaybe maxBound threshold
-> do
mbError <- uploadToXdb tier repo revision path snap
return (maybe (Right snap) Left mbError)
| Nothing <- output
-> do
logError $
printf
"%s: Snapshot size above threshold (%d kB), not uploading"
(Types.unPath path)
sizeKB
return (Left $ SizeAboveThreshold sizeKB)
| Just output_ <- output
-> do
let out = output_ </> unpack (Types.unPath path)
createDirectoryIfMissing True (takeDirectory out)
BS.writeFile out bytes
return (Right snap)

Logger.runLog logger $
logJobProperties <>
Logger.setPath (Types.unPath path) <>
Logger.setRepo (Types.unRepoName repo) <>
case result of
Left e ->
Logger.setError (showError e)
Right BuildSnapshot{..} ->
Logger.setDefinitions defs <>
Logger.setReferences refs

return (either Just (const Nothing) result)

-- Exit with error code = number of failed snapshots
let errors = catMaybes mbErrors
let errorsCount = length $ filter isFatalError errors
when (errorsCount > 0) $
exitWith (ExitFailure errorsCount)
-- Single-threaded upload to XDB to avoid issues
uploadQueue <- newTBQueueIO mAX_UPLOAD_QUEUE
let
uploadLoop accErrors = do
next <- atomically $ readTBQueue uploadQueue
case next of
UploadQueueDone -> return accErrors
UploadQueueItem ((repo,path), snapOrError) -> do
result <- case snapOrError of
Left _ -> return snapOrError
Right snap -> do
let BuildSnapshot{bytes, revision, sizeKB} = snap
if
| Nothing <- output
, sizeKB < fromMaybe maxBound threshold
-> do
mbError <- uploadToXdb tier repo revision path snap
return (maybe (Right snap) Left mbError)
| Nothing <- output
-> do
logError $
printf
"%s: Snapshot size above threshold (%d kB), not uploading"
(Types.unPath path)
sizeKB
return (Left $ SizeAboveThreshold sizeKB)
| Just output_ <- output
-> do
let out = output_ </> unpack (Types.unPath path)
createDirectoryIfMissing True (takeDirectory out)
BS.writeFile out bytes
return (Right snap)
Logger.runLog logger $
logJobProperties <>
Logger.setPath (Types.unPath path) <>
Logger.setRepo (Types.unRepoName repo) <>
case result of
Left e ->
Logger.setError (showError e)
Right BuildSnapshot{defs, refs} ->
Logger.setDefinitions defs <>
Logger.setReferences refs
let accErrors' = either pure (const []) result <> accErrors
uploadLoop accErrors'

withAsync (uploadLoop []) $ \uploadThread -> do
-- Concurrent computation of Glass snapshots
stream numCores (forM_ files) $ \file -> do
let (_, Types.Path p) = file
snapOrError <- try $ buildSnapshot env rev file doCompress
result <- case snapOrError of
Left e@Types.GlassException{..} -> do
let reason = printf "%s: %s" p (show $ head glassException_reasons)
logError reason
return (file, Left $ GlassException e)
Right snap ->
return (file, Right snap)
atomically $ writeTBQueue uploadQueue $ UploadQueueItem result

-- wait for all the uploads
atomically $ writeTBQueue uploadQueue $ UploadQueueDone
errors <- wait uploadThread

-- Exit with error code = number of failed snapshots
let errorsCount = length $ filter isFatalError errors
when (errorsCount > 0) $
exitWith (ExitFailure errorsCount)

isFatalError :: SnapshotError -> Bool
isFatalError = \case
Expand Down

0 comments on commit dbabb92

Please sign in to comment.