Skip to content

Commit

Permalink
Closes #13729: Censor sensitive data source parameters in change log (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremystretch authored Feb 5, 2024
1 parent 1a9149d commit fde9c16
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 0 deletions.
23 changes: 23 additions & 0 deletions netbox/core/models/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from django.utils.module_loading import import_string
from django.utils.translation import gettext as _

from netbox.constants import CENSOR_TOKEN, CENSOR_TOKEN_CHANGED
from netbox.models import PrimaryModel
from netbox.models.features import JobsMixin
from netbox.registry import registry
Expand Down Expand Up @@ -130,6 +131,28 @@ def clean(self):
'source_url': f"URLs for local sources must start with file:// (or specify no scheme)"
})

def to_objectchange(self, action):
objectchange = super().to_objectchange(action)

# Censor any backend parameters marked as sensitive in the serialized data
pre_change_params = {}
post_change_params = {}
if objectchange.prechange_data:
pre_change_params = objectchange.prechange_data.get('parameters') or {} # parameters may be None
if objectchange.postchange_data:
post_change_params = objectchange.postchange_data.get('parameters') or {}
for param in self.backend_class.sensitive_parameters:
if post_change_params.get(param):
if post_change_params[param] != pre_change_params.get(param):
# Set the "changed" token if the parameter's value has been modified
post_change_params[param] = CENSOR_TOKEN_CHANGED
else:
post_change_params[param] = CENSOR_TOKEN
if pre_change_params.get(param):
pre_change_params[param] = CENSOR_TOKEN

return objectchange

def enqueue_sync_job(self, request):
"""
Enqueue a background job to synchronize the DataSource by calling sync().
Expand Down
122 changes: 122 additions & 0 deletions netbox/core/tests/test_models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
from django.test import TestCase

from core.models import DataSource
from extras.choices import ObjectChangeActionChoices
from netbox.constants import CENSOR_TOKEN, CENSOR_TOKEN_CHANGED


class DataSourceChangeLoggingTestCase(TestCase):

def test_password_added_on_create(self):
datasource = DataSource.objects.create(
name='Data Source 1',
type='git',
source_url='http://localhost/',
parameters={
'username': 'jeff',
'password': 'foobar123',
}
)

objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_CREATE)
self.assertIsNone(objectchange.prechange_data)
self.assertEqual(objectchange.postchange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.postchange_data['parameters']['password'], CENSOR_TOKEN_CHANGED)

def test_password_added_on_update(self):
datasource = DataSource.objects.create(
name='Data Source 1',
type='git',
source_url='http://localhost/'
)
datasource.snapshot()

# Add a blank password
datasource.parameters = {
'username': 'jeff',
'password': '',
}

objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_UPDATE)
self.assertIsNone(objectchange.prechange_data['parameters'])
self.assertEqual(objectchange.postchange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.postchange_data['parameters']['password'], '')

# Add a password
datasource.parameters = {
'username': 'jeff',
'password': 'foobar123',
}

objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(objectchange.postchange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.postchange_data['parameters']['password'], CENSOR_TOKEN_CHANGED)

def test_password_changed(self):
datasource = DataSource.objects.create(
name='Data Source 1',
type='git',
source_url='http://localhost/',
parameters={
'username': 'jeff',
'password': 'password1',
}
)
datasource.snapshot()

# Change the password
datasource.parameters['password'] = 'password2'

objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(objectchange.prechange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.prechange_data['parameters']['password'], CENSOR_TOKEN)
self.assertEqual(objectchange.postchange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.postchange_data['parameters']['password'], CENSOR_TOKEN_CHANGED)

def test_password_removed_on_update(self):
datasource = DataSource.objects.create(
name='Data Source 1',
type='git',
source_url='http://localhost/',
parameters={
'username': 'jeff',
'password': 'foobar123',
}
)
datasource.snapshot()

objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(objectchange.prechange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.prechange_data['parameters']['password'], CENSOR_TOKEN)
self.assertEqual(objectchange.postchange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.postchange_data['parameters']['password'], CENSOR_TOKEN)

# Remove the password
datasource.parameters['password'] = ''

objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(objectchange.prechange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.prechange_data['parameters']['password'], CENSOR_TOKEN)
self.assertEqual(objectchange.postchange_data['parameters']['username'], 'jeff')
self.assertEqual(objectchange.postchange_data['parameters']['password'], '')

def test_password_not_modified(self):
datasource = DataSource.objects.create(
name='Data Source 1',
type='git',
source_url='http://localhost/',
parameters={
'username': 'username1',
'password': 'foobar123',
}
)
datasource.snapshot()

# Remove the password
datasource.parameters['username'] = 'username2'

objectchange = datasource.to_objectchange(ObjectChangeActionChoices.ACTION_UPDATE)
self.assertEqual(objectchange.prechange_data['parameters']['username'], 'username1')
self.assertEqual(objectchange.prechange_data['parameters']['password'], CENSOR_TOKEN)
self.assertEqual(objectchange.postchange_data['parameters']['username'], 'username2')
self.assertEqual(objectchange.postchange_data['parameters']['password'], CENSOR_TOKEN)
4 changes: 4 additions & 0 deletions netbox/netbox/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,7 @@
'bulk_edit': {'change'},
'bulk_delete': {'delete'},
}

# General-purpose tokens
CENSOR_TOKEN = '********'
CENSOR_TOKEN_CHANGED = '***CHANGED***'

0 comments on commit fde9c16

Please sign in to comment.