[vpj] Support running VTConsistencyCheckerJob via VenicePushJob #2805
Mohith22 wants to merge 1 commit into
/**
 * When set to {@code true}, {@link com.linkedin.venice.hadoop.VenicePushJob#run()} skips the
 * push and instead invokes {@link com.linkedin.venice.spark.consistency.VTConsistencyCheckerJob}
 * against the store's current version topic.
 */
public static final String VT_CONSISTENCY_CHECK_ONLY = "vt.consistency.check.only";
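For illustration only, a minimal sketch of how a caller might set and read this flag through `java.util.Properties`. The helper class `VtCheckFlagExample` and the default of `false` when the key is absent are assumptions, not taken from the PR; only the key string comes from the diff.

```java
import java.util.Properties;

// Hypothetical helper, not part of the PR: it only shows how the flag could be read.
final class VtCheckFlagExample {
  // Key string copied from the diff above.
  static final String VT_CONSISTENCY_CHECK_ONLY = "vt.consistency.check.only";

  // Returns true only when the property is present and equals "true" (case-insensitive).
  // Treating a missing key as false is an assumption of this sketch.
  static boolean isCheckOnly(Properties props) {
    return Boolean.parseBoolean(props.getProperty(VT_CONSISTENCY_CHECK_ONLY, "false"));
  }
}
```

With this shape, a job config that never mentions the key keeps today's push behavior, and only an explicit `vt.consistency.check.only=true` switches to the check-only path.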
For heartbeat push jobs that want produce + verify in one run, the natural UX is: flip a single flag and the job does the push and then the consistency check. Today VT_CONSISTENCY_CHECK_ONLY=true is mutually exclusive with the push, so the user has to schedule a second VPJ run with the flag flipped. The second run also targets whatever the current version is at that later time, not necessarily the version just pushed.
A simpler shape: keep this flag for the "skip push, check only" path, and add a sibling vt.consistency.check.after.push that runs the checker as a follow-on phase inside the same run() after a successful push. Heartbeat jobs then flip one config and get produce + post-push consistency check in one invocation.
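The two-flag shape proposed above could be sketched roughly as follows. This is not code from the PR: the class `PushThenCheckSketch`, the phase names, and the choice to reject both flags being set together are all assumptions made for illustration; only the two config keys come from this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical sketch of the reviewer's proposal; it plans phases and runs nothing.
final class PushThenCheckSketch {
  static final String CHECK_ONLY = "vt.consistency.check.only";
  static final String CHECK_AFTER_PUSH = "vt.consistency.check.after.push";

  // Returns the ordered phases a single run() would execute for the given config.
  static List<String> planPhases(Properties props) {
    boolean checkOnly = Boolean.parseBoolean(props.getProperty(CHECK_ONLY, "false"));
    boolean checkAfterPush = Boolean.parseBoolean(props.getProperty(CHECK_AFTER_PUSH, "false"));
    // Assumption: the two flags contradict each other, so setting both is rejected.
    if (checkOnly && checkAfterPush) {
      throw new IllegalArgumentException(CHECK_ONLY + " and " + CHECK_AFTER_PUSH + " cannot both be true");
    }
    List<String> phases = new ArrayList<>();
    if (!checkOnly) {
      phases.add("PUSH");
    }
    if (checkOnly || checkAfterPush) {
      // In the after-push case this phase would run only once the push has succeeded.
      phases.add("CONSISTENCY_CHECK");
    }
    return phases;
  }
}
```

Under these assumptions, a heartbeat job that sets only `vt.consistency.check.after.push=true` gets the push followed by the check in one invocation, while the existing flag keeps its check-only meaning.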
I intentionally kept this separate for two reasons:
- It doesn't really make sense to run this right after VPJ, because the validation needs nearline writes to be meaningful. This is more relevant for AA stores.
- I want it to be lightweight so we can trigger it and do the validation quickly, instead of waiting for the VPJ to finish.
Problem Statement
VTConsistencyCheckerJob (recently introduced) is a Spark job that scans Version Topics across two DCs to detect cross-region inconsistencies. Currently, we integrate it via VBnP using HadoopJavaOperator, which complicates the wiring. We want to integrate it into VenicePushOperator so it can be added seamlessly to Airflow DAGs. Also, the Spark job today doesn't fail the operator when inconsistencies are found; it ends silently after writing the errors to Parquet. We need to throw an error so that the DAG step fails.
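The "throw an error so that the DAG step fails" requirement could look roughly like the sketch below. Everything in it is hypothetical: the exception type, the method name, and the idea that the job exposes an inconsistency count and an output path are assumptions for illustration, since the PR text does not show the actual implementation.

```java
// Hypothetical sketch: fail loudly after the results are written, instead of exiting cleanly.
final class ConsistencyCheckFailureSketch {
  // Assumed exception type; the real job may use a different one.
  static final class InconsistencyFoundException extends RuntimeException {
    InconsistencyFoundException(String message) {
      super(message);
    }
  }

  // Throws when at least one inconsistency was recorded, so the calling operator sees a failed run.
  // outputPath is where the detailed records are assumed to have been written already.
  static void failIfInconsistent(long inconsistencyCount, String outputPath) {
    if (inconsistencyCount > 0) {
      throw new InconsistencyFoundException(
          "Found " + inconsistencyCount + " cross-region inconsistencies; details written to " + outputPath);
    }
  }
}
```

Throwing only after the output has been written keeps the detailed records available for debugging while still letting the exception propagate and mark the step as failed.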
Solution
Code changes
Concurrency-Specific Checks
Both reviewer and PR author to verify
- Synchronization mechanisms (e.g., synchronized, RWLock) are used where needed.
- Thread-safe collections are used (e.g., ConcurrentHashMap, CopyOnWriteArrayList).
How was this PR tested?
Does this PR introduce any user-facing or breaking changes?