From 902e01e8a5b250b63971c0fbd0724c1288fb18d6 Mon Sep 17 00:00:00 2001 From: Jan Sipr Date: Wed, 25 Mar 2020 11:28:08 +0100 Subject: [PATCH 01/16] Export records needed for better logging --- Data/Pool.hs | 2 +- resource-pool.cabal | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Data/Pool.hs b/Data/Pool.hs index 6764e8b..e4ecc28 100644 --- a/Data/Pool.hs +++ b/Data/Pool.hs @@ -30,7 +30,7 @@ module Data.Pool ( Pool(idleTime, maxResources, numStripes) - , LocalPool + , LocalPool(inUse, entries) , createPool , withResource , takeResource diff --git a/resource-pool.cabal b/resource-pool.cabal index 6a9bc09..9b44a9f 100644 --- a/resource-pool.cabal +++ b/resource-pool.cabal @@ -1,5 +1,5 @@ name: resource-pool -version: 0.2.3.2 +version: 0.2.3.3 synopsis: A high-performance striped resource pooling implementation description: A high-performance striped pooling abstraction for managing From cb56f2c1d3bd42dceb4bbb26e3c12063831f89fa Mon Sep 17 00:00:00 2001 From: Soares Chen Date: Fri, 11 Sep 2020 16:25:57 +0200 Subject: [PATCH 02/16] Downgrade local patched version to 0.2.3.2.1 --- resource-pool.cabal | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-pool.cabal b/resource-pool.cabal index 9b44a9f..7dc197b 100644 --- a/resource-pool.cabal +++ b/resource-pool.cabal @@ -1,5 +1,5 @@ name: resource-pool -version: 0.2.3.3 +version: 0.2.3.2.1 synopsis: A high-performance striped resource pooling implementation description: A high-performance striped pooling abstraction for managing From ff273d4ada69415c2840b4576087f63a4922c793 Mon Sep 17 00:00:00 2001 From: Andrzej Rybczak Date: Wed, 9 Mar 2022 00:25:03 +0100 Subject: [PATCH 03/16] Rewrite based on Control.Concurrent.QSem for better performance --- .github/workflows/haskell-ci.yml | 228 ++++++++++++++++ .hgignore | 10 - .hgtags | 14 - Data/Pool.hs | 450 +++++-------------------------- Data/Pool/Internal.hs | 261 ++++++++++++++++++ Data/Pool/Introspection.hs | 56 ++++ README.markdown | 28 -- README.md | 7 + Setup.lhs | 3 - resource-pool.cabal | 74 ++--- 10 files changed, 650 insertions(+), 481 deletions(-) create mode 100644 .github/workflows/haskell-ci.yml delete mode 100644 .hgignore delete mode 100644 .hgtags create mode 100644 Data/Pool/Internal.hs create mode 100644 Data/Pool/Introspection.hs delete mode 100644 README.markdown create mode 100644 README.md delete mode 100755 Setup.lhs diff --git a/.github/workflows/haskell-ci.yml b/.github/workflows/haskell-ci.yml new file mode 100644 index 0000000..4231913 --- /dev/null +++ b/.github/workflows/haskell-ci.yml @@ -0,0 +1,228 @@ +# This GitHub workflow config has been generated by a script via +# +# haskell-ci 'github' '--config=cabal.haskell-ci' 'resource-pool.cabal' +# +# To regenerate the script (for example after adjusting tested-with) run +# +# haskell-ci regenerate +# +# For more information, see https://github.com/haskell-CI/haskell-ci +# +# version: 0.14.3 +# +# REGENDATA ("0.14.3",["github","--config=cabal.haskell-ci","resource-pool.cabal"]) +# +name: Haskell-CI +on: + push: + branches: + - master + pull_request: + branches: + - master +jobs: + linux: + name: Haskell-CI - Linux - ${{ matrix.compiler }} + runs-on: ubuntu-18.04 + timeout-minutes: + 60 + container: + image: buildpack-deps:bionic + continue-on-error: ${{ matrix.allow-failure }} + strategy: + matrix: + include: + - compiler: ghc-9.2.2 + compilerKind: ghc + compilerVersion: 9.2.2 + setup-method: ghcup + allow-failure: false + - compiler: ghc-9.0.2 + compilerKind: ghc + compilerVersion: 9.0.2 + setup-method: ghcup + allow-failure: false + - compiler: ghc-8.10.7 + compilerKind: ghc + compilerVersion: 8.10.7 + setup-method: ghcup + allow-failure: false + - compiler: ghc-8.8.4 + compilerKind: ghc + compilerVersion: 8.8.4 + setup-method: hvr-ppa + allow-failure: false + - compiler: ghc-8.6.5 + compilerKind: ghc + compilerVersion: 8.6.5 + setup-method: hvr-ppa + allow-failure: false + - compiler: ghc-8.4.4 + compilerKind: ghc + compilerVersion: 8.4.4 + setup-method: hvr-ppa + allow-failure: false + fail-fast: false + steps: + - name: apt + run: | + apt-get update + apt-get install -y --no-install-recommends gnupg ca-certificates dirmngr curl git software-properties-common libtinfo5 + if [ "${{ matrix.setup-method }}" = ghcup ]; then + mkdir -p "$HOME/.ghcup/bin" + curl -sL https://downloads.haskell.org/ghcup/0.1.17.5/x86_64-linux-ghcup-0.1.17.5 > "$HOME/.ghcup/bin/ghcup" + chmod a+x "$HOME/.ghcup/bin/ghcup" + "$HOME/.ghcup/bin/ghcup" install ghc "$HCVER" + "$HOME/.ghcup/bin/ghcup" install cabal 3.6.2.0 + else + apt-add-repository -y 'ppa:hvr/ghc' + apt-get update + apt-get install -y "$HCNAME" + mkdir -p "$HOME/.ghcup/bin" + curl -sL https://downloads.haskell.org/ghcup/0.1.17.5/x86_64-linux-ghcup-0.1.17.5 > "$HOME/.ghcup/bin/ghcup" + chmod a+x "$HOME/.ghcup/bin/ghcup" + "$HOME/.ghcup/bin/ghcup" install cabal 3.6.2.0 + fi + env: + HCKIND: ${{ matrix.compilerKind }} + HCNAME: ${{ matrix.compiler }} + HCVER: ${{ matrix.compilerVersion }} + - name: Set PATH and environment variables + run: | + echo "$HOME/.cabal/bin" >> $GITHUB_PATH + echo "LANG=C.UTF-8" >> "$GITHUB_ENV" + echo "CABAL_DIR=$HOME/.cabal" >> "$GITHUB_ENV" + echo "CABAL_CONFIG=$HOME/.cabal/config" >> "$GITHUB_ENV" + HCDIR=/opt/$HCKIND/$HCVER + if [ "${{ matrix.setup-method }}" = ghcup ]; then + HC=$HOME/.ghcup/bin/$HCKIND-$HCVER + echo "HC=$HC" >> "$GITHUB_ENV" + echo "HCPKG=$HOME/.ghcup/bin/$HCKIND-pkg-$HCVER" >> "$GITHUB_ENV" + echo "HADDOCK=$HOME/.ghcup/bin/haddock-$HCVER" >> "$GITHUB_ENV" + echo "CABAL=$HOME/.ghcup/bin/cabal-3.6.2.0 -vnormal+nowrap" >> "$GITHUB_ENV" + else + HC=$HCDIR/bin/$HCKIND + echo "HC=$HC" >> "$GITHUB_ENV" + echo "HCPKG=$HCDIR/bin/$HCKIND-pkg" >> "$GITHUB_ENV" + echo "HADDOCK=$HCDIR/bin/haddock" >> "$GITHUB_ENV" + echo "CABAL=$HOME/.ghcup/bin/cabal-3.6.2.0 -vnormal+nowrap" >> "$GITHUB_ENV" + fi + + HCNUMVER=$(${HC} --numeric-version|perl -ne '/^(\d+)\.(\d+)\.(\d+)(\.(\d+))?$/; print(10000 * $1 + 100 * $2 + ($3 == 0 ? $5 != 1 : $3))') + echo "HCNUMVER=$HCNUMVER" >> "$GITHUB_ENV" + echo "ARG_TESTS=--enable-tests" >> "$GITHUB_ENV" + echo "ARG_BENCH=--enable-benchmarks" >> "$GITHUB_ENV" + echo "HEADHACKAGE=false" >> "$GITHUB_ENV" + echo "ARG_COMPILER=--$HCKIND --with-compiler=$HC" >> "$GITHUB_ENV" + echo "GHCJSARITH=0" >> "$GITHUB_ENV" + env: + HCKIND: ${{ matrix.compilerKind }} + HCNAME: ${{ matrix.compiler }} + HCVER: ${{ matrix.compilerVersion }} + - name: env + run: | + env + - name: write cabal config + run: | + mkdir -p $CABAL_DIR + cat >> $CABAL_CONFIG <> $CABAL_CONFIG < cabal-plan.xz + echo 'de73600b1836d3f55e32d80385acc055fd97f60eaa0ab68a755302685f5d81bc cabal-plan.xz' | sha256sum -c - + xz -d < cabal-plan.xz > $HOME/.cabal/bin/cabal-plan + rm -f cabal-plan.xz + chmod a+x $HOME/.cabal/bin/cabal-plan + cabal-plan --version + - name: checkout + uses: actions/checkout@v2 + with: + path: source + - name: initial cabal.project for sdist + run: | + touch cabal.project + echo "packages: $GITHUB_WORKSPACE/source/." >> cabal.project + cat cabal.project + - name: sdist + run: | + mkdir -p sdist + $CABAL sdist all --output-dir $GITHUB_WORKSPACE/sdist + - name: unpack + run: | + mkdir -p unpacked + find sdist -maxdepth 1 -type f -name '*.tar.gz' -exec tar -C $GITHUB_WORKSPACE/unpacked -xzvf {} \; + - name: generate cabal.project + run: | + PKGDIR_resource_pool="$(find "$GITHUB_WORKSPACE/unpacked" -maxdepth 1 -type d -regex '.*/resource-pool-[0-9.]*')" + echo "PKGDIR_resource_pool=${PKGDIR_resource_pool}" >> "$GITHUB_ENV" + rm -f cabal.project cabal.project.local + touch cabal.project + touch cabal.project.local + echo "packages: ${PKGDIR_resource_pool}" >> cabal.project + echo "package resource-pool" >> cabal.project + echo " ghc-options: -Werror=missing-methods" >> cabal.project + cat >> cabal.project <> cabal.project.local + cat cabal.project + cat cabal.project.local + - name: dump install plan + run: | + $CABAL v2-build $ARG_COMPILER $ARG_TESTS $ARG_BENCH --dry-run all + cabal-plan + - name: cache + uses: actions/cache@v2 + with: + key: ${{ runner.os }}-${{ matrix.compiler }}-${{ github.sha }} + path: ~/.cabal/store + restore-keys: ${{ runner.os }}-${{ matrix.compiler }}- + - name: install dependencies + run: | + $CABAL v2-build $ARG_COMPILER --disable-tests --disable-benchmarks --dependencies-only -j2 all + $CABAL v2-build $ARG_COMPILER $ARG_TESTS $ARG_BENCH --dependencies-only -j2 all + - name: build w/o tests + run: | + $CABAL v2-build $ARG_COMPILER --disable-tests --disable-benchmarks all + - name: build + run: | + $CABAL v2-build $ARG_COMPILER $ARG_TESTS $ARG_BENCH all --write-ghc-environment-files=always + - name: cabal check + run: | + cd ${PKGDIR_resource_pool} || false + ${CABAL} -vnormal check + - name: haddock + run: | + $CABAL v2-haddock $ARG_COMPILER --with-haddock $HADDOCK $ARG_TESTS $ARG_BENCH all + - name: unconstrained build + run: | + rm -f cabal.project.local + $CABAL v2-build $ARG_COMPILER --disable-tests --disable-benchmarks all diff --git a/.hgignore b/.hgignore deleted file mode 100644 index 51da823..0000000 --- a/.hgignore +++ /dev/null @@ -1,10 +0,0 @@ -.*\.(?:aux|h[ip]|o|orig|out|pdf|prof|ps|rej)$ -^(?:dist|\.DS_Store)$ -^tests/(?:qc) - -syntax: glob -cabal-dev -*~ -.*.swp -.\#* -\#* diff --git a/.hgtags b/.hgtags deleted file mode 100644 index 1407166..0000000 --- a/.hgtags +++ /dev/null @@ -1,14 +0,0 @@ -4c30c410223808c6ba0f4045b76c79dccb2b3386 0.1.0.0 -7c58de97c097a6e67252460b3cdcddea5f4b688e 0.1.0.1 -639cfbbbfcb73b0effa51064e9b59bf7cb397ac7 0.1.0.2 -639cfbbbfcb73b0effa51064e9b59bf7cb397ac7 0.1.0.2 -1fc8b99aad9ca0947fa6c614eab7e3d793dc1b0d 0.1.0.2 -33fb0f106b0153d24d4281ef88b8cf0eb681e13d 0.1.1.0 -caf8342bace0ca22f74f08458419229cbff87727 0.2.0.0 -606c1e147f6c363a9850e1239f6ec01dc1cea842 0.2.0.1 -3c0453d9ed9f57ea13873e9fb6f3cf423c6f9fc8 0.2.0.2 -89a5ed47828f804e352296f66ccd10cd8639aebe 0.2.0.3 -f3fc5de81581943b601e4e0605c0a41e822e89b2 0.2.0.4 -1453c7d3314ff954f040b0179abe65790f4819fe 0.2.1.0 -ccaa7c42c382361181286cff1db1d305fb88a4a5 0.2.1.1 -e7c91e7b0e1cf6bf89b628b97e901820e4a42032 0.2.2.0 diff --git a/Data/Pool.hs b/Data/Pool.hs index e4ecc28..ef233e2 100644 --- a/Data/Pool.hs +++ b/Data/Pool.hs @@ -1,393 +1,81 @@ -{-# LANGUAGE CPP, NamedFieldPuns, RecordWildCards, ScopedTypeVariables, RankNTypes, DeriveDataTypeable #-} - -#if MIN_VERSION_monad_control(0,3,0) -{-# LANGUAGE FlexibleContexts #-} -#endif - -#if !MIN_VERSION_base(4,3,0) -{-# LANGUAGE RankNTypes #-} -#endif - --- | --- Module: Data.Pool --- Copyright: (c) 2011 MailRank, Inc. --- License: BSD3 --- Maintainer: Bryan O'Sullivan , --- Bas van Dijk --- Stability: experimental --- Portability: portable --- --- A high-performance striped pooling abstraction for managing --- flexibly-sized collections of resources such as database --- connections. --- --- \"Striped\" means that a single 'Pool' consists of several --- sub-pools, each managed independently. A single stripe is fine for --- many applications, and probably what you should choose by default. --- More stripes will lead to reduced contention in high-performance --- multicore applications, at a trade-off of causing the maximum --- number of simultaneous resources in use to grow. +-- | A high-performance pooling abstraction for managing flexibly-sized +-- collections of resources such as database connections. module Data.Pool - ( - Pool(idleTime, maxResources, numStripes) - , LocalPool(inUse, entries) - , createPool - , withResource - , takeResource - , tryWithResource - , tryTakeResource - , destroyResource - , putResource - , destroyAllResources - ) where - -import Control.Applicative ((<$>)) -import Control.Concurrent (ThreadId, forkIOWithUnmask, killThread, myThreadId, threadDelay) -import Control.Concurrent.STM -import Control.Exception (SomeException, onException, mask_) -import Control.Monad (forM_, forever, join, liftM3, unless, when) -import Data.Hashable (hash) -import Data.IORef (IORef, newIORef, mkWeakIORef) -import Data.List (partition) -import Data.Time.Clock (NominalDiffTime, UTCTime, diffUTCTime, getCurrentTime) -import Data.Typeable (Typeable) -import GHC.Conc.Sync (labelThread) -import qualified Control.Exception as E -import qualified Data.Vector as V - -#if MIN_VERSION_monad_control(0,3,0) -import Control.Monad.Trans.Control (MonadBaseControl, control) -import Control.Monad.Base (liftBase) -#else -import Control.Monad.IO.Control (MonadControlIO, controlIO) -import Control.Monad.IO.Class (liftIO) -#define control controlIO -#define liftBase liftIO -#endif - -#if MIN_VERSION_base(4,3,0) -import Control.Exception (mask) -#else --- Don't do any async exception protection for older GHCs. -mask :: ((forall a. IO a -> IO a) -> IO b) -> IO b -mask f = f id -#endif - --- | A single resource pool entry. -data Entry a = Entry { - entry :: a - , lastUse :: UTCTime - -- ^ Time of last return. - } - --- | A single striped pool. -data LocalPool a = LocalPool { - inUse :: TVar Int - -- ^ Count of open entries (both idle and in use). - , entries :: TVar [Entry a] - -- ^ Idle entries. - , lfin :: IORef () - -- ^ empty value used to attach a finalizer to (internal) - } deriving (Typeable) - -data Pool a = Pool { - create :: IO a - -- ^ Action for creating a new entry to add to the pool. - , destroy :: a -> IO () - -- ^ Action for destroying an entry that is now done with. - , numStripes :: Int - -- ^ The number of stripes (distinct sub-pools) to maintain. - -- The smallest acceptable value is 1. - , idleTime :: NominalDiffTime - -- ^ Amount of time for which an unused resource is kept alive. - -- The smallest acceptable value is 0.5 seconds. - -- - -- The elapsed time before closing may be a little longer than - -- requested, as the reaper thread wakes at 1-second intervals. - , maxResources :: Int - -- ^ Maximum number of resources to maintain per stripe. The - -- smallest acceptable value is 1. - -- - -- Requests for resources will block if this limit is reached on a - -- single stripe, even if other stripes have idle resources - -- available. - , localPools :: V.Vector (LocalPool a) - -- ^ Per-capability resource pools. - , fin :: IORef () - -- ^ empty value used to attach a finalizer to (internal) - } deriving (Typeable) - -instance Show (Pool a) where - show Pool{..} = "Pool {numStripes = " ++ show numStripes ++ ", " ++ - "idleTime = " ++ show idleTime ++ ", " ++ - "maxResources = " ++ show maxResources ++ "}" - --- | Create a striped resource pool. + ( -- * Pool + Pool + , LocalPool + , newPool + + -- * Resource management + , withResource + , takeResource + , putResource + , destroyResource + , destroyAllResources + + -- * Compatibility with 0.2 + , createPool + ) where + +import Control.Concurrent +import Control.Exception +import Data.Time (NominalDiffTime) + +import Data.Pool.Internal + +-- | Take a resource from the pool, perform an action with it and return it to +-- the pool afterwards. -- --- Although the garbage collector will destroy all idle resources when --- the pool is garbage collected it's recommended to manually --- 'destroyAllResources' when you're done with the pool so that the --- resources are freed up as soon as possible. -createPool - :: IO a - -- ^ Action that creates a new resource. - -> (a -> IO ()) - -- ^ Action that destroys an existing resource. - -> Int - -- ^ The number of stripes (distinct sub-pools) to maintain. - -- The smallest acceptable value is 1. - -> NominalDiffTime - -- ^ Amount of time for which an unused resource is kept open. - -- The smallest acceptable value is 0.5 seconds. - -- - -- The elapsed time before destroying a resource may be a little - -- longer than requested, as the reaper thread wakes at 1-second - -- intervals. - -> Int - -- ^ Maximum number of resources to keep open per stripe. The - -- smallest acceptable value is 1. - -- - -- Requests for resources will block if this limit is reached on a - -- single stripe, even if other stripes have idle resources - -- available. - -> IO (Pool a) -createPool create destroy numStripes idleTime maxResources = do - when (numStripes < 1) $ - modError "pool " $ "invalid stripe count " ++ show numStripes - when (idleTime < 0.5) $ - modError "pool " $ "invalid idle time " ++ show idleTime - when (maxResources < 1) $ - modError "pool " $ "invalid maximum resource count " ++ show maxResources - localPools <- V.replicateM numStripes $ - liftM3 LocalPool (newTVarIO 0) (newTVarIO []) (newIORef ()) - reaperId <- forkIOLabeledWithUnmask "resource-pool: reaper" $ \unmask -> - unmask $ reaper destroy idleTime localPools - fin <- newIORef () - let p = Pool { - create - , destroy - , numStripes - , idleTime - , maxResources - , localPools - , fin - } - mkWeakIORef fin (killThread reaperId) >> - V.mapM_ (\lp -> mkWeakIORef (lfin lp) (purgeLocalPool destroy lp)) localPools - return p - --- TODO: Propose 'forkIOLabeledWithUnmask' for the base library. - --- | Sparks off a new thread using 'forkIOWithUnmask' to run the given --- IO computation, but first labels the thread with the given label --- (using 'labelThread'). +-- * If the pool has an idle resource available, it is used immediately. -- --- The implementation makes sure that asynchronous exceptions are --- masked until the given computation is executed. This ensures the --- thread will always be labeled which guarantees you can always --- easily find it in the GHC event log. +-- * Otherwise, if the maximum number of resources has not yet been reached, a +-- new resource is created and used. -- --- Like 'forkIOWithUnmask', the given computation is given a function --- to unmask asynchronous exceptions. See the documentation of that --- function for the motivation of this. +-- * If the maximum number of resources has been reached, this function blocks +-- until a resource becomes available. -- --- Returns the 'ThreadId' of the newly created thread. -forkIOLabeledWithUnmask :: String - -> ((forall a. IO a -> IO a) -> IO ()) - -> IO ThreadId -forkIOLabeledWithUnmask label m = mask_ $ forkIOWithUnmask $ \unmask -> do - tid <- myThreadId - labelThread tid label - m unmask - --- | Periodically go through all pools, closing any resources that --- have been left idle for too long. -reaper :: (a -> IO ()) -> NominalDiffTime -> V.Vector (LocalPool a) -> IO () -reaper destroy idleTime pools = forever $ do - threadDelay (1 * 1000000) - now <- getCurrentTime - let isStale Entry{..} = now `diffUTCTime` lastUse > idleTime - V.forM_ pools $ \LocalPool{..} -> do - resources <- atomically $ do - (stale,fresh) <- partition isStale <$> readTVar entries - unless (null stale) $ do - writeTVar entries fresh - modifyTVar_ inUse (subtract (length stale)) - return (map entry stale) - forM_ resources $ \resource -> do - destroy resource `E.catch` \(_::SomeException) -> return () - --- | Destroy all idle resources of the given 'LocalPool' and remove them from --- the pool. -purgeLocalPool :: (a -> IO ()) -> LocalPool a -> IO () -purgeLocalPool destroy LocalPool{..} = do - resources <- atomically $ do - idle <- swapTVar entries [] - modifyTVar_ inUse (subtract (length idle)) - return (map entry idle) - forM_ resources $ \resource -> - destroy resource `E.catch` \(_::SomeException) -> return () - --- | Temporarily take a resource from a 'Pool', perform an action with --- it, and return it to the pool afterwards. --- --- * If the pool has an idle resource available, it is used --- immediately. --- --- * Otherwise, if the maximum number of resources has not yet been --- reached, a new resource is created and used. --- --- * If the maximum number of resources has been reached, this --- function blocks until a resource becomes available. +-- If the action throws an exception of any type, the resource is destroyed and +-- not returned to the pool. -- --- If the action throws an exception of any type, the resource is --- destroyed, and not returned to the pool. --- --- It probably goes without saying that you should never manually --- destroy a pooled resource, as doing so will almost certainly cause --- a subsequent user (who expects the resource to be valid) to throw --- an exception. -withResource :: -#if MIN_VERSION_monad_control(0,3,0) - (MonadBaseControl IO m) -#else - (MonadControlIO m) -#endif - => Pool a -> (a -> m b) -> m b -{-# SPECIALIZE withResource :: Pool a -> (a -> IO b) -> IO b #-} -withResource pool act = control $ \runInIO -> mask $ \restore -> do - (resource, local) <- takeResource pool - ret <- restore (runInIO (act resource)) `onException` - destroyResource pool local resource - putResource local resource - return ret -#if __GLASGOW_HASKELL__ >= 700 -{-# INLINABLE withResource #-} -#endif +-- It probably goes without saying that you should never manually destroy a +-- pooled resource, as doing so will almost certainly cause a subsequent user +-- (who expects the resource to be valid) to throw an exception. +withResource :: Pool a -> (a -> IO r) -> IO r +withResource pool act = mask $ \unmask -> do + (res, localPool) <- takeResource pool + r <- unmask (act res) `onException` destroyResource pool localPool res + putResource localPool res + pure r -- | Take a resource from the pool, following the same results as --- 'withResource'. Note that this function should be used with caution, as --- improper exception handling can lead to leaked resources. +-- 'withResource'. -- --- This function returns both a resource and the @LocalPool@ it came from so --- that it may either be destroyed (via 'destroyResource') or returned to the --- pool (via 'putResource'). +-- /Note:/ this function returns both a resource and the 'LocalPool' it came +-- from so that it may either be destroyed (via 'destroyResource') or returned +-- to the pool (via 'putResource'). takeResource :: Pool a -> IO (a, LocalPool a) -takeResource pool@Pool{..} = do - local@LocalPool{..} <- getLocalPool pool - resource <- liftBase . join . atomically $ do - ents <- readTVar entries - case ents of - (Entry{..}:es) -> writeTVar entries es >> return (return entry) - [] -> do - used <- readTVar inUse - when (used == maxResources) retry - writeTVar inUse $! used + 1 - return $ - create `onException` atomically (modifyTVar_ inUse (subtract 1)) - return (resource, local) -#if __GLASGOW_HASKELL__ >= 700 -{-# INLINABLE takeResource #-} -#endif - --- | Similar to 'withResource', but only performs the action if a resource could --- be taken from the pool /without blocking/. Otherwise, 'tryWithResource' --- returns immediately with 'Nothing' (ie. the action function is /not/ called). --- Conversely, if a resource can be borrowed from the pool without blocking, the --- action is performed and it's result is returned, wrapped in a 'Just'. -tryWithResource :: forall m a b. -#if MIN_VERSION_monad_control(0,3,0) - (MonadBaseControl IO m) -#else - (MonadControlIO m) -#endif - => Pool a -> (a -> m b) -> m (Maybe b) -tryWithResource pool act = control $ \runInIO -> mask $ \restore -> do - res <- tryTakeResource pool - case res of - Just (resource, local) -> do - ret <- restore (runInIO (Just <$> act resource)) `onException` - destroyResource pool local resource - putResource local resource - return ret - Nothing -> restore . runInIO $ return (Nothing :: Maybe b) -#if __GLASGOW_HASKELL__ >= 700 -{-# INLINABLE tryWithResource #-} -#endif - --- | A non-blocking version of 'takeResource'. The 'tryTakeResource' function --- returns immediately, with 'Nothing' if the pool is exhausted, or @'Just' (a, --- 'LocalPool' a)@ if a resource could be borrowed from the pool successfully. -tryTakeResource :: Pool a -> IO (Maybe (a, LocalPool a)) -tryTakeResource pool@Pool{..} = do - local@LocalPool{..} <- getLocalPool pool - resource <- liftBase . join . atomically $ do - ents <- readTVar entries - case ents of - (Entry{..}:es) -> writeTVar entries es >> return (return . Just $ entry) +takeResource pool = mask_ $ do + localPool@(LocalPool mstripe) <- getLocalPool (localPools pool) + stripe <- takeMVar mstripe + if available stripe == 0 + then do + q <- newEmptyMVar + putMVar mstripe $! stripe { queueR = q : queueR stripe } + a <- waitForResource mstripe q + pure (a, localPool) + else case cache stripe of [] -> do - used <- readTVar inUse - if used == maxResources - then return (return Nothing) - else do - writeTVar inUse $! used + 1 - return $ Just <$> - create `onException` atomically (modifyTVar_ inUse (subtract 1)) - return $ (flip (,) local) <$> resource -#if __GLASGOW_HASKELL__ >= 700 -{-# INLINABLE tryTakeResource #-} -#endif - --- | Get a (Thread-)'LocalPool' --- --- Internal, just to not repeat code for 'takeResource' and 'tryTakeResource' -getLocalPool :: Pool a -> IO (LocalPool a) -getLocalPool Pool{..} = do - i <- liftBase $ ((`mod` numStripes) . hash) <$> myThreadId - return $ localPools V.! i -#if __GLASGOW_HASKELL__ >= 700 -{-# INLINABLE getLocalPool #-} -#endif - --- | Destroy a resource. Note that this will ignore any exceptions in the --- destroy function. -destroyResource :: Pool a -> LocalPool a -> a -> IO () -destroyResource Pool{..} LocalPool{..} resource = do - destroy resource `E.catch` \(_::SomeException) -> return () - atomically (modifyTVar_ inUse (subtract 1)) -#if __GLASGOW_HASKELL__ >= 700 -{-# INLINABLE destroyResource #-} -#endif - --- | Return a resource to the given 'LocalPool'. -putResource :: LocalPool a -> a -> IO () -putResource LocalPool{..} resource = do - now <- getCurrentTime - atomically $ modifyTVar_ entries (Entry resource now:) -#if __GLASGOW_HASKELL__ >= 700 -{-# INLINABLE putResource #-} -#endif - --- | Destroy all resources in all stripes in the pool. Note that this --- will ignore any exceptions in the destroy function. --- --- This function is useful when you detect that all resources in the --- pool are broken. For example after a database has been restarted --- all connections opened before the restart will be broken. In that --- case it's better to close those connections so that 'takeResource' --- won't take a broken connection from the pool but will open a new --- connection instead. + putMVar mstripe $! stripe { available = available stripe - 1 } + a <- createResource pool `onException` restoreSize mstripe + pure (a, localPool) + Entry a _ : as -> do + putMVar mstripe $! stripe { available = available stripe - 1, cache = as } + pure (a, localPool) + +{-# DEPRECATED createPool "Use newPool instead" #-} +-- | Provided for compatibility with @resource-pool < 0.3@. -- --- Another use-case for this function is that when you know you are --- done with the pool you can destroy all idle resources immediately --- instead of waiting on the garbage collector to destroy them, thus --- freeing up those resources sooner. -destroyAllResources :: Pool a -> IO () -destroyAllResources Pool{..} = V.forM_ localPools $ purgeLocalPool destroy - -modifyTVar_ :: TVar a -> (a -> a) -> STM () -modifyTVar_ v f = readTVar v >>= \a -> writeTVar v $! f a - -modError :: String -> String -> a -modError func msg = - error $ "Data.Pool." ++ func ++ ": " ++ msg +-- Use 'newPool' instead. +createPool :: IO a -> (a -> IO ()) -> Int -> NominalDiffTime -> Int -> IO (Pool a) +createPool create free numStripes idleTime maxResources = do + newPool create free (realToFrac idleTime) (numStripes * maxResources) diff --git a/Data/Pool/Internal.hs b/Data/Pool/Internal.hs new file mode 100644 index 0000000..871e3bd --- /dev/null +++ b/Data/Pool/Internal.hs @@ -0,0 +1,261 @@ +-- | Internal implementation details for "Data.Pool". +-- +-- This module is intended for internal use only, and may change without warning +-- in subsequent releases. +{-# OPTIONS_HADDOCK hide, not-home #-} +module Data.Pool.Internal where + +import Control.Concurrent +import Control.Exception +import Control.Monad +import Data.IORef +import Data.Primitive.SmallArray +import GHC.Clock +import qualified Data.List as L + +-- | Striped resource pool based on "Control.Concurrent.QSem". +-- +-- The number of stripes is arranged to be equal to the number of capabilities +-- so that they never compete over access to the same stripe. This results in a +-- very good performance in a multi-threaded environment. +data Pool a = Pool + { createResource :: IO a + , freeResource :: a -> IO () + , localPools :: SmallArray (LocalPool a) + , reaperRef :: IORef () + } + +-- | A single, capability-local pool. +newtype LocalPool a = LocalPool (MVar (Stripe a)) + +-- | Stripe of a resource pool. If @available@ is 0, the list of threads waiting +-- for a resource (each with an associated 'MVar') is @queue ++ reverse queueR@. +data Stripe a = Stripe + { available :: Int + , cache :: [Entry a] + , queue :: [MVar a] + , queueR :: [MVar a] + } + +-- | An existing resource currently sitting in a pool. +data Entry a = Entry + { entry :: a + , lastUsed :: Double + } + +-- | Create a new striped resource pool. +-- +-- The number of stripes is equal to the number of capabilities @N@. +-- +-- /Note:/ although the garbage collector will destroy all idle resources when +-- the pool is garbage collected, it's recommended to manually call +-- 'destroyAllResources' when you're done with the pool so that the resources +-- are freed up as soon as possible. +newPool + :: IO a + -- ^ The action that creates a new resource. + -> (a -> IO ()) + -- ^ The action that destroys an existing resource. + -> Double + -- ^ The amount of seconds for which an unused resource is kept open. The + -- smallest acceptable value is @0.5@. + -- + -- /Note:/ the elapsed time before destroying a resource may be a little + -- longer than requested, as the collector thread wakes at 1-second intervals. + -> Int + -- ^ The number of resources to keep open across all stripes. The smallest + -- acceptable value is @1@. + -- + -- /Note:/ for each stripe the number of resources is divided by the number of + -- capabilities and rounded up. Therefore the pool might end up creating up to + -- @N - 1@ resources more in total than specified. + -> IO (Pool a) +newPool create free idleTime maxResources = do + when (idleTime < 0.5) $ do + error "idleTime must be at least 0.5" + when (maxResources < 1) $ do + error "maxResources must be at least 1" + numStripes <- getNumCapabilities + pools <- + fmap (smallArrayFromListN numStripes) + . replicateM numStripes + . fmap LocalPool + . newMVar + $ Stripe { available = maxResources `quotCeil` numStripes + , cache = [] + , queue = [] + , queueR = [] + } + mask_ $ do + ref <- newIORef () + collectorA <- forkIOWithUnmask $ \unmask -> unmask $ collector pools + void . mkWeakIORef ref $ do + killThread collectorA + cleanLocalPools (const True) free pools + pure Pool { createResource = create + , freeResource = free + , localPools = pools + , reaperRef = ref + } + where + quotCeil :: Int -> Int -> Int + quotCeil x y = + -- Basically ceiling (x / y) without going through Double. + let (z, r) = x `quotRem` y in if r == 0 then z else z + 1 + + -- Collect stale resources from the pool once per second. + collector pools = forever $ do + threadDelay 1000000 + now <- getMonotonicTime + let isStale e = now - lastUsed e > idleTime + cleanLocalPools isStale free pools + +-- | Destroy a resource. +-- +-- Note that this will ignore any exceptions in the destroy function. +destroyResource :: Pool a -> LocalPool a -> a -> IO () +destroyResource pool (LocalPool mstripe) a = do + uninterruptibleMask_ $ do -- Note [signal uninterruptible] + stripe <- takeMVar mstripe + putMVar mstripe $! stripe { available = available stripe + 1 } + void . try @SomeException $ freeResource pool a + +-- | Return a resource to the given 'LocalPool'. +putResource :: LocalPool a -> a -> IO () +putResource (LocalPool mstripe) a = do + uninterruptibleMask_ $ do -- Note [signal uninterruptible] + stripe <- takeMVar mstripe + newStripe <- signal stripe a + putMVar mstripe newStripe + +-- | Destroy all resources in all stripes in the pool. +-- +-- Note that this will ignore any exceptions in the destroy function. +-- +-- This function is useful when you detect that all resources in the pool are +-- broken. For example after a database has been restarted all connections +-- opened before the restart will be broken. In that case it's better to close +-- those connections so that 'takeResource' won't take a broken connection from +-- the pool but will open a new connection instead. +-- +-- Another use-case for this function is that when you know you are done with +-- the pool you can destroy all idle resources immediately instead of waiting on +-- the garbage collector to destroy them, thus freeing up those resources +-- sooner. +destroyAllResources :: Pool a -> IO () +destroyAllResources pool = do + cleanLocalPools (const True) (freeResource pool) (localPools pool) + +---------------------------------------- +-- Introspection + +-- | A resource taken from the pool along with additional information. +data Resource a = Resource + { resource :: a + , acquisitionTime :: Double + , acquisitionMethod :: AcquisitionMethod + , availableResources :: Int + } + +-- | Method of acquiring a resource from the pool. +data AcquisitionMethod + = Created + -- ^ A new resource was created. + | Taken + -- ^ An existing resource was directly taken from the pool. + | WaitedFor + -- ^ The thread had to wait until a resource was released. + deriving (Eq, Show) + +---------------------------------------- +-- Helpers + +-- | Get a capability-local pool. +getLocalPool :: SmallArray (LocalPool a) -> IO (LocalPool a) +getLocalPool pools = do + (cid, _) <- threadCapability =<< myThreadId + pure $ pools `indexSmallArray` (cid `rem` sizeofSmallArray pools) + +-- | Wait for the resource to be put into a given 'MVar'. +waitForResource :: MVar (Stripe a) -> MVar a -> IO a +waitForResource mstripe q = takeMVar q `onException` cleanup + where + cleanup = uninterruptibleMask_ $ do -- Note [signal uninterruptible] + stripe <- takeMVar mstripe + newStripe <- tryTakeMVar q >>= \case + Just a -> do + -- Between entering the exception handler and taking ownership of + -- the stripe we got the resource we wanted. We don't need it + -- anymore though, so pass it to someone else. + signal stripe a + Nothing -> do + -- If we're still waiting, fill up the MVar with an undefined value + -- so that 'signal' can discard our MVar from the queue. + putMVar q $ error "unreachable" + pure stripe + putMVar mstripe newStripe + +-- | If an exception is received while a resource is created, restore the +-- original size of the stripe. +restoreSize :: MVar (Stripe a) -> IO () +restoreSize mstripe = uninterruptibleMask_ $ do + -- 'uninterruptibleMask_' is used since 'takeMVar' might block. + stripe <- takeMVar mstripe + putMVar mstripe $! stripe { available = available stripe + 1 } + +-- | Free resource entries in the stripes that fulfil a given condition. +cleanLocalPools + :: (Entry a -> Bool) + -> (a -> IO ()) + -> SmallArray (LocalPool a) + -> IO () +cleanLocalPools isStale free pools = do + mask $ \unmask -> forM_ pools $ \(LocalPool mstripe) -> do + -- Asynchronous exceptions need to be masked here to prevent leaking of + -- 'stale' resources before they're freed. + stale <- modifyMVar mstripe $ \stripe -> unmask $ do + let (stale, fresh) = L.partition isStale (cache stripe) + -- There's no need to update 'size' here because it only tracks the + -- number of resources taken from the pool. + newStripe = stripe { cache = fresh } + newStripe `seq` pure (newStripe, map entry stale) + -- We need to ignore exceptions in the 'free' function, otherwise if an + -- exception is thrown half-way, we leak the rest of the resources. Also, + -- asynchronous exceptions need to be hard masked here since freeing a + -- resource might in theory block. + uninterruptibleMask_ . forM_ stale $ try @SomeException . free + +-- Note [signal uninterruptible] +-- +-- If we have +-- +-- bracket takeResource putResource (...) +-- +-- and an exception arrives at the putResource, then we must not lose the +-- resource. The putResource is masked by bracket, but taking the MVar might +-- block, and so it would be interruptible. Hence we need an uninterruptible +-- variant of mask here. +signal :: Stripe a -> a -> IO (Stripe a) +signal stripe a = if available stripe == 0 + then loop (queue stripe) (queueR stripe) + else do + now <- getMonotonicTime + pure + $! stripe { available = available stripe + 1, cache = Entry a now : cache stripe } + where + loop [] [] = do + now <- getMonotonicTime + pure $! stripe { available = 1 + , cache = [Entry a now] + , queue = [] + , queueR = [] + } + loop [] qR = loop (reverse qR) [] + loop (q : qs) qR = tryPutMVar q a >>= \case + -- This fails when 'waitForResource' went into the exception handler and + -- filled the MVar (with an undefined value) itself. In such case we + -- simply ignore it. + False -> loop qs qR + True -> pure $! stripe { available = 0 + , queue = qs, queueR = qR + } diff --git a/Data/Pool/Introspection.hs b/Data/Pool/Introspection.hs new file mode 100644 index 0000000..92bdf99 --- /dev/null +++ b/Data/Pool/Introspection.hs @@ -0,0 +1,56 @@ +-- | A variant of "Data.Pool" with introspection capabilities. +module Data.Pool.Introspection + ( -- * Pool + Pool + , LocalPool + , newPool + + -- * Resource management + , Resource(..) + , AcquisitionMethod(..) + , withResource + , takeResource + , putResource + , destroyResource + , destroyAllResources + ) where + +import Control.Concurrent +import Control.Exception +import GHC.Clock + +import Data.Pool.Internal + +-- | 'Data.Pool.withResource' with introspection capabilities. +withResource :: Pool a -> (Resource a -> IO r) -> IO r +withResource pool act = mask $ \unmask -> do + (res, localPool) <- takeResource pool + r <- unmask (act res) `onException` destroyResource pool localPool (resource res) + putResource localPool (resource res) + pure r + +-- | 'Data.Pool.takeResource' with introspection capabilities. +takeResource :: Pool a -> IO (Resource a, LocalPool a) +takeResource pool = mask_ $ do + t1 <- getMonotonicTime + LocalPool mstripe <- getLocalPool (localPools pool) + stripe <- takeMVar mstripe + if available stripe == 0 + then do + q <- newEmptyMVar + putMVar mstripe $! stripe { queueR = q : queueR stripe } + a <- waitForResource mstripe q + t2 <- getMonotonicTime + pure (Resource a (t2 - t1) WaitedFor 0, LocalPool mstripe) + else case cache stripe of + [] -> do + let newAvailable = available stripe - 1 + putMVar mstripe $! stripe { available = newAvailable } + a <- createResource pool `onException` restoreSize mstripe + t2 <- getMonotonicTime + pure (Resource a (t2 - t1) Created newAvailable, LocalPool mstripe) + Entry a _ : as -> do + let newAvailable = available stripe - 1 + putMVar mstripe $! stripe { available = newAvailable, cache = as } + t2 <- getMonotonicTime + pure (Resource a (t2 - t1) Taken newAvailable, LocalPool mstripe) diff --git a/README.markdown b/README.markdown deleted file mode 100644 index f30471c..0000000 --- a/README.markdown +++ /dev/null @@ -1,28 +0,0 @@ -# Welcome to pool - -pool is a fast Haskell library for managing medium-lifetime pooled -resources, such as database connections. - -# Join in! - -We are happy to receive bug reports, fixes, documentation enhancements, -and other improvements. - -Please report bugs via the -[github issue tracker](http://github.com/bos/pool/issues). - -Master [git repository](http://github.com/bos/pool): - -* `git clone git://github.com/bos/pool.git` - -There's also a [Mercurial mirror](http://bitbucket.org/bos/pool): - -* `hg clone http://bitbucket.org/bos/pool` - -(You can create and contribute changes using either git or Mercurial.) - -Authors -------- - -This library is written and maintained by Bryan O'Sullivan, -. diff --git a/README.md b/README.md new file mode 100644 index 0000000..ce27184 --- /dev/null +++ b/README.md @@ -0,0 +1,7 @@ +# resource-pool + +[![Build Status](https://github.com/scrive/pool/workflows/Haskell-CI/badge.svg?branch=master)](https://github.com/scrive/pool/actions?query=branch%3Amaster) +[![Hackage](https://img.shields.io/hackage/v/resource-pool.svg)](https://hackage.haskell.org/package/resource-pool) +[![Dependencies](https://img.shields.io/hackage-deps/v/resource-pool.svg)](https://packdeps.haskellers.com/feed?needle=andrzej@rybczak.net) +[![Stackage LTS](https://www.stackage.org/package/resource-pool/badge/lts)](https://www.stackage.org/lts/package/resource-pool) +[![Stackage Nightly](https://www.stackage.org/package/resource-pool/badge/nightly)](https://www.stackage.org/nightly/package/resource-pool) diff --git a/Setup.lhs b/Setup.lhs deleted file mode 100755 index 5bde0de..0000000 --- a/Setup.lhs +++ /dev/null @@ -1,3 +0,0 @@ -#!/usr/bin/env runhaskell -> import Distribution.Simple -> main = defaultMain diff --git a/resource-pool.cabal b/resource-pool.cabal index 7dc197b..e8ebfa8 100644 --- a/resource-pool.cabal +++ b/resource-pool.cabal @@ -1,55 +1,39 @@ +cabal-version: 2.4 +build-type: Simple name: resource-pool -version: 0.2.3.2.1 -synopsis: A high-performance striped resource pooling implementation -description: - A high-performance striped pooling abstraction for managing - flexibly-sized collections of resources such as database - connections. - -homepage: http://github.com/bos/pool -license: BSD3 +version: 0.3.0.0 +license: BSD-3-Clause license-file: LICENSE -author: Bryan O'Sullivan -maintainer: Bryan O'Sullivan , - Bas van Dijk -copyright: Copyright 2011 MailRank, Inc. category: Data, Database, Network -build-type: Simple -extra-source-files: - README.markdown +maintainer: andrzej@rybczak.net +author: Andrzej Rybczak, Bryan O'Sullivan -cabal-version: >=1.8 +synopsis: A high-performance striped resource pooling implementation -flag developer - description: operate in developer mode - default: False - manual: True +description: A high-performance striped pooling abstraction for managing + flexibly-sized collections of resources such as database + connections. -library - exposed-modules: - Data.Pool - - build-depends: - base >= 4.4 && < 5, - hashable, - monad-control >= 0.2.0.1, - transformers, - transformers-base >= 0.4, - stm >= 2.3, - time, - vector >= 0.7 - - if flag(developer) - ghc-options: -Werror - ghc-prof-options: -auto-all - cpp-options: -DASSERTS -DDEBUG - - ghc-options: -Wall +tested-with: GHC ==8.4.4 || ==8.6.5 || ==8.8.4 || ==8.10.7 || ==9.0.2 || ==9.2.2 +bug-reports: https://github.com/scrive/pool/issues source-repository head type: git - location: http://github.com/bos/pool + location: https://github.com/scrive/pool.git -source-repository head - type: mercurial - location: http://bitbucket.org/bos/pool +library + exposed-modules: Data.Pool + Data.Pool.Internal + Data.Pool.Introspection + + build-depends: base >= 4.11 && < 5 + , primitive >= 0.7 + , time + + ghc-options: -Wall -Wcompat + + default-language: Haskell2010 + + default-extensions: LambdaCase + , StrictData + , TypeApplications From 7e9f32d820a36b4d676a4962c2d0c652869bb38c Mon Sep 17 00:00:00 2001 From: Andrzej Rybczak Date: Wed, 9 Mar 2022 04:36:36 +0100 Subject: [PATCH 04/16] Correct implementation of destroyResource --- Data/Pool.hs | 11 ++++++-- Data/Pool/Internal.hs | 57 +++++++++++++++++++++++--------------- Data/Pool/Introspection.hs | 21 ++++++++------ 3 files changed, 56 insertions(+), 33 deletions(-) diff --git a/Data/Pool.hs b/Data/Pool.hs index ef233e2..5f6dd4f 100644 --- a/Data/Pool.hs +++ b/Data/Pool.hs @@ -61,15 +61,20 @@ takeResource pool = mask_ $ do then do q <- newEmptyMVar putMVar mstripe $! stripe { queueR = q : queueR stripe } - a <- waitForResource mstripe q - pure (a, localPool) + waitForResource mstripe q >>= \case + Just a -> pure (a, localPool) + Nothing -> do + a <- createResource pool `onException` restoreSize mstripe + pure (a, localPool) else case cache stripe of [] -> do putMVar mstripe $! stripe { available = available stripe - 1 } a <- createResource pool `onException` restoreSize mstripe pure (a, localPool) Entry a _ : as -> do - putMVar mstripe $! stripe { available = available stripe - 1, cache = as } + putMVar mstripe $! stripe { available = available stripe - 1 + , cache = as + } pure (a, localPool) {-# DEPRECATED createPool "Use newPool instead" #-} diff --git a/Data/Pool/Internal.hs b/Data/Pool/Internal.hs index 871e3bd..2e577ff 100644 --- a/Data/Pool/Internal.hs +++ b/Data/Pool/Internal.hs @@ -33,8 +33,8 @@ newtype LocalPool a = LocalPool (MVar (Stripe a)) data Stripe a = Stripe { available :: Int , cache :: [Entry a] - , queue :: [MVar a] - , queueR :: [MVar a] + , queue :: [MVar (Maybe a)] + , queueR :: [MVar (Maybe a)] } -- | An existing resource currently sitting in a pool. @@ -117,7 +117,8 @@ destroyResource :: Pool a -> LocalPool a -> a -> IO () destroyResource pool (LocalPool mstripe) a = do uninterruptibleMask_ $ do -- Note [signal uninterruptible] stripe <- takeMVar mstripe - putMVar mstripe $! stripe { available = available stripe + 1 } + newStripe <- signal stripe Nothing + putMVar mstripe newStripe void . try @SomeException $ freeResource pool a -- | Return a resource to the given 'LocalPool'. @@ -125,7 +126,7 @@ putResource :: LocalPool a -> a -> IO () putResource (LocalPool mstripe) a = do uninterruptibleMask_ $ do -- Note [signal uninterruptible] stripe <- takeMVar mstripe - newStripe <- signal stripe a + newStripe <- signal stripe (Just a) putMVar mstripe newStripe -- | Destroy all resources in all stripes in the pool. @@ -163,8 +164,10 @@ data AcquisitionMethod -- ^ A new resource was created. | Taken -- ^ An existing resource was directly taken from the pool. - | WaitedFor - -- ^ The thread had to wait until a resource was released. + | WaitedThen AcquisitionMethod + -- ^ The thread had to wait until a resource was released. The inner method + -- signifies whether the resource was returned to the pool via 'putResource' + -- ('Taken') or 'destroyResource' ('Created'). deriving (Eq, Show) ---------------------------------------- @@ -177,17 +180,17 @@ getLocalPool pools = do pure $ pools `indexSmallArray` (cid `rem` sizeofSmallArray pools) -- | Wait for the resource to be put into a given 'MVar'. -waitForResource :: MVar (Stripe a) -> MVar a -> IO a +waitForResource :: MVar (Stripe a) -> MVar (Maybe a) -> IO (Maybe a) waitForResource mstripe q = takeMVar q `onException` cleanup where cleanup = uninterruptibleMask_ $ do -- Note [signal uninterruptible] stripe <- takeMVar mstripe newStripe <- tryTakeMVar q >>= \case - Just a -> do + Just ma -> do -- Between entering the exception handler and taking ownership of -- the stripe we got the resource we wanted. We don't need it -- anymore though, so pass it to someone else. - signal stripe a + signal stripe ma Nothing -> do -- If we're still waiting, fill up the MVar with an undefined value -- so that 'signal' can discard our MVar from the queue. @@ -195,7 +198,7 @@ waitForResource mstripe q = takeMVar q `onException` cleanup pure stripe putMVar mstripe newStripe --- | If an exception is received while a resource is created, restore the +-- | If an exception is received while a resource is being created, restore the -- original size of the stripe. restoreSize :: MVar (Stripe a) -> IO () restoreSize mstripe = uninterruptibleMask_ $ do @@ -215,8 +218,8 @@ cleanLocalPools isStale free pools = do -- 'stale' resources before they're freed. stale <- modifyMVar mstripe $ \stripe -> unmask $ do let (stale, fresh) = L.partition isStale (cache stripe) - -- There's no need to update 'size' here because it only tracks the - -- number of resources taken from the pool. + -- There's no need to update 'available' here because it only tracks + -- the number of resources taken from the pool. newStripe = stripe { cache = fresh } newStripe `seq` pure (newStripe, map entry stale) -- We need to ignore exceptions in the 'free' function, otherwise if an @@ -235,27 +238,37 @@ cleanLocalPools isStale free pools = do -- resource. The putResource is masked by bracket, but taking the MVar might -- block, and so it would be interruptible. Hence we need an uninterruptible -- variant of mask here. -signal :: Stripe a -> a -> IO (Stripe a) -signal stripe a = if available stripe == 0 +signal :: Stripe a -> Maybe a -> IO (Stripe a) +signal stripe ma = if available stripe == 0 then loop (queue stripe) (queueR stripe) else do - now <- getMonotonicTime - pure - $! stripe { available = available stripe + 1, cache = Entry a now : cache stripe } + newCache <- case ma of + Just a -> do + now <- getMonotonicTime + pure $ Entry a now : cache stripe + Nothing -> pure $ cache stripe + pure $! stripe { available = available stripe + 1 + , cache = newCache + } where loop [] [] = do - now <- getMonotonicTime - pure $! stripe { available = 1 - , cache = [Entry a now] + newCache <- case ma of + Just a -> do + now <- getMonotonicTime + pure [Entry a now] + Nothing -> pure [] + pure $! Stripe { available = 1 + , cache = newCache , queue = [] , queueR = [] } loop [] qR = loop (reverse qR) [] - loop (q : qs) qR = tryPutMVar q a >>= \case + loop (q : qs) qR = tryPutMVar q ma >>= \case -- This fails when 'waitForResource' went into the exception handler and -- filled the MVar (with an undefined value) itself. In such case we -- simply ignore it. False -> loop qs qR True -> pure $! stripe { available = 0 - , queue = qs, queueR = qR + , queue = qs + , queueR = qR } diff --git a/Data/Pool/Introspection.hs b/Data/Pool/Introspection.hs index 92bdf99..ff1a277 100644 --- a/Data/Pool/Introspection.hs +++ b/Data/Pool/Introspection.hs @@ -32,25 +32,30 @@ withResource pool act = mask $ \unmask -> do -- | 'Data.Pool.takeResource' with introspection capabilities. takeResource :: Pool a -> IO (Resource a, LocalPool a) takeResource pool = mask_ $ do - t1 <- getMonotonicTime - LocalPool mstripe <- getLocalPool (localPools pool) - stripe <- takeMVar mstripe + t1 <- getMonotonicTime + localPool@(LocalPool mstripe) <- getLocalPool (localPools pool) + stripe <- takeMVar mstripe if available stripe == 0 then do q <- newEmptyMVar putMVar mstripe $! stripe { queueR = q : queueR stripe } - a <- waitForResource mstripe q - t2 <- getMonotonicTime - pure (Resource a (t2 - t1) WaitedFor 0, LocalPool mstripe) + waitForResource mstripe q >>= \case + Just a -> do + t2 <- getMonotonicTime + pure (Resource a (t2 - t1) (WaitedThen Taken) 0, localPool) + Nothing -> do + a <- createResource pool `onException` restoreSize mstripe + t2 <- getMonotonicTime + pure (Resource a (t2 - t1) (WaitedThen Created) 0, localPool) else case cache stripe of [] -> do let newAvailable = available stripe - 1 putMVar mstripe $! stripe { available = newAvailable } a <- createResource pool `onException` restoreSize mstripe t2 <- getMonotonicTime - pure (Resource a (t2 - t1) Created newAvailable, LocalPool mstripe) + pure (Resource a (t2 - t1) Created newAvailable, localPool) Entry a _ : as -> do let newAvailable = available stripe - 1 putMVar mstripe $! stripe { available = newAvailable, cache = as } t2 <- getMonotonicTime - pure (Resource a (t2 - t1) Taken newAvailable, LocalPool mstripe) + pure (Resource a (t2 - t1) Taken newAvailable, localPool) From 70f4a86c7faaa4c6303f0f5bfc263429c5182f1a Mon Sep 17 00:00:00 2001 From: Andrzej Rybczak Date: Wed, 9 Mar 2022 18:55:38 +0100 Subject: [PATCH 05/16] Use a monomorphic list for the queue --- Data/Pool.hs | 2 +- Data/Pool/Internal.hs | 50 +++++++++++++++++++++++--------------- Data/Pool/Introspection.hs | 2 +- resource-pool.cabal | 1 - 4 files changed, 33 insertions(+), 22 deletions(-) diff --git a/Data/Pool.hs b/Data/Pool.hs index 5f6dd4f..6cc47a3 100644 --- a/Data/Pool.hs +++ b/Data/Pool.hs @@ -60,7 +60,7 @@ takeResource pool = mask_ $ do if available stripe == 0 then do q <- newEmptyMVar - putMVar mstripe $! stripe { queueR = q : queueR stripe } + putMVar mstripe $! stripe { queueR = Queue q (queueR stripe) } waitForResource mstripe q >>= \case Just a -> pure (a, localPool) Nothing -> do diff --git a/Data/Pool/Internal.hs b/Data/Pool/Internal.hs index 2e577ff..de76ef5 100644 --- a/Data/Pool/Internal.hs +++ b/Data/Pool/Internal.hs @@ -19,10 +19,10 @@ import qualified Data.List as L -- so that they never compete over access to the same stripe. This results in a -- very good performance in a multi-threaded environment. data Pool a = Pool - { createResource :: IO a - , freeResource :: a -> IO () - , localPools :: SmallArray (LocalPool a) - , reaperRef :: IORef () + { createResource :: !(IO a) + , freeResource :: !(a -> IO ()) + , localPools :: !(SmallArray (LocalPool a)) + , reaperRef :: !(IORef ()) } -- | A single, capability-local pool. @@ -31,18 +31,23 @@ newtype LocalPool a = LocalPool (MVar (Stripe a)) -- | Stripe of a resource pool. If @available@ is 0, the list of threads waiting -- for a resource (each with an associated 'MVar') is @queue ++ reverse queueR@. data Stripe a = Stripe - { available :: Int - , cache :: [Entry a] - , queue :: [MVar (Maybe a)] - , queueR :: [MVar (Maybe a)] + { available :: !Int + , cache :: ![Entry a] + , queue :: !(Queue a) + , queueR :: !(Queue a) } -- | An existing resource currently sitting in a pool. data Entry a = Entry { entry :: a - , lastUsed :: Double + , lastUsed :: !Double } +-- | A queue of MVarS corresponding to threads waiting for resources. +-- +-- Basically a monomorphic list to save two pointer indirections. +data Queue a = Queue !(MVar (Maybe a)) (Queue a) | Empty + -- | Create a new striped resource pool. -- -- The number of stripes is equal to the number of capabilities @N@. @@ -83,8 +88,8 @@ newPool create free idleTime maxResources = do . newMVar $ Stripe { available = maxResources `quotCeil` numStripes , cache = [] - , queue = [] - , queueR = [] + , queue = Empty + , queueR = Empty } mask_ $ do ref <- newIORef () @@ -153,9 +158,9 @@ destroyAllResources pool = do -- | A resource taken from the pool along with additional information. data Resource a = Resource { resource :: a - , acquisitionTime :: Double - , acquisitionMethod :: AcquisitionMethod - , availableResources :: Int + , acquisitionTime :: !Double + , acquisitionMethod :: !AcquisitionMethod + , availableResources :: !Int } -- | Method of acquiring a resource from the pool. @@ -251,7 +256,7 @@ signal stripe ma = if available stripe == 0 , cache = newCache } where - loop [] [] = do + loop Empty Empty = do newCache <- case ma of Just a -> do now <- getMonotonicTime @@ -259,11 +264,11 @@ signal stripe ma = if available stripe == 0 Nothing -> pure [] pure $! Stripe { available = 1 , cache = newCache - , queue = [] - , queueR = [] + , queue = Empty + , queueR = Empty } - loop [] qR = loop (reverse qR) [] - loop (q : qs) qR = tryPutMVar q ma >>= \case + loop Empty qR = loop (reverseQueue qR) Empty + loop (Queue q qs) qR = tryPutMVar q ma >>= \case -- This fails when 'waitForResource' went into the exception handler and -- filled the MVar (with an undefined value) itself. In such case we -- simply ignore it. @@ -272,3 +277,10 @@ signal stripe ma = if available stripe == 0 , queue = qs , queueR = qR } + +reverseQueue :: Queue a -> Queue a +reverseQueue = go Empty + where + go acc = \case + Empty -> acc + Queue x xs -> go (Queue x acc) xs diff --git a/Data/Pool/Introspection.hs b/Data/Pool/Introspection.hs index ff1a277..269e9e0 100644 --- a/Data/Pool/Introspection.hs +++ b/Data/Pool/Introspection.hs @@ -38,7 +38,7 @@ takeResource pool = mask_ $ do if available stripe == 0 then do q <- newEmptyMVar - putMVar mstripe $! stripe { queueR = q : queueR stripe } + putMVar mstripe $! stripe { queueR = Queue q (queueR stripe) } waitForResource mstripe q >>= \case Just a -> do t2 <- getMonotonicTime diff --git a/resource-pool.cabal b/resource-pool.cabal index e8ebfa8..91e7d2c 100644 --- a/resource-pool.cabal +++ b/resource-pool.cabal @@ -35,5 +35,4 @@ library default-language: Haskell2010 default-extensions: LambdaCase - , StrictData , TypeApplications From 4c3a9cedecb6fe167969a97dfcb283a321fcc8bf Mon Sep 17 00:00:00 2001 From: Andrzej Rybczak Date: Wed, 9 Mar 2022 19:38:26 +0100 Subject: [PATCH 06/16] Move introspection related types to Data.Pool.Introspection --- Data/Pool/Internal.hs | 23 ----------------------- Data/Pool/Introspection.hs | 21 +++++++++++++++++++++ resource-pool.cabal | 3 ++- 3 files changed, 23 insertions(+), 24 deletions(-) diff --git a/Data/Pool/Internal.hs b/Data/Pool/Internal.hs index de76ef5..bcf6a8b 100644 --- a/Data/Pool/Internal.hs +++ b/Data/Pool/Internal.hs @@ -152,29 +152,6 @@ destroyAllResources :: Pool a -> IO () destroyAllResources pool = do cleanLocalPools (const True) (freeResource pool) (localPools pool) ----------------------------------------- --- Introspection - --- | A resource taken from the pool along with additional information. -data Resource a = Resource - { resource :: a - , acquisitionTime :: !Double - , acquisitionMethod :: !AcquisitionMethod - , availableResources :: !Int - } - --- | Method of acquiring a resource from the pool. -data AcquisitionMethod - = Created - -- ^ A new resource was created. - | Taken - -- ^ An existing resource was directly taken from the pool. - | WaitedThen AcquisitionMethod - -- ^ The thread had to wait until a resource was released. The inner method - -- signifies whether the resource was returned to the pool via 'putResource' - -- ('Taken') or 'destroyResource' ('Created'). - deriving (Eq, Show) - ---------------------------------------- -- Helpers diff --git a/Data/Pool/Introspection.hs b/Data/Pool/Introspection.hs index 269e9e0..2a12047 100644 --- a/Data/Pool/Introspection.hs +++ b/Data/Pool/Introspection.hs @@ -18,9 +18,30 @@ module Data.Pool.Introspection import Control.Concurrent import Control.Exception import GHC.Clock +import GHC.Generics (Generic) import Data.Pool.Internal +-- | A resource taken from the pool along with additional information. +data Resource a = Resource + { resource :: a + , acquisitionTime :: !Double + , acquisitionMethod :: !AcquisitionMethod + , availableResources :: !Int + } deriving (Eq, Show, Generic) + +-- | Method of acquiring a resource from the pool. +data AcquisitionMethod + = Created + -- ^ A new resource was created. + | Taken + -- ^ An existing resource was directly taken from the pool. + | WaitedThen !AcquisitionMethod + -- ^ The thread had to wait until a resource was released. The inner method + -- signifies whether the resource was returned to the pool via 'putResource' + -- ('Taken') or 'destroyResource' ('Created'). + deriving (Eq, Show, Generic) + -- | 'Data.Pool.withResource' with introspection capabilities. withResource :: Pool a -> (Resource a -> IO r) -> IO r withResource pool act = mask $ \unmask -> do diff --git a/resource-pool.cabal b/resource-pool.cabal index 91e7d2c..c07c780 100644 --- a/resource-pool.cabal +++ b/resource-pool.cabal @@ -34,5 +34,6 @@ library default-language: Haskell2010 - default-extensions: LambdaCase + default-extensions: DeriveGeneric + , LambdaCase , TypeApplications From b0cfae3591106e93752a8712a8ec8bc397dba820 Mon Sep 17 00:00:00 2001 From: Andrzej Rybczak Date: Thu, 17 Mar 2022 09:40:16 +0100 Subject: [PATCH 07/16] Include stripe number in the info about acquired resources (#3) --- Data/Pool.hs | 2 +- Data/Pool/Internal.hs | 27 +++++++++++++-------------- Data/Pool/Introspection.hs | 11 ++++++----- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/Data/Pool.hs b/Data/Pool.hs index 6cc47a3..504fe91 100644 --- a/Data/Pool.hs +++ b/Data/Pool.hs @@ -55,7 +55,7 @@ withResource pool act = mask $ \unmask -> do -- to the pool (via 'putResource'). takeResource :: Pool a -> IO (a, LocalPool a) takeResource pool = mask_ $ do - localPool@(LocalPool mstripe) <- getLocalPool (localPools pool) + localPool@(LocalPool _ mstripe) <- getLocalPool (localPools pool) stripe <- takeMVar mstripe if available stripe == 0 then do diff --git a/Data/Pool/Internal.hs b/Data/Pool/Internal.hs index bcf6a8b..ea79580 100644 --- a/Data/Pool/Internal.hs +++ b/Data/Pool/Internal.hs @@ -26,7 +26,7 @@ data Pool a = Pool } -- | A single, capability-local pool. -newtype LocalPool a = LocalPool (MVar (Stripe a)) +data LocalPool a = LocalPool !Int !(MVar (Stripe a)) -- | Stripe of a resource pool. If @available@ is 0, the list of threads waiting -- for a resource (each with an associated 'MVar') is @queue ++ reverse queueR@. @@ -81,16 +81,15 @@ newPool create free idleTime maxResources = do when (maxResources < 1) $ do error "maxResources must be at least 1" numStripes <- getNumCapabilities - pools <- - fmap (smallArrayFromListN numStripes) - . replicateM numStripes - . fmap LocalPool - . newMVar - $ Stripe { available = maxResources `quotCeil` numStripes - , cache = [] - , queue = Empty - , queueR = Empty - } + when (numStripes < 1) $ do + error "numStripes must be at least 1" + pools <- fmap (smallArrayFromListN numStripes) . forM [1..numStripes] $ \n -> do + LocalPool n <$> newMVar Stripe + { available = maxResources `quotCeil` numStripes + , cache = [] + , queue = Empty + , queueR = Empty + } mask_ $ do ref <- newIORef () collectorA <- forkIOWithUnmask $ \unmask -> unmask $ collector pools @@ -119,7 +118,7 @@ newPool create free idleTime maxResources = do -- -- Note that this will ignore any exceptions in the destroy function. destroyResource :: Pool a -> LocalPool a -> a -> IO () -destroyResource pool (LocalPool mstripe) a = do +destroyResource pool (LocalPool _ mstripe) a = do uninterruptibleMask_ $ do -- Note [signal uninterruptible] stripe <- takeMVar mstripe newStripe <- signal stripe Nothing @@ -128,7 +127,7 @@ destroyResource pool (LocalPool mstripe) a = do -- | Return a resource to the given 'LocalPool'. putResource :: LocalPool a -> a -> IO () -putResource (LocalPool mstripe) a = do +putResource (LocalPool _ mstripe) a = do uninterruptibleMask_ $ do -- Note [signal uninterruptible] stripe <- takeMVar mstripe newStripe <- signal stripe (Just a) @@ -195,7 +194,7 @@ cleanLocalPools -> SmallArray (LocalPool a) -> IO () cleanLocalPools isStale free pools = do - mask $ \unmask -> forM_ pools $ \(LocalPool mstripe) -> do + mask $ \unmask -> forM_ pools $ \(LocalPool _ mstripe) -> do -- Asynchronous exceptions need to be masked here to prevent leaking of -- 'stale' resources before they're freed. stale <- modifyMVar mstripe $ \stripe -> unmask $ do diff --git a/Data/Pool/Introspection.hs b/Data/Pool/Introspection.hs index 2a12047..920db49 100644 --- a/Data/Pool/Introspection.hs +++ b/Data/Pool/Introspection.hs @@ -25,6 +25,7 @@ import Data.Pool.Internal -- | A resource taken from the pool along with additional information. data Resource a = Resource { resource :: a + , stripeNumber :: !Int , acquisitionTime :: !Double , acquisitionMethod :: !AcquisitionMethod , availableResources :: !Int @@ -54,7 +55,7 @@ withResource pool act = mask $ \unmask -> do takeResource :: Pool a -> IO (Resource a, LocalPool a) takeResource pool = mask_ $ do t1 <- getMonotonicTime - localPool@(LocalPool mstripe) <- getLocalPool (localPools pool) + localPool@(LocalPool n mstripe) <- getLocalPool (localPools pool) stripe <- takeMVar mstripe if available stripe == 0 then do @@ -63,20 +64,20 @@ takeResource pool = mask_ $ do waitForResource mstripe q >>= \case Just a -> do t2 <- getMonotonicTime - pure (Resource a (t2 - t1) (WaitedThen Taken) 0, localPool) + pure (Resource a n (t2 - t1) (WaitedThen Taken) 0, localPool) Nothing -> do a <- createResource pool `onException` restoreSize mstripe t2 <- getMonotonicTime - pure (Resource a (t2 - t1) (WaitedThen Created) 0, localPool) + pure (Resource a n (t2 - t1) (WaitedThen Created) 0, localPool) else case cache stripe of [] -> do let newAvailable = available stripe - 1 putMVar mstripe $! stripe { available = newAvailable } a <- createResource pool `onException` restoreSize mstripe t2 <- getMonotonicTime - pure (Resource a (t2 - t1) Created newAvailable, localPool) + pure (Resource a n (t2 - t1) Created newAvailable, localPool) Entry a _ : as -> do let newAvailable = available stripe - 1 putMVar mstripe $! stripe { available = newAvailable, cache = as } t2 <- getMonotonicTime - pure (Resource a (t2 - t1) Taken newAvailable, localPool) + pure (Resource a n (t2 - t1) Taken newAvailable, localPool) From 6393c9d86e889e4df61007a0886b4c0954bde552 Mon Sep 17 00:00:00 2001 From: Andrzej Rybczak Date: Thu, 26 May 2022 13:17:48 +0200 Subject: [PATCH 08/16] More robust release of existing resources on pool destruction --- Data/Pool.hs | 29 ++++++++-------- Data/Pool/Internal.hs | 69 ++++++++++++++++++++++---------------- Data/Pool/Introspection.hs | 24 ++++++------- resource-pool.cabal | 1 + 4 files changed, 68 insertions(+), 55 deletions(-) diff --git a/Data/Pool.hs b/Data/Pool.hs index 504fe91..b012f57 100644 --- a/Data/Pool.hs +++ b/Data/Pool.hs @@ -55,27 +55,28 @@ withResource pool act = mask $ \unmask -> do -- to the pool (via 'putResource'). takeResource :: Pool a -> IO (a, LocalPool a) takeResource pool = mask_ $ do - localPool@(LocalPool _ mstripe) <- getLocalPool (localPools pool) - stripe <- takeMVar mstripe + lp <- getLocalPool (localPools pool) + stripe <- takeMVar (stripeVar lp) if available stripe == 0 then do q <- newEmptyMVar - putMVar mstripe $! stripe { queueR = Queue q (queueR stripe) } - waitForResource mstripe q >>= \case - Just a -> pure (a, localPool) + putMVar (stripeVar lp) $! stripe { queueR = Queue q (queueR stripe) } + waitForResource (stripeVar lp) q >>= \case + Just a -> pure (a, lp) Nothing -> do - a <- createResource pool `onException` restoreSize mstripe - pure (a, localPool) + a <- createResource pool `onException` restoreSize (stripeVar lp) + pure (a, lp) else case cache stripe of [] -> do - putMVar mstripe $! stripe { available = available stripe - 1 } - a <- createResource pool `onException` restoreSize mstripe - pure (a, localPool) + putMVar (stripeVar lp) $! stripe { available = available stripe - 1 } + a <- createResource pool `onException` restoreSize (stripeVar lp) + pure (a, lp) Entry a _ : as -> do - putMVar mstripe $! stripe { available = available stripe - 1 - , cache = as - } - pure (a, localPool) + putMVar (stripeVar lp) $! stripe + { available = available stripe - 1 + , cache = as + } + pure (a, lp) {-# DEPRECATED createPool "Use newPool instead" #-} -- | Provided for compatibility with @resource-pool < 0.3@. diff --git a/Data/Pool/Internal.hs b/Data/Pool/Internal.hs index ea79580..ebe0663 100644 --- a/Data/Pool/Internal.hs +++ b/Data/Pool/Internal.hs @@ -26,7 +26,11 @@ data Pool a = Pool } -- | A single, capability-local pool. -data LocalPool a = LocalPool !Int !(MVar (Stripe a)) +data LocalPool a = LocalPool + { stripeId :: !Int + , stripeVar :: !(MVar (Stripe a)) + , cleanerRef :: !(IORef ()) + } -- | Stripe of a resource pool. If @available@ is 0, the list of threads waiting -- for a resource (each with an associated 'MVar') is @queue ++ reverse queueR@. @@ -84,18 +88,26 @@ newPool create free idleTime maxResources = do when (numStripes < 1) $ do error "numStripes must be at least 1" pools <- fmap (smallArrayFromListN numStripes) . forM [1..numStripes] $ \n -> do - LocalPool n <$> newMVar Stripe + ref <- newIORef () + stripe <- newMVar Stripe { available = maxResources `quotCeil` numStripes , cache = [] , queue = Empty , queueR = Empty } + -- When the local pool goes out of scope, free its resources. + void . mkWeakIORef ref $ cleanStripe (const True) free stripe + pure LocalPool { stripeId = n + , stripeVar = stripe + , cleanerRef = ref + } mask_ $ do ref <- newIORef () collectorA <- forkIOWithUnmask $ \unmask -> unmask $ collector pools void . mkWeakIORef ref $ do + -- When the pool goes out of scope, stop the collector. Resources existing + -- in stripes will be taken care by their cleaners. killThread collectorA - cleanLocalPools (const True) free pools pure Pool { createResource = create , freeResource = free , localPools = pools @@ -112,26 +124,26 @@ newPool create free idleTime maxResources = do threadDelay 1000000 now <- getMonotonicTime let isStale e = now - lastUsed e > idleTime - cleanLocalPools isStale free pools + mapM_ (cleanStripe isStale free . stripeVar) pools -- | Destroy a resource. -- -- Note that this will ignore any exceptions in the destroy function. destroyResource :: Pool a -> LocalPool a -> a -> IO () -destroyResource pool (LocalPool _ mstripe) a = do +destroyResource pool lp a = do uninterruptibleMask_ $ do -- Note [signal uninterruptible] - stripe <- takeMVar mstripe + stripe <- takeMVar (stripeVar lp) newStripe <- signal stripe Nothing - putMVar mstripe newStripe + putMVar (stripeVar lp) newStripe void . try @SomeException $ freeResource pool a -- | Return a resource to the given 'LocalPool'. putResource :: LocalPool a -> a -> IO () -putResource (LocalPool _ mstripe) a = do +putResource lp a = do uninterruptibleMask_ $ do -- Note [signal uninterruptible] - stripe <- takeMVar mstripe + stripe <- takeMVar (stripeVar lp) newStripe <- signal stripe (Just a) - putMVar mstripe newStripe + putMVar (stripeVar lp) newStripe -- | Destroy all resources in all stripes in the pool. -- @@ -148,8 +160,8 @@ putResource (LocalPool _ mstripe) a = do -- the garbage collector to destroy them, thus freeing up those resources -- sooner. destroyAllResources :: Pool a -> IO () -destroyAllResources pool = do - cleanLocalPools (const True) (freeResource pool) (localPools pool) +destroyAllResources pool = forM_ (localPools pool) $ \lp -> do + cleanStripe (const True) (freeResource pool) (stripeVar lp) ---------------------------------------- -- Helpers @@ -188,26 +200,25 @@ restoreSize mstripe = uninterruptibleMask_ $ do putMVar mstripe $! stripe { available = available stripe + 1 } -- | Free resource entries in the stripes that fulfil a given condition. -cleanLocalPools +cleanStripe :: (Entry a -> Bool) -> (a -> IO ()) - -> SmallArray (LocalPool a) + -> MVar (Stripe a) -> IO () -cleanLocalPools isStale free pools = do - mask $ \unmask -> forM_ pools $ \(LocalPool _ mstripe) -> do - -- Asynchronous exceptions need to be masked here to prevent leaking of - -- 'stale' resources before they're freed. - stale <- modifyMVar mstripe $ \stripe -> unmask $ do - let (stale, fresh) = L.partition isStale (cache stripe) - -- There's no need to update 'available' here because it only tracks - -- the number of resources taken from the pool. - newStripe = stripe { cache = fresh } - newStripe `seq` pure (newStripe, map entry stale) - -- We need to ignore exceptions in the 'free' function, otherwise if an - -- exception is thrown half-way, we leak the rest of the resources. Also, - -- asynchronous exceptions need to be hard masked here since freeing a - -- resource might in theory block. - uninterruptibleMask_ . forM_ stale $ try @SomeException . free +cleanStripe isStale free mstripe = mask $ \unmask -> do + -- Asynchronous exceptions need to be masked here to prevent leaking of + -- 'stale' resources before they're freed. + stale <- modifyMVar mstripe $ \stripe -> unmask $ do + let (stale, fresh) = L.partition isStale (cache stripe) + -- There's no need to update 'available' here because it only tracks + -- the number of resources taken from the pool. + newStripe = stripe { cache = fresh } + newStripe `seq` pure (newStripe, map entry stale) + -- We need to ignore exceptions in the 'free' function, otherwise if an + -- exception is thrown half-way, we leak the rest of the resources. Also, + -- asynchronous exceptions need to be hard masked here since freeing a + -- resource might in theory block. + uninterruptibleMask_ . forM_ stale $ try @SomeException . free -- Note [signal uninterruptible] -- diff --git a/Data/Pool/Introspection.hs b/Data/Pool/Introspection.hs index 920db49..b5c91c3 100644 --- a/Data/Pool/Introspection.hs +++ b/Data/Pool/Introspection.hs @@ -55,29 +55,29 @@ withResource pool act = mask $ \unmask -> do takeResource :: Pool a -> IO (Resource a, LocalPool a) takeResource pool = mask_ $ do t1 <- getMonotonicTime - localPool@(LocalPool n mstripe) <- getLocalPool (localPools pool) - stripe <- takeMVar mstripe + lp <- getLocalPool (localPools pool) + stripe <- takeMVar (stripeVar lp) if available stripe == 0 then do q <- newEmptyMVar - putMVar mstripe $! stripe { queueR = Queue q (queueR stripe) } - waitForResource mstripe q >>= \case + putMVar (stripeVar lp) $! stripe { queueR = Queue q (queueR stripe) } + waitForResource (stripeVar lp) q >>= \case Just a -> do t2 <- getMonotonicTime - pure (Resource a n (t2 - t1) (WaitedThen Taken) 0, localPool) + pure (Resource a (stripeId lp) (t2 - t1) (WaitedThen Taken) 0, lp) Nothing -> do - a <- createResource pool `onException` restoreSize mstripe + a <- createResource pool `onException` restoreSize (stripeVar lp) t2 <- getMonotonicTime - pure (Resource a n (t2 - t1) (WaitedThen Created) 0, localPool) + pure (Resource a (stripeId lp) (t2 - t1) (WaitedThen Created) 0, lp) else case cache stripe of [] -> do let newAvailable = available stripe - 1 - putMVar mstripe $! stripe { available = newAvailable } - a <- createResource pool `onException` restoreSize mstripe + putMVar (stripeVar lp) $! stripe { available = newAvailable } + a <- createResource pool `onException` restoreSize (stripeVar lp) t2 <- getMonotonicTime - pure (Resource a n (t2 - t1) Created newAvailable, localPool) + pure (Resource a (stripeId lp) (t2 - t1) Created newAvailable, lp) Entry a _ : as -> do let newAvailable = available stripe - 1 - putMVar mstripe $! stripe { available = newAvailable, cache = as } + putMVar (stripeVar lp) $! stripe { available = newAvailable, cache = as } t2 <- getMonotonicTime - pure (Resource a n (t2 - t1) Taken newAvailable, localPool) + pure (Resource a (stripeId lp) (t2 - t1) Taken newAvailable, lp) diff --git a/resource-pool.cabal b/resource-pool.cabal index c07c780..ca07867 100644 --- a/resource-pool.cabal +++ b/resource-pool.cabal @@ -36,4 +36,5 @@ library default-extensions: DeriveGeneric , LambdaCase + , RankNTypes , TypeApplications From 48b7b7d28d1b7fb1568be837ec3f3fc4be5424ab Mon Sep 17 00:00:00 2001 From: Andrzej Rybczak Date: Tue, 31 May 2022 14:53:15 +0200 Subject: [PATCH 09/16] Add PoolConfig (#4) * Add PoolConfig * Rename poolCapacity to poolMaxResources * Improve doc --- Data/Pool.hs | 15 ++++--- Data/Pool/Internal.hs | 80 +++++++++++++++++++------------------- Data/Pool/Introspection.hs | 7 ++-- 3 files changed, 55 insertions(+), 47 deletions(-) diff --git a/Data/Pool.hs b/Data/Pool.hs index b012f57..339007d 100644 --- a/Data/Pool.hs +++ b/Data/Pool.hs @@ -2,7 +2,8 @@ -- collections of resources such as database connections. module Data.Pool ( -- * Pool - Pool + PoolConfig(..) + , Pool , LocalPool , newPool @@ -64,12 +65,12 @@ takeResource pool = mask_ $ do waitForResource (stripeVar lp) q >>= \case Just a -> pure (a, lp) Nothing -> do - a <- createResource pool `onException` restoreSize (stripeVar lp) + a <- createResource (poolConfig pool) `onException` restoreSize (stripeVar lp) pure (a, lp) else case cache stripe of [] -> do putMVar (stripeVar lp) $! stripe { available = available stripe - 1 } - a <- createResource pool `onException` restoreSize (stripeVar lp) + a <- createResource (poolConfig pool) `onException` restoreSize (stripeVar lp) pure (a, lp) Entry a _ : as -> do putMVar (stripeVar lp) $! stripe @@ -83,5 +84,9 @@ takeResource pool = mask_ $ do -- -- Use 'newPool' instead. createPool :: IO a -> (a -> IO ()) -> Int -> NominalDiffTime -> Int -> IO (Pool a) -createPool create free numStripes idleTime maxResources = do - newPool create free (realToFrac idleTime) (numStripes * maxResources) +createPool create free numStripes idleTime maxResources = newPool PoolConfig + { createResource = create + , freeResource = free + , poolCacheTTL = realToFrac idleTime + , poolMaxResources = numStripes * maxResources + } diff --git a/Data/Pool/Internal.hs b/Data/Pool/Internal.hs index ebe0663..e12e8f2 100644 --- a/Data/Pool/Internal.hs +++ b/Data/Pool/Internal.hs @@ -19,10 +19,9 @@ import qualified Data.List as L -- so that they never compete over access to the same stripe. This results in a -- very good performance in a multi-threaded environment. data Pool a = Pool - { createResource :: !(IO a) - , freeResource :: !(a -> IO ()) - , localPools :: !(SmallArray (LocalPool a)) - , reaperRef :: !(IORef ()) + { poolConfig :: !(PoolConfig a) + , localPools :: !(SmallArray (LocalPool a)) + , reaperRef :: !(IORef ()) } -- | A single, capability-local pool. @@ -52,51 +51,55 @@ data Entry a = Entry -- Basically a monomorphic list to save two pointer indirections. data Queue a = Queue !(MVar (Maybe a)) (Queue a) | Empty --- | Create a new striped resource pool. --- --- The number of stripes is equal to the number of capabilities @N@. --- --- /Note:/ although the garbage collector will destroy all idle resources when --- the pool is garbage collected, it's recommended to manually call --- 'destroyAllResources' when you're done with the pool so that the resources --- are freed up as soon as possible. -newPool - :: IO a - -- ^ The action that creates a new resource. - -> (a -> IO ()) - -- ^ The action that destroys an existing resource. - -> Double - -- ^ The amount of seconds for which an unused resource is kept open. The +-- | Configuration of a 'Pool'. +data PoolConfig a = PoolConfig + { createResource :: !(IO a) + -- ^ The action that creates a new resource. + , freeResource :: !(a -> IO ()) + -- ^ The action that destroys an existing resource. + , poolCacheTTL :: !Double + -- ^ The amount of seconds for which an unused resource is kept around. The -- smallest acceptable value is @0.5@. -- -- /Note:/ the elapsed time before destroying a resource may be a little -- longer than requested, as the collector thread wakes at 1-second intervals. - -> Int - -- ^ The number of resources to keep open across all stripes. The smallest - -- acceptable value is @1@. + , poolMaxResources :: !Int + -- ^ The maximum number of resources to keep open across all stripes. The + -- smallest acceptable value is @1@. -- -- /Note:/ for each stripe the number of resources is divided by the number of -- capabilities and rounded up. Therefore the pool might end up creating up to - -- @N - 1@ resources more in total than specified. - -> IO (Pool a) -newPool create free idleTime maxResources = do - when (idleTime < 0.5) $ do - error "idleTime must be at least 0.5" - when (maxResources < 1) $ do - error "maxResources must be at least 1" + -- @N - 1@ resources more in total than specified, where @N@ is the number of + -- capabilities. + } + +-- | Create a new striped resource pool. +-- +-- The number of stripes is equal to the number of capabilities. +-- +-- /Note:/ although the runtime system will destroy all idle resources when the +-- pool is garbage collected, it's recommended to manually call +-- 'destroyAllResources' when you're done with the pool so that the resources +-- are freed up as soon as possible. +newPool :: PoolConfig a -> IO (Pool a) +newPool pc = do + when (poolCacheTTL pc < 0.5) $ do + error "poolCacheTTL must be at least 0.5" + when (poolMaxResources pc < 1) $ do + error "poolMaxResources must be at least 1" numStripes <- getNumCapabilities when (numStripes < 1) $ do error "numStripes must be at least 1" pools <- fmap (smallArrayFromListN numStripes) . forM [1..numStripes] $ \n -> do ref <- newIORef () stripe <- newMVar Stripe - { available = maxResources `quotCeil` numStripes + { available = poolMaxResources pc `quotCeil` numStripes , cache = [] , queue = Empty , queueR = Empty } -- When the local pool goes out of scope, free its resources. - void . mkWeakIORef ref $ cleanStripe (const True) free stripe + void . mkWeakIORef ref $ cleanStripe (const True) (freeResource pc) stripe pure LocalPool { stripeId = n , stripeVar = stripe , cleanerRef = ref @@ -108,10 +111,9 @@ newPool create free idleTime maxResources = do -- When the pool goes out of scope, stop the collector. Resources existing -- in stripes will be taken care by their cleaners. killThread collectorA - pure Pool { createResource = create - , freeResource = free - , localPools = pools - , reaperRef = ref + pure Pool { poolConfig = pc + , localPools = pools + , reaperRef = ref } where quotCeil :: Int -> Int -> Int @@ -123,8 +125,8 @@ newPool create free idleTime maxResources = do collector pools = forever $ do threadDelay 1000000 now <- getMonotonicTime - let isStale e = now - lastUsed e > idleTime - mapM_ (cleanStripe isStale free . stripeVar) pools + let isStale e = now - lastUsed e > poolCacheTTL pc + mapM_ (cleanStripe isStale (freeResource pc) . stripeVar) pools -- | Destroy a resource. -- @@ -135,7 +137,7 @@ destroyResource pool lp a = do stripe <- takeMVar (stripeVar lp) newStripe <- signal stripe Nothing putMVar (stripeVar lp) newStripe - void . try @SomeException $ freeResource pool a + void . try @SomeException $ freeResource (poolConfig pool) a -- | Return a resource to the given 'LocalPool'. putResource :: LocalPool a -> a -> IO () @@ -161,7 +163,7 @@ putResource lp a = do -- sooner. destroyAllResources :: Pool a -> IO () destroyAllResources pool = forM_ (localPools pool) $ \lp -> do - cleanStripe (const True) (freeResource pool) (stripeVar lp) + cleanStripe (const True) (freeResource (poolConfig pool)) (stripeVar lp) ---------------------------------------- -- Helpers diff --git a/Data/Pool/Introspection.hs b/Data/Pool/Introspection.hs index b5c91c3..5effc36 100644 --- a/Data/Pool/Introspection.hs +++ b/Data/Pool/Introspection.hs @@ -1,7 +1,8 @@ -- | A variant of "Data.Pool" with introspection capabilities. module Data.Pool.Introspection ( -- * Pool - Pool + PoolConfig(..) + , Pool , LocalPool , newPool @@ -66,14 +67,14 @@ takeResource pool = mask_ $ do t2 <- getMonotonicTime pure (Resource a (stripeId lp) (t2 - t1) (WaitedThen Taken) 0, lp) Nothing -> do - a <- createResource pool `onException` restoreSize (stripeVar lp) + a <- createResource (poolConfig pool) `onException` restoreSize (stripeVar lp) t2 <- getMonotonicTime pure (Resource a (stripeId lp) (t2 - t1) (WaitedThen Created) 0, lp) else case cache stripe of [] -> do let newAvailable = available stripe - 1 putMVar (stripeVar lp) $! stripe { available = newAvailable } - a <- createResource pool `onException` restoreSize (stripeVar lp) + a <- createResource (poolConfig pool) `onException` restoreSize (stripeVar lp) t2 <- getMonotonicTime pure (Resource a (stripeId lp) (t2 - t1) Created newAvailable, lp) Entry a _ : as -> do From 1c02853ea5dbb3ef21d90ef5d7cfc71c86cd8f71 Mon Sep 17 00:00:00 2001 From: Andrzej Rybczak Date: Tue, 31 May 2022 12:49:23 +0200 Subject: [PATCH 10/16] Don't hide haddock of the .Internal module --- Data/Pool/Internal.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Data/Pool/Internal.hs b/Data/Pool/Internal.hs index e12e8f2..85c5557 100644 --- a/Data/Pool/Internal.hs +++ b/Data/Pool/Internal.hs @@ -2,7 +2,7 @@ -- -- This module is intended for internal use only, and may change without warning -- in subsequent releases. -{-# OPTIONS_HADDOCK hide, not-home #-} +{-# OPTIONS_HADDOCK not-home #-} module Data.Pool.Internal where import Control.Concurrent From 1344386f6b1d06afeb873aaff4ecb79e000a9aff Mon Sep 17 00:00:00 2001 From: Andrzej Rybczak Date: Tue, 31 May 2022 14:25:27 +0200 Subject: [PATCH 11/16] Add a changelog --- CHANGELOG.md | 2 ++ resource-pool.cabal | 2 ++ 2 files changed, 4 insertions(+) create mode 100644 CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..13d445c --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,2 @@ +# resource-pool-0.3.0.0 (2022-??-??) +* Rewrite based on `Control.Concurrent.QSem` for better throughput and latency. diff --git a/resource-pool.cabal b/resource-pool.cabal index ca07867..098cc34 100644 --- a/resource-pool.cabal +++ b/resource-pool.cabal @@ -16,6 +16,8 @@ description: A high-performance striped pooling abstraction for managing tested-with: GHC ==8.4.4 || ==8.6.5 || ==8.8.4 || ==8.10.7 || ==9.0.2 || ==9.2.2 +extra-doc-files: CHANGELOG.md + bug-reports: https://github.com/scrive/pool/issues source-repository head type: git From daa2e1bb7f5152a7880ede944e0bc50dd775e207 Mon Sep 17 00:00:00 2001 From: Andrzej Rybczak Date: Tue, 31 May 2022 14:54:49 +0200 Subject: [PATCH 12/16] Update README --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index ce27184..19142dc 100644 --- a/README.md +++ b/README.md @@ -5,3 +5,6 @@ [![Dependencies](https://img.shields.io/hackage-deps/v/resource-pool.svg)](https://packdeps.haskellers.com/feed?needle=andrzej@rybczak.net) [![Stackage LTS](https://www.stackage.org/package/resource-pool/badge/lts)](https://www.stackage.org/lts/package/resource-pool) [![Stackage Nightly](https://www.stackage.org/package/resource-pool/badge/nightly)](https://www.stackage.org/nightly/package/resource-pool) + +A high-performance striped resource pooling implementation for Haskell based on +[Control.Concurrent.QSem](https://hackage.haskell.org/package/base/docs/Control-Concurrent-QSem.html). From b64a7d87283dfeb96c21620e1c6dcc676ed02c48 Mon Sep 17 00:00:00 2001 From: Andrzej Rybczak Date: Tue, 31 May 2022 14:58:22 +0200 Subject: [PATCH 13/16] Move modules to src --- resource-pool.cabal | 2 ++ {Data => src/Data}/Pool.hs | 0 {Data => src/Data}/Pool/Internal.hs | 0 {Data => src/Data}/Pool/Introspection.hs | 0 4 files changed, 2 insertions(+) rename {Data => src/Data}/Pool.hs (100%) rename {Data => src/Data}/Pool/Internal.hs (100%) rename {Data => src/Data}/Pool/Introspection.hs (100%) diff --git a/resource-pool.cabal b/resource-pool.cabal index 098cc34..ccd341e 100644 --- a/resource-pool.cabal +++ b/resource-pool.cabal @@ -24,6 +24,8 @@ source-repository head location: https://github.com/scrive/pool.git library + hs-source-dirs: src + exposed-modules: Data.Pool Data.Pool.Internal Data.Pool.Introspection diff --git a/Data/Pool.hs b/src/Data/Pool.hs similarity index 100% rename from Data/Pool.hs rename to src/Data/Pool.hs diff --git a/Data/Pool/Internal.hs b/src/Data/Pool/Internal.hs similarity index 100% rename from Data/Pool/Internal.hs rename to src/Data/Pool/Internal.hs diff --git a/Data/Pool/Introspection.hs b/src/Data/Pool/Introspection.hs similarity index 100% rename from Data/Pool/Introspection.hs rename to src/Data/Pool/Introspection.hs From a06f6f1fb5400c804422d9a2740b80e2e313109b Mon Sep 17 00:00:00 2001 From: Andrzej Rybczak Date: Tue, 31 May 2022 14:59:13 +0200 Subject: [PATCH 14/16] Update README --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 19142dc..833ee37 100644 --- a/README.md +++ b/README.md @@ -7,4 +7,4 @@ [![Stackage Nightly](https://www.stackage.org/package/resource-pool/badge/nightly)](https://www.stackage.org/nightly/package/resource-pool) A high-performance striped resource pooling implementation for Haskell based on -[Control.Concurrent.QSem](https://hackage.haskell.org/package/base/docs/Control-Concurrent-QSem.html). +[QSem](https://hackage.haskell.org/package/base/docs/Control-Concurrent-QSem.html). From 8e31365ac90bf87163ec86602d9cf8d8f176b177 Mon Sep 17 00:00:00 2001 From: Andrzej Rybczak Date: Wed, 1 Jun 2022 13:49:58 +0200 Subject: [PATCH 15/16] Adjust stats (#5) * Adjust stats * Fix a typo * Fix a typo --- src/Data/Pool/Introspection.hs | 69 ++++++++++++++++++++++++---------- 1 file changed, 50 insertions(+), 19 deletions(-) diff --git a/src/Data/Pool/Introspection.hs b/src/Data/Pool/Introspection.hs index 5effc36..2bab6aa 100644 --- a/src/Data/Pool/Introspection.hs +++ b/src/Data/Pool/Introspection.hs @@ -8,7 +8,7 @@ module Data.Pool.Introspection -- * Resource management , Resource(..) - , AcquisitionMethod(..) + , Acquisition(..) , withResource , takeResource , putResource @@ -27,21 +27,18 @@ import Data.Pool.Internal data Resource a = Resource { resource :: a , stripeNumber :: !Int - , acquisitionTime :: !Double - , acquisitionMethod :: !AcquisitionMethod , availableResources :: !Int + , acquisition :: !Acquisition + , acquisitionTime :: !Double + , creationTime :: !(Maybe Double) } deriving (Eq, Show, Generic) --- | Method of acquiring a resource from the pool. -data AcquisitionMethod - = Created - -- ^ A new resource was created. - | Taken - -- ^ An existing resource was directly taken from the pool. - | WaitedThen !AcquisitionMethod - -- ^ The thread had to wait until a resource was released. The inner method - -- signifies whether the resource was returned to the pool via 'putResource' - -- ('Taken') or 'destroyResource' ('Created'). +-- | Describes how a resource was acquired from the pool. +data Acquisition + = Immediate + -- ^ A resource was taken from the pool immediately. + | Delayed + -- ^ The thread had to wait until a resource was released. deriving (Eq, Show, Generic) -- | 'Data.Pool.withResource' with introspection capabilities. @@ -65,20 +62,54 @@ takeResource pool = mask_ $ do waitForResource (stripeVar lp) q >>= \case Just a -> do t2 <- getMonotonicTime - pure (Resource a (stripeId lp) (t2 - t1) (WaitedThen Taken) 0, lp) + let res = Resource + { resource = a + , stripeNumber = stripeId lp + , availableResources = 0 + , acquisition = Delayed + , acquisitionTime = t2 - t1 + , creationTime = Nothing + } + pure (res, lp) Nothing -> do - a <- createResource (poolConfig pool) `onException` restoreSize (stripeVar lp) t2 <- getMonotonicTime - pure (Resource a (stripeId lp) (t2 - t1) (WaitedThen Created) 0, lp) + a <- createResource (poolConfig pool) `onException` restoreSize (stripeVar lp) + t3 <- getMonotonicTime + let res = Resource + { resource = a + , stripeNumber = stripeId lp + , availableResources = 0 + , acquisition = Delayed + , acquisitionTime = t2 - t1 + , creationTime = Just $! t3 - t2 + } + pure (res, lp) else case cache stripe of [] -> do let newAvailable = available stripe - 1 putMVar (stripeVar lp) $! stripe { available = newAvailable } - a <- createResource (poolConfig pool) `onException` restoreSize (stripeVar lp) t2 <- getMonotonicTime - pure (Resource a (stripeId lp) (t2 - t1) Created newAvailable, lp) + a <- createResource (poolConfig pool) `onException` restoreSize (stripeVar lp) + t3 <- getMonotonicTime + let res = Resource + { resource = a + , stripeNumber = stripeId lp + , availableResources = newAvailable + , acquisition = Immediate + , acquisitionTime = t2 - t1 + , creationTime = Just $! t3 - t2 + } + pure (res, lp) Entry a _ : as -> do let newAvailable = available stripe - 1 putMVar (stripeVar lp) $! stripe { available = newAvailable, cache = as } t2 <- getMonotonicTime - pure (Resource a (stripeId lp) (t2 - t1) Taken newAvailable, lp) + let res = Resource + { resource = a + , stripeNumber = stripeId lp + , availableResources = newAvailable + , acquisition = Immediate + , acquisitionTime = t2 - t1 + , creationTime = Nothing + } + pure (res, lp) From bca254f01cccba010d55a20a3c97e26ae7357a1d Mon Sep 17 00:00:00 2001 From: Andrzej Rybczak Date: Wed, 1 Jun 2022 13:47:31 +0200 Subject: [PATCH 16/16] Include README.md in extra-doc-files --- resource-pool.cabal | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/resource-pool.cabal b/resource-pool.cabal index ccd341e..cde8eb1 100644 --- a/resource-pool.cabal +++ b/resource-pool.cabal @@ -16,7 +16,9 @@ description: A high-performance striped pooling abstraction for managing tested-with: GHC ==8.4.4 || ==8.6.5 || ==8.8.4 || ==8.10.7 || ==9.0.2 || ==9.2.2 -extra-doc-files: CHANGELOG.md +extra-doc-files: + CHANGELOG.md + README.md bug-reports: https://github.com/scrive/pool/issues source-repository head