Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 35 additions & 7 deletions openaddr/conform.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,20 @@ def decompress(self, source_paths, workdir, filenames):

# Extract contents of zip file into expand_path directory.
for source_path in source_paths:
with ZipFile(source_path, 'r') as z:
for name in z.namelist():
if len(filenames) and not is_in(name, filenames):
# Download only the named file, if any.
_L.debug("Skipped file {}".format(name))
continue
self._extract_zip(source_path, expand_path, filenames)

# Recursively extract nested zip files, but fail if more than one zip
# appears at the same directory level.
processed = set()
pending = list(self._find_single_zips(expand_path))

z.extract(name, expand_path)
while pending:
zip_path = pending.pop()
if zip_path in processed:
continue
processed.add(zip_path)
self._extract_zip(zip_path, os.path.dirname(zip_path), filenames)
pending.extend(self._find_single_zips(os.path.dirname(zip_path)))

# Collect names of directories and files in expand_path directory.
for (dirpath, dirnames, filenames) in os.walk(expand_path):
Expand All @@ -230,6 +236,28 @@ def decompress(self, source_paths, workdir, filenames):

return output_files

def _extract_zip(self, source_path, expand_path, filenames):
with ZipFile(source_path, 'r') as z:
for name in z.namelist():
if len(filenames) and not is_in(name, filenames):
# Download only the named file, if any.
_L.debug("Skipped file {}".format(name))
continue

z.extract(name, expand_path)

def _find_single_zips(self, root_path):
zip_paths = []
for dirpath, _, file_names in os.walk(root_path):
zip_files = [name for name in file_names if name.lower().endswith('.zip')]
if len(zip_files) > 1:
raise DecompressionError(
"Multiple nested zip files found in {}".format(dirpath)
)
for zip_name in zip_files:
zip_paths.append(os.path.join(dirpath, zip_name))
return zip_paths

class GzipDecompressTask(DecompressionTask):
def decompress(self, source_paths, workdir, filenames):
output_files = []
Expand Down