-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathq1.java
More file actions
93 lines (75 loc) · 3.26 KB
/
q1.java
File metadata and controls
93 lines (75 loc) · 3.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package tde1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.BasicConfigurator;
import java.io.IOException;
public class q1 {
public static void main(String[] args) throws IOException,
ClassNotFoundException,
InterruptedException {
BasicConfigurator.configure();
Configuration c = new Configuration();
String[] files = new GenericOptionsParser(c, args).getRemainingArgs();
// arquivo de entrada
Path input = new Path("in/transactions_amostra.csv");
// arquivo de saida
Path output = new Path("output/q1");
Job j = new Job(c, "q1");
j.setJarByClass(q1.class);
j.setReducerClass(Reduceq1.class);
j.setMapperClass(Mapq1.class);
j.setCombinerClass(Combineq1.class);
j.setMapOutputKeyClass(Text.class);
j.setMapOutputValueClass(IntWritable.class);
j.setOutputKeyClass(Text.class);
j.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(j, input);
FileOutputFormat.setOutputPath(j, output);
j.waitForCompletion(false);
}
public static class Mapq1 extends Mapper<LongWritable, Text, Text, IntWritable> {
// Funcao de map
public void map(LongWritable key, Text value, Context con)
throws IOException, InterruptedException {
String linha = value.toString();
//ignorando cabeçalho
if(linha.startsWith("country_or_area")) return;
String[] colunas = linha.split(";");
String pais = colunas[0]; //coluna dos países
// write apenas quando pais for "Brazil"
if(pais.compareTo("Brazil")==0 ){
//write com chave brasil e passando um IntWritable(1) que simboliza uma transação de Brazil
con.write(new Text("Brazil"), new IntWritable(1));
}
}
}
public static class Combineq1 extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context con)
throws IOException, InterruptedException {
//O combine faz a soma feita no reduce com antecedencia para poupar tempo
int soma = 0;
for (IntWritable i : values){ // passa pela lista de transações do Brasil
soma += i.get();
}
con.write(key, new IntWritable(soma));
}
}
public static class Reduceq1 extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context con)
throws IOException, InterruptedException {
// Somando as vezes que as transações do Brasil ocorreram
int soma = 0;
for (IntWritable i : values){
soma += i.get();
}
con.write(key, new IntWritable(soma));
}
}
}