Proposal for pipeline branching in the OpenTelemetry Collector #5414
Comments
@siliconbrain, thank you for this detailed proposal. I think you've laid all of this out in fantastic detail and made a compelling argument that some form of routing is necessary. I am one of the original authors of stanza and continue to maintain the donated codebase within the OTel project, so I can certainly appreciate the use cases you are targeting here. As you can imagine, some of these same concerns were discussed when debating exactly how to integrate stanza into the OTel collector. It is strongly preferred that expectations around how data flows through pipelines be respected. However, this of course does not preclude composition of pipelines, such as in your backup proposal.

I think you would be interested to compare your proposal to #2336. The explicit motivation for that proposal was to enable translation between signals, but the proposed implementation follows from the same line of thought as your backup proposal. I took it one step further and suggested that the exporter/receiver pair be formalized into a new type of component called a connector. Unfortunately, this has not yet been implemented, but it is already accepted in concept.

You've also made a very lucid point about the distinction between a true routing component vs a "duplicate and filter" strategy. I think this may be worth exploring further. That said, I'm curious to first hear your thoughts on the design proposed in #2336, and whether you think it would be sufficient for your use cases.
@djaglowski, your comment is very much appreciated!
I had a hunch it would be so, hence the backup proposal. 🙂 Could you give me some insight as to why that is? Also, to me, the routing processor seems to clearly break those expectations.
It was pleasant to read your proposal and see that we're not alone in missing a feature like this. Indeed, your proposal suggests something very similar to our backup. I quite like the notion of combining the exporter-receiver pair into its own entity, as this eliminates the redundancy of defining pairs, and I also think the connector is a very descriptive name for it. 👍 However, my argument against it is that this requires the topological sorting of connected pipelines, while with the exporter-receiver pair this is not necessary. Maybe we could have the best of both worlds by implementing connectors as a kind of "syntactic sugar" for the configuration and generating exporter-receiver pairs underneath? Another question that comes to mind (and this might be better asked in your proposal's discussion, but anyway) is what the definition of a translating connector would look like in the configuration, especially with regard to indicating its input and output signal types.
What do you mean about exploring further?
Basically because the design is easy to understand and is already in place, works for most use cases, and it appears to be sufficient to build upon for other use cases (via connectors or other similar mechanism). I agree that the routing processor is not really conformant to the expected data flow. I believe I've heard this sentiment from others as well, and there is a desire to reconcile this eventually.
Isn't the need for topological sorting just a consequence of the decision to disallow cycles? Technically cycles are always possible via other means (e.g. export back to the same pipeline via an otlp exporter-receiver pair), but I would think we should prevent them where possible.
I believe this could be implicit:

```yaml
receivers:
  filelog:
exporters:
  otlp:
connectors:  # must be used as both exporter and receiver
  guesswhichkind:
service:
  pipelines:
    logs:
      receivers: [filelog]
      exporters: [guesswhichkind]
    metrics:
      receivers: [guesswhichkind]
      exporters: [otlp]
```
I need to give this some more thought, but perhaps it's possible to implement a routing mechanism within a receiver. Currently it's possible to use a single receiver instance in multiple pipelines. Rather than requiring each pipeline to apply a filter, perhaps the receiver can selectively emit to its pipelines. I don't believe the current interface allows this, but it seems possible. In theory this could mitigate some work (e.g. a receiver that emits to only 1 of 100 pipelines would not need to duplicate the signal and would prevent 99 pipelines from having to filter out the signal).
I would love to see something like this in the collector. My current approach is creating otlp receiver/exporter pairs to connect the pipelines; it's not the most elegant solution, but it works...
I could only catch up with this proposal now, and I like it. The current state of the routing processor is MVP-like, the minimal solution that works. What you outlined in the backup proposal is mainly what I had in mind during the development: "... communicate in-process thus using the absolute minimum of extra resources". I don't think I had a new receiver in mind for the in-process part, but an evolution of the current lookup. In any case, keep me posted on the evolution of this!
I like your proposal! :)
Is anyone likely to work on this in the near future? I constantly feel like my observability pipeline is waiting for either the otel collector to implement this, or for vector to properly support otel traces. |
@kamalmarhubi AFAIK, @djaglowski is working on implementing his proposal which is a superset of ours, supporting even more use-cases. |
As per @siliconbrain's last comment, closing this issue as addressed in an alternative proposal. |
This document describes the notion of pipeline branching in the context of the OpenTelemetry Collector, and argues for and demonstrates its usefulness in different use cases, supported by real-world examples.
What is pipeline branching?
In the context of the OpenTelemetry Collector, a pipeline is "a path the data follows in the Collector starting from reception, then further processing or modification and finally exiting the Collector via exporters". [source]
A pipeline may have multiple receivers sending data to its first processor, and multiple exporters receiving data from its last processor, but there can be no forks between the first and last processor.
In other words, the processors of a pipeline constitute a linear (directed) graph.
This linear structure does not fit well with cases where the incoming data needs to be processed in different ways (and exported with different exporters) based on its features (attributes, timestamps, content, name, etc.).
To make handling these cases easier, we want to introduce pipeline branching: allow for branches —like forks in a road— in the processing part of a pipeline.
More precisely, we want to allow the outdegree of any processor to be greater than 1, or at least enable creating an equivalent structure in some other manner.
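To make the idea more concrete, here is a purely hypothetical configuration sketch; the `branches` key and the component names are invented for illustration and nothing like this syntax exists in the Collector today. The processor feeding the branches has an outdegree of 2:

```yaml
# Hypothetical syntax, for illustration only.
service:
  pipelines:
    logs:
      receivers: [filelog]
      processors: [k8sattributes]   # shared prefix of the pipeline
      branches:                     # invented keyword: the pipeline forks here
        team-a:
          processors: [resource/team-a]
          exporters: [otlp/team-a]
        default:
          exporters: [otlp]
```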
See our motivating examples if this explanation seems too dry or you want to see what problems branching can solve.
Current options
Processors currently available in the OpenTelemetry Collector (including the contributions) can only handle a limited set of these cases and/or —as we'll see later— introduce unnecessary overhead.
Conditional processing of data in a pipeline can be implemented by using a `filter` processor. The `filter` processor can be configured to include or exclude data from the remainder of the pipeline based on a few features of metrics or logs (it does not support traces at the moment). This is useful as a tool for guarding part of the pipeline from unintended data, but it requires duplicating parts of the pipeline before the `filter` processor to handle complementary cases (i.e. cases not matching the filter's condition). This duplication introduces unwanted redundancy in the Collector's configuration that hinders modification.
Also, defining the complementary pipeline(s) is a non-trivial problem that becomes more complex when multiple branches are introduced, and even more so when using multiple levels of filters.
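As an illustration of this duplication, routing logs from a single Kubernetes namespace to a dedicated exporter with today's tools requires two pipelines with complementary filters. This is only a sketch: the attribute, exporter and receiver names are examples, and the exact `filter` processor options may differ between versions.

```yaml
processors:
  filter/app:
    logs:
      include:
        match_type: strict
        resource_attributes:
          - key: k8s.namespace.name
            value: app
  filter/not-app:
    logs:
      exclude:                  # the complementary case has to be spelled out by hand
        match_type: strict
        resource_attributes:
          - key: k8s.namespace.name
            value: app

service:
  pipelines:
    logs/app:
      receivers: [filelog]
      processors: [filter/app]
      exporters: [otlp/app]
    logs/rest:
      receivers: [filelog]
      processors: [filter/not-app]
      exporters: [otlp/rest]
```

Every new branch adds another pipeline and another hand-written complementary filter, which is exactly the redundancy described above.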
Another processor related to conditional forwarding of data is the `routing` processor. This processor can selectively route data to different exporters using a configurable routing table indexed with an attribute of the data. It's a useful but limited tool. One of its limitations is that indexing requires an exact match between the table key and the attribute value; this can be worked around by using one or more processors earlier in the pipeline to create a synthetic attribute just for routing, but then this attribute will be exported with your data, which you might not want. Another, more limiting restriction is that it can only route directly to exporters, so no further processing can occur after routing.
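For reference, a minimal `routing` processor configuration looks roughly like the following (the attribute and exporter names are examples). Note that the table values must match the attribute exactly and that routes can only point at exporters:

```yaml
processors:
  routing:
    from_attribute: X-Tenant      # attribute used as the routing table key
    default_exporters: [otlp]     # used when no table entry matches exactly
    table:
      - value: acme               # exact match only
        exporters: [otlp/acme]    # routes end at exporters, no further processing
```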
Our proposal
The `opentelemetry-log-collection` project, originally known as Stanza, has been contributed by observIQ to "accelerate development of the OpenTelemetry Collector's log collection capabilities". At its core, this project has a similar architecture to the Collector's: it has a notion of pipelines which consist of input, output and transformer operators connected as a directed acyclic graph.
The main difference lies in how these connections are configured: in a Stanza pipeline, operators connect to each other using IDs.
This enables every operator to reference —and thus send data to— any other operator in the pipeline.
This is exploited by the project's `router` operator, which serves a similar purpose to the Collector's previously mentioned `routing` processor but has a more advanced solution for determining whether some data matches a specific route, employing the Expr library. We propose to create a new routing processor (or maybe modify the existing one) with capabilities similar to this `router` operator: it has advanced data matching capabilities and can forward data to one or many of its configured outputs, which can be either other processors or exporters. Unfortunately, this latter feature requires some modifications to the core OpenTelemetry Collector code.
In the current architecture, there is no way for a processor to access a collection of available processors to send data to.
This could be solved by extending the `Host` interface with a `GetProcessors` method, similar to the `GetExporters` method, that would return the available processors.
Example code for solution
Example config for solution
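The collapsed examples above contain the authors' proposed code and configuration. As a rough sketch only, such an extension could look like the following fragment; everything except `GetProcessors` mirrors the `component.Host` interface as it existed around the time of this proposal, while `GetProcessors` and its exact map types are assumptions, not existing API.

```go
// Sketch of the proposed extension to the Collector's component.Host interface
// (fragment; references existing collector types such as config.ComponentID).
type Host interface {
	ReportFatalError(err error)
	GetFactory(kind Kind, componentType config.Type) Factory
	GetExtensions() map[config.ComponentID]Extension
	GetExporters() map[config.DataType]map[config.ComponentID]Exporter

	// Hypothetical addition: processors built for the running service, keyed
	// by signal type and component ID, so that a router can look up another
	// processor and forward data to it in-process.
	GetProcessors() map[config.DataType]map[config.ComponentID]Processor
}
```

A router's `Start` method could then resolve its configured targets from `host.GetProcessors()` and keep them as consumers, in the same way exporters are resolved via `GetExporters()` today.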
Our backup proposal
This Stanza-esque router, while providing significant flexibility, requires changes to the core API and design of the OTel Collector which might not be something the community wants.
For this reason, we also present a more conservative solution that only exploits already available features of the Collector.
The `routing` processor's last mentioned missing feature (that further processing of data cannot occur after routing) could be overcome by exporting the data in such a way that it's picked up by another pipeline that continues its processing. With the currently available array of exporters and receivers, the simplest way to do this is by exporting the data with the `otlp` exporter to the `otlp` receiver of another pipeline in the same Collector, using ports on the same machine, but this solution wastes resources unnecessarily. We propose to create a new pair of exporter and receiver that communicate in-process, thus using the absolute minimum of extra resources.
In-process receivers are configured to look up their exporter pairs by component ID, taking advantage of the same mechanism used by the `routing` processor. They register themselves as targets with these exporters, and the exporters forward any incoming data to their registered targets.
Pros:
Cons:
Example code for solution
Example config for solution
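Again, the collapsed examples hold the authors' actual code and configuration. As a sketch of the idea only (the `inprocess` component name and its `exporter` option are invented for illustration), connecting two pipelines with such a pair might look like this:

```yaml
exporters:
  inprocess/parsed:       # hands data over in memory, no serialization or network hop
  otlp:
    endpoint: backend:4317

receivers:
  filelog:
  inprocess/parsed:
    exporter: inprocess/parsed   # looks up its exporter pair by component ID

service:
  pipelines:
    logs/ingest:
      receivers: [filelog]
      processors: [k8sattributes]
      exporters: [inprocess/parsed]
    logs/continue:
      receivers: [inprocess/parsed]
      processors: [batch]
      exporters: [otlp]
```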
Detecting cycles at runtime
Both of our proposed solutions suffer from the problem of enabling the creation of cyclic flows of data that, when unchecked, can lead to infinite loops.
With the current architecture, this problem cannot be detected at startup time, but can be handled at runtime.
We can mark data that has been processed by one of our components —the router processor or the in-process exporter— using the context.
The key of the mark identifies the component —or checkpoint— the data is passing through, and the value is the number of times the data has passed through the checkpoint.
Components can detect cycles by checking this special value in the context and handle cases according to their configuration.
Example code
Example config
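As with the other collapsed examples, the code above is the authors' own. The following is only a minimal Go sketch of the marking idea described here (the key type and helper names are invented), counting in the `context.Context` how many times data has passed a given checkpoint:

```go
package loopguard

import (
	"context"
	"fmt"
)

// checkpointKey identifies a checkpoint (router processor or in-process
// exporter) within the context.
type checkpointKey string

// passCount returns how many times the data flowing in ctx has already
// passed the checkpoint identified by id.
func passCount(ctx context.Context, id string) int {
	if n, ok := ctx.Value(checkpointKey(id)).(int); ok {
		return n
	}
	return 0
}

// markPass records one more pass through the checkpoint.
func markPass(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, checkpointKey(id), passCount(ctx, id)+1)
}

// checkCycle is what a router processor or in-process exporter could call
// before forwarding data to its next consumer; the limit would come from
// the component's configuration.
func checkCycle(ctx context.Context, id string, limit int) (context.Context, error) {
	if passCount(ctx, id) >= limit {
		return ctx, fmt.Errorf("cycle detected: data passed checkpoint %q at least %d times", id, limit)
	}
	return markPass(ctx, id), nil
}
```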
Alternative ideas
An alternative solution for applying only some of a pipeline's processors to a specific piece of data would be to allow specifying a condition on every processor that determines whether the processor is applied to that data or whether the data is just passed on to the next stage unmodified.
The same idea is used for example in GitHub Actions.
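A hypothetical configuration sketch of this idea (the `if` key and the expression syntax are invented and not supported by the Collector):

```yaml
# Hypothetical syntax, for illustration only.
service:
  pipelines:
    logs:
      receivers: [filelog]
      processors:
        - k8sattributes
        - id: resource/add-team-label
          if: 'attributes["k8s.namespace.name"] == "payments"'   # skipped otherwise
        - batch
      exporters: [otlp]
```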
We did not explore this further because, with our use cases, it suffers from some of the same problems mentioned for the `filter` processor. If you have any additional ideas that you feel would support our use cases while fitting the OpenTelemetry Collector better, please let us know!
Motivating examples
Here are some examples that articulate why pipeline branching is a useful and sometimes unavoidable tool that should be part of the Collector.
If you recognize another use case that would be enabled by or benefit from pipeline branching, please reach out to us so that we can add it to this collection — especially if it's related to metrics or tracing.
Better support for logs
Logs are the newest addition to the types of data OpenTelemetry can handle and thus they are not as well supported as traces or metrics.
Code donated by the Stanza project in opentelemetry-log-collection gave a huge boost to the number of receivers available for ingesting logs, but log forwarding and more importantly, log processing still has a long way to go.
Strong processing capabilities are especially important with logs since they are highly heterogeneous with varying levels of structure.
While newer applications seem to have embraced the move toward structured logging, most older applications still emit logs that can only be ingested as a blob of plain text data with almost no structured metadata attached.
That blob of text almost always contains a plethora of useful information waiting to be extracted; however, industry standards for log line syntax abound, and many developers still just roll their own syntax.
This means that advanced processing capabilities might be needed to process and transform logs into one standard format.
This processing might be handled by a third-party service, but the OpenTelemetry Collector already sits in the perfect place(s) and has the necessary infrastructure for processing logs; it only lacks some tools.
If you take a look at the features available for transforming and parsing logs in the Stanza codebase mentioned previously, you can see that there are many.
The problem is that these cannot be mixed with the OpenTelemetry Collector's processors, since Stanza's operators are only available inside the receiver components that are implemented by them.
The main barrier for reimplementing Stanza's operators as OpenTelemetry processors seems to be that OTel's pipeline architecture is missing some features exploited by these operators.
Reimplementing the Logging Operator using OTel Collector
Lately, we've been looking into replacing FluentBit and Fluentd with the OpenTelemetry Collector inside Logging Operator.
To achieve this, we need to examine how features of Fluent(Bit|d) translate to features of the OTel Collector.
One of the main features of the Logging Operator is that it can route logs based on labels and each route can have its own transformation pipeline.
When we tried to replicate the logic behind this feature in the OTel Collector with the available tools, we ran into an obstacle.
Take a look at the following picture. The main difference is how the different log flows are treated by the tools.
Logging Operator:
OTel Collector:
As you can see, the Logging Operator routes logs after augmenting them with some additional metadata retrieved from the Kubernetes cluster (much like the `k8sattributes` processor does) and then applies some user-defined transformations. With the OTel Collector, however, routing can only happen just before the end of the pipeline, thus allowing no more transformation to take place after routing.
If you take a closer look at the code from the Stanza project, you can see that it also has a routing facility, which has even more advanced capabilities for matching records and can route them to any operator in the Stanza pipeline.
However, this cannot be used as a routing solution in our case: it can only run as part of a receiver component, but Kubernetes metadata addition, which needs to happen before routing, can happen at the earliest in the first processor of an OTel Collector pipeline, which only runs after the receiver.
As you can see, it's a chicken-and-egg problem.