我的编程空间,编程开发者的网络收藏夹
学习永远不晚

深入理解Golangchannel的应用

短信预约 -IT技能 免费直播动态提醒
省份

北京

  • 北京
  • 上海
  • 天津
  • 重庆
  • 河北
  • 山东
  • 辽宁
  • 黑龙江
  • 吉林
  • 甘肃
  • 青海
  • 河南
  • 江苏
  • 湖北
  • 湖南
  • 江西
  • 浙江
  • 广东
  • 云南
  • 福建
  • 海南
  • 山西
  • 四川
  • 陕西
  • 贵州
  • 安徽
  • 广西
  • 内蒙
  • 西藏
  • 新疆
  • 宁夏
  • 兵团
手机号立即预约

请填写图片验证码后获取短信验证码

看不清楚,换张图片

免费获取短信验证码

深入理解Golangchannel的应用

前言

channel是用于 goroutine 之间的同步、通信的数据结构

channel 的底层是通过 mutex 来控制并发的,但它为程序员提供了更高一层次的抽象,封装了更多的功能,这样并发编程变得更加容易和安全,得以让程序员把注意力留到业务上去,提升开发效率

channel的用途包括但不限于以下几点:

  • 协程间通信,同步
  • 定时任务:和timer结合
  • 解耦生产方和消费方,实现阻塞队列
  • 控制并发数

本文将介绍channel的底层原理,包括数据结构,channel的创建,发送,接收,关闭的实现逻辑

整体结构

Go channel的数据结构如下所示:

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters
    lock mutex
}

qcount:已经存储了多少个元素

dataqsie:最多存储多少个元素,即缓冲区容量

buf:指向缓冲区的位置,实际上是一个数组

elemsize:每个元素占多大空间

closed:channel能够关闭,这里记录其关闭状态

elemtype:保存数据的类型信息,用于go运行时使用

sendx,recvx:

  • 记录下一个要发送到的位置,下一次从哪里还是接收
  • 这里用数组模拟队列,这两个变量即表示队列的队头,队尾
  • 因此channel的缓冲也被称为环形缓冲区

recvq,sendq:

当发送个接收不能立即完成时,需要让协程在channel上等待,所以有两个等待队列,分别针对接收和发送

lock:channel支持协程间并发访问,因此需要一把锁来保护

创建

创建channel会被编译器编译为调用makechan函数

// 无缓冲通道
ch1 := make(chan int)
// 有缓冲通道
ch2 := make(chan int, 10)

会根据创建的是带缓存,还是无缓冲,决定第二个参数size的值

可以看出,创建出来的是hchan指针,这样就能在函数间直接传递 channel,而不用传递 channel 的指针

func makechan(t *chantype, size int) *hchan {
   elem := t.elem
    
   // mem:缓冲区大小 
   mem, overflow := math.MulUintptr(elem.size, uintptr(size))
   if overflow || mem > maxAlloc-hchanSize || size < 0 {
      panic(plainError( "makechan: size out of range" ))
   }

   var c *hchan
   switch {
   // 缓冲区大小为空,只申请hchanSize大小的内存
   case mem == 0:
       c = (*hchan)(mallocgc(hchanSize, nil, true))
       c.buf = c.raceaddr()
   // 元素类型不包含指针,一次性分配hchanSize+mem大小的内存
   case elem.ptrdata == 0:
       c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
       c.buf = add(unsafe.Pointer(c), hchanSize)
   // 否则就是带缓存,且有指针,分配两次内存
   default:
      // Elements contain pointers.
       c = new(hchan)
       c.buf = mallocgc(mem, elem, true)
   }
    
   // 保存元素类型,元素大小,容量
   c.elemsize = uint16(elem.size)
   c.elemtype = elem
   c.dataqsiz = uint(size)
   lockInit(&c.lock, lockRankHchan)
   
   return c
}

发送

执行以下代码时:

ch <- 3

编译器会转化为对chansend的调用

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   // 如果channel是空
   if c == nil {
      // 非阻塞,直接返回
      if !block {
         return  false
      }
      // 否则阻塞当前协程
      gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
      throw( "unreachable" )
   }

   // 非阻塞,没有关闭,且容量满了,无法发送,直接返回
   if !block && c.closed == 0 && full(c) {
      return  false
   }

   // 加锁
   lock(&c.lock)

   // 如果已经关闭,无法发送,直接panic
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError( "send on closed channel" ))
   }

   // 从接收队列弹出一个协程的包装结构sudog
   if sg := c.recvq.dequeue(); sg != nil {
      // 如果能弹出,即有等到接收的协程,说明:
      // 该channel要么是无缓冲,要么缓冲区为空,不然不可能有协程在等待
      // 将要发送的数据拷贝到该协程的接收指针上
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return  true
}

   // 缓冲区还有空间
   if c.qcount < c.dataqsiz {
      // qp:计算要发送到的位置的地址
      qp := chanbuf(c, c.sendx)
      // 将数据从ep拷贝到qp
      typedmemmove(c.elemtype, qp, ep)
      // 待发送位置移动
      c.sendx++
      // 由于是数组模拟队列,sendx到顶了需要归零
      if c.sendx == c.dataqsiz {
         c.sendx = 0
      }
      // 缓冲区数量++
      c.qcount++
      unlock(&c.lock)
      return  true
}

   // 往下就是缓冲区无数据,也没有等到接收协程的情况了
   
   // 如果是非阻塞模式,直接返回
   if !block {
      unlock(&c.lock)
      return  false
    }

   // 将当前协程包装成sudog,阻塞到channel上
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
  
   mysg.elem = ep
   mysg.waitlink = nil
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.waiting = mysg
   gp.param = nil
   
   // 当前协程进入发送等待队列
   c.sendq.enqueue(mysg)
   atomic.Store8(&gp.parkingOnChan, 1)
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
   
 // 被唤醒后从这里开始执行
   
   KeepAlive(ep)

   if mysg != gp.waiting {
      throw( "G waiting list is corrupted" )
   }
   gp.waiting = nil
   gp.activeStackChans = false
   closed := !mysg.success
   gp.param = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   mysg.c = nil
   releaseSudog(mysg)
   // 被唤醒后发现channel关闭了,panic
   if closed {
      if c.closed == 0 {
         throw( "chansend: spurious wakeup" )
      }
      panic(plainError( "send on closed channel" ))
   }
   return  true
}

整体流程为:

如果当前操作为非阻塞,channel没有关闭,且容量满了,无法发送,直接返回

从接收队列弹出一个协程的包装结构sudog,如果能弹出,即有等到接收的协程,说明:

  • 该channel要么是无缓冲,要么缓冲区为空,不然不可能有协程在等待
  • 将要发送的数据拷贝到该协程的接收指针上,返回
  • 这里直接从发送者拷贝到接收者的内存,而不是先把数据拷贝到缓冲区,再从缓冲区拷贝到接收者,节约了一次内存拷贝

否则看看缓冲区还有空间,如果有,将数据拷贝到缓冲区上,也返回

接下来就是既没有接收者等待,缓冲区也为空的情况,就需要将当前协程包装成sudog,阻塞到channel上

将协程阻塞到channel的等待队列时,将其包装成了sudog结构:

type sudog struct {
   // 协程
   g *g
   // 前一个,后一个指针
   next *sudog
   prev *sudog
   // 等到发送的数据在哪,等待从哪个位置接收数据
   elem unsafe.Pointer
   acquiretime int64
   releasetime int64
   ticket      uint32
   isSelect bool
   success bool

   parent   *sudog // semaRoot binary tree
   waitlink *sudog // g.waiting list or semaRoot
   waittail *sudog // semaRoot
   // 在哪个channel上等待
   c        *hchan // channel
}

其目的是:

  • g本身没有存储前一个,后一个指针,需要用sudog结构包装才能加入队列
  • elem字段存储等到发送的数据在哪,等待从哪个位置接收数据,用于从数据能从协程到协程的直接拷贝

来看看一些子函数:

1.判断channel是否是满的

func full(c *hchan) bool {
   // 无缓冲
   if c.dataqsiz == 0 {
      // 并且没有其他协程在等待
      return c.recvq.first == nil
   }
   // 有缓冲,但容量装满了
   return c.qcount == c.dataqsiz
}

2.send方法:


func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   // 如果接收者指针不为空,直接把数据从ep拷贝到sg.elem
   if sg.elem != nil {
      sendDirect(c.elemtype, sg, ep)
      sg.elem = nil
   }
   gp := sg.g
   unlockf()
   gp.param = unsafe.Pointer(sg)
   sg.success = true
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   // 唤醒该接收者协程
   goready(gp, skip+1)
}

接收

从channel中接收数据有几种写法:

  • 带不带ok
  • 接不接收返回值

根据带不带ok,决定用下面哪个方法

func chanrecv1(c *hchan, elem unsafe.Pointer) {
        chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
        _, received = chanrecv(c, elem, true)
        return
}

根据接不接收返回值,决定elem是不是nil

最终都会调用chanrecv方法:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 如果channel为nil,根据参数中是否阻塞来决定是否阻塞
   if c == nil {
      if !block {
         return
   }
      gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
      throw( "unreachable" )
   }

   // 非阻塞,并且channel为空
   if !block && empty(c) {
      // 如果还没关闭,直接返回
   if atomic.Load(&c.closed) == 0 {
      return
   }
      // 否则已经关闭,
      // 如果为空,返回该类型的零值
   if empty(c) {
     if ep != nil {
        typedmemclr(c.elemtype, ep)
     }
     return  true, false
       }
   }

   lock(&c.lock)
   
   // 同样,如果channel已经关闭,且缓冲区没有元素,返回该类型零值
   if c.closed != 0 && c.qcount == 0 {
      unlock(&c.lock)
      if ep != nil {
         typedmemclr(c.elemtype, ep)
      }
      return  true, false
}
    
   // 如果有发送者正在阻塞,说明:
   // 1.无缓冲
   // 2.有缓冲,但缓冲区满了。因为只有缓冲区满了,才可能有发送者在等待
   if sg := c.sendq.dequeue(); sg != nil {
      // 将数据从缓冲区拷贝到ep,再将sg的数据拷贝到缓冲区,该函数详细流程可看下文
      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return  true, true
}
    
   // 如果缓存区有数据, 
   if c.qcount > 0 {
      // qp为缓冲区中下一次接收的位置
      qp := chanbuf(c, c.recvx)
      // 将数据从qp拷贝到ep
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      typedmemclr(c.elemtype, qp)
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.qcount--
      unlock(&c.lock)
      return  true, true
}

   // 接下来就是既没有发送者在等待,也缓冲区也没数据
   if !block {
      unlock(&c.lock)
      return  false, false
}

   // 将当前协程包装成sudog,阻塞到channel中
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   // 记录接收地址
   mysg.elem = ep
   mysg.waitlink = nil
   gp.waiting = mysg
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.param = nil
   c.recvq.enqueue(mysg)

   atomic.Store8(&gp.parkingOnChan, 1)
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive,        traceEvGoBlockRecv, 2)

   // 从这里唤醒
   if mysg != gp.waiting {
      throw( "G waiting list is corrupted" )
   }
   gp.waiting = nil
   gp.activeStackChans = false
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   success := mysg.success
   gp.param = nil
   mysg.c = nil
   releaseSudog(mysg)
   return  true, success
}

接收流程如为:

如果channel为nil,根据参数中是否阻塞来决定是否阻塞

如果channel已经关闭,且缓冲区没有元素,返回该类型零值

如果有发送者正在阻塞,说明:

  • 要么是无缓冲
  • 有缓冲,但缓冲区满了。因为只有缓冲区满了,才可能有发送者在等待
  • 将数据从缓冲区拷贝到ep,再将发送者的数据拷贝到缓冲区,并唤该发送者

如果缓存区有数据, 则从缓冲区将数据复制到ep,返回

接下来就是既没有发送者在等待,也缓冲区也没数据的情况:

将当前协程包装成sudog,阻塞到channel中

来看其中的子函数recv():


func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   // 如果是无缓冲channel,直接将数据从发送者sg拷贝到ep
   if c.dataqsiz == 0 {
      if ep != nil {
         recvDirect(c.elemtype, sg, ep)
      }
   // 接下来是有缓冲,且缓冲区满的情况   
   } else {
      // qp为channel缓冲区中,接收者下一次接收的地址
   qp := chanbuf(c, c.recvx)
      // 将数据从qp拷贝到ep
   if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
    }
    // 将发送者的数据从sg.elem拷贝到qp
    typedmemmove(c.elemtype, qp, sg.elem)
    c.recvx++
    if c.recvx == c.dataqsiz {
       c.recvx = 0
    }
    // 由于一接收已发送,缓冲区还是满的,因此 c.sendx = c.recvx
    c.sendx = c.recvx 
}
   sg.elem = nil
   gp := sg.g
   unlockf()
   gp.param = unsafe.Pointer(sg)
   sg.success = true
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   // 唤醒发送者
   goready(gp, skip+1)
}

关闭

func closechan(c *hchan) {
   // 不能关闭空channel
   if c == nil {
      panic(plainError( "close of nil channel" ))
   }

   lock(&c.lock)
   // 不能重复关闭
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError( "close of closed channel" ))
   }

   // 修改关闭状态
   c.closed = 1

   var glist gList

   // 释放所有的接收者协程,并为它们赋予零值
 for {
      sg := c.recvq.dequeue()
      if sg == nil {
         break
      }
      if sg.elem != nil {
         typedmemclr(c.elemtype, sg.elem)
         sg.elem = nil
      }
      if sg.releasetime != 0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = unsafe.Pointer(sg)
      sg.success = false
      glist.push(gp)
   }

   // 释放所有的发送者协程
 for {
      sg := c.sendq.dequeue()
      if sg == nil {
         break
     }
      sg.elem = nil
      if sg.releasetime != 0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = unsafe.Pointer(sg)
      sg.success = false
      glist.push(gp)
   }
   unlock(&c.lock)

   // 执行唤醒操作
 for !glist.empty() {
      gp := glist.pop()
      gp.schedlink = 0
      goready(gp, 3)
   }
}

关闭的流程比较简单,可以看出:

不能关闭空channel,不能重复关闭channel

先上一把大锁,接着把所有挂在这个 channel 上的 sender 和 receiver 全都连成一个 sudog 链表,再解锁。最后,再将所有的 sudog 全都唤醒:

接收者:会收到该类型的零值

这里返回零值没有问题,因为之所以这些接收者会阻塞,就是因为缓冲区没有数据,因此channel关闭后该接收者收到零值也符合逻辑

发送者:会被唤醒,然后panic

因此不能在有多个sender的时候贸然关闭channel

以上就是深入理解Golang channel的应用的详细内容,更多关于Golang channel的资料请关注编程网其它相关文章!

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

深入理解Golangchannel的应用

下载Word文档到电脑,方便收藏和打印~

下载Word文档

猜你喜欢

Spring中Controller应用深入理解

这篇文章主要介绍了Spring项目中的Controller,SpringController本身也是一个SpringBean,只是它多提供了Web能力,只需要造类上提供@Controller注解即可
2022-12-08

提升Golang应用性能:深入理解Context的应用

理解Context在提升Golang应用性能中的作用Context在Golang中可为高并发分布式系统管理请求生命周期、传播上下文信息,从而提升应用性能。它包含请求截止时间、取消信号和元数据,优化请求处理。Context的优点包括:生命周期管理:防止长时间任务耗尽资源。取消传播:释放不必要的资源。上下文共享:减少复制和函数调用。并行执行:提高吞吐量和响应能力。错误处理:简化错误传播。遵循最佳实践,例如使用命名键值对、限制大小、使用副本和取消Context,以有效利用Context。
提升Golang应用性能:深入理解Context的应用
2024-04-23

深入理解 PHP Session 跨域的应用场景

引言:在 Web 开发中,会经常遇到需要在不同域名下进行数据共享的场景。而 PHP Session 是一种常用的实现方式,用于在不同页面之间传递用户会话数据。然而,由于浏览器的同源策略,Session 数据在跨域情况下的传递会受到限制。本文
2023-10-21

深入理解Vue3响应式原理

响应式就是当对象本身(对象的增删值)或者对象属性(重新赋值)发生变化时,将会运行一些函数,最常见的就是render函数,下面这篇文章主要给大家介绍了关于Vue3响应式原理的相关资料,需要的朋友可以参考下
2022-12-19

深入解析Golang中锁的原理和应用

Golang中锁的原理及应用解析引言在并发编程中,常常会遇到多个 goroutine 同时访问共享资源的情况,一旦多个 goroutine 同时对共享资源进行读写操作,可能导致数据不一致性或者产生竞态条件。为了解决这个问题,Golang 提
深入解析Golang中锁的原理和应用
2023-12-28

深入理解粘性定位的应用和功能

粘性定位是一种在网页设计中常用的技术,它能够使网页元素保持在页面的固定位置,即使用户滚动页面时也不会发生改变。粘性定位具有很强的功能性和实用性,在网页设计和用户体验中发挥着重要作用。本文将探讨粘性定位的功能和应用。一、功能固定导航栏:粘性
深入理解粘性定位的应用和功能
2024-01-29

深入理解MySQL存储过程的应用场景

深入理解MySQL存储过程的应用场景MySQL是一种常用的关系型数据库管理系统,广泛应用于各种Web应用和企业信息系统中。存储过程是MySQL中一种重要的数据库对象,它是一组预先编译好的SQL语句和数据处理逻辑的集合,可以被多次调用和重复
深入理解MySQL存储过程的应用场景
2024-03-14

深入理解z-index的工作原理和应用技巧

z-index是CSS中控制元素层级的属性,但其工作原理并不简单。本文将深入探讨z-index的工作原理,包括堆叠上下文、层叠顺序和层叠上下文等概念,并介绍z-index的应用技巧,如如何避免z-index的陷阱、如何使用z-index实现复杂布局等。
2023-05-19

编程热搜

  • Python 学习之路 - Python
    一、安装Python34Windows在Python官网(https://www.python.org/downloads/)下载安装包并安装。Python的默认安装路径是:C:\Python34配置环境变量:【右键计算机】--》【属性】-
    Python 学习之路 - Python
  • chatgpt的中文全称是什么
    chatgpt的中文全称是生成型预训练变换模型。ChatGPT是什么ChatGPT是美国人工智能研究实验室OpenAI开发的一种全新聊天机器人模型,它能够通过学习和理解人类的语言来进行对话,还能根据聊天的上下文进行互动,并协助人类完成一系列
    chatgpt的中文全称是什么
  • C/C++中extern函数使用详解
  • C/C++可变参数的使用
    可变参数的使用方法远远不止以下几种,不过在C,C++中使用可变参数时要小心,在使用printf()等函数时传入的参数个数一定不能比前面的格式化字符串中的’%’符号个数少,否则会产生访问越界,运气不好的话还会导致程序崩溃
    C/C++可变参数的使用
  • css样式文件该放在哪里
  • php中数组下标必须是连续的吗
  • Python 3 教程
    Python 3 教程 Python 的 3.0 版本,常被称为 Python 3000,或简称 Py3k。相对于 Python 的早期版本,这是一个较大的升级。为了不带入过多的累赘,Python 3.0 在设计的时候没有考虑向下兼容。 Python
    Python 3 教程
  • Python pip包管理
    一、前言    在Python中, 安装第三方模块是通过 setuptools 这个工具完成的。 Python有两个封装了 setuptools的包管理工具: easy_install  和  pip , 目前官方推荐使用 pip。    
    Python pip包管理
  • ubuntu如何重新编译内核
  • 改善Java代码之慎用java动态编译

目录