Skip to content

Commit

Permalink
Code cleanup for test_vulnerabilities.py.
Browse files Browse the repository at this point in the history
  • Loading branch information
JCantu248 committed Feb 5, 2025
1 parent a083545 commit 3df9356
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 115 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ jobs:
test:
timeout-minutes: 60
runs-on: ubuntu-latest
container:
image: python:3.9

strategy:
matrix:
Expand Down
234 changes: 121 additions & 113 deletions integration/tests/test_vulnerabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@

def get_vulnerabilities():
"""Get a tuple of vulnerability IDs for testing."""
url = "{}/vulnerabilities/search".format(BASE_URL)
json = {
"page": 1,
"pageSize": 10,
}
response = requests.post(
url, json=json, headers={"X-API-KEY": X_API_KEY}, timeout=10
"{}/vulnerabilities/search".format(BASE_URL),
json={"page": 1, "pageSize": 10},
headers={"X-API-KEY": X_API_KEY},
timeout=10,
)
assert response.status_code == 200
data = response.json()
Expand All @@ -41,56 +39,57 @@ def test_get_vulnerability_by_id():
"""Test get vulnerability by ID."""
select_vulnerability = random.choice(vulnerabilities)
vulnerability_id = select_vulnerability["id"]
url = f"{BASE_URL}/vulnerabilities/{vulnerability_id}"
response = requests.get(url, headers={"X-API-KEY": X_API_KEY}, timeout=10)
response = requests.get(
"{}/vulnerabilities/{}".format(BASE_URL, vulnerability_id),
headers={"X-API-KEY": X_API_KEY},
timeout=10,
)

assert (
response.status_code == 200
), f"Expected status 200, got {response.status_code}"
assert response.status_code == 200, "Expected status 200, got {}".format(
response.status_code
)
data = response.json()
assert data is not None, "Response is empty"
assert (
data["id"] == vulnerability_id
), f"Expected ID {select_vulnerability}, got {data['id']}"
assert data["id"] == vulnerability_id, "Expected ID {}, got {}".format(
select_vulnerability, data["id"]
)


@pytest.mark.integration
def test_get_vulnerability_by_id_fails_404():
"""Test get vulnerability by ID fails with 404."""
url = f"{BASE_URL}/vulnerabilities/{BAD_ID}"
response = requests.get(url, headers={"X-API-KEY": X_API_KEY}, timeout=10)
response = requests.get(
"{}/vulnerabilities/{}".format(BASE_URL, BAD_ID),
headers={"X-API-KEY": X_API_KEY},
timeout=10,
)

assert (
response.status_code == 404
), f"Expected status 404, got {response.status_code}"
assert response.status_code == 404, "Expected status 404, got {}".format(
response.status_code
)
data = response.json()
assert data is not None, "Response is empty"

# Check for the error message in the "detail" key
assert "detail" in data, "Expected 'detail' in response"
assert (
data["detail"] == "Vulnerability not found."
), f"Unexpected error message: {data['detail']}"
), "Unexpected error message: {}".format(data["detail"])


@pytest.mark.integration
def test_search_vulnerabilities():
"""Test search vulnerabilities."""
url = f"{BASE_URL}/vulnerabilities/search"
json = {
"page": 1,
"filters": {
"severity": "high",
},
"pageSize": 10,
}
response = requests.post(
url, json=json, headers={"X-API-KEY": X_API_KEY}, timeout=10
"{}/vulnerabilities/search".format(BASE_URL),
json={"page": 1, "filters": {"severity": "high"}, "pageSize": 10},
headers={"X-API-KEY": X_API_KEY},
timeout=10,
)

assert (
response.status_code == 200
), f"Expected status 200, got {response.status_code}"
assert response.status_code == 200, "Expected status 200, got {}".format(
response.status_code
)
data = response.json()
assert data is not None, "Response is empty"
assert "result" in data, "Results not found in response"
Expand All @@ -100,7 +99,7 @@ def test_search_vulnerabilities():
for vulnerability in data["result"]:
assert (
vulnerability["severity"].lower() == "high"
), f"Expected severity 'high', got {vulnerability['severity']}"
), "Expected severity 'high', got {}".format(vulnerability["severity"])


@pytest.mark.integration
Expand All @@ -109,27 +108,34 @@ def test_get_update_and_revert_vulnerability_by_id():
# Step 1: Retrieve the original data using GET
select_vulnerability = random.choice(vulnerabilities)
vulnerability_id = select_vulnerability["id"]
url = f"{BASE_URL}/vulnerabilities/{vulnerability_id}"
url = "{}/vulnerabilities/{}".format(BASE_URL, vulnerability_id)
response = requests.get(url, headers={"X-API-KEY": X_API_KEY}, timeout=10)
assert (
response.status_code == 200
), f"Expected status 200, got {response.status_code}"
assert response.status_code == 200, "Expected status 200, got {}".format(
response.status_code
)
original_data = response.json()

assert original_data is not None, "Response is empty"
assert (
original_data["id"] == vulnerability_id
), f"Expected ID {vulnerability_id}, got {original_data['id']}"
assert original_data["id"] == vulnerability_id, "Expected ID {}, got {}".format(
vulnerability_id, original_data["id"]
)

# Extract domain_id and service_id from the nested objects
domain_id = original_data["domain"]["id"] if "domain" in original_data else None
service_id = original_data["service"]["id"] if "service" in original_data else None

# Remove nested fields that are not part of the PUT schema
original_data.pop("domain", None)
original_data.pop("service", None)
original_data["domain_id"] = domain_id
original_data["service_id"] = service_id
if original_data["domain"] is not None:
domain_id = original_data["domain"]["id"] if "domain" in original_data else None
original_data.pop("domain", None)
original_data["domain_id"] = domain_id
else:
original_data["domain_id"] = None
if original_data["service"] is not None:
service_id = (
original_data["service"]["id"] if "service" in original_data else None
)
original_data.pop("service", None)
original_data["service_id"] = service_id
else:
original_data["service_id"] = None

# Step 2: Update a few fields using PUT
updated_data = original_data.copy() # Start with the original data
Expand All @@ -142,9 +148,9 @@ def test_get_update_and_revert_vulnerability_by_id():
update_response = requests.put(
url, json=updated_data, headers={"X-API-KEY": X_API_KEY}, timeout=10
)
assert (
update_response.status_code == 200
), f"Expected status 200, got {update_response.status_code}"
assert update_response.status_code == 200, "Expected status 200, got {}".format(
update_response.status_code
)
updated_response_data = update_response.json()

# Validate the updated fields
Expand All @@ -159,9 +165,9 @@ def test_get_update_and_revert_vulnerability_by_id():
revert_response = requests.put(
url, json=original_data, headers={"X-API-KEY": X_API_KEY}, timeout=10
)
assert (
revert_response.status_code == 200
), f"Expected status 200, got {revert_response.status_code}"
assert revert_response.status_code == 200, "Expected status 200, got {}".format(
revert_response.status_code
)
reverted_data = revert_response.json()

# Validate that the original data was restored
Expand All @@ -176,38 +182,39 @@ def test_get_update_and_revert_vulnerability_by_id():
@pytest.mark.integration
def test_update_vulnerability_by_id_fails_404():
"""Test update vulnerability by ID fails with 404."""
url = f"{BASE_URL}/vulnerabilities/{BAD_ID}" # Use a non-existent ID
json = {
"id": BAD_ID,
"createdAt": "2024-12-03T16:56:58.684835",
"updatedAt": "2025-01-24T19:05:00.000Z",
"lastSeen": None,
"title": "Non-existent Vulnerability",
"cve": "CVE-404-0001",
"cwe": "CWE-404",
"cpe": "cpe:/a:nonexistent:software:1.0",
"description": "This vulnerability does not exist.",
"references": [],
"cvss": 5.0,
"severity": "Medium",
"needsPopulation": False,
"state": "closed",
"substate": "confirmed",
"source": "testSource",
"notes": "This is a test for a non-existent vulnerability.",
"actions": [],
"structuredData": {},
"isKev": False,
"domain_id": "0c4ee5b6-ff18-458c-adcc-dfe121fb54c5",
"service_id": "9ac326f0-29ad-4e2c-a6bf-e330c91aa872",
}
response = requests.put(
url, json=json, headers={"X-API-KEY": X_API_KEY}, timeout=10
"{}/vulnerabilities/{}".format(BASE_URL, BAD_ID), # Use a non-existent ID
json={
"id": BAD_ID,
"createdAt": "2024-12-03T16:56:58.684835",
"updatedAt": "2025-01-24T19:05:00.000Z",
"lastSeen": None,
"title": "Non-existent Vulnerability",
"cve": "CVE-404-0001",
"cwe": "CWE-404",
"cpe": "cpe:/a:nonexistent:software:1.0",
"description": "This vulnerability does not exist.",
"references": [],
"cvss": 5.0,
"severity": "Medium",
"needsPopulation": False,
"state": "closed",
"substate": "confirmed",
"source": "testSource",
"notes": "This is a test for a non-existent vulnerability.",
"actions": [],
"structuredData": {},
"isKev": False,
"domain_id": "0c4ee5b6-ff18-458c-adcc-dfe121fb54c5",
"service_id": "9ac326f0-29ad-4e2c-a6bf-e330c91aa872",
},
headers={"X-API-KEY": X_API_KEY},
timeout=10,
)

assert (
response.status_code == 404
), f"Expected status 404, got {response.status_code}"
assert response.status_code == 404, "Expected status 404, got {}".format(
response.status_code
)
data = response.json()
assert "detail" in data, "Error detail missing in response"
assert data["detail"] == "Vulnerability not found.", "Unexpected error message"
Expand All @@ -218,38 +225,39 @@ def test_update_vulnerability_by_id_fails_422():
"""Test update vulnerability by ID fails with 422 due to invalid payload."""
select_vulnerability = random.choice(vulnerabilities)
vulnerability_id = select_vulnerability["id"]
url = f"{BASE_URL}/vulnerabilities/{vulnerability_id}"
json = {
"id": vulnerability_id,
"createdAt": "invalid-date", # Invalid date format
"updatedAt": "2025-01-24T19:05:00.000Z",
"lastSeen": None,
"title": None, # Invalid: title is required
"cve": "CVE-INVALID-422", # Example invalid CVE
"cwe": "CWE-INVALID", # Example invalid CWE
"cpe": None, # Invalid: cpe cannot be None
"description": 12345, # Invalid: description should be a string
"references": "invalid-references", # Invalid: should be a list
"cvss": "invalid-cvss", # Invalid: cvss should be a number
"severity": "InvalidSeverity", # Invalid: severity is not a valid enum value
"needsPopulation": "not-a-boolean", # Invalid: should be a boolean
"state": 123, # Invalid: state should be a string
"substate": [], # Invalid: substate should be a string
"source": False, # Invalid: source should be a string
"notes": 999, # Invalid: notes should be a string
"actions": "invalid-actions", # Invalid: actions should be a list
"structuredData": "not-a-dict", # Invalid: structuredData should be a dictionary
"isKev": "not-a-boolean", # Invalid: should be a boolean
"domain_id": "invalid-domain-id", # Invalid domain_id
"service_id": "invalid-service-id", # Invalid service_id
}
response = requests.put(
url, json=json, headers={"X-API-KEY": X_API_KEY}, timeout=10
"{}/vulnerabilities/{}".format(BASE_URL, vulnerability_id),
json={
"id": vulnerability_id,
"createdAt": "invalid-date",
"updatedAt": "2025-01-24T19:05:00.000Z",
"lastSeen": None,
"title": None,
"cve": "CVE-INVALID-422",
"cwe": "CWE-INVALID",
"cpe": None,
"description": 12345,
"references": "invalid-references",
"cvss": "invalid-cvss",
"severity": "InvalidSeverity",
"needsPopulation": "not-a-boolean",
"state": 123,
"substate": [],
"source": False,
"notes": 999,
"actions": "invalid-actions",
"structuredData": "not-a-dict",
"isKev": "not-a-boolean",
"domain_id": "invalid-domain-id",
"service_id": "invalid-service-id",
},
headers={"X-API-KEY": X_API_KEY},
timeout=10,
)

assert (
response.status_code == 422
), f"Expected status 422, got {response.status_code}"
assert response.status_code == 422, "Expected status 422, got {}".format(
response.status_code
)
data = response.json()

# Validate that the response contains a list of validation errors
Expand All @@ -264,7 +272,7 @@ def test_update_vulnerability_by_id_fails_422():
assert created_at_error is not None, "'createdAt' validation error is missing"
assert (
"Input should be a valid datetime" in created_at_error["msg"]
), f"Unexpected error message: {created_at_error['msg']}"
), "Unexpected error message: {}".format(created_at_error["msg"])

# Check that 'title' error is included (optional warning if missing)
title_error = next(
Expand All @@ -277,4 +285,4 @@ def test_update_vulnerability_by_id_fails_422():
else:
assert (
title_error["msg"] == "Field required"
), f"Unexpected error message: {title_error['msg']}"
), "Unexpected error message: {}".format(title_error["msg"])

0 comments on commit 3df9356

Please sign in to comment.