@kafka-ts/nestjs-consumer
TypeScript icon, indicating that this package has built-in type declarations

1.0.0 • Public • Published

A NestJS dynamic module for Kafka consumers.

Install

npm install --save @kafka-ts/nestjs-consumer

# or

yarn add @kafka-ts/nestjs-consumer

# or

pnpm add @kafka-ts/nestjs-consumer

Usage

// main.ts
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions } from '@nestjs/microservices';
import { KafkaConsumer } from '@kafka-ts/nestjs-consumer';

import { AppModule } from './app.module';

/**
 * Boots a standalone microservice that uses a KafkaConsumer instance as its
 * custom transport strategy.
 */
async function bootstrap(): Promise<void> {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      strategy: new KafkaConsumer({
        brokers: ['localhost:9092'],
        consumerOptions: {
          groupId: 'test-id',
        },
      }),
    },
  );

  // A standalone microservice exposes no HTTP server, so listen() takes no port.
  await app.listen();
}

bootstrap();
// main.ts
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions } from '@nestjs/microservices';
import { KafkaConsumer } from '@kafka-ts/nestjs-consumer';

import { AppModule } from './app.module';

/**
 * Boots a standalone microservice, letting KafkaConsumer.createService build
 * the microservice options from the consumer configuration.
 */
async function bootstrap(): Promise<void> {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    KafkaConsumer.createService({
      brokers: ['localhost:9092'],
      consumerOptions: {
        groupId: 'test-id',
      },
    }),
  );

  // A standalone microservice exposes no HTTP server, so listen() takes no port.
  await app.listen();
}

bootstrap();
// main.ts
import { NestFactory } from '@nestjs/core';
import { KafkaConsumer } from '@kafka-ts/nestjs-consumer';

// Relative path so the import resolves without a `baseUrl`/paths mapping.
import { AppModule } from './app.module';

/**
 * Boots a hybrid application: a regular Nest app listening on port 3000 with
 * the Kafka consumer connected to it as a microservice.
 */
async function bootstrap(): Promise<void> {
  const app = await NestFactory.create(AppModule);

  app.connectMicroservice(
    KafkaConsumer.createService({
      brokers: ['localhost:9092'],
      consumerOptions: {
        groupId: 'test-id',
      },
    }),
  );

  // Start the connected microservice(s) before opening the HTTP listener.
  await app.startAllMicroservices();
  await app.listen(3_000);
}

bootstrap();
// consumer.controller.ts
import { Controller } from '@nestjs/common';
import {
  Ctx,
  Payload,
  Subscribe,
  SubscribeMessage,
  KafkaMessageContext,
  KafkaBatchMessageContext,
} from '@kafka-ts/nestjs-consumer';

/**
 * Example controller with a single handler subscribed to the `topic` topic.
 */
@Controller()
export class ConsumerController {
  /**
   * Logs the received payload and the batch exposed by the context, then
   * returns 'Ok!'.
   */
  @Subscribe({ topics: ['topic'] })
  public async handleSubscribe(
    @Payload() data: string[],
    @Ctx() context: KafkaBatchMessageContext,
  ): Promise<string> {
    console.log('data', data);

    const { batch } = context;
    console.log('context.batch', batch);

    return 'Ok!';
  }
}

If you want to support multiple client IDs (clientId)

// main.ts
import { NestFactory } from '@nestjs/core';
import { KafkaConsumer } from '@kafka-ts/nestjs-consumer';

// Relative path so the import resolves without a `baseUrl`/paths mapping.
import { AppModule } from './app.module';

/**
 * Boots a hybrid application with two Kafka consumer configurations: one
 * without an explicit clientId and one registered as `test-client`.
 */
async function bootstrap(): Promise<void> {
  const app = await NestFactory.create(AppModule);

  app.connectMicroservice(
    KafkaConsumer.createService([
      {
        brokers: ['localhost:9092'],
        consumerOptions: {
          groupId: 'test-id',
        },
      },
      {
        // Handlers select this consumer by passing the same clientId.
        clientId: 'test-client',
        brokers: ['localhost:9092'],
        consumerOptions: {
          groupId: 'test-id-2',
        },
      },
    ]),
  );

  // Start the connected microservice(s) before opening the HTTP listener.
  await app.startAllMicroservices();
  await app.listen(3_000);
}

bootstrap();
// consumer.controller.ts
import { Controller } from '@nestjs/common';
import {
  Ctx,
  Payload,
  Subscribe,
  SubscribeMessage,
  KafkaMessageContext,
  KafkaBatchMessageContext,
} from '@kafka-ts/nestjs-consumer';

/**
 * Example controller with two handlers: one that specifies no clientId and
 * one bound to the `test-client` clientId.
 */
@Controller()
export class ConsumerController {
  /**
   * Handler for `topic` with no clientId specified. Logs the payload and the
   * context's batch, then returns 'Ok!'.
   */
  @Subscribe({ topics: ['topic'] })
  public async handleSubscribe(
    @Payload() data: string[],
    @Ctx() context: KafkaBatchMessageContext,
  ): Promise<string> {
    console.log('data', data);

    const { batch } = context;
    console.log('context.batch', batch);

    return 'Ok!';
  }

  /**
   * Handler for `topic_2` on the `test-client` clientId. Logs the payload and
   * the context's batch, then returns 'Ok!'.
   */
  @Subscribe({ clientId: 'test-client', topics: ['topic_2'] })
  public async handleSubscribeTestClient(
    @Payload() data: string[],
    @Ctx() context: KafkaBatchMessageContext,
  ): Promise<string> {
    console.log('data', data);

    const { batch } = context;
    console.log('context.batch', batch);

    return 'Ok!';
  }
}

Want to subscribe to both eachBatch and eachMessage?

If you want to subscribe to both eachBatch and eachMessage, define the two topics under different clientId values; otherwise only one of eachBatch or eachMessage can run.

// main.ts
import detect from 'detect-port';
import { NestFactory } from '@nestjs/core';
import { KafkaConsumer } from '@kafka-ts/nestjs-consumer';

import { AppModule } from './app.module';

/**
 * Boots a hybrid application with two Kafka consumer configurations and
 * listens on the port returned by detect-port for 3000 (presumably a free
 * port at or near 3000; see the detect-port docs).
 */
async function bootstrap() {
  const server = await NestFactory.create(AppModule);

  // Build the microservice options first, then attach them to the app.
  const kafkaService = KafkaConsumer.createService([
    {
      brokers: ['localhost:9092'],
      consumerOptions: { groupId: 'test-id' },
    },
    {
      clientId: 'test-client',
      brokers: ['localhost:9092'],
      consumerOptions: { groupId: 'test-id-2' },
    },
  ]);
  server.connectMicroservice(kafkaService);

  const port = await detect(3_000);

  await server.startAllMicroservices();
  await server.listen(port);

  console.log(`Run on ${port}`);
}

bootstrap();
// consumer.controller.ts
import { Controller } from '@nestjs/common';
import {
  Ctx,
  Payload,
  Subscribe,
  SubscribeMessage,
  KafkaMessageContext,
  KafkaBatchMessageContext,
} from '@kafka-ts/nestjs-consumer';

/**
 * Example controller combining a `@Subscribe` handler (batch context) with a
 * `@SubscribeMessage` handler (single-message context), each under a
 * different clientId.
 */
@Controller()
export class ConsumerController {
  /**
   * Handler for `topic` with no clientId specified. Logs the payload and the
   * context's batch, then returns 'Ok!'.
   */
  @Subscribe({ topics: ['topic'] })
  public async handleSubscribe(
    @Payload() data: string[],
    @Ctx() context: KafkaBatchMessageContext,
  ): Promise<string> {
    console.log('data', data);

    const { batch } = context;
    console.log('context.batch', batch);

    return 'Ok!';
  }

  /**
   * Handler for `topic_2` on the `test-client` clientId. Logs the payload and
   * the context's message, then returns 'Ok!'.
   */
  @SubscribeMessage({ clientId: 'test-client', topics: ['topic_2'] })
  public async handleSubscribeMessage(
    @Payload() data: string,
    @Ctx() context: KafkaMessageContext,
  ): Promise<string> {
    console.log('data', data);

    const { message } = context;
    console.log('context.message', message);

    return 'Ok!';
  }
}

Package Sidebar

Install

npm i @kafka-ts/nestjs-consumer

Weekly Downloads

3

Version

1.0.0

License

MIT

Unpacked Size

23.9 kB

Total Files

10

Last publish

Collaborators

  • zgid123