Kafka Package

The @hazeljs/kafka package provides Apache Kafka integration for HazelJS applications. It supports producing messages, consuming with decorator-driven handlers, and lightweight stream processing pipelines for transform-and-forward workloads.

Purpose

Building event-driven applications with Kafka requires managing producers, consumers, consumer groups, and stream processing logic. The @hazeljs/kafka package simplifies Kafka integration by providing:

  • Decorator-Based Consumers: Use @KafkaConsumer and @KafkaSubscribe to define topic handlers declaratively
  • Producer Service: Inject KafkaProducerService to publish messages from controllers or services
  • Stream Processing: Lightweight KafkaStreamProcessor for consume-transform-produce pipelines (enrichment, filtering, transformation)
  • Dependency Injection: Full DI integration with the HazelJS container
  • Type Safety: TypeScript types for messages, payloads, and configuration

Architecture

The package uses a module-based architecture with shared Kafka client and dedicated services:

Loading diagram...

Key Components

  1. KafkaModule: Configures the Kafka client via forRoot() or forRootAsync()
  2. KafkaProducerService: Publishes messages to topics
  3. KafkaConsumerService: Manages consumer groups and routes messages to decorated handlers
  4. KafkaStreamProcessor: Consume-transform-produce pipeline builder
  5. Decorators: @KafkaConsumer, @KafkaSubscribe for declarative topic handling

Advantages

1. Declarative Consumer Setup

Use decorators to define topic handlers—no manual consumer wiring or boilerplate.

2. Fluent Stream API

Build stream pipelines with a chainable API: from(topic).transform(fn).to(topic).

3. Framework Integration

Kafka clients and services integrate with HazelJS DI; inject them anywhere you need messaging.

4. Lightweight Stream Processing

Not a full Kafka Streams replacement—focused on simple transform pipelines without the complexity of stateful aggregations.

5. Production-Ready

Built on KafkaJS with support for retries, rebalancing, and graceful shutdown.

Installation

npm install @hazeljs/kafka

Quick Start

Basic Setup

import { HazelModule } from '@hazeljs/core';
import { KafkaModule } from '@hazeljs/kafka';

@HazelModule({
  imports: [
    KafkaModule.forRoot({
      clientId: 'my-app',
      brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','),
    }),
  ],
})
export class AppModule {}

Register Consumers in Bootstrap

Consumers are registered from provider instances after the app bootstraps:

import { HazelApp, Container } from '@hazeljs/core';
import { KafkaModule } from '@hazeljs/kafka';
import { OrderConsumer } from './order.consumer';

async function bootstrap() {
  const app = new HazelApp(AppModule);

  const container = Container.getInstance();
  const orderConsumer = container.resolve(OrderConsumer);
  if (orderConsumer) {
    await KafkaModule.registerConsumersFromProvider(orderConsumer);
  }

  await app.listen(3000);
}

bootstrap();

Producer

KafkaProducerService

Inject the producer service to publish messages:

import { Injectable } from '@hazeljs/core';
import { KafkaProducerService } from '@hazeljs/kafka';

@Injectable()
export class OrderService {
  constructor(private readonly producer: KafkaProducerService) {}

  async createOrder(order: { id: string; userId: string; items: string[]; total: number }) {
    await this.producer.send('orders', [
      {
        key: order.id,
        value: JSON.stringify(order),
        headers: { 'x-source': 'order-service' },
      },
    ]);
    return order;
  }
}

Sending Multiple Messages

await this.producer.send('orders', [
  { key: '1', value: JSON.stringify(order1) },
  { key: '2', value: JSON.stringify(order2) },
]);

Produce Options

await this.producer.send('orders', messages, {
  acks: -1,           // Wait for all replicas
  timeout: 30000,     // Request timeout in ms
  compression: 1,     // 0=none, 1=gzip, 2=snappy, 3=lz4
});

Consumer Decorators

The Kafka package provides decorators for defining topic handlers declaratively.

@KafkaConsumer Decorator

Purpose: Marks a class as a Kafka consumer and configures the consumer group.

Configuration Options:

interface KafkaConsumerOptions {
  groupId: string;           // Consumer group ID (required)
  sessionTimeout?: number;   // Session timeout in ms (default: 30000)
  rebalanceTimeout?: number; // Rebalance timeout in ms (default: 60000)
  heartbeatInterval?: number; // Heartbeat interval in ms (default: 3000)
  maxWaitTimeInMs?: number;  // Max wait when fetching (default: 5000)
  retry?: object;            // KafkaJS retry configuration
}

Example:

@KafkaConsumer({ groupId: 'order-processor' })
@Injectable()
export class OrderConsumer {
  // Handlers defined with @KafkaSubscribe
}

@KafkaSubscribe Decorator

Purpose: Marks a method as a handler for a specific topic. Each subscription creates a handler that receives messages from that topic.

How it works:

  • Applied to methods within a class decorated with @KafkaConsumer
  • The method receives KafkaMessagePayload with topic, partition, message
  • Multiple @KafkaSubscribe decorators on the same class subscribe to multiple topics

Configuration Options:

interface KafkaSubscribeOptions {
  fromBeginning?: boolean;  // Read from start of topic (default: false)
}

Example with Detailed Explanation:

import { Injectable } from '@hazeljs/core';
import { KafkaConsumer, KafkaSubscribe, KafkaMessagePayload } from '@hazeljs/kafka';
import logger from '@hazeljs/core';

@KafkaConsumer({ groupId: 'order-processor' })
@Injectable()
export class OrderConsumer {
  // @KafkaSubscribe marks this method to handle messages from 'orders' topic
  @KafkaSubscribe('orders')
  async handleOrder({ message }: KafkaMessagePayload) {
    if (!message.value) return;
    const order = JSON.parse(message.value.toString());
    logger.info(`Processing order: ${order.id}`, order);
    // Process the order (e.g., validate, persist, emit events)
  }

  // Same consumer can handle multiple topics
  @KafkaSubscribe('order-events', { fromBeginning: false })
  async handleOrderEvents({ message }: KafkaMessagePayload) {
    if (!message.value) return;
    const event = JSON.parse(message.value.toString());
    logger.debug(`Order event: ${event.type}`, event);
  }
}

Payload Structure:

interface KafkaMessagePayload {
  topic: string;
  partition: number;
  message: {
    key: Buffer | null;
    value: Buffer | null;
    headers: Record<string, string>;
    offset: string;
    timestamp: string;
  };
}

Consumer Bootstrap Pattern

Consumers must be registered after the app and module are initialized. Use KafkaModule.registerConsumersFromProvider() for each consumer instance:

// Register each consumer provider
const orderConsumer = container.resolve(OrderConsumer);
await KafkaModule.registerConsumersFromProvider(orderConsumer);

const eventConsumer = container.resolve(EventConsumer);
await KafkaModule.registerConsumersFromProvider(eventConsumer);

Stream Processing

Use KafkaStreamProcessor for lightweight consume-transform-produce pipelines.

Pipeline API

processor
  .from('input-topic')      // Subscribe to input topic
  .withGroupId('my-group')  // Consumer group for this processor (optional)
  .transform(async (msg) => { /* transform logic */ })
  .to('output-topic');      // Produce to output topic

await processor.start();

Transform Function

The transform receives { key, value, headers } and returns:

  • An object with key, value, headers to produce
  • null to skip (filter out) the message
.transform(async (msg) => {
  if (!msg.value) return null;
  const data = JSON.parse(msg.value.toString());
  // Filter: skip if total < 100
  if (data.total < 100) return null;
  return {
    key: msg.key,
    value: msg.value,
    headers: { ...msg.headers, 'x-filtered': 'true' },
  };
})

Complete Stream Example

import { Injectable, Inject } from '@hazeljs/core';
import { KafkaStreamProcessor, KAFKA_CLIENT_TOKEN } from '@hazeljs/kafka';
import { Kafka } from 'kafkajs';

@Injectable()
export class StreamPipelinesService {
  private processors: KafkaStreamProcessor[] = [];

  constructor(@Inject(KAFKA_CLIENT_TOKEN) private readonly kafka: Kafka) {}

  async startEnrichmentPipeline() {
    const processor = new KafkaStreamProcessor(this.kafka);
    this.processors.push(processor);

    processor
      .from('orders')
      .withGroupId('stream-enrichment')
      .transform(async (msg) => {
        if (!msg.value) return null;
        const order = JSON.parse(msg.value.toString());
        return {
          key: msg.key,
          value: JSON.stringify({
            ...order,
            processedAt: new Date().toISOString(),
            enriched: true,
            orderCount: order.items?.length ?? 0,
          }),
          headers: { ...msg.headers, 'x-pipeline': 'enrichment' },
        };
      })
      .to('enriched-orders');

    await processor.start();
  }
}

Pipeline Patterns

Enrichment: Add computed fields to messages.

Filter: Return null for messages to skip; forward others.

Transformation: Normalize schema, rename fields, or change format.

Chaining: Output of one pipeline becomes input of another (via topic names).

Running Kafka (Docker)

Use the included docker-compose for local development:

# Start Kafka (KRaft mode, single node)
docker compose -f example/src/kafka/docker-compose.yml up -d

# Wait ~30 seconds for Kafka to be ready
# Then run your application

The example provides a docker-compose.yml configured for Apache Kafka 3.9 in KRaft mode.

Complete Example

// kafka-example.module.ts
import { HazelModule } from '@hazeljs/core';
import { KafkaModule } from '@hazeljs/kafka';
import { OrderConsumer } from './order.consumer';
import { OrderService } from './order.service';
import { OrderController } from './order.controller';

@HazelModule({
  imports: [
    KafkaModule.forRoot({
      clientId: 'kafka-example',
      brokers: (process.env.KAFKA_BROKERS || 'localhost:9092').split(','),
    }),
  ],
  controllers: [OrderController],
  providers: [OrderConsumer, OrderService],
})
export class KafkaExampleModule {}

// order.controller.ts
import { Controller, Post, Body } from '@hazeljs/core';
import { OrderService } from './order.service';

@Controller('/orders')
export class OrderController {
  constructor(private readonly orderService: OrderService) {}

  @Post()
  async create(@Body() body: { id: string; userId: string; items: string[]; total: number }) {
    return this.orderService.createOrder(body);
  }
}

// order.consumer.ts
import { Injectable } from '@hazeljs/core';
import { KafkaConsumer, KafkaSubscribe, KafkaMessagePayload } from '@hazeljs/kafka';

@KafkaConsumer({ groupId: 'order-processor' })
@Injectable()
export class OrderConsumer {
  @KafkaSubscribe('orders')
  async handleOrder({ message }: KafkaMessagePayload) {
    if (!message.value) return;
    const order = JSON.parse(message.value.toString());
    console.log('Received order:', order.id);
  }
}

// index.ts - bootstrap
import { HazelApp, Container } from '@hazeljs/core';
import { KafkaModule } from '@hazeljs/kafka';
import { KafkaExampleModule } from './kafka-example.module';
import { OrderConsumer } from './order.consumer';

async function bootstrap() {
  const app = new HazelApp(KafkaExampleModule);
  const container = Container.getInstance();

  const orderConsumer = container.resolve(OrderConsumer);
  if (orderConsumer) {
    await KafkaModule.registerConsumersFromProvider(orderConsumer);
  }

  await app.listen(3010);
}

bootstrap();

Creating Topics

Kafka does not always auto-create topics. Create required topics before consumers start:

import { Container } from '@hazeljs/core';
import { KAFKA_CLIENT_TOKEN } from '@hazeljs/kafka';
import type { Kafka } from 'kafkajs';

async function ensureTopics() {
  const kafka = Container.getInstance().resolve(KAFKA_CLIENT_TOKEN) as Kafka;
  const admin = kafka.admin();
  await admin.connect();
  try {
    await admin.createTopics({
      topics: [
        { topic: 'orders', numPartitions: 1, replicationFactor: 1 },
        { topic: 'order-events', numPartitions: 1, replicationFactor: 1 },
      ],
      waitForLeaders: true,
      timeout: 10000,
    });
  } finally {
    await admin.disconnect();
  }
}

// Call ensureTopics() before registering consumers in bootstrap

Best Practices

  1. Create topics upfront: Use the admin API to create topics before consumers subscribe, or ensure auto.create.topics.enable is appropriate for your environment.

  2. Use consumer groups wisely: Each @KafkaConsumer class uses one consumer group. Use different groups for different processing concerns.

  3. Handle errors in handlers: Wrap handler logic in try/catch; unhandled errors can affect the consumer.

  4. Stream processor lifecycle: Call processor.stop() on shutdown to disconnect consumers and producers.

  5. Idempotent handlers: Design handlers to be idempotent where possible, since Kafka can redeliver messages.

  6. Partition keys: Use meaningful keys when producing to enable ordering and partitioning by business entity.

What's Next?

  • Learn about Cache for caching Kafka message processing results
  • Explore Cron for scheduled Kafka-related tasks
  • Check out Config for Kafka broker configuration
  • Read the Kafka example for a full working application