Real-world patterns exercise with answers: https://github.com/eannchen/go-concurrency-exercises

Pipeline

A pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function.

In each stage (goroutines):

  1. Receive values from the upstream stage via inbound channels
    1. Keep receiving until those channels are closed
  2. Produce new values, using any number of goroutines
  3. Send values to the downstream stage via outbound channels
    1. Close the outbound channels once all send operations are done

The first stage → is also called the source or producer

The last stage → is also called the sink or consumer

Example: Squaring numbers

An example without explicit cancellation

// gen is the first stage (the source). It emits every value of nums, in
// order, on the returned unbuffered channel and closes that channel once the
// last value has been handed to a receiver.
func gen(nums ...int) <-chan int {
	ch := make(chan int)
	emit := func() {
		// Deferred calls run last-in-first-out: the message is printed
		// first, then the channel is closed.
		defer close(ch)
		defer fmt.Println("gen finishes sending")
		for i := 0; i < len(nums); i++ {
			ch <- nums[i]
		}
	}
	go emit()
	return ch
}

// sq is the second stage. It squares each value received from in and sends
// the result on the returned channel.
//
// When the upstream stage closes in, the receive reports that the channel is
// closed and the loop stops; sq then closes its own outbound channel so the
// next stage can stop the same way.
func sq(in <-chan int) <-chan int {
	squares := make(chan int)
	go func() {
		// Deferred calls run last-in-first-out: print, then close.
		defer close(squares)
		defer fmt.Println("sq finishes calculating")
		for {
			v, open := <-in
			if !open {
				return
			}
			time.Sleep(1 * time.Second) // heavy work
			squares <- v * v
		}
	}()
	return squares
}

// main is the last stage (the sink). It wires gen into sq and prints every
// square until sq closes its outbound channel.
func main() {
	squares := sq(gen(2, 3))
	for {
		v, open := <-squares
		if !open {
			break
		}
		fmt.Println(v)
	}
}

// output (lines printed by different goroutines may appear in a different order):
// gen finishes sending
// 4
// sq finishes calculating
// 9

Fan-out, Fan-in

// ... gen and sq are the same as in the previous example

// merge fans in: it converts a list of channels into a single outbound
// channel by starting one goroutine per inbound channel, each of which copies
// its values onto the shared channel. The outbound channel is closed once
// every inbound channel has been closed and drained.
func merge(cs ...<-chan int) <-chan int {
	merged := make(chan int)

	var pending sync.WaitGroup
	pending.Add(len(cs))
	for i := range cs {
		go func(src <-chan int) {
			for v := range src {
				merged <- v
			}
			pending.Done()
		}(cs[i])
	}

	// Close only after every forwarder has finished, because sending on a
	// closed channel panics.
	go func() {
		pending.Wait()
		close(merged)
	}()
	return merged
}

// main fans the squaring work out to two sq stages and fans their results
// back in through merge, printing each value it receives.
func main() {
	src := gen(2, 3)

	// Fan-out:
	// Distribute the sq work across two goroutines that both read from src;
	// each value is received by exactly one of them.
	workers := []<-chan int{sq(src), sq(src)}

	results := merge(workers...)
	for {
		v, open := <-results
		if !open {
			return
		}
		fmt.Println(v)
	}
}

Upstreams Memory Leak ⭐️

If a downstream stage stops receiving from its inbound channels early (because of its own logic or error handling), instead of receiving until the upstream stages close those channels → the upstream goroutines block forever on their sends, which leaks those goroutines and the memory they hold.

// gen emits nums in order on an unbuffered channel and closes the channel
// after the last send. It has no cancellation: if nobody receives, the
// goroutine stays parked on the send below.
func gen(nums ...int) <-chan int {
	ch := make(chan int)
	go func() {
		// Deferred calls run last-in-first-out: print, then close.
		defer close(ch)
		defer fmt.Println("gen finishes sending")

		for i := range nums {
			// Unbuffered send: blocks until a downstream stage receives it.
			ch <- nums[i]
		}
	}()
	return ch
}

// sq squares each value received from in and sends the result on the
// returned unbuffered channel, closing it once in is closed and drained.
// It has no cancellation: if nobody receives a result, the goroutine stays
// parked on the send below.
func sq(in <-chan int) <-chan int {
	squares := make(chan int)
	go func() {
		// Deferred calls run last-in-first-out: print, then close.
		defer close(squares)
		defer fmt.Println("sq finishes calculating")
		for {
			v, open := <-in
			if !open {
				return
			}
			time.Sleep(1 * time.Second) // heavy work

			// Unbuffered send: blocks until a downstream stage receives it.
			squares <- v * v
		}
	}()
	return squares
}

// merge forwards every value from the channels in cs onto one outbound
// channel and, once all inputs are closed and drained, prints a message and
// closes that channel. It has no cancellation: a forwarder whose value is
// never received stays blocked on its send, so the WaitGroup never reaches
// zero and the outbound channel is never closed.
func merge(cs ...<-chan int) <-chan int {
	merged := make(chan int)

	var pending sync.WaitGroup
	pending.Add(len(cs))
	for i := range cs {
		go func(src <-chan int) {
			for v := range src {
				// Unbuffered send: blocks until a downstream stage receives it.
				merged <- v
			}
			pending.Done()
		}(cs[i])
	}

	go func() {
		pending.Wait()
		fmt.Println("merge finishes sending")
		close(merged)
	}()
	return merged
}

// main builds the fan-out/fan-in pipeline but takes only a single value from
// it and then returns. There is no range loop draining the merged channel,
// and nothing tells the upstream stages that main has stopped receiving.
func main() {
	src := gen(2, 3)
	merged := merge(sq(src), sq(src))

	// Receive exactly once, by hand.
	first := <-merged // 4 or 9
	fmt.Println(first)
}

// Output:
// gen finishes sending
// sq finishes calculating
// 9

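// From the output we can see where the pipeline gets stuck (goroutine leak):
// "merge finishes sending" is never printed, because the `merge` goroutine
// holding the value that main never received stays blocked on its send.
// Both `sq` goroutines can still hand their value to `merge` and finish; a
// missing `sq` line most likely just means main returned before it was printed.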

Solved: Explicit cancellation

// gen emits nums in order on the returned unbuffered channel and closes it
// when it stops. It stops after the last value has been received, or earlier
// if done is closed while a send is still waiting for a receiver.
func gen(done <-chan struct{}, nums ...int) <-chan int {
	ch := make(chan int)
	go func() {
		// Deferred calls run last-in-first-out: print, then close.
		defer close(ch)
		defer fmt.Println("gen finishes sending")
		for i := 0; i < len(nums); i++ {
			// Either the value is handed to a receiver or the pipeline is
			// cancelled; the goroutine can no longer block forever.
			select {
			case <-done:
				return
			case ch <- nums[i]:
			}
		}
	}()
	return ch
}

// sq squares each value received from in and sends the result on the
// returned unbuffered channel, closing it when it stops. It stops when in is
// closed and drained, or earlier if done is closed while a result is still
// waiting for a receiver.
func sq(done <-chan struct{}, in <-chan int) <-chan int {
	squares := make(chan int)
	go func() {
		// Deferred calls run last-in-first-out: print, then close.
		defer close(squares)
		defer fmt.Println("sq finishes calculating")
		for {
			v, open := <-in
			if !open {
				return
			}
			time.Sleep(1 * time.Second)
			// Deliver the result unless the pipeline has been cancelled.
			select {
			case <-done:
				return
			case squares <- v * v:
			}
		}
	}()
	return squares
}

// merge forwards values from the channels in cs onto one outbound channel.
// Each forwarder stops when its input is closed and drained, or earlier if
// done is closed while it is waiting to send. Once every forwarder has
// stopped, merge prints a message and closes the outbound channel.
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
	merged := make(chan int)

	var pending sync.WaitGroup
	pending.Add(len(cs))
	for i := range cs {
		go func(src <-chan int) {
			// Deferred so the counter drops on both exit paths.
			defer pending.Done()
			for v := range src {
				select {
				case <-done:
					return
				case merged <- v:
				}
			}
		}(cs[i])
	}

	go func() {
		pending.Wait()
		fmt.Println("merge finishes sending")
		close(merged)
	}()
	return merged
}

// main builds the cancellable pipeline, takes a single value from it, and
// then closes done so that every stage still waiting on a send can return.
func main() {
	done := make(chan struct{})

	// Deferred calls run last-in-first-out: done is closed first, then main
	// sleeps. The sleep keeps the process alive long enough for the stages to
	// print that they stopped, mimicking a caller that is not main.
	defer time.Sleep(1 * time.Second)
	defer close(done)

	src := gen(done, 2, 3)
	merged := merge(done, sq(done, src), sq(done, src))

	first := <-merged // 4 or 9
	fmt.Println(first)
}

// Output:
// gen finishes sending
// 9
// sq finishes calculating
// sq finishes calculating
// merge finishes sending

Example: MD5 app