如何使用 TypeScript 實現 Pub/Sub Pattern ?
Pub/Sub Pattern 是 OOP 中著名的 Design Pattern,尤其應付 多對多 的場景特別有效。在本文中,我們將以 Angular 與 TypeScript 實現。
Pub/Sub Pattern 與 Observer Pattern 非常接近,將特別探討與 Observer Pattern 之間的差異。
Version
Node.js 8.9.4
Angular CLI 1.6.2
TypeScript 2.5.3
Angular 5.2.2
User Story

畫面上有兩個 數字鐘,上面的是 每秒鐘 更新一次,下面的是 每 3 秒 更新一次。
Task
程式碼希望分成兩部分,一個部分送出 目前時間,另一個部分負責 顯示時間。
但因為目前分成 每秒鐘 與 每 3 秒,因此會有 2 個 class 分別 負責產生時間。
Definition
Pub/Sub Pattern
當物件之間有
多對多的依賴關係,且當多的發行者(publisher) 改變時,多的訂閱者(subscriber) 也必須跟著改變,就特別適合使用Pub/Sub Pattern
此為 Pub/Sub Pattern 最原始的 UML class diagram,實務上會有多個 publisher 與多個 subscriber,且 publisher 與 subscriber 彼此互相依賴,broker 的介入讓 publisher 與 subscriber 的關係簡單化。
subscriber 不用知道有哪些 publisher,只要知道 broker 即可;publisher 也不用知道有哪些 subscriber,只要知道 broker 即可,如此 subscriber 與 publisher 就徹底解耦合,且將來無論新增多少 subscriber 與 publisher,唯一需要修改的只有 broker,符合 開放封閉原則 的要求。
Architecture
Clock1sPublisher負責每秒鐘送出目前時間;Clock3sPublisher負責每 3 秒送出目前時間Digital1sComponent負責每秒鐘顯示目前時間;Digital3sComponent負責每 3 秒顯示目前時間DigitalComponent必須能向ClockBrokersubscribe資料;ClockPublisher必須能向ClockBrokerpublish資料,根據DigitalComponent與ClockPublisher需求,訂出BrokerInterface,期望ClockBroker能遵守ClockBroker必須能向DigitalComponent送出目前時間,根據ClockBroker需求,訂出SubscriberInterface,期望DigitalComponent能遵守ClockBroker必須能向ClockPublisher設定 broker,根據ClockBroker需求,訂出PublisherInterface,期望ClockPublisher能遵守
Implementation
DigitalClock1sComponent
digital-clock1s.component.html
1 | {{ now | date:'HH:mm:ss'}} |
HTML 負責顯示 目前時間,至於 時:分:秒 部分就不用自己寫程式處理了,靠 pipe 即可。
digital-clock1s.component.ts
1 | import { Component, Inject, OnDestroy, OnInit } from '@angular/core'; |
12 行
1 | export class DigitalClock1sComponent implements SubscriberInterface |
先不考慮 DigitalClock1sComponent 所使用的 SubscriberInterface ,稍後會討論。
也因為是實現 SubscriberInterface,因此 DigitalClock1sComponent 本質上就是一個 Subscriber,只是藉由 component 實作。
subscriber在觀念上很類似 Observer Pattern 的observer
20 行
1 | ngOnInit(): void { |
回想 subscriber 的初衷,除了顯示 目前時間 外,另外一個目的就是能對 broker 加以 訂閱 與 取消訂閱。
既然要訂閱,該在什麼時候執行呢 ?
最好是在 subscriber 開始 初始化 時就對 broker 加以訂閱,因此選擇使用 ngOnInit() lifecycle hook。
並且希望 broker 有 subscribe(),提供 訂閱功能。
subscribe() 第 1 個參數為 SubjectEnum,設定要 subscribe 什麼 subject;第 2 個參數則是將 this 傳進去,準備將來 callback 使用。
Sub/Pub Pattern 與 Observer Pattern 一個主要的差異是 observer 會直接對 subject 加以訂閱,但 Sub/Pub Pattern 則會透過 broker 加以訂閱
24 行
1 | ngOnDestroy(): void { |
既然有 訂閱,就應該有 取消訂閱,該在什麼時候取消呢 ?
最好是在 subscriber 最後 被消滅 時對 broker 加以 取消註冊,因此選擇 ngOnDestroy() lifecycle hook。
並且希望 broker 有 unsubscribe() ,提供 取消訂閱 功能。
綜合
ngOnInit()與ngOnDestroy(),根據介面隔離原則,已經大概可猜到 broker 的 interface 該提供subscribe()與unsubscribe()
15 行
1 | constructor( |
既然 subscriber 需要 ClockBroker,因此必須在 constructor 將 ClockBroker DI 注入進來。
根據 依賴反轉原則 : subscriber 不應該依賴底層的 broker,兩者應該依賴於 interface。
根據 介面隔離原則 : subscriber 應該只相依於他所需要的 interface,目前看來只需要 subscribe() 與 unsubscribe(),因此由 subscriber 需求角度訂出的 broker 的 interface。
因此 DI 注入的 ClockBroker ,其型別為 BrokerInterface,這樣就符合 依賴反轉原則 與 介面隔離原則。
subscribe()在觀念上類似於 Observer Pattern 的addObserver();而unsubscribe()類似於removeObserver()
Clock1sPublisher
clock1s.publisher.ts
1 | import { Injectable } from '@angular/core'; |
第 6 行
1 | export class Clock1sPublisher implements PublisherInterface |
先不考慮 Clock1sPublisher 所使用的 PublisherInterface ,稍後會討論。
10 行
1 | constructor() { |
根據需求,publisher 要能夠每秒送出 目前時間,所以使用 JavaScript 原生的 setInterval(),每 1 秒鐘呼叫 tick() 一次。
18 行
1 | private tick(): void { |
每一秒執行 tick() 時,會將 目前時間 透過 publish() 發佈至 broker。
根據
介面隔離原則,已經大概可猜到broker的 interface 該提供publish()
BrokerInterface
broker.interface.ts
1 | import { SubjectEnum } from '../enum/subject.enum'; |
根據 介面隔離原則,subscriber 與 publisher 應該只相依於他所需要的 interface,目前看來共需要 publish()、subscribe() 與 unsubscribe(),因此訂出 BrokerInterface,而 broker 必須實作此 interface。
ClockBroker
clock.broker.ts
1 | import { Injectable } from '@angular/core'; |
11 行
1 | export class ClockBroker implements BrokerInterface |
根據 依賴反轉原則,broker 應該相依於 subscriber 與 publisher 所訂出的 interface,因此必須實現 BrokerInterface。
16 行
1 | constructor(private clock1sPublisher: Clock1sPublisher, |
ClockBroker 的角色就是做 publisher 與 subscriber 的中介角色,除了 被 subscriber 注入外,還必須自己注入多個 publisher。
因為 publisher 將來會對 broker 加以 callback,因此使用 setBroker() 將 broker 的 reference 傳進去,讓 publisher 可對 broker 做 callback。
根據
介面隔離原則,已經大概可猜到publisher的 interface 該提供setBroker()
12 行
1 | private subscribers: SubjectSubscriber[] = []; |
subscribers 陣列儲存所有訂閱的 subscriber,每個物件型別為 SubjectSubscriber。
至於什麼是 SubjectSubscriber,稍後會討論。
34 行
1 | subscribe(subject: SubjectEnum, subscriber: SubscriberInterface): void { |
既然 BlockerInterface 已經定義了 subscribe(),broker 就必須加以實作。
SubjectSubscriber 物件提供了 subject 與 subscriber 兩個 field,subject 儲存所訂閱的主題,其型別為 SubjectEnum,subscriber 儲存有訂閱的 subscriber,其型別為 SubscriberInterface。
subscribers 為陣列,使用 push() 新增資料進陣列。
22 行
1 | publish(date: Date): void { |
既然 BlockerInterface 已經定義了 publish(),broker 就必須加以實作。
目前 broker 主要應付兩種 publisher,一種是 每秒,另一種是 每三秒。
每一秒 counter 會 +1,當 counter 為 3 會 reset 為 0。
每 3 秒會執行 publishToClock3sSubscriber(),而每 1 秒會執行 publishToClock1sSubscriber(),並將 目前時間 送出。
42 行
1 | private publishToClock3sSubscriber(date: Date) { |
每 3 秒執行 publishToClock3sSubscriber() 一次,先 filter 只訂閱 3 秒更新 的 subscriber,然後 foreach 執行每個 subscriber 的 update(),並送出 目前時間。
51 行
1 | private publishToClock1sSubscriber(date: Date) { |
每 1 秒執行 publishToClock1sSubscriber() 一次,先 filter 只訂閱 1 秒更新 的 subscriber,然後 foreach 執行每個 subscriber 的 update(),並送出 目前時間。
根據
介面隔離原則,已經大概可猜到subscriber的 interface 該提供update()
40 行
1 | unsubscribe(subject: SubjectEnum, subscriber: SubscriberInterface): void { |
既然 BlockerInterface 已經定義了 subscribe(),broker 就必須加以實作。
使用 Lodash 的
remove()對陣列刪除物件
PublisherInterface
publisher.interface.ts
1 | import { BrokerInterface } from './broker.interface'; |
根據 介面隔離原則,publisher 應該只相依於他所需要的 interface,目前看來需要 setBroker(),因此訂出 PublisherInterface,而 publisher 必須實作此 interface。
Clock1sPublisher
clock1s.publisher.ts
1 | import { Injectable } from '@angular/core'; |
第 6 行
1 | export class Clock1sPublisher implements PublisherInterface |
根據 依賴反轉原則,publisher 應該相依於 broker 所訂出的 interface,因此必須實現 PublisherInterface。
14 行
1 | setBroker(broker: BrokerInterface): void { |
既然 PublisherInterface 已經定義了 setBroker(),publisher 就必須加以實作。
SubscriberInterface
subscriber.interface.ts
1 | export interface SubscriberInterface { |
根據 介面隔離原則,broker 應該只相依於他所需要的 interface,目前看來需要 update(),因此訂出 SubscriberInterface,而 subscriber 必須實作此 interface。
DigitalClock1sComponent
digital-clock1s.component.ts
1 | import { Component, Inject, OnDestroy, OnInit } from '@angular/core'; |
13 行
1 | export class DigitalClock1sComponent implements SubscriberInterface, OnInit, OnDestroy |
根據 依賴反轉原則,subscriber 應該相依於 broker 所訂出的 interface,因此必須實現 SubscriberInterface。
25 行
1 | update(date: Date): void { |
既然 SubscriberInterface 已經定義了 update(),subscriber 就必須加以實作。
AppModule
app.module.ts
1 | import { BrowserModule } from '@angular/platform-browser'; |
20 行
1 | providers: [ |
broker 採用 interface 方式注入,要特別使用 token。
兩個 publisher 直接對 class 注入即可。
Summary
Pub/Sub Pattern vs. Observer Pattern
Observer Pattern 是原始 GoF 23 個 Design Pattern 之一,Pub/Sub Pattern 與 Observer Pattern 理念上非常接近,廣義上來說,Pub/Sub Pattern 算是 Observer Pattern 的變形,有些書甚至也認為 Pub/Sub Pattern 就是 Observer Pattern,但嚴格來說,兩者還是有些微差異:
相同點
- Observer Pattern 的
observer相當於 Pub/Sub Pattern 的subscriber - Observer Pattern 的
subject相當於 Pub/Sub Pattern 的publisher - Observer Pattern 的
SubjectInterface有addObserver()與removeObserver(); Pub/Sub Pattern 的BrokerInterface也有subscribe()與unsubscribe() - Observer Pattern 的
ObserverInterface有update();Pub/Sub Pattern 的SubscriberInterface也有update()
相異點
- Pub/Sub Pattern 多了
broker介入 - 對於
publisher而言,broker是publisher的subscriber;對於subscriber而言,broker是subscriber的publisher,所以broker兼具subscriber與publisher的角色 - Observer Pattern 只有一個
subject,但 Pub/Sub Pattern 有多個publisher
Conclusion
- Observer Pattern 適合
一對多場景;而 Pub/Sub Pattern 適合多對多場景 一對一當然也可以使用 Pub/Sub Pattern 實現,只是 Design Pattern 強調的是 intention,應該依照使用場景選擇適合的 pattern- Pub/Sub Pattern 與 Observer Pattern 的最大差異在於
broker的介入,讓publisher與subscriber都只相依於broker即可,將來有任何修改,都集中在broker,符合開放封閉原則
Sample Code
完整的範例可以在我的 GitHub 上找到