简单的 CRUD(创建、读取、更新和删除)应用程序的流程可以描述如下:
- 控制器层处理 HTTP 请求并将任务委托给服务层。
- 服务层是大多数业务逻辑所在的位置。
- 服务使用存储库/DAO 来更改/持久化实体。
- 实体充当值的容器,具有 setter 和 getter。
虽然这种模式通常适用于中小型应用程序,但对于更大、更复杂的应用程序来说,它可能不是最佳选择。在这种情况下,CQRS(命令和查询职责分离)模型可能更合适且更具可扩展性(取决于应用程序的要求)。此模型的优点包括:
- 关注点分离。该模型将读取和写入操作分离为单独的模型。
- 可扩展性。读写操作可以独立扩展。 -灵活性。该模型允许使用不同的数据存储进行读写操作。 -性能。该模型允许使用针对读写操作优化的不同数据存储。
为了促进该模型,Nest 提供了一个轻量级的 CQRS 模块。本章介绍如何使用它。
安装
首先安装所需的包:
$ npm install --save @nestjs/cqrs
命令
命令用于更改应用程序状态。它们应该基于任务,而不是以数据为中心。当命令被分派时,它由相应的命令处理程序处理。处理程序负责更新应用程序状态。
@Injectable()
export class HeroesGameService {
constructor(private commandBus: CommandBus) {}
async killDragon(heroId: string, killDragonDto: KillDragonDto) {
return this.commandBus.execute(
new KillDragonCommand(heroId, killDragonDto.dragonId)
)
}
}
在上面的代码片段中,我们实例化 KillDragonCommand
类并将其传递给 CommandBus
的 execute()
方法。这是演示的命令类:
export class KillDragonCommand {
constructor(
public readonly heroId: string,
public readonly dragonId: string,
) {}
}
CommandBus
表示命令的 流。它负责将命令分派给适当的处理程序。execute()
方法返回一个承诺,该承诺解析为处理程序返回的值。
让我们为 KillDragonCommand
命令创建一个处理程序。
@CommandHandler(KillDragonCommand)
export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
constructor(private repository: HeroRepository) {}
async execute(command: KillDragonCommand) {
const { heroId, dragonId } = command
const hero = this.repository.findOneById(+heroId)
hero.killEnemy(dragonId)
await this.repository.persist(hero)
}
}
此处理程序从存储库中检索Hero
实体,调用killEnemy()
方法,然后持久保存更改。KillDragonHandler
类实现ICommandHandler
接口,这需要实现execute()
方法。execute()
方法接收命令对象作为参数。
查询
查询用于从应用程序状态中检索数据。它们应该以数据为中心,而不是基于任务。当分派查询时,它由相应的查询处理程序处理。处理程序负责检索数据。
QueryBus
遵循与CommandBus
相同的模式。查询处理程序应实现IQueryHandler
接口并使用@QueryHandler()
装饰器进行注释。
事件
事件用于通知应用程序的其他部分有关应用程序状态的变化。它们由模型或直接使用EventBus
进行调度。调度事件时,它会由相应的事件处理程序进行处理。然后,处理程序可以执行例如更新读取的模型等操作。
为了演示目的,让我们创建一个事件类:
export class HeroKilledDragonEvent {
constructor(
public readonly heroId: string,
public readonly dragonId: string,
) {}
}
现在,虽然可以使用 EventBus.publish()
方法直接分派事件,但我们也可以从模型中分派它们。让我们更新 Hero
模型,以便在调用 killEnemy()
方法时分派 HeroKilledDragonEvent
事件。
export class Hero extends AggregateRoot {
constructor(private id: string) {
super()
}
killEnemy(enemyId: string) {
// Business logic
this.apply(new HeroKilledDragonEvent(this.id, enemyId))
}
}
apply()
方法用于分派事件。它接受事件对象作为参数。但是,由于我们的模型不知道 EventBus
,因此我们需要将其与模型关联起来。我们可以通过使用 EventPublisher
类来实现这一点。
@CommandHandler(KillDragonCommand)
export class KillDragonHandler implements ICommandHandler<KillDragonCommand> {
constructor(
private repository: HeroRepository,
private publisher: EventPublisher,
) {}
async execute(command: KillDragonCommand) {
const { heroId, dragonId } = command
const hero = this.publisher.mergeObjectContext(
await this.repository.findOneById(+heroId),
)
hero.killEnemy(dragonId)
hero.commit()
}
}
EventPublisher#mergeObjectContext
方法将事件发布者合并到提供的对象中,这意味着该对象现在能够将事件发布到事件流。
请注意,在此示例中,我们还在模型上调用了 commit()
方法。此方法用于分派任何未完成的事件。要自动分派事件,我们可以将 autoCommit
属性设置为 true
:
export class Hero extends AggregateRoot {
constructor(private id: string) {
super()
this.autoCommit = true
}
}
如果我们想将事件发布者合并到一个不存在的对象中,而是合并到一个类中,我们可以使用EventPublisher#mergeClassContext
方法:
const HeroModel = this.publisher.mergeClassContext(Hero)
const hero = new HeroModel('id') // <-- HeroModel 是一个类
现在,HeroModel
类的每个实例都可以发布事件,而无需使用mergeObjectContext()
方法。
此外,我们可以使用EventBus
手动发出事件:
this.eventBus.publish(new HeroKilledDragonEvent())
EventBus
是一个可注入类。
每个事件可以有多个 事件处理程序。
@EventsHandler(HeroKilledDragonEvent)
export class HeroKilledDragonHandler implements IEventHandler<HeroKilledDragonEvent> {
constructor(private repository: HeroRepository) {}
handle(event: HeroKilledDragonEvent) {
// Business logic
}
}
请注意,当您开始使用事件处理程序时,您将脱离传统的 HTTP Web 上下文。
Sagas
Saga 是一个长期运行的进程,它会监听事件并可能触发新命令。它通常用于管理应用程序中的复杂工作流。例如,当用户注册时,saga 可能会监听 UserRegisteredEvent
并向用户发送欢迎电子邮件。
Sagas 是一个非常强大的功能。单个 saga 可能会监听 1..* 个事件。使用 RxJS 库,我们可以过滤、映射、分叉和合并事件流以创建复杂的工作流。每个 saga 都会返回一个 Observable,它会生成一个命令实例。然后,CommandBus
会 异步 调度此命令。
让我们创建一个 saga,它监听 HeroKilledDragonEvent
并调度 DropAncientItemCommand
命令。
@Injectable()
export class HeroesGameSagas {
@Saga()
dragonKilled = (events$: Observable<any>): Observable<ICommand> => {
return events$.pipe(
ofType(HeroKilledDragonEvent),
map(event => new DropAncientItemCommand(event.heroId, fakeItemID)),
)
}
}
ofType
操作符和 @Saga()
装饰器从 @nestjs/cqrs
包导出。
@Saga()
装饰器将该方法标记为 saga。events$
参数是所有事件的 Observable 流。ofType
操作符按指定的事件类型过滤流。map
操作符将事件映射到新的命令实例。
在此示例中,我们将 HeroKilledDragonEvent
映射到 DropAncientItemCommand
命令。然后 CommandBus
自动分派 DropAncientItemCommand
命令。
设置
最后,我们需要在 HeroesGameModule
中注册所有命令处理程序、事件处理程序和 saga:
export const CommandHandlers = [KillDragonHandler, DropAncientItemHandler]
export const EventHandlers = [HeroKilledDragonHandler, HeroFoundItemHandler]
@Module({
imports: [CqrsModule],
controllers: [HeroesGameController],
providers: [
HeroesGameService,
HeroesGameSagas,
...CommandHandlers,
...EventHandlers,
HeroRepository,
]
})
export class HeroesGameModule {}
未处理的异常
事件处理程序以异步方式执行。这意味着它们应始终处理所有异常,以防止应用程序进入不一致状态。但是,如果未处理异常,则EventBus
将创建UnhandledExceptionInfo
对象并将其推送到UnhandledExceptionBus
流。此流是一个Observable
,可用于处理未处理的异常。
private destroy$ = new Subject<void>();
constructor(private unhandledExceptionsBus: UnhandledExceptionBus) {
this.unhandledExceptionsBus
.pipe(takeUntil(this.destroy$))
.subscribe((exceptionInfo) => {
// Handle exception here
// e.g. send it to external service, terminate process, or publish a new event
});
}
onModuleDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
为了过滤掉异常,我们可以使用 ofType
运算符,如下所示:
this.unhandledExceptionsBus.pipe(takeUntil(this.destroy$), UnhandledExceptionBus.ofType(TransactionNotAllowedException)).subscribe((exceptionInfo) => {
// Handle exception here
})
其中 TransactionNotAllowedException
是我们要过滤掉的异常。
UnhandledExceptionInfo
对象包含以下属性:
export interface UnhandledExceptionInfo<Cause = IEvent | ICommand, Exception = any> {
/**
* The exception that was thrown.
*/
exception: Exception
/**
* The cause of the exception (event or command reference).
*/
cause: Cause
}
订阅所有事件
CommandBus
、QueryBus
和 EventBus
都是 Observables。这意味着我们可以订阅整个流,例如处理所有事件。例如,我们可以将所有事件记录到控制台,或将它们保存到事件存储中。
private destroy$ = new Subject<void>();
constructor(private eventBus: EventBus) {
this.eventBus
.pipe(takeUntil(this.destroy$))
.subscribe((event) => {
// Save events to database
});
}
onModuleDestroy() {
this.destroy$.next();
this.destroy$.complete();
}
示例
此处 提供了一个实用示例。