Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 57 additions & 5 deletions src/Control/Monad/Trace.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ module Control.Monad.Trace (

-- ** Pending spans
pendingSpanCount,

-- ** SBQueue
SBQueue,
defaultQueueCapacity,
newSBQueueIO,
isEmptySBQueue,
readSBQueue,
writeSBQueue
) where

import Prelude hiding (span)
Expand All @@ -32,8 +40,10 @@ import Control.Monad.Trace.Class
import Control.Monad.Trace.Internal

import Control.Applicative ((<|>))
import Control.Monad.STM (retry)
import Control.Concurrent.STM.Lifted
import Control.Exception.Lifted
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Reader (ReaderT(ReaderT), ask, asks, local, runReaderT)
import Control.Monad.Reader.Class (MonadReader)
Expand Down Expand Up @@ -71,6 +81,48 @@ data Sample = Sample
-- ^ The span's duration.
}

-- SBQueue implementation and helpers are all taken shamelessly from logbase

-- | Default capacity of log queues (TBQueue for regular logger, 'SBQueue' for
-- bulk loggers). This corresponds to approximately 200 MiB memory residency
-- when the queue is full.
defaultQueueCapacity :: Int
defaultQueueCapacity = 1000000

-- | A simple STM based bounded queue.
data SBQueue a = SBQueue !(TVar [a]) !(TVar Int) !Int

-- | Create an instance of 'SBQueue' with a given capacity.
newSBQueueIO :: Int -> IO (SBQueue a)
newSBQueueIO capacity = SBQueue <$> newTVarIO [] <*> newTVarIO 0 <*> pure capacity

-- | Check if an 'SBQueue' is empty.
isEmptySBQueue :: SBQueue a -> STM Bool
isEmptySBQueue (SBQueue queue count _capacity) = do
isEmpty <- null <$> readTVar queue
numElems <- readTVar count
assert (if isEmpty then numElems == 0 else numElems > 0) $
return isEmpty

-- | Read all the values stored in an 'SBQueue'.
readSBQueue :: SBQueue a -> STM [a]
readSBQueue (SBQueue queue count _capacity) = do
elems <- readTVar queue
when (null elems) retry
writeTVar queue []
writeTVar count 0
return $ reverse elems

-- | Write a value to an 'SBQueue'.
writeSBQueue :: SBQueue a -> a -> STM ()
writeSBQueue (SBQueue queue count capacity) a = do
numElems <- readTVar count
when (numElems < capacity) $ do
modifyTVar queue (a :)
-- Strict modification of the queue size to avoid space leak
modifyTVar' count (+1)


-- | A tracer is a producer of spans.
--
-- More specifically, a tracer:
Expand All @@ -82,22 +134,22 @@ data Sample = Sample
-- These samples can then be consumed independently, decoupling downstream span processing from
-- their production.
data Tracer = Tracer
{ tracerChannel :: TChan Sample
{ tracerQueue :: SBQueue Sample
, tracerPendingCount :: TVar Int
}

-- | Creates a new 'Tracer'.
newTracer :: MonadIO m => m Tracer
newTracer = liftIO $ Tracer <$> newTChanIO <*> newTVarIO 0
newTracer = liftIO $ Tracer <$> newSBQueueIO defaultQueueCapacity <*> newTVarIO 0

-- | Returns the number of spans currently in flight (started but not yet completed).
pendingSpanCount :: Tracer -> TVar Int
pendingSpanCount = tracerPendingCount

-- | Returns all newly completed spans' samples. The samples become available in the same order they
-- are completed.
spanSamples :: Tracer -> TChan Sample
spanSamples = tracerChannel
spanSamples :: Tracer -> SBQueue Sample
spanSamples = tracerQueue

data Scope = Scope
{ scopeTracer :: !Tracer
Expand Down Expand Up @@ -163,7 +215,7 @@ instance (MonadBaseControl IO m, MonadIO m) => MonadTrace (TraceT m) where
modifyTVar' (tracerPendingCount tracer) (\n -> n - 1)
tags <- readTVar tagsTV
logs <- sortOn (\(t, k, _) -> (t, k)) <$> readTVar logsTV
writeTChan (tracerChannel tracer) (Sample spn tags logs start (end - start))
writeSBQueue (tracerQueue tracer) (Sample spn tags logs start (end - start))
run `finally` cleanup
else local (const $ Just $ Scope tracer (Just spn) Nothing Nothing) reader

Expand Down
11 changes: 7 additions & 4 deletions src/Monitor/Tracing/Local.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ module Monitor.Tracing.Local (

import Control.Monad.Trace

import Control.Concurrent.STM.Lifted (atomically, readTVar, readTChan, tryReadTChan)
import Control.Concurrent.STM.Lifted (atomically, readTVar)
import Control.Monad.Fix (fix)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Control (MonadBaseControl)
Expand All @@ -34,10 +34,13 @@ collectSpanSamples actn = do
samplesTC = spanSamples tracer
pendingTV = pendingSpanCount tracer
liftIO $ fix $ \loop -> do
(mbSample, pending) <- atomically $ (,) <$> tryReadTChan samplesTC <*> readTVar pendingTV
(mbSample, pending) <- atomically $ (,) <$> readSBQueue samplesTC <*> readTVar pendingTV
case mbSample of
Just spl -> addSample spl >> loop
Nothing | pending > 0 -> liftIO (atomically $ readTChan samplesTC) >>= addSample >> loop
(x:xs) -> mapM_ addSample (x:xs) >> loop
[] | pending > 0 -> do
toAdd <- liftIO (atomically $ readSBQueue samplesTC)
mapM_ addSample toAdd
loop
_ -> pure ()
spls <- reverse <$> liftIO (readIORef ref)
pure (rv, spls)
71 changes: 35 additions & 36 deletions src/Monitor/Tracing/Zipkin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ module Monitor.Tracing.Zipkin (
import Control.Monad.Trace
import Control.Monad.Trace.Class

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM.Lifted (atomically, tryReadTChan)
import Control.Exception (catch)
import Control.Exception.Lifted (finally)
import Control.Monad (forever, guard, void, when)
import Control.Monad.Fix (fix)
import Control.Concurrent (threadDelay)
import Control.Concurrent.Lifted (fork)
import Control.Concurrent.STM.Lifted (atomically)
import Control.Exception (SomeException)
import Control.Exception.Lifted (finally, try)
import Control.Monad (forever, guard, unless, void)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Control (MonadBaseControl)
import qualified Data.Aeson as JSON
Expand All @@ -58,7 +58,6 @@ import Data.CaseInsensitive (CI)
import Data.Time.Clock (NominalDiffTime)
import Data.Foldable (toList)
import Data.Int (Int64)
import Data.IORef (modifyIORef, newIORef, readIORef)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Maybe (catMaybes, fromMaybe, listToMaybe, maybeToList)
Expand All @@ -73,9 +72,10 @@ import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.Time.Clock.POSIX (POSIXTime)
import Network.HTTP.Client (HttpException, Manager, Request)
import Network.HTTP.Client (Manager, Request)
import qualified Network.HTTP.Client as HTTP
import Network.Socket (HostName, PortNumber)
import Control.Monad.Base (liftBase)

-- | 'Zipkin' creation settings.
data Settings = Settings
Expand All @@ -87,18 +87,16 @@ data Settings = Settings
-- ^ Local endpoint included in all published spans.
, settingsManager :: !(Maybe Manager)
-- ^ An optional HTTP manager to use for publishing spans on the Zipkin server.
, settingsPublishPeriod :: !(Maybe NominalDiffTime)
-- ^ If set to a positive value, traces will be flushed in the background every such period.
, settingsIgnoreBackgroundExceptions :: Bool
-- ^ If set to True, background flushes will ignore any `HttpException`s
, settingsPublishPeriod :: !NominalDiffTime
-- ^ Publish traces in the background at the given interval
}

-- | Creates empty 'Settings'. You will typically use this (or the 'IsString' instance) as starting
-- point to only fill in the fields you care about:
--
-- > let settings = defaultSettings { settingsPort = Just 2222 }
defaultSettings :: Settings
defaultSettings = Settings Nothing Nothing Nothing Nothing Nothing False
defaultSettings = Settings Nothing Nothing Nothing Nothing 1

-- | Generates settings with the given string as hostname.
instance IsString Settings where
Expand All @@ -116,20 +114,9 @@ data Zipkin = Zipkin
, zipkinEndpoint :: !(Maybe Endpoint)
}

flushSpans :: Maybe Endpoint -> Tracer -> Request -> Manager -> IO ()
flushSpans ept tracer req mgr = do
ref <- newIORef []
fix $ \loop -> atomically (tryReadTChan $ spanSamples tracer) >>= \case
Nothing -> pure ()
Just sample -> modifyIORef ref (ZipkinSpan ept sample:) >> loop
spns <- readIORef ref
when (not $ null spns) $ do
let req' = req { HTTP.requestBody = HTTP.RequestBodyLBS $ JSON.encode spns }
void $ HTTP.httpLbs req' mgr

-- | Creates a 'Zipkin' publisher for the input 'Settings'.
new :: MonadIO m => Settings -> m Zipkin
new (Settings mbHostname mbPort mbEpt mbMgr mbPrd ignore) = liftIO $ do
new (Settings mbHostname mbPort mbEpt mbMgr publishInterval) = liftIO $ do
mgr <- maybe (HTTP.newManager HTTP.defaultManagerSettings) pure mbMgr
tracer <- newTracer
let
Expand All @@ -140,26 +127,38 @@ new (Settings mbHostname mbPort mbEpt mbMgr mbPrd ignore) = liftIO $ do
, HTTP.port = maybe 9411 fromIntegral mbPort
, HTTP.requestHeaders = [("content-type", "application/json")]
}
void $ let prd = fromMaybe 0 mbPrd in if prd <= 0
then pure Nothing
else fmap Just $ forkIO $ forever $ do
threadDelay (microSeconds prd)
let flush = flushSpans mbEpt tracer req mgr -- Manager is thread-safe.
if ignore
then catch flush (\(_ :: HttpException) -> pure ())
else flush
pure $ Zipkin mgr req tracer mbEpt
zpk = Zipkin mgr req tracer mbEpt
void $ fork $ forever $ do
threadDelay $ microSeconds publishInterval
publish zpk
pure zpk

-- | Runs a 'TraceT' action, sampling spans appropriately. Note that this method does not publish
-- spans on its own; to do so, either call 'publish' manually or specify a positive
-- 'settingsPublishPeriod' to publish in the background.
run :: TraceT m a -> Zipkin -> m a
run actn zipkin = runTraceT actn (zipkinTracer zipkin)

-- | Flushes the given spans to the Zipkin server. This function is a no-op when input is empty. If
-- publication failed, this function will throw a 'PublishFailed' exception.
publishSpans :: (MonadIO m, MonadBaseControl IO m) => Zipkin -> [ZipkinSpan] -> m ()
publishSpans zpk spns = unless (null spns) $
let req = (zipkinRequest zpk) { HTTP.requestBody = HTTP.RequestBodyLBS $ JSON.encode spns }
in void $ liftIO $ HTTP.httpLbs req (zipkinManager zpk)

-- | Flushes all complete spans to the Zipkin server.
publish :: MonadIO m => Zipkin -> m ()
publish z =
liftIO $ flushSpans (zipkinEndpoint z) (zipkinTracer z) (zipkinRequest z) (zipkinManager z)
publish z = liftIO $ do
let mbEpt = zipkinEndpoint z
samples <- atomically $ readSBQueue $ spanSamples $ zipkinTracer z
retryOnException $ publishSpans z $ fmap (ZipkinSpan mbEpt) samples

retryOnException :: MonadBaseControl IO m => m r -> m r
retryOnException m = try m >>= \case
Left (_ :: SomeException) -> do
liftBase $ threadDelay $ 10 * 1000000
retryOnException m
Right result -> return result

-- | Convenience method to start a 'Zipkin', run an action, and publish all spans before returning.
with :: (MonadBaseControl IO m, MonadIO m) => Settings -> (Zipkin -> m a) -> m a
Expand Down
1 change: 1 addition & 0 deletions tracing.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ library
, network >= 2.8
, random >= 1.1
, stm-lifted >= 2.5
, stm >= 2.5
, text >= 1.2
, time >= 1.8 && < 1.10
, transformers >= 0.5
Expand Down