布景
某一日,葫芦正在写一个简单的CURD代码,十三不知何时悄悄的来到死后
十三: 葫芦,你知道db的 conn 如何办理的
葫芦一颤抖
葫芦: 经过池化技术,经过maxIdleConns 和 maxOpenConns 参数来决议池子的巨细
十三: 那你知道超时的闲暇衔接如何处理的吗
葫芦: 布吉岛,TAT
葫芦GG
database/sql 衔接池
什么是衔接池
衔接池是一个存储数据库衔接的缓冲区,这些数据库衔接现已初始化并准备就绪,以便快速使用。这提高了应用程序的功能,由于它能够防止为每个新的数据库恳求创立一个新的数据库衔接。能够重复使用现有的衔接来处理新的数据库恳求,然后减少了衔接数据库所需的时刻和资源。一起,它也能够操控数据库衔接的数量,以防止过多的衔接造成数据库的负担。
database实现
DB
结构
// can be controlled with SetMaxIdleConns.
type DB struct {
// Total time waited for new connections.
waitDuration atomic.Int64
connector driver.Connector
// numClosed is an atomic counter which represents a total number of
// closed connections. Stmt.openStmt checks it before cleaning closed
// connections in Stmt.css.
numClosed atomic.Uint64
mu sync.Mutex // protects following fields
freeConn []*driverConn // free connections ordered by returnedAt oldest to newest
connRequests map[uint64]chan connRequest
nextRequest uint64 // Next key to use in connRequests.
numOpen int // number of opened and pending open connections
// Used to signal the need for new connections
// a goroutine running connectionOpener() reads on this chan and
// maybeOpenNewConnections sends on the chan (one send per needed connection)
// It is closed during db.Close(). The close tells the connectionOpener
// goroutine to exit.
openerCh chan struct{}
closed bool
dep map[finalCloser]depSet
lastPut map[*driverConn]string // stacktrace of last conn's put; debug only
maxIdleCount int // zero means defaultMaxIdleConns; negative means 0
maxOpen int // <= 0 means unlimited
maxLifetime time.Duration // maximum amount of time a connection may be reused
maxIdleTime time.Duration // maximum amount of time a connection may be idle before being closed
cleanerCh chan struct{}
waitCount int64 // Total number of connections waited for.
maxIdleClosed int64 // Total number of connections closed due to idle count.
maxIdleTimeClosed int64 // Total number of connections closed due to idle time.
maxLifetimeClosed int64 // Total number of connections closed due to max connection lifetime limit.
stop func() // stop cancels the connection opener.
}
- waitDuration: 等候衔接的最长时刻
- connector: 数据库衔接的创立工厂
- numClosed: 已封闭的衔接数
- freeConn: 未使用的衔接调集
- connRequests: 用于存储衔接恳求,每个衔接恳求都有一个唯一的恳求 ID,经过这个 ID 能够在 connRequests 中找到相应的衔接恳求通道
- nextRequest: 表明下一个衔接恳求的 ID
- numOpen: 表明当时现已翻开的衔接数量
- openerCh: 一个 chan struct{} 类型的通道,用于堵塞池的 opener,直到 freeConn 不为空
- closed: 表明衔接池是否现已封闭
- dep: 一个
map[finalCloser]depSet
类型的映射,用于处理衔接的依靠联系,保护衔接的封闭顺序 - lastPut: 用于存储衔接最终一次被放回衔接池的时刻,首要用于衔接收回策略
- maxIdleCount: 表明衔接池中最多能够保留的闲暇衔接数
- maxOpen: 表明衔接池中最多能够翻开的衔接数量。假如 maxOpen 为 0,则表明没有限制
- maxLifetime: 表明一个衔接的最大生命周期
- maxIdleTime: 表明一个衔接能够在衔接池中闲暇的最长时刻
- cleanerCh: 一个 chan struct{} 类型的通道,用于定期清理衔接池中的闲暇衔接
- waitCount: 表明当时正在等候衔接的数量
- maxIdleClosed: 表明由于闲暇衔接数超越限制而被封闭的衔接数量
- maxIdleTimeClosed: 表明由于闲暇时刻超越限制而被封闭的衔接数量
- maxLifetimeClosed: 表明由于生命周期超越限制而被封闭的衔接数量
- stop: 一个类型为 func() 的函数变量,用于封闭衔接池并开释资源
Open
翻开db衔接
func Open(driverName, dataSourceName string) (*DB, error) {
driversMu.RLock()
driveri, ok := drivers[driverName]
driversMu.RUnlock()
if !ok {
return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
}
if driverCtx, ok := driveri.(driver.DriverContext); ok {
connector, err := driverCtx.OpenConnector(dataSourceName)
if err != nil {
return nil, err
}
return OpenDB(connector), nil
}
return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
}
检查驱动是否存在,假如存在则调用OpenDB方法来衔接数据库
func OpenDB(c driver.Connector) *DB {
ctx, cancel := context.WithCancel(context.Background())
db := &DB{
connector: c,
openerCh: make(chan struct{}, connectionRequestQueueSize),
lastPut: make(map[*driverConn]string),
connRequests: make(map[uint64]chan connRequest),
stop: cancel,
}
go db.connectionOpener(ctx)
return db
}
// Runs in a separate goroutine, opens new connections when requested.
func (db *DB) connectionOpener(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-db.openerCh:
db.openNewConnection(ctx)
}
}
}
- openerCh: 表明衔接恳求行列
- lastPut: 表明最终一次分配的数据库衔接的goroutine ID
- connRequests: 表明用于处理衔接恳求的goroutine ID
假如db.openerCh通道被触发,就意味着有新的衔接恳求需求处理,就会调用db.openNewConnection(ctx)函数去建立新的数据库衔接
Open new conn
// Open one new connection
func (db *DB) openNewConnection(ctx context.Context) {
// maybeOpenNewConnections has already executed db.numOpen++ before it sent
// on db.openerCh. This function must execute db.numOpen-- if the
// connection fails or is closed before returning.
ci, err := db.connector.Connect(ctx)
db.mu.Lock()
defer db.mu.Unlock()
if db.closed {
if err == nil {
ci.Close()
}
db.numOpen--
return
}
if err != nil {
db.numOpen--
db.putConnDBLocked(nil, err)
db.maybeOpenNewConnections()
return
}
dc := &driverConn{
db: db,
createdAt: nowFunc(),
returnedAt: nowFunc(),
ci: ci,
}
if db.putConnDBLocked(dc, err) {
db.addDepLocked(dc, dc)
} else {
db.numOpen--
ci.Close()
}
}
函数将从衔接器的 Connect 方法中获取数据库衔接。Connect 方法的返回值是一个 driver.Conn 接口实例,表明一个数据库衔接。
假如衔接获取失败,则会将 db.numOpen 减一,收回衔接并终止该函数的履行。否则,将创立一个 driverConn 类型的结构体实例 dc 代表数据库衔接,其中包含衔接信息、衔接器、创立时刻等特点。
之后,函数调用 db 的 putConnDBLocked 方法将衔接增加到衔接池中,一起调用 addDepLocked 方法将其增加到依靠池中。假如增加失败,则将 dc 封闭,并将 db.numOpen 减一。增加成功后,函数保护 db.numOpen 计数器,表明当时具有的活动衔接数。值得注意的是,假如数据库衔接池已封闭,则将收回已创立的衔接,并将 db.numOpen 计数器减一。
也就是说,该函数实现了衔接池的办理和资源收回,保证了不会由于衔接走漏资源而导致系统功能下降
Close
func (db *DB) Close() error {
db.mu.Lock()
if db.closed { // Make DB.Close idempotent
db.mu.Unlock()
return nil
}
if db.cleanerCh != nil {
close(db.cleanerCh)
}
var err error
fns := make([]func() error, 0, len(db.freeConn))
for _, dc := range db.freeConn {
fns = append(fns, dc.closeDBLocked())
}
db.freeConn = nil
db.closed = true
for _, req := range db.connRequests {
close(req)
}
db.mu.Unlock()
for _, fn := range fns {
err1 := fn()
if err1 != nil {
err = err1
}
}
db.stop()
if c, ok := db.connector.(io.Closer); ok {
err1 := c.Close()
if err1 != nil {
err = err1
}
}
return err
}
Close用于封闭衔接,并将衔接池中的衔接悉数归还到驱动衔接池中,最终封闭衔接
衔接保活
// startCleanerLocked starts connectionCleaner if needed.
func (db *DB) startCleanerLocked() {
if (db.maxLifetime > 0 || db.maxIdleTime > 0) && db.numOpen > 0 && db.cleanerCh == nil {
db.cleanerCh = make(chan struct{}, 1)
go db.connectionCleaner(db.shortestIdleTimeLocked())
}
}
func (db *DB) connectionCleaner(d time.Duration) {
const minInterval = time.Second
if d < minInterval {
d = minInterval
}
t := time.NewTimer(d)
for {
select {
case <-t.C:
case <-db.cleanerCh: // maxLifetime was changed or db was closed.
}
db.mu.Lock()
d = db.shortestIdleTimeLocked()
if db.closed || db.numOpen == 0 || d <= 0 {
db.cleanerCh = nil
db.mu.Unlock()
return
}
d, closing := db.connectionCleanerRunLocked(d)
db.mu.Unlock()
for _, c := range closing {
c.Close()
}
if d < minInterval {
d = minInterval
}
if !t.Stop() {
select {
case <-t.C:
default:
}
}
t.Reset(d)
}
}
connectionCleaner 会默许已1s的间隔,去扫描闲暇的衔接行列,封闭超时的闲暇衔接,并从衔接池中移除
参考资料
tutorial: github.com/VividCortex…
database/sql: github.com/golang/go/t…