目前比较流行的并发模式
多进程 资源开销最大 好处 进程间 互不影响。 系统开销大 所有进程都是由内核管理的。多线程 多线程在大部分操作系统上面都属于系统层面的并发模式。 比多进程的开销小, 但是总体开销大基于 回调的非阻塞 异步IO协程 本质是用户态线程 不需要操作系统来进行抢占式调度, 且是在真正 的实现中寄存于线程中。因此线程开销很小, 有效的提高线程任务的并发性, 而避免多线程的缺点。消息传递系统 : 对线程间共享的状态的各种操作都被封装在线程之间传递的消息中, 这通常要求: 发送消息时候对状态进行复制, 并且在消息传递的边界上交出这个状态的所有权。 从逻辑看这个操作和共享内存系统中执行的原子操作相同, 这从物理上来看很不一样。 由于需要执行复制操作, 所以大多数消息传递的实现在性能上并不 优越, 但是线程中的状态管理工作通常会变得更为简单。
go 在语言层面支持协程(轻量级线程) goroutine . Go 语言标准库提供的所有系统调用操作(包括所有的IO同步操作)都会出让cpu 给其他的goroutine. 这让轻量级的线程 的切换管理不 依赖于系统进程 也不依赖cpu 的核心数量。
goroutine 是go 语言中轻量级线程的实现, 由Go 运行时(runtime) 管理。
goroutine 不是线程, 他是对线程的多路复用。 所以协程的数量可以比线程数量多得多。一个goroutine 启动时候只需要非常小的栈并且这个栈可以按需拓展和缩小(Go1.4 中 goroutine启动栈的大小仅为2kb)
当一个goroutine 被阻塞的时候, 他也会阻塞所复用操作系统线程,而运行时环境 runtime 则会把位于被阻塞线程上其他 goroutine 移动到其他未阻塞的线程上继续运行。
代码看看如何使用
package main import ( "fmt" "sync" "time" ) func add(x, y int, wg *sync.WaitGroup) { z := x + y fmt.Println(z) } func main() { for i := 0; i < 10; i++ { go add(i, i, &wg) } }运行之后什么都没有打印 程序从main 函数开始 当main()函数返回时候, 程序退出, 且程序并不等其他goroutine(非主goroutine) 结束。 上面代码主函数启动了是个 goroutine 饭后返回, 这是程序退出了, 而被启动执行 Add()的goroutine没来得及执行, 所以程序没有任何输出。
解决方法之一 就是前面文章提到的 sync.WatiGroup() 函数的使用 计数器
package main import ( "fmt" "sync" "time" ) func add(x, y int, wg *sync.WaitGroup) { time.Sleep(5 * time.Second) z := x + y fmt.Println(z) wg.Done() } func main() { wg := sync.WaitGroup{} wg.Add(10) t1 := time.Now().Unix() for i := 0; i < 10; i++ { go add(i, i, &wg) } wg.Wait() t2 := time.Now().Unix() fmt.Println("sleep: ", t2 - t1) }t1 t2 的目的之一就是 检验 协程之间是不是相互影响的 打印是5 说明是并行执行的 互不影响
在工程上最常见的并发通信模型: 共享数据 和消息 共享数据: 多个并发单元分别保持对同一数据的引用, 实现对该数据的共享。 被共享的数据可能有多种形式, 比如 内存数据块, 磁盘文件, 网络数据等。 在实际中最常用的是内存, 也就是通常说的共享内存。
消息机制: go 是以消息机制 而不非共享内存作为通信方式 消息机制认为每个并发单元都是自包含的, 独立的个体 并且都有自己的变量,并在不同的并发单元间这些变量不共享。 每个并发单元的输入和输出只有一种, 那就是消息。 这点有点类似于进程, 每个进程不被别的进程打扰 他只做好自己的工作就好了。 不同进程间靠消息来通信, 他们不会共享内存。
Go 语言 提供的消息通信机制被称为channel
不要通过共享内存来通信 而应该通过通信来共享内存
channel 是go 语言在语言级别提供的goroutine 间的通信方式 我们可以使用channel 在两个或者多个goroutine 之间传递消息。 channel 是进程间的通信方式, 因此通过channel 传递对象的过程和调用函数时的参数传递行为比较一致。比如也可以传递指针。 如果需要跨进程通信, 建议用分布式的方式 比如使用 socket 或者http 等通讯协议
channel 是类型相关的 也就是说一个channel 只能传递一种类型的值, 这个类型需要再声明channel 的时候指定。
channel 声明
var chanName chan ElementType ( var ch chan int) var m map[string] chan bool定义channel
ch := make(chan int)
写入: 将一个数据写入(发送)至channel : ch <- value
像channel 写入数据尝尝会导致程序阻塞, 直到其他的goroutine从这个channel中读出数据
读出: value := <- ch
如果channel 之前没有写入数据, 那么从channel 中读取数据 也会得导致程序阻塞, 直到channel 中被写入数据为止。
通过调用 select() 函数来监控一系列的文件句柄,一旦其中一个文件句柄发生了IO 操作, 该select() 调用就会被返回。 这个机制现在被用来实现高并发的socket 服务器程序。 go语言在远层面支持 select 关键字 用于处理异步io 问题
select 的用法 和switch 语言非常类似, 由select 开启一个新的选择块 每个选择条件由case 语句来描述。 但是select 有很多限制: 最大的限制就是每个case 语句后面必须有一个IO 操作
select { case <- chan1: // 如果chan1 成功读取到数据 则处理case case chan2 <- 1 // 如果chan2 成功写入数据 则处理case default: // 如果上面都没有成功 怎进入default 处理流程 }之前示范的都是 不带缓冲的channel 这种做法对于传递单个数据的场景可以接受, 但是对于需要持续传输大量数据的场景就有些不合适
channel 带上缓冲 达到消息队列的效果
c := make(chan int, 1024)在缓冲区还没有填写 完之前不会阻塞
从缓冲区 读取数据 可以 使用range 关键字来读取
for i := range c { fmt.Println("Received:", i) }在并发机制中 最需要处理的问题就是 超时问题
在像channel 写数据时 发现channel 已满或者从channel 视图读取数据时候发现channel 为空。 如果不正确处理这些情况很可能导致整个goroutine 锁死
Go 语言没有提供直接的超时处理机制, 可以利用select 机制 。select 的一个特点是只要其中一个select已经完成, 程序就会继续往下执行 而不会考虑其他case 的情况
timeout := make(chan bool, 1) go func() { time.Sleep(1e9) // 等待1秒钟 timeout <- true }() // 然后我们把timeout这个channel利用起来 select { case <-ch: // 从ch中读取到数据 case <-timeout: // 一直没有从ch中读取到数据,但从timeout中读取到了数据 }这样使用select 机制可以避免永久等待问题 因为程序会在timeout中获取一个数据后继续执行无论对ch 的读取 是否处在等待状态 从而达成一秒超时的效果
这种写法要被合理的利用起来 从而有效的提高代码质量
channel 可别传递
管道上是一种使用非常广泛的一种设计模式, 比如再处理数据时候, 可以采用管道设计, 这样就比较容易以插件的方式增加数据的处理流程。 可以利用可传递性来实现管道
type PipeData struct { value int handler func(int) int next chan int }流式处理数据
func handle(queue chan *PipeData) { for data := range queue { data.next <- data.handler(data.value) } }不存在 真正的单向channel 单向的都是阻塞的更本没有办法使用的。
所谓的单向 是对channel 的使用限制 channel 支持被传递 还支持类型转换
ch4 := make(chan int ) ch5 := <- chan int (ch4) // ch5 是单向的的读取channel ch6 := chan <- int(ch4) // ch6 是一个单向的写入 channel基于ch4 通过类型转化初始化了两个单向channel : 单向读的ch5 和单向写的ch6
单向channel 的声明
var ch1 chan int // ch1 是一个正常的channel 不是单独的 var ch2 chan <- float64 // ch2 是单向channel 只用于写float64 var ch3 <-chan int // ch3 是单向channel 用用于读取int 数据目前 我不知道在什么样的场景使用
如何判断一个通道是否关闭呢 ?
x, ok := <- chok 为false 表示已经关闭
即使成功的使用channel 来通信手段还是避免不了多个goroutine之间数据共享问题, 这里也有资源锁方案
sync 包提供了两种锁类型:
sync.Mutexsync.RWMutexsync.Mutex 比较简单但是比较暴力, 当一个goroutine 获得了 mutex后其他的goroutine就只能乖乖的等到这个goroutine 释放改mutex. RWMutex 相对比较友好, 是经典的单写多读模型。在读锁占用的情况下会阻塞写 但是不会阻止读, 也就是多个goroutine 可以同时获得读锁(调用RLock; 而写锁(调用LOCK()方法)), 会阻止任何其他的goroutine(包括读写),
从 RWMutex 的实现看, RWMutex 类型其实组合了 Mutex:
type RWMutex struct { w Mutex writerSem uint32 readerSem uint32 readerCount int32 readerWait int32 }锁的典型使用模型
var temp sync.Mutex func foo(){ temp.Lock() defer temp.Unlock() }对于全局只需要运行一次的代码, 比如全局初始化操作 go语言提供了一个once 类型来保证 全局的唯一性
var a string var once sync.Once func setup(){ a = "hello world" } func doprint(){ once.Do(setup) print(a) } func twoprint(){ go doprint() go doprint() }Once的DO() 方法可以保证在全局范围内只调用指定函数一次(这里值setup() 函数) 而且其他的goroutine 在调用此语句时候 将会先被阻塞, 直至全局唯一的once.Do() 调用结束之后才调用。 once.Do() 还是原子操作。
为了更好的控制并发过程中的 原子性操作 sync 包还提供了一个 automic 子包, 它提供了对于一些基础数据类型的原子操作函数 比如
func CompareAndSwapUint64(val *uint64, old, new uint64) (swapped bool)这个函数提供了比较和交换两个 uint64类型数据的操作。 这让操作者无需再为这样的操作专门添加Lock 操作。
如果一个程序在执行时候依赖于特定的顺序或时序, 但是又无法保证这种顺序和时序, 此时就会存在竞争条件、
竞争条件的存在将导致程序的行为变得飘忽不定而且难以预测。
竞争条件通常出现在哪些需要修改共享资源的并发程序当中。当有两个 或者多个进程同时去修改一项共享资源的时候, 最先访问资源的那个进程/ 线程将得到预期的结果, 而其他的进程或者线程则不然。 最终, 因为程序无法判断哪个进程或者线程先访问了资源, 所以它将无法产生一致的行为。
互斥技术:mutex , 该技术可以将同一时间内访问临界区的进程数量限制为一个。