Building a Go Work Pool

I recently started working with Go, a very opinionated open source programming language from Google and its contributors. It is a fantastic language, and I have enjoyed how it has helped me develop as a programmer over the past few months.

Back Story

My new position at work requires me to work with a large data set, which I decided to split into smaller batches for processing. I wanted to process each batch in parallel without restricting myself to a single "job type", so that when the code later requires a different "job type" I would not have to wrangle multiple work pools. In developing the solution, I found a workaround for Go's lack of generics at the time, so I could process multiple "job types" with the same work pool. This code is modified from a blog post written here. I also packaged this code for my reuse here.

Dispatcher

The role of the Dispatcher is to initialize the WorkerPool, dispatch jobs as they are created, and provide the WaitGroup used to wait for the goroutines to finish before the main goroutine exits.

package main

import "sync"

// Dispatcher creates workers and dispatches jobs when received
type Dispatcher struct {  
    JobQueue   chan Job
    MaxWorkers int
    WaitGroup  *sync.WaitGroup
    // A pool of workers channels that are registered with the dispatcher
    WorkerPool chan chan Job
}

// NewDispatcher creates a dispatcher that is used to create workers
// and dispatch jobs to them
func NewDispatcher(maxWorkers int) *Dispatcher {  
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{
        JobQueue:   make(chan Job, 1024),
        MaxWorkers: maxWorkers,
        WorkerPool: pool,
        WaitGroup:  &sync.WaitGroup{},
    }
}

// Run creates the workers and dispatches jobs from a JobQueue to each worker
func (d *Dispatcher) Run() {  
    // starting n number of workers
    for i := 0; i < d.MaxWorkers; i++ {
        worker := NewWorker(d.WorkerPool, d.WaitGroup)
        worker.Start()
    }

    // start the dispatcher routine
    go d.dispatch()
}

func (d *Dispatcher) dispatch() {  
    // range ends the loop if JobQueue is ever closed, instead of
    // spinning on nil jobs as a single-case select would
    for job := range d.JobQueue {
        // a job request has been received
        go func(job Job) {
            // try to obtain a worker job channel that is available.
            // this will block until a worker is idle
            jobChannel := <-d.WorkerPool
            // dispatch the job to the worker job channel
            jobChannel <- job
        }(job)
    }
}
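
The chan chan Job type can look odd at first. Here is a stripped-down sketch of the handoff on its own, using plain strings in place of Job. The handOff function and every name inside it are my own placeholders for illustration, not part of the code above: an idle worker puts its own channel into the pool, and the dispatching side takes a channel out of the pool and sends the job on it.

```go
package main

import (
    "fmt"
    "sync"
)

// handOff starts n workers that each register a private channel in a shared
// pool, then sends every job to whichever worker is idle.
// It returns how many jobs were processed. All names are illustrative.
func handOff(n int, jobs []string) int {
    pool := make(chan chan string, n) // channels of workers that are idle
    var wg sync.WaitGroup
    var mu sync.Mutex
    processed := 0

    for i := 0; i < n; i++ {
        go func(id int) {
            own := make(chan string)
            // The workers never exit in this sketch; they are simply
            // abandoned when the program ends.
            for {
                pool <- own  // announce that this worker is idle
                job := <-own // wait for a job to be handed over
                fmt.Printf("worker %d got %s\n", id, job)
                mu.Lock()
                processed++
                mu.Unlock()
                wg.Done()
            }
        }(i)
    }

    wg.Add(len(jobs))
    for _, job := range jobs {
        idle := <-pool // blocks until some worker is idle
        idle <- job    // hand the job to that worker
    }
    wg.Wait() // every increment above happens before Wait returns
    return processed
}

func main() {
    fmt.Println(handOff(2, []string{"a", "b", "c"}), "jobs processed")
}
```

Which worker picks up which job depends on scheduling, so the order of the "worker ... got ..." lines can change from run to run.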

Worker

A Worker is started by the Dispatcher and registers itself with the WorkerPool. Once a Job has been sent to the Dispatcher, the Dispatcher waits for a Worker to become idle and hands the Job off to it. The Worker then calls the Job's process() method.

package main

import "sync"

// Worker represents the worker that executes the job
type Worker struct {  
    // A pool of workers channels that are registered with the dispatcher
    WorkerPool chan chan Job
    // A channel for receiving a job that was dispatched
    JobChannel chan Job
    // A channel for receiving a worker termination signal
    // (quits after processing)
    quit chan bool
    // A WaitGroup to signal the completed processing of a Job
    wg *sync.WaitGroup
}

// NewWorker creates a new worker that can be registered to a WorkerPool
// and receive jobs
func NewWorker(workerPool chan chan Job, wg *sync.WaitGroup) Worker {  
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool),
        wg:         wg}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {  
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel

            select {
            case job := <-w.JobChannel:
                job.process()
                // signal to the wait group that a queued job has been processed
                // so the main goroutine can continue
                w.wg.Done()
            case <-w.quit:
                // we have received a signal to stop
                return
            }
        }
    }()
}
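
The Worker listens on its quit channel, but nothing in the code above ever sends on it. Here is a minimal sketch of how a shutdown could look, using a reduced stand-in for Worker that carries strings instead of Jobs. The Stop method, the stopped channel, and the lowercase worker type are assumptions of mine for illustration, not part of the original code.

```go
package main

import "fmt"

// worker is a reduced stand-in for the Worker type: it keeps only the
// pieces needed to show shutdown.
type worker struct {
    pool    chan chan string // stands in for WorkerPool
    jobs    chan string      // stands in for JobChannel
    quit    chan bool
    stopped chan struct{} // closed once the run loop has exited (added for this sketch)
}

func newWorker(pool chan chan string) worker {
    return worker{
        pool:    pool,
        jobs:    make(chan string),
        quit:    make(chan bool),
        stopped: make(chan struct{}),
    }
}

// Start runs the same register-then-select loop as Worker.Start.
func (w worker) Start() {
    go func() {
        defer close(w.stopped)
        for {
            w.pool <- w.jobs
            select {
            case job := <-w.jobs:
                fmt.Println("processed", job)
            case <-w.quit:
                return
            }
        }
    }()
}

// Stop (hypothetical) asks the run loop to exit and waits until it has.
// It must be called at most once per worker.
func (w worker) Stop() {
    w.quit <- true
    <-w.stopped
}

func main() {
    pool := make(chan chan string, 1)
    w := newWorker(pool)
    w.Start()

    idle := <-pool // take the idle worker's channel
    idle <- "job-1"
    w.Stop()
    fmt.Println("worker stopped")
}
```

With the full pool, a stopped worker can leave its JobChannel registered in the WorkerPool, so a Stop like this is only safe once all queued jobs have finished and nothing else will be submitted.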

Job

All Job types implement a process() method. Because the Worker depends only on the Job interface, it never needs a type assertion or type switch to find out which concrete job it received, which gives us a degree of generic behavior in Go.

package main

// Job interface will be implemented for each task so that they
// may be passed to workers in the pool by the dispatcher
type Job interface {  
    process() error
}

type BatchJob struct {
    // define the struct
}

func (b BatchJob) process() error {
    // process data here for batch sets ...
    return nil
}

type SingleJob struct {
    // define the struct
}

func (s SingleJob) process() error {
    // process data here for a single set ...
    return nil
}
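
To make the interface idea concrete, here is a small self-contained sketch. The two job types, their fields, and the runAll helper are made up for illustration; the point is only that the caller invokes process() through the Job interface and never inspects the concrete type.

```go
package main

import (
    "errors"
    "fmt"
)

// Job mirrors the interface defined above.
type Job interface {
    process() error
}

// sumJob is a hypothetical job that adds up some numbers.
type sumJob struct{ values []int }

func (s sumJob) process() error {
    total := 0
    for _, v := range s.values {
        total += v
    }
    fmt.Println("sum:", total)
    return nil
}

// failJob is a hypothetical job that always reports an error.
type failJob struct{ reason string }

func (f failJob) process() error {
    return errors.New(f.reason)
}

// runAll calls process() on every job and returns the number of failures.
// It works for any mix of types that satisfy Job.
func runAll(jobs []Job) int {
    failures := 0
    for _, job := range jobs {
        if err := job.process(); err != nil {
            fmt.Println("job failed:", err)
            failures++
        }
    }
    return failures
}

func main() {
    jobs := []Job{
        sumJob{values: []int{1, 2, 3}},
        failJob{reason: "bad input"},
    }
    fmt.Println("failures:", runAll(jobs))
}
```

The Worker shown earlier discards the error returned by process(); a check like the one in runAll is one possible way to surface it.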

All Together Now

Here is an example of the above code in action. The structs still need to be fleshed out and process() implemented, but it demonstrates the overall concept.

package main

func main() {  
    // Initialize a Dispatcher
    dispatcher := NewDispatcher(4)
    // Start the Dispatcher and create/register the Workers to the WorkerPool
    dispatcher.Run()
    // Tell the WaitGroup to expect two jobs before queueing them
    dispatcher.WaitGroup.Add(2)
    // Two jobs of different structures queued
    dispatcher.JobQueue <- BatchJob{}
    dispatcher.JobQueue <- SingleJob{}
    // Block the main goroutine until the workers have processed both jobs
    dispatcher.WaitGroup.Wait()
}
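
One thing this example leaves to the caller is keeping WaitGroup.Add in step with the number of queued jobs. As a sketch of one way to avoid that bookkeeping, the condensed pool below wraps both steps in a single method. The pool type, Submit, Wait, countJob, and runJobs are names I made up for this sketch, and the dispatch loop is simplified (it does not start a goroutine per job), so treat it as an illustration and not as the packaged library.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

type Job interface {
    process() error
}

// countJob is a hypothetical job that increments a shared counter.
type countJob struct{ counter *int64 }

func (c countJob) process() error {
    atomic.AddInt64(c.counter, 1)
    return nil
}

// pool is a condensed version of the Dispatcher and Worker pair.
type pool struct {
    queue   chan Job
    workers chan chan Job
    wg      sync.WaitGroup
}

func newPool(n int) *pool {
    p := &pool{queue: make(chan Job, 1024), workers: make(chan chan Job, n)}
    for i := 0; i < n; i++ {
        go p.work(make(chan Job))
    }
    go p.dispatch()
    return p
}

// work registers the worker as idle, processes one job, and repeats.
func (p *pool) work(own chan Job) {
    for {
        p.workers <- own
        job := <-own
        if err := job.process(); err != nil {
            fmt.Println("job failed:", err)
        }
        p.wg.Done()
    }
}

// dispatch hands each queued job to the next idle worker.
func (p *pool) dispatch() {
    for job := range p.queue {
        idle := <-p.workers
        idle <- job
    }
}

// Submit counts the job in the WaitGroup and queues it in one step.
func (p *pool) Submit(job Job) {
    p.wg.Add(1)
    p.queue <- job
}

// Wait blocks until every submitted job has been processed.
func (p *pool) Wait() { p.wg.Wait() }

// runJobs submits n counting jobs and returns how many were processed.
func runJobs(n int) int64 {
    var counter int64
    p := newPool(4)
    for i := 0; i < n; i++ {
        p.Submit(countJob{counter: &counter})
    }
    p.Wait()
    return atomic.LoadInt64(&counter)
}

func main() {
    fmt.Println("jobs processed:", runJobs(10)) // prints "jobs processed: 10"
}
```

Submit and Wait should be called from the same goroutine, or otherwise ordered so that every Add happens before Wait, which is the usual rule for sync.WaitGroup.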

Pasquale D'Agostino
