-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdecision_tree.py
More file actions
248 lines (197 loc) · 6.49 KB
/
decision_tree.py
File metadata and controls
248 lines (197 loc) · 6.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from numpy import log2
class DTNode:
    """Internal node of a decision tree.

    Holds the rows of training data that reach this node, the candidate
    splits of that data along each remaining feature, and — once the node
    has been expanded — child nodes keyed by feature value.
    """
    def __init__(self, data, splits, prev_splits=None):
        """
        :param data: 2D array of rows reaching this node (label in the last column)
        :param splits: dict mapping feature index -> split of ``data`` on that feature
        :param prev_splits: feature indices already used on the path to this node
        """
        self.data = data
        self.splits = splits
        # feature index this node splits on; stays None until the node is expanded
        self.split_idx = None
        # children keyed by feature value (or "label" for a leaf wrapper)
        self.children = {}
        # Bug fix: original used a mutable default argument ([]), which is
        # shared across all instances; use a None sentinel instead.
        self.prev_splits = [] if prev_splits is None else prev_splits
        # feature value of the child holding the most data; used as the
        # fallback branch when predict() sees an unseen feature value
        self.most_common_split = None
    def depth(self):
        """
        Return the depth of the subtree rooted here (a childless node has depth 1).
        :return: int depth
        """
        # Bug fix: original iterated the children dict directly, yielding the
        # *keys* (strings), so the isinstance-style checks never matched and
        # the leaf branch returned list.append(...)'s None. Recurse on the
        # child nodes themselves; a leaf contributes depth 1 via this method.
        child_depths = [child.depth() for child in self.children.values()]
        return 1 + max(child_depths, default=0)
    def predict(self, x):
        """
        Recurse down the tree to classify ``x``.
        :param x: feature vector to classify
        :return: predicted label
        """
        if self.split_idx is None:
            # unexpanded/pure node: delegate to the wrapped leaf
            return self.children["label"].predict(x)
        value = x[self.split_idx]
        if value in self.children:
            return self.children[value].predict(x)
        # unseen feature value: fall back to the most populated branch
        return self.get_most_common_split().predict(x)
    def get_most_common_split(self):
        """
        Return the child that received the most training data (fallback branch
        for feature values never seen during training).
        :return: child node
        """
        return self.children[self.most_common_split]
class DTLeafNode(DTNode):
    """Terminal tree node: predicts a single stored label for every input."""
    def __init__(self, label):
        # A leaf carries no data and no candidate splits.
        super().__init__(None, None)
        # the class label returned for every sample reaching this node
        self.label = label
    def predict(self, x):
        """Ignore the input and return this leaf's label."""
        return self.label
class DTClassifier(BaseEstimator, ClassifierMixin):
    """ID3-style decision tree classifier for categorical features.

    At each node the tree splits on the feature whose partition has the
    lowest weighted label entropy (equivalently, the highest information
    gain) and grows until every leaf is pure or no unused features remain.
    """
    def __init__(self, counts=None):
        """ Initialize class with chosen hyperparameters.
        Args:
            counts = how many types for each attribute
        Example:
            DT = DTClassifier()
        """
        self.counts = counts
        self.root = None
    def fit(self, X, y):
        """ Fit the data; Make the Decision tree
        Args:
            X (array-like): A 2D numpy array with the training data, excluding targets
            y (array-like): A 2D numpy array with the training targets
        Returns:
            self: this allows this to be chained, e.g. model.fit(X,y).predict(X_test)
        """
        # append labels as the last column so every node carries its own targets
        data = np.hstack((X, y))
        self.root = DTNode(data, self.get_splits(data, []))
        self.recurse_learn(self.root)
        return self
    def recurse_learn(self, node):
        """
        Recursively expand the given node until its data is pure or no
        unused features remain.
        :param node:
        :return:
        """
        if len(np.unique(node.data[:, -1])) == 1 or not node.splits:
            # Pure node — or (bug fix) an impure node with no features left,
            # which previously crashed on argmin of an empty list. Either way,
            # attach a leaf (majority label; identical to row 0's label when pure).
            self.add_children(node, "label")
            return
        self.split_node(node)
        for child in node.children.values():
            self.recurse_learn(child)
    def split_node(self, node):
        """
        Finds the best feature to split on the given node
        :param node: node to split on
        :return:
        """
        # feature with minimum weighted entropy == maximum information gain
        gain_idx = self.find_max_gain(node)
        self.add_children(node, gain_idx)
    def find_max_gain(self, node):
        """
        Finds the feature with the highest information gain among the splits
        in the given node (computed as the minimum weighted label entropy).
        :param node: DTNode to evaluate
        :return: index of the best feature; also stored on node.split_idx
        """
        infos = []
        feature_idxs = list(node.splits.keys())
        for f in feature_idxs:
            split = node.splits[f]
            total = split['total']
            info = 0.0
            for value, subset in split.items():
                if value == 'total':
                    continue  # bookkeeping entry, not a partition
                n = len(subset)
                # label distribution within this partition
                label_split = self.split_data(subset, len(subset[0]) - 1)
                # Bug fix: the original summed a single term built from the
                # *number of label groups* (len of the split dict, which even
                # included the 'total' key) instead of each group's size —
                # a meaningless quantity. Compute Shannon entropy properly:
                # sum of -p*log2(p) over per-label frequencies.
                entropy = 0.0
                for lbl, rows in label_split.items():
                    if lbl == 'total':
                        continue
                    p = len(rows) / n
                    entropy -= p * log2(p)
                info += (n / total) * entropy
            infos.append(info)
        # smallest weighted entropy == largest information gain
        # (bug fix: no longer deletes 'total' from node.splits as a side effect)
        best = feature_idxs[int(np.argmin(infos))]
        node.split_idx = best
        return best
    def add_children(self, node, split_idx):
        """
        Adds children to the node. With split_idx == "label" a single leaf
        holding the node's majority label is attached; otherwise one child
        is created per observed value of the chosen feature.
        :param node:
        :param split_idx:
        :return: self
        """
        if split_idx == "label":
            # majority label (identical to node.data[0, -1] when the node is pure)
            values, counts = np.unique(node.data[:, -1], return_counts=True)
            node.children['label'] = DTLeafNode(values[np.argmax(counts)])
            return self
        split = node.splits[split_idx]
        # Bug fix: the used-feature list passed to get_splits must include the
        # feature being split on right now; the original passed only
        # node.prev_splits, letting a child re-split on the same (now constant)
        # feature, which could recurse forever on impure data.
        used = node.prev_splits + [split_idx]
        max_data = 0
        max_data_idx = 0
        for value, subset in split.items():
            if value == 'total':
                continue  # bookkeeping entry kept by split_data
            node.children[value] = DTNode(subset, self.get_splits(subset, used), used)
            if len(subset) > max_data:
                max_data = len(subset)
                max_data_idx = value
        # remembered so predict() can fall back on unseen feature values
        node.most_common_split = max_data_idx
        return self
    def get_splits(self, data, prev):
        """
        Gets splits along all features in the data that are not already used.
        :param data: 2D array, label in the last column
        :param prev: feature indices already used on the path to this node
        :return: dict feature index -> split dict (see split_data)
        """
        return {i: self.split_data(data, i)
                for i in range(len(data[0]) - 1) if i not in prev}
    def split_data(self, data, i):
        """
        Partitions data on the unique values of a given feature.
        :param data: data to split
        :param i: feature number to split data on
        :return: dict: rows per unique value, plus a 'total' entry holding
            the overall row count
        """
        feature = data[:, i]
        splits = {}
        total = 0
        for val in np.unique(feature):
            subset = data[feature == val]
            total += len(subset)
            splits[val] = subset
        # include total data points across splits
        splits['total'] = total
        return splits
    def predict(self, X):
        """ Predict all classes for a dataset X
        Args:
            X (array-like): A 2D numpy array with the training data, excluding targets
        Returns:
            array, shape (n_samples,)
                Predicted target values per element in X.
        """
        return np.array([self.root.predict(x) for x in X])
    def score(self, X, y):
        """ Return accuracy of model on a given dataset.
        Args:
            X (array-like): A 2D numpy array with data, excluding targets
            y (array-like): A 2D numpy array with the targets
        Returns:
            float: fraction of correctly classified samples
        """
        # docstring above repaired: the original had an unrelated
        # "_shuffle_data" signature fragment fused into it
        correct = self.predict(X) == y.flatten()
        return float(sum(correct)) / len(correct)