Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Service Bus Messages Have No Context #2 #903

Open
bivas6 opened this issue Feb 6, 2022 · 13 comments
Open

Service Bus Messages Have No Context #2 #903

bivas6 opened this issue Feb 6, 2022 · 13 comments

Comments

@bivas6
Copy link

bivas6 commented Feb 6, 2022

Hi,

We are using AI to track our system. We have service that receive some info in REST, and post it on Azure Service Bus,
and other services consume this message.
The send/receive of the same message have different operation ID, and they looks like they have different context.

In addition, the received message have different context than other http/s requests and logs on the same flow. Every trace/dependency on the 'receive' function seems to be without any parent.

Simple app to reproduce:

sender.js:

const appInsights = require('applicationinsights');

appInsights.setup()
    .setAutoCollectConsole(true, true)
    .setSendLiveMetrics(true)
    .start();

const express = require('express');
const axios = require('axios');
const {ServiceBusClient} = require('@azure/service-bus');
const WebSocket = require('ws');
const connectionString = "*****";
const sender = new ServiceBusClient(connectionString, {webSocketOptions: { webSocket: WebSocket }}).createSender('test');
const app = express();
app.post('/sendMessage', async (_req, res) => {
    console.log('log');
    await axios.default.get('https://google.com');
    console.log('log 2');
    await sender.sendMessages({subject: 'subject', body: {key:'value'}});
    res.sendStatus(200);
});

app.listen(3080, '0.0.0.0', () => {console.log('started')});

receiver.js:

const appInsights = require('applicationinsights');

appInsights.setup().setAutoCollectConsole(true, true).setSendLiveMetrics(true).start();

const axios = require('axios');
const { ServiceBusClient } = require('@azure/service-bus');
const WebSocket = require('ws');

const connectionString = '*****';
const receiver = new ServiceBusClient(connectionString, { webSocketOptions: { webSocket: WebSocket } }).createReceiver(
  'test',
  'test-subscription',
);

const myMessageHandler = async (message) => {
  console.log(`message.body: ${JSON.stringify(message.body)}`);
  await axios.default.get('https://google.com');
  console.log('log 3');
};
const myErrorHandler = async (args) => {
  console.log(`Error occurred with ${args.entityPath} within ${args.fullyQualifiedNamespace}: `, args.error);
};
receiver.subscribe({
  processMessage: myMessageHandler,
  processError: myErrorHandler,
});

This is the current result on Azure portal:
Sender request:

image

Receiver side:

image

Thanks!
cc: @orgads

@orgads
Copy link
Contributor

orgads commented Feb 9, 2022

@hectorhdzg Did you have a chance to look into this?

@hectorhdzg
Copy link
Member

@bivas6, @orgads, Application Insights SDK will try to use the correlation context using incoming HTTP request headers, in this case receiver doesn't have any trace headers, is making a new outgoing HTTP request to Service Bus so a new context will be created, I guess you could add some correlation properties in the actual Service Bus message to be able to associate the telemetry but not much we can do here, regarding other issue where receiver telemetry is not in same context, automatically we will wrap all the telemetry in same context when there is a incoming HTTP request, in this case you don't have that so you will need to create your context and wrap all the code you want to share it, similar to what did in this PR you can also take a look at the code we share to enable this in Azure Functions

@bivas6
Copy link
Author

bivas6 commented Feb 21, 2022

For those who needs it, this is the solution we used:

const messageHandlerWrapper = (handler) => {
  return (message) => {
    const traceParent = message.applicationProperties;
    const diagnosticId = traceParent['Diagnostic-Id'];
    const [_, traceId, spanId, _a] = diagnosticId.split('-');
    const context = CorrelationContextManager.CorrelationContextManager.generateContextObject(
      traceId,
      spanId,
      'ServiceBus.process',
    );
    const fn = appInsights.wrapWithCorrelationContext(() => handler(message), context);
    await fn();
  };
};

const myMessageHandler = async (message) => {
  console.log(`message.body: ${JSON.stringify(message.body)}`);
  await axios.default.get('https://google.com');
  console.log('log 3');
};

const myErrorHandler = async (args) => {
  console.log(`Error occurred with ${args.entityPath} within ${args.fullyQualifiedNamespace}: `, args.error);
};

receiver.subscribe({
  processMessage: messageHandlerWrapper(myMessageHandler),
  processError: messageHandlerWrapper(myErrorHandler),
});

@orgads
Copy link
Contributor

orgads commented Feb 21, 2022

@hectorhdzg Is it possible for you to hook in subscribe automatically? We didn't add any data to the message, and just used applicationProperties[Diagnostic-Id].

@orgads
Copy link
Contributor

orgads commented Mar 1, 2022

@hectorhdzg ping?

@hectorhdzg
Copy link
Member

@orgads the way we hook to Azure SDKs is through @azure/core-tracing package, we do not do any patching for specific SDKs, looks like "subscribe" functionality is only present in Azure Service Bus, we are planning to introduce full support for OpenTelemetry in this SDK that should handle this scenario because Azure SDKs are also using it, we are expecting to have a beta release soon.

@orgads
Copy link
Contributor

orgads commented Mar 1, 2022

Ok, thanks a lot.

@tudor33sud
Copy link

tudor33sud commented Jun 27, 2022

Hello! I've been having the same issue as specified by @bivas6 .

The main workflow I was trying to fulfill using function apps was the following:

  1. have a sender component which will defer async work to some worker component through service bus
  2. the worker should receive service bus message, do certain calls to different microservices and fulfill the message
  3. I would like all the subcalls and logs to be in the same operation Id with minimal fuss

Option 1(didn't work):

I sent the sender traceparent inside the service bus message and generated a correlation context by using application insights exported classes like this

const traceParent = new Traceparent(serviceBusMessage.traceParent);
const correlationContext = CorrelationContextManager.generateContextObject(traceParent.traceId, traceParent.spanId, operationName);
return appInsights.wrapWithCorrelationContext(()=>{}, correlationContext)

I noted that the correlation context was properly created with this method by reusing the traceparent from the sender function, however I think there is some bug by using these classes because the end result was that a totally new context was generated inside appInsights.wrapWithCorrelationContext and the passed argument wasn't used properly. Subcalls from the receiver were also not being correlated at all and they generated new operation ids as well. I assume there might be some internals that I'm not aware off and this option won't work without some other setup which might be done inside appInsights.startOperation method.

Option 2(which works):
I also sent the sender traceparent inside the service bus message to be reused by message consumers. Then, I am taking advantage of the mutable context of the function app and do the following:

export function contextPropagatingServiceBusTrigger(serviceBusTrigger: AzureFunction) {
    return async (context: AuthenticatedContext, ...args: any[]) => {
        const operationName = `ServiceBusReceive ${context.executionContext.functionName}`;
        const serviceBusMessage = args[0];
        //use message traceparent, or fallback to default app insights behaviour to use a freshly generated one
        context.traceContext.traceparent = serviceBusMessage?.traceParent ?? context.traceContext.traceparent;
        const correlationContext = appInsights.startOperation(context, context.executionContext.functionName);

        // Wrap the Function runtime with correlationContext
        return appInsights.wrapWithCorrelationContext(async () => {
            const startTime = Date.now(); // Start trackRequest timer
            try {
                // execute the function
                const result = await serviceBusTrigger(context, ...args);
                appInsights.defaultClient.trackRequest({
                    name: operationName,
                    resultCode: 200,
                    success: true,
                    url: context.executionContext.functionName,
                    duration: Date.now() - startTime,
                    id: correlationContext.operation.parentId,
                });
                appInsights.defaultClient.flush();
                return result;
            } catch (err) {
                // we might not need this try/catch, but it's good to have certain syntax errors which will be caught as well
                appInsights.defaultClient.trackException({
                    exception: err,
                });
                appInsights.defaultClient.flush();
            }

        }, correlationContext)();
    };
}

With this option, I am getting the same operationId correctly across the process initiator(sender), receiver + dependency calls.

My question now is if this approach is acceptable, since I wasn't able to find another way of accomplishing the correlation. I was inspired by insights docs for http triggers to reach this way of correlating services communication. Thanks a lot in advance, and I'm looking forward to hearing from you!

@tudor33sud
Copy link

@hectorhdzg If you get a chance, I would be greatful to hear about this issue from you.

@johnib
Copy link

johnib commented Jun 30, 2022

Check this out:

#973

@hectorhdzg
Copy link
Member

@tudor33sud both options look fine to me, I can look into the issue you are having with option 1, that should work too, can you provide some test traceParent that you are passing in service bus message?

@tudor33sud
Copy link

@hectorhdzg I am currently running our workloads exclusively on FunctionApps so it's pretty simple to pass a traceparent.

Assuming a typical AzureFunction , we do have the Context object which is accessible via context.traceContext.traceparent , so whatever the Function App will generate for that particular handler, we will just make sure to send it in the service bus payload like : {...customProperties, traceparent: context.traceContext.traceparent}.

For the purpose of testing, you could use any valid traceparent and you should see that in option 1 that it's not being used. I suspect startOperation does something more than just generating the context somehow, or there might be some cls context mapping situation.

@haodeon
Copy link

haodeon commented Mar 3, 2023

I've been looking at getting correlation working correctly for service bus triggers in our environment.

It appears using the startOperation string overload is the intended way to support non-http trigger functions.

I found this information from previous issues and pull requests: #665 #672 #681 #682

Hope this helps anyone else digging for the same info.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

6 participants