NIFI-15145: Add RecordLookup, KeyValueLookup, and MapCacheClient Services for Couchbase bundle #10467
mark-bathori wants to merge 2 commits into apache:main
Conversation
Any updates on when this PR will be reviewed?
@mark-bathori Do you need any help resolving the conflicts? I can push a PR to your branch.
@deniswsrosa The PR has been abandoned due to lack of reviewers. I'm reviewing it now and it should be merged soon.
exceptionfactory
left a comment
@mark-bathori and @turcsanyip, raising a general question, is it necessary to have all of the added components? Map Cache Client, Key Value Lookup, and Record Lookup are all for different use cases. If you have confirmed usage of all three, that's good, but just raising the question as opposed to simply carrying over what previously existed.
exceptionfactory
left a comment
@turcsanyip I will hold off on further comments until you have completed your review. On a very cursory scan, I notice a handful of mostly minor concerns related to exception handling, such as lack of detailed messages and exceptions that are never logged in some cases.
Answering the main question about component usage would help guide where to focus review cycles.
Thanks for the work on this!
|
If you all need any help, just let me know, I can also push some fixes.
(Email reply to David Handermann's review comment of Fri, Jan 16, 2026.)
@exceptionfactory @turcsanyip We could push another PR with the changes requested in this PR if needed.
turcsanyip
left a comment
@mark-bathori Please find my review comments below.
Please also rebase the branch onto main and update the old 2.7.0 version references.
| <groupId>org.apache.nifi</groupId>
| <artifactId>nifi-distributed-cache-client-service-api</artifactId>
| <scope>compile</scope>
The service API dependency should use the provided scope. No explicit scope definition is needed here because it can be inherited from dependency management.
| private static final String DEFAULT_SCOPE = "_default";
| private static final String DEFAULT_COLLECTION = "_default";
|
| protected CouchbaseClient couchbaseClient;
The field should be volatile and please move it after the static property definitions below.
Please also fix the same in AbstractCouchbaseProcessor.
| @CapabilityDescription("Lookup a string value from Couchbase Server associated with the specified key. The coordinates that are passed to the lookup must contain the key 'key'.")
| public class CouchbaseKeyValueLookupService extends AbstractCouchbaseService implements StringLookupService {
|
| private volatile String subDocPath;
Please move it after the static members.
| final String scopeName = context.getProperty(SCOPE_NAME).evaluateAttributeExpressions().getValue();
| final String collectionName = context.getProperty(COLLECTION_NAME).evaluateAttributeExpressions().getValue();
|
| return new CouchbaseContext(bucketName, scopeName, collectionName, DocumentType.JSON);
Due to the fixed document type, CouchbaseKeyValueLookupService and CouchbaseMapCacheClient can only handle JSON documents. They should also be able to process the binary (raw) type.
There was a problem hiding this comment.
I have only added support for JSON, as using binary types with the current supporting services does not seem practical.
The CouchbaseRecordLookupService needs to return a Record type.
The CouchbaseKeyValueLookupService requires a structured path to perform lookups within the document.
I am unsure if using the CouchbaseMapCacheClient to cache binary data is a valid scenario. However, I am open to adding binary support if you see a specific use case for it.
There was a problem hiding this comment.
@mark-bathori I believe raw (non-JSON) string data is valid. CouchbaseRecordLookupService really needs JSON, but Lookup Sub-Document Path is optional in CouchbaseKeyValueLookupService. DistributedMapCacheClient implementations should also be able to handle plain strings.
There was a problem hiding this comment.
Thanks for your insight. I'll add binary type support for the new services.
| final byte[] document = serializeDocument(entry.getValue(), valueSerializer);
|
| try {
|     couchbaseClient.replaceDocument(documentId, document);
AtomicDistributedMapCacheClient interface expects to apply optimistic locking so AtomicCacheEntry.version should be passed as the CAS value to Couchbase if it is available.
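For illustration, a minimal sketch of the optimistic-locking behavior described in this comment. It uses an in-memory stand-in instead of the real Couchbase client, so `InMemoryCasStore`, `VersionedValue`, and `replaceWithCas` are hypothetical names and not part of the PR. In the actual service the revision from the cache entry would presumably be handed to the Couchbase client as the CAS value.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a stored document: content plus its CAS value. */
final class VersionedValue {
    final String content;
    final long cas;

    VersionedValue(String content, long cas) {
        this.content = content;
        this.cas = cas;
    }
}

/** Hypothetical in-memory store that mimics a CAS-checked replace. */
class InMemoryCasStore {
    private final Map<String, VersionedValue> documents = new ConcurrentHashMap<>();
    private final AtomicLong casGenerator = new AtomicLong();

    /** Unconditional write; returns the CAS value assigned to the new content. */
    long upsert(String id, String content) {
        long cas = casGenerator.incrementAndGet();
        documents.put(id, new VersionedValue(content, cas));
        return cas;
    }

    Optional<VersionedValue> get(String id) {
        return Optional.ofNullable(documents.get(id));
    }

    /**
     * Replaces the document only if it exists and its current CAS equals expectedCas.
     * Returns true only when the value was actually replaced.
     */
    boolean replaceWithCas(String id, String content, long expectedCas) {
        VersionedValue current = documents.get(id);
        if (current == null || current.cas != expectedCas) {
            return false; // missing document or stale revision: leave the store untouched
        }
        VersionedValue updated = new VersionedValue(content, casGenerator.incrementAndGet());
        // Atomic swap: succeeds only if the mapping still points at the instance read above
        return documents.replace(id, current, updated);
    }
}
```

With this shape, a caller that fetched a document at one CAS value and tries to replace it after another writer has changed it gets `false` back instead of silently overwriting the newer value.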
| final byte[] document = serializeDocument(entry.getValue(), valueSerializer);
|
| try {
|     couchbaseClient.replaceDocument(documentId, document);
Though the API specification of the replace() method says "Replace an existing key with new value", the only client of the method expects insert logic as well. So we may consider adding that logic too (as it was in the old implementation). I suggest checking the other replace() implementations to get an idea how it should work.
There was a problem hiding this comment.
I also find this conflicting. The AtomicDistributedMapCacheClient documentation explicitly states that 'if the revision does not match with the one in the cache storage, value will not be replaced' and that the method returns 'true only if the key is replaced.'
However, despite this, the legacy implementation includes an insert step, as you noted. I am unsure whether we should correct this to strictly follow the documentation or maintain the existing (potentially incorrect) behavior.
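To make the two readings concrete, here is a rough sketch of one possible compromise: insert only when the caller supplies no revision, and otherwise replace strictly by revision. This is an assumption about how the behaviors could be combined, not a description of the legacy implementation or of what the PR ends up doing. `ReplaceOrInsertCache` and `Stored` are hypothetical names, and the in-memory map stands in for Couchbase.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical cache sketch: replace() falls back to insert when no revision is given. */
class ReplaceOrInsertCache {

    /** Stored value together with its revision (the CAS analogue). */
    static final class Stored {
        final String value;
        final long revision;

        Stored(String value, long revision) {
            this.value = value;
            this.revision = revision;
        }
    }

    private final ConcurrentMap<String, Stored> entries = new ConcurrentHashMap<>();

    /**
     * revision == null: the caller never saw an existing entry, so insert and
     * report false if the key already exists.
     * revision != null: replace only if the stored revision still matches.
     */
    boolean replace(String key, String value, Long revision) {
        if (revision == null) {
            // putIfAbsent returns null only when the key was not present before
            return entries.putIfAbsent(key, new Stored(value, 1L)) == null;
        }
        Stored current = entries.get(key);
        if (current == null || current.revision != revision.longValue()) {
            return false; // nothing to replace, or the revision is stale
        }
        return entries.replace(key, current, new Stored(value, current.revision + 1));
    }

    Optional<Stored> fetch(String key) {
        return Optional.ofNullable(entries.get(key));
    }
}
```

Under this reading the method still returns `true` only when a write happened, while a caller starting from an empty cache can create the first entry through the same call.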
| try {
|     final CouchbaseGetResult result = couchbaseClient.getDocument(documentId);
|     return new AtomicCacheEntry<>(key, deserializeDocument(valueDeserializer, result.resultContent()), result.cas());
| } catch (CouchbaseException e) {
Only DocumentNotFoundException should be handled gracefully. Other exceptions should be wrapped into IOException and thrown.
The same applies to replace(), remove(), and get().
For putIfAbsent(), the exception to handle gracefully is DocumentExistsException.
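A small sketch of the handling suggested here, with stand-in types so it compiles on its own. `ClientException`, `NotFoundException`, `FakeClient`, and `CacheReader` are hypothetical and only mirror the roles of the client's general exception, the not-found exception, the Couchbase client, and the cache service.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the client's general exception type. */
class ClientException extends RuntimeException {
    ClientException(String message) {
        super(message);
    }
}

/** Hypothetical stand-in for the "document not found" exception. */
class NotFoundException extends ClientException {
    NotFoundException(String id) {
        super("Document not found: " + id);
    }
}

/** Hypothetical client that throws for unknown IDs and for simulated backend failures. */
class FakeClient {
    final Map<String, String> documents = new HashMap<>();
    boolean failing;

    String getDocument(String id) {
        if (failing) {
            throw new ClientException("Backend unavailable");
        }
        String content = documents.get(id);
        if (content == null) {
            throw new NotFoundException(id);
        }
        return content;
    }
}

class CacheReader {
    private final FakeClient client;

    CacheReader(FakeClient client) {
        this.client = client;
    }

    /** Returns null for a missing document; wraps every other client failure in IOException. */
    String get(String id) throws IOException {
        try {
            return client.getDocument(id);
        } catch (NotFoundException e) {
            return null; // a cache miss is an expected outcome, not an error
        } catch (ClientException e) {
            // keep the cause and name the document so the failure can be diagnosed
            throw new IOException("Failed to get document [" + id + "]", e);
        }
    }
}
```

The more specific catch clause comes first, so only the not-found case is swallowed and everything else reaches the caller with its cause attached.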
| if (containsKey(key, keySerializer)) {
|     return get(key, keySerializer, valueDeserializer);
| }
The cache entry can be deleted or can expire after containsKey() returns, so get() may still return null. It would be more robust to just call get() and return the result if something is found.
|     return get(key, keySerializer, valueDeserializer);
| }
|
| put(key, value, keySerializer, valueSerializer);
Based on the method name, shouldn't this be putIfAbsent()?
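Putting the last two comments together, a sketch of getAndPutIfAbsent() that skips the containsKey() pre-check and writes with insert-only semantics. `SimpleCache` is a hypothetical in-memory stand-in. The retry loop goes slightly beyond what was asked in the review and is an assumption about how to handle a concurrent insert between the read and the write.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory cache illustrating getAndPutIfAbsent without containsKey(). */
class SimpleCache {
    private final ConcurrentMap<String, String> entries = new ConcurrentHashMap<>();

    String get(String key) {
        return entries.get(key);
    }

    /** Returns true only if the key was absent and the value was stored. */
    boolean putIfAbsent(String key, String value) {
        return entries.putIfAbsent(key, value) == null;
    }

    /**
     * Returns the existing value if one is found; otherwise stores the new value
     * with insert-only semantics and returns null.
     */
    String getAndPutIfAbsent(String key, String value) {
        while (true) {
            String existing = get(key);
            if (existing != null) {
                return existing; // single read, no separate containsKey() check
            }
            if (putIfAbsent(key, value)) {
                return null; // the key was absent and the new value is now stored
            }
            // another writer inserted the key between get() and putIfAbsent(); read again
        }
    }
}
```

Because the write never overwrites, a value stored by a concurrent writer is returned on the next loop iteration instead of being replaced.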
| }
|
| @Override
| public CouchbaseLookupInResult lookUpIn(String documentId, String subDocPath) throws CouchbaseException {
Based on the Couchbase Java client type and method names, the correct spelling would be: lookupIn()
Thanks @turcsanyip for the review. I'll go through your comments and make the requested changes.
3693b51 to 8218476
…ices for Couchbase bundle
8218476 to 21e98af
Will the project pom.xml (not the bundle ones) be updated to include the Couchbase connector in NiFi as part of this PR, or as a separate change?
The Couchbase NAR is rather large, given the extensive shaded content of the client library. For this reason, it is not included in the standard |
Summary
NIFI-15145
Tracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
Pull Request Tracking
NIFI-00000
NIFI-00000
Pull Request Formatting
main branch
Verification
Please indicate the verification steps performed prior to pull request creation.
Build
./mvnw clean install -P contrib-check
Licensing
LICENSE and NOTICE files
Documentation