Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 79 additions & 10 deletions Data/Pool.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ module Data.Pool
Pool(idleTime, maxResources, numStripes)
, LocalPool
, createPool
, createPoolWithMaxReuseCount
, withResource
, takeResource
, tryWithResource
Expand All @@ -45,7 +46,7 @@ import Control.Applicative ((<$>))
import Control.Concurrent (ThreadId, forkIOWithUnmask, killThread, myThreadId, threadDelay)
import Control.Concurrent.STM
import Control.Exception (SomeException, onException, mask_)
import Control.Monad (forM_, forever, join, liftM3, unless, when)
import Control.Monad (forM_, forever, join, liftM5, unless, when)
import Data.Hashable (hash)
import Data.IORef (IORef, newIORef, mkWeakIORef)
import Data.List (partition)
Expand Down Expand Up @@ -86,6 +87,10 @@ data LocalPool a = LocalPool {
-- ^ Count of open entries (both idle and in use).
, entries :: TVar [Entry a]
-- ^ Idle entries.
, increment :: a -> a
-- ^ Function that increments reuse count.
, isReusable :: a -> Bool
-- ^ Function that returns if a resource is reusable or not.
, lfin :: IORef ()
-- ^ empty value used to attach a finalizer to (internal)
} deriving (Typeable)
Expand Down Expand Up @@ -150,16 +155,78 @@ createPool
-- Requests for resources will block if this limit is reached on a
-- single stripe, even if other stripes have idle resources
-- available.
-> IO (Pool a)
createPool create destroy numStripes idleTime maxResources = do
-> IO (Pool a)
createPool create destroy numStripes idleTime maxResources =
createPool' create destroy numStripes idleTime maxResources id (const True)

createPoolWithMaxReuseCount
:: IO a
-- ^ Action that creates a new resource.
-> (a -> IO ())
-- ^ Action that destroys an existing resource.
-> Int
-- ^ Stripe count. The number of distinct sub-pools to maintain.
-- The smallest acceptable value is 1.
-> NominalDiffTime
-- ^ Amount of time for which an unused resource is kept open.
-- The smallest acceptable value is 0.5 seconds.
--
-- The elapsed time before destroying a resource may be a little
-- longer than requested, as the reaper thread wakes at 1-second
-- intervals.
-> Int
-- ^ Maximum number of resources to keep open per stripe. The
-- smallest acceptable value is 1.
--
-- Requests for resources will block if this limit is reached on a
-- single stripe, even if other stripes have idle resources
-- available.
-> Int
-- ^ Maxium reuse count. The smallest acceptable value is 1.
-> IO (Pool (a, Int))
createPoolWithMaxReuseCount create destroy numStripes idleTime maxResources maxReuseCount = do
let create' = do x <- create; return (x, 0)
destroy' (x, _) = destroy x
increment (x, n) = (x, n + 1)
isReusable (_, n) = n < maxReuseCount
createPool' create' destroy' numStripes idleTime maxResources increment isReusable

createPool'
:: IO a
-- ^ Action that creates a new resource.
-> (a -> IO ())
-- ^ Action that destroys an existing resource.
-> Int
-- ^ Stripe count. The number of distinct sub-pools to maintain.
-- The smallest acceptable value is 1.
-> NominalDiffTime
-- ^ Amount of time for which an unused resource is kept open.
-- The smallest acceptable value is 0.5 seconds.
--
-- The elapsed time before destroying a resource may be a little
-- longer than requested, as the reaper thread wakes at 1-second
-- intervals.
-> Int
-- ^ Maximum number of resources to keep open per stripe. The
-- smallest acceptable value is 1.
--
-- Requests for resources will block if this limit is reached on a
-- single stripe, even if other stripes have idle resources
-- available.
-> (a -> a)
-- ^ Function that increments reuse count.
-> (a -> Bool)
-- ^ Function that returns if a resource is reusable or not.
-> IO (Pool a)
createPool' create destroy numStripes idleTime maxResources increment isReusable = do
when (numStripes < 1) $
modError "pool " $ "invalid stripe count " ++ show numStripes
when (idleTime < 0.5) $
modError "pool " $ "invalid idle time " ++ show idleTime
when (maxResources < 1) $
modError "pool " $ "invalid maximum resource count " ++ show maxResources
localPools <- V.replicateM numStripes $
liftM3 LocalPool (newTVarIO 0) (newTVarIO []) (newIORef ())
liftM5 LocalPool (newTVarIO 0) (newTVarIO []) (return increment) (return isReusable) (newIORef ())
reaperId <- forkIOLabeledWithUnmask "resource-pool: reaper" $ \unmask ->
unmask $ reaper destroy idleTime localPools
fin <- newIORef ()
Expand Down Expand Up @@ -206,8 +273,8 @@ reaper :: (a -> IO ()) -> NominalDiffTime -> V.Vector (LocalPool a) -> IO ()
reaper destroy idleTime pools = forever $ do
threadDelay (1 * 1000000)
now <- getCurrentTime
let isStale Entry{..} = now `diffUTCTime` lastUse > idleTime
V.forM_ pools $ \LocalPool{..} -> do
let isStale Entry{..} = now `diffUTCTime` lastUse > idleTime || not (isReusable entry)
resources <- atomically $ do
(stale,fresh) <- partition isStale <$> readTVar entries
unless (null stale) $ do
Expand Down Expand Up @@ -276,9 +343,10 @@ takeResource :: Pool a -> IO (a, LocalPool a)
takeResource pool@Pool{..} = do
local@LocalPool{..} <- getLocalPool pool
resource <- liftBase . join . atomically $ do
ents <- readTVar entries
ents' <- readTVar entries
let (olds, ents) = span (not . isReusable . entry) ents'
case ents of
(Entry{..}:es) -> writeTVar entries es >> return (return entry)
(Entry{..}:es) -> writeTVar entries (olds ++ es) >> return (return entry)
[] -> do
used <- readTVar inUse
when (used == maxResources) retry
Expand Down Expand Up @@ -322,9 +390,10 @@ tryTakeResource :: Pool a -> IO (Maybe (a, LocalPool a))
tryTakeResource pool@Pool{..} = do
local@LocalPool{..} <- getLocalPool pool
resource <- liftBase . join . atomically $ do
ents <- readTVar entries
ents' <- readTVar entries
let (olds, ents) = span (not . isReusable . entry) ents'
case ents of
(Entry{..}:es) -> writeTVar entries es >> return (return . Just $ entry)
(Entry{..}:es) -> writeTVar entries (olds ++ es) >> return (return . Just $ entry)
[] -> do
used <- readTVar inUse
if used == maxResources
Expand Down Expand Up @@ -363,7 +432,7 @@ destroyResource Pool{..} LocalPool{..} resource = do
putResource :: LocalPool a -> a -> IO ()
putResource LocalPool{..} resource = do
now <- getCurrentTime
atomically $ modifyTVar_ entries (Entry resource now:)
atomically $ modifyTVar_ entries (Entry (increment resource) now:)
#if __GLASGOW_HASKELL__ >= 700
{-# INLINABLE putResource #-}
#endif
Expand Down