diff --git a/README.md b/README.md
index 5d9aff6..3e48a37 100644
--- a/README.md
+++ b/README.md
@@ -54,11 +54,11 @@ pip install -r requirements.txt
 ### 3. Run models using `drain`.
 To fit a current model and make predictions change to `./lead` and run:
 ```
-drain execute lead.model.workflows::bll6_forest_today ...
+drain execute -w lead.model.workflows.address_predictions_today ...
 ```
-Here `lead.model.workflows.bll6_forest_today` is a drain workflow, i.e. a function taking no arguments that returns collection of drain steps.
+Here `lead.model.workflows.address_predictions_today()` is a drain workflow, i.e. a function taking no arguments that returns a collection of drain steps.
-For temporal cross validation use the `bll6_forest` workflow.
+For temporal cross validation, use the `kid_predictions_past` workflow in the same module.
 
 # License
diff --git a/lead/features/aggregations.py b/lead/features/aggregations.py
index b38c7d2..657e0cc 100644
--- a/lead/features/aggregations.py
+++ b/lead/features/aggregations.py
@@ -25,32 +25,36 @@
     'tract':'census_tract_id',
 }
 
-def get_deltas():
-    return {
-        'address': ['1y', '2y', '5y', '10y', 'all'],
-        #'complex': ['1y', '2y', '5y', '10y', 'all'],
-        'block': ['1y','2y','5y'],
-        'tract': ['1y','2y','3y']
-    }
+deltas = {
+    'address': ['1y', '2y', '5y', '10y', 'all'],
+    #'complex': ['1y', '2y', '5y', '10y', 'all'],
+    'block': ['1y','2y','5y'],
+    'tract': ['1y','2y','3y']
+}
+
+# short list of deltas used for building permits and violations
+deltas_thin = {
+    'address': ['1y', '5y', 'all'],
+}
 
-wic = {'kid': ['all']}
+#wic = {'kid': ['all']}
 
-def get_args(deltas):
+def get_args():
     return dict(
         buildings = ['building', 'complex', 'block', 'tract'],
         assessor = ['address', 'building', 'complex', 'block', 'tract'],
         tests = deltas,
         investigations = deltas,
         #events = deltas,
-        permits = deltas,
-        kids = dict(kid=['all'], **deltas),
-        violations = util.dict_subset(deltas, ('address', 'block')),
-        wic_enroll = wic,
-        wic_birth = wic,
-        wic_prenatal = wic,
+        permits = deltas_thin,
+        kids = deltas,
+        violations = deltas_thin,
+        #wic_enroll = wic,
+        #wic_birth = wic,
+        #wic_prenatal = wic,
     )
 
-args = get_args(get_deltas())
+args = get_args()
 
 @lru_cache(maxsize=10)
 def all_dict(dates=None, lag=None, parallel=True):
diff --git a/lead/features/events.py b/lead/features/events.py
index 60a7acd..31a2091 100644
--- a/lead/features/events.py
+++ b/lead/features/events.py
@@ -9,7 +9,7 @@
 from drain.step import Step
 from drain.aggregate import Aggregate, Count
 from drain.aggregation import SpacetimeAggregation
-from drain.data import FromSQL, Merge
+from drain.data import FromSQL
 
 day = np.timedelta64(1, 'D')
diff --git a/lead/features/investigations.py b/lead/features/investigations.py
index 32e6745..29afbba 100644
--- a/lead/features/investigations.py
+++ b/lead/features/investigations.py
@@ -12,7 +12,7 @@ from drain.data import FromSQL
 
 day = np.timedelta64(1, 'D')
 
-CLOSURE_CODES = range(1,12+1)
+CLOSURE_CODES = [1,4,12] # complied, court, state attorney
 
 DATE_COLUMNS = ['referral_date', 'init_date', 'comply_date', 'closure_date']
 DATE_NAMES = ['referral', 'inspection', 'compliance', 'closure']
@@ -72,8 +72,8 @@ def get_aggregates(self, date, delta):
         aggregates = [
             Count(),
-            Aggregate('inspected', 'max', fname=False),
-            Aggregate('complied', 'max', fname=False),
+            Aggregate(lambda i: i.inspected.fillna(0), 'max', name='inspected', fname=False),
+            Aggregate(lambda i: i.complied.fillna(0), 'max', name='complied', fname=False),
             Count('hazard_int', prop=True),
             Count('hazard_ext', prop=True), Count('hazard', prop=True), Count('hazard_both', prop=True),
diff --git a/lead/features/kids.py b/lead/features/kids.py
index 669dc46..220ea1c 100644
--- a/lead/features/kids.py
+++ b/lead/features/kids.py
@@ -1,7 +1,7 @@
 from drain import data
-from drain.data import FromSQL, Merge, Revise
+from drain.data import FromSQL, Revise
 from drain.util import day
-from drain.step import Step
+from drain.step import Step, Call, MapResults
 from drain.aggregation import SpacetimeAggregation
 from drain.aggregate import Fraction, Count, Aggregate, Aggregator, days
 
@@ -38,7 +38,7 @@ def revise_kid_addresses(date):
     for i in kid_addresses.inputs[0].inputs: i.target = True
     for i in kids.inputs[0].inputs: i.target = True
 
-    return Merge(inputs=[kids, kid_addresses], on='kid_id')
+    return Call(kids, 'merge', [MapResults(kid_addresses, 'right')], on='kid_id')
 
 class KidsAggregation(SpacetimeAggregation):
     """
@@ -55,8 +55,7 @@ def __init__(self, spacedeltas, dates, parallel=False):
         kid_addresses = revise_kid_addresses(date=dates[0])
         addresses = FromSQL(table='output.addresses')
         addresses.target = True
-        self.inputs = [Merge(inputs=[kid_addresses, addresses],
-                             on='address_id')]
+        self.inputs = [Call(kid_addresses, 'merge', [MapResults(addresses, 'right')], on='address_id')]
 
     def get_aggregator(self, date, index, delta):
         df = self.get_data(date, delta)
@@ -72,17 +71,6 @@ def get_aggregates(self, date, index, delta):
             Aggregate(['test_address_count', 'address_count', 'test_count'],
                       'max', fname=False),
             Aggregate(['max_bll'], 'max', fname=False),
-            # Comment out this and all other wic aggregates because they can't be lagged
-            # and they're not useful for predicting poisoning
-            #Aggregate(lambda k: k.last_wic_date == k.address_wic_max_date,
-            #          'any', 'last_wic_address', fname=False),
-            #Aggregate(['address_wic_mother', 'address_wic_infant'], 'any', fname=False),
-            #Aggregate([days('address_wic_max_date', date),
-            #           days('address_wic_min_date', date),
-            #           days('last_wic_date', date),
-            #           days('first_wic_date', date)],
-            #          ['max'], ['address_wic_min_date', 'address_wic_max_date',
-            #                    'last_wic_date', 'first_wic_date'], fname=False)
         ]
 
         sample_2y = lambda k: ((k.last_sample_date - k.date_of_birth)/day > 365*2) | (k.max_bll >= 6)
 
         aggregates = [
             counts,
             Aggregate(['test_address_count', 'test_count', 'address_count'],
-                      ['median', 'mean', 'min', 'max']),
+                      ['mean', 'max']),
             Count([lambda k: k.address_test_min_date.notnull(),
                    lambda k: k.first_sample_date.notnull()],
                   prop=True, name=['tested_here', 'tested_ever']),
-            #Count(lambda k: k.first_wic_date.notnull(), prop=True, name='wic'),
-
-            #Count([lambda k: k.address_wic_min_date.notnull() & k.address_test_min_date.notnull(),
-            #       lambda k: k.address_wic_min_date.notnull() & k.first_sample_date.notnull()],
-            #      name=['wic_tested_here', 'wic_tested_ever'],
-            #      prop=lambda k: k.first_wic_date.notnull(), prop_name='wic'),
-            Aggregate([days('address_min_date', 'address_max_date'),
-                       #days('address_wic_min_date', 'address_wic_max_date'),
-                       days('address_test_min_date', 'address_test_max_date')],
-                      ['mean'], ['address_total_time', #'address_wic_time',
-                                 'address_test_time']),
+            Aggregate([days('address_min_date', 'address_max_date')],
+                      ['mean'], ['address_total_time']),
             # the first of these are kid level, not address-kid level
             # that means kids get double counted when aggregated to above the address level
             # if they lived in multiple addresses on that e.g. census tract. oh well.
-            Aggregate(['max_bll', 'avg_bll', 'cumulative_bll', 'avg_cumulative_bll',
+            Aggregate(['max_bll', 'avg_cumulative_bll',
                        'mean_bll', 'address_max_bll', 'address_mean_bll'],
                       ['mean', 'median', 'min', 'max']),
@@ -143,7 +122,6 @@ def get_aggregates(self, date, index, delta):
         ]
 
         if delta == 'all':
             aggregates.extend([
-                #Aggregate(days('address_wic_min_date', date), ['min', 'max'], 'days_since_wic'),
                 Aggregate(days('date_of_birth', date), ['min', 'max', 'mean'], 'date_of_birth'),
             ])
diff --git a/lead/features/tests.py b/lead/features/tests.py
index fc6be00..14322c7 100644
--- a/lead/features/tests.py
+++ b/lead/features/tests.py
@@ -1,22 +1,21 @@
 from drain import data
 from drain.util import day
-from drain.data import FromSQL, Merge
-from drain.step import Step
+from drain.data import FromSQL
+from drain.step import Step, Call
 from drain.aggregation import SpacetimeAggregation
 from drain.aggregate import Count, Fraction, Aggregate, days
 
 import pandas as pd
 
-tests = Merge(inputs=[
-    Merge(inputs=[
-        FromSQL(table='output.tests'),
-        FromSQL(table='output.addresses')], on='address_id'),
+tests = Call(
+    Call(FromSQL(table='output.tests'), 'merge',
+         [FromSQL(table='output.addresses')], on='address_id'),
+    'merge',
     # get kid first bll6 and bll10 counts to calculate incidences
-    FromSQL("""
+    [FromSQL("""
 select kid_id, first_bll6_sample_date, first_bll10_sample_date
 from output.kids
-    """)],
-on='kid_id')
+    """)], on='kid_id')
 
 tests.target = True
 
 class TestsAggregation(SpacetimeAggregation):
diff --git a/lead/features/violations.py b/lead/features/violations.py
index 9a5df79..215a821 100644
--- a/lead/features/violations.py
+++ b/lead/features/violations.py
@@ -3,14 +3,16 @@
 from drain.aggregation import SpacetimeAggregation
 from itertools import product
 
-KEYWORDS = ['water', 'paint', 'window', 'wall', 'porch', 'chip', 'flak', 'peel']
+KEYWORDS = (['water','window', 'wall', 'porch', '(paint|chip|flak|peel)'],
+            ['water','window','wall','porch','paint'])
+
 STATUS = (['OPEN', 'COMPLIED', 'NO ENTRY'],
           ['open', 'complied', 'no_entry'])
 
 KEYWORD_COLUMNS = str.join(', ',
         ("violation_description ~* '{0}' "
-         "or violation_inspector_comments ~* '{0}' AS {0}".format(k)
-         for k in KEYWORDS))
+         "or violation_inspector_comments ~* '{0}' AS {1}".format(*k)
+         for k in zip(*KEYWORDS)))
 
 STATUS_COLUMNS = str.join(', ',
         ("violation_status = '{0}' AS {1}".format(*s)
@@ -37,11 +39,11 @@ def __init__(self, spacedeltas, dates, parallel=False):
     def get_aggregates(self, date, data):
         aggregates = [
             Count(),
-            Count(KEYWORDS, prop=True),
+            Count(KEYWORDS[1], prop=True),
             Count(STATUS[1], prop=True),
             Count([lambda v,k=k,s=s: v[k] & v[s]
-                   for k,s in product(KEYWORDS, STATUS[1])], prop=True,
-                  name=['%s_%s' % p for p in product(KEYWORDS, STATUS[1])]
+                   for k,s in product(KEYWORDS[1], STATUS[1])], prop=True,
+                  name=['%s_%s' % p for p in product(KEYWORDS[1], STATUS[1])]
             )
         ]
diff --git a/lead/features/wic.py b/lead/features/wic.py
index 91c57ca..0c99014 100644
--- a/lead/features/wic.py
+++ b/lead/features/wic.py
@@ -7,7 +7,7 @@
 from drain import util, aggregate, data
 from drain.aggregate import Aggregate, Count, aggregate_counts, days
 from drain.aggregation import SpacetimeAggregation
-from drain.step import Construct
+from drain.step import Call
 from drain.data import FromSQL, binarize, binarize_sets, select_regexes
 from drain.util import list_filter_none, union
 
@@ -30,9 +30,9 @@ from enroll
 """, tables=['aux.kid_wics', 'aux.kid_mothers'], parse_dates=['register_d', 'last_upd_d'])
 
-enroll2 = Construct(binarize, inputs=[enroll],
-                    category_classes=['employment', 'occupation', 'clinic'], min_freq=100)
+enroll2 = Call(binarize, inputs=[enroll], category_classes=['employment', 'occupation', 'clinic'], min_freq=100)
 
-enroll3 = Construct(binarize_sets, inputs=[enroll2], columns=['assistance', 'language'], cast=True, min_freq=100)
+enroll3 = Call(binarize_sets, inputs=[enroll2], columns=['assistance', 'language'], cast=True, min_freq=100)
 enroll3.target=True
 
 class EnrollAggregation(SpacetimeAggregation):
@@ -46,7 +46,7 @@ def __init__(self, spacedeltas, dates, parallel=False):
                 parallel=parallel)
 
     def get_aggregates(self, date, delta):
-        enroll = self.inputs[0].get_result()
+        enroll = self.inputs[0].result
         aggregates = [
             Aggregate('medical_risk', 'any', fname=False),
             Aggregate(['household_size', 'household_income'],
@@ -74,8 +74,8 @@ def get_aggregates(self, date, delta):
 """, tables=['aux.kids', 'aux.kid_mothers'], parse_dates=['date_of_birth'])
 
-births2 = Construct(binarize, inputs=[births], category_classes=['place_type', 'disposition'])
-births3 = Construct(binarize_sets, inputs=[births2], columns=['complication'], cast=True)
+births2 = Call(binarize, inputs=[births], category_classes=['place_type', 'disposition'])
+births3 = Call(binarize_sets, inputs=[births2], columns=['complication'], cast=True)
 births3.target = True
 
 class BirthAggregation(SpacetimeAggregation):
@@ -89,7 +89,7 @@ def __init__(self, spacedeltas, dates, parallel=False):
                 parallel=parallel)
 
     def get_aggregates(self, date, delta):
-        births = self.inputs[0].get_result()
+        births = self.inputs[0].result
         aggregates = [
             Aggregate('length', 'max', fname=False),
             Aggregate('weight', 'max', fname=False),
@@ -112,7 +112,7 @@ def get_aggregates(self, date, delta):
 """, tables=['aux.kids', 'aux.kid_mothers'], parse_dates=['date_of_birth', 'visit_d'])
 prenatal.target = True
 
-prenatal2 = Construct(binarize, inputs=[prenatal], category_classes=['service'])
+prenatal2 = Call(binarize, inputs=[prenatal], category_classes=['service'])
 
 class PrenatalAggregation(SpacetimeAggregation):
     def __init__(self, spacedeltas, dates, parallel=False):
@@ -125,7 +125,7 @@ def __init__(self, spacedeltas, dates, parallel=False):
                 parallel=parallel)
 
     def get_aggregates(self, date, delta):
-        prenatal = self.inputs[0].get_result()
+        prenatal = self.inputs[0].result
         aggregates = [
             Count(),
diff --git a/lead/model/address.py b/lead/model/address.py
index 63250be..dd6e146 100644
--- a/lead/model/address.py
+++ b/lead/model/address.py
@@ -1,12 +1,43 @@
 from drain.step import Step
 from drain.util import timestamp, cross_join
-from drain.data import FromSQL, Merge
+from drain.data import FromSQL
 
 import pandas as pd
 import numpy as np
 import logging
 
-addresses = FromSQL(table='output.addresses')
+# in addition to all addresses, we add all cells in the partition
+# created by intersecting blocks, wards and communities
+# in anticipation of any new addresses in deployment
+addresses = FromSQL("""
+with blocks as (
+select
+    b.geoid10::double precision census_block_id,
+    substring(b.geoid10 for 11)::double precision census_tract_id,
+    c.area_numbe::int community_area_id,
+    w.ward::int ward_id
+from input.census_blocks b
+join input.community_areas c
+    on st_intersects(b.geom, c.geom)
+join input.wards w
+    on st_intersects(b.geom, w.geom) and st_intersects(c.geom, w.geom)
+group by 1,2,3,4
+)
+select
+    null address,
+    null address_lat,
+    null address_lng,
+    null as address_id,
+    null as building_id,
+    null as complex_id, *
+from blocks
+UNION ALL
+select address, address_lat, address_lng,
+    address_id,
+    building_id, complex_id,
+    census_block_id, census_tract_id,
+    community_area_id, ward_id
+from output.addresses
+    """, tables=['output.addresses', 'input.census_blocks', 'input.census_tracts', 'input.community_areas', 'input.wards'])
 addresses.target = True
 
 class LeadAddressLeft(Step):
diff --git a/lead/model/cv.py b/lead/model/cv.py
index 7daba52..73caae8 100644
--- a/lead/model/cv.py
+++ b/lead/model/cv.py
@@ -7,12 +7,13 @@
 from datetime import date
 import pandas as pd
 import numpy as np
+import os
 import logging
 
 from drain.util import lru_cache
 
 YEAR_MIN = 2003
-YEAR_MAX = 2017
+YEAR_MAX = pd.Timestamp(os.environ['TODAY']).year
 
 @lru_cache(maxsize=10)
 def lead_data(month, day, wic_lag):
@@ -64,11 +65,11 @@ def run(self, revised, X, aux):
         today = util.timestamp(self.year, self.month, self.day)
         min_date = util.timestamp(self.year - self.train_years, self.month, self.day)
 
-        date = data.index_as_series(X, 'date')
+        date = X.index.to_frame().date
         X = X[date.between(min_date, today)]
         aux = aux[date.between(min_date,today)]
 
-        date = data.index_as_series(aux, 'date')
+        date = aux.index.to_frame().date
         train = (date < today) & (aux.address_min_date < today)
         test = date == today
 
@@ -107,7 +108,7 @@ def augment(y):
     """
     augment the aux matrix with variables that are useful for train and test queries
     """
-    y['age'] = (data.index_as_series(y, 'date') - y.date_of_birth) / util.day
+    y['age'] = (y.index.to_frame().date - y.date_of_birth) / util.day
     y['last_sample_age'] = (y.last_sample_date - y.date_of_birth) / util.day
     y['first_sample_age'] = (y.first_sample_date - y.date_of_birth) / util.day
    y['address_test_max_age'] = (y.address_test_max_date - y.date_of_birth) / util.day
diff --git a/lead/model/data.py b/lead/model/data.py
index 635d3ac..27441d6 100644
--- a/lead/model/data.py
+++ b/lead/model/data.py
@@ -1,6 +1,6 @@
 from drain.step import Step, Call, MapResults
 from drain import util, data
-from drain.data import FromSQL, Merge
+from drain.data import FromSQL
 from drain.aggregation import SpacetimeAggregationJoin
 
 from lead.features import aggregations
@@ -23,7 +23,7 @@ class LeadData(Step):
     Address datasets contain one row per address. They are built primarily
     to be able to later quickly access the features for scoring.
     """
-    def __init__(self, month, day, year_min, year_max, wic_lag=None, dtype=None, address=False):
+    def __init__(self, month, day, year_min, year_max, wic_lag=None, dtype=None, address=False, left=None, index_columns=None):
         """
         Args:
             month: the month for feature generation
@@ -32,30 +32,37 @@ def __init__(self, month, day, year_min, year_max, wic_lag=None, dtype=None, add
             year_max: the year to stop generating features
             wic_lag: a lag for the WIC aggregations,
                 parsed by drain.data.parse_delta, e.g. '6m' is a six month lag.
-                Defaultis to None, which is no lag.
-            dtype: the dtype to use for features. Defaults to np.float16.
+                Defaults to None, which means no lag.
+            dtype: the dtype to use for features. Defaults to np.float16 for memory efficiency.
             address: whether to build an address dataset. Defaults to False,
                 which builds a kid dataset.
+            left: optional Left step.
+                if None, defaults to LeadLeft (when address=False) or LeadAddressLeft (when address=True)
+            index_columns: columns of left to use as index
         """
         if dtype is None:
             dtype = np.float16
 
         Step.__init__(self, month=month, day=day, year_min=year_min, year_max=year_max,
-                wic_lag=wic_lag, dtype=dtype, address=address)
-
-        if address:
-            left = LeadAddressLeft(month=month, day=day, year_min=year_min, year_max=year_max)
-            # left_only is left without aux
-            # in the address case it's the same as left
-            left_only = left
+                wic_lag=wic_lag, dtype=dtype, address=address,
+                index_columns=index_columns)
+
+        if left is None:
+            if address:
+                left = LeadAddressLeft(month=month, day=day, year_min=year_min, year_max=year_max)
+                # left_only is left without aux
+                # in the address case it's the same as left
+                left_only = left
+                self.index_columns = ['address', 'census_block_id', 'ward_id', 'community_area_id', 'date']
+            else:
+                left = LeadLeft(month=month, day=day, year_min=year_min)
+                left.target = True
+                left_only = MapResults([left], {'aux':None})
+                self.index_columns = ['kid_id', 'address_id', 'date']
         else:
-            left = LeadLeft(month=month, day=day, year_min=year_min)
-            left.target = True
             left_only = MapResults([left], {'aux':None})
 
-        acs = Call("astype", inputs=[ACS(inputs=[left_only])],
-                   dtype=dtype)
+        acs = Call(ACS(inputs=[left_only]), "astype", dtype=dtype)
         acs.target = True
 
         dates = tuple((date(y, month, day) for y in range(year_min, year_max+1)))
@@ -66,7 +73,7 @@ def __init__(self, month, day, year_min, year_max, wic_lag=None, dtype=None, add
             aj = SpacetimeAggregationJoin(
                     inputs=[a, left_only],
                     lag=wic_lag if name.startswith('wic') else None)
-            aj = Call("astype", inputs=[aj], dtype=dtype)
+            aj = Call(aj, "astype", dtype=dtype)
             aj.target = True
             self.aggregation_joins.append(aj)
 
@@ -83,26 +90,23 @@ def run(self, acs, left, aux=None):
         sample weights, and evaluation.
""" if self.address: - index_columns = ['address','date'] - if not self.address: - index_columns = ['kid_id', 'address_id', 'date'] + left_columns = ['address_lat', 'address_lng'] + else: + left_columns = ['ward_id', 'community_area_id', 'address_lat', 'address_lng'] - left_columns = ['ward_id', 'community_area_id', 'address_lat', 'address_lng'] - left = left[index_columns + left_columns] + left = left[self.index_columns + left_columns] logging.info('Binarizing community area and ward') - left = data.binarize(left, ['community_area_id', 'ward_id'], astype=self.dtype) + left = data.binarize(left, + {'community_area_id':range(1,78), 'ward_id':range(1,51)}, + astype=self.dtype, drop=(not self.address)) logging.info('Joining aggregations') - X = left.join([a.get_result() for a in self.aggregation_joins] + [acs]) + X = left.join([a.result for a in self.aggregation_joins] + [acs]) # delete all aggregation inputs so that memory can be freed - for a in self.aggregation_joins: del a._result - - if not self.address: - logging.info('Adding auxillary features') - add_aux_features(X, aux, self.dtype) + for a in self.aggregation_joins: del a.result - X.set_index(index_columns, inplace=True) + X.set_index(self.index_columns, inplace=True) c = data.non_numeric_columns(X) if len(c) > 0: @@ -111,18 +115,5 @@ def run(self, acs, left, aux=None): if self.address: return {'X':X} else: - aux.set_index(index_columns, inplace=True) + aux.set_index(self.index_columns, inplace=True) return {'X':X, 'aux':aux} - -def add_aux_features(X, aux, dtype): - """ - Args: - X: the DataFrame to which to add features - aux: the DataFrame from which to build the features - dtype: the dtype with which to add the features - """ - X['age'] = ((aux.date - aux.date_of_birth)/util.day).astype(dtype) - X['date_of_birth_days'] = util.date_to_days(aux.date_of_birth).astype(dtype) - X['date_of_birth_month'] = aux.date_of_birth.dt.month.astype(dtype) - X['male'] = (aux.sex == 'M').astype(dtype) - X['wic'] = (aux.first_wic_date < aux.date).fillna(False).astype(dtype) diff --git a/lead/model/experiments.py b/lead/model/experiments.py index bcdc31e..1e2b615 100644 --- a/lead/model/experiments.py +++ b/lead/model/experiments.py @@ -20,7 +20,51 @@ def bll6_forest_no_kid(): return bll6_models( forest(), - transform_search={'aggregations':args}) + transform_search={'aggregations':args, 'exclude': [['^kid_.*']]}) + +def bll6_forest_estimators(): + return bll6_models([forest(n_estimators=1000*n) for n in (1,)]) + +def bll6_forest_no_kid_wic_estimators(): + """ + No kid-level aggregations + """ + args = deepcopy(aggregations.args) + for a in args.values(): + if 'kid' in a: + a.pop('kid') + + return bll6_models( + [forest(n_estimators=2000, random_state=n) for n in (0,1,2,3,4)], + cv_search={'year':[2012]}, + transform_search={'aggregations':args, 'exclude': [['^kid_.*']]}) + +def bll6_forest_quick(): + """ + A fast lead model that only uses 1 year of training data + """ + today = pd.Timestamp(os.environ['TODAY']) + p = bll6_models( + forest(n_estimators=10), + dict(year=today.year, + month=today.month, + day=today.day, + train_years=1))[0] + return p + +def bll6_forest_quarterly(): + """ + Quarterly forest models + """ + return bll6_models(forest(), + {'month':[1,4,7,10], 'year':range(2010,2014+1)}) + +def bll6_forest_monthly(): + """ + Monthly forest models + """ + return bll6_models(forest(), + {'month':range(1,13), 'year':range(2010,2014+1)}) def bll6_svm(): return models(model.svms()) diff --git a/lead/model/left.py b/lead/model/left.py index 
index 3b807fd..518e552 100644
--- a/lead/model/left.py
+++ b/lead/model/left.py
@@ -1,6 +1,6 @@
-from drain.step import Step
+from drain.step import Step, Call
 from drain import util, data
-from drain.data import FromSQL, Merge
+from drain.data import FromSQL
 
 import pandas as pd
 import numpy as np
@@ -30,7 +30,7 @@ def __init__(self, month, day, year_min):
         """
         Step.__init__(self, month=month, day=day, year_min=year_min)
 
-        aux = Merge(on='kid_id', inputs=[kid_addresses, kids])
+        aux = Call(kid_addresses, 'merge', [kids], on='kid_id')
         self.inputs = [aux, addresses]
 
     def run(self, aux, addresses):
diff --git a/lead/model/score.py b/lead/model/score.py
new file mode 100644
index 0000000..29f7b17
--- /dev/null
+++ b/lead/model/score.py
@@ -0,0 +1,33 @@
+from drain.step import Step
+
+class LeadScore(Step):
+    def run(self, scores, aux, y=None, test=None):
+        """
+        Args:
+            scores: the address scores
+            aux: dataframe including an address column
+            y: optional outcomes, aligned with aux
+        """
+        if test is not None:
+            aux = aux[test]
+            if y is not None:
+                y = y[test]
+
+        scores = scores.reset_index()
+        merged = aux.merge(scores[['address', 'score']], on='address', how='left')
+        merged.index = aux.index
+
+        missing = merged.score.isnull()
+        # if scores are null that means it's a "new" address
+        # so we use the geography score
+        # which is based on community area, ward, and census block
+        if missing.sum() > 0:
+            geography_columns = ['community_area_id', 'ward_id', 'census_block_id']
+            geography_scores = scores[scores.address.isnull()]
+            missing_scores = aux[missing].merge(geography_scores, on=geography_columns, how='left').score.values
+            merged.loc[missing, 'score'] = missing_scores
+
+        if y is not None:
+            merged['true'] = y
+
+        return {'y':merged}
diff --git a/lead/model/split.py b/lead/model/split.py
new file mode 100644
index 0000000..afeb1da
--- /dev/null
+++ b/lead/model/split.py
@@ -0,0 +1,14 @@
+def split(df, k):
+    """
+    Split the dataframe into pieces
+    Args:
+        df: DataFrame
+        k: number of pieces
+    """
+    N = len(df)
+    n = N // k  # integer piece size; the last piece absorbs the remainder
+
+    dfs = [df.iloc[n*i:n*(i+1),:] for i in range(k-1)]
+    dfs.append(df.iloc[n*(k-1):N])
+
+    return dfs
diff --git a/lead/model/transform.py b/lead/model/transform.py
index 1a201a7..f402659 100644
--- a/lead/model/transform.py
+++ b/lead/model/transform.py
@@ -14,19 +14,23 @@ class LeadTransform(Step):
     performing feature selection and creating sample weights.
""" def __init__(self, inputs, outcome_expr, aggregations, - wic_sample_weight=0, exclude=[], include=[]): + outcome_where_expr=None, wic_sample_weight=0, + exclude=[], include=[]): """ Args: inputs: list containing a LeadCrossValidate step outcome_expr: the query to perform on the auxillary information to produce an outcome variable aggregations: defines which of the SpacetimeAggregations to include - and which to drop + and which to drop + outcome_where_expr: where to evaluate the outcome_expr, + defaults to None, which means everywhere wic_sample_weight: optional different sample weight for wic kids """ Step.__init__(self, inputs=inputs, outcome_expr=outcome_expr, aggregations=aggregations, + outcome_where_expr=outcome_where_expr, wic_sample_weight=wic_sample_weight, exclude=exclude, include=include) @@ -40,6 +44,8 @@ def run(self, X, aux, train, test): """ y = aux.eval(self.outcome_expr) + if self.outcome_where_expr is not None: + y = y.where(aux.eval(self.outcome_where_expr)) logging.info('Selecting aggregations') aggregations = self.get_input(LeadData).aggregations diff --git a/lead/model/workflows.py b/lead/model/workflows.py index cafa0c8..0380e9f 100644 --- a/lead/model/workflows.py +++ b/lead/model/workflows.py @@ -1,6 +1,7 @@ from drain import data, step, model, data -from drain.util import dict_product -from drain.step import Call, Construct, MapResults +from drain.util import dict_product, make_list +from drain.step import Call, MapResults, GetItem +from drain.data import ToHDF from itertools import product import pandas as pd @@ -9,110 +10,70 @@ import lead.model.data import lead.model.transform import lead.model.cv +import lead.model.score from lead.features import aggregations +from .split import split -def bll6_forest(): - """ - The basic temporal cross-validation workflow - """ - return bll6_models(forest()) +def args_list(*args): + return args - -def bll6_forest_today(): - """ - The workflow used to construct a current model - Parses the environment variable TODAY using pd.Timestamp to set the date - """ - today = pd.Timestamp(os.environ['TODAY']) - p = bll6_models( - forest(), - dict(year=today.year, - month=today.month, - day=today.day))[0] - # save the model - p.get_input('fit').target = True - p.get_input('mean').target = True - - # put the predictions into the database - tosql = data.ToSQL(table_name='predictions', if_exists='replace', - inputs=[MapResults([p], mapping=[{'y':'df', 'feature_importances':None}])]) - tosql.target = True - return tosql - -def address_data_past(): +def kid_predictions_past(): """ - Builds address-level features for the past - Plus saves fitted models and means for the past + Temporal cross validation workflow """ - ds = [] # lead address data - for y in range(2011,2014+1): - d = lead.model.data.LeadData( - year_min=y, - year_max=y, - month=1, - day=1, - address=True) - d.target = True - ds.append(d) - - ps = bll6_forest() # predictions + ps = address_predictions_past() + w = [] for p in ps: - p.get_input('fit').target = True - p.get_input('mean').target = True + t = p.get_input('transform') + s = lead.model.score.LeadScore(inputs=[MapResults( + [p, t], + ['scores', {'train':None, 'X':None, 'sample_weight':None}])]) + s.target = True + w.append(s) + + return w - return ds + ps - -def address_data_today(): + +def kid_predictions_today(): """ - Builds address-level features today + Predictions for kids today """ - today = pd.Timestamp(os.environ['TODAY']) - d = lead.model.data.LeadData( - year_min=today.year, - year_max=today.year, - 
-            month=today.month,
-            day=today.day,
-            address=True)
-    d.target = True
+    p = address_predictions_today()
+    t = p.get_input('transform')
+    s = lead.model.score.LeadScore(inputs=[MapResults(
+            [p, t],
+            ['scores', {'train':None, 'X':None, 'sample_weight':None}])])
+    s.target = True
+    return s
-    return d
-
-def bll6_forest_quick():
+
+def address_predictions_today():
     """
-    A fast lead model that only uses 1 year of training data
+    Predictions for all addresses today
     """
     today = pd.Timestamp(os.environ['TODAY'])
     p = bll6_models(
-            forest(n_estimators=10),
+            forest(),
             dict(year=today.year,
                  month=today.month,
-                 day=today.day,
-                 train_years=1))[0]
+                 day=today.day),
+            dump_estimator=True)[0]
     return p
 
-def bll6_forest_quarterly():
+
+def address_predictions_past():
     """
-    Quarterly forest models
+    Predictions for addresses in the past (for cross validation)
     """
-    return bll6_models(forest(),
-            {'month':[1,4,7,10], 'year':range(2010,2014+1)})
-
-def bll6_forest_monthly():
-    """
-    Monthly forest models
-    """
-    return bll6_models(forest(),
-            {'month':range(1,13), 'year':range(2010,2014+1)})
-
+    return bll6_models(forest(), dump_estimator=True)
+
 def forest(**update_kwargs):
     """
     Returns a step constructing a scikit-learn RandomForestClassifier
     """
     kwargs = dict(
-        _class='sklearn.ensemble.RandomForestClassifier',
-        n_estimators=1000,
+        n_estimators=2000,
         n_jobs=int(os.environ.get('N_JOBS', -1)),
         criterion='entropy',
         class_weight='balanced_bootstrap',
@@ -121,9 +82,9 @@ def forest(**update_kwargs):
 
     kwargs.update(**update_kwargs)
 
-    return [step.Construct(**kwargs)]
+    return step.Call('sklearn.ensemble.RandomForestClassifier', **kwargs)
 
-def bll6_models(estimators, cv_search={}, transform_search={}):
+def bll6_models(estimators, cv_search={}, transform_search={}, dump_estimator=False):
     """
     Provides good defaults for transform_search to models()
     Args:
@@ -143,18 +104,20 @@ def bll6_models(estimators, cv_search={}, transform_search={}):
     transformd = dict(
             wic_sample_weight=[0],
             aggregations=aggregations.args,
-            outcome_expr=['max_bll0 >= 6']
+            outcome_expr='max_bll0 >= 6',
+            outcome_where_expr='max_bll0 == max_bll0' # this means max_bll0.notnull()
     )
     transformd.update(transform_search)
 
-    return models(estimators, cvd, transformd)
+    return models(make_list(estimators), cvd, transformd, dump_estimator=dump_estimator)
 
-def models(estimators, cv_search, transform_search):
+def models(estimators, cv_search, transform_search, dump_estimator):
     """
     Grid search prediction workflows. Used by bll6_models, test_models, and product_models.
     Args:
         estimators: collection of steps, each of which constructs an estimator
         cv_search: dictionary of arguments to LeadCrossValidate to search over
         transform_search: dictionary of arguments to LeadTransform to search over
+        dump_estimator: whether to dump the estimator.
     Returns: a list drain.model.Predict steps constructed by taking the product
         of the estimators with the the result of drain.util.dict_product on each of
@@ -172,14 +135,14 @@ def models(estimators, cv_search, transform_search):
         cv = lead.model.cv.LeadCrossValidate(**cv_args)
         cv.name = 'cv'
 
-        X_train = Call('__getitem__', inputs=[MapResults([cv], {'X':'obj', 'train':'key',
-                                                                'test':None, 'aux':None})])
-        mean = Call('mean', inputs=[X_train])
+        X_train = GetItem(GetItem(cv, 'X'), GetItem(cv, 'train'))
+        mean = Call(X_train, 'mean')
         mean.name = 'mean'
+        mean.target = True
 
-        X_impute = Construct(data.impute,
-                inputs=[MapResults([cv], {'aux':None, 'test':None, 'train':None}),
-                        MapResults([mean], 'value')])
+        X_impute = Call(data.impute,
+                inputs=[MapResults([GetItem(cv, 'X'), mean],
+                        ['X', 'value'])])
 
         cv_imputed = MapResults([X_impute, cv], ['X', {'X':None}])
         cv_imputed.target = True
@@ -189,12 +152,42 @@ def models(estimators, cv_search, transform_search):
             fit = model.Fit(inputs=[estimator, transform], return_estimator=True)
             fit.name = 'fit'
-
+
             y = model.Predict(inputs=[fit, transform], return_feature_importances=True)
             y.name = 'predict'
-            y.target = True
+
+            if dump_estimator:
+                fit.target = True
+
+                X_test = lead.model.data.LeadData(
+                        year_min=cv_args['year'],
+                        year_max=cv_args['year'],
+                        month=cv_args['month'],
+                        day=cv_args['day'],
+                        address=True)
+                # there are over 800k addresses
+                # to avoid running out of memory, we split into pieces for prediction
+                k = 4
+                pieces = list(map(str, range(k)))
+                X_split = Call(split, inputs=[MapResults([X_test], {'X':'df'})], k=k)
+                tohdf = ToHDF(inputs = [MapResults([X_split], [pieces])])
+                tohdf.target = True
+
+                ys = []
+                for j in pieces:
+                    X_impute = Call(data.impute,
+                            inputs=[MapResults(Call(tohdf, 'get', key=j), 'X'),
+                                    MapResults([mean], 'value')])
+
+                    y = model.Predict(inputs=[fit, MapResults([X_impute], 'X')])
+                    y.target = True
+                    ys.append(GetItem(y, 'y'))
+
+                # concatenate the pieces
+                y = Call(pd.concat, inputs=[MapResults([Call(args_list, inputs=ys)], 'objs')])
+                y.target = True
 
             steps.append(y)
-
+
     return steps
diff --git a/requirements.txt b/requirements.txt
index 2bb3cec..0ab5d57 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,2 @@
-drain
+drain==0.0.6
 git+https://github.com/potash/scikit-learn@merged/balanced-random-forest#egg=scikit-learn