Skip to content

Commit

Permalink
adds typing support in the core library
Browse files Browse the repository at this point in the history
  • Loading branch information
sufyankhanrao committed Feb 19, 2025
1 parent 4ca19de commit 417e3f2
Show file tree
Hide file tree
Showing 92 changed files with 4,316 additions and 4,856 deletions.
1 change: 0 additions & 1 deletion apimatic_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
'decorators',
'http',
'utilities',
'factories',
'types',
'logger',
'exceptions',
Expand Down
44 changes: 28 additions & 16 deletions apimatic_core/api_call.py
Original file line number Diff line number Diff line change
@@ -1,40 +1,52 @@
from apimatic_core.configurations.endpoint_configuration import EndpointConfiguration
from __future__ import annotations

from apimatic_core_interfaces.configuration.endpoint_configuration import EndpointConfiguration
from apimatic_core_interfaces.logger.api_logger import ApiLogger
from pydantic import validate_call
from typing import Any

from apimatic_core.configurations.global_configuration import GlobalConfiguration
from apimatic_core.logger.sdk_logger import LoggerFactory
from apimatic_core.request_builder import RequestBuilder
from apimatic_core.response_handler import ResponseHandler


class ApiCall:

@property
def new_builder(self):
def new_builder(self) -> 'ApiCall':
return ApiCall(self._global_configuration)

def __init__(
self,
global_configuration=GlobalConfiguration()
global_configuration: GlobalConfiguration=GlobalConfiguration()
):
self._global_configuration = global_configuration
self._request_builder = None
self._response_handler = ResponseHandler()
self._endpoint_configuration = EndpointConfiguration()
self._api_logger = LoggerFactory.get_api_logger(self._global_configuration.get_http_client_configuration()
.logging_configuration)

def request(self, request_builder):
self._global_configuration: GlobalConfiguration = global_configuration
self._request_builder: RequestBuilder = RequestBuilder()
self._response_handler: ResponseHandler = ResponseHandler()
self._endpoint_configuration: EndpointConfiguration = EndpointConfiguration()
self._api_logger: ApiLogger = LoggerFactory.get_api_logger(
self._global_configuration.http_client_configuration.logging_configuration
)

@validate_call(config=dict(arbitrary_types_allowed=True))
def request(self, request_builder: RequestBuilder) -> 'ApiCall':
self._request_builder = request_builder
return self

def response(self, response_handler):
@validate_call(config=dict(arbitrary_types_allowed=True))
def response(self, response_handler: ResponseHandler) -> 'ApiCall':
self._response_handler = response_handler
return self

def endpoint_configuration(self, endpoint_configuration):
@validate_call(config=dict(arbitrary_types_allowed=True))
def endpoint_configuration(self, endpoint_configuration: EndpointConfiguration) -> 'ApiCall':
self._endpoint_configuration = endpoint_configuration
return self

def execute(self):
_http_client_configuration = self._global_configuration.get_http_client_configuration()
@validate_call
def execute(self) -> Any:
_http_client_configuration = self._global_configuration.http_client_configuration

if _http_client_configuration.http_client is None:
raise ValueError("An HTTP client instance is required to execute an Api call.")
Expand All @@ -61,4 +73,4 @@ def execute(self):
if _http_callback is not None:
_http_callback.on_after_response(_http_response)

return self._response_handler.handle(_http_response, self._global_configuration.get_global_errors())
return self._response_handler.handle(_http_response, self._global_configuration.global_errors)
26 changes: 21 additions & 5 deletions apimatic_core/authentication/header_auth.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,33 @@
from apimatic_core_interfaces.types.authentication import Authentication
from abc import abstractmethod

from apimatic_core_interfaces.authentication.authentication import Authentication
from typing import Dict, Optional

from apimatic_core_interfaces.http.http_request import HttpRequest
from pydantic import validate_call

from apimatic_core.utilities.auth_helper import AuthHelper


class HeaderAuth(Authentication):

def __init__(self, auth_params):
@property
@abstractmethod
def error_message(self) -> str:
...

def with_auth_managers(self, auth_managers: Dict[str, 'Authentication']):
pass

@validate_call
def __init__(self, auth_params: Dict[str, str]):
self._auth_params = auth_params
self._error_message = None
self._error_message: Optional[str] = None

def is_valid(self):
def is_valid(self) -> bool:
return AuthHelper.is_valid_auth(self._auth_params)

def apply(self, http_request):
@validate_call
def apply(self, http_request: HttpRequest) -> None:
AuthHelper.apply(self._auth_params, http_request.add_header)

13 changes: 10 additions & 3 deletions apimatic_core/authentication/multiple/and_auth_group.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
from apimatic_core_interfaces.authentication.authentication import Authentication
from pydantic import validate_call
from typing import Union

from apimatic_core.authentication.multiple.auth_group import AuthGroup


class And(AuthGroup):

@property
def error_message(self):
@validate_call
def error_message(self) -> str:
return " and ".join(self._error_messages)

def __init__(self, *auth_group):
@validate_call(config=dict(arbitrary_types_allowed=True))
def __init__(self, *auth_group: Union[str, Authentication]):
super(And, self).__init__(auth_group)
self._is_valid_group = True

def is_valid(self):
@validate_call
def is_valid(self) -> bool:
if not self.mapped_group:
return False

Expand Down
42 changes: 29 additions & 13 deletions apimatic_core/authentication/multiple/auth_group.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,63 @@
from apimatic_core_interfaces.types.authentication import Authentication
from abc import abstractmethod

from apimatic_core_interfaces.authentication.authentication import Authentication
from apimatic_core_interfaces.http.http_request import HttpRequest
from pydantic import validate_call
from typing import Union, List, Dict, Tuple

from apimatic_core.authentication.multiple.single_auth import Single


class AuthGroup(Authentication):

@property
def auth_participants(self):
@abstractmethod
def error_message(self) -> str:
...

@property
def auth_participants(self) -> List[Authentication]:
return self._auth_participants

@property
def mapped_group(self):
def mapped_group(self) -> List[Authentication]:
return self._mapped_group

@property
def error_messages(self):
def error_messages(self) -> List[str]:
return self._error_messages

@property
def is_valid_group(self):
def is_valid_group(self) -> bool:
return self._is_valid_group

def __init__(self, auth_group):
self._auth_participants = []
@validate_call(config=dict(arbitrary_types_allowed=True))
def __init__(self, auth_group: Tuple[Union[str, Authentication], ...]):
self._auth_participants: List[Authentication] = []
for auth_participant in auth_group:
if auth_participant is not None and isinstance(auth_participant, str):
self._auth_participants.append(Single(auth_participant))
elif auth_participant is not None:
self._auth_participants.append(auth_participant)

self._mapped_group = []
self._error_messages = []
self._is_valid_group = None
self._mapped_group: List[Authentication] = []
self._error_messages: List[str] = []
self._is_valid_group: bool = False

def with_auth_managers(self, auth_managers):
@validate_call(config=dict(arbitrary_types_allowed=True))
def with_auth_managers(self, auth_managers: Dict[str, Authentication]) -> 'Authentication':
for participant in self.auth_participants:
self.mapped_group.append(participant.with_auth_managers(auth_managers))

return self

def is_valid(self): # pragma: no cover
@validate_call
@abstractmethod
def is_valid(self) -> bool: # pragma: no cover
...

def apply(self, http_request):
@validate_call
def apply(self, http_request: HttpRequest):
if not self.is_valid_group:
return

Expand Down
13 changes: 10 additions & 3 deletions apimatic_core/authentication/multiple/or_auth_group.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
from apimatic_core_interfaces.authentication.authentication import Authentication
from typing import Union

from pydantic import validate_call

from apimatic_core.authentication.multiple.auth_group import AuthGroup


class Or(AuthGroup):

@property
def error_message(self):
def error_message(self) -> str:
return " or ".join(self._error_messages)

def __init__(self, *auth_group):
@validate_call(config=dict(arbitrary_types_allowed=True))
def __init__(self, *auth_group: Union[str, Authentication]):
super(Or, self).__init__(auth_group)
self._is_valid_group = False

def is_valid(self):
@validate_call
def is_valid(self) -> bool:
if not self.mapped_group:
return False

Expand Down
31 changes: 19 additions & 12 deletions apimatic_core/authentication/multiple/single_auth.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,44 @@
from apimatic_core_interfaces.types.authentication import Authentication
from apimatic_core_interfaces.authentication.authentication import Authentication
from apimatic_core_interfaces.http.http_request import HttpRequest
from pydantic import validate_call
from typing import Dict, Optional


class Single(Authentication):

@property
def error_message(self):
def error_message(self) -> str:
return "[{}]".format(self._error_message)

def __init__(self, auth_participant):
@validate_call
def __init__(self, auth_participant: str):
super(Single, self).__init__()
self._auth_participant = auth_participant
self._mapped_auth = None
self._error_message = None
self._is_valid = False
self._mapped_auth: Optional[Authentication] = None
self._error_message: Optional[str] = None
self._is_valid: bool = False

def with_auth_managers(self, auth_managers):
@validate_call(config=dict(arbitrary_types_allowed=True))
def with_auth_managers(self, auth_managers: Dict[str, Authentication]) -> Authentication:
if not auth_managers.get(self._auth_participant):
raise ValueError("Auth key is invalid.")

self._mapped_auth = auth_managers[self._auth_participant]

return self

def is_valid(self):
self._is_valid = self._mapped_auth.is_valid()
@validate_call
def is_valid(self) -> bool:
self._is_valid = self._mapped_auth.is_valid() if self._mapped_auth else False
if not self._is_valid:
self._error_message = self._mapped_auth.error_message
self._error_message = self._mapped_auth.error_message if self._mapped_auth else "Invalid auth key."

return self._is_valid

def apply(self, http_request):
@validate_call
def apply(self, http_request: HttpRequest):

if not self._is_valid:
if self._mapped_auth is None or not self._is_valid:
return

self._mapped_auth.apply(http_request)
Expand Down
35 changes: 30 additions & 5 deletions apimatic_core/authentication/query_auth.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,40 @@
from apimatic_core_interfaces.types.authentication import Authentication
from abc import abstractmethod

from apimatic_core_interfaces.authentication.authentication import Authentication
from apimatic_core_interfaces.http.http_request import HttpRequest
from pydantic import validate_call
from typing import Dict

from apimatic_core.utilities.api_helper import ApiHelper
from apimatic_core.utilities.auth_helper import AuthHelper


class QueryAuth(Authentication):

def __init__(self, auth_params):
@property
@abstractmethod
def error_message(self) -> str:
...

def with_auth_managers(self, auth_managers: Dict[str, 'Authentication']):
pass

@validate_call
def __init__(self, auth_params: Dict[str, str]):
self._auth_params = auth_params

def is_valid(self):
def is_valid(self) -> bool:
return AuthHelper.is_valid_auth(self._auth_params)

def apply(self, http_request):
return AuthHelper.apply(self._auth_params, http_request.add_query_parameter)
@validate_call
def apply(self, http_request: HttpRequest):
for key, value in self._auth_params.items():
self._add_query_parameter(key, value, http_request)

@validate_call
def _add_query_parameter(self, key: str, value: str, http_request: HttpRequest):
query_url = ApiHelper.append_url_with_query_parameters(
http_request.query_url,
{key: value}
)
http_request.query_url = ApiHelper.clean_url(query_url)
Loading

0 comments on commit 417e3f2

Please sign in to comment.