diff --git a/src/Control/Monad/Trace.hs b/src/Control/Monad/Trace.hs
index 82d01e2..23cf6f3 100644
--- a/src/Control/Monad/Trace.hs
+++ b/src/Control/Monad/Trace.hs
@@ -24,6 +24,14 @@ module Control.Monad.Trace (
 
   -- ** Pending spans
   pendingSpanCount,
+
+  -- ** SBQueue
+  SBQueue,
+  defaultQueueCapacity,
+  newSBQueueIO,
+  isEmptySBQueue,
+  readSBQueue,
+  writeSBQueue
 ) where
 
 import Prelude hiding (span)
@@ -32,8 +40,10 @@ import Control.Monad.Trace.Class
 import Control.Monad.Trace.Internal
 
 import Control.Applicative ((<|>))
+import Control.Monad.STM (retry)
 import Control.Concurrent.STM.Lifted
 import Control.Exception.Lifted
+import Control.Monad (when)
 import Control.Monad.IO.Class (MonadIO, liftIO)
 import Control.Monad.Reader (ReaderT(ReaderT), ask, asks, local, runReaderT)
 import Control.Monad.Reader.Class (MonadReader)
@@ -71,6 +81,48 @@ data Sample = Sample
   -- ^ The span's duration.
   }
 
+-- SBQueue implementation and helpers are all taken shamelessly from logbase
+
+-- | Default capacity (maximum number of queued elements) of the 'SBQueue' that 'newTracer'
+-- creates for completed span samples. Once the queue holds this many elements,
+-- 'writeSBQueue' drops further values until the queue is drained by 'readSBQueue'.
+defaultQueueCapacity :: Int
+defaultQueueCapacity = 1000000
+
+-- | A simple STM based bounded queue.
+data SBQueue a = SBQueue !(TVar [a]) !(TVar Int) !Int
+
+-- | Create an instance of 'SBQueue' with a given capacity.
+newSBQueueIO :: Int -> IO (SBQueue a)
+newSBQueueIO capacity = SBQueue <$> newTVarIO [] <*> newTVarIO 0 <*> pure capacity
+
+-- | Check if an 'SBQueue' is empty.
+isEmptySBQueue :: SBQueue a -> STM Bool
+isEmptySBQueue (SBQueue queue count _capacity) = do
+  isEmpty <- null <$> readTVar queue
+  numElems <- readTVar count
+  assert (if isEmpty then numElems == 0 else numElems > 0) $
+    return isEmpty
+
+-- | Drain an 'SBQueue', returning its values oldest first; retries (blocks) while it is empty.
+readSBQueue :: SBQueue a -> STM [a]
+readSBQueue (SBQueue queue count _capacity) = do
+  elems <- readTVar queue
+  when (null elems) retry
+  writeTVar queue []
+  writeTVar count 0
+  return $ reverse elems
+
+-- | Write a value to an 'SBQueue'; the value is silently dropped when the queue is full.
+writeSBQueue :: SBQueue a -> a -> STM ()
+writeSBQueue (SBQueue queue count capacity) a = do
+  numElems <- readTVar count
+  when (numElems < capacity) $ do
+    -- Strict modifications to avoid accumulating thunks inside the 'TVar's
+    modifyTVar' queue (a :)
+    modifyTVar' count (+1)
+
+
 -- | A tracer is a producer of spans.
 --
 -- More specifically, a tracer:
@@ -82,13 +134,13 @@ data Sample = Sample
 -- These samples can then be consumed independently, decoupling downstream span processing from
 -- their production.
 data Tracer = Tracer
-  { tracerChannel :: TChan Sample
+  { tracerQueue :: SBQueue Sample
   , tracerPendingCount :: TVar Int
   }
 
 -- | Creates a new 'Tracer'.
 newTracer :: MonadIO m => m Tracer
-newTracer = liftIO $ Tracer <$> newTChanIO <*> newTVarIO 0
+newTracer = liftIO $ Tracer <$> newSBQueueIO defaultQueueCapacity <*> newTVarIO 0
 
 -- | Returns the number of spans currently in flight (started but not yet completed).
 pendingSpanCount :: Tracer -> TVar Int
@@ -96,8 +148,8 @@ pendingSpanCount = tracerPendingCount
 
 -- | Returns all newly completed spans' samples. The samples become available in the same order they
 -- are completed.
-spanSamples :: Tracer -> TChan Sample
-spanSamples = tracerChannel
+spanSamples :: Tracer -> SBQueue Sample
+spanSamples = tracerQueue
 
 data Scope = Scope
   { scopeTracer :: !Tracer
@@ -163,7 +215,7 @@ instance (MonadBaseControl IO m, MonadIO m) => MonadTrace (TraceT m) where
           modifyTVar' (tracerPendingCount tracer) (\n -> n - 1)
           tags <- readTVar tagsTV
           logs <- sortOn (\(t, k, _) -> (t, k)) <$> readTVar logsTV
-          writeTChan (tracerChannel tracer) (Sample spn tags logs start (end - start))
+          writeSBQueue (tracerQueue tracer) (Sample spn tags logs start (end - start))
         run `finally` cleanup
       else local (const $ Just $ Scope tracer (Just spn) Nothing Nothing) reader
 
diff --git a/src/Monitor/Tracing/Local.hs b/src/Monitor/Tracing/Local.hs
index 51a8f91..39dd764 100644
--- a/src/Monitor/Tracing/Local.hs
+++ b/src/Monitor/Tracing/Local.hs
@@ -7,7 +7,7 @@ module Monitor.Tracing.Local (
 
 import Control.Monad.Trace
 
-import Control.Concurrent.STM.Lifted (atomically, readTVar, readTChan, tryReadTChan)
+import Control.Concurrent.STM.Lifted (atomically, readTVar)
 import Control.Monad.Fix (fix)
 import Control.Monad.IO.Class (MonadIO, liftIO)
 import Control.Monad.Trans.Control (MonadBaseControl)
@@ -34,10 +34,17 @@ collectSpanSamples actn = do
     samplesTC = spanSamples tracer
     pendingTV = pendingSpanCount tracer
   liftIO $ fix $ \loop -> do
-    (mbSample, pending) <- atomically $ (,) <$> tryReadTChan samplesTC <*> readTVar pendingTV
+    -- 'readSBQueue' retries (blocks) while the queue is empty, so drain it only when it is known
+    -- to be non-empty; otherwise the terminating branch below could never be reached.
+    (mbSample, pending) <- atomically $ do
+      isEmpty <- isEmptySBQueue samplesTC
+      (,) <$> (if isEmpty then pure [] else readSBQueue samplesTC) <*> readTVar pendingTV
     case mbSample of
-      Just spl -> addSample spl >> loop
-      Nothing | pending > 0 -> liftIO (atomically $ readTChan samplesTC) >>= addSample >> loop
+      (x:xs) -> mapM_ addSample (x:xs) >> loop
+      [] | pending > 0 -> do
+        toAdd <- liftIO (atomically $ readSBQueue samplesTC)
+        mapM_ addSample toAdd
+        loop
       _ -> pure ()
   spls <- reverse <$> liftIO (readIORef ref)
   pure (rv, spls)
diff --git a/src/Monitor/Tracing/Zipkin.hs b/src/Monitor/Tracing/Zipkin.hs
index eabffda..17dee8a 100644
---
a/src/Monitor/Tracing/Zipkin.hs
+++ b/src/Monitor/Tracing/Zipkin.hs
@@ -43,12 +43,12 @@ module Monitor.Tracing.Zipkin (
 import Control.Monad.Trace
 import Control.Monad.Trace.Class
 
-import Control.Concurrent (forkIO, threadDelay)
-import Control.Concurrent.STM.Lifted (atomically, tryReadTChan)
-import Control.Exception (catch)
-import Control.Exception.Lifted (finally)
-import Control.Monad (forever, guard, void, when)
-import Control.Monad.Fix (fix)
+import Control.Concurrent (threadDelay)
+import Control.Concurrent.Lifted (fork)
+import Control.Concurrent.STM.Lifted (atomically)
+import Control.Exception (SomeAsyncException (..), SomeException, fromException, throwIO)
+import Control.Exception.Lifted (finally, try)
+import Control.Monad (forever, guard, unless, void)
 import Control.Monad.IO.Class (MonadIO, liftIO)
 import Control.Monad.Trans.Control (MonadBaseControl)
 import qualified Data.Aeson as JSON
@@ -58,7 +58,6 @@ import Data.CaseInsensitive (CI)
 import Data.Time.Clock (NominalDiffTime)
 import Data.Foldable (toList)
 import Data.Int (Int64)
-import Data.IORef (modifyIORef, newIORef, readIORef)
 import Data.Map.Strict (Map)
 import qualified Data.Map.Strict as Map
 import Data.Maybe (catMaybes, fromMaybe, listToMaybe, maybeToList)
@@ -73,9 +72,10 @@ import Data.Text (Text)
 import qualified Data.Text as T
 import qualified Data.Text.Encoding as T
 import Data.Time.Clock.POSIX (POSIXTime)
-import Network.HTTP.Client (HttpException, Manager, Request)
+import Network.HTTP.Client (Manager, Request)
 import qualified Network.HTTP.Client as HTTP
 import Network.Socket (HostName, PortNumber)
+import Control.Monad.Base (liftBase)
 
 -- | 'Zipkin' creation settings.
 data Settings = Settings
@@ -87,10 +87,9 @@ data Settings = Settings
   -- ^ Local endpoint included in all published spans.
   , settingsManager :: !(Maybe Manager)
   -- ^ An optional HTTP manager to use for publishing spans on the Zipkin server.
-  , settingsPublishPeriod :: !(Maybe NominalDiffTime)
-  -- ^ If set to a positive value, traces will be flushed in the background every such period.
-  , settingsIgnoreBackgroundExceptions :: Bool
-  -- ^ If set to True, background flushes will ignore any `HttpException`s
+  , settingsPublishPeriod :: !NominalDiffTime
+  -- ^ Interval at which completed spans are published in the background ('defaultSettings' uses
+  -- one second). A non-positive value disables background publishing.
   }
 
 -- | Creates empty 'Settings'. You will typically use this (or the 'IsString' instance) as starting
@@ -98,7 +97,7 @@ data Settings = Settings
 --
 -- > let settings = defaultSettings { settingsPort = Just 2222 }
 defaultSettings :: Settings
-defaultSettings = Settings Nothing Nothing Nothing Nothing Nothing False
+defaultSettings = Settings Nothing Nothing Nothing Nothing 1
 
 -- | Generates settings with the given string as hostname.
 instance IsString Settings where
@@ -116,20 +115,9 @@ data Zipkin = Zipkin
   , zipkinEndpoint :: !(Maybe Endpoint)
   }
 
-flushSpans :: Maybe Endpoint -> Tracer -> Request -> Manager -> IO ()
-flushSpans ept tracer req mgr = do
-  ref <- newIORef []
-  fix $ \loop -> atomically (tryReadTChan $ spanSamples tracer) >>= \case
-    Nothing -> pure ()
-    Just sample -> modifyIORef ref (ZipkinSpan ept sample:) >> loop
-  spns <- readIORef ref
-  when (not $ null spns) $ do
-    let req' = req { HTTP.requestBody = HTTP.RequestBodyLBS $ JSON.encode spns }
-    void $ HTTP.httpLbs req' mgr
-
 -- | Creates a 'Zipkin' publisher for the input 'Settings'.
 new :: MonadIO m => Settings -> m Zipkin
-new (Settings mbHostname mbPort mbEpt mbMgr mbPrd ignore) = liftIO $ do
+new (Settings mbHostname mbPort mbEpt mbMgr publishInterval) = liftIO $ do
   mgr <- maybe (HTTP.newManager HTTP.defaultManagerSettings) pure mbMgr
   tracer <- newTracer
   let
@@ -140,15 +128,12 @@ new (Settings mbHostname mbPort mbEpt mbMgr mbPrd ignore) = liftIO $ do
       , HTTP.port = maybe 9411 fromIntegral mbPort
       , HTTP.requestHeaders = [("content-type", "application/json")]
       }
-  void $ let prd = fromMaybe 0 mbPrd in if prd <= 0
-    then pure Nothing
-    else fmap Just $ forkIO $ forever $ do
-      threadDelay (microSeconds prd)
-      let flush = flushSpans mbEpt tracer req mgr -- Manager is thread-safe.
-      if ignore
-        then catch flush (\(_ :: HttpException) -> pure ())
-        else flush
-  pure $ Zipkin mgr req tracer mbEpt
+    zpk = Zipkin mgr req tracer mbEpt
+  -- A non-positive interval disables background publishing instead of looping without delay.
+  unless (publishInterval <= 0) $ void $ fork $ forever $ do
+    threadDelay $ microSeconds publishInterval
+    publish zpk
+  pure zpk
 
 -- | Runs a 'TraceT' action, sampling spans appropriately. Note that this method does not publish
 -- spans on its own; to do so, either call 'publish' manually or specify a positive
@@ -156,10 +141,37 @@ new (Settings mbHostname mbPort mbEpt mbMgr mbPrd ignore) = liftIO $ do
 run :: TraceT m a -> Zipkin -> m a
 run actn zipkin = runTraceT actn (zipkinTracer zipkin)
 
+-- | Flushes the given spans to the Zipkin server. This function is a no-op when input is empty.
+-- Exceptions raised by the underlying HTTP request are not handled here and reach the caller.
+publishSpans :: (MonadIO m, MonadBaseControl IO m) => Zipkin -> [ZipkinSpan] -> m ()
+publishSpans zpk spns = unless (null spns) $
+  let req = (zipkinRequest zpk) { HTTP.requestBody = HTTP.RequestBodyLBS $ JSON.encode spns }
+  in void $ liftIO $ HTTP.httpLbs req (zipkinManager zpk)
+
 -- | Flushes all complete spans to the Zipkin server.
 publish :: MonadIO m => Zipkin -> m ()
-publish z =
-  liftIO $ flushSpans (zipkinEndpoint z) (zipkinTracer z) (zipkinRequest z) (zipkinManager z)
+publish z = liftIO $ do
+  let
+    mbEpt = zipkinEndpoint z
+    queue = spanSamples $ zipkinTracer z
+  -- 'readSBQueue' retries (blocks) while the queue is empty, so emptiness is checked in the same
+  -- transaction: publishing with nothing to flush returns immediately instead of hanging.
+  samples <- atomically $ do
+    isEmpty <- isEmptySBQueue queue
+    if isEmpty then pure [] else readSBQueue queue
+  retryOnException $ publishSpans z $ fmap (ZipkinSpan mbEpt) samples
+
+-- | Runs the action, re-running it after a 10 second pause every time it throws a synchronous
+-- exception; there is no retry limit. Asynchronous exceptions are rethrown immediately so that
+-- the calling thread can still be killed or timed out.
+retryOnException :: MonadBaseControl IO m => m r -> m r
+retryOnException m = try m >>= \case
+  Left (e :: SomeException)
+    | Just (SomeAsyncException _) <- fromException e -> liftBase $ throwIO e
+    | otherwise -> do
+        liftBase $ threadDelay $ 10 * 1000000
+        retryOnException m
+  Right result -> return result
 
 -- | Convenience method to start a 'Zipkin', run an action, and publish all spans before returning.
 with :: (MonadBaseControl IO m, MonadIO m) => Settings -> (Zipkin -> m a) -> m a
diff --git a/tracing.cabal b/tracing.cabal
index eead395..ccb76f6 100644
--- a/tracing.cabal
+++ b/tracing.cabal
@@ -46,6 +46,7 @@ library
     , network >= 2.8
     , random >= 1.1
     , stm-lifted >= 2.5
+    , stm >= 2.5
     , text >= 1.2
     , time >= 1.8 && < 1.10
     , transformers >= 0.5