Skip to content

Commit

Permalink
D401 Support - Airflow/api thru Airflow/auth (#33333)
Browse files Browse the repository at this point in the history
  • Loading branch information
ferruzzi authored Aug 12, 2023
1 parent 740c263 commit b657ae9
Show file tree
Hide file tree
Showing 9 changed files with 22 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

def _check_action_and_resource(sm: AirflowSecurityManager, perms: list[tuple[str, str]]) -> None:
"""
Checks if the action or resource exists and otherwise raise 400.
Check if the action or resource exists and otherwise raise 400.
This function is intended for use in the REST API because it raise 400
"""
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_connexion/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@


def common_error_handler(exception: BaseException) -> flask.Response:
"""Used to capture connexion exceptions and add link to the type field."""
"""Use to capture connexion exceptions and add link to the type field."""
if isinstance(exception, ProblemException):

link = EXCEPTIONS_LINK_MAP.get(exception.status)
Expand Down
4 changes: 2 additions & 2 deletions airflow/api_connexion/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@


def validate_istimezone(value: datetime) -> None:
"""Validates that a datetime is not naive."""
"""Validate that a datetime is not naive."""
if not value.tzinfo:
raise BadRequest("Invalid datetime format", detail="Naive datetime is disallowed")

Expand Down Expand Up @@ -85,7 +85,7 @@ def check_limit(value: int) -> int:

def format_parameters(params_formatters: dict[str, Callable[[Any], Any]]) -> Callable[[T], T]:
"""
Decorator factory that create decorator that convert parameters using given formatters.
Create a decorator to convert parameters using given formatters.
Using it allows you to separate parameter formatting from endpoint logic.
Expand Down
6 changes: 3 additions & 3 deletions airflow/api_connexion/schemas/dag_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ def get_concurrency(obj: DAG):

@staticmethod
def get_tags(obj: DAG):
"""Dumps tags as objects."""
"""Dump tags as objects."""
tags = obj.tags
if tags:
return [DagTagSchema().dump(dict(name=tag)) for tag in tags]
Expand All @@ -132,12 +132,12 @@ def get_owners(obj: DAG):

@staticmethod
def get_is_paused(obj: DAG):
"""Checks entry in DAG table to see if this DAG is paused."""
"""Check entry in DAG table to see if this DAG is paused."""
return obj.get_is_paused()

@staticmethod
def get_is_active(obj: DAG):
"""Checks entry in DAG table to see if this DAG is active."""
"""Check entry in DAG table to see if this DAG is active."""
return obj.get_is_active()

@staticmethod
Expand Down
12 changes: 6 additions & 6 deletions airflow/api_connexion/schemas/pool_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,32 +46,32 @@ class Meta:

@staticmethod
def get_occupied_slots(obj: Pool) -> int:
"""Returns the occupied slots of the pool."""
"""Return the occupied slots of the pool."""
return obj.occupied_slots()

@staticmethod
def get_running_slots(obj: Pool) -> int:
"""Returns the running slots of the pool."""
"""Return the running slots of the pool."""
return obj.running_slots()

@staticmethod
def get_queued_slots(obj: Pool) -> int:
"""Returns the queued slots of the pool."""
"""Return the queued slots of the pool."""
return obj.queued_slots()

@staticmethod
def get_scheduled_slots(obj: Pool) -> int:
"""Returns the scheduled slots of the pool."""
"""Return the scheduled slots of the pool."""
return obj.scheduled_slots()

@staticmethod
def get_deferred_slots(obj: Pool) -> int:
"""Returns the deferred slots of the pool."""
"""Return the deferred slots of the pool."""
return obj.deferred_slots()

@staticmethod
def get_open_slots(obj: Pool) -> float:
"""Returns the open slots of the pool."""
"""Return the open slots of the pool."""
return obj.open_slots()


Expand Down
4 changes: 2 additions & 2 deletions airflow/api_connexion/schemas/task_instance_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class ClearTaskInstanceFormSchema(Schema):

@validates_schema
def validate_form(self, data, **kwargs):
"""Validates clear task instance form."""
"""Validate clear task instance form."""
if data["only_failed"] and data["only_running"]:
raise ValidationError("only_failed and only_running both are set to True")
if data["start_date"] and data["end_date"]:
Expand Down Expand Up @@ -169,7 +169,7 @@ class SetTaskInstanceStateFormSchema(Schema):

@validates_schema
def validate_form(self, data, **kwargs):
"""Validates set task instance state form."""
"""Validate set task instance state form."""
if not exactly_one(data.get("execution_date"), data.get("dag_run_id")):
raise ValidationError("Exactly one of execution_date or dag_run_id must be provided")

Expand Down
4 changes: 2 additions & 2 deletions airflow/api_connexion/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@


def check_authentication() -> None:
"""Checks that the request has valid authorization information."""
"""Check that the request has valid authorization information."""
for auth in get_airflow_app().api_auth:
response = auth.requires_authentication(Response)()
if response.status_code == 200:
Expand All @@ -39,7 +39,7 @@ def check_authentication() -> None:


def requires_access(permissions: Sequence[tuple[str, str]] | None = None) -> Callable[[T], T]:
"""Factory for decorator that checks current user's permissions against required permissions."""
"""Check current user's permissions against required permissions."""
appbuilder = get_airflow_app().appbuilder
if appbuilder.update_perms:
appbuilder.sm.sync_resource_permissions(permissions)
Expand Down
2 changes: 1 addition & 1 deletion airflow/api_internal/endpoints/rpc_api_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def _initialize_map() -> dict[str, Callable]:


def internal_airflow_api(body: dict[str, Any]) -> APIResponse:
"""Handler for Internal API /internal_api/v1/rpcapi endpoint."""
"""Handle Internal API /internal_api/v1/rpcapi endpoint."""
log.debug("Got request")
json_rpc = body.get("jsonrpc")
if json_rpc != "2.0":
Expand Down
6 changes: 4 additions & 2 deletions airflow/api_internal/internal_api_call.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ class InternalApiConfig:

@staticmethod
def force_database_direct_access():
"""Current component will not use Internal API.
"""
Block current component from using Internal API.
All methods decorated with internal_api_call will always be executed locally.
This mode is needed for "trusted" components like Scheduler, Webserver or Internal Api server.
Expand Down Expand Up @@ -80,7 +81,8 @@ def _init_values():


def internal_api_call(func: Callable[PS, RT]) -> Callable[PS, RT]:
"""Decorator for methods which may be executed in database isolation mode.
"""
Allow methods to be executed in database isolation mode.
If [core]database_access_isolation is true then such method are not executed locally,
but instead RPC call is made to Database API (aka Internal API). This makes some components
Expand Down

0 comments on commit b657ae9

Please sign in to comment.