
Getting Started with Kafka Producer

The Kafka producer is the Apache Kafka component that sends data to Kafka topics, where it is stored for later consumption by Kafka consumers. This SDK helps you produce messages to Kafka.

Installation

Run the command below in the NuGet Package Manager Console to install the Hubtel.Kafka.Producer.Sdk package in a .NET project.

powershell
Install-Package Hubtel.Kafka.Producer.Sdk

Setup in appsettings.json

The "KafkaProducerConfig" block contains one property:

  • "Hosts": An array of objects, each describing a Kafka server (also known as a Kafka broker) that the producer can connect to. Each object in the "Hosts" array has two properties:

      • "BootstrapServers": The address of the Kafka server. In this example, both entries point to localhost (the same machine as the application) on port 9092, the default Kafka port.

      • "Alias": A human-readable name for the Kafka server. It is used to select a server when creating a producer and is useful for identifying the server in logs or error messages. In this example, one server is named "ecommerce-kafka" and the other "general-kafka".

Place these entries in appsettings.json:

json
  "KafkaProducerConfig": {
    "Hosts": [
      {
        "BootstrapServers": "localhost:9092",
        "Alias": "ecommerce-kafka"
      },
      {
        "BootstrapServers": "localhost:9092",
        "Alias": "general-kafka"
      }
    ]
  }
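
To see how a section shaped like the one above maps onto objects, here is a minimal sketch that binds the same JSON with Microsoft.Extensions.Configuration. The classes KafkaProducerConfigSketch and KafkaHostSketch are hypothetical stand-ins written for this example; the SDK ships its own configuration type, whose exact shape is not shown in this guide. The sketch assumes the Microsoft.Extensions.Configuration.Json and Microsoft.Extensions.Configuration.Binder packages are available, as they are in an ASP.NET Core project.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using Microsoft.Extensions.Configuration;

// Hypothetical stand-ins for illustration only; not the SDK's real types.
public class KafkaHostSketch
{
    public string BootstrapServers { get; set; } = "";
    public string Alias { get; set; } = "";
}

public class KafkaProducerConfigSketch
{
    public List<KafkaHostSketch> Hosts { get; set; } = new List<KafkaHostSketch>();
}

public static class BindingSketch
{
    // Same content as the appsettings.json fragment, inlined so the sketch is self-contained.
    private const string Json = @"{
      ""KafkaProducerConfig"": {
        ""Hosts"": [
          { ""BootstrapServers"": ""localhost:9092"", ""Alias"": ""ecommerce-kafka"" },
          { ""BootstrapServers"": ""localhost:9092"", ""Alias"": ""general-kafka"" }
        ]
      }
    }";

    public static KafkaProducerConfigSketch Load()
    {
        IConfiguration configuration = new ConfigurationBuilder()
            .AddJsonStream(new MemoryStream(Encoding.UTF8.GetBytes(Json)))
            .Build();

        // Bind copies the section's values onto the object, one list item per "Hosts" entry.
        var config = new KafkaProducerConfigSketch();
        configuration.GetSection("KafkaProducerConfig").Bind(config);
        return config;
    }

    public static void Main()
    {
        foreach (var host in Load().Hosts)
        {
            Console.WriteLine($"{host.Alias} -> {host.BootstrapServers}");
        }
    }
}
```

Running Main lists each alias next to its broker address, which is a quick way to check that the section name and property names in appsettings.json are spelled the way the binder expects.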

Register the service in Startup.cs

services.AddHubtelKafkaProducerSdk(c => Configuration.GetSection(nameof(KafkaProducerConfig)).Bind(c));

  • This line adds the Hubtel Kafka Producer SDK to the application's service collection. AddHubtelKafkaProducerSdk is an extension method from the Hubtel.Kafka.Producer.Sdk package that registers the services the SDK needs, such as the Kafka producers and their dependencies.

  • The argument to AddHubtelKafkaProducerSdk is a lambda expression that configures the SDK. It calls GetSection on the Configuration object to retrieve the KafkaProducerConfig section from the application's configuration (usually appsettings.json). The nameof operator yields the string "KafkaProducerConfig" from the type name, so the section name is checked at compile time instead of being typed as a string literal.

  • Bind is then called on that section to copy its values onto the SDK's configuration object (c in the lambda expression). As a result, the SDK is configured from the KafkaProducerConfig section of the application's configuration.

In Startup.cs, the registration looks like this:

csharp
   services.AddHubtelKafkaProducerSdk(c => Configuration.GetSection(nameof(KafkaProducerConfig)).Bind(c));
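
Projects created from the .NET 6 and later templates have no Startup.cs; services are registered in Program.cs instead. The sketch below shows the same registration in that style. It relies only on the AddHubtelKafkaProducerSdk call shown above, applied to builder.Services. It passes the section name as a string literal so the snippet does not depend on which namespace holds KafkaProducerConfig. The using directive for the SDK's namespace is omitted because this guide does not state it, and the web project's implicit usings are assumed to be enabled.

```csharp
// Program.cs (minimal hosting model, .NET 6 or later)
// Add the using directive for the SDK's namespace here; it is not stated in this guide.

var builder = WebApplication.CreateBuilder(args);

// Bind the "KafkaProducerConfig" section onto the SDK's configuration object.
builder.Services.AddHubtelKafkaProducerSdk(
    c => builder.Configuration.GetSection("KafkaProducerConfig").Bind(c));

var app = builder.Build();
app.Run();
```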

Inject Service

private readonly IKafkaProducer _producer;

  • This line declares a private, read-only field of type IKafkaProducer named _producer. The IKafkaProducer interface exposes the methods used to produce messages to Kafka, such as ProduceAsync.

public ProduceToKafkaService(IKafkaProducerFactory producerFactory)

  • This is the constructor of the ProduceToKafkaService class. It takes an IKafkaProducerFactory as a parameter; the factory is used to create IKafkaProducer instances.

_producer = producerFactory.CreateKafkaProducer("general-kafka");

  • Inside the constructor, the factory's CreateKafkaProducer method is called with the alias "general-kafka", one of the Alias values configured in appsettings.json. The resulting IKafkaProducer is assigned to the _producer field.

public async Task ProduceToKafka(object data)

  • This public asynchronous method takes the payload to send as an object named data and produces it as a message to a Kafka topic.

_ = await _producer.ProduceAsync("hubtel.producer.test", data);

  • Inside ProduceToKafka, the producer's ProduceAsync method is called with two arguments: the topic name "hubtel.producer.test" and the data object. ProduceAsync returns a task that represents the ongoing operation; await suspends the method until that task completes, without blocking the thread. The task's result is then discarded with the _ discard.

  • Putting it together: inject IKafkaProducerFactory, use it to create an IKafkaProducer, and call ProduceAsync to produce messages.

csharp
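    using System.Threading.Tasks;

    public class ProduceToKafkaService
    {
        private readonly IKafkaProducer _producer;

        public ProduceToKafkaService(IKafkaProducerFactory producerFactory)
        {
            // Alias of the Kafka server to use, as configured under KafkaProducerConfig:Hosts.
            _producer = producerFactory.CreateKafkaProducer("general-kafka");
        }

        public async Task ProduceToKafka(object data)
        {
            // Send the payload to the topic; the returned result is discarded.
            _ = await _producer.ProduceAsync("hubtel.producer.test", data);
        }
    }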
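
For the service above to reach a caller, it also has to be registered with the dependency injection container. The following is a minimal sketch of one way to do that in an ASP.NET Core application, not something the SDK prescribes. OrderPlaced, OrdersController, the "orders" route, and the AddProduceToKafkaService helper are hypothetical names introduced for illustration. The scoped lifetime is an assumption, chosen because a scoped service can depend on a factory of any lifetime.

```csharp
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.DependencyInjection;

public static class ProduceToKafkaRegistration
{
    // Call from ConfigureServices, after AddHubtelKafkaProducerSdk.
    public static IServiceCollection AddProduceToKafkaService(this IServiceCollection services)
    {
        return services.AddScoped<ProduceToKafkaService>();
    }
}

// Hypothetical payload type, used only in this example.
public class OrderPlaced
{
    public string OrderId { get; set; } = "";
    public decimal Amount { get; set; }
}

// Hypothetical controller; the route and names are illustrative only.
[ApiController]
[Route("orders")]
public class OrdersController : ControllerBase
{
    private readonly ProduceToKafkaService _kafka;

    public OrdersController(ProduceToKafkaService kafka)
    {
        _kafka = kafka;
    }

    [HttpPost]
    public async Task<IActionResult> Post([FromBody] OrderPlaced order)
    {
        // Hand the request body to the service, which produces it to Kafka.
        await _kafka.ProduceToKafka(order);
        return Accepted();
    }
}
```

Keeping the Kafka call behind ProduceToKafkaService means the controller never touches the alias or topic name, so both can change in one place.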