Skip to content

Commit 4808f1d

Browse files
committed
add ResourceOpts, a pydantic basemodel replacing kwargs on Resource
1 parent 3f1cba2 commit 4808f1d

File tree

8 files changed

+163
-111
lines changed

8 files changed

+163
-111
lines changed

src/pyff/builtins.py

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@
2121
from six.moves.urllib_parse import quote_plus, urlparse
2222

2323
from pyff.pipes import registry
24-
2524
from .constants import NS
2625
from .decorators import deprecated
2726
from .exceptions import MetadataException
2827
from .logs import get_log
2928
from .pipes import PipeException, PipelineCallback, Plumbing, pipe
29+
from .resource import ResourceOpts
3030
from .samlmd import (
3131
annotate_entity,
3232
discojson_t,
@@ -352,7 +352,7 @@ def _pipe(req, *opts):
352352

353353

354354
@pipe
355-
def when(req, condition, *values):
355+
def when(req: Plumbing.Request, condition: str, *values):
356356
"""
357357
Conditionally execute part of the pipeline.
358358
@@ -377,6 +377,8 @@ def when(req, condition, *values):
377377
followed. If 'bar' is present in the state with the value 'bill' then the other branch is followed.
378378
"""
379379
c = req.state.get(condition, None)
380+
if c is None:
381+
log.debug(f'Condition {repr(condition)} not present in state {req.state}')
380382
if c is not None and (not values or _any(values, c)):
381383
return Plumbing(pipeline=req.args, pid="%s.when" % req.plumbing.id).iprocess(req)
382384
return req.t
@@ -628,32 +630,37 @@ def load(req, *opts):
628630
)
629631

630632
url = r.pop(0)
631-
params = {"via": [], "cleanup": [], "verify": None, "as": url}
633+
634+
# Copy parent node opts as a starting point
635+
child_opts = req.md.rm.opts.copy(update={"via": [], "cleanup": [], "verify": None, "as": url})
632636

633637
while len(r) > 0:
634638
elt = r.pop(0)
635639
if elt in ("as", "verify", "via", "cleanup"):
640+
# These elements have an argument
636641
if len(r) > 0:
637-
if elt in ("via", "cleanup"):
638-
params[elt].append(r.pop(0))
642+
value = r.pop(0)
643+
if elt == "as":
644+
child_opts.alias = value
645+
elif elt == "verify":
646+
child_opts.verify = value
647+
elif elt == "via":
648+
child_opts.via.append(PipelineCallback(value, req, store=req.md.store))
649+
elif elt == "cleanup":
650+
child_opts.cleanup.append(PipelineCallback(value, req, store=req.md.store))
639651
else:
640-
params[elt] = r.pop(0)
652+
raise ValueError(f'Unhandled resource option {elt}')
641653
else:
642654
raise PipeException(
643655
"Usage: load resource [as url] [[verify] verification] [via pipeline]* [cleanup pipeline]*"
644656
)
645657
else:
646-
params['verify'] = elt
647-
648-
if params['via'] is not None:
649-
params['via'] = [PipelineCallback(p, req, store=req.md.store) for p in params['via']]
650-
651-
if params['cleanup'] is not None:
652-
params['cleanup'] = [PipelineCallback(p, req, store=req.md.store) for p in params['cleanup']]
658+
child_opts.verify = elt
653659

654-
params.update(opts)
660+
# override anything in child_opts with what is in opts
661+
child_opts = child_opts.copy(update=opts)
655662

656-
req.md.rm.add_child(url, **params)
663+
req.md.rm.add_child(url, child_opts)
657664

658665
log.debug("Refreshing all resources")
659666
req.md.rm.reload(fail_on_error=bool(opts['fail_on_error']))

src/pyff/parse.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from .constants import NS
99
from .logs import get_log
10-
from .resource import Resource
10+
from .resource import Resource, ResourceOpts
1111
from .utils import find_matching_files, parse_xml, root, unicode_stream, utc_now
1212

1313
__author__ = 'leifj'
@@ -73,7 +73,8 @@ def parse(self, resource: Resource, content: str):
7373
info['Expiration Time'] = 'never expires'
7474
n = 0
7575
for fn in find_matching_files(content, self.extensions):
76-
resource.add_child("file://" + fn)
76+
child_opts = resource.opts.copy(update={'alias': None})
77+
resource.add_child("file://" + fn, child_opts)
7778
n += 1
7879

7980
if n == 0:
@@ -112,7 +113,8 @@ def parse(self, resource: Resource, content: str) -> Mapping[str, Any]:
112113
if len(fingerprints) > 0:
113114
fp = fingerprints[0]
114115
log.debug("XRD: {} verified by {}".format(link_href, fp))
115-
resource.add_child(link_href, verify=fp)
116+
child_opts = resource.opts.copy(update={'alias': None})
117+
resource.add_child(link_href, child_opts)
116118
resource.last_seen = utc_now().replace(microsecond=0)
117119
resource.expire_time = None
118120
resource.never_expires = True

src/pyff/pipes.py

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@
44
"""
55
from __future__ import annotations
66

7+
import functools
78
import os
89
import traceback
9-
import functools
10-
from typing import Any, Dict, Optional, Callable, Type, Tuple
10+
from typing import Any, Callable, Dict, List, Optional
11+
from typing import Type
1112

1213
import yaml
1314
from apscheduler.schedulers.background import BackgroundScheduler
@@ -140,14 +141,15 @@ class PipelineCallback(object):
140141

141142
def __init__(self, entry_point, req, store=None):
142143
self.entry_point = entry_point
143-
self.plumbing = Plumbing(
144-
req.scope_of(entry_point).plumbing.pipeline, "%s-via-%s" % (req.plumbing.id, entry_point)
145-
)
144+
self.plumbing = Plumbing(req.scope_of(entry_point).plumbing.pipeline, f"{req.plumbing.id}-via-{entry_point}")
146145
self.req = req
147146
self.store = store
148147

149148
def __str__(self) -> str:
150-
return f"<PipelineCallback to {self.req.plumbing}]>"
149+
return f"<PipelineCallback to {self.req.plumbing}>"
150+
151+
def __repr__(self) -> str:
152+
return str(self)
151153

152154
def __copy__(self):
153155
return self
@@ -200,19 +202,19 @@ class Plumbing(object):
200202
would then be signed (using signer.key) and finally published in /var/metadata/public/metadata.xml
201203
"""
202204

203-
def __init__(self, pipeline, pid):
205+
def __init__(self, pipeline: List[Dict[str, Any]], pid: str):
204206
self._id = pid
205207
self.pipeline = pipeline
206208

207209
def to_json(self):
208210
return self.pipeline
209211

210212
@property
211-
def id(self):
213+
def id(self) -> str:
212214
return self._id
213215

214216
@property
215-
def pid(self):
217+
def pid(self) -> str:
216218
return self._id
217219

218220
def __iter__(self):
@@ -304,7 +306,9 @@ def iprocess(self, req: Plumbing.Request):
304306
try:
305307
pipefn, opts, name, args = load_pipe(p)
306308
log.debug(
307-
"{!s}: calling '{}' using args: {} and opts: {}".format(self.pipeline, name, repr(args), repr(opts))
309+
"{!s}: calling '{}' using args:\n{} and opts:\n{}".format(
310+
self.pipeline, name, repr(args), repr(opts)
311+
)
308312
)
309313
if is_text(args):
310314
args = [args]
@@ -326,7 +330,16 @@ def iprocess(self, req: Plumbing.Request):
326330
break
327331
return req.t
328332

329-
def process(self, md, args=None, state=None, t=None, store=None, raise_exceptions=True, scheduler=None):
333+
def process(
334+
self,
335+
md: MDRepository,
336+
args=None,
337+
state: Optional[Dict[str, Any]] = None,
338+
t=None,
339+
store=None,
340+
raise_exceptions: bool = True,
341+
scheduler=None,
342+
):
330343
"""
331344
The main entrypoint for processing a request pipeline. Calls the inner processor.
332345

src/pyff/repo.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from .constants import config
44
from .logs import get_log
5-
from .resource import IconHandler, Resource
5+
from .resource import IconHandler, Resource, ResourceOpts
66
from .samlmd import entitiesdescriptor, root
77
from .store import make_icon_store_instance, make_store_instance
88
from .utils import is_text, make_default_scheduler
@@ -15,7 +15,7 @@ class MDRepository:
1515

1616
def __init__(self, scheduler=None):
1717
random.seed(self)
18-
self.rm = Resource() # root
18+
self.rm = Resource(url=None, opts=ResourceOpts()) # root
1919
if scheduler is None:
2020
scheduler = make_default_scheduler()
2121
scheduler.start()

src/pyff/resource.py

Lines changed: 48 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,25 @@
88
import os
99
import traceback
1010
from collections import deque
11-
from copy import deepcopy
1211
from datetime import datetime
1312
from threading import Condition, Lock
14-
from typing import Any, Callable, Deque, Dict, Optional, Tuple
13+
from typing import Any, Callable, Deque, Dict, List, Optional, TYPE_CHECKING, Tuple
1514
from urllib.parse import quote as urlescape
1615

1716
import requests
1817
from requests.adapters import Response
18+
from pydantic import BaseModel, Field
1919

2020
from .constants import config
2121
from .exceptions import ResourceException
2222
from .fetch import make_fetcher
2323
from .logs import get_log
2424
from .utils import Watchable, hex_digest, img_to_data, non_blocking_lock, resource_string, safe_write, url_get, utc_now
2525

26+
if TYPE_CHECKING:
27+
from .parse import PyffParser
28+
from .pipes import PipelineCallback
29+
2630
requests.packages.urllib3.disable_warnings()
2731

2832
log = get_log(__name__)
@@ -131,11 +135,32 @@ def i_handle(self, t: Resource, url=None, response=None, exception=None, last_fe
131135
t.info['Exception'] = ex
132136

133137

138+
class ResourceOpts(BaseModel):
139+
alias: Optional[str] = Field(None, alias='as') # TODO: Rename to 'name'?
140+
# a certificate (file) or a SHA1 fingerprint to use for signature verification
141+
verify: Optional[str] = None
142+
via: List[Any] = Field([]) # list of PipelineCallback
143+
# A list of PipelineCallback that can be used to pre-process parsed metadata before validation. Use as a clue-bat.
144+
cleanup: List[Any] = Field([]) # list of PipelineCallback
145+
fail_on_error: bool = False
146+
# remove invalid EntityDescriptor elements rather than raise an error
147+
filter_invalid: bool = True
148+
# set to False to turn off all XML schema validation
149+
validate_schema: bool = Field(True, alias='validate')
150+
151+
def to_dict(self) -> Dict[str, Any]:
152+
res = self.dict()
153+
# Compensate for the 'alias' field options
154+
res['as'] = res.pop('alias')
155+
res['validate'] = res.pop('validate_schema')
156+
return res
157+
158+
134159
class Resource(Watchable):
135-
def __init__(self, url=None, **kwargs):
160+
def __init__(self, url: Optional[str], opts: ResourceOpts):
136161
super().__init__()
137162
self.url: str = url
138-
self.opts = kwargs
163+
self.opts = opts
139164
self.t = None
140165
self.type = "text/plain"
141166
self.etag = None
@@ -148,12 +173,6 @@ def __init__(self, url=None, **kwargs):
148173
self._setup()
149174

150175
def _setup(self):
151-
self.opts.setdefault('cleanup', [])
152-
self.opts.setdefault('via', [])
153-
self.opts.setdefault('fail_on_error', False)
154-
self.opts.setdefault('verify', None)
155-
self.opts.setdefault('filter_invalid', True)
156-
self.opts.setdefault('validate', True)
157176
if self.url is not None:
158177
if "://" not in self.url:
159178
pth = os.path.abspath(self.url)
@@ -178,20 +197,20 @@ def local_copy_fn(self):
178197
return os.path.join(config.local_copy_dir, urlescape(self.url))
179198

180199
@property
181-
def post(self):
182-
return self.opts['via']
200+
def post(self) -> List['PipelineCallback']:
201+
return self.opts.via
183202

184-
def add_via(self, callback):
185-
self.opts['via'].append(callback)
203+
def add_via(self, callback: 'PipelineCallback') -> None:
204+
self.opts.via.append(callback)
186205

187206
@property
188-
def cleanup(self):
189-
return self.opts['cleanup']
207+
def cleanup(self) -> List['PipelineCallback']:
208+
return self.opts.cleanup
190209

191210
def __str__(self):
192211
return "Resource {} expires at {} using ".format(
193212
self.url if self.url is not None else "(root)", self.expire_time
194-
) + ",".join(["{}={}".format(k, v) for k, v in list(self.opts.items())])
213+
) + ",".join(["{}={}".format(k, v) for k, v in sorted(list(self.opts.dict().items()))])
195214

196215
def reload(self, fail_on_error=False):
197216
with non_blocking_lock(self.lock):
@@ -252,25 +271,24 @@ def _replace(self, r: Resource) -> None:
252271
return
253272
raise ValueError("Resource {} not present - use add_child".format(r.url))
254273

255-
def add_child(self, url: str, **kwargs) -> Resource:
256-
opts = deepcopy(self.opts)
257-
if 'as' in opts:
258-
del opts['as']
259-
opts.update(kwargs)
260-
r = Resource(url, **opts)
274+
def add_child(self, url: str, opts: ResourceOpts) -> Resource:
275+
r = Resource(url, opts)
261276
if r in self.children:
277+
log.debug(f'\n\n{self}:\nURL {url}\nReplacing child {r}')
262278
self._replace(r)
263279
else:
280+
log.debug(f'\n\n{self}:\nURL {url}\nAdding child {r}')
281+
if not r.opts.via:
282+
log.debug('Empty Via')
264283
self.children.append(r)
265284

266285
return r
267286

268287
@property
269288
def name(self) -> Optional[str]:
270-
if 'as' in self.opts:
271-
return self.opts['as']
272-
else:
273-
return self.url
289+
if self.opts.alias:
290+
return self.opts.alias
291+
return self.url
274292

275293
@property
276294
def info(self):
@@ -359,7 +377,7 @@ def load_resource(self, getter: Callable[[str], Response]) -> Tuple[Optional[str
359377

360378
return data, status, info
361379

362-
def parse(self, getter):
380+
def parse(self, getter: Callable[[str], Response]) -> Deque[Resource]:
363381
data, status, info = self.load_resource(getter)
364382
info['State'] = 'Parsing'
365383
# local import to avoid circular import
@@ -375,10 +393,10 @@ def parse(self, getter):
375393

376394
info['State'] = 'Parsed'
377395
if self.t is not None:
378-
if self.post and isinstance(self.post, list):
396+
if self.post:
379397
for cb in self.post:
380398
if self.t is not None:
381-
self.t = cb(self.t, **self.opts)
399+
self.t = cb(self.t, self.opts.dict())
382400

383401
if self.is_expired():
384402
info['Expired'] = True

0 commit comments

Comments
 (0)