diff --git a/bin/qds.py b/bin/qds.py index fac8b439..4952763a 100755 --- a/bin/qds.py +++ b/bin/qds.py @@ -15,7 +15,7 @@ from qds_sdk.nezha import NezhaCmdLine from qds_sdk.user import UserCmdLine from qds_sdk.template import TemplateCmdLine -from qds_sdk.clusterv2 import ClusterCmdLine +from qds_sdk.cluster_cmd_line import ClusterCmdLine from qds_sdk.sensors import * import os import sys diff --git a/qds_sdk/cluster_cmd_line.py b/qds_sdk/cluster_cmd_line.py new file mode 100644 index 00000000..8978c162 --- /dev/null +++ b/qds_sdk/cluster_cmd_line.py @@ -0,0 +1,110 @@ +from qds_sdk.cluster_info_factory import ClusterInfoFactory +from qds_sdk.clusterv2 import ClusterV2 +from qds_sdk.qubole import Qubole +from qds_sdk.resource import Resource +from qds_sdk.cloud.cloud import Cloud +from qds_sdk.engine import Engine +from qds_sdk import util +import argparse +import json + +class ClusterCmdLine: + + @staticmethod + def parsers(action): + argparser = argparse.ArgumentParser( + prog="qds.py cluster", + description="Cluster Operations for Qubole Data Service.") + subparsers = argparser.add_subparsers(title="Cluster operations") + if Qubole.version is not None: + ClusterV2.api_version = Qubole.version + if action == "create": + create = subparsers.add_parser("create", help="Create a new cluster") + ClusterCmdLine.create_update_clone_parser(create, action="create") + create.set_defaults(func=ClusterV2.create) + + if action == "update": + update = subparsers.add_parser("update", help="Update the settings of an existing cluster") + ClusterCmdLine.create_update_clone_parser(update, action="update") + update.set_defaults(func=ClusterV2.update) + + if action == "clone": + clone = subparsers.add_parser("clone", help="Clone a cluster from an existing one") + ClusterCmdLine.create_update_clone_parser(clone, action="clone") + clone.set_defaults(func=ClusterV2.clone) + + if action == "list": + li = subparsers.add_parser("list", help="list clusters from existing clusters depending 
upon state") + ClusterCmdLine.list_parser(li, action="list") + li.set_defaults(func=ClusterV2.list) + return argparser + + @staticmethod + def list_parser(subparser, action=None, ): + + # cluster info parser + cluster_info_cls = ClusterInfoFactory.get_cluster_info_cls() + cluster_info_cls.list_info_parser(subparser, action) + + @staticmethod + def create_update_clone_parser(subparser, action=None): + # cloud config parser + cloud = Qubole.get_cloud() + cloud.create_parser(subparser) + + # cluster info parser + cluster_info_cls = ClusterInfoFactory.get_cluster_info_cls() + cluster_info_cls.cluster_info_parser(subparser, action) + + # engine config parser + Engine.engine_parser(subparser) + + @staticmethod + def run(args): + parser = ClusterCmdLine.parsers(args[0]) + arguments = parser.parse_args(args) + if args[0] in ["create", "clone", "update"]: + ClusterCmdLine.get_cluster_create_clone_update(arguments, args[0]) + else: + return arguments.func(arguments.label, arguments.cluster_id, arguments.state, + arguments.page, arguments.per_page) + + @staticmethod + def get_cluster_create_clone_update(arguments, action): + + # This will set cluster info and monitoring settings + cluster_info_cls = ClusterInfoFactory.get_cluster_info_cls() + cluster_info = cluster_info_cls(arguments.label) + cluster_info.set_cluster_info_from_arguments(arguments) + + # This will set cloud config settings + cloud_config = Qubole.get_cloud() + cloud_config.set_cloud_config_from_arguments(arguments) + + # This will set engine settings + engine_config = Engine(flavour=arguments.flavour) + engine_config.set_engine_config_settings(arguments) + cluster_request = ClusterCmdLine.get_cluster_request_parameters(cluster_info, cloud_config, engine_config) + + action = action + if action == "create": + return arguments.func(cluster_request) + else: + return arguments.func(arguments.cluster_id_label, cluster_request) + + @staticmethod + def get_cluster_request_parameters(cluster_info, cloud_config, 
engine_config): + ''' + Use this to return final minimal request from cluster_info, cloud_config or engine_config objects + Alternatively call util._make_minimal if only one object needs to be implemented + ''' + + cluster_request = {} + cloud_config = util._make_minimal(cloud_config.__dict__) + if bool(cloud_config): cluster_request['cloud_config'] = cloud_config + + engine_config = util._make_minimal(engine_config.__dict__) + if bool(engine_config): cluster_request['engine_config'] = engine_config + + cluster_request.update(util._make_minimal(cluster_info.__dict__)) + return cluster_request \ No newline at end of file diff --git a/qds_sdk/cluster_info_factory.py b/qds_sdk/cluster_info_factory.py new file mode 100644 index 00000000..923e499c --- /dev/null +++ b/qds_sdk/cluster_info_factory.py @@ -0,0 +1,17 @@ +from qds_sdk.qubole import Qubole +from qds_sdk.clusterv2 import ClusterInfoV2 +from qds_sdk.cluster_info_v22 import ClusterInfoV22 + + +class ClusterInfoFactory: + + @staticmethod + def get_cluster_info_cls(api_version=None): + if api_version is None: + api_version = Qubole.version + if api_version == "v2": + return ClusterInfoV2 + elif api_version == "v2.2": + return ClusterInfoV22 + else: + return ClusterInfoV2 diff --git a/qds_sdk/cluster_info_v22.py b/qds_sdk/cluster_info_v22.py new file mode 100644 index 00000000..1af9cdf2 --- /dev/null +++ b/qds_sdk/cluster_info_v22.py @@ -0,0 +1,752 @@ +import json + +from qds_sdk import util + + +def str2bool(v): + return v.lower() in ("yes", "true", "t", "1") + + +class ClusterInfoV22(object): + """ + qds_sdk.ClusterInfoV2 is the class which stores information about a cluster_info. + You can use objects of this class to create/update/clone a cluster. + """ + + def __init__(self, label): + """ + Args: + `label`: A list of labels that identify the cluster. At least one label + must be provided when creating a cluster. 
+ """ + self.cluster_info = {'label': label} + self.monitoring = {} + self.internal = {} # right now not supported + + def set_cluster_info_from_arguments(self, arguments): + customer_ssh_key = util._read_file(arguments.customer_ssh_key_file) + self.set_cluster_info(disallow_cluster_termination=arguments.disallow_cluster_termination, + enable_ganglia_monitoring=arguments.enable_ganglia_monitoring, + datadog_api_token=arguments.datadog_api_token, + datadog_app_token=arguments.datadog_app_token, + node_bootstrap=arguments.node_bootstrap_file, + master_instance_type=arguments.master_instance_type, + slave_instance_type=arguments.slave_instance_type, + min_nodes=arguments.initial_nodes, + max_nodes=arguments.max_nodes, + node_base_cooldown_period=arguments.node_base_cooldown_period, + node_spot_cooldown_period=arguments.node_spot_cooldown_period, + custom_tags=arguments.custom_tags, + heterogeneous_config=arguments.heterogeneous_config, + idle_cluster_timeout=arguments.idle_cluster_timeout, + disk_count=arguments.count, + disk_type=arguments.disk_type, + disk_size=arguments.size, + root_disk_size=arguments.root_disk_size, + upscaling_config=arguments.upscaling_config, + enable_encryption=arguments.encrypted_ephemerals, + customer_ssh_key=customer_ssh_key, + image_uri_overrides=arguments.image_uri_overrides, + env_name=arguments.env_name, + python_version=arguments.python_version, + r_version=arguments.r_version, + disable_cluster_pause=arguments.disable_cluster_pause, + paused_cluster_timeout_mins=arguments.paused_cluster_timeout_mins, + disable_autoscale_node_pause=arguments.disable_autoscale_node_pause, + paused_autoscale_node_timeout_mins=arguments.paused_autoscale_node_timeout_mins) + + self.set_composition(master_type=arguments.master_type, + master_spot_block_duration=arguments.master_spot_block_duration, + master_maximum_bid_price_percentage=arguments.master_maximum_bid_price_percentage, + master_timeout_for_request=arguments.master_timeout_for_request, + 
master_spot_fallback=arguments.master_spot_fallback, + min_ondemand_percentage=arguments.min_ondemand_percentage, + min_spot_block_percentage=arguments.min_spot_block_percentage, + min_spot_block_duration=arguments.min_spot_block_duration, + min_spot_percentage=arguments.min_spot_percentage, + min_maximum_bid_price_percentage=arguments.min_maximum_bid_price_percentage, + min_timeout_for_request=arguments.min_timeout_for_request, + min_spot_fallback=arguments.min_spot_fallback, + autoscaling_ondemand_percentage=arguments.autoscaling_ondemand_percentage, + autoscaling_spot_block_percentage=arguments.autoscaling_spot_block_percentage, + autoscaling_spot_percentage=arguments.autoscaling_spot_percentage, + autoscaling_spot_block_duration=arguments.autoscaling_spot_block_duration, + autoscaling_maximum_bid_price_percentage=arguments.autoscaling_maximum_bid_price_percentage, + autoscaling_timeout_for_request=arguments.autoscaling_timeout_for_request, + autoscaling_spot_fallback=arguments.autoscaling_spot_fallback) + + def set_cluster_info(self, + disallow_cluster_termination=None, + enable_ganglia_monitoring=None, + datadog_api_token=None, + datadog_app_token=None, + node_bootstrap=None, + master_instance_type=None, + slave_instance_type=None, + min_nodes=None, + max_nodes=None, + node_base_cooldown_period=None, + node_spot_cooldown_period=None, + custom_tags=None, + heterogeneous_config=None, + idle_cluster_timeout=None, + disk_count=None, + disk_type=None, + disk_size=None, + root_disk_size=None, + upscaling_config=None, + enable_encryption=None, + customer_ssh_key=None, + cluster_name=None, + force_tunnel=None, + image_uri_overrides=None, + env_name=None, + python_version=None, + r_version=None, + disable_cluster_pause=None, + paused_cluster_timeout_mins=None, + disable_autoscale_node_pause=None, + paused_autoscale_node_timeout_mins=None): + """ + Args: + + `disallow_cluster_termination`: Set this to True if you don't want + qubole to auto-terminate idle clusters. 
Use this option with + extreme caution. + + `enable_ganglia_monitoring`: Set this to True if you want to enable + ganglia monitoring for the cluster. + + `node_bootstrap`: name of the node bootstrap file for this + cluster. It should be in stored in S3 at + /scripts/hadoop/ + + `master_instance_type`: The instance type to use for the Hadoop master + node. + + `slave_instance_type`: The instance type to use for the Hadoop slave + nodes. + + `min_nodes`: Number of nodes to start the cluster with. + + `max_nodes`: Maximum number of nodes the cluster may be auto-scaled up + to. + + `node_base_cooldown_period`: Time for which an on-demand node waits before termination (Unit: minutes) + + `node_spot_cooldown_period`: Time for which a spot node waits before termination (Unit: minutes) + + `disk_count`: Number of EBS volumes to attach + to each instance of the cluster. + + `disk_type`: Type of the EBS volume. Valid + values are 'standard' (magnetic) and 'ssd'. + + `disk_size`: Size of each EBS volume, in GB. + + `root_disk_size`: Size of root volume, in GB. + + `enable_encryption`: Encrypt the ephemeral drives on the instance. + + `customer_ssh_key`: SSH key to use to login to the instances. + + `idle_cluster_timeout`: The buffer time (range in 0-6 hrs) after a cluster goes idle + and gets terminated, given cluster auto termination is on and no cluster specific + timeout has been set (default is 2 hrs) + + `heterogeneous_config` : Configuring heterogeneous nodes in Hadoop 2 and Spark clusters. + It implies that slave nodes can be of different instance types + + `custom_tags` : Custom tags to be set on all instances + of the cluster. Specified as JSON object (key-value pairs) + + `datadog_api_token` : Specify the Datadog API token to use the Datadog monitoring service + + `datadog_app_token` : Specify the Datadog APP token to use the Datadog monitoring service + + `image_uri_overrides` : Override the image name provided + + `env_name`: Name of python and R environment. 
(For Spark clusters) + + `python_version`: Version of Python for environment. (For Spark clusters) + + `r_version`: Version of R for environment. (For Spark clusters) + + `disable_cluster_pause`: Disable cluster pause + + `paused_cluster_timeout_mins`: Paused cluster timeout in mins + + `disable_autoscale_node_pause`: Disable autoscale node pause + + `paused_autoscale_node_timeout_mins`: Paused autoscale node timeout in mins + + Doc: For getting details about arguments + http://docs.qubole.com/en/latest/rest-api/cluster_api/create-new-cluster.html#parameters + + """ + self.cluster_info['master_instance_type'] = master_instance_type + self.cluster_info['slave_instance_type'] = slave_instance_type + self.cluster_info['min_nodes'] = min_nodes + self.cluster_info['max_nodes'] = max_nodes + self.cluster_info['cluster_name'] = cluster_name + self.cluster_info['node_bootstrap'] = node_bootstrap + self.cluster_info['disallow_cluster_termination'] = disallow_cluster_termination + self.cluster_info['force_tunnel'] = force_tunnel + self.cluster_info['node_base_cooldown_period'] = node_base_cooldown_period + self.cluster_info['node_volatile_cooldown_period'] = node_spot_cooldown_period + self.cluster_info['customer_ssh_key'] = customer_ssh_key + if custom_tags and custom_tags.strip(): + try: + self.cluster_info['custom_tags'] = json.loads(custom_tags.strip()) + except Exception as e: + raise Exception( + "Invalid JSON string for custom ec2 tags: %s" % e.message) + + self.cluster_info['heterogeneous_config'] = heterogeneous_config + self.cluster_info['idle_cluster_timeout'] = idle_cluster_timeout + self.cluster_info['rootdisk'] = {} + self.cluster_info['rootdisk']['size'] = root_disk_size + self.set_data_disk(disk_size, disk_count, disk_type, + upscaling_config, enable_encryption) + self.set_monitoring(enable_ganglia_monitoring, + datadog_api_token, datadog_app_token) + self.set_internal(image_uri_overrides) + self.set_env_settings(env_name, python_version, r_version) + 
self.set_start_stop_settings(disable_cluster_pause, paused_cluster_timeout_mins, + disable_autoscale_node_pause, paused_autoscale_node_timeout_mins) + + def set_composition(self, + master_type="ondemand", + master_spot_block_duration=None, + master_maximum_bid_price_percentage=None, + master_timeout_for_request=None, + master_spot_fallback=None, + min_ondemand_percentage=None, + min_spot_block_percentage=None, + min_spot_block_duration=None, + min_spot_percentage=None, + min_maximum_bid_price_percentage=None, + min_timeout_for_request=None, + min_spot_fallback=None, + autoscaling_ondemand_percentage=None, + autoscaling_spot_block_percentage=None, + autoscaling_spot_percentage=None, + autoscaling_spot_block_duration=None, + autoscaling_maximum_bid_price_percentage=None, + autoscaling_timeout_for_request=None, + autoscaling_spot_fallback=None): + + self.cluster_info["composition"] = {} + + self.set_master_config(master_type, + master_spot_block_duration, + master_maximum_bid_price_percentage, + master_timeout_for_request, + master_spot_fallback) + + self.set_min_config(min_ondemand_percentage, + min_spot_block_percentage, + min_spot_block_duration, + min_spot_percentage, + min_maximum_bid_price_percentage, + min_timeout_for_request, + min_spot_fallback) + + self.set_autoscaling_config(autoscaling_ondemand_percentage, + autoscaling_spot_block_percentage, + autoscaling_spot_block_duration, + autoscaling_spot_percentage, + autoscaling_maximum_bid_price_percentage, + autoscaling_timeout_for_request, + autoscaling_spot_fallback) + + def set_master_config(self, + master_type, + master_spot_block_duration, + master_maximum_bid_price_percentage, + master_timeout_for_request, + master_spot_fallback): + self.cluster_info["composition"]["master"] = {"nodes": []} + if master_type == "ondemand": + self.set_master_ondemand(100) + elif master_type == "spot": + self.set_master_spot(100, master_maximum_bid_price_percentage, + master_timeout_for_request, master_spot_fallback) + elif 
master_type == "spotblock": + self.set_master_spot_block( + 100, master_spot_block_duration) + + def set_min_config(self, + min_ondemand_percentage, + min_spot_block_percentage, + min_spot_block_duration, + min_spot_percentage, + min_maximum_bid_price_percentage, + min_timeout_for_request, + min_spot_fallback): + self.cluster_info["composition"]["min_nodes"] = {"nodes": []} + if not min_ondemand_percentage and not min_spot_block_percentage and not min_spot_percentage: + self.set_min_ondemand(100) + else: + if min_ondemand_percentage: + self.set_min_ondemand(min_ondemand_percentage) + if min_spot_block_percentage: + self.set_min_spot_block( + min_spot_block_percentage, min_spot_block_duration) + if min_spot_percentage: + self.set_min_spot(min_spot_percentage, min_maximum_bid_price_percentage, + min_timeout_for_request, min_spot_fallback) + + def set_autoscaling_config(self, + autoscaling_ondemand_percentage, + autoscaling_spot_block_percentage, + autoscaling_spot_block_duration, + autoscaling_spot_percentage, + autoscaling_maximum_bid_price_percentage, + autoscaling_timeout_for_request, + autoscaling_spot_fallback): + self.cluster_info["composition"]["autoscaling_nodes"] = {"nodes": []} + if not autoscaling_ondemand_percentage and not autoscaling_spot_block_percentage and not autoscaling_spot_percentage: + self.set_autoscaling_ondemand(50) + self.set_autoscaling_spot(50, 100, 1, 'ondemand') + else: + if autoscaling_ondemand_percentage: + self.set_autoscaling_ondemand(autoscaling_ondemand_percentage) + if autoscaling_spot_block_percentage: + self.set_autoscaling_spot_block(autoscaling_spot_block_percentage, + autoscaling_spot_block_duration) + if autoscaling_spot_percentage: + self.set_autoscaling_spot(autoscaling_spot_percentage, autoscaling_maximum_bid_price_percentage, + autoscaling_timeout_for_request, autoscaling_spot_fallback) + + def set_master_ondemand(self, master_ondemand_percentage=None): + ondemand = {"percentage": master_ondemand_percentage, "type": 
"ondemand"} + self.cluster_info["composition"]["master"]["nodes"].append(ondemand) + + def set_master_spot_block(self, master_spot_block_percentage=None, master_spot_block_duration=120): + spot_block = {"percentage": master_spot_block_percentage, + "type": "spotblock", + "timeout": master_spot_block_duration} + self.cluster_info["composition"]["master"]["nodes"].append(spot_block) + + def set_master_spot(self, master_spot_percentage=None, master_maximum_bid_price_percentage=100, + master_timeout_for_request=1, master_spot_fallback=None): + spot = {"percentage": master_spot_percentage, + "type": "spot", + "maximum_bid_price_percentage": master_maximum_bid_price_percentage, + "timeout_for_request": master_timeout_for_request, + "fallback": master_spot_fallback + } + self.cluster_info["composition"]["master"]["nodes"].append(spot) + + def set_min_ondemand(self, min_ondemand_percentage=None): + ondemand = {"percentage": min_ondemand_percentage, "type": "ondemand"} + self.cluster_info["composition"]["min_nodes"]["nodes"].append(ondemand) + + def set_min_spot_block(self, min_spot_block_percentage=None, min_spot_block_duration=120): + spot_block = {"percentage": min_spot_block_percentage, + "type": "spotblock", + "timeout": min_spot_block_duration} + self.cluster_info["composition"]["min_nodes"]["nodes"].append(spot_block) + + def set_min_spot(self, min_spot_percentage=None, min_maximum_bid_price_percentage=100, + min_timeout_for_request=1, min_spot_fallback=None): + spot = {"percentage": min_spot_percentage, + "type": "spot", + "maximum_bid_price_percentage": min_maximum_bid_price_percentage, + "timeout_for_request": min_timeout_for_request, + "fallback": min_spot_fallback + } + self.cluster_info["composition"]["min_nodes"]["nodes"].append(spot) + + def set_autoscaling_ondemand(self, autoscaling_ondemand_percentage=None): + ondemand = { + "percentage": autoscaling_ondemand_percentage, "type": "ondemand"} + 
self.cluster_info["composition"]["autoscaling_nodes"]["nodes"].append(ondemand) + + def set_autoscaling_spot_block(self, autoscaling_spot_block_percentage=None, autoscaling_spot_block_duration=120): + spot_block = {"percentage": autoscaling_spot_block_percentage, + "type": "spotblock", + "timeout": autoscaling_spot_block_duration} + self.cluster_info["composition"]["autoscaling_nodes"]["nodes"].append(spot_block) + + def set_autoscaling_spot(self, autoscaling_spot_percentage=None, autoscaling_maximum_bid_price_percentage=100, + autoscaling_timeout_for_request=1, autoscaling_spot_fallback=None): + spot = {"percentage": autoscaling_spot_percentage, + "type": "spot", + "maximum_bid_price_percentage": autoscaling_maximum_bid_price_percentage, + "timeout_for_request": autoscaling_timeout_for_request, + "fallback": autoscaling_spot_fallback + } + self.cluster_info["composition"]["autoscaling_nodes"]["nodes"].append(spot) + + def set_datadog_setting(self, + datadog_api_token=None, + datadog_app_token=None): + self.monitoring['datadog'] = {} + self.monitoring['datadog']['datadog_api_token'] = datadog_api_token + self.monitoring['datadog']['datadog_app_token'] = datadog_app_token + + def set_monitoring(self, + enable_ganglia_monitoring=None, + datadog_api_token=None, + datadog_app_token=None): + self.monitoring['ganglia'] = enable_ganglia_monitoring + self.set_datadog_setting(datadog_api_token, datadog_app_token) + + def set_data_disk(self, + disk_size=None, + disk_count=None, + disk_type=None, + upscaling_config=None, + enable_encryption=None): + self.cluster_info['datadisk'] = {} + self.cluster_info['datadisk']['size'] = disk_size + self.cluster_info['datadisk']['count'] = disk_count + self.cluster_info['datadisk']['type'] = disk_type + self.cluster_info['datadisk']['upscaling_config'] = upscaling_config + self.cluster_info['datadisk']['encryption'] = enable_encryption + + def set_internal(self, image_uri_overrides=None): + self.internal['image_uri_overrides'] = 
image_uri_overrides + + def set_env_settings(self, env_name=None, python_version=None, r_version=None): + self.cluster_info['env_settings'] = {} + self.cluster_info['env_settings']['name'] = env_name + self.cluster_info['env_settings']['python_version'] = python_version + self.cluster_info['env_settings']['r_version'] = r_version + + def set_start_stop_settings(self, + disable_cluster_pause=None, + paused_cluster_timeout_mins=None, + disable_autoscale_node_pause=None, + paused_autoscale_node_timeout_mins=None): + if disable_cluster_pause is not None: + disable_cluster_pause = int(disable_cluster_pause) + self.cluster_info['disable_cluster_pause'] = disable_cluster_pause + self.cluster_info['paused_cluster_timeout_mins'] = paused_cluster_timeout_mins + if disable_autoscale_node_pause is not None: + disable_autoscale_node_pause = int(disable_autoscale_node_pause) + self.cluster_info['disable_autoscale_node_pause'] = disable_autoscale_node_pause + self.cluster_info['paused_autoscale_node_timeout_mins'] = paused_autoscale_node_timeout_mins + + @staticmethod + def list_info_parser(argparser, action): + argparser.add_argument("--id", dest="cluster_id", + help="show cluster with this id") + + argparser.add_argument("--label", dest="label", + help="show cluster with this label") + argparser.add_argument("--state", dest="state", + choices=['invalid', 'up', 'down', + 'pending', 'terminating'], + help="State of the cluster") + argparser.add_argument("--page", dest="page", + type=int, + help="Page number") + argparser.add_argument("--per-page", dest="per_page", + type=int, + help="Number of clusters to be retrieved per page") + + @staticmethod + def cluster_info_parser(argparser, action): + create_required = False + label_required = False + if action == "create": + create_required = True + elif action == "update": + argparser.add_argument("cluster_id_label", + help="id/label of the cluster to update") + elif action == "clone": + argparser.add_argument("cluster_id_label", + 
help="id/label of the cluster to update") + label_required = True + + argparser.add_argument("--label", dest="label", + nargs="+", required=(create_required or label_required), + help="list of labels for the cluster" + + " (atleast one label is required)") + cluster_info = argparser.add_argument_group("cluster_info") + cluster_info.add_argument("--master-instance-type", + dest="master_instance_type", + help="instance type to use for the hadoop" + + " master node") + cluster_info.add_argument("--slave-instance-type", + dest="slave_instance_type", + help="instance type to use for the hadoop" + + " slave nodes") + cluster_info.add_argument("--min-nodes", + dest="initial_nodes", + type=int, + help="number of nodes to start the" + + " cluster with", ) + cluster_info.add_argument("--max-nodes", + dest="max_nodes", + type=int, + help="maximum number of nodes the cluster" + + " may be auto-scaled up to") + cluster_info.add_argument("--idle-cluster-timeout", + dest="idle_cluster_timeout", + help="cluster termination timeout for idle cluster") + cluster_info.add_argument("--node-bootstrap-file", + dest="node_bootstrap_file", + help="""name of the node bootstrap file for this cluster. 
It + should be in stored in S3 at + /scripts/hadoop/NODE_BOOTSTRAP_FILE + """, ) + cluster_info.add_argument("--root-disk-size", + dest="root_disk_size", + type=int, + help="size of the root volume in GB") + termination = cluster_info.add_mutually_exclusive_group() + termination.add_argument("--disallow-cluster-termination", + dest="disallow_cluster_termination", + action="store_true", + default=None, + help="don't auto-terminate idle clusters," + + " use this with extreme caution", ) + termination.add_argument("--allow-cluster-termination", + dest="disallow_cluster_termination", + action="store_false", + default=None, + help="auto-terminate idle clusters,") + + node_cooldown_period_group = argparser.add_argument_group( + "node cooldown period settings") + node_cooldown_period_group.add_argument("--node-base-cooldown-period", + dest="node_base_cooldown_period", + type=int, + help="Cooldown period for on-demand nodes" + + " unit: minutes") + node_cooldown_period_group.add_argument("--node-spot-cooldown-period", + dest="node_spot_cooldown_period", + type=int, + help="Cooldown period for spot nodes" + + " unit: minutes") + cluster_info.add_argument("--customer-ssh-key", + dest="customer_ssh_key_file", + help="location for ssh key to use to" + + " login to the instance") + cluster_info.add_argument("--custom-tags", + dest="custom_tags", + help="""Custom tags to be set on all instances + of the cluster. Specified as JSON object (key-value pairs) + e.g. --custom-ec2-tags '{"key1":"value1", "key2":"value2"}' + """, ) + + # datadisk settings + datadisk_group = argparser.add_argument_group("data disk settings") + datadisk_group.add_argument("--count", + dest="count", + type=int, + help="Number of EBS volumes to attach to" + + " each instance of the cluster", ) + datadisk_group.add_argument("--disk-type", + dest="disk_type", + choices=["standard", "gp2"], + help="Type of the volume attached to the instances. 
Valid values are " + + "'standard' (magnetic) and 'gp2' (ssd).") + datadisk_group.add_argument("--size", + dest="size", + type=int, + help="Size of each EBS volume, in GB", ) + datadisk_group.add_argument("--upscaling-config", + dest="upscaling_config", + help="Upscaling config to be attached with the instances.", ) + ephemerals = datadisk_group.add_mutually_exclusive_group() + ephemerals.add_argument("--encrypted-ephemerals", + dest="encrypted_ephemerals", + action="store_true", + default=None, + help="encrypt the ephemeral drives on" + + " the instance", ) + ephemerals.add_argument("--no-encrypted-ephemerals", + dest="encrypted_ephemerals", + action="store_false", + default=None, + help="don't encrypt the ephemeral drives on" + + " the instance", ) + + cluster_info.add_argument("--heterogeneous-config", + dest="heterogeneous_config", + help="heterogeneous config for the cluster") + + composition_group = argparser.add_argument_group("cluster composition settings") + composition_group.add_argument("--master-type", + dest="master_type", + choices=["ondemand", "spot", "spotblock"], + default="ondemand", + help="type of master nodes. 
Valid values are: ('ondemand', 'spot', 'spotblock')" + + "default: ondemand") + composition_group.add_argument("--master-spot-block-duration", + dest="master_spot_block_duration", + type=int, + default=120, + help="spot block duration unit: minutes") + composition_group.add_argument("--master-maximum-bid-price-percentage", + dest="master_maximum_bid_price_percentage", + type=int, + default=100, + help="maximum value to bid for master spot instances" + + " expressed as a percentage of the base" + + " price for the master instance types") + composition_group.add_argument("--master-timeout-for-request", + dest="master_timeout_for_request", + type=int, + default=1, + help="timeout for a master spot instance request, unit: minutes") + composition_group.add_argument("--master-spot-fallback", + dest="master_spot_fallback", + choices=["ondemand", None], + default=None, + help="whether to fallback to on-demand instances for master nodes" + + " if spot instances aren't available") + composition_group.add_argument("--min-ondemand-percentage", + dest="min_ondemand_percentage", + type=int, + help="percentage of ondemand nodes in min config") + composition_group.add_argument("--min-spot-block-percentage", + dest="min_spot_block_percentage", + type=int, + help="percentage of spot block nodes in min config") + composition_group.add_argument("--min-spot-percentage", + dest="min_spot_percentage", + type=int, + help="percentage of spot nodes in min config") + composition_group.add_argument("--min-spot-block-duration", + dest="min_spot_block_duration", + type=int, + default=120, + help="spot block duration unit: minutes") + composition_group.add_argument("--min-maximum-bid-price-percentage", + dest="min_maximum_bid_price_percentage", + type=int, + default=100, + help="maximum value to bid for min spot instances" + + " expressed as a percentage of the base" + + " price for the master instance types") + composition_group.add_argument("--min-timeout-for-request", + 
dest="min_timeout_for_request", + type=int, + default=1, + help="timeout for a min spot instance request, unit: minutes") + composition_group.add_argument("--min-spot-fallback", + dest="min_spot_fallback", + choices=["ondemand", None], + default=None, + help="whether to fallback to on-demand instances for min nodes" + + " if spot instances aren't available") + + composition_group.add_argument("--autoscaling-ondemand-percentage", + dest="autoscaling_ondemand_percentage", + type=int, + help="percentage of ondemand nodes in autoscaling config") + composition_group.add_argument("--autoscaling-spot-block-percentage", + dest="autoscaling_spot_block_percentage", + type=int, + help="percentage of spot block nodes in autoscaling config") + composition_group.add_argument("--autoscaling-spot-percentage", + dest="autoscaling_spot_percentage", + type=int, + help="percentage of spot nodes in autoscaling config") + composition_group.add_argument("--autoscaling-spot-block-duration", + dest="autoscaling_spot_block_duration", + type=int, + default=120, + help="spot block duration unit: minutes") + composition_group.add_argument("--autoscaling-maximum-bid-price-percentage", + dest="autoscaling_maximum_bid_price_percentage", + type=int, + default=100, + help="maximum value to bid for autoscaling spot instances" + + " expressed as a percentage of the base" + + " price for the master instance types") + composition_group.add_argument("--autoscaling-timeout-for-request", + dest="autoscaling_timeout_for_request", + type=int, + default=1, + help="timeout for a autoscaling spot instance request, unit: minutes") + composition_group.add_argument("--autoscaling-spot-fallback", + dest="autoscaling_spot_fallback", + choices=["ondemand", None], + default=None, + help="whether to fallback to on-demand instances for autoscaling nodes" + + " if spot instances aren't available") + + # monitoring settings + monitoring_group = argparser.add_argument_group("monitoring settings") + ganglia = 
monitoring_group.add_mutually_exclusive_group() + ganglia.add_argument("--enable-ganglia-monitoring", + dest="enable_ganglia_monitoring", + action="store_true", + default=None, + help="enable ganglia monitoring for the" + + " cluster", ) + ganglia.add_argument("--disable-ganglia-monitoring", + dest="enable_ganglia_monitoring", + action="store_false", + default=None, + help="disable ganglia monitoring for the" + + " cluster", ) + + datadog_group = argparser.add_argument_group("datadog settings") + datadog_group.add_argument("--datadog-api-token", + dest="datadog_api_token", + default=None, + help="datadog api token for the cluster", ) + datadog_group.add_argument("--datadog-app-token", + dest="datadog_app_token", + default=None, + help="datadog app token for the cluster", ) + + internal_group = argparser.add_argument_group("internal settings") + internal_group.add_argument("--image-overrides", + dest="image_uri_overrides", + default=None, + help="overrides for image", ) + + env_group = argparser.add_argument_group("environment settings") + env_group.add_argument("--env-name", + dest="env_name", + default=None, + help="name of Python and R environment") + env_group.add_argument("--python-version", + dest="python_version", + default=None, + help="version of Python in environment") + env_group.add_argument("--r-version", + dest="r_version", + default=None, + help="version of R in environment") + + start_stop_group = argparser.add_argument_group("start stop settings") + start_stop_group.add_argument("--disable-cluster-pause", + dest="disable_cluster_pause", + action='store_true', + default=None, + help="disable cluster pause") + start_stop_group.add_argument("--no-disable-cluster-pause", + dest="disable_cluster_pause", + action='store_false', + default=None, + help="disable cluster pause") + start_stop_group.add_argument("--paused-cluster-timeout", + dest="paused_cluster_timeout_mins", + default=None, + type=int, + help="paused cluster timeout in min") + 
start_stop_group.add_argument("--disable-autoscale-node-pause", + dest="disable_autoscale_node_pause", + action='store_true', + default=None, + help="disable autoscale node pause") + start_stop_group.add_argument("--no-disable-autoscale-node-pause", + dest="disable_autoscale_node_pause", + action='store_false', + default=None, + help="disable autoscale node pause") + start_stop_group.add_argument("--paused-autoscale-node-timeout", + dest="paused_autoscale_node_timeout_mins", + default=None, + type=int, + help="paused autoscale node timeout in min") diff --git a/qds_sdk/clusterv2.py b/qds_sdk/clusterv2.py index eb3f07f6..957a6f94 100755 --- a/qds_sdk/clusterv2.py +++ b/qds_sdk/clusterv2.py @@ -6,143 +6,10 @@ import argparse import json + def str2bool(v): return v.lower() in ("yes", "true", "t", "1") -class ClusterCmdLine: - - @staticmethod - def parsers(action): - argparser = argparse.ArgumentParser( - prog="qds.py cluster", - description="Cluster Operations for Qubole Data Service.") - subparsers = argparser.add_subparsers(title="Cluster operations") - - if action == "create": - create = subparsers.add_parser("create", help="Create a new cluster") - ClusterCmdLine.create_update_clone_parser(create, action="create") - create.set_defaults(func=ClusterV2.create) - - if action == "update": - update = subparsers.add_parser("update", help="Update the settings of an existing cluster") - ClusterCmdLine.create_update_clone_parser(update, action="update") - update.set_defaults(func=ClusterV2.update) - - if action == "clone": - clone = subparsers.add_parser("clone", help="Clone a cluster from an existing one") - ClusterCmdLine.create_update_clone_parser(clone, action="clone") - clone.set_defaults(func=ClusterV2.clone) - - if action == "list": - li = subparsers.add_parser("list", help="list clusters from existing clusters depending upon state") - ClusterCmdLine.list_parser(li, action="list") - li.set_defaults(func=ClusterV2.list) - return argparser - - @staticmethod - def 
list_parser(subparser, action=None): - - # cluster info parser - ClusterInfoV2.list_info_parser(subparser, action) - - @staticmethod - def create_update_clone_parser(subparser, action=None): - # cloud config parser - cloud = Qubole.get_cloud() - cloud.create_parser(subparser) - - # cluster info parser - ClusterInfoV2.cluster_info_parser(subparser, action) - - # engine config parser - Engine.engine_parser(subparser) - - @staticmethod - def run(args): - parser = ClusterCmdLine.parsers(args[0]) - arguments = parser.parse_args(args) - if args[0] in ["create", "clone", "update"]: - ClusterCmdLine.get_cluster_create_clone_update(arguments, args[0]) - else: - return arguments.func(arguments.label, arguments.cluster_id, arguments.state, - arguments.page, arguments.per_page) - - @staticmethod - def get_cluster_create_clone_update(arguments, action): - customer_ssh_key = util._read_file(arguments.customer_ssh_key_file) - # This will set cluster info and monitoring settings - cluster_info = ClusterInfoV2(arguments.label) - cluster_info.set_cluster_info(disallow_cluster_termination=arguments.disallow_cluster_termination, - enable_ganglia_monitoring=arguments.enable_ganglia_monitoring, - datadog_api_token=arguments.datadog_api_token, - datadog_app_token=arguments.datadog_app_token, - node_bootstrap=arguments.node_bootstrap_file, - master_instance_type=arguments.master_instance_type, - slave_instance_type=arguments.slave_instance_type, - min_nodes=arguments.initial_nodes, - max_nodes=arguments.max_nodes, - slave_request_type=arguments.slave_request_type, - fallback_to_ondemand=arguments.fallback_to_ondemand, - node_base_cooldown_period=arguments.node_base_cooldown_period, - node_spot_cooldown_period=arguments.node_spot_cooldown_period, - custom_tags=arguments.custom_tags, - heterogeneous_config=arguments.heterogeneous_config, - maximum_bid_price_percentage=arguments.maximum_bid_price_percentage, - timeout_for_request=arguments.timeout_for_request, - 
maximum_spot_instance_percentage=arguments.maximum_spot_instance_percentage, - stable_maximum_bid_price_percentage=arguments.stable_maximum_bid_price_percentage, - stable_timeout_for_request=arguments.stable_timeout_for_request, - stable_spot_fallback=arguments.stable_spot_fallback, - spot_block_duration=arguments.spot_block_duration, - idle_cluster_timeout=arguments.idle_cluster_timeout, - disk_count=arguments.count, - disk_type=arguments.disk_type, - disk_size=arguments.size, - root_disk_size=arguments.root_disk_size, - upscaling_config=arguments.upscaling_config, - enable_encryption=arguments.encrypted_ephemerals, - customer_ssh_key=customer_ssh_key, - image_uri_overrides=arguments.image_uri_overrides, - env_name=arguments.env_name, - python_version=arguments.python_version, - r_version=arguments.r_version, - disable_cluster_pause=arguments.disable_cluster_pause, - paused_cluster_timeout_mins=arguments.paused_cluster_timeout_mins, - disable_autoscale_node_pause=arguments.disable_autoscale_node_pause, - paused_autoscale_node_timeout_mins=arguments.paused_autoscale_node_timeout_mins) - - # This will set cloud config settings - cloud_config = Qubole.get_cloud() - cloud_config.set_cloud_config_from_arguments(arguments) - - # This will set engine settings - engine_config = Engine(flavour=arguments.flavour) - engine_config.set_engine_config_settings(arguments) - - cluster_request = ClusterCmdLine.get_cluster_request_parameters(cluster_info, cloud_config, engine_config) - - action = action - if action == "create": - return arguments.func(cluster_request) - else: - return arguments.func(arguments.cluster_id_label, cluster_request) - - @staticmethod - def get_cluster_request_parameters(cluster_info, cloud_config, engine_config): - ''' - Use this to return final minimal request from cluster_info, cloud_config or engine_config objects - Alternatively call util._make_minimal if only one object needs to be implemented - ''' - - cluster_request = {} - cloud_config = 
util._make_minimal(cloud_config.__dict__) - if bool(cloud_config): cluster_request['cloud_config'] = cloud_config - - engine_config = util._make_minimal(engine_config.__dict__) - if bool(engine_config): cluster_request['engine_config'] = engine_config - - cluster_request.update(util._make_minimal(cluster_info.__dict__)) - return cluster_request class ClusterInfoV2(object): """ @@ -159,7 +26,48 @@ def __init__(self, label): self.cluster_info = {} self.cluster_info['label'] = label self.monitoring = {} - self.internal = {} # right now not supported + self.internal = {} # right now not supported + + def set_cluster_info_from_arguments(self, arguments): + customer_ssh_key = util._read_file(arguments.customer_ssh_key_file) + self.set_cluster_info(disallow_cluster_termination=arguments.disallow_cluster_termination, + enable_ganglia_monitoring=arguments.enable_ganglia_monitoring, + datadog_api_token=arguments.datadog_api_token, + datadog_app_token=arguments.datadog_app_token, + node_bootstrap=arguments.node_bootstrap_file, + master_instance_type=arguments.master_instance_type, + slave_instance_type=arguments.slave_instance_type, + min_nodes=arguments.initial_nodes, + max_nodes=arguments.max_nodes, + slave_request_type=arguments.slave_request_type, + fallback_to_ondemand=arguments.fallback_to_ondemand, + node_base_cooldown_period=arguments.node_base_cooldown_period, + node_spot_cooldown_period=arguments.node_spot_cooldown_period, + custom_tags=arguments.custom_tags, + heterogeneous_config=arguments.heterogeneous_config, + maximum_bid_price_percentage=arguments.maximum_bid_price_percentage, + timeout_for_request=arguments.timeout_for_request, + maximum_spot_instance_percentage=arguments.maximum_spot_instance_percentage, + stable_maximum_bid_price_percentage=arguments.stable_maximum_bid_price_percentage, + stable_timeout_for_request=arguments.stable_timeout_for_request, + stable_spot_fallback=arguments.stable_spot_fallback, + 
spot_block_duration=arguments.spot_block_duration, + idle_cluster_timeout=arguments.idle_cluster_timeout, + disk_count=arguments.count, + disk_type=arguments.disk_type, + disk_size=arguments.size, + root_disk_size=arguments.root_disk_size, + upscaling_config=arguments.upscaling_config, + enable_encryption=arguments.encrypted_ephemerals, + customer_ssh_key=customer_ssh_key, + image_uri_overrides=arguments.image_uri_overrides, + env_name=arguments.env_name, + python_version=arguments.python_version, + r_version=arguments.r_version, + disable_cluster_pause=arguments.disable_cluster_pause, + paused_cluster_timeout_mins=arguments.paused_cluster_timeout_mins, + disable_autoscale_node_pause=arguments.disable_autoscale_node_pause, + paused_autoscale_node_timeout_mins=arguments.paused_autoscale_node_timeout_mins) def set_cluster_info(self, disallow_cluster_termination=None, @@ -320,7 +228,7 @@ def set_cluster_info(self, self.cluster_info['force_tunnel'] = force_tunnel self.cluster_info['fallback_to_ondemand'] = fallback_to_ondemand self.cluster_info['node_base_cooldown_period'] = node_base_cooldown_period - self.cluster_info['node_spot_cooldown_period'] = node_spot_cooldown_period + self.cluster_info['node_volatile_cooldown_period'] = node_spot_cooldown_period self.cluster_info['customer_ssh_key'] = customer_ssh_key if custom_tags and custom_tags.strip(): try: @@ -336,8 +244,10 @@ def set_cluster_info(self, self.cluster_info['rootdisk'] = {} self.cluster_info['rootdisk']['size'] = root_disk_size - self.set_spot_instance_settings(maximum_bid_price_percentage, timeout_for_request, maximum_spot_instance_percentage) - self.set_stable_spot_bid_settings(stable_maximum_bid_price_percentage, stable_timeout_for_request, stable_spot_fallback) + self.set_spot_instance_settings(maximum_bid_price_percentage, timeout_for_request, + maximum_spot_instance_percentage) + self.set_stable_spot_bid_settings(stable_maximum_bid_price_percentage, stable_timeout_for_request, + stable_spot_fallback) 
self.set_spot_block_settings(spot_block_duration) self.set_data_disk(disk_size, disk_count, disk_type, upscaling_config, enable_encryption) self.set_monitoring(enable_ganglia_monitoring, datadog_api_token, datadog_app_token) @@ -705,16 +615,18 @@ def cluster_info_parser(argparser, action): type=int, help="paused autoscale node timeout in min") -class ClusterV2(Resource): +class ClusterV2(Resource): rest_entity_path = "clusters" + api_version = "v2" @classmethod def create(cls, cluster_info): """ Create a new cluster using information provided in `cluster_info`. """ - conn = Qubole.agent(version="v2") + + conn = Qubole.agent(version=cls.api_version) return conn.post(cls.rest_entity_path, data=cluster_info) @classmethod @@ -723,7 +635,7 @@ def update(cls, cluster_id_label, cluster_info): Update the cluster with id/label `cluster_id_label` using information provided in `cluster_info`. """ - conn = Qubole.agent(version="v2") + conn = Qubole.agent(version=cls.api_version) return conn.put(cls.element_path(cluster_id_label), data=cluster_info) @classmethod @@ -732,7 +644,7 @@ def clone(cls, cluster_id_label, cluster_info): Update the cluster with id/label `cluster_id_label` using information provided in `cluster_info`. """ - conn = Qubole.agent(version="v2") + conn = Qubole.agent(version=cls.api_version) return conn.post(cls.element_path(cluster_id_label) + '/clone', data=cluster_info) @classmethod @@ -758,7 +670,7 @@ def list(cls, label=None, cluster_id=None, state=None, page=None, per_page=None) if per_page: params['per_page'] = per_page params = None if not params else params - conn = Qubole.agent(version="v2") + conn = Qubole.agent(version=cls.api_version) cluster_list = conn.get(cls.rest_entity_path) if state is None: # return the complete list since state is None @@ -776,5 +688,5 @@ def show(cls, cluster_id_label): """ Show information about the cluster with id/label `cluster_id_label`. 
""" - conn = Qubole.agent(version="v2") + conn = Qubole.agent(version=cls.api_version) return conn.get(cls.element_path(cluster_id_label)) diff --git a/qds_sdk/qubole.py b/qds_sdk/qubole.py index 06be4da5..d8b842a5 100644 --- a/qds_sdk/qubole.py +++ b/qds_sdk/qubole.py @@ -121,3 +121,4 @@ def get_cloud_object(cls, cloud_name): elif cloud_name.lower() == "gcp": import qds_sdk.cloud.gcp_cloud return qds_sdk.cloud.gcp_cloud.GcpCloud() + diff --git a/tests/test_clusterv2.py b/tests/test_clusterv2.py index eec62ade..4861a2fa 100644 --- a/tests/test_clusterv2.py +++ b/tests/test_clusterv2.py @@ -583,7 +583,7 @@ def test_node_spot_cooldown_period_v2(self): qds.main() Connection._api_call.assert_called_with('POST', 'clusters', {'cluster_info': {'label': ['test_label'], - 'node_spot_cooldown_period': 15}}) + 'node_volatile_cooldown_period': 15}}) def test_node_spot_cooldown_period_invalid_v2(self): sys.argv = ['qds.py', '--version', 'v2', 'cluster', 'create', '--label', 'test_label', @@ -855,7 +855,7 @@ def test_node_spot_cooldown_period_v2(self): Connection._api_call = Mock(return_value={}) qds.main() Connection._api_call.assert_called_with('PUT', 'clusters/123', - {'cluster_info': {'node_spot_cooldown_period': 15}}) + {'cluster_info': {'node_volatile_cooldown_period': 15}}) def test_node_spot_cooldown_period_invalid_v2(self): sys.argv = ['qds.py', '--version', 'v2', 'cluster', 'update', '123', diff --git a/tests/test_clusterv22.py b/tests/test_clusterv22.py new file mode 100644 index 00000000..e05128dd --- /dev/null +++ b/tests/test_clusterv22.py @@ -0,0 +1,197 @@ +from __future__ import print_function +import sys +import os + +if sys.version_info > (2, 7, 0): + import unittest +else: + import unittest2 as unittest +from mock import Mock, ANY +import tempfile + +sys.path.append(os.path.join(os.path.dirname(__file__), '../bin')) +import qds +from qds_sdk.connection import Connection +from test_base import print_command +from test_base import QdsCliTestCase +from 
qds_sdk.cloud.cloud import Cloud +from qds_sdk.qubole import Qubole + + +class TestClusterCreate(QdsCliTestCase): + # default cluster composition + def test_cluster_info(self): + sys.argv = ['qds.py', '--version', 'v2.2', 'cluster', 'create', '--label', 'test_label', + '--compute-access-key', 'aki', '--compute-secret-key', 'sak', '--min-nodes', '3', + '--max-nodes', '5', '--disallow-cluster-termination', '--enable-ganglia-monitoring', + '--node-bootstrap-file', 'test_file_name', '--master-instance-type', + 'm1.xlarge', '--slave-instance-type', 'm1.large', '--encrypted-ephemerals'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', { + 'cloud_config': {'compute_config': {'compute_secret_key': 'sak', 'compute_access_key': 'aki'}}, + 'monitoring': {'ganglia': True}, + 'cluster_info': {'master_instance_type': 'm1.xlarge', 'node_bootstrap': 'test_file_name', + 'slave_instance_type': 'm1.large', 'label': ['test_label'], + 'disallow_cluster_termination': True, 'max_nodes': 5, 'min_nodes': 3, + 'composition': {'min_nodes': {'nodes': [{'percentage': 100, 'type': 'ondemand'}]}, + 'master': {'nodes': [{'percentage': 100, 'type': 'ondemand'}]}, + 'autoscaling_nodes': {'nodes': [{'percentage': 50, 'type': 'ondemand'}, + {'timeout_for_request': 1, + 'percentage': 50, 'type': 'spot', + 'fallback': 'ondemand', + 'maximum_bid_price_percentage': 100}]}}, + 'datadisk': {'encryption': True}}}) + + def test_od_od_od(self): + sys.argv = ['qds.py', '--version', 'v2.2', 'cluster', 'create', '--label', 'test_label', + '--master-type', 'ondemand', '--min-ondemand-percentage', '100', + '--autoscaling-ondemand-percentage', '100'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', {'cluster_info': { + 'composition': {'min_nodes': {'nodes': [{'percentage': 100, 'type': 
'ondemand'}]}, + 'master': {'nodes': [{'percentage': 100, 'type': 'ondemand'}]}, + 'autoscaling_nodes': {'nodes': [{'percentage': 100, 'type': 'ondemand'}]}}, + 'label': ['test_label']}}) + + def test_od_od_odspot(self): + sys.argv = ['qds.py', '--version', 'v2.2', 'cluster', 'create', '--label', 'test_label', + '--master-type', 'ondemand', '--min-ondemand-percentage', '100', + '--autoscaling-ondemand-percentage', + '50', '--autoscaling-spot-percentage', '50', '--autoscaling-maximum-bid-price-percentage', '50', + '--autoscaling-timeout-for-request', '3', '--autoscaling-spot-fallback', 'ondemand'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', {'cluster_info': { + 'composition': {'min_nodes': {'nodes': [{'percentage': 100, 'type': 'ondemand'}]}, + 'master': {'nodes': [{'percentage': 100, 'type': 'ondemand'}]}, 'autoscaling_nodes': { + 'nodes': [{'percentage': 50, 'type': 'ondemand'}, + {'timeout_for_request': 3, 'percentage': 50, 'type': 'spot', 'fallback': 'ondemand', + 'maximum_bid_price_percentage': 50}]}}, 'label': ['test_label']}}) + + def test_od_od_odspot_nofallback(self): + sys.argv = ['qds.py', '--version', 'v2.2', 'cluster', 'create', '--label', 'test_label', + '--master-type', 'ondemand', '--min-ondemand-percentage', '100', + '--autoscaling-ondemand-percentage', + '50', '--autoscaling-spot-percentage', '50', '--autoscaling-maximum-bid-price-percentage', '50', + '--autoscaling-timeout-for-request', '3', '--autoscaling-spot-fallback', None] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', {'cluster_info': { + 'composition': {'min_nodes': {'nodes': [{'percentage': 100, 'type': 'ondemand'}]}, + 'master': {'nodes': [{'percentage': 100, 'type': 'ondemand'}]}, 'autoscaling_nodes': { + 'nodes': [{'percentage': 50, 'type': 'ondemand'}, + 
{'timeout_for_request': 3, 'percentage': 50, 'type': 'spot', 'fallback': None, + 'maximum_bid_price_percentage': 50}]}}, 'label': ['test_label']}}) + + def test_od_od_spotblock(self): + sys.argv = ['qds.py', '--version', 'v2.2', 'cluster', 'create', '--label', 'test_label', + '--master-type', 'ondemand', '--min-ondemand-percentage', '100', + '--autoscaling-spot-block-percentage', + '100', '--autoscaling-spot-block-duration', '60'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', {'cluster_info': { + 'composition': {'min_nodes': {'nodes': [{'percentage': 100, 'type': 'ondemand'}]}, + 'master': {'nodes': [{'percentage': 100, 'type': 'ondemand'}]}, + 'autoscaling_nodes': {'nodes': [{'percentage': 100, 'type': 'spotblock', 'timeout': 60}]}}, + 'label': ['test_label']}}) + + def test_od_od_spotblockspot(self): + sys.argv = ['qds.py', '--version', 'v2.2', 'cluster', 'create', '--label', 'test_label', + '--master-type', 'ondemand', '--min-ondemand-percentage', '100', + '--autoscaling-spot-block-percentage', + '50', '--autoscaling-spot-block-duration', '60', '--autoscaling-spot-percentage', '50', + '--autoscaling-maximum-bid-price-percentage', '50', + '--autoscaling-timeout-for-request', '3', '--autoscaling-spot-fallback', None] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', {'cluster_info': { + 'composition': {'min_nodes': {'nodes': [{'percentage': 100, 'type': 'ondemand'}]}, + 'master': {'nodes': [{'percentage': 100, 'type': 'ondemand'}]}, 'autoscaling_nodes': { + 'nodes': [{'percentage': 50, 'type': 'spotblock', 'timeout': 60}, + {'timeout_for_request': 3, 'percentage': 50, 'type': 'spot', 'fallback': None, + 'maximum_bid_price_percentage': 50}]}}, 'label': ['test_label']}}) + + def test_od_od_spot(self): + sys.argv = ['qds.py', '--version', 
'v2.2', 'cluster', 'create', '--label', 'test_label', + '--master-type', 'ondemand', '--min-ondemand-percentage', '100', '--autoscaling-spot-percentage', + '100', + '--autoscaling-maximum-bid-price-percentage', '50', '--autoscaling-timeout-for-request', '3', + '--autoscaling-spot-fallback', None] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', {'cluster_info': { + 'composition': {'min_nodes': {'nodes': [{'percentage': 100, 'type': 'ondemand'}]}, + 'master': {'nodes': [{'percentage': 100, 'type': 'ondemand'}]}, 'autoscaling_nodes': { + 'nodes': [{'timeout_for_request': 3, 'percentage': 100, 'type': 'spot', 'fallback': None, + 'maximum_bid_price_percentage': 50}]}}, 'label': ['test_label']}}) + + def test_od_spot_spot(self): + sys.argv = ['qds.py', '--version', 'v2.2', 'cluster', 'create', '--label', 'test_label', + '--master-type', 'ondemand', '--min-spot-percentage', '100', + '--min-maximum-bid-price-percentage', '50', '--min-timeout-for-request', '3', + '--min-spot-fallback', None, '--autoscaling-spot-percentage', '100', + '--autoscaling-maximum-bid-price-percentage', '50', '--autoscaling-timeout-for-request', '3', + '--autoscaling-spot-fallback', None] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', {'cluster_info': {'composition': {'min_nodes': { + 'nodes': [{'timeout_for_request': 3, 'percentage': 100, 'type': 'spot', 'fallback': None, + 'maximum_bid_price_percentage': 50}]}, 'master': { + 'nodes': [{'percentage': 100, 'type': 'ondemand'}]}, 'autoscaling_nodes': {'nodes': [ + {'timeout_for_request': 3, 'percentage': 100, 'type': 'spot', 'fallback': None, + 'maximum_bid_price_percentage': 50}]}}, 'label': ['test_label']}}) + + def test_spotblock_spotblock_spotblock(self): + sys.argv = ['qds.py', '--version', 'v2.2', 'cluster', 'create', 
'--label', 'test_label', + '--master-type', 'spotblock', '--master-spot-block-duration', '60', '--min-spot-block-percentage', + '100', '--min-spot-block-duration', '60', '--autoscaling-spot-block-percentage', + '100', '--autoscaling-spot-block-duration', '60'] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', {'cluster_info': { + 'composition': {'min_nodes': {'nodes': [{'percentage': 100, 'type': 'spotblock', 'timeout': 60}]}, + 'master': {'nodes': [{'percentage': 100, 'type': 'spotblock', 'timeout': 60}]}, + 'autoscaling_nodes': {'nodes': [{'percentage': 100, 'type': 'spotblock', 'timeout': 60}]}}, + 'label': ['test_label']}}) + + def test_spot_spot_spot(self): + sys.argv = ['qds.py', '--version', 'v2.2', 'cluster', 'create', '--label', 'test_label', + '--master-type', 'spot', '--master-maximum-bid-price-percentage', '50', + '--master-timeout-for-request', '3', + '--master-spot-fallback', None, '--min-spot-percentage', '100', + '--min-maximum-bid-price-percentage', '50', '--min-timeout-for-request', '3', + '--min-spot-fallback', None, '--autoscaling-spot-percentage', '100', + '--autoscaling-maximum-bid-price-percentage', '50', '--autoscaling-timeout-for-request', '3', + '--autoscaling-spot-fallback', None] + Qubole.cloud = None + print_command() + Connection._api_call = Mock(return_value={}) + qds.main() + Connection._api_call.assert_called_with('POST', 'clusters', {'cluster_info': {'composition': {'min_nodes': { + 'nodes': [{'timeout_for_request': 3, 'percentage': 100, 'type': 'spot', 'fallback': None, + 'maximum_bid_price_percentage': 50}]}, 'master': {'nodes': [ + {'timeout_for_request': 3, 'percentage': 100, 'type': 'spot', 'fallback': None, + 'maximum_bid_price_percentage': 50}]}, 'autoscaling_nodes': {'nodes': [ + {'timeout_for_request': 3, 'percentage': 100, 'type': 'spot', 'fallback': None, + 'maximum_bid_price_percentage': 50}]}}, 'label': 
['test_label']}})