我的编程空间,编程开发者的网络收藏夹
学习永远不晚

Go语言实现的可读性更高的并发神库详解

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

Go语言实现的可读性更高的并发神库详解

前言

前几天逛github发现了一个有趣的并发库-conc,其目标是:

  • 更难出现goroutine泄漏
  • 处理panic更友好
  • 并发代码可读性高

从简介上看主要封装功能如下:

  • waitGroup进行封装,避免了产生大量重复代码,并且也封装recover,安全性更高
  • 提供panics.Catcher封装recover逻辑,统一捕获panic,打印调用栈一些信息
  • 提供一个并发执行任务的worker池,可以控制并发度、goroutine可以进行复用,支持函数签名,同时提供了stream方法来保证结果有序
  • 提供ForEachmap方法优雅的处理切片

接下来就区分模块来介绍一下这个库;

仓库地址:github.com/sourcegraph…

WaitGroup的封装

Go语言标准库有提供sync.waitGroup控制等待goroutine,我们一般会写出如下代码:

func main(){
    var wg sync.WaitGroup
    for i:=0; i < 10; i++{
        wg.Add(1)
        go func() {
            defer wg.Done()
            defer func() {
                // recover panic
                err := recover()
                if err != nil {
                    fmt.Println(err)
                }
            }
            // do something
            handle()
        }
    }
    wg.Wait()
}

上述代码我们需要些一堆重复代码,并且需要单独在每一个func中处理recover逻辑,所以conc库对其进行了封装,代码简化如下:

func main() {
	wg := conc.NewWaitGroup()
	for i := 0; i &lt; 10; i++ {
		wg.Go(doSomething)
	}
	wg.Wait()
}
func doSomething() {
	fmt.Println("test")
}

conc库封装也比较简单,结构如下:

type WaitGroup struct {
	wg sync.WaitGroup
	pc panics.Catcher
}

其自己实现了Catcher类型对recover逻辑进行了封装,封装思路如下:

type Catcher struct {
	recovered atomic.Pointer[RecoveredPanic]
}

recovered是原子指针类型,RecoveredPanic是捕获的recover封装,封装了堆栈等信息:

type RecoveredPanic struct {
	// The original value of the panic.
	Value any
	// The caller list as returned by runtime.Callers when the panic was
	// recovered. Can be used to produce a more detailed stack information with
	// runtime.CallersFrames.
	Callers []uintptr
	// The formatted stacktrace from the goroutine where the panic was recovered.
	// Easier to use than Callers.
	Stack []byte
}

提供了Try方法执行方法,只会记录第一个panic的goroutine信息:

func (p *Catcher) Try(f func()) {
	defer p.tryRecover()
	f()
}
func (p *Catcher) tryRecover() {
	if val := recover(); val != nil {
		rp := NewRecoveredPanic(1, val)
        // 只会记录第一个panic的goroutine信息
		p.recovered.CompareAndSwap(nil, &amp;rp)
	}
}

提供了Repanic()方法用来重放捕获的panic:

func (p *Catcher) Repanic() {
	if val := p.Recovered(); val != nil {
		panic(val)
	}
}
func (p *Catcher) Recovered() *RecoveredPanic {
	return p.recovered.Load()
}

waitGroup对此也分别提供了Wait()WaitAndRecover()方法:

func (h *WaitGroup) Wait() {
	h.wg.Wait()
	// Propagate a panic if we caught one from a child goroutine.
	h.pc.Repanic()
}
func (h *WaitGroup) WaitAndRecover() *panics.RecoveredPanic {
	h.wg.Wait()
	// Return a recovered panic if we caught one from a child goroutine.
	return h.pc.Recovered()
}

wait方法只要有一个goroutine发生panic就会向上抛出panic,比较简单粗暴;

waitAndRecover方法只有有一个goroutine发生panic就会返回第一个recover的goroutine信息;

总结:conc库对waitGrouop的封装总体是比较不错的,可以减少重复的代码;

worker池

conc提供了几种类型的worker池:

  • ContextPool:可以传递context的pool,若有goroutine发生错误可以cancel其他goroutine
  • ErrorPool:通过参数可以控制只收集第一个error还是所有error
  • ResultContextPool:若有goroutine发生错误会cancel其他goroutine并且收集错误
  • RestultPool:收集work池中每个任务的执行结果,并不能保证顺序,保证顺序需要使用stream或者iter.map;

我们来看一个简单的例子:

import "github.com/sourcegraph/conc/pool"
func ExampleContextPool_WithCancelOnError() {
	p := pool.New().
		WithMaxGoroutines(4).
		WithContext(context.Background()).
		WithCancelOnError()
	for i := 0; i < 3; i++ {
		i := i
		p.Go(func(ctx context.Context) error {
			if i == 2 {
				return errors.New("I will cancel all other tasks!")
			}
			<-ctx.Done()
			return nil
		})
	}
	err := p.Wait()
	fmt.Println(err)
	// Output:
	// I will cancel all other tasks!
}

在创建pool时有如下方法可以调用:

  • p.WithMaxGoroutines()配置pool中goroutine的最大数量
  • p.WithErrors:配置pool中的task是否返回error
  • p.WithContext(ctx):配置pool中运行的task当遇到第一个error要取消
  • p.WithFirstError:配置pool中的task只返回第一个error
  • p.WithCollectErrored:配置pool的task收集所有error

pool的基础结构如下:

type Pool struct {
	handle   conc.WaitGroup
	limiter  limiter
	tasks    chan func()
	initOnce sync.Once
}

limiter是控制器,用chan来控制goroutine的数量:

type limiter chan struct{}
func (l limiter) limit() int {
	return cap(l)
}
func (l limiter) release() {
	if l != nil {
		&lt;-l
	}
}

pool的核心逻辑也比较简单,如果没有设置limiter,那么就看有没有空闲的worker,否则就创建一个新的worker,然后投递任务进去;

如果设置了limiter,达到了limiter worker数量上限,就把任务投递给空闲的worker,没有空闲就阻塞等着;

func (p *Pool) Go(f func()) {
	p.init()
	if p.limiter == nil {
		// 没有限制
		select {
		case p.tasks <- f:
			// A goroutine was available to handle the task.
		default:
			// No goroutine was available to handle the task.
			// Spawn a new one and send it the task.
			p.handle.Go(p.worker)
			p.tasks <- f
		}
	} else {
		select {
		case p.limiter <- struct{}{}:
			// If we are below our limit, spawn a new worker rather
			// than waiting for one to become available.
			p.handle.Go(p.worker)
			// We know there is at least one worker running, so wait
			// for it to become available. This ensures we never spawn
			// more workers than the number of tasks.
			p.tasks <- f
		case p.tasks <- f:
			// A worker is available and has accepted the task.
			return
		}
	}
}

这里work使用的是一个无缓冲的channel,这种复用方式很巧妙,如果goroutine执行很快避免创建过多的goroutine;

使用pool处理任务不能保证有序性,conc库又提供了Stream方法,返回结果可以保持顺序;

Stream

Steam的实现也是依赖于pool,在此基础上做了封装保证结果的顺序性,先看一个例子:

func ExampleStream() {
	times := []int{20, 52, 16, 45, 4, 80}
	stream := stream2.New()
	for _, millis := range times {
		dur := time.Duration(millis) * time.Millisecond
		stream.Go(func() stream2.Callback {
			time.Sleep(dur)
			// This will print in the order the tasks were submitted
			return func() { fmt.Println(dur) }
		})
	}
	stream.Wait()
	// Output:
	// 20ms
	// 52ms
	// 16ms
	// 45ms
	// 4ms
	// 80ms
}

stream的结构如下:

type Stream struct {
	pool             pool.Pool
	callbackerHandle conc.WaitGroup
	queue            chan callbackCh
	initOnce sync.Once
}

queue是一个channel类型,callbackCh也是channel类型 - chan func():

type callbackCh chan func()

在提交goroutine时按照顺序生成callbackCh传递结果:

func (s *Stream) Go(f Task) {
	s.init()
	// Get a channel from the cache.
	ch := getCh()
	// Queue the channel for the callbacker.
	s.queue <- ch
	// Submit the task for execution.
	s.pool.Go(func() {
		defer func() {
			// In the case of a panic from f, we don't want the callbacker to
			// starve waiting for a callback from this channel, so give it an
			// empty callback.
			if r := recover(); r != nil {
				ch <- func() {}
				panic(r)
			}
		}()
		// Run the task, sending its callback down this task's channel.
		callback := f()
		ch <- callback
	})
}
var callbackChPool = sync.Pool{
	New: func() any {
		return make(callbackCh, 1)
	},
}
func getCh() callbackCh {
	return callbackChPool.Get().(callbackCh)
}
func putCh(ch callbackCh) {
	callbackChPool.Put(ch)
}

ForEach和map

ForEach

conc库提供了ForEach方法可以优雅的并发处理切片,看一下官方的例子:

conc库使用泛型进行了封装,我们只需要关注handle代码即可,避免冗余代码,我们自己动手写一个例子:

func main() {
	input := []int{1, 2, 3, 4}
	iterator := iter.Iterator[int]{
		MaxGoroutines: len(input) / 2,
	}
	iterator.ForEach(input, func(v *int) {
		if *v%2 != 0 {
			*v = -1
		}
	})
	fmt.Println(input)
}

ForEach内部实现为Iterator结构及核心逻辑如下:

type Iterator[T any] struct {
	MaxGoroutines int
}
func (iter Iterator[T]) ForEachIdx(input []T, f func(int, *T)) {
	if iter.MaxGoroutines == 0 {
		// iter is a value receiver and is hence safe to mutate
		iter.MaxGoroutines = defaultMaxGoroutines()
	}
	numInput := len(input)
	if iter.MaxGoroutines > numInput {
		// No more concurrent tasks than the number of input items.
		iter.MaxGoroutines = numInput
	}
	var idx atomic.Int64
	// 通过atomic控制仅创建一个闭包
	task := func() {
		i := int(idx.Add(1) - 1)
		for ; i < numInput; i = int(idx.Add(1) - 1) {
			f(i, &input[i])
		}
	}
	var wg conc.WaitGroup
	for i := 0; i < iter.MaxGoroutines; i++ {
		wg.Go(task)
	}
	wg.Wait()
}

可以设置并发的goroutine数量,默认取的是GOMAXPROCS ,也可以自定义传参;

并发执行这块设计的很巧妙,仅创建了一个闭包,通过atomic控制idx,避免频繁触发GC;

map

conc库提供的map方法可以得到对切片中元素结果,官方例子:

使用map可以提高代码的可读性,并且减少了冗余代码,自己写个例子:

func main() {
	input := []int{1, 2, 3, 4}
	mapper := iter.Mapper[int, bool]{
		MaxGoroutines: len(input) / 2,
	}
	results := mapper.Map(input, func(v *int) bool { return *v%2 == 0 })
	fmt.Println(results)
	// Output:
	// [false true false true]
}

map的实现也依赖于Iterator,也是调用的ForEachIdx方法,区别于ForEach是记录处理结果;

总结

花了小半天时间看了一下这个库,很多设计点值得我们学习,总结一下我学习到的知识点:

  • conc.WatiGroup对Sync.WaitGroup进行了封装,对Add、Done、Recover进行了封装,提高了可读性,避免了冗余代码
  • ForEach、Map方法可以更优雅的并发处理切片,代码简洁易读,在实现上Iterator中的并发处理使用atomic来控制只创建一个闭包,避免了GC性能问题
  • pool是一个并发的协程队列,可以控制协程的数量,实现上也很巧妙,使用一个无缓冲的channel作为worker,如果goroutine执行速度快,避免了创建多个goroutine
  • stream是一个保证顺序的并发协程队列,实现上也很巧妙,使用sync.Pool在提交goroutine时控制顺序,值得我们学习;

以上就是Go语言实现的可读性更高的并发神库详解的详细内容,更多关于Go语言可读性并发库的资料请关注编程网其它相关文章!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

Go语言实现的可读性更高的并发神库详解

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Go语言实现的可读性更高的并发神库详解

这篇文章主要为大家介绍了Go语言实现的可读性更高的并发神库详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
2023-01-31

一文带你了解Go语言实现的并发神库conc

前几天逛github发现了一个有趣的并发库-conc,这篇文章将为大家详细介绍一下这个库的实现,文中的示例代码讲解详细,感兴趣的可以了解一下
2023-01-31

Go 语言实现高并发与高可用的远程请求

本篇文章给大家分享《Go 语言实现高并发与高可用的远程请求》,覆盖了Golang的常见基础知识,其实一个语言的全部知识点一篇文章是不可能说完的,但希望通过这些问题,让读者对自己的掌握程度有一定的认识(B 数),从而弥补自己的不足,更好的掌握
Go 语言实现高并发与高可用的远程请求
2024-04-04

Go语言实现JSON解析的神器详解

php转go是大趋势,越来越多公司的php服务都在用go进行重构,重构过程中,会发现php的json解析操作是真的香。本文和大家分享了一个Go语言实现JSON解析的神器,希望对大家有所帮助
2023-01-29

go语言的高级并发模式怎么实现

Go语言的高级并发模式可以通过以下几种方式实现:基于通道的并发模式:Go语言通过通道(Channel)来实现并发的通信和同步。可以通过创建多个goroutine,并使用通道来传递数据和进行同步操作,以实现高级的并发模式。例如,可以使用无缓冲
2023-10-27

Go语言通过WaitGroup实现控制并发的示例详解

Channel能够很好的帮助我们控制并发,但是在开发习惯上与显示的表达不太相同,所以在Go语言中可以利用sync包中的WaitGroup实现并发控制,本文就来和大家详细聊聊WaitGroup如何实现控制并发
2023-01-30

通过golang实现高可用性的Select Channels Go并发式编程

在Go语言中,可以使用select语句和通道(channel)来实现高可用性的并发编程。首先,我们定义一个用于处理任务的工作器函数:```gofunc worker(id int, tasks for task := range tasks
2023-10-10

如何在go语言中实现高并发的消息中间件

在Go语言中,可以使用一些开源的消息中间件来实现高并发的消息处理。以下是一些常用的消息中间件和它们在Go语言中的使用方法:1. RabbitMQ:RabbitMQ是一个可靠的、高度可扩展的开源消息中间件。在Go语言中,可以使用RabbitM
2023-10-12

使用Go语言开发实现高性能的语音识别应用

随着科技的不断发展,语音识别技术也得到了长足的进步和应用。语音识别应用广泛运用在语音助手、智能音箱、虚拟现实等领域,为人们提供了更加便捷和智能的交互方式。而如何实现高性能的语音识别应用,则成为了一个值得探讨的问题。近年来,Go语言作为一种高
使用Go语言开发实现高性能的语音识别应用
2023-11-20

在golang中实现高可扩展性的Select Channels Go并发式编程

在Golang中实现高可扩展性的select channels并发编程,可以按照以下步骤进行:1. 定义一个输入和输出的通道,用于在不同的goroutine之间传递数据。goinputChan := make(chan int)output
2023-10-20

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录