-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrelativeFrequency.java
More file actions
129 lines (101 loc) · 3.28 KB
/
relativeFrequency.java
File metadata and controls
129 lines (101 loc) · 3.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import java.io.IOException;
import java.util.TreeSet;
class wordPair implements Comparable<wordPair> {
double Frequency;
String key;
String value;
wordPair(double Frequency, String key, String value){
this.Frequency = Frequency;
this.key = key;
this.value = value;
}
public int compareTo(wordPair wordpair){
if (this.Frequency >= wordpair.Frequency){
return 1;
}
else{
return -1;
}
}
}
/**
 * MapReduce job that ranks adjacent word pairs by relative frequency.
 *
 * <p>For every pair of neighbouring words {@code (w1, w2)} on an input line, the
 * relative frequency is {@code count(w1, w2) / count(w1, *)}, where
 * {@code count(w1, *)} is the number of pairs whose first word is {@code w1}.
 * The job writes the 100 highest-ranked pairs, one per line, as
 * {@code w1<TAB>w2}, in descending order of relative frequency. The frequency
 * itself is not written.
 *
 * <p>The mapper emits, for each pair, both the pair key {@code w1<TAB>w2} and a
 * marginal key {@code w1<TAB>} (empty second word). Because the marginal key is a
 * strict prefix of every pair key for the same first word, it sorts immediately
 * before them, so the reducer always sees the denominator before the pairs that
 * need it. This only works when all keys reach the same reducer, which is why
 * {@link #main} fixes the number of reduce tasks to one.
 */
public class relativeFrequency {

    /**
     * Separator between the two words inside an intermediate key. Words are produced
     * by splitting on whitespace, so a tab can never occur inside a word and the key
     * can always be split back unambiguously.
     */
    private static final String SEPARATOR = "\t";

    /** Number of top-ranked pairs kept and written by the reducer. */
    private static final int TOP_PAIRS = 100;

    /**
     * Configures and runs the job.
     *
     * <p>Exits with status 2 on bad arguments, 0 when the job succeeds and 1 when it fails.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output directory
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        // Validate before touching Hadoop so a bad invocation fails fast.
        if (args.length != 2) {
            System.err.println("Usage: relativeFrequency <inputFile> <outputDir>");
            System.exit(2);
        }

        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(relativeFrequency.class);

        // A single reducer is required: the reducer carries the marginal count of a
        // word across consecutive keys and keeps one global top-N set.
        job.setNumReduceTasks(1);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Map.class);
        job.setCombinerClass(Merge.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean status = job.waitForCompletion(true);
        System.exit(status ? 0 : 1);
    }

    /**
     * Emits a count of 1 for every adjacent word pair on a line, plus a count of 1
     * for the marginal key of the pair's first word.
     */
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        /** Constant count value written for every emitted key. */
        private static final Text ONE = new Text("1");
        /** Reused output key; its contents are serialised on each write. */
        private final Text outKey = new Text();

        /**
         * @param key     position of the line in the input (unused)
         * @param text    one line of input
         * @param context sink for the emitted key/count pairs
         */
        @Override
        public void map(LongWritable key, Text text, Context context) throws IOException, InterruptedException {
            // Trim and split on runs of whitespace so no empty token can be produced;
            // empty tokens would yield keys the reducer cannot split back into two words.
            String[] words = text.toString().trim().split("\\s+");
            if (words.length < 2) {
                return;
            }

            String previous = words[0].toLowerCase();
            for (int i = 1; i < words.length; i++) {
                String current = words[i].toLowerCase();

                // Marginal key: contributes to count(previous, *).
                outKey.set(previous + SEPARATOR);
                context.write(outKey, ONE);

                // Pair key: contributes to count(previous, current).
                outKey.set(previous + SEPARATOR + current);
                context.write(outKey, ONE);

                previous = current;
            }
        }
    }

    /**
     * Combiner that sums the partial counts of a key and re-emits the key with the sum.
     */
    private static class Merge extends Reducer<Text, Text, Text, Text> {
        /**
         * @param text           intermediate key (pair or marginal)
         * @param iterableValues decimal counts for that key
         * @param context        sink for the summed count
         */
        @Override
        public void reduce(Text text, Iterable<Text> iterableValues, Context context) throws IOException, InterruptedException {
            long count = 0;
            for (Text value : iterableValues) {
                count += Long.parseLong(value.toString());
            }
            context.write(text, new Text(String.valueOf(count)));
        }
    }

    /**
     * Computes the relative frequency of each pair and keeps the highest-ranked
     * {@code TOP_PAIRS} of them, writing them out in {@link #cleanup}.
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        /** Best pairs seen so far, ordered by {@code wordPair}'s natural ordering. */
        TreeSet<wordPair> treeSet = new TreeSet<wordPair>();
        /** Marginal count of the first word currently being processed. */
        double total = 0;
        /** First word that {@link #total} belongs to; null until a marginal key is seen. */
        private String totalWord = null;

        /**
         * Sums the counts of one key. A marginal key updates the running denominator;
         * a pair key is ranked against it.
         *
         * @param text          intermediate key (pair or marginal)
         * @param iteratorValue decimal counts for that key
         * @param context       unused here; output is written in {@link #cleanup}
         * @throws IllegalStateException if a key has no separator, or a pair key
         *         arrives without the marginal key of its first word having been seen
         */
        @Override
        public void reduce(Text text, Iterable<Text> iteratorValue, Context context) throws IOException, InterruptedException {
            long count = 0;
            for (Text valueIn : iteratorValue) {
                count += Long.parseLong(valueIn.toString());
            }

            String key = text.toString();
            int separatorAt = key.indexOf(SEPARATOR);
            if (separatorAt < 0) {
                throw new IllegalStateException("Intermediate key has no separator: " + key);
            }
            String firstWord = key.substring(0, separatorAt);
            String secondWord = key.substring(separatorAt + SEPARATOR.length());

            if (secondWord.isEmpty()) {
                // Marginal key: remember the denominator for the pairs that follow.
                total = count;
                totalWord = firstWord;
                return;
            }

            if (total <= 0 || !firstWord.equals(totalWord)) {
                throw new IllegalStateException("No marginal count seen for first word of pair: " + key);
            }

            treeSet.add(new wordPair(count / total, firstWord, secondWord));
            if (treeSet.size() > TOP_PAIRS) {
                // Evict the lowest-ranked pair to keep the set bounded.
                treeSet.pollFirst();
            }
        }

        /**
         * Writes the retained pairs from highest-ranked to lowest-ranked, first word
         * as the output key and second word as the output value.
         *
         * @param context sink for the final output records
         */
        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            while (!treeSet.isEmpty()) {
                wordPair pair = treeSet.pollLast();
                context.write(new Text(pair.key), new Text(pair.value));
            }
        }
    }
}