删除文章

确定要删除这篇文章吗?

取消
确定

go 工作池 连接池

     阅读(628)  2019-05-11 10:38:20

工作池

如线程池,不断的往池子里丢任务,池子开启多个线程不断的处理任务。go这里其实只是对go chan defer的简单封装。

work/main.go

package work

import "sync"

type Pool struct {
    wg   sync.WaitGroup
    task chan func()
}

func New(size int) *Pool {
    p := &Pool{
        task: make(chan func()),
    }
    p.wg.Add(size)

    for i := 0; i < size; i++ {
        go func() {
            defer p.wg.Done()
            for task := range p.task {
                task()
            }
        }()
    }

    return p
}

func (p *Pool) Run(f func()) {
    p.task <- f
}

func (p *Pool) Shutdown() {
    close(p.task)
    p.wg.Wait()
}

使用:

package main

import (
    "fmt"
    "math/rand"
    "time"

    "./work"
)

func init() {
    rand.Seed(time.Now().UnixNano())
}

func main() {
    start := time.Now().UnixNano()
    pool := work.New(20)

    for i := 0; i < 100; i++ {
        pool.Run(func() {
            time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
        })
    }
    pool.Shutdown()

    end := time.Now().UnixNano()
    fmt.Println("spent", (end-start)/1e6)
}

例子2:

package main

import (
    "fmt"
    "sync"
    "time"
)

type WorkPool struct {
    jobs     chan int
    results  chan int
    workSize int
    wg       sync.WaitGroup
}

func NewWorkPool(poolSize int, workSize int) *WorkPool {
    return &WorkPool{
        jobs:     make(chan int, poolSize),
        results:  make(chan int, poolSize),
        workSize: workSize,
    }
}

func (p *WorkPool) Start() {
    go func() {
        p.wg.Wait()
        close(p.results)
    }()

    p.wg.Add(p.workSize)
    for i := 0; i < p.workSize; i++ {
        go func(workId int) {
            defer p.wg.Done()
            for j := range p.jobs {
                fmt.Println("worker", workId, "start", j)
                time.Sleep(time.Second)
                fmt.Println("worker", workId, "end", j)
                p.results <- j * 2
            }
        }(i)
    }
}

func (p *WorkPool) Stop() {
    close(p.jobs)
}

func (p *WorkPool) Add(i int) {
    p.jobs <- i
}

func (p *WorkPool) Result() []int {
    r := []int{}
    for a := range p.results {
        r = append(r, a)
    }
    return r
}

func main() {
    w := NewWorkPool(100, 8)
    w.Start()
    for i := 0; i < 3; i++ {
        w.Add(i)
    }
    w.Stop()
    fmt.Println(w.Result())
}

连接池

如数据库连接池,因为创建销毁数据库连接的代价比较大所以使用连接池来管理连接。

package pool

import (
    "errors"
    "fmt"
    "io"
    "sync"
)

type Pool struct {
    m sync.Mutex
    res chan io.Closer
    factory func()(io.Closer, error)
    closed bool
}

func New(fn func()(io.Closer, error), size uint)(*Pool, error) {
    if size <= 0 {
        panic(errors.New("pool size error"))
    }
    return &Pool {
        res: make(chan io.Closer, size),
        factory: fn,
    }, nil
}

func (p *Pool)Get()(io.Closer, error) {
    select {
    case r, ok := <- p.res: {
        fmt.Println("get resource")
        if !ok {
            return nil, errors.New("pool closed")
        }
        return r, nil
    }
    default:
        fmt.Println("create resource")
        return p.factory()
    }
}

func (p *Pool)Close() {
    p.m.Lock()
    defer p.m.Unlock()

    if p.closed {
        return
    }

    p.closed = true
    close(p.res)

    for r := range p.res {
        r.Close()
    }
}

func (p *Pool)Release(r io.Closer) {
    p.m.Lock()
    defer p.m.Unlock()

    if p.closed {
        r.Close()
        return
    }

    select {
    case p.res <- r:
        fmt.Println("resouce release")
    default:
        fmt.Println("pool full, resource close")
        r.Close()
    }
}

使用:

package main

import (
    "fmt"
    "io"
    "os"
    "test/pool"
)

func main() {
    p, err := pool.New(func() (io.Closer, error) {
        return os.Create("ok.txt")
    }, 5)
    if err != nil {
        panic(err)
    }

    r, err := p.Get()
    if err != nil {
        fmt.Println(err)
        return
    }

    switch f := r.(type) {
    case *os.File:
        f.WriteString("hello")
    default:
        fmt.Println("type error")
    }

    p.Release(r)
    p.Close()
}

文章评论

Keep it simple,stupid
文章数
329
今日访问
3076
今日IP数
228
最近评论

liangzi: 不错 谢谢分享
tujiaw: registerThreadInactive:如果当前没有激活的线程,就去激活线程,让等待的线程去执行任务。
hgzzx: 佩服佩服。 请教:registerThreadInactive的作用是什么?
xuehaoyun: 很不错,来围观一下
tujiaw: 抱歉csdn code服务关闭了,这个代码我也找不到了
于淞: 你好,这个文章的源码能分享一下吗,songsong9181@163.com,谢谢了 上面的写错了
回到顶部