布景

某一日,葫芦正在写一个简单的CURD代码,十三不知何时悄悄的来到死后

十三: 葫芦,你知道db的 conn 如何办理的

葫芦一颤抖

葫芦: 经过池化技术,经过maxIdleConns 和 maxOpenConns 参数来决议池子的巨细

十三: 那你知道超时的闲暇衔接如何处理的吗

葫芦: 布吉岛,TAT

葫芦GG

database/sql 衔接池

什么是衔接池

衔接池是一个存储数据库衔接的缓冲区,这些数据库衔接现已初始化并准备就绪,以便快速使用。这提高了应用程序的功能,由于它能够防止为每个新的数据库恳求创立一个新的数据库衔接。能够重复使用现有的衔接来处理新的数据库恳求,然后减少了衔接数据库所需的时刻和资源。一起,它也能够操控数据库衔接的数量,以防止过多的衔接造成数据库的负担。

database实现

DB结构

// can be controlled with SetMaxIdleConns.
type DB struct {
   // Total time waited for new connections.
   waitDuration atomic.Int64
   connector driver.Connector
   // numClosed is an atomic counter which represents a total number of
   // closed connections. Stmt.openStmt checks it before cleaning closed
   // connections in Stmt.css.
   numClosed atomic.Uint64
   mu           sync.Mutex    // protects following fields
   freeConn     []*driverConn // free connections ordered by returnedAt oldest to newest
   connRequests map[uint64]chan connRequest
   nextRequest  uint64 // Next key to use in connRequests.
   numOpen      int    // number of opened and pending open connections
   // Used to signal the need for new connections
   // a goroutine running connectionOpener() reads on this chan and
   // maybeOpenNewConnections sends on the chan (one send per needed connection)
   // It is closed during db.Close(). The close tells the connectionOpener
   // goroutine to exit.
   openerCh          chan struct{}
   closed            bool
   dep               map[finalCloser]depSet
   lastPut           map[*driverConn]string // stacktrace of last conn's put; debug only
   maxIdleCount      int                    // zero means defaultMaxIdleConns; negative means 0
   maxOpen           int                    // <= 0 means unlimited
   maxLifetime       time.Duration          // maximum amount of time a connection may be reused
   maxIdleTime       time.Duration          // maximum amount of time a connection may be idle before being closed
   cleanerCh         chan struct{}
   waitCount         int64 // Total number of connections waited for.
   maxIdleClosed     int64 // Total number of connections closed due to idle count.
   maxIdleTimeClosed int64 // Total number of connections closed due to idle time.
   maxLifetimeClosed int64 // Total number of connections closed due to max connection lifetime limit.
   stop func() // stop cancels the connection opener.
}
  • waitDuration: 等候衔接的最长时刻
  • connector: 数据库衔接的创立工厂
  • numClosed: 已封闭的衔接数
  • freeConn: 未使用的衔接调集
  • connRequests: 用于存储衔接恳求,每个衔接恳求都有一个唯一的恳求 ID,经过这个 ID 能够在 connRequests 中找到相应的衔接恳求通道
  • nextRequest: 表明下一个衔接恳求的 ID
  • numOpen: 表明当时现已翻开的衔接数量
  • openerCh: 一个 chan struct{} 类型的通道,用于堵塞池的 opener,直到 freeConn 不为空
  • closed: 表明衔接池是否现已封闭
  • dep: 一个 map[finalCloser]depSet 类型的映射,用于处理衔接的依靠联系,保护衔接的封闭顺序
  • lastPut: 用于存储衔接最终一次被放回衔接池的时刻,首要用于衔接收回策略
  • maxIdleCount: 表明衔接池中最多能够保留的闲暇衔接数
  • maxOpen: 表明衔接池中最多能够翻开的衔接数量。假如 maxOpen 为 0,则表明没有限制
  • maxLifetime: 表明一个衔接的最大生命周期
  • maxIdleTime: 表明一个衔接能够在衔接池中闲暇的最长时刻
  • cleanerCh: 一个 chan struct{} 类型的通道,用于定期清理衔接池中的闲暇衔接
  • waitCount: 表明当时正在等候衔接的数量
  • maxIdleClosed: 表明由于闲暇衔接数超越限制而被封闭的衔接数量
  • maxIdleTimeClosed: 表明由于闲暇时刻超越限制而被封闭的衔接数量
  • maxLifetimeClosed: 表明由于生命周期超越限制而被封闭的衔接数量
  • stop: 一个类型为 func() 的函数变量,用于封闭衔接池并开释资源

Open 翻开db衔接

func Open(driverName, dataSourceName string) (*DB, error) {
   driversMu.RLock()
   driveri, ok := drivers[driverName]
   driversMu.RUnlock()
   if !ok {
      return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
   }
   if driverCtx, ok := driveri.(driver.DriverContext); ok {
      connector, err := driverCtx.OpenConnector(dataSourceName)
      if err != nil {
         return nil, err
      }
      return OpenDB(connector), nil
   }
   return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
}

检查驱动是否存在,假如存在则调用OpenDB方法来衔接数据库

func OpenDB(c driver.Connector) *DB {
   ctx, cancel := context.WithCancel(context.Background())
   db := &DB{
      connector:    c,
      openerCh:     make(chan struct{}, connectionRequestQueueSize),
      lastPut:      make(map[*driverConn]string),
      connRequests: make(map[uint64]chan connRequest),
      stop:         cancel,
   }
   go db.connectionOpener(ctx)
   return db
}
// Runs in a separate goroutine, opens new connections when requested.
func (db *DB) connectionOpener(ctx context.Context) {
   for {
      select {
      case <-ctx.Done():
         return
      case <-db.openerCh:
         db.openNewConnection(ctx)
      }
   }
}
  • openerCh: 表明衔接恳求行列
  • lastPut: 表明最终一次分配的数据库衔接的goroutine ID
  • connRequests: 表明用于处理衔接恳求的goroutine ID

假如db.openerCh通道被触发,就意味着有新的衔接恳求需求处理,就会调用db.openNewConnection(ctx)函数去建立新的数据库衔接

Open new conn

// Open one new connection
func (db *DB) openNewConnection(ctx context.Context) {
   // maybeOpenNewConnections has already executed db.numOpen++ before it sent
   // on db.openerCh. This function must execute db.numOpen-- if the
   // connection fails or is closed before returning.
   ci, err := db.connector.Connect(ctx)
   db.mu.Lock()
   defer db.mu.Unlock()
   if db.closed {
      if err == nil {
         ci.Close()
      }
      db.numOpen--
      return
   }
   if err != nil {
      db.numOpen--
      db.putConnDBLocked(nil, err)
      db.maybeOpenNewConnections()
      return
   }
   dc := &driverConn{
      db:         db,
      createdAt:  nowFunc(),
      returnedAt: nowFunc(),
      ci:         ci,
   }
   if db.putConnDBLocked(dc, err) {
      db.addDepLocked(dc, dc)
   } else {
      db.numOpen--
      ci.Close()
   }
}

函数将从衔接器的 Connect 方法中获取数据库衔接。Connect 方法的返回值是一个 driver.Conn 接口实例,表明一个数据库衔接。

假如衔接获取失败,则会将 db.numOpen 减一,收回衔接并终止该函数的履行。否则,将创立一个 driverConn 类型的结构体实例 dc 代表数据库衔接,其中包含衔接信息、衔接器、创立时刻等特点。
之后,函数调用 db 的 putConnDBLocked 方法将衔接增加到衔接池中,一起调用 addDepLocked 方法将其增加到依靠池中。假如增加失败,则将 dc 封闭,并将 db.numOpen 减一。增加成功后,函数保护 db.numOpen 计数器,表明当时具有的活动衔接数。值得注意的是,假如数据库衔接池已封闭,则将收回已创立的衔接,并将 db.numOpen 计数器减一。

也就是说,该函数实现了衔接池的办理和资源收回,保证了不会由于衔接走漏资源而导致系统功能下降

Close

func (db *DB) Close() error {
   db.mu.Lock()
   if db.closed { // Make DB.Close idempotent
      db.mu.Unlock()
      return nil
   }
   if db.cleanerCh != nil {
      close(db.cleanerCh)
   }
   var err error
   fns := make([]func() error, 0, len(db.freeConn))
   for _, dc := range db.freeConn {
      fns = append(fns, dc.closeDBLocked())
   }
   db.freeConn = nil
   db.closed = true
   for _, req := range db.connRequests {
      close(req)
   }
   db.mu.Unlock()
   for _, fn := range fns {
      err1 := fn()
      if err1 != nil {
         err = err1
      }
   }
   db.stop()
   if c, ok := db.connector.(io.Closer); ok {
      err1 := c.Close()
      if err1 != nil {
         err = err1
      }
   }
   return err
}

Close用于封闭衔接,并将衔接池中的衔接悉数归还到驱动衔接池中,最终封闭衔接

衔接保活

// startCleanerLocked starts connectionCleaner if needed.
func (db *DB) startCleanerLocked() {
   if (db.maxLifetime > 0 || db.maxIdleTime > 0) && db.numOpen > 0 && db.cleanerCh == nil {
      db.cleanerCh = make(chan struct{}, 1)
      go db.connectionCleaner(db.shortestIdleTimeLocked())
   }
}
func (db *DB) connectionCleaner(d time.Duration) {
   const minInterval = time.Second
   if d < minInterval {
      d = minInterval
   }
   t := time.NewTimer(d)
   for {
      select {
      case <-t.C:
      case <-db.cleanerCh: // maxLifetime was changed or db was closed.
      }
      db.mu.Lock()
      d = db.shortestIdleTimeLocked()
      if db.closed || db.numOpen == 0 || d <= 0 {
         db.cleanerCh = nil
         db.mu.Unlock()
         return
      }
      d, closing := db.connectionCleanerRunLocked(d)
      db.mu.Unlock()
      for _, c := range closing {
         c.Close()
      }
      if d < minInterval {
         d = minInterval
      }
      if !t.Stop() {
         select {
         case <-t.C:
         default:
         }
      }
      t.Reset(d)
   }
}

connectionCleaner 会默许已1s的间隔,去扫描闲暇的衔接行列,封闭超时的闲暇衔接,并从衔接池中移除

参考资料

tutorial: github.com/VividCortex…

database/sql: github.com/golang/go/t…