Skip to content
This repository was archived by the owner on Dec 1, 2018. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ __pycache__
*.pyo
build/
dist/
.idea/
tmp/
Pipfile*
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
* `Query.watch` learned to carry forward all query parameters
* `APIObject` learned `watch` to enable per-object watches
* `Deployment` learned to roll back using `rollout_undo` similar to `kubectl rollout undo deployment`
* Added `include_uninitialized` filter option (sent as the `includeUninitialized` query parameter) to include partially initialized resources in the response
* `Query` can now receive data by chunks (to perform consistent reads across a large list)

## 0.14.0

Expand Down
21 changes: 21 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,27 @@ Selector query:
pending_pods = pykube.objects.Pod.objects(api).filter(
field_selector={"status.phase": "Pending"}
)
uninitialized_pods = pykube.objects.Pod.objects(api).filter(
selector={"environment": "production", "tier": "frontend"},
include_uninitialized=True,
)

Consistent reads across a large list:

.. code:: python

# Fetch all pods by chunks of size 10
pods = pykube.Pod.objects(api).all().limit(10).iterator()

# Pagination
first_pods = pykube.Pod.objects(api).all().limit(10)
for pod, offset in first_pods.paginate():
pass
second_pods = first_pods.offset(offset)
for pod, offset in second_pods.paginate():
pass
for pod, offset in second_pods.paginate(offset):
pass

Watch query:

Expand Down
182 changes: 138 additions & 44 deletions pykube/query.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import json

from collections import namedtuple
import collections
import json

from six import string_types
from six import string_types, class_types
from six.moves.urllib.parse import urlencode

from .exceptions import ObjectDoesNotExist
Expand All @@ -13,53 +13,113 @@
now = object()


def returns_clone(cls_or_func):
    """Decorator for query mutators: apply the mutation to a fresh clone
    and return the clone, leaving the original query untouched.

    May be used bare (``@returns_clone``) or with a class argument
    (``@returns_clone(SomeQueryClass)``), in which case the clone is
    created as an instance of that class via ``self._clone(cls)``.
    """
    target_cls = None

    def wrap(func):
        def inner(self, *args, **kwargs):
            # Mutate a copy, never the receiver.
            copy = self._clone(target_cls)
            func(copy, *args, **kwargs)
            return copy
        return inner

    if isinstance(cls_or_func, class_types):
        # Called with a class argument: act as a decorator factory.
        target_cls = cls_or_func
        return wrap

    # Called directly on the function: decorate it now.
    return wrap(cls_or_func)


# Named tuple yielded by WatchQuery iteration: the raw event type string
# from the API stream plus the deserialized API object.
WatchEvent = collections.namedtuple("WatchEvent", "type object")


class BaseQuery(object):

def __init__(self, api, api_obj_class, namespace=None):
self.api = api
self.api_obj_class = api_obj_class
self.namespace = namespace
self.selector = everything
self.field_selector = everything

self._namespace = namespace
self._selector = everything
self._field_selector = everything
self._include_uninitialized = None

    def all(self):
        """Return a clone of this query with no additional filtering applied."""
        return self._clone()

def filter(self, namespace=None, selector=None, field_selector=None):
clone = self._clone()
@returns_clone
def filter(self, namespace=None, selector=None, field_selector=None,
include_uninitialized=None):
"""
:rtype: self
"""
if namespace is not None:
clone.namespace = namespace
self._namespace = namespace
if selector is not None:
clone.selector = selector
self._selector = selector
if field_selector is not None:
clone.field_selector = field_selector
return clone
self._field_selector = field_selector
if include_uninitialized is not None:
self._include_uninitialized = include_uninitialized

def _clone(self, cls=None):
if cls is None:
cls = self.__class__
clone = cls(self.api, self.api_obj_class, namespace=self.namespace)
clone.selector = self.selector
clone.field_selector = self.field_selector
clone = cls(self.api, self.api_obj_class, namespace=self._namespace)
clone._selector = self._selector
clone._field_selector = self._field_selector
clone._include_uninitialized = self._include_uninitialized
return clone

def _build_api_url(self, params=None):
def _collect_params(self, params=None):
if params is None:
params = {}
if self.selector is not everything:
params["labelSelector"] = as_selector(self.selector)
if self.field_selector is not everything:
params["fieldSelector"] = as_selector(self.field_selector)
query_string = urlencode(params)
return "{}{}".format(self.api_obj_class.endpoint, "?{}".format(query_string) if query_string else "")
if self._selector is not everything:
# noinspection PyTypeChecker
params["labelSelector"] = as_selector(self._selector)
if self._field_selector is not everything:
# noinspection PyTypeChecker
params["fieldSelector"] = as_selector(self._field_selector)
if self._include_uninitialized is not None:
params["includeUninitialized"] = \
"true" if self._include_uninitialized else "false"
return params

def _build_api_url(self, params=None):
return "{}{}".format(
self.api_obj_class.endpoint,
"?{}".format(urlencode(params)) if params else "",
)


class Query(BaseQuery):

    def __init__(self, *args, **kwargs):
        """List query supporting chunked reads via limit/continue tokens."""
        super(Query, self).__init__(*args, **kwargs)

        # Chunked-read state: page size and the server-issued continue token.
        self._limit = None
        self._continue = None

        # Cached response/objects, filled lazily by the query_cache property.
        self._query_cache = None

def _clone(self, cls=None):
clone = super(Query, self)._clone(cls)
clone._limit = self._limit
clone._continue = self._continue
return clone

def _collect_params(self, params=None):
params = super(Query, self)._collect_params(params)
if self._limit:
params["limit"] = self._limit
if self._continue:
params["continue"] = self._continue
return params

def get_by_name(self, name):
kwargs = {
"url": "{}/{}".format(self.api_obj_class.endpoint, name),
"namespace": self.namespace,
"namespace": self._namespace,
}
if self.api_obj_class.base:
kwargs["base"] = self.api_obj_class.base
Expand Down Expand Up @@ -89,40 +149,67 @@ def get_or_none(self, *args, **kwargs):
except ObjectDoesNotExist:
return None

    @returns_clone
    def limit(self, lim):
        """Return a clone that requests at most *lim* items per response chunk."""
        self._limit = lim

    @returns_clone
    def offset(self, off):
        """Return a clone that resumes listing from continue token *off*."""
        self._continue = off

def watch(self, since=None):
query = self._clone(WatchQuery)
if since is now:
query.resource_version = self.response["metadata"]["resourceVersion"]
# noinspection PyTypeChecker
query._resource_version = \
self.response["metadata"]["resourceVersion"]
elif since is not None:
query.resource_version = since
query._resource_version = since
return query

def execute(self):
kwargs = {"url": self._build_api_url()}
kwargs = {"url": self._build_api_url(params=self._collect_params())}
if self.api_obj_class.base:
kwargs["base"] = self.api_obj_class.base
if self.api_obj_class.version:
kwargs["version"] = self.api_obj_class.version
if self.namespace is not None and self.namespace is not all_:
kwargs["namespace"] = self.namespace
if self._namespace is not None and self._namespace is not all_:
kwargs["namespace"] = self._namespace
r = self.api.get(**kwargs)
r.raise_for_status()
return r

def _fetch_chunk(self):
resp = self.execute().json()
self._continue = resp["metadata"].get("continue")
for obj in resp["items"]:
yield self.api_obj_class(self.api, obj)

def iterator(self):
"""
Execute the API request and return an iterator over the objects. This
method does not use the query cache.
"""
for obj in (self.execute().json().get("items") or []):
yield self.api_obj_class(self.api, obj)
while True:
for item in self._fetch_chunk():
yield item
if not self._continue:
break

def paginate(self, offset=None):
query = self.offset(offset) if offset else self
for item in query._fetch_chunk():
yield item, self._continue

@property
def query_cache(self):
if not hasattr(self, "_query_cache"):
cache = {"objects": []}
cache["response"] = self.execute().json()
for obj in (cache["response"].get("items") or []):
if not self._query_cache:
cache = {
"objects": [],
"response": self.execute().json(),
}
# noinspection PyTypeChecker
for obj in cache["response"]["items"]:
cache["objects"].append(self.api_obj_class(self.api, obj))
self._query_cache = cache
return self._query_cache
Expand All @@ -141,27 +228,33 @@ def response(self):
class WatchQuery(BaseQuery):

def __init__(self, *args, **kwargs):
self.resource_version = kwargs.pop("resource_version", None)
self._resource_version = kwargs.pop("resource_version", None)
super(WatchQuery, self).__init__(*args, **kwargs)

def _collect_params(self, params=None):
params = super(WatchQuery, self)._collect_params()
params["watch"] = "true"
if self._resource_version is not None:
params["resourceVersion"] = self._resource_version
return params

def object_stream(self):
params = {"watch": "true"}
if self.resource_version is not None:
params["resourceVersion"] = self.resource_version
kwargs = {
"url": self._build_api_url(params=params),
"url": self._build_api_url(params=self._collect_params()),
"stream": True,
}
if self.namespace is not all_:
kwargs["namespace"] = self.namespace
if self._namespace is not all_:
kwargs["namespace"] = self._namespace
if self.api_obj_class.version:
kwargs["version"] = self.api_obj_class.version
r = self.api.get(**kwargs)
self.api.raise_for_status(r)
WatchEvent = namedtuple("WatchEvent", "type object")
for line in r.iter_lines():
we = json.loads(line.decode("utf-8"))
yield WatchEvent(type=we["type"], object=self.api_obj_class(self.api, we["object"]))
yield WatchEvent(
type=we["type"],
object=self.api_obj_class(self.api, we["object"]),
)

    def __iter__(self):
        # Iterating a WatchQuery streams WatchEvent tuples from the API.
        return iter(self.object_stream())
Expand Down Expand Up @@ -190,5 +283,6 @@ def as_selector(value):
elif op == "notin":
s.append("{} notin ({})".format(label, ",".join(v)))
else:
raise ValueError("{} is not a valid comparison operator".format(op))
raise ValueError("{} is not a valid comparison "
"operator".format(op))
return ",".join(s)