Getting started with OpenTelemetry and distributed tracing in Kafka Consumers
OpenTelemetry is a collection of APIs, SDKs, tools, and integrations for creating and managing telemetry data such as traces, metrics, and logs. Let's get familiar with a few OpenTelemetry (OTel) concepts:
Spans and Activities
A span is the building block of a trace. It has a unique identifier and represents a single unit of work in the distributed system. Multiple spans are pieced together to form a trace, which is often viewed as a “tree” of spans that reflects when each span started and completed.
In .NET a span is represented by an Activity.
The OpenTelemetry client for .NET reuses the existing Activity and related classes from System.Diagnostics to represent an OpenTelemetry span. This means applications and libraries can emit OpenTelemetry-compatible traces using only the .NET runtime.
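Because an Activity is the span, the parent/child tree described above can be observed with nothing but System.Diagnostics. The sketch below is a minimal example assuming .NET 6 or later; the source name Demo.Spans is hypothetical, and the ActivityListener is a stand-in for the OpenTelemetry SDK, which is what normally listens to and samples activities.

```csharp
using System;
using System.Diagnostics;

public static class SpanTreeDemo
{
    // Hypothetical source name, used only for this example.
    private static readonly ActivitySource Source = new("Demo.Spans");

    // Starts a parent activity with a nested child and returns the ids that show
    // how the two spans form a tree inside a single trace.
    public static (string ParentTraceId, string ChildTraceId, string ParentSpanId, string ChildParentSpanId) Run()
    {
        // Without a listener, StartActivity returns null; this one records everything.
        using var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == Source.Name,
            Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
                ActivitySamplingResult.AllDataAndRecorded,
        };
        ActivitySource.AddActivityListener(listener);

        using var parent = Source.StartActivity("parent")!;
        // "parent" is now Activity.Current, so "child" automatically becomes its child.
        using var child = Source.StartActivity("child")!;

        return (parent.TraceId.ToHexString(), child.TraceId.ToHexString(),
                parent.SpanId.ToHexString(), child.ParentSpanId.ToHexString());
    }

    public static void Main()
    {
        var ids = Run();
        Console.WriteLine($"trace {ids.ParentTraceId}");
        Console.WriteLine($"parent span {ids.ParentSpanId}, child's parent {ids.ChildParentSpanId}");
    }
}
```

Both activities share one trace ID, and the child's ParentSpanId equals the parent's SpanId, which is the edge a tracing backend draws when it renders the tree.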
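To create a span in .NET, first create an ActivitySource, the factory that activities are started from (typically one static instance per component):
private static readonly ActivitySource Source = new(nameof(OrganizerApiConsumer));
Then call StartActivity to start the span. The span is stopped, and its duration recorded, when the using block ends, and any activity started inside the block becomes its child. StartActivity returns null when nothing (such as the OpenTelemetry SDK) is listening to the source, so access the result with the null-conditional operator (activity?.). Here parentContext is the context extracted by a propagator, as described in the next section:
using (var activity = Source.StartActivity("Process Message", ActivityKind.Consumer, parentContext.ActivityContext)) { }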
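The sketch below isolates the two behaviours the using pattern relies on: StartActivity returns null while nothing listens to the source (hence activity?.), and an explicit parent ActivityContext places the new Consumer span inside the remote trace. It assumes .NET 6 or later; the source name Demo.Consumer is hypothetical, and the random remote context stands in for one a propagator would extract from message headers.

```csharp
using System;
using System.Diagnostics;

public static class StartActivityDemo
{
    // Hypothetical source name, used only for this example.
    private static readonly ActivitySource Source = new("Demo.Consumer");

    public static (bool NullWithoutListener, bool SameTrace, bool ParentIsRemoteSpan, ActivityKind Kind) Run()
    {
        // Stand-in for a context extracted from an incoming message (random ids here).
        var remoteParent = new ActivityContext(
            ActivityTraceId.CreateRandom(), ActivitySpanId.CreateRandom(),
            ActivityTraceFlags.Recorded, isRemote: true);

        // Nothing listens to the source yet, so no activity is created.
        bool nullWithoutListener =
            Source.StartActivity("Process Message", ActivityKind.Consumer, remoteParent) is null;

        // Stand-in for the OpenTelemetry SDK: listen to this source and record everything.
        using var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == Source.Name,
            Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
                ActivitySamplingResult.AllDataAndRecorded,
        };
        ActivitySource.AddActivityListener(listener);

        using (var activity = Source.StartActivity("Process Message", ActivityKind.Consumer, remoteParent))
        {
            // The new span joins the remote trace, with the remote span as its parent.
            return (nullWithoutListener,
                    activity!.TraceId == remoteParent.TraceId,
                    activity.ParentSpanId == remoteParent.SpanId,
                    activity.Kind);
        } // leaving the block disposes, and therefore stops, the activity
    }

    public static void Main() => Console.WriteLine(Run());
}
```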
Propagators
A propagator allows us to extract and inject context across process boundaries.
Communication libraries that already have instrumentation (e.g. HttpClient) propagate context automatically. When the library you use has no such instrumentation, extracting and injecting the context is the responsibility of your own code, which is why the Kafka consumer in this guide extracts it manually.
To create a propagator in .NET, instantiate a TextMapPropagator implementation; TraceContextPropagator implements the W3C Trace Context format (the traceparent and tracestate headers):
private static readonly TextMapPropagator Propagator = new TraceContextPropagator();
Then use the Inject and Extract methods for inter-process trace propagation.
The “Inject” method writes the activity's context (trace ID, span ID, and trace flags) into a carrier, for example the headers of an HTTP request:
Propagator.Inject(new PropagationContext(activity.Context, Baggage.Current), props, InjectContextIntoHeader);
The 'Hubtel.Producer.Sdk' already handles injection for us, so we do not need to call Inject ourselves.
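Even though the producer SDK performs injection for us, it helps to see what ends up in the headers. The sketch below, assuming the OpenTelemetry.Api package, injects a context into a Dictionary<string, string> that stands in for message headers. TraceContextPropagator writes a single traceparent entry of the form version-traceid-spanid-flags (plus tracestate when the context carries one); the random context is illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using OpenTelemetry;
using OpenTelemetry.Context.Propagation;

public static class InjectDemo
{
    private static readonly TextMapPropagator Propagator = new TraceContextPropagator();

    // Writes the W3C trace context of `context` into a string dictionary that
    // plays the role of HTTP or message headers.
    public static Dictionary<string, string> Inject(ActivityContext context)
    {
        var headers = new Dictionary<string, string>();
        Propagator.Inject(
            new PropagationContext(context, Baggage.Current),
            headers,
            (carrier, key, value) => carrier[key] = value); // setter: store one header
        return headers;
    }

    public static void Main()
    {
        var context = new ActivityContext(
            ActivityTraceId.CreateRandom(), ActivitySpanId.CreateRandom(), ActivityTraceFlags.Recorded);
        foreach (var (key, value) in Inject(context))
        {
            Console.WriteLine($"{key}: {value}");
        }
    }
}
```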
And the “Extract” method extracts the value from an incoming request. For example, from the headers of an HTTP request.
If a value cannot be parsed from the carrier, the propagator should not throw an exception and should not store a new value in the context, so that any previously existing valid value is preserved.
var parentContext = Propagator.Extract(default, ea.BasicProperties, ExtractTraceContextFromBasicProperties);
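The extract side can be sketched the same way, again assuming the OpenTelemetry.Api package and a Dictionary<string, string> in place of real headers. The getter returns the header value for a key, or nothing when the key is absent. The valid traceparent below is the example value from the W3C Trace Context specification; the malformed one shows the behaviour described above: no exception is thrown and the empty context passed in is returned unchanged.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using OpenTelemetry.Context.Propagation;

public static class ExtractDemo
{
    private static readonly TextMapPropagator Propagator = new TraceContextPropagator();

    // Getter: returns the value stored under `key`, or no values if it is missing.
    private static IEnumerable<string> Getter(IDictionary<string, string> carrier, string key) =>
        carrier.TryGetValue(key, out var value) ? new[] { value } : Array.Empty<string>();

    // Starts from an empty context (default) and fills it from the headers if possible.
    public static ActivityContext Extract(IDictionary<string, string> headers) =>
        Propagator.Extract(default, headers, Getter).ActivityContext;

    public static void Main()
    {
        var valid = new Dictionary<string, string>
        {
            ["traceparent"] = "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01",
        };
        var context = Extract(valid);
        Console.WriteLine($"{context.TraceId} / {context.SpanId} remote={context.IsRemote}");

        var malformed = new Dictionary<string, string> { ["traceparent"] = "not-a-traceparent" };
        // No exception: the context simply stays empty.
        Console.WriteLine(Extract(malformed) == default);
    }
}
```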
Attributes
Attributes are key:value pairs that provide additional information to a trace.
In .NET those are called Tags. We can add an attribute into an Activity like this:
activity?.SetTag("messaging.system", "kafka");
activity?.SetTag("messaging.destination_kind", "topic");
activity?.SetTag("messaging.kafka.queue", "sample");
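A small sketch of how tags behave on an Activity, assuming .NET 6 or later: SetTag adds a tag or replaces the value of an existing one, setting a tag to null removes it, and GetTagItem reads a value back. The listener and the source name Demo.Tags are hypothetical stand-ins so that StartActivity returns a real activity; the tag keys are the ones used above.

```csharp
using System;
using System.Diagnostics;

public static class TagsDemo
{
    // Hypothetical source name, used only for this example.
    private static readonly ActivitySource Source = new("Demo.Tags");

    public static (object? MessagingSystem, object? DestinationKind, object? Queue) Run()
    {
        // A listener is required, otherwise StartActivity returns null.
        using var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == Source.Name,
            Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
                ActivitySamplingResult.AllDataAndRecorded,
        };
        ActivitySource.AddActivityListener(listener);

        using var activity = Source.StartActivity("Process Message", ActivityKind.Consumer)!;
        activity.SetTag("messaging.system", "kafka");
        activity.SetTag("messaging.destination_kind", "queue");
        activity.SetTag("messaging.destination_kind", "topic"); // replaces "queue"
        activity.SetTag("messaging.kafka.queue", "sample");
        activity.SetTag("messaging.kafka.queue", null);         // removes the tag

        return (activity.GetTagItem("messaging.system"),
                activity.GetTagItem("messaging.destination_kind"),
                activity.GetTagItem("messaging.kafka.queue"));
    }

    public static void Main() => Console.WriteLine(Run());
}
```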
Adding instrumentation to your Kafka consumer:
1. Install the required package
dotnet add package Hubtel.Otel.Instrumentation
2. Add to IServiceCollection in Program.cs or Startup.cs
services.AddHubtelOpenTelemetry(config);
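3. Add the configuration to appsettings.json. Port 4317 is the conventional OTLP/gRPC port (OTLP over HTTP conventionally uses 4318), so make sure these values match your collector:
"OpenTelemetryConfig": {
    "ServiceName": "Name of Project or Service",
    "Host": "localhost",
    "Port": 4317,
    "Protocol": "http",
    "EnableConsoleExporter": true
}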
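We can't see what AddHubtelOpenTelemetry registers internally. Assuming it builds on the standard OpenTelemetry .NET SDK, one rule is worth knowing: the SDK only records activities from sources registered with AddSource, so the ActivitySource your consumer uses (TelemetryConstants.EventsActivitySource in HandleMessage) must be covered by the registration. The sketch below demonstrates that rule with the public SDK directly, assuming the OpenTelemetry and OpenTelemetry.Exporter.InMemory packages; the source names are hypothetical, and the in-memory exporter takes the place of the exporters configured in appsettings.json.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using OpenTelemetry;
using OpenTelemetry.Trace;

public static class RegistrationDemo
{
    // Hypothetical source names, used only for this example.
    private static readonly ActivitySource Registered = new("Demo.Registered");
    private static readonly ActivitySource Unregistered = new("Demo.Unregistered");

    // Returns the names of the spans that reached the exporter.
    public static List<string> Run()
    {
        var exported = new List<Activity>();
        using (var tracerProvider = Sdk.CreateTracerProviderBuilder()
                   .AddSource(Registered.Name)     // only this source is listened to
                   .AddInMemoryExporter(exported)  // collects finished activities
                   .Build())
        {
            using (Registered.StartActivity("registered-span")) { }
            using (Unregistered.StartActivity("unregistered-span")) { } // null: source not registered
        }
        return exported.ConvertAll(activity => activity.DisplayName);
    }

    public static void Main() => Console.WriteLine(string.Join(", ", Run()));
}
```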
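Add this method to the class that implements 'KafkaConsumerBase'. It is the getter passed to Propagator.Extract: it reads the value of a trace header (such as traceparent) from the first message in the batch and returns nothing if the header is missing: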
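// Requires: using System.Linq; using System.Text;
private IEnumerable<string> ExtractTraceContextFromKafka(KafkaContext context, string key)
{
    try
    {
        // Only the first message's headers are read, so the whole batch shares its trace context.
        var headers = context.MessageBag.FirstOrDefault()?.Message.Headers;
        var header = headers?.FirstOrDefault(x => x.Key == key);
        if (header != null)
        {
            // Kafka header values are raw bytes; trace context headers are UTF-8 text.
            return new[] { Encoding.UTF8.GetString(header.GetValueBytes()) };
        }
    }
    catch (Exception ex)
    {
        // Never let the getter throw; log and fall through to "no value".
        _logger.LogError(ex, "Error occurred while extracting trace context from Kafka");
    }
    // No usable header: Extract keeps the context it was given (here, the empty default).
    return Enumerable.Empty<string>();
}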
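To see the getter and a producer-side injection together, here is a round trip over a Confluent.Kafka Headers collection, assuming the Confluent.Kafka and OpenTelemetry.Api packages. It is not Hubtel.Producer.Sdk's code: InjectInto is a hypothetical stand-in for what a producer would do, and Getter mirrors ExtractTraceContextFromKafka but works on a plain Headers object instead of KafkaContext. Header values are stored as UTF-8 bytes, which is why both sides encode and decode.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using Confluent.Kafka;
using OpenTelemetry;
using OpenTelemetry.Context.Propagation;

public static class KafkaHeadersDemo
{
    private static readonly TextMapPropagator Propagator = new TraceContextPropagator();

    // Hypothetical producer side: write each propagation field as a UTF-8 Kafka header.
    public static Headers InjectInto(ActivityContext context)
    {
        var headers = new Headers();
        Propagator.Inject(new PropagationContext(context, Baggage.Current), headers,
            (h, key, value) => h.Add(key, Encoding.UTF8.GetBytes(value)));
        return headers;
    }

    // Consumer side: TryGetLastBytes returns the last header with this key, if any.
    public static IEnumerable<string> Getter(Headers headers, string key) =>
        headers.TryGetLastBytes(key, out var bytes) && bytes is not null
            ? new[] { Encoding.UTF8.GetString(bytes) }
            : Array.Empty<string>();

    public static ActivityContext ExtractFrom(Headers headers) =>
        Propagator.Extract(default, headers, Getter).ActivityContext;

    public static void Main()
    {
        var context = new ActivityContext(
            ActivityTraceId.CreateRandom(), ActivitySpanId.CreateRandom(), ActivityTraceFlags.Recorded);
        var headers = InjectInto(context);
        Console.WriteLine(Encoding.UTF8.GetString(headers.GetLastBytes("traceparent")));
        Console.WriteLine(ExtractFrom(headers).TraceId == context.TraceId);
    }
}
```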
In your 'HandleMessage' method, extract the trace context from the message headers and start an activity for each message:
// Requires: using System.Diagnostics; using Microsoft.Extensions.DependencyInjection; using OpenTelemetry.Trace;
[ConsumeTopic(FromType = typeof(IOptions<KafkaConsumerConfig>),
    PropertyName = nameof(KafkaConsumerConfig.TopicsAsSingleString))]
public async Task HandleMessage(List<OrganizerPortalConsumerPayload> messages)
{
    // Read traceparent/tracestate from the consumed batch using ExtractTraceContextFromKafka.
    var parentContext = Propagator.Extract(default, Context, ExtractTraceContextFromKafka);

    foreach (var message in messages)
    {
        // One span per message; Consumer kind marks it as processing a received message.
        using var activity = TelemetryConstants.EventsActivitySource.StartActivity(
            $"{nameof(OrganizerApiConsumer)}.{nameof(HandleMessage)}",
            ActivityKind.Consumer,
            parentContext.ActivityContext);
        try
        {
            activity?.AddTag("current.id", message.Id);
            using var scope = _serviceProvider.CreateScope();
            var service = scope.ServiceProvider.GetRequiredService<ICacheService>();
            await service.AddItemToCache(message.Payload, message.Key, message.Id);
        }
        catch (Exception ex)
        {
            // Record the failure on the span, log it, and continue with the next message.
            activity?.RecordException(ex);
            activity?.SetStatus(Status.Error);
            _logger.LogError(ex, "Error occurred while processing organizer with ID: {OrganizerId}", message.Id);
        }
    }
}
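The above code does the following:
- Extracts the trace context from the headers of the incoming Kafka messages. ExtractTraceContextFromKafka reads only the first message, so every message in the batch shares that context.
- Starts a telemetry activity for processing each message.
- Adds a tag (current.id) to the activity, identifying which message (message.Id) is being processed.
- If an error occurs, records the exception on the activity, marks the activity as failed, and logs the error.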
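Because the extraction reads only the first message, every span in a batch ends up in that message's trace. If messages in one batch can come from different traces, one alternative (a sketch, not what the code above or the Hubtel SDK does) is to extract a context per message and attach them as links on a single batch span, roughly how the OpenTelemetry messaging semantic conventions describe batch processing. The source name Demo.Batch and the helper names are hypothetical; .NET 6 or later is assumed.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;

public static class BatchLinksDemo
{
    // Hypothetical source name, used only for this example.
    public static readonly ActivitySource Source = new("Demo.Batch");

    // Starts one Consumer span for the whole batch and links it to every
    // message's extracted context; messages without a context are skipped.
    public static Activity? StartBatchActivity(IEnumerable<ActivityContext> messageContexts) =>
        Source.StartActivity(
            "process batch",
            ActivityKind.Consumer,
            parentContext: default,
            links: messageContexts
                .Where(context => context != default)
                .Select(context => new ActivityLink(context))
                .ToList());

    // Stand-in for the OpenTelemetry SDK, so StartActivity returns an activity.
    public static ActivityListener Listen()
    {
        var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == Source.Name,
            Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
                ActivitySamplingResult.AllDataAndRecorded,
        };
        ActivitySource.AddActivityListener(listener);
        return listener;
    }

    public static void Main()
    {
        using var listener = Listen();
        var first = new ActivityContext(ActivityTraceId.CreateRandom(), ActivitySpanId.CreateRandom(),
            ActivityTraceFlags.Recorded, isRemote: true);
        var second = new ActivityContext(ActivityTraceId.CreateRandom(), ActivitySpanId.CreateRandom(),
            ActivityTraceFlags.Recorded, isRemote: true);
        // default(ActivityContext) represents a message whose headers carried no trace context.
        using var batch = StartBatchActivity(new[] { first, default(ActivityContext), second });
        foreach (var link in batch!.Links)
        {
            Console.WriteLine($"linked to trace {link.Context.TraceId}");
        }
    }
}
```

The trade-off is that the batch span no longer has a single remote parent; instead, each producer trace can navigate to it through its link.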