@Raveline
Contributor

Proposal to ensure that an application running several different consumers doesn't crash if one of the jobs is unparseable for some reason.

I didn't find a way to plug this into consumer-monitoring, unfortunately. I guess alerting on a high number of failures in consumers_job_execution_seconds (which, despite the misleading name, includes the job_result) is still the best way to detect this kind of issue. Or perhaps this is a good case for using logAttention.

@Raveline Raveline force-pushed the reenqueue-unparseable-job branch 2 times, most recently from d0354ec to c2310b7 Compare October 3, 2025 08:35
-- | A default implementation for ccOnFailedToFetchJob,
-- when the parsing of the row should never fail.
-- This will create a logAttention and reenqueue for the next day.
shouldNotFail :: MonadLog m => String -> idx -> m Action
Collaborator

Rename to defaultOnFailedToFetchJob and add logging for the index please.

-- ^ Fields needed to be selected from the jobs table in order to assemble a
-- job.
- , ccJobFetcher :: !(row -> job)
+ , ccJobFetcher :: !(row -> Either (idx, String) job)
Collaborator

Suggested change
- , ccJobFetcher :: !(row -> Either (idx, String) job)
+ , ccJobFetcher :: !(row -> Either (idx, Text) job)
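
For readers following the type change, here is a minimal sketch of a fetcher with the new shape. `Row`, `Job`, and `fetchJob` are hypothetical names, the row is simplified to an id plus a raw payload, and `String` stands in for `Text` only to keep the sketch dependency-free. It is an illustration, not the library's code.

```haskell
import Text.Read (readMaybe)

-- Hypothetical row shape: (job id, raw payload). Not the library's row type.
type Row = (Int, String)

-- Hypothetical job type, for illustration only.
data Job = Job { jobId :: Int, jobCount :: Int }
  deriving (Eq, Show)

-- Parse a row. On failure, return the job index together with a message,
-- so the caller can still reschedule that specific job.
fetchJob :: Row -> Either (Int, String) Job
fetchJob (idx, payload) =
  case readMaybe payload of
    Just n  -> Right (Job idx n)
    Nothing -> Left (idx, "cannot parse payload: " ++ show payload)
```

The point of pairing the index with the error is that the consumer can apply an `Action` to the failed row by id even though no `job` value could be built.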

defaultOnFailedToFetchJob :: (MonadLog m, Show idx) => Text -> idx -> m Action
defaultOnFailedToFetchJob msg idx = do
  logAttention "Unexpected unparseable job" $ A.object ["error" A..= msg, "idx" A..= show idx]
  pure . RerunAfter $ ihours 48
Contributor

Nitpick on API: take an Interval instead of defaulting to a magic value. This way users of the library need to think about what makes sense for their job, one size does not fit all.

Contributor Author

This is tricky, because this function is intended to be used only when you know that your job should always parse and cannot fail. If I force users to think about it, it kind of defeats its purpose.

Collaborator

While defaulting to a magic value is fine, I'd prefer idays 1 so you can see in logs every day something's up.
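
One way to reconcile the two positions in this thread is to offer both a parameterised handler and a fixed-delay default built on it. The sketch below assumes simplified stand-ins: `Interval`, `Action`, and `idays` are local definitions that only mimic the real ones, logging is passed in as a plain function instead of `MonadLog`, and `onFailedToFetchJobAfter` and `dailyOnFailedToFetchJob` are hypothetical names.

```haskell
-- Simplified stand-ins for the library's Interval and Action types.
newtype Interval = Hours Int
  deriving (Eq, Show)

data Action = RerunAfter Interval
  deriving (Eq, Show)

-- Local stand-in mirroring the name of the real helper.
idays :: Int -> Interval
idays n = Hours (24 * n)

-- Hypothetical general variant: the caller picks the delay explicitly.
onFailedToFetchJobAfter
  :: (Monad m, Show idx)
  => (String -> m ())  -- logging function (stand-in for logAttention)
  -> Interval
  -> String
  -> idx
  -> m Action
onFailedToFetchJobAfter logFn delay msg idx = do
  logFn ("Unexpected unparseable job " ++ show idx ++ ": " ++ msg)
  pure (RerunAfter delay)

-- Hypothetical default: retry daily, so the problem shows up in logs every day.
dailyOnFailedToFetchJob
  :: (Monad m, Show idx) => (String -> m ()) -> String -> idx -> m Action
dailyOnFailedToFetchJob logFn = onFailedToFetchJobAfter logFn (idays 1)
```

Users who never expect a parse failure can take the default without thinking, while those with unusual jobs can still choose their own interval.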

Contributor

@jonathanjouty jonathanjouty left a comment

> consumers_job_execution_seconds [...] despite the misleading name

Yes, there's a bit more info in our internal docs here, I should probably document it here too 🤔

> I didn't find a way to plug this to consumer-monitoring, unfortunately.

You could wrap ccOnFailedToFetchJob here (in addition to ccProcessJob)?
https://github.com/scrive/consumers/blob/master/consumers-metrics-prometheus/src/Database/PostgreSQL/Consumers/Instrumented.hs#L238

We could add reporting into consumers_job_execution_seconds with 0 seconds and a new job_result label (hack), or add a dedicated counter for these failures (better).
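
A rough sketch of the "dedicated counter" option, under heavy simplification: an `IORef Int` stands in for a real Prometheus counter, `Action` is a local stand-in type, the handler runs in `IO`, and `countingFailures` is a hypothetical name. None of this is the instrumented module's actual API.

```haskell
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)

-- Simplified stand-in for the library's Action type.
data Action = RerunAfterDays Int
  deriving (Eq, Show)

-- Wrap a failure handler so that every invocation bumps a counter first.
-- In a real setup the IORef would be replaced by a metrics counter.
countingFailures
  :: IORef Int
  -> (String -> idx -> IO Action)
  -> String
  -> idx
  -> IO Action
countingFailures counter handler msg idx = do
  atomicModifyIORef' counter (\n -> (n + 1, ()))
  handler msg idx

-- Small demonstration: two failed fetches leave the counter at 2.
demo :: IO Int
demo = do
  counter <- newIORef 0
  let handler = countingFailures counter (\_ _ -> pure (RerunAfterDays 1))
  _ <- handler "bad json" (1 :: Int)
  _ <- handler "bad json" (2 :: Int)
  readIORef counter
```

Wrapping the handler keeps the metric separate from consumers_job_execution_seconds, so no artificial zero-second observations are needed.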

-- ^ Action taken if fetching a job failed. It is advised to reenqueue the
-- job at a later date and emit a warning in such a case. This is mostly
-- to ensure the application using consumers won't fail completely when
-- this happens.
Contributor

Nitpick: Add 'defaultOnFailedToFetchJob' Haddock link.

, ccNotificationChannel = Just "consumers_test_chan"
, -- select some small timeout
ccNotificationTimeout = 100 * 1000 -- 100 msec
, ccOnFailedToFetchJob = \_ _ -> pure . RerunAfter $ idays 14
Contributor

Hypothetically, what happens if the test fails to parse a job? This silently reschedules and tests pass? I don't remember the test rig enough.

Contributor Author

Yes. (This is very hypothetical though, and I don't see how it could happen: the payload is text, so unless there's a slight discrepancy in text formatting between PG and Haskell, it's impossible for this to fail. This feature is really meant more for JSON deserialisation.)

Collaborator

Since the test job can't fail to parse, just use defaultOnFailedToFetchJob here.

@Raveline Raveline force-pushed the reenqueue-unparseable-job branch from d546f1a to 712e537 Compare December 2, 2025 16:06
Collaborator

@arybczak arybczak left a comment

Please fix CI.

, "FOR UPDATE SKIP LOCKED"
]
- stuckJobs <- fetchMany ccJobFetcher
+ stuckJobs <- rights <$> fetchMany ccJobFetcher
Collaborator

Why is this ignoring Lefts? The code below uses the index only, and you get an index with a Left 🤔
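
To illustrate the point: since a `Left` already carries the index, the indices of all fetched rows can be recovered without dropping the unparseable ones. In this sketch `Job`, `jobIndex`, `parsedIndicesOnly`, and `fetchedIndices` are hypothetical names, and `String` stands in for `Text`.

```haskell
import Data.Either (rights)

-- Hypothetical job type that knows its own index.
data Job = Job { jobIndex :: Int, jobPayload :: String }
  deriving (Eq, Show)

-- What `rights` gives: indices of successfully parsed jobs only.
parsedIndicesOnly :: [Either (Int, String) Job] -> [Int]
parsedIndicesOnly = map jobIndex . rights

-- Alternative: take the index from either side, so unparseable
-- rows are not silently skipped.
fetchedIndices :: [Either (Int, String) Job] -> [Int]
fetchedIndices = map (either fst jobIndex)
```

With three fetched rows where the second fails to parse, `parsedIndicesOnly` yields two indices while `fetchedIndices` yields all three.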

, "WHERE id IN (" <> reservedJobs now <> ")"
, "RETURNING" <+> mintercalate ", " ccJobSelectors
]
-- Decode lazily as we want the transaction to be as short as possible.
Collaborator

Why was this removed? The comment is still accurate.

Collaborator

There's still a diff.

pure (batchSize > 0)

- reserveJobs :: Int -> m ([job], Int)
+ reserveJobs :: MonadCatch m => Int -> m ([Either (idx, T.Text) job], Int)
Collaborator

I don't think you need MonadCatch here, it's a top level m with MonadMask constraint.


import Control.Exception (SomeException)
import Data.Aeson.Types qualified as A
import Data.Text
Collaborator

Import qualified or Text type only (I think this works).

-- This will create a logAttention and reenqueue, to be replayed in 2 days.
defaultOnFailedToFetchJob :: (MonadLog m, Show idx) => Text -> idx -> m Action
defaultOnFailedToFetchJob msg idx = do
  logAttention "Unexpected unparseable job" $ A.object ["error" A..= msg, "idx" A..= show idx]
Collaborator

Suggested change
- logAttention "Unexpected unparseable job" $ A.object ["error" A..= msg, "idx" A..= show idx]
+ logAttention "Unexpected unparseable job" $ A.object ["error" A..= msg, "job_id" A..= show idx]

@Raveline
Contributor Author

CI is fixed and I addressed all your concerns (hopefully).


-- | A default implementation for ccOnFailedToFetchJob,
-- when the parsing of the row should never fail.
-- This will create a logAttention and reenqueue, to be replayed in 2 days.
Collaborator

1 day

action <-
  lift $
    either
      (\(idx, t) -> ccOnFailedToFetchJob t idx)
Collaborator

That reminded me that we probably want to give ccOnFailedToFetchJob the same treatment we give to ccOnException here, i.e. have a safety net in case it itself throws due to a bug.
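
A minimal sketch of such a safety net, assuming a handler that runs in `IO` and a simplified stand-in `Action` type. `safeOnFailedToFetchJob` is a hypothetical name, and how the library actually guards ccOnException is not shown here.

```haskell
import Control.Exception (SomeException, try)

-- Simplified stand-in for the library's Action type.
data Action = RerunAfterDays Int
  deriving (Eq, Show)

-- Run a user-supplied failure handler. If the handler itself throws,
-- report it and fall back to a fixed action instead of letting the
-- exception escape and take the consumer down.
safeOnFailedToFetchJob
  :: Action                        -- fallback used when the handler throws
  -> (String -> idx -> IO Action)  -- user-supplied handler
  -> String
  -> idx
  -> IO Action
safeOnFailedToFetchJob fallback handler msg idx = do
  result <- try (handler msg idx)
  case result of
    Right action -> pure action
    Left e -> do
      putStrLn ("failure handler threw: " ++ show (e :: SomeException))
      pure fallback
```

Note that catching `SomeException` also intercepts asynchronous exceptions; production code would likely want to rethrow those, which this sketch does not attempt.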

@Raveline Raveline force-pushed the reenqueue-unparseable-job branch from 4b08bf6 to 45c29c5 Compare December 19, 2025 08:47
@Raveline Raveline force-pushed the reenqueue-unparseable-job branch from 45c29c5 to 6f76b3c Compare December 19, 2025 11:18