“Pipeline” 这个概念在计算机科学和编程领域有多种应用,但它最初和最广泛的来源是自操作系统和处理器设计。在这些领域中,pipeline 指的是将一个任务分解成几个子任务,这些子任务按照顺序在不同的处理单元上并行处理的技术。
在我前端生涯的生涯早期,大部分的时间做的事就是拿到后端接口返回的数据,然后在前端进行一些简单的处理,最终渲染到页面上。这个过程中,我们可以把数据处理的过程看成是一个 pipeline,数据从一个处理单元流向另一个处理单元,最终渲染到页面上。初期 if...else...
还能胜任,顶多用用 switch...case...
,最复杂也不过用到点策略模式。
但随着业务的发展,前端的业务逻辑越来越复杂,数据处理的过程也越来越复杂,这时候我们就需要一种更好的方式来处理数据,这时候 pipeline 模式可以派上用场。
要实现一个通用 pipeline 结构,我们可以使用“链式责任”模式。这种模式允许多个对象处理一个请求,将这些对象连成一条链。在 pipeline 的上下文中,每个处理步骤是链中的一个环节,负责接收输入,进行处理,并将结果传递到下一个环节。
让我们用伪代码来实现一个 pipeline:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| interface Handler { setNext(handler: Handler): Handler handle(request): result }
class AbstractHandler implements Handler { private nextHandler: Handler
setNext(handler: Handler): Handler { this.nextHandler = handler return handler }
handle(request): result { if (this.nextHandler) { return this.nextHandler.handle(request) } return null } }
class ConcreteHandlerA extends AbstractHandler { handle(request): result { } }
class ConcreteHandlerB extends AbstractHandler { handle(request): result { } }
class Client { main() { handlerA = new ConcreteHandlerA() handlerB = new ConcreteHandlerB()
handlerA.setNext(handlerB)
result = handlerA.handle(request) } }
|
在这个伪代码中,Handler
接口定义了一个基本的 handle
方法和 setNext
方法来构建链。AbstractHandler
类实现了这些方法,而 ConcreteHandlerA
和 ConcreteHandlerB
是具体的处理器,它们扩展了 AbstractHandler
并实现了具体的处理逻辑。客户端 Client
构建了处理器链并启动处理过程。
独立的处理单元,意味着可以复用和更方便地进行单测。这样的设计也更容易扩展,只需要在链中添加一个新的处理器即可。这种设计也符合开闭原则,因为你无需修改已有的代码就可以在应用中添加新的处理器。
当然,每个 handler
都要 setNext
一遍显得不够优雅,特别是当 handler
一多。我们可以包装个类来优化下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| class Pipeline { private handlers: Handler[]
constructor(handlers: Handler[]) { if (handlers && handlers.length > 0) { for (let handler of handlers) { this.addHandler(handler) } } }
addHandler(handler: Handler): Pipeline { if (this.handlers.length > 0) { this.handlers[this.handler.length - 1].setNext(handler) } this.handlers.push(handler) return this }
handle(request): result { if (this.handlers.length > 0) { return this.handlers[0].handler(request) } return null } }
class Client { main() { handlerA = new ConcreteHandlerA() handlerB = new ConcreteHandlerB()
pipeline = new Pipeline([handlerA, handlerB])
result = pipeline.handle(request) } }
|
在整个处理流程当中,为了更好地监测和控制,需要给我们的 pipeline
加上一些状态。比如:
isCancelled
:是否取消处理(主动)。
isBroken
:是否中断处理(被动,遇到错误等)。
isFinished
:是否处理完成。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| class Pipeline { private handlers: Handler[] private isCancelled: boolean private isBroken: boolean private isFinished: boolean
cancel() { this.finish(true) }
break() { this.isBroken = true }
finish(cancel?: boolean) { if (cancel) { this.isCancelled = true } if (this.isFinished) { return } this.isFinished = true } }
|
接下来我们要做的就是将链上的处理器与 pipeline
做一个连接,这样 pipeline
就能控制整个处理流程了。我们可以在 AbstractHandler
中添加一个 pipeline
属性,然后在 handle
方法中添加一些逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| class AbstractHandler implements Handler { private nextHandler: Handler private pipeline: Pipeline
setNext(handler: Handler): Handler { this.nextHandler = handler return handler }
setContext(pipeline: Pipeline): Handler { this.pipeline = pipeline return this }
handle(request): result { if (this.pipeline.isFinished) { return } if (this.pipeline.isBroken) { return } if (this.nextHandler) { return this.nextHandler.handle(request) } this.pipeline.finish() return null } }
|
在 Pipeline
中添加处理器的时候,我们还需要将 pipeline
通过 setContext
方法传递给处理器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| class Pipeline {
addHandler(handler: Handler): Pipeline { if (this.handlers.length > 0) { this.handlers[this.handler.length - 1].setNext(handler) } this.handlers.push(handler) handler.setContext(this) return this }
}
|
这样,我们就可以在 handle
方法中控制整个处理流程了。
至此,一个 pipeline 的基本结构就完成了。当然,要投入实际使用的话,还要继续完善。比如:
- 针对异步的
handler
的处理。这里指的是处理器的 handle
方法返回的是一个 Promise。
- 更为完善的事件处理机制。比如可以将
Pipeline
在一个事件分发器的类上进行扩展,可以参考 Node.js 内置的 Event Emitter,当然也可以自己简单实现一个。 1 2 3 4 5 6 7 8 9 10 11
| class Pipeline extends EventEmitter {
break(err?: Error) { this.isBroken = true
if (err) { this.emit('error', err) } } }
|
- 管道嵌套,进一步提升 pipeline 的复用度。
1 2 3 4 5 6 7 8 9 10 11
| const webPipeline = new Pipeline( new WebCORSHandler(), new WebJSONPHandler(), ])
const netInvocationHandle = webPipeline.newInvocation()
const networkPipeline = new Pipeline( new NetworkNativeHandler(), netInvocationHandle ])
|
- 进一步丰富和扩展
Handler
接口,比如一些内置重试功能、跳过 pipeline
中的部分 handler
goto 到指定的 handler
等等。
😄 看看今后有没有开源的机会。后面有机会来写一下子如何基于 pipeline 模式实现的一套轻量的 plugin 管理系统。