-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Browse files
Browse the repository at this point in the history
- Loading branch information
Showing
2 changed files
with
147 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
""" | ||
A check to verify that the Secrets service is operational. | ||
Information about the lastest indexed task is returned. | ||
""" | ||
import logging | ||
import random | ||
from datetime import timedelta | ||
|
||
import taskcluster | ||
import taskcluster.aio | ||
import taskcluster.exceptions | ||
|
||
from poucave import utils | ||
from poucave.typings import CheckResult | ||
|
||
from . import utils as tc_utils | ||
|
||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
DEFAULT_NAME = "project/taskcluster/secrets-test-{rand:x}" | ||
DEFAULT_EXPIRES_SECONDS = 600 | ||
|
||
|
||
async def run( | ||
root_url: str, | ||
secret_name: str = DEFAULT_NAME, | ||
expires_seconds: int = DEFAULT_EXPIRES_SECONDS, | ||
client_id: str = "", | ||
access_token: str = "", | ||
certificate: str = "", | ||
) -> CheckResult: | ||
unique_secret = secret_name.format(rand=random.getrandbits(32)) | ||
|
||
# Build connection infos from parameters. | ||
options = tc_utils.options_from_params( | ||
root_url, client_id, access_token, certificate | ||
) | ||
|
||
secrets = taskcluster.aio.Secrets(options) | ||
|
||
# 1. Write and read. | ||
payload = { | ||
"expires": (utils.utcnow() + timedelta(seconds=expires_seconds)).isoformat(), | ||
"secret": {"hello": "beautiful world"}, | ||
} | ||
await secrets.set(unique_secret, payload) | ||
try: | ||
await secrets.get(unique_secret) | ||
except taskcluster.exceptions.TaskclusterRestFailure: | ||
return False, f"Secret {unique_secret!r} could not be retrieved" | ||
|
||
# 2. Remove and check. | ||
await secrets.remove(unique_secret) | ||
try: | ||
await secrets.get(unique_secret) | ||
return False, f"Secret {unique_secret!r} was not removed" | ||
except taskcluster.exceptions.TaskclusterRestFailure as e: | ||
if getattr(e, "status_code") != 404: | ||
raise | ||
|
||
return True, {} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
from typing import Any, Dict | ||
from unittest import mock | ||
|
||
import pytest | ||
import taskcluster.exceptions | ||
|
||
from checks.taskcluster.write_secrets import run | ||
|
||
|
||
MODULE = "checks.taskcluster.write_secrets" | ||
|
||
PARAMS = { | ||
"root_url": "http://server", | ||
} | ||
|
||
|
||
class FakeSecrets: | ||
_content: Dict[str, Any] = {} | ||
|
||
async def set(self, name, payload): | ||
self._content[name] = payload | ||
|
||
async def get(self, name): | ||
try: | ||
return self._content[name] | ||
except KeyError: | ||
e = taskcluster.exceptions.TaskclusterRestFailure("", None) | ||
e.status_code = 404 | ||
raise e | ||
|
||
async def remove(self, name): | ||
del self._content[name] | ||
|
||
|
||
async def test_positive(): | ||
fake_secrets = FakeSecrets() | ||
with mock.patch(f"{MODULE}.taskcluster.aio.Secrets", return_value=fake_secrets): | ||
status, data = await run(**PARAMS) | ||
|
||
assert status is True | ||
assert data == {} | ||
|
||
|
||
async def test_negative_cannot_write(): | ||
class FailingSecrets(FakeSecrets): | ||
async def set(self, name, payload): | ||
pass # Do not store. | ||
|
||
fake_secrets = FailingSecrets() | ||
with mock.patch(f"{MODULE}.taskcluster.aio.Secrets", return_value=fake_secrets): | ||
status, data = await run(**PARAMS) | ||
|
||
assert status is False | ||
assert "could not be retrieved" in data | ||
|
||
|
||
async def test_negative_cannot_remove(): | ||
class FailingSecrets(FakeSecrets): | ||
async def remove(self, name): | ||
pass # Do not remove. | ||
|
||
fake_secrets = FailingSecrets() | ||
with mock.patch(f"{MODULE}.taskcluster.aio.Secrets", return_value=fake_secrets): | ||
status, data = await run(**PARAMS) | ||
|
||
assert status is False | ||
assert "was not removed" in data | ||
|
||
|
||
async def test_secrets_errors_are_raised(): | ||
class FailingSecrets(FakeSecrets): | ||
async def get(self, name): | ||
try: | ||
return self._content[name] | ||
except KeyError: | ||
e = taskcluster.exceptions.TaskclusterRestFailure("", None) | ||
e.status_code = 503 | ||
raise e | ||
|
||
fake_secrets = FailingSecrets() | ||
with mock.patch(f"{MODULE}.taskcluster.aio.Secrets", return_value=fake_secrets): | ||
with pytest.raises(taskcluster.exceptions.TaskclusterRestFailure): | ||
await run(**PARAMS) |