Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added more debugging for bug-fixing #48

Merged
merged 18 commits into from
Mar 24, 2021
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 68 additions & 1 deletion glocaltokens/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from .scanner import GoogleDevice, discover_devices
from .utils import network as net_utils
from .utils import token as token_utils
from .utils.logs import censor

DEBUG = False

Expand All @@ -47,21 +48,33 @@ def __init__(
"""
Initializes a Device. Can set or google_device or ip and port
"""
LOGGER.debug("Initializing new Device instance.")
self.device_name = device_name
self.local_auth_token = None
self.google_device = google_device
self.hardware = hardware
LOGGER.debug(
'Set self device_name to "{}", local_auth_token to None, google_device to {}, and hardware to {}'.format(
device_name, google_device, hardware
)
)

if google_device:
LOGGER.debug(f"google_device is not None")
LOGGER.debug(f"Setting self ip to {google_device.ip}")
self.ip = google_device.ip
LOGGER.debug(f"Setting self port to {google_device.port}")
self.port = google_device.port
else:
LOGGER.debug(f"google_device is None")
if (ip and not port) or (not ip and port):
LOGGER.error(
"Both ip and port must be set, if one of them is specified."
)
return
LOGGER.debug(f"Setting self ip to {ip}")
self.ip = ip
LOGGER.debug(f"Setting self port to {port}")
self.port = port

if not local_auth_token:
Expand All @@ -88,6 +101,7 @@ def __init__(
LOGGER.error("port must be a valid port")
return

LOGGER.debug(f"Setting self local_auth_token to {censor(local_auth_token)}")
self.local_auth_token = local_auth_token

def __str__(self) -> str:
Expand Down Expand Up @@ -123,10 +137,19 @@ def __init__(
if not set;

"""
LOGGER.debug("Initializing new GLocalAuthenticationTokens instance.")
self.username: Optional[str] = username
self.password: Optional[str] = password
self.master_token: Optional[str] = master_token
self.android_id: Optional[str] = android_id
LOGGER.debug(
'Set self username to "{}", password to "{}", master_token to "{}" and android_id to {}'.format(
censor(username),
censor(password),
censor(master_token),
censor(android_id),
)
)
if (not self.username or not self.password) and not self.master_token:
LOGGER.error(
"You must either provide google username/password "
Expand All @@ -140,17 +163,23 @@ def __init__(
self.homegraph = None
self.access_token_date: Optional[datetime] = None
self.homegraph_date: Optional[datetime] = None
LOGGER.debug(
"Set self access_token, homegraph, access_token_date and homegraph_date to None"
)

@staticmethod
def _generate_mac_string():
"""Generate random 14 char long string"""
LOGGER.debug(f"Generating mac...")
random_uuid = uuid4()
random_string = str(random_uuid).replace("-", "")[:ANDROID_ID_LENGTH]
mac_string = random_string.upper()
LOGGER.debug(f"Generated mac: {mac_string}")
return mac_string

def get_android_id(self) -> Optional[str]:
if not self.android_id:
LOGGER.debug("There is not any stored android_id, generating a new one")
self.android_id = self._generate_mac_string()
return self.android_id

Expand All @@ -164,11 +193,13 @@ def get_master_token(self) -> Optional[str]:
Get google master token from username and password
"""
if not self.master_token:
LOGGER.debug("There is not any stored master_token, logging in...")
res = perform_master_login(
self.username, self.password, self.get_android_id()
)
if "Token" not in res:
LOGGER.error("[!] Could not get master token.")
LOGGER.debug(f"Request response: {res}")
return
self.master_token = res["Token"]
LOGGER.debug("Master token: {}".format(self.master_token))
Expand All @@ -178,6 +209,9 @@ def get_access_token(self) -> Optional[str]:
if self.access_token is None or self._has_expired(
self.access_token_date, ACCESS_TOKEN_DURATION
):
LOGGER.debug(
"There is not any stored access_token, or the stored one has expired, getting a new one..."
)
res = perform_oauth(
self.username,
self.get_master_token(),
Expand All @@ -188,10 +222,13 @@ def get_access_token(self) -> Optional[str]:
)
if "Auth" not in res:
LOGGER.error("[!] Could not get access token.")
LOGGER.debug(f"Request response: {res}")
return
self.access_token = res["Auth"]
self.access_token_date = datetime.now()
LOGGER.debug("Access token: {}".format(self.access_token))
LOGGER.debug(
f"Access token: {self.access_token}, datetime {self.access_token_date}"
)
return self.access_token

def get_homegraph(self):
Expand All @@ -201,18 +238,32 @@ def get_homegraph(self):
if self.homegraph is None or self._has_expired(
self.homegraph_date, HOMEGRAPH_DURATION
):
LOGGER.debug(
"There is not any stored homegraph, or the stored one has expired, getting a new one..."
)
try:
LOGGER.debug("Creating SSL channel credentials...")
scc = grpc.ssl_channel_credentials(root_certificates=None)
LOGGER.debug("Creating access token call credentials...")
tok = grpc.access_token_call_credentials(self.get_access_token())
LOGGER.debug("Compositing channel credentials...")
ccc = grpc.composite_channel_credentials(scc, tok)

LOGGER.debug(
"Establishing secure channel with the Google Home Foyer API..."
)
with grpc.secure_channel(GOOGLE_HOME_FOYER_API, ccc) as channel:
LOGGER.debug("Getting channels StructuresServiceStub...")
rpc_service = v1_pb2_grpc.StructuresServiceStub(channel)
LOGGER.debug("Getting HomeGraph request...")
request = v1_pb2.GetHomeGraphRequest(string1="", num2="")
LOGGER.debug("Fetching HomeGraph...")
response = rpc_service.GetHomeGraph(request)
LOGGER.debug("Storing gotten homegraph...")
self.homegraph = response
self.homegraph_date = datetime.now()
except grpc.RpcError as rpc_error:
LOGGER.debug("Got an RpcError.")
if rpc_error.code().name == "UNAUTHENTICATED":
LOGGER.warning("The access token has expired. Getting a new one.")
self.invalidate_access_token()
Expand Down Expand Up @@ -240,12 +291,16 @@ def get_google_devices(
"""

# Set models_list to empty list if None
LOGGER.debug("Initializing models list if empty...")
models_list = models_list if models_list else []

if force_homegraph_reload:
LOGGER.debug("Forcing homegraph reload.")
self.invalidate_homegraph()

LOGGER.debug("Getting homegraph...")
homegraph = self.get_homegraph()
LOGGER.debug("Getting network devices...")
network_devices = (
discover_devices(models_list, zeroconf_instance=zeroconf_instance)
if not disable_discovery
Expand All @@ -259,13 +314,20 @@ def find_device(name) -> Optional[GoogleDevice]:
return None

devices: [Device] = []
LOGGER.debug(
f"Iterating in homegraph devices (len={len(homegraph.home.devices)})"
)
for item in homegraph.home.devices:
if item.local_auth_token != "":
# This checks if the current item is a valid model, only if there are models in models_list.
# If models_list is empty, the check should be omitted, and accept all items.
if models_list and item.hardware.model not in models_list:
LOGGER.debug("{} not in models_list".format(item.hardware.model))
continue

LOGGER.debug(
"Finding device in network? {}".format(network_devices is not None)
ArnyminerZ marked this conversation as resolved.
Show resolved Hide resolved
)
google_device = (
find_device(item.device_name) if network_devices else None
)
Expand All @@ -276,9 +338,12 @@ def find_device(name) -> Optional[GoogleDevice]:
hardware=item.hardware.model,
)
if device.local_auth_token:
LOGGER.debug("Appending device")
ArnyminerZ marked this conversation as resolved.
Show resolved Hide resolved
devices.append(device)
else:
LOGGER.warning("Device initialization failed, skipping.")
else:
LOGGER.debug("local_auth_token is not initialized")
ArnyminerZ marked this conversation as resolved.
Show resolved Hide resolved

LOGGER.debug("Google Home devices: {}".format(devices))

Expand Down Expand Up @@ -316,8 +381,10 @@ def invalidate_access_token(self):
"""Invalidates the current access token"""
self.access_token = None
self.access_token_date = None
LOGGER.debug("Invalidated access_token")

def invalidate_homegraph(self):
"""Invalidates the stored homegraph data"""
self.homegraph = None
self.homegraph_date = None
LOGGER.debug("Invalidated homegraph")
30 changes: 30 additions & 0 deletions glocaltokens/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def get_value(key):

class GoogleDevice:
def __init__(self, name: str, ip: str, port: int, model: str):
_LOGGER.debug("Initializing GoogleDevice...")
if not net_utils.is_valid_ipv4_address(
ip
) and not net_utils.is_valid_ipv6_address(ip):
Expand All @@ -92,11 +93,20 @@ def __init__(self, name: str, ip: str, port: int, model: str):
self.ip = ip
self.port = port
self.model = model
_LOGGER.debug(
"Set self name to {}, ip to {}, port to {} and model to {}".format(
name, ip, port, model
)
ArnyminerZ marked this conversation as resolved.
Show resolved Hide resolved
)

if not 0 <= self.port <= 65535:
_LOGGER.error("Port is out of the valid range: [0,65535]")
return

def __str__(self) -> str:
"""Serializes the class into a str"""
return f"{{name:{self.name},ip:{self.ip},port:{self.port},model:{self.model}}}"
ArnyminerZ marked this conversation as resolved.
Show resolved Hide resolved


def discover_devices(
models_list: Optional[List[str]] = None,
Expand All @@ -105,30 +115,50 @@ def discover_devices(
zeroconf_instance=None,
):
# pylint: disable=unused-argument
_LOGGER.debug("Discovering devices...")
_LOGGER.debug("Importing zeroconf...")
import zeroconf

def callback():
"""Called when zeroconf has discovered a new chromecast."""
if max_devices is not None and listener.count >= max_devices:
discover_complete.set()

_LOGGER.debug("Creating new Event for discovery completion...")
discover_complete = Event()
_LOGGER.debug("Creating new CastListener...")
listener = CastListener(callback)
if not zeroconf_instance:
_LOGGER.debug("Creating new Zeroconf instance")
zconf = zeroconf.Zeroconf()
else:
_LOGGER.debug("Using attribute Zeroconf instance")
zconf = zeroconf_instance
_LOGGER.debug("Creating zeroconf service browser for _googlecast._tcp.local.")
zeroconf.ServiceBrowser(zconf, "_googlecast._tcp.local.", listener)

# Wait for the timeout or the maximum number of devices
_LOGGER.debug("Waiting for discovery completion...")
discover_complete.wait(timeout)

devices = []
_LOGGER.debug("Got {} devices. Iterating...".format(len(listener.devices)))
for service in listener.devices:
model = service[0]
name = service[1]
ip = service[2]
access_port = service[3]
if not models_list or model in models_list:
_LOGGER.debug(
"Appending new device. name: %s, ip: %s, port: %s, model: %s",
name,
ip,
access_port,
model,
)
devices.append(GoogleDevice(name, ip, int(access_port), model))
else:
_LOGGER.debug(
'Won\'t add device since model "{}" is not in models_list'.format(model)
)
Comment on lines +159 to +161
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@leikoilja is model_list still needed?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, @KapJI.
We have dropped to maintain this list in google-home, do you think we still need it in glocaltokens, @ArnyminerZ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I don't know, it might be useful for someone, maybe. If it doesn't bother, I wouldn't remove it, just in case.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we fallback to fetching all devices if no model_list is provided we can just keep it i guess :)

return devices
8 changes: 8 additions & 0 deletions glocaltokens/utils/logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
def censor(text: str, replace: str = "*") -> str:
"""
Replaces all the characters in a str by the specified [replace]

text: The text to censure.
replace: The character to instead of the content.
"""
return replace * len(text)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are using this util to print tokens and passwords, we really should add a test to make sure we censor correctly and are not printing any user sensitive data to the log without their consent

1 change: 0 additions & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
JSON_KEY_LOCAL_AUTH_TOKEN,
JSON_KEY_PORT,
)
from glocaltokens.utils.types import Struct
from tests.assertions import DeviceAssertions, TypeAssertions
from tests.factory.providers import HomegraphProvider, TokenProvider

Expand Down
21 changes: 21 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from unittest import TestCase

from faker import Faker

from glocaltokens.utils.logs import censor

faker = Faker()


class GLocalAuthenticationTokensClientTests(TestCase):
ArnyminerZ marked this conversation as resolved.
Show resolved Hide resolved
def setUp(self):
"""Setup method run before every test"""
pass

def tearDown(self):
"""Teardown method run after every test"""
pass

def test_logs(self):
random = faker.word()
ArnyminerZ marked this conversation as resolved.
Show resolved Hide resolved
self.assertNotEqual(random, censor(random))