Skip to content

Commit

Permalink
Added more debugging for bug-fixing (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
ArnyminerZ authored Mar 24, 2021
1 parent 766ee5a commit 409dc16
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 6 deletions.
1 change: 1 addition & 0 deletions example/get_tokens.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
password=GOOGLE_PASSWORD,
master_token=GOOGLE_MASTER_TOKEN,
android_id=DEVICE_ID,
verbose=True,
)

# Get master token
Expand Down
82 changes: 77 additions & 5 deletions glocaltokens/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@
from .scanner import GoogleDevice, discover_devices
from .utils import network as net_utils
from .utils import token as token_utils
from .utils.logs import censor

DEBUG = False

logging_level = logging.DEBUG if DEBUG else logging.ERROR
logging.basicConfig(level=logging_level)
logging.basicConfig(level=logging.ERROR)
LOGGER = logging.getLogger(__name__)


Expand All @@ -47,21 +45,33 @@ def __init__(
"""
Initializes a Device. Can set or google_device or ip and port
"""
LOGGER.debug("Initializing new Device instance.")
self.device_name = device_name
self.local_auth_token = None
self.google_device = google_device
self.hardware = hardware
LOGGER.debug(
'Set self device_name to "{}", local_auth_token to None, google_device to {}, and hardware to {}'.format(
device_name, google_device, hardware
)
)

if google_device:
LOGGER.debug(f"google_device is not None")
LOGGER.debug(f"Setting self ip to {google_device.ip}")
self.ip = google_device.ip
LOGGER.debug(f"Setting self port to {google_device.port}")
self.port = google_device.port
else:
LOGGER.debug(f"google_device is None")
if (ip and not port) or (not ip and port):
LOGGER.error(
"Both ip and port must be set, if one of them is specified."
)
return
LOGGER.debug(f"Setting self ip to {ip}")
self.ip = ip
LOGGER.debug(f"Setting self port to {port}")
self.port = port

if not local_auth_token:
Expand All @@ -88,6 +98,7 @@ def __init__(
LOGGER.error("port must be a valid port")
return

LOGGER.debug(f"Setting self local_auth_token to {censor(local_auth_token)}")
self.local_auth_token = local_auth_token

def __str__(self) -> str:
Expand All @@ -109,6 +120,7 @@ def __init__(
password: Optional[str] = None,
master_token: Optional[str] = None,
android_id: Optional[str] = None,
verbose: bool = False,
):
"""
Initialize an GLocalAuthenticationTokens instance with google account
Expand All @@ -123,10 +135,22 @@ def __init__(
if not set;
"""
if verbose:
LOGGER.setLevel(logging.DEBUG)

LOGGER.debug("Initializing new GLocalAuthenticationTokens instance.")
self.username: Optional[str] = username
self.password: Optional[str] = password
self.master_token: Optional[str] = master_token
self.android_id: Optional[str] = android_id
LOGGER.debug(
'Set self username to "{}", password to "{}", master_token to "{}" and android_id to {}'.format(
censor(username),
censor(password),
censor(master_token),
censor(android_id),
)
)
if (not self.username or not self.password) and not self.master_token:
LOGGER.error(
"You must either provide google username/password "
Expand All @@ -140,17 +164,23 @@ def __init__(
self.homegraph = None
self.access_token_date: Optional[datetime] = None
self.homegraph_date: Optional[datetime] = None
LOGGER.debug(
"Set self access_token, homegraph, access_token_date and homegraph_date to None"
)

@staticmethod
def _generate_mac_string():
"""Generate random 14 char long string"""
LOGGER.debug(f"Generating mac...")
random_uuid = uuid4()
random_string = str(random_uuid).replace("-", "")[:ANDROID_ID_LENGTH]
mac_string = random_string.upper()
LOGGER.debug(f"Generated mac: {mac_string}")
return mac_string

def get_android_id(self) -> Optional[str]:
if not self.android_id:
LOGGER.debug("There is not any stored android_id, generating a new one")
self.android_id = self._generate_mac_string()
return self.android_id

Expand All @@ -164,11 +194,13 @@ def get_master_token(self) -> Optional[str]:
Get google master token from username and password
"""
if not self.master_token:
LOGGER.debug("There is not any stored master_token, logging in...")
res = perform_master_login(
self.username, self.password, self.get_android_id()
)
if "Token" not in res:
LOGGER.error("[!] Could not get master token.")
LOGGER.debug(f"Request response: {res}")
return
self.master_token = res["Token"]
LOGGER.debug("Master token: {}".format(self.master_token))
Expand All @@ -178,6 +210,9 @@ def get_access_token(self) -> Optional[str]:
if self.access_token is None or self._has_expired(
self.access_token_date, ACCESS_TOKEN_DURATION
):
LOGGER.debug(
"There is not any stored access_token, or the stored one has expired, getting a new one..."
)
res = perform_oauth(
self.username,
self.get_master_token(),
Expand All @@ -188,10 +223,13 @@ def get_access_token(self) -> Optional[str]:
)
if "Auth" not in res:
LOGGER.error("[!] Could not get access token.")
LOGGER.debug(f"Request response: {res}")
return
self.access_token = res["Auth"]
self.access_token_date = datetime.now()
LOGGER.debug("Access token: {}".format(self.access_token))
LOGGER.debug(
f"Access token: {self.access_token}, datetime {self.access_token_date}"
)
return self.access_token

def get_homegraph(self):
Expand All @@ -201,18 +239,32 @@ def get_homegraph(self):
if self.homegraph is None or self._has_expired(
self.homegraph_date, HOMEGRAPH_DURATION
):
LOGGER.debug(
"There is not any stored homegraph, or the stored one has expired, getting a new one..."
)
try:
LOGGER.debug("Creating SSL channel credentials...")
scc = grpc.ssl_channel_credentials(root_certificates=None)
LOGGER.debug("Creating access token call credentials...")
tok = grpc.access_token_call_credentials(self.get_access_token())
LOGGER.debug("Compositing channel credentials...")
ccc = grpc.composite_channel_credentials(scc, tok)

LOGGER.debug(
"Establishing secure channel with the Google Home Foyer API..."
)
with grpc.secure_channel(GOOGLE_HOME_FOYER_API, ccc) as channel:
LOGGER.debug("Getting channels StructuresServiceStub...")
rpc_service = v1_pb2_grpc.StructuresServiceStub(channel)
LOGGER.debug("Getting HomeGraph request...")
request = v1_pb2.GetHomeGraphRequest(string1="", num2="")
LOGGER.debug("Fetching HomeGraph...")
response = rpc_service.GetHomeGraph(request)
LOGGER.debug("Storing gotten homegraph...")
self.homegraph = response
self.homegraph_date = datetime.now()
except grpc.RpcError as rpc_error:
LOGGER.debug("Got an RpcError.")
if rpc_error.code().name == "UNAUTHENTICATED":
LOGGER.warning("The access token has expired. Getting a new one.")
self.invalidate_access_token()
Expand Down Expand Up @@ -240,12 +292,16 @@ def get_google_devices(
"""

# Set models_list to empty list if None
LOGGER.debug("Initializing models list if empty...")
models_list = models_list if models_list else []

if force_homegraph_reload:
LOGGER.debug("Forcing homegraph reload.")
self.invalidate_homegraph()

LOGGER.debug("Getting homegraph...")
homegraph = self.get_homegraph()
LOGGER.debug("Getting network devices...")
network_devices = (
discover_devices(models_list, zeroconf_instance=zeroconf_instance)
if not disable_discovery
Expand All @@ -259,13 +315,22 @@ def find_device(name) -> Optional[GoogleDevice]:
return None

devices: [Device] = []
LOGGER.debug(
f"Iterating in homegraph devices (len={len(homegraph.home.devices)})"
)
for item in homegraph.home.devices:
if item.local_auth_token != "":
# This checks if the current item is a valid model, only if there are models in models_list.
# If models_list is empty, the check should be omitted, and accept all items.
if models_list and item.hardware.model not in models_list:
LOGGER.debug("{} not in models_list".format(item.hardware.model))
continue

LOGGER.debug(
"Looking for a device in local network? {}".format(
network_devices is not None
)
)
google_device = (
find_device(item.device_name) if network_devices else None
)
Expand All @@ -276,9 +341,14 @@ def find_device(name) -> Optional[GoogleDevice]:
hardware=item.hardware.model,
)
if device.local_auth_token:
LOGGER.debug("Adding {device.device_name} to devices list")
devices.append(device)
else:
LOGGER.warning("Device initialization failed, skipping.")
else:
LOGGER.debug(
f"local_auth_token is not initialized for {item.device_name}"
)

LOGGER.debug("Google Home devices: {}".format(devices))

Expand Down Expand Up @@ -316,8 +386,10 @@ def invalidate_access_token(self):
"""Invalidates the current access token"""
self.access_token = None
self.access_token_date = None
LOGGER.debug("Invalidated access_token")

def invalidate_homegraph(self):
"""Invalidates the stored homegraph data"""
self.homegraph = None
self.homegraph_date = None
LOGGER.debug("Invalidated homegraph")
28 changes: 28 additions & 0 deletions glocaltokens/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def get_value(key):

class GoogleDevice:
def __init__(self, name: str, ip: str, port: int, model: str):
_LOGGER.debug("Initializing GoogleDevice...")
if not net_utils.is_valid_ipv4_address(
ip
) and not net_utils.is_valid_ipv6_address(ip):
Expand All @@ -92,11 +93,18 @@ def __init__(self, name: str, ip: str, port: int, model: str):
self.ip = ip
self.port = port
self.model = model
_LOGGER.debug(
f"Set self name to {name}, ip to {ip}, port to {port} and model to {model}"
)

if not 0 <= self.port <= 65535:
_LOGGER.error("Port is out of the valid range: [0,65535]")
return

def __str__(self) -> str:
"""Serializes the class into a str"""
return f"{{name:{self.name},ip:{self.ip},port:{self.port},model:{self.model}}}"


def discover_devices(
models_list: Optional[List[str]] = None,
Expand All @@ -105,30 +113,50 @@ def discover_devices(
zeroconf_instance=None,
):
# pylint: disable=unused-argument
_LOGGER.debug("Discovering devices...")
_LOGGER.debug("Importing zeroconf...")
import zeroconf

def callback():
"""Called when zeroconf has discovered a new chromecast."""
if max_devices is not None and listener.count >= max_devices:
discover_complete.set()

_LOGGER.debug("Creating new Event for discovery completion...")
discover_complete = Event()
_LOGGER.debug("Creating new CastListener...")
listener = CastListener(callback)
if not zeroconf_instance:
_LOGGER.debug("Creating new Zeroconf instance")
zconf = zeroconf.Zeroconf()
else:
_LOGGER.debug("Using attribute Zeroconf instance")
zconf = zeroconf_instance
_LOGGER.debug("Creating zeroconf service browser for _googlecast._tcp.local.")
zeroconf.ServiceBrowser(zconf, "_googlecast._tcp.local.", listener)

# Wait for the timeout or the maximum number of devices
_LOGGER.debug("Waiting for discovery completion...")
discover_complete.wait(timeout)

devices = []
_LOGGER.debug("Got {} devices. Iterating...".format(len(listener.devices)))
for service in listener.devices:
model = service[0]
name = service[1]
ip = service[2]
access_port = service[3]
if not models_list or model in models_list:
_LOGGER.debug(
"Appending new device. name: %s, ip: %s, port: %s, model: %s",
name,
ip,
access_port,
model,
)
devices.append(GoogleDevice(name, ip, int(access_port), model))
else:
_LOGGER.debug(
'Won\'t add device since model "{}" is not in models_list'.format(model)
)
return devices
12 changes: 12 additions & 0 deletions glocaltokens/utils/logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import Optional


def censor(text: Optional[str]) -> str:
"""
Replaces characters in a str by the asteriks
text: The text to censure.
"""
char = "*"
text = text if text else ""
return text[0] + (len(text) - 1) * char if text else text
14 changes: 13 additions & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import logging
from datetime import datetime, timedelta
from unittest import TestCase

Expand All @@ -21,7 +22,6 @@
JSON_KEY_LOCAL_AUTH_TOKEN,
JSON_KEY_PORT,
)
from glocaltokens.utils.types import Struct
from tests.assertions import DeviceAssertions, TypeAssertions
from tests.factory.providers import HomegraphProvider, TokenProvider

Expand Down Expand Up @@ -81,6 +81,18 @@ def test_initialization__valid(self, m_log):
GLocalAuthenticationTokens(master_token=faker.master_token())
self.assertEqual(m_log.call_count, 0)

@patch("glocaltokens.client.LOGGER.setLevel")
def test_initialization__valid_verbose_logger(self, m_set_level):
# Non verbose
GLocalAuthenticationTokens(username=faker.word(), password=faker.word())
self.assertEqual(m_set_level.call_count, 0)

# Verbose
GLocalAuthenticationTokens(
username=faker.word(), password=faker.word(), verbose=True
)
m_set_level.assert_called_once_with(logging.DEBUG)

@patch("glocaltokens.client.LOGGER.error")
def test_initialization__invalid(self, m_log):
# Without username
Expand Down
4 changes: 4 additions & 0 deletions tests/test_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ def test_initialization(self):
self.assertEqual(port, device.port)
self.assertEqual(model, device.model)

self.assertEqual(
f"{{name:{name},ip:{ip},port:{port},model:{model}}}", str(device)
)

@patch("glocaltokens.scanner._LOGGER.error")
def test_initialization__valid(self, mock):
GoogleDevice(
Expand Down
Loading

0 comments on commit 409dc16

Please sign in to comment.