文章标题 原创 翻译 转载 文章内容

# 工作池

如线程池,不断地往池子里丢任务,池子开启多个线程不断地处理任务。Go 这里其实只是对 `go`、`chan`、`defer` 的简单封装。

work/main.go

```go
// Package work provides a fixed-size pool of goroutines that execute
// submitted tasks.
package work

import "sync"

// Pool runs submitted functions on a fixed number of worker goroutines.
type Pool struct {
	wg   sync.WaitGroup
	task chan func()
}

// New starts size worker goroutines and returns the pool that feeds them.
// Each worker keeps running tasks received from the task channel until the
// channel is closed by Shutdown.
func New(size int) *Pool {
	p := &Pool{
		task: make(chan func()),
	}
	p.wg.Add(size)
	for i := 0; i < size; i++ {
		go func() {
			defer p.wg.Done()
			for task := range p.task {
				task()
			}
		}()
	}
	return p
}

// Run hands f to a worker. The task channel is unbuffered, so Run blocks
// until a worker is ready to receive f. Run must not be called after
// Shutdown: sending on the closed channel panics.
func (p *Pool) Run(f func()) {
	p.task <- f
}

// Shutdown closes the task channel and waits until every worker has
// finished its current task and exited.
func (p *Pool) Shutdown() {
	close(p.task)
	p.wg.Wait()
}
```

使用:

```go
package main

import (
	"fmt"
	"math/rand"
	"time"

	"./work"
)

func init() {
	rand.Seed(time.Now().UnixNano())
}

func main() {
	start := time.Now()
	pool := work.New(20)
	for i := 0; i < 100; i++ {
		pool.Run(func() {
			time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
		})
	}
	pool.Shutdown()
	// time.Since uses the monotonic clock reading carried by start, so the
	// measurement is not affected by wall-clock adjustments.
	fmt.Println("spent", int64(time.Since(start)/time.Millisecond))
}
```

例子2:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// WorkPool fans integer jobs out to a fixed number of workers and collects
// one result per job.
type WorkPool struct {
	jobs     chan int
	results  chan int
	workSize int
	wg       sync.WaitGroup
}

// NewWorkPool returns a pool whose jobs and results channels are each
// buffered to poolSize and which runs workSize workers once Start is called.
func NewWorkPool(poolSize int, workSize int) *WorkPool {
	return &WorkPool{
		jobs:     make(chan int, poolSize),
		results:  make(chan int, poolSize),
		workSize: workSize,
	}
}

// Start launches the workers and a goroutine that closes the results
// channel once every worker has exited.
func (p *WorkPool) Start() {
	// Add must run before the goroutine below can call Wait. Otherwise Wait
	// may observe a zero counter, return immediately and close results, and
	// a worker would then panic sending on the closed channel.
	p.wg.Add(p.workSize)
	for i := 0; i < p.workSize; i++ {
		go func(workerID int) {
			defer p.wg.Done()
			for j := range p.jobs {
				fmt.Println("worker", workerID, "start", j)
				time.Sleep(time.Second)
				fmt.Println("worker", workerID, "end", j)
				p.results <- j * 2
			}
		}(i)
	}
	go func() {
		p.wg.Wait()
		close(p.results)
	}()
}

// Stop closes the jobs channel; workers exit after draining the jobs that
// are still queued. Add must not be called after Stop.
func (p *WorkPool) Stop() {
	close(p.jobs)
}

// Add queues job i, blocking while the jobs buffer is full.
func (p *WorkPool) Add(i int) {
	p.jobs <- i
}

// Result collects results until the results channel is closed, which
// happens only after Stop has been called and all workers have exited.
func (p *WorkPool) Result() []int {
	r := []int{}
	for a := range p.results {
		r = append(r, a)
	}
	return r
}

func main() {
	w := NewWorkPool(100, 8)
	w.Start()
	for i := 0; i < 3; i++ {
		w.Add(i)
	}
	w.Stop()
	fmt.Println(w.Result())
}
```

# 连接池

如数据库连接池,因为创建和销毁数据库连接的代价比较大,所以使用连接池来管理连接。

```go
// Package pool implements a bounded pool of reusable io.Closer resources.
package pool

import (
	"errors"
	"fmt"
	"io"
	"sync"
)

// Pool keeps idle resources in a buffered channel so they can be reused
// instead of being created again.
type Pool struct {
	m       sync.Mutex // guards closed and serializes Close with Release
	res     chan io.Closer
	factory func() (io.Closer, error)
	closed  bool
}

// New returns a pool that holds at most size idle resources and calls fn to
// create a resource when none is idle. It returns an error when size is 0.
func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
	// size is unsigned, so zero is the only invalid value. Report it through
	// the error result instead of panicking.
	if size == 0 {
		return nil, errors.New("pool size error")
	}
	return &Pool{
		res:     make(chan io.Closer, size),
		factory: fn,
	}, nil
}

// Get returns an idle resource if one is available and otherwise creates a
// new one with the factory. After Close it returns an error.
func (p *Pool) Get() (io.Closer, error) {
	select {
	case r, ok := <-p.res:
		// A receive on the closed channel yields ok == false; check it
		// before logging so a closed pool is not reported as a hit.
		if !ok {
			return nil, errors.New("pool closed")
		}
		fmt.Println("get resource")
		return r, nil
	default:
		fmt.Println("create resource")
		return p.factory()
	}
}

// Close marks the pool closed and closes every idle resource. Calling Close
// more than once is a no-op.
func (p *Pool) Close() {
	p.m.Lock()
	defer p.m.Unlock()
	if p.closed {
		return
	}
	p.closed = true
	// Closing the channel lets the range below end once it is drained.
	close(p.res)
	for r := range p.res {
		r.Close()
	}
}

// Release returns r to the pool. If the pool is closed or already full,
// r is closed instead.
func (p *Pool) Release(r io.Closer) {
	p.m.Lock()
	defer p.m.Unlock()
	if p.closed {
		r.Close()
		return
	}
	select {
	case p.res <- r:
		fmt.Println("resouce release")
	default:
		fmt.Println("pool full, resource close")
		r.Close()
	}
}
```

使用:

```go
package main

import (
	"fmt"
	"io"
	"os"

	"test/pool"
)

func main() {
	p, err := pool.New(func() (io.Closer, error) {
		return os.Create("ok.txt")
	}, 5)
	if err != nil {
		panic(err)
	}
	r, err := p.Get()
	if err != nil {
		fmt.Println(err)
		return
	}
	switch f := r.(type) {
	case *os.File:
		if _, err := f.WriteString("hello"); err != nil {
			fmt.Println(err)
		}
	default:
		fmt.Println("type error")
	}
	p.Release(r)
	p.Close()
}
```

文章类别 Python Mobile Android Java Shell Life Database Bug Windows IOS Tools Boost Node.js Mac Product Tips C/C++ Golang Javascript React Qt MQ MongoDB Design Web Linux LLM ChatGPT RAG AI 提交