Skip to content

Commit 5f0941f

Browse files
committed
add oracle validator before cdc launching
1 parent 9dfdb6c commit 5f0941f

5 files changed

Lines changed: 661 additions & 4 deletions

File tree

tis-datax/tis-datax-mongodb-plugin/src/main/java/com/qlangtech/tis/plugin/ds/mangodb/MangoDBDataSourceFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ public TableInDB getTablesInDB() {
157157
});
158158
}
159159

160-
protected final <RESULT> RESULT vistMongoClient(Function<MongoClient, RESULT> consumer) {
160+
public final <RESULT> RESULT vistMongoClient(Function<MongoClient, RESULT> consumer) {
161161
try (MongoClient mongoClient = createMongoClient()) {
162162
return consumer.apply(mongoClient);
163163
}

tis-datax/tis-datax-odps-plugin/pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@
5151
<dependency>
5252
<groupId>com.aliyun.odps</groupId>
5353
<artifactId>odps-jdbc</artifactId>
54-
<version>3.3.6</version>
54+
<!-- <version>3.3.6</version>-->
55+
<version>3.10.1</version>
5556
</dependency>
5657
<dependency>
5758
<groupId>com.qlangtech.tis.plugins</groupId>

tis-incr/tis-flink-cdc-mongdb-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/mongdb/FlinkCDCMongoDBSourceFactory.java

Lines changed: 122 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,23 @@
1818

1919
package com.qlangtech.plugins.incr.flink.cdc.mongdb;
2020

21+
import com.alibaba.citrus.turbine.Context;
2122
import com.qlangtech.plugins.incr.flink.cdc.FlinkCol;
2223
import com.qlangtech.tis.annotation.Public;
23-
import com.qlangtech.tis.async.message.client.consumer.IConsumerHandle;
2424
import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator;
2525
import com.qlangtech.tis.async.message.client.consumer.IMQListener;
2626
import com.qlangtech.tis.async.message.client.consumer.impl.MQListenerFactory;
27+
import com.qlangtech.tis.datax.DataXName;
28+
import com.qlangtech.tis.datax.impl.DataxReader;
2729
import com.qlangtech.tis.extension.TISExtension;
2830
import com.qlangtech.tis.plugin.IEndTypeGetter;
2931
import com.qlangtech.tis.plugin.annotation.FormField;
3032
import com.qlangtech.tis.plugin.annotation.FormFieldType;
3133
import com.qlangtech.tis.plugin.annotation.Validator;
34+
import com.qlangtech.tis.plugin.datax.DataXMongodbReader;
3235
import com.qlangtech.tis.plugin.ds.DataSourceMeta;
36+
import com.qlangtech.tis.plugin.ds.mangodb.MangoDBDataSourceFactory;
37+
import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler;
3338

3439
/**
3540
* https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/connectors/flink-sources/mongodb-cdc/
@@ -82,6 +87,122 @@ public String getDisplayName() {
8287
return "Flink-CDC-MongoDB";
8388
}
8489

90+
@Override
91+
protected boolean validateMQListenerForm(IControlMsgHandler msgHandler, Context context, MQListenerFactory sourceFactory) {
92+
DataXName pipe = msgHandler.getCollectionName();
93+
DataXMongodbReader mongoReader = (DataXMongodbReader) DataxReader.load(null, pipe.getPipelineName());
94+
MangoDBDataSourceFactory dsFactory = mongoReader.getDataSourceFactory();
95+
96+
try {
97+
// Validate MongoDB CDC prerequisites
98+
return dsFactory.vistMongoClient((mongoClient) -> {
99+
try {
100+
// 1. Check if MongoDB is running in Replica Set or Sharded Cluster mode
101+
validateReplicaSetMode(mongoClient, msgHandler, context);
102+
103+
// 2. Check user permissions for CDC operations
104+
validateCDCPermissions(mongoClient, dsFactory, msgHandler, context);
105+
106+
return true;
107+
} catch (Exception e) {
108+
msgHandler.addErrorMessage(context, "MongoDB CDC validation failed: " + e.getMessage());
109+
return false;
110+
}
111+
});
112+
} catch (Exception e) {
113+
msgHandler.addErrorMessage(context, "Failed to connect to MongoDB: " + e.getMessage());
114+
return false;
115+
}
116+
}
117+
118+
/**
119+
* Validate that MongoDB is running in Replica Set or Sharded Cluster mode
120+
* MongoDB CDC requires replica set mode to capture change events from oplog
121+
*/
122+
private void validateReplicaSetMode(com.mongodb.client.MongoClient mongoClient, IControlMsgHandler msgHandler, Context context) {
123+
try {
124+
com.mongodb.client.MongoDatabase adminDb = mongoClient.getDatabase("admin");
125+
org.bson.Document isMasterResult = adminDb.runCommand(new org.bson.Document("isMaster", 1));
126+
127+
// Check if it's a replica set member
128+
boolean isReplicaSet = isMasterResult.containsKey("setName");
129+
// Check if it's a sharded cluster (mongos)
130+
boolean isSharded = "isdbgrid".equals(isMasterResult.getString("msg"));
131+
132+
if (!isReplicaSet && !isSharded) {
133+
throw new IllegalStateException(
134+
"MongoDB CDC requires Replica Set or Sharded Cluster mode. " +
135+
"Current MongoDB instance is running in standalone mode. " +
136+
"Please configure MongoDB as a replica set to enable CDC functionality.");
137+
138+
}
139+
} catch (com.mongodb.MongoCommandException e) {
140+
throw new RuntimeException("Failed to execute isMaster command, please check MongoDB connection and permissions", e);
141+
}
142+
}
143+
144+
/**
145+
* Validate user has necessary permissions for CDC operations:
146+
* 1. Read access to local database (for oplog)
147+
* 2. Read access to target database
148+
* 3. Permission to execute changeStream operations
149+
*/
150+
private void validateCDCPermissions(com.mongodb.client.MongoClient mongoClient,
151+
MangoDBDataSourceFactory dsFactory,
152+
IControlMsgHandler msgHandler,
153+
Context context) {
154+
try {
155+
// Check permission to access local database (required for oplog access)
156+
com.mongodb.client.MongoDatabase localDb = mongoClient.getDatabase("local");
157+
try {
158+
// Try to list collections in local database
159+
localDb.listCollectionNames().first();
160+
} catch (com.mongodb.MongoCommandException e) {
161+
throw new IllegalStateException(
162+
"User does not have permission to read 'local' database. " +
163+
"MongoDB CDC requires read access to local database to capture change events. " +
164+
"Please grant the following role to user '" + dsFactory.getUserName() + "': " +
165+
"{ role: 'read', db: 'local' } or use the 'changeStream' role.");
166+
}
167+
168+
// Check permission to access target database
169+
com.mongodb.client.MongoDatabase targetDb = mongoClient.getDatabase(dsFactory.getDbName());
170+
try {
171+
// Try to list collections in target database
172+
targetDb.listCollectionNames().first();
173+
} catch (com.mongodb.MongoCommandException e) {
174+
throw new IllegalStateException(
175+
"User does not have permission to read database '" + dsFactory.getDbName() + "'. " +
176+
"Please grant read permission to user '" + dsFactory.getUserName() + "' on database '" + dsFactory.getDbName() + "'.");
177+
}
178+
179+
// Check if user can execute changeStream (try to create a dummy change stream)
180+
try {
181+
// Attempt to create a change stream on the target database
182+
// This validates that the user has the necessary permissions
183+
com.mongodb.client.ChangeStreamIterable<org.bson.Document> changeStream =
184+
targetDb.watch();
185+
// Close the cursor immediately, we just need to verify permissions
186+
changeStream.cursor().close();
187+
} catch (com.mongodb.MongoCommandException e) {
188+
// Check if it's a permission error
189+
if (e.getErrorCode() == 13) { // Unauthorized error code
190+
throw new IllegalStateException(
191+
"User does not have permission to execute changeStream operations. " +
192+
"Please grant the 'changeStream' privilege or 'read' role on database '" + dsFactory.getDbName() + "' " +
193+
"to user '" + dsFactory.getUserName() + "'.");
194+
}
195+
// Other errors might be acceptable (e.g., if the collection doesn't exist yet)
196+
}
197+
198+
} catch (IllegalStateException e) {
199+
// Re-throw IllegalStateException with our custom error messages
200+
throw e;
201+
} catch (Exception e) {
202+
throw new RuntimeException("Failed to validate CDC permissions: " + e.getMessage(), e);
203+
}
204+
}
205+
85206
@Override
86207
public PluginVender getVender() {
87208
return PluginVender.FLINK_CDC;

tis-incr/tis-flink-cdc-oracle-plugin/src/main/java/com/qlangtech/plugins/incr/flink/cdc/oracle/FlinkCDCOracleSourceFactory.java

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,28 +18,43 @@
1818

1919
package com.qlangtech.plugins.incr.flink.cdc.oracle;
2020

21+
import com.alibaba.citrus.turbine.Context;
2122
import com.google.common.collect.Lists;
2223
import com.qlangtech.plugins.incr.debuzium.DebuziumPropAssist;
2324
import com.qlangtech.plugins.incr.flink.cdc.FlinkCol;
25+
import com.qlangtech.plugins.incr.flink.cdc.oracle.OracleCDCValidator.ValidationResult;
2426
import com.qlangtech.tis.annotation.Public;
2527
import com.qlangtech.tis.async.message.client.consumer.IFlinkColCreator;
2628
import com.qlangtech.tis.async.message.client.consumer.IMQListener;
2729
import com.qlangtech.tis.async.message.client.consumer.impl.MQListenerFactory;
30+
import com.qlangtech.tis.datax.DataXName;
31+
import com.qlangtech.tis.datax.impl.DataxReader;
2832
import com.qlangtech.tis.extension.TISExtension;
2933
import com.qlangtech.tis.extension.util.AbstractPropAssist.Options;
3034
import com.qlangtech.tis.plugin.IEndTypeGetter;
3135
import com.qlangtech.tis.plugin.annotation.FormField;
3236
import com.qlangtech.tis.plugin.annotation.FormFieldType;
3337
import com.qlangtech.tis.plugin.annotation.Validator;
38+
import com.qlangtech.tis.plugin.datax.common.BasicDataXRdbmsReader;
39+
import com.qlangtech.tis.plugin.ds.BasicDataSourceFactory;
3440
import com.qlangtech.tis.plugin.ds.DataSourceMeta;
41+
import com.qlangtech.tis.plugin.ds.ISelectedTab;
42+
import com.qlangtech.tis.plugin.ds.JDBCConnection;
43+
import com.qlangtech.tis.plugin.ds.TableNotFoundException;
44+
import com.qlangtech.tis.runtime.module.misc.IControlMsgHandler;
3545
import io.debezium.config.Field;
3646
import io.debezium.connector.oracle.OracleConnectorConfig;
3747
import org.apache.commons.lang3.tuple.Triple;
3848
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
49+
import org.slf4j.Logger;
50+
import org.slf4j.LoggerFactory;
3951

52+
import java.sql.Connection;
53+
import java.sql.SQLException;
4054
import java.util.List;
4155
import java.util.function.BooleanSupplier;
4256
import java.util.function.Function;
57+
import java.util.stream.Collectors;
4358

4459
/**
4560
* Oracle监听踩坑记: https://mp.weixin.qq.com/s/IQiK7enF5fX0ighRE_i2sg
@@ -49,6 +64,7 @@
4964
**/
5065
@Public
5166
public class FlinkCDCOracleSourceFactory extends MQListenerFactory {
67+
private static final Logger logger = LoggerFactory.getLogger(FlinkCDCOracleSourceFactory.class);
5268

5369

5470
// opts.addFieldDescriptor("lob", OracleConnectorConfig.LOB_ENABLED);
@@ -103,6 +119,10 @@ public boolean getAsBoolean() {
103119
@FormField(ordinal = 5, type = FormFieldType.ENUM, validate = {Validator.require})
104120
public String failureHandle;
105121

122+
public OracleConnectorConfig.LogMiningStrategy parseMiningStrategy() {
123+
return OracleConnectorConfig.LogMiningStrategy.parse(this.miningStrategy);
124+
}
125+
106126

107127
/**
108128
* binlog监听在独立的slot中执行
@@ -146,6 +166,60 @@ public DefaultDescriptor() {
146166
}
147167
}
148168

169+
@Override
170+
protected boolean validateMQListenerForm(IControlMsgHandler msgHandler, Context context, MQListenerFactory sourceFactory) {
171+
DataXName pipe = msgHandler.getCollectionName();
172+
FlinkCDCOracleSourceFactory oracleCDC = (FlinkCDCOracleSourceFactory) sourceFactory;
173+
BasicDataXRdbmsReader reader = (BasicDataXRdbmsReader) DataxReader.load(null, pipe.getPipelineName());
174+
List<ISelectedTab> unfilledSelectedTabs = reader.getUnfilledSelectedTabs();
175+
176+
// Get table names from selected tabs
177+
List<String> tableNames = unfilledSelectedTabs.stream()
178+
.map(ISelectedTab::getName)
179+
.collect(Collectors.toList());
180+
181+
BasicDataSourceFactory dsFactory = (BasicDataSourceFactory) reader.getDataSourceFactory();
182+
183+
// Execute Oracle CDC pre-launch validation
184+
final boolean[] validationPassed = {true};
185+
186+
dsFactory.visitFirstConnection(new BasicDataSourceFactory.IConnProcessor() {
187+
@Override
188+
public void vist(JDBCConnection conn) throws SQLException, TableNotFoundException {
189+
Connection connection = conn.getConnection();
190+
191+
logger.info("Starting Oracle CDC validation, table count: {}", tableNames.size());
192+
193+
// Call validator to perform complete validation
194+
ValidationResult result = OracleCDCValidator.validate(
195+
connection,
196+
oracleCDC,
197+
tableNames
198+
);
199+
200+
// Add validation errors to msgHandler for frontend display
201+
if (!result.isValid()) {
202+
validationPassed[0] = false;
203+
for (String error : result.getErrors()) {
204+
msgHandler.addErrorMessage(context, error);
205+
}
206+
logger.error("Oracle CDC validation failed, error count: {}", result.getErrors().size());
207+
}
208+
209+
// Log warning messages
210+
for (String warning : result.getWarnings()) {
211+
logger.warn("Oracle CDC validation warning: {}", warning);
212+
}
213+
214+
if (result.isValid()) {
215+
logger.info("Oracle CDC validation passed");
216+
}
217+
}
218+
});
219+
220+
return validationPassed[0];
221+
}
222+
149223
@Override
150224
public String getDisplayName() {
151225
return "Flink-CDC-Oracle";
@@ -161,4 +235,4 @@ public IEndTypeGetter.EndType getEndType() {
161235
return IEndTypeGetter.EndType.Oracle;
162236
}
163237
}
164-
}
238+
}

0 commit comments

Comments
 (0)