Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-40691: Add watch support for namespaces #205

Merged
merged 1 commit into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions changelog.d/20230912_145508_rra_DM_40691.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
### New features

- Add watch, field selector, and label selector support to `list_namespace` in the Kubernetes mock.

### Bug fixes

- `read_namespace` and `list_namespace` in the Kubernetes mock now only return namespace objects that have been explicitly created, not implicit namespaces created by creating another object without making a namespace first. This more closely matches the behavior of Kubernetes while still making it easy to use the mock in a test environment simulating a pre-existing namespace.
164 changes: 125 additions & 39 deletions src/safir/testing/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,12 @@ def build_watch_response(
between events. It matches the corresponding parameter in the
Kubernetes API. If `None`, watch forever.
field_selector
Which events to retrieve when performing a watch. If set, it must
be set to ``metadata.name=...`` to match a specific object name.
Limit the returned events to the objects matching the selector.
If set, it must be set to ``metadata.name=...`` to match a
specific object name.
label_selector
Limit the returned events to the objects matching this label
selector.

Returns
-------
Expand Down Expand Up @@ -294,11 +298,12 @@ def _build_watcher(
between events. It matches the corresponding parameter in the
Kubernetes API. If `None`, watch forever.
field_selector
Which events to retrieve when performing a watch. If set, it must
be set to ``metadata.name=...`` to match a specific object name.
Limit the returned events to the objects matching the selector.
If set, it must be set to ``metadata.name=...`` to match a
specific object name.
label_selector
Which events to retrieve when performing a watch. All
labels must match.
Limit the returned events to the objects matching this label
selector.

Returns
-------
Expand Down Expand Up @@ -359,10 +364,10 @@ class MockKubernetesApi:
This mock does not enforce namespace creation before creating objects in a
namespace. Creating an object in a namespace will implicitly create that
namespace if it doesn't exist. However, it will not store a
``V1Namespace`` object, so to verify that a namespace was properly created
(although not the order of creation), retrieve all the objects in the
namespace with `get_namespace_objects_for_test` and one of them will be
the ``V1Namespace`` object.
``V1Namespace`` object or emit events for `list_namespace` watches. To
verify that a namespace was properly created (although not the order of
creation), check for the namespace object explicitly with `read_namespace`
or `list_namespace`.

Objects stored with ``create_*`` or ``replace_*`` methods are **NOT**
copied. The object provided will be stored, so changing that object will
Expand Down Expand Up @@ -404,7 +409,8 @@ def __init__(self) -> None:
self._nodes = V1NodeList(items=[])
self._objects: dict[str, dict[str, dict[str, Any]]] = {}
self._events: defaultdict[str, list[CoreV1Event]] = defaultdict(list)
self._event_streams: defaultdict[str, dict[str, _EventStream]]
self._namespace_stream = _EventStream()
self._event_streams: defaultdict[str, defaultdict[str, _EventStream]]
self._event_streams = defaultdict(lambda: defaultdict(_EventStream))

def get_all_objects_for_test(self, kind: str) -> list[Any]:
Expand Down Expand Up @@ -669,8 +675,7 @@ async def list_namespaced_custom_object(
Only ``metadata.name=...`` is supported. It is parsed to find the
ingress name and only ingresss matching that name will be returned.
label_selector
Which events to retrieve when performing a watch. All
labels must match.
Which objects to retrieve. All labels must match.
resource_version
Where to start in the event stream when performing a watch. If
`None`, starts with the next change.
Expand Down Expand Up @@ -1098,8 +1103,7 @@ async def list_namespaced_ingress(
Only ``metadata.name=...`` is supported. It is parsed to find the
ingress name and only ingresss matching that name will be returned.
label_selector
Which events to retrieve when performing a watch. All
labels must match.
Which objects to retrieve. All labels must match.
resource_version
Where to start in the event stream when performing a watch. If
`None`, starts with the next change.
Expand Down Expand Up @@ -1349,8 +1353,7 @@ async def list_namespaced_job(
Only ``metadata.name=...`` is supported. It is parsed to find the
job name and only jobs matching that name will be returned.
label_selector
Which events to retrieve when performing a watch. All
labels must match.
Which objects to retrieve. All labels must match.
resource_version
Where to start in the event stream when performing a watch. If
`None`, starts with the next change.
Expand Down Expand Up @@ -1451,6 +1454,7 @@ async def create_namespace(self, body: V1Namespace) -> None:
msg = f"Namespace {name} already exists"
raise ApiException(status=409, reason=msg)
self._store_object(name, "Namespace", name, body)
self._namespace_stream.add_event("ADDED", body)

async def delete_namespace(
self,
Expand Down Expand Up @@ -1480,11 +1484,20 @@ async def delete_namespace(
self._maybe_error("delete_namespace")
if name not in self._objects:
raise ApiException(status=404, reason=f"{name} not found")
try:
body = await self.read_namespace(name)
self._namespace_stream.add_event("DELETED", body)
except ApiException:
pass
del self._objects[name]

async def read_namespace(self, name: str) -> V1Namespace:
"""Return the namespace object for a namespace.

This will only return a namespace object if one was created via
`create_namespace`, not if one was implicitly added by creating some
other object.

Parameters
----------
name
Expand All @@ -1493,38 +1506,112 @@ async def read_namespace(self, name: str) -> V1Namespace:
Returns
-------
kubernetes_asyncio.client.V1Namespace
Corresponding namespace object. If `create_namespace` has
been called, will return the stored object. Otherwise, returns a
synthesized ``V1Namespace`` object if the namespace has been
implicitly created.
Corresponding namespace object. If `create_namespace` has not been
called, but the namespace was added implicitly, an exception will
be raised.

Raises
------
kubernetes_asyncio.client.ApiException
Raised with 404 status if the namespace does not exist.
"""
self._maybe_error("read_namespace", name)
if name not in self._objects:
msg = f"Namespace {name} not found"
raise ApiException(status=404, reason=msg)
try:
return self._get_object(name, "Namespace", name)
except ApiException:
return V1Namespace(metadata=V1ObjectMeta(name=name))
return self._get_object(name, "Namespace", name)

async def list_namespace(self) -> V1NamespaceList:
"""List known namespaces.
async def list_namespace(
self,
*,
field_selector: str | None = None,
label_selector: str | None = None,
resource_version: str | None = None,
timeout_seconds: int | None = None,
watch: bool = False,
_preload_content: bool = True,
_request_timeout: int | None = None,
) -> V1NamespaceList | Mock:
"""List namespaces.

This does support watches. Only namespaces that are explicitly
created with `create_namespace` will be shown, not ones that were
implicitly created by creating some other object.

Parameters
----------
namespace
Namespace of jobs to list.
field_selector
Only ``metadata.name=...`` is supported. It is parsed to find the
namespace name and only the namespace matching that name will be
returned.
label_selector
Which objects to retrieve. All labels must match.
resource_version
Where to start in the event stream when performing a watch. If
`None`, starts with the next change.
timeout_seconds
How long to return events for before exiting when performing a
watch.
watch
Whether to act as a watch.
_preload_content
Verified to be `False` when performing a watch.
_request_timeout
Ignored, accepted for compatibility with the watch API.

Returns
-------
kubernetes_asyncio.client.V1NamespaceList
All namespaces, whether implicitly created or not. These will be
the actual ``V1Namespace`` objects if one was stored, otherwise
synthesized namespace objects.
kubernetes_asyncio.client.V1JobList or unittest.mock.Mock
List of namespaces, when not called as a watch. If called as a
watch, returns a mock ``aiohttp.Response`` with a ``readline``
metehod that yields the events.

Raises
------
AssertionError
Some other ``field_selector`` was provided.
kubernetes_asyncio.client.ApiException
Raised with 404 status if the namespace does not exist.
"""
self._maybe_error("list_namespace")
namespaces = [await self.read_namespace(n) for n in self._objects]
return V1NamespaceList(items=namespaces)

# We annoyingly have to duplicate a bunch of logic from _list_objects
# because namespaces aren't stored under a single namespace.
if not watch:
if field_selector:
match = re.match(r"metadata\.name=(.*)$", field_selector)
if not match or not match.group(1):
msg = f"Field selector {field_selector} not supported"
raise ValueError(msg)
try:
name = match.group(1)
obj = self._get_object(name, "Namespace", name)
if _check_labels(obj.metadata.labels, label_selector):
return [obj]
else:
return []
except ApiException:
return []
namespaces = []
for name in self._objects:
try:
namespace = await self.read_namespace(name)
except ApiException:
continue
if _check_labels(namespace.metadata.labels, label_selector):
namespaces.append(namespace)
return V1NamespaceList(items=namespaces)

# All watches must not preload content since we're returning raw JSON.
# This is done by the Kubernetes API Watch object.
assert not _preload_content

# Return the mock response expected by the Kubernetes API.
return self._namespace_stream.build_watch_response(
resource_version,
timeout_seconds,
field_selector=field_selector,
label_selector=label_selector,
)

# NETWORKPOLICY API

Expand Down Expand Up @@ -1749,7 +1836,7 @@ async def list_namespaced_pod(
Only ``metadata.name=...`` is supported. It is parsed to find the
pod name and only pods matching that name will be returned.
label_selector
Only pods with labels matching all of these will be returned.
Which objects to retrieve. All labels must match.
resource_version
Where to start in the event stream when performing a watch. If
`None`, starts with the next change.
Expand Down Expand Up @@ -2136,8 +2223,7 @@ async def list_namespaced_service(
Only ``metadata.name=...`` is supported. It is parsed to find the
service name and only services matching that name will be returned.
label_selector
Which matching objects to retrieve by label. All labels must
match.
Which objects to retrieve. All labels must match.
resource_version
Where to start in the event stream when performing a watch. If
`None`, starts with the next change.
Expand Down