Getting Started with Kafka Prducer
Kafka Producer is a key component in Apache Kafka that sends data to Kafka topics for storage and later consumption by Kafka Consumers.In essence,this sdk helps you to produce to kafka.
Installation
The command below is for installing the Hubtel.Kafka.Producer.Sdk
package in a .NET project using the NuGet Package Manager Console.
Install-Package Hubtel.Kafka.Producer.Sdk
Setup in appsettings.json
The "KafkaProducerConfig"
block contains one property:
"Hosts"
: This property is an array of objects, each representing a Kafka server (also known as a Kafka broker) that the Kafka producer can connect to. Each object in the "Hosts" array has two properties:"BootstrapServers"
: This property specifies the address of the Kafka server. In this case, both servers are running on localhost (the same machine as the application) on port 9092, which is the default port for Kafka."Alias"
: This property provides a human-readable alias for the Kafka server. This can be useful for identifying the server in logs or error messages. In this case, one server is identified as "ecommerce-kafka" and the other as "general-kafka".Place these entries in appsettings.json
"KafkaProducerConfig": {
"Hosts": [
{
"BootstrapServers": "localhost:9092",
"Alias": "ecommerce-kafka"
},
{
"BootstrapServers": "localhost:9092",
"Alias": "general-kafka"
}
]
}
Register the service in Startup.cs
services.AddHubtelKafkaProducerSdk(c => Configuration.GetSection(nameof(KafkaProducerConfig)).Bind(c))
This line is adding the Hubtel Kafka Producer SDK to the application's services. This is presumably a method provided by the
Hubtel.Kafka.Producer.Sdk
package that sets up any necessary services for the SDK to function, such as Kafka producers or other dependencies.The argument to
AddHubtelKafkaProducerSdk
is a lambda expression that configures the SDK. It uses theConfiguration
object'sGetSection
method to retrieve theKafkaProducerConfig
section from the application's configuration (usually found inappsettings.json
). Thenameof
operator is used to get the string name of theKafkaProducerConfig
section in a type-safe manner.The
Bind
method is then called on the configuration section, binding the configuration values to the SDK's configuration object (c
in the lambda expression). This means that the SDK will be configured according to the settings in theKafkaProducerConfig
section of the application's configuration.Do as shown below:
services.AddHubtelKafkaProducerSdk(c => Configuration.GetSection(nameof(KafkaProducerConfig)).Bind(c));
Inject Service
private readonly IKafkaProducer _producer
- This line declares a private, read-only field of type
IKafkaProducer
named_producer
. This interface presumably provides methods for producing messages to Kafka.
public ProduceToKafkaService(IKafkaProducerFactory producer)
- This is the constructor of the ProduceToKafkaService class. It takes an instance of IKafkaProducerFactory as a parameter. This factory is used to create instances of IKafkaProducer.
_producer = producerFatory.CreateKafkaProducer("general-kafka")
- Inside the constructor, the
CreateKafkaProducer
method of theIKafkaProducerFactory
instance is called with the string "general-kafka" as an argument. The result, an instance ofIKafkaProducer
, is assigned to the_producer
field.
public async Task ProduceToKafka(object data)
- This is a public asynchronous method named ProduceToKafka. It takes an object named data as a parameter. This method is presumably used to produce a message to a Kafka topic.
_ = await _producer.ProduceAsync("hubtel.producer.test", data)
Inside the
ProduceToKafka
method, theProduceAsync
method of the_producer
object is called with two arguments: the string "hubtel.producer.test" and thedata
object. TheProduceAsync
method is asynchronous, so it returns a task that represents the ongoing operation. Theawait
keyword is used to pause the execution of the method until the task is complete. The result of the task is discarded with the _ discard.Inject
IKafkaProducerFactory
to createIKafkaProducer
to produce.
public class ProduceToKafkaService
{
private readonly IKafkaProducer _producer;
public ProduceToKafkaService(IKafkaProducerFactory producer)
{
_producer = producerFatory.CreateKafkaProducer("general-kafka");
}
public async Task ProduceToKafka(object data)
{
_ = await _producer.ProduceAsync("hubtel.producer.test", data);
}
}
Was this helpful? 📚
CHAT SAMMIAT