-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgenerate.py
More file actions
139 lines (121 loc) · 5.18 KB
/
generate.py
File metadata and controls
139 lines (121 loc) · 5.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
from typing import List
import pandas as pd
import wget
import os
import xport.v56
from time import time
from nhutils.constants import *
# main driver function for generating dataset
def create_dataset(
vars: List[str], years: List[str], by: str = "SEQN", join_method: str = "outer", year_column: bool = True, output_excel_filename: str = None
) -> pd.DataFrame:
"""The main function to download data from Nhanes and merge together into single pandas dataframe.
All you have to pass is the variable names and years.
You can also specify the 'by' and 'join_method' parameters but default values are most likely what you want.
Args:
vars (List[str]):
list of all the variable names you want in the dataset (e.g. ['SEQN', 'DIQ010', 'RIDAGEYR'])
years (List[str]):
list of all the years you want to include in the dataset. Has to be in the format of 'YYYY-YYYY'
(e.g. ['2015-2016', '2017-2018'])
by (str, optional): Defaults to "SEQN".
the variable that is used to do the merging of data.
join_method (str, optional): Defaults to "outer".
the method used to merge data.
year_column (bool, optional): Defaults to True.
whether or not to include a column for the year.
output_excel_filename (str, optional): Defaults to None.
if given, will export returned dataset to excel file
Returns:
pd.DataFrame: the created dataset
"""
start = time()
vars = _preproccess_vars(vars)
years = _preproccess_years(years)
dataset = None
for year in years:
files_to_download = _get_filenames_to_download(vars, year)
_download_files(files_to_download, year)
print("merging files...")
year_dataset = None
for file in files_to_download:
vars_in_file = list(set(_find_all_vars_in_file(file, vars, year) + ["SEQN"]))
df = pd.read_csv(DOWNLOADED_DIR + file.replace(".XPT", ".csv"))
df = df[vars_in_file]
if file == files_to_download[0]:
year_dataset = df
else:
year_dataset = year_dataset.merge(df, on=by, how=join_method, suffixes=(False, False))
if year_column:
year_dataset['year'] = year
if year == years[0]:
dataset = year_dataset
else:
dataset = pd.concat([dataset, year_dataset], ignore_index=True)
print("done")
end = time()
print(f"finished creating dataset in {end - start} seconds")
# move SEQN to the front
col = dataset.pop('SEQN')
dataset.insert(0, col.name, col)
if output_excel_filename:
dataset.to_excel(output_excel_filename, index=False)
return dataset
# validate/preproccess years input
def _preproccess_years(years: List[str]) -> List[str]:
print("preproccessing years input...")
for year in years:
if year not in ALL_YEARS:
raise ValueError(f"{year} is not a valid year. Valid years are {ALL_YEARS}")
print("done")
return years
# validate/preproccess vars input
def _preproccess_vars(vars: List[str]) -> List[str]:
print("preproccessing vars input...")
vars = set(vars)
vars.add("SEQN")
vars = list(vars)
for var in vars:
if var not in ALL_VARS:
raise ValueError(f"{var} is not a valid variable. I could not find it.")
print("done")
return vars
# get list of filenames to download for a given year
def _get_filenames_to_download(vars: List[str], year: str) -> List[str]:
print(f"figuring out which files to download for {year}...")
var_file_map = globals()['VAR_TO_FILENAME_' + year.replace('-', '_')]
files_to_download = set()
for var in vars:
if not var == "SEQN" and var in var_file_map:
files_to_download.add(var_file_map[var])
print("done")
return list(files_to_download)
# downloads files and saves them as .csv to downloaded directory
def _download_files(files_to_download: List[str], year: str) -> None:
print("downloading files...")
BASE_URL = "https://wwwn.cdc.gov/Nchs/Nhanes/"
# make sure downloaded directory exists
if not os.path.exists(DOWNLOADED_DIR):
os.makedirs(DOWNLOADED_DIR)
for file in files_to_download:
# check to see if file is already downloaded
if os.path.exists(DOWNLOADED_DIR + file.replace('.XPT', '.csv')):
continue
# download the file
url = BASE_URL + year + '/' + file
filename = wget.download(url, out=DOWNLOADED_DIR)
# convert to csv
with open(DOWNLOADED_DIR + file, 'rb') as f:
library = xport.v56.load(f)
ds = next(iter(library.values()))
output_filename = DOWNLOADED_DIR + file.replace(".XPT", ".csv")
ds.to_csv(output_filename, index=False)
print("\ndone")
# find all vars that are in given file
def _find_all_vars_in_file(file: str, vars: List[str], year: str) -> List[str]:
var_file_map = globals()['VAR_TO_FILENAME_' + year.replace('-', '_')]
vars_in_file = []
for var in vars:
if var in var_file_map and var_file_map[var] == file:
vars_in_file.append(var)
return vars_in_file