From d79b4de3de6e266b6bf08dc4ff9ea14f936252cd Mon Sep 17 00:00:00 2001 From: Oriol Date: Tue, 5 Nov 2024 16:27:38 +0100 Subject: [PATCH 1/4] max_depth implementation max_depth option for whois and aio_whois queries. --- asyncwhois/__init__.py | 4 ++++ asyncwhois/client.py | 3 ++- asyncwhois/query.py | 21 ++++++++++++++------- 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/asyncwhois/__init__.py b/asyncwhois/__init__.py index 8607dff..ade9138 100644 --- a/asyncwhois/__init__.py +++ b/asyncwhois/__init__.py @@ -52,6 +52,7 @@ def whois( proxy_url: str = None, timeout: int = 10, tldextract_obj: TLDExtract = None, + max_depth: int = None, ) -> tuple[str, dict]: """ Performs a WHOIS query for the given `search_term`. If `search_term` is or can be cast to an @@ -89,6 +90,7 @@ def whois( proxy_url=proxy_url, timeout=timeout, tldextract_obj=tldextract_obj, + max_depth=max_depth, ).whois(search_term) else: return "", {} @@ -145,6 +147,7 @@ async def aio_whois( proxy_url: str = None, timeout: int = 10, tldextract_obj: TLDExtract = None, + max_depth: int = None, ) -> tuple[str, dict]: """ Performs a WHOIS query for the given `search_term`.
If `search_term` is or can be cast to an @@ -182,6 +185,7 @@ async def aio_whois( proxy_url=proxy_url, timeout=timeout, tldextract_obj=tldextract_obj, + max_depth=max_depth, ).aio_whois(search_term) else: return "", {} diff --git a/asyncwhois/client.py b/asyncwhois/client.py index 5750df7..a0d191f 100644 --- a/asyncwhois/client.py +++ b/asyncwhois/client.py @@ -54,6 +54,7 @@ def __init__( whodap_client: whodap.DNSClient = None, timeout: int = 10, tldextract_obj: TLDExtract = None, + max_depth: int = None, ): super().__init__(whodap_client) self.authoritative_only = authoritative_only @@ -61,7 +62,7 @@ def __init__( self.proxy_url = proxy_url self.timeout = timeout self.tldextract_obj = tldextract_obj - self.query_obj = DomainQuery(proxy_url=proxy_url, timeout=timeout) + self.query_obj = DomainQuery(proxy_url=proxy_url, timeout=timeout, max_depth=max_depth) self.parse_obj = DomainParser(ignore_not_found=ignore_not_found) def _get_domain_components(self, domain: str) -> tuple[str, str, str]: diff --git a/asyncwhois/query.py b/asyncwhois/query.py index e204e63..2295735 100644 --- a/asyncwhois/query.py +++ b/asyncwhois/query.py @@ -19,10 +19,10 @@ class Query: refer_regex = r"refer: *(.+)" whois_server_regex = r".+ whois server: *(.+)" - def __init__(self, proxy_url: str = None, timeout: int = 10): + def __init__(self, proxy_url: str = None, timeout: int = 10, max_depth: int = None): self.proxy_url = proxy_url self.timeout = timeout - + self.max_depth = max_depth @staticmethod def _find_match(regex: str, blob: str) -> str: match = "" @@ -130,7 +130,7 @@ async def aio_run(self, search_term: str, server: str = None) -> list[str]: return await self._aio_do_query(server, data, server_regex, []) def _do_query( - self, server: str, data: str, regex: str, chain: list[str] + self, server: str, data: str, regex: str, chain: list[str], depth: int = 0 ) -> list[str]: """ Recursively submits WHOIS queries until it reaches the Authoritative Server. 
@@ -141,6 +141,9 @@ def _do_query( query_output = self._send_and_recv(conn, data) # save query chain chain.append(query_output) + # if max depth is reached, return the chain + if self.max_depth and depth >= self.max_depth: + return chain # parse response for the referred WHOIS server name whois_server = self._find_match(regex, query_output) whois_server = whois_server.lower() @@ -152,13 +155,13 @@ def _do_query( ): # recursive call to find more authoritative server chain = self._do_query( - whois_server, data, self.whois_server_regex, chain + whois_server, data, self.whois_server_regex, chain, depth + 1 ) # return the WHOIS query chain return chain async def _aio_do_query( - self, server: str, data: str, regex: str, chain: list[str] + self, server: str, data: str, regex: str, chain: list[str], depth: int = 0 ) -> list[str]: # connect to whois://:43 async with self._aio_create_connection( @@ -171,6 +174,9 @@ async def _aio_do_query( self._aio_send_and_recv(reader, writer, data), self.timeout ) chain.append(query_output) + # if max depth is reached, return the chain + if self.max_depth is not None and depth >= self.max_depth: + return chain # parse response for the referred WHOIS server name whois_server = self._find_match(regex, query_output) whois_server = whois_server.lower() @@ -183,7 +189,7 @@ async def _aio_do_query( ): # recursive call to find the authoritative server chain = await self._aio_do_query( - whois_server, data, self.whois_server_regex, chain + whois_server, data, self.whois_server_regex, chain, depth + 1 ) # return the WHOIS query chain return chain @@ -195,8 +201,9 @@ def __init__( server: str = None, proxy_url: str = None, timeout: int = 10, + max_depth: int = None, ): - super().__init__(proxy_url, timeout) + super().__init__(proxy_url, timeout, max_depth) self.server = server @staticmethod From a2f87f3fb9f934864ac11fee91c249f9ca79e464 Mon Sep 17 00:00:00 2001 From: Oriol Date: Tue, 5 Nov 2024 16:31:00 +0100 Subject: [PATCH 2/4] . 
--- asyncwhois/query.py | 1 + 1 file changed, 1 insertion(+) diff --git a/asyncwhois/query.py b/asyncwhois/query.py index 2295735..94f1825 100644 --- a/asyncwhois/query.py +++ b/asyncwhois/query.py @@ -23,6 +23,7 @@ def __init__(self, proxy_url: str = None, timeout: int = 10, max_depth: int = No self.proxy_url = proxy_url self.timeout = timeout self.max_depth = max_depth + @staticmethod def _find_match(regex: str, blob: str) -> str: match = "" From 5f5115bda77273d01f1325d72e2ef3129e5a56cb Mon Sep 17 00:00:00 2001 From: Joe Obarzanek Date: Tue, 5 Nov 2024 16:39:58 -0500 Subject: [PATCH 3/4] Add switch for auth responses --- asyncwhois/__init__.py | 24 ++++++++----- asyncwhois/client.py | 8 +++-- asyncwhois/query.py | 80 ++++++++++++++++++++++-------------------- 3 files changed, 62 insertions(+), 50 deletions(-) diff --git a/asyncwhois/__init__.py b/asyncwhois/__init__.py index ade9138..8be353c 100644 --- a/asyncwhois/__init__.py +++ b/asyncwhois/__init__.py @@ -42,17 +42,17 @@ "GeneralError", "QueryError", ] -__version__ = "1.1.7" +__version__ = "1.1.8" def whois( search_term: Union[str, ipaddress.IPv4Address, ipaddress.IPv6Address], - authoritative_only: bool = False, + authoritative_only: bool = False, # todo: deprecate and remove this argument + find_authoritative_server: bool = True, ignore_not_found: bool = False, proxy_url: str = None, timeout: int = 10, tldextract_obj: TLDExtract = None, - max_depth: int = None, ) -> tuple[str, dict]: """ Performs a WHOIS query for the given `search_term`. If `search_term` is or can be cast to an @@ -60,8 +60,11 @@ def whois( otherwise a DNS search is performed. :param search_term: Any domain, URL, IPv4, or IPv6 - :param authoritative_only: If False (default), asyncwhois returns the entire WHOIS query chain, + :param authoritative_only: DEPRECATED - If False (default), asyncwhois returns the entire WHOIS query chain, otherwise if True, only the authoritative response is returned. 
+ :param find_authoritative_server: This parameter only applies to domain queries. If True (default), asyncwhois + will attempt to find the authoritative response, otherwise if False, asyncwhois will only query the whois server + associated with the given TLD as specified in the IANA root db (`asyncwhois/servers/domains.py`). :param ignore_not_found: If False (default), the `NotFoundError` exception is raised if the query output contains "no such domain" language. If True, asyncwhois will not raise `NotFoundError` exceptions. :param proxy_url: Optional SOCKS4 or SOCKS5 proxy url (e.g. 'socks5://host:port') @@ -86,11 +89,11 @@ def whois( except (ipaddress.AddressValueError, ValueError): return DomainClient( authoritative_only=authoritative_only, + find_authoritative_server=find_authoritative_server, ignore_not_found=ignore_not_found, proxy_url=proxy_url, timeout=timeout, tldextract_obj=tldextract_obj, - max_depth=max_depth, ).whois(search_term) else: return "", {} @@ -142,12 +145,12 @@ def rdap( async def aio_whois( search_term: str, - authoritative_only: bool = False, + authoritative_only: bool = False, # todo: deprecate and remove this argument + find_authoritative_server: bool = True, ignore_not_found: bool = False, proxy_url: str = None, timeout: int = 10, tldextract_obj: TLDExtract = None, - max_depth: int = None, ) -> tuple[str, dict]: """ Performs a WHOIS query for the given `search_term`. If `search_term` is or can be cast to an @@ -155,8 +158,11 @@ async def aio_whois( otherwise a DNS search is performed. :param search_term: Any domain, URL, IPv4, or IPv6 - :param authoritative_only: If False (default), asyncwhois returns the entire WHOIS query chain, + :param authoritative_only: DEPRECATED - If False (default), asyncwhois returns the entire WHOIS query chain, otherwise if True, only the authoritative response is returned. + :param find_authoritative_server: This parameter only applies to domain queries. 
If True (default), asyncwhois + will attempt to find the authoritative response, otherwise if False, asyncwhois will only query the whois server + associated with the given TLD as specified in the IANA root db (`asyncwhois/servers/domains.py`). :param ignore_not_found: If False (default), the `NotFoundError` exception is raised if the query output contains "no such domain" language. If True, asyncwhois will not raise `NotFoundError` exceptions. :param proxy_url: Optional SOCKS4 or SOCKS5 proxy url (e.g. 'socks5://host:port') @@ -185,7 +191,7 @@ async def aio_whois( proxy_url=proxy_url, timeout=timeout, tldextract_obj=tldextract_obj, - max_depth=max_depth, + find_authoritative_server=find_authoritative_server, ).aio_whois(search_term) else: return "", {} diff --git a/asyncwhois/client.py b/asyncwhois/client.py index a0d191f..1b8998d 100644 --- a/asyncwhois/client.py +++ b/asyncwhois/client.py @@ -49,12 +49,12 @@ class DomainClient(Client): def __init__( self, authoritative_only: bool = False, + find_authoritative_server: bool = True, ignore_not_found: bool = False, proxy_url: str = None, whodap_client: whodap.DNSClient = None, timeout: int = 10, tldextract_obj: TLDExtract = None, - max_depth: int = None, ): super().__init__(whodap_client) self.authoritative_only = authoritative_only @@ -62,7 +62,11 @@ def __init__( self.proxy_url = proxy_url self.timeout = timeout self.tldextract_obj = tldextract_obj - self.query_obj = DomainQuery(proxy_url=proxy_url, timeout=timeout, max_depth=max_depth) + self.query_obj = DomainQuery( + proxy_url=proxy_url, + timeout=timeout, + find_authoritative_server=find_authoritative_server, + ) self.parse_obj = DomainParser(ignore_not_found=ignore_not_found) def _get_domain_components(self, domain: str) -> tuple[str, str, str]: diff --git a/asyncwhois/query.py b/asyncwhois/query.py index 94f1825..2bdfb01 100644 --- a/asyncwhois/query.py +++ b/asyncwhois/query.py @@ -19,11 +19,16 @@ class Query: refer_regex = r"refer: *(.+)" 
whois_server_regex = r".+ whois server: *(.+)" - def __init__(self, proxy_url: str = None, timeout: int = 10, max_depth: int = None): + def __init__( + self, + proxy_url: str = None, + timeout: int = 10, + find_authoritative_server: bool = True, + ): self.proxy_url = proxy_url self.timeout = timeout - self.max_depth = max_depth - + self.find_authoritative_server = find_authoritative_server + @staticmethod def _find_match(regex: str, blob: str) -> str: match = "" @@ -118,6 +123,16 @@ def run(self, search_term: str, server: str = None) -> list[str]: server_regex = self.whois_server_regex return self._do_query(server, data, server_regex, []) + @staticmethod + def _continue_querying(current_server: str, next_server: str) -> bool: + next_server = next_server.lower() + return ( + next_server + and next_server != current_server + and not next_server.startswith("http") + and not next_server.startswith("www.") + ) + async def aio_run(self, search_term: str, server: str = None) -> list[str]: data = search_term + "\r\n" if not server: @@ -131,7 +146,7 @@ async def aio_run(self, search_term: str, server: str = None) -> list[str]: return await self._aio_do_query(server, data, server_regex, []) def _do_query( - self, server: str, data: str, regex: str, chain: list[str], depth: int = 0 + self, server: str, data: str, regex: str, chain: list[str] ) -> list[str]: """ Recursively submits WHOIS queries until it reaches the Authoritative Server. 
@@ -142,22 +157,16 @@ def _do_query( query_output = self._send_and_recv(conn, data) # save query chain chain.append(query_output) - # if max depth is reached, return the chain - if self.max_depth and depth >= self.max_depth: - return chain - # parse response for the referred WHOIS server name - whois_server = self._find_match(regex, query_output) - whois_server = whois_server.lower() - if ( - whois_server - and whois_server != server - and not whois_server.startswith("http") - and not whois_server.startswith("www.") - ): - # recursive call to find more authoritative server - chain = self._do_query( - whois_server, data, self.whois_server_regex, chain, depth + 1 - ) + # if we should find the authoritative response, + # then parse the response for the next server + if self.find_authoritative_server: + # parse response for the referred WHOIS server name + whois_server = self._find_match(regex, query_output) + if self._continue_querying(server, whois_server): + # recursive call to find more authoritative server + chain = self._do_query( + whois_server, data, self.whois_server_regex, chain + ) # return the WHOIS query chain return chain @@ -175,23 +184,16 @@ async def _aio_do_query( self._aio_send_and_recv(reader, writer, data), self.timeout ) chain.append(query_output) - # if max depth is reached, return the chain - if self.max_depth is not None and depth >= self.max_depth: - return chain - # parse response for the referred WHOIS server name - whois_server = self._find_match(regex, query_output) - whois_server = whois_server.lower() - # check for another legitimate server name - if ( - whois_server - and whois_server != server - and not whois_server.startswith("http") - and not whois_server.startswith("www.") - ): - # recursive call to find the authoritative server - chain = await self._aio_do_query( - whois_server, data, self.whois_server_regex, chain, depth + 1 - ) + # if we should find the authoritative response, + # then parse the response for the next server + if 
self.find_authoritative_server: + # parse response for the referred WHOIS server name + whois_server = self._find_match(regex, query_output) + if self._continue_querying(server, whois_server): + # recursive call to find more authoritative server + chain = self._do_query( + whois_server, data, self.whois_server_regex, chain + ) # return the WHOIS query chain return chain @@ -202,9 +204,9 @@ def __init__( server: str = None, proxy_url: str = None, timeout: int = 10, - max_depth: int = None, + find_authoritative_server: bool = True, ): - super().__init__(proxy_url, timeout, max_depth) + super().__init__(proxy_url, timeout, find_authoritative_server) self.server = server @staticmethod From 91a5fc5a3b713fb531c9f1cb103f4c228a589c16 Mon Sep 17 00:00:00 2001 From: Joe Obarzanek Date: Wed, 6 Nov 2024 08:06:34 -0500 Subject: [PATCH 4/4] Remove unused param --- asyncwhois/query.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asyncwhois/query.py b/asyncwhois/query.py index 2bdfb01..7e2c2da 100644 --- a/asyncwhois/query.py +++ b/asyncwhois/query.py @@ -171,7 +171,7 @@ def _do_query( return chain async def _aio_do_query( - self, server: str, data: str, regex: str, chain: list[str], depth: int = 0 + self, server: str, data: str, regex: str, chain: list[str] ) -> list[str]: # connect to whois://:43 async with self._aio_create_connection(