Real-world patterns exercise with answers: https://github.com/eannchen/go-concurrency-exercises
A pipeline is a series of stages connected by channels, where each stage is a group of goroutines running the same function.
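In each stage, the goroutines receive values from upstream via inbound channels, perform some function on that data (usually producing new values), and send values downstream via outbound channels.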
The first stage → is also called the source or producer
The last stage → is also called the sink or consumer
An example without explicit cancellation
// First stage
// gen is the source stage of the pipeline. It starts a goroutine that sends
// every value of nums, in order, on the returned channel and closes the
// channel once all values have been sent. The channel is unbuffered, so each
// send waits for a downstream receiver.
func gen(nums ...int) <-chan int {
	stream := make(chan int)
	emit := func() {
		// Report completion, then close so downstream range loops terminate.
		defer func() {
			fmt.Println("gen finishes sending")
			close(stream)
		}()
		for i := range nums {
			stream <- nums[i]
		}
	}
	go emit()
	return stream
}
// Second stage
// When `gen` closes its outbound channel,
// it **signals** the downstream range loop **when** to stop iterating.
// sq is a middle stage of the pipeline. It starts a goroutine that reads
// integers from in, sleeps for one second per value to simulate heavy work,
// and sends the square of each value on the returned channel. When in is
// closed and drained, the goroutine prints a message and closes the returned
// channel.
func sq(in <-chan int) <-chan int {
	squares := make(chan int)
	go func() {
		defer func() {
			fmt.Println("sq finishes calculating")
			close(squares)
		}()
		for {
			n, open := <-in
			if !open {
				// Upstream closed its channel: nothing more to square.
				return
			}
			time.Sleep(1 * time.Second) // heavy work
			squares <- n * n
		}
	}()
	return squares
}
// Last stage
// main is the sink stage: it wires gen into sq and prints every squared
// value until sq closes its outbound channel.
func main() {
	results := sq(gen(2, 3))
	for {
		v, open := <-results
		if !open {
			break
		}
		fmt.Println(v)
	}
}
// output:
// gen finishes sending
// 4
// sq finishes calculating
// 9
// ... Same code before
// merge fans in any number of inbound channels into a single outbound
// channel. One forwarding goroutine is started per inbound channel; the
// outbound channel is closed only after every forwarder has seen its inbound
// channel close.
func merge(cs ...<-chan int) <-chan int {
	var pending sync.WaitGroup
	merged := make(chan int)

	// Fan-in:
	// Converts a list of channels to a single outbound channel
	// by starting a goroutine for each inbound channel.
	for _, c := range cs {
		pending.Add(1)
		go func(src <-chan int) {
			defer pending.Done()
			for v := range src {
				merged <- v
			}
		}(c)
	}

	// Close the outbound channel once all forwarders are done, so the
	// consumer's range loop can end.
	go func() {
		pending.Wait()
		close(merged)
	}()
	return merged
}
// main builds a fan-out/fan-in pipeline: two sq stages read from the same
// gen channel, merge combines their results, and main prints each value
// until the merged channel is closed.
func main() {
	source := gen(2, 3)

	// Fan-out:
	// Distribute the sq work across two goroutines that both read from source.
	workers := []<-chan int{sq(source), sq(source)}

	merged := merge(workers...)
	for v := range merged {
		fmt.Println(v)
	}
}
If a downstream stage stops receiving from its inbound channels early (because of its own logic or error handling) before the upstream stages have closed their outbound channels, the upstream goroutines block forever on their sends → this leaks those goroutines (and the memory they hold).
// gen is the source stage. It starts a goroutine that sends each value of
// nums on the returned unbuffered channel and closes the channel after the
// last send. There is no cancellation: if nobody receives, the goroutine
// stays blocked on its send.
func gen(nums ...int) <-chan int {
	stream := make(chan int)
	emit := func() {
		defer func() {
			fmt.Println("gen finishes sending")
			close(stream)
		}()
		for i := 0; i < len(nums); i++ {
			// Blocks until a downstream stage reads the value.
			stream <- nums[i]
		}
	}
	go emit()
	return stream
}
// sq reads integers from in, sleeps one second per value to simulate heavy
// work, and sends each square on the returned unbuffered channel. The
// channel is closed after in is closed and drained. There is no
// cancellation: if nobody receives, the goroutine stays blocked on its send.
func sq(in <-chan int) <-chan int {
	squares := make(chan int)
	go func() {
		defer func() {
			fmt.Println("sq finishes calculating")
			close(squares)
		}()
		for {
			n, open := <-in
			if !open {
				return
			}
			time.Sleep(1 * time.Second) // heavy work
			// Blocks until a downstream stage reads the value.
			squares <- n * n
		}
	}()
	return squares
}
// merge forwards the values of every inbound channel onto one outbound
// channel, using one goroutine per inbound channel. After all inbound
// channels are closed and drained it prints a message and closes the
// outbound channel. There is no cancellation: a forwarder whose value is
// never received stays blocked, and the outbound channel is then never
// closed.
func merge(cs ...<-chan int) <-chan int {
	var pending sync.WaitGroup
	merged := make(chan int)

	for _, c := range cs {
		pending.Add(1)
		go func(src <-chan int) {
			for v := range src {
				// Blocks until a downstream stage reads the value.
				merged <- v
			}
			pending.Done()
		}(c)
	}

	go func() {
		pending.Wait()
		fmt.Println("merge finishes sending")
		close(merged)
	}()
	return merged
}
// main builds the fan-out/fan-in pipeline but deliberately consumes only a
// single value, to show what happens when a consumer stops early without
// telling the upstream stages.
func main() {
	source := gen(2, 3)
	merged := merge(sq(source), sq(source))

	// Take exactly one value by hand instead of ranging over the channel,
	// so nothing ever drains the remaining value.
	first := <-merged
	fmt.Println(first) // 4 or 9
}
// Output:
// gen finishes sending
// sq finishes calculating
// 9
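// From the output we can see where the pipeline gets stuck (goroutine leak):
// `merge` never prints its finish message, because one of its forwarding
// goroutines is blocked forever on `out <- n` (nobody receives the second
// value), so `wg.Wait()` never returns. The second `sq` line is missing only
// because main exited first; with more input values the `sq` and `gen`
// goroutines would end up blocked on their sends in the same way.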
// gen is the cancellable source stage. It starts a goroutine that sends each
// value of nums on the returned channel, and stops early if done is closed
// while a send is pending. In both cases it prints a message and closes the
// returned channel on the way out.
func gen(done <-chan struct{}, nums ...int) <-chan int {
	stream := make(chan int)
	go func() {
		defer func() {
			fmt.Println("gen finishes sending")
			close(stream)
		}()
		for i := 0; i < len(nums); i++ {
			select {
			case <-done:
				// The consumer gave up: abandon the remaining values.
				return
			case stream <- nums[i]:
			}
		}
	}()
	return stream
}
// sq is the cancellable squaring stage. It starts a goroutine that reads
// integers from in, spends one second of simulated work per value, and sends
// the square on the returned channel.
//
// The goroutine stops when in is closed and drained, or as soon as done is
// closed, whether it is in the middle of the simulated work or waiting for a
// downstream receiver. On the way out it prints a message and closes the
// returned channel.
func sq(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer func() {
			fmt.Println("sq finishes calculating")
			close(out)
		}()
		for n := range in {
			// Simulated heavy work. Waiting on a timer inside a select
			// (rather than sleeping) lets the stage react to cancellation
			// immediately instead of only after the work has finished.
			work := time.NewTimer(1 * time.Second)
			select {
			case <-work.C:
			case <-done:
				work.Stop()
				return
			}

			select {
			case out <- n * n:
			case <-done:
				return
			}
		}
	}()
	return out
}
// merge is the cancellable fan-in stage. One goroutine per inbound channel
// forwards values onto the returned channel; each forwarder stops when its
// inbound channel is closed or when done is closed while a send is pending.
// After every forwarder has returned, merge prints a message and closes the
// returned channel.
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
	var pending sync.WaitGroup
	merged := make(chan int)

	for _, c := range cs {
		pending.Add(1)
		go func(src <-chan int) {
			defer pending.Done()
			for v := range src {
				select {
				case <-done:
					// The consumer gave up: drop the value and stop.
					return
				case merged <- v:
				}
			}
		}(c)
	}

	go func() {
		pending.Wait()
		fmt.Println("merge finishes sending")
		close(merged)
	}()
	return merged
}
// main builds the cancellable fan-out/fan-in pipeline, consumes a single
// value, and then closes done so that every upstream goroutine can return.
func main() {
	done := make(chan struct{})
	shutdown := func() {
		close(done)
		// Keep the process alive briefly so the stages can print their
		// messages; this mimics a caller that is not the main function.
		time.Sleep(1 * time.Second)
	}
	defer shutdown()

	source := gen(done, 2, 3)
	merged := merge(done, sq(done, source), sq(done, source))

	fmt.Println(<-merged) // 4 or 9
}
// Output:
// gen finishes sending
// 9
// sq finishes calculating
// sq finishes calculating
// merge finishes sending