927 words
5 minutes

Go语言教程:并发处理与同步机制

本文将通过一个示例代码,深入浅出地介绍Go语言中的并发处理与同步机制。我们将逐步解析代码,帮助你理解其原理,并举一反三地应用到其他场景中。

示例代码概述#

该示例代码展示了如何使用Go语言的goroutine和通道(channel)来实现并发处理。代码中定义了一个数据处理配置结构体handlerConfig,并通过多个goroutine对缓冲区进行数据的并发写入和读取。

代码解析#

1. 导入必要的包#

import (
"bytes"
"errors"
"fmt"
"io"
"log"
"sync"
"time"
)

这些包提供了我们需要的基本功能,如输入输出操作、错误处理、日志记录、并发控制等。

2. 定义处理函数类型和配置结构体#

type singleHandler func() (data string, n int, err error)
type handlerConfig struct {
handler singleHandler
goNum int
number int
interval time.Duration
counter int
counterMu sync.Mutex
}
  • singleHandler:定义了一个处理函数类型,返回处理的数据、数据长度和错误信息。
  • handlerConfig:定义了处理配置,包括处理函数、goroutine数量、处理次数、间隔时间、计数器和计数器的互斥锁。

3. 计数器方法#

func (hc *handlerConfig) count(increment int) int {
hc.counterMu.Lock()
defer hc.counterMu.Unlock()
hc.counter += increment
return hc.counter
}

count方法用于安全地增加计数器的值,并返回增加后的计数。使用互斥锁counterMu来确保线程安全。

4. 主函数#

func main() {
var mu sync.Mutex
genWriter := func(writer io.Writer) singleHandler {
return func() (data string, n int, err error) {
data = fmt.Sprintf("%s\\t", time.Now().Format(time.StampNano))
mu.Lock()
defer mu.Unlock()
n, err = writer.Write([]byte(data))
return
}
}
genReader := func(reader io.Reader) singleHandler {
return func() (data string, n int, err error) {
buffer, ok := reader.(*bytes.Buffer)
if !ok {
err = errors.New("unsupported reader")
return
}
mu.Lock()
defer mu.Unlock()
data, err = buffer.ReadString('\\t')
n = len(data)
return
}
}
var buffer bytes.Buffer
writingConfig := handlerConfig{
handler: genWriter(&buffer),
goNum: 5,
number: 4,
interval: time.Millisecond * 100,
}
readingConfig := handlerConfig{
handler: genReader(&buffer),
goNum: 10,
number: 2,
interval: time.Millisecond * 100,
}
sign := make(chan struct{}, writingConfig.goNum+readingConfig.goNum)
for i := 1; i <= writingConfig.goNum; i++ {
go func(i int) {
defer func() { sign <- struct{}{} }()
for j := 1; j <= writingConfig.number; j++ {
time.Sleep(writingConfig.interval)
data, n, err := writingConfig.handler()
if err != nil {
log.Printf("writer [%d-%d]: error: %s", i, j, err)
continue
}
total := writingConfig.count(n)
log.Printf("writer [%d-%d]: %s (total: %d)", i, j, data, total)
}
}(i)
}
for i := 1; i <= readingConfig.goNum; i++ {
go func(i int) {
defer func() { sign <- struct{}{} }()
for j := 1; j <= readingConfig.number; j++ {
time.Sleep(readingConfig.interval)
var data string
var n int
var err error
for {
data, n, err = readingConfig.handler()
if err == nil || err != io.EOF {
break
}
time.Sleep(readingConfig.interval)
}
if err != nil {
log.Printf("reader [%d-%d]: error: %s", i, j, err)
continue
}
total := readingConfig.count(n)
log.Printf("reader [%d-%d]: %s (total: %d)", i, j, data, total)
}
}(i)
}
signNumber := writingConfig.goNum + readingConfig.goNum
for j := 0; j < signNumber; j++ {
<-sign
}
}

5. 生成写入和读取函数#

  • genWriter:生成一个写入函数,向缓冲区写入当前时间戳。
  • genReader:生成一个读取函数,从缓冲区读取数据。

6. 配置写入和读取#

  • writingConfig:配置写入操作,包括处理函数、goroutine数量、处理次数和间隔时间。
  • readingConfig:配置读取操作。

7. 启动goroutine进行并发处理#

  • 启动多个goroutine进行数据写入,每个goroutine按照配置的次数和间隔时间进行操作。
  • 启动多个goroutine进行数据读取,处理读取过程中可能出现的EOF错误。

8. 等待所有goroutine完成#

使用通道sign来等待所有goroutine完成操作。

总结#

通过这个示例代码,我们学习了如何在Go语言中使用goroutine和通道来实现并发处理,并通过互斥锁确保数据的线程安全。你可以将这种模式应用到其他需要并发处理的场景中,如并发网络请求、并发文件处理等。

希望这个教程对你有所帮助!如果有任何问题,欢迎随时提问。

Share

If this article helped you, please share it with others!

Go语言教程:并发处理与同步机制
https://blog.ithuo.net/posts/go-concurrency-and-synchronization-tutorial/
Author
Derick
Published at
2022-05-24
License
CC BY-NC-SA 4.0
Last updated on 2022-05-24,1325 days ago

Some content may be outdated

Table of Contents