-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathq5.java
More file actions
103 lines (85 loc) · 3.73 KB
/
q5.java
File metadata and controls
103 lines (85 loc) · 3.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package tde1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.log4j.BasicConfigurator;
import java.io.IOException;
public class q5 {

    /**
     * Driver: configures and submits the MapReduce job that computes the
     * average transaction price of Brazilian exports, grouped by
     * "unit year category flow country".
     *
     * @param args generic Hadoop options, optionally followed by
     *             [inputPath [outputPath]]; defaults are kept for backward
     *             compatibility when no paths are given.
     */
    public static void main(String[] args) throws IOException,
            ClassNotFoundException,
            InterruptedException {
        BasicConfigurator.configure();
        Configuration c = new Configuration();
        // GenericOptionsParser applies -D/-files/... options to the
        // Configuration; the leftover args may override the default paths.
        // (Original computed this array and ignored it.)
        String[] files = new GenericOptionsParser(c, args).getRemainingArgs();
        // Input file (first remaining CLI arg overrides the default).
        Path input = files.length > 0
                ? new Path(files[0])
                : new Path("in/transactions_amostra.csv");
        // Output directory (second remaining CLI arg overrides the default).
        Path output = files.length > 1
                ? new Path(files[1])
                : new Path("output/q5");
        // Job.getInstance replaces the deprecated Job(Configuration, String) ctor.
        Job j = Job.getInstance(c, "q5");
        j.setJarByClass(q5.class);
        j.setMapperClass(Mapq5.class);
        j.setReducerClass(Reduceq5.class);
        j.setCombinerClass(Combineq5.class);
        j.setMapOutputKeyClass(Text.class);
        j.setMapOutputValueClass(Commodity.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.addInputPath(j, input);
        FileOutputFormat.setOutputPath(j, output);
        // Propagate job success/failure as the process exit code instead of
        // discarding the boolean result.
        System.exit(j.waitForCompletion(false) ? 0 : 1);
    }

    /**
     * Map phase: parses one semicolon-separated CSV row and, for Brazilian
     * "Export" transactions, emits
     * key = "unit year category flow country" -> Commodity(1, price).
     */
    public static class Mapq5 extends Mapper<LongWritable, Text, Text, Commodity> {
        public void map(LongWritable key, Text value, Context con)
                throws IOException, InterruptedException {
            String linha = value.toString();
            // Skip the CSV header row.
            if (linha.startsWith("country_or_area")) return;
            String[] colunas = linha.split(";");
            // Skip malformed/short rows: column index 9 is read below, so
            // anything with fewer than 10 fields would throw
            // ArrayIndexOutOfBoundsException and kill the task.
            if (colunas.length < 10) return;
            String pais = colunas[0];     // country
            String ano = colunas[1];      // year
            String flow = colunas[4];     // flow type (Import/Export/...)
            String price = colunas[5];    // trade value
            String unit = colunas[7];     // quantity unit
            String category = colunas[9]; // commodity category
            // Keep only Brazilian exports.
            if (pais.equals("Brazil") && flow.equals("Export")) {
                try {
                    // Value carries (occurrence count = 1, price) so the
                    // combiner/reducer can compute a correct average.
                    con.write(new Text(unit + " " + ano + " " + category + " " + flow + " " + pais),
                            new Commodity(1, Double.parseDouble(price)));
                } catch (NumberFormatException ignored) {
                    // Best-effort: skip rows whose price field is not numeric
                    // rather than failing the whole task.
                }
            }
        }
    }

    /**
     * Combiner: pre-aggregates (count, priceSum) pairs on the map side to cut
     * shuffle traffic. NOTE(fix): the input value generic must be Commodity —
     * the original declared q6Writable, which does not match the mapper's
     * output type, so reduce() never actually overrode the framework method.
     */
    public static class Combineq5 extends Reducer<Text, Commodity, Text, Commodity> {
        public void reduce(Text key, Iterable<Commodity> values, Context con)
                throws IOException, InterruptedException {
            // The price sum must be a double: the mapper builds Commodity from
            // Double.parseDouble, so an int accumulator would truncate
            // (assumes Commodity.getSoma() returns that double — confirm).
            double soma = 0;
            int n = 0;
            for (Commodity i : values) {
                soma += i.getSoma();
                n += i.getN();
            }
            // Emit a partial aggregate in the same (count, sum) shape the
            // reducer consumes, so combining is associative and safe.
            con.write(key, new Commodity(n, soma));
        }
    }

    /**
     * Reduce phase: sums the (count, priceSum) partial aggregates per key and
     * emits the average price as a DoubleWritable.
     */
    public static class Reduceq5 extends Reducer<Text, Commodity, Text, DoubleWritable> {
        public void reduce(Text key, Iterable<Commodity> values, Context con)
                throws IOException, InterruptedException {
            // double accumulator for the same truncation reason as the combiner.
            double soma = 0;
            int n = 0;
            for (Commodity i : values) {
                soma += i.getSoma();
                n += i.getN();
            }
            // Defensive: avoid division by zero on an (unexpected) all-zero
            // count; Hadoop only calls reduce with at least one value, so n
            // should normally be >= 1.
            if (n == 0) return;
            con.write(key, new DoubleWritable(soma / n));
        }
    }
}