-
Notifications
You must be signed in to change notification settings - Fork 4
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(send signals): Handle machine preparation failed #31
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,7 +9,7 @@ | |
|
||
import httpx | ||
import jwt | ||
from cscapi.storage import MachineModel, ReceivedDecision, SignalModel, StorageInterface | ||
from .storage import MachineModel, ReceivedDecision, SignalModel, StorageInterface | ||
from more_itertools import batched | ||
|
||
__version__ = metadata.version("cscapi").split("+")[0] | ||
|
@@ -153,55 +153,79 @@ def _send_signals_by_machine_id( | |
self.logger.error( | ||
f"Machine {machine_to_process.machine_id} is marked as failing" | ||
) | ||
# machine_to_process contains only machine_id data | ||
# To avoid erasing data, we need to fetch the full machine from the database | ||
database_machine = self.storage.get_machine_by_id( | ||
machine_to_process.machine_id | ||
) | ||
machine_to_upsert = ( | ||
database_machine if database_machine else machine_to_process | ||
) | ||
self.storage.update_or_create_machine( | ||
replace(machine_to_process, is_failing=True) | ||
replace(machine_to_upsert, is_failing=True) | ||
) | ||
break | ||
|
||
for machine_to_process in machines_to_process_attempts: | ||
machine_to_process = self._prepare_machine(machine_to_process) | ||
try: | ||
machine_to_process = self._prepare_machine(machine_to_process) | ||
except httpx.HTTPStatusError as exc: | ||
self.logger.error( | ||
f"Error while preparing machine {machine_to_process.machine_id}: {exc}" | ||
) | ||
machine_to_process.token = None | ||
retry_machines_to_process_attempts.append(machine_to_process) | ||
continue | ||
if machine_to_process.is_failing: | ||
self.logger.error( | ||
f"skipping sending signals for machine {machine_to_process.machine_id} as it's marked as failing" | ||
f"skipping sending signals for machine {machine_to_process.machine_id}" | ||
f" as it's marked as failing" | ||
) | ||
continue | ||
|
||
self.logger.info( | ||
f"sending signals for machine {machine_to_process.machine_id}" | ||
) | ||
sent_signal_ids = [] | ||
try: | ||
sent_signal_ids = self._send_signals_to_capi( | ||
machine_to_process.token, | ||
signals_by_machineid[machine_to_process.machine_id], | ||
self.logger.info( | ||
f"sending signals for machine {machine_to_process.machine_id}" | ||
) | ||
sent_signal_ids_count = len(sent_signal_ids) | ||
total_sent += sent_signal_ids_count | ||
self.logger.info(f"sent {sent_signal_ids_count} signals") | ||
if machine_to_process.token is not None: | ||
sent_signal_ids = self._send_signals_to_capi( | ||
machine_to_process.token, | ||
signals_by_machineid[machine_to_process.machine_id], | ||
) | ||
sent_signal_ids_count = len(sent_signal_ids) | ||
total_sent += sent_signal_ids_count | ||
self.logger.info(f"sent {sent_signal_ids_count} signals") | ||
else: | ||
self.logger.error( | ||
f"skipping sending signals for machine {machine_to_process.machine_id}" | ||
f" as it has no token" | ||
) | ||
continue | ||
except httpx.HTTPStatusError as exc: | ||
self.logger.error( | ||
f"error while sending signals: {exc} for machine {machine_to_process.machine_id}" | ||
) | ||
if exc.response.status_code == 401: | ||
if attempt_count >= self.max_retries: | ||
self.storage.update_or_create_machine( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I delete this piece of code, because we already update the machine in line 165 (after PR, line 157 before PR). @rr404 : am I missing something ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ça me semble Ok en effet |
||
replace(machine_to_process, is_failing=True) | ||
) | ||
continue | ||
machine_to_process.token = None | ||
retry_machines_to_process_attempts.append(machine_to_process) | ||
continue | ||
if prune_after_send and sent_signal_ids: | ||
self.logger.info( | ||
f"pruning sent signals for machine {machine_to_process.machine_id}" | ||
) | ||
self.storage.delete_signals(sent_signal_ids) | ||
|
||
self.logger.info( | ||
f"sending metrics for machine {machine_to_process.machine_id}" | ||
) | ||
try: | ||
self.logger.info( | ||
f"pruning sent signals for machine {machine_to_process.machine_id}" | ||
) | ||
self.storage.delete_signals(sent_signal_ids) | ||
except Exception as exc: | ||
self.logger.error( | ||
f"error while pruning signals: {exc} for machine {machine_to_process.machine_id}", | ||
exc_info=True, | ||
) | ||
|
||
try: | ||
self.logger.info( | ||
f"sending metrics for machine {machine_to_process.machine_id}" | ||
) | ||
self._send_metrics_for_machine(machine_to_process) | ||
except httpx.HTTPStatusError as exc: | ||
self.logger.error( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -97,7 +97,7 @@ def get_signals( | |
raise NotImplementedError | ||
|
||
@abstractmethod | ||
def get_machine_by_id(self, machine_id: str) -> MachineModel: | ||
def get_machine_by_id(self, machine_id: str) -> Optional[MachineModel]: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sql and mongodb storage implementation already returns Optional[MachineModel] ; |
||
raise NotImplementedError | ||
|
||
@abstractmethod | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Et du coup là on fait rien d'aute que log? sans essayer de lui choper un token ? on prend pour acquis que si ici elle pas a de token c'est qu'il y a eu un aute pb?
Y a t'il un risque qu'elle n'ai pas de token d'ailleurs ? vu que tu hadle le prepare qui fail