diff --git a/netbox/core/models/data.py b/netbox/core/models/data.py index efda879afb7..6597a4b4d60 100644 --- a/netbox/core/models/data.py +++ b/netbox/core/models/data.py @@ -14,6 +14,7 @@ from django.utils.module_loading import import_string from django.utils.translation import gettext as _ +from netbox.constants import CENSOR_TOKEN, CENSOR_TOKEN_CHANGED from netbox.models import PrimaryModel from netbox.models.features import JobsMixin from netbox.registry import registry @@ -130,6 +131,28 @@ def clean(self): 'source_url': f"URLs for local sources must start with file:// (or specify no scheme)" }) + def to_objectchange(self, action): + objectchange = super().to_objectchange(action) + + # Censor any backend parameters marked as sensitive in the serialized data + pre_change_params = {} + post_change_params = {} + if objectchange.prechange_data: + pre_change_params = objectchange.prechange_data.get('parameters') or {} # parameters may be None + if objectchange.postchange_data: + post_change_params = objectchange.postchange_data.get('parameters') or {} + for param in self.backend_class.sensitive_parameters: + if post_change_params.get(param): + if post_change_params[param] != pre_change_params.get(param): + # Set the "changed" token if the parameter's value has been modified + post_change_params[param] = CENSOR_TOKEN_CHANGED + else: + post_change_params[param] = CENSOR_TOKEN + if pre_change_params.get(param): + pre_change_params[param] = CENSOR_TOKEN + + return objectchange + def enqueue_sync_job(self, request): """ Enqueue a background job to synchronize the DataSource by calling sync(). diff --git a/netbox/core/tests/test_models.py b/netbox/core/tests/test_models.py new file mode 100644 index 00000000000..0eeb66984d4 --- /dev/null +++ b/netbox/core/tests/test_models.py @@ -0,0 +1,122 @@ +from django.test import TestCase + +from core.models import DataSource +from extras.choices import ObjectChangeActionChoices +from netbox.constants import CENSOR_TOKEN, CENSOR_TOKEN_CHANGED + + +class DataSourceChangeLoggingTestCase(TestCase): + + def test_password_added_on_create(self): + datasource = DataSource.objects.create( + name='Data Source 1', + type='git', + source_url='http://localhost/', + parameters={ + 'username': 'jeff', + 'password': 'foobar123', + } + ) + + objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_CREATE) + self.assertIsNone(objectchange.prechange_data) + self.assertEqual(objectchange.postchange_data['parameters']['username'], 'jeff') + self.assertEqual(objectchange.postchange_data['parameters']['password'], CENSOR_TOKEN_CHANGED) + + def test_password_added_on_update(self): + datasource = DataSource.objects.create( + name='Data Source 1', + type='git', + source_url='http://localhost/' + ) + datasource.snapshot() + + # Add a blank password + datasource.parameters = { + 'username': 'jeff', + 'password': '', + } + + objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_UPDATE) + self.assertIsNone(objectchange.prechange_data['parameters']) + self.assertEqual(objectchange.postchange_data['parameters']['username'], 'jeff') + self.assertEqual(objectchange.postchange_data['parameters']['password'], '') + + # Add a password + datasource.parameters = { + 'username': 'jeff', + 'password': 'foobar123', + } + + objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_UPDATE) + self.assertEqual(objectchange.postchange_data['parameters']['username'], 'jeff') + self.assertEqual(objectchange.postchange_data['parameters']['password'], CENSOR_TOKEN_CHANGED) + + def test_password_changed(self): + datasource = DataSource.objects.create( + name='Data Source 1', + type='git', + source_url='http://localhost/', + parameters={ + 'username': 'jeff', + 'password': 'password1', + } + ) + datasource.snapshot() + + # Change the password + datasource.parameters['password'] = 'password2' + + objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_UPDATE) + self.assertEqual(objectchange.prechange_data['parameters']['username'], 'jeff') + self.assertEqual(objectchange.prechange_data['parameters']['password'], CENSOR_TOKEN) + self.assertEqual(objectchange.postchange_data['parameters']['username'], 'jeff') + self.assertEqual(objectchange.postchange_data['parameters']['password'], CENSOR_TOKEN_CHANGED) + + def test_password_removed_on_update(self): + datasource = DataSource.objects.create( + name='Data Source 1', + type='git', + source_url='http://localhost/', + parameters={ + 'username': 'jeff', + 'password': 'foobar123', + } + ) + datasource.snapshot() + + objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_UPDATE) + self.assertEqual(objectchange.prechange_data['parameters']['username'], 'jeff') + self.assertEqual(objectchange.prechange_data['parameters']['password'], CENSOR_TOKEN) + self.assertEqual(objectchange.postchange_data['parameters']['username'], 'jeff') + self.assertEqual(objectchange.postchange_data['parameters']['password'], CENSOR_TOKEN) + + # Remove the password + datasource.parameters['password'] = '' + + objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_UPDATE) + self.assertEqual(objectchange.prechange_data['parameters']['username'], 'jeff') + self.assertEqual(objectchange.prechange_data['parameters']['password'], CENSOR_TOKEN) + self.assertEqual(objectchange.postchange_data['parameters']['username'], 'jeff') + self.assertEqual(objectchange.postchange_data['parameters']['password'], '') + + def test_password_not_modified(self): + datasource = DataSource.objects.create( + name='Data Source 1', + type='git', + source_url='http://localhost/', + parameters={ + 'username': 'username1', + 'password': 'foobar123', + } + ) + datasource.snapshot() + + # Remove the password + datasource.parameters['username'] = 'username2' + + objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_UPDATE) + self.assertEqual(objectchange.prechange_data['parameters']['username'], 'username1') + self.assertEqual(objectchange.prechange_data['parameters']['password'], CENSOR_TOKEN) + self.assertEqual(objectchange.postchange_data['parameters']['username'], 'username2') + self.assertEqual(objectchange.postchange_data['parameters']['password'], CENSOR_TOKEN) diff --git a/netbox/netbox/constants.py b/netbox/netbox/constants.py index faddf8c219d..547e2079b95 100644 --- a/netbox/netbox/constants.py +++ b/netbox/netbox/constants.py @@ -36,3 +36,7 @@ 'bulk_edit': {'change'}, 'bulk_delete': {'delete'}, } + +# General-purpose tokens +CENSOR_TOKEN = '********' +CENSOR_TOKEN_CHANGED = '***CHANGED***'