Skip to content

Commit 3485a72

Browse files
committed
Fix a bug in the dataflow parser executor for PowerJob distributed mode (datavane/tis#442)
1 parent c9c8dc5 commit 3485a72

10 files changed

Lines changed: 65 additions & 132 deletions

File tree

tis-datax/executor/powerjob-worker-samples/src/main/java/com/qlangtech/tis/datax/powerjob/TISTableJoinProcessor.java

Lines changed: 0 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -16,126 +16,12 @@ public class TISTableJoinProcessor extends BasicTISTableJoinProcessor implements
1616
static {
1717
setDataXExecutorDir();
1818
}
19-
// public static final String KEY_instanceParams = "instanceParams";
20-
// private static final Pattern PATTERN_PARTITION_PARAMS =
21-
// Pattern.compile("^" + ExecChainContextUtils.PARTITION_DATA_PARAMS + "_(.+?)$");
22-
// transient RpcServiceReference statusRpc;
23-
//
24-
// private static DataXJoinProcessConsumer createTableJoinConsumer() {
25-
// DataXJoinProcessConsumer joinProcessConsumer = new DataXJoinProcessConsumer(BasicTISTableDumpProcessor.createSysPramsSuppiler());
26-
// return joinProcessConsumer;
27-
// }
28-
//
29-
// private RpcServiceReference createRpcServiceReference() {
30-
// if (this.statusRpc != null) {
31-
// return this.statusRpc;
32-
// }
33-
// try {
34-
// this.statusRpc = StatusRpcClientFactory.getService(ITISCoordinator.create());
35-
// } catch (Exception e) {
36-
// throw new RuntimeException(e);
37-
// }
38-
// return this.statusRpc;
39-
// }
40-
4119

4220
@Override
4321
public ProcessResult process(TaskContext context) throws Exception {
4422

4523
this.process(new PowerJobTaskContext(context));
4624
return new ProcessResult(true);
47-
// RpcServiceReference rpcRef = createRpcServiceReference();
48-
// final OmsLogger logger = context.getOmsLogger();
49-
// StatusRpcClientFactory.AssembleSvcCompsite feedback = rpcRef.get();
50-
// // JSONObject instanceParams = JSONObject.parseObject(context.getInstanceParams());
51-
// // DefaultExecContext execContext = IExecChainContext.deserializeInstanceParams(instanceParams);
52-
// // execContext.setResType(StoreResourceType.DataFlow);
53-
// // execContext.setWorkflowName(execContext.getIndexName());
54-
// SqlTaskNodeMeta sqlTask =
55-
// SqlTaskNodeMeta.deserializeTaskNode(ISqlTask.toCfg(JSON.parseObject(context.getJobParams())));
56-
// DefaultExecContext execContext = createDftExecContent(context);
57-
// try {
58-
//
59-
//
60-
// // IDataxProcessor dataxProc = execContext.getProcessor();
61-
// // IPartionableWarehouse partionableWarehouse =
62-
// // IDataxWriter.getPartionableWarehouse(dataxProc.getWriter(null));
63-
// //
64-
// // Matcher matcher = null;
65-
// //
66-
// // for (Map.Entry<String, Object> entry : instanceParams.entrySet()) {
67-
// // Object val = entry.getValue();
68-
// // try {
69-
// // matcher = PATTERN_PARTITION_PARAMS.matcher(entry.getKey());
70-
// // if (matcher.matches()) {
71-
// // execContext.putTablePt(EntityName.parse(matcher.group(1)),
72-
// // () -> partionableWarehouse.getPsFormat().format(Long.parseLong(String
73-
// // .valueOf(val))));
74-
// // }
75-
// // } catch (Exception e) {
76-
// // throw new RuntimeException("key:" + entry.getKey() + ",val:" + entry.getValue(), e);
77-
// // }
78-
// // }
79-
// if (TisAppLaunch.isTestMock()) {
80-
// DataXJoinProcessExecutor.executeJoin(feedback, execContext, sqlTask);
81-
// } else {
82-
// // 另外单独启一个进程来执行
83-
// DataXJoinProcessConsumer tableJoinConsumer = createTableJoinConsumer();
84-
//
85-
// WorkflowHookMsg wfHookMsg = WorkflowHookMsg.create(sqlTask, execContext, sqlTask.getExportName());
86-
//
87-
// tableJoinConsumer.consumeMessage(wfHookMsg);
88-
// }
89-
// TISTableDumpProcessor.addSuccessPartition(context, execContext, sqlTask.getExportName());
90-
// return new ProcessResult(true);
91-
//
92-
// } catch (StatusRuntimeException e) {
93-
// rpcRef.reConnect();
94-
// throw e;
95-
// } catch (Exception e) {
96-
// Throwable rootCause = ExceptionUtils.getRootCause(e);
97-
//
98-
// RpcUtils.setJoinStatus(execContext.getTaskId(), true, true, feedback, sqlTask.getExportName());
99-
//
100-
// feedback.appendLog(LoggingEvent.Level.ERROR, execContext.getTaskId(), Optional.empty(),
101-
// rootCause != null ? ExceptionUtils.getStackTrace(rootCause) : ExceptionUtils.getStackTrace(e));
102-
// throw new RuntimeException(e);
103-
// }
104-
10525
}
10626

107-
// private DefaultExecContext createDftExecContent(TaskContext context) {
108-
// JSONObject instanceParams = JSONObject.parseObject(context.getInstanceParams());
109-
// final CfgsSnapshotConsumer snapshotConsumer = new CfgsSnapshotConsumer();
110-
// DefaultExecContext execContext = IExecChainContext.deserializeInstanceParams(instanceParams, (ctx) -> {
111-
// ctx.setResType(StoreResourceType.DataFlow);
112-
// ctx.setWorkflowName(ctx.getIndexName());
113-
// ctx.setExecutePhaseRange(new ExecutePhaseRange(FullbuildPhase.FullDump, FullbuildPhase.JOIN));
114-
// }, snapshotConsumer);
115-
//
116-
//
117-
// snapshotConsumer.synchronizTpisAndConfs(execContext, TISTableDumpProcessor.cacheSnaphsot);
118-
//
119-
// IDataxProcessor dataxProc = execContext.getProcessor();
120-
// IPartionableWarehouse partionableWarehouse = IDataxWriter.getPartionableWarehouse(dataxProc.getWriter(null));
121-
//
122-
// Matcher matcher = null;
123-
//
124-
// for (Map.Entry<String, Object> entry : instanceParams.entrySet()) {
125-
// Object val = entry.getValue();
126-
// try {
127-
// matcher = PATTERN_PARTITION_PARAMS.matcher(entry.getKey());
128-
// if (matcher.matches()) {
129-
// execContext.putTablePt(EntityName.parse(matcher.group(1)),
130-
// () -> partionableWarehouse.getPsFormat().format(Long.parseLong(String.valueOf(val))));
131-
// }
132-
// } catch (Exception e) {
133-
// throw new RuntimeException("key:" + entry.getKey() + ",val:" + entry.getValue(), e);
134-
// }
135-
// }
136-
//
137-
// return execContext;
138-
// }
139-
140-
14127
}

tis-datax/executor/powerjob-worker-samples/src/test/resources/tis-web-config/config.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ tis.datasource.password=123456
2626
tis.datasource.dbname=tis_console
2727
zk.host=192.168.28.200:2181/tis/cloud
2828

29-
assemble.host=192.168.28.113
30-
tis.host=192.168.28.113
29+
assemble.host=192.168.28.115
30+
tis.host=192.168.28.115
3131

3232

tis-datax/executor/tis-datax-executor/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,13 @@
105105
</exclusions>
106106
</dependency>
107107

108+
<dependency>
109+
<groupId>com.qlangtech.tis</groupId>
110+
<artifactId>tis-common</artifactId>
111+
<version>${tis.version}</version>
112+
<scope>compile</scope>
113+
</dependency>
114+
108115
<dependency>
109116
<groupId>com.qlangtech.tis</groupId>
110117
<artifactId>tis-dag</artifactId>

tis-datax/executor/tis-datax-executor/src/main/java/com/qlangtech/tis/datax/executor/BasicTISInitializeProcessor.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import com.qlangtech.tis.job.common.JobParams;
2929
import com.qlangtech.tis.manage.common.HttpUtils;
3030
import com.qlangtech.tis.manage.common.HttpUtils.PostParam;
31-
import com.qlangtech.tis.offline.DataxUtils;
3231
import com.qlangtech.tis.rpc.grpc.log.ILoggerAppenderClient.LogLevel;
3332
import com.qlangtech.tis.trigger.util.JsonUtil;
3433
import com.tis.hadoop.rpc.RpcServiceReference;

tis-datax/executor/tis-datax-executor/src/main/java/com/qlangtech/tis/datax/join/DataXJoinProcessConsumer.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,10 @@
66
import com.qlangtech.tis.datax.DataXJobSingleProcessorExecutor;
77
import com.qlangtech.tis.datax.DataXJobSubmit;
88
import com.qlangtech.tis.datax.DataxPrePostConsumer;
9-
import com.qlangtech.tis.datax.IDataXTaskRelevant;
109
import com.qlangtech.tis.fullbuild.IFullBuildContext;
1110
import com.qlangtech.tis.fullbuild.indexbuild.IDumpTable;
1211
import com.qlangtech.tis.fullbuild.indexbuild.ITabPartition;
1312
import com.qlangtech.tis.job.common.JobParams;
14-
import com.qlangtech.tis.manage.common.Config;
1513
import com.qlangtech.tis.offline.DataxUtils;
1614
import com.qlangtech.tis.sql.parser.ISqlTask;
1715
import com.qlangtech.tis.sql.parser.TabPartitions;
@@ -53,14 +51,17 @@ protected void addMainClassParams(WorkflowHookMsg msg, Integer taskId, String jo
5351
if (StringUtils.isEmpty(msg.getJobName())) {
5452
throw new IllegalArgumentException("param getJobName can not be empty");
5553
}
54+
5655
Command command = new Command(cmdLine);
5756

5857
JSONObject sqlTskJson = ISqlTask.json(msg.getSqlTask());
5958

6059
command.add(KEY_ID, sqlTskJson.getString(KEY_ID));
61-
command.add(KEY_SQL_SCRIPT, sqlTskJson.getString(KEY_SQL_SCRIPT));
60+
// command.add(KEY_SQL_SCRIPT, sqlTskJson.getString(KEY_SQL_SCRIPT));
6261
command.add(KEY_EXECUTE_TYPE, sqlTskJson.getString(KEY_EXECUTE_TYPE));
6362
command.add(KEY_EXPORT_NAME, sqlTskJson.getString(KEY_EXPORT_NAME));
63+
// command.add(KEY_DEPENDENCIES, Objects.requireNonNull(sqlTskJson.getString(KEY_DEPENDENCIES)
64+
// , "key:" + KEY_DEPENDENCIES + " relevant value can not be null"));
6465

6566
command.add(JobParams.KEY_TASK_ID, String.valueOf(Objects.requireNonNull(taskId, "taskId can not be null")));
6667
command.add(IFullBuildContext.DRY_RUN, msg.getDryRun());
@@ -85,6 +86,10 @@ protected void addMainClassParams(WorkflowHookMsg msg, Integer taskId, String jo
8586
// cmdLine.addArgument(tab.getFullName() + "_" + pt.getPt(), true);
8687
});
8788

89+
// msg.getSqlTask();
90+
//
91+
// List<DependencyNode> dependencyNodes = msg.getSqlTask().getDependencies();
92+
8893
}
8994

9095
@Override

tis-datax/executor/tis-datax-executor/src/main/java/com/qlangtech/tis/datax/join/DataXJoinProcessExecutor.java

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,29 @@
66
import com.qlangtech.tis.datax.IDataxProcessor;
77
import com.qlangtech.tis.datax.IDataxWriter;
88
import com.qlangtech.tis.datax.RpcUtils;
9+
import com.qlangtech.tis.datax.StoreResourceType;
910
import com.qlangtech.tis.exec.AbstractExecContext;
11+
import com.qlangtech.tis.exec.ExecChainContextUtils;
1012
import com.qlangtech.tis.exec.ExecutePhaseRange;
1113
import com.qlangtech.tis.exec.ExecuteResult;
1214
import com.qlangtech.tis.exec.IExecChainContext;
1315
import com.qlangtech.tis.fullbuild.IFullBuildContext;
16+
import com.qlangtech.tis.fullbuild.indexbuild.IDumpTable;
1417
import com.qlangtech.tis.fullbuild.phasestatus.IJoinTaskStatus;
1518
import com.qlangtech.tis.fullbuild.phasestatus.impl.JoinPhaseStatus;
19+
import com.qlangtech.tis.fullbuild.taskflow.AdapterTask;
1620
import com.qlangtech.tis.fullbuild.taskflow.DataflowTask;
1721
import com.qlangtech.tis.fullbuild.taskflow.IFlatTableBuilder;
1822
import com.qlangtech.tis.job.common.JobCommon;
1923
import com.qlangtech.tis.job.common.JobParams;
2024
import com.qlangtech.tis.offline.DataxUtils;
21-
import com.qlangtech.tis.datax.StoreResourceType;
2225
import com.qlangtech.tis.plugin.ds.IDataSourceFactoryGetter;
2326
import com.qlangtech.tis.powerjob.TriggersConfig;
24-
import com.qlangtech.tis.sql.parser.ISqlTask;
2527
import com.qlangtech.tis.sql.parser.SqlTaskNodeMeta;
28+
import com.qlangtech.tis.sql.parser.TabPartitions;
29+
import com.qlangtech.tis.sql.parser.TopologyDir;
2630
import com.qlangtech.tis.sql.parser.er.IPrimaryTabFinder;
31+
import com.qlangtech.tis.sql.parser.meta.DependencyNode;
2732
import com.qlangtech.tis.sql.parser.tuple.creator.EntityName;
2833
import com.tis.hadoop.rpc.RpcServiceReference;
2934
import com.tis.hadoop.rpc.StatusRpcClientFactory;
@@ -34,10 +39,12 @@
3439
import org.apache.commons.cli.Option;
3540
import org.apache.commons.cli.Options;
3641
import org.apache.commons.cli.ParseException;
42+
import org.apache.commons.collections.CollectionUtils;
3743
import org.apache.commons.lang.StringUtils;
3844
import org.slf4j.Logger;
3945
import org.slf4j.LoggerFactory;
4046

47+
import java.util.List;
4148
import java.util.Map;
4249
import java.util.Properties;
4350
import java.util.function.Consumer;
@@ -71,9 +78,10 @@ private static Options getOpts() {
7178
// sqlTaskCfg.put(KEY_EXPORT_NAME, args[3]);
7279

7380
addOpt(options, KEY_ID);
74-
addOpt(options, KEY_SQL_SCRIPT);
81+
// addOpt(options, KEY_DATAFLOW_NAME);
7582
addOpt(options, KEY_EXECUTE_TYPE);
7683
addOpt(options, KEY_EXPORT_NAME);
84+
// addOpt(options, KEY_DEPENDENCIES);
7785

7886
addOpt(options, JobParams.KEY_TASK_ID);
7987
addOpt(options, IFullBuildContext.DRY_RUN, (builder) -> {
@@ -102,7 +110,7 @@ private static void addOpt(Options options, String key, Consumer<Option.Builder>
102110
/**
103111
* @param args
104112
* @throws Exception
105-
* @see DataXJoinProcessConsumer
113+
* @see DataXJoinProcessConsumer 提交任务,设置必要参数
106114
*/
107115
public static void main(String[] args) throws Exception {
108116
// create the parser
@@ -122,6 +130,7 @@ public static void main(String[] args) throws Exception {
122130

123131
try {
124132
AbstractExecContext execContext = createDftExecContent(line);
133+
// TabPartitions tabPartitions = ExecChainContextUtils.getDependencyTablesPartitions(execContext);
125134
JobCommon.setMDC(execContext.getTaskId(), null);
126135

127136
// cmdLine.addArgument(sqlTskJson.getString(KEY_ID));
@@ -131,15 +140,32 @@ public static void main(String[] args) throws Exception {
131140

132141
JSONObject sqlTaskCfg = new JSONObject();
133142
sqlTaskCfg.put(KEY_ID, line.getOptionValue(KEY_ID));
134-
sqlTaskCfg.put(KEY_SQL_SCRIPT, line.getOptionValue(KEY_SQL_SCRIPT));
143+
// sqlTaskCfg.put(KEY_SQL_SCRIPT, line.getOptionValue(KEY_SQL_SCRIPT));
135144
sqlTaskCfg.put(KEY_EXECUTE_TYPE, line.getOptionValue(KEY_EXECUTE_TYPE));
136145
sqlTaskCfg.put(KEY_EXPORT_NAME, line.getOptionValue(KEY_EXPORT_NAME));
137-
ISqlTask.SqlTaskCfg sqlCfg = ISqlTask.toCfg(sqlTaskCfg);
138-
logger.info("start join process:{},sqlScript:{}", sqlCfg.getExportName(), sqlCfg.getSqlScript());
139-
SqlTaskNodeMeta sqlTask = SqlTaskNodeMeta.deserializeTaskNode(sqlCfg);
140146

147+
String dataflowName = line.getOptionValue(JobParams.KEY_COLLECTION);
148+
String nodeId = line.getOptionValue(KEY_ID);
149+
150+
TopologyDir topologyDir = SqlTaskNodeMeta.getTopologyDir(dataflowName);
151+
//SqlDataFlowTopology sqlDataFlowTopology = SqlTaskNodeMeta.getSqlDataFlowTopology(dataflowName);
152+
153+
SqlTaskNodeMeta sqlTask = topologyDir.getSqlTaskNodeMeta(nodeId); //SqlTaskNodeMeta.deserializeTaskNode();// SqlTaskNodeMeta.deserializeTaskNode(sqlCfg);
154+
155+
// ISqlTask.SqlTaskCfg sqlCfg = ISqlTask.toCfg(sqlTaskCfg);
156+
// logger.info("start join process:{},sqlScript:{}", sqlCfg.getExportName(), sqlCfg.getSqlScript());
157+
List<DependencyNode> dependencyNodes = sqlTask.getDependencies();// // JSONArray.parseArray(line.getOptionValue(KEY_DEPENDENCIES), DependencyNode.class);
158+
Map<String, Boolean> taskWorkStatus = AdapterTask.createTaskWorkStatus(execContext);
159+
// 分布式环境中执行,这里暂且把依赖的表设置为已经成功执行
160+
dependencyNodes.forEach((node) -> {
161+
taskWorkStatus.put(node.getId(), true);
162+
});
163+
if (CollectionUtils.isEmpty(dependencyNodes)) {
164+
throw new IllegalStateException("dependencyNodes can not be empty");
165+
}
166+
// sqlTask.setDependencies(dependencyNodes);
141167
executeJoin(statusRpc, execContext, sqlTask);
142-
logger.info("exit the process:{},sqlScript:{}", sqlCfg.getExportName(), sqlCfg.getSqlScript());
168+
logger.info("exit the process:{},sqlScript:{}", sqlTask.getExportName(), sqlTask.getSql());
143169
System.exit(0);
144170
} catch (Exception e) {
145171
logger.error(e.getMessage(), e);

tis-datax/tis-datax-common-plugin/src/main/java/com/qlangtech/tis/fullbuild/taskflow/HiveTask.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,8 @@ public String getIdentityName() {
163163

164164
protected ISqlTask.RewriteSql getRewriteSql() {
165165
if (this.rewriteSql == null) {
166+
167+
166168
this.rewriteSql = nodeMeta.getRewriteSql(this.getName(), this.getDumpPartition()
167169
, this.partionableWarehouse, this.erRules, this.getExecContext(), this.isFinalNode);
168170
}

tis-datax/tis-datax-hdfs-plugin/src/main/java/com/qlangtech/tis/plugin/datax/BasicFSWriter.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,13 @@ public FileSystemFactory getFs() {
8888
}
8989

9090
@Override
91-
public void startScanDependency() {
91+
public final void startScanDependency() {
9292
this.getFsFactory();
93+
this.startScanFSWriterDependency();
9394
}
9495

96+
protected abstract void startScanFSWriterDependency();
97+
9598
@Override
9699
public FileSystemFactory getFsFactory() {
97100
return this.getFs();

tis-datax/tis-datax-hdfs-reader-writer-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXHdfsWriter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ public static String getDftTemplate() {
5656
return IOUtils.loadResourceFromClasspath(DataXHdfsWriter.class, "DataXHdfsWriter-tpl.json");
5757
}
5858

59+
@Override
60+
protected void startScanFSWriterDependency() {
61+
62+
}
63+
5964
@Override
6065
protected FSDataXContext getDataXContext(IDataxProcessor.TableMap tableMap, Optional<RecordTransformerRules> transformerRules) {
6166
return new HdfsDataXContext(tableMap, this.dataXName, transformerRules);

tis-datax/tis-hive-flat-table-builder-plugin/src/main/java/com/qlangtech/tis/plugin/datax/DataXHiveWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -279,10 +279,10 @@ public JDBCConnection getConnection() {
279279
}
280280

281281
@Override
282-
public void startScanDependency() {
282+
protected void startScanFSWriterDependency() {
283283
this.getHiveConnGetter();
284284
}
285-
285+
286286
@Override
287287
public Hiveserver2DataSourceFactory getDataSourceFactory() {
288288
if (StringUtils.isBlank(this.dbName)) {

0 commit comments

Comments (0)