與 Observer Pattern 很接近的 Pub/Sub Pattern

Pub/Sub Pattern 是 OOP 中著名的 Design Pattern,尤其應付 多對多 的場景特別有效。在本文中,我們將以 Angular 與 TypeScript 實現。

Pub/Sub Pattern 與 Observer Pattern 非常接近,將特別探討與 Observer Pattern 之間的差異。

Version


Node.js 8.9.4
Angular CLI 1.6.2
TypeScript 2.5.3
Angular 5.2.2

User Story


pub000

畫面上有兩個 數字鐘,上面的是 每秒鐘 更新一次,下面的是 每 3 秒 更新一次。

Task


程式碼希望分成兩部分,一個部分送出 目前時間,另一個部分負責 顯示時間

但因為目前分成 每秒鐘每 3 秒,因此會有 2 個 class 分別 負責產生時間。

Definition


Pub/Sub Pattern

當物件之間有 多對多 的依賴關係,且當 發行者 (publisher) 改變時,訂閱者 (subscriber) 也必須跟著改變,就特別適合使用 Pub/Sub Pattern

pub008

此為 Pub/Sub Pattern 最原始的 UML class diagram,實務上會有多個 publisher 與多個 subscriber,且 publishersubscriber 彼此互相依賴,broker 的介入讓 publishersubscriber 的關係簡單化。

subscriber 不用知道有哪些 publisher,只要知道 broker 即可;publisher 也不用知道有哪些 subscriber,只要知道 broker 即可,如此 subscriberpublisher 就徹底解耦合,且將來無論新增多少 subscriberpublisher,唯一需要修改的只有 broker,符合 開放封閉原則 的要求。

Architecture


pub001

  • Clock1sPublisher 負責 每秒鐘 送出目前時間;Clock3sPublisher 負責 每 3 秒 送出目前時間
  • Digital1sComponent 負責 每秒鐘 顯示目前時間; Digital3sComponent 負責 每 3 秒 顯示目前時間
  • DigitalComponent 必須能向 ClockBroker subscribe 資料;ClockPublisher 必須能向 ClockBroker publish 資料,根據 DigitalComponentClockPublisher 需求,訂出 BrokerInterface,期望 ClockBroker 能遵守
  • ClockBroker 必須能向 DigitalComponent 送出目前時間,根據 ClockBroker 需求,訂出 SubscriberInterface,期望 DigitalComponent 能遵守
  • ClockBroker 必須能向 ClockPublisher 設定 broker,根據 ClockBroker 需求,訂出 PublisherInterface,期望 ClockPublisher 能遵守

Implementation


DigitalClock1sComponent

pub002

digital-clock1s.component.html

1
{{ now | date:'HH:mm:ss'}}

HTML 負責顯示 目前時間,至於 時:分:秒 部分就不用自己寫程式處理了,靠 pipe 即可。

digital-clock1s.component.ts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import { Component, Inject, OnDestroy, OnInit } from '@angular/core';
import { ClockBroker } from '../../broker/clock.broker';
import { SubscriberInterface } from '../../interface/subscriber.interface';
import { SubjectEnum } from '../../enum/subject.enum';
import { BrokerInterface } from '../../interface/broker.interface';
import { BrokerInterfaceToken } from '../../interface/injection.token';

@Component({
selector: 'app-digital-clock1s',
templateUrl: './digital-clock1s.component.html',
styleUrls: ['./digital-clock1s.component.css']
})
export class DigitalClock1sComponent implements SubscriberInterface, OnInit, OnDestroy {
now: Date = new Date();

constructor(
@Inject(BrokerInterfaceToken)
private clockBroker: BrokerInterface) {

}

ngOnInit(): void {
this.clockBroker.subscribe(SubjectEnum.Clock1s, this);
}

ngOnDestroy(): void {
this.clockBroker.unsubscribe(SubjectEnum.Clock3s, this);
}

update(date: Date): void {
this.now = date;
}
}

12 行

1
export class DigitalClock1sComponent implements SubscriberInterface

先不考慮 DigitalClock1sComponent 所使用的 SubscriberInterface ,稍後會討論。

也因為是實現 SubscriberInterface,因此 DigitalClock1sComponent 本質上就是一個 Subscriber,只是藉由 component 實作。

subscriber 在觀念上很類似 Observer Pattern 的 observer

20 行

1
2
3
ngOnInit(): void {
this.clockBroker.subscribe(SubjectEnum.Clock1s, this);
}

回想 subscriber 的初衷,除了顯示 目前時間 外,另外一個目的就是能對 broker 加以 訂閱取消訂閱

既然要訂閱,該在什麼時候執行呢 ?

最好是在 subscriber 開始 初始化 時就對 broker 加以訂閱,因此選擇使用 ngOnInit() lifecycle hook。

並且希望 brokersubscribe(),提供 訂閱功能。

subscribe() 第 1 個參數為 SubjectEnum,設定要 subscribe 什麼 subject;第 2 個參數則是將 this 傳進去,準備將來 callback 使用。

Sub/Pub Pattern 與 Observer Pattern 一個主要的差異是 observer 會直接對 subject 加以訂閱,但 Sub/Pub Pattern 則會透過 broker 加以訂閱

24 行

1
2
3
ngOnDestroy(): void {
this.clockBroker.unsubscribe(SubjectEnum.Clock1s, this);
}

既然有 訂閱,就應該有 取消訂閱,該在什麼時候取消呢 ?

最好是在 subscriber 最後 被消滅 時對 broker 加以 取消註冊,因此選擇 ngOnDestroy() lifecycle hook。

並且希望 brokerunsubscribe() ,提供 取消訂閱 功能。

綜合 ngOnInit()ngOnDestroy(),根據 介面隔離原則,已經大概可猜到 broker 的 interface 該提供 subscribe()unsubscribe()

15 行

1
2
3
4
constructor(
@Inject(BrokerInterfaceToken)
private clockBroker: BrokerInterface) {

}

既然 subscriber 需要 ClockBroker,因此必須在 constructor 將 ClockBroker DI 注入進來。

根據 依賴反轉原則 : subscriber 不應該依賴底層的 broker,兩者應該依賴於 interface。

根據 介面隔離原則 : subscriber 應該只相依於他所需要的 interface,目前看來只需要 subscribe()unsubscribe(),因此由 subscriber 需求角度訂出的 broker 的 interface。

因此 DI 注入的 ClockBroker ,其型別為 BrokerInterface,這樣就符合 依賴反轉原則介面隔離原則

subscribe() 在觀念上類似於 Observer Pattern 的 addObserver() ;而 unsubscribe() 類似於 removeObserver()

Clock1sPublisher

pub003

clock1s.publisher.ts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import { Injectable } from '@angular/core';
import { PublisherInterface } from '../../interface/publisher.interface';
import { BrokerInterface } from '../../interface/broker.interface';

@Injectable()
export class Clock1sPublisher implements PublisherInterface {
private interval = 1000;
private clockBroker: BrokerInterface;

constructor() {
setInterval(() => this.tick(), this.interval);
}

setBroker(broker: BrokerInterface): void {
this.clockBroker = broker;
}

private tick(): void {
this.clockBroker.publish(new Date());
}
}

第 6 行

1
export class Clock1sPublisher implements PublisherInterface

先不考慮 Clock1sPublisher 所使用的 PublisherInterface ,稍後會討論。

10 行

1
2
3
constructor() {
setInterval(() => this.tick(), this.interval);
}

根據需求,publisher 要能夠每秒送出 目前時間,所以使用 JavaScript 原生的 setInterval(),每 1 秒鐘呼叫 tick() 一次。

18 行

1
2
3
private tick(): void {
this.clockBroker.publish(new Date());
}

每一秒執行 tick() 時,會將 目前時間 透過 publish() 發佈至 broker。

根據 介面隔離原則,已經大概可猜到 broker 的 interface 該提供 publish()

BrokerInterface

pub004

broker.interface.ts

1
2
3
4
5
6
7
8
import { SubjectEnum } from '../enum/subject.enum';
import { SubscriberInterface } from './subscriber.interface';

export interface BrokerInterface {
publish(date: Date): void;
subscribe(subject: SubjectEnum, subscriber: SubscriberInterface): void;
unsubscribe(subject: SubjectEnum, subscriber: SubscriberInterface): void;
}

根據 介面隔離原則subscriberpublisher 應該只相依於他所需要的 interface,目前看來共需要 publish()subscribe()unsubscribe(),因此訂出 BrokerInterface,而 broker 必須實作此 interface。

ClockBroker

pub005

clock.broker.ts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import { Injectable } from '@angular/core';
import { Clock1sPublisher } from '../publisher/clock1s/clock1s.publisher';
import { BrokerInterface } from '../interface/broker.interface';
import { SubscriberInterface } from '../interface/subscriber.interface';
import { Clock3sPublisher } from '../publisher/clock3s/clock3s.publisher';
import { SubjectEnum } from '../enum/subject.enum';
import { SubjectSubscriber } from '../model/subject-subscriber.model';
import * as _ from 'lodash';

@Injectable()
export class ClockBroker implements BrokerInterface {
private subscribers: SubjectSubscriber[] = [];
private counter = 0;
private maxCounter = 3;

constructor(private clock1sPublisher: Clock1sPublisher,
private clock3sPublisher: Clock3sPublisher) {

this.clock1sPublisher.setBroker(this);
this.clock3sPublisher.setBroker(this);
}

publish(date: Date): void {
if (this.counter === this.maxCounter) {
this.publishToClock3sSubscriber(date);
this.counter = 0;

return;
}

this.publishToClock1sSubscriber(date);
this.counter++;
}

subscribe(subject: SubjectEnum, subscriber: SubscriberInterface): void {
const subjectSubscriber: SubjectSubscriber = {
subject: subject,
subscriber: subscriber
};
this.subscribers.push(subjectSubscriber);
}

unsubscribe(subject: SubjectEnum, subscriber: SubscriberInterface): void {
const subjectSubscriber: SubjectSubscriber = {
subject: subject,
subscriber: subscriber
};

_.remove(this.subscribers, subjectSubscriber);
}

private publishToClock3sSubscriber(date: Date) {
this.subscribers
.filter(item => item.subject === SubjectEnum.Clock3s)
.forEach(item => item.subscriber.update(date));
}

private publishToClock1sSubscriber(date: Date) {
this.subscribers
.filter(item => item.subject === SubjectEnum.Clock1s)
.forEach(item => item.subscriber.update(date));
}
}

11 行

1
export class ClockBroker implements BrokerInterface

根據 依賴反轉原則broker 應該相依於 subscriberpublisher 所訂出的 interface,因此必須實現 BrokerInterface

16 行

1
2
3
4
5
constructor(private clock1sPublisher: Clock1sPublisher,
private clock3sPublisher: Clock3sPublisher) {

this.clock1sPublisher.setBroker(this);
this.clock3sPublisher.setBroker(this);
}

ClockBroker 的角色就是做 publishersubscriber 的中介角色,除了 subscriber 注入外,還必須自己注入多個 publisher

因為 publisher 將來會對 broker 加以 callback,因此使用 setBroker()broker 的 reference 傳進去,讓 publisher 可對 broker 做 callback。

根據 介面隔離原則,已經大概可猜到 publisher 的 interface 該提供 setBroker()

12 行

1
private subscribers: SubjectSubscriber[] = [];

subscribers 陣列儲存所有訂閱的 subscriber,每個物件型別為 SubjectSubscriber

至於什麼是 SubjectSubscriber,稍後會討論。

34 行

1
2
3
4
5
6
7
subscribe(subject: SubjectEnum, subscriber: SubscriberInterface): void {
const subjectSubscriber: SubjectSubscriber = {
subject: subject,
subscriber: subscriber
};
this.subscribers.push(subjectSubscriber);
}

既然 BlockerInterface 已經定義了 subscribe()broker 就必須加以實作。

SubjectSubscriber 物件提供了 subjectsubscriber 兩個 field,subject 儲存所訂閱的主題,其型別為 SubjectEnumsubscriber 儲存有訂閱的 subscriber,其型別為 SubscriberInterface

subscribers 為陣列,使用 push() 新增資料進陣列。

22 行

1
2
3
4
5
6
7
8
9
10
11
publish(date: Date): void {
if (this.counter === this.maxCounter) {
this.publishToClock3sSubscriber(date);
this.counter = 0;

return;
}

this.publishToClock1sSubscriber(date);
this.counter++;
}

既然 BlockerInterface 已經定義了 publish()broker 就必須加以實作。

目前 broker 主要應付兩種 publisher,一種是 每秒,另一種是 每三秒

每一秒 counter+1,當 counter3 會 reset 為 0

3 秒會執行 publishToClock3sSubscriber(),而每 1 秒會執行 publishToClock1sSubscriber(),並將 目前時間 送出。

42 行

1
2
3
4
5
private publishToClock3sSubscriber(date: Date) {
this.subscribers
.filter(item => item.subject === SubjectEnum.Clock3s)
.forEach(item => item.subscriber.update(date));
}

3 秒執行 publishToClock3sSubscriber() 一次,先 filter 只訂閱 3 秒更新subscriber,然後 foreach 執行每個 subscriberupdate(),並送出 目前時間

51 行

1
2
3
4
5
private publishToClock1sSubscriber(date: Date) {
this.subscribers
.filter(item => item.subject === SubjectEnum.Clock1s)
.forEach(item => item.subscriber.update(date));
}

1 秒執行 publishToClock1sSubscriber() 一次,先 filter 只訂閱 1 秒更新subscriber,然後 foreach 執行每個 subscriberupdate(),並送出 目前時間

根據 介面隔離原則,已經大概可猜到 subscriber 的 interface 該提供 update()

40 行

1
2
3
4
5
6
7
8
unsubscribe(subject: SubjectEnum, subscriber: SubscriberInterface): void {
const subjectSubscriber: SubjectSubscriber = {
subject: subject,
subscriber: subscriber
};

_.remove(this.subscribers, subjectSubscriber);
}

既然 BlockerInterface 已經定義了 subscribe()broker 就必須加以實作。

使用 Lodash 的 remove() 對陣列刪除物件

PublisherInterface

pub006

publisher.interface.ts

1
2
3
4
5
import { BrokerInterface } from './broker.interface';

export interface PublisherInterface {
setBroker(broker: BrokerInterface): void;
}

根據 介面隔離原則publisher 應該只相依於他所需要的 interface,目前看來需要 setBroker(),因此訂出 PublisherInterface,而 publisher 必須實作此 interface。

Clock1sPublisher

pub003

clock1s.publisher.ts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import { Injectable } from '@angular/core';
import { PublisherInterface } from '../../interface/publisher.interface';
import { BrokerInterface } from '../../interface/broker.interface';

@Injectable()
export class Clock1sPublisher implements PublisherInterface {
private interval = 1000;
private clockBroker: BrokerInterface;

constructor() {
setInterval(() => this.tick(), this.interval);
}

setBroker(broker: BrokerInterface): void {
this.clockBroker = broker;
}

private tick(): void {
this.clockBroker.publish(new Date());
}
}

第 6 行

1
export class Clock1sPublisher implements PublisherInterface

根據 依賴反轉原則publisher 應該相依於 broker 所訂出的 interface,因此必須實現 PublisherInterface

14 行

1
2
3
setBroker(broker: BrokerInterface): void {
this.clockBroker = broker;
}

既然 PublisherInterface 已經定義了 setBroker()publisher 就必須加以實作。

SubscriberInterface

pub007

subscriber.interface.ts

1
2
3
export interface SubscriberInterface {
update(date: Date): void;
}

根據 介面隔離原則broker 應該只相依於他所需要的 interface,目前看來需要 update(),因此訂出 SubscriberInterface,而 subscriber 必須實作此 interface。

DigitalClock1sComponent

pub002

digital-clock1s.component.ts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import { Component, Inject, OnDestroy, OnInit } from '@angular/core';
import { ClockBroker } from '../../broker/clock.broker';
import { SubscriberInterface } from '../../interface/subscriber.interface';
import { SubjectEnum } from '../../enum/subject.enum';
import { BrokerInterface } from '../../interface/broker.interface';
import { BrokerInterfaceToken } from '../../interface/injection.token';

@Component({
selector: 'app-digital-clock1s',
templateUrl: './digital-clock1s.component.html',
styleUrls: ['./digital-clock1s.component.css']
})
export class DigitalClock1sComponent implements SubscriberInterface, OnInit, OnDestroy {
now: Date = new Date();

constructor(
@Inject(BrokerInterfaceToken)
private clockBroker: BrokerInterface) {

}

ngOnInit(): void {
this.clockBroker.subscribe(SubjectEnum.Clock1s, this);
}

ngOnDestroy(): void {
this.clockBroker.unsubscribe(SubjectEnum.Clock3s, this);
}

update(date: Date): void {
this.now = date;
}
}

13 行

1
export class DigitalClock1sComponent implements SubscriberInterface, OnInit, OnDestroy

根據 依賴反轉原則subscriber 應該相依於 broker 所訂出的 interface,因此必須實現 SubscriberInterface

25 行

1
2
3
update(date: Date): void {
this.now = date;
}

既然 SubscriberInterface 已經定義了 update()subscriber 就必須加以實作。

AppModule

app.module.ts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import { BrowserModule } from '@angular/platform-browser';
import { NgModule } from '@angular/core';
import { AppComponent } from './app.component';
import { ClockBroker } from './broker/clock.broker';
import { Clock1sPublisher } from './publisher/clock1s/clock1s.publisher';
import { DigitalClock1sComponent } from './component/digital-clock1s/digital-clock1s.component';
import { DigitalClock3sComponent } from './component/digital-clock3s/digital-clock3s.component';
import { Clock3sPublisher } from './publisher/clock3s/clock3s.publisher';
import { BrokerInterfaceToken } from './interface/injection.token';

@NgModule({
declarations: [
AppComponent,
DigitalClock1sComponent,
DigitalClock3sComponent
],
imports: [
BrowserModule
],
providers: [
{provide: BrokerInterfaceToken, useClass: ClockBroker},
Clock1sPublisher,
Clock3sPublisher
],
bootstrap: [AppComponent]
})
export class AppModule { }

20 行

1
2
3
4
5
providers: [
{ provide: BrokerInterfaceToken, useClass: ClockBroker },
Clock1sPublisher,
Clock3sPublisher
],

broker 採用 interface 方式注入,要特別使用 token。

兩個 publisher 直接對 class 注入即可。

Summary

Pub/Sub Pattern vs. Observer Pattern

Observer Pattern 是原始 GoF 23 個 Design Pattern 之一,Pub/Sub Pattern 與 Observer Pattern 理念上非常接近,廣義上來說,Pub/Sub Pattern 算是 Observer Pattern 的變形,有些書甚至也認為 Pub/Sub Pattern 就是 Observer Pattern,但嚴格來說,兩者還是有些微差異:

相同點

  • Observer Pattern 的 observer 相當於 Pub/Sub Pattern 的 subscriber
  • Observer Pattern 的 subject 相當於 Pub/Sub Pattern 的 publisher
  • Observer Pattern 的 SubjectInterfaceaddObserver()removeObserver() ; Pub/Sub Pattern 的 BrokerInterface 也有 subscribe()unsubscribe()
  • Observer Pattern 的 ObserverInterfaceupdate();Pub/Sub Pattern 的 SubscriberInterface 也有 update()

相異點

  • Pub/Sub Pattern 多了 broker 介入
  • 對於 publisher 而言,brokerpublishersubscriber;對於 subscriber 而言,brokersubscriberpublisher,所以 broker 兼具 subscriberpublisher 的角色
  • Observer Pattern 只有一個 subject,但 Pub/Sub Pattern 有多個 publisher

Conclusion


  • Observer Pattern 適合 一對多 場景;而 Pub/Sub Pattern 適合 多對多 場景
  • 一對一 當然也可以使用 Pub/Sub Pattern 實現,只是 Design Pattern 強調的是 intention,應該依照使用場景選擇適合的 pattern
  • Pub/Sub Pattern 與 Observer Pattern 的最大差異在於 broker 的介入,讓 publishersubscriber 都只相依於 broker 即可,將來有任何修改,都集中在 broker,符合 開放封閉原則

Sample Code


完整的範例可以在我的 GitHub 上找到