Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmarks/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ main = do
else void $ withResource pool $ \conn -> I.runThrow (S.enqueue E.int4 (replicate enqueueBatchCount payload)) conn
dequeueAction = if notify > 0
then void $ withResource pool $ \conn ->
IO.withDequeue "channel" conn D.int4 1 dequeueBatchCount (const $ pure ())
IO.withDequeue "channel" conn "" D.int4 1 dequeueBatchCount (const $ pure ())
else void $ withResource pool $ \conn -> fix $ \next ->
I.runThrow (S.dequeue D.int4 dequeueBatchCount) conn >>= \case
[] -> next
Expand Down
4 changes: 2 additions & 2 deletions hasql-queue.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ cabal-version: 1.12
--
-- see: https://github.com/sol/hpack
--
-- hash: 956ae93525f9dafcc0c9c8149cd2bbc8cfcfe4e63310adec92ce40f995e4cbf4
-- hash: 53732dee5cba530cd2bc93a536addc0db9e136ecebf54b707637daac0ec24a67

name: hasql-queue
version: 1.2.0.1
version: 1.2.0.2
synopsis: A PostgreSQL backed queue
description: A PostgreSQL backed queue. Please see README.md
category: Web
Expand Down
9 changes: 7 additions & 2 deletions src/Hasql/Queue/High/AtLeastOnce.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import qualified Hasql.Encoders as E
import qualified Hasql.Decoders as D
import Control.Exception
import Data.Function
import Data.ByteString (ByteString)

{-|Enqueue a list of payloads.
-}
Expand All @@ -31,6 +32,8 @@ any entries 'withDequeue' will wrap the list in 'Just'.
-}
withDequeue :: Connection
-- ^ Connection
-> ByteString
-- ^ Optional filter
-> D.Value a
-- ^ Payload decoder
-> Int
Expand Down Expand Up @@ -79,6 +82,8 @@ withDequeueWith :: forall e a b
. Exception e
=> Connection
-- ^ Connection
-> ByteString
-- ^ Optional filter
-> D.Value a
-- ^ Payload decoder
-> Int
Expand All @@ -88,8 +93,8 @@ withDequeueWith :: forall e a b
-> ([a] -> IO b)
-- ^ Continuation
-> IO (Maybe b)
withDequeueWith conn decoder retryCount count f = (fix $ \restart i -> do
try (flip I.runThrow conn $ I.withDequeue decoder retryCount count f) >>= \case
withDequeueWith conn theFilter decoder retryCount count f = (fix $ \restart i -> do
try (flip I.runThrow conn $ I.withDequeue theFilter decoder retryCount count f) >>= \case
Right x -> pure x
Left (e :: e) ->
if i < retryCount then
Expand Down
21 changes: 11 additions & 10 deletions src/Hasql/Queue/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import Hasql.Connection
import Data.Int
import Data.Functor.Contravariant
import Data.String.Here.Uninterpolated
import Data.String.Here.Interpolated
import Hasql.Statement
import Data.ByteString (ByteString)
import Control.Exception
Expand Down Expand Up @@ -102,28 +103,29 @@ enqueuePayload theEncoder values = do

statement values theStatement

dequeuePayload :: D.Value a -> Int -> Session [Payload a]
dequeuePayload valueDecoder count = do
let multipleQuery = [here|
dequeuePayload :: ByteString -> D.Value a -> Int -> Session [Payload a]
dequeuePayload theFilter valueDecoder count = do
let multipleQuery = [i|
DELETE FROM payloads
WHERE id in
( SELECT p1.id
FROM payloads AS p1
WHERE p1.state='enqueued'
WHERE p1.state='enqueued' ${theFilter}
ORDER BY p1.modified_at ASC
FOR UPDATE SKIP LOCKED

LIMIT $1
)
RETURNING id, state, attempts, modified_at, value
|]
multipleEncoder = E.param $ E.nonNullable $ fromIntegral >$< E.int4

singleQuery = [here|
singleQuery = [i|
DELETE FROM payloads
WHERE id =
( SELECT p1.id
FROM payloads AS p1
WHERE p1.state='enqueued'
WHERE p1.state='enqueued' ${theFilter}
ORDER BY p1.modified_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
Expand Down Expand Up @@ -293,11 +295,10 @@ failures = listState Failed

-- Move to Internal
-- This should use bracketOnError
withDequeue :: D.Value a -> Int -> Int -> ([a] -> IO b) -> Session (Maybe b)
withDequeue decoder retryCount count f = do
-- TODO turn to a save point
withDequeue :: ByteString -> D.Value a -> Int -> Int -> ([a] -> IO b) -> Session (Maybe b)
withDequeue theFilter decoder retryCount count f = do
sql "BEGIN;SAVEPOINT temp"
dequeuePayload decoder count >>= \case
dequeuePayload theFilter decoder count >>= \case
[] -> Nothing <$ sql "COMMIT"
xs -> fmap Just $ do
liftIO (try $ f $ fmap pValue xs) >>= \case
Expand Down
9 changes: 7 additions & 2 deletions src/Hasql/Queue/Low/AtLeastOnce.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import Control.Exception
import Data.Function
import Data.Text (Text)
import Control.Monad.IO.Class
import Data.ByteString (ByteString)

{-|Enqueue a list of payloads.
-}
Expand Down Expand Up @@ -51,6 +52,8 @@ withDequeue :: Text
-- ^ Notification channel name. Any valid PostgreSQL identifier
-> Connection
-- ^ Connection
-> ByteString
-- ^ Optional filter
-> D.Value a
-- ^ Payload decoder
-> Int
Expand Down Expand Up @@ -101,6 +104,8 @@ withDequeueWith :: forall e a b
-- ^ Notification channel name. Any valid PostgreSQL identifier
-> Connection
-- ^ Connection
-> ByteString
-- ^ Optional filter
-> D.Value a
-- ^ Payload decoder
-> Int
Expand All @@ -110,8 +115,8 @@ withDequeueWith :: forall e a b
-> ([a] -> IO b)
-- ^ Continuation
-> IO b
withDequeueWith withNotifyHandlers channel conn decoder retryCount count f = (fix $ \restart i -> do
let action = I.withDequeue decoder retryCount count f >>= \case
withDequeueWith withNotifyHandlers channel conn theFilter decoder retryCount count f = (fix $ \restart i -> do
let action = I.withDequeue theFilter decoder retryCount count f >>= \case
Nothing -> liftIO $ throwIO I.NoRows
Just x -> pure x

Expand Down
47 changes: 30 additions & 17 deletions test/Hasql/Queue/High/AtLeastOnceSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,19 @@ data FailedwithDequeue = FailedwithDequeue

instance Exception FailedwithDequeue

withDequeueNoFilter :: Connection
-- ^ Connection
-> D.Value a
-- ^ Payload decoder
-> Int
-- ^ Retry count
-> Int
-- ^ Element count
-> ([a] -> IO b)
-- ^ Continuation
-> IO (Maybe b)
withDequeueNoFilter c = withDequeue c ""

getPayload :: Connection -> D.Value a -> I.PayloadId -> IO (Maybe (I.Payload a))
getPayload conn decoder payloadId = I.runThrow (I.getPayload decoder payloadId) conn

Expand All @@ -26,66 +39,66 @@ spec = describe "Hasql.Queue.High.AtLeastOnce" $ parallel $ do
sequential $ aroundAll withSetup $ describe "enqueue/dequeue" $ do
it "enqueue nothing gives nothing" $ withConnection $ \conn -> do
enqueue conn E.int4 []
withDequeue conn D.int4 1 1 pure `shouldReturn` Nothing
withDequeueNoFilter conn D.int4 1 1 pure `shouldReturn` Nothing

it "enqueue 1 gives 1" $ withConnection $ \conn -> do
enqueue conn E.int4 [1]
withDequeue conn D.int4 1 1 pure `shouldReturn` Just [1]
withDequeueNoFilter conn D.int4 1 1 pure `shouldReturn` Just [1]

it "dequeue give nothing after enqueueing everything" $ withConnection $ \conn -> do
withDequeue conn D.int4 1 1 pure `shouldReturn` Nothing
withDequeueNoFilter conn D.int4 1 1 pure `shouldReturn` Nothing

it "dequeueing is in FIFO order" $ withConnection $ \conn -> do
enqueue conn E.int4 [1]
enqueue conn E.int4 [2]
withDequeue conn D.int4 1 1 pure `shouldReturn` Just [1]
withDequeue conn D.int4 1 1 pure `shouldReturn` Just [2]
withDequeueNoFilter conn D.int4 1 1 pure `shouldReturn` Just [1]
withDequeueNoFilter conn D.int4 1 1 pure `shouldReturn` Just [2]

it "dequeueing a batch of elements works" $ withConnection $ \conn -> do
enqueue conn E.int4 [1, 2, 3]
withDequeue conn D.int4 1 2 pure `shouldReturn` Just [1, 2]
withDequeueNoFilter conn D.int4 1 2 pure `shouldReturn` Just [1, 2]

withDequeue conn D.int4 1 2 pure `shouldReturn` Just [3]
withDequeueNoFilter conn D.int4 1 2 pure `shouldReturn` Just [3]

it "withDequeue fails if a non IOError is thrown" $ withConnection $ \conn -> do
it "withDequeueNoFilter fails if a non IOError is thrown" $ withConnection $ \conn -> do
enqueue conn E.int4 [1]
handle (\FailedwithDequeue -> pure Nothing) $
withDequeue conn D.int4 2 1 $ \_ -> throwIO FailedwithDequeue
withDequeueNoFilter conn D.int4 2 1 $ \_ -> throwIO FailedwithDequeue

failures conn D.int4 Nothing 1 `shouldReturn` []
withDequeue conn D.int4 0 1 pure `shouldReturn` Just [1]
withDequeueNoFilter conn D.int4 0 1 pure `shouldReturn` Just [1]

it "withDequeue fails if throws occur and retry is zero" $ withConnection $ \conn -> do
it "withDequeueNoFilter fails if throws occur and retry is zero" $ withConnection $ \conn -> do
enqueue conn E.int4 [1]
handle (\(_ :: IOError) -> pure Nothing) $
withDequeue conn D.int4 0 1 $ \_ -> throwIO $ userError "hey"
withDequeueNoFilter conn D.int4 0 1 $ \_ -> throwIO $ userError "hey"

[(pId, x)] <- failures conn D.int4 Nothing 1
x `shouldBe` 1
delete conn [pId]

it "withDequeue succeeds even if the first attempt fails" $ withConnection $ \conn -> do
it "withDequeueNoFilter succeeds even if the first attempt fails" $ withConnection $ \conn -> do
enqueue conn E.int4 [1]

ref <- newIORef (0 :: Int)

withDequeue conn D.int4 1 1 (\_ -> do
withDequeueNoFilter conn D.int4 1 1 (\_ -> do
count <- readIORef ref
writeIORef ref $ count + 1
when (count < 1) $ throwIO $ userError "hey"
pure '!') `shouldReturn` Just '!'

withDequeue conn D.int4 1 1 pure `shouldReturn` Nothing
withDequeueNoFilter conn D.int4 1 1 pure `shouldReturn` Nothing
readIORef ref `shouldReturn` 2

it "failures paging works" $ withConnection $ \conn -> do
enqueue conn E.int4 [2]
enqueue conn E.int4 [3]

handle (\(_ :: IOError) -> pure Nothing) $
withDequeue conn D.int4 0 1 $ \_ -> throwIO $ userError "fds"
withDequeueNoFilter conn D.int4 0 1 $ \_ -> throwIO $ userError "fds"
handle (\(_ :: IOError) -> pure Nothing) $
withDequeue conn D.int4 0 1 $ \_ -> throwIO $ userError "fds"
withDequeueNoFilter conn D.int4 0 1 $ \_ -> throwIO $ userError "fds"

[(next, x)] <- failures conn D.int4 Nothing 1
x `shouldBe` 2
Expand Down
51 changes: 33 additions & 18 deletions test/Hasql/Queue/Low/AtLeastOnceSpec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,21 @@ import Hasql.Queue.Internal (Payload (..))
import Hasql.Queue.TestUtils
import System.Timeout

withDequeueNoFilter :: Text
-- ^ Notification channel name. Any valid PostgreSQL identifier
-> Connection
-- ^ Connection
-> D.Value a
-- ^ Payload decoder
-> Int
-- ^ Retry count
-> Int
-- ^ Element count
-> ([a] -> IO b)
-- ^ Continuation
-> IO b
withDequeueNoFilter t c = withDequeue t c ""

getCount :: Connection -> IO Int64
getCount = I.runThrow I.getCount

Expand All @@ -44,33 +59,33 @@ spec :: Spec
spec = describe "Hasql.Queue.Low.AtLeastOnce" $ aroundAll withSetup $ describe "enqueue/withDequeue" $ do
it "enqueue nothing timesout" $ withConnection $ \conn -> do
enqueue channel conn E.int4 []
timeout 100000 (withDequeue channel conn D.int4 1 1 pure) `shouldReturn` Nothing
timeout 100000 (withDequeueNoFilter channel conn D.int4 1 1 pure) `shouldReturn` Nothing

it "enqueue 1 gives 1" $ withConnection $ \conn -> do
enqueue channel conn E.int4 [1]
withDequeue channel conn D.int4 1 1 pure `shouldReturn` [1]
withDequeueNoFilter channel conn D.int4 1 1 pure `shouldReturn` [1]

it "dequeue timesout after enqueueing everything" $ withConnection $ \conn -> do
timeout 100000 (withDequeue channel conn D.int4 1 1 pure) `shouldReturn` Nothing
timeout 100000 (withDequeueNoFilter channel conn D.int4 1 1 pure) `shouldReturn` Nothing

it "dequeueing is in FIFO order" $ withConnection $ \conn -> do
enqueue channel conn E.int4 [1]
enqueue channel conn E.int4 [2]
withDequeue channel conn D.int4 1 1 pure `shouldReturn` [1]
withDequeue channel conn D.int4 1 1 pure `shouldReturn` [2]
withDequeueNoFilter channel conn D.int4 1 1 pure `shouldReturn` [1]
withDequeueNoFilter channel conn D.int4 1 1 pure `shouldReturn` [2]

it "dequeueing a batch of elements works" $ withConnection $ \conn -> do
enqueue channel conn E.int4 [1, 2, 3]
withDequeue channel conn D.int4 1 2 pure `shouldReturn` [1, 2]
withDequeueNoFilter channel conn D.int4 1 2 pure `shouldReturn` [1, 2]

withDequeue channel conn D.int4 1 1 pure `shouldReturn` [3]
withDequeueNoFilter channel conn D.int4 1 1 pure `shouldReturn` [3]

it "withDequeue blocks until something is enqueued: before" $ withConnection $ \conn -> do
it "withDequeueNoFilter blocks until something is enqueued: before" $ withConnection $ \conn -> do
void $ enqueue channel conn E.int4 [1]
res <- withDequeue channel conn D.int4 1 1 pure
res <- withDequeueNoFilter channel conn D.int4 1 1 pure
res `shouldBe` [1]

it "withDequeue blocks until something is enqueued: during" $ withConnection $ \conn -> do
it "withDequeueNoFilter blocks until something is enqueued: during" $ withConnection $ \conn -> do
afterActionMVar <- newEmptyMVar
beforeNotifyMVar <- newEmptyMVar

Expand All @@ -80,7 +95,7 @@ spec = describe "Hasql.Queue.Low.AtLeastOnce" $ aroundAll withSetup $ describe "
}

-- This is the definition of IO.dequeue
resultThread <- async $ withDequeueWith @IOError handlers channel conn D.int4 1 1 pure
resultThread <- async $ withDequeueWith @IOError handlers channel conn "" D.int4 1 1 pure
takeMVar afterActionMVar

void $ enqueue "hey" conn E.int4 [1]
Expand All @@ -89,28 +104,28 @@ spec = describe "Hasql.Queue.Low.AtLeastOnce" $ aroundAll withSetup $ describe "

wait resultThread `shouldReturn` [1]

it "withDequeue blocks until something is enqueued: after" $ withConnection2 $ \(conn1, conn2) -> do
thread <- async $ withDequeue channel conn1 D.int4 1 1 pure
it "withDequeueNoFilter blocks until something is enqueued: after" $ withConnection2 $ \(conn1, conn2) -> do
thread <- async $ withDequeueNoFilter channel conn1 D.int4 1 1 pure
timeout 100000 (wait thread) `shouldReturn` Nothing

enqueue channel conn2 E.int4 [1]

wait thread `shouldReturn` [1]

-- TODO redo just using failures
it "withDequeue fails and sets the retries to +1" $ withConnection $ \conn -> do
it "withDequeueNoFilter fails and sets the retries to +1" $ withConnection $ \conn -> do
enqueue channel conn E.int4 [1]
handle (\(_ :: IOError) -> pure ()) $ withDequeue channel conn D.int4 0 1 $ \_ -> throwIO $ userError "hey"
handle (\(_ :: IOError) -> pure ()) $ withDequeueNoFilter channel conn D.int4 0 1 $ \_ -> throwIO $ userError "hey"
xs <- failures conn D.int4 Nothing 1

map snd xs `shouldBe` [1]

it "withDequeue succeeds even if the first attempt fails" $ withConnection $ \conn -> do
it "withDequeueNoFilter succeeds even if the first attempt fails" $ withConnection $ \conn -> do
[payloadId] <- I.runThrow (I.enqueuePayload E.int4 [1]) conn

ref <- newIORef (0 :: Int)

withDequeueWith @FailedwithDequeue mempty channel conn D.int4 1 1 (\_ -> do
withDequeueWith @FailedwithDequeue mempty channel conn "" D.int4 1 1 (\_ -> do
count <- readIORef ref
writeIORef ref $ count + 1
when (count < 1) $ throwIO FailedwithDequeue
Expand All @@ -126,7 +141,7 @@ spec = describe "Hasql.Queue.Low.AtLeastOnce" $ aroundAll withSetup $ describe "
ref <- newTVarIO []

loopThreads <- replicateM 35 $ async $ withPool' $ \c -> fix $ \next -> do
lastCount <- withDequeue channel c D.int4 1 1 $ \[x] -> do
lastCount <- withDequeueNoFilter channel c D.int4 1 1 $ \[x] -> do
atomically $ do
xs <- readTVar ref
writeTVar ref $ x : xs
Expand Down