Skip to content

Commit

Permalink
chore: split to modules
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmiran137 committed Aug 16, 2021
1 parent 45888ea commit 6eb5dbb
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 136 deletions.
3 changes: 2 additions & 1 deletion superset/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
SqlaTable,
)
from superset.models.core import Database # pylint: disable=unused-import
from tasks.caching.cache_strategy import Strategy

# Realtime stats logger, a StatsD implementation exists
STATS_LOGGER = DummyStatsLogger()
Expand Down Expand Up @@ -705,7 +706,7 @@ class CeleryConfig: # pylint: disable=too-few-public-methods
CELERY_CONFIG = CeleryConfig # pylint: disable=invalid-name

# Additional Caching strategies to be used in the CELERYBEAT_SCHEDULE config
EXTRA_CACHING_STRATEGIES = []
EXTRA_CACHING_STRATEGIES: List[Type[Strategy]] = []

# Set celery config to None to disable all the above configuration
# CELERY_CONFIG = None
Expand Down
Empty file.
86 changes: 86 additions & 0 deletions superset/tasks/caching/cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=too-few-public-methods

import logging
from typing import Any, Dict, List, Type, Union
from urllib import request
from urllib.error import URLError

from celery.utils.log import get_task_logger

from superset import app
from superset.extensions import celery_app
from superset.tasks.caching.cache_strategy import (
DashboardTagsStrategy,
DummyStrategy,
Strategy,
TopNDashboardsStrategy,
)

logger = get_task_logger(__name__)
logger.setLevel(logging.INFO)


strategies: List[Type[Strategy]] = [
DummyStrategy,
TopNDashboardsStrategy,
DashboardTagsStrategy,
]


@celery_app.task(name="cache-warmup")
def cache_warmup(
strategy_name: str, *args: Any, **kwargs: Any
) -> Union[Dict[str, List[str]], str]:
"""
Warm up cache.
This task periodically hits charts to warm up the cache.
"""
logger.info("Loading strategy")

extra_strategies: List[Type[Strategy]] = app.config["EXTRA_CACHING_STRATEGIES"]
for class_ in strategies + extra_strategies:
if class_.__name__ == strategy_name:
break
else:
message = f"No strategy {strategy_name} found!"
logger.error(message, exc_info=True)
return message

logger.info("Loading %s", class_.__name__)
try:
strategy = class_()
logger.info("Success!")
except TypeError:
message = "Error loading strategy!"
logger.exception(message)
return message

results: Dict[str, List[str]] = {"success": [], "errors": []}
for url in strategy.get_urls():
try:
logger.info("Fetching %s", url)
request.urlopen(url)
results["success"].append(url)
except URLError:
logger.exception("Error warming up cache!")
results["errors"].append(url)

return results
129 changes: 4 additions & 125 deletions superset/tasks/cache.py → superset/tasks/caching/cache_strategy.py
Original file line number Diff line number Diff line change
@@ -1,88 +1,14 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=too-few-public-methods

import json
import logging
from typing import Any, Dict, List, Optional, Union
from urllib import request
from urllib.error import URLError

from celery.utils.log import get_task_logger
from typing import List, Optional

from sqlalchemy import and_, func

from superset import app, db
from superset.extensions import celery_app
from superset import db
from superset.models.core import Log
from superset.models.dashboard import Dashboard
from superset.models.slice import Slice
from superset.models.tags import Tag, TaggedObject
from superset.tasks.caching.utils import get_form_data, get_url
from superset.utils.date_parser import parse_human_datetime
from superset.views.utils import build_extra_filters

logger = get_task_logger(__name__)
logger.setLevel(logging.INFO)


def get_form_data(
chart_id: int, dashboard: Optional[Dashboard] = None
) -> Dict[str, Any]:
"""
Build `form_data` for chart GET request from dashboard's `default_filters`.
When a dashboard has `default_filters` they need to be added as extra
filters in the GET request for charts.
"""
form_data: Dict[str, Any] = {"slice_id": chart_id}

if dashboard is None or not dashboard.json_metadata:
return form_data

json_metadata = json.loads(dashboard.json_metadata)
default_filters = json.loads(json_metadata.get("default_filters", "null"))
if not default_filters:
return form_data

filter_scopes = json_metadata.get("filter_scopes", {})
layout = json.loads(dashboard.position_json or "{}")
if (
isinstance(layout, dict)
and isinstance(filter_scopes, dict)
and isinstance(default_filters, dict)
):
extra_filters = build_extra_filters(
layout, filter_scopes, default_filters, chart_id
)
if extra_filters:
form_data["extra_filters"] = extra_filters

return form_data


def get_url(chart: Slice, extra_filters: Optional[Dict[str, Any]] = None) -> str:
"""Return external URL for warming up a given chart/table cache."""
with app.test_request_context():
baseurl = (
"{SUPERSET_WEBSERVER_PROTOCOL}://"
"{SUPERSET_WEBSERVER_ADDRESS}:"
"{SUPERSET_WEBSERVER_PORT}".format(**app.config)
)
return f"{baseurl}{chart.get_explore_url(overrides=extra_filters)}"


class Strategy:
Expand Down Expand Up @@ -250,50 +176,3 @@ def get_urls(self) -> List[str]:
urls.append(get_url(chart))

return urls


strategies = [DummyStrategy, TopNDashboardsStrategy, DashboardTagsStrategy]


@celery_app.task(name="cache-warmup")
def cache_warmup(
strategy_name: str, *args: Any, **kwargs: Any
) -> Union[Dict[str, List[str]], str]:
"""
Warm up cache.
This task periodically hits charts to warm up the cache.
"""
logger.info("Loading strategy")
class_ = None

extra_strategies = app.config["EXTRA_CACHING_STRATEGIES"]
for class_ in strategies + extra_strategies:
if class_.name == strategy_name: # type: ignore
break
else:
message = f"No strategy {strategy_name} found!"
logger.error(message, exc_info=True)
return message

logger.info("Loading %s", class_.__name__)
try:
strategy = class_(*args, **kwargs)
logger.info("Success!")
except TypeError:
message = "Error loading strategy!"
logger.exception(message)
return message

results: Dict[str, List[str]] = {"success": [], "errors": []}
for url in strategy.get_urls():
try:
logger.info("Fetching %s", url)
request.urlopen(url)
results["success"].append(url)
except URLError:
logger.exception("Error warming up cache!")
results["errors"].append(url)

return results
55 changes: 55 additions & 0 deletions superset/tasks/caching/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import json
from typing import Any, Dict, Optional

from maf.tests.models import Slice

from superset import app
from superset.models.dashboard import Dashboard
from superset.views.utils import build_extra_filters


def get_form_data(
chart_id: int, dashboard: Optional[Dashboard] = None
) -> Dict[str, Any]:
"""
Build `form_data` for chart GET request from dashboard's `default_filters`.
When a dashboard has `default_filters` they need to be added as extra
filters in the GET request for charts.
"""
form_data: Dict[str, Any] = {"slice_id": chart_id}

if dashboard is None or not dashboard.json_metadata:
return form_data

json_metadata = json.loads(dashboard.json_metadata)
default_filters = json.loads(json_metadata.get("default_filters", "null"))
if not default_filters:
return form_data

filter_scopes = json_metadata.get("filter_scopes", {})
layout = json.loads(dashboard.position_json or "{}")
if (
isinstance(layout, dict)
and isinstance(filter_scopes, dict)
and isinstance(default_filters, dict)
):
extra_filters = build_extra_filters(
layout, filter_scopes, default_filters, chart_id
)
if extra_filters:
form_data["extra_filters"] = extra_filters

return form_data


def get_url(chart: Slice, extra_filters: Optional[Dict[str, Any]] = None) -> str:
"""Return external URL for warming up a given chart/table cache."""
with app.test_request_context():
baseurl = (
"{SUPERSET_WEBSERVER_PROTOCOL}://"
"{SUPERSET_WEBSERVER_ADDRESS}:"
"{SUPERSET_WEBSERVER_PORT}".format(**app.config)
)
return f"{baseurl}{chart.get_explore_url(overrides=extra_filters)}"
3 changes: 2 additions & 1 deletion superset/tasks/celery_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@

# Need to import late, as the celery_app will have been setup by "create_app()"
# pylint: disable=wrong-import-position, unused-import
from . import cache, schedules, scheduler # isort:skip
from . import schedules, scheduler # isort:skip
import caching.cache # isort:skip

# Export the celery app globally for Celery (as run on the cmd line) to find
app = celery_app
Expand Down
10 changes: 1 addition & 9 deletions tests/integration_tests/strategy_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,27 +23,19 @@
load_birth_names_dashboard_with_slices,
)

from sqlalchemy import String, Date, Float

import pytest
import pandas as pd

from superset.models.slice import Slice
from superset.utils.core import get_example_database

from superset import db

from superset.models.core import Log
from superset.models.tags import get_tag, ObjectTypes, TaggedObject, TagTypes
from superset.tasks.cache import (
from superset.tasks.caching.cache_strategy import (
DashboardTagsStrategy,
get_form_data,
TopNDashboardsStrategy,
)

from .base_tests import SupersetTestCase
from .dashboard_utils import create_dashboard, create_slice, create_table_for_dashboard
from .fixtures.unicode_dashboard import load_unicode_dashboard_with_slice

URL_PREFIX = "http://0.0.0.0:8081"

Expand Down

0 comments on commit 6eb5dbb

Please sign in to comment.