Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

get rid of the activity stream middleware #6525

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 1 addition & 62 deletions awx/main/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,19 @@

from django.conf import settings
from django.contrib.auth.models import User
from django.db.models.signals import post_save
from django.db.migrations.executor import MigrationExecutor
from django.db import IntegrityError, connection
from django.utils.functional import curry
from django.db import connection
from django.shortcuts import get_object_or_404, redirect
from django.apps import apps
from django.utils.deprecation import MiddlewareMixin
from django.utils.translation import ugettext_lazy as _
from django.urls import reverse, resolve

from awx.main.models import ActivityStream
from awx.main.utils.named_url_graph import generate_graph, GraphNode
from awx.conf import fields, register


logger = logging.getLogger('awx.main.middleware')
analytics_logger = logging.getLogger('awx.analytics.activity_stream')
perf_logger = logging.getLogger('awx.analytics.performance')


Expand Down Expand Up @@ -76,63 +72,6 @@ def save_profile_file(self, request):
return filepath


class ActivityStreamMiddleware(threading.local, MiddlewareMixin):

def __init__(self, get_response=None):
self.disp_uid = None
self.instance_ids = []
super().__init__(get_response)

def process_request(self, request):
if hasattr(request, 'user') and request.user.is_authenticated:
user = request.user
else:
user = None

set_actor = curry(self.set_actor, user)
self.disp_uid = str(uuid.uuid1())
self.instance_ids = []
post_save.connect(set_actor, sender=ActivityStream, dispatch_uid=self.disp_uid, weak=False)

def process_response(self, request, response):
drf_request = getattr(request, 'drf_request', None)
drf_user = getattr(drf_request, 'user', None)
if self.disp_uid is not None:
post_save.disconnect(dispatch_uid=self.disp_uid)

for instance in ActivityStream.objects.filter(id__in=self.instance_ids):
if drf_user and drf_user.id:
from awx.api.serializers import ActivityStreamSerializer
summary_fields = ActivityStreamSerializer(instance).get_summary_fields(instance)
instance.actor = drf_user
try:
instance.save(update_fields=['actor'])
analytics_logger.info('Activity Stream update entry for %s' % str(instance.object1),
extra=dict(changes=instance.changes, relationship=instance.object_relationship_type,
actor=drf_user.username, operation=instance.operation,
object1=instance.object1, object2=instance.object2, summary_fields=summary_fields))
except IntegrityError:
logger.debug("Integrity Error saving Activity Stream instance for id : " + str(instance.id))
# else:
# obj1_type_actual = instance.object1_type.split(".")[-1]
# if obj1_type_actual in ("InventoryUpdate", "ProjectUpdate", "Job") and instance.id is not None:
# instance.delete()

self.instance_ids = []
return response

def set_actor(self, user, sender, instance, **kwargs):
if sender == ActivityStream:
if isinstance(user, User) and instance.actor is None:
user = User.objects.filter(id=user.id)
if user.exists():
user = user[0]
instance.actor = user
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

else:
if instance.id not in self.instance_ids:
self.instance_ids.append(instance.id)


class SessionTimeoutMiddleware(MiddlewareMixin):
"""
Resets the session timeout for both the UI and the actual session for the API
Expand Down
29 changes: 29 additions & 0 deletions awx/main/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
__all__ = []

logger = logging.getLogger('awx.main.signals')
analytics_logger = logging.getLogger('awx.analytics.activity_stream')

# Update has_active_failures for inventory/groups when a Host/Group is deleted,
# when a Host-Group or Group-Group relationship is updated, or when a Job is deleted
Expand Down Expand Up @@ -363,6 +364,22 @@ def model_serializer_mapping():
}


def emit_activity_stream_change(instance):
Copy link
Contributor Author

@ryanpetrello ryanpetrello Apr 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any time we trigger a signal for Activity Stream create/update/delete/associate, just schedule a post-commit function that notifies the external logger.

Copy link
Contributor Author

@ryanpetrello ryanpetrello Apr 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has the nice side effect of actually making external logging of the activity stream work outside of HTTP requests (like if you call an awx-manage command).

if 'migrate' in sys.argv:
# don't emit activity stream external logs during migrations, it
# could be really noisy
return
from awx.api.serializers import ActivityStreamSerializer
actor = None
if instance.actor:
actor = instance.actor.username
summary_fields = ActivityStreamSerializer(instance).get_summary_fields(instance)
analytics_logger.info('Activity Stream update entry for %s' % str(instance.object1),
extra=dict(changes=instance.changes, relationship=instance.object_relationship_type,
actor=actor, operation=instance.operation,
object1=instance.object1, object2=instance.object2, summary_fields=summary_fields))


def activity_stream_create(sender, instance, created, **kwargs):
if created and activity_stream_enabled:
# TODO: remove deprecated_group conditional in 3.3
Expand Down Expand Up @@ -399,6 +416,9 @@ def activity_stream_create(sender, instance, created, **kwargs):
else:
activity_entry.setting = conf_to_dict(instance)
activity_entry.save()
connection.on_commit(
lambda: emit_activity_stream_change(activity_entry)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only thing that concerns me is that this will now be ran in migrations. If some migration created 1M activity stream entries, these methods would pile up, and that would cause problems. I don't feel like migrations should create any activity stream entries, and I disable them when the issue comes up.

)


def activity_stream_update(sender, instance, **kwargs):
Expand Down Expand Up @@ -430,6 +450,9 @@ def activity_stream_update(sender, instance, **kwargs):
else:
activity_entry.setting = conf_to_dict(instance)
activity_entry.save()
connection.on_commit(
lambda: emit_activity_stream_change(activity_entry)
)


def activity_stream_delete(sender, instance, **kwargs):
Expand Down Expand Up @@ -467,6 +490,9 @@ def activity_stream_delete(sender, instance, **kwargs):
object1=object1,
actor=get_current_user_or_none())
activity_entry.save()
connection.on_commit(
lambda: emit_activity_stream_change(activity_entry)
)


def activity_stream_associate(sender, instance, **kwargs):
Expand Down Expand Up @@ -540,6 +566,9 @@ def activity_stream_associate(sender, instance, **kwargs):
activity_entry.role.add(role)
activity_entry.object_relationship_type = obj_rel
activity_entry.save()
connection.on_commit(
lambda: emit_activity_stream_change(activity_entry)
)


@receiver(current_user_getter)
Expand Down
23 changes: 0 additions & 23 deletions awx/main/tests/functional/api/test_activity_streams.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import pytest

from awx.api.versioning import reverse
from awx.main.middleware import ActivityStreamMiddleware
from awx.main.models.activity_stream import ActivityStream
from awx.main.access import ActivityStreamAccess
from awx.conf.models import Setting
Expand Down Expand Up @@ -61,28 +60,6 @@ def test_ctint_activity_stream(monkeypatch, get, user, settings):
assert response.data['summary_fields']['setting'][0]['name'] == 'FOO'


@pytest.mark.django_db
def test_middleware_actor_added(monkeypatch, post, get, user, settings):
settings.ACTIVITY_STREAM_ENABLED = True
u = user('admin-poster', True)

url = reverse('api:organization_list')
response = post(url,
dict(name='test-org', description='test-desc'),
u,
middleware=ActivityStreamMiddleware())
assert response.status_code == 201

org_id = response.data['id']
activity_stream = ActivityStream.objects.filter(organization__pk=org_id).first()

url = reverse('api:activity_stream_detail', kwargs={'pk': activity_stream.pk})
response = get(url, u)

assert response.status_code == 200
assert response.data['summary_fields']['actor']['username'] == 'admin-poster'


@pytest.mark.django_db
def test_rbac_stream_resource_roles(activity_stream_entry, organization, org_admin, settings):
settings.ACTIVITY_STREAM_ENABLED = True
Expand Down
1 change: 0 additions & 1 deletion awx/settings/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -1229,7 +1229,6 @@ def IS_TESTING(argv=None):
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'awx.main.middleware.ActivityStreamMiddleware',
'awx.sso.middleware.SocialAuthMiddleware',
'crum.CurrentRequestUserMiddleware',
'awx.main.middleware.URLModificationMiddleware',
Expand Down