Data Package
The @hazeljs/data package provides data processing and ETL capabilities for HazelJS applications. It includes declarative pipelines with schema validation, stream processing, built-in transformers, and data quality checks.
Purpose
Building data pipelines typically requires orchestrating multiple steps—normalization, validation, enrichment—with proper error handling and schema enforcement. The @hazeljs/data package simplifies this by providing:
- Declarative Pipelines – Use @Pipeline, @Transform, and @Validate decorators to define ETL flows
- Schema Validation – Fluent Schema API for type-safe validation (string, number, object, array, email, oneOf)
- ETL Service – Execute multi-step pipelines with ordered execution and error handling
- Built-in Transformers – trimString, toLowerCase, parseJson, pick, omit, renameKeys
- Data Quality – QualityService for completeness, notNull, and custom checks
- Stream Processing – StreamBuilder and StreamProcessor for batch and streaming workloads
- Flink Integration – Optional Apache Flink deployment for distributed stream processing
Architecture
The package uses a decorator-driven pipeline architecture with service orchestration:
graph TD
    A["@Pipeline Decorator<br/>(Defines Pipeline Class)"] --> B["@Transform / @Validate<br/>(Step Decorators)"]
    B --> C["ETLService<br/>(Orchestrates Execution)"]
    C --> D["PipelineBase.execute()<br/>(Single Entry Point)"]
    E["SchemaValidator<br/>(Fluent Schema)"] --> F["@Validate Step"]
    G["StreamService"] --> H["Batch Processing<br/>(processBatch)"]
    G --> I["StreamProcessor<br/>(Streaming)"]
    C --> J["Step 1 → Step 2 → Step 3"]
    style A fill:#8b5cf6,stroke:#a78bfa,stroke-width:2px,color:#fff
    style B fill:#3b82f6,stroke:#60a5fa,stroke-width:2px,color:#fff
    style C fill:#10b981,stroke:#34d399,stroke-width:2px,color:#fff
    style D fill:#f59e0b,stroke:#fbbf24,stroke-width:2px,color:#fff
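In short, the decorators record step metadata on the pipeline class, and ETLService replays those steps in ascending order, piping each step's output into the next. The sketch below illustrates that contract only; it is not the actual ETLService implementation, and every name in it is hypothetical:
// Illustrative sketch of ordered step execution (not the real ETLService).
type StepFn = (data: unknown) => Promise<unknown>;
interface RegisteredStep { order: number; run: StepFn; }

async function executeInOrder(steps: RegisteredStep[], input: unknown): Promise<unknown> {
  let data = input;
  for (const step of [...steps].sort((a, b) => a.order - b.order)) {
    data = await step.run(data); // step N's output becomes step N+1's input
  }
  return data;
}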
Key Components
- DataModule – Registers SchemaValidator, ETLService, PipelineBuilder, StreamService, QualityService, FlinkService
- ETLService – Executes pipelines by invoking decorated steps in order
- PipelineBase – Base class providing execute() for pipelines
- SchemaValidator – Validates data against fluent schemas
- Decorators – @Pipeline, @Transform, @Validate, @Stream for declarative pipelines
Advantages
1. Declarative ETL
Define pipelines with decorators—no manual step wiring or execution logic.
2. Type-Safe Validation
Fluent Schema API with TypeScript support for validating request bodies, API responses, and data imports.
3. Reusable Transformers
Built-in and custom transformers compose easily within pipeline steps.
4. Data Quality Built-In
QualityService for completeness checks, null checks, and custom validation rules.
5. Batch and Streaming
StreamService for batch processing; StreamBuilder and Flink for streaming workloads.
Installation
npm install @hazeljs/data @hazeljs/core
Quick Start
1. Import DataModule
import { HazelApp } from '@hazeljs/core';
import { DataModule } from '@hazeljs/data';
const app = new HazelApp({
  imports: [DataModule.forRoot()],
});
app.listen(3000);
2. Define a Pipeline
import { Injectable } from '@hazeljs/core';
import {
  Pipeline,
  PipelineBase,
  Transform,
  Validate,
  ETLService,
  Schema,
} from '@hazeljs/data';
const OrderSchema = Schema.object()
  .prop('id', Schema.string().required())
  .prop('customerId', Schema.string().required())
  .prop('status', Schema.string().oneOf(['pending', 'paid', 'shipped', 'delivered', 'cancelled']))
  .prop('items', Schema.array().items(Schema.object()
    .prop('sku', Schema.string().minLength(1))
    .prop('qty', Schema.number().min(1))
    .prop('price', Schema.number().min(0))
  ))
  .required();
@Pipeline('order-processing')
@Injectable()
export class OrderProcessingPipeline extends PipelineBase {
  constructor(etlService: ETLService) {
    super(etlService);
  }

  @Transform({ step: 1, name: 'normalize' })
  async normalize(data: Record<string, unknown>): Promise<Record<string, unknown>> {
    return {
      ...data,
      status: String(data.status).toLowerCase(),
    };
  }

  @Validate({ step: 2, schema: OrderSchema })
  async validate(data: Record<string, unknown>): Promise<Record<string, unknown>> {
    // Validation is driven by the schema on the decorator;
    // the body just passes the (now validated) data through.
    return data;
  }

  @Transform({ step: 3, name: 'enrich' })
  async enrich(data: Record<string, unknown> & { items?: { qty: number; price: number }[] }): Promise<Record<string, unknown>> {
    const items = data.items ?? [];
    const subtotal = items.reduce((sum, i) => sum + i.qty * i.price, 0);
    const tax = subtotal * 0.1; // flat 10% tax rate for this example
    return {
      ...data,
      subtotal,
      tax,
      total: subtotal + tax,
      processedAt: new Date().toISOString(),
    };
  }
}
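For reference, a payload that passes OrderSchema looks like the following (values are illustrative):
const rawOrder = {
  id: 'ord_123',
  customerId: 'cus_456',
  status: 'PAID', // normalize() lowercases this before validation
  items: [{ sku: 'SKU-1', qty: 2, price: 19.99 }],
};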
3. Execute from a Controller
import { Controller, Post, Body, Inject } from '@hazeljs/core';
import { OrderProcessingPipeline } from './pipelines/order-processing.pipeline';
@Controller('data')
export class DataController {
  constructor(
    @Inject(OrderProcessingPipeline) private pipeline: OrderProcessingPipeline
  ) {}

  @Post('pipeline/orders')
  async processOrder(@Body() body: unknown) {
    const result = await this.pipeline.execute(body);
    return { ok: true, data: result };
  }
}
Batch Processing
Process arrays through pipelines with StreamService:
import { StreamService, ETLService } from '@hazeljs/data';

// etlService is normally resolved from the container (both services are registered by DataModule).
const streamService = new StreamService(etlService);
// Runs every record in `orders` through the pipeline's ordered steps.
const results = await streamService.processBatch(OrderProcessingPipeline, orders);
Schema Validation
Build schemas with the fluent API:
import { Schema, SchemaValidator } from '@hazeljs/data';
const UserSchema = Schema.object()
  .prop('email', Schema.string().format('email').required())
  .prop('name', Schema.string().minLength(1).maxLength(200))
  .prop('age', Schema.number().min(0).max(150))
  .prop('role', Schema.string().oneOf(['user', 'admin', 'moderator', 'guest']))
  .required();
const validator = new SchemaValidator();
const { value, error } = validator.validate(UserSchema, rawData);
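The validator returns both the validated value and any error, so call sites typically branch on the error before using the value. The error's exact shape isn't documented here, so this sketch only checks for its presence:
if (error) {
  // Reject the payload; `error` carries the validation failure.
  throw new Error(`User payload failed validation: ${String(error)}`);
}
// Safe to use: value satisfies UserSchema.
console.log(value);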
Data Quality
Run quality checks on datasets:
import { QualityService } from '@hazeljs/data';
const qualityService = new QualityService();
const report = await qualityService.check(records, {
  completeness: ['id', 'email', 'createdAt'],
  notNull: ['id', 'status'],
});
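The report's shape isn't specified in this guide; assuming it exposes an overall pass/fail flag, a caller might gate a load step on it like this (the passed field is hypothetical):
// `report.passed` is a hypothetical field, for illustration only.
if (!report.passed) {
  console.warn('Quality checks failed; quarantining batch', report);
}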
Flink Configuration
For distributed stream processing with Apache Flink:
DataModule.forRoot({
  flink: {
    url: process.env.FLINK_REST_URL ?? 'http://localhost:8081',
    timeout: 30000,
  },
});
Built-in Transformers
| Transformer | Description |
|---|---|
| trimString | Trim whitespace from strings |
| toLowerCase / toUpperCase | Case conversion |
| parseJson / stringifyJson | JSON parsing and serialization |
| pick | Select specific keys from objects |
| omit | Remove specific keys from objects |
| renameKeys | Rename object keys |
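Since the built-ins are plain functions, they compose naturally inside a @Transform step. A minimal sketch, assuming trimString and toLowerCase take and return a string and pick takes an object plus the keys to keep:
import { trimString, toLowerCase, pick } from '@hazeljs/data';

// Clean up a raw record before validation (signatures assumed as noted above).
function cleanRecord(raw: Record<string, unknown>): Record<string, unknown> {
  const slimmed = pick(raw, ['id', 'customerId', 'status', 'items']);
  return {
    ...slimmed,
    status: toLowerCase(trimString(String(slimmed.status))),
  };
}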
Related Resources
- Config Package – Environment and configuration for data sources
- Prisma Package – Database access for pipeline outputs
- hazeljs-data-starter – Full example with order and user pipelines