Getting Started with KafkaHost
This framework helps you build Kafka consumer applications easily. It is built on the Akka.Streams.Kafka (https://github.com/akkadotnet/Akka.Streams.Kafka) library, which provides the following features:
- No constant polling of Kafka topics: messages are consumed on demand, with back-pressure support
- No internal buffering: consumed messages are passed downstream in real time, and producer stages publish messages to Kafka as soon as they receive them from upstream
- Each stage can use its own IConsumer or IProducer instance, or stages can share them (useful as an optimization)
- All Kafka failures can be handled with the usual stream error-handling strategies
Run the following command in the NuGet Package Manager Console to install the Hubtel.Kafka.Host package in a .NET project:
Install-Package Hubtel.Kafka.Host
Setting up appsettings.json
The JSON configuration below is for a .NET application that uses the Hubtel.Kafka.Host package. It contains three sections: KafkaConsumerConfig, KafkaProducerConfig, and ApplicationInsights.
- KafkaConsumerConfig: configures the Kafka consumer.
  - BootstrapServers: the address of the Kafka server(s). Here it is set to a local Kafka server on port 9092.
  - GroupId: the consumer group ID. Here it is set to "myGroup".
  - Topics: the topics the consumer subscribes to. Here it is an array containing a single topic, "topic1".
  - ExtraProperties: additional configuration options. Here it includes auto.offset.reset, which determines what the consumer does when there is no initial offset in Kafka or the current offset no longer exists. It is set to "earliest", so the consumer starts reading from the earliest available offset in the topic.
- KafkaProducerConfig: configures the Kafka producer.
  - BootstrapServers: as in the consumer configuration, the address of the Kafka server(s). It is also set to a local Kafka server on port 9092.
- ApplicationInsights: configures Application Insights, a service that helps developers monitor their application's performance and usage.
  - InstrumentationKey: the unique identifier for the Application Insights resource. Here it is set to the placeholder "ttt".
Ensure that the "Copy To Output Directory" property of your appsettings.json file is set to "Copy always".
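If you prefer to edit the project file directly rather than use the Visual Studio Properties window, the same setting can be expressed in the .csproj. This is a minimal sketch that assumes a console project using the default Microsoft.NET.Sdk, where appsettings.json is a None item; under other SDKs (for example the Web or Worker SDK) the item type may be Content instead.

```xml
<ItemGroup>
  <!-- Copy appsettings.json to the build output on every build -->
  <None Update="appsettings.json">
    <CopyToOutputDirectory>Always</CopyToOutputDirectory>
  </None>
</ItemGroup>
```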
Place these entries in appsettings.json. Also ensure your own instrumentation key is included.
"KafkaConsumerConfig": {
  "BootstrapServers": "localhost:9092",
  "GroupId": "myGroup",
  "Topics": ["topic1"],
  "ExtraProperties": {
    "auto.offset.reset": "earliest" // or "latest"
  }
},
"KafkaProducerConfig": {
  "BootstrapServers": "localhost:9092"
},
"ApplicationInsights": {
  "InstrumentationKey": "ttt"
},
Building a consumer
To build a simple consumer, create an empty .NET Core (preferably 3.1 or higher) Console Application and include the framework library. The Program.cs setup consists of the following parts:
- CreateKafkaBuilder(args).Build().Run(); creates, builds, and runs the application's host. It calls the CreateKafkaBuilder method, which returns an IHostBuilder object, then calls Build on that object and Run on the resulting IHost.
- public static IHostBuilder CreateKafkaBuilder(string[] args) => ... defines the CreateKafkaBuilder method, which creates and configures an IHostBuilder object. The IHostBuilder interface is used to configure and build an IHost, which hosts the application's resources.
- KafkaHost.CreateDefaultBuilder(args) creates a default IHostBuilder using the CreateDefaultBuilder method provided by the KafkaHost class from the Hubtel.Kafka.Host package.
- .ConfigureServices((hostContext, services) => {...}) configures the application's services, using a lambda expression to add and configure them.
- services.Configure<KafkaConsumerConfig>(hostContext.Configuration.GetSection(nameof(KafkaConsumerConfig))); and services.Configure<KafkaProducerConfig>(hostContext.Configuration.GetSection(nameof(KafkaProducerConfig))); configure the Kafka consumer and producer using settings from the application's configuration.
- services.AddApplicationInsightsTelemetryWorkerService(hostContext.Configuration["ApplicationInsights:InstrumentationKey"]) and services.AddApplicationInsightsTelemtryHubtel(hostContext.Configuration["ApplicationInsights:InstrumentationKey"]) add Application Insights services to the application.
Set up your Program.cs as follows:
public static void Main(string[] args)
{
    CreateKafkaBuilder(args).Build().Run();
}

public static IHostBuilder CreateKafkaBuilder(string[] args) =>
    KafkaHost.CreateDefaultBuilder(args)
        .ConfigureServices((hostContext, services) =>
        {
            // mandatory for now :)
            services.Configure<KafkaConsumerConfig>(hostContext.Configuration.GetSection(nameof(KafkaConsumerConfig)));
            services.Configure<KafkaProducerConfig>(hostContext.Configuration.GetSection(nameof(KafkaProducerConfig)));
            // Application Insights is mandatory by default
            services.AddApplicationInsightsTelemetryWorkerService(hostContext.Configuration["ApplicationInsights:InstrumentationKey"]);
            services.AddApplicationInsightsTelemtryHubtel(hostContext.Configuration["ApplicationInsights:InstrumentationKey"]);
            // rest of service registry
        });
Create your consumer as a normal public class, as shown below. Ensure the class name ends with the suffix "Consumer", just as an MVC controller class name ends with "Controller":
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
// plus the namespace(s) exposed by the Hubtel.Kafka.Host package

public class SampleConsumer : KafkaConsumerBase
{
    private readonly ILogger<SampleConsumer> logger;

    public SampleConsumer(ILogger<SampleConsumer> logger)
    {
        this.logger = logger;
    }

    // option 1
    // KafkaHost hands over an instance of ConsumeResult<Null, string> whenever there is data on one of your listed topics
    [ConsumeTopic("hubtel.sales.service_rendered,another_topic,yet_another_topic")]
    public async Task HandleServiceRendered(ConsumeResult<Null, string> message)
    {
        logger.LogInformation($"received from Thread: {Thread.CurrentThread.ManagedThreadId} =>{message.Message.Value}");
        await Task.Delay(0);
    }

    // option 2
    // KafkaHost gives you the raw Kafka payload
    /*
    [ConsumeTopic("hubtel.sales.service_rendered,another_topic,yet_another_topic")]
    public async Task HandleServiceRendered(string rawPayload)
    {
        logger.LogInformation($"received from Thread: {Thread.CurrentThread.ManagedThreadId} =>{rawPayload}");
    }
    */

    // option 3
    // FromType should always be an IOptions<T>
    // KafkaHost will deserialize and inject an instance of your specified type (in this case, "ServiceRenderedKafkaMessage") into your method
    // this time, we tell KafkaHost to read the topic list from the property "TopicsAsSingleString" of the options-bound type "KafkaConsumerConfig"
    /*
    [ConsumeTopic(FromType = typeof(IOptions<KafkaConsumerConfig>), PropertyName = "TopicsAsSingleString")]
    public async Task HandleServiceRendered(ServiceRenderedKafkaMessage rawPayload)
    {
        logger.LogInformation($"received from Thread: {Thread.CurrentThread.ManagedThreadId} =>{rawPayload}");
    }
    */

    /* option 4
    parameterless methods are now allowed
    we've introduced the concept of "Context"
    [ConsumeTopic(FromType = typeof(IOptions<KafkaConsumerConfig>), PropertyName = "TopicsAsSingleString")]
    public async Task HandleServiceRendered()
    {
        foreach (var item in Context.MessageBag) // MessageBag is a List of ConsumeResult<Null, string> objects
        {
            logger.LogDebug($"received {item.Message.Value}");
        }
        await Task.Delay(0);
    }
    */

    public override Task ConsumingStopped()
    {
        // you may want to do some cleanup here before the Ctrl+C shutdown completes
        logger.LogWarning("consuming stopped");
        return Task.CompletedTask;
    }
}
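Option 3 above refers to a ServiceRenderedKafkaMessage type that this guide does not define. The sketch below shows what such a type might look like and how a JSON payload maps onto it. The property names (OrderId, Amount), the PayloadSketch helper, and the sample payload are all hypothetical, and System.Text.Json is used only for illustration; this guide does not say which deserializer KafkaHost uses internally.

```csharp
using System;
using System.Text.Json;

// Hypothetical message shape: the real fields of ServiceRenderedKafkaMessage
// are not given in this guide.
public class ServiceRenderedKafkaMessage
{
    public string OrderId { get; set; }
    public decimal Amount { get; set; }
}

// Hypothetical helper, not part of Hubtel.Kafka.Host.
public static class PayloadSketch
{
    // Maps a raw JSON payload onto the message type, ignoring property-name casing.
    public static ServiceRenderedKafkaMessage Parse(string rawPayload)
    {
        var options = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
        return JsonSerializer.Deserialize<ServiceRenderedKafkaMessage>(rawPayload, options);
    }

    public static void Main()
    {
        var message = Parse("{\"orderId\":\"A-100\",\"amount\":12.5}");
        Console.WriteLine($"{message.OrderId} {message.Amount}");
    }
}
```

With a type like this in place, the option 3 handler receives an already-populated instance instead of the raw string.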
Producing to Kafka
To produce to Kafka, inject "IKafkaProducerService" into your class and invoke the methods on it. Before doing so, register a producer agent with AddKafkaProducerAgent, a method provided by the Hubtel.Kafka.Host package. The producer agent is the object that sends messages to a Kafka server. The method takes a single string argument: the URL of the Kafka server(s) the producer agent should send messages to. Rather than hard-coding this value, you can read it from the application's configuration using Configuration["KafkaProducerConfig:BootstrapServers"]. Register the producer instance as follows:
services.AddKafkaProducerAgent("bootstrap server URLs here"); // you can pick up the URLs from Configuration["KafkaProducerConfig:BootstrapServers"]