CQRS CQRS

导读

简单的 CRUD(创建、读取、更新和删除)应用程序的流程可以描述如下:

  1. 控制器层处理 HTTP 请求并将任务委托给服务层。
  2. 服务层是大多数业务逻辑所在的位置。
  3. 服务使用存储库/DAO 来更改/持久化实体。
  4. 实体充当值的容器,具有 setter 和 getter。

虽然这种模式通常适用于中小型应用程序,但对于更大、更复杂的应用程序来说,它可能不是最佳选择。在这种情况下,CQRS(命令和查询职责分离)模型可能更合适且更具可扩展性(取决于应用程序的要求)。此模型的优点包括:

  • 关注点分离。该模型将读取和写入操作分离为单独的模型。
  • 可扩展性。读写操作可以独立扩展。 -灵活性。该模型允许使用不同的数据存储进行读写操作。 -性能。该模型允许使用针对读写操作优化的不同数据存储。

为了促进该模型,Nest 提供了一个轻量级的 CQRS 模块。本章介绍如何使用它。

安装

首先安装所需的包:

bash
$ npm install --save @nestjs/cqrs

命令

命令用于更改应用程序状态。它们应该基于任务,而不是以数据为中心。当命令被分派时,它由相应的命令处理程序处理。处理程序负责更新应用程序状态。

ts
heroes-game.service
ts
@Injectable()
export class HeroesGameService {
  constructor(private commandBus: CommandBus) {}

  async killDragon(heroId: string, killDragonDto: KillDragonDto) {
    return this.commandBus.execute(
      new KillDragonCommand(heroId, killDragonDto.dragonId)
    )
  }
}

在上面的代码片段中,我们实例化 KillDragonCommand 类并将其传递给 CommandBusexecute() 方法。这是演示的命令类:

ts
kill-dragon.command
ts
export class KillDragonCommand {
  constructor(
    public readonly heroId: string,
    public readonly dragonId: string,
  ) {}
}

CommandBus 表示命令的 。它负责将命令分派给适当的处理程序。execute() 方法返回一个承诺,该承诺解析为处理程序返回的值。

让我们为 KillDragonCommand 命令创建一个处理程序。

ts
kill-dragon.handler
ts
@CommandHandler(KillDragonCommand)
export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
  constructor(private repository: HeroRepository) {}

  async execute(command: KillDragonCommand) {
    const { heroId, dragonId } = command
    const hero = this.repository.findOneById(+heroId)

    hero.killEnemy(dragonId)
    await this.repository.persist(hero)
  }
}

此处理程序从存储库中检索Hero实体,调用killEnemy()方法,然后持久保存更改。KillDragonHandler类实现ICommandHandler接口,这需要实现execute()方法。execute()方法接收命令对象作为参数。

查询

查询用于从应用程序状态中检索数据。它们应该以数据为中心,而不是基于任务。当分派查询时,它由相应的查询处理程序处理。处理程序负责检索数据。

QueryBus遵循与CommandBus相同的模式。查询处理程序应实现IQueryHandler接口并使用@QueryHandler()装饰器进行注释。

事件

事件用于通知应用程序的其他部分有关应用程序状态的变化。它们由模型或直接使用EventBus进行调度。调度事件时,它会由相应的事件处理程序进行处理。然后,处理程序可以执行例如更新读取的模型等操作。

为了演示目的,让我们创建一个事件类:

ts
hero-killed-dragon.event
ts
export class HeroKilledDragonEvent {
  constructor(
    public readonly heroId: string,
    public readonly dragonId: string,
  ) {}
}

现在,虽然可以使用 EventBus.publish() 方法直接分派事件,但我们也可以从模型中分派它们。让我们更新 Hero 模型,以便在调用 killEnemy() 方法时分派 HeroKilledDragonEvent 事件。

ts
hero.model
ts
export class Hero extends AggregateRoot {
  constructor(private id: string) {
    super()
  }

  killEnemy(enemyId: string) {
    // Business logic
    this.apply(new HeroKilledDragonEvent(this.id, enemyId))
  }
}

apply() 方法用于分派事件。它接受事件对象作为参数。但是,由于我们的模型不知道 EventBus,因此我们需要将其与模型关联起来。我们可以通过使用 EventPublisher 类来实现这一点。

ts
kill-dragon.handler
ts
@CommandHandler(KillDragonCommand)
export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
  constructor(
    private repository: HeroRepository,
    private publisher: EventPublisher,
  ) {}

  async execute(command: KillDragonCommand) {
    const { heroId, dragonId } = command
    const hero = this.publisher.mergeObjectContext(
      await this.repository.findOneById(+heroId),
    )
    hero.killEnemy(dragonId)
    hero.commit()
  }
}

EventPublisher#mergeObjectContext 方法将事件发布者合并到提供的对象中,这意味着该对象现在能够将事件发布到事件流。

请注意,在此示例中,我们还在模型上调用了 commit() 方法。此方法用于分派任何未完成的事件。要自动分派事件,我们可以将 autoCommit 属性设置为 true

ts
export class Hero extends AggregateRoot {
  constructor(private id: string) {
    super()
    this.autoCommit = true
  }
}

如果我们想将事件发布者合并到一个不存在的对象中,而是合并到一个类中,我们可以使用EventPublisher#mergeClassContext方法:

ts
const HeroModel = this.publisher.mergeClassContext(Hero)
const hero = new HeroModel('id') // <-- HeroModel 是一个类

现在,HeroModel类的每个实例都可以发布事件,而无需使用mergeObjectContext()方法。

此外,我们可以使用EventBus手动发出事件:

ts
this.eventBus.publish(new HeroKilledDragonEvent())
提示

EventBus 是一个可注入类。

每个事件可以有多个 事件处理程序

hero-killed-dragon.handler
ts
@EventsHandler(HeroKilledDragonEvent)
export class HeroKilledDragonHandler implements IEventHandler<HeroKilledDragonEvent> {
  constructor(private repository: HeroRepository) {}

  handle(event: HeroKilledDragonEvent) {
    // Business logic
  }
}
提示

请注意,当您开始使用事件处理程序时,您将脱离传统的 HTTP Web 上下文。

  • CommandHandlers 中的错误仍可被内置 异常过滤器 捕获。
  • EventHandlers 中的错误无法被异常过滤器捕获:您必须手动处理它们。可以通过简单的 try/catch、使用 Sagas 触发补偿事件,或您选择的任何其他解决方案。
  • CommandHandlers 中的 HTTP 响应仍可发送回客户端。
  • EventHandlers 中的 HTTP 响应不能。如果您想向客户端发送信息,可以使用 WebSocketSSE 或您选择的任何其他解决方案。

Sagas

Saga 是一个长期运行的进程,它会监听事件并可能触发新命令。它通常用于管理应用程序中的复杂工作流。例如,当用户注册时,saga 可能会监听 UserRegisteredEvent 并向用户发送欢迎电子邮件。

Sagas 是一个非常强大的功能。单个 saga 可能会监听 1..* 个事件。使用 RxJS 库,我们可以过滤、映射、分叉和合并事件流以创建复杂的工作流。每个 saga 都会返回一个 Observable,它会生成一个命令实例。然后,CommandBus异步 调度此命令。

让我们创建一个 saga,它监听 HeroKilledDragonEvent 并调度 DropAncientItemCommand 命令。

ts
heroes-game.saga
ts
@Injectable()
export class HeroesGameSagas {
  @Saga()
  dragonKilled = (events$: Observable<any>): Observable<ICommand> => {
    return events$.pipe(
      ofType(HeroKilledDragonEvent),
      map(event => new DropAncientItemCommand(event.heroId, fakeItemID)),
    )
  }
}
提示

ofType 操作符和 @Saga() 装饰器从 @nestjs/cqrs 包导出。

@Saga() 装饰器将该方法标记为 saga。events$ 参数是所有事件的 Observable 流。ofType 操作符按指定的事件类型过滤流。map 操作符将事件映射到新的命令实例。

在此示例中,我们将 HeroKilledDragonEvent 映射到 DropAncientItemCommand 命令。然后 CommandBus 自动分派 DropAncientItemCommand 命令。

设置

最后,我们需要在 HeroesGameModule 中注册所有命令处理程序、事件处理程序和 saga:

heroes-game.module
ts
export const CommandHandlers = [KillDragonHandler, DropAncientItemHandler]
export const EventHandlers = [HeroKilledDragonHandler, HeroFoundItemHandler]

@Module({
  imports: [CqrsModule],
  controllers: [HeroesGameController],
  providers: [
    HeroesGameService,
    HeroesGameSagas,
    ...CommandHandlers,
    ...EventHandlers,
    HeroRepository,
  ]
})
export class HeroesGameModule {}

未处理的异常

事件处理程序以异步方式执行。这意味着它们应始终处理所有异常,以防止应用程序进入不一致状态。但是,如果未处理异常,则EventBus将创建UnhandledExceptionInfo对象并将其推送到UnhandledExceptionBus流。此流是一个Observable,可用于处理未处理的异常。

ts
private destroy$ = new Subject<void>();

constructor(private unhandledExceptionsBus: UnhandledExceptionBus) {
  this.unhandledExceptionsBus
    .pipe(takeUntil(this.destroy$))
    .subscribe((exceptionInfo) => {
      // Handle exception here
      // e.g. send it to external service, terminate process, or publish a new event
    });
}

onModuleDestroy() {
  this.destroy$.next();
  this.destroy$.complete();
}

为了过滤掉异常,我们可以使用 ofType 运算符,如下所示:

ts
this.unhandledExceptionsBus.pipe(takeUntil(this.destroy$), UnhandledExceptionBus.ofType(TransactionNotAllowedException)).subscribe((exceptionInfo) => {
  // Handle exception here
})

其中 TransactionNotAllowedException 是我们要过滤掉的异常。

UnhandledExceptionInfo 对象包含以下属性:

ts
export interface UnhandledExceptionInfo<Cause = IEvent | ICommand, Exception = any> {
  /**
   * The exception that was thrown.
   */
  exception: Exception
  /**
   * The cause of the exception (event or command reference).
   */
  cause: Cause
}

订阅所有事件

CommandBusQueryBusEventBus 都是 Observables。这意味着我们可以订阅整个流,例如处理所有事件。例如,我们可以将所有事件记录到控制台,或将它们保存到事件存储中。

ts
private destroy$ = new Subject<void>();

constructor(private eventBus: EventBus) {
  this.eventBus
    .pipe(takeUntil(this.destroy$))
    .subscribe((event) => {
      // Save events to database
    });
}

onModuleDestroy() {
  this.destroy$.next();
  this.destroy$.complete();
}

示例

此处 提供了一个实用示例。