1.3. Kafka Quickstart
https://kafka.apache.org/quickstart
Exploring with Kafka CLI
Kafka is a high-throughput, distributed messaging system that’s widely used for building real-time streaming data pipelines. This guide covers a quickstart setup for Kafka with Docker Compose, focusing on the fundamentals: creating topics, producing messages, and consuming messages.
Step 1: Set Up Kafka and Zookeeper with Docker Compose
Why learn with Docker Compose? Using Docker Compose lets us focus on the functionality of Kafka without worrying about the underlying infrastructure setup. With a few lines of YAML configuration, we can have a multi-node Kafka cluster running locally and jump straight into exploring topics, producers, and consumers.
Create a docker-compose.yml file to set up Kafka and Zookeeper. (This setup uses Zookeeper for broker coordination; newer Kafka versions can also run without it in KRaft mode.) Each broker exposes an INSIDE listener for traffic within the Docker network and an OUTSIDE listener published to the host, so the applications we write later can connect from your machine.
version: '3.7'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.7.1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka-1:
    image: confluentinc/cp-kafka:7.7.1
    ports:
      - "29092:29092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # INSIDE serves brokers and containers on the Docker network;
      # OUTSIDE is published to the host for local clients.
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:29092
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka-1:9092,OUTSIDE://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_BROKER_ID: 1
      KAFKA_LOG_DIRS: /var/lib/kafka/data/broker-1
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
    volumes:
      - kafka-1-data:/var/lib/kafka/data
  kafka-2:
    image: confluentinc/cp-kafka:7.7.1
    ports:
      - "29093:29093"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:29093
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka-2:9093,OUTSIDE://localhost:29093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_BROKER_ID: 2
      KAFKA_LOG_DIRS: /var/lib/kafka/data/broker-2
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
    volumes:
      - kafka-2-data:/var/lib/kafka/data
  kafka-3:
    image: confluentinc/cp-kafka:7.7.1
    ports:
      - "29094:29094"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9094,OUTSIDE://0.0.0.0:29094
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka-3:9094,OUTSIDE://localhost:29094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_BROKER_ID: 3
      KAFKA_LOG_DIRS: /var/lib/kafka/data/broker-3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
    volumes:
      - kafka-3-data:/var/lib/kafka/data
volumes:
  kafka-1-data:
  kafka-2-data:
  kafka-3-data:
Start the services:
docker-compose up -d
This command launches the Zookeeper and Kafka containers in detached mode.
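To confirm that all four containers came up (a quick sanity check, not part of the original quickstart), list the services:
docker-compose ps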
Step 2: Creating a Kafka Topic
Once the Kafka brokers are running, let’s create a topic called test-topic.
docker exec -it kafka-1 \
kafka-topics --create --topic test-topic \
--bootstrap-server localhost:9092 \
--partitions 1 --replication-factor 1
This command creates a new topic named test-topic with one partition and a replication factor of 1. We will look at partitions and replication in more detail later.
To list all available topics, run:
docker exec -it kafka-1 \
kafka-topics --list \
--bootstrap-server localhost:9092
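kafka-topics can also describe a topic, showing its partition count, leader, and replica assignment:
docker exec -it kafka-1 \
  kafka-topics --describe --topic test-topic \
  --bootstrap-server localhost:9092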
Step 3: Producing Messages to the Topic
To start a Kafka producer and write messages to test-topic:
docker exec -it kafka-1 \
kafka-console-producer \
--topic test-topic --bootstrap-server localhost:9092
After the producer starts, type any message and press Enter. Each message you type will be sent to test-topic.
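By default, the console producer sends messages without a key. As an optional experiment (the choice of : as the separator is arbitrary), kafka-console-producer accepts parse.key and key.separator properties for sending keyed messages:
docker exec -it kafka-1 \
  kafka-console-producer --topic test-topic \
  --bootstrap-server localhost:9092 \
  --property parse.key=true --property key.separator=:
An input line like user_1:hello is then sent with key user_1 and value hello.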
Step 4: Consuming Messages from the Topic
In a new terminal, start a Kafka consumer to read from test-topic:
docker exec -it kafka-1 \
kafka-console-consumer --topic test-topic \
--bootstrap-server localhost:9092 --from-beginning
The consumer reads all messages from the beginning of test-topic.
Put the two terminals side by side: each message you type in the producer terminal is printed in real time in the consumer terminal.
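Consumers normally read as part of a consumer group, which lets several instances split a topic's partitions between them. As a small optional experiment (the group name my-group is an arbitrary choice), start the consumer with the --group flag:
docker exec -it kafka-1 \
  kafka-console-consumer --topic test-topic \
  --bootstrap-server localhost:9092 --group my-group
Starting a second consumer in the same group shows messages being shared rather than duplicated (with a single partition, only one group member receives them at a time).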
Explanation
This quickstart demonstrates Kafka's basic commands to manage topics, produce messages, and consume messages using the Kafka CLI. However, you will rarely do this in a real environment. Instead, a declarative approach is often preferred for topic management, and applications typically produce and consume messages in code using a language-specific Kafka client (e.g., Python, Go, Java).
So, keep reading.
Creating Topics Declaratively
In production, Kafka topics can be managed using declarative tools for better automation. Two common approaches are:
- Terraform: Use the Terraform Kafka provider to manage topics as code.
- Kubernetes Operator: Kafka operators like Strimzi or Confluent Operator enable Kubernetes-native topic management, scaling, and other cluster operations (see the Strimzi sketch after the Terraform example below).
Example Terraform configuration for creating a Kafka topic (this resource type comes from the community Mongey/kafka provider, which must be configured with your brokers' addresses):
resource "kafka_topic" "example" {
name = "user-events"
partitions = 3
replication_factor = 2
config = {
"retention.ms" = "604800000" # 7 days
}
}
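For the Kubernetes route, Strimzi represents topics as custom resources. A minimal sketch, assuming a Strimzi-managed cluster named my-cluster (the cluster name and topic settings are placeholders):
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: user-events
  labels:
    strimzi.io/cluster: my-cluster  # must match the Kafka cluster's name
spec:
  partitions: 3
  replicas: 2
  config:
    retention.ms: 604800000  # 7 days
Applying this manifest with kubectl lets Strimzi's topic operator create the topic and keep reconciling it with the declared state.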
Producing and Consuming Messages with Code
Backend applications are commonly built in Golang for its performance and concurrency support, while machine learning applications often use Python due to its extensive ML library support.
Let's create an example where a Golang application produces messages to a user topic, while a Python application consumes these messages and detects whether the user's age is unusual. Create the user topic first, as in Step 2.
Golang Producer: Sending User Information
The Golang producer sends user information ({"name": "Alice", "age": 30}) to the user topic.
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Writer targeting the "user" topic via the OUTSIDE listener
	// published to the host in docker-compose.yml.
	writer := kafka.NewWriter(kafka.WriterConfig{
		Brokers: []string{"localhost:29092"},
		Topic:   "user",
	})
	defer writer.Close()

	// A single JSON-encoded user record, keyed by user ID.
	message := kafka.Message{
		Key:   []byte("user_1"),
		Value: []byte(`{"name": "Alice", "age": 30}`),
	}

	if err := writer.WriteMessages(context.Background(), message); err != nil {
		log.Fatalf("could not write message: %v", err)
	}
	fmt.Println("Message sent to topic 'user'")
}
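To run the producer, fetch the client library with the standard Go tooling first (the module name kafka-demo is just a placeholder):
go mod init kafka-demo
go get github.com/segmentio/kafka-go
go run main.go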
Python Consumer: Detecting Unusual Ages
The Python consumer reads messages from the user topic and checks if the age field is above 130, logging it as “unusual”.
from kafka import KafkaConsumer
import json

# Consumer subscribed to the "user" topic via the OUTSIDE listener
# published to the host in docker-compose.yml. It starts from the
# earliest offset and deserializes each message value from JSON.
consumer = KafkaConsumer(
    'user',
    bootstrap_servers='localhost:29092',
    auto_offset_reset='earliest',
    value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)

for message in consumer:
    user = message.value
    if user['age'] > 130:
        print(f"Unusual age detected: {user['age']} for user {user['name']}")
    else:
        print(f"User data received: {user}")
In a real system, these applications would run on separate servers or within different containers, each handling a part of the data flow.