From 0cda5d1600e7ae050301088546008cc7e218557b Mon Sep 17 00:00:00 2001 From: Dmitry Kryukov Date: Tue, 1 Apr 2025 10:52:37 +0300 Subject: [PATCH 1/4] TEZ-4617 prevent resource leaks for Closeables --- .../tez/dag/app/web/AMWebController.java | 43 ++--- .../examples/BroadcastAndOneToOneExample.java | 11 +- .../apache/tez/mapreduce/examples/Join.java | 163 +++++++++--------- .../mapreduce/examples/RandomTextWriter.java | 125 +++++++------- .../tez/mapreduce/examples/RandomWriter.java | 89 +++++----- 5 files changed, 219 insertions(+), 212 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java index 00cd26e2ce..3d9bb6ae0d 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java @@ -893,28 +893,29 @@ public static class StaticAMView extends View { @Override public void render() { response().setContentType(MimeType.HTML); - PrintWriter pw = writer(); - pw.write(""); - pw.write(""); - pw.write(""); - pw.write("Redirecting to Tez UI"); - pw.write(""); - pw.write(""); - if (historyUrl == null || historyUrl.isEmpty()) { - pw.write("

Tez UI Url is not defined.

" + - "

To enable tracking url pointing to Tez UI, set the config " + - TezConfiguration.TEZ_HISTORY_URL_BASE + " in the tez-site.xml.

"); - } else { - pw.write("

Redirecting to Tez UI

.

If you are not redirected shortly, click " + - "here

" - ); - pw.write(""); + try(PrintWriter pw = writer()) { + pw.write(""); + pw.write(""); + pw.write(""); + pw.write("Redirecting to Tez UI"); + pw.write(""); + pw.write(""); + if (historyUrl == null || historyUrl.isEmpty()) { + pw.write("

Tez UI Url is not defined.

" + + "

To enable tracking url pointing to Tez UI, set the config " + + TezConfiguration.TEZ_HISTORY_URL_BASE + " in the tez-site.xml.

"); + } else { + pw.write("

Redirecting to Tez UI

.

If you are not redirected shortly, click " + + "here

" + ); + pw.write(""); + } + pw.write(""); + pw.write(""); + pw.flush(); } - pw.write(""); - pw.write(""); - pw.flush(); } } diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java index 5c99f3efbf..ee18977ede 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/BroadcastAndOneToOneExample.java @@ -130,12 +130,13 @@ private DAG createDAG(FileSystem fs, TezConfiguration tezConf, int numBroadcastTasks = 2; int numOneToOneTasks = 3; + int numNMs; if (doLocalityCheck) { - YarnClient yarnClient = YarnClient.createYarnClient(); - yarnClient.init(tezConf); - yarnClient.start(); - int numNMs = yarnClient.getNodeReports(NodeState.RUNNING).size(); - yarnClient.stop(); + try (YarnClient yarnClient = YarnClient.createYarnClient()) { + yarnClient.init(tezConf); + yarnClient.start(); + numNMs = yarnClient.getNodeReports(NodeState.RUNNING).size(); + } // create enough 1-1 tasks to run in parallel numOneToOneTasks = numNMs - numBroadcastTasks - 1;// 1 AM if (numOneToOneTasks < 1) { diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java index 92be836375..e1dd968f90 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java @@ -82,92 +82,95 @@ static int printUsage() { @SuppressWarnings("deprecation") public int run(String[] args) throws Exception { Configuration conf = getConf(); - JobClient client = new JobClient(conf); - ClusterStatus cluster = client.getClusterStatus(); - int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9); - String join_reduces = conf.get(REDUCES_PER_HOST); - if (join_reduces != null) { - num_reduces = cluster.getTaskTrackers() * - Integer.parseInt(join_reduces); - } - Job job = new Job(conf); - job.setJobName("join"); - job.setJarByClass(Sort.class); - - job.setMapperClass(Mapper.class); - job.setReducerClass(Reducer.class); - - Class inputFormatClass = - SequenceFileInputFormat.class; - Class outputFormatClass = - SequenceFileOutputFormat.class; - Class outputKeyClass = BytesWritable.class; - Class outputValueClass = TupleWritable.class; - String op = "inner"; - List otherArgs = new ArrayList(); - for(int i=0; i < args.length; ++i) { - try { - if ("-r".equals(args[i])) { - num_reduces = Integer.parseInt(args[++i]); - } else if ("-inFormat".equals(args[i])) { - inputFormatClass = - Class.forName(args[++i]).asSubclass(InputFormat.class); - } else if ("-outFormat".equals(args[i])) { - outputFormatClass = - Class.forName(args[++i]).asSubclass(OutputFormat.class); - } else if ("-outKey".equals(args[i])) { - outputKeyClass = - Class.forName(args[++i]).asSubclass(WritableComparable.class); - } else if ("-outValue".equals(args[i])) { - outputValueClass = - Class.forName(args[++i]).asSubclass(Writable.class); - } else if ("-joinOp".equals(args[i])) { - op = args[++i]; - } else { - otherArgs.add(args[i]); + try (JobClient client = new JobClient(conf)) { + + ClusterStatus cluster = client.getClusterStatus(); + int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9); + String join_reduces = conf.get(REDUCES_PER_HOST); + if (join_reduces != null) { + num_reduces = cluster.getTaskTrackers() * + Integer.parseInt(join_reduces); + } + + Job job = new Job(conf); + job.setJobName("join"); + job.setJarByClass(Sort.class); + + job.setMapperClass(Mapper.class); + job.setReducerClass(Reducer.class); + + Class inputFormatClass = + SequenceFileInputFormat.class; + Class outputFormatClass = + SequenceFileOutputFormat.class; + Class outputKeyClass = BytesWritable.class; + Class outputValueClass = TupleWritable.class; + String op = "inner"; + List otherArgs = new ArrayList(); + for (int i = 0; i < args.length; ++i) { + try { + if ("-r".equals(args[i])) { + num_reduces = Integer.parseInt(args[++i]); + } else if ("-inFormat".equals(args[i])) { + inputFormatClass = + Class.forName(args[++i]).asSubclass(InputFormat.class); + } else if ("-outFormat".equals(args[i])) { + outputFormatClass = + Class.forName(args[++i]).asSubclass(OutputFormat.class); + } else if ("-outKey".equals(args[i])) { + outputKeyClass = + Class.forName(args[++i]).asSubclass(WritableComparable.class); + } else if ("-outValue".equals(args[i])) { + outputValueClass = + Class.forName(args[++i]).asSubclass(Writable.class); + } else if ("-joinOp".equals(args[i])) { + op = args[++i]; + } else { + otherArgs.add(args[i]); + } + } catch (NumberFormatException except) { + System.out.println("ERROR: Integer expected instead of " + args[i]); + return printUsage(); + } catch (ArrayIndexOutOfBoundsException except) { + System.out.println("ERROR: Required parameter missing from " + + args[i - 1]); + return printUsage(); // exits } - } catch (NumberFormatException except) { - System.out.println("ERROR: Integer expected instead of " + args[i]); - return printUsage(); - } catch (ArrayIndexOutOfBoundsException except) { - System.out.println("ERROR: Required parameter missing from " + - args[i-1]); - return printUsage(); // exits } - } - // Set user-supplied (possibly default) job configs - job.setNumReduceTasks(num_reduces); + // Set user-supplied (possibly default) job configs + job.setNumReduceTasks(num_reduces); - if (otherArgs.size() < 2) { - System.out.println("ERROR: Wrong number of parameters: "); - return printUsage(); - } + if (otherArgs.size() < 2) { + System.out.println("ERROR: Wrong number of parameters: "); + return printUsage(); + } - FileOutputFormat.setOutputPath(job, - new Path(otherArgs.remove(otherArgs.size() - 1))); - List plist = new ArrayList(otherArgs.size()); - for (String s : otherArgs) { - plist.add(new Path(s)); - } + FileOutputFormat.setOutputPath(job, + new Path(otherArgs.remove(otherArgs.size() - 1))); + List plist = new ArrayList(otherArgs.size()); + for (String s : otherArgs) { + plist.add(new Path(s)); + } - job.setInputFormatClass(CompositeInputFormat.class); - job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR, - CompositeInputFormat.compose(op, inputFormatClass, - plist.toArray(new Path[0]))); - job.setOutputFormatClass(outputFormatClass); - - job.setOutputKeyClass(outputKeyClass); - job.setOutputValueClass(outputValueClass); - - Date startTime = new Date(); - System.out.println("Job started: " + startTime); - int ret = job.waitForCompletion(true) ? 0 : 1 ; - Date end_time = new Date(); - System.out.println("Job ended: " + end_time); - System.out.println("The job took " + - (end_time.getTime() - startTime.getTime()) /1000 + " seconds."); - return ret; + job.setInputFormatClass(CompositeInputFormat.class); + job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR, + CompositeInputFormat.compose(op, inputFormatClass, + plist.toArray(new Path[0]))); + job.setOutputFormatClass(outputFormatClass); + + job.setOutputKeyClass(outputKeyClass); + job.setOutputValueClass(outputValueClass); + + Date startTime = new Date(); + System.out.println("Job started: " + startTime); + int ret = job.waitForCompletion(true) ? 0 : 1; + Date end_time = new Date(); + System.out.println("Job ended: " + end_time); + System.out.println("The job took " + + (end_time.getTime() - startTime.getTime()) / 1000 + " seconds."); + return ret; + } } public static void main(String[] args) throws Exception { diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java index 55404ba1bd..509ebfb6d6 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java @@ -180,71 +180,72 @@ public int run(String[] args) throws Exception { } Configuration conf = getConf(); - JobClient client = new JobClient(conf); - ClusterStatus cluster = client.getClusterStatus(); - int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10); - long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, - 1*1024*1024*1024); - if (numBytesToWritePerMap == 0) { - System.err.println("Cannot have " + BYTES_PER_MAP +" set to 0"); - return -2; - } - long totalBytesToWrite = conf.getLong(TOTAL_BYTES, - numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers()); - int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap); - if (numMaps == 0 && totalBytesToWrite > 0) { - numMaps = 1; - conf.setLong(BYTES_PER_MAP, totalBytesToWrite); - } - conf.setInt(MRJobConfig.NUM_MAPS, numMaps); - - Job job = new Job(conf); - - job.setJarByClass(RandomTextWriter.class); - job.setJobName("random-text-writer"); - - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(Text.class); - - job.setInputFormatClass(RandomWriter.RandomInputFormat.class); - job.setMapperClass(RandomTextMapper.class); - - Class outputFormatClass = - SequenceFileOutputFormat.class; - List otherArgs = new ArrayList(); - for(int i=0; i < args.length; ++i) { - try { - if ("-outFormat".equals(args[i])) { - outputFormatClass = - Class.forName(args[++i]).asSubclass(OutputFormat.class); - } else { - otherArgs.add(args[i]); + try (JobClient client = new JobClient(conf)) { + ClusterStatus cluster = client.getClusterStatus(); + int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10); + long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, + 1 * 1024 * 1024 * 1024); + if (numBytesToWritePerMap == 0) { + System.err.println("Cannot have " + BYTES_PER_MAP + " set to 0"); + return -2; + } + long totalBytesToWrite = conf.getLong(TOTAL_BYTES, + numMapsPerHost * numBytesToWritePerMap * cluster.getTaskTrackers()); + int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap); + if (numMaps == 0 && totalBytesToWrite > 0) { + numMaps = 1; + conf.setLong(BYTES_PER_MAP, totalBytesToWrite); + } + conf.setInt(MRJobConfig.NUM_MAPS, numMaps); + + Job job = new Job(conf); + + job.setJarByClass(RandomTextWriter.class); + job.setJobName("random-text-writer"); + + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + + job.setInputFormatClass(RandomWriter.RandomInputFormat.class); + job.setMapperClass(RandomTextMapper.class); + + Class outputFormatClass = + SequenceFileOutputFormat.class; + List otherArgs = new ArrayList(); + for (int i = 0; i < args.length; ++i) { + try { + if ("-outFormat".equals(args[i])) { + outputFormatClass = + Class.forName(args[++i]).asSubclass(OutputFormat.class); + } else { + otherArgs.add(args[i]); + } + } catch (ArrayIndexOutOfBoundsException except) { + System.out.println("ERROR: Required parameter missing from " + + args[i - 1]); + return printUsage(); // exits } - } catch (ArrayIndexOutOfBoundsException except) { - System.out.println("ERROR: Required parameter missing from " + - args[i-1]); - return printUsage(); // exits } - } - job.setOutputFormatClass(outputFormatClass); - FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(0))); - - System.out.println("Running " + numMaps + " maps."); - - // reducer NONE - job.setNumReduceTasks(0); - - Date startTime = new Date(); - System.out.println("Job started: " + startTime); - int ret = job.waitForCompletion(true) ? 0 : 1; - Date endTime = new Date(); - System.out.println("Job ended: " + endTime); - System.out.println("The job took " + - (endTime.getTime() - startTime.getTime()) /1000 + - " seconds."); - - return ret; + job.setOutputFormatClass(outputFormatClass); + FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(0))); + + System.out.println("Running " + numMaps + " maps."); + + // reducer NONE + job.setNumReduceTasks(0); + + Date startTime = new Date(); + System.out.println("Job started: " + startTime); + int ret = job.waitForCompletion(true) ? 0 : 1; + Date endTime = new Date(); + System.out.println("Job ended: " + endTime); + System.out.println("The job took " + + (endTime.getTime() - startTime.getTime()) / 1000 + + " seconds."); + + return ret; + } } public static void main(String[] args) throws Exception { diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java index 1627d688ab..e778ed889e 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java @@ -246,51 +246,52 @@ public int run(String[] args) throws Exception { Path outDir = new Path(args[0]); Configuration conf = getConf(); - JobClient client = new JobClient(conf); - ClusterStatus cluster = client.getClusterStatus(); - int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10); - long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, - 1*1024*1024*1024); - if (numBytesToWritePerMap == 0) { - System.err.println("Cannot have" + BYTES_PER_MAP + " set to 0"); - return -2; - } - long totalBytesToWrite = conf.getLong(TOTAL_BYTES, - numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers()); - int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap); - if (numMaps == 0 && totalBytesToWrite > 0) { - numMaps = 1; - conf.setLong(BYTES_PER_MAP, totalBytesToWrite); - } - conf.setInt(MRJobConfig.NUM_MAPS, numMaps); + try (JobClient client = new JobClient(conf)) { + ClusterStatus cluster = client.getClusterStatus(); + int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10); + long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, + 1 * 1024 * 1024 * 1024); + if (numBytesToWritePerMap == 0) { + System.err.println("Cannot have" + BYTES_PER_MAP + " set to 0"); + return -2; + } + long totalBytesToWrite = conf.getLong(TOTAL_BYTES, + numMapsPerHost * numBytesToWritePerMap * cluster.getTaskTrackers()); + int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap); + if (numMaps == 0 && totalBytesToWrite > 0) { + numMaps = 1; + conf.setLong(BYTES_PER_MAP, totalBytesToWrite); + } + conf.setInt(MRJobConfig.NUM_MAPS, numMaps); - Job job = new Job(conf); - - job.setJarByClass(RandomWriter.class); - job.setJobName("random-writer"); - FileOutputFormat.setOutputPath(job, outDir); - job.setOutputKeyClass(BytesWritable.class); - job.setOutputValueClass(BytesWritable.class); - job.setInputFormatClass(RandomInputFormat.class); - job.setMapperClass(RandomMapper.class); - job.setReducerClass(Reducer.class); - job.setOutputFormatClass(SequenceFileOutputFormat.class); - - System.out.println("Running " + numMaps + " maps."); - - // reducer NONE - job.setNumReduceTasks(0); - - Date startTime = new Date(); - System.out.println("Job started: " + startTime); - int ret = job.waitForCompletion(true) ? 0 : 1; - Date endTime = new Date(); - System.out.println("Job ended: " + endTime); - System.out.println("The job took " + - (endTime.getTime() - startTime.getTime()) /1000 + - " seconds."); - - return ret; + Job job = new Job(conf); + + job.setJarByClass(RandomWriter.class); + job.setJobName("random-writer"); + FileOutputFormat.setOutputPath(job, outDir); + job.setOutputKeyClass(BytesWritable.class); + job.setOutputValueClass(BytesWritable.class); + job.setInputFormatClass(RandomInputFormat.class); + job.setMapperClass(RandomMapper.class); + job.setReducerClass(Reducer.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + + System.out.println("Running " + numMaps + " maps."); + + // reducer NONE + job.setNumReduceTasks(0); + + Date startTime = new Date(); + System.out.println("Job started: " + startTime); + int ret = job.waitForCompletion(true) ? 0 : 1; + Date endTime = new Date(); + System.out.println("Job ended: " + endTime); + System.out.println("The job took " + + (endTime.getTime() - startTime.getTime()) / 1000 + + " seconds."); + + return ret; + } } public static void main(String[] args) throws Exception { From 3fca95f45299d171ec665aa1d79986bb84ddb4d7 Mon Sep 17 00:00:00 2001 From: Dmitry Kryukov Date: Sat, 17 Jan 2026 13:13:30 +0300 Subject: [PATCH 2/4] TEZ-4617 reacted upon comments --- .../tez/dag/app/web/AMWebController.java | 49 +++--- .../apache/tez/mapreduce/examples/Join.java | 164 +++++++++--------- .../mapreduce/examples/RandomTextWriter.java | 110 ++++++------ .../tez/mapreduce/examples/RandomWriter.java | 82 ++++----- 4 files changed, 210 insertions(+), 195 deletions(-) diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java index 3d9bb6ae0d..1565bc11d5 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/web/AMWebController.java @@ -41,7 +41,6 @@ import com.google.inject.Inject; import com.google.inject.name.Named; -import org.apache.commons.lang.StringEscapeUtils; import org.apache.tez.common.counters.CounterGroup; import org.apache.tez.common.counters.LimitExceededException; import org.apache.tez.common.counters.TezCounter; @@ -893,29 +892,33 @@ public static class StaticAMView extends View { @Override public void render() { response().setContentType(MimeType.HTML); - try(PrintWriter pw = writer()) { - pw.write(""); - pw.write(""); - pw.write(""); - pw.write("Redirecting to Tez UI"); - pw.write(""); - pw.write(""); - if (historyUrl == null || historyUrl.isEmpty()) { - pw.write("

Tez UI Url is not defined.

" + - "

To enable tracking url pointing to Tez UI, set the config " + - TezConfiguration.TEZ_HISTORY_URL_BASE + " in the tez-site.xml.

"); - } else { - pw.write("

Redirecting to Tez UI

.

If you are not redirected shortly, click " + - "here

" - ); - pw.write(""); - } - pw.write(""); - pw.write(""); - pw.flush(); + try (PrintWriter pw = writer()) { + render(pw); + } + } + + private void render(PrintWriter pw) { + pw.write(""); + pw.write(""); + pw.write(""); + pw.write("Redirecting to Tez UI"); + pw.write(""); + pw.write(""); + if (historyUrl == null || historyUrl.isEmpty()) { + pw.write("

Tez UI Url is not defined.

" + + "

To enable tracking url pointing to Tez UI, set the config " + + TezConfiguration.TEZ_HISTORY_URL_BASE + " in the tez-site.xml.

"); + } else { + pw.write("

Redirecting to Tez UI

.

If you are not redirected shortly, click " + + "here

" + ); + pw.write(""); } + pw.write(""); + pw.write(""); + pw.flush(); } } diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java index e1dd968f90..0b5b1b15fd 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java @@ -20,6 +20,7 @@ import static org.apache.tez.mapreduce.examples.ExampleDriver.getTezDecoratedConfiguration; +import java.io.IOException; import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -83,94 +84,97 @@ static int printUsage() { public int run(String[] args) throws Exception { Configuration conf = getConf(); try (JobClient client = new JobClient(conf)) { + return run(client, conf, args); + } + } - ClusterStatus cluster = client.getClusterStatus(); - int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9); - String join_reduces = conf.get(REDUCES_PER_HOST); - if (join_reduces != null) { - num_reduces = cluster.getTaskTrackers() * - Integer.parseInt(join_reduces); - } + private int run (JobClient client, Configuration conf, String[] args) throws IOException, ClassNotFoundException, InterruptedException { + ClusterStatus cluster = client.getClusterStatus(); + int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9); + String join_reduces = conf.get(REDUCES_PER_HOST); + if (join_reduces != null) { + num_reduces = cluster.getTaskTrackers() * + Integer.parseInt(join_reduces); + } - Job job = new Job(conf); - job.setJobName("join"); - job.setJarByClass(Sort.class); - - job.setMapperClass(Mapper.class); - job.setReducerClass(Reducer.class); - - Class inputFormatClass = - SequenceFileInputFormat.class; - Class outputFormatClass = - SequenceFileOutputFormat.class; - Class outputKeyClass = BytesWritable.class; - Class outputValueClass = TupleWritable.class; - String op = "inner"; - List otherArgs = new ArrayList(); - for (int i = 0; i < args.length; ++i) { - try { - if ("-r".equals(args[i])) { - num_reduces = Integer.parseInt(args[++i]); - } else if ("-inFormat".equals(args[i])) { - inputFormatClass = - Class.forName(args[++i]).asSubclass(InputFormat.class); - } else if ("-outFormat".equals(args[i])) { - outputFormatClass = - Class.forName(args[++i]).asSubclass(OutputFormat.class); - } else if ("-outKey".equals(args[i])) { - outputKeyClass = - Class.forName(args[++i]).asSubclass(WritableComparable.class); - } else if ("-outValue".equals(args[i])) { - outputValueClass = - Class.forName(args[++i]).asSubclass(Writable.class); - } else if ("-joinOp".equals(args[i])) { - op = args[++i]; - } else { - otherArgs.add(args[i]); - } - } catch (NumberFormatException except) { - System.out.println("ERROR: Integer expected instead of " + args[i]); - return printUsage(); - } catch (ArrayIndexOutOfBoundsException except) { - System.out.println("ERROR: Required parameter missing from " + - args[i - 1]); - return printUsage(); // exits + Job job = new Job(conf); + job.setJobName("join"); + job.setJarByClass(Sort.class); + + job.setMapperClass(Mapper.class); + job.setReducerClass(Reducer.class); + + Class inputFormatClass = + SequenceFileInputFormat.class; + Class outputFormatClass = + SequenceFileOutputFormat.class; + Class outputKeyClass = BytesWritable.class; + Class outputValueClass = TupleWritable.class; + String op = "inner"; + List otherArgs = new ArrayList(); + for (int i = 0; i < args.length; ++i) { + try { + if ("-r".equals(args[i])) { + num_reduces = Integer.parseInt(args[++i]); + } else if ("-inFormat".equals(args[i])) { + inputFormatClass = + Class.forName(args[++i]).asSubclass(InputFormat.class); + } else if ("-outFormat".equals(args[i])) { + outputFormatClass = + Class.forName(args[++i]).asSubclass(OutputFormat.class); + } else if ("-outKey".equals(args[i])) { + outputKeyClass = + Class.forName(args[++i]).asSubclass(WritableComparable.class); + } else if ("-outValue".equals(args[i])) { + outputValueClass = + Class.forName(args[++i]).asSubclass(Writable.class); + } else if ("-joinOp".equals(args[i])) { + op = args[++i]; + } else { + otherArgs.add(args[i]); } - } - - // Set user-supplied (possibly default) job configs - job.setNumReduceTasks(num_reduces); - - if (otherArgs.size() < 2) { - System.out.println("ERROR: Wrong number of parameters: "); + } catch (NumberFormatException except) { + System.out.println("ERROR: Integer expected instead of " + args[i]); return printUsage(); + } catch (ArrayIndexOutOfBoundsException except) { + System.out.println("ERROR: Required parameter missing from " + + args[i - 1]); + return printUsage(); // exits } + } - FileOutputFormat.setOutputPath(job, - new Path(otherArgs.remove(otherArgs.size() - 1))); - List plist = new ArrayList(otherArgs.size()); - for (String s : otherArgs) { - plist.add(new Path(s)); - } + // Set user-supplied (possibly default) job configs + job.setNumReduceTasks(num_reduces); - job.setInputFormatClass(CompositeInputFormat.class); - job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR, - CompositeInputFormat.compose(op, inputFormatClass, - plist.toArray(new Path[0]))); - job.setOutputFormatClass(outputFormatClass); - - job.setOutputKeyClass(outputKeyClass); - job.setOutputValueClass(outputValueClass); - - Date startTime = new Date(); - System.out.println("Job started: " + startTime); - int ret = job.waitForCompletion(true) ? 0 : 1; - Date end_time = new Date(); - System.out.println("Job ended: " + end_time); - System.out.println("The job took " + - (end_time.getTime() - startTime.getTime()) / 1000 + " seconds."); - return ret; + if (otherArgs.size() < 2) { + System.out.println("ERROR: Wrong number of parameters: "); + return printUsage(); } + + FileOutputFormat.setOutputPath(job, + new Path(otherArgs.remove(otherArgs.size() - 1))); + List plist = new ArrayList(otherArgs.size()); + for (String s : otherArgs) { + plist.add(new Path(s)); + } + + job.setInputFormatClass(CompositeInputFormat.class); + job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR, + CompositeInputFormat.compose(op, inputFormatClass, + plist.toArray(new Path[0]))); + job.setOutputFormatClass(outputFormatClass); + + job.setOutputKeyClass(outputKeyClass); + job.setOutputValueClass(outputValueClass); + + Date startTime = new Date(); + System.out.println("Job started: " + startTime); + int ret = job.waitForCompletion(true) ? 0 : 1; + Date end_time = new Date(); + System.out.println("Job ended: " + end_time); + System.out.println("The job took " + + (end_time.getTime() - startTime.getTime()) / 1000 + " seconds."); + return ret; } public static void main(String[] args) throws Exception { diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java index 509ebfb6d6..f7bc2f86bb 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java @@ -181,71 +181,75 @@ public int run(String[] args) throws Exception { Configuration conf = getConf(); try (JobClient client = new JobClient(conf)) { - ClusterStatus cluster = client.getClusterStatus(); - int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10); - long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, - 1 * 1024 * 1024 * 1024); - if (numBytesToWritePerMap == 0) { - System.err.println("Cannot have " + BYTES_PER_MAP + " set to 0"); - return -2; - } - long totalBytesToWrite = conf.getLong(TOTAL_BYTES, - numMapsPerHost * numBytesToWritePerMap * cluster.getTaskTrackers()); - int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap); - if (numMaps == 0 && totalBytesToWrite > 0) { - numMaps = 1; - conf.setLong(BYTES_PER_MAP, totalBytesToWrite); - } - conf.setInt(MRJobConfig.NUM_MAPS, numMaps); + return run(client, conf, args); + } + } - Job job = new Job(conf); + private int run (JobClient client, Configuration conf, String[] args) throws IOException, ClassNotFoundException, InterruptedException { + ClusterStatus cluster = client.getClusterStatus(); + int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10); + long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, + 1 * 1024 * 1024 * 1024); + if (numBytesToWritePerMap == 0) { + System.err.println("Cannot have " + BYTES_PER_MAP + " set to 0"); + return -2; + } + long totalBytesToWrite = conf.getLong(TOTAL_BYTES, + numMapsPerHost * numBytesToWritePerMap * cluster.getTaskTrackers()); + int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap); + if (numMaps == 0 && totalBytesToWrite > 0) { + numMaps = 1; + conf.setLong(BYTES_PER_MAP, totalBytesToWrite); + } + conf.setInt(MRJobConfig.NUM_MAPS, numMaps); + + Job job = new Job(conf); - job.setJarByClass(RandomTextWriter.class); - job.setJobName("random-text-writer"); + job.setJarByClass(RandomTextWriter.class); + job.setJobName("random-text-writer"); - job.setOutputKeyClass(Text.class); - job.setOutputValueClass(Text.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); - job.setInputFormatClass(RandomWriter.RandomInputFormat.class); - job.setMapperClass(RandomTextMapper.class); + job.setInputFormatClass(RandomWriter.RandomInputFormat.class); + job.setMapperClass(RandomTextMapper.class); - Class outputFormatClass = - SequenceFileOutputFormat.class; - List otherArgs = new ArrayList(); - for (int i = 0; i < args.length; ++i) { - try { - if ("-outFormat".equals(args[i])) { - outputFormatClass = - Class.forName(args[++i]).asSubclass(OutputFormat.class); - } else { - otherArgs.add(args[i]); - } - } catch (ArrayIndexOutOfBoundsException except) { - System.out.println("ERROR: Required parameter missing from " + - args[i - 1]); - return printUsage(); // exits + Class outputFormatClass = + SequenceFileOutputFormat.class; + List otherArgs = new ArrayList(); + for (int i = 0; i < args.length; ++i) { + try { + if ("-outFormat".equals(args[i])) { + outputFormatClass = + Class.forName(args[++i]).asSubclass(OutputFormat.class); + } else { + otherArgs.add(args[i]); } + } catch (ArrayIndexOutOfBoundsException except) { + System.out.println("ERROR: Required parameter missing from " + + args[i - 1]); + return printUsage(); // exits } + } - job.setOutputFormatClass(outputFormatClass); - FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(0))); + job.setOutputFormatClass(outputFormatClass); + FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(0))); - System.out.println("Running " + numMaps + " maps."); + System.out.println("Running " + numMaps + " maps."); - // reducer NONE - job.setNumReduceTasks(0); + // reducer NONE + job.setNumReduceTasks(0); - Date startTime = new Date(); - System.out.println("Job started: " + startTime); - int ret = job.waitForCompletion(true) ? 0 : 1; - Date endTime = new Date(); - System.out.println("Job ended: " + endTime); - System.out.println("The job took " + - (endTime.getTime() - startTime.getTime()) / 1000 + - " seconds."); + Date startTime = new Date(); + System.out.println("Job started: " + startTime); + int ret = job.waitForCompletion(true) ? 0 : 1; + Date endTime = new Date(); + System.out.println("Job ended: " + endTime); + System.out.println("The job took " + + (endTime.getTime() - startTime.getTime()) / 1000 + + " seconds."); - return ret; - } + return ret; } public static void main(String[] args) throws Exception { diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java index e778ed889e..3eed3ce308 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java @@ -247,51 +247,55 @@ public int run(String[] args) throws Exception { Path outDir = new Path(args[0]); Configuration conf = getConf(); try (JobClient client = new JobClient(conf)) { - ClusterStatus cluster = client.getClusterStatus(); - int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10); - long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, - 1 * 1024 * 1024 * 1024); - if (numBytesToWritePerMap == 0) { - System.err.println("Cannot have" + BYTES_PER_MAP + " set to 0"); - return -2; - } - long totalBytesToWrite = conf.getLong(TOTAL_BYTES, - numMapsPerHost * numBytesToWritePerMap * cluster.getTaskTrackers()); - int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap); - if (numMaps == 0 && totalBytesToWrite > 0) { - numMaps = 1; - conf.setLong(BYTES_PER_MAP, totalBytesToWrite); - } - conf.setInt(MRJobConfig.NUM_MAPS, numMaps); + return run(client, conf, outDir); + } + } - Job job = new Job(conf); + private int run (JobClient client, Configuration conf, Path outDir) throws IOException, ClassNotFoundException, InterruptedException { + ClusterStatus cluster = client.getClusterStatus(); + int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10); + long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, + 1 * 1024 * 1024 * 1024); + if (numBytesToWritePerMap == 0) { + System.err.println("Cannot have" + BYTES_PER_MAP + " set to 0"); + return -2; + } + long totalBytesToWrite = conf.getLong(TOTAL_BYTES, + numMapsPerHost * numBytesToWritePerMap * cluster.getTaskTrackers()); + int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap); + if (numMaps == 0 && totalBytesToWrite > 0) { + numMaps = 1; + conf.setLong(BYTES_PER_MAP, totalBytesToWrite); + } + conf.setInt(MRJobConfig.NUM_MAPS, numMaps); - job.setJarByClass(RandomWriter.class); - job.setJobName("random-writer"); - FileOutputFormat.setOutputPath(job, outDir); - job.setOutputKeyClass(BytesWritable.class); - job.setOutputValueClass(BytesWritable.class); - job.setInputFormatClass(RandomInputFormat.class); - job.setMapperClass(RandomMapper.class); - job.setReducerClass(Reducer.class); - job.setOutputFormatClass(SequenceFileOutputFormat.class); + Job job = new Job(conf); - System.out.println("Running " + numMaps + " maps."); + job.setJarByClass(RandomWriter.class); + job.setJobName("random-writer"); + FileOutputFormat.setOutputPath(job, outDir); + job.setOutputKeyClass(BytesWritable.class); + job.setOutputValueClass(BytesWritable.class); + job.setInputFormatClass(RandomInputFormat.class); + job.setMapperClass(RandomMapper.class); + job.setReducerClass(Reducer.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); - // reducer NONE - job.setNumReduceTasks(0); + System.out.println("Running " + numMaps + " maps."); - Date startTime = new Date(); - System.out.println("Job started: " + startTime); - int ret = job.waitForCompletion(true) ? 0 : 1; - Date endTime = new Date(); - System.out.println("Job ended: " + endTime); - System.out.println("The job took " + - (endTime.getTime() - startTime.getTime()) / 1000 + - " seconds."); + // reducer NONE + job.setNumReduceTasks(0); - return ret; - } + Date startTime = new Date(); + System.out.println("Job started: " + startTime); + int ret = job.waitForCompletion(true) ? 0 : 1; + Date endTime = new Date(); + System.out.println("Job ended: " + endTime); + System.out.println("The job took " + + (endTime.getTime() - startTime.getTime()) / 1000 + + " seconds."); + + return ret; } public static void main(String[] args) throws Exception { From 27a48285db4218abdadb9b9ce9b72f23867e6405 Mon Sep 17 00:00:00 2001 From: Dmitry Kryukov Date: Sat, 17 Jan 2026 14:15:04 +0300 Subject: [PATCH 3/4] TEZ-4617 fixed line breaks --- .../org/apache/tez/mapreduce/examples/RandomTextWriter.java | 3 +-- .../java/org/apache/tez/mapreduce/examples/RandomWriter.java | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java index f7bc2f86bb..ccb7a841bc 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java @@ -188,8 +188,7 @@ public int run(String[] args) throws Exception { private int run (JobClient client, Configuration conf, String[] args) throws IOException, ClassNotFoundException, InterruptedException { ClusterStatus cluster = client.getClusterStatus(); int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10); - long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, - 1 * 1024 * 1024 * 1024); + long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, 1 * 1024 * 1024 * 1024); if (numBytesToWritePerMap == 0) { System.err.println("Cannot have " + BYTES_PER_MAP + " set to 0"); return -2; diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java index 3eed3ce308..c0e1294160 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java @@ -254,8 +254,7 @@ public int run(String[] args) throws Exception { private int run (JobClient client, Configuration conf, Path outDir) throws IOException, ClassNotFoundException, InterruptedException { ClusterStatus cluster = client.getClusterStatus(); int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10); - long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, - 1 * 1024 * 1024 * 1024); + long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, 1 * 1024 * 1024 * 1024); if (numBytesToWritePerMap == 0) { System.err.println("Cannot have" + BYTES_PER_MAP + " set to 0"); return -2; From 2c61e302a1ff7e76c4b399356bb526868a1730a1 Mon Sep 17 00:00:00 2001 From: Dmitry Kryukov Date: Sat, 17 Jan 2026 14:36:40 +0300 Subject: [PATCH 4/4] TEZ-4617 fixed checkstyle issues --- .../src/main/java/org/apache/tez/mapreduce/examples/Join.java | 3 ++- .../org/apache/tez/mapreduce/examples/RandomTextWriter.java | 3 ++- .../java/org/apache/tez/mapreduce/examples/RandomWriter.java | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java index 0b5b1b15fd..3c0dcf77d6 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java @@ -88,7 +88,8 @@ public int run(String[] args) throws Exception { } } - private int run (JobClient client, Configuration conf, String[] args) throws IOException, ClassNotFoundException, InterruptedException { + private int run(JobClient client, Configuration conf, String[] args) + throws IOException, ClassNotFoundException, InterruptedException { ClusterStatus cluster = client.getClusterStatus(); int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9); String join_reduces = conf.get(REDUCES_PER_HOST); diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java index ccb7a841bc..a1e6dbf06a 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomTextWriter.java @@ -185,7 +185,8 @@ public int run(String[] args) throws Exception { } } - private int run (JobClient client, Configuration conf, String[] args) throws IOException, ClassNotFoundException, InterruptedException { + private int run(JobClient client, Configuration conf, String[] args) + throws IOException, ClassNotFoundException, InterruptedException { ClusterStatus cluster = client.getClusterStatus(); int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10); long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, 1 * 1024 * 1024 * 1024); diff --git a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java index c0e1294160..4b7a4e5726 100644 --- a/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java +++ b/tez-tests/src/main/java/org/apache/tez/mapreduce/examples/RandomWriter.java @@ -251,7 +251,8 @@ public int run(String[] args) throws Exception { } } - private int run (JobClient client, Configuration conf, Path outDir) throws IOException, ClassNotFoundException, InterruptedException { + private int run(JobClient client, Configuration conf, Path outDir) + throws IOException, ClassNotFoundException, InterruptedException { ClusterStatus cluster = client.getClusterStatus(); int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10); long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, 1 * 1024 * 1024 * 1024);