Skip to content

Commit

Permalink
fixed things around failing machines
Browse files Browse the repository at this point in the history
  • Loading branch information
jdv committed Jan 26, 2024
1 parent d51571d commit 27bde63
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 10 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,7 @@ cython_debug/
src/cscapi/_version.py

# database files
*.db
*.db

#vscode
.vscode
14 changes: 9 additions & 5 deletions examples/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@
client = CAPIClient(
storage=SQLStorage(),
config=CAPIClientConfig(
scenarios=["crowdsecurity/ssh-bf", "acme/http-bf"], prod=False
scenarios=["pysdktest/test-01", "pysdktest/test-02"], user_agent_prefix="example", prod=True
),
)

# Fetch signals from your data, and convert it into a list of signals accepted by CrowdSec
signals = [
create_signal(
attacker_ip="<attacker_ip>",
scenario="crowdsecurity/ssh-bf",
created_at="2023-11-17 10:20:46 +0000",
machine_id=generate_machine_id_from_key("<key>asd"),
attacker_ip="81.81.81.81",
scenario="pysdktest/test-sc",
created_at="2024-01-19 12:12:21 +0000",
machine_id=generate_machine_id_from_key("myMachineId"),
context=[{"key":"scenario-version", "value":"1.0.0"}],
message="test message to see where it is written",
)
]

Expand All @@ -25,3 +27,5 @@
# This sends all the unsent signals to the API.
# You can chron this call to send signals periodically.
client.send_signals()

# client.enroll_machines([generate_machine_id_from_key("myMachineId")], "basicExample", "myenrollkeyigotonconsole", [])
24 changes: 20 additions & 4 deletions src/cscapi/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ class CAPIClientConfig:
max_retries: int = 3
latency_offset: int = 10
retry_delay: int = 5
force_config_scenario: bool = True


class CAPIClient:
def __init__(self, storage: StorageInterface, config: CAPIClientConfig):
self.storage = storage

self.force_config_scenario = config.force_config_scenario
self.scenarios = ",".join(sorted(config.scenarios))
self.latency_offset = config.latency_offset
self.max_retries = config.max_retries
Expand Down Expand Up @@ -103,7 +104,8 @@ def _send_signals_by_machine_id(

while machines_to_process_attempts:
logging.info(f"attempt {attempt_count} to send signals")
if attempt_count > self.max_retries:
retry_machines_to_process_attempts = []
if attempt_count >= self.max_retries:
for machine_to_process in machines_to_process_attempts:
logging.error(
f"Machine {machine_to_process.machine_id} is marked as failing"
Expand Down Expand Up @@ -161,6 +163,9 @@ def _send_signals_by_machine_id(

attempt_count += 1
machines_to_process_attempts = retry_machines_to_process_attempts
if (len(retry_machines_to_process_attempts) != 0) and (attempt_count < self.max_retries):
logging.info( f"waiting {self.retry_delay} seconds before retrying sending signals")
time.sleep(self.retry_delay)

def _send_signals(self, token: str, signals: SignalModel):
for signal_batch in batched(signals, 250):
Expand Down Expand Up @@ -201,7 +206,6 @@ def _send_metrics_for_machine(self, machine: MachineModel):
logging.error(
f"received error {exc} while sending metrics for machine {machine.machine_id}"
)
time.sleep(self.retry_delay)

def _prune_sent_signals(self):
signals = list(
Expand All @@ -215,12 +219,13 @@ def _clear_all_signals(self):
self.storage.delete_signals(signals)

def _refresh_machine_token(self, machine: MachineModel) -> MachineModel:
scenarios = self.scenarios if self.force_config_scenario else machine.scenarios
resp = self.http_client.post(
self._get_url(CAPI_WATCHER_LOGIN_ENDPOINT),
json={
"machine_id": machine.machine_id,
"password": machine.password,
"scenarios": machine.scenarios.split(","),
"scenarios": scenarios.split(","),
},
)
try:
Expand Down Expand Up @@ -254,6 +259,12 @@ def _register_machine(self, machine: MachineModel) -> MachineModel:

def _prepare_machine(self, machine: MachineModel):
machine = self._ensure_machine_capi_registered(machine)
if machine.is_failing:
logging.error(
f"skipping connection for machine {machine.machine_id} as it's marked as failing"
)
return machine

machine = self._ensure_machine_capi_connected(machine)
return machine

Expand Down Expand Up @@ -298,6 +309,11 @@ def enroll_machines(
while machine_ids:
for machine_id in machine_ids:
machine = self._prepare_machine(MachineModel(machine_id=machine_id))
if machine.is_failing:
logging.error(
f"skipping enrollment for machine {machine.machine_id} as it's marked as failing"
)
continue
try:
resp = self.http_client.post(
self.url + CAPI_ENROLL_ENDPOINT,
Expand Down

0 comments on commit 27bde63

Please sign in to comment.