玩转 Go 生态|Hertz WebSocket 扩展简析

WebSocket 是一种能够在单个 TCP 衔接上进行全双工通信,位于 OSI 模型的应用层。WebSocket 使得客户端和服务器之间的数据交流变得更加简略,答应服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完结一次握手,两者之间就能够创立持久性的衔接,并进行双向数据传输。

Hertz 供给了 WebSocket 的支撑,参考 gorilla/websocket 库运用 hijack 的方法在 Hertz 进行了适配,用法和参数基本保持一致。

安装

go get github.com/hertz-contrib/websocket

示例代码

package main
​
import (
  "context"
  "flag"
  "html/template"
  "log"
​
  "github.com/cloudwego/hertz/pkg/app"
  "github.com/cloudwego/hertz/pkg/app/server"
  "github.com/hertz-contrib/websocket"
)
​
var addr = flag.String("addr", "localhost:8080", "http service address")
​
var upgrader = websocket.HertzUpgrader{} // use default options
​
func echo(_ context.Context, c *app.RequestContext) {
  err := upgrader.Upgrade(c, func(conn *websocket.Conn) {
    for {
      mt, message, err := conn.ReadMessage()
      if err != nil {
        log.Println("read:", err)
        break
       }
      log.Printf("recv: %s", message)
      err = conn.WriteMessage(mt, message)
      if err != nil {
        log.Println("write:", err)
        break
       }
     }
   })
  if err != nil {
    log.Print("upgrade:", err)
    return
   }
}
​
func home(_ context.Context, c *app.RequestContext) {
  c.SetContentType("text/html; charset=utf-8")
  homeTemplate.Execute(c, "ws://"+string(c.Host())+"/echo")
}
​
func main() {
  flag.Parse()
  h := server.Default(server.WithHostPorts(*addr))
  // https://github.com/cloudwego/hertz/issues/121
  h.NoHijackConnPool = true
  h.GET("/", home)
  h.GET("/echo", echo)
  h.Spin()
}
​
var homeTemplate = template.Must(template.New("").Parse(`
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<script> 
window.addEventListener("load", function(evt) {
​
  var output = document.getElementById("output");
  var input = document.getElementById("input");
  var ws;
​
  var print = function(message) {
    var d = document.createElement("div");
    d.textContent = message;
    output.appendChild(d);
    output.scroll(0, output.scrollHeight);
  };
​
  document.getElementById("open").onclick = function(evt) {
    if (ws) {
      return false;
    }
    ws = new WebSocket("{{.}}");
    ws.onopen = function(evt) {
      print("OPEN");
    }
    ws.onclose = function(evt) {
      print("CLOSE");
      ws = null;
    }
    ws.onmessage = function(evt) {
      print("RESPONSE: " + evt.data);
    }
    ws.onerror = function(evt) {
      print("ERROR: " + evt.data);
    }
    return false;
  };
​
  document.getElementById("send").onclick = function(evt) {
    if (!ws) {
      return false;
    }
    print("SEND: " + input.value);
    ws.send(input.value);
    return false;
  };
​
  document.getElementById("close").onclick = function(evt) {
    if (!ws) {
      return false;
    }
    ws.close();
    return false;
  };
​
});
</script>
</head>
<body>
<table>
<tr><td valign="top" width="50%">
<p>Click "Open" to create a connection to the server, 
"Send" to send a message to the server and "Close" to close the connection. 
You can change the message and send multiple times.
<p>
<form>
<button id="open">Open</button>
<button id="close">Close</button>
<p><input id="input" type="text" value="Hello world!">
<button id="send">Send</button>
</form>
</td><td valign="top" width="50%">
<div id="output" style="max-height: 70vh;overflow-y: scroll;"></div>
</td></tr></table>
</body>
</html>
`))

运行 server:

go run server.go

上述示例代码中,服务器包括一个简略的网络客户端。要运用该客户端,在浏览器中翻开 http://127.0.0.1:8080,并按照页面上的指示操作。

Upgrade

websocket.Conn 类型代表一个 WebSocket 衔接。服务器应用程序从 HTTP 恳求处理程序中调用 HertzUpgrader.Upgrade 方法,将 HTTP 协议的衔接恳求升级为 WebSocket 协议的衔接恳求。

这部分逻辑对应着示例代码的 echo() 函数,此处侧重介绍 HertzUpgrader.Upgrade

函数签名:

func (u *HertzUpgrader) Upgrade(ctx *app.RequestContext, handler HertzHandler) error

内部处理逻辑:

func (u *HertzUpgrader) Upgrade(ctx *app.RequestContext, handler HertzHandler) error {
  if !ctx.IsGet() {
    return u.returnError(ctx, consts.StatusMethodNotAllowed, fmt.Sprintf("%s request method is not GET", badHandshake))
   }
  // 校验 requsetHeader 中与 websocket 相关的字段(此处省略部分逻辑代码)
​
  subprotocol := u.selectSubprotocol(ctx)
  compress := u.isCompressionEnable(ctx)
​
  ctx.SetStatusCode(consts.StatusSwitchingProtocols)
  // 结构协议升级后的呼应头部信息
  ctx.Response.Header.Set("Upgrade", "websocket")
  ctx.Response.Header.Set("Connection", "Upgrade")
  ctx.Response.Header.Set("Sec-WebSocket-Accept", computeAcceptKeyBytes(challengeKey))
  // “无上下文接纳”形式
  if compress {
    ctx.Response.Header.Set("Sec-WebSocket-Extensions", "permessage-deflate; server_no_context_takeover; client_no_context_takeover")
   }
  if subprotocol != nil {
    ctx.Response.Header.SetBytesV("Sec-WebSocket-Protocol", subprotocol)
   }
​
  // 经过 Hijack 的方法,完成 websocket 全双工的通信
  ctx.Hijack(func(netConn network.Conn) {
    writeBuf := poolWriteBuffer.Get().([]byte)
    c := newConn(netConn, true, u.ReadBufferSize, u.WriteBufferSize, u.WriteBufferPool, nil, writeBuf)
    if subprotocol != nil {
      c.subprotocol = b2s(subprotocol)
     }
​
    if compress {
      c.newCompressionWriter = compressNoContextTakeover
      c.newDecompressionReader = decompressNoContextTakeover
     }
​
    netConn.SetDeadline(time.Time{})
​
    handler(c)
​
    writeBuf = writeBuf[0:0]
    poolWriteBuffer.Put(writeBuf)
   })
​
  return nil
}

HertzHandler

HertzHandler 是上述 HertzUpgrader.Upgrade 函数的第二个参数。HertzHandler 在握手完结后接纳一个 websocket 衔接,经过绑架这个衔接,完结全双工的通信。

HertzHandler 必须由用户供给,内部界说了 WebSocket 恳求和呼应的具体流程。

函数签名:

type HertzHandler func(*Conn)

上述 echo 服务器的 websocket 处理流程:

err := upgrader.Upgrade(c, func(conn *websocket.Conn) {
  for {
    // 读取客户端发送的信息
    mt, message, err := conn.ReadMessage()
    if err != nil {
      log.Println("read:", err)
      break
     }
    log.Printf("recv: %s", message)
    // 向客户端发送信息
    err = conn.WriteMessage(mt, message)
    if err != nil {
      log.Println("write:", err)
      break
     }
   }
})

装备

上述文档现已讲述了Hertz WebSocket 最中心的协议升级衔接绑架的逻辑,下面将罗列 Hertz WebSocket 运用过程中可选的装备参数。

这部分将环绕 websocket.HertzUpgrader 结构打开说明。

参数 介绍
ReadBufferSize 用于设置输入缓冲区的巨细,单位为字节。假如缓冲区巨细为零,那么就运用 HTTP 服务器分配的巨细。输入缓冲区巨细并不限制能够接纳的信息的巨细。
WriteBufferSize 用于设置输出缓冲区的巨细,单位为字节。假如缓冲区巨细为零,那么就运用 HTTP 服务器分配的巨细。输出缓冲区巨细并不限制能够发送的信息的巨细。
WriteBufferPool 用于设置写操作的缓冲池。
Subprotocols 用于按优先次序设置服务器支撑的协议。假如这个字段不是 nil,那么 Upgrade 方法经过挑选这个列表中与客户端恳求的协议的第一个匹配来洽谈一个子协议。假如没有匹配,那么就不洽谈协议(Sec-Websocket-Protocol 头不包括在握手呼应中)。
Error 用于设置生成 HTTP 过错呼应的函数。
CheckOrigin 用于设置针对恳求的 Origin 头的校验函数, 假如恳求的 Origin 头是可接受的,CheckOrigin 回来 true。
EnableCompression 用于设置服务器是否应该测验洽谈每个音讯的紧缩(RFC 7692)。将此值设置为 true 并不能保证紧缩会被支撑。

WriteBufferPool

假如该值没有被设置,则额外初始化写缓冲区,并在当前生命周期内分配给该衔接。当应用程序在大量的衔接上有适度的写入量时,缓冲池是最有用的。

应用程序应该运用一个单一的缓冲池来为不同的衔接分配缓冲区。

接口签名:

// BufferPool represents a pool of buffers. The *sync.Pool type satisfies this
// interface.  The type of the value stored in a pool is not specified.
type BufferPool interface {
  // Get gets a value from the pool or returns nil if the pool is empty.
  Get() interface{}
  // Put adds a value to the pool.
  Put(interface{})
}

示例代码:

type simpleBufferPool struct {
  v interface{}
}
​
func (p *simpleBufferPool) Get() interface{} {
  v := p.v
  p.v = nil
  return v
}
​
func (p *simpleBufferPool) Put(v interface{}) {
  p.v = v
}
​
var upgrader = websocket.HertzUpgrader{
  WriteBufferPool: &simpleBufferPool{},
}

Subprotocols

WebSocket 仅仅界说了一种交流恣意音讯的机制。这些音讯是什么意思,客户端在任何特定的时间点能够期待什么样的音讯,或许他们被答应发送什么样的音讯,彻底取决于完成应用程序。

所以你需要在服务器和客户端之间就这些工作达成协议。子协议参数仅仅让客户端和服务端正式地交流这些信息。你能够为你想要的任何协议假造任何名字。服务器能够简略地检查客户在握手过程中是否遵守了该协议。

Error

假如 Error 为 nil,则运用 Hertz 供给的 API 来生成 HTTP 过错呼应。

函数签名:

func(ctx *app.RequestContext, status int, reason error)

示例代码:

var upgrader = websocket.HertzUpgrader{
  Error: func(ctx *app.RequestContext, status int, reason error) {
    ctx.Response.Header.Set("Sec-Websocket-Version", "13")
    ctx.AbortWithMsg(reason.Error(), status)
   },
}

CheckOrigin

假如 CheckOrigin 为nil,则运用一个安全的默许值:假如Origin恳求头存在,而且源主机不等于恳求主机头,则回来false。CheckOrigin 函数应该细心验证恳求的来历,以避免跨站恳求假造。

函数签名:

func(ctx *app.RequestContext) bool

默许完成:

func fastHTTPCheckSameOrigin(ctx *app.RequestContext) bool {
  origin := ctx.Request.Header.Peek("Origin")
  if len(origin) == 0 {
    return true
   }
  u, err := url.Parse(b2s(origin))
  if err != nil {
    return false
   }
  return equalASCIIFold(u.Host, b2s(ctx.Host()))
}

EnableCompression

服务端接受一个或许多个扩展字段,这些扩展字段是包括客户端恳求的 Sec-WebSocket-Extensions 头字段扩展中的。当 EnableCompression 为 true 时,服务端根据当前自身支撑的扩展与其进行匹配,假如匹配成功则支撑紧缩。

校验逻辑:

var strPermessageDeflate = []byte("permessage-deflate")
​
func (u *HertzUpgrader) isCompressionEnable(ctx *app.RequestContext) bool {
  extensions := parseDataHeader(ctx.Request.Header.Peek("Sec-WebSocket-Extensions"))
​
  // Negotiate PMCE
  if u.EnableCompression {
    for _, ext := range extensions {
      if bytes.HasPrefix(ext, strPermessageDeflate) {
        return true
       }
     }
   }
​
  return false
}

现在仅支撑“无上下文接纳”形式,详见上述 HertzUpgrader.Upgrade 代码部分。

Set Deadline

当运用 websocket 进行读写的时候,能够经过类似如下方法设置超时时间(在每次读写过程中都会收效)。

示例代码:

func echo(_ context.Context, c *app.RequestContext) {
  err := upgrader.Upgrade(c, func(conn *websocket.Conn) {
    defer conn.Close()
    // "github.com/cloudwego/hertz/pkg/network"
    conn.NetConn().(network.Conn).SetReadTimeout(1 * time.Second)
    ...
   })
  if err != nil {
    log.Print("upgrade:", err)
    return
   }
}

更多用法示例详见 examples 。