Add logging, address review comments

Signed-off-by: Mikayla Thompson <thomika@amazon.com>
mikaylathompson committed May 24, 2024
1 parent 4c9257a commit 6d60c09
Showing 9 changed files with 65 additions and 21 deletions.
@@ -101,8 +101,6 @@ services:
- ~/.aws:/root/.aws
environment:
- MIGRATION_KAFKA_BROKER_ENDPOINTS=kafka:9092
- AWS_REGION=us-east-2
- AWS_DEFAULT_REGION=us-east-2
- AWS_PROFILE=default
# command: ./runTestBenchmarks.sh

@@ -2,7 +2,7 @@

The console link library is designed to provide a unified interface for the many possible backend services involved in a migration. The interface can be used by multiple frontends, such as a CLI app and a web API.

![alt text](diagram.png)
![Console_link Library Diagram](console_library_diagram.svg)


The user defines their migration services in a `migration_services.yaml` file, by default found at `/etc/migration_services.yaml`.
@@ -44,7 +44,7 @@ Currently, the two supported metrics source types are `prometheus` and `cloudwatch`

- `type`: required, `prometheus` or `cloudwatch`
- `endpoint`: required for `prometheus` (ignored for `cloudwatch`)
- `region`: optional for `cloudwatch` (ignored for `prometheus`). if not provided, the usual rules are followed for determining aws region (`AWS_DEFAULT_REGION`, `~/.aws/config`)
- `aws_region`: optional for `cloudwatch` (ignored for `prometheus`). If not provided, the usual rules are followed for determining the AWS region (`AWS_DEFAULT_REGION`, `~/.aws/config`). See the sketch below.
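
For illustration, a minimal `metrics_source` entry might look like the following (a sketch assembled from the fields above; the region value is an assumption borrowed from the sample compose setup):

```yaml
metrics_source:
  type: "cloudwatch"
  aws_region: "us-east-2"  # optional; omit to fall back to AWS_DEFAULT_REGION / ~/.aws/config
```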

# Usage
### Library
@@ -5,7 +5,9 @@
import console_link.logic.metrics as logic_metrics
from console_link.logic.instantiation import Environment
from console_link.models.metrics_source import Component, MetricStatistic
import logging

logger = logging.getLogger(__name__)

# ################### UNIVERSAL ####################

@@ -22,11 +24,13 @@ def __init__(self, config_file) -> None:
"--config-file", default="/etc/migration_services.yaml", help="Path to config file"
)
@click.option("--json", is_flag=True)
@click.option('-v', '--verbose', count=True, help="Verbosity level. -v is warnings, -vv is info, -vvv is debug.")
@click.pass_context
def cli(ctx, config_file, json):
def cli(ctx, config_file, json, verbose):
ctx.obj = Context(config_file)
ctx.obj.json = json

logging.basicConfig(level=logging.ERROR - (10 * verbose))
logger.info(f"Logging set to {logging.getLevelName(logger.getEffectiveLevel())}")

# ##################### CLUSTERS ###################

@@ -62,7 +66,6 @@ def cat_indices_cmd(ctx):
click.echo(logic_clusters.cat_indices(ctx.env.source_cluster))
click.echo("TARGET CLUSTER")
click.echo(logic_clusters.cat_indices(ctx.env.target_cluster))
pass


# ##################### REPLAYER ###################
@@ -1,8 +1,11 @@
import logging
from console_link.models.cluster import Cluster
from console_link.models.metrics_source import MetricsSource, get_metrics_source
import yaml
from cerberus import Validator

logger = logging.getLogger(__name__)

SCHEMA = {
"source_cluster": {"type": "dict", "required": True},
"target_cluster": {"type": "dict", "required": True},
@@ -14,21 +17,23 @@

class Environment:
def __init__(self, config_file: str):
# TODO: add validation of overall yaml structure here, and details in each component.

self.config_file = config_file
with open(self.config_file) as f:
self.config = yaml.safe_load(f)
v = Validator(SCHEMA)
if not v.validate(self.config):
logger.error(f"Config file validation errors: {v.errors}")
raise ValueError("Invalid config file", v.errors)

self.source_cluster: Cluster = Cluster(self.config["source_cluster"])
logger.debug(f"Source cluster initialized: {self.source_cluster.endpoint}")

# At some point, target and replayers should be stored as pairs, but for the time being
# we can probably assume one target cluster.
self.target_cluster: Cluster = Cluster(self.config["target_cluster"])
logger.debug(f"Target cluster initialized: {self.target_cluster.endpoint}")

self.metrics_source: MetricsSource = get_metrics_source(
self.config.get("metrics_source")
)
logger.debug(f"Metrics source initialized: {self.metrics_source}")
@@ -1,20 +1,28 @@
from typing import List, Tuple
from console_link.models.metrics_source import MetricsSource, Component, MetricStatistic
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)


def get_metric_data(metrics_source: MetricsSource, component: str, metric_name: str,
statistic: str, lookback: int) -> List[Tuple[str, float]]:
logger.info(f"Called get_metric_data with {component=}, {metric_name=},"
f"{statistic=}, {lookback=}")
try:
component_obj = Component[component.upper()]
except KeyError:
logger.error(f"Component {component} was not found in {list(Component)}")
raise ValueError("Invalid component", {component})
try:
statistic_obj = MetricStatistic[statistic]
except KeyError:
logger.error(f"Statistic {statistic} was not found in {list(MetricStatistic)}")
raise ValueError("Invalid statistic", {statistic})

starttime = datetime.now() - timedelta(minutes=lookback)
logger.info(f"Setting starttime to current time ({datetime.now()}) minus lookback ({lookback}: {starttime}")

return metrics_source.get_metric_data(
component_obj,
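
Reusing `env` from the sketch above, a caller might exercise `get_metric_data` like this (the component and statistic strings are hypothetical; the actual `Component` and `MetricStatistic` members aren't shown in this diff):

```python
from console_link.logic.metrics import get_metric_data

# "replayer" and "Average" are placeholder names for illustration only.
data = get_metric_data(env.metrics_source, component="replayer",
                       metric_name="some_metric", statistic="Average", lookback=60)
# Returns a list of (timestamp, value) tuples covering the last `lookback` minutes.
```
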
@@ -3,9 +3,12 @@
import requests
from requests.auth import HTTPBasicAuth
from cerberus import Validator
import logging

requests.packages.urllib3.disable_warnings()  # type: ignore

logger = logging.getLogger(__name__)

AuthMethod = Enum("AuthMethod", ["BASIC", "SIGV4"])
HttpMethod = Enum("HttpMethod", ["GET", "POST", "PUT", "DELETE"])

@@ -37,6 +40,7 @@ class Cluster:
auth_details: Optional[Dict[str, Any]] = None

def __init__(self, config: Dict) -> None:
logger.info(f"Initializing cluster with config: {config}")
v = Validator(SCHEMA)
if not v.validate(config):
raise ValueError("Invalid config file for cluster", v.errors)
@@ -55,18 +59,21 @@ def call_api(self, path, method: HttpMethod = HttpMethod.GET) -> requests.Response:
if self.auth_type == AuthMethod.BASIC:
assert self.auth_details is not None # for mypy's sake
auth = HTTPBasicAuth(
self.auth_details.get("username", None), self.auth_details["password"]
self.auth_details.get("username", None),
self.auth_details.get("password", None)
)
elif self.auth_type is None:
auth = None
else:
raise NotImplementedError(f"Auth type {self.auth_type} not implemented")

logger.info(f"Making api call to {self.endpoint}{path}")
r = requests.request(
method.name,
f"{self.endpoint}{path}",
verify=(not self.allow_insecure),
auth=auth,
)
logger.debug(f"Cluster API call request: {r.request}")
r.raise_for_status()
return r
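
The switch from `self.auth_details["password"]` to `.get("password", None)` means a config with a missing password yields a `None` credential (and a failed auth downstream) rather than an unhandled `KeyError`:

```python
details = {"username": "admin"}       # password omitted from the config
print(details.get("password", None))  # None: the request proceeds and fails auth at the cluster
print(details["password"])            # KeyError: would have crashed call_api before any request
```
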
@@ -7,6 +7,9 @@
from cerberus import Validator
from console_link.logic.utils import raise_for_aws_api_error
import requests
import logging

logger = logging.getLogger(__name__)


class UnsupportedMetricsSourceError(Exception):
@@ -35,7 +38,7 @@ class Component(Enum):
"type": "string",
"required": False,
},
"region": {
"aws_region": {
"type": "string",
"required": False
},
@@ -56,6 +59,7 @@ def get_metrics_source(config):
elif metric_source_type == MetricsSourceType.PROMETHEUS:
return PrometheusMetricsSource(config)
else:
logger.error(f"An unsupported metrics source type was provided: {config['type']}")
raise UnsupportedMetricsSourceError(config["type"])
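
A hedged sketch of the factory in use (the config shapes mirror the sample `migration_services.yaml`; whether these exact dicts satisfy the full schema isn't visible in this diff):

```python
from console_link.models.metrics_source import get_metrics_source

# Values mirror the sample services.yaml; aws_region is optional for cloudwatch.
prometheus_source = get_metrics_source({"type": "prometheus", "endpoint": "http://prometheus:9090"})
cloudwatch_source = get_metrics_source({"type": "cloudwatch", "aws_region": "us-east-2"})
```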


@@ -72,7 +76,7 @@ def get_metric_data(
self,
component: Component,
metric: str,
statistc: MetricStatistic,
statistic: MetricStatistic,
startTime: datetime,
period_in_seconds: int = 60,
endTime: Optional[datetime] = None,
@@ -113,24 +117,27 @@ def __init__(self, list_metric_data: Dict[str, Any]):
class CloudwatchMetricsSource(MetricsSource):
def __init__(self, config: Dict) -> None:
super().__init__(config)
if "region" in config:
self.region = config["region"]
self.boto_config = botocore.config.Config(region_name=self.region)
print("overriding client with a region-specific one")
logger.info(f"Initializing CloudwatchMetricsSource from config {config}")
if "aws_region" in config:
self.aws_region = config["aws_region"]
self.boto_config = botocore.config.Config(region_name=self.aws_region)
else:
self.region = None
self.aws_region = None
self.boto_config = None
self.client = boto3.client("cloudwatch", config=self.boto_config)

def get_metrics(self, recent=True) -> Dict[str, List[str]]:
logger.info(f"{self.__name__} get_metrics called with {recent=}")
response = self.client.list_metrics( # TODO: implement pagination
Namespace=CLOUDWATCH_METRICS_NAMESPACE,
RecentlyActive="PT3H" if recent else None,
)
raise_for_aws_api_error(response)
logger.debug(f"ResponseMetadata from list_metrics: {response['ResponseMetadata']}")
assert "Metrics" in response
metrics = [CloudwatchMetricMetadata(m) for m in response["Metrics"]]
components = set([m.component for m in metrics])
logger.debug(f"Components found in returned metrics: {components}")
metrics_by_component = {}
for component in components:
metrics_by_component[component] = [
@@ -148,11 +155,16 @@ def get_metric_data(
endTime: Optional[datetime] = None,
dimensions: Optional[Dict[str, str]] = None,
) -> List[Tuple[str, float]]:
logger.info(f"{self.__name__} get_metric_data called with {component=}, {metric=}, {statistic=},"
f"{startTime=}, {period_in_seconds=}, {endTime=}, {dimensions=}")

aws_dimensions = [{"Name": "OTelLib", "Value": component.value}]
if dimensions:
aws_dimensions += [{"Name": k, "Value": v} for k, v in dimensions.items()]
logger.debug(f"AWS Dimensions set to: {aws_dimensions}")
if not endTime:
endTime = datetime.now()
logger.debug(f"No endTime provided, using current time: {endTime}")
response = self.client.get_metric_data(
MetricDataQueries=[
{
@@ -173,7 +185,9 @@
ScanBy="TimestampAscending",
)
raise_for_aws_api_error(response)
logger.debug(f"ResponseMetadata from get_metric_data: {response['ResponseMetadata']}")
data_length = len(response["MetricDataResults"][0]["Timestamps"])
logger.debug(f"Number of datapoints returned: {data_length}")
return [
(
response["MetricDataResults"][0]["Timestamps"][i].isoformat(),
@@ -195,12 +209,15 @@ def prometheus_component_names(c: Component) -> str:
class PrometheusMetricsSource(MetricsSource):
def __init__(self, config: Dict) -> None:
super().__init__(config)
logger.info(f"Initializing PrometheusMetricsSource from config {config}")

v = Validator(PROMETHEUS_SCHEMA)
if not v.validate(config):
raise ValueError("Invalid config file for PrometheusMetricsSource", v.errors)
self.endpoint = config["endpoint"]

def get_metrics(self, recent=False) -> Dict[str, List[str]]:
logger.info(f"{self.__name__} get_metrics called with {recent=}")
metrics_by_component = {}
if recent:
raise NotImplementedError("Recent metrics are not implemented for Prometheus")
@@ -210,6 +227,8 @@ def get_metrics(self, recent=False) -> Dict[str, List[str]]:
f"{self.endpoint}/api/v1/query",
params={"query": f'{{exported_job="{exported_job}"}}'},
)
logger.debug(f"Request to Prometheus: {r.request}")
logger.debug(f"Response status code: {r.status_code}")
r.raise_for_status()
assert "data" in r.json() and "result" in r.json()["data"]
metrics_by_component[c.value] = list(
@@ -221,12 +240,14 @@ def get_metric_data(
self,
component: Component,
metric: str,
statistc: MetricStatistic,
statistic: MetricStatistic,
startTime: datetime,
period_in_seconds: int = 60,
endTime: Optional[datetime] = None,
dimensions: Optional[Dict] = None,
) -> List[Tuple[str, float]]:
logger.info(f"{self.__name__} get_metric_data called with {component=}, {metric=}, {statistic=},"
f"{startTime=}, {period_in_seconds=}, {endTime=}, {dimensions=}")
if not endTime:
endTime = datetime.now()
r = requests.get(
@@ -238,6 +259,8 @@
"step": period_in_seconds,
},
)
logger.debug(f"Request to Prometheus: {r.request}")
logger.debug(f"Response status code: {r.status_code}")
r.raise_for_status()
assert "data" in r.json() and "result" in r.json()["data"]
if not r.json()["data"]["result"]:
Binary file not shown.
@@ -14,8 +14,8 @@ target_cluster:
details:
username: "admin"
password: "myStrongPassword123!"
replayer:
deployment_type: "docker"
metrics_source:
type: "prometheus"
endpoint: "http://prometheus:9090"
endpoint: "http://prometheus:9090"
replayer:
deployment_type: "docker"
