Introduction
Kafka is a distributed streaming platform designed to manage massive amounts of real-time data in a fault-tolerant and scalable way. Originally created at LinkedIn, it is now maintained by the Apache Software Foundation.
At its core, Kafka is a publish-subscribe messaging system that allows applications to send and receive streams of data in real-time. Data is organized into topics, which are divided into partitions that can be distributed across a cluster of machines. This allows Kafka to handle high volumes of data and ensure that it is available and reliable even in the face of hardware failures.
Kafka is widely used in a variety of use cases, including real-time data pipelines, messaging systems, and event-driven architectures. It is particularly well-suited for applications that require high-throughput and low-latency data processing.
In the .NET Core ecosystem, Kafka is supported by several open-source libraries and frameworks, including Confluent.Kafka and the Kafka.NET client. These libraries provide developers with a simple and intuitive way to interact with Kafka and build robust and scalable applications that can handle real-time data streams.
Source Code
The complete, working source code for this article is available in a git repository; feel free to star or clone it: Github Source code.
Installation
Installing Kafka using Docker is a popular and convenient way to get started with Kafka quickly and easily. Docker is a containerization platform that allows you to package an application and all its dependencies into a single container, making it easy to run the application on any platform.
To install Kafka using Docker, follow these steps:
Install Docker on your system. You can download and install Docker from the Docker website: docker.com/products/docker-desktop.
Open a terminal or command prompt and run the following command to clone a ready-made docker-compose stack:
git clone https://github.com/conduktor/kafka-stack-docker-compose.git
Next, go to the kafka-stack-docker-compose folder and list the files. Note the file zk-single-kafka-single.yml. As the name suggests, we'll use it to launch a Kafka cluster with a single Zookeeper node and a single broker. Use the following command to launch it; the -d flag runs it in the background.
docker-compose -f zk-single-kafka-single.yml up -d
Kafka will be exposed on your machine at localhost:9092.
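To confirm the stack came up, you can check the container status. Note the container names below (kafka1, zoo1) come from the compose file in this repository and may differ if you use another stack:

```shell
# Show the status of the services defined in the compose file
docker-compose -f zk-single-kafka-single.yml ps

# Or check the broker container directly
docker ps --filter "name=kafka1"
```

If the broker container is not listed as running, inspect its logs with `docker logs kafka1` before continuing.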
Running commands against our Kafka running on Docker
To run Kafka commands against Docker, we have two options:
Run commands directly from within the Kafka Docker container (using docker exec)
Run commands from our host OS (we must first install the Kafka binaries)
Running commands from within the Kafka docker container
docker exec -it kafka1 /bin/bash
Then, from within the container, you can start running Kafka commands (without the .sh extension).
Create a topic with the CLI using the command below:
kafka-topics --create --topic training-kafka --bootstrap-server localhost:9092
The above command creates a topic called training-kafka, which we'll use to test Kafka and publish messages to.
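Still inside the container, you can verify the topic exists and smoke-test it with the console tools that ship with Kafka:

```shell
# Describe the topic we just created (shows partition count and replicas)
kafka-topics --describe --topic training-kafka --bootstrap-server localhost:9092

# Produce a test message: type a line, press Enter, then Ctrl+C to exit
kafka-console-producer --topic training-kafka --bootstrap-server localhost:9092

# In another shell, read all messages from the beginning of the topic
kafka-console-consumer --topic training-kafka --from-beginning --bootstrap-server localhost:9092
```

Seeing your test message echoed by the console consumer confirms the broker is reachable before any .NET code is involved.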
Working with Apache Kafka in ASP.NET Core 6
In this part, you'll build a straightforward employee data publishing application. You'll construct two applications: a producer application and a consumer application. Both will be developed in the Visual Studio 2022 IDE using ASP.NET 6.
To create a new ASP.NET 6 Project in Visual Studio 2022:
Start the Visual Studio 2022 IDE.
In the “Create a new project” window, select “ASP.NET Core Web API” and click Next to move on.
Specify the project name as KafkaProducerAPI and the path where it should be created in the “Configure your new project” window. If you want the solution file and project to be created in the same directory, you can optionally check the “Place solution and project in the same directory” checkbox. Click Next to move on.
In the next screen, specify the target framework and authentication type as well. Ensure that the “Configure for HTTPS” and “Enable Docker Support” checkboxes are unchecked, since you won't use those in this example, but leave “Enable OpenAPI support” checked so you can test the endpoint from Swagger later.
Click Create to complete the process.
Follow the same steps outlined above to create a Console Application project, and name it KafkaConsumer. Note that you can choose any meaningful name for both of these projects.
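If you prefer the command line over Visual Studio, the equivalent projects can be created with the .NET CLI (the project names match the ones used in this article):

```shell
# ASP.NET Core Web API project for the producer
dotnet new webapi -n KafkaProducerAPI

# Console project for the consumer
dotnet new console -n KafkaConsumer
```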
Install NuGet Package(s)
So far so good. The next step is to install the necessary NuGet package(s). To produce and consume messages, you need a Kafka client; we'll use the most popular one, Confluent's Kafka .NET client. To install it, right-click on the solution and select "Manage NuGet Packages for Solution…", type Confluent.Kafka in the search box, select the Confluent.Kafka package, and install it.
Alternatively, you can execute the following command in the Package Manager Console:
PM> Install-Package Confluent.Kafka
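Or, with the .NET CLI, run the following from each project's directory:

```shell
# Adds the Confluent.Kafka package reference to the current project
dotnet add package Confluent.Kafka
```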
Create a class that models the employee data, as below:
public class EmployeeRequest
{
public int Id { get; set; }
public string Name { get; set; }
public string Jobtitle { get; set; }
public string Address { get; set; }
}
Create a controller to test sending employee data
using System.Diagnostics;
using System.Net;
using System.Text.Json;
using Confluent.Kafka;
using Microsoft.AspNetCore.Mvc;

public class EmployeesController : Controller
{
    private readonly string bootstrapServers = "localhost:9092";
    private readonly string topic = "training-kafka";

    [HttpPost("api/employees/post")]
    public async Task<IActionResult> Post([FromBody] EmployeeRequest request)
    {
        // Serialize the employee payload and publish it to the Kafka topic.
        var message = JsonSerializer.Serialize(request);
        return Ok(await SendKafkaMessage(topic, message));
    }

    private async Task<bool> SendKafkaMessage(string topic, string message)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = bootstrapServers,
            ClientId = Dns.GetHostName()
        };
        try
        {
            // A Null key means messages are distributed across partitions round-robin.
            using var producer = new ProducerBuilder<Null, string>(config).Build();
            var result = await producer.ProduceAsync(topic, new Message<Null, string>
            {
                Value = message
            });
            Debug.WriteLine($"Delivery Timestamp: {result.Timestamp.UtcDateTime}");
            return true;
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Error occurred: {ex.Message}");
            return false;
        }
    }
}
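Before writing the .NET consumer, you can verify the producer works by watching the topic with the console consumer from inside the Kafka container (the container name kafka1 comes from the compose file used earlier):

```shell
# Tail the topic; each POST to the API should print one JSON message here
docker exec -it kafka1 kafka-console-consumer \
  --topic training-kafka --from-beginning --bootstrap-server localhost:9092
```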
Consumer Project
In the KafkaConsumer console project, replace the contents of Program.cs with the following:
using System.Text.Json;
using Confluent.Kafka;
using Shared; // shared project containing EmployeeRequest

const string topic = "training-kafka";
const string groupId = "test_group";
const string bootstrapServers = "localhost:9092";

var config = new ConsumerConfig
{
    BootstrapServers = bootstrapServers,
    GroupId = groupId,
    // Start from the earliest offset when the group has no committed offset yet.
    AutoOffsetReset = AutoOffsetReset.Earliest
};

try
{
    using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
    consumer.Subscribe(topic);

    // Cancel the consume loop gracefully on Ctrl+C.
    using var cancelToken = new CancellationTokenSource();
    Console.CancelKeyPress += (_, e) =>
    {
        e.Cancel = true;
        cancelToken.Cancel();
    };

    try
    {
        while (true)
        {
            var consumeResult = consumer.Consume(cancelToken.Token);
            var employee = JsonSerializer.Deserialize<EmployeeRequest>(consumeResult.Message.Value);
            Console.WriteLine($"Processing Employee Name: {employee?.Name}");
        }
    }
    catch (OperationCanceledException)
    {
        // Commit final offsets and leave the consumer group cleanly.
        consumer.Close();
    }
}
catch (Exception ex)
{
    Console.WriteLine(ex.Message);
}
How to test?
Run both projects and open the Swagger UI of the producer API, then post employee data to the endpoint.
Once you submit the request in the API, you should see the employee data printed in the console application (the consumer).
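If you'd rather test from the command line than from Swagger, a request like the following should work. The route and JSON fields match the controller and model above; the port is an assumption, so check your project's launchSettings.json for the actual one:

```shell
# Post a sample employee to the producer API (adjust the port to your setup)
curl -X POST http://localhost:5000/api/employees/post \
  -H "Content-Type: application/json" \
  -d '{"id": 1, "name": "Jane Doe", "jobtitle": "Engineer", "address": "123 Main St"}'
```

A successful call returns true, and the employee's name appears in the consumer console shortly after.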
Where Should I Go from Here
Kafka is a natural choice if you're building an application that needs high-performance, resilient, and scalable messaging. This post walked you through setting up Apache Kafka with Docker and building a simple Kafka producer and consumer using ASP.NET 6. You can learn more about Apache Kafka from the Apache Kafka Documentation.