Kafka Package
The @hazeljs/kafka package provides Apache Kafka integration for HazelJS applications. It supports producing messages, consuming with decorator-driven handlers, and lightweight stream processing pipelines for transform-and-forward workloads.
Purpose
Building event-driven applications with Kafka requires managing producers, consumers, consumer groups, and stream processing logic. The @hazeljs/kafka package simplifies Kafka integration by providing:
- Decorator-Based Consumers: Use @KafkaConsumer and @KafkaSubscribe to define topic handlers declaratively
- Producer Service: Inject KafkaProducerService to publish messages from controllers or services
- Stream Processing: Lightweight KafkaStreamProcessor for consume-transform-produce pipelines (enrichment, filtering, transformation)
- Dependency Injection: Full DI integration with the HazelJS container
- Type Safety: TypeScript types for messages, payloads, and configuration
Architecture
The package uses a module-based architecture with a shared Kafka client and dedicated services.
Key Components
- KafkaModule: Configures the Kafka client via forRoot() or forRootAsync()
- KafkaProducerService: Publishes messages to topics
- KafkaConsumerService: Manages consumer groups and routes messages to decorated handlers
- KafkaStreamProcessor: Consume-transform-produce pipeline builder
- Decorators: @KafkaConsumer and @KafkaSubscribe for declarative topic handling
Advantages
1. Declarative Consumer Setup
Use decorators to define topic handlers—no manual consumer wiring or boilerplate.
2. Fluent Stream API
Build stream pipelines with a chainable API: from(topic).transform(fn).to(topic).
3. Framework Integration
Kafka clients and services integrate with HazelJS DI; inject them anywhere you need messaging.
4. Lightweight Stream Processing
Not a full Kafka Streams replacement—focused on simple transform pipelines without the complexity of stateful aggregations.
5. Production-Ready
Built on KafkaJS with support for retries, rebalancing, and graceful shutdown.
Installation
npm install @hazeljs/kafka
Quick Start
Basic Setup
import { HazelModule } from '@hazeljs/core';
import { KafkaModule } from '@hazeljs/kafka';
@HazelModule({
  imports: [
    KafkaModule.forRoot({
      clientId: 'my-app',
      brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','),
    }),
  ],
})
export class AppModule {}
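The Key Components list also mentions forRootAsync() for configuration that depends on other providers. The exact signature is not shown in this doc; the sketch below assumes a useFactory-style API (common in DI frameworks) and a hypothetical ConfigService, so verify against the actual KafkaModule API before relying on it:

import { HazelModule } from '@hazeljs/core';
import { KafkaModule } from '@hazeljs/kafka';
import { ConfigService } from '@hazeljs/config'; // Hypothetical config provider

@HazelModule({
  imports: [
    KafkaModule.forRootAsync({
      // Assumed factory-style options; check the package API for the real shape
      useFactory: (config: ConfigService) => ({
        clientId: 'my-app',
        brokers: (config.get('KAFKA_BROKERS') ?? 'localhost:9092').split(','),
      }),
      inject: [ConfigService],
    }),
  ],
})
export class AsyncAppModule {}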
Register Consumers in Bootstrap
Consumers are registered from provider instances after the app bootstraps:
import { HazelApp, Container } from '@hazeljs/core';
import { KafkaModule } from '@hazeljs/kafka';
import { AppModule } from './app.module';
import { OrderConsumer } from './order.consumer';

async function bootstrap() {
  const app = new HazelApp(AppModule);
  const container = Container.getInstance();

  const orderConsumer = container.resolve(OrderConsumer);
  if (orderConsumer) {
    await KafkaModule.registerConsumersFromProvider(orderConsumer);
  }

  await app.listen(3000);
}

bootstrap();
Producer
KafkaProducerService
Inject the producer service to publish messages:
import { Injectable } from '@hazeljs/core';
import { KafkaProducerService } from '@hazeljs/kafka';
@Injectable()
export class OrderService {
  constructor(private readonly producer: KafkaProducerService) {}

  async createOrder(order: { id: string; userId: string; items: string[]; total: number }) {
    await this.producer.send('orders', [
      {
        key: order.id,
        value: JSON.stringify(order),
        headers: { 'x-source': 'order-service' },
      },
    ]);
    return order;
  }
}
Sending Multiple Messages
await this.producer.send('orders', [
  { key: '1', value: JSON.stringify(order1) },
  { key: '2', value: JSON.stringify(order2) },
]);
Produce Options
await this.producer.send('orders', messages, {
  acks: -1,        // Wait for all replicas
  timeout: 30000,  // Request timeout in ms
  compression: 1,  // 0=none, 1=gzip, 2=snappy, 3=lz4
});
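The numeric codes in the comment match the CompressionTypes enum that KafkaJS exports, so named constants can replace the magic numbers (assuming these options pass through to the KafkaJS producer, as the mapping above suggests):

import { CompressionTypes } from 'kafkajs';

await this.producer.send('orders', messages, {
  acks: -1,
  timeout: 30000,
  compression: CompressionTypes.GZIP, // Equivalent to compression: 1
});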
Consumer Decorators
The Kafka package provides decorators for defining topic handlers declaratively.
@KafkaConsumer Decorator
Purpose: Marks a class as a Kafka consumer and configures the consumer group.
Configuration Options:
interface KafkaConsumerOptions {
  groupId: string;             // Consumer group ID (required)
  sessionTimeout?: number;     // Session timeout in ms (default: 30000)
  rebalanceTimeout?: number;   // Rebalance timeout in ms (default: 60000)
  heartbeatInterval?: number;  // Heartbeat interval in ms (default: 3000)
  maxWaitTimeInMs?: number;    // Max wait when fetching (default: 5000)
  retry?: object;              // KafkaJS retry configuration
}
Example:
@KafkaConsumer({ groupId: 'order-processor' })
@Injectable()
export class OrderConsumer {
  // Handlers defined with @KafkaSubscribe
}
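The remaining options tune the underlying KafkaJS consumer. For instance (the timeout and retry values below are illustrative, not recommendations):

@KafkaConsumer({
  groupId: 'order-processor',
  sessionTimeout: 45000,  // Allow slower processing before the group evicts this member
  heartbeatInterval: 5000,
  retry: { initialRetryTime: 300, retries: 5 }, // Standard KafkaJS retry options
})
@Injectable()
export class TunedOrderConsumer {}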
@KafkaSubscribe Decorator
Purpose: Marks a method as a handler for a specific topic. Each subscription creates a handler that receives messages from that topic.
How it works:
- Applied to methods within a class decorated with @KafkaConsumer
- The method receives KafkaMessagePayload with topic, partition, and message
- Multiple @KafkaSubscribe decorators on the same class subscribe to multiple topics
Configuration Options:
interface KafkaSubscribeOptions {
  fromBeginning?: boolean; // Read from start of topic (default: false)
}
Example with Detailed Explanation:
import { Injectable } from '@hazeljs/core';
import { KafkaConsumer, KafkaSubscribe, KafkaMessagePayload } from '@hazeljs/kafka';
import logger from '@hazeljs/core';
@KafkaConsumer({ groupId: 'order-processor' })
@Injectable()
export class OrderConsumer {
  // @KafkaSubscribe marks this method to handle messages from the 'orders' topic
  @KafkaSubscribe('orders')
  async handleOrder({ message }: KafkaMessagePayload) {
    if (!message.value) return;
    const order = JSON.parse(message.value.toString());
    logger.info(`Processing order: ${order.id}`, order);
    // Process the order (e.g., validate, persist, emit events)
  }

  // The same consumer can handle multiple topics
  @KafkaSubscribe('order-events', { fromBeginning: false })
  async handleOrderEvents({ message }: KafkaMessagePayload) {
    if (!message.value) return;
    const event = JSON.parse(message.value.toString());
    logger.debug(`Order event: ${event.type}`, event);
  }
}
Payload Structure:
interface KafkaMessagePayload {
  topic: string;
  partition: number;
  message: {
    key: Buffer | null;
    value: Buffer | null;
    headers: Record<string, string>;
    offset: string;
    timestamp: string;
  };
}
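Because key and value arrive as Buffers, a small decoding helper keeps handlers concise. A minimal sketch (parseMessage is a hypothetical helper, not part of the package):

import { KafkaMessagePayload } from '@hazeljs/kafka';

// Hypothetical helper: decode a message value into a typed object, or null when empty/malformed
function parseMessage<T>({ message }: KafkaMessagePayload): T | null {
  if (!message.value) return null;
  try {
    return JSON.parse(message.value.toString()) as T;
  } catch {
    return null; // Skip malformed JSON instead of throwing inside the consumer loop
  }
}

// Usage inside a handler:
// const order = parseMessage<{ id: string; total: number }>(payload);
// if (!order) return;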
Consumer Bootstrap Pattern
Consumers must be registered after the app and module are initialized. Use KafkaModule.registerConsumersFromProvider() for each consumer instance:
// Register each consumer provider
const orderConsumer = container.resolve(OrderConsumer);
await KafkaModule.registerConsumersFromProvider(orderConsumer);
const eventConsumer = container.resolve(EventConsumer);
await KafkaModule.registerConsumersFromProvider(eventConsumer);
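With more than a couple of consumers, a loop over the consumer classes keeps bootstrap tidy (same API as above, just iterated):

for (const ConsumerClass of [OrderConsumer, EventConsumer]) {
  const instance = container.resolve(ConsumerClass);
  if (instance) {
    await KafkaModule.registerConsumersFromProvider(instance);
  }
}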
Stream Processing
Use KafkaStreamProcessor for lightweight consume-transform-produce pipelines.
Pipeline API
processor
  .from('input-topic')      // Subscribe to input topic
  .withGroupId('my-group')  // Consumer group for this processor (optional)
  .transform(async (msg) => { /* transform logic */ })
  .to('output-topic');      // Produce to output topic

await processor.start();
Transform Function
The transform receives { key, value, headers } and returns:
- An object with key, value, and headers to produce
- null to skip (filter out) the message
.transform(async (msg) => {
  if (!msg.value) return null;
  const data = JSON.parse(msg.value.toString());

  // Filter: skip if total < 100
  if (data.total < 100) return null;

  return {
    key: msg.key,
    value: msg.value,
    headers: { ...msg.headers, 'x-filtered': 'true' },
  };
})
Complete Stream Example
import { Injectable, Inject } from '@hazeljs/core';
import { KafkaStreamProcessor, KAFKA_CLIENT_TOKEN } from '@hazeljs/kafka';
import { Kafka } from 'kafkajs';
@Injectable()
export class StreamPipelinesService {
  private processors: KafkaStreamProcessor[] = [];

  constructor(@Inject(KAFKA_CLIENT_TOKEN) private readonly kafka: Kafka) {}

  async startEnrichmentPipeline() {
    const processor = new KafkaStreamProcessor(this.kafka);
    this.processors.push(processor);

    processor
      .from('orders')
      .withGroupId('stream-enrichment')
      .transform(async (msg) => {
        if (!msg.value) return null;
        const order = JSON.parse(msg.value.toString());
        return {
          key: msg.key,
          value: JSON.stringify({
            ...order,
            processedAt: new Date().toISOString(),
            enriched: true,
            orderCount: order.items?.length ?? 0,
          }),
          headers: { ...msg.headers, 'x-pipeline': 'enrichment' },
        };
      })
      .to('enriched-orders');

    await processor.start();
  }
}
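Since the service keeps references to its processors, a companion shutdown method can stop them all. A sketch (stopAll is illustrative, built on the processor.stop() lifecycle call described under Best Practices):

async stopAll() {
  // Disconnect each pipeline's consumer and producer on shutdown
  for (const processor of this.processors) {
    await processor.stop();
  }
  this.processors = [];
}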
Pipeline Patterns
- Enrichment: Add computed fields to messages.
- Filter: Return null for messages to skip; forward others (see the sketch below).
- Transformation: Normalize schema, rename fields, or change format.
- Chaining: Output of one pipeline becomes input of another (via topic names).
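As a concrete filter example, the same fluent API applies; only the transform changes (topic and group names here are illustrative):

const processor = new KafkaStreamProcessor(this.kafka);

processor
  .from('orders')
  .withGroupId('stream-high-value-filter')
  .transform(async (msg) => {
    if (!msg.value) return null;
    const order = JSON.parse(msg.value.toString());
    // Returning null drops the message; anything else is forwarded
    return order.total >= 100
      ? { key: msg.key, value: msg.value, headers: msg.headers }
      : null;
  })
  .to('high-value-orders');

await processor.start();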
Running Kafka (Docker)
Use the included docker-compose for local development:
# Start Kafka (KRaft mode, single node)
docker compose -f example/src/kafka/docker-compose.yml up -d
# Wait ~30 seconds for Kafka to be ready
# Then run your application
The example provides a docker-compose.yml configured for Apache Kafka 3.9 in KRaft mode.
Complete Example
// kafka-example.module.ts
import { HazelModule } from '@hazeljs/core';
import { KafkaModule } from '@hazeljs/kafka';
import { OrderConsumer } from './order.consumer';
import { OrderService } from './order.service';
import { OrderController } from './order.controller';
@HazelModule({
  imports: [
    KafkaModule.forRoot({
      clientId: 'kafka-example',
      brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','),
    }),
  ],
  controllers: [OrderController],
  providers: [OrderConsumer, OrderService],
})
export class KafkaExampleModule {}
// order.controller.ts
import { Controller, Post, Body } from '@hazeljs/core';
import { OrderService } from './order.service';
@Controller('/orders')
export class OrderController {
  constructor(private readonly orderService: OrderService) {}

  @Post()
  async create(@Body() body: { id: string; userId: string; items: string[]; total: number }) {
    return this.orderService.createOrder(body);
  }
}
// order.consumer.ts
import { Injectable } from '@hazeljs/core';
import { KafkaConsumer, KafkaSubscribe, KafkaMessagePayload } from '@hazeljs/kafka';
@KafkaConsumer({ groupId: 'order-processor' })
@Injectable()
export class OrderConsumer {
  @KafkaSubscribe('orders')
  async handleOrder({ message }: KafkaMessagePayload) {
    if (!message.value) return;
    const order = JSON.parse(message.value.toString());
    console.log('Received order:', order.id);
  }
}
// index.ts - bootstrap
import { HazelApp, Container } from '@hazeljs/core';
import { KafkaModule } from '@hazeljs/kafka';
import { KafkaExampleModule } from './kafka-example.module';
import { OrderConsumer } from './order.consumer';
async function bootstrap() {
  const app = new HazelApp(KafkaExampleModule);
  const container = Container.getInstance();

  const orderConsumer = container.resolve(OrderConsumer);
  if (orderConsumer) {
    await KafkaModule.registerConsumersFromProvider(orderConsumer);
  }

  await app.listen(3010);
}

bootstrap();
Creating Topics
Kafka does not always auto-create topics. Create required topics before consumers start:
import { Container } from '@hazeljs/core';
import { KAFKA_CLIENT_TOKEN } from '@hazeljs/kafka';
import type { Kafka } from 'kafkajs';
async function ensureTopics() {
  const kafka = Container.getInstance().resolve(KAFKA_CLIENT_TOKEN) as Kafka;
  const admin = kafka.admin();
  await admin.connect();
  try {
    await admin.createTopics({
      topics: [
        { topic: 'orders', numPartitions: 1, replicationFactor: 1 },
        { topic: 'order-events', numPartitions: 1, replicationFactor: 1 },
      ],
      waitForLeaders: true,
      timeout: 10000,
    });
  } finally {
    await admin.disconnect();
  }
}
// Call ensureTopics() before registering consumers in bootstrap
Best Practices
- Create topics upfront: Use the admin API to create topics before consumers subscribe, or ensure auto.create.topics.enable is appropriate for your environment.
- Use consumer groups wisely: Each @KafkaConsumer class uses one consumer group. Use different groups for different processing concerns.
- Handle errors in handlers: Wrap handler logic in try/catch; unhandled errors can affect the consumer (see the sketch after this list).
- Stream processor lifecycle: Call processor.stop() on shutdown to disconnect consumers and producers.
- Idempotent handlers: Design handlers to be idempotent where possible, since Kafka can redeliver messages.
- Partition keys: Use meaningful keys when producing to enable ordering and partitioning by business entity.
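As referenced in the error-handling item above, a defensive handler might look like the following sketch. It assumes the logger import and an injected KafkaProducerService from earlier examples; processOrder and the orders.dlq dead-letter topic are illustrative:

@KafkaSubscribe('orders')
async handleOrder({ message }: KafkaMessagePayload) {
  try {
    if (!message.value) return;
    const order = JSON.parse(message.value.toString());
    await this.processOrder(order); // Hypothetical idempotent business logic
  } catch (err) {
    // Log and route the raw message to a dead-letter topic rather than crashing the consumer
    logger.error('Failed to process order message', err);
    await this.producer.send('orders.dlq', [{ key: message.key, value: message.value }]);
  }
}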
What's Next?
- Learn about Cache for caching Kafka message processing results
- Explore Cron for scheduled Kafka-related tasks
- Check out Config for Kafka broker configuration
- Read the Kafka example for a full working application