Derick
930 words
5 minutes
Go语言教程:并发处理与同步机制

本文将通过一个示例代码,深入浅出地介绍Go语言中的并发处理与同步机制。我们将逐步解析代码,帮助你理解其原理,并举一反三地应用到其他场景中。

示例代码概述#

该示例代码展示了如何使用Go语言的goroutine和通道(channel)来实现并发处理。代码中定义了一个数据处理配置结构体handlerConfig,并通过多个goroutine对缓冲区进行数据的并发写入和读取。

代码解析#

1. 导入必要的包#

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"log"
	"sync"
	"time"
)

这些包提供了我们需要的基本功能,如输入输出操作、错误处理、日志记录、并发控制等。

2. 定义处理函数类型和配置结构体#

type singleHandler func() (data string, n int, err error)

type handlerConfig struct {
	handler   singleHandler
	goNum     int
	number    int
	interval  time.Duration
	counter   int
	counterMu sync.Mutex
}

  • singleHandler:定义了一个处理函数类型,返回处理的数据、数据长度和错误信息。
  • handlerConfig:定义了处理配置,包括处理函数、goroutine数量、处理次数、间隔时间、计数器和计数器的互斥锁。

3. 计数器方法#

func (hc *handlerConfig) count(increment int) int {
	hc.counterMu.Lock()
	defer hc.counterMu.Unlock()
	hc.counter += increment
	return hc.counter
}

count方法用于安全地增加计数器的值,并返回增加后的计数。使用互斥锁counterMu来确保线程安全。

4. 主函数#

func main() {
	var mu sync.Mutex

	genWriter := func(writer io.Writer) singleHandler {
		return func() (data string, n int, err error) {
			data = fmt.Sprintf("%s\\t", time.Now().Format(time.StampNano))
			mu.Lock()
			defer mu.Unlock()
			n, err = writer.Write([]byte(data))
			return
		}
	}

	genReader := func(reader io.Reader) singleHandler {
		return func() (data string, n int, err error) {
			buffer, ok := reader.(*bytes.Buffer)
			if !ok {
				err = errors.New("unsupported reader")
				return
			}
			mu.Lock()
			defer mu.Unlock()
			data, err = buffer.ReadString('\\t')
			n = len(data)
			return
		}
	}

	var buffer bytes.Buffer

	writingConfig := handlerConfig{
		handler:  genWriter(&buffer),
		goNum:    5,
		number:   4,
		interval: time.Millisecond * 100,
	}

	readingConfig := handlerConfig{
		handler:  genReader(&buffer),
		goNum:    10,
		number:   2,
		interval: time.Millisecond * 100,
	}

	sign := make(chan struct{}, writingConfig.goNum+readingConfig.goNum)

	for i := 1; i <= writingConfig.goNum; i++ {
		go func(i int) {
			defer func() { sign <- struct{}{} }()
			for j := 1; j <= writingConfig.number; j++ {
				time.Sleep(writingConfig.interval)
				data, n, err := writingConfig.handler()
				if err != nil {
					log.Printf("writer [%d-%d]: error: %s", i, j, err)
					continue
				}
				total := writingConfig.count(n)
				log.Printf("writer [%d-%d]: %s (total: %d)", i, j, data, total)
			}
		}(i)
	}

	for i := 1; i <= readingConfig.goNum; i++ {
		go func(i int) {
			defer func() { sign <- struct{}{} }()
			for j := 1; j <= readingConfig.number; j++ {
				time.Sleep(readingConfig.interval)
				var data string
				var n int
				var err error
				for {
					data, n, err = readingConfig.handler()
					if err == nil || err != io.EOF {
						break
					}
					time.Sleep(readingConfig.interval)
				}
				if err != nil {
					log.Printf("reader [%d-%d]: error: %s", i, j, err)
					continue
				}
				total := readingConfig.count(n)
				log.Printf("reader [%d-%d]: %s (total: %d)", i, j, data, total)
			}
		}(i)
	}

	signNumber := writingConfig.goNum + readingConfig.goNum
	for j := 0; j < signNumber; j++ {
		<-sign
	}
}

5. 生成写入和读取函数#

  • genWriter:生成一个写入函数,向缓冲区写入当前时间戳。
  • genReader:生成一个读取函数,从缓冲区读取数据。

6. 配置写入和读取#

  • writingConfig:配置写入操作,包括处理函数、goroutine数量、处理次数和间隔时间。
  • readingConfig:配置读取操作。

7. 启动goroutine进行并发处理#

  • 启动多个goroutine进行数据写入,每个goroutine按照配置的次数和间隔时间进行操作。
  • 启动多个goroutine进行数据读取,处理读取过程中可能出现的EOF错误。

8. 等待所有goroutine完成#

使用通道sign来等待所有goroutine完成操作。

总结#

通过这个示例代码,我们学习了如何在Go语言中使用goroutine和通道来实现并发处理,并通过互斥锁确保数据的线程安全。你可以将这种模式应用到其他需要并发处理的场景中,如并发网络请求、并发文件处理等。

希望这个教程对你有所帮助!如果有任何问题,欢迎随时提问。

Go语言教程:并发处理与同步机制
https://blog.ithuo.net/posts/go-concurrency-and-synchronization-tutorial/
Author
Derick
Published at
2022-05-24