Skip to content
Draft
123 changes: 120 additions & 3 deletions weather_dl/download_pipeline/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,18 @@
import typing as t
import urllib3
import warnings

import shutil
import cdsapi
from ecmwfapi import ECMWFService
from .util import retry_with_exponential_backoff
from ecmwfapi import ECMWFService, api
from .config import Config
from urllib.parse import urljoin
from urllib.request import (
Request,
urlopen,
)
from contextlib import closing
from apache_beam.io.gcp.gcsio import DEFAULT_READ_BUFFER_SIZE

warnings.simplefilter(
"ignore", category=urllib3.connectionpool.InsecureRequestWarning)
Expand Down Expand Up @@ -59,6 +67,16 @@ def num_requests_per_key(self, dataset: str) -> int:
"""Specifies the number of workers to be used per api key for the dataset."""
pass

@abc.abstractmethod
def fetch(self, dataset: str, selection: t.Dict) -> t.Dict:
"""Fetch data from data source."""
pass

@abc.abstractmethod
def download(self, dataset: str, result: t.Dict, output: str) -> None:
"""Download from data source."""
pass

@property
@abc.abstractmethod
def license_url(self):
Expand Down Expand Up @@ -102,6 +120,12 @@ def __init__(self, config: Config, level: int = logging.INFO) -> None:
def retrieve(self, dataset: str, selection: t.Dict, target: str) -> None:
self.c.retrieve(dataset, selection, target)

def fetch(self, dataset: str, selection: t.Dict) -> None:
raise NotImplementedError()

def download(self, dataset: str, result: t.Dict, output: str) -> None:
raise NotImplementedError()

@property
def license_url(self):
return 'https://cds.climate.copernicus.eu/api/v2/terms/static/licence-to-use-copernicus-products.pdf'
Expand Down Expand Up @@ -152,6 +176,85 @@ def __exit__(self, exc_type, exc_value, traceback):
self._redirector.__exit__(exc_type, exc_value, traceback)


class SplitMARSRequest(api.APIRequest):
"""Extended MARS APIRequest class that separates fetch and download stage."""
@retry_with_exponential_backoff
def _download(self, url, path: str, size: int) -> None:
existing_size = 0
req = Request(url)

if os.path.exists(path):
mode = "ab"
existing_size = os.path.getsize(path)
req.add_header("Range", "bytes=%s-" % existing_size)
else:
mode = "wb"

self.log(
"Transfering %s into %s" % (self._bytename(size), path)
)
self.log("From %s" % (url,))

with open(path, mode) as f:
with closing(urlopen(req)) as http:
shutil.copyfileobj(http, f, DEFAULT_READ_BUFFER_SIZE)

def fetch(self, request: t.Dict) -> t.Dict:
status = None

self.connection.submit("%s/%s/requests" % (self.url, self.service), request)
self.log("Request submitted")
self.log("Request id: " + self.connection.last.get("name"))
if self.connection.status != status:
status = self.connection.status
self.log("Request is %s" % (status,))

while not self.connection.ready():
if self.connection.status != status:
status = self.connection.status
self.log("Request is %s" % (status,))
self.connection.wait()

if self.connection.status != status:
status = self.connection.status
self.log("Request is %s" % (status,))

result = self.connection.result()
return result

def download(self, result: t.Dict, target: t.Optional[str] = None) -> None:
if target:
if os.path.exists(target):
# Empty the target file, if it already exists, otherwise the
# transfer below might be fooled into thinking we're resuming
# an interrupted download.
open(target, "w").close()

self._download(urljoin(self.url, result["href"]), target, result["size"])
self.connection.cleanup()


class MARSECMWFServiceExtended(ECMWFService):
"""Extended MARS ECMFService class that separates fetch and download stage."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Comment on lines +239 to +240
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, this can be omitted.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be needed, see comment below.

self.c = SplitMARSRequest(
self.url,
"services/%s" % (self.service,),
email=self.email,
key=self.key,
log=self.log,
verbose=self.verbose,
quiet=self.quiet,
)

def fetch(self, req: t.Dict) -> t.Dict:
return self.c.fetch(req)

def download(self, res: t.Dict, target: str) -> None:
self.c.download(res, target)


class MarsClient(Client):
"""A client to access data from the Meteorological Archival and Retrieval System (MARS).

Expand All @@ -174,7 +277,7 @@ class MarsClient(Client):

def __init__(self, config: Config, level: int = logging.INFO) -> None:
super().__init__(config, level)
self.c = ECMWFService(
self.c = MARSECMWFServiceExtended(
"mars",
key=config.kwargs.get('api_key', os.environ.get("MARSAPI_KEY")),
url=config.kwargs.get('api_url', os.environ.get("MARSAPI_URL")),
Expand All @@ -187,6 +290,14 @@ def retrieve(self, dataset: str, selection: t.Dict, output: str) -> None:
with StdoutLogger(self.logger, level=logging.DEBUG):
self.c.execute(req=selection, target=output)

def fetch(self, dataset: str, selection: t.Dict) -> t.Dict:
with StdoutLogger(self.logger, level=logging.DEBUG):
return self.c.fetch(req=selection)

def download(self, dataset: str, result: t.Dict, output: str) -> None:
with StdoutLogger(self.logger, level=logging.DEBUG):
self.c.download(res=result, target=output)

@property
def license_url(self):
return 'https://apps.ecmwf.int/datasets/licences/general/'
Expand Down Expand Up @@ -214,6 +325,12 @@ def retrieve(self, dataset: str, selection: t.Dict, output: str) -> None:
with open(output, 'w') as f:
json.dump({dataset: selection}, f)

def fetch(self, dataset: str, selection: t.Dict) -> None:
raise NotImplementedError()

def download(self, dataset: str, result: t.Dict, output: str) -> None:
raise NotImplementedError()

@property
def license_url(self):
return 'lorem ipsum'
Expand Down
Loading