
Getting Started with KafkaHost

This framework helps you build Kafka consumer applications. It is built on the Akka.Streams.Kafka library (https://github.com/akkadotnet/Akka.Streams.Kafka), which provides the following features:

  • There is no constant polling of Kafka topics: messages are consumed on demand, with back-pressure support
  • There is no internal buffering: consumed messages are passed downstream in real time, and producer stages publish messages to Kafka as soon as they receive them from upstream
  • Each stage can use its own IConsumer or IProducer instance, or stages can share instances (useful as an optimization)
  • All Kafka failures can be handled with the usual stream error-handling strategies

Install-Package Hubtel.Kafka.Host

Run the following command in the NuGet Package Manager Console to install the Hubtel.Kafka.Host package in a .NET project.

C#
    Install-Package Hubtel.Kafka.Host

Setting up appsettings.json

The JSON snippet at the end of this section is a configuration block for a .NET application that uses the Hubtel.Kafka.Host package. It contains three sections: KafkaConsumerConfig, KafkaProducerConfig, and ApplicationInsights.

  • KafkaConsumerConfig: This section configures the Kafka consumer.

    • BootstrapServers: This property specifies the address of the Kafka server(s). In this case, it's set to a local Kafka server running on port 9092.

    • GroupId: This property specifies the consumer group ID. Here, it's set to "myGroup".

    • Topics: This property specifies the topics that the consumer is interested in. Here, it's set to an array containing a single topic, "topic1".

    • ExtraProperties: This property specifies additional configuration options. Here, it includes auto.offset.reset, which determines what the consumer does when there is no initial offset in Kafka or the current offset no longer exists. It's set to "earliest", which means the consumer starts reading from the earliest available offset in the topic.

  • KafkaProducerConfig: This section configures the Kafka producer.

    • BootstrapServers: Like in the consumer configuration, this property specifies the address of the Kafka server(s). It's also set to a local Kafka server running on port 9092.
  • ApplicationInsights: This section configures Application Insights, a service that helps developers monitor their application's performance and usage.

    • InstrumentationKey: This property specifies the unique identifier for the Application Insights resource. Here, it's set to "ttt".
  • Set "Copy to Output Directory" for appsettings.json to "Copy always".

  • Place the entries below in appsettings.json, and make sure your Application Insights instrumentation key is included.

json

{
  "KafkaConsumerConfig": {
    "BootstrapServers": "localhost:9092",
    "GroupId": "myGroup",
    "Topics": ["topic1"],
    "ExtraProperties": {
      "auto.offset.reset": "earliest" // or "latest"
    }
  },
  "KafkaProducerConfig": {
    "BootstrapServers": "localhost:9092"
  },
  "ApplicationInsights": {
    "InstrumentationKey": "ttt"
  }
}
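
As a rough illustration of how these entries are addressed from code, the sketch below loads the same values through Microsoft.Extensions.Configuration and reads them back by colon-separated key path, the same form as the "ApplicationInsights:InstrumentationKey" lookup in Program.cs. AppSettingsSketch and its members are hypothetical names, not part of Hubtel.Kafka.Host. The sketch uses an in-memory collection in place of the real appsettings.json so that it stands alone, and it assumes the Microsoft.Extensions.Configuration package is referenced.

```csharp
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.Configuration;

// Hypothetical helper for illustration only; not part of Hubtel.Kafka.Host.
public static class AppSettingsSketch
{
    // Builds a configuration holding the same keys the JSON file would produce.
    // Nested JSON objects flatten to "Section:Key"; array items flatten to "Section:Key:Index".
    public static IConfiguration Build() =>
        new ConfigurationBuilder()
            .AddInMemoryCollection(new Dictionary<string, string>
            {
                ["KafkaConsumerConfig:BootstrapServers"] = "localhost:9092",
                ["KafkaConsumerConfig:GroupId"] = "myGroup",
                ["KafkaConsumerConfig:Topics:0"] = "topic1",
                ["KafkaConsumerConfig:ExtraProperties:auto.offset.reset"] = "earliest",
                ["KafkaProducerConfig:BootstrapServers"] = "localhost:9092",
                ["ApplicationInsights:InstrumentationKey"] = "ttt"
            })
            .Build();

    // Reads the Topics array by enumerating the children of its section.
    public static string[] Topics(IConfiguration config) =>
        config.GetSection("KafkaConsumerConfig:Topics")
              .GetChildren()
              .Select(child => child.Value)
              .ToArray();
}
```

With this in place, AppSettingsSketch.Build()["KafkaConsumerConfig:GroupId"] returns the configured group ID, and AppSettingsSketch.Topics(...) returns the configured topic names in order.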

Building a consumer

CreateKafkaBuilder(args).Build().Run();

  • This line creates, builds, and runs the application's host. It's calling the CreateKafkaBuilder method, which returns an IHostBuilder object, and then calling the Build and Run methods on that object.

public static IHostBuilder CreateKafkaBuilder(string[] args) => ...

  • This line defines the CreateKafkaBuilder method, which creates and configures an IHostBuilder object. The IHostBuilder interface is used to configure and build an IHost, which hosts the application's resources.

KafkaHost.CreateDefaultBuilder(args)

  • This line creates a default IHostBuilder using the CreateDefaultBuilder method provided by the KafkaHost class from the Hubtel.Kafka.Host package.

.ConfigureServices((hostContext, services) => {...})

  • This line configures the application's services. It's using a lambda expression to add and configure services.

  • services.Configure<KafkaConsumerConfig>(hostContext.Configuration.GetSection(nameof(KafkaConsumerConfig))); and services.Configure<KafkaProducerConfig>(hostContext.Configuration.GetSection(nameof(KafkaProducerConfig))); configure the Kafka consumer and producer using settings from the application's configuration.

  • services.AddApplicationInsightsTelemetryWorkerService(hostContext.Configuration["ApplicationInsights:InstrumentationKey"]) and services.AddApplicationInsightsTelemtryHubtel(hostContext.Configuration["ApplicationInsights:InstrumentationKey"]) add Application Insights services to the application. Application Insights is a service that helps developers monitor their application's performance and usage.

  • To build a simple consumer, create an empty .NET Core (preferably 3.1 or higher) Console Application and include the framework library. Set up your Program.cs as follows:

c#
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
// Also import the Hubtel.Kafka.Host namespaces that define KafkaHost,
// KafkaConsumerConfig, and KafkaProducerConfig.

public class Program
{
    public static void Main(string[] args)
    {
        CreateKafkaBuilder(args).Build().Run();
    }

    public static IHostBuilder CreateKafkaBuilder(string[] args) =>
        KafkaHost.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                // Mandatory for now :)
                services.Configure<KafkaConsumerConfig>(hostContext.Configuration.GetSection(nameof(KafkaConsumerConfig)));
                services.Configure<KafkaProducerConfig>(hostContext.Configuration.GetSection(nameof(KafkaProducerConfig)));

                // Application Insights is mandatory by default
                services.AddApplicationInsightsTelemetryWorkerService(hostContext.Configuration["ApplicationInsights:InstrumentationKey"]);
                services.AddApplicationInsightsTelemtryHubtel(hostContext.Configuration["ApplicationInsights:InstrumentationKey"]);

                // Rest of the service registrations
            });
}
  • Create your consumer as an ordinary public class, as shown below. Its name must end with the suffix "Consumer", much as an MVC controller's name ends with "Controller":
c#
    
    public class SampleConsumer : KafkaConsumerBase
    {
        private readonly ILogger<SampleConsumer> logger;

        public SampleConsumer(ILogger<SampleConsumer> logger)
        {
            this.logger = logger;
        }

        
        // Option 1
        // KafkaHost hands over a ConsumeResult<Null, string> whenever a message arrives on one of the topics listed in the attribute
        
        [ConsumeTopic("hubtel.sales.service_rendered,another_topic,yet_another_topic")]
        public async Task HandleServiceRendered(ConsumeResult<Null, string> message)
        {
            logger.LogInformation($"received from Thread: {Thread.CurrentThread.ManagedThreadId} =>{message.Message.Value}");
            await Task.Delay(0);            
        }

        //option 2
        //KafkaHost gives you the raw Kafka payload
       /* 
        [ConsumeTopic("hubtel.sales.service_rendered,another_topic,yet_another_topic")]
        public async Task HandleServiceRendered(string rawPayload)
        {
            logger.LogInformation($"received from Thread: {Thread.CurrentThread.ManagedThreadId} =>{rawPayload}");
                        
        }
        */

        // Option 3
        // FromType should always be an IOptions<T>
        // KafkaHost will deserialize and inject an instance of your specified type (in this case, "ServiceRenderedKafkaMessage") into your method
        // This time, KafkaHost reads the topic list from the "TopicsAsSingleString" property of the options-bound type "KafkaConsumerConfig"
       /* 
        [ConsumeTopic(FromType = typeof(IOptions<KafkaConsumerConfig>),PropertyName = "TopicsAsSingleString")] 
        public async Task HandleServiceRendered(ServiceRenderedKafkaMessage rawPayload)
        {
            logger.LogInformation($"received from Thread: {Thread.CurrentThread.ManagedThreadId} =>{rawPayload}");
                        
        }
        */

        /* Option 4
         Parameterless methods are now allowed.
         We've introduced the concept of a "Context" that carries the consumed messages.
        [ConsumeTopic(FromType = typeof(IOptions<KafkaConsumerConfig>), PropertyName = "TopicsAsSingleString")]
        public async Task HandleServiceRendered()
        {
            foreach (var item in Context.MessageBag)//MessageBag is a List of ConsumeResult<Null, string> objects
            {
                logger.LogDebug($"received {item.Message.Value}");
            }

            await Task.Delay(0);
        }
*/

        public override Task ConsumingStopped()
        {
            // You may want to do some cleanup here before shutdown (for example, after Ctrl+C) completes
            logger.LogWarning("consuming stopped");
            return Task.CompletedTask;
        }

         
    }
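
Options 3 and 4 above take their topics from a "TopicsAsSingleString" property, while option 1 passes a comma-separated string directly. This page does not show how that property is implemented, so the sketch below is only a guess at the relationship: it assumes the property joins the configured Topics with commas, and it shows how such a string can be split back into topic names. ConsumerConfigSketch and TopicList are hypothetical stand-ins, not types from Hubtel.Kafka.Host.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the package's KafkaConsumerConfig (illustration only).
public class ConsumerConfigSketch
{
    public List<string> Topics { get; set; } = new List<string>();

    // Assumption: the topics joined into the comma-separated form
    // that [ConsumeTopic("a,b,c")] accepts.
    public string TopicsAsSingleString => string.Join(",", Topics);
}

// Hypothetical helper; KafkaHost's own parsing is not documented on this page.
public static class TopicList
{
    // Splits a comma-separated topic string into trimmed, non-empty topic names.
    public static string[] Parse(string topics) =>
        topics.Split(',', StringSplitOptions.RemoveEmptyEntries)
              .Select(topic => topic.Trim())
              .Where(topic => topic.Length > 0)
              .ToArray();
}
```

Under these assumptions, keeping the topic names in appsettings.json and referring to them through the options type avoids repeating the same list in every attribute.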

Producing to Kafka

services.AddKafkaProducerAgent("bootstrap server URLs here")

  • This line is adding a Kafka producer agent to the application's services. This is a method provided by the Hubtel.Kafka.Host package that sets up a Kafka producer agent, which is an object that can send messages to a Kafka server.

  • The AddKafkaProducerAgent method takes a string argument, which is the URL of the Kafka server(s) that the producer agent should send messages to. In this case, it's a placeholder string "bootstrap server URLs here".

  • The comment at the end of the line suggests that you can retrieve the Kafka server URLs from the application's configuration using Configuration["KafkaProducerConfig:BootstrapServers"]. This would replace the placeholder string in the method call.

  • To produce to Kafka, register a producer agent as shown below, then inject "IKafkaProducerService" into your class and invoke the methods on it:

c#

services.AddKafkaProducerAgent("bootstrap server URLs here"); // you can pick up the URLs from Configuration["KafkaProducerConfig:BootstrapServers"]
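
Putting the two steps together, a minimal sketch might look like the following. It assumes the project references Hubtel.Kafka.Host and that AddKafkaProducerAgent is called inside ConfigureServices, where hostContext.Configuration is available. ProducerSetup, AddProducer, and ReceiptPublisher are hypothetical names chosen for illustration. The methods exposed by IKafkaProducerService are not listed on this page, so none are called here.

```csharp
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
// Also import the Hubtel.Kafka.Host namespaces that define
// AddKafkaProducerAgent and IKafkaProducerService.

// Hypothetical helper: registers the producer agent using the configured servers.
public static class ProducerSetup
{
    public static IHostBuilder AddProducer(this IHostBuilder builder) =>
        builder.ConfigureServices((hostContext, services) =>
        {
            // Read the bootstrap servers from appsettings.json instead of hard-coding them.
            services.AddKafkaProducerAgent(
                hostContext.Configuration["KafkaProducerConfig:BootstrapServers"]);
        });
}

// Hypothetical class that receives the producer service through constructor injection.
public class ReceiptPublisher
{
    private readonly IKafkaProducerService producer;

    public ReceiptPublisher(IKafkaProducerService producer)
    {
        this.producer = producer;
    }

    // Call the publishing methods that IKafkaProducerService exposes from here.
}
```

Reading the servers from configuration keeps the producer and consumer pointed at the same cluster settings without duplicating the address in code.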
