乘风原创程序

  • Go并发4种方法简明讲解
  • 2022/4/6 14:34:42
  • 一、goroutine

    1、协程(coroutine)

    golang 在语言层面对并发编程进行了支持,使用了一种协程(goroutine)机制,

    协程本质上是一种用户态线程,不需要操作系统来进行抢占式调度,但是又寄生于线程中,因此系统开销极小,可以有效的提高线程的任务并发性,而避免多线程的缺点。但是协程需要语言上的支持,需要用户自己实现调度器,因为在go语言中,实现了调度器所以我们可以很方便的能过 go关键字来使用协程。

    func main() {
    	for i := 0; i <10; i++ {
    		go func(i int) {
    			for  {
    				fmt.printf("hello goroutine %d\n",i)
    			}
    		}(i)
    	}
    	time.sleep(time.millisecond)
    }

    最简单的一个并发编程小例子,并发输出一段话。

    我们同时开了10个协程进行输出,每次在fmt.printf时交出控制权(不一定每次都会交出控制权),回到调度器中,再由调度器分配。

    2、goroutine 可能切换的点

    • i/o,select
    • channel
    • 等待锁
    • 函数调用
    • runtime.gosched()

    我们看一个小例子:

    func main() {
    	var a [10]int
    	for i := 0; i <10; i++ {
    		go func(i int) {
    			for  {
    				a[i]++
    			}
    		}(i)
    	}
    	time.sleep(time.millisecond)
    	fmt.println(a)
    }
    

    在这里,代码直接锁死,程序没有退出,因为在执行函数中没有协程的切换,因为 main函数也是一个协程。

    如果想要程序退出,可以通过 runtime.gosched()函数,在执行函数中添加一行。

    for  {
      a[i]++
      runtime.gosched()
    }
    

    加上这个函数之后,代码是可以正常执行了,但是真的是正常执行吗?不一定,我们可以使用 -reac命令来看一下数据是否有冲突:

    这说明数据还是有冲突的,数组a中的元素一边在做自增,一边在输出。解决这个问题,我们只能使用 channel 来解决。

    二、channel

    channel 中 go语言在语言级别提供了对 goroutine 之间通信的支持,我们可以使用 channel 在两个或者多个goroutine之间进行信息传递,能过 channel 传递对像的过程和调用函数时的参数传递行为一样,可以传递普通参数和指针。

    channel 有两种模式:

    var ch1 = make(chan int)		// 无缓冲 channel,同步
    var ch2 = make(chan int, 2)	// 有缓冲 channel, 异步

    无缓冲的方式,数据进入 channel 只要没有被接收,就会处在阻塞状态。

    var ch1 = make(chan int)		// 无缓冲 channel,同步
    ch1 <- 1
    ch1 <- 2
    //  error: all goroutines are asleep - deadlock!
    fmt.println(<-ch1)
    

    如果想要运行,必须要再开一个协程不停的去请求数据:

    var ch1 = make(chan int)		// 无缓冲 channel,同步
    go func() {
      for  {
        n := <-ch1
        fmt.println(n)
      }
    }()
    ch1 <- 1
    ch1 <- 2
    

    有缓冲的方式,只要缓冲区没有满就可以一直进数据,缓冲区在填满之后没有接收也会处理阻塞状态。

    func bufferchannel() {
    	var ch2 = make(chan int,2)
    	ch2<-1
    	ch2<-2
    	fmt.println(ch2)
      // 不加这一行的话,是可以正常运行的
    	ch2<-3			// error: all goroutines are asleep - deadlock!
    }
    

    1、chaanel 指定方向

    比如我现在有一个函数创建一个 channel,并且不断的需要消费channel中的数据:

    func worker(ch chan int) {
    	for {
    		fmt.printf("hello goroutine worker %d\n", <-ch)
    	}
    }
    
    func createworker() chan int{
    	ch := make(chan int)
    	go worker(ch)
    	return ch
    }
    
    func main() {
    	ch := createworker()
    	ch<-1
    	ch<-2
    	ch<-3
    	time.sleep(time.millisecond)
    }
    

    这个函数我是要给别人用的,但是我怎么保证使用 createworker 函数创建的 channel 都是往里面传入数据的呢?

    如果外面有人消费了这个 channel 中的数据,我们怎么限制?

    这个时候,我们就可以给返回的channel 加上方向,指明这个 channel 中能往里传入数据,不能从中取数据:

    func worker(ch <-chan int) {
    	for {
    		fmt.printf("hello goroutine worker %d\n", <-ch)
    	}
    }
    
    func createworker() chan<- int{
    	ch := make(chan int)
    	go worker(ch)
    	return ch
    }
    

    我们可以在返回 channel 的地方加上方向,指明返回的函数只能是一个往里传入数据,不能从中取数据。

    并且我们还可以给专门消费的函数加上一个方向,指明这个函数只能出不能进。

    2、channel 关闭

    在使用 channel 的时候,随说我们可以等待channel中的函数使用完之后自己结束,或者等待 main 函数结束时关闭所有的 goroutine 函数,但是这样的方式显示不够优雅。

    当一个数据我们明确知道他的结束时候,我们可以发送一个关闭信息给这个 channel ,当这个 channel 接收到这个信号之后,自己关闭。

    // 方法一
    func worker(ch <-chan int) {
    	for {
    		if c ,ok := <- ch;ok{
    			fmt.printf("hello goroutine worker %d\n", c)
    		}else {
    			break
    		}
    	}
    }
    // 方法二
    func worker(ch <-chan int) {
    	for c := range ch{
    		fmt.printf("hello goroutine worker %d\n", c)
    	}
    }
    
    func main() {
    	ch := createworker()
    	ch<-1
    	ch<-2
    	ch<-3
    	close(ch)
    	time.sleep(time.millisecond)
    }
    

    通过 closeb函数,我们可以能过 channel 已经关闭,并且我们还可以通过两种方法判断通道内是否还有值。

    三、select

    当我们在实际开发中,我们一般同时处理两个或者多个 channel 的数据,我们想要完成一个那个 channel 先来数据,我们先来处理个那 channel 怎么办呢?

    此时,我们就可以使用 select 调度:

    func genint() chan int {
    	ch := make(chan int)
    	go func() {
    		i := 0
    		for {
    			// 随机两秒以内生成一次数据
    			time.sleep(time.duration(rand.intn(2000)) * time.millisecond)
    			ch <- i
    			i++
    		}
    	}()
    	return ch
    }
    
    func main() {
    	var c1 = genint()
    	var c2 = genint()
    	for {
    		select {
    		case n := <-c1:
    			fmt.printf("server 1 generator %d\n", n)
    		case n := <- c2:
    			fmt.printf("server 2 generator %d\n", n)
    		}
    	}
    }
    

    1、定时器

    	for {
    		tick := time.tick(time.second)
    		select {
    		case n := <-c1:
    			fmt.printf("server 1 generator %d\n", n)
    		case n := <-c2:
    			fmt.printf("server 2 generator %d\n", n)
    		case <-tick:
    			fmt.println("定时每秒输出一次!")
    		}
    	}
    

    2、超时

    	for {
    		tick := time.tick(time.second)
    		select {
    		case n := <-c1:
    			fmt.printf("server 1 generator %d\n", n)
    		case n := <-c2:
    			fmt.printf("server 2 generator %d\n", n)
    		case <-tick:
    			fmt.println("定时每秒输出一次!")
    		case <-time.after(1300 * time.millisecond): // 如果 1.3秒内没有数据进来,那么就输出超时
    			fmt.println("timeout")
    		}
    	}
    

    四、传统的并发控制

    1、sync.mutex

    type atomicint struct {
    	value int
    	lock sync.mutex
    }
    
    func (a *atomicint) increment() {
    	a.lock.lock()
    	defer a.lock.unlock()		// 使用 defer 解锁,以防忘记
    	a.value++
    }
    
    func main() {
    	var a atomicint
    	a.increment()
    	go func() {
    		a.increment()
    	}()
    	time.sleep(time.millisecond)
    	fmt.println(a.value)
    }
    

    2、sync.waitgroup

    type waitgrouint struct {
    	value int
    	wg sync.waitgroup
    }
    
    func (w *waitgrouint) addint() {
    	w.wg.add(1)
    	w.value++
    }
    
    func main() {
    	var w waitgrouint
    	for i := 0; i < 10; i++ {
    		w.addint()
    		w.wg.done()
    	}
    	w.wg.wait()
    	fmt.println(w.value)
    }
    
    

     更多关于go并发简明讲解请查看下面的相关链接