Web Analytics

Goroutine Coordination Patterns

Advanced ~45 min read

Coordinating multiple goroutines is essential for building scalable concurrent applications. This lesson covers production-ready patterns: worker pools, fan-out/fan-in, pipelines, and bounded concurrency. Master these patterns to write efficient, maintainable concurrent Go code.

Goroutine Coordination with WaitGroup

Figure: WaitGroup Lifecycle and Coordination Patterns (Worker Pool, Fan-Out/Fan-In)

WaitGroup Coordination Pattern

The fundamental pattern for coordinating goroutines uses sync.WaitGroup to wait for a collection of goroutines to complete:

Output
Click Run to execute your code
Pattern Steps:
  1. wg.Add(1) - Increment counter before starting goroutine
  2. defer wg.Done() - Decrement counter when goroutine completes
  3. wg.Wait() - Block until all goroutines finish

Worker Pool Pattern

The worker pool pattern uses a fixed number of workers to process jobs from a queue. This limits concurrency and provides better resource management:

Output
Click Run to execute your code
Worker Pool Benefits:
  • Limits concurrent goroutines (prevents resource exhaustion)
  • Reuses goroutines (reduces overhead)
  • Provides backpressure (buffered job channel)
  • Easy to scale (adjust number of workers)

Fan-Out Pattern

Fan-out distributes work across multiple goroutines to parallelize processing:

Output
Click Run to execute your code
When to Use Fan-Out: Use when you have independent tasks that can be processed in parallel, like processing multiple files, making concurrent API calls, or performing parallel computations.

Fan-In Pattern

Fan-in merges multiple input channels into a single output channel:

Output
Click Run to execute your code
Fan-In Use Cases:
  • Collecting results from multiple workers
  • Merging log streams
  • Aggregating data from multiple sources
  • Combining outputs from parallel computations

Pipeline Pattern with WaitGroup

Pipelines chain multiple stages together, with each stage processing data and passing it to the next:

Output
Click Run to execute your code
Stage Input Processing Output
Generate - Create numbers Channel of ints
Square Channel of ints Square each number Channel of ints
Print Channel of ints Print results -

Error Handling in Concurrent Code

Collecting errors from multiple goroutines requires careful coordination:

Output
Click Run to execute your code
Pro Tip: For more sophisticated error handling, consider using the golang.org/x/sync/errgroup package, which provides a WaitGroup with error collection built-in.

Bounded Concurrency

Limit the number of concurrent goroutines using a semaphore pattern:

Output
Click Run to execute your code
Semaphore Pattern: A buffered channel acts as a semaphore. Sending to the channel acquires a slot, receiving releases it. The buffer size limits concurrency.

Common Mistakes

1. Adding to WaitGroup inside goroutine

// ❌ Wrong - race condition
for i := 0; i < 10; i++ {
    go func() {
        wg.Add(1)  // Too late! Main might call Wait() first
        defer wg.Done()
        // work
    }()
}

// ✅ Correct - Add before starting goroutine
for i := 0; i < 10; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        // work
    }()
}

2. Closing channel too early

// ❌ Wrong - channel closed before workers finish
close(jobs)
wg.Wait()  // Workers might still be reading!

// ✅ Correct - close after sending all jobs
for j := 1; j <= numJobs; j++ {
    jobs <- j
}
close(jobs)  // Now safe to close
wg.Wait()

3. Not closing results channel

// ❌ Wrong - range will block forever
for result := range results {
    fmt.Println(result)
}

// ✅ Correct - close results when done
go func() {
    wg.Wait()
    close(results)  // Signal no more results
}()

for result := range results {
    fmt.Println(result)
}

Exercise: Concurrent File Processor

Task: Build a concurrent file processor using the worker pool pattern.

Requirements:

  • Process 20 "files" (simulated with numbers)
  • Use 4 workers in a worker pool
  • Each file takes 200-500ms to process
  • Collect and display all results
  • Handle errors gracefully
Show Solution
package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type FileResult struct {
    FileID int
    Size   int
    Err    error
}

func processFile(fileID int) (int, error) {
    // Simulate processing time
    time.Sleep(time.Duration(200+rand.Intn(300)) * time.Millisecond)
    
    // Simulate occasional errors
    if fileID%7 == 0 {
        return 0, fmt.Errorf("failed to process file %d", fileID)
    }
    
    return rand.Intn(1000), nil
}

func worker(id int, files <-chan int, results chan<- FileResult, wg *sync.WaitGroup) {
    defer wg.Done()
    
    for fileID := range files {
        fmt.Printf("Worker %d processing file %d\n", id, fileID)
        size, err := processFile(fileID)
        results <- FileResult{FileID: fileID, Size: size, Err: err}
    }
}

func main() {
    const numWorkers = 4
    const numFiles = 20
    
    files := make(chan int, numFiles)
    results := make(chan FileResult, numFiles)
    var wg sync.WaitGroup
    
    // Start worker pool
    fmt.Printf("Starting %d workers...\n", numWorkers)
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, files, results, &wg)
    }
    
    // Send files to process
    for f := 1; f <= numFiles; f++ {
        files <- f
    }
    close(files)
    
    // Close results when all workers done
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // Collect results
    var totalSize int
    var errors []error
    
    for result := range results {
        if result.Err != nil {
            errors = append(errors, result.Err)
        } else {
            totalSize += result.Size
        }
    }
    
    // Display summary
    fmt.Printf("\n=== Processing Complete ===\n")
    fmt.Printf("Files processed: %d\n", numFiles-len(errors))
    fmt.Printf("Total size: %d bytes\n", totalSize)
    if len(errors) > 0 {
        fmt.Printf("Errors: %d\n", len(errors))
        for _, err := range errors {
            fmt.Printf("  - %v\n", err)
        }
    }
}

Summary

  • WaitGroup coordinates multiple goroutines (Add, Done, Wait)
  • Worker Pool limits concurrency with fixed workers
  • Fan-Out distributes work across multiple goroutines
  • Fan-In merges multiple channels into one
  • Pipeline chains processing stages together
  • Error Collection requires dedicated error channel
  • Bounded Concurrency uses semaphore pattern
  • Always Add() before starting goroutine
  • Always defer Done() to ensure cleanup
  • Close channels when no more data will be sent

What's Next?

You've mastered goroutine coordination patterns! These patterns form the foundation of production Go concurrency. Continue exploring advanced topics like generics and reflection to complete your Go expertise.