From c4ff38e83aa8e331600e5dde16b0c49d0dbe14f4 Mon Sep 17 00:00:00 2001 From: Jose Lorenzo Rodriguez Date: Wed, 28 Jul 2021 10:57:22 +0200 Subject: [PATCH] Extract important queries to postgres functions The motivating use case for extracting these functions is that it gives more control to the developer using this package on how to deploy this to their infrastructure. For instance, some people may be interested in a different table structure for `payloads`, like having it partitioned. With this change, developers are free to change the implementation of the postgres functions as they see fit as long as they implement an equivalent logic and return the same types. --- hasql-queue.cabal | 53 ++++++++++++++++---- package.yaml | 2 +- src/Hasql/Queue/High/ExactlyOnce.hs | 23 ++------- src/Hasql/Queue/Internal.hs | 42 ++++------------ src/Hasql/Queue/Migrate.hs | 64 +++++++++++++++++++++++++ test/Hasql/Queue/Low/AtLeastOnceSpec.hs | 4 +- 6 files changed, 123 insertions(+), 65 deletions(-) diff --git a/hasql-queue.cabal b/hasql-queue.cabal index db09027..fd1f279 100644 --- a/hasql-queue.cabal +++ b/hasql-queue.cabal @@ -1,13 +1,13 @@ cabal-version: 1.12 --- This file has been generated from package.yaml by hpack version 0.31.2. +-- This file has been generated from package.yaml by hpack version 0.34.4. -- -- see: https://github.com/sol/hpack -- --- hash: 956ae93525f9dafcc0c9c8149cd2bbc8cfcfe4e63310adec92ce40f995e4cbf4 +-- hash: 30a78bb71c0fb6470ad0d6b6788b23f19801ab253d1c65e008a48e329e01b914 name: hasql-queue -version: 1.2.0.1 +version: 1.3.0.0 synopsis: A PostgreSQL backed queue description: A PostgreSQL backed queue. Please see README.md category: Web @@ -18,7 +18,8 @@ maintainer: jonathangfischoff@gmail.com copyright: 2020 Jonathan Fischoff license: BSD3 license-file: LICENSE -tested-with: GHC ==8.8.1 +tested-with: + GHC ==8.8.1 build-type: Simple extra-source-files: README.md @@ -42,7 +43,16 @@ library Paths_hasql_queue hs-source-dirs: src - default-extensions: OverloadedStrings LambdaCase RecordWildCards TupleSections GeneralizedNewtypeDeriving QuasiQuotes ScopedTypeVariables TypeApplications AllowAmbiguousTypes + default-extensions: + OverloadedStrings + LambdaCase + RecordWildCards + TupleSections + GeneralizedNewtypeDeriving + QuasiQuotes + ScopedTypeVariables + TypeApplications + AllowAmbiguousTypes ghc-options: -Wall -Wno-unused-do-bind -Wno-unused-foralls build-depends: aeson @@ -67,7 +77,16 @@ executable benchmark Paths_hasql_queue hs-source-dirs: benchmarks - default-extensions: OverloadedStrings LambdaCase RecordWildCards TupleSections GeneralizedNewtypeDeriving QuasiQuotes ScopedTypeVariables TypeApplications AllowAmbiguousTypes + default-extensions: + OverloadedStrings + LambdaCase + RecordWildCards + TupleSections + GeneralizedNewtypeDeriving + QuasiQuotes + ScopedTypeVariables + TypeApplications + AllowAmbiguousTypes ghc-options: -Wall -Wno-unused-do-bind -Wno-unused-foralls -O2 -threaded -rtsopts -with-rtsopts=-N build-depends: aeson @@ -98,7 +117,16 @@ executable hasql-queue-tmp-db Paths_hasql_queue hs-source-dirs: hasql-queue-tmp-db - default-extensions: OverloadedStrings LambdaCase RecordWildCards TupleSections GeneralizedNewtypeDeriving QuasiQuotes ScopedTypeVariables TypeApplications AllowAmbiguousTypes + default-extensions: + OverloadedStrings + LambdaCase + RecordWildCards + TupleSections + GeneralizedNewtypeDeriving + QuasiQuotes + ScopedTypeVariables + TypeApplications + AllowAmbiguousTypes ghc-options: -Wall -Wno-unused-do-bind -Wno-unused-foralls -O2 -threaded -rtsopts -with-rtsopts=-N -g2 build-depends: aeson @@ -137,7 +165,16 @@ test-suite unit-tests Paths_hasql_queue hs-source-dirs: test - default-extensions: OverloadedStrings LambdaCase RecordWildCards TupleSections GeneralizedNewtypeDeriving QuasiQuotes ScopedTypeVariables TypeApplications AllowAmbiguousTypes + default-extensions: + OverloadedStrings + LambdaCase + RecordWildCards + TupleSections + GeneralizedNewtypeDeriving + QuasiQuotes + ScopedTypeVariables + TypeApplications + AllowAmbiguousTypes ghc-options: -Wall -Wno-unused-do-bind -Wno-unused-foralls -O2 -threaded -rtsopts -with-rtsopts=-N build-depends: aeson diff --git a/package.yaml b/package.yaml index eb36226..54c8d22 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: hasql-queue -version: '1.2.0.2' +version: '1.3.0.0' synopsis: A PostgreSQL backed queue description: A PostgreSQL backed queue. Please see README.md category: Web diff --git a/src/Hasql/Queue/High/ExactlyOnce.hs b/src/Hasql/Queue/High/ExactlyOnce.hs index 42eb999..d6f0390 100644 --- a/src/Hasql/Queue/High/ExactlyOnce.hs +++ b/src/Hasql/Queue/High/ExactlyOnce.hs @@ -58,30 +58,13 @@ dequeue valueDecoder count | count <= 0 = pure [] | otherwise = do let multipleQuery = [here| - DELETE FROM payloads - WHERE id in - ( SELECT p1.id - FROM payloads AS p1 - WHERE p1.state='enqueued' - ORDER BY p1.modified_at ASC - FOR UPDATE SKIP LOCKED - LIMIT $1 - ) - RETURNING value + SELECT value FROM dequeue_payload($1) |] + multipleEncoder = E.param $ E.nonNullable $ fromIntegral >$< E.int4 singleQuery = [here| - DELETE FROM payloads - WHERE id = - ( SELECT p1.id - FROM payloads AS p1 - WHERE p1.state='enqueued' - ORDER BY p1.modified_at ASC - FOR UPDATE SKIP LOCKED - LIMIT 1 - ) - RETURNING value + SELECT value FROM dequeue_payload(1) |] singleEncoder = mempty diff --git a/src/Hasql/Queue/Internal.hs b/src/Hasql/Queue/Internal.hs index e5c8106..19229fd 100644 --- a/src/Hasql/Queue/Internal.hs +++ b/src/Hasql/Queue/Internal.hs @@ -62,10 +62,6 @@ newtype PayloadId = PayloadId { unPayloadId :: Int64 } data Payload a = Payload { pId :: PayloadId , pState :: State - -- TODO do I need this? - , pAttempts :: Int - , pModifiedAt :: Int - -- TODO rename. I don't need this either. , pValue :: a } deriving (Show, Eq) @@ -75,8 +71,6 @@ payloadDecoder thePayloadDecoder = Payload <$> payloadIdRow <*> D.column (D.nonNullable stateDecoder) - <*> D.column (D.nonNullable $ fromIntegral <$> D.int4) - <*> D.column (D.nonNullable $ fromIntegral <$> D.int4) <*> D.column (D.nonNullable thePayloadDecoder) payloadIdEncoder :: E.Value PayloadId @@ -92,9 +86,7 @@ payloadIdRow = D.column (D.nonNullable payloadIdDecoder) enqueuePayload :: E.Value a -> [a] -> Session [PayloadId] enqueuePayload theEncoder values = do let theQuery = [here| - INSERT INTO payloads (attempts, value) - SELECT 0, * FROM unnest($1) - RETURNING id + SELECT id FROM enqueue_payload($1) |] encoder = E.param $ E.nonNullable $ E.foldableArray $ E.nonNullable theEncoder decoder = D.rowList (D.column (D.nonNullable payloadIdDecoder)) @@ -105,30 +97,15 @@ enqueuePayload theEncoder values = do dequeuePayload :: D.Value a -> Int -> Session [Payload a] dequeuePayload valueDecoder count = do let multipleQuery = [here| - DELETE FROM payloads - WHERE id in - ( SELECT p1.id - FROM payloads AS p1 - WHERE p1.state='enqueued' - ORDER BY p1.modified_at ASC - FOR UPDATE SKIP LOCKED - LIMIT $1 - ) - RETURNING id, state, attempts, modified_at, value + SELECT id, state, value + FROM dequeue_payload($1) |] + multipleEncoder = E.param $ E.nonNullable $ fromIntegral >$< E.int4 singleQuery = [here| - DELETE FROM payloads - WHERE id = - ( SELECT p1.id - FROM payloads AS p1 - WHERE p1.state='enqueued' - ORDER BY p1.modified_at ASC - FOR UPDATE SKIP LOCKED - LIMIT 1 - ) - RETURNING id, state, attempts, modified_at, value + SELECT id, state, value + FROM dequeue_payload(1) |] singleEncoder = mempty @@ -144,7 +121,7 @@ dequeuePayload valueDecoder count = do getPayload :: D.Value a -> PayloadId -> Session (Maybe (Payload a)) getPayload decoder payloadId = do let theQuery = [here| - SELECT id, state, attempts, modified_at, value + SELECT id, state, value FROM payloads WHERE id = $1 |] @@ -168,10 +145,7 @@ getCount = do incrementAttempts :: Int -> [PayloadId] -> Session () incrementAttempts retryCount pids = do let theQuery = [here| - UPDATE payloads - SET state=CASE WHEN attempts >= $1 THEN 'failed' :: state_t ELSE 'enqueued' END - , attempts=attempts+1 - WHERE id = ANY($2) + SELECT increment_payload_attempts($1, $2) |] encoder = (fst >$< E.param (E.nonNullable E.int4)) <> (snd >$< E.param (E.nonNullable $ E.foldableArray $ E.nonNullable payloadIdEncoder)) diff --git a/src/Hasql/Queue/Migrate.hs b/src/Hasql/Queue/Migrate.hs index 821fba8..d560b59 100644 --- a/src/Hasql/Queue/Migrate.hs +++ b/src/Hasql/Queue/Migrate.hs @@ -62,6 +62,36 @@ migrationQueryString valueType = [i| CREATE INDEX IF NOT EXISTS active_modified_at_idx ON payloads USING btree (modified_at) WHERE (state = 'enqueued'); + CREATE OR REPLACE FUNCTION dequeue_payload(limit_ INT) RETURNS SETOF payloads AS + $$ + WITH available AS + ( SELECT p1.id + FROM payloads AS p1 + WHERE p1.state='enqueued' + ORDER BY p1.modified_at ASC + FOR UPDATE SKIP LOCKED + LIMIT limit_ + ) + DELETE FROM payloads + USING available + WHERE payloads.id = available.id + RETURNING payloads.* + $$ LANGUAGE SQL VOLATILE; + + CREATE OR REPLACE FUNCTION increment_payload_attempts(threshold_ INT, ids_ BIGINT[]) RETURNS VOID AS + $$ + UPDATE payloads + SET state=CASE WHEN attempts >= threshold_ THEN 'failed' :: state_t ELSE 'enqueued' END + , attempts=attempts+1 + WHERE id = ANY(ids_) + $$ LANGUAGE SQL VOLATILE; + + CREATE OR REPLACE FUNCTION enqueue_payload(values_ ${valueType}[]) RETURNS SETOF payloads AS + $$ + INSERT INTO payloads (attempts, value) + SELECT 0, * FROM unnest(values_) + RETURNING * + $$ LANGUAGE SQL VOLATILE; |] {-| This function creates a table and enumeration type that is @@ -106,6 +136,37 @@ migrationQueryString valueType = [i| CREATE INDEX IF NOT EXISTS active_modified_at_idx ON payloads USING btree (modified_at, state) WHERE (state = 'enqueued'); + + CREATE OR REPLACE FUNCTION dequeue_payload(limit_ INT) RETURNS SETOF payloads AS + $$ + WITH available AS + ( SELECT p1.id + FROM payloads AS p1 + WHERE p1.state='enqueued' + ORDER BY p1.modified_at ASC + FOR UPDATE SKIP LOCKED + LIMIT limit_ + ) + DELETE FROM payloads + USING available + WHERE payloads.id = available.id + RETURNING payloads.* + $$ LANGUAGE SQL VOLATILE; + + CREATE OR REPLACE FUNCTION increment_payload_attempts(threshold_ INT, ids_ BIGINT[]) RETURNS VOID AS + $$ + UPDATE payloads + SET state=CASE WHEN attempts >= threshold_ THEN 'failed' :: state_t ELSE 'enqueued' END + , attempts=attempts+1 + WHERE id = ANY(ids_) + $$ LANGUAGE SQL VOLATILE; + + CREATE OR REPLACE FUNCTION enqueue_payload(values_ ${valueType}[]) RETURNS SETOF payloads AS + $$ + INSERT INTO payloads (attempts, value) + SELECT 0, * FROM unnest(values_) + RETURNING * + $$ LANGUAGE SQL VOLATILE; @ The @VALUE_TYPE@ needs to passed in through the second argument. @@ -123,6 +184,9 @@ Drop everything created by 'migrate' teardown :: Connection -> IO () teardown conn = do let theQuery = [i| + DROP FUNCTION IF EXISTS enqueue_payload; + DROP FUNCTION IF EXISTS dequeue_payload; + DROP FUNCTION IF EXISTS increment_payload_attempts; DROP TABLE IF EXISTS payloads; DROP TYPE IF EXISTS state_t; DROP SEQUENCE IF EXISTS modified_index; diff --git a/test/Hasql/Queue/Low/AtLeastOnceSpec.hs b/test/Hasql/Queue/Low/AtLeastOnceSpec.hs index 4c12f3b..33b268a 100644 --- a/test/Hasql/Queue/Low/AtLeastOnceSpec.hs +++ b/test/Hasql/Queue/Low/AtLeastOnceSpec.hs @@ -142,8 +142,8 @@ spec = describe "Hasql.Queue.Low.AtLeastOnce" $ aroundAll withSetup $ describe " let Just decoded = mapM (decode . encode) xs sort decoded `shouldBe` sort expected - it "enqueue returns a PayloadId that cooresponds to the entry it added" $ withConnection $ \conn -> do + it "enqueue returns a PayloadId that corresponds to the entry it added" $ withConnection $ \conn -> do [payloadId] <- I.runThrow (I.enqueuePayload E.int4 [1]) conn Just actual <- getPayload conn D.int4 payloadId - pValue actual `shouldBe` 1 + pId actual `shouldBe` payloadId