-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
CDK: emit AirbyteTraceMessage
with exception trace information
#12593
Conversation
class AirbyteSecretHelper: | ||
_secrets: List[str] = [] | ||
|
||
@classmethod | ||
def update_secrets(cls, secrets: List[str]): | ||
"""Update the list of secrets to be replaced""" | ||
cls._secrets = secrets | ||
|
||
@classmethod | ||
def filter_secrets(cls, string: str) -> str: | ||
"""Filter secrets from a string by replacing them with ****""" | ||
for secret in cls._secrets: | ||
string = string.replace(secret, "****") | ||
return string |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved out the logic from AirbyteLogFormatter
so it can be re-used (using this for trace messages now as well)
type=TraceType.ERROR, | ||
emitted_at=now_millis, | ||
error=AirbyteErrorTraceMessage( | ||
message=self.message or "an error occurred with the source", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any suggestions on what this generic default message should be are welcome 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this isn't necessarily source, could also be destination.
How about Something went wrong in the connector. See the logs for more details.
?
Pointing them at the logs seems sensible given that this is always going to be non-specific at this level.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are there any environment variables or system arguments we can use to know if we are a source or destination? e.g. looking at the docker container name (source-mailchimp)?
# emit an AirbyteTraceMessage for any exception that gets to this spot | ||
traced_exc = exception_value if exception_type == AirbyteTracedException else AirbyteTracedException.from_exception(exception_value) | ||
message = traced_exc.as_airbyte_message() | ||
message_json = message.json(exclude_unset=True) | ||
print(AirbyteSecretHelper.filter_secrets(message_json)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the main part that is in charge of emitting the AirbyteTraceMessage
: If the exception we get is already an AirbyteTracedException
(which can be thrown from anywhere with appropriate messages and fields) we use it, if not then we build one from the exception. This is then turned into an AirbyteTraceMessage
and filtered for secret values.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome! Mostly just comments on being pythonic, one open question around the AirbyteLogger
which should be resolved before approve I think.
type=TraceType.ERROR, | ||
emitted_at=now_millis, | ||
error=AirbyteErrorTraceMessage( | ||
message=self.message or "an error occurred with the source", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this isn't necessarily source, could also be destination.
How about Something went wrong in the connector. See the logs for more details.
?
Pointing them at the logs seems sensible given that this is always going to be non-specific at this level.
super().__init__(internal_message) | ||
|
||
def as_airbyte_message(self) -> AirbyteMessage: | ||
now_millis = int(datetime.now().timestamp() * 1000) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a Double in the protocol so should technically be a float
here in python, but I'm not sure it would actually matter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh interesting - the RecordMessage
's emitted_at
is an int
🤔 good catch!
changed in 65543d1
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I defer to other's Pythonic notes, but 👍 from me! The ergonomic suggestion is non-blocking.
message_json = message.json(exclude_unset=True) | ||
print(filter_secrets(message_json)) | ||
|
||
sys.excepthook = hook_fn |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is cool - like Node's process.on('uncaughtException')
with pytest.raises(subprocess.CalledProcessError) as err: | ||
subprocess.check_output([sys.executable, "-c", cmd], stderr=subprocess.STDOUT) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a neat way to test that a process crashed!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Nice one 👍
this was causing circular dependency issues with later changes
313b445
to
b00fc78
Compare
What
AirbyteTraceMessage
has been introduced in the protocol to allow better debugging and attribution of failures when something in the connector goes wrong.This change starts emitting
AirbyteTraceMessage
s generically whenever an exception is raised. A future PR will make it easier to return better user-friendly messages.closes #12472
How
sys.excepthook
to register a global handler for uncaught exceptions. This handler is in charge of emitting the genericAirbyteTraceMessage
s.AirbyteTracedException
class, which can be used to customize the trace message and provide better error messages.AirbyteSecretHelper
so it can be re-used to filter out secrets from trace messages.Recommended reading order
utils/traced_exception.py
exception_handler.py