Kafka 是一个开源的分布式流式传输平台,具有三个关键功能:
- 发布和订阅记录流,类似于消息队列或企业消息系统。
- 以容错持久的方式存储记录流。
- 在记录流发生时对其进行处理。
Kafka 项目旨在提供一个统一、高吞吐量、低延迟的平台来处理实时数据馈送。它与 Apache Storm 和 Spark 很好地集成在一起,用于实时流数据分析。
安装
要开始构建基于 Kafka 的微服务,首先安装所需的包:
$ npm i --save kafkajs
概述
与其他 Nest 微服务传输层实现一样,您可以使用传递给 createMicroservice()
方法的选项对象的 transport
属性以及可选的 options
属性来选择 Kafka 传输器机制,如下所示:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
}
}
})
Transport
枚举从 @nestjs/microservices
包导入。
选项
options
属性特定于所选传输器。Kafka 传输器公开了下面描述的属性。
|||
|client|Client configuration optionsread more|
|consumer|Consumer configuration options read more|
|run|Run configuration options read more here|
|subscribe|Subscribe configuration options read more|
|producer|Producer configuration options read more|
|send|Send configuration optionsread more|
|producerOnlyMode|Feature flag to skip consumer group registration and only act as a producer boolean
|
|postfixId|Change suffix of clientId value string
|
客户端
与其他微服务传输器相比,Kafka 有一点不同。我们使用 ClientKafka
类而不是 ClientProxy
类。
与其他微服务传输器一样,您有 几个选项 来创建 ClientKafka
实例。
创建实例的一种方法是使用 ClientsModule
。要使用 ClientsModule
创建客户端实例,请导入它并使用 register()
方法传递具有与 createMicroservice()
方法中上面显示的相同属性的选项对象,以及用作注入令牌的 name
属性。在此处阅读有关 ClientsModule
的更多信息。
@Module({
imports: [
ClientsModule.register([
{
name: 'HERO_SERVICE',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer'
}
}
},
]),
]
// ...
})
也可以使用其他选项来创建客户端(ClientProxyFactory
或 @Client()
)。您可以在 此处 阅读有关它们的信息。
使用 @Client()
装饰器如下:
@Client({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer'
}
}
})
client: ClientKafka;
消息模式
Kafka 微服务消息模式利用两个主题作为请求和回复渠道。ClientKafka#send()
方法通过将 关联 ID、回复主题和回复分区与请求消息关联,发送带有 返回地址 的消息。这要求 ClientKafka
实例订阅回复主题并分配给至少一个分区,然后才能发送消息。
随后,您需要为每个正在运行的 Nest 应用程序至少有一个回复主题分区。例如,如果您正在运行 4 个 Nest 应用程序,但回复主题只有 3 个分区,那么在尝试发送消息时,其中 1 个 Nest 应用程序将出错。
当新的 ClientKafka
实例启动时,它们会加入消费者组并订阅各自的主题。此过程会触发分配给消费者组消费者的主题分区的重新平衡。
通常,使用循环分区器分配主题分区,该分区器将主题分区分配给按应用程序启动时随机设置的消费者名称排序的消费者集合。但是,当新消费者加入消费者组时,新消费者可以位于消费者集合中的任何位置。这会产生一种情况,即当预先存在的消费者位于新消费者之后时,可以为预先存在的消费者分配不同的分区。因此,分配了不同分区的消费者将丢失重新平衡之前发送的请求的响应消息。
为了防止 ClientKafka
消费者丢失响应消息,使用了 Nest 特定的内置自定义分区器。此自定义分区器将分区分配给按应用程序启动时设置的高分辨率时间戳 (process.hrtime()
) 排序的消费者集合。
消息响应订阅
本节仅在您使用 request-response 消息样式(使用 @MessagePattern
装饰器和 ClientKafka#send
方法)时才相关。对于 基于事件 通信(@EventPattern
装饰器和 ClientKafka#emit
方法),订阅响应主题不是必需的。
ClientKafka
类提供 subscribeToResponseOf()
方法。subscribeToResponseOf()
方法将请求的主题名称作为参数,并将派生的回复主题名称添加到回复主题集合中。实现消息模式时需要此方法。
onModuleInit(){
this.client.subscribeToResponseOf('hero.kill.dragon');
}
如果ClientKafka
实例是异步创建的,则在调用connect()
方法之前必须调用subscribeToResponseOf()
方法。
async onModuleInit() {
this.client.subscribeToResponseOf('hero.kill.dragon');
await this.client.connect();
}
传入
Nest 以对象的形式接收传入的 Kafka 消息,该对象具有 key
、value
和 headers
属性,这些属性的值类型为 Buffer
。然后,Nest 通过将缓冲区转换为字符串来解析这些值。如果字符串是类似对象
,Nest 会尝试将字符串解析为 JSON
。然后将 value
传递给其关联的处理程序。
传出
Nest 在发布事件或发送消息时经过序列化过程后发送传出的 Kafka 消息。这发生在传递给 ClientKafka
emit()
和 send()
方法的参数上,或发生在从 @MessagePattern
方法返回的值上。此序列化通过使用 JSON.stringify()
或 toString()
原型方法字符串化
非字符串或缓冲区的对象。
@Controller()
export class HeroesController {
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage): any {
const dragonId = message.dragonId
const items = [
{ id: 1, name: 'Mythical Sword' },
{ id: 2, name: 'Key to Dungeon' },
]
return items
}
}
@Payload()
从 @nestjs/microservices
包导入。
也可以通过传递具有 key
和 value
属性的对象来键入传出消息。键入消息对于满足 共同分区要求 非常重要。
@Controller()
export class HeroesController {
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage): any {
const realm = 'Nest'
const heroId = message.heroId
const dragonId = message.dragonId
const items = [
{ id: 1, name: 'Mythical Sword' },
{ id: 2, name: 'Key to Dungeon' },
]
return {
headers: {
realm
},
key: heroId,
value: items
}
}
}
此外,以此格式传递的消息还可以包含在headers
哈希属性中设置的自定义标头。标头哈希属性值必须是string
类型或Buffer
类型。
@Controller()
export class HeroesController {
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage): any {
const realm = 'Nest'
const heroId = message.heroId
const dragonId = message.dragonId
const items = [
{ id: 1, name: 'Mythical Sword' },
{ id: 2, name: 'Key to Dungeon' },
]
return {
headers: {
kafka_nestRealm: realm
},
key: heroId,
value: items
}
}
}
基于事件
虽然请求-响应方法非常适合在服务之间交换消息,但当您的消息样式是基于事件的(这反过来又非常适合 Kafka)时,它就不太合适了 - 当您只想发布事件而不等待响应时。在这种情况下,您不希望请求-响应需要维护两个主题的开销。
查看这两个部分以了解更多信息:概述:基于事件 和 概述:发布事件。
上下文
在更复杂的场景中,您可能希望访问有关传入请求的更多信息。使用 Kafka 传输器时,您可以访问 KafkaContext
对象。
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
console.log(`Topic: ${context.getTopic()}`);
}
@Payload()
、@Ctx()
和 KafkaContext
从 @nestjs/microservices
包导入。
要访问原始 Kafka IncomingMessage
对象,请使用 KafkaContext
对象的 getMessage()
方法,如下所示:
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
const originalMessage = context.getMessage();
const partition = context.getPartition();
const { headers, timestamp } = originalMessage;
}
其中 IncomingMessage
满足以下接口:
interface IncomingMessage {
topic: string
partition: number
timestamp: string
size: number
attributes: number
offset: string
key: any
value: any
headers: Record<string, any>
}
如果您的处理程序对每条收到的消息的处理时间较长,则应考虑使用heartbeat
回调。要检索heartbeat
函数,请使用KafkaContext
的getHeartbeat()
方法,如下所示:
@MessagePattern('hero.kill.dragon')
async killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
const heartbeat = context.getHeartbeat();
// 做一些缓慢的处理
await doWorkPart1();
// 发送心跳以不超过 sessionTimeout
await heartbeat();
// 再次做一些缓慢的处理
await doWorkPart2();
}
命名约定
Kafka 微服务组件将其各自角色的描述附加到 client.clientId
和 consumer.groupId
选项上,以防止 Nest 微服务客户端和服务器组件之间发生冲突。默认情况下,ClientKafka
组件将 -client
附加到这两个选项,而 ServerKafka
组件将 -server
附加到这两个选项。请注意下面提供的值是如何以这种方式转换的(如注释中所示)。
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero', // hero-server
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer' // hero-consumer-server
},
}
})
对于客户来说:
@Client({
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero', // hero-client
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer' // hero-consumer-client
}
}
})
client: ClientKafka;
Kafka 客户端和消费者命名约定可以通过在您自己的自定义提供程序中扩展 ClientKafka
和 KafkaServer
并覆盖构造函数来自定义。
由于 Kafka 微服务消息模式使用两个主题作为请求和回复渠道,因此回复模式应从请求主题派生而来。默认情况下,回复主题的名称是请求主题名称与附加的 .reply
的组合。
onModuleInit() {
this.client.subscribeToResponseOf('hero.get'); // hero.get.reply
}
Kafka 回复主题命名约定可以通过在您自己的自定义提供程序中扩展 ClientKafka
并覆盖 getResponsePatternName
方法来自定义。
可重试异常
与其他传输器类似,所有未处理的异常都会自动包装到 RpcException
中并转换为用户友好
格式。但是,在某些情况下,您可能希望绕过此机制并让 kafkajs
驱动程序使用异常。在处理消息时抛出异常会指示 kafkajs
重试(重新传递),这意味着即使触发了消息(或事件)处理程序,偏移量也不会提交给 Kafka。
对于事件处理程序(基于事件的通信),默认情况下,所有未处理的异常都被视为可重试异常。
为此,您可以使用一个名为KafkaRetriableException
的专用类,如下所示:
throw new KafkaRetriableException('...')
KafkaRetriableException
类从@nestjs/microservices
包中导出。
提交偏移量
使用 Kafka 时,提交偏移量至关重要。默认情况下,消息将在特定时间后自动提交。有关更多信息,请访问KafkaJS 文档。KafkaContext
提供了一种访问活动消费者以手动提交偏移量的方法。消费者是 KafkaJS 消费者,作为 原生 KafkaJS 实现 工作。
@EventPattern('user.created')
async handleUserCreated(@Payload() data: IncomingMessage, @Ctx() context: KafkaContext) {
// business logic
const { offset } = context.getMessage();
const partition = context.getPartition();
const topic = context.getTopic();
const consumer = context.getConsumer();
await consumer.commitOffsets([{ topic, partition, offset }])
}
要禁用消息的自动提交,请在运行
配置中设置autoCommit:false
,如下所示:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
run: {
autoCommit: false
}
}
})