From b8520068f0e74c656b3f4f3b601b9e0ed4a14126 Mon Sep 17 00:00:00 2001 From: Jonathan Fischoff Date: Sat, 22 Aug 2020 23:50:51 -0700 Subject: [PATCH] Filterable --- benchmarks/Main.hs | 2 +- hasql-queue.cabal | 4 +- src/Hasql/Queue/High/AtLeastOnce.hs | 9 ++++- src/Hasql/Queue/Internal.hs | 21 +++++----- src/Hasql/Queue/Low/AtLeastOnce.hs | 9 ++++- test/Hasql/Queue/High/AtLeastOnceSpec.hs | 47 ++++++++++++++-------- test/Hasql/Queue/Low/AtLeastOnceSpec.hs | 51 +++++++++++++++--------- 7 files changed, 91 insertions(+), 52 deletions(-) diff --git a/benchmarks/Main.hs b/benchmarks/Main.hs index 961bec1..f36bf0e 100644 --- a/benchmarks/Main.hs +++ b/benchmarks/Main.hs @@ -84,7 +84,7 @@ main = do else void $ withResource pool $ \conn -> I.runThrow (S.enqueue E.int4 (replicate enqueueBatchCount payload)) conn dequeueAction = if notify > 0 then void $ withResource pool $ \conn -> - IO.withDequeue "channel" conn D.int4 1 dequeueBatchCount (const $ pure ()) + IO.withDequeue "channel" conn "" D.int4 1 dequeueBatchCount (const $ pure ()) else void $ withResource pool $ \conn -> fix $ \next -> I.runThrow (S.dequeue D.int4 dequeueBatchCount) conn >>= \case [] -> next diff --git a/hasql-queue.cabal b/hasql-queue.cabal index db09027..8b68806 100644 --- a/hasql-queue.cabal +++ b/hasql-queue.cabal @@ -4,10 +4,10 @@ cabal-version: 1.12 -- -- see: https://github.com/sol/hpack -- --- hash: 956ae93525f9dafcc0c9c8149cd2bbc8cfcfe4e63310adec92ce40f995e4cbf4 +-- hash: 53732dee5cba530cd2bc93a536addc0db9e136ecebf54b707637daac0ec24a67 name: hasql-queue -version: 1.2.0.1 +version: 1.2.0.2 synopsis: A PostgreSQL backed queue description: A PostgreSQL backed queue. Please see README.md category: Web diff --git a/src/Hasql/Queue/High/AtLeastOnce.hs b/src/Hasql/Queue/High/AtLeastOnce.hs index 1f0502c..602aa1b 100644 --- a/src/Hasql/Queue/High/AtLeastOnce.hs +++ b/src/Hasql/Queue/High/AtLeastOnce.hs @@ -6,6 +6,7 @@ import qualified Hasql.Encoders as E import qualified Hasql.Decoders as D import Control.Exception import Data.Function +import Data.ByteString (ByteString) {-|Enqueue a list of payloads. -} @@ -31,6 +32,8 @@ any entries 'withDequeue' will wrap the list in 'Just'. -} withDequeue :: Connection -- ^ Connection + -> ByteString + -- ^ Optional filter -> D.Value a -- ^ Payload decoder -> Int @@ -79,6 +82,8 @@ withDequeueWith :: forall e a b . Exception e => Connection -- ^ Connection + -> ByteString + -- ^ Optional filter -> D.Value a -- ^ Payload decoder -> Int @@ -88,8 +93,8 @@ withDequeueWith :: forall e a b -> ([a] -> IO b) -- ^ Continuation -> IO (Maybe b) -withDequeueWith conn decoder retryCount count f = (fix $ \restart i -> do - try (flip I.runThrow conn $ I.withDequeue decoder retryCount count f) >>= \case +withDequeueWith conn theFilter decoder retryCount count f = (fix $ \restart i -> do + try (flip I.runThrow conn $ I.withDequeue theFilter decoder retryCount count f) >>= \case Right x -> pure x Left (e :: e) -> if i < retryCount then diff --git a/src/Hasql/Queue/Internal.hs b/src/Hasql/Queue/Internal.hs index e5c8106..a1e5e6c 100644 --- a/src/Hasql/Queue/Internal.hs +++ b/src/Hasql/Queue/Internal.hs @@ -13,6 +13,7 @@ import Hasql.Connection import Data.Int import Data.Functor.Contravariant import Data.String.Here.Uninterpolated +import Data.String.Here.Interpolated import Hasql.Statement import Data.ByteString (ByteString) import Control.Exception @@ -102,28 +103,29 @@ enqueuePayload theEncoder values = do statement values theStatement -dequeuePayload :: D.Value a -> Int -> Session [Payload a] -dequeuePayload valueDecoder count = do - let multipleQuery = [here| +dequeuePayload :: ByteString -> D.Value a -> Int -> Session [Payload a] +dequeuePayload theFilter valueDecoder count = do + let multipleQuery = [i| DELETE FROM payloads WHERE id in ( SELECT p1.id FROM payloads AS p1 - WHERE p1.state='enqueued' + WHERE p1.state='enqueued' ${theFilter} ORDER BY p1.modified_at ASC FOR UPDATE SKIP LOCKED + LIMIT $1 ) RETURNING id, state, attempts, modified_at, value |] multipleEncoder = E.param $ E.nonNullable $ fromIntegral >$< E.int4 - singleQuery = [here| + singleQuery = [i| DELETE FROM payloads WHERE id = ( SELECT p1.id FROM payloads AS p1 - WHERE p1.state='enqueued' + WHERE p1.state='enqueued' ${theFilter} ORDER BY p1.modified_at ASC FOR UPDATE SKIP LOCKED LIMIT 1 @@ -293,11 +295,10 @@ failures = listState Failed -- Move to Internal -- This should use bracketOnError -withDequeue :: D.Value a -> Int -> Int -> ([a] -> IO b) -> Session (Maybe b) -withDequeue decoder retryCount count f = do - -- TODO turn to a save point +withDequeue :: ByteString -> D.Value a -> Int -> Int -> ([a] -> IO b) -> Session (Maybe b) +withDequeue theFilter decoder retryCount count f = do sql "BEGIN;SAVEPOINT temp" - dequeuePayload decoder count >>= \case + dequeuePayload theFilter decoder count >>= \case [] -> Nothing <$ sql "COMMIT" xs -> fmap Just $ do liftIO (try $ f $ fmap pValue xs) >>= \case diff --git a/src/Hasql/Queue/Low/AtLeastOnce.hs b/src/Hasql/Queue/Low/AtLeastOnce.hs index af920f9..87bec3f 100644 --- a/src/Hasql/Queue/Low/AtLeastOnce.hs +++ b/src/Hasql/Queue/Low/AtLeastOnce.hs @@ -22,6 +22,7 @@ import Control.Exception import Data.Function import Data.Text (Text) import Control.Monad.IO.Class +import Data.ByteString (ByteString) {-|Enqueue a list of payloads. -} @@ -51,6 +52,8 @@ withDequeue :: Text -- ^ Notification channel name. Any valid PostgreSQL identifier -> Connection -- ^ Connection + -> ByteString + -- ^ Optional filter -> D.Value a -- ^ Payload decoder -> Int @@ -101,6 +104,8 @@ withDequeueWith :: forall e a b -- ^ Notification channel name. Any valid PostgreSQL identifier -> Connection -- ^ Connection + -> ByteString + -- ^ Optional filter -> D.Value a -- ^ Payload decoder -> Int @@ -110,8 +115,8 @@ withDequeueWith :: forall e a b -> ([a] -> IO b) -- ^ Continuation -> IO b -withDequeueWith withNotifyHandlers channel conn decoder retryCount count f = (fix $ \restart i -> do - let action = I.withDequeue decoder retryCount count f >>= \case +withDequeueWith withNotifyHandlers channel conn theFilter decoder retryCount count f = (fix $ \restart i -> do + let action = I.withDequeue theFilter decoder retryCount count f >>= \case Nothing -> liftIO $ throwIO I.NoRows Just x -> pure x diff --git a/test/Hasql/Queue/High/AtLeastOnceSpec.hs b/test/Hasql/Queue/High/AtLeastOnceSpec.hs index 1f5dfce..6129f8d 100644 --- a/test/Hasql/Queue/High/AtLeastOnceSpec.hs +++ b/test/Hasql/Queue/High/AtLeastOnceSpec.hs @@ -18,6 +18,19 @@ data FailedwithDequeue = FailedwithDequeue instance Exception FailedwithDequeue +withDequeueNoFilter :: Connection + -- ^ Connection + -> D.Value a + -- ^ Payload decoder + -> Int + -- ^ Retry count + -> Int + -- ^ Element count + -> ([a] -> IO b) + -- ^ Continuation + -> IO (Maybe b) +withDequeueNoFilter c = withDequeue c "" + getPayload :: Connection -> D.Value a -> I.PayloadId -> IO (Maybe (I.Payload a)) getPayload conn decoder payloadId = I.runThrow (I.getPayload decoder payloadId) conn @@ -26,56 +39,56 @@ spec = describe "Hasql.Queue.High.AtLeastOnce" $ parallel $ do sequential $ aroundAll withSetup $ describe "enqueue/dequeue" $ do it "enqueue nothing gives nothing" $ withConnection $ \conn -> do enqueue conn E.int4 [] - withDequeue conn D.int4 1 1 pure `shouldReturn` Nothing + withDequeueNoFilter conn D.int4 1 1 pure `shouldReturn` Nothing it "enqueue 1 gives 1" $ withConnection $ \conn -> do enqueue conn E.int4 [1] - withDequeue conn D.int4 1 1 pure `shouldReturn` Just [1] + withDequeueNoFilter conn D.int4 1 1 pure `shouldReturn` Just [1] it "dequeue give nothing after enqueueing everything" $ withConnection $ \conn -> do - withDequeue conn D.int4 1 1 pure `shouldReturn` Nothing + withDequeueNoFilter conn D.int4 1 1 pure `shouldReturn` Nothing it "dequeueing is in FIFO order" $ withConnection $ \conn -> do enqueue conn E.int4 [1] enqueue conn E.int4 [2] - withDequeue conn D.int4 1 1 pure `shouldReturn` Just [1] - withDequeue conn D.int4 1 1 pure `shouldReturn` Just [2] + withDequeueNoFilter conn D.int4 1 1 pure `shouldReturn` Just [1] + withDequeueNoFilter conn D.int4 1 1 pure `shouldReturn` Just [2] it "dequeueing a batch of elements works" $ withConnection $ \conn -> do enqueue conn E.int4 [1, 2, 3] - withDequeue conn D.int4 1 2 pure `shouldReturn` Just [1, 2] + withDequeueNoFilter conn D.int4 1 2 pure `shouldReturn` Just [1, 2] - withDequeue conn D.int4 1 2 pure `shouldReturn` Just [3] + withDequeueNoFilter conn D.int4 1 2 pure `shouldReturn` Just [3] - it "withDequeue fails if a non IOError is thrown" $ withConnection $ \conn -> do + it "withDequeueNoFilter fails if a non IOError is thrown" $ withConnection $ \conn -> do enqueue conn E.int4 [1] handle (\FailedwithDequeue -> pure Nothing) $ - withDequeue conn D.int4 2 1 $ \_ -> throwIO FailedwithDequeue + withDequeueNoFilter conn D.int4 2 1 $ \_ -> throwIO FailedwithDequeue failures conn D.int4 Nothing 1 `shouldReturn` [] - withDequeue conn D.int4 0 1 pure `shouldReturn` Just [1] + withDequeueNoFilter conn D.int4 0 1 pure `shouldReturn` Just [1] - it "withDequeue fails if throws occur and retry is zero" $ withConnection $ \conn -> do + it "withDequeueNoFilter fails if throws occur and retry is zero" $ withConnection $ \conn -> do enqueue conn E.int4 [1] handle (\(_ :: IOError) -> pure Nothing) $ - withDequeue conn D.int4 0 1 $ \_ -> throwIO $ userError "hey" + withDequeueNoFilter conn D.int4 0 1 $ \_ -> throwIO $ userError "hey" [(pId, x)] <- failures conn D.int4 Nothing 1 x `shouldBe` 1 delete conn [pId] - it "withDequeue succeeds even if the first attempt fails" $ withConnection $ \conn -> do + it "withDequeueNoFilter succeeds even if the first attempt fails" $ withConnection $ \conn -> do enqueue conn E.int4 [1] ref <- newIORef (0 :: Int) - withDequeue conn D.int4 1 1 (\_ -> do + withDequeueNoFilter conn D.int4 1 1 (\_ -> do count <- readIORef ref writeIORef ref $ count + 1 when (count < 1) $ throwIO $ userError "hey" pure '!') `shouldReturn` Just '!' - withDequeue conn D.int4 1 1 pure `shouldReturn` Nothing + withDequeueNoFilter conn D.int4 1 1 pure `shouldReturn` Nothing readIORef ref `shouldReturn` 2 it "failures paging works" $ withConnection $ \conn -> do @@ -83,9 +96,9 @@ spec = describe "Hasql.Queue.High.AtLeastOnce" $ parallel $ do enqueue conn E.int4 [3] handle (\(_ :: IOError) -> pure Nothing) $ - withDequeue conn D.int4 0 1 $ \_ -> throwIO $ userError "fds" + withDequeueNoFilter conn D.int4 0 1 $ \_ -> throwIO $ userError "fds" handle (\(_ :: IOError) -> pure Nothing) $ - withDequeue conn D.int4 0 1 $ \_ -> throwIO $ userError "fds" + withDequeueNoFilter conn D.int4 0 1 $ \_ -> throwIO $ userError "fds" [(next, x)] <- failures conn D.int4 Nothing 1 x `shouldBe` 2 diff --git a/test/Hasql/Queue/Low/AtLeastOnceSpec.hs b/test/Hasql/Queue/Low/AtLeastOnceSpec.hs index 4c12f3b..3115148 100644 --- a/test/Hasql/Queue/Low/AtLeastOnceSpec.hs +++ b/test/Hasql/Queue/Low/AtLeastOnceSpec.hs @@ -26,6 +26,21 @@ import Hasql.Queue.Internal (Payload (..)) import Hasql.Queue.TestUtils import System.Timeout +withDequeueNoFilter :: Text + -- ^ Notification channel name. Any valid PostgreSQL identifier + -> Connection + -- ^ Connection + -> D.Value a + -- ^ Payload decoder + -> Int + -- ^ Retry count + -> Int + -- ^ Element count + -> ([a] -> IO b) + -- ^ Continuation + -> IO b +withDequeueNoFilter t c = withDequeue t c "" + getCount :: Connection -> IO Int64 getCount = I.runThrow I.getCount @@ -44,33 +59,33 @@ spec :: Spec spec = describe "Hasql.Queue.Low.AtLeastOnce" $ aroundAll withSetup $ describe "enqueue/withDequeue" $ do it "enqueue nothing timesout" $ withConnection $ \conn -> do enqueue channel conn E.int4 [] - timeout 100000 (withDequeue channel conn D.int4 1 1 pure) `shouldReturn` Nothing + timeout 100000 (withDequeueNoFilter channel conn D.int4 1 1 pure) `shouldReturn` Nothing it "enqueue 1 gives 1" $ withConnection $ \conn -> do enqueue channel conn E.int4 [1] - withDequeue channel conn D.int4 1 1 pure `shouldReturn` [1] + withDequeueNoFilter channel conn D.int4 1 1 pure `shouldReturn` [1] it "dequeue timesout after enqueueing everything" $ withConnection $ \conn -> do - timeout 100000 (withDequeue channel conn D.int4 1 1 pure) `shouldReturn` Nothing + timeout 100000 (withDequeueNoFilter channel conn D.int4 1 1 pure) `shouldReturn` Nothing it "dequeueing is in FIFO order" $ withConnection $ \conn -> do enqueue channel conn E.int4 [1] enqueue channel conn E.int4 [2] - withDequeue channel conn D.int4 1 1 pure `shouldReturn` [1] - withDequeue channel conn D.int4 1 1 pure `shouldReturn` [2] + withDequeueNoFilter channel conn D.int4 1 1 pure `shouldReturn` [1] + withDequeueNoFilter channel conn D.int4 1 1 pure `shouldReturn` [2] it "dequeueing a batch of elements works" $ withConnection $ \conn -> do enqueue channel conn E.int4 [1, 2, 3] - withDequeue channel conn D.int4 1 2 pure `shouldReturn` [1, 2] + withDequeueNoFilter channel conn D.int4 1 2 pure `shouldReturn` [1, 2] - withDequeue channel conn D.int4 1 1 pure `shouldReturn` [3] + withDequeueNoFilter channel conn D.int4 1 1 pure `shouldReturn` [3] - it "withDequeue blocks until something is enqueued: before" $ withConnection $ \conn -> do + it "withDequeueNoFilter blocks until something is enqueued: before" $ withConnection $ \conn -> do void $ enqueue channel conn E.int4 [1] - res <- withDequeue channel conn D.int4 1 1 pure + res <- withDequeueNoFilter channel conn D.int4 1 1 pure res `shouldBe` [1] - it "withDequeue blocks until something is enqueued: during" $ withConnection $ \conn -> do + it "withDequeueNoFilter blocks until something is enqueued: during" $ withConnection $ \conn -> do afterActionMVar <- newEmptyMVar beforeNotifyMVar <- newEmptyMVar @@ -80,7 +95,7 @@ spec = describe "Hasql.Queue.Low.AtLeastOnce" $ aroundAll withSetup $ describe " } -- This is the definition of IO.dequeue - resultThread <- async $ withDequeueWith @IOError handlers channel conn D.int4 1 1 pure + resultThread <- async $ withDequeueWith @IOError handlers channel conn "" D.int4 1 1 pure takeMVar afterActionMVar void $ enqueue "hey" conn E.int4 [1] @@ -89,8 +104,8 @@ spec = describe "Hasql.Queue.Low.AtLeastOnce" $ aroundAll withSetup $ describe " wait resultThread `shouldReturn` [1] - it "withDequeue blocks until something is enqueued: after" $ withConnection2 $ \(conn1, conn2) -> do - thread <- async $ withDequeue channel conn1 D.int4 1 1 pure + it "withDequeueNoFilter blocks until something is enqueued: after" $ withConnection2 $ \(conn1, conn2) -> do + thread <- async $ withDequeueNoFilter channel conn1 D.int4 1 1 pure timeout 100000 (wait thread) `shouldReturn` Nothing enqueue channel conn2 E.int4 [1] @@ -98,19 +113,19 @@ spec = describe "Hasql.Queue.Low.AtLeastOnce" $ aroundAll withSetup $ describe " wait thread `shouldReturn` [1] -- TODO redo just using failures - it "withDequeue fails and sets the retries to +1" $ withConnection $ \conn -> do + it "withDequeueNoFilter fails and sets the retries to +1" $ withConnection $ \conn -> do enqueue channel conn E.int4 [1] - handle (\(_ :: IOError) -> pure ()) $ withDequeue channel conn D.int4 0 1 $ \_ -> throwIO $ userError "hey" + handle (\(_ :: IOError) -> pure ()) $ withDequeueNoFilter channel conn D.int4 0 1 $ \_ -> throwIO $ userError "hey" xs <- failures conn D.int4 Nothing 1 map snd xs `shouldBe` [1] - it "withDequeue succeeds even if the first attempt fails" $ withConnection $ \conn -> do + it "withDequeueNoFilter succeeds even if the first attempt fails" $ withConnection $ \conn -> do [payloadId] <- I.runThrow (I.enqueuePayload E.int4 [1]) conn ref <- newIORef (0 :: Int) - withDequeueWith @FailedwithDequeue mempty channel conn D.int4 1 1 (\_ -> do + withDequeueWith @FailedwithDequeue mempty channel conn "" D.int4 1 1 (\_ -> do count <- readIORef ref writeIORef ref $ count + 1 when (count < 1) $ throwIO FailedwithDequeue @@ -126,7 +141,7 @@ spec = describe "Hasql.Queue.Low.AtLeastOnce" $ aroundAll withSetup $ describe " ref <- newTVarIO [] loopThreads <- replicateM 35 $ async $ withPool' $ \c -> fix $ \next -> do - lastCount <- withDequeue channel c D.int4 1 1 $ \[x] -> do + lastCount <- withDequeueNoFilter channel c D.int4 1 1 $ \[x] -> do atomically $ do xs <- readTVar ref writeTVar ref $ x : xs