Event-Driven Applications with the HazelJS Kafka Module
Build event-driven applications with @hazeljs/kafka — decorator-based consumers, producers, and lightweight stream processing pipelines for enrichment, filtering, and transformation.
Introduction
Event-driven architecture is at the heart of modern scalable systems. The @hazeljs/kafka package brings Apache Kafka integration to HazelJS with a familiar decorator-based API. Produce messages from controllers, consume with declarative handlers, and build lightweight stream pipelines—all without the boilerplate.
Why Kafka with HazelJS?
Building event-driven applications typically means wiring producers, consumers, and stream processors manually. The HazelJS Kafka module simplifies this with:
- Declarative Consumers: Use @KafkaConsumer and @KafkaSubscribe to define topic handlers—no manual consumer setup
- Producer Service: Inject KafkaProducerService to publish from controllers or services
- Stream Pipelines: Lightweight consume-transform-produce pipelines with a fluent API
- DI Integration: Full dependency injection—Kafka clients and services fit naturally into your app
Installation
```bash
npm install @hazeljs/kafka
```
Quick Start
Configure the Kafka module and register your consumers in bootstrap:
```typescript
import { HazelModule } from '@hazeljs/core';
import { KafkaModule } from '@hazeljs/kafka';

@HazelModule({
  imports: [
    KafkaModule.forRoot({
      clientId: 'my-app',
      brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','),
    }),
  ],
})
export class AppModule {}
```
Producing Messages
Inject KafkaProducerService to publish from anywhere in your app:
```typescript
import { Injectable } from '@hazeljs/core';
import { KafkaProducerService } from '@hazeljs/kafka';

@Injectable()
export class OrderService {
  constructor(private readonly producer: KafkaProducerService) {}

  async createOrder(order: { id: string; userId: string; items: string[]; total: number }) {
    await this.producer.send('orders', [
      { key: order.id, value: JSON.stringify(order) },
    ]);
    return order;
  }
}
```
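Because the producer is a plain injectable, publishing from the HTTP layer is the same pattern: inject the service and call send. A minimal sketch, assuming NestJS-style routing decorators in @hazeljs/core (@Controller, @Post, and @Body are illustrative here; check the core docs for the exact names):

```typescript
import { Controller, Post, Body } from '@hazeljs/core'; // hypothetical routing decorators
import { KafkaProducerService } from '@hazeljs/kafka';

@Controller('/orders')
export class OrderController {
  constructor(private readonly producer: KafkaProducerService) {}

  @Post('/')
  async create(@Body() order: { id: string; userId: string; total: number }) {
    // Same documented send(topic, messages[]) call as in OrderService above.
    await this.producer.send('orders', [{ key: order.id, value: JSON.stringify(order) }]);
    return { accepted: true, id: order.id };
  }
}
```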
Consuming with Decorators
Define topic handlers declaratively with @KafkaConsumer and @KafkaSubscribe:
```typescript
import { Injectable } from '@hazeljs/core';
import { KafkaConsumer, KafkaSubscribe, KafkaMessagePayload } from '@hazeljs/kafka';

@KafkaConsumer({ groupId: 'order-processor' })
@Injectable()
export class OrderConsumer {
  @KafkaSubscribe('orders')
  async handleOrder({ message }: KafkaMessagePayload) {
    if (!message.value) return;
    const order = JSON.parse(message.value.toString());
    console.log('Processing order:', order.id);
  }

  @KafkaSubscribe('order-events', { fromBeginning: false })
  async handleOrderEvents({ message }: KafkaMessagePayload) {
    if (!message.value) return;
    const event = JSON.parse(message.value.toString());
    console.log('Order event:', event.type);
  }
}
```
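Handlers receive raw Kafka bytes, so JSON.parse will throw on a malformed payload; how an uncaught handler error surfaces depends on the module's consumer configuration. A defensive variant of the handler above (same decorators; the group ID and dead-letter note are illustrative):

```typescript
import { Injectable } from '@hazeljs/core';
import { KafkaConsumer, KafkaSubscribe, KafkaMessagePayload } from '@hazeljs/kafka';

@KafkaConsumer({ groupId: 'order-processor-safe' })
@Injectable()
export class SafeOrderConsumer {
  @KafkaSubscribe('orders')
  async handleOrder({ message }: KafkaMessagePayload) {
    if (!message.value) return;
    let order: { id: string };
    try {
      order = JSON.parse(message.value.toString());
    } catch (err) {
      // Drop (or route to a dead-letter topic) payloads that are not valid JSON,
      // rather than throwing on every redelivery.
      console.error('Dropping malformed order message:', err);
      return;
    }
    console.log('Processing order:', order.id);
  }
}
```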
Register consumers in your bootstrap after the app initializes:
```typescript
const orderConsumer = container.resolve(OrderConsumer);
await KafkaModule.registerConsumersFromProvider(orderConsumer);
```
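If you define several consumer classes, the same registration pattern extends naturally (PaymentConsumer is a hypothetical second consumer):

```typescript
// Resolve and register every consumer class in one place during bootstrap.
const consumerClasses = [OrderConsumer, PaymentConsumer]; // PaymentConsumer is illustrative
for (const ConsumerClass of consumerClasses) {
  await KafkaModule.registerConsumersFromProvider(container.resolve(ConsumerClass));
}
```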
Stream Processing
Build enrichment, filter, or transformation pipelines with KafkaStreamProcessor:
```typescript
// `processor` is a KafkaStreamProcessor instance (how it is created or injected is not shown here).
processor
  .from('orders')
  .withGroupId('stream-enrichment')
  .transform(async (msg) => {
    if (!msg.value) return null;
    const order = JSON.parse(msg.value.toString());
    return {
      key: msg.key,
      value: JSON.stringify({
        ...order,
        processedAt: new Date().toISOString(),
        enriched: true,
      }),
    };
  })
  .to('enriched-orders');

await processor.start();
```
Return null from the transform to filter a message out. Chain pipelines by routing one pipeline's output topic into the next pipeline's input.
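For instance, a filter stage can consume the enrichment pipeline's output topic, demonstrating both filtering and chaining in one sketch (filterProcessor, the topic names, and the total threshold are illustrative):

```typescript
// Second pipeline in the chain: its from() is the first pipeline's to().
filterProcessor
  .from('enriched-orders')
  .withGroupId('stream-filter')
  .transform(async (msg) => {
    if (!msg.value) return null;
    const order = JSON.parse(msg.value.toString());
    // Returning null drops the message from the output topic.
    if (order.total < 100) return null;
    return { key: msg.key, value: msg.value.toString() };
  })
  .to('high-value-orders');

await filterProcessor.start();
```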
Running Kafka Locally
Use the included docker-compose file for development. The example provides a single-node KRaft setup:
```bash
docker compose -f example/src/kafka/docker-compose.yml up -d

# Wait ~30 seconds, then run your app
npm run kafka
```
Complete Example
The HazelJS repo includes a full Kafka example with HTTP endpoints, consumers, and three stream pipelines (enrichment, filter, transformation). See example/src/kafka/ for the complete working application.
Learn More
For detailed configuration, topic creation, and best practices, see the Kafka Package documentation.