Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 58 additions & 14 deletions zipline/data/bundles/csvdir.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""
Module for building a complete dataset from local directory with csv files.
V1 initial version
v2 ability to set Exchange and AssetType
"""
import os
import sys
Expand All @@ -12,6 +14,7 @@
from zipline.utils.cli import maybe_show_progress

logger = logbook.Logger(__name__)
exchanges =['SMART']


def csvdir_equities(tframes=['daily'], start=None, end=None):
Expand Down Expand Up @@ -47,7 +50,22 @@ def csvdir_equities(tframes=['daily'], start=None, end=None):
following structure:
daily/<symbol>.cvs files
minute/<symbol>.csv files

or
daily/<symbol>_<exchange>.cvs files
minute/<symbol>_<exchange>.csv files

or
daily/<symbol>_<exchange>_<assettype>.cvs files
minute/<symbol>_<exchange>_<assettype>.csv files

for each symbol.

One can add the exchange and asset type only when needed, eg. GLD needs
ARCA to trade or to get pricing data from IB instead of the default SMART.
VIX needs CBOE as exchange and IND as asset type to get data.
Exchanges are also mandatory for futures (not supported yet)

"""

return CSVDIRBundle(tframes, start, end).ingest
Expand All @@ -71,6 +89,12 @@ def __init__(self, tframes, start, end):

self.splits = None
self.dividends = None
self.default_exchange = "CSVDIR"
self.default_asset_type = "STK"
self.exchanges = {}
self.assettypes = {}
self.csvfiles = {}


def ingest(self, environ, asset_db_writer, minute_bar_writer,
daily_bar_writer, adjustment_writer, calendar, start_session,
Expand All @@ -92,10 +116,22 @@ def ingest(self, environ, asset_db_writer, minute_bar_writer,

for tframe in self.tframes:
ddir = os.path.join(csvdir, tframe)

self.symbols = sorted(item.split('.csv')[0]
for item in os.listdir(ddir)
if item.endswith('.csv'))
self.symbols =[]
self.csvfiles={}
for item in os.listdir(ddir):
if item.endswith('.csv'):
file = item
symbol = item.split('.csv')[0].split('_')
if len(symbol)>1:
#The exchange was in the file name
self.exchanges[symbol[0]]=symbol[1]
if len(symbol)>2:
self.assettypes[symbol[0]]=symbol[2]
symbol=symbol[0]
self.symbols.append(symbol)
self.csvfiles[symbol]=file
self.symbols.sort()

if not self.symbols:
logger.error("no <symbol>.csv files found in %s" % ddir)
sys.exit(1)
Expand All @@ -105,7 +141,10 @@ def ingest(self, environ, asset_db_writer, minute_bar_writer,
dtype = [('start_date', 'datetime64[ns]'),
('end_date', 'datetime64[ns]'),
('auto_close_date', 'datetime64[ns]'),
('symbol', 'object')]
('symbol', 'object'),
('exchange','object'),
('exchange_full', 'object')]
#TODO ASSET TYPE
self.metadata = DataFrame(empty(len(self.symbols), dtype=dtype))

self.show_progress = show_progress
Expand All @@ -117,10 +156,10 @@ def ingest(self, environ, asset_db_writer, minute_bar_writer,

writer.write(self._pricing_iter(), show_progress=show_progress)

# Hardcode the exchange to "CSVDIR" for all assets and (elsewhere)
# register "CSVDIR" to resolve to the NYSE calendar, because these
# are all equities and thus can use the NYSE calendar.
self.metadata['exchange'] = "CSVDIR"
# # Hardcode the exchange to "CSVDIR" for all assets and (elsewhere)
# # register "CSVDIR" to resolve to the NYSE calendar, because these
# # are all equities and thus can use the NYSE calendar.
# self.metadata['exchange'] = "CSVDIR"

asset_db_writer.write(equities=self.metadata)

Expand All @@ -132,7 +171,7 @@ def _pricing_iter(self):
for sid, symbol in enumerate(it):
logger.debug('%s: sid %s' % (symbol, sid))

dfr = read_csv(os.path.join(self.csvdir, '%s.csv' % symbol),
dfr = read_csv(os.path.join(self.csvdir, self.csvfiles[symbol]),
parse_dates=[0], infer_datetime_format=True,
index_col=0).sort_index()

Expand All @@ -143,8 +182,10 @@ def _pricing_iter(self):

# The auto_close date is the day after the last trade.
ac_date = end_date + Timedelta(days=1)
self.metadata.iloc[sid] = start_date, end_date, ac_date, symbol

exchange = self.default_exchange if symbol not in self.exchanges else self.exchanges[symbol]
exchange_full = self.default_asset_type if symbol not in self.assettypes else self.assettypes[symbol]
self.metadata.iloc[sid] = start_date, end_date, ac_date, symbol, exchange, exchange_full

if 'split' in dfr.columns:
if self.splits is None:
self.splits = DataFrame()
Expand Down Expand Up @@ -179,6 +220,9 @@ def _pricing_iter(self):
self.dividends = self.dividends.append(div)

yield sid, dfr

exchanges.extend(list(set(self.exchanges.values())))


register_calendar_alias("CSVDIR", "NYSE")
exchanges = list(set(exchanges))
for exchange in exchanges:
register_calendar_alias(exchange, "NYSE")