From 2543988b21dde98fa608f98cf9cf45d943a33c24 Mon Sep 17 00:00:00 2001 From: Eric Potash Date: Tue, 17 Oct 2017 17:40:46 -0500 Subject: [PATCH 01/20] update to agree with drain result API --- lead/features/wic.py | 6 +++--- lead/model/data.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lead/features/wic.py b/lead/features/wic.py index 91c57ca..c6f991d 100644 --- a/lead/features/wic.py +++ b/lead/features/wic.py @@ -46,7 +46,7 @@ def __init__(self, spacedeltas, dates, parallel=False): parallel=parallel) def get_aggregates(self, date, delta): - enroll = self.inputs[0].get_result() + enroll = self.inputs[0].result aggregates = [ Aggregate('medical_risk', 'any', fname=False), Aggregate(['household_size', 'household_income'], @@ -89,7 +89,7 @@ def __init__(self, spacedeltas, dates, parallel=False): parallel=parallel) def get_aggregates(self, date, delta): - births = self.inputs[0].get_result() + births = self.inputs[0].result aggregates = [ Aggregate('length', 'max', fname=False), Aggregate('weight', 'max', fname=False), @@ -125,7 +125,7 @@ def __init__(self, spacedeltas, dates, parallel=False): parallel=parallel) def get_aggregates(self, date, delta): - prenatal = self.inputs[0].get_result() + prenatal = self.inputs[0].result aggregates = [ Count(), diff --git a/lead/model/data.py b/lead/model/data.py index 635d3ac..62e16a9 100644 --- a/lead/model/data.py +++ b/lead/model/data.py @@ -94,7 +94,7 @@ def run(self, acs, left, aux=None): left = data.binarize(left, ['community_area_id', 'ward_id'], astype=self.dtype) logging.info('Joining aggregations') - X = left.join([a.get_result() for a in self.aggregation_joins] + [acs]) + X = left.join([a.result for a in self.aggregation_joins] + [acs]) # delete all aggregation inputs so that memory can be freed for a in self.aggregation_joins: del a._result From 45cabc95f41c56e2eced13539a87d5a5aaf4cb5c Mon Sep 17 00:00:00 2001 From: Eric Potash Date: Tue, 17 Oct 2017 20:04:29 -0500 Subject: [PATCH 02/20] update data.py for result API --- lead/model/data.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lead/model/data.py b/lead/model/data.py index 62e16a9..b3061ef 100644 --- a/lead/model/data.py +++ b/lead/model/data.py @@ -96,7 +96,7 @@ def run(self, acs, left, aux=None): logging.info('Joining aggregations') X = left.join([a.result for a in self.aggregation_joins] + [acs]) # delete all aggregation inputs so that memory can be freed - for a in self.aggregation_joins: del a._result + for a in self.aggregation_joins: del a.result if not self.address: logging.info('Adding auxillary features') From 1957d57d93f1973698b762074550c2e91081a7fb Mon Sep 17 00:00:00 2001 From: Eric Potash Date: Tue, 7 Nov 2017 16:36:34 -0600 Subject: [PATCH 03/20] update requirements.txt to use drain from github --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 2bb3cec..9377c0a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -drain +git+https://github.com/potash/drain git+https://github.com/potash/scikit-learn@merged/balanced-random-forest#egg=scikit-learn From ef00eb4fe76aafeb7c95e9f13de4e85507ea7e04 Mon Sep 17 00:00:00 2001 From: Eric Potash Date: Thu, 21 Dec 2017 12:58:39 -0600 Subject: [PATCH 04/20] Predict on new addresses (#5) Changes to be able to predict on new addresses that are in a geography combination that is new. * add outcome_where_expr argument to match drain missing outcome handling * tested and incorporated into workflows * include (block,ward,community) pieces in address dataset * use drain from github in requirements.txt * fix index * include ward * fix geographies --- lead/model/address.py | 33 ++++++++++++++++++++++++++++++++- lead/model/data.py | 11 ++++++----- lead/model/transform.py | 10 ++++++++-- lead/model/workflows.py | 3 ++- 4 files changed, 48 insertions(+), 9 deletions(-) diff --git a/lead/model/address.py b/lead/model/address.py index 63250be..27b55a5 100644 --- a/lead/model/address.py +++ b/lead/model/address.py @@ -6,7 +6,38 @@ import numpy as np import logging -addresses = FromSQL(table='output.addresses') +# in addition to all addresses, we add all cells in the partition +# created by intersecting blocks, wards and communities +# in anticipation of any new addresses in deployment +addresses = FromSQL(""" +with blocks as ( +select + b.geoid10::double precision census_block_id, + substring(b.geoid10 for 11)::double precision census_tract_id, + c.area_numbe::int community_area_id, + w.ward::int ward_id +from input.census_blocks b +join input.community_areas c + on st_intersects(b.geom, c.geom) +join input.wards w + on st_intersects(b.geom, w.geom) and st_intersects(c.geom, w.geom) +group by 1,2,3,4 +) +select + null address, + null address_lat, + null address_lng, + null as address_id, + null as building_id, + null as complex_id, * +from blocks +UNION ALL +select address, address_lat, address_lng, + address_id, building_id, complex_id, + census_block_id, census_tract_id, + community_area_id, ward_id +from output.addresses + """, tables=['output.addresses', 'input.census_blocks', 'input.census_tracts', 'input.community_areas', 'input.wards']) addresses.target = True class LeadAddressLeft(Step): diff --git a/lead/model/data.py b/lead/model/data.py index b3061ef..69ca99b 100644 --- a/lead/model/data.py +++ b/lead/model/data.py @@ -32,8 +32,8 @@ def __init__(self, month, day, year_min, year_max, wic_lag=None, dtype=None, add year_max: the year to stop generating features wic_lag: a lag for the WIC aggregations, parsed by drain.data.parse_delta, e.g. '6m' is a six month lag. - Defaultis to None, which is no lag. - dtype: the dtype to use for features. Defaults to np.float16. + Default is to None, which is no lag. + dtype: the dtype to use for features. Defaults to np.float16 for memory efficiency. address: whether to build an address dataset. Defaults to False, which builds a kid dataset. """ @@ -83,15 +83,16 @@ def run(self, acs, left, aux=None): sample weights, and evaluation. """ if self.address: - index_columns = ['address','date'] + index_columns = ['address', 'census_block_id', 'ward_id', 'community_area_id', 'date'] + left_columns = ['address_lat', 'address_lng'] if not self.address: index_columns = ['kid_id', 'address_id', 'date'] + left_columns = ['ward_id', 'community_area_id', 'address_lat', 'address_lng'] - left_columns = ['ward_id', 'community_area_id', 'address_lat', 'address_lng'] left = left[index_columns + left_columns] logging.info('Binarizing community area and ward') - left = data.binarize(left, ['community_area_id', 'ward_id'], astype=self.dtype) + left = data.binarize(left, ['community_area_id', 'ward_id'], astype=self.dtype, drop=(not self.address)) logging.info('Joining aggregations') X = left.join([a.result for a in self.aggregation_joins] + [acs]) diff --git a/lead/model/transform.py b/lead/model/transform.py index 1a201a7..f402659 100644 --- a/lead/model/transform.py +++ b/lead/model/transform.py @@ -14,19 +14,23 @@ class LeadTransform(Step): performing feature selection and creating sample weights. """ def __init__(self, inputs, outcome_expr, aggregations, - wic_sample_weight=0, exclude=[], include=[]): + outcome_where_expr=None, wic_sample_weight=0, + exclude=[], include=[]): """ Args: inputs: list containing a LeadCrossValidate step outcome_expr: the query to perform on the auxillary information to produce an outcome variable aggregations: defines which of the SpacetimeAggregations to include - and which to drop + and which to drop + outcome_where_expr: where to evaluate the outcome_expr, + defaults to None, which means everywhere wic_sample_weight: optional different sample weight for wic kids """ Step.__init__(self, inputs=inputs, outcome_expr=outcome_expr, aggregations=aggregations, + outcome_where_expr=outcome_where_expr, wic_sample_weight=wic_sample_weight, exclude=exclude, include=include) @@ -40,6 +44,8 @@ def run(self, X, aux, train, test): """ y = aux.eval(self.outcome_expr) + if self.outcome_where_expr is not None: + y = y.where(aux.eval(self.outcome_where_expr)) logging.info('Selecting aggregations') aggregations = self.get_input(LeadData).aggregations diff --git a/lead/model/workflows.py b/lead/model/workflows.py index cafa0c8..32191ab 100644 --- a/lead/model/workflows.py +++ b/lead/model/workflows.py @@ -143,7 +143,8 @@ def bll6_models(estimators, cv_search={}, transform_search={}): transformd = dict( wic_sample_weight=[0], aggregations=aggregations.args, - outcome_expr=['max_bll0 >= 6'] + outcome_expr='max_bll0 >= 6', + outcome_where_expr='max_bll0 == max_bll0' # this means max_bll0.notnull() ) transformd.update(transform_search) return models(estimators, cvd, transformd) From 6e1f6ce84deeddbd0ea046b9450076e3275d7ad2 Mon Sep 17 00:00:00 2001 From: Eric Potash Date: Mon, 5 Feb 2018 13:22:28 -0600 Subject: [PATCH 05/20] save models in historical runs, too --- lead/model/workflows.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/lead/model/workflows.py b/lead/model/workflows.py index 32191ab..521465c 100644 --- a/lead/model/workflows.py +++ b/lead/model/workflows.py @@ -16,7 +16,7 @@ def bll6_forest(): """ The basic temporal cross-validation workflow """ - return bll6_models(forest()) + return bll6_models(forest(), dump_estimator=True) def bll6_forest_today(): @@ -29,11 +29,9 @@ def bll6_forest_today(): forest(), dict(year=today.year, month=today.month, - day=today.day))[0] - # save the model - p.get_input('fit').target = True - p.get_input('mean').target = True - + day=today.day), + dump_estimator=True)[0] + # put the predictions into the database tosql = data.ToSQL(table_name='predictions', if_exists='replace', inputs=[MapResults([p], mapping=[{'y':'df', 'feature_importances':None}])]) @@ -123,7 +121,7 @@ def forest(**update_kwargs): return [step.Construct(**kwargs)] -def bll6_models(estimators, cv_search={}, transform_search={}): +def bll6_models(estimators, cv_search={}, transform_search={}, dump_estimator=False): """ Provides good defaults for transform_search to models() Args: @@ -147,15 +145,17 @@ def bll6_models(estimators, cv_search={}, transform_search={}): outcome_where_expr='max_bll0 == max_bll0' # this means max_bll0.notnull() ) transformd.update(transform_search) - return models(estimators, cvd, transformd) + return models(estimators, cvd, transformd, dump_estimator=dump_estimator) -def models(estimators, cv_search, transform_search): +def models(estimators, cv_search, transform_search, dump_estimator): """ Grid search prediction workflows. Used by bll6_models, test_models, and product_models. Args: estimators: collection of steps, each of which constructs an estimator cv_search: dictionary of arguments to LeadCrossValidate to search over transform_search: dictionary of arguments to LeadTransform to search over + dump_estimator: whether to dump the estimator (and the mean). + Necessary for re-using the model for more scoring later. Returns: a list drain.model.Predict steps constructed by taking the product of the estimators with the the result of drain.util.dict_product on each of @@ -190,12 +190,16 @@ def models(estimators, cv_search, transform_search): fit = model.Fit(inputs=[estimator, transform], return_estimator=True) fit.name = 'fit' - + y = model.Predict(inputs=[fit, transform], return_feature_importances=True) y.name = 'predict' y.target = True + if dump_estimator: + mean.target = True + fit.target = True + steps.append(y) return steps From 37e6141dd1df397b7c894da1c0f8689801098429 Mon Sep 17 00:00:00 2001 From: Eric Potash Date: Wed, 7 Mar 2018 12:18:00 -0600 Subject: [PATCH 06/20] fix investigation inspected, complied features --- lead/features/investigations.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lead/features/investigations.py b/lead/features/investigations.py index 32e6745..29afbba 100644 --- a/lead/features/investigations.py +++ b/lead/features/investigations.py @@ -12,7 +12,7 @@ from drain.data import FromSQL day = np.timedelta64(1, 'D') -CLOSURE_CODES = range(1,12+1) +CLOSURE_CODES = [1,4,12] # complied, court, state attorney DATE_COLUMNS = ['referral_date', 'init_date', 'comply_date', 'closure_date'] DATE_NAMES = ['referral', 'inspection', 'compliance', 'closure'] @@ -72,8 +72,8 @@ def get_aggregates(self, date, delta): aggregates = [ Count(), - Aggregate('inspected', 'max', fname=False), - Aggregate('complied', 'max', fname=False), + Aggregate(lambda i: i.inspected.fillna(0), 'max', name='inspected', fname=False), + Aggregate(lambda i: i.complied.fillna(0), 'max', name='complied', fname=False), Count('hazard_int', prop=True), Count('hazard_ext', prop=True), Count('hazard', prop=True), Count('hazard_both', prop=True), From 407de074f664df31ea08f2bfe89f99d0a1296d43 Mon Sep 17 00:00:00 2001 From: Eric Potash Date: Wed, 7 Mar 2018 12:18:42 -0600 Subject: [PATCH 07/20] cull unnecessary kid features and dead code --- lead/features/kids.py | 29 ++++------------------------- 1 file changed, 4 insertions(+), 25 deletions(-) diff --git a/lead/features/kids.py b/lead/features/kids.py index 669dc46..18fc622 100644 --- a/lead/features/kids.py +++ b/lead/features/kids.py @@ -72,17 +72,6 @@ def get_aggregates(self, date, index, delta): Aggregate(['test_address_count', 'address_count', 'test_count'], 'max', fname=False), Aggregate(['max_bll'], 'max', fname=False), - # Comment out this and all other wic aggregates because they can't be lagged - # and they're not useful for predicting poisoning - #Aggregate(lambda k: k.last_wic_date == k.address_wic_max_date, - # 'any', 'last_wic_address', fname=False), - #Aggregate(['address_wic_mother', 'address_wic_infant'], 'any', fname=False), - #Aggregate([days('address_wic_max_date', date), - # days('address_wic_min_date', date), - # days('last_wic_date', date), - # days('first_wic_date', date)], - # ['max'], ['address_wic_min_date', 'address_wic_max_date', - # 'last_wic_date', 'first_wic_date'], fname=False) ] sample_2y = lambda k: ((k.last_sample_date - k.date_of_birth)/day > 365*2) | (k.max_bll >= 6) @@ -91,29 +80,20 @@ def get_aggregates(self, date, index, delta): aggregates = [ counts, Aggregate(['test_address_count', 'test_count', 'address_count'], - ['median', 'mean', 'min', 'max']), + ['mean', 'max']), Count([lambda k: k.address_test_min_date.notnull(), lambda k: k.first_sample_date.notnull()], prop=True, name=['tested_here', 'tested_ever']), - #Count(lambda k: k.first_wic_date.notnull(), prop=True, name='wic'), - - #Count([lambda k: k.address_wic_min_date.notnull() & k.address_test_min_date.notnull(), - # lambda k: k.address_wic_min_date.notnull() & k.first_sample_date.notnull()], - # name=['wic_tested_here', 'wic_tested_ever'], - # prop=lambda k: k.first_wic_date.notnull(), prop_name='wic'), - Aggregate([days('address_min_date', 'address_max_date'), - #days('address_wic_min_date', 'address_wic_max_date'), - days('address_test_min_date', 'address_test_max_date')], - ['mean'], ['address_total_time', #'address_wic_time', - 'address_test_time']), + Aggregate([days('address_min_date', 'address_max_date')], + ['mean'], ['address_total_time']), # the first of these are kid level, not address-kid level # that means kids get double counted when aggregated to above the address level # if they lived in multiple addresses on that e.g. census tract. oh well. - Aggregate(['max_bll', 'avg_bll', 'cumulative_bll', 'avg_cumulative_bll', + Aggregate(['max_bll', 'avg_cumulative_bll', 'mean_bll', 'address_max_bll', 'address_mean_bll'], ['mean', 'median', 'min', 'max']), @@ -143,7 +123,6 @@ def get_aggregates(self, date, index, delta): ] if delta == 'all': aggregates.extend([ - #Aggregate(days('address_wic_min_date', date), ['min', 'max'], 'days_since_wic'), Aggregate(days('date_of_birth', date), ['min', 'max', 'mean'], 'date_of_birth'), ]) From 9fb3df3310b2efbfc471454b81597180d4ca4385 Mon Sep 17 00:00:00 2001 From: Eric Potash Date: Wed, 7 Mar 2018 12:21:16 -0600 Subject: [PATCH 08/20] combine paint violations features --- lead/features/violations.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/lead/features/violations.py b/lead/features/violations.py index 9a5df79..215a821 100644 --- a/lead/features/violations.py +++ b/lead/features/violations.py @@ -3,14 +3,16 @@ from drain.aggregation import SpacetimeAggregation from itertools import product -KEYWORDS = ['water', 'paint', 'window', 'wall', 'porch', 'chip', 'flak', 'peel'] +KEYWORDS = (['water','window', 'wall', 'porch', '(paint|chip|flak|peel)'], + ['water','window','wall','porch','paint']) + STATUS = (['OPEN', 'COMPLIED', 'NO ENTRY'], ['open', 'complied', 'no_entry']) KEYWORD_COLUMNS = str.join(', ', ("violation_description ~* '{0}' " - "or violation_inspector_comments ~* '{0}' AS {0}".format(k) - for k in KEYWORDS)) + "or violation_inspector_comments ~* '{0}' AS {1}".format(*k) + for k in zip(*KEYWORDS))) STATUS_COLUMNS = str.join(', ', ("violation_status = '{0}' AS {1}".format(*s) @@ -37,11 +39,11 @@ def __init__(self, spacedeltas, dates, parallel=False): def get_aggregates(self, date, data): aggregates = [ Count(), - Count(KEYWORDS, prop=True), + Count(KEYWORDS[1], prop=True), Count(STATUS[1], prop=True), Count([lambda v,k=k,s=s: v[k] & v[s] - for k,s in product(KEYWORDS, STATUS[1])], prop=True, - name=['%s_%s' % p for p in product(KEYWORDS, STATUS[1])] + for k,s in product(KEYWORDS[1], STATUS[1])], prop=True, + name=['%s_%s' % p for p in product(KEYWORDS[1], STATUS[1])] ) ] From 66993a602409b08b48758c68c65c3cc9f904a3ec Mon Sep 17 00:00:00 2001 From: Eric Potash Date: Wed, 7 Mar 2018 12:21:52 -0600 Subject: [PATCH 09/20] allow external left sources, e.g. alliance --- lead/model/data.py | 53 ++++++++++++++++++++++++++-------------------- 1 file changed, 30 insertions(+), 23 deletions(-) diff --git a/lead/model/data.py b/lead/model/data.py index 69ca99b..ddb0994 100644 --- a/lead/model/data.py +++ b/lead/model/data.py @@ -23,7 +23,7 @@ class LeadData(Step): Address datasets contain one row per address. They are built primarily to be able to later quickly access the features for scoring. """ - def __init__(self, month, day, year_min, year_max, wic_lag=None, dtype=None, address=False): + def __init__(self, month, day, year_min, year_max, wic_lag=None, dtype=None, address=False, left=None, index_columns=None): """ Args: month: the month for feature generation @@ -36,22 +36,30 @@ def __init__(self, month, day, year_min, year_max, wic_lag=None, dtype=None, add dtype: the dtype to use for features. Defaults to np.float16 for memory efficiency. address: whether to build an address dataset. Defaults to False, which builds a kid dataset. + left: optional Left step. if None, defaults to LeadLeft (when address=False) or LeadAddressLeft (when address=True) + index_columns: columns of left to use as index """ if dtype is None: dtype = np.float16 Step.__init__(self, month=month, day=day, year_min=year_min, year_max=year_max, - wic_lag=wic_lag, dtype=dtype, address=address) - - if address: - left = LeadAddressLeft(month=month, day=day, year_min=year_min, year_max=year_max) - # left_only is left without aux - # in the address case it's the same as left - left_only = left + wic_lag=wic_lag, dtype=dtype, address=address, + index_columns=index_columns) + + if left is None: + if address: + left = LeadAddressLeft(month=month, day=day, year_min=year_min, year_max=year_max) + # left_only is left without aux + # in the address case it's the same as left + left_only = left + self.index_columns = ['address', 'census_block_id', 'ward_id', 'community_area_id', 'date'] + else: + left = LeadLeft(month=month, day=day, year_min=year_min) + left.target = True + left_only = MapResults([left], {'aux':None}) + self.index_columns = ['kid_id', 'address_id', 'date'] else: - left = LeadLeft(month=month, day=day, year_min=year_min) - left.target = True left_only = MapResults([left], {'aux':None}) acs = Call("astype", inputs=[ACS(inputs=[left_only])], @@ -83,13 +91,11 @@ def run(self, acs, left, aux=None): sample weights, and evaluation. """ if self.address: - index_columns = ['address', 'census_block_id', 'ward_id', 'community_area_id', 'date'] left_columns = ['address_lat', 'address_lng'] - if not self.address: - index_columns = ['kid_id', 'address_id', 'date'] + else: left_columns = ['ward_id', 'community_area_id', 'address_lat', 'address_lng'] - left = left[index_columns + left_columns] + left = left[self.index_columns + left_columns] logging.info('Binarizing community area and ward') left = data.binarize(left, ['community_area_id', 'ward_id'], astype=self.dtype, drop=(not self.address)) @@ -101,9 +107,9 @@ def run(self, acs, left, aux=None): if not self.address: logging.info('Adding auxillary features') - add_aux_features(X, aux, self.dtype) + add_kid_features(X, aux, self.dtype) - X.set_index(index_columns, inplace=True) + X.set_index(self.index_columns, inplace=True) c = data.non_numeric_columns(X) if len(c) > 0: @@ -112,18 +118,19 @@ def run(self, acs, left, aux=None): if self.address: return {'X':X} else: - aux.set_index(index_columns, inplace=True) + aux.set_index(self.index_columns, inplace=True) return {'X':X, 'aux':aux} -def add_aux_features(X, aux, dtype): +def add_kid_features(X, aux, dtype): """ + Adds kid features (age, date of birth, etc.) from the aux data Args: X: the DataFrame to which to add features aux: the DataFrame from which to build the features dtype: the dtype with which to add the features """ - X['age'] = ((aux.date - aux.date_of_birth)/util.day).astype(dtype) - X['date_of_birth_days'] = util.date_to_days(aux.date_of_birth).astype(dtype) - X['date_of_birth_month'] = aux.date_of_birth.dt.month.astype(dtype) - X['male'] = (aux.sex == 'M').astype(dtype) - X['wic'] = (aux.first_wic_date < aux.date).fillna(False).astype(dtype) + X['kid_age'] = ((aux.date - aux.date_of_birth)/util.day).astype(dtype) + X['kid_date_of_birth_days'] = util.date_to_days(aux.date_of_birth).astype(dtype) + X['kid_date_of_birth_month'] = aux.date_of_birth.dt.month.astype(dtype) + X['kid_male'] = (aux.sex == 'M').astype(dtype) + X['kid_wic'] = (aux.first_wic_date < aux.date).fillna(False).astype(dtype) From 06edaf382377fd187acdb40b1205135917f01a53 Mon Sep 17 00:00:00 2001 From: Eric Potash Date: Wed, 7 Mar 2018 12:25:48 -0600 Subject: [PATCH 10/20] use 2000 trees --- lead/model/workflows.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lead/model/workflows.py b/lead/model/workflows.py index 521465c..992c037 100644 --- a/lead/model/workflows.py +++ b/lead/model/workflows.py @@ -1,5 +1,5 @@ from drain import data, step, model, data -from drain.util import dict_product +from drain.util import dict_product, make_list from drain.step import Call, Construct, MapResults from itertools import product @@ -110,7 +110,7 @@ def forest(**update_kwargs): """ kwargs = dict( _class='sklearn.ensemble.RandomForestClassifier', - n_estimators=1000, + n_estimators=2000, n_jobs=int(os.environ.get('N_JOBS', -1)), criterion='entropy', class_weight='balanced_bootstrap', @@ -119,7 +119,7 @@ def forest(**update_kwargs): kwargs.update(**update_kwargs) - return [step.Construct(**kwargs)] + return step.Construct(**kwargs) def bll6_models(estimators, cv_search={}, transform_search={}, dump_estimator=False): """ @@ -145,7 +145,7 @@ def bll6_models(estimators, cv_search={}, transform_search={}, dump_estimator=Fa outcome_where_expr='max_bll0 == max_bll0' # this means max_bll0.notnull() ) transformd.update(transform_search) - return models(estimators, cvd, transformd, dump_estimator=dump_estimator) + return models(make_list(estimators), cvd, transformd, dump_estimator=dump_estimator) def models(estimators, cv_search, transform_search, dump_estimator): """ From 2e81de76d61359f04b5785119a59d0ddeabb6d45 Mon Sep 17 00:00:00 2001 From: Eric Potash Date: Wed, 7 Mar 2018 12:26:41 -0600 Subject: [PATCH 11/20] rm wic aggregations, use fewer building permits and violations aggregations --- lead/features/aggregations.py | 36 +++++++++++++++++++---------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/lead/features/aggregations.py b/lead/features/aggregations.py index b38c7d2..657e0cc 100644 --- a/lead/features/aggregations.py +++ b/lead/features/aggregations.py @@ -25,32 +25,36 @@ 'tract':'census_tract_id', } -def get_deltas(): - return { - 'address': ['1y', '2y', '5y', '10y', 'all'], - #'complex': ['1y', '2y', '5y', '10y', 'all'], - 'block': ['1y','2y','5y'], - 'tract': ['1y','2y','3y'] - } +deltas = { + 'address': ['1y', '2y', '5y', '10y', 'all'], + #'complex': ['1y', '2y', '5y', '10y', 'all'], + 'block': ['1y','2y','5y'], + 'tract': ['1y','2y','3y'] +} + +# short list of deltas used for building permits and violations +deltas_thin = { + 'address': ['1y', '5y', 'all'], +} -wic = {'kid': ['all']} +#wic = {'kid': ['all']} -def get_args(deltas): +def get_args(): return dict( buildings = ['building', 'complex', 'block', 'tract'], assessor = ['address', 'building', 'complex', 'block', 'tract'], tests = deltas, investigations = deltas, #events = deltas, - permits = deltas, - kids = dict(kid=['all'], **deltas), - violations = util.dict_subset(deltas, ('address', 'block')), - wic_enroll = wic, - wic_birth = wic, - wic_prenatal = wic, + permits = deltas_thin, + kids = deltas, + violations = deltas_thin, + #wic_enroll = wic, + #wic_birth = wic, + #wic_prenatal = wic, ) -args = get_args(get_deltas()) +args = get_args() @lru_cache(maxsize=10) def all_dict(dates=None, lag=None, parallel=True): From add079d8f407a74b8dafec11359995406b45eaff Mon Sep 17 00:00:00 2001 From: Eric Potash Date: Wed, 7 Mar 2018 12:37:01 -0600 Subject: [PATCH 12/20] no kid and n_estimators experiments --- lead/model/experiments.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/lead/model/experiments.py b/lead/model/experiments.py index bcdc31e..5a16889 100644 --- a/lead/model/experiments.py +++ b/lead/model/experiments.py @@ -20,7 +20,24 @@ def bll6_forest_no_kid(): return bll6_models( forest(), - transform_search={'aggregations':args}) + transform_search={'aggregations':args, 'exclude': [['^kid_.*']]}) + +def bll6_forest_estimators(): + return bll6_models([forest(n_estimators=1000*n) for n in (1,)]) + +def bll6_forest_no_kid_wic_estimators(): + """ + No kid-level aggregations + """ + args = deepcopy(aggregations.args) + for a in args.values(): + if 'kid' in a: + a.pop('kid') + + return bll6_models( + [forest(n_estimators=2000, random_state=n) for n in (0,1,2,3,4)], + cv_search={'year':[2012]}, + transform_search={'aggregations':args, 'exclude': [['^kid_.*']]}) def bll6_svm(): return models(model.svms()) From 61cd5345b4db75a702fad7b432fce1259200f622 Mon Sep 17 00:00:00 2001 From: Eric Potash Date: Fri, 9 Mar 2018 14:40:06 -0600 Subject: [PATCH 13/20] predict all addresses --- lead/model/workflows.py | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/lead/model/workflows.py b/lead/model/workflows.py index 992c037..4348038 100644 --- a/lead/model/workflows.py +++ b/lead/model/workflows.py @@ -43,24 +43,28 @@ def address_data_past(): Builds address-level features for the past Plus saves fitted models and means for the past """ - ds = [] # lead address data - for y in range(2011,2014+1): - d = lead.model.data.LeadData( + ys = [] # lead address data + for y in range(2011,2011+1): + X = lead.model.data.LeadData( year_min=y, year_max=y, month=1, day=1, address=True) - d.target = True - ds.append(d) - ps = bll6_forest() # predictions - for p in ps: - p.get_input('fit').target = True - p.get_input('mean').target = True + p = bll6_forest()[0] + mean = p.get_input('mean') + fit = p.get_input('fit') + + X_impute = Construct(data.impute, + inputs=[X, MapResults([mean], 'value')]) + + y = model.Predict(inputs=[fit, MapResults([X_impute], 'X')]) + y.target = True + ys.append(y) + + return ys - return ds + ps - def address_data_today(): """ Builds address-level features today From 6f19a886ad36042feb248837e59996c8dc0923ca Mon Sep 17 00:00:00 2001 From: Eric Potash Date: Fri, 9 Mar 2018 14:42:05 -0600 Subject: [PATCH 14/20] move experiments out of workflows --- lead/model/experiments.py | 27 +++++++++++++++++++++++++++ lead/model/workflows.py | 28 ---------------------------- 2 files changed, 27 insertions(+), 28 deletions(-) diff --git a/lead/model/experiments.py b/lead/model/experiments.py index 5a16889..1e2b615 100644 --- a/lead/model/experiments.py +++ b/lead/model/experiments.py @@ -39,6 +39,33 @@ def bll6_forest_no_kid_wic_estimators(): cv_search={'year':[2012]}, transform_search={'aggregations':args, 'exclude': [['^kid_.*']]}) +def bll6_forest_quick(): + """ + A fast lead model that only uses 1 year of training data + """ + today = pd.Timestamp(os.environ['TODAY']) + p = bll6_models( + forest(n_estimators=10), + dict(year=today.year, + month=today.month, + day=today.day, + train_years=1))[0] + return p + +def bll6_forest_quarterly(): + """ + Quarterly forest models + """ + return bll6_models(forest(), + {'month':[1,4,7,10], 'year':range(2010,2014+1)}) + +def bll6_forest_monthly(): + """ + Monthly forest models + """ + return bll6_models(forest(), + {'month':range(1,13), 'year':range(2010,2014+1)}) + def bll6_svm(): return models(model.svms()) diff --git a/lead/model/workflows.py b/lead/model/workflows.py index 4348038..ab5c428 100644 --- a/lead/model/workflows.py +++ b/lead/model/workflows.py @@ -80,34 +80,6 @@ def address_data_today(): return d -def bll6_forest_quick(): - """ - A fast lead model that only uses 1 year of training data - """ - today = pd.Timestamp(os.environ['TODAY']) - p = bll6_models( - forest(n_estimators=10), - dict(year=today.year, - month=today.month, - day=today.day, - train_years=1))[0] - return p - - -def bll6_forest_quarterly(): - """ - Quarterly forest models - """ - return bll6_models(forest(), - {'month':[1,4,7,10], 'year':range(2010,2014+1)}) - -def bll6_forest_monthly(): - """ - Monthly forest models - """ - return bll6_models(forest(), - {'month':range(1,13), 'year':range(2010,2014+1)}) - def forest(**update_kwargs): """ Returns a step constructing a scikit-learn RandomForestClassifier From 2a89bc9b98493b3c7388b8f442e905d14c940774 Mon Sep 17 00:00:00 2001 From: Eric Potash Date: Fri, 9 Mar 2018 14:42:29 -0600 Subject: [PATCH 15/20] no kid features, hardcode commuinity area and ward ranges --- lead/model/data.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lead/model/data.py b/lead/model/data.py index ddb0994..11ea2c5 100644 --- a/lead/model/data.py +++ b/lead/model/data.py @@ -98,16 +98,18 @@ def run(self, acs, left, aux=None): left = left[self.index_columns + left_columns] logging.info('Binarizing community area and ward') - left = data.binarize(left, ['community_area_id', 'ward_id'], astype=self.dtype, drop=(not self.address)) + left = data.binarize(left, + {'community_area_id':range(1,78), 'ward_id':range(1,51)}, + astype=self.dtype, drop=(not self.address)) logging.info('Joining aggregations') X = left.join([a.result for a in self.aggregation_joins] + [acs]) # delete all aggregation inputs so that memory can be freed for a in self.aggregation_joins: del a.result - if not self.address: - logging.info('Adding auxillary features') - add_kid_features(X, aux, self.dtype) + #if not self.address: + # logging.info('Adding auxillary features') + # add_kid_features(X, aux, self.dtype) X.set_index(self.index_columns, inplace=True) From b930bda663a808f00533472e5b620857a347aba8 Mon Sep 17 00:00:00 2001 From: Eric Potash Date: Fri, 9 Mar 2018 14:59:39 -0600 Subject: [PATCH 16/20] predict addresses today --- lead/model/workflows.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/lead/model/workflows.py b/lead/model/workflows.py index ab5c428..59fce55 100644 --- a/lead/model/workflows.py +++ b/lead/model/workflows.py @@ -70,15 +70,24 @@ def address_data_today(): Builds address-level features today """ today = pd.Timestamp(os.environ['TODAY']) - d = lead.model.data.LeadData( + X = lead.model.data.LeadData( year_min=today.year, year_max=today.year, month=today.month, day=today.day, address=True) - d.target = True - return d + p = bll6_forest_today() + mean = p.get_input('mean') + fit = p.get_input('fit') + + X_impute = Construct(data.impute, + inputs=[X, MapResults([mean], 'value')]) + + y = model.Predict(inputs=[fit, MapResults([X_impute], 'X')]) + y.target = True + + return y def forest(**update_kwargs): """ From caeb119a6bca5337f79bcaaf3933794d33fd00f6 Mon Sep 17 00:00:00 2001 From: Eric Potash Date: Sun, 11 Mar 2018 22:44:01 -0500 Subject: [PATCH 17/20] cleanup --- lead/features/events.py | 2 +- lead/model/address.py | 2 +- lead/model/data.py | 18 ------ lead/model/item.py | 11 ++++ lead/model/score.py | 16 +++++ lead/model/split.py | 14 +++++ lead/model/workflows.py | 132 ++++++++++++++++++++-------------------- 7 files changed, 109 insertions(+), 86 deletions(-) create mode 100644 lead/model/item.py create mode 100644 lead/model/score.py create mode 100644 lead/model/split.py diff --git a/lead/features/events.py b/lead/features/events.py index 60a7acd..31a2091 100644 --- a/lead/features/events.py +++ b/lead/features/events.py @@ -9,7 +9,7 @@ from drain.step import Step from drain.aggregate import Aggregate, Count from drain.aggregation import SpacetimeAggregation -from drain.data import FromSQL, Merge +from drain.data import FromSQL day = np.timedelta64(1, 'D') diff --git a/lead/model/address.py b/lead/model/address.py index 27b55a5..dd6e146 100644 --- a/lead/model/address.py +++ b/lead/model/address.py @@ -1,6 +1,6 @@ from drain.step import Step from drain.util import timestamp, cross_join -from drain.data import FromSQL, Merge +from drain.data import FromSQL import pandas as pd import numpy as np diff --git a/lead/model/data.py b/lead/model/data.py index 11ea2c5..5aecea8 100644 --- a/lead/model/data.py +++ b/lead/model/data.py @@ -107,10 +107,6 @@ def run(self, acs, left, aux=None): # delete all aggregation inputs so that memory can be freed for a in self.aggregation_joins: del a.result - #if not self.address: - # logging.info('Adding auxillary features') - # add_kid_features(X, aux, self.dtype) - X.set_index(self.index_columns, inplace=True) c = data.non_numeric_columns(X) @@ -122,17 +118,3 @@ def run(self, acs, left, aux=None): else: aux.set_index(self.index_columns, inplace=True) return {'X':X, 'aux':aux} - -def add_kid_features(X, aux, dtype): - """ - Adds kid features (age, date of birth, etc.) from the aux data - Args: - X: the DataFrame to which to add features - aux: the DataFrame from which to build the features - dtype: the dtype with which to add the features - """ - X['kid_age'] = ((aux.date - aux.date_of_birth)/util.day).astype(dtype) - X['kid_date_of_birth_days'] = util.date_to_days(aux.date_of_birth).astype(dtype) - X['kid_date_of_birth_month'] = aux.date_of_birth.dt.month.astype(dtype) - X['kid_male'] = (aux.sex == 'M').astype(dtype) - X['kid_wic'] = (aux.first_wic_date < aux.date).fillna(False).astype(dtype) diff --git a/lead/model/item.py b/lead/model/item.py new file mode 100644 index 0000000..6d4774b --- /dev/null +++ b/lead/model/item.py @@ -0,0 +1,11 @@ +from drain.step import Step + +class GetItem(Step): + def __init__(self, step, key): + Step.__init__(self, step=step, key=key, inputs=[step]) + + def run(self, **kwargs): + return kwargs[self.key] + +def args_list(*args): + return args diff --git a/lead/model/score.py b/lead/model/score.py new file mode 100644 index 0000000..e5b78ea --- /dev/null +++ b/lead/model/score.py @@ -0,0 +1,16 @@ +from drain.step import Step + +class LeadScore(Step): + def run(self, scores, aux, y=None): + """ + Args: + scores: the address scores + aux: dataframe including an address column + y: optional outcomes, aligned with aux + """ + merged = aux.merge(scores.reset_index()[['address', 'score']], on='address', how='left') + merged.index = aux.index + if y is None: + merged['y'] = y + + return y diff --git a/lead/model/split.py b/lead/model/split.py new file mode 100644 index 0000000..afeb1da --- /dev/null +++ b/lead/model/split.py @@ -0,0 +1,14 @@ +def split(df, k): + """ + Split the dataframe into pieces + Args: + df: DataFrame + k: number of pieces + """ + N = len(df) + n = len(df)/k + + dfs = [df.iloc[n*i:n*(i+1),:] for i in range(k-1)] + dfs.append(df.iloc[n*(k-1):N]) + + return dfs diff --git a/lead/model/workflows.py b/lead/model/workflows.py index 59fce55..fab544e 100644 --- a/lead/model/workflows.py +++ b/lead/model/workflows.py @@ -1,6 +1,7 @@ from drain import data, step, model, data from drain.util import dict_product, make_list from drain.step import Call, Construct, MapResults +from drain.data import ToHDF from itertools import product import pandas as pd @@ -9,20 +10,42 @@ import lead.model.data import lead.model.transform import lead.model.cv +import lead.model.score from lead.features import aggregations +from .split import split +from .item import GetItem, args_list -def bll6_forest(): + +def kid_predictions_past(): """ - The basic temporal cross-validation workflow + Temporal cross validation workflow """ - return bll6_models(forest(), dump_estimator=True) + ps = address_predictions_past() + w = [] + for p in ps: + t = p.get_input('transform') + aux_test = Call('__getattr__', inputs=[MapResults([t], {'train':None, 'X':None, 'sample_weight':None, 'aux':'obj', 'test':'key'})]) + s = lead.model.score.LeadScore(inputs=[MapResults([p, aux_test, GetItem(t, 'y')], ['scores', 'aux', 'y'])]) + w.append(s) + + return w + +def kid_predictions_today(): + """ + Predictions for kids today + """ + p = address_predictions_today()[0] + t = p.get_input('transform') + aux_test = Call('__getattr__', inputs=[MapResults([t], {'train':None, 'X':None, 'sample_weight':None, 'aux':'obj', 'test':'key'})]) + s = lead.model.score.LeadScore(inputs=[MapResults([p, aux_test, GetItem(t, 'y')], ['scores', 'aux', 'y'])]) + return s -def bll6_forest_today(): + +def address_predictions_today(): """ - The workflow used to construct a current model - Parses the environment variable TODAY using pd.Timestamp to set the date + Predictions for all addresses today """ today = pd.Timestamp(os.environ['TODAY']) p = bll6_models( @@ -30,64 +53,15 @@ def bll6_forest_today(): dict(year=today.year, month=today.month, day=today.day), - dump_estimator=True)[0] - - # put the predictions into the database - tosql = data.ToSQL(table_name='predictions', if_exists='replace', - inputs=[MapResults([p], mapping=[{'y':'df', 'feature_importances':None}])]) - tosql.target = True - return tosql - -def address_data_past(): - """ - Builds address-level features for the past - Plus saves fitted models and means for the past - """ - ys = [] # lead address data - for y in range(2011,2011+1): - X = lead.model.data.LeadData( - year_min=y, - year_max=y, - month=1, - day=1, - address=True) - - p = bll6_forest()[0] - mean = p.get_input('mean') - fit = p.get_input('fit') - - X_impute = Construct(data.impute, - inputs=[X, MapResults([mean], 'value')]) - - y = model.Predict(inputs=[fit, MapResults([X_impute], 'X')]) - y.target = True - ys.append(y) + dump_estimator=True) + return p - return ys -def address_data_today(): +def address_predictions_past(): """ - Builds address-level features today + Predictions for addresses in the past (for cross validation) """ - today = pd.Timestamp(os.environ['TODAY']) - X = lead.model.data.LeadData( - year_min=today.year, - year_max=today.year, - month=today.month, - day=today.day, - address=True) - - p = bll6_forest_today() - mean = p.get_input('mean') - fit = p.get_input('fit') - - X_impute = Construct(data.impute, - inputs=[X, MapResults([mean], 'value')]) - - y = model.Predict(inputs=[fit, MapResults([X_impute], 'X')]) - y.target = True - - return y + return bll6_models(forest(), dump_estimator=True) def forest(**update_kwargs): """ @@ -139,8 +113,7 @@ def models(estimators, cv_search, transform_search, dump_estimator): estimators: collection of steps, each of which constructs an estimator cv_search: dictionary of arguments to LeadCrossValidate to search over transform_search: dictionary of arguments to LeadTransform to search over - dump_estimator: whether to dump the estimator (and the mean). - Necessary for re-using the model for more scoring later. + dump_estimator: whether to dump the estimator. Returns: a list drain.model.Predict steps constructed by taking the product of the estimators with the the result of drain.util.dict_product on each of @@ -162,6 +135,7 @@ def models(estimators, cv_search, transform_search, dump_estimator): 'test':None, 'aux':None})]) mean = Call('mean', inputs=[X_train]) mean.name = 'mean' + mean.target = True X_impute = Construct(data.impute, inputs=[MapResults([cv], {'aux':None, 'test':None, 'train':None}), @@ -179,12 +153,38 @@ def models(estimators, cv_search, transform_search, dump_estimator): y = model.Predict(inputs=[fit, transform], return_feature_importances=True) y.name = 'predict' - y.target = True - + if dump_estimator: - mean.target = True fit.target = True + + X_test = lead.model.data.LeadData( + year_min=cv_args['year'], + year_max=cv_args['year'], + month=cv_args['month'], + day=cv_args['day'], + address=True) + # there are over 800k addresses + # to avoid running out of memory, we split into pieces for prediction + k = 4 + pieces = list(map(str, range(k))) + X_split = Construct(split, inputs=[MapResults([X_test], {'X':'df'})], k=k) + tohdf = ToHDF(inputs = [MapResults([X_split], [pieces])]) + tohdf.target = True + + ys = [] + for j in pieces: + X_impute = Construct(data.impute, + inputs=[MapResults([Call('get', key=j, inputs=[tohdf])], 'X'), + MapResults([mean], 'value')]) + + y =model.Predict(inputs=[fit, MapResults([X_impute], 'X')]) + y.target = True + ys.append(GetItem(y, 'y')) + + # concatenate the pieces + y = Construct(pd.concat, inputs=[MapResults([Construct(args_list, inputs=ys)], 'objs')]) + y.target = True steps.append(y) - + return steps From 48777b49ef7193bb2effd43bf8c97a8c07590e7a Mon Sep 17 00:00:00 2001 From: Eric Potash Date: Tue, 13 Mar 2018 17:25:02 -0500 Subject: [PATCH 18/20] cleanup and updates to drain 0.0.6 , update README and requirements --- README.md | 6 ++--- lead/features/kids.py | 9 ++++---- lead/features/tests.py | 17 +++++++------- lead/features/wic.py | 12 +++++----- lead/model/cv.py | 6 ++--- lead/model/data.py | 7 +++--- lead/model/item.py | 11 --------- lead/model/left.py | 6 ++--- lead/model/score.py | 13 +++++++---- lead/model/workflows.py | 49 ++++++++++++++++++++++------------------- requirements.txt | 2 +- 11 files changed, 66 insertions(+), 72 deletions(-) delete mode 100644 lead/model/item.py diff --git a/README.md b/README.md index 5d9aff6..3e48a37 100644 --- a/README.md +++ b/README.md @@ -54,11 +54,11 @@ pip install -r requirements.txt ### 3. Run models using `drain`. To fit a current model and make predictions change to `./lead` and run: ``` -drain execute lead.model.workflows::bll6_forest_today ... +drain execute -w lead.model.workflows.address_predictions_today ... ``` -Here `lead.model.workflows.bll6_forest_today` is a drain workflow, i.e. a function taking no arguments that returns collection of drain steps. +Here `lead.model.workflows.address_predictions_today()` is a drain workflow, i.e. a function taking no arguments that returns collection of drain steps. -For temporal cross validation use the `bll6_forest` workflow. +For temporal cross validation use the `kid_predictions_past` workflow in the same module. # License diff --git a/lead/features/kids.py b/lead/features/kids.py index 18fc622..220ea1c 100644 --- a/lead/features/kids.py +++ b/lead/features/kids.py @@ -1,7 +1,7 @@ from drain import data -from drain.data import FromSQL, Merge, Revise +from drain.data import FromSQL, Revise from drain.util import day -from drain.step import Step +from drain.step import Step, Call, MapResults from drain.aggregation import SpacetimeAggregation from drain.aggregate import Fraction, Count, Aggregate, Aggregator, days @@ -38,7 +38,7 @@ def revise_kid_addresses(date): for i in kid_addresses.inputs[0].inputs: i.target = True for i in kids.inputs[0].inputs: i.target = True - return Merge(inputs=[kids, kid_addresses], on='kid_id') + return Call(kids, 'merge', [MapResults(kid_addresses, 'right')], on='kid_id') class KidsAggregation(SpacetimeAggregation): """ @@ -55,8 +55,7 @@ def __init__(self, spacedeltas, dates, parallel=False): kid_addresses = revise_kid_addresses(date=dates[0]) addresses = FromSQL(table='output.addresses') addresses.target = True - self.inputs = [Merge(inputs=[kid_addresses, addresses], - on='address_id')] + self.inputs =[Call(kid_addresses, 'merge', [MapResults(addresses, 'right')], on='address_id')] def get_aggregator(self, date, index, delta): df = self.get_data(date, delta) diff --git a/lead/features/tests.py b/lead/features/tests.py index fc6be00..14322c7 100644 --- a/lead/features/tests.py +++ b/lead/features/tests.py @@ -1,22 +1,21 @@ from drain import data from drain.util import day -from drain.data import FromSQL, Merge -from drain.step import Step +from drain.data import FromSQL +from drain.step import Step, Call from drain.aggregation import SpacetimeAggregation from drain.aggregate import Count, Fraction, Aggregate, days import pandas as pd -tests = Merge(inputs=[ - Merge(inputs=[ - FromSQL(table='output.tests'), - FromSQL(table='output.addresses')], on='address_id'), +tests = Call( + Call(FromSQL(table='output.tests'), 'merge', + [FromSQL(table='output.addresses')], on='address_id'), + 'merge', # get kid first bll6 and bll10 counts to calculate incidences - FromSQL(""" + [FromSQL(""" select kid_id, first_bll6_sample_date, first_bll10_sample_date from output.kids - """)], -on='kid_id') + """)], on='kid_id') tests.target = True class TestsAggregation(SpacetimeAggregation): diff --git a/lead/features/wic.py b/lead/features/wic.py index c6f991d..0c99014 100644 --- a/lead/features/wic.py +++ b/lead/features/wic.py @@ -7,7 +7,7 @@ from drain import util, aggregate, data from drain.aggregate import Aggregate, Count, aggregate_counts, days from drain.aggregation import SpacetimeAggregation -from drain.step import Construct +from drain.step import Call from drain.data import FromSQL, binarize, binarize_sets, select_regexes from drain.util import list_filter_none, union @@ -30,9 +30,9 @@ from enroll """, tables=['aux.kid_wics', 'aux.kid_mothers'], parse_dates=['register_d', 'last_upd_d']) -enroll2 = Construct(binarize, inputs=[enroll], category_classes=['employment', 'occupation', 'clinic'], min_freq=100) +enroll2 = Call(binarize, inputs=[enroll], category_classes=['employment', 'occupation', 'clinic'], min_freq=100) -enroll3 = Construct(binarize_sets, inputs=[enroll2], columns=['assistance', 'language'], cast=True, min_freq=100) +enroll3 = Call(binarize_sets, inputs=[enroll2], columns=['assistance', 'language'], cast=True, min_freq=100) enroll3.target=True class EnrollAggregation(SpacetimeAggregation): @@ -74,8 +74,8 @@ def get_aggregates(self, date, delta): """, tables=['aux.kids', 'aux.kid_mothers'], parse_dates=['date_of_birth']) -births2 = Construct(binarize, inputs=[births], category_classes=['place_type', 'disposition']) -births3 = Construct(binarize_sets, inputs=[births2], columns=['complication'], cast=True) +births2 = Call(binarize, inputs=[births], category_classes=['place_type', 'disposition']) +births3 = Call(binarize_sets, inputs=[births2], columns=['complication'], cast=True) births3.target = True class BirthAggregation(SpacetimeAggregation): @@ -112,7 +112,7 @@ def get_aggregates(self, date, delta): """, tables=['aux.kids', 'aux.kid_mothers'], parse_dates=['date_of_birth', 'visit_d']) prenatal.target = True -prenatal2 = Construct(binarize, inputs=[prenatal], category_classes=['service']) +prenatal2 = Call(binarize, inputs=[prenatal], category_classes=['service']) class PrenatalAggregation(SpacetimeAggregation): def __init__(self, spacedeltas, dates, parallel=False): diff --git a/lead/model/cv.py b/lead/model/cv.py index 7daba52..ac9908c 100644 --- a/lead/model/cv.py +++ b/lead/model/cv.py @@ -64,11 +64,11 @@ def run(self, revised, X, aux): today = util.timestamp(self.year, self.month, self.day) min_date = util.timestamp(self.year - self.train_years, self.month, self.day) - date = data.index_as_series(X, 'date') + date = X.index.to_frame().date X = X[date.between(min_date, today)] aux = aux[date.between(min_date,today)] - date = data.index_as_series(aux, 'date') + date = aux.index.to_frame().date train = (date < today) & (aux.address_min_date < today) test = date == today @@ -107,7 +107,7 @@ def augment(y): """ augment the aux matrix with variables that are useful for train and test queries """ - y['age'] = (data.index_as_series(y, 'date') - y.date_of_birth) / util.day + y['age'] = (y.index.to_frame().date - y.date_of_birth) / util.day y['last_sample_age'] = (y.last_sample_date - y.date_of_birth) / util.day y['first_sample_age'] = (y.first_sample_date - y.date_of_birth) / util.day y['address_test_max_age'] = (y.address_test_max_date - y.date_of_birth) / util.day diff --git a/lead/model/data.py b/lead/model/data.py index 5aecea8..27441d6 100644 --- a/lead/model/data.py +++ b/lead/model/data.py @@ -1,6 +1,6 @@ from drain.step import Step, Call, MapResults from drain import util, data -from drain.data import FromSQL, Merge +from drain.data import FromSQL from drain.aggregation import SpacetimeAggregationJoin from lead.features import aggregations @@ -62,8 +62,7 @@ def __init__(self, month, day, year_min, year_max, wic_lag=None, dtype=None, add else: left_only = MapResults([left], {'aux':None}) - acs = Call("astype", inputs=[ACS(inputs=[left_only])], - dtype=dtype) + acs = Call(ACS(inputs=[left_only]), "astype", dtype=dtype) acs.target = True dates = tuple((date(y, month, day) for y in range(year_min, year_max+1))) @@ -74,7 +73,7 @@ def __init__(self, month, day, year_min, year_max, wic_lag=None, dtype=None, add aj = SpacetimeAggregationJoin( inputs=[a, left_only], lag=wic_lag if name.startswith('wic') else None) - aj = Call("astype", inputs=[aj], dtype=dtype) + aj = Call(aj, "astype", dtype=dtype) aj.target = True self.aggregation_joins.append(aj) diff --git a/lead/model/item.py b/lead/model/item.py deleted file mode 100644 index 6d4774b..0000000 --- a/lead/model/item.py +++ /dev/null @@ -1,11 +0,0 @@ -from drain.step import Step - -class GetItem(Step): - def __init__(self, step, key): - Step.__init__(self, step=step, key=key, inputs=[step]) - - def run(self, **kwargs): - return kwargs[self.key] - -def args_list(*args): - return args diff --git a/lead/model/left.py b/lead/model/left.py index 3b807fd..518e552 100644 --- a/lead/model/left.py +++ b/lead/model/left.py @@ -1,6 +1,6 @@ -from drain.step import Step +from drain.step import Step, Call from drain import util, data -from drain.data import FromSQL, Merge +from drain.data import FromSQL import pandas as pd import numpy as np @@ -30,7 +30,7 @@ def __init__(self, month, day, year_min): """ Step.__init__(self, month=month, day=day, year_min=year_min) - aux = Merge(on='kid_id', inputs=[kid_addresses, kids]) + aux = Call(kid_addresses, 'merge', [kids], on='kid_id') self.inputs = [aux, addresses] def run(self, aux, addresses): diff --git a/lead/model/score.py b/lead/model/score.py index e5b78ea..30438a2 100644 --- a/lead/model/score.py +++ b/lead/model/score.py @@ -1,16 +1,21 @@ from drain.step import Step class LeadScore(Step): - def run(self, scores, aux, y=None): + def run(self, scores, aux, y=None, test=None): """ Args: scores: the address scores aux: dataframe including an address column y: optional outcomes, aligned with aux """ + if test is not None: + aux = aux[test] + if y is not None: + y = y[test] + merged = aux.merge(scores.reset_index()[['address', 'score']], on='address', how='left') merged.index = aux.index - if y is None: - merged['y'] = y + if y is not None: + merged['true'] = y - return y + return {'y':merged} diff --git a/lead/model/workflows.py b/lead/model/workflows.py index fab544e..8c5f7ee 100644 --- a/lead/model/workflows.py +++ b/lead/model/workflows.py @@ -1,6 +1,6 @@ from drain import data, step, model, data from drain.util import dict_product, make_list -from drain.step import Call, Construct, MapResults +from drain.step import Call, MapResults, GetItem from drain.data import ToHDF from itertools import product @@ -14,8 +14,9 @@ from lead.features import aggregations from .split import split -from .item import GetItem, args_list +def args_list(*args): + return args def kid_predictions_past(): """ @@ -25,8 +26,10 @@ def kid_predictions_past(): w = [] for p in ps: t = p.get_input('transform') - aux_test = Call('__getattr__', inputs=[MapResults([t], {'train':None, 'X':None, 'sample_weight':None, 'aux':'obj', 'test':'key'})]) - s = lead.model.score.LeadScore(inputs=[MapResults([p, aux_test, GetItem(t, 'y')], ['scores', 'aux', 'y'])]) + s = lead.model.score.LeadScore(inputs=[MapResults( + [p, t], + ['scores', {'train':None, 'X':None, 'sample_weight':None}])]) + s.target = True w.append(s) return w @@ -36,10 +39,12 @@ def kid_predictions_today(): """ Predictions for kids today """ - p = address_predictions_today()[0] + p = address_predictions_today() t = p.get_input('transform') - aux_test = Call('__getattr__', inputs=[MapResults([t], {'train':None, 'X':None, 'sample_weight':None, 'aux':'obj', 'test':'key'})]) - s = lead.model.score.LeadScore(inputs=[MapResults([p, aux_test, GetItem(t, 'y')], ['scores', 'aux', 'y'])]) + s = lead.model.score.LeadScore(inputs=[MapResults( + [p, t], + ['scores', {'train':None, 'X':None, 'sample_weight':None}])]) + s.target = True return s @@ -53,7 +58,7 @@ def address_predictions_today(): dict(year=today.year, month=today.month, day=today.day), - dump_estimator=True) + dump_estimator=True)[0] return p @@ -68,7 +73,6 @@ def forest(**update_kwargs): Returns a step constructing a scikit-learn RandomForestClassifier """ kwargs = dict( - _class='sklearn.ensemble.RandomForestClassifier', n_estimators=2000, n_jobs=int(os.environ.get('N_JOBS', -1)), criterion='entropy', @@ -78,7 +82,7 @@ def forest(**update_kwargs): kwargs.update(**update_kwargs) - return step.Construct(**kwargs) + return step.Call('sklearn.ensemble.RandomForestClassifier', **kwargs) def bll6_models(estimators, cv_search={}, transform_search={}, dump_estimator=False): """ @@ -89,7 +93,7 @@ def bll6_models(estimators, cv_search={}, transform_search={}, dump_estimator=Fa """ cvd = dict( - year=range(2011, 2014+1), + year=range(2013, 2014+1), month=1, day=1, train_years=[6], @@ -131,15 +135,14 @@ def models(estimators, cv_search, transform_search, dump_estimator): cv = lead.model.cv.LeadCrossValidate(**cv_args) cv.name = 'cv' - X_train = Call('__getitem__', inputs=[MapResults([cv], {'X':'obj', 'train':'key', - 'test':None, 'aux':None})]) - mean = Call('mean', inputs=[X_train]) + X_train = GetItem(GetItem(cv, 'X'), GetItem(cv, 'train')) + mean = Call(X_train, 'mean') mean.name = 'mean' mean.target = True - X_impute = Construct(data.impute, - inputs=[MapResults([cv], {'aux':None, 'test':None, 'train':None}), - MapResults([mean], 'value')]) + X_impute = Call(data.impute, + inputs=[MapResults([GetItem(cv, 'X'), mean], + ['X', 'value'])]) cv_imputed = MapResults([X_impute, cv], ['X', {'X':None}]) cv_imputed.target = True @@ -168,22 +171,22 @@ def models(estimators, cv_search, transform_search, dump_estimator): # to avoid running out of memory, we split into pieces for prediction k = 4 pieces = list(map(str, range(k))) - X_split = Construct(split, inputs=[MapResults([X_test], {'X':'df'})], k=k) + X_split = Call(split, inputs=[MapResults([X_test], {'X':'df'})], k=k) tohdf = ToHDF(inputs = [MapResults([X_split], [pieces])]) tohdf.target = True ys = [] for j in pieces: - X_impute = Construct(data.impute, - inputs=[MapResults([Call('get', key=j, inputs=[tohdf])], 'X'), - MapResults([mean], 'value')]) + X_impute = Call(data.impute, + inputs=[MapResults(Call(tohdf, 'get', key=j), 'X'), + MapResults([mean], 'value')]) - y =model.Predict(inputs=[fit, MapResults([X_impute], 'X')]) + y = model.Predict(inputs=[fit, MapResults([X_impute], 'X')]) y.target = True ys.append(GetItem(y, 'y')) # concatenate the pieces - y = Construct(pd.concat, inputs=[MapResults([Construct(args_list, inputs=ys)], 'objs')]) + y = Call(pd.concat, inputs=[MapResults([Call(args_list, inputs=ys)], 'objs')]) y.target = True steps.append(y) diff --git a/requirements.txt b/requirements.txt index 9377c0a..0ab5d57 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -git+https://github.com/potash/drain +drain==0.0.6 git+https://github.com/potash/scikit-learn@merged/balanced-random-forest#egg=scikit-learn From eca0d23075a622629d9833572f21850235e9947c Mon Sep 17 00:00:00 2001 From: Eric Potash Date: Tue, 13 Mar 2018 18:23:57 -0500 Subject: [PATCH 19/20] predict on new addresses using geographies --- lead/model/score.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/lead/model/score.py b/lead/model/score.py index 30438a2..29f7b17 100644 --- a/lead/model/score.py +++ b/lead/model/score.py @@ -13,8 +13,20 @@ def run(self, scores, aux, y=None, test=None): if y is not None: y = y[test] - merged = aux.merge(scores.reset_index()[['address', 'score']], on='address', how='left') + scores = scores.reset_index() + merged = aux.merge(scores[['address', 'score']], on='address', how='left') merged.index = aux.index + + missing = merged.score.isnull() + # if scores are null that means it's a "new" address + # so we use the geography score + # which is based on community area, ward, and census block + if missing.sum() > 0: + geography_columns = ['community_area_id', 'ward_id', 'census_block_id'] + geography_scores = scores[scores.address.isnull()] + missing_scores = aux[missing].merge(geography_scores, on=geography_columns, how='left').score.values + merged.loc[missing, 'score'] = missing_scores + if y is not None: merged['true'] = y From a50e0f6b2caa390abaeb7c7247a7cd9af49a6e2f Mon Sep 17 00:00:00 2001 From: Eric Potash Date: Thu, 4 Oct 2018 10:13:45 -0500 Subject: [PATCH 20/20] fix max year --- lead/model/cv.py | 3 ++- lead/model/workflows.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/lead/model/cv.py b/lead/model/cv.py index ac9908c..73caae8 100644 --- a/lead/model/cv.py +++ b/lead/model/cv.py @@ -7,12 +7,13 @@ from datetime import date import pandas as pd import numpy as np +import os import logging from drain.util import lru_cache YEAR_MIN = 2003 -YEAR_MAX = 2017 +YEAR_MAX = pd.Timestamp(os.environ['TODAY']).year @lru_cache(maxsize=10) def lead_data(month, day, wic_lag): diff --git a/lead/model/workflows.py b/lead/model/workflows.py index 8c5f7ee..0380e9f 100644 --- a/lead/model/workflows.py +++ b/lead/model/workflows.py @@ -93,7 +93,7 @@ def bll6_models(estimators, cv_search={}, transform_search={}, dump_estimator=Fa """ cvd = dict( - year=range(2013, 2014+1), + year=range(2011, 2014+1), month=1, day=1, train_years=[6],