-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathNGramProcessor.py
More file actions
400 lines (331 loc) · 14.9 KB
/
NGramProcessor.py
File metadata and controls
400 lines (331 loc) · 14.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
import pandas as pd
import numpy as np
from collections import defaultdict
from tqdm import tqdm
from joblib import Parallel, delayed
import os
import matplotlib.pyplot as plt
tqdm.pandas()
class NGramProcessor:
    """
    Builds n-gram language models from a preprocessed training corpus and
    scores text with optional additive (Laplace) or Good-Turing smoothing.
    """

    def __init__(self, df_train, n=1):
        """
        Args:
            df_train (pd.DataFrame): Preprocessed training dataset; must expose
                a "Sentences" column of whitespace-tokenizable strings.
            n (int, optional): Order of the n-gram model. Defaults to 1.
        """
        # Fix: the original docstring documented a parameter named ``ngram``
        # that does not exist (the parameter is ``n``); the trailing ``pass``
        # was redundant and has been dropped.
        self.n = n
        self.df_train = df_train
def ngrams(self, comment, n=1):
"""Function to create n-grams from a comment
Args:
comment (str): Comment from the dataset
n (int, optional): Value of n for the n-gram. Defaults to 1.
Returns:
List: List of n-grams from the comment
"""
words = comment.split()
# for all the words also split based on \n or \t
all_words = []
for word in words:
all_words.extend(word.split("\n"))
return [
" ".join(all_words[i : i + n])
for i in range(4 - self.n, len(all_words) - n - 1)
]
def create_frequency_dict(self, n):
    """Build a frequency table over the n-grams of the training corpus.

    Args:
        n (int): Order of the n-grams to count.

    Returns:
        Dict: Mapping from each n-gram to its occurrence count.
    """
    # Per-sentence n-gram lists, then flattened into a single sequence.
    grams_per_sentence = self.df_train["Sentences"].progress_apply(
        self.ngrams, args=(n,)
    )
    flattened = grams_per_sentence.explode().tolist()
    # Tally occurrences; defaultdict avoids an existence check per gram.
    counts = defaultdict(int)
    for gram in tqdm(flattened, desc=f"Creating frequency dictionary for {n}-grams"):
        counts[gram] += 1
    return counts
def train(self):
    """Fit the model: cache vocabulary size, n-gram counts, and their total.

    For higher-order models (n > 1) also builds the (n-1)-gram counts needed
    as conditioning contexts.
    """
    self.vocab_size = self.__get_vocab_size()
    self.frequency_dict = self.create_frequency_dict(self.n)
    # Total number of n-gram tokens observed in training.
    self.frequency_dict_size = sum(self.frequency_dict.values())
    if self.n != 1:
        self.frequency_dict_n_1 = self.create_frequency_dict(self.n - 1)
def compute_log_probability(self, key, smoothing=None, k=1):
"""Function to compute the log probability of an n-gram
Args:
key (str): n-gram
smoothing (str, optional): Type of smoothing to be used. Defaults to None.
k (int, optional): Value of k for additive smoothing. Defaults to 1 -> Laplace Smoothing.
Returns:
Tuple: Tuple containing the n-gram (key) and its log probability
"""
if smoothing == "laplace" or smoothing == "additive":
if self.n == 1:
if bool(self.frequency_dict.get(key)):
return key, np.log2(self.frequency_dict[key] + k) - np.log2(
sum(self.frequency_dict.values()) + k * self.vocab_size
)
else:
return key, np.log2(k) - np.log2(
sum(self.frequency_dict.values()) + k * self.vocab_size
)
else:
key_n_1 = " ".join(key.split()[: self.n - 1])
if bool(self.frequency_dict_n_1.get(key_n_1)):
if bool(self.frequency_dict.get(key)):
return key, np.log2(self.frequency_dict[key] + k) - np.log2(
self.frequency_dict_n_1[key_n_1] + k * self.vocab_size
)
else:
return key, np.log2(k) - np.log2(
self.frequency_dict_n_1[key_n_1] + k * self.vocab_size
)
else:
return key, -np.log2(self.vocab_size)
elif smoothing == "turing":
updated_frequency_dict = self.turing_frequency_dict
unseen_words_probability = np.log2(
self.frequency_of_frequency[1] / self.frequency_dict_size
)
if self.n == 1:
if bool(updated_frequency_dict.get(key)):
return key, np.log2(updated_frequency_dict[key]) - np.log2(
sum(self.frequency_dict.values())
)
else:
return key, unseen_words_probability
else:
key_n_1 = " ".join(key.split()[: self.n - 1])
if bool(updated_frequency_dict.get(key)):
return key, np.log2(updated_frequency_dict[key]) - np.log2(
self.frequency_dict_n_1[key_n_1]
)
else:
return key, unseen_words_probability
elif smoothing is None:
if self.n == 1:
if bool(self.frequency_dict.get(key)):
return key, np.log2(self.frequency_dict[key]) - np.log2(
sum(self.frequency_dict.values())
)
else:
return key, -np.inf
else:
key_n_1 = " ".join(key.split()[: self.n - 1])
if bool(self.frequency_dict.get(key)):
return key, np.log2(self.frequency_dict[key]) - np.log2(
self.frequency_dict_n_1[key_n_1]
)
else:
return key, -np.inf
def find_probability(self, save_csv="sample.csv"):
    """Compute unsmoothed log probabilities for every trained n-gram and save
    them to CSV.

    Args:
        save_csv (str, optional): Destination CSV path. Defaults to 'sample.csv'.

    Returns:
        pd.DataFrame: Columns 'Comment' (the n-gram) and 'Probability'
        (its log2 probability).
    """
    log_probability_dict = {}
    for gram in tqdm(
        list(self.frequency_dict.keys()),
        desc=f"Finding log probability for {self.n}-grams",
    ):
        gram_key, log_prob = self.compute_log_probability(gram)
        log_probability_dict[gram_key] = log_prob
    df = pd.DataFrame(
        list(log_probability_dict.items()), columns=["Comment", "Probability"]
    )
    df.to_csv(save_csv, index=False)
    print(f"Saved {self.n}-gram probabilities to {save_csv}")
    return df
def calculate_sentence_perplexity(self, sentence, log_probability_dict):
"""
Function to calculate the perplexity of a sentence
Args:
sentence (str): Sentence for which perplexity is to be calculated
log_probability_dict (dict): Dictionary containing the log probabilities of n-grams
Returns:
float: Perplexity of the sentence
"""
ngrams_of_sentence = self.ngrams(sentence, self.n)
total_log_prob = 0
for ngram_set in ngrams_of_sentence:
total_log_prob += log_probability_dict[ngram_set]
perplexity = 2 ** (-total_log_prob / len(ngrams_of_sentence))
return perplexity
def calc_perplexity(
    self,
    df_test,
    smoothing=None,
    perplexity_csv="perplexity.csv",
    log_prob_save_csv="logprob.csv",
    k=1,
):
    """Score a dataset: per-sentence perplexity plus CSV side outputs.

    Computes log probabilities for every distinct n-gram of df_test, the
    perplexity of each sentence, and writes three artifacts: the per-n-gram
    log probabilities (skipped when smoothing is None), the per-sentence
    perplexities, and a running average-perplexity summary file.

    Args:
        df_test (pd.DataFrame): Preprocessed test dataset with a "Sentences" column.
        smoothing (str, optional): 'laplace'/'additive', 'turing', or None. Defaults to None.
        perplexity_csv (str, optional): Path for the per-sentence perplexity CSV. Defaults to 'perplexity.csv'.
        log_prob_save_csv (str, optional): Path for the per-n-gram log-probability CSV. Defaults to 'logprob.csv'.
        k (int, optional): Additive smoothing constant. Defaults to 1.

    Returns:
        pd.DataFrame: Columns 'Comment' (sentence) and 'Perplexity'.
    """
    # Flatten the n-grams of every test sentence into one list.
    ngrams_series = df_test["Sentences"].progress_apply(self.ngrams, args=(self.n,))
    ngrams = ngrams_series.explode().tolist()
    # Strip the filename component to get the output directory for the
    # perplexity CSV, and create it if needed ('/'-separated paths assumed).
    save_csv_dir_path = perplexity_csv[: -len(perplexity_csv.split("/")[-1]) - 1]
    if save_csv_dir_path != "":
        os.makedirs(save_csv_dir_path, exist_ok=True)
    probability_results = []
    if smoothing == "turing":
        # Good-Turing needs adjusted counts; build them now from frequency_dict.
        self.__populate_turing()
    elif smoothing is None:
        # Unsmoothed log probabilities can be -inf; skip saving them to CSV.
        log_prob_save_csv = None
    # Score each distinct test n-gram exactly once.
    for key in tqdm(
        list(set(ngrams)),
        desc=f"Finding log probability for {self.n}-grams with {smoothing} smoothing",
    ):
        probability_results.append(
            self.compute_log_probability(key, smoothing=smoothing, k=k)
        )
    log_probability_dict = dict(probability_results)
    # save the log probability dict
    if log_prob_save_csv is not None:
        # Same directory-creation dance as above, for the log-prob CSV.
        log_prob_save_csv_dir_path = log_prob_save_csv[
            : -len(log_prob_save_csv.split("/")[-1]) - 1
        ]
        if log_prob_save_csv_dir_path != "":
            os.makedirs(log_prob_save_csv_dir_path, exist_ok=True)
        df = pd.DataFrame(
            list(log_probability_dict.items()),
            columns=["Comment", "Log Probability"],
        )
        df.to_csv(log_prob_save_csv, index=False)
        print(f"Saved {self.n}-gram probabilities to {log_prob_save_csv}")
    sentences = df_test["Sentences"].tolist()
    perplexity_scores = []
    for sentence in tqdm(
        sentences, desc=f"Calculating perplexity for {self.n}-grams"
    ):
        perplexity_scores.append(
            self.calculate_sentence_perplexity(sentence, log_probability_dict)
        )
    df = pd.DataFrame({"Comment": sentences, "Perplexity": perplexity_scores})
    avg_perplexity = np.mean(perplexity_scores)
    print(
        f"Average perplexity for {self.n}-grams: {avg_perplexity} with {smoothing} smoothing"
    )
    # Choose the summary file: train vs test split (by DataFrame equality with
    # the training set) and whether a non-default k was used.
    if df_test.equals(self.df_train):
        if k == 1:
            avg_file = (
                f"average_perplexity/avg_perplexity_{smoothing}_smoothing_train.csv"
            )
        else:
            avg_file = f"average_perplexity/avg_perplexity_{smoothing}_{k}_smoothing_train.csv"
    else:
        if k == 1:
            avg_file = (
                f"average_perplexity/avg_perplexity_{smoothing}_smoothing_test.csv"
            )
        else:
            avg_file = f"average_perplexity/avg_perplexity_{smoothing}_{k}_smoothing_test.csv"
    # if avg file does not exist, create it
    # NOTE(review): assumes the 'average_perplexity/' directory already exists
    # (os.makedirs is never called for it) and that self.n is in 1..4 so the
    # row index below is valid — confirm both against the calling scripts.
    if not os.path.exists(avg_file):
        temp_df = pd.DataFrame(
            {
                "n": ["Unigram", "Bigram", "Trigram", "Quadgram"],
                "Average Perplexity": [0, 0, 0, 0],
            }
        )
        temp_df.to_csv(avg_file, index=False)
    # Update the row for this model order (row 0 = Unigram ... row 3 = Quadgram).
    perp_df = pd.read_csv(avg_file)
    perp_df.loc[self.n - 1, "Average Perplexity"] = avg_perplexity
    perp_df.to_csv(avg_file, index=False)
    df.to_csv(perplexity_csv, index=False)
    print(
        f"Saved {self.n}-gram perplexities with {smoothing} smoothing to {perplexity_csv}"
    )
    return df
def __populate_turing(self):
    """
    Build the structures Good-Turing smoothing needs from frequency_dict:
    the count-of-counts table (self.frequency_of_frequency) and the adjusted
    counts c* = (c+1) * N_{c+1} / N_c (self.turing_frequency_dict).
    """
    # Count-of-counts: frequency_of_frequency[c] = number of distinct n-grams
    # that occurred exactly c times in training.
    self.frequency_of_frequency = {}
    for key in self.frequency_dict.keys():
        if self.frequency_dict[key] not in self.frequency_of_frequency:
            self.frequency_of_frequency[self.frequency_dict[key]] = 1
        else:
            self.frequency_of_frequency[self.frequency_dict[key]] += 1
    # Find frequency of frequency for unseen words that we need
    # The adjusted count for c needs N_{c+1}; where count c+1 was never
    # observed, estimate it by linear interpolation between c and the nearest
    # observed count. Results are staged in a separate dict so the table is
    # not mutated while being iterated.
    interpolated_frequency_of_frequency = {}
    for count in self.frequency_of_frequency:
        if count + 1 not in self.frequency_of_frequency:
            if count == max(self.frequency_of_frequency.keys()):
                # Interpolate the frequency of frequency for count+1 using the previous nearest frequency
                # (extrapolate past the largest observed count using the line
                # through (nearest_freq, N_nearest) and (count, N_count)).
                nearest_freq = count - 1
                while nearest_freq not in self.frequency_of_frequency:
                    nearest_freq -= 1
                slope = (
                    self.frequency_of_frequency[nearest_freq]
                    - self.frequency_of_frequency[count]
                ) / (nearest_freq - count)
                intercept = self.frequency_of_frequency[count] - slope * count
                interpolated_frequency_of_frequency[count + 1] = (
                    slope * (count + 1) + intercept
                )
            else:
                # Interpolate the frequency of frequency for count+1 using the next nearest frequency
                # (gap in the middle of the table: use the next observed count
                # above it as the second interpolation point).
                nearest_freq = count + 1
                while nearest_freq not in self.frequency_of_frequency:
                    nearest_freq += 1
                slope = (
                    self.frequency_of_frequency[nearest_freq]
                    - self.frequency_of_frequency[count]
                ) / (nearest_freq - count)
                intercept = self.frequency_of_frequency[count] - slope * count
                interpolated_frequency_of_frequency[count + 1] = (
                    slope * (count + 1) + intercept
                )
    # Update the frequency of frequency dictionary
    # (merge the interpolated entries in; never overwrite observed counts).
    for key in interpolated_frequency_of_frequency:
        if key not in self.frequency_of_frequency:
            self.frequency_of_frequency[key] = interpolated_frequency_of_frequency[
                key
            ]
    # Adjusted Good-Turing counts: c* = (c+1) * N_{c+1} / N_c for every
    # n-gram seen in training.
    # NOTE(review): if the smallest observed count has no count-1 neighbor
    # below it, the backward search above could walk past zero — presumed
    # impossible for real count data, but confirm.
    self.turing_frequency_dict = {}
    for key in self.frequency_dict.keys():
        count = self.frequency_dict[key]
        freq_count = self.frequency_of_frequency[count]
        freq_count_plus = self.frequency_of_frequency[count + 1]
        self.turing_frequency_dict[key] = (
            (count + 1) * freq_count_plus
        ) / freq_count
    #####################################################
    # Code to create a csv file for the original count and the new count :
    # original_count = []
    # new_count = []
    # for key in self.frequency_dict.keys():
    #     if self.frequency_dict[key] not in original_count:
    #         original_count.append(self.frequency_dict[key])
    #         new_count.append(self.turing_frequency_dict[key])
    # df = pd.DataFrame(
    #     {'Original Count': original_count, 'New Count': new_count})
    # df.to_csv(
    #     f'turing_counts/original_new_count_{self.n}-gram.csv', index=False)
def __get_vocab_size(self):
"""
Function to get the vocabulary size i.e. the number of unique words in the dataset.
"""
words_set = set()
for sentence in self.df_train["Sentences"]:
words = sentence.split()
for word in words:
words_set.add(word)
self.vocab_size = len(words_set) - 2
return len(words_set) - 2