Kafka Kafka

导读

Kafka 是一个开源的分布式流式传输平台,具有三个关键功能:

  • 发布和订阅记录流,类似于消息队列或企业消息系统。
  • 以容错持久的方式存储记录流。
  • 在记录流发生时对其进行处理。

Kafka 项目旨在提供一个统一、高吞吐量、低延迟的平台来处理实时数据馈送。它与 Apache Storm 和 Spark 很好地集成在一起,用于实时流数据分析。

安装

要开始构建基于 Kafka 的微服务,首先安装所需的包:

bash
$ npm i --save kafkajs

概述

与其他 Nest 微服务传输层实现一样,您可以使用传递给 createMicroservice() 方法的选项对象的 transport 属性以及可选的 options 属性来选择 Kafka 传输器机制,如下所示:

ts
main
ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    }
  }
})
提示

Transport 枚举从 @nestjs/microservices 包导入。

选项

options 属性特定于所选传输器。Kafka 传输器公开了下面描述的属性。

||| |client|Client configuration optionsread more| |consumer|Consumer configuration options read more| |run|Run configuration options read more here| |subscribe|Subscribe configuration options read more| |producer|Producer configuration options read more| |send|Send configuration optionsread more| |producerOnlyMode|Feature flag to skip consumer group registration and only act as a producer boolean| |postfixId|Change suffix of clientId value string|

客户端

与其他微服务传输器相比,Kafka 有一点不同。我们使用 ClientKafka 类而不是 ClientProxy 类。

与其他微服务传输器一样,您有 几个选项 来创建 ClientKafka 实例。

创建实例的一种方法是使用 ClientsModule。要使用 ClientsModule 创建客户端实例,请导入它并使用 register() 方法传递具有与 createMicroservice() 方法中上面显示的相同属性的选项对象,以及用作注入令牌的 name 属性。在此处阅读有关 ClientsModule更多信息

ts
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'HERO_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'hero',
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'hero-consumer'
          }
        }
      },
    ]),
  ]
  // ...
})

也可以使用其他选项来创建客户端(ClientProxyFactory@Client())。您可以在 此处 阅读有关它们的信息。

使用 @Client() 装饰器如下:

ts
@Client({
  transport: Transport.KAFKA,
  options: {
    client: {
      clientId: 'hero',
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'hero-consumer'
    }
  }
})
client: ClientKafka;

消息模式

Kafka 微服务消息模式利用两个主题作为请求和回复渠道。ClientKafka#send() 方法通过将 关联 ID、回复主题和回复分区与请求消息关联,发送带有 返回地址 的消息。这要求 ClientKafka 实例订阅回复主题并分配给至少一个分区,然后才能发送消息。

随后,您需要为每个正在运行的 Nest 应用程序至少有一个回复主题分区。例如,如果您正在运行 4 个 Nest 应用程序,但回复主题只有 3 个分区,那么在尝试发送消息时,其中 1 个 Nest 应用程序将出错。

当新的 ClientKafka 实例启动时,它们会加入消费者组并订阅各自的主题。此过程会触发分配给消费者组消费者的主题分区的重新平衡。

通常,使用循环分区器分配主题分区,该分区器将主题分区分配给按应用程序启动时随机设置的消费者名称排序的消费者集合。但是,当新消费者加入消费者组时,新消费者可以位于消费者集合中的任何位置。这会产生一种情况,即当预先存在的消费者位于新消费者之后时,可以为预先存在的消费者分配不同的分区。因此,分配了不同分区的消费者将丢失重新平衡之前发送的请求的响应消息。

为了防止 ClientKafka 消费者丢失响应消息,使用了 Nest 特定的内置自定义分区器。此自定义分区器将分区分配给按应用程序启动时设置的高分辨率时间戳 (process.hrtime()) 排序的消费者集合。

消息响应订阅

注意

本节仅在您使用 request-response 消息样式(使用 @MessagePattern 装饰器和 ClientKafka#send 方法)时才相关。对于 基于事件 通信(@EventPattern 装饰器和 ClientKafka#emit 方法),订阅响应主题不是必需的。

ClientKafka 类提供 subscribeToResponseOf() 方法。subscribeToResponseOf() 方法将请求的主题名称作为参数,并将派生的回复主题名称添加到回复主题集合中。实现消息模式时需要此方法。

heroes.controller
ts
onModuleInit(){
  this.client.subscribeToResponseOf('hero.kill.dragon');
}

如果ClientKafka实例是异步创建的,则在调用connect()方法之前必须调用subscribeToResponseOf()方法。

heroes.controller
ts
async onModuleInit() {
  this.client.subscribeToResponseOf('hero.kill.dragon');
  await this.client.connect();
}

传入

Nest 以对象的形式接收传入的 Kafka 消息,该对象具有 keyvalueheaders 属性,这些属性的值类型为 Buffer。然后,Nest 通过将缓冲区转换为字符串来解析这些值。如果字符串是类似对象,Nest 会尝试将字符串解析为 JSON。然后将 value 传递给其关联的处理程序。

传出

Nest 在发布事件或发送消息时经过序列化过程后发送传出的 Kafka 消息。这发生在传递给 ClientKafka emit()send() 方法的参数上,或发生在从 @MessagePattern 方法返回的值上。此序列化通过使用 JSON.stringify()toString() 原型方法字符串化非字符串或缓冲区的对象。

heroes.controller
ts
@Controller()
export class HeroesController {
  @MessagePattern('hero.kill.dragon')
  killDragon(@Payload() message: KillDragonMessage): any {
    const dragonId = message.dragonId
    const items = [
      { id: 1, name: 'Mythical Sword' },
      { id: 2, name: 'Key to Dungeon' },
    ]
    return items
  }
}
提示

@Payload()@nestjs/microservices 包导入。

也可以通过传递具有 keyvalue 属性的对象来键入传出消息。键入消息对于满足 共同分区要求 非常重要。

heroes.controller
ts
@Controller()
export class HeroesController {
  @MessagePattern('hero.kill.dragon')
  killDragon(@Payload() message: KillDragonMessage): any {
    const realm = 'Nest'
    const heroId = message.heroId
    const dragonId = message.dragonId

    const items = [
      { id: 1, name: 'Mythical Sword' },
      { id: 2, name: 'Key to Dungeon' },
    ]

    return {
      headers: {
        realm
      },
      key: heroId,
      value: items
    }
  }
}

此外,以此格式传递的消息还可以包含在headers哈希属性中设置的自定义标头。标头哈希属性值必须是string类型或Buffer类型。

heroes.controller
ts
@Controller()
export class HeroesController {
  @MessagePattern('hero.kill.dragon')
  killDragon(@Payload() message: KillDragonMessage): any {
    const realm = 'Nest'
    const heroId = message.heroId
    const dragonId = message.dragonId

    const items = [
      { id: 1, name: 'Mythical Sword' },
      { id: 2, name: 'Key to Dungeon' },
    ]

    return {
      headers: {
        kafka_nestRealm: realm
      },
      key: heroId,
      value: items
    }
  }
}

基于事件

虽然请求-响应方法非常适合在服务之间交换消息,但当您的消息样式是基于事件的(这反过来又非常适合 Kafka)时,它就不太合适了 - 当您只想发布事件而不等待响应时。在这种情况下,您不希望请求-响应需要维护两个主题的开销。

查看这两个部分以了解更多信息:概述:基于事件概述:发布事件

上下文

在更复杂的场景中,您可能希望访问有关传入请求的更多信息。使用 Kafka 传输器时,您可以访问 KafkaContext 对象。

ts
TS
ts
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
  console.log(`Topic: ${context.getTopic()}`);
}
提示

@Payload()@Ctx()KafkaContext@nestjs/microservices 包导入。

要访问原始 Kafka IncomingMessage 对象,请使用 KafkaContext 对象的 getMessage() 方法,如下所示:

ts
TS
ts
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
  const originalMessage = context.getMessage();
  const partition = context.getPartition();
  const { headers, timestamp } = originalMessage;
}

其中 IncomingMessage 满足以下接口:

ts
interface IncomingMessage {
  topic: string
  partition: number
  timestamp: string
  size: number
  attributes: number
  offset: string
  key: any
  value: any
  headers: Record<string, any>
}

如果您的处理程序对每条收到的消息的处理时间较长,则应考虑使用heartbeat回调。要检索heartbeat函数,请使用KafkaContextgetHeartbeat()方法,如下所示:

ts
@MessagePattern('hero.kill.dragon')
async killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
  const heartbeat = context.getHeartbeat();

  // 做一些缓慢的处理
  await doWorkPart1();

  // 发送心跳以不超过 sessionTimeout
  await heartbeat();

  // 再次做一些缓慢的处理
  await doWorkPart2();
}

命名约定

Kafka 微服务组件将其各自角色的描述附加到 client.clientIdconsumer.groupId 选项上,以防止 Nest 微服务客户端和服务器组件之间发生冲突。默认情况下,ClientKafka 组件将 -client 附加到这两个选项,而 ServerKafka 组件将 -server 附加到这两个选项。请注意下面提供的值是如何以这种方式转换的(如注释中所示)。

main
ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      clientId: 'hero', // hero-server
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'hero-consumer' // hero-consumer-server
    },
  }
})

对于客户来说:

heroes.controller
ts
@Client({
  transport: Transport.KAFKA,
  options: {
    client: {
      clientId: 'hero', // hero-client
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'hero-consumer' // hero-consumer-client
    }
  }
})
client: ClientKafka;
提示

Kafka 客户端和消费者命名约定可以通过在您自己的自定义提供程序中扩展 ClientKafkaKafkaServer 并覆盖构造函数来自定义。

由于 Kafka 微服务消息模式使用两个主题作为请求和回复渠道,因此回复模式应从请求主题派生而来。默认情况下,回复主题的名称是请求主题名称与附加的 .reply 的组合。

heroes.controller
ts
onModuleInit() {
  this.client.subscribeToResponseOf('hero.get'); // hero.get.reply
}
提示

Kafka 回复主题命名约定可以通过在您自己的自定义提供程序中扩展 ClientKafka 并覆盖 getResponsePatternName 方法来自定义。

可重试异常

与其他传输器类似,所有未处理的异常都会自动包装到 RpcException 中并转换为用户友好格式。但是,在某些情况下,您可能希望绕过此机制并让 kafkajs 驱动程序使用异常。在处理消息时抛出异常会指示 kafkajs 重试(重新传递),这意味着即使触发了消息(或事件)处理程序,偏移量也不会提交给 Kafka。

警告

对于事件处理程序(基于事件的通信),默认情况下,所有未处理的异常都被视为可重试异常

为此,您可以使用一个名为KafkaRetriableException的专用类,如下所示:

ts
throw new KafkaRetriableException('...')
提示

KafkaRetriableException类从@nestjs/microservices包中导出。

提交偏移量

使用 Kafka 时,提交偏移量至关重要。默认情况下,消息将在特定时间后自动提交。有关更多信息,请访问KafkaJS 文档KafkaContext提供了一种访问活动消费者以手动提交偏移量的方法。消费者是 KafkaJS 消费者,作为 原生 KafkaJS 实现 工作。

ts
TS
ts
@EventPattern('user.created')
async handleUserCreated(@Payload() data: IncomingMessage, @Ctx() context: KafkaContext) {
  // business logic

  const { offset } = context.getMessage();
  const partition = context.getPartition();
  const topic = context.getTopic();
  const consumer = context.getConsumer();
  await consumer.commitOffsets([{ topic, partition, offset }])
}

要禁用消息的自动提交,请在运行配置中设置autoCommit:false,如下所示:

ts
main
ts
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    },
    run: {
      autoCommit: false
    }
  }
})