基于swiftNIO 实现一个swift版本的 express

/ Mac / 没有评论 / 1484浏览

基于swiftNIO 实现一个swift版本的 express

swift5 即将发布 又该学习一门新语言了是吧。正好这段时间 apple 在 try?swift 会上发布了 新的服务端基础组件 SwiftNIO 可以说是良心之作,官方定位就是在于java的Netty。这里 我们基于swiftNIO来开发我们自己的express 服务框架。

截止3.14 siwft NIO 已经支持多种协议,TCP、UDP、 HTTP1.1、 HTTPS 、Websocket 。HTTP2大礼包也在路上,待HTTP2发布后会支持grpc 这样 微服务那套也可以上手,可谓是良心之作了,再加上swift5 发布 语言层面上支持协程,async await 操作,可以说是会吸一大波粉了。

我们这里实现这样的效果
let app = Express()
app.use(querystring)
app.use { (req, res, next) in
    print("1",req.userInfo)
    next()
}

app.get("/var") { (req, res, next) in
    res.send("fuck your")
}

let r = Router()
r.get("/router") { (req, res, next) in

    res.send("router is ok")
}


r.post("hi") { (req, res, next) in
    res.send("hello")
}
app.use("/s", router: r)

app.listen(8989)
用swift package manager 创建我们的项目

swift package init --type executable 创建一个可执行项目。 如果选 library 为创建一个库 package.swift 文件中加入依赖。在target 中 我们要依赖两个库,为 NIO,NIOHTTP1,否则spm不会将文件依赖打包

let package = Package(
    name: "swift-express",
    dependencies: [
        // Dependencies declare other packages that this package depends on.
        // .package(url: /* package url */, from: "1.0.0"),
        .package(url: "https://github.com/apple/swift-nio.git", from: "1.0.0")
    ],
    targets: [
        // Targets are the basic building blocks of a package. A target can define a module or a test suite.
        // Targets can depend on other targets in this package, and on products in packages which this package depends on.
        .target(
            name: "swift-express",
            dependencies: ["NIO","NIOHTTP1"]),
    ]
)

首先 我们要知道 什么是swift NIO

这里我们知道它是一个底层的高性能网络应用,基于 事件驱动模型 无I/O 阻塞。

第一步 我们想实现如下 一个最基础的服务

let app = Express()

app.listen(8989)
Express.swift
import Foundation
import NIO
import NIOHTTP1

open class Express {

  override public init() {}

  let loopGroup = 
  MultiThreadedEventLoopGroup(numThreads: System.coreCount)

  open func listen(_ port: Int) {
    let reuseAddrOpt = ChannelOptions.socket(
      SocketOptionLevel(SOL_SOCKET),
      SO_REUSEADDR)
    let bootstrap = ServerBootstrap(group: loopGroup)
    .serverChannelOption(ChannelOptions.backlog, value: 256)
    .serverChannelOption(reuseAddrOpt, value: 1)

    .childChannelInitializer {
      channel in
      channel.pipeline.addHTTPServerHandlers()

      // this is where the action is going to be!
    }

    .childChannelOption(ChannelOptions.socket(
      IPPROTO_TCP, TCP_NODELAY), value: 1)
    .childChannelOption(reuseAddrOpt, value: 1)
    .childChannelOption(ChannelOptions.maxMessagesPerRead, 
                        value: 1)

    do {
      let serverChannel = 
      try bootstrap.bind(host: "localhost", port: port)
      .wait()
      print("Server running on:", serverChannel.localAddress!)

      try serverChannel.closeFuture.wait() // runs forever
    }
    catch {
      fatalError("failed to start server: \(error)")
    }
  }
}

xcode run

Server running on: [IPv6]::1:8989

讨论下 第一步创建了MultiThreadedEventLoopGroup

let loopGroup = MultiThreadedEventLoopGroup(numThreads: System.coreCount)

swiftNIO中的EventLoop 有点类似于DispatchQueue,它处理IO事件,可以异步处理多任务,你可以设置一个时间 就像 DispatchQueue.asyncAfter。 MultiThreadedEventLoopGroup 就像一个并发队列,他会在他工作的时候去使用多线程(好羡慕go 的协程)。

第二步 listen函数

它使用了ServerBootstrap 对象去设置 server channel,bootstrap 就是一个初始化 channel 的辅助对象,对象设置完成之后 channel也就完成了。

swiftNIO中的channel 有点类似于swift 中的 FileHandle。包裹了文件描述以及在他之上提供了一些操作。

channels维护了一个channelPipline (管道),他们可以按顺序执行,并且操作传入传出的数据。 最后 我们调用了 channel.pipeline.addHTTPServerHandlers(),浙江处理管道中传入的数据转化为高级的http对象 即为 请求对象,并且输出字节到客户端中。

添加我们自己的NIO 处理函数

因为这个函数也就只有在Express中有用 所以我们可以直接在Express类中去完成它

open class Express {
  // other code

  final class HTTPHandler : ChannelInboundHandler {
    typealias InboundIn = HTTPServerRequestPart

    func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
      let reqPart = unwrapInboundIn(data)

      switch reqPart {
        case .head(let header):
        print("req:", header)

        // ignore incoming content to keep it micro :-)
        case .body, .end: break
      }
    }
  }
}

修改之前的初始化bootstrap代码

open class Express {
  // ...
  .childChannelInitializer { 
    channel in
    channel.pipeline.addHTTPServerHandlers().then {
      channel.pipeline.add(handler: HTTPHandler())
    }
  }
  // ...
}

为什么会有个then方法?这里就需要读者去翻阅NIO源码了

至此 我们完成了此部分。可以调用 listen 来 接受请求

第二步 构建我们自己的 Request/Response 对象

2.1 IncomingMessage 顾名思义 是对于请求对象的封装

服务端收到请求都有这么几个特征

import NIOHTTP1

open class IncomingMessage {

  public let header   : HTTPRequestHead // <= from NIOHTTP1
  public var userInfo = [ String : Any ]()
  
  init(header: HTTPRequestHead) {
    self.header = header
  }
}
2.2 ServerResponse

ServerResponse 会把我们需要发给客户端的信息 通过相关的Channel 发送。然后发出适当的信号 (head body end)

import NIO
import NIOHTTP1

open class ServerResponse {
  public  var status         = HTTPResponseStatus.ok
  public  var headers        = HTTPHeaders()
  public  let channel        : Channel
  public init(channel: Channel) {
    self.channel = channel
  }
  open func send(_ s: String)  {} 
}

你只要调用send 即可发送信息给客户端 下面给出详细的实现

import NIO
import NIOHTTP1

open class ServerResponse {

  public  var status         = HTTPResponseStatus.ok
  public  var headers        = HTTPHeaders()
  public  let channel        : Channel
  private var didWriteHeader = false
  private var didEnd         = false
  
  public init(channel: Channel) {
    self.channel = channel
  }
  
  /// An Express like `send()` function.
  open func send(_ s: String) {
    flushHeader()

    let utf8   = s.utf8
    var buffer = channel.allocator.buffer(capacity: utf8.count)
    buffer.write(bytes: utf8)

    let part = HTTPServerResponsePart.body(.byteBuffer(buffer))
    
    _ = channel.writeAndFlush(part)
               .mapIfError(handleError)
               .map { self.end() }
  }
  
  /// Check whether we already wrote the response header.
  /// If not, do so.
  func flushHeader() {
    guard !didWriteHeader else { return } // done already
    didWriteHeader = true
    
    let head = HTTPResponseHead(version: .init(major:1, minor:1),
                                status: status, headers: headers)
    let part = HTTPServerResponsePart.head(head)
    _ = channel.writeAndFlush(part).mapIfError(handleError)
  }
  
  func handleError(_ error: Error) {
    print("ERROR:", error)
    end()
  }
  
  func end() {
    guard !didEnd else { return }
    didEnd = true
    _ = channel.writeAndFlush(HTTPServerResponsePart.end(nil))
               .map { self.channel.close() }
  }
}

重点考虑send 函数

  1. flushHeader() 写入header
  2. 写入body数据
  3. 最后一个map 调用了 end()函数 为写入end 这里都调用了NIO的writeAndFlush函数。顾名思义 写入并且清理。 注意 string需要转为ByteBuffer后才能输出
2.21

我们再对响应添加扩展,使其可以以下标方式访问或写入header中的数据。

public extension ServerResponse {

  /// A more convenient header accessor. Not correct for
  /// any header.
  public subscript(name: String) -> String? {
    set {
      assert(!didWriteHeader, "header is out!")
      if let v = newValue {
        headers.replaceOrAdd(name: name, value: v)
      }
      else {
        headers.remove(name: name)
      }
    }
    get {
      return headers[name].joined(separator: ", ")
    }
  }
}
2.21

顺手再写个发送json给客户端的函数 锦上添花

public extension ServerResponse {

  /// Send a Codable object as JSON to the client.
  func json<T: Encodable>(_ model: T) {
    // create a Data struct from the Codable object
    let data : Data
    do {
      data = try JSONEncoder().encode(model)
    }
    catch {
      return handleError(error)
    }

    // setup JSON headers
    self["Content-Type"]   = "application/json"
    self["Content-Length"] = "\(data.count)"

    // send the headers and the data
    flushHeader()

    var buffer = channel.allocator.buffer(capacity: data.count)
    buffer.write(bytes: data)
    let part = HTTPServerResponsePart.body(.byteBuffer(buffer))

    _ = channel.writeAndFlush(part)
    .mapIfError(handleError)
    .map { self.end() }
  }
}

至此 我们已经可以很方便的拿到response 以及 request 在 express.swift 文件中修改 channelRead(ctx,data)函数为

func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
  let reqPart = self.unwrapInboundIn(data)

  switch reqPart {
    case .head(let header):

    let req = IncomingMessage(header: header)
    let res = ServerResponse(channel: ctx.channel)
    //
    //                // trigger Router

    router.handle(request: req, response: res) {
      (items : Any...) in // the final handler
      res.status = .notFound
      res.send("No middleware handled the request!")
    }

    // ignore incoming content to keep it micro :-)
    case .body, .end: break
  }
}

3 实现中间件

在node 的Express中 我们这样使用中间件

let app = new Express()
app.use((req,res,next) => {
})

同样我们希望在swift也如此实现, 需要如下类型 签名函数

func(req:IncomingMessage,res:ServerResponse,next:()->()){
  //print(req)
  next() 调用next 将会执行下一个中间件 
}

所以我们最终的设计会是这样

public typealias Next = ( Any... ) -> Void
public typealias Middleware =( IncomingMessage, ServerResponse, @escaping Next ) -> Void

4 实现路由

有了中间件 我们希望去用它做点事情。那么正好把路由也一起做了 一个路由 应该有一个中间件数组,每次调用这个路由 的uri 我们将会去遍历每个中间件,依次调用 (中间件就是一个闭包,等待调用)

protocol RouterProtocol {
  var middleware : [Middleware]
  func use(_ middleware:Middleware...) // 可以接受多参数
}

实现该协议即可做到

let app = new Router()
app.use((req,res,next) => {
})
Router.swift
open class Router {
  private var part : String = ""
  /// The sequence of Middleware functions.
  private var middleware = [ Middleware ]()

  /// Add another middleware (or many) to the list
  open func use(_ middleware: Middleware...) {
    self.middleware.append(contentsOf: middleware)
  }

  /// Request handler. Calls its middleware list
  /// in sequence until one doesn't call `next()`.
  func handle(request        : IncomingMessage,
              response       : ServerResponse,
              next upperNext : @escaping Next)
  {
    final class State {
      var stack    : ArraySlice<Middleware>
      let request  : IncomingMessage
      let response : ServerResponse
      var next     : Next?

      init(_ stack    : ArraySlice<Middleware>,
           _ request  : IncomingMessage,
           _ response : ServerResponse,
           _ next     : @escaping Next)
      {
        self.stack    = stack
        self.request  = request
        self.response = response
        self.next     = next
      }

      func step(_ args : Any...) {
        if let middleware = stack.popFirst() {
          middleware(request, response, self.step)
        }
        else {
          next?(); next = nil
        }
      }
    }

    let state = State(middleware[middleware.indices],
                      request, response, upperNext)
    state.step()
  }
}

handle 函数需要在接受响应后调用 会依次调用中间件。

public extension Router {

  /// Register a middleware which triggers on a `GET`
  /// with a specific path prefix.
  public func get(_ path: String = "",
                  middleware: @escaping Middleware)
  {
    use {
      req, res, next in
      guard req.header.method == .GET,
      req.header.uri.hasPrefix(self.part + path)
      else { return next() }

      middleware(req, res, next)
    }
  }

  public func post(_ path: String = "",
                   middleware: @escaping Middleware)
  {
    use {
      req, res, next in
      guard req.header.method == .POST,
      req.header.uri.hasPrefix(self.part + "/" + path)
      else { return next() }

      middleware(req, res, next)
    }
  }
}

public extension Router  {

  public func use(router:Router){
    let _ = router.middleware.map{
      self.middleware.append($0)
    }
  }

  public func use(_ part:String,router:Router){
    router.part = part
    use(router: router)
  }
}

我们对Router做了些扩展。这样它就可以实现我们一开始的目标。愉快的使用它吧。