Khoa Pham
30 Apr 2019
P1 TL;DR: keep the state in the message! If you don’t know what that means yet, keep reading.
Goroutines and channels are Go’s embraced model for concurrency. Channel syntax abstracts away the explicit use of locks and helps developers avoid incorrect data flow.
This post focuses on the tedious side of working with goroutines and channels: how to use them correctly, and how to organize them into composable, reusable code.
The examples are ordered by increasing complexity.
Simplest: Fire-and-forget (only applies to async operations whose result you don’t collect). Very easy, but don’t forget to set a timeout (especially on I/O calls).
client := &http.Client{
	Timeout: 3 * time.Second,
}
go func() {
	client.Post("http://example.com", "application/json", bytes.NewReader([]byte("data")))
}()
Great… Wait, we don’t need that closure do we:
go client.Post("http://example.com", "application/json", bytes.NewReader([]byte("data")))
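Much shorter. In fact the arguments of a go statement are evaluated right away in the calling goroutine, so this form is safe even in a loop with different arguments. It is the closure form that needs care there: before Go 1.22 every iteration shared one loop variable, so pass the value in as a function parameter. Also note that the one-liner throws away the response, so you can’t check the error or close the body.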
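Here is a minimal sketch of fire-and-forget inside a loop, where each goroutine receives its value as a function parameter. The helpers `newClient`, `post` and `fireAll` are hypothetical names, not part of the original snippet, and a local `httptest` server stands in for the real endpoint.

```go
package main

import (
	"bytes"
	"io"
	"log"
	"net/http"
	"net/http/httptest"
	"time"
)

// newClient returns a client with a hard deadline (hypothetical helper).
func newClient() *http.Client {
	return &http.Client{Timeout: 3 * time.Second}
}

// post sends one payload, then drains and closes the response body so the
// connection can be reused (hypothetical helper).
func post(client *http.Client, url string, payload []byte) error {
	resp, err := client.Post(url, "application/json", bytes.NewReader(payload))
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	_, err = io.Copy(io.Discard, resp.Body)
	return err
}

// fireAll starts one goroutine per URL and returns without waiting.
func fireAll(client *http.Client, urls []string, payload []byte) {
	for _, u := range urls {
		// u is passed as an argument, so each goroutine owns its copy.
		go func(url string) {
			if err := post(client, url, payload); err != nil {
				log.Println(err)
			}
		}(u)
	}
}

func main() {
	// A local test server stands in for the real endpoint.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		log.Println("received", r.URL.Path)
	}))
	defer srv.Close()

	fireAll(newClient(), []string{srv.URL + "/a", srv.URL + "/b"}, []byte(`{"n":1}`))
	// Fire-and-forget: nothing waits for the goroutines, so give them a
	// moment before the process exits.
	time.Sleep(200 * time.Millisecond)
}
```

Wrapping the call in `post` is a design choice: it keeps the goroutine body tiny while still handling the error and the response body.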
Intermediate: Long-running goroutines listen on a channel. They all run the same logic: process the data and send back a result (you may call them a worker pool).
type (
	work struct {
		query string
	}
	result struct {
		answer string
	}
)
var (
	workc   = make(chan work, 4)
	resultc = make(chan result, 4)
	errc    = make(chan error, 4)
)
// Run result collector in background
go func() {
	for {
		// No default case: select blocks until a value arrives instead of busy-looping
		select {
		case r := <-resultc:
			processResult(r)
		case err := <-errc:
			log.Println(err)
		}
	}
}()
// Run 4 workers in background
for i := 0; i < 4; i++ {
	go func() {
		// range blocks until work arrives and ends when workc is closed
		for w := range workc {
			r, err := processWork(w)
			if err != nil {
				errc <- err
				continue
			}
			resultc <- r
		}
	}()
}
Great… Wait, do we really need 3 channels? An error in Go is just a value, so we can combine the error and the result and get down to 2 channels: in and out!
// Combine result and error
type result struct {
	answer string
	err    error
}
// Updated result collector
go func() {
	for r := range resultc {
		if r.err != nil {
			log.Println(r.err)
			continue
		}
		processResult(r)
	}
}()
// Updated workers: processWork now returns a result that carries its own error
for i := 0; i < 4; i++ {
	go func() {
		for w := range workc {
			resultc <- processWork(w)
		}
	}()
}
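To see the two-channel version run from start to finish, here is a self-contained sketch. It makes a few assumptions that are not in the snippets above: `processWork` is a stand-in that upper-cases the query and rejects empty input, `runPool` is a hypothetical wrapper, and shutdown is done by closing `workc` and waiting on a `sync.WaitGroup`.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"sync"
)

type work struct {
	query string
}

type result struct {
	answer string
	err    error
}

// processWork is a stand-in for real work: it upper-cases the query and
// rejects empty input.
func processWork(w work) result {
	if w.query == "" {
		return result{err: errors.New("empty query")}
	}
	return result{answer: strings.ToUpper(w.query)}
}

// runPool feeds queries to n workers and collects answers and errors.
func runPool(n int, queries []string) (answers []string, errs []error) {
	workc := make(chan work, n)
	resultc := make(chan result, n)

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for w := range workc { // ends when workc is closed and drained
				resultc <- processWork(w)
			}
		}()
	}

	// Producer: send all work, then close the channel to signal "no more".
	go func() {
		for _, q := range queries {
			workc <- work{query: q}
		}
		close(workc)
	}()

	// Close resultc once every worker has returned, so the loop below ends.
	go func() {
		wg.Wait()
		close(resultc)
	}()

	for r := range resultc {
		if r.err != nil {
			errs = append(errs, r.err)
			continue
		}
		answers = append(answers, r.answer)
	}
	return answers, errs
}

func main() {
	answers, errs := runPool(4, []string{"a", "", "b", "c"})
	fmt.Println(len(answers), "answers,", len(errs), "errors")
}
```

Closing the in channel is what lets every `range` loop finish, so the workers and the collector both stop without any extra "quit" channel.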
Another intermediate: Long-running goroutines listen on multiple channels. In this scenario a set of workers listens on 2 or more channels to do different kinds of work. This is particularly common in a lot of codebases, so I think it’s worth pointing out.
type (
	work1 struct {
		query string
	}
	work2 struct {
		command int
	}
	result1 struct {
		answer string
		err    error
	}
	result2 struct {
		output int
		err    error
	}
)
var (
	workc1   = make(chan work1, 4)
	workc2   = make(chan work2, 4)
	resultc1 = make(chan result1, 4)
	resultc2 = make(chan result2, 4)
)
// Result collector listens to 2 result channels
go func() {
	for {
		// No default case: select blocks until one of the channels is ready
		select {
		case r := <-resultc1:
			if r.err != nil {
				log.Println(r.err)
				continue
			}
			processResultType1(r)
		case r := <-resultc2:
			if r.err != nil {
				log.Println(r.err)
				continue
			}
			processResultType2(r)
		}
	}
}()
// 4 workers listen to 2 working channels
for i := 0; i < 4; i++ {
	go func() {
		for {
			select {
			case w := <-workc1:
				resultc1 <- processWorkType1(w)
			case w := <-workc2:
				resultc2 <- processWorkType2(w)
			}
		}
	}()
}
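One wrinkle when a worker selects over several channels is shutdown: a receive from a closed channel never blocks, so the worker has to notice the close itself. A minimal sketch, assuming the producers close their channels when they are done: the worker sets a closed channel to nil, which disables that case, and returns once both are nil. The `worker` function and its processing logic are hypothetical stand-ins.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

type work1 struct{ query string }
type work2 struct{ command int }

// worker drains both channels and returns its outputs once both are closed.
func worker(workc1 <-chan work1, workc2 <-chan work2) []string {
	var out []string
	for workc1 != nil || workc2 != nil {
		select {
		case w, ok := <-workc1:
			if !ok {
				workc1 = nil // a nil channel blocks forever, disabling this case
				continue
			}
			out = append(out, strings.ToUpper(w.query))
		case w, ok := <-workc2:
			if !ok {
				workc2 = nil
				continue
			}
			out = append(out, strconv.Itoa(w.command*2))
		}
	}
	return out
}

func main() {
	c1 := make(chan work1, 2)
	c2 := make(chan work2, 2)
	c1 <- work1{query: "ping"}
	c2 <- work2{command: 21}
	close(c1)
	close(c2)
	// The order of the two outputs depends on which case select picks first.
	fmt.Println(worker(c1, c2))
}
```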
Great… Wait, that’s 4 channels already for only 2 types of work, and quite a lot of repetitive code! Imagine doing this with 3, 4 or even 10 types of work (we would need twice as many channels as types!)
A channel is just a medium for communication between goroutines. A good medium is stateless: it shouldn’t care what it carries. To reduce the number of channels (down to a fixed 2, in and out!) regardless of how many types of work there are, we can do this:
type work struct {
	typ     int
	query   string
	command int
	err     error
}
type result struct {
	typ    int
	answer string
	output int
	err    error
}
Or even better, treat the in and out data types as just data and embed them in a single message type:
type (
	work1 struct {
		query string
	}
	work2 struct {
		command int
	}
	result1 struct {
		answer string
	}
	result2 struct {
		output int
	}
	message struct {
		work1
		work2
		result1
		result2
		typ int
		err error
	}
)
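Because the four structs are embedded, their fields are promoted onto `message`, so `msg.query` and `msg.work1.query` name the same field. Here is a small sketch of building and reading such messages. The named constants `typ1`/`typ2` and the helpers `newQuery`, `newCommand` and `describe` are hypothetical additions that replace the bare integers.

```go
package main

import "fmt"

type (
	work1   struct{ query string }
	work2   struct{ command int }
	result1 struct{ answer string }
	result2 struct{ output int }

	message struct {
		work1
		work2
		result1
		result2
		typ int
		err error
	}
)

// Named tags instead of bare 1 and 2 (hypothetical names).
const (
	typ1 = iota + 1
	typ2
)

// newQuery and newCommand fill only the part of the message that matters.
func newQuery(q string) message { return message{typ: typ1, work1: work1{query: q}} }
func newCommand(c int) message  { return message{typ: typ2, work2: work2{command: c}} }

// describe shows that promoted fields can be read directly off the message.
func describe(m message) string {
	switch m.typ {
	case typ1:
		return "query=" + m.query // same field as m.work1.query
	case typ2:
		return fmt.Sprintf("command=%d", m.command)
	default:
		return "unknown"
	}
}

func main() {
	fmt.Println(describe(newQuery("ping")))
	fmt.Println(describe(newCommand(7)))
}
```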
The only caveat is that we should increase the buffer when the message carries more types. For the example above, we can double the buffer to keep the same throughput (in an ideal scenario):
var (
	workc   = make(chan message, 8)
	resultc = make(chan message, 8)
)
// Updated collector logic
go func() {
	for msg := range resultc {
		if msg.err != nil {
			log.Println(msg.err)
			continue
		}
		switch msg.typ {
		case 1:
			processResultType1(msg.result1)
		case 2:
			processResultType2(msg.result2)
		}
	}
}()
// Updated workers logic: both process functions now return a message
for i := 0; i < 4; i++ {
	go func() {
		for msg := range workc {
			switch msg.typ {
			case 1:
				resultc <- processWorkType1(msg.work1)
			case 2:
				resultc <- processWorkType2(msg.work2)
			}
		}
	}()
}
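Putting the pieces together, here is a runnable sketch of the "state in the message" pipeline: one in channel, one out channel, and routing by `typ`. The `handle` and `run` functions and their processing logic are hypothetical stand-ins for `processWorkType1` and `processWorkType2`, and the close-based shutdown is an assumption added to make the example terminate.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"sync"
)

type (
	work1   struct{ query string }
	work2   struct{ command int }
	result1 struct{ answer string }
	result2 struct{ output int }
	message struct {
		work1
		work2
		result1
		result2
		typ int
		err error
	}
)

// handle routes one message by its typ tag; the processing is a stand-in.
func handle(msg message) message {
	switch msg.typ {
	case 1:
		return message{typ: 1, result1: result1{answer: strings.ToUpper(msg.query)}}
	case 2:
		return message{typ: 2, result2: result2{output: msg.command * 2}}
	default:
		return message{typ: msg.typ, err: errors.New("unknown message type")}
	}
}

// run pushes msgs through n workers over one in and one out channel.
func run(n int, msgs []message) (answers []string, outputs []int, errs []error) {
	workc := make(chan message, 8)
	resultc := make(chan message, 8)

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range workc {
				resultc <- handle(msg)
			}
		}()
	}
	// Producer: send everything, close the in channel, then close the out
	// channel once all workers have finished.
	go func() {
		for _, m := range msgs {
			workc <- m
		}
		close(workc)
		wg.Wait()
		close(resultc)
	}()

	for msg := range resultc {
		switch {
		case msg.err != nil:
			errs = append(errs, msg.err)
		case msg.typ == 1:
			answers = append(answers, msg.answer)
		case msg.typ == 2:
			outputs = append(outputs, msg.output)
		}
	}
	return answers, outputs, errs
}

func main() {
	answers, outputs, errs := run(4, []message{
		{typ: 1, work1: work1{query: "ping"}},
		{typ: 2, work2: work2{command: 21}},
		{typ: 3},
	})
	fmt.Println(answers, outputs, len(errs))
}
```

Adding a third type of work here means one more struct, one more `case` in `handle`, and one more `case` in the collector; the channels stay untouched.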
Level 9000: Complex routing, duplex channels, bouncing messages, and how to turn them into reusable code.
P1 of this post ends here as it’s already running long. I’m still writing up P2, which will cover Level 9000 above :)
Let me know in the comments below if this could become your favorite goroutine and channel structure. Happy coding!