-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPartitionByMultipleOutputs.bak
More file actions
120 lines (106 loc) · 4.51 KB
/
PartitionByMultipleOutputs.bak
File metadata and controls
120 lines (106 loc) · 4.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.json.JSONException;
import org.json.JSONObject;
public class PartitionByMultipleOutputs extends Configured implements Tool
{
public static class MultipleOutputsMapper extends Mapper<LongWritable, Text, Text, Text>
{
public void map(LongWritable mkey, Text mvalue, Context context) throws IOException, InterruptedException
{
try
{
JSONObject jsonObj = new JSONObject(mvalue.toString());
//parse the input data with JSONObject
String country = (String)jsonObj.get("country");
String state = (String)jsonObj.get("state");
String city = (String)jsonObj.get("city");
String street = (String)jsonObj.get("street");
String zip = (String)jsonObj.get("zip");
StringBuilder key = new StringBuilder();
key.append(country);
key.append("/");
key.append(state);
key.append("/");
key.append(city);
key.append("/");
key.append(street);
key.append("/");
key.append(zip);
//emitting directory structure as key and input record as value.
context.write(new Text(key.toString()), mvalue);
}
catch (Exception e) {
e.printStackTrace();
}
}
}
public static class MultipleOutputsReducer extends Reducer<Text, Text ,NullWritable, Text>
{
private MultipleOutputs<NullWritable,Text> multipleOutputs;
public void setup(Context context) throws IOException, InterruptedException
{
multipleOutputs = new MultipleOutputs<NullWritable, Text>(context);
}
public void reduce(Text rkey, Iterable<Text> rvalue, Context context) throws IOException, InterruptedException
{
for(Text value : rvalue) {
multipleOutputs.write(NullWritable.get(), value, rkey.toString()); }
}
public void cleanup(Context context) throws IOException, InterruptedException
{
multipleOutputs.close();
}
}
public int run(String[] args) throws Exception
{
Configuration conf = new Configuration();
String inputpath = args[0];
String outputpath = args[1];
FileSystem fs = FileSystem.get(conf);
Job job = new Job(conf, "PartitionByMultipleOutputs");
job.setJarByClass(PartitionByMultipleOutputs.class);
job.setMapperClass(MultipleOutputsMapper.class);
job.setReducerClass(MultipleOutputsReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.getConfiguration().set("mapred.child.java.opts","-Xmx2048m");
job.getConfiguration().setInt("mapreduce.map.memory.mb",2048);
job.getConfiguration().setInt("mapreduce.reduce.memory.mb",2048);
job.getConfiguration().set("mapreduce.map.java.opts","-Xmx2048m");
job.getConfiguration().set("mapreduce.reduce.java.opts","-Xmx2048m");
job.getConfiguration().setBoolean("mapreduce.reduce.speculative", true);
job.setNumReduceTasks(2);
FileInputFormat.addInputPath(job, new Path(inputpath));
FileOutputFormat.setOutputPath(job, new Path(outputpath));
if(fs.exists(new Path(outputpath)))
{
fs.delete(new Path(outputpath), true);
}
job.waitForCompletion(true);
return 0;
}
public static void main(String[] args) throws Exception
{
int exitCode = ToolRunner.run(new PartitionByMultipleOutputs(), args);
System.exit(exitCode);
}
}