Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(query): Use long running async view for queries with poll #27758

Merged
merged 20 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
e."$group_0" as aggregation_target,
if(notEmpty(pdi.distinct_id), pdi.person_id, e.person_id) as person_id,
person.person_props as person_props,
person.pmat_email as pmat_email,
if(event = 'step one', 1, 0) as step_0,
if(step_0 = 1, timestamp, null) as latest_0,
if(event = 'step two', 1, 0) as step_1,
Expand All @@ -79,6 +80,7 @@
HAVING argMax(is_deleted, version) = 0) AS pdi ON e.distinct_id = pdi.distinct_id
INNER JOIN
(SELECT id,
argMax(pmat_email, version) as pmat_email,
argMax(properties, version) as person_props
FROM person
WHERE team_id = 99999
Expand All @@ -95,7 +97,7 @@
AND event IN ['step one', 'step three', 'step two']
AND toTimeZone(timestamp, 'UTC') >= toDateTime('2021-05-01 00:00:00', 'UTC')
AND toTimeZone(timestamp, 'UTC') <= toDateTime('2021-05-10 23:59:59', 'UTC')
AND ((replaceRegexpAll(JSONExtractRaw(person_props, 'email'), '^"|"$', '') ILIKE '%g0%'
AND (("pmat_email" ILIKE '%g0%'
OR replaceRegexpAll(JSONExtractRaw(person_props, 'name'), '^"|"$', '') ILIKE '%g0%'
OR replaceRegexpAll(JSONExtractRaw(e.properties, 'distinct_id'), '^"|"$', '') ILIKE '%g0%'
OR replaceRegexpAll(JSONExtractRaw(group_properties_0, 'name'), '^"|"$', '') ILIKE '%g0%'
Expand Down
30 changes: 30 additions & 0 deletions frontend/src/lib/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,10 @@ class ApiRequest {
return apiRequest
}

/** Build the request for the environment-scoped `query_awaited` endpoint. */
public queryAwaited(teamId?: TeamType['id']): ApiRequest {
    const environmentRequest = this.environmentsDetail(teamId)
    return environmentRequest.addPathComponent('query_awaited')
}

// Conversations
public conversations(teamId?: TeamType['id']): ApiRequest {
return this.environmentsDetail(teamId).addPathComponent('conversations')
Expand Down Expand Up @@ -2668,6 +2672,32 @@ const api = {
})
},

/**
 * Send `query` through `ApiRequest.create` on the `query_awaited` endpoint.
 *
 * The request body carries the query together with the optional client query id,
 * refresh mode, and dashboard filter / variable overrides. Any `options` are
 * spread into the request before the body is attached.
 */
async queryAwaited<T extends Record<string, any> = QuerySchema>(
    query: T,
    options?: ApiMethodOptions,
    queryId?: string,
    refresh?: RefreshType,
    filtersOverride?: DashboardFilter | null,
    variablesOverride?: Record<string, HogQLVariable> | null
): Promise<
    T extends { [response: string]: any }
        ? T['response'] extends infer P | undefined
            ? P
            : T['response']
        : Record<string, any>
> {
    // Wire-format field names are snake_case, unlike the camelCase parameters
    const payload = {
        query,
        client_query_id: queryId,
        refresh,
        filters_override: filtersOverride,
        variables_override: variablesOverride,
    }
    const awaitedRequest = new ApiRequest().queryAwaited()
    return await awaitedRequest.create({ ...options, data: payload })
},

conversations: {
async create(data: { content: string; conversation?: string | null; trace_id: string }): Promise<Response> {
return api.createResponse(new ApiRequest().conversations().assembleFullUrl(), data)
Expand Down
35 changes: 22 additions & 13 deletions frontend/src/queries/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,28 +101,37 @@ async function executeQuery<N extends DataNode>(
!SYNC_ONLY_QUERY_KINDS.includes(queryNode.kind) &&
!!featureFlagLogic.findMounted()?.values.featureFlags?.[FEATURE_FLAGS.QUERY_ASYNC]

const useOptimizedPolling = posthog.isFeatureEnabled('query-optimized-polling')

if (!pollOnly) {
const refreshParam: RefreshType | undefined =
refresh && isAsyncQuery ? 'force_async' : isAsyncQuery ? 'async' : refresh
const response = await api.query(
queryNode,
methodOptions,
queryId,
refreshParam,
filtersOverride,
variablesOverride
)
let response: NonNullable<N['response']>
if (useOptimizedPolling) {
response = await api.queryAwaited(
queryNode,
methodOptions,
queryId,
refreshParam,
filtersOverride,
variablesOverride
)
} else {
response = await api.query(
queryNode,
methodOptions,
queryId,
refreshParam,
filtersOverride,
variablesOverride
)
}

if (!isAsyncResponse(response)) {
// Executed query synchronously or from cache
return response
}

if (response.query_status.complete) {
// Async query returned immediately
return response.results
}

queryId = response.query_status.id
} else {
if (!isAsyncQuery) {
Expand Down
70 changes: 67 additions & 3 deletions posthog/api/query.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
import re
import uuid

from django.http import JsonResponse
import json
import time
import asyncio
from django.http import JsonResponse, HttpResponse
from drf_spectacular.utils import OpenApiResponse
from pydantic import BaseModel
from rest_framework import status, viewsets
from rest_framework.exceptions import NotAuthenticated, ValidationError
from rest_framework.request import Request
from rest_framework.response import Response
from sentry_sdk import capture_exception, set_tag

from asgiref.sync import sync_to_async
from posthog.api.documentation import extend_schema
from posthog.api.mixins import PydanticModelMixin
from posthog.api.monitoring import Feature, monitor
from posthog.api.routing import TeamAndOrgViewSetMixin
from posthog.api.services.query import process_query_model

from posthog.api.utils import action
from posthog.clickhouse.client.execute_async import (
cancel_query,
get_query_status,
QueryStatusManager,
)
from posthog.clickhouse.query_tagging import tag_queries
from posthog.errors import ExposedCHQueryError
Expand All @@ -42,6 +46,7 @@
QueryResponseAlternative,
QueryStatusResponse,
)
from typing import cast


class QueryViewSet(TeamAndOrgViewSetMixin, PydanticModelMixin, viewsets.ViewSet):
Expand Down Expand Up @@ -140,6 +145,10 @@ def retrieve(self, request: Request, pk=None, *args, **kwargs) -> JsonResponse:

return JsonResponse(query_status_response.model_dump(), safe=False, status=http_code)

@action(methods=["POST"], detail=False)
def check_auth_for_async(self, request: Request, *args, **kwargs):
    """Answer a POST with a fixed ``{"user": "ok"}`` JSON body and HTTP 200.

    NOTE(review): presumably the viewset's authentication/permission handling
    rejects unauthorised callers before this body runs - confirm.
    """
    payload = {"user": "ok"}
    return JsonResponse(payload, status=status.HTTP_200_OK)

@extend_schema(
description="(Experimental)",
responses={
Expand Down Expand Up @@ -183,3 +192,58 @@ def _tag_client_query_id(self, query_id: str | None):

tag_queries(client_query_id=query_id)
set_tag("client_query_id", query_id)


ASYNC_FALLBACK_TO_POLLING_TIMEOUT = 20


async def query_awaited(request: Request, *args, **kwargs) -> HttpResponse:
    """Run a query and hold the request open until it completes or a timeout elapses.

    The request is delegated to the ``create`` action of ``QueryViewSet``. Any
    response other than HTTP 202 is returned unchanged. For a 202 (query accepted
    for asynchronous calculation) the query status is polled with a growing delay
    for up to ``ASYNC_FALLBACK_TO_POLLING_TIMEOUT`` seconds.

    Args:
        request: Incoming HTTP request, forwarded as-is to the viewset.
        *args: Unused positional arguments.
        **kwargs: URL keyword arguments, forwarded to ``QueryViewSet.as_view``;
            ``team_id`` is needed to look up the query status.

    Returns:
        - the viewset's own response when it is not a 202;
        - that same 202 response when polling times out, ``team_id`` is missing,
          or the query finished with an error, so the client can keep polling
          the regular status endpoint;
        - otherwise a JSON ``HttpResponse`` holding the completed results.
    """
    # Call the create method on QueryViewSet
    view = await sync_to_async(QueryViewSet.as_view)({"post": "create"}, **kwargs)
    response = await sync_to_async(view)(request)

    # 202 means accepted, i.e. we're calculating async. Anything else can be returned immediately.
    if response.status_code != 202:
        return response

    team_id = kwargs.get("team_id")
    if team_id is None:
        # The status cannot be looked up without a team; let the client poll instead of failing here.
        return response

    response.render()
    data = json.loads(response.rendered_content)
    manager = QueryStatusManager(data["query_status"]["id"], cast(int, team_id))

    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    deadline = time.monotonic() + ASYNC_FALLBACK_TO_POLLING_TIMEOUT
    sleep_time = 0.1  # Start with 100ms
    max_sleep_time = 1.0  # Don't wait more than 1 second between checks
    query_status = None
    last_error: Exception | None = None

    # For async responses, poll until complete or timeout
    while time.monotonic() < deadline:
        try:
            current_status = await sync_to_async(manager.get_query_status)(show_progress=True)
        except Exception as e:
            # Best effort: keep polling, but remember the failure so it is not lost.
            last_error = e
        else:
            if current_status.complete:
                query_status = current_status
                break

        await asyncio.sleep(sleep_time)
        sleep_time = min(sleep_time * 1.5, max_sleep_time)

    if query_status is None:
        if last_error is not None:
            # Report once per request rather than once per poll iteration.
            capture_exception(last_error)
        # If we time out on responding synchronously, return the original response so the client can continue to poll
        return response

    if getattr(query_status, "error", False):
        # A failed query has no usable results; hand back the 202 so the client's
        # polling path retrieves the error status instead of a bare `null` body.
        return response

    if isinstance(query_status, BaseModel) and hasattr(query_status, "results"):
        response_data = json.dumps(query_status.results)
    elif isinstance(query_status, BaseModel):
        # json.dumps cannot serialise a pydantic model directly
        response_data = query_status.model_dump_json()
    else:
        response_data = json.dumps(query_status)

    return HttpResponse(
        response_data,
        content_type="application/json",
    )
40 changes: 22 additions & 18 deletions posthog/api/routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,37 +252,41 @@ def team_id(self) -> int:
@cached_property
def team(self) -> Team:
if team_from_token := self._get_team_from_request():
return team_from_token

if self._is_project_view:
return Team.objects.get(
team = team_from_token
elif self._is_project_view:
team = Team.objects.get(
id=self.project_id # KLUDGE: This is just for the period of transition to project environments
)

if self.param_derived_from_user_current_team == "team_id":
elif self.param_derived_from_user_current_team == "team_id":
user = cast(User, self.request.user)
assert user.team is not None
team = user.team
assert team is not None
return team
try:
return Team.objects.get(id=self.team_id)
except Team.DoesNotExist:
raise NotFound(
detail="Project not found." # TODO: "Environment" instead of "Project" when project environments are rolled out
)
else:
try:
team = Team.objects.get(id=self.team_id)
except Team.DoesNotExist:
raise NotFound(
detail="Project not found." # TODO: "Environment" instead of "Project" when project environments are rolled out
)

tag_queries(team_id=team.pk)
return team

@cached_property
def project_id(self) -> int:
if team_from_token := self._get_team_from_request():
return team_from_token.project_id
project_id = team_from_token.project_id

if self.param_derived_from_user_current_team == "project_id":
elif self.param_derived_from_user_current_team == "project_id":
user = cast(User, self.request.user)
team = user.team
assert team is not None
return team.project_id
project_id = team.project_id
else:
project_id = self.parents_query_dict["project_id"]

return self.parents_query_dict["project_id"]
tag_queries(team_id=project_id)
return project_id

@cached_property
def project(self) -> Project:
Expand Down
1 change: 1 addition & 0 deletions posthog/api/test/__snapshots__/test_api_docs.ambr
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
'/home/runner/work/posthog/posthog/posthog/api/project.py: Warning [ProjectViewSet > ProjectBackwardCompatSerializer]: unable to resolve type hint for function "get_product_intents". Consider using a type hint or @extend_schema_field. Defaulting to string.',
'/home/runner/work/posthog/posthog/posthog/api/proxy_record.py: Warning [ProxyRecordViewset]: could not derive type of path parameter "id" because it is untyped and obtaining queryset from the viewset failed. Consider adding a type to the path (e.g. <int:id>) or annotating the parameter type with @extend_schema. Defaulting to "string".',
'/home/runner/work/posthog/posthog/posthog/api/proxy_record.py: Warning [ProxyRecordViewset]: could not derive type of path parameter "organization_id" because it is untyped and obtaining queryset from the viewset failed. Consider adding a type to the path (e.g. <int:organization_id>) or annotating the parameter type with @extend_schema. Defaulting to "string".',
'/home/runner/work/posthog/posthog/posthog/api/query.py: Error [QueryViewSet]: unable to guess serializer. This is graceful fallback handling for APIViews. Consider using GenericAPIView as view base class, if view is under your control. Either way you may want to add a serializer_class (or method). Ignoring view for now.',
'/home/runner/work/posthog/posthog/posthog/api/query.py: Warning [QueryViewSet]: could not derive type of path parameter "id" because it is untyped and obtaining queryset from the viewset failed. Consider adding a type to the path (e.g. <int:id>) or annotating the parameter type with @extend_schema. Defaulting to "string".',
'/home/runner/work/posthog/posthog/posthog/api/query.py: Warning [QueryViewSet]: could not derive type of path parameter "project_id" because it is untyped and obtaining queryset from the viewset failed. Consider adding a type to the path (e.g. <int:project_id>) or annotating the parameter type with @extend_schema. Defaulting to "string".',
'/home/runner/work/posthog/posthog/posthog/api/session.py: Warning [SessionViewSet]: could not derive type of path parameter "project_id" because it is untyped and obtaining queryset from the viewset failed. Consider adding a type to the path (e.g. <int:project_id>) or annotating the parameter type with @extend_schema. Defaulting to "string".',
Expand Down
52 changes: 52 additions & 0 deletions posthog/api/test/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from rest_framework import status

from posthog.api.services.query import process_query_dict
from posthog.clickhouse.client.execute_async import QueryStatusManager, QueryStatus
from posthog.hogql.query import LimitContext
from posthog.models.property_definition import PropertyDefinition, PropertyType
from posthog.models.utils import UUIDT
Expand Down Expand Up @@ -1009,6 +1010,57 @@ def test_dashboard_filters_applied_with_source(self):
self.assertEqual(response_with_dashboard_filters.results, [(1,)])


class TestQueryAwaited(ClickhouseTestMixin, APIBaseTest):
    """Endpoint tests for ``/api/environments/<team_id>/query_awaited/``."""

    def test_async_query_returns_something(self):
        """A force-async query whose status is already complete yields HTTP 200 with the results."""
        hogql = HogQLQuery(query="select event, distinct_id, properties.key from events order by timestamp")
        completed_status = QueryStatus(
            id="123",
            query_async=True,
            team_id=self.team.id,
            error=False,
            complete=True,
            error_message=None,
            results={"results": [[1]]},
        )

        # Mock QueryStatusManager to return completed status immediately
        with patch.object(QueryStatusManager, "get_query_status", return_value=completed_status):
            payload = {"query": hogql.dict(), "refresh": "force_async"}
            response = self.client.post(f"/api/environments/{self.team.id}/query_awaited/", payload)

        self.assertEqual(response.status_code, status.HTTP_200_OK, response.content)
        body = response.json()
        self.assertEqual(body["results"], [[1]])

    def test_sync_query_returns_something(self):
        """A non-async request is answered directly with HTTP 200 and an empty result list."""
        hogql = HogQLQuery(query="select event, distinct_id, properties.key from events order by timestamp")
        payload = {"query": hogql.dict(), "refresh": True, "async": False}

        response = self.client.post(f"/api/environments/{self.team.id}/query_awaited/", payload)

        self.assertEqual(response.status_code, status.HTTP_200_OK, response.content)
        body = response.json()
        self.assertEqual(body["results"], [])

    def test_async_query_invalid_json(self):
        """A malformed JSON body is rejected with HTTP 400 and type ``invalid_request``."""
        endpoint = f"/api/environments/{self.team.pk}/query_awaited/"

        response = self.client.post(endpoint, "invalid json", content_type="application/json")

        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
        self.assertEqual(response.json()["type"], "invalid_request")

    def test_async_auth(self):
        """A logged-out client receives HTTP 403."""
        self.client.logout()
        hogql = HogQLQuery(query="select event, distinct_id, properties.key from events order by timestamp")
        payload = {"query": hogql.dict()}

        response = self.client.post(f"/api/environments/{self.team.id}/query_awaited/", payload)

        self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN, response.content)

    def test_get_returns_405(self):
        """GET is not an allowed method on the endpoint."""
        endpoint = f"/api/environments/{self.team.id}/query_awaited/"

        response = self.client.get(endpoint)

        self.assertEqual(response.status_code, status.HTTP_405_METHOD_NOT_ALLOWED, response.content)


class TestQueryRetrieve(APIBaseTest):
def setUp(self):
super().setUp()
Expand Down
2 changes: 2 additions & 0 deletions posthog/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
stats,
)
from .year_in_posthog import year_in_posthog
from posthog.api.query import query_awaited

logger = structlog.get_logger(__name__)

Expand Down Expand Up @@ -171,6 +172,7 @@ def opt_slash_path(route: str, view: Callable, name: Optional[str] = None) -> UR
# ee
*ee_urlpatterns,
# api
path("api/environments/<int:team_id>/query_awaited/", query_awaited),
path("api/unsubscribe", unsubscribe.unsubscribe),
path("api/", include(router.urls)),
path("", include(tf_urls)),
Expand Down
Loading