Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

APM Log Forwarding #559

Merged
merged 15 commits into from
Jun 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions newrelic/api/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class Application(object):
_delayed_callables = {}

@staticmethod
def _instance(name):
def _instance(name, activate=True):
if name is None:
name = newrelic.core.config.global_settings().app_name

Expand All @@ -44,7 +44,7 @@ def _instance(name):

instance = Application._instances.get(name, None)

if not instance:
if not instance and activate:
with Application._lock:
# Now try again with lock so that only one gets
# to create and add it.
Expand Down Expand Up @@ -150,6 +150,10 @@ def record_transaction(self, data):
if self.active:
self._agent.record_transaction(self._name, data)

def record_log_event(self, message, level=None, timestamp=None, priority=None):
if self.active:
self._agent.record_log_event(self._name, message, level, timestamp, priority=priority)

def normalize_name(self, name, rule_type="url"):
if self.active:
return self._agent.normalize_name(self._name, name, rule_type)
Expand All @@ -162,8 +166,8 @@ def compute_sampled(self):
return self._agent.compute_sampled(self._name)


def application_instance(name=None):
return Application._instance(name)
def application_instance(name=None, activate=True):
return Application._instance(name, activate=activate)


def register_application(name=None, timeout=None):
Expand Down
1 change: 1 addition & 0 deletions newrelic/api/import_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
# These modules should not be added to the _uninstrumented_modules set
# because they have been deemed okay to import before initialization by
# the customer.
"logging",
"gunicorn.app.base",
"wsgiref.simple_server",
"gevent.wsgi",
Expand Down
29 changes: 27 additions & 2 deletions newrelic/api/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
import json
import logging
import re
import warnings
from logging import Formatter, LogRecord

from newrelic.api.time_trace import get_linking_metadata
from newrelic.api.transaction import current_transaction
from newrelic.api.transaction import current_transaction, record_log_event
from newrelic.common import agent_http
from newrelic.common.object_names import parse_exc_info
from newrelic.core.attribute import truncate
Expand Down Expand Up @@ -85,8 +86,25 @@ def safe_str(object, *args, **kwargs):
return json.dumps(self.log_record_to_dict(record), default=safe_str, separators=(",", ":"))


class NewRelicLogForwardingHandler(logging.Handler):
def emit(self, record):
try:
# Avoid getting local log decorated message
if hasattr(record, "_nr_original_message"):
message = record._nr_original_message()
else:
message = record.getMessage()

record_log_event(message, record.levelname, int(record.created * 1000))
except Exception:
self.handleError(record)


class NewRelicLogHandler(logging.Handler):
"""This is an experimental log handler provided by the community. Use with caution."""
"""
Deprecated: Please use NewRelicLogForwardingHandler instead.
This is an experimental log handler provided by the community. Use with caution.
"""

PATH = "/log/v1"

Expand All @@ -104,6 +122,13 @@ def __init__(
ca_bundle_path=None,
disable_certificate_validation=False,
):
warnings.warn(
"The contributed NewRelicLogHandler has been superseded by automatic instrumentation for "
"logging in the standard lib. If for some reason you need to manually configure a handler, "
"please use newrelic.api.log.NewRelicLogForwardingHandler to take advantage of all the "
"features included in application log forwarding such as proper batching.",
DeprecationWarning
)
super(NewRelicLogHandler, self).__init__(level=level)
self.license_key = license_key or self.settings.license_key
self.host = host or self.settings.host or self.default_host(self.license_key)
Expand Down
72 changes: 57 additions & 15 deletions newrelic/api/time_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import logging
import platform
import random
import sys
import time
Expand Down Expand Up @@ -574,20 +575,26 @@ def increment_child_count(self):
else:
self.has_async_children = False

def get_linking_metadata(self):
metadata = {
"entity.type": "SERVICE",
}
def _get_service_linking_metadata(self, application=None):
if application is not None:
return get_service_linking_metadata(application)
elif self.transaction is not None:
return get_service_linking_metadata(settings=self.transaction.settings)
else:
return get_service_linking_metadata()

def _get_trace_linking_metadata(self):
metadata = {}
txn = self.transaction
if txn:
metadata["span.id"] = self.guid
metadata["trace.id"] = txn.trace_id
settings = txn.settings
if settings:
metadata["entity.name"] = settings.app_name
entity_guid = settings.entity_guid
if entity_guid:
metadata["entity.guid"] = entity_guid

return metadata

def get_linking_metadata(self, application=None):
metadata = self._get_service_linking_metadata(application)
metadata.update(self._get_trace_linking_metadata())
return metadata


Expand All @@ -601,14 +608,49 @@ def current_trace():
return trace_cache().current_trace()


def get_linking_metadata():
def get_trace_linking_metadata():
trace = current_trace()
if trace:
return trace.get_linking_metadata()
return trace._get_trace_linking_metadata()
else:
return {
"entity.type": "SERVICE",
}
return {}


def get_service_linking_metadata(application=None, settings=None):
metadata = {
"entity.type": "SERVICE",
}

trace = current_trace()
if settings is None and trace:
txn = trace.transaction
if txn:
settings = txn.settings

if not settings:
if application is None:
from newrelic.api.application import application_instance
application = application_instance(activate=False)

if application is not None:
settings = application.settings

if settings:
metadata["entity.name"] = settings.app_name
entity_guid = settings.entity_guid
if entity_guid:
metadata["entity.guid"] = entity_guid
metadata["hostname"] = platform.uname()[1]

return metadata


def get_linking_metadata(application=None):
metadata = get_service_linking_metadata()
trace = current_trace()
if trace:
metadata.update(trace._get_trace_linking_metadata())
return metadata


def record_exception(exc=None, value=None, tb=None, params=None, ignore_errors=None, application=None):
Expand Down
70 changes: 67 additions & 3 deletions newrelic/api/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import weakref
from collections import OrderedDict

from newrelic.api.application import application_instance
import newrelic.core.database_node
import newrelic.core.error_node
from newrelic.core.log_event_node import LogEventNode
import newrelic.core.root_node
import newrelic.core.transaction_node
import newrelic.packages.six as six
from newrelic.api.time_trace import TimeTrace
from newrelic.api.time_trace import TimeTrace, get_linking_metadata
from newrelic.common.encoding_utils import (
DistributedTracePayload,
NrTraceState,
Expand All @@ -46,18 +48,20 @@
obfuscate,
)
from newrelic.core.attribute import (
MAX_LOG_MESSAGE_LENGTH,
MAX_NUM_USER_ATTRIBUTES,
create_agent_attributes,
create_attributes,
create_user_attributes,
process_user_attribute,
truncate,
)
from newrelic.core.attribute_filter import (
DST_ERROR_COLLECTOR,
DST_NONE,
DST_TRANSACTION_TRACER,
)
from newrelic.core.config import DEFAULT_RESERVOIR_SIZE
from newrelic.core.config import DEFAULT_RESERVOIR_SIZE, LOG_EVENT_RESERVOIR_SIZE
from newrelic.core.custom_event import create_custom_event
from newrelic.core.stack_trace import exception_stack
from newrelic.core.stats_engine import CustomMetrics, SampledDataSet
Expand Down Expand Up @@ -204,7 +208,6 @@ def __init__(self, application, enabled=None, source=None):

self._errors = []
self._slow_sql = []
self._custom_events = SampledDataSet(capacity=DEFAULT_RESERVOIR_SIZE)

self._stack_trace_count = 0
self._explain_plan_count = 0
Expand Down Expand Up @@ -320,6 +323,13 @@ def __init__(self, application, enabled=None, source=None):
if self._settings:
self.enabled = True

if self._settings:
self._custom_events = SampledDataSet(capacity=self._settings.event_harvest_config.harvest_limits.custom_event_data)
self._log_events = SampledDataSet(capacity=self._settings.event_harvest_config.harvest_limits.log_event_data)
else:
self._custom_events = SampledDataSet(capacity=DEFAULT_RESERVOIR_SIZE)
self._log_events = SampledDataSet(capacity=LOG_EVENT_RESERVOIR_SIZE)

def __del__(self):
self._dead = True
if self._state == self.STATE_RUNNING:
Expand Down Expand Up @@ -562,6 +572,7 @@ def __exit__(self, exc, value, tb):
errors=tuple(self._errors),
slow_sql=tuple(self._slow_sql),
custom_events=self._custom_events,
log_events=self._log_events,
apdex_t=self.apdex,
suppress_apdex=self.suppress_apdex,
custom_metrics=self._custom_metrics,
Expand Down Expand Up @@ -1465,6 +1476,31 @@ def set_transaction_name(self, name, group=None, priority=None):
self._group = group
self._name = name


def record_log_event(self, message, level=None, timestamp=None, priority=None):
settings = self.settings
if not (settings and settings.application_logging and settings.application_logging.enabled and settings.application_logging.forwarding and settings.application_logging.forwarding.enabled):
return

timestamp = timestamp if timestamp is not None else time.time()
level = str(level) if level is not None else "UNKNOWN"

if not message or message.isspace():
_logger.debug("record_log_event called where message was missing. No log event will be sent.")
return

message = truncate(message, MAX_LOG_MESSAGE_LENGTH)

event = LogEventNode(
timestamp=timestamp,
level=level,
message=message,
attributes=get_linking_metadata(),
)

self._log_events.add(event, priority=priority)


def record_exception(self, exc=None, value=None, tb=None, params=None, ignore_errors=None):
# Deprecation Warning
warnings.warn(
Expand Down Expand Up @@ -1814,6 +1850,34 @@ def record_custom_event(event_type, params, application=None):
application.record_custom_event(event_type, params)


def record_log_event(message, level=None, timestamp=None, application=None, priority=None):
"""Record a log event.

Args:
record (logging.Record):
application (newrelic.api.Application): Application instance.
"""

if application is None:
transaction = current_transaction()
if transaction:
transaction.record_log_event(message, level, timestamp)
else:
application = application_instance(activate=False)

if application and application.enabled:
application.record_log_event(message, level, timestamp, priority=priority)
else:
_logger.debug(
"record_log_event has been called but no transaction or application was running. As a result, "
"the following event has not been recorded. message: %r level: %r timestamp %r. To correct "
"this problem, supply an application object as a parameter to this record_log_event call.",
message, level, timestamp,
)
elif application.enabled:
application.record_log_event(message, level, timestamp, priority=priority)


def accept_distributed_trace_payload(payload, transport_type="HTTP"):
transaction = current_transaction()
if transaction:
Expand Down
1 change: 1 addition & 0 deletions newrelic/common/agent_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,7 @@ class DeveloperModeClient(SupportabilityMixin, BaseClient):
"error_event_data": None,
"span_event_data": None,
"custom_event_data": None,
"log_event_data": None,
"shutdown": [],
}

Expand Down
Loading