Skip to content

Commit

Permalink
Implement vultr API v2 (#770)
Browse files Browse the repository at this point in the history
* Implement vultr API v2

* Update test recordings for vultr API v2
  • Loading branch information
bartelsielski authored Mar 28, 2021
1 parent c445f37 commit ed75f0a
Show file tree
Hide file tree
Showing 28 changed files with 5,021 additions and 1,187 deletions.
166 changes: 107 additions & 59 deletions lexicon/providers/vultr.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Module provider for Vultr"""
from __future__ import absolute_import

import json
import logging

import requests
Expand All @@ -23,85 +24,117 @@ class Provider(BaseProvider):
def __init__(self, config):
super(Provider, self).__init__(config)
self.domain_id = None
self.api_endpoint = "https://api.vultr.com/v1"
self.api_endpoint = "https://api.vultr.com/v2"

def _authenticate(self):
payload = self._get("/dns/list")
payload = self._get("/domains")

if not [item for item in payload if item["domain"] == self.domain]:
raise Exception("No domain found")
for domain in payload["domains"]:
if domain["domain"] == self.domain:
self.domain_id = self.domain
return

self.domain_id = self.domain
while payload["meta"]["links"]["next"] != "":
query_params = {"cursor": payload["meta"]["links"]["next"]}
payload = self._get("/domains", query_params=query_params)

# Create record. If record already exists with the same content, do nothing'
for domain in payload["domains"]:
if domain["domain"] == self.domain:
self.domain_id = self.domain
return

raise Exception("Domain not found")

# Create record. If record already exists with the same content, do nothing
def _create_record(self, rtype, name, content):
records = self._list_records(rtype, name, content)
if len(records) != 0:
LOGGER.debug("create_record (already exists): %s", records[0]["id"])
return True

record = {
"type": rtype,
"domain": self.domain_id,
"name": self._relative_name(name),
"data": self._add_quotes(rtype, content),
"priority": 0,
}
if rtype == "TXT":
record["data"] = f'"{content}"'
else:
record["data"] = content
if self._get_lexicon_option("ttl"):
record["ttl"] = self._get_lexicon_option("ttl")
self._post("/dns/create_record", record)

LOGGER.debug("create_record: %s", True)
result = self._post(f"/domains/{self.domain_id}/records", record)
LOGGER.debug("create_record: %s", result["record"]["id"])
return True

# List all records. Return an empty list if no records found
# type, name and content are used to filter records.
# If possible filter during the query, otherwise filter after response is received.
def _list_records(self, rtype=None, name=None, content=None):
payload = self._get("/dns/records", {"domain": self.domain_id})
url = f"/domains/{self.domain_id}/records"

payload = self._get(url)
unprocessed_records = payload["records"]

while payload["meta"]["links"]["next"] != "":
query_params = {"cursor": payload["meta"]["links"]["next"]}
payload = self._get(url, query_params=query_params)
unprocessed_records.extend(payload["records"])

records = []
for record in payload:
processed_record = {
"type": record["type"],
"name": f"{record['name']}.{self.domain_id}",
"ttl": record.get("ttl", self._get_lexicon_option("ttl")),
"content": record["data"],
"id": record["RECORDID"],
}
processed_record = self._clean_TXT_record(processed_record)
records.append(processed_record)
for record in unprocessed_records:
records.append(self._process_record(record))

if rtype:
records = [record for record in records if record["type"] == rtype]
records = [rec for rec in records if rec["type"] == rtype]
if name:
records = [
record for record in records if record["name"] == self._full_name(name)
]
records = [rec for rec in records if rec["name"] == self._full_name(name)]
if content:
records = [record for record in records if record["content"] == content]
records = [rec for rec in records if rec["content"] == content]

LOGGER.debug("list_records: %s", records)
return records

# Create or update a record.
# Update a record. Identifier must be specified.
def _update_record(self, identifier, rtype=None, name=None, content=None):
record = None
if not identifier:
records = self._list_records(rtype, name)

if not records:
raise Exception(
f"No record(s) found for arguments: identifer={identifier}, rtype={rtype}, name={name}"
)
if len(records) > 1:
LOGGER.warning(
"Multiple records have been found for given parameters. "
"Only first one will be updated (id: %s)",
records[0]["id"],
)

record = records[0]
identifier = record["id"]

url = f"/domains/{self.domain_id}/records/{identifier}"
if not record:
record = self._get(url)["record"]
record = self._process_record(record)

new_record = {}

data = {
"domain": self.domain_id,
"RECORDID": identifier,
"ttl": self._get_lexicon_option("ttl"),
}
# if rtype:
# data['type'] = rtype
if name:
data["name"] = self._relative_name(name)
name = self._relative_name(name)
if name != record["name"]:
new_record["name"] = name

if content:
if rtype == "TXT":
data["data"] = f'"{content}"'
else:
data["data"] = content
content = self._add_quotes(record["type"], content)
if content != record["content"]:
new_record["data"] = content

self._post("/dns/update_record", data)
if new_record == {}:
LOGGER.debug("update_record (nothing to do): %s", True)
return True

self._patch(url, new_record)
LOGGER.debug("update_record: %s", True)
return True

Expand All @@ -116,40 +149,55 @@ def _delete_record(self, identifier=None, rtype=None, name=None, content=None):
delete_record_id.append(identifier)

LOGGER.debug("delete_records: %s", delete_record_id)

for record_id in delete_record_id:
data = {"domain": self.domain_id, "RECORDID": record_id}
self._post("/dns/delete_record", data)
try:
self._delete(f"/domains/{self.domain_id}/records/{record_id}")
except requests.HTTPError as e:
if e.response.status_code != 404:
raise

# is always True at this point, if a non 200 response is returned an error is raised.
LOGGER.debug("delete_record: %s", True)
return True

# Helpers

def _request(self, action="GET", url="/", data=None, query_params=None):
if data is None:
data = {}
if query_params is None:
query_params = {}

default_headers = {
headers = {
"Accept": "application/json",
# 'Content-Type': 'application/json',
"API-Key": self._get_provider_option("auth_token"),
"Authorization": "Bearer " + self._get_provider_option("auth_token"),
}

if data is not None:
headers["Content-Type"] = "application/json"
data = json.dumps(data)

response = requests.request(
action,
self.api_endpoint + url,
params=query_params,
data=data,
headers=default_headers,
headers=headers,
)
# if the request fails for any reason, throw an error.
response.raise_for_status()

if action in ("DELETE", "PUT", "POST"):
# vultr handles succss/failure via HTTP Codes, Only GET returns a response.
return response.text
if response.status_code == 204:
return None

return response.json()

@staticmethod
def _add_quotes(rtype, content):
if rtype == "TXT":
return f'"{content}"'
return content

def _process_record(self, record):
processed_record = {
"type": record["type"],
"name": self._full_name(record["name"]),
"ttl": record["ttl"],
"content": record["data"],
"id": record["id"],
}
return self._clean_TXT_record(processed_record)
18 changes: 4 additions & 14 deletions lexicon/tests/providers/test_vultr.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,17 @@
"""Integration tests for Vultr"""
from unittest import TestCase

import pytest

from lexicon.tests.providers.integration_tests import IntegrationTestsV1
from lexicon.tests.providers.integration_tests import IntegrationTestsV2


# Hook into testing framework by inheriting unittest.TestCase and reuse
# the tests which *each and every* implementation of the interface must
# pass, by inheritance from define_tests.TheTests
# TODO: migrate to IntegrationTestsV2 and its extended test suite
class VultrProviderTests(TestCase, IntegrationTestsV1):
class VultrProviderTests(TestCase, IntegrationTestsV2):
"""TestCase for Vultr"""

provider_name = "vultr"
domain = "capsulecd.com"
domain = "lexicon-test.eu"

def _filter_headers(self):
return ["API-Key"]

# TODO: enable the skipped tests
@pytest.mark.skip(reason="new test, missing recording")
def test_provider_when_calling_update_record_should_modify_record_name_specified(
self,
):
return
return ["Authorization"]
Original file line number Diff line number Diff line change
@@ -1,29 +1,46 @@
interactions:
- request:
body: '{}'
body: null
headers:
Accept: [application/json]
Accept-Encoding: ['gzip, deflate']
Connection: [keep-alive]
Content-Length: ['2']
Content-Type: [application/json]
User-Agent: [python-requests/2.9.1]
Accept:
- application/json
Accept-Encoding:
- gzip, deflate
Connection:
- keep-alive
User-Agent:
- python-requests/2.25.1
method: GET
uri: https://api.vultr.com/v1/dns/list
uri: https://api.vultr.com/v2/domains
response:
body: {string: !!python/unicode '[{"domain":"capsulecd.com","date_created":"2016-04-13
19:03:44"}]'}
body:
string: '{"domains":[{"domain":"lexicon-test.eu","date_created":"2021-03-27T09:40:29+00:00"},{"domain":"sielski.be","date_created":"2020-09-13T18:19:59+00:00"}],"meta":{"total":2,"links":{"next":"","prev":""}}}'
headers:
cache-control: [no-cache]
connection: [keep-alive]
content-length: ['65']
content-type: [application/json]
date: ['Wed, 13 Apr 2016 23:22:33 GMT']
expires: ['Wed, 13 Apr 2016 23:22:32 GMT']
server: [nginx]
strict-transport-security: [max-age=31536000]
transfer-encoding: [chunked]
x-content-type-options: [nosniff]
x-frame-options: [DENY]
status: {code: 200, message: OK}
Cache-Control:
- no-cache
Connection:
- keep-alive
Content-Type:
- application/json
Date:
- Sat, 27 Mar 2021 09:41:21 GMT
Expires:
- Sat, 27 Mar 2021 09:41:20 GMT
Server:
- nginx
Strict-Transport-Security:
- max-age=31536000
Transfer-Encoding:
- chunked
X-Content-Type-Options:
- nosniff
X-Frame-Options:
- DENY
X-Robots-Tag:
- noindex,noarchive
content-length:
- '201'
status:
code: 200
message: OK
version: 1
Loading

0 comments on commit ed75f0a

Please sign in to comment.