Skip to content

Commit

Permalink
#119: Implement client object pool using LRU cache to hold 'in use' …
Browse files Browse the repository at this point in the history
…clients

- LRU cache allows recently used client objects to be kept around. This means there's a 1 client object to 1 session ID ratio, more resource sensitive than a 1 client object for every request ratio
- Workflow: client pool is created at startup, incoming request fetches client from LRU cache, which pulls a client from the pool. Client is kept in the cache until it becomes least recently used, at which point it's then put back into the pool
- POST to /sessions uses the same workflow, passing a (pretend) session ID of None to get the same client from the cache each time
- The first time a new session ID is passed in the request headers, there is no slowdown in the request like before (where a client was being created for that ID). The client is fetched from the pool
- I've ran this commit against some e2e tests on the frontend and the performance from the API was good, similar to the branch which uses a single client object, passed around using kwargs. I have no concerns regarding a 'slow API' with the pool and cache
- This is just a rough proof of concept, there's lots of cleaning up to do, including making the resource stats on the pool accurate and not just passing the default ones each time. One potential solution is to make something similar to the `Executor` class in the pool library. I experiemented with this (think the class I mocked up is in this commit?) but I wanted to get a basic example working before worrying about the stats (which the API doesn't make use of, but it might be useful to keep accurate stats if they ever need to be logged out. All part of the cleanup process
- This doesn't use the context manager as this wouldn't allow me to implement the LRU cache in the way I have
  • Loading branch information
MRichards99 committed Mar 29, 2021
1 parent fb20095 commit 74f1edd
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 57 deletions.
13 changes: 10 additions & 3 deletions datagateway_api/common/icat/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ class PythonICATBackend(Backend):
def __init__(self):
pass

def login(self, credentials):
def login(self, credentials, **kwargs):
log.info("Logging in to get session ID")
client_pool = kwargs.get("client_pool")

# There is no session ID required for this endpoint, a client object will be
# fetched from cache with a blank `sessionId` attribute
client = get_cached_client(None)
client = get_cached_client(None, client_pool)

# Syntax for Python ICAT
login_details = {
Expand Down Expand Up @@ -82,7 +84,12 @@ def logout(self, session_id, **kwargs):
@requires_session_id
@queries_records
def get_with_filters(self, session_id, entity_type, filters, **kwargs):
client = kwargs["client"] if kwargs["client"] else create_client()
# TODO - Pool only needed for logging
client_pool = kwargs.get("client_pool")
log.debug(f"Pool Size: {client_pool.get_pool_size()}")

client = kwargs.get("client")
client.sessionId = session_id
return get_entity_with_filters(client, entity_type, filters)

@requires_session_id
Expand Down
16 changes: 11 additions & 5 deletions datagateway_api/common/icat/helpers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from datetime import datetime, timedelta
from functools import lru_cache, wraps
from functools import wraps
import logging

from cachetools import cached
import icat.client
from icat.entities import getTypeMap
from icat.exception import (
Expand All @@ -26,6 +27,7 @@
PythonICATLimitFilter,
PythonICATWhereFilter,
)
from datagateway_api.common.icat.icat_client_pool import ExtendedLRUCache
from datagateway_api.common.icat.query import ICATQuery


Expand Down Expand Up @@ -54,8 +56,10 @@ def requires_session_id(method):
@wraps(method)
def wrapper_requires_session(*args, **kwargs):
try:
client = get_cached_client(args[1])
client_pool = kwargs.get("client_pool")

client = get_cached_client(args[1], client_pool)
client.sessionId = args[1]
# Client object put into kwargs so it can be accessed by backend functions
kwargs["client"] = client

Expand All @@ -72,12 +76,14 @@ def wrapper_requires_session(*args, **kwargs):
return wrapper_requires_session


@lru_cache(maxsize=config.get_client_cache_size())
def get_cached_client(session_id):
@cached(cache=ExtendedLRUCache())
def get_cached_client(session_id, client_pool):
"""
TODO - Add docstring
"""
client = create_client()

# Get a client from the pool
client, stats = client_pool._get_resource()

# `session_id` of None suggests this function is being called from an endpoint that
# doesn't use the `requires_session_id` decorator (e.g. POST /sessions)
Expand Down
69 changes: 69 additions & 0 deletions datagateway_api/common/icat/icat_client_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import logging

from cachetools.func import _cache
from cachetools.lru import LRUCache

from icat.client import Client
from object_pool import ObjectPool

from datagateway_api.common.config import config

log = logging.getLogger()


class ICATClient(Client):
"""Wrapper class to allow an object pool of client objects to be created"""

def __init__(self):
super().__init__(config.get_icat_url(), checkCert=config.get_icat_check_cert())
self.autoLogout = False

def clean_up(self):
"""
Allows object pool to cleanup the client's resources, using the existing Python
ICAT functionality
"""
super().cleanup()


def create_client_pool():
return ObjectPool(
ICATClient, min_init=5, max_capacity=20, max_reusable=0, expires=0,
)


class ClientPoolExecutor(ObjectPool.Executor):
"""TODO"""

def __init__(self, klass):
# klass is the instance of object pool
self.__pool = klass
self.client, self.resource_stats = None

def get_client(self):
self.client, self.resource_stats = self.__pool._get_resource()
return self.client

def release_client(self):
self.__pool._queue_resource(self.client, self.resource_stats)


def get_executor(client_pool):
return ClientPoolExecutor(client_pool)


class ExtendedLRUCache(LRUCache):
def __init__(self):
super().__init__(maxsize=8)

def popitem(self):
key, client = super().popitem()
session_id, client_pool = key
log.debug(f"Item popped from LRU cache: {key}, {client}")
# Put client back into pool
# Passes in default stats for now, though these aren't used in the API
client_pool._queue_resource(client, client_pool._get_default_stats())


def my_lru_cache():
return _cache(ExtendedLRUCache())
28 changes: 19 additions & 9 deletions datagateway_api/src/api_start_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from datagateway_api.common.backends import create_backend
from datagateway_api.common.config import config
from datagateway_api.common.icat.icat_client_pool import create_client_pool
from datagateway_api.src.resources.entities.entity_endpoint import (
get_count_endpoint,
get_endpoint,
Expand Down Expand Up @@ -72,63 +73,72 @@ def create_api_endpoints(flask_app, api, spec):

backend = create_backend(backend_type)

if backend_type == "python_icat":
# Create client pool
# TODO - Protect against other backends
icat_client_pool = create_client_pool()

for entity_name in endpoints:
get_endpoint_resource = get_endpoint(
entity_name, endpoints[entity_name], backend,
entity_name, endpoints[entity_name], backend, client_pool=icat_client_pool,
)
api.add_resource(get_endpoint_resource, f"/{entity_name.lower()}")
spec.path(resource=get_endpoint_resource, api=api)

get_id_endpoint_resource = get_id_endpoint(
entity_name, endpoints[entity_name], backend,
entity_name, endpoints[entity_name], backend, client_pool=icat_client_pool,
)
api.add_resource(get_id_endpoint_resource, f"/{entity_name.lower()}/<int:id_>")
spec.path(resource=get_id_endpoint_resource, api=api)

get_count_endpoint_resource = get_count_endpoint(
entity_name, endpoints[entity_name], backend,
entity_name, endpoints[entity_name], backend, client_pool=icat_client_pool,
)
api.add_resource(get_count_endpoint_resource, f"/{entity_name.lower()}/count")
spec.path(resource=get_count_endpoint_resource, api=api)

get_find_one_endpoint_resource = get_find_one_endpoint(
entity_name, endpoints[entity_name], backend,
entity_name, endpoints[entity_name], backend, client_pool=icat_client_pool,
)
api.add_resource(
get_find_one_endpoint_resource, f"/{entity_name.lower()}/findone",
)
spec.path(resource=get_find_one_endpoint_resource, api=api)

# Session endpoint
session_endpoint_resource = session_endpoints(backend)
session_endpoint_resource = session_endpoints(backend, client_pool=icat_client_pool)
api.add_resource(session_endpoint_resource, "/sessions")
spec.path(resource=session_endpoint_resource, api=api)

# Table specific endpoints
instrument_facility_cycle_resource = instrument_facility_cycles_endpoint(backend)
instrument_facility_cycle_resource = instrument_facility_cycles_endpoint(
backend, client_pool=icat_client_pool
)
api.add_resource(
instrument_facility_cycle_resource, "/instruments/<int:id_>/facilitycycles",
)
spec.path(resource=instrument_facility_cycle_resource, api=api)

count_instrument_facility_cycle_res = count_instrument_facility_cycles_endpoint(
backend,
backend, client_pool=icat_client_pool,
)
api.add_resource(
count_instrument_facility_cycle_res,
"/instruments/<int:id_>/facilitycycles/count",
)
spec.path(resource=count_instrument_facility_cycle_res, api=api)

instrument_investigation_resource = instrument_investigation_endpoint(backend)
instrument_investigation_resource = instrument_investigation_endpoint(
backend, client_pool=icat_client_pool,
)
api.add_resource(
instrument_investigation_resource,
"/instruments/<int:instrument_id>/facilitycycles/<int:cycle_id>/investigations",
)
spec.path(resource=instrument_investigation_resource, api=api)

count_instrument_investigation_resource = count_instrument_investigation_endpoint(
backend,
backend, client_pool=icat_client_pool,
)
api.add_resource(
count_instrument_investigation_resource,
Expand Down
35 changes: 23 additions & 12 deletions datagateway_api/src/resources/entities/entity_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
)


def get_endpoint(name, entity_type, backend):
def get_endpoint(name, entity_type, backend, **kwargs):
"""
Given an entity name generate a flask_restful Resource class.
In main.py these generated classes are registered with the api e.g
Expand All @@ -31,6 +31,7 @@ def get(self):
get_session_id_from_auth_header(),
entity_type,
get_filters_from_query_string(),
**kwargs,
),
200,
)
Expand Down Expand Up @@ -72,7 +73,10 @@ def get(self):
def post(self):
return (
backend.create(
get_session_id_from_auth_header(), entity_type, request.json,
get_session_id_from_auth_header(),
entity_type,
request.json,
**kwargs,
),
200,
)
Expand Down Expand Up @@ -115,7 +119,10 @@ def post(self):
def patch(self):
return (
backend.update(
get_session_id_from_auth_header(), entity_type, request.json,
get_session_id_from_auth_header(),
entity_type,
request.json,
**kwargs,
),
200,
)
Expand Down Expand Up @@ -159,7 +166,7 @@ def patch(self):
return Endpoint


def get_id_endpoint(name, entity_type, backend):
def get_id_endpoint(name, entity_type, backend, **kwargs):
"""
Given an entity name generate a flask_restful Resource class.
In main.py these generated classes are registered with the api e.g
Expand All @@ -180,7 +187,7 @@ class EndpointWithID(Resource):
def get(self, id_):
return (
backend.get_with_id(
get_session_id_from_auth_header(), entity_type, id_,
get_session_id_from_auth_header(), entity_type, id_, **kwargs,
),
200,
)
Expand Down Expand Up @@ -216,7 +223,9 @@ def get(self, id_):
"""

def delete(self, id_):
backend.delete_with_id(get_session_id_from_auth_header(), entity_type, id_)
backend.delete_with_id(
get_session_id_from_auth_header(), entity_type, id_, **kwargs,
)
return "", 204

delete.__doc__ = f"""
Expand Down Expand Up @@ -248,8 +257,10 @@ def delete(self, id_):

def patch(self, id_):
session_id = get_session_id_from_auth_header()
backend.update_with_id(session_id, entity_type, id_, request.json)
return backend.get_with_id(session_id, entity_type, id_), 200
backend.update_with_id(
session_id, entity_type, id_, request.json, **kwargs,
)
return backend.get_with_id(session_id, entity_type, id_, **kwargs,), 200

patch.__doc__ = f"""
---
Expand Down Expand Up @@ -293,7 +304,7 @@ def patch(self, id_):
return EndpointWithID


def get_count_endpoint(name, entity_type, backend):
def get_count_endpoint(name, entity_type, backend, **kwargs):
"""
Given an entity name generate a flask_restful Resource class.
In main.py these generated classes are registered with the api e.g
Expand All @@ -313,7 +324,7 @@ def get(self):
filters = get_filters_from_query_string()
return (
backend.count_with_filters(
get_session_id_from_auth_header(), entity_type, filters,
get_session_id_from_auth_header(), entity_type, filters, **kwargs,
),
200,
)
Expand Down Expand Up @@ -350,7 +361,7 @@ def get(self):
return CountEndpoint


def get_find_one_endpoint(name, entity_type, backend):
def get_find_one_endpoint(name, entity_type, backend, **kwargs):
"""
Given an entity name generate a flask_restful Resource class.
In main.py these generated classes are registered with the api e.g
Expand All @@ -372,7 +383,7 @@ def get(self):
filters = get_filters_from_query_string()
return (
backend.get_one_with_filters(
get_session_id_from_auth_header(), entity_type, filters,
get_session_id_from_auth_header(), entity_type, filters, **kwargs,
),
200,
)
Expand Down
Loading

0 comments on commit 74f1edd

Please sign in to comment.