Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Different async action mapping approach #3678

Merged
merged 18 commits into from
Mar 18, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions ee/clickhouse/queries/test/test_trends.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ def test_breakdown_by_person_property(self):
if response["breakdown_value"] == "person3":
self.assertEqual(response["count"], 3)

self.assertTrue(self._compare_entity_response(event_response, action_response,))
self.assertEntityResponseEqual(
event_response, action_response,
)

def test_breakdown_filtering(self):
self._create_events()
Expand Down Expand Up @@ -213,7 +215,7 @@ def test_dau_with_breakdown_filtering(self):

self.assertEqual(sum(event_response[1]["data"]), 1)
self.assertEqual(event_response[1]["data"][5], 1)
self.assertTrue(self._compare_entity_response(action_response, event_response))
self.assertEntityResponseEqual(action_response, event_response)

def test_dau_with_breakdown_filtering_with_prop_filter(self):
sign_up_action, _ = self._create_events()
Expand Down Expand Up @@ -252,7 +254,7 @@ def test_dau_with_breakdown_filtering_with_prop_filter(self):
self.assertEqual(sum(event_response[1]["data"]), 1)
self.assertEqual(event_response[1]["data"][5], 1) # property not defined

self.assertTrue(self._compare_entity_response(action_response, event_response))
self.assertEntityResponseEqual(action_response, event_response)

# this ensures that the properties don't conflict when formatting params
def test_action_with_prop(self):
Expand Down
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ auth: 0011_update_proxy_permissions
axes: 0006_remove_accesslog_trusted
contenttypes: 0002_remove_content_type_name
ee: 0002_hook
posthog: 0132_team_test_account_filters
posthog: 0133_event_site_url
rest_hooks: 0002_swappable_hook_model
sessions: 0001_initial
social_django: 0008_partial_timestamp
42 changes: 15 additions & 27 deletions posthog/api/test/test_action_people.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from json import dumps as jdumps

from freezegun import freeze_time

from posthog.constants import ENTITY_ID, ENTITY_TYPE
from posthog.models import Action, ActionStep, Cohort, Event, Organization, Person
from posthog.queries.abstract_test.test_interval import AbstractIntervalTest
from posthog.tasks.calculate_action import calculate_actions_from_last_calculation

from .base import TransactionBaseTest

Expand Down Expand Up @@ -72,7 +71,7 @@ def _create_breakdown_events(self):
team=self.team, event="sign up", distinct_id="blabla", properties={"$some_property": i},
)

def _compare_entity_response(self, response1, response2, remove=("action", "label")):
def assertEntityResponseEqual(self, response1, response2, remove=("action", "label")):
if len(response1):
for attr in remove:
response1[0].pop(attr)
Expand All @@ -83,7 +82,7 @@ def _compare_entity_response(self, response1, response2, remove=("action", "labe
response2[0].pop(attr)
else:
return False
return str(response1[0]) == str(response2[0])
self.assertDictEqual(response1[0], response2[0])

def test_people_endpoint_paginated(self):

Expand Down Expand Up @@ -145,7 +144,7 @@ def _create_people_interval_events(self):
event_factory(
team=self.team, event="sign up", distinct_id="person1", timestamp="2019-11-27T16:50:00Z",
)

calculate_actions_from_last_calculation()
return person1, person2, person3, person4, person5, person6, person7

def test_minute_interval(self):
Expand Down Expand Up @@ -178,10 +177,8 @@ def test_minute_interval(self):
all_people_ids = [str(person["id"]) for person in min_grouped_action_response["results"][0]["people"]]
self.assertListEqual(sorted(all_people_ids), sorted([str(person4.pk), str(person5.pk)]))
self.assertEqual(len(all_people_ids), 2)
self.assertTrue(
self._compare_entity_response(
min_grouped_action_response["results"], min_grouped_grevent_response["results"], remove=[],
)
self.assertEntityResponseEqual(
min_grouped_action_response["results"], min_grouped_grevent_response["results"], remove=[],
)

def test_hour_interval(self):
Expand Down Expand Up @@ -212,9 +209,7 @@ def test_hour_interval(self):
).json()
self.assertEqual(str(action_response["results"][0]["people"][0]["id"]), str(person1.pk))
self.assertEqual(len(action_response["results"][0]["people"]), 1)
self.assertTrue(
self._compare_entity_response(action_response["results"], event_response["results"], remove=[])
)
self.assertEntityResponseEqual(action_response["results"], event_response["results"], remove=[])

# check grouped hour
hour_grouped_action_response = self.client.get(
Expand All @@ -240,10 +235,8 @@ def test_hour_interval(self):
all_people_ids = [str(person["id"]) for person in hour_grouped_action_response["results"][0]["people"]]
self.assertListEqual(sorted(all_people_ids), sorted([str(person2.pk), str(person3.pk)]))
self.assertEqual(len(all_people_ids), 2)
self.assertTrue(
self._compare_entity_response(
hour_grouped_action_response["results"], hour_grouped_grevent_response["results"], remove=[],
)
self.assertEntityResponseEqual(
hour_grouped_action_response["results"], hour_grouped_grevent_response["results"], remove=[],
)

def test_day_interval(self):
Expand All @@ -256,6 +249,7 @@ def test_day_interval(self):
event_factory(
team=self.team, event="sign up", distinct_id="person2", timestamp="2020-01-05T12:00:00Z",
)
calculate_actions_from_last_calculation()
# test people
action_response = self.client.get(
"/api/action/people/",
Expand All @@ -279,9 +273,7 @@ def test_day_interval(self):
).json()

self.assertEqual(str(action_response["results"][0]["people"][0]["id"]), str(person1.pk))
self.assertTrue(
self._compare_entity_response(action_response["results"], event_response["results"], remove=[])
)
self.assertEntityResponseEqual(action_response["results"], event_response["results"], remove=[])

def test_week_interval(self):
sign_up_action, person = self._create_events()
Expand Down Expand Up @@ -314,10 +306,8 @@ def test_week_interval(self):
self.assertListEqual(sorted(all_people_ids), sorted([str(person6.pk), str(person7.pk)]))
self.assertEqual(len(all_people_ids), 2)

self.assertTrue(
self._compare_entity_response(
week_grouped_action_response["results"], week_grouped_grevent_response["results"], remove=[],
)
self.assertEntityResponseEqual(
week_grouped_action_response["results"], week_grouped_grevent_response["results"], remove=[],
)

def test_month_interval(self):
Expand Down Expand Up @@ -351,10 +341,8 @@ def test_month_interval(self):
self.assertListEqual(sorted(all_people_ids), sorted([str(person6.pk), str(person7.pk), str(person1.pk)]))
self.assertEqual(len(all_people_ids), 3)

self.assertTrue(
self._compare_entity_response(
month_group_action_response["results"], month_group_grevent_response["results"], remove=[],
)
self.assertEntityResponseEqual(
month_group_action_response["results"], month_group_grevent_response["results"], remove=[],
)

def test_interval_rounding(self):
Expand Down
6 changes: 3 additions & 3 deletions posthog/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
app.conf.broker_pool_limit = 0

# How frequently do we want to calculate action -> event relationships if async is enabled
ACTION_EVENT_MAPPING_INTERVAL_MINUTES = 5
ACTION_EVENT_MAPPING_INTERVAL_SECONDS = 30

if settings.STATSD_HOST is not None:
statsd.Connection.set_defaults(host=settings.STATSD_HOST, port=settings.STATSD_PORT)
Expand Down Expand Up @@ -76,10 +76,10 @@ def setup_periodic_tasks(sender, **kwargs):

if settings.ASYNC_EVENT_ACTION_MAPPING:
sender.add_periodic_task(
(60 * ACTION_EVENT_MAPPING_INTERVAL_MINUTES),
ACTION_EVENT_MAPPING_INTERVAL_SECONDS,
calculate_event_action_mappings.s(),
name="calculate event action mappings",
expires=(60 * ACTION_EVENT_MAPPING_INTERVAL_MINUTES),
expires=ACTION_EVENT_MAPPING_INTERVAL_SECONDS,
)

if settings.ASYNC_EVENT_PROPERTY_USAGE:
Expand Down
16 changes: 16 additions & 0 deletions posthog/migrations/0133_event_site_url.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Generated by Django 3.0.11 on 2021-03-17 02:15

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add an optional ``site_url`` column to the ``event`` model.

    The new column is a ``CharField`` limited to 200 characters that may be
    left blank and is stored as NULL when absent. This migration is applied
    after ``0132_team_test_account_filters`` in the ``posthog`` app.
    """

    # Must run after the previous posthog migration.
    dependencies = [
        ("posthog", "0132_team_test_account_filters"),
    ]

    operations = [
        migrations.AddField(
            model_name="event",
            name="site_url",
            field=models.CharField(
                max_length=200,
                null=True,
                blank=True,
            ),
        ),
    ]
78 changes: 36 additions & 42 deletions posthog/models/action.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import datetime

from django.conf import settings
import celery
from django.core.exceptions import EmptyResultSet
from django.db import connection, models, transaction
from django.db.models import Q
Expand All @@ -24,7 +24,8 @@ def calculate_events(self, start=None, end=None):
if end is None:
end = timezone.now() + datetime.timedelta(days=1)

calculated_at = timezone.now()
last_calculated_at = self.last_calculated_at
now_calculated_at = timezone.now()
self.is_calculating = True
self.save()
from .event import Event
Expand All @@ -36,49 +37,42 @@ def calculate_events(self, start=None, end=None):
event_query, params = (
Event.objects.query_db_by_action(self, start=start, end=end).only("pk").query.sql_with_params()
)

except EmptyResultSet:
self.last_calculated_at = calculated_at
self.is_calculating = False
self.save()
self.events.all().delete()
return

query = """DELETE FROM "posthog_action_events" WHERE "action_id" = {}""".format(self.pk)

period_delete_query = """AND "event_id" in
(SELECT id
FROM posthog_event
WHERE "created_at" >= '{}'
AND "created_at" < '{}');
""".format(
start.isoformat(), end.isoformat()
)

insert_query = """INSERT INTO "posthog_action_events" ("action_id", "event_id")
{}
ON CONFLICT DO NOTHING
""".format(
event_query.replace("SELECT ", "SELECT {}, ".format(self.pk), 1)
)

if not recalculate_all:
query += period_delete_query
else:
query += ";"

query += insert_query

cursor = connection.cursor()
with transaction.atomic():
try:
cursor.execute(query, params)
except Exception as err:
capture_exception(err)

self.is_calculating = False
self.last_calculated_at = calculated_at
self.save()
delete_query = f"""DELETE FROM "posthog_action_events" WHERE "action_id" = {self.pk}"""

if not recalculate_all:
delete_query += f""" AND "event_id" IN (
macobo marked this conversation as resolved.
Show resolved Hide resolved
SELECT id FROM posthog_event
WHERE "created_at" >= '{start.isoformat()}' AND "created_at" < '{end.isoformat()}'
)"""

insert_query = """INSERT INTO "posthog_action_events" ("action_id", "event_id")
{}
ON CONFLICT DO NOTHING
""".format(
event_query.replace("SELECT ", f"SELECT {self.pk}, ", 1)
)

cursor = connection.cursor()
with transaction.atomic():
try:
cursor.execute(delete_query + ";" + insert_query, params)
except Exception as err:
capture_exception(err)

if self.post_to_slack:
macobo marked this conversation as resolved.
Show resolved Hide resolved
for event in self.events.filter(
macobo marked this conversation as resolved.
Show resolved Hide resolved
created_at__gt=last_calculated_at, team__slack_incoming_webhook__isnull=False
).only("pk"):
celery.current_app.send_task(
"posthog.tasks.webhooks.post_event_to_webhook", (event.pk, event.site_url)
)
finally:
self.is_calculating = False
self.last_calculated_at = now_calculated_at
self.save()

def on_perform(self, event):
from posthog.api.event import EventSerializer
Expand Down
9 changes: 5 additions & 4 deletions posthog/models/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,9 @@ def filter_by_event_with_people(self, event, team_id: int, order_by: str = "-id"
events = events.order_by(order_by)
return events

def create(self, site_url: Optional[str] = None, *args: Any, **kwargs: Any):
def create(self, *args: Any, **kwargs: Any):
site_url = kwargs.get("site_url")

with transaction.atomic():
if kwargs.get("elements"):
if kwargs.get("team"):
Expand All @@ -246,9 +248,7 @@ def create(self, site_url: Optional[str] = None, *args: Any, **kwargs: Any):
).hash
event = super().create(*args, **kwargs)

# Matching actions to events can get very expensive to do as events are streaming in
# In a few cases we have had it OOM Postgres with the query it is running
# Short term solution is to have this be configurable to be run in batch
# DEPRECATED: the inline (synchronous) action matching below. ASYNC_EVENT_ACTION_MAPPING is the main approach now, as it works with the plugin server
if not settings.ASYNC_EVENT_ACTION_MAPPING:
should_post_webhook = False
relations = []
Expand Down Expand Up @@ -375,6 +375,7 @@ def actions(self) -> List:
properties: JSONField = JSONField(default=dict)
timestamp: models.DateTimeField = models.DateTimeField(default=timezone.now, blank=True)
elements_hash: models.CharField = models.CharField(max_length=200, null=True, blank=True)
site_url: models.CharField = models.CharField(max_length=200, null=True, blank=True)

# DEPRECATED: elements are stored against element groups now
elements: JSONField = JSONField(default=list, null=True, blank=True)
7 changes: 6 additions & 1 deletion posthog/queries/sessions/test/test_sessions_list.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import pytz
from dateutil.relativedelta import relativedelta
from django.utils.timezone import now
from freezegun import freeze_time

from posthog.models import Action, ActionStep, Cohort, Event, Organization, Person, SessionRecordingEvent
from posthog.models.filters.sessions_filter import SessionsFilter
from posthog.queries.sessions.sessions_list import SessionsList
from posthog.tasks.calculate_action import calculate_action, calculate_actions_from_last_calculation
from posthog.test.base import BaseTest


Expand Down Expand Up @@ -109,6 +109,10 @@ def test_filter_by_entity_action(self):

self.create_test_data()

calculate_action(action1.id)
calculate_action(action2.id)
calculate_action(action3.id)

self.assertLength(
self.run_query(
SessionsFilter(data={"filters": [{"type": "action_type", "key": "id", "value": action1.id}]})
Expand Down Expand Up @@ -243,6 +247,7 @@ def create_test_data(self):
Person.objects.create(team=self.team, distinct_ids=["1", "3", "4"], properties={"email": "bla"})
# Test team leakage
Person.objects.create(team=team_2, distinct_ids=["1", "3", "4"], properties={"email": "bla"})
calculate_actions_from_last_calculation()

def create_large_testset(self):
for i in range(100):
Expand Down
Loading