-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
Copy path_validators.py
438 lines (351 loc) · 19.5 KB
/
_validators.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
from datetime import datetime
from json import loads
from re import match, search, findall
from knack.log import get_logger
from knack.util import CLIError
from azure.cli.core.azclierror import ValidationError, RequiredArgumentMissingError
from azure.cli.command_modules.vm.custom import get_vm, _is_linux_os
from azure.cli.command_modules.resource._client_factory import _resource_client_factory
from azure.core.exceptions import HttpResponseError
from azure.mgmt.core.tools import parse_resource_id, is_valid_resource_id
from .encryption_types import Encryption
from .exceptions import AzCommandError
from .repair_utils import (
_call_az_command,
_get_repair_resource_tag,
_fetch_encryption_settings,
_resolve_api_version,
check_extension_version,
_check_existing_rg
)
# pylint: disable=line-too-long, broad-except
logger = get_logger(__name__)
EXTENSION_NAME = 'vm-repair'
def validate_create(cmd, namespace):
check_extension_version(EXTENSION_NAME)
# Check if VM exists and is not classic VM
source_vm = _validate_and_get_vm(cmd, namespace.resource_group_name, namespace.vm_name)
is_linux = _is_linux_os(source_vm)
# Check repair vm name
if namespace.repair_vm_name:
_validate_vm_name(namespace.repair_vm_name, is_linux)
else:
namespace.repair_vm_name = ('repair-' + namespace.vm_name)[:14] + '_'
# Check copy disk name
timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S')
if namespace.copy_disk_name:
_validate_disk_name(namespace.copy_disk_name)
else:
namespace.copy_disk_name = namespace.vm_name + '-DiskCopy-' + timestamp
# Check copy resouce group name
if namespace.repair_group_name:
if namespace.repair_group_name == namespace.resource_group_name:
raise CLIError('The repair resource group name cannot be the same as the source VM resource group.')
_validate_resource_group_name(namespace.repair_group_name)
else:
namespace.repair_group_name = 'repair-' + namespace.vm_name + '-' + timestamp
# Check encrypted disk
encryption_type, _, _, _ = _fetch_encryption_settings(source_vm)
# Currently only supporting single pass
if encryption_type in (Encryption.SINGLE_WITH_KEK, Encryption.SINGLE_WITHOUT_KEK):
if not namespace.unlock_encrypted_vm:
_prompt_encrypted_vm(namespace)
elif encryption_type is Encryption.DUAL:
logger.warning('The source VM\'s OS disk is encrypted using dual pass method.')
raise CLIError('The current command does not support VMs which were encrypted using dual pass.')
else:
logger.debug('The source VM\'s OS disk is not encrypted')
if namespace.encrypt_recovery_key:
if not namespace.unlock_encrypted_vm:
raise RequiredArgumentMissingError('Recovery password is provided in the argument, but --unlock-encrypted-vm is not passed. Rerun command adding --unlock-encrypted-vm.')
if namespace.enable_nested:
if is_linux:
raise CLIError('Nested VM is not supported for Linux VM')
# Validate Auth Params
# Prompt vm username
if not namespace.repair_username:
_prompt_repair_username(namespace)
# Validate vm username
validate_vm_username(namespace.repair_username, is_linux)
# Prompt vm password
if not namespace.repair_password:
_prompt_repair_password(namespace)
# Validate vm password
validate_vm_password(namespace.repair_password, is_linux)
# Prompt input for public ip usage
if (not namespace.associate_public_ip) and (not namespace.yes):
_prompt_public_ip(namespace)
def validate_restore(cmd, namespace):
check_extension_version(EXTENSION_NAME)
# Check if VM exists and is not classic VM
_validate_and_get_vm(cmd, namespace.resource_group_name, namespace.vm_name)
# No repair param given, find repair vm using tags
if not namespace.repair_vm_id:
fetch_repair_vm(namespace)
if not is_valid_resource_id(namespace.repair_vm_id):
raise CLIError('Repair resource id is not valid.')
repair_vm_id = parse_resource_id(namespace.repair_vm_id)
# Check if data disk exists on repair VM
repair_vm = get_vm(cmd, repair_vm_id['resource_group'], repair_vm_id['name'])
data_disks = repair_vm.storage_profile.data_disks
if not data_disks:
raise CLIError('No data disks found on repair VM: {}'.format(repair_vm_id['name']))
# Populate disk name
if not namespace.disk_name:
namespace.disk_name = data_disks[0].name
logger.info('Disk-name not given. Defaulting to the first data disk attached to the repair VM: %s', data_disks[0].name)
else: # check disk name
if not [disk for disk in data_disks if disk.name == namespace.disk_name]:
raise CLIError('No data disks found on the repair VM: \'{vm}\' with the disk name: \'{disk}\''.format(vm=repair_vm_id['name'], disk=namespace.disk_name))
def validate_run(cmd, namespace):
check_extension_version(EXTENSION_NAME)
# Set run_on_repair to True if repair_vm_id given
if namespace.repair_vm_id:
namespace.run_on_repair = True
# Check run-id and custom run file parameters
if not namespace.run_id and not namespace.custom_script_file:
raise CLIError('Please specify the run id with --run-id.')
if namespace.run_id and namespace.custom_script_file:
raise CLIError('Cannot continue with both the run-id and the custom-run-file. Please specify just one.')
# Check if VM exists and is not classic VM
source_vm = _validate_and_get_vm(cmd, namespace.resource_group_name, namespace.vm_name)
is_linux = _is_linux_os(source_vm)
if namespace.custom_script_file:
# Check if file extension is correct
if is_linux and not (namespace.custom_script_file.endswith('.sh') or namespace.custom_script_file.endswith('.bash')):
raise CLIError('Only .sh or .bash scripts are supported for repair run on a Linux VM.')
if not is_linux and not (namespace.custom_script_file.endswith('.ps1') or namespace.custom_script_file.endswith('.ps2')):
raise CLIError('Only PowerShell scripts are supported for repair run on a Windows VM.')
# Check if file exists
import os.path
if not os.path.isfile(namespace.custom_script_file):
raise CLIError('Custom script file cannot be found. Please check if the file exists.')
# Check for current custom-run-file parameter limitations
if namespace.parameters:
raise CLIError('Parameter passing does not work for custom run files yet. Please remove --parameters arguments.')
with open(namespace.custom_script_file, 'r') as f:
first_line = f.readline()
if first_line.lower().startswith('param('):
raise CLIError('Powershell param() statement does not work for custom script files yet. Please remove the param() line in the file.')
namespace.run_id = 'no-op'
# Check if the script type matches the OS
if not is_linux and namespace.run_id.startswith('linux'):
raise CLIError('Script IDs that start with \'linux\' are Linux Shell scripts. You cannot run Linux Shell scripts on a Windows VM.')
if is_linux and namespace.run_id.startswith('win'):
raise CLIError('Script IDs that start with \'win\' are Windows PowerShell scripts. You cannot run Windows PowerShell scripts on a Linux VM.')
# Fetch repair vm
if namespace.run_on_repair and not namespace.repair_vm_id:
fetch_repair_vm(namespace)
# If not run_on_repair, repair_vm = source_vm. Scripts directly run on source VM.
if not namespace.run_on_repair:
namespace.repair_vm_id = source_vm.id
if not is_valid_resource_id(namespace.repair_vm_id):
raise CLIError('Repair resource id is not valid.')
def validate_reset_nic(cmd, namespace):
check_extension_version(EXTENSION_NAME)
if namespace._subscription:
# setting subscription Id
try:
set_sub_command = 'az account set --subscription {sid}'.format(sid=namespace._subscription)
logger.info('Setting the subscription...\n')
_call_az_command(set_sub_command)
except AzCommandError as azCommandError:
logger.error(azCommandError)
raise CLIError('Unexpected error occured while setting the subscription..')
_validate_and_get_vm(cmd, namespace.resource_group_name, namespace.vm_name)
def _prompt_encrypted_vm(namespace):
from knack.prompting import prompt_y_n, NoTTYException
try:
message = 'The source VM\'s OS disk is encrypted. The current command will unlock the copied OS disk within the repair VM.'
logger.warning(message)
if prompt_y_n('Continue?'):
namespace.unlock_encrypted_vm = True
else:
raise CLIError('Stopping execution upon user input.')
except NoTTYException:
raise CLIError('Please specify the unlock_encrypted_vm parameter in non-interactive mode.')
def _prompt_repair_username(namespace):
from knack.prompting import prompt, NoTTYException
try:
namespace.repair_username = prompt('Repair VM admin username: ')
except NoTTYException:
raise CLIError('Please specify the username parameter in non-interactive mode.')
def _prompt_repair_password(namespace):
from knack.prompting import prompt_pass, NoTTYException
try:
namespace.repair_password = prompt_pass('Repair VM admin password: ', confirm=True)
except NoTTYException:
raise CLIError('Please specify the password parameter in non-interactive mode.')
def _prompt_public_ip(namespace):
from knack.prompting import prompt_y_n, NoTTYException
try:
if prompt_y_n('Does repair vm requires public ip?'):
namespace.associate_public_ip = _return_public_ip_name(namespace)
else:
namespace.associate_public_ip = '""'
except NoTTYException:
raise ValidationError('Please specify the associate-public-ip parameter in non-interactive mode.')
def _return_public_ip_name(namespace):
return namespace.repair_vm_name + "PublicIP"
def _classic_vm_exists(cmd, resource_group_name, vm_name):
classic_vm_provider = 'Microsoft.ClassicCompute'
vm_resource_type = 'virtualMachines'
try:
rcf = _resource_client_factory(cmd.cli_ctx)
api_version = _resolve_api_version(rcf, classic_vm_provider, None, vm_resource_type)
resource_client = rcf.resources
resource_client.get(resource_group_name, classic_vm_provider, '', vm_resource_type, vm_name, api_version)
except HttpResponseError as httpError:
# Resource does not exist or the API failed
logger.debug(httpError)
return False
except Exception as exception:
# Unknown error, so return false for default resource not found error message
logger.debug(exception)
return False
return True
def _validate_and_get_vm(cmd, resource_group_name, vm_name):
# Check if target VM exists
resource_not_found_error = 'ResourceNotFound'
source_vm = None
try:
source_vm = get_vm(cmd, resource_group_name, vm_name)
except HttpResponseError as httpError:
logger.debug(httpError)
if httpError.error.error == resource_not_found_error and _classic_vm_exists(cmd, resource_group_name, vm_name):
# Given VM is classic VM (RDFE)
raise CLIError('The given VM \'{}\' is a classic VM. VM repair commands do not support classic VMs.'.format(vm_name))
# Unknown Error
raise CLIError(httpError.message)
return source_vm
def _validate_vm_name(vm_name, is_linux):
if not is_linux:
win_pattern = r'[\'~!@#$%^&*()=+_[\]{}\\|;:.",<>?]'
num_pattern = r'[0-9]+$'
if len(vm_name) > 15 or search(win_pattern, vm_name) or match(num_pattern, vm_name):
raise CLIError('Windows computer name cannot be more than 15 characters long, be entirely numeric, or contain the following characters: '
r'`~!@#$%^&*()=+_[]{}\|; :.\'",<>/?')
def _validate_disk_name(disk_name):
disk_pattern = r'([a-zA-Z0-9][a-zA-Z0-9_.\-]+[a-zA-Z0-9_])$'
if not match(disk_pattern, disk_name):
raise CLIError('Disk name must begin with a letter or number, end with a letter, number or underscore, and may contain only letters, numbers, underscores, periods, or hyphens.')
if len(disk_name) > 80:
raise CLIError('Disk name only allow up to 80 characters.')
def _validate_resource_group_name(rg_name):
from knack.prompting import prompt_y_n
rg_pattern = r'[0-9a-zA-Z._\-()]+$'
# if match is null or ends in period, then raise error
if not match(rg_pattern, rg_name) or rg_name[-1] == '.':
raise CLIError('Resource group name only allow alphanumeric characters, periods, underscores, hyphens and parenthesis and cannot end in a period.')
if len(rg_name) > 90:
raise CLIError('Resource group name only allow up to 90 characters.')
if _check_existing_rg(rg_name):
if not prompt_y_n('Resource Group already exists. Continue to use existing resource group? If operation fails you will prompted to delete resource group'):
raise CLIError('Resource group with name \'{}\' already exists within subscription.'.format(rg_name))
logger.warning("Using preexisting resource group")
def fetch_repair_vm(namespace):
# Find repair VM
tag = _get_repair_resource_tag(namespace.resource_group_name, namespace.vm_name)
try:
find_repair_command = 'az resource list --tag {tag} --query "[?type==\'microsoft.compute/virtualmachines\' || type==\'Microsoft.Compute/virtualMachines\']" -o json' \
.format(tag=tag)
logger.info('Searching for repair-vm within subscription...')
output = _call_az_command(find_repair_command)
except AzCommandError as azCommandError:
logger.error(azCommandError)
raise CLIError('Unexpected error occured while locating repair VM.')
repair_list = loads(output)
# No repair VM found
if not repair_list:
raise CLIError('Repair VM not found for {vm_name}. Run \'az vm repair create -n {vm_name} -g {rg} --verbose\' to create repair vm and rerun the command.'
.format(vm_name=namespace.vm_name, rg=namespace.resource_group_name))
# More than one repair VM found
if len(repair_list) > 1:
message = 'More than one repair VM found:\n'
for vm in repair_list:
message += vm['id'] + '\n'
message += '\nPlease specify the repair VM id using the parameter --repair-vm-id'
raise CLIError(message)
# One repair VM found
namespace.repair_vm_id = repair_list[0]['id']
logger.info('Found repair VM: %s\n', namespace.repair_vm_id)
def validate_vm_password(password, is_linux):
"""Sourced from src/azure-cli/azure/cli/command_modules/vm/_validators.py _validate_admin_password()"""
max_length = 72 if is_linux else 123
min_length = 12
if len(password) not in range(min_length, max_length + 1):
raise CLIError('Password length must be between {} and {}'.format(min_length, max_length))
contains_lower = findall('[a-z]+', password)
contains_upper = findall('[A-Z]+', password)
contains_digit = findall('[0-9]+', password)
contains_special_char = findall(r'[ `~!@#$%^&*()=+_\[\]{}\|;:.\/\'\",<>?]+', password)
count = len([x for x in [contains_lower, contains_upper, contains_digit, contains_special_char] if x])
if count < 3:
raise CLIError('Password must have the 3 of the following: 1 lower case character, 1 upper case character, 1 number and 1 special character')
def validate_vm_username(username, is_linux):
"""Sourced from src/azure-cli/azure/cli/command_modules/vm/_validators.py _validate_admin_username()"""
pattern = (r'[\\\/"\[\]:|<>+=;,?*@#()!A-Z]+' if is_linux else r'[\\\/"\[\]:|<>+=;,?*@]+')
linux_err = r'VM username cannot contain upper case character A-Z, special characters \/"[]:|<>+=;,?*@#()! or start with $ or -'
win_err = r'VM username cannot contain special characters \/"[]:|<>+=;,?*@# or ends with .'
if findall(pattern, username):
raise CLIError(linux_err if is_linux else win_err)
if is_linux and findall(r'^[$-]+', username):
raise CLIError(linux_err)
if not is_linux and username.endswith('.'):
raise CLIError(win_err)
# Sourced from vm module also
disallowed_user_names = [
"administrator", "admin", "user", "user1", "test", "user2",
"test1", "user3", "admin1", "1", "123", "a", "actuser", "adm",
"admin2", "aspnet", "backup", "console", "guest",
"owner", "root", "server", "sql", "support", "support_388945a0",
"sys", "test2", "test3", "user4", "user5"]
if username.lower() in disallowed_user_names:
raise CLIError("This username '{}' meets the general requirements, but is specifically disallowed. Please try a different value.".format(username))
def validate_repair_and_restore(cmd, namespace):
check_extension_version(EXTENSION_NAME)
logger.info('Validating repair and restore parameters...')
logger.info(namespace.vm_name + ' ' + namespace.resource_group_name)
# Check if VM exists and is not classic VM
source_vm = _validate_and_get_vm(cmd, namespace.resource_group_name, namespace.vm_name)
is_linux = _is_linux_os(source_vm)
# Check repair vm name
namespace.repair_vm_name = ('repair-' + namespace.vm_name)[:14] + '_'
logger.info('Repair VM name: %s', namespace.repair_vm_name)
# Check copy disk name
timestamp = datetime.utcnow().strftime('%Y%m%d%H%M%S')
if namespace.copy_disk_name:
_validate_disk_name(namespace.copy_disk_name)
else:
namespace.copy_disk_name = namespace.vm_name + '-DiskCopy-' + timestamp
logger.info('Copy disk name: %s', namespace.copy_disk_name)
# Check copy resouce group name
if namespace.repair_group_name:
if namespace.repair_group_name == namespace.resource_group_name:
raise CLIError('The repair resource group name cannot be the same as the source VM resource group.')
_validate_resource_group_name(namespace.repair_group_name)
else:
namespace.repair_group_name = 'repair-' + namespace.vm_name + '-' + timestamp
logger.info('Repair resource group name: %s', namespace.repair_group_name)
# Check encrypted disk
encryption_type, _, _, _ = _fetch_encryption_settings(source_vm)
# Currently only supporting single pass
if encryption_type in (Encryption.SINGLE_WITH_KEK, Encryption.SINGLE_WITHOUT_KEK):
if not namespace.unlock_encrypted_vm:
_prompt_encrypted_vm(namespace)
elif encryption_type is Encryption.DUAL:
logger.warning('The source VM\'s OS disk is encrypted using dual pass method.')
raise CLIError('The current command does not support VMs which were encrypted using dual pass.')
else:
logger.debug('The source VM\'s OS disk is not encrypted')
validate_vm_username(namespace.repair_username, is_linux)
validate_vm_password(namespace.repair_password, is_linux)
# Prompt input for public ip usage
namespace.associate_public_ip = False
# Validate repair run command
source_vm = _validate_and_get_vm(cmd, namespace.resource_group_name, namespace.vm_name)
is_linux = _is_linux_os(source_vm)