go 工作池 连接池
•
工作池
如线程池,不断的往池子里丢任务,池子开启多个线程不断的处理任务。go这里其实只是对go chan defer的简单封装。
work/main.go
package work
import "sync"
type Pool struct {
	wg   sync.WaitGroup
	task chan func()
}
func New(size int) *Pool {
	p := &Pool{
		task: make(chan func()),
	}
	p.wg.Add(size)
	for i := 0; i < size; i++ {
		go func() {
			defer p.wg.Done()
			for task := range p.task {
				task()
			}
		}()
	}
	return p
}
func (p *Pool) Run(f func()) {
	p.task <- f
}
func (p *Pool) Shutdown() {
	close(p.task)
	p.wg.Wait()
}
使用:
package main
import (
	"fmt"
	"math/rand"
	"time"
	"./work"
)
func init() {
	rand.Seed(time.Now().UnixNano())
}
func main() {
	start := time.Now().UnixNano()
	pool := work.New(20)
	for i := 0; i < 100; i++ {
		pool.Run(func() {
			time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
		})
	}
	pool.Shutdown()
	end := time.Now().UnixNano()
	fmt.Println("spent", (end-start)/1e6)
}
例子2:
package main
import (
	"fmt"
	"sync"
	"time"
)
type WorkPool struct {
	jobs     chan int
	results  chan int
	workSize int
	wg       sync.WaitGroup
}
func NewWorkPool(poolSize int, workSize int) *WorkPool {
	return &WorkPool{
		jobs:     make(chan int, poolSize),
		results:  make(chan int, poolSize),
		workSize: workSize,
	}
}
func (p *WorkPool) Start() {
	go func() {
		p.wg.Wait()
		close(p.results)
	}()
	p.wg.Add(p.workSize)
	for i := 0; i < p.workSize; i++ {
		go func(workId int) {
			defer p.wg.Done()
			for j := range p.jobs {
				fmt.Println("worker", workId, "start", j)
				time.Sleep(time.Second)
				fmt.Println("worker", workId, "end", j)
				p.results <- j * 2
			}
		}(i)
	}
}
func (p *WorkPool) Stop() {
	close(p.jobs)
}
func (p *WorkPool) Add(i int) {
	p.jobs <- i
}
func (p *WorkPool) Result() []int {
	r := []int{}
	for a := range p.results {
		r = append(r, a)
	}
	return r
}
func main() {
	w := NewWorkPool(100, 8)
	w.Start()
	for i := 0; i < 3; i++ {
		w.Add(i)
	}
	w.Stop()
	fmt.Println(w.Result())
}
连接池
如数据库连接池,因为创建销毁数据库连接的代价比较大所以使用连接池来管理连接。
package pool
import (
	"errors"
	"fmt"
	"io"
	"sync"
)
type Pool struct {
	m sync.Mutex
	res chan io.Closer
	factory func()(io.Closer, error)
	closed bool
}
func New(fn func()(io.Closer, error), size uint)(*Pool, error) {
	if size <= 0 {
		panic(errors.New("pool size error"))
	}
	return &Pool {
		res: make(chan io.Closer, size),
		factory: fn,
	}, nil
}
func (p *Pool)Get()(io.Closer, error) {
	select {
	case r, ok := <- p.res: {
		fmt.Println("get resource")
		if !ok {
			return nil, errors.New("pool closed")
		}
		return r, nil
	}
	default:
		fmt.Println("create resource")
		return p.factory()
	}
}
func (p *Pool)Close() {
	p.m.Lock()
	defer p.m.Unlock()
	if p.closed {
		return
	}
	p.closed = true
	close(p.res)
	for r := range p.res {
		r.Close()
	}
}
func (p *Pool)Release(r io.Closer) {
	p.m.Lock()
	defer p.m.Unlock()
	if p.closed {
		r.Close()
		return
	}
	select {
	case p.res <- r:
		fmt.Println("resouce release")
	default:
		fmt.Println("pool full, resource close")
		r.Close()
	}
}
使用:
package main
import (
	"fmt"
	"io"
	"os"
	"test/pool"
)
func main() {
	p, err := pool.New(func() (io.Closer, error) {
		return os.Create("ok.txt")
	}, 5)
	if err != nil {
		panic(err)
	}
	r, err := p.Get()
	if err != nil {
		fmt.Println(err)
		return
	}
	switch f := r.(type) {
	case *os.File:
		f.WriteString("hello")
	default:
		fmt.Println("type error")
	}
	p.Release(r)
	p.Close()
}