go 连接池

     阅读(143)  2019-05-11 10:19:28

连接池使用场景如数据库连接,通常为了平衡性能和资源会建立多个数据库连接,将他们放到一个池子中,需要的时候从池子里面取,用完了再归还给池子,如果池子里的资源不够会创建新资源,尽量要避免新建资源的速度大于池子的大小,池子的大小要在获取资源和归还资源之间做到平衡最好, 这样才能避免了频繁的建立和销毁资源,让池子里的资源能得到最大限度的利用。

pool.go

package pool

import (
    "errors"
    "log"
    "io"
    "sync"
)

type Pool struct {
    mutex sync.Mutex                        // 互斥量锁住同步修改的值
    resources chan io.Closer                // 资源放在通道里
    factory func() (io.Closer, error)       // 创建资源(如:连接)
    closed bool                             // 池子是否关闭
}

var ErrPoolClosed = errors.New("Pool has been closed.")

// 创建一个池子
// 参数1:创建一个资源的函数
// 参数2:池子大小
func New(fn func()(io.Closer, error), size uint)(*Pool, error) {
    if size <= 0 {
        return nil, errors.New("size value too small.")
    }
    return &Pool{
        factory: fn,
        resources: make(chan io.Closer, size),
    }, nil
}

// 获取资源,从通道里获取一个资源,如果没有就新建一个
func (p *Pool) Acquire() (io.Closer, error) {
    select {
    case r, ok := <-p.resources:
        log.Println("Acquire shared resource")
        if ok {
            return r, nil
        }
        return nil, ErrPoolClosed
    default:
        log.Println("Acquire new resource")
        return p.factory()
    }
}

// 归还资源
// 如果池子已关闭就关闭当前资源
// 将资源放进通道
// 如果通道已满就直接关闭资源,尽量要少发生这种情况
func (p *Pool) Release(r io.Closer) {
    p.mutex.Lock()
    defer p.mutex.Unlock()
    if p.closed {
        r.Close()
        return
    }
    select {
    case p.resources <-r:
        log.Println("Release: In Queue")
    default:
        log.Println("Relase: Closing")
        r.Close()
    }
}

// 关闭池子
func (p *Pool) Close() {
    p.mutex.Lock()
    defer p.mutex.Unlock()
    if p.closed {
        return
    }
    p.closed = true
    close(p.resources)
    for r := range p.resources {
        r.Close()
    }
}

main.go

package main

import (
    "log"
    "fmt"
    "io"
    "sync"
    "sync/atomic"
    "time"
    "myproject/pool"
    "math/rand"
)

const (
    maxGoroutines = 25  // 任务数
    poolSize = 3        // 池子大小
)

// 假设这是个数据库连接
type dbConnection struct {
    ID int32
}

// 数据库连接要实现关闭接口
func (conn *dbConnection) Close() error {
    log.Println("Close: Connection id", conn.ID)
    return nil
}

// 递增新ID
func getIdImpl() func() int32 {
    var id int32 = 0
    return func() int32 {
        atomic.AddInt32(&id, 1)
        return id
    }
}
var getId = getIdImpl()

// 创建连接的方法
func createConnection() (io.Closer, error) {
    id := getId()
    log.Println("Create: New Connection", id)
    return &dbConnection{id}, nil
}

// 从连接池里取出一个连接做查询操作
func performQueries(query int, p *pool.Pool) {
    conn, err := p.Acquire()
    if err != nil {
        log.Println(err)
        return
    }
    defer p.Release(conn)

    // 模拟查询操作,需要消耗一定的时间
    time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond);
    log.Printf("handle conn id:%d, query id:%d\n", conn.(*dbConnection).ID, query)
}

// 初始化一个随机数种子,让每次程序启动生成的随机数都不一样
func init() {
    rand.Seed(time.Now().UnixNano())
}

func main() {
    wg := sync.WaitGroup{}
    wg.Add(maxGoroutines)

    pool, err := pool.New(createConnection, poolSize)
    if err != nil {
        log.Println(err)
    }
    for i := 0; i < maxGoroutines; i++ {
        // 模拟任务调用,同一时间不应该调用那么多任务,这里间隔一会
        time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond);

        // 启动查询任务,使用闭包函数,参数要从外面传进来,这样每次传进来的值才能不一样
        go func(i int) {
            defer wg.Done()
            performQueries(i, pool)
        }(i)
    }

    // 所有任务完成关闭池子
    wg.Wait()
    pool.Close()

    fmt.Println("bye bye...")
}

文章评论

Keep it simple,stupid
文章数
290
总访问量
305734
今日访问
377
最近评论

xuehaoyun : 很不错,来围观一下
tujiaw : 抱歉csdn code服务关闭了,这个代码我也找不到了
于淞 : 你好,这个文章的源码能分享一下吗,songsong9181@163.com,谢谢了 上面的写错了
于淞 : 你好,这个文章的源码能分享一下吗,838106303@163.com,谢谢了 上面的链接不能用了
tujiaw : 多谢多谢
essaypinglun college-paper.org : 很好的博客,赞赞
Folly : 这个实现有点奇怪,Qt为什么没有统一的比对方法。
过多s : alert("hello, world!")
tujiaw : 还不错哦
回到顶部