Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Commit

Permalink
Merge pull request #277 from nolar/246-aiohttp4-readiness
Browse files Browse the repository at this point in the history
[246] Prepare for aiohttp v4, but stay with v3
  • Loading branch information
nolar authored Jan 9, 2020
2 parents 1719184 + 02d15e1 commit 848e1da
Show file tree
Hide file tree
Showing 11 changed files with 138 additions and 97 deletions.
70 changes: 45 additions & 25 deletions kopf/clients/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,16 @@ def reauthenticated_request(fn: _F) -> _F:
@functools.wraps(fn)
async def wrapper(*args: Any, **kwargs: Any) -> Any:

# If a session is explicitly passed, make it a simple call without re-auth.
# If a context is explicitly passed, make it a simple call without re-auth.
# Exceptions are escalated to a caller, which is probably wrapped itself.
if 'session' in kwargs:
if 'context' in kwargs:
return await fn(*args, **kwargs)

# Otherwise, attempt the execution with the vault credentials and re-authenticate on 401s.
vault: credentials.Vault = vault_var.get()
async for key, info, session in vault.extended(APISession.from_connection_info, 'sessions'):
async for key, info, context in vault.extended(APIContext, 'contexts'):
try:
return await fn(*args, **kwargs, session=session)
return await fn(*args, **kwargs, context=context)
except aiohttp.ClientResponseError as e:
if e.status == 401:
await vault.invalidate(key, exc=e)
Expand All @@ -66,18 +66,18 @@ def reauthenticated_stream(fn: _F) -> _F:
@functools.wraps(fn)
async def wrapper(*args: Any, **kwargs: Any) -> Any:

# If a session is explicitly passed, make it a simple call without re-auth.
# If a context is explicitly passed, make it a simple call without re-auth.
# Exceptions are escalated to a caller, which is probably wrapped itself.
if 'session' in kwargs:
if 'context' in kwargs:
async for item in fn(*args, **kwargs):
yield item
return

# Otherwise, attempt the execution with the vault credentials and re-authenticate on 401s.
vault: credentials.Vault = vault_var.get()
async for key, info, session in vault.extended(APISession.from_connection_info, 'sessions'):
async for key, info, context in vault.extended(APIContext, 'contexts'):
try:
async for item in fn(*args, **kwargs, session=session):
async for item in fn(*args, **kwargs, context=context):
yield item
break # out of credentials cycle (instead of `return`)
except aiohttp.ClientResponseError as e:
Expand All @@ -90,29 +90,36 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any:
return cast(_F, wrapper)


class APISession(aiohttp.ClientSession):
class APIContext:
"""
An extended aiohttp session, with k8s scopes for server & namespace scopes.
A container for an aiohttp session and the caches of the environment info.
It is constructed once per every `ConnectionInfo`, and then cached
for later re-use (see `Vault.extended`).
The container is constructed only once for every `ConnectionInfo`,
and then cached for later re-use (see `Vault.extended`).
We assume that the whole operator runs in the same event loop, so there is
no need to split the sessions for multiple loops. Synchronous handlers are
threaded with other event loops per thread, but no operator's requests are
performed inside of those threads: everything is in the main thread/loop.
"""

# The main contained object used by the API methods.
session: aiohttp.ClientSession

# Contextual information for URL building.
server: str
default_namespace: Optional[str]

# Temporary caches of the information retrieved for and from the environment.
_tempfiles: "_TempFiles"
_discovery_lock: asyncio.Lock
_discovered_resources: Dict[str, Dict[resources.Resource, Dict[str, object]]]

@classmethod
def from_connection_info(
cls,
def __init__(
self,
info: credentials.ConnectionInfo,
) -> "APISession":
) -> None:
super().__init__()

# Some SSL data are not accepted directly, so we have to use temp files.
tempfiles = _TempFiles()
Expand Down Expand Up @@ -184,7 +191,7 @@ def from_connection_info(
headers['User-Agent'] = f'kopf/unknown' # TODO: add version someday

# Generic aiohttp session based on the constructed credentials.
session = cls(
self.session = aiohttp.ClientSession(
connector=aiohttp.TCPConnector(
limit=0,
ssl=context,
Expand All @@ -194,27 +201,40 @@ def from_connection_info(
)

# Add the extra payload information. We avoid overriding the constructor.
session.server = info.server
session.default_namespace = info.default_namespace
session._tempfiles = tempfiles # for purging on garbage collection
session._discovery_lock = asyncio.Lock()
session._discovered_resources = {}
self.server = info.server
self.default_namespace = info.default_namespace

# For purging on garbage collection.
self._tempfiles = tempfiles
self._discovery_lock = asyncio.Lock()
self._discovered_resources = {}

return session
async def close(self) -> None:

# Closing is triggered by `Vault._flush_caches()` -- forward it to the actual session.
await self.session.close()

# Additionally, explicitly remove any temporary files we have created.
# They will be purged on garbage collection anyway, but it is better to make it sooner.
self._tempfiles.purge()


class _TempFiles(Mapping[bytes, str]):
"""
A container for the temporary files, which are purged on garbage collection.
The files are purged when the container is garbage-collected. The container
is garbage-collected when its parent `APISession` is garbage-collected.
is garbage-collected when its parent `APISession` is garbage-collected or
explicitly closed (by `Vault` on removal of corresponding credentials).
"""

def __init__(self) -> None:
super().__init__()
self._paths: Dict[bytes, str] = {}

def __del__(self) -> None:
self.purge()

def __len__(self) -> int:
return len(self._paths)

Expand All @@ -228,7 +248,7 @@ def __getitem__(self, item: bytes) -> str:
self._paths[item] = f.name
return self._paths[item]

def __del__(self) -> None:
def purge(self) -> None:
for _, path in self._paths.items():
try:
os.remove(path)
Expand Down
26 changes: 13 additions & 13 deletions kopf/clients/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,24 @@
async def discover(
*,
resource: resources.Resource,
session: Optional[auth.APISession] = None, # injected by the decorator
context: Optional[auth.APIContext] = None, # injected by the decorator
) -> Optional[Dict[str, object]]:
if session is None:
if context is None:
raise RuntimeError("API instance is not injected by the decorator.")

if resource.api_version not in session._discovered_resources:
async with session._discovery_lock:
if resource.api_version not in session._discovered_resources:
session._discovered_resources[resource.api_version] = {}
if resource.api_version not in context._discovered_resources:
async with context._discovery_lock:
if resource.api_version not in context._discovered_resources:
context._discovered_resources[resource.api_version] = {}

try:
response = await session.get(
url=resource.get_version_url(server=session.server),
response = await context.session.get(
url=resource.get_version_url(server=context.server),
)
response.raise_for_status()
respdata = await response.json()

session._discovered_resources[resource.api_version].update({
context._discovered_resources[resource.api_version].update({
resources.Resource(resource.group, resource.version, info['name']): info
for info in respdata['resources']
})
Expand All @@ -38,17 +38,17 @@ async def discover(
else:
raise

return session._discovered_resources[resource.api_version].get(resource, None)
return context._discovered_resources[resource.api_version].get(resource, None)


@auth.reauthenticated_request
async def is_namespaced(
*,
resource: resources.Resource,
session: Optional[auth.APISession] = None, # injected by the decorator
context: Optional[auth.APIContext] = None, # injected by the decorator
) -> bool:
if session is None:
if context is None:
raise RuntimeError("API instance is not injected by the decorator.")

info = await discover(resource=resource, session=session)
info = await discover(resource=resource, context=context)
return cast(bool, info['namespaced']) if info is not None else True # assume namespaced
10 changes: 5 additions & 5 deletions kopf/clients/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async def post_event(
type: str,
reason: str,
message: str = '',
session: Optional[auth.APISession] = None, # injected by the decorator
context: Optional[auth.APIContext] = None, # injected by the decorator
) -> None:
"""
Issue an event for the object.
Expand All @@ -34,12 +34,12 @@ async def post_event(
and where the rate-limits should be maintained. It can (and should)
be done by the client library, as it is done in the Go client.
"""
if session is None:
if context is None:
raise RuntimeError("API instance is not injected by the decorator.")

# See #164. For cluster-scoped objects, use the current namespace from the current context.
# It could be "default", but in some systems, we are limited to one specific namespace only.
namespace: str = ref.get('namespace') or session.default_namespace or 'default'
namespace: str = ref.get('namespace') or context.default_namespace or 'default'
full_ref: bodies.ObjectReference = copy.copy(ref)
full_ref['namespace'] = namespace

Expand Down Expand Up @@ -74,8 +74,8 @@ async def post_event(
}

try:
response = await session.post(
url=EVENTS_CORE_V1_CRD.get_url(server=session.server, namespace=namespace),
response = await context.session.post(
url=EVENTS_CORE_V1_CRD.get_url(server=context.server, namespace=namespace),
headers={'Content-Type': 'application/json'},
json=body,
)
Expand Down
28 changes: 14 additions & 14 deletions kopf/clients/fetching.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ async def read_crd(
*,
resource: resources.Resource,
default: Union[_T, _UNSET] = _UNSET.token,
session: Optional[auth.APISession] = None, # injected by the decorator
context: Optional[auth.APIContext] = None, # injected by the decorator
) -> Union[bodies.Body, _T]:
if session is None:
if context is None:
raise RuntimeError("API instance is not injected by the decorator.")

try:
response = await session.get(
url=CRD_CRD.get_url(server=session.server, name=resource.name),
response = await context.session.get(
url=CRD_CRD.get_url(server=context.server, name=resource.name),
)
response.raise_for_status()
respdata = await response.json()
Expand All @@ -48,17 +48,17 @@ async def read_obj(
namespace: Optional[str] = None,
name: Optional[str] = None,
default: Union[_T, _UNSET] = _UNSET.token,
session: Optional[auth.APISession] = None, # injected by the decorator
context: Optional[auth.APIContext] = None, # injected by the decorator
) -> Union[bodies.Body, _T]:
if session is None:
if context is None:
raise RuntimeError("API instance is not injected by the decorator.")

is_namespaced = await discovery.is_namespaced(resource=resource, session=session)
is_namespaced = await discovery.is_namespaced(resource=resource, context=context)
namespace = namespace if is_namespaced else None

try:
response = await session.get(
url=resource.get_url(server=session.server, namespace=namespace, name=name),
response = await context.session.get(
url=resource.get_url(server=context.server, namespace=namespace, name=name),
)
response.raise_for_status()
respdata = await response.json()
Expand All @@ -75,7 +75,7 @@ async def list_objs_rv(
*,
resource: resources.Resource,
namespace: Optional[str] = None,
session: Optional[auth.APISession] = None, # injected by the decorator
context: Optional[auth.APIContext] = None, # injected by the decorator
) -> Tuple[Collection[bodies.Body], str]:
"""
List the objects of specific resource type.
Expand All @@ -89,14 +89,14 @@ async def list_objs_rv(
* The resource is namespace-scoped AND operator is namespaced-restricted.
"""
if session is None:
if context is None:
raise RuntimeError("API instance is not injected by the decorator.")

is_namespaced = await discovery.is_namespaced(resource=resource, session=session)
is_namespaced = await discovery.is_namespaced(resource=resource, context=context)
namespace = namespace if is_namespaced else None

response = await session.get(
url=resource.get_url(server=session.server, namespace=namespace),
response = await context.session.get(
url=resource.get_url(server=context.server, namespace=namespace),
)
response.raise_for_status()
rsp = await response.json()
Expand Down
10 changes: 5 additions & 5 deletions kopf/clients/patching.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async def patch_obj(
namespace: Optional[str] = None,
name: Optional[str] = None,
body: Optional[bodies.Body] = None,
session: Optional[auth.APISession] = None, # injected by the decorator
context: Optional[auth.APIContext] = None, # injected by the decorator
) -> None:
"""
Patch a resource of specific kind.
Expand All @@ -29,7 +29,7 @@ async def patch_obj(
used for the namespaced resources, even if the operator serves
the whole cluster (i.e. is not namespace-restricted).
"""
if session is None:
if context is None:
raise RuntimeError("API instance is not injected by the decorator.")

if body is not None and (name is not None or namespace is not None):
Expand All @@ -38,7 +38,7 @@ async def patch_obj(
namespace = body.get('metadata', {}).get('namespace') if body is not None else namespace
name = body.get('metadata', {}).get('name') if body is not None else name

is_namespaced = await discovery.is_namespaced(resource=resource, session=session)
is_namespaced = await discovery.is_namespaced(resource=resource, context=context)
namespace = namespace if is_namespaced else None

if body is None:
Expand All @@ -47,8 +47,8 @@ async def patch_obj(
body['metadata']['namespace'] = namespace

try:
await session.patch(
url=resource.get_url(server=session.server, namespace=namespace, name=name),
await context.session.patch(
url=resource.get_url(server=context.server, namespace=namespace, name=name),
headers={'Content-Type': 'application/merge-patch+json'},
json=patch,
raise_for_status=True,
Expand Down
10 changes: 5 additions & 5 deletions kopf/clients/watching.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ async def watch_objs(
namespace: Optional[str] = None,
timeout: Optional[float] = None,
since: Optional[str] = None,
session: Optional[auth.APISession] = None, # injected by the decorator
context: Optional[auth.APIContext] = None, # injected by the decorator
freeze_waiter: asyncio_Future,
) -> AsyncIterator[bodies.RawEvent]:
"""
Expand All @@ -185,10 +185,10 @@ async def watch_objs(
* The resource is namespace-scoped AND operator is namespaced-restricted.
"""
if session is None:
if context is None:
raise RuntimeError("API instance is not injected by the decorator.")

is_namespaced = await discovery.is_namespaced(resource=resource, session=session)
is_namespaced = await discovery.is_namespaced(resource=resource, context=context)
namespace = namespace if is_namespaced else None

params: Dict[str, str] = {}
Expand All @@ -199,8 +199,8 @@ async def watch_objs(
params['timeoutSeconds'] = str(timeout)

# Talk to the API and initiate a streaming response.
response = await session.get(
url=resource.get_url(server=session.server, namespace=namespace, params=params),
response = await context.session.get(
url=resource.get_url(server=context.server, namespace=namespace, params=params),
timeout=aiohttp.ClientTimeout(total=None),
)
response.raise_for_status()
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
'typing_extensions',
'click',
'iso8601',
'aiohttp',
'aiohttp<4.0.0',
'aiojobs',
'pykube-ng>=0.27', # used only for config parsing
],
Expand Down
Loading

0 comments on commit 848e1da

Please sign in to comment.