godis中tcp模块详解

最近在看godis源码https://github.com/hdt3213/godis/blob/master/README_CN.md

如果你想自己试一试的话,可以去源码里复制tcp模块运行测试。

下面讲解一下godis中tcp模块的实现

tcp模块组成

tcp模块有下面三个文件,实际的tcp服务器只有server.go一个文件

  1. echo.go //负责实现一个返回输入内容的handler
  2. echo_test.go //负责测试server.go是否正常
  3. server.go //实现tcp服务器

server.go

server.go有两个函数

  1. ListenAndServeWithSignal //在tcp服务器核心上提供了配置和退出服务器功能
  2. ListenAndServe //tcp服务器核心

我们首先看ListenAndServe函数,echo.go和echo_test.go都是为了这个函数准备的

参数

  1. listener net.Listener //接收一个监听器(仅仅是测试用,因为net.Listen是本地监听器)
  2. handler tcp.Handler //tcp接收请求后,由handler函数处理
  3. closeChan <-chan struct{} //仅接受单向通道closeChan,用于关闭tcp服务器
 ​
 // ListenAndServe binds port and handle requests, blocking until close
 func ListenAndServe(listener net.Listener, handler tcp.Handler, closeChan <-chan struct{}) {
  // listen signal
   //创建一个协程,当监测到关闭信号后关闭服务器。
  go func() {
  <-closeChan
  logger.Info("shutting down...")
  _ = listener.Close() // listener.Accept() will return err immediately
  _ = handler.Close()  // close connections
  }()
 
  // listen port
   //手动终止函数运行时会调用defer,关闭连接,但是在测试里就用不上了(除非你手动终止了服务器)
  defer func() {
  // close during unexpected error
  _ = listener.Close()
  _ = handler.Close()
  }()
 ​
  ctx := context.Background() //下文有详细介绍
  var waitDone sync.WaitGroup //下文有详细介绍
  for {
  conn, err := listener.Accept() //一直等待直到有新连接
  if err != nil {
  break
  }
  // handle
  logger.Info("accept link")
  waitDone.Add(1)
  go func() {
  defer func() {
  waitDone.Done() //协程关闭,计数-1
  }()
  handler.Handle(ctx, conn)  //交给handle处理
  }()
  }
  waitDone.Wait() //等待所有协程退出
 }
 ​

context.Background()

context 主要用来在 goroutine 之间传递上下文信息,包括:取消信号、超时时间、截止时间、k-v 等。

随着 context 包的引入,标准库中很多接口因此加上了 context 参数,例如 database/sql 包。context 几乎成为了并发控制和超时控制的标准做法。

参考:https://zhuanlan.zhihu.com/p/68792989

虽然这里传入了ctx,handler的所有实现方法中均没有调用ctx。应该是handler方法中没有子协程,无需统一并发控制

sync.WaitGroup

在线程需要等待多个协程完成时,可以使用管道channel来等待所有协程的完成信号。但是管道在这里显得有些大材小用,因为它被设计出来不仅仅只是在这里用作简单的同步处理,在这里使用管道实际上是不合适的。而且假设我们有一万、十万甚至更多的for循环,也要申请同样数量大小的管道出来,对内存也是不小的开销。

WaitGroup 对象内部有一个计数器,最初从0开始,它有三个方法:Add(),Done(),Wait() 用来控制计数器的数量。Add(n) 把计数器设置为n ,Done() 每次把计数器减1 ,wait() 会阻塞代码的运行,直到计数器地值减为0。

这里使用WaitGroup记录tcp服务器的连接,当服务器关闭时会执行waitDone.Wait(),等所有连接完成后才会关闭服务器。

在生产环境下需要保证TCP服务器关闭前完成必要的清理工作,包括将完成正在进行的数据传输,关闭TCP连接等。这种关闭模式称为优雅关闭,可以避免资源泄露以及客户端未收到完整数据导致故障。

ListenAndServeWithSignal

这个函数是在tcp服务器基础上提供了配置和退出功能,但是最大连接数和超时时间作者实际并没有进行处理(不知道为啥)。

首先看看几个系统信号表示的含义

  • Ctrl-C 发送 INT signal (SIGINT),通常导致进程结束
  • SIGHUP,终端控制进程结束(终端连接断开)
  • SIGQUIT,用户发送QUIT字符(Ctrl+/)触发
  • SIGTERM,结束程序(可以被捕获、阻塞或忽略)

那这个函数就很好理解了,核心就是调用ListenAndServe

 // Config stores tcp server properties
 type Config struct {
  Address    string        `yaml:"address"` //监听的地址,一般配置0.0.0.0全部侦听
  MaxConnect uint32        `yaml:"max-connect"` //最大连接数
  Timeout    time.Duration `yaml:"timeout"` //超时时间
 }
 ​
 // ListenAndServeWithSignal binds port and handle requests, blocking until receive stop signal
 func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {
  closeChan := make(chan struct{})
  sigCh := make(chan os.Signal)
  signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
  go func() {
  sig := <-sigCh
  switch sig {
  case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
  closeChan <- struct{}{}
  }
  }()
  listener, err := net.Listen("tcp", cfg.Address)
  if err != nil {
  return err
  }
  //cfg.Address = listener.Addr().String()
  logger.Info(fmt.Sprintf("bind: %s, start listening...", cfg.Address))
  ListenAndServe(listener, handler, closeChan)
  return nil
 }

echo.go

echo.go是用来测试server.go的,这里实现了一个handler返回客户端输入的信息。

 ​
 // EchoHandler echos received line to client, using for test
 type EchoHandler struct {
  activeConn sync.Map //线程安全
  closing    atomic.Boolean //多线程bool原子性
 }
 ​
 // MakeEchoHandler creates EchoHandler
 // 使用函数进行取地址初始化,可以当作构造函数
 func MakeEchoHandler() *EchoHandler {
  return &EchoHandler{}
 }
 ​
 // EchoClient is client for EchoHandler, using for test
 type EchoClient struct {
  Conn    net.Conn
  Waiting wait.Wait  //这里作者封装了一下sync.WaitGroup,其实就是sync.WaitGroup
 }
 ​
 // Close close connection
 func (c *EchoClient) Close() error {
  c.Waiting.WaitWithTimeout(10 * time.Second) //下文有详解
  c.Conn.Close()
  return nil
 }
 ​
 // Handle echos received line to client
 func (h *EchoHandler) Handle(ctx context.Context, conn net.Conn) {
   //服务端已经关闭,等待客户端剩余连接完成时就不会再接入新的请求了
  if h.closing.Get() {
  // closing handler refuse new connection
  _ = conn.Close()
  return
  }
  //创建一个客户端,这里的客户端是相对于server来说的
  client := &EchoClient{
  Conn: conn,
  }
   //记录当前客户端
  h.activeConn.Store(client, struct{}{})
 ​
  reader := bufio.NewReader(conn)
  for {
     //下面就是循环读输入内容,然后原封不动返回
  // may occurs: client EOF, client timeout, server early close
  msg, err := reader.ReadString('\n')
  if err != nil {
  if err == io.EOF {
  logger.Info("connection close")
  h.activeConn.Delete(client)
  } else {
  logger.Warn(err)
  }
  return
  }
     //这里waiting的意思是,客户端提交了一个任务,等待服务端返回
  client.Waiting.Add(1)//任务列表+1
  //logger.Info("sleeping")
  //time.Sleep(10 * time.Second)
     //这里只是模拟了一个简单的服务端任务
  b := []byte(msg)
  _, _ = conn.Write(b)
     //服务端做好了处理
  client.Waiting.Done()//任务列表-1
  }
 }
 ​
 // Close stops echo handler
 func (h *EchoHandler) Close() error {
  logger.Info("handler shutting down...")
  h.closing.Set(true) //不再接入新的连接
   //逐个关闭还在排序的连接
  h.activeConn.Range(func(key interface{}, val interface{}) bool {
  client := key.(*EchoClient)
  _ = client.Close()
  return true
  })
  return nil
 }
 ​

数据结构定义

 type EchoHandler struct {
    activeConn sync.Map //线程安全
    closing    atomic.Boolean //线程安全,多线程bool原子性
 }

sync.Map

sync.map是一个线程安全map,用于在多线程(协程)时使用。

需要并发读写时,一般的做法是加锁,但这样性能并不高,Go语言在 1.9 版本中提供了一种效率较高的并发安全的 sync.Map,sync.Map 和 map 不同,不是以语言原生形态提供,而是在 sync 包下的特殊结构。

sync.Map 有以下特性:

  1. 无须初始化,直接声明即可。
  2. sync.Map 不能使用 map 的方式进行取值和设置等操作,而是使用 sync.Map 的方法进行调用,Store 表示存储,Load 表示获取,Delete 表示删除。
  3. 使用 Range 配合一个回调函数进行遍历操作,通过回调函数返回内部遍历出来的值,Range 参数中回调函数的返回值在需要继续迭代遍历时,返回 true,终止迭代遍历时,返回 false。

atomic.Boolean

也是保证线程安全,具体介绍可以看这篇文章

https://gfw.go101.org/article/concurrent-atomic-operation.html

c.Waiting.WaitWithTimeout(10 * time.Second)

作者自己写了一个wait包,包装了sync.WaitGroup。

除此之外多了一个等待超时的函数WaitWithTimeout用来给客户端在超时关闭连接,一会讲完测试后我会演示一下当客户端还有任务没有完成时超时的情况。

 // WaitWithTimeout blocks until the WaitGroup counter is zero or timeout
 // returns true if timeout
 func (w *Wait) WaitWithTimeout(timeout time.Duration) bool {
  c := make(chan bool, 1)
   //客户端
  go func() { //创建一个协程,等待连接。
  defer close(c)
  w.wg.Wait()  //客户端已关闭
  c <- true
  }()
  select {
  case <-c:
  return false // completed normally
  case <-time.After(timeout):
  return true // timed out
  }
 }

echo_test.go

Go 语言推荐测试文件和源代码文件放在一块,测试文件以 _test.go 结尾。比如,当前 package 有 calc.go 一个文件,我们想测试 calc.go 中的 AddMul 函数,那么应该新建 calc_test.go 作为测试文件。

 example/
    |--calc.go
    |--calc_test.go

参考:https://geektutu.com/post/quick-go-test.html

必须符合命名规范goland才会识别为测试文件,并且测试函数要以Test开头+想要测试的函数。当符合这些规范后goland会多出一个快捷运行的箭头。

image-20221226094838140

下面是 echo_test.go文件。首先创建了一个服务端,自动运行在一个端口上并接收本地所有ip,然后使用dial连接这个地址。测试了10次输入输出是否正确,然后关闭连接,测试了5次在服务端关闭的情况下的请求。

需要注意的是最后一行time.Sleep(time.Second)是为了服务端和客户端能够正常输出,因为TestListenAndServe执行结束后就看不见控制台输出了。

 ​
 func TestListenAndServe(t *testing.T) {
  var err error
  closeChan := make(chan struct{})
  listener, err := net.Listen("tcp", ":0")
  if err != nil {
  t.Error(err)
  return
  }
  addr := listener.Addr().String()
  go ListenAndServe(listener, MakeEchoHandler(), closeChan)
 ​
  conn, err := net.Dial("tcp", addr)
  if err != nil {
  t.Error(err)
  return
  }
  for i := 0; i < 10; i++ {
  val := strconv.Itoa(rand.Int())
  _, err = conn.Write([]byte(val + "\n"))
  if err != nil {
  t.Error(err)
  return
  }
  bufReader := bufio.NewReader(conn)
  line, _, err := bufReader.ReadLine()
  if err != nil {
  t.Error(err)
  return
  }
  if string(line) != val {
  t.Error("get wrong response")
  return
  }
  }
  _ = conn.Close()
  for i := 0; i < 5; i++ {
  // create idle connection
  _, _ = net.Dial("tcp", addr)
  }
  closeChan <- struct{}{}
  time.Sleep(time.Second)
 }
 ​

测试服务端处理任务超时

当客户端关闭连接时,如果任务已经完成则立即关闭连接。若还有任务等待服务端相应就需要等待一个超时时间,超时后也会关闭。

在handler服务端处理完任务后,不记录任务已经完成

 //echo.go
 client.Waiting.Add(1)
 b := []byte(msg)
 _, _ = conn.Write(b)
 //client.Waiting.Done()z

上面说过,测试函数如果关闭过早就看不见server的输出了,所以这里改成20s(比超时10s大就好)

 //echo_test.go
  //_ = conn.Close() //客户端不主动关闭连接,模拟还需要等待响应
  //for i := 0; i < 5; i++ {
  // // create idle connection
  // _, _ = net.Dial("tcp", addr)
  //}
  closeChan <- struct{}{}
  time.Sleep(time.Second * 20) //更改成20s才能查看超时

为了能看见是因为超时关闭还是任务完成关闭,这里打印WaitWithTimeout的输出。输出为true表示因为超时关闭。

 func (c *EchoClient) Close() error {
  logger.Warn(c.Waiting.WaitWithTimeout(10 * time.Second))
  c.Conn.Close()
  return nil
 }

查看结果

可以看到输出了true,表示因为超时而关闭的连接。

image-20221226100430144

总结

golang搭建一个tcp服务器还是很简便的。作者在这里考虑到了tcp服务器关闭后,继续处理现有任务并不接受新任务的场景和服务端超时自动关闭连接的场景。遗憾的是有些功能还未实现,有机会可以提pr完善。

学到了很多!

发表评论