Goroutine Coordination Patterns
Coordinating multiple goroutines is essential for building scalable concurrent applications. This lesson covers production-ready patterns: worker pools, fan-out/fan-in, pipelines, and bounded concurrency. Master these patterns to write efficient, maintainable concurrent Go code.
Figure: WaitGroup Lifecycle and Coordination Patterns (Worker Pool, Fan-Out/Fan-In)
WaitGroup Coordination Pattern
The fundamental pattern for coordinating goroutines uses sync.WaitGroup to wait for a collection of goroutines to complete:
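A minimal runnable sketch of the lifecycle. The helper name `runWorkers` and the mutex-protected counter are illustrative choices, not a standard API:

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers starts n goroutines that each increment a shared counter,
// then waits for all of them to finish before returning the total.
func runWorkers(n int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	count := 0

	for i := 0; i < n; i++ {
		wg.Add(1) // increment the counter BEFORE starting the goroutine
		go func() {
			defer wg.Done() // decrement when this goroutine completes
			mu.Lock()
			count++
			mu.Unlock()
		}()
	}

	wg.Wait() // block until the counter drops to zero
	return count
}

func main() {
	fmt.Println("completed:", runWorkers(5)) // completed: 5
}
```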
- wg.Add(1) - Increment counter before starting goroutine
- defer wg.Done() - Decrement counter when goroutine completes
- wg.Wait() - Block until all goroutines finish
Worker Pool Pattern
The worker pool pattern uses a fixed number of workers to process jobs from a queue. This limits concurrency and provides better resource management:
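A sketch of the pattern with a toy workload (squaring numbers); the function name `squareJobs` and the worker/job counts are arbitrary illustrations:

```go
package main

import (
	"fmt"
	"sync"
)

// squareJobs processes numJobs jobs with a fixed pool of numWorkers
// goroutines and returns the sum of the squared results.
func squareJobs(numWorkers, numJobs int) int {
	jobs := make(chan int, numJobs) // buffered: provides backpressure
	results := make(chan int, numJobs)
	var wg sync.WaitGroup

	// Start the fixed pool of workers.
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs { // each worker drains jobs until the channel closes
				results <- j * j
			}
		}()
	}

	// Send all jobs, then close to signal "no more work".
	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs)

	wg.Wait()
	close(results) // safe: every worker has exited

	sum := 0
	for r := range results {
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(squareJobs(4, 10)) // sum of squares 1..10 = 385
}
```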
- Limits concurrent goroutines (prevents resource exhaustion)
- Reuses goroutines (reduces overhead)
- Provides backpressure (buffered job channel)
- Easy to scale (adjust number of workers)
Fan-Out Pattern
Fan-out distributes work across multiple goroutines to parallelize processing:
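A sketch of fan-out: several consumer goroutines compete to read from one input channel, so the values are spread across them. The helper name `fanOut` and the doubling workload are illustrative:

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut reads from in with n concurrent consumers, doubles each value,
// and returns the sum of all outputs once the input channel is drained.
func fanOut(in <-chan int, n int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	sum := 0

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in { // consumers compete for values: work is distributed
				mu.Lock()
				sum += v * 2
				mu.Unlock()
			}
		}()
	}

	wg.Wait()
	return sum
}

func main() {
	in := make(chan int)
	go func() {
		for i := 1; i <= 5; i++ {
			in <- i
		}
		close(in)
	}()
	fmt.Println(fanOut(in, 3)) // 2+4+6+8+10 = 30
}
```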
Fan-In Pattern
Fan-in merges multiple input channels into a single output channel:
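A sketch of fan-in. The `merge` and `gen` helpers are illustrative names; the key detail is that the output channel is closed only after a WaitGroup confirms every input has been drained:

```go
package main

import (
	"fmt"
	"sync"
)

// merge fans in any number of input channels into one output channel.
// The output is closed once every input has been fully drained.
func merge(inputs ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup

	for _, in := range inputs {
		wg.Add(1)
		go func(ch <-chan int) {
			defer wg.Done()
			for v := range ch {
				out <- v // forward each value to the shared output
			}
		}(in)
	}

	go func() {
		wg.Wait()
		close(out) // safe: all forwarding goroutines have finished
	}()
	return out
}

// gen returns a channel that yields the given values, then closes.
func gen(vals ...int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch)
		for _, v := range vals {
			ch <- v
		}
	}()
	return ch
}

func main() {
	sum := 0
	for v := range merge(gen(1, 2), gen(3, 4), gen(5)) {
		sum += v
	}
	fmt.Println(sum) // 15, regardless of arrival order
}
```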
- Collecting results from multiple workers
- Merging log streams
- Aggregating data from multiple sources
- Combining outputs from parallel computations
Pipeline Pattern with WaitGroup
Pipelines chain multiple stages together, with each stage processing data and passing it to the next:
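A sketch of a two-stage pipeline (generate, then square); each stage owns its output channel and closes it when done, which propagates "no more data" downstream. Stage names mirror the table below:

```go
package main

import "fmt"

// generate emits the given numbers on a channel, then closes it.
func generate(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out) // closing signals downstream stages to finish
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// square reads from in, squares each value, and forwards it on a new channel.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

func main() {
	// Final stage: consume and print. Order is preserved because each
	// stage runs a single goroutine.
	for v := range square(generate(1, 2, 3, 4)) {
		fmt.Println(v) // 1, 4, 9, 16
	}
}
```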
| Stage | Input | Processing | Output |
|---|---|---|---|
| Generate | - | Create numbers | Channel of ints |
| Square | Channel of ints | Square each number | Channel of ints |
| Print | Channel of ints | Print results | - |
Error Handling in Concurrent Code
Collecting errors from multiple goroutines requires careful coordination:
For production code, also consider the golang.org/x/sync/errgroup package, which provides a WaitGroup with error collection built in.
Bounded Concurrency
Limit the number of concurrent goroutines using a semaphore pattern:
Common Mistakes
1. Adding to WaitGroup inside goroutine
// ❌ Wrong - race condition
for i := 0; i < 10; i++ {
    go func() {
        wg.Add(1) // Too late! Main might call Wait() first
        defer wg.Done()
        // work
    }()
}

// ✅ Correct - Add before starting goroutine
for i := 0; i < 10; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        // work
    }()
}
2. Closing channel too early
// ❌ Wrong - channel closed before all jobs are sent
close(jobs)
wg.Wait() // Workers might still be reading!

// ✅ Correct - close after sending all jobs
for j := 1; j <= numJobs; j++ {
    jobs <- j
}
close(jobs) // Now safe to close
wg.Wait()
3. Not closing results channel
// ❌ Wrong - range will block forever
for result := range results {
    fmt.Println(result)
}

// ✅ Correct - close results when done
go func() {
    wg.Wait()
    close(results) // Signal no more results
}()
for result := range results {
    fmt.Println(result)
}
Exercise: Concurrent File Processor
Task: Build a concurrent file processor using the worker pool pattern.
Requirements:
- Process 20 "files" (simulated with numbers)
- Use 4 workers in a worker pool
- Each file takes 200-500ms to process
- Collect and display all results
- Handle errors gracefully
Show Solution
package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type FileResult struct {
    FileID int
    Size   int
    Err    error
}

func processFile(fileID int) (int, error) {
    // Simulate processing time
    time.Sleep(time.Duration(200+rand.Intn(300)) * time.Millisecond)
    // Simulate occasional errors
    if fileID%7 == 0 {
        return 0, fmt.Errorf("failed to process file %d", fileID)
    }
    return rand.Intn(1000), nil
}

func worker(id int, files <-chan int, results chan<- FileResult, wg *sync.WaitGroup) {
    defer wg.Done()
    for fileID := range files {
        fmt.Printf("Worker %d processing file %d\n", id, fileID)
        size, err := processFile(fileID)
        results <- FileResult{FileID: fileID, Size: size, Err: err}
    }
}

func main() {
    const numWorkers = 4
    const numFiles = 20

    files := make(chan int, numFiles)
    results := make(chan FileResult, numFiles)
    var wg sync.WaitGroup

    // Start worker pool
    fmt.Printf("Starting %d workers...\n", numWorkers)
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, files, results, &wg)
    }

    // Send files to process
    for f := 1; f <= numFiles; f++ {
        files <- f
    }
    close(files)

    // Close results when all workers done
    go func() {
        wg.Wait()
        close(results)
    }()

    // Collect results
    var totalSize int
    var errors []error
    for result := range results {
        if result.Err != nil {
            errors = append(errors, result.Err)
        } else {
            totalSize += result.Size
        }
    }

    // Display summary
    fmt.Printf("\n=== Processing Complete ===\n")
    fmt.Printf("Files processed: %d\n", numFiles-len(errors))
    fmt.Printf("Total size: %d bytes\n", totalSize)
    if len(errors) > 0 {
        fmt.Printf("Errors: %d\n", len(errors))
        for _, err := range errors {
            fmt.Printf("  - %v\n", err)
        }
    }
}
Summary
- WaitGroup coordinates multiple goroutines (Add, Done, Wait)
- Worker Pool limits concurrency with fixed workers
- Fan-Out distributes work across multiple goroutines
- Fan-In merges multiple channels into one
- Pipeline chains processing stages together
- Error Collection requires dedicated error channel
- Bounded Concurrency uses semaphore pattern
- Always Add() before starting goroutine
- Always defer Done() to ensure cleanup
- Close channels when no more data will be sent
What's Next?
You've mastered goroutine coordination patterns! These patterns form the foundation of production Go concurrency. Continue exploring advanced topics like generics and reflection to complete your Go expertise.