forked from AustinPolk/alite-implementation
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase.py
More file actions
151 lines (113 loc) · 6 KB
/
database.py
File metadata and controls
151 lines (113 loc) · 6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import os
from table import RelationalTable
from sentence_transformers import SentenceTransformer
from column_clustering import ColumnClustering
from sklearn.metrics import silhouette_score
import numpy as np
class RelationalDatabase:
    """A collection of RelationalTables that can be integrated into a single
    full-disjunction table via the ALITE algorithm.

    Typical usage: LoadFromFolder(...) to populate Tables, then RunALITE()
    to compute the full disjunction.
    """

    def __init__(self):
        # All tables currently loaded into the database.
        self.Tables: list[RelationalTable] = []
        # Set once AssignIntegrationIDs() has completed successfully.
        self.IntegrationIDsAssigned: bool = False
        # for benchmarking purposes
        # Silhouette score recorded per candidate cluster count.
        self.SilhouetteScores: dict[int, float] = {}
        # [minimum_columns, maximum_columns, chosen cluster count];
        # BUGFIX: annotated Optional — it is None until AssignIntegrationIDs runs.
        self.ColumnClusterSizes: list[int] | None = None

    # Load all CSV files within the folder into tables in this database
    def LoadFromFolder(self, data_folder: str):
        """Load every file in the top level of data_folder into a new table.

        Only files directly inside data_folder are loaded; subdirectories are
        skipped. NOTE(review): no extension filtering is done despite the
        comment above, so any non-CSV file present is also passed to
        LoadFromCSV — confirm the folder only ever contains CSVs.
        """
        for root, dirs, files in os.walk(data_folder):
            # Restrict to the top-level folder itself (os.walk also yields
            # subdirectories, which we deliberately ignore).
            if os.path.realpath(root) == os.path.realpath(data_folder):
                for file in files:
                    print(f"Loading data from file {file} into relational table")
                    new_table = RelationalTable()
                    filepath = os.path.join(root, file)
                    new_table.LoadFromCSV(filepath)
                    self.Tables.append(new_table)

    def TupleCount(self):
        """Return the total number of tuples across all loaded tables."""
        return sum(table.TupleCount() for table in self.Tables)

    # Assign integration IDs to the columns of each table in the database
    def AssignIntegrationIDs(self):
        """Cluster column embeddings across all tables and rename columns so
        that semantically matching columns share an integration ID.

        Side effects: mutates every table in self.Tables (embeddings,
        integration IDs, column renames), fills self.SilhouetteScores and
        self.ColumnClusterSizes, and sets self.IntegrationIDsAssigned.

        Raises:
            RuntimeError: if no candidate cluster size yields a clustering.
        """
        # load a pretrained transformer
        model = SentenceTransformer("all-MiniLM-L6-v2")

        # minimum and maximum columns that could be in the full disjunction
        minimum_columns = 0
        maximum_columns = 0

        # initialize the tables with unique integration IDs and column embeddings
        offset = 0
        all_integrationIDs = []
        all_column_embeddings = {}
        from_table = []
        for idx, table in enumerate(self.Tables):
            print(f"Initializing table {idx}")
            offset = table.InitializeIntegrationIDs(offset)
            table.InitializeColumnEmbeddings(model)

            column_count = len(table.ColumnNames)
            # minimum columns is the size of the largest single table
            if column_count > minimum_columns:
                minimum_columns = column_count
            # maximum columns is the sum of the sizes of all tables
            maximum_columns += column_count

            all_column_embeddings.update(table.ColumnEmbeddings)
            all_integrationIDs.extend(table.IntegrationIDToColumnIndex.keys())
            from_table.extend([idx] * column_count)

        all_embeddings = list(all_column_embeddings.values())
        print(f"Total embeddings: {len(all_embeddings)}")
        print(f"Minimum columns: {minimum_columns}\tMaximum columns: {maximum_columns}")

        # compute all possible clusterings here, choose from them below
        print("Clustering column embeddings")
        column_clustering = ColumnClustering(min_clusters=minimum_columns)
        column_clustering.fit(all_embeddings, from_table)

        best_clustering = None
        # BUGFIX: start below the silhouette range [-1, 1] so a clustering
        # scoring exactly -1 is still accepted (it was skipped before,
        # leaving best_clustering as None).
        best_score = float("-inf")

        # try all possible cluster sizes, select the size that maximizes silhouette score
        # The upper bound is deliberately exclusive: silhouette_score requires
        # n_labels <= n_samples - 1, and maximum_columns equals the total
        # number of column embeddings (assuming integration IDs are unique).
        for n_clusters in range(minimum_columns, maximum_columns):
            if n_clusters not in column_clustering.labels:
                print(f"Skipping {n_clusters} clusters")
                continue
            cluster_labels = column_clustering.labels[n_clusters]
            silhouette = silhouette_score(all_embeddings, cluster_labels)
            self.SilhouetteScores[n_clusters] = silhouette
            print(f"Silhouette score for {n_clusters} clusters: {silhouette}")

            if best_score < silhouette:
                best_score = silhouette
                best_clustering = cluster_labels

        # BUGFIX: fail with a clear message instead of the opaque TypeError
        # that len(set(None)) would raise below.
        if best_clustering is None:
            raise RuntimeError("No valid column clustering could be computed")

        print(f"Best clustering achieved using {len(set(best_clustering))} clusters")
        self.ColumnClusterSizes = [minimum_columns, maximum_columns, len(set(best_clustering))]

        # now cluster the table columns with this model
        # (renamed the key variable so it no longer shadows the builtin `id`)
        column_clusters = {
            integration_id: cluster
            for cluster, integration_id in zip(best_clustering, all_integrationIDs)
        }

        # now reassign the table column names to be which cluster that column is in (the cluster is the new integration ID)
        for idx, table in enumerate(self.Tables):
            table.RenameColumns(column_clusters)
            print(f"Table {idx} ({table.TableName}) final integration IDs: {table.DataFrame.columns}")
            print(f"Integration ID Mapping: {table.ColumnNames}")

        self.IntegrationIDsAssigned = True
        print("Integration IDs assigned to all tables.")

    # Run the ALITE algorithm on the database
    def RunALITE(self):
        """Compute the full disjunction of all loaded tables via ALITE.

        Returns:
            RelationalTable: the full-disjunction table. Intermediate stages
            are also persisted to files via saveToFile for debugging.
        """
        # Step 1: Assign integration IDs
        if not self.IntegrationIDsAssigned:
            self.AssignIntegrationIDs()

        # Step 2: Create a new table for the full disjunction
        fullDisjunction = RelationalTable()
        fullDisjunction.saveToFile("1 - Initial")

        print("Outer Union Start")
        # Step 3: Generate labeled nulls for each table and perform outer union
        for table in self.Tables:
            table.GenerateLabeledNulls()
            fullDisjunction.OuterUnionWith(table)
        fullDisjunction.saveToFile("2 - PostOuterJoinAndLabelledNulls")
        print("Outer Union Done")
        print(f"Tuple count: {fullDisjunction.TupleCount()}")

        print("Complement Start")
        # Step 4: Complement phase
        fullDisjunction.Complement()
        fullDisjunction.saveToFile("3 - PostComplement")
        print(f"Tuple count: {fullDisjunction.TupleCount()}")
        print("Complement Done")

        # Step 5: Replace labeled nulls with actual values (if any replacement logic applies)
        fullDisjunction.ReplaceLabeledNulls()
        fullDisjunction.saveToFile("4 - ReplacingLabelledNulls")

        # Step 6: Subsumption - remove subsumable tuples
        fullDisjunction.SubsumeTuples()
        fullDisjunction.saveToFile("5 - PostSubsumption")
        print(f"Tuple count: {fullDisjunction.TupleCount()}")

        return fullDisjunction