您现在的位置是:亿华云 > 应用开发

一个 Demo 学会 WorkerPool

亿华云2025-10-09 07:03:01【应用开发】0人已围观

简介本文转载自微信公众号「Golang来啦」,作者Seekload。转载本文请联系Golang来啦公众号。四哥水平有限,如有翻译或理解错误,烦请帮忙指出,感谢!今天给大家分享一篇关于 workPool 的

本文转载自微信公众号「Golang来啦」,作者Seekload。转载本文请联系Golang来啦公众号。

四哥水平有限,如有翻译或理解错误,烦请帮忙指出,感谢!

今天给大家分享一篇关于 workPool 的文章,这个平时大家应该用的比较多,一起来看下。

原文如下:

工作池是这样一个池子,会创建指定数量的 worker,这些 worker 能获取任务并处理。允许多个任务同时处理,但是需要维持固定数量的 worker 避免系统资源被过度使用。

通常有两种方式创建任务池:

一种是预先创建固定数量的 worker; 另外一种是当有需要的时候才会创建 worker,当然也会有数量限制;

本文将与大家一起讨论第一种方式。当我们预先知道有许多任务需要同时运行,并且很大概率会用上最大数量的 worker,网站模板通常会采用这种方式。

为了演示,我们先创建 Worker 结构体,它获取任务并执行。

import (  "fmt" ) // Worker ... type Worker struct {   ID       int  Name     string  StopChan chan bool } // Start ... func (w *Worker) Start(jobQueue chan Job) {   w.StopChan = make(chan bool)  successChan := make(chan bool)  go func() {    successChan <- true   for {     // take job    job := <-jobQueue    if job != nil {      job.Start(w)    } else {      fmt.Printf("worker %s to be stopped\n", w.Name)     w.StopChan <- true     break    }   }  }()  // wait for the worker to start  <-successChan } // Stop ... func (w *Worker) Stop() {   // wait for the worker to stop, blocking  _ = <-w.StopChan  fmt.Printf("worker %s stopped\n", w.Name) } 

Worker 有一些属性保存当前的状态,另外还声明了两个方法分别用于启动、停止 worker。

在 Start() 方法里,创建了两个 channel 分别用于 worker 的启动和停止。最重要的是 for 循环里面,worker 会一直等待获取 job 并可执行的直到任务队列关闭。

Job 是包含单个方法 Start() 的接口,所以只要实现 Start() 方法就可以有不同类型的 job。

// Job ... type Job interface {   Start(worker *Worker) error } 

一旦 Worker 确定之后,接下来就是创建 pool 来管理 workers。

import (  "fmt"  "sync" ) // Pool ... type Pool struct {   Name string  Size    int  Workers []*Worker  QueueSize int  Queue     chan Job } // Initiualize ... func (p *Pool) Initialize() {   // maintain minimum 1 worker  if p.Size < 1 {    p.Size = 1  }  p.Workers = []*Worker{ }  for i := 1; i <= p.Size; i++ {    worker := &Worker{     ID:   i - 1,    Name: fmt.Sprintf("%s-worker-%d", p.Name, i-1),   }   p.Workers = append(p.Workers, worker)  }  // maintain min queue size as 1  if p.QueueSize < 1 {    p.QueueSize = 1  }  p.Queue = make(chan Job, p.QueueSize) } // Start ... func (p *Pool) Start() {   for _, worker := range p.Workers {    worker.Start(p.Queue)  }  fmt.Println("all workers started") } // Stop ... func (p *Pool) Stop() {   close(p.Queue) // close the queue channel  var wg sync.WaitGroup  for _, worker := range p.Workers {    wg.Add(1)   go func(w *Worker) {     defer wg.Done()    w.Stop()   }(worker)  }  wg.Wait()  fmt.Println("all workers stopped") } 

Pool 包含 worker 切片和用于保存 job 的队列。worker 的数量在初始化的亿华云时候是可以自定义。

关键点在 Stop() 的逻辑,当它被调用时,会先关闭 job 队列,worker 便会从 job 队列读到 nil,接着就会关闭对应的 worker。接着在 for 循环里,等待 worker 并发地停止直到最后一个 worker 停止。

为了演示整体逻辑,下面的例子展示了一个仅仅输出值的 job。

import "fmt" func main() {   pool := &Pool{    Name:      "test",   Size:      5,   QueueSize: 20,  }  pool.Initialize()  pool.Start()         defer pool.Stop()  for i := 1; i <= 100; i++ {    job := &PrintJob{     Index: i,   }   pool.Queue <- job  } } // PrintJob ... type PrintJob struct {   Index int } func (pj *PrintJob) Start(worker *Worker) error {   fmt.Printf("job %s - %d\n", worker.Name, pj.Index)  return nil } 

如果你看了上面的代码逻辑,就会发现很简单,创建了有 5 个 worker 的工作池并且 job 队列的大小是 20。

接着,模拟 job 创建和处理过程:一旦 job 被创建就会 push 到任务队列里,等待着的 worker 便会从队列里取出 job 并处理。

类似下面这样的输出:

all workers started job test-worker-3 - 4 job test-worker-3 - 6 job test-worker-3 - 7 job test-worker-3 - 8 job test-worker-3 - 9 job test-worker-3 - 10 job test-worker-3 - 11 job test-worker-3 - 12 job test-worker-3 - 13 job test-worker-3 - 14 job test-worker-3 - 15 job test-worker-3 - 16 job test-worker-3 - 17 job test-worker-3 - 18 job test-worker-3 - 19 job test-worker-3 - 20 worker test-worker-3 to be stopped job test-worker-4 - 5 job test-worker-0 - 1 worker test-worker-3 stopped job test-worker-2 - 3 worker test-worker-2 to be stopped worker test-worker-2 stopped worker test-worker-4 to be stopped worker test-worker-4 stopped worker test-worker-0 to be stopped worker test-worker-0 stopped job test-worker-1 - 2 worker test-worker-1 to be stopped worker test-worker-1 stopped all workers stopped 

via:https://www.pixelstech.net/article/1611483826-Demo-on-creating-worker-pool-in-GoLang

作者:sonic0002

云服务器提供商

很赞哦!(46861)