Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,12 @@ public static class StaticAMView extends View {
@Override
public void render() {
response().setContentType(MimeType.HTML);
PrintWriter pw = writer();
try (PrintWriter pw = writer()) {
render(pw);
}
}

private void render(PrintWriter pw) {
pw.write("<html>");
pw.write("<head>");
pw.write("<meta charset=\"utf-8\">");
Expand All @@ -903,11 +908,11 @@ public void render() {
pw.write("<body>");
if (historyUrl == null || historyUrl.isEmpty()) {
pw.write("<h1>Tez UI Url is not defined.</h1>" +
"<p>To enable tracking url pointing to Tez UI, set the config <b>" +
TezConfiguration.TEZ_HISTORY_URL_BASE + "</b> in the tez-site.xml.</p>");
"<p>To enable tracking url pointing to Tez UI, set the config <b>" +
TezConfiguration.TEZ_HISTORY_URL_BASE + "</b> in the tez-site.xml.</p>");
} else {
pw.write("<h1>Redirecting to Tez UI</h1>. <p>If you are not redirected shortly, click " +
"<a href='" + historyUrl + "'><b>here</b></a></p>"
"<a href='" + historyUrl + "'><b>here</b></a></p>"
);
pw.write("<script type='text/javascript'>setTimeout(function() { " +
"window.location.replace('" + historyUrl + "');" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,13 @@ private DAG createDAG(FileSystem fs, TezConfiguration tezConf,

int numBroadcastTasks = 2;
int numOneToOneTasks = 3;
int numNMs;
if (doLocalityCheck) {
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(tezConf);
yarnClient.start();
int numNMs = yarnClient.getNodeReports(NodeState.RUNNING).size();
yarnClient.stop();
try (YarnClient yarnClient = YarnClient.createYarnClient()) {
yarnClient.init(tezConf);
yarnClient.start();
numNMs = yarnClient.getNodeReports(NodeState.RUNNING).size();
}
// create enough 1-1 tasks to run in parallel
numOneToOneTasks = numNMs - numBroadcastTasks - 1;// 1 AM
if (numOneToOneTasks < 1) {
Expand Down
60 changes: 34 additions & 26 deletions tez-tests/src/main/java/org/apache/tez/mapreduce/examples/Join.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.tez.mapreduce.examples.ExampleDriver.getTezDecoratedConfiguration;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
Expand Down Expand Up @@ -82,45 +83,52 @@ static int printUsage() {
@SuppressWarnings("deprecation")
public int run(String[] args) throws Exception {
Configuration conf = getConf();
JobClient client = new JobClient(conf);
try (JobClient client = new JobClient(conf)) {
return run(client, conf, args);
}
}

private int run(JobClient client, Configuration conf, String[] args)
throws IOException, ClassNotFoundException, InterruptedException {
ClusterStatus cluster = client.getClusterStatus();
int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9);
String join_reduces = conf.get(REDUCES_PER_HOST);
if (join_reduces != null) {
num_reduces = cluster.getTaskTrackers() *
Integer.parseInt(join_reduces);
num_reduces = cluster.getTaskTrackers() *
Integer.parseInt(join_reduces);
}

Job job = new Job(conf);
job.setJobName("join");
job.setJarByClass(Sort.class);

job.setMapperClass(Mapper.class);
job.setMapperClass(Mapper.class);
job.setReducerClass(Reducer.class);

Class<? extends InputFormat> inputFormatClass =
SequenceFileInputFormat.class;
Class<? extends OutputFormat> outputFormatClass =
SequenceFileOutputFormat.class;
Class<? extends InputFormat> inputFormatClass =
SequenceFileInputFormat.class;
Class<? extends OutputFormat> outputFormatClass =
SequenceFileOutputFormat.class;
Class<? extends WritableComparable> outputKeyClass = BytesWritable.class;
Class<? extends Writable> outputValueClass = TupleWritable.class;
String op = "inner";
List<String> otherArgs = new ArrayList<String>();
for(int i=0; i < args.length; ++i) {
for (int i = 0; i < args.length; ++i) {
try {
if ("-r".equals(args[i])) {
num_reduces = Integer.parseInt(args[++i]);
} else if ("-inFormat".equals(args[i])) {
inputFormatClass =
Class.forName(args[++i]).asSubclass(InputFormat.class);
inputFormatClass =
Class.forName(args[++i]).asSubclass(InputFormat.class);
} else if ("-outFormat".equals(args[i])) {
outputFormatClass =
Class.forName(args[++i]).asSubclass(OutputFormat.class);
outputFormatClass =
Class.forName(args[++i]).asSubclass(OutputFormat.class);
} else if ("-outKey".equals(args[i])) {
outputKeyClass =
Class.forName(args[++i]).asSubclass(WritableComparable.class);
outputKeyClass =
Class.forName(args[++i]).asSubclass(WritableComparable.class);
} else if ("-outValue".equals(args[i])) {
outputValueClass =
Class.forName(args[++i]).asSubclass(Writable.class);
outputValueClass =
Class.forName(args[++i]).asSubclass(Writable.class);
} else if ("-joinOp".equals(args[i])) {
op = args[++i];
} else {
Expand All @@ -131,7 +139,7 @@ public int run(String[] args) throws Exception {
return printUsage();
} catch (ArrayIndexOutOfBoundsException except) {
System.out.println("ERROR: Required parameter missing from " +
args[i-1]);
args[i - 1]);
return printUsage(); // exits
}
}
Expand All @@ -144,29 +152,29 @@ public int run(String[] args) throws Exception {
return printUsage();
}

FileOutputFormat.setOutputPath(job,
new Path(otherArgs.remove(otherArgs.size() - 1)));
FileOutputFormat.setOutputPath(job,
new Path(otherArgs.remove(otherArgs.size() - 1)));
List<Path> plist = new ArrayList<Path>(otherArgs.size());
for (String s : otherArgs) {
plist.add(new Path(s));
}

job.setInputFormatClass(CompositeInputFormat.class);
job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR,
CompositeInputFormat.compose(op, inputFormatClass,
plist.toArray(new Path[0])));
job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR,
CompositeInputFormat.compose(op, inputFormatClass,
plist.toArray(new Path[0])));
job.setOutputFormatClass(outputFormatClass);

job.setOutputKeyClass(outputKeyClass);
job.setOutputValueClass(outputValueClass);

Date startTime = new Date();
System.out.println("Job started: " + startTime);
int ret = job.waitForCompletion(true) ? 0 : 1 ;
int ret = job.waitForCompletion(true) ? 0 : 1;
Date end_time = new Date();
System.out.println("Job ended: " + end_time);
System.out.println("The job took " +
(end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
System.out.println("The job took " +
(end_time.getTime() - startTime.getTime()) / 1000 + " seconds.");
return ret;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,70 +180,75 @@ public int run(String[] args) throws Exception {
}

Configuration conf = getConf();
JobClient client = new JobClient(conf);
try (JobClient client = new JobClient(conf)) {
return run(client, conf, args);
}
}

private int run(JobClient client, Configuration conf, String[] args)
throws IOException, ClassNotFoundException, InterruptedException {
ClusterStatus cluster = client.getClusterStatus();
int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10);
long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP,
1*1024*1024*1024);
long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, 1 * 1024 * 1024 * 1024);
if (numBytesToWritePerMap == 0) {
System.err.println("Cannot have " + BYTES_PER_MAP +" set to 0");
System.err.println("Cannot have " + BYTES_PER_MAP + " set to 0");
return -2;
}
long totalBytesToWrite = conf.getLong(TOTAL_BYTES,
numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
long totalBytesToWrite = conf.getLong(TOTAL_BYTES,
numMapsPerHost * numBytesToWritePerMap * cluster.getTaskTrackers());
int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
if (numMaps == 0 && totalBytesToWrite > 0) {
numMaps = 1;
conf.setLong(BYTES_PER_MAP, totalBytesToWrite);
}
conf.setInt(MRJobConfig.NUM_MAPS, numMaps);

Job job = new Job(conf);

job.setJarByClass(RandomTextWriter.class);
job.setJobName("random-text-writer");

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

job.setInputFormatClass(RandomWriter.RandomInputFormat.class);
job.setMapperClass(RandomTextMapper.class);
Class<? extends OutputFormat> outputFormatClass =
SequenceFileOutputFormat.class;
job.setMapperClass(RandomTextMapper.class);

Class<? extends OutputFormat> outputFormatClass =
SequenceFileOutputFormat.class;
List<String> otherArgs = new ArrayList<String>();
for(int i=0; i < args.length; ++i) {
for (int i = 0; i < args.length; ++i) {
try {
if ("-outFormat".equals(args[i])) {
outputFormatClass =
Class.forName(args[++i]).asSubclass(OutputFormat.class);
outputFormatClass =
Class.forName(args[++i]).asSubclass(OutputFormat.class);
} else {
otherArgs.add(args[i]);
}
} catch (ArrayIndexOutOfBoundsException except) {
System.out.println("ERROR: Required parameter missing from " +
args[i-1]);
args[i - 1]);
return printUsage(); // exits
}
}

job.setOutputFormatClass(outputFormatClass);
FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(0)));

System.out.println("Running " + numMaps + " maps.");

// reducer NONE
job.setNumReduceTasks(0);

Date startTime = new Date();
System.out.println("Job started: " + startTime);
int ret = job.waitForCompletion(true) ? 0 : 1;
Date endTime = new Date();
System.out.println("Job ended: " + endTime);
System.out.println("The job took " +
(endTime.getTime() - startTime.getTime()) /1000 +
" seconds.");
System.out.println("The job took " +
(endTime.getTime() - startTime.getTime()) / 1000 +
" seconds.");

return ret;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,17 +246,22 @@ public int run(String[] args) throws Exception {

Path outDir = new Path(args[0]);
Configuration conf = getConf();
JobClient client = new JobClient(conf);
try (JobClient client = new JobClient(conf)) {
return run(client, conf, outDir);
}
}

private int run(JobClient client, Configuration conf, Path outDir)
throws IOException, ClassNotFoundException, InterruptedException {
ClusterStatus cluster = client.getClusterStatus();
int numMapsPerHost = conf.getInt(MAPS_PER_HOST, 10);
long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP,
1*1024*1024*1024);
long numBytesToWritePerMap = conf.getLong(BYTES_PER_MAP, 1 * 1024 * 1024 * 1024);
if (numBytesToWritePerMap == 0) {
System.err.println("Cannot have" + BYTES_PER_MAP + " set to 0");
return -2;
}
long totalBytesToWrite = conf.getLong(TOTAL_BYTES,
numMapsPerHost*numBytesToWritePerMap*cluster.getTaskTrackers());
long totalBytesToWrite = conf.getLong(TOTAL_BYTES,
numMapsPerHost * numBytesToWritePerMap * cluster.getTaskTrackers());
int numMaps = (int) (totalBytesToWrite / numBytesToWritePerMap);
if (numMaps == 0 && totalBytesToWrite > 0) {
numMaps = 1;
Expand All @@ -265,31 +270,31 @@ public int run(String[] args) throws Exception {
conf.setInt(MRJobConfig.NUM_MAPS, numMaps);

Job job = new Job(conf);

job.setJarByClass(RandomWriter.class);
job.setJobName("random-writer");
FileOutputFormat.setOutputPath(job, outDir);
job.setOutputKeyClass(BytesWritable.class);
job.setOutputValueClass(BytesWritable.class);
job.setInputFormatClass(RandomInputFormat.class);
job.setMapperClass(RandomMapper.class);
job.setMapperClass(RandomMapper.class);
job.setReducerClass(Reducer.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);

System.out.println("Running " + numMaps + " maps.");

// reducer NONE
job.setNumReduceTasks(0);

Date startTime = new Date();
System.out.println("Job started: " + startTime);
int ret = job.waitForCompletion(true) ? 0 : 1;
Date endTime = new Date();
System.out.println("Job ended: " + endTime);
System.out.println("The job took " +
(endTime.getTime() - startTime.getTime()) /1000 +
" seconds.");
System.out.println("The job took " +
(endTime.getTime() - startTime.getTime()) / 1000 +
" seconds.");

return ret;
}

Expand Down