-
Notifications
You must be signed in to change notification settings - Fork 2.9k
/
Copy pathresource_testcase.py
128 lines (112 loc) · 5.18 KB
/
resource_testcase.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# --------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for
# license information.
# --------------------------------------------------------------------------
from collections import namedtuple
import functools
import os
import datetime
import logging
try:
from azure.mgmt.resource import ResourceManagementClient
except ImportError:
pass
from . import AzureMgmtPreparer
from .exceptions import AzureTestError, ReservedResourceNameError
from .sanitizers import add_general_string_sanitizer
# Set the root logger's threshold to INFO so INFO-level records logged through
# the root logger are not filtered out by its level.
logging.root.setLevel(logging.INFO)

# Default keyword-argument name under which the prepared resource group is
# handed to the test.
RESOURCE_GROUP_PARAM = "resource_group"

# Lightweight stand-in exposing only ``name`` and ``id``; used in place of a
# real resource group object when no group is actually created.
FakeResource = namedtuple("FakeResource", "name id")
class ResourceGroupPreparer(AzureMgmtPreparer):
    """Test preparer that hands a resource group and its location to a test.

    In a live run that needs creation, a real resource group is created through
    ``ResourceManagementClient``; otherwise a ``FakeResource`` placeholder (or a
    previously stored ``self.resource``) is supplied instead.

    If the ``AZURE_RESOURCEGROUP_NAME`` environment variable is set to a
    non-empty value, that value is stored as ``resource_random_name`` and the
    preparer neither creates nor deletes a resource group.
    """

    # Every group created by this preparer is named "rgpy-<name>" for tracing,
    # truncated to at most 90 characters (presumably the service's limit on
    # resource group name length -- confirm against the Azure naming rules).
    _NAME_PREFIX = "rgpy-"
    _MAX_NAME_LENGTH = 90

    def __init__(
        self,
        name_prefix="",
        use_cache=False,
        random_name_length=75,
        parameter_name=RESOURCE_GROUP_PARAM,
        parameter_name_for_location="location",
        location="westus",
        disable_recording=True,
        playback_fake_resource=None,
        client_kwargs=None,
        random_name_enabled=False,
        delete_after_tag_timedelta=datetime.timedelta(hours=8),
    ):
        """Configure the preparer.

        :param name_prefix: Forwarded to the base preparer; also used to build
            ``resource_moniker`` when ``random_name_enabled`` is true.
        :param use_cache: Forwarded to ``set_cache``.
        :param random_name_length: Forwarded to the base preparer.
        :param parameter_name: Key under which the resource group is returned
            by ``create_resource``.
        :param parameter_name_for_location: Key under which the location is
            returned by ``create_resource``.
        :param location: Location used when creating the resource group.
        :param disable_recording: Forwarded to the base preparer.
        :param playback_fake_resource: Forwarded to the base preparer.
        :param client_kwargs: Forwarded to the base preparer.
        :param random_name_enabled: Forwarded to the base preparer.
        :param delete_after_tag_timedelta: Added to the current UTC time to
            compute the ``DeleteAfter`` tag put on created resource groups.
        """
        super(ResourceGroupPreparer, self).__init__(
            name_prefix,
            random_name_length,
            disable_recording=disable_recording,
            playback_fake_resource=playback_fake_resource,
            client_kwargs=client_kwargs,
            random_name_enabled=random_name_enabled,
        )
        self.location = location
        self.parameter_name = parameter_name
        self.parameter_name_for_location = parameter_name_for_location

        # A pre-existing resource group named via the environment takes
        # precedence: nothing is created or deleted in that case.
        env_value = os.environ.get("AZURE_RESOURCEGROUP_NAME", None)
        self._need_creation = True
        if env_value:
            self.resource_random_name = env_value
            self._need_creation = False

        if self.random_name_enabled:
            self.resource_moniker = self.name_prefix + "rgname"

        self.set_cache(use_cache, parameter_name, name_prefix)
        self.delete_after_tag_timedelta = delete_after_tag_timedelta

    def _prefix_name(self, name):
        """Return ``name`` prefixed with ``rgpy-`` and cut to at most 90 characters."""
        return (self._NAME_PREFIX + name)[: self._MAX_NAME_LENGTH]

    def create_resource(self, name, **kwargs):
        """Create the resource group (live) or supply a placeholder for it.

        :param name: Base name of the resource group; the live path prefixes
            it via ``_prefix_name`` before creation.
        :returns: Dict mapping ``self.parameter_name`` to the resource and
            ``self.parameter_name_for_location`` to the location.
        :raises ReservedResourceNameError: If creation fails with an error
            whose text contains ``"ReservedResourceName"``. Any other creation
            error is re-raised unchanged.
        """
        if self.is_live and self._need_creation:
            self.client = self.create_mgmt_client(ResourceManagementClient)

            # datetime.utcnow() is deprecated since Python 3.12. Take an aware
            # UTC "now" and drop tzinfo so the tag keeps the exact same naive
            # ISO-8601 format (no "+00:00" suffix) as before.
            now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
            expiry = now_utc + self.delete_after_tag_timedelta
            parameters = {
                "location": self.location,
                "tags": {
                    # NOTE(review): presumably consumed by an external cleanup
                    # job that removes expired groups -- confirm.
                    "DeleteAfter": expiry.replace(microsecond=0).isoformat(),
                    # Pipeline metadata; "local" when the variable is not set.
                    "BuildId": os.environ.get("BUILD_BUILDID", "local"),
                    "BuildJob": os.environ.get("AGENT_JOBNAME", "local"),
                    "BuildNumber": os.environ.get("BUILD_BUILDNUMBER", "local"),
                    "BuildReason": os.environ.get("BUILD_REASON", "local"),
                },
            }
            try:
                # Prefixing all RGs created here with 'rgpy-' for tracing purposes
                name = self._prefix_name(name)
                logging.info(
                    "Attempting to create a Resource Group with name {} and parameters {}".format(name, parameters)
                )
                self.resource = self.client.resource_groups.create_or_update(name, parameters)
            except Exception as ex:
                if "ReservedResourceName" in str(ex):
                    raise ReservedResourceNameError(name)
                raise
        else:
            # Keep an already stored resource if there is one; otherwise
            # fabricate a placeholder under an all-zero subscription id.
            self.resource = self.resource or FakeResource(
                name=name,
                id="/subscriptions/00000000-0000-0000-0000-000000000000/resourceGroups/" + name,
            )

        # When the name in use differs from the moniker, register a sanitizer
        # that maps the former to the latter.
        if name != self.moniker:
            add_general_string_sanitizer(target=name, value=self.moniker)

        return {
            self.parameter_name: self.resource,
            self.parameter_name_for_location: self.location,
        }

    def remove_resource(self, name, **kwargs):
        """Delete the resource group created by ``create_resource`` (live runs only).

        Deletion is best-effort: failures reported by the delete call are
        logged and swallowed. The one exception is an explicit timeout, which
        is surfaced to the caller.

        :param name: Base name of the resource group (prefixed here the same
            way as in ``create_resource``).
        :keyword wait_timeout: If present, passed to the delete poller's
            ``wait()``; the poller must be done once that call returns.
        :raises AzureTestError: If ``wait_timeout`` was given and the delete
            poller is not done after waiting.
        """
        if not (self.is_live and self._need_creation):
            return

        name = self._prefix_name(name)
        try:
            if "wait_timeout" in kwargs:
                poller = self.client.resource_groups.begin_delete(name)
                poller.wait(kwargs.get("wait_timeout"))
                if poller.done():
                    return
                raise AzureTestError("Timed out waiting for resource group to be deleted.")
            else:
                self.client.resource_groups.begin_delete(name, polling=False).result()
        except AzureTestError:
            # The timeout above is raised on purpose; without this clause the
            # blanket handler below would swallow it and the caller would
            # never learn that the deletion did not finish.
            raise
        except (
            Exception
        ) as err:  # NOTE: some track 1 libraries do not have azure-core installed. Cannot use HttpResponseError here
            logging.info("Failed to delete resource group with name {}".format(name))
            logging.info("{}".format(err))
# Convenience factory: ResourceGroupPreparer with random_name_enabled pre-set to True.
RandomNameResourceGroupPreparer = functools.partial(ResourceGroupPreparer, random_name_enabled=True)
# Convenience factory: ResourceGroupPreparer with both use_cache and random_name_enabled pre-set to True.
CachedResourceGroupPreparer = functools.partial(ResourceGroupPreparer, use_cache=True, random_name_enabled=True)