Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/Control/Monad/Trace.hs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import Data.Time.Clock (NominalDiffTime)
import Data.Time.Clock.POSIX (POSIXTime, getPOSIXTime)
import UnliftIO (MonadUnliftIO, withRunInIO)
import UnliftIO.Exception (finally)
import UnliftIO.STM (TChan, TVar, atomically, modifyTVar', newTChanIO, newTVarIO, readTVar, writeTChan, writeTVar)
import UnliftIO.STM (TQueue, TVar, atomically, modifyTVar', newTQueueIO, newTVarIO, readTVar, writeTQueue, writeTVar)

-- | A collection of span tags.
type Tags = Map Key JSON.Value
Expand Down Expand Up @@ -76,21 +76,21 @@ data Sample = Sample
-- These samples can then be consumed independently, decoupling downstream span processing from
-- their production.
data Tracer = Tracer
{ tracerChannel :: TChan Sample
, tracerPendingCount :: TVar Int
{ tracerChannel :: !(TQueue Sample)
, tracerPendingCount :: !(TVar Int)
}

-- | Creates a new 'Tracer'.
newTracer :: MonadIO m => m Tracer
newTracer = liftIO $ Tracer <$> newTChanIO <*> newTVarIO 0
newTracer = liftIO $ Tracer <$> newTQueueIO <*> newTVarIO 0

-- | Returns the number of spans currently in flight (started but not yet completed).
pendingSpanCount :: Tracer -> TVar Int
pendingSpanCount = tracerPendingCount

-- | Returns all newly completed spans' samples. The samples become available in the same order they
-- are completed.
spanSamples :: Tracer -> TChan Sample
spanSamples :: Tracer -> TQueue Sample
spanSamples = tracerChannel

data Scope = Scope
Expand Down Expand Up @@ -145,7 +145,7 @@ instance MonadUnliftIO m => MonadTrace (TraceT m) where
modifyTVar' (tracerPendingCount tracer) (\n -> n - 1)
tags <- readTVar tagsTV
logs <- sortOn (\(t, k, _) -> (t, k)) <$> readTVar logsTV
writeTChan (tracerChannel tracer) (Sample spn tags logs start (end - start))
writeTQueue (tracerChannel tracer) (Sample spn tags logs start (end - start))
run `finally` cleanup
else local (const $ Scope tracer (Just spn) Nothing Nothing) reader

Expand Down
29 changes: 15 additions & 14 deletions src/Monitor/Tracing/Local.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ module Monitor.Tracing.Local (
collectSpanSamples
) where

import Control.Concurrent.STM (atomically, readTVar, readTChan, tryReadTChan)
import Control.Concurrent.STM (atomically, readTVar, flushTQueue, readTQueue)
import Control.Monad (when)
import Control.Monad.Fix (fix)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trace
import Data.Foldable (for_)
import Data.IORef (modifyIORef', newIORef, readIORef)
import UnliftIO (MonadUnliftIO)

Expand All @@ -26,16 +28,15 @@ collectSpanSamples :: MonadUnliftIO m => TraceT m a -> m (a, [Sample])
collectSpanSamples actn = do
tracer <- newTracer
rv <- runTraceT actn tracer
ref <- liftIO $ newIORef []
let
addSample spl = liftIO $ modifyIORef' ref (spl:)
samplesTC = spanSamples tracer
pendingTV = pendingSpanCount tracer
liftIO $ fix $ \loop -> do
(mbSample, pending) <- atomically $ (,) <$> tryReadTChan samplesTC <*> readTVar pendingTV
case mbSample of
Just spl -> addSample spl >> loop
Nothing | pending > 0 -> liftIO (atomically $ readTChan samplesTC) >>= addSample >> loop
_ -> pure ()
spls <- reverse <$> liftIO (readIORef ref)
pure (rv, spls)
liftIO $ do
ref <- newIORef []
let
addSample spl = modifyIORef' ref (spl:)
samplesTQ = spanSamples tracer
pendingTV = pendingSpanCount tracer
fix $ \loop -> do
(samples, pending) <- atomically $ (,) <$> flushTQueue samplesTQ <*> readTVar pendingTV
for_ samples addSample
when (pending > 0) $ (atomically $ readTQueue samplesTQ) >>= addSample >> loop
spls <- reverse <$> readIORef ref
pure (rv, spls)
80 changes: 48 additions & 32 deletions src/Monitor/Tracing/Zipkin.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
Expand All @@ -18,9 +19,16 @@ module Monitor.Tracing.Zipkin (
-- ** Endpoint
Endpoint(..), defaultEndpoint,

-- * Publishing traces
-- * Tracing actions and publishing traces
-- ** Automatically
Zipkin,
new, run, publish, with,
new, with,
PublishFailed(..),

-- ** Manually
-- | When more flexibility is needed, it's possible to control when traces are published by using
-- the functions below.
run, publish, publishSpans,

-- * Cross-process spans
-- ** Communication
Expand All @@ -41,19 +49,17 @@ module Monitor.Tracing.Zipkin (
import Control.Monad.Trace
import Control.Monad.Trace.Class

import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM (atomically, tryReadTChan)
import Control.Monad (forever, guard, void, when)
import Control.Monad.Fix (fix)
import Control.Concurrent.STM (flushTQueue)
import Control.Exception (Exception, SomeException, throw)
import Control.Monad (forever, guard, mfilter, void, when)
import Control.Monad.IO.Class (MonadIO, liftIO)
import qualified Data.Aeson as JSON
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BS
import Data.CaseInsensitive (CI)
import Data.Time.Clock (NominalDiffTime)
import Data.Foldable (toList)
import Data.Foldable (for_, toList)
import Data.Int (Int64)
import Data.IORef (modifyIORef, newIORef, readIORef)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Maybe (catMaybes, fromMaybe, listToMaybe, maybeToList)
Expand All @@ -72,7 +78,9 @@ import Network.HTTP.Client (Manager, Request)
import qualified Network.HTTP.Client as HTTP
import Network.Socket (HostName, PortNumber)
import UnliftIO (MonadUnliftIO)
import UnliftIO.Exception (finally)
import UnliftIO.Concurrent (forkIO, threadDelay)
import UnliftIO.Exception (catch, finally)
import UnliftIO.STM (atomically)

-- | 'Zipkin' creation settings.
data Settings = Settings
Expand Down Expand Up @@ -111,17 +119,6 @@ data Zipkin = Zipkin
, zipkinEndpoint :: !(Maybe Endpoint)
}

flushSpans :: Maybe Endpoint -> Tracer -> Request -> Manager -> IO ()
flushSpans ept tracer req mgr = do
ref <- newIORef []
fix $ \loop -> atomically (tryReadTChan $ spanSamples tracer) >>= \case
Nothing -> pure ()
Just sample -> modifyIORef ref (ZipkinSpan ept sample:) >> loop
spns <- readIORef ref
when (not $ null spns) $ do
let req' = req { HTTP.requestBody = HTTP.RequestBodyLBS $ JSON.encode spns }
void $ HTTP.httpLbs req' mgr

-- | Creates a 'Zipkin' publisher for the input 'Settings'.
new :: MonadIO m => Settings -> m Zipkin
new (Settings mbHostname mbPort mbEpt mbMgr mbPrd) = liftIO $ do
Expand All @@ -135,23 +132,42 @@ new (Settings mbHostname mbPort mbEpt mbMgr mbPrd) = liftIO $ do
, HTTP.port = maybe 9411 fromIntegral mbPort
, HTTP.requestHeaders = [("content-type", "application/json")]
}
void $ let prd = fromMaybe 0 mbPrd in if prd <= 0
then pure Nothing
else fmap Just $ forkIO $ forever $ do
threadDelay (microSeconds prd)
flushSpans mbEpt tracer req mgr -- Manager is thread-safe.
pure $ Zipkin mgr req tracer mbEpt
zpk = Zipkin mgr req tracer mbEpt
for_ (microSeconds <$> mfilter (> 0) mbPrd) $ \delay -> forkIO $ forever $ do
threadDelay delay
publish zpk
pure zpk

-- | Runs a 'TraceT' action, sampling spans appropriately. Note that this method does not publish
-- spans on its own; to do so, either call 'publish' manually or specify a positive
-- 'settingsPublishPeriod' to publish in the background.
run :: TraceT m a -> Zipkin -> m a
run actn zipkin = runTraceT actn (zipkinTracer zipkin)

-- | Flushes all complete spans to the Zipkin server.
publish :: MonadIO m => Zipkin -> m ()
publish z =
liftIO $ flushSpans (zipkinEndpoint z) (zipkinTracer z) (zipkinRequest z) (zipkinManager z)
-- | Error thrown when span publication failed, for example if the server is unreachable. The error
-- exposes the underlying cause as well as the spans which could not be flushed.
data PublishFailed = forall e. Exception e => PublishFailed [ZipkinSpan] e

instance Show PublishFailed where
show (PublishFailed _ e) = "PublishFailed: " <> show e

instance Exception PublishFailed

-- | Flushes the given spans to the Zipkin server. This function is a no-op when input is empty. If
-- publication failed, this function will throw a 'PublishFailed' exception.
publishSpans :: MonadUnliftIO m => Zipkin -> [ZipkinSpan] -> m ()
publishSpans zpk spns = when (not $ null spns) $
let
req = (zipkinRequest zpk) { HTTP.requestBody = HTTP.RequestBodyLBS $ JSON.encode spns }
send = void $ liftIO $ HTTP.httpLbs req (zipkinManager zpk)
in catch send $ \e -> throw (PublishFailed spns (e :: SomeException))

-- | Flushes all complete spans to the Zipkin server. See 'publishSpans' for error handling.
publish :: MonadUnliftIO m => Zipkin -> m ()
publish zpk = liftIO $ do
let mbEpt = zipkinEndpoint zpk
samples <- atomically $ flushTQueue $ spanSamples $ zipkinTracer zpk
publishSpans zpk $ fmap (ZipkinSpan mbEpt) samples

-- | Convenience method to start a 'Zipkin', run an action, and publish all spans before returning.
with :: MonadUnliftIO m => Settings -> (Zipkin -> m a) -> m a
Expand Down Expand Up @@ -421,8 +437,8 @@ instance JSON.ToJSON ZipkinAnnotation where
[ "timestamp" JSON..= microSeconds @Int64 t
, "value" JSON..= v ]

-- Internal type used to encode spans in the <https://zipkin.apache.org/zipkin-api/#/ format>
-- expected by Zipkin.
-- Type used to encode spans in the <https://zipkin.apache.org/zipkin-api/#/ format> expected by
-- Zipkin.
data ZipkinSpan = ZipkinSpan !(Maybe Endpoint) !Sample

publicTags :: Tags -> Map Text JSON.Value
Expand Down
2 changes: 1 addition & 1 deletion tracing.cabal
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
cabal-version: 2.0

name: tracing
version: 0.0.6.2
version: 0.0.7.0
synopsis: Distributed tracing
description:
An OpenTracing-compliant, simple, and extensible distributed tracing library.
Expand Down