cw_log_puller.py
"""
CloudWatch log event puller implementation
"""
import logging
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Union

from botocore.exceptions import ClientError

from samcli.lib.observability.cw_logs.cw_log_event import CWLogEvent
from samcli.lib.observability.observability_info_puller import ObservabilityEventConsumer, ObservabilityPuller
from samcli.lib.utils.time import to_datetime, to_timestamp

LOG = logging.getLogger(__name__)


class CWLogPuller(ObservabilityPuller):
"""
Puller implementation that can pull events from CloudWatch log group
"""
def __init__(
self,
logs_client: Any,
consumer: ObservabilityEventConsumer,
cw_log_group: str,
resource_name: Optional[str] = None,
max_retries: int = 1000,
poll_interval: int = 1,
):
"""
Parameters
----------
logs_client: CloudWatchLogsClient
boto3 logs client instance
consumer : ObservabilityEventConsumer
Consumer instance that will process pulled events
cw_log_group : str
CloudWatch log group name
resource_name : Optional[str]
Optional parameter to assign a resource name for each event.
max_retries: int
Optional parameter to set maximum retries when tailing. Default value is 1000
poll_interval: int
Optional parameter to define sleep interval between pulling new log events when tailing. Default value is 1
"""
self.logs_client = logs_client
self.consumer = consumer
self.cw_log_group = cw_log_group
self.resource_name = resource_name
self._max_retries = max_retries
self._poll_interval = poll_interval
self.latest_event_time = 0
self.had_data = False
self._invalid_log_group = False

    def tail(self, start_time: Optional[datetime] = None, filter_pattern: Optional[str] = None):
        if start_time:
            self.latest_event_time = to_timestamp(start_time)
        counter = self._max_retries
        while counter > 0 and not self.cancelled:
            LOG.debug("Tailing logs from %s starting at %s", self.cw_log_group, str(self.latest_event_time))
            counter -= 1
            try:
                self.load_time_period(to_datetime(self.latest_event_time), filter_pattern=filter_pattern)
            except ClientError as err:
                error_code = err.response.get("Error", {}).get("Code")
                if error_code == "ThrottlingException":
                    # if throttled, increase the poll interval: by 1 second the first time, then by squaring it
                    if self._poll_interval == 1:
                        self._poll_interval += 1
                    else:
                        self._poll_interval **= 2
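                    # Note (added for clarity): repeated throttling grows the interval quickly
                    # (1 -> 2 -> 4 -> 16 -> 256 ...), since every ThrottlingException after the
                    # first squares the current value. There is no upper bound in this method.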
                    LOG.warning(
                        "Throttled by CloudWatch Logs API, consider pulling logs for certain resources. "
                        "Increasing the poll interval time for resource %s to %s seconds",
                        self.cw_log_group,
                        self._poll_interval,
                    )
                else:
                    # if the error is anything other than throttling, re-raise it
                    LOG.error("Failed while fetching new log events", exc_info=err)
                    raise err
            # This poll fetched logs. Reset the retry counter and set the timestamp for the next poll
            if self.had_data:
                counter = self._max_retries
                self.latest_event_time += 1  # one extra millisecond to fetch the next log event
                self.had_data = False
            # We already fetched logs once. Sleep for some time before querying again.
            # This also helps us scoot under the TPS limit for the CloudWatch API call.
            time.sleep(self._poll_interval)

    def load_time_period(
        self,
        start_time: Optional[datetime] = None,
        end_time: Optional[datetime] = None,
        filter_pattern: Optional[str] = None,
    ):
        kwargs: Dict[str, Any] = {"logGroupName": self.cw_log_group, "interleaved": True}
        if start_time:
            kwargs["startTime"] = to_timestamp(start_time)
        if end_time:
            kwargs["endTime"] = to_timestamp(end_time)
        if filter_pattern:
            kwargs["filterPattern"] = filter_pattern
        while True:
            LOG.debug("Fetching logs from CloudWatch with parameters %s", kwargs)
            try:
                result = self.logs_client.filter_log_events(**kwargs)
                self._invalid_log_group = False
            except self.logs_client.exceptions.ResourceNotFoundException:
                if not self._invalid_log_group:
                    LOG.debug(
                        "The specified log group %s does not exist. "
                        "This may be because your resource has not been invoked yet.",
                        self.cw_log_group,
                    )
                    self._invalid_log_group = True
                break
            # Several events will be returned. Consume them one at a time
            for event in result.get("events", []):
                self.had_data = True
                cw_event = CWLogEvent(self.cw_log_group, dict(event), self.resource_name)
                if cw_event.timestamp > self.latest_event_time:
                    self.latest_event_time = cw_event.timestamp
                self.consumer.consume(cw_event)
            # Keep iterating until there are no more logs left to query.
            next_token = result.get("nextToken", None)
            kwargs["nextToken"] = next_token
            if not next_token:
                break

    def load_events(self, event_ids: Union[List[Any], Dict]):
        LOG.debug("Loading specific events is not supported via CloudWatch Log Group")