Golang 源码解析系列学习

本文参考 公众号:Go语言中文网 微信号: studygolang 系列文章记录

Map

基本原理概述

  1. Map 底层是通过hash table存储,每个元素是一个bmap(下称为bucket)

  2. 每个bucket的是一个容量为bucketCnt=8的链表,用于解决冲突

  3. 将key进行hash,获取到的数对应的二进制,低B位用于确定桶序号,高8位用于确定桶中的Tophash(用于标记key所在Cell)

    • 若低B位冲突,则入桶后,计算Tophash放入桶链表的第一个空位中
    • 若桶溢出,则通过桶的最后一个overflow指针指向overflow bucket
    • 桶中存放kv的格式是kkk…vvv。将k和v分别存放;这样可以避免在使用kvkvkv组合交替存放时,需要填充一些位数(padding)来补齐数据以便查找的空间浪费
  4. B通过负载因子(loadFactor)和map可容纳kv数量(count)决定,不超过15

    • count <= loadFactor * 2^B^

    • bucktCount = 2^B^

    • loadFactor 内置为6.5,
  5. loadFactor=6.5?若桶恰好全满时负载因子为8 (每个桶的8个位置全占满,则kv数量为桶的8倍),那么6.5就可以表时现在的负载状态已经进入有点拥挤状态,碰撞频发,性能可能开始下降,因此需要调整B的大小;loadFactor太大,会导致桶溢出严重,性能下降;loadFactor太小又会导致整个桶过于稀疏,不仅浪费,在极端情况下还可能导致查询效率退化为线型。

  6. kv的查找,基本遵循上述原则,当查询出现桶号一样且Tophash相同,会进一步比较key;若在本bucket没找到,且overflow不为空则会去overflow bucket中找

  7. 为不破坏bucket不含指针的理念,overflow指针通过hmap.extra.[]overflow汇集所有bucket的overflow指针,初始化为2^B-4^个(B >=4时)

  8. 原生Map在读写删前会检查是否有并发写删在进行,因此不支持并发操作

  9. 在写Map时,会先计算插入桶的位置,并且检查,如果需要扩容,则会先进行扩容,然后重新计算插入的位置;最终确定可写再写kv

  10. 若可写入但是没有位置了,需要申请overflow buket和当前buket”串联”用于存放新key

  11. 写Map时可能导致扩容,扩容的先决条件是负载超过负载因子(kv数大于桶数的6.5倍)或者overflow桶过多(overflow桶的估计值大于等于桶的个数)
  12. Map的过程是对bucket搬迁的过程,首先通过B++来完成bucket扩容,并将老buckets挂入hmap对应指针,新buckets分配内存。而后,在每次写和删除kv时尝试对未完成的搬迁工程进行,每次最多搬迁2个bucket
  13. Map的遍历因为上述的底层存储形式(bucket => cell)和搬迁的存在导致遍历的存在无序可能;而又为了让用户意识到无序性的存在,在遍历Map时,起始bucket会是随机的,使得即使Map是个”常量”也是无序遍历,最终导致Map的无序遍历。
  14. 对Map的删除,只会将topHash清零,供未来使用,不会将已有的Cell往前怼(考虑效率怼不符合逻辑),因此如果进行一波大批量不超过负载因子的插入,再进行一大波删除,overflow会始终存在,使得kv存储的很分散。这就是为什么前面扩容也需要对overflow桶个数进行考虑。
  15. 基于上述过程,也就可以发现,在读写map时需要经过hash、查找、读写的流程,而这一套流程当出现并发场景时,就可能导致如协程A准备查找没有准备写入;A写入前B也进行查找,也没有,也进行写入,这就导致了数据覆盖。

Slice

  1. 一个Slice结构体中包含一个指向底层数组的指针、len intcap int,因此规定占24字节(64位机器)
1
2
3
4
5
type slice struct {
array unsafe.Pointer // 指向底层数组的指针
len int // 当前slice长度
cap int // slice分配的底层数组大小
}
  1. 创建Slice
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func makeslice(et *_type, len, cap int) unsafe.Pointer {
// 1. 计算slice申请的空间cap,初步判断是否会超过内存限制
mem, overflow := math.MulUintptr(et.size, uintptr(cap))
// 2. 如果超过可分配空间限制或者len、cap指定的不合理准备抛异常
if overflow || mem > maxAlloc || len < 0 || len > cap {
// 2.1 解释了一下如果len不合理,则优先抛出len异常,因为cap是一个隐含的属性,因此优先抛出len更易理解一些
// NOTE: Produce a 'len out of range' error instead of a
// 'cap out of range' error when someone does make([]T, bignumber).
// 'cap out of range' is true too, but since the cap is only being
// supplied implicitly, saying len is clearer.
// See golang.org/issue/4085.
mem, overflow := math.MulUintptr(et.size, uintptr(len))
if overflow || mem > maxAlloc || len < 0 {
panicmakeslicelen()
}
// 2.2 如果len合理而cap不合理则抛出cap异常
panicmakeslicecap()
}
// 3. 一切正常,分配空间。
// 若分配空间在32KB一下,会从per-P cache的freelist申请,可以理解为本地cache
// 大于32KB情况下,会直接向heap申请空间
return mallocgc(mem, et, true)
}
  1. 添加元素
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// append函数发生扩容时主要底层实现是growslice函数,需要将元素类型、老的slice和append后所需的新最小容量cap传入
// 返回是一个新的slice,容量至少为传入的cap
func growslice(et *_type, old slice, cap int) slice {
// ...
// 新cap的计算规则,最开始假定容量不变
// 先计算理论新容量cap的大小,内存块对齐后会有进一步变化
newcap := old.cap
doublecap := newcap + newcap
if cap > doublecap {
// 如果新的容量cap是原先的两倍以上,那么直接扩容为新容量
newcap = cap
} else {
if old.len < 1024 {
// append前的slice长度在1024以下,容量翻倍增长
newcap = doublecap
} else {
// append前的slice长度在1024以上,则新容量使用1.25^n速率趋近所需容量
// 判断0 < newcap是为了防止溢出导致数值变为负数
for 0 < newcap && newcap < cap {
newcap += newcap / 4
}
// 如果新的容量计算溢出,则请求值是多少分配多少
if newcap <= 0 {
newcap = cap
}
}
}
//...
// 根据元素类型 & 计算出的新容量newCap理论值计算内存
switch {
case et.size == 1: // 元素大小为1字节,"个数"即内存
lenmem = uintptr(old.len)
newlenmem = uintptr(cap)
capmem = roundupsize(uintptr(newcap))
overflow = uintptr(newcap) > maxAlloc
newcap = int(capmem)
case et.size == sys.PtrSize:// 元素为指针大小则使用元素类型大小计算内存分配大小
lenmem = uintptr(old.len) * sys.PtrSize
newlenmem = uintptr(cap) * sys.PtrSize
capmem = roundupsize(uintptr(newcap) * sys.PtrSize) // 内存对齐,避免过多内存碎片
overflow = uintptr(newcap) > maxAlloc/sys.PtrSize
newcap = int(capmem / sys.PtrSize) // 使用内存对齐后的内存大小重新计算容量值
case isPowerOfTwo(et.size): // 如果元素大小是2的次方数,则计算过程可以使用位运算代替乘除运算
var shift uintptr
if sys.PtrSize == 8 {
// Mask shift for better code generation.
shift = uintptr(sys.Ctz64(uint64(et.size))) & 63
} else {
shift = uintptr(sys.Ctz32(uint32(et.size))) & 31
}
lenmem = uintptr(old.len) << shift
newlenmem = uintptr(cap) << shift
capmem = roundupsize(uintptr(newcap) << shift)
overflow = uintptr(newcap) > (maxAlloc >> shift)
newcap = int(capmem >> shift)
default: // 以上情况都不是则直接使用元素大小计算
lenmem = uintptr(old.len) * et.size
newlenmem = uintptr(cap) * et.size
capmem, overflow = math.MulUintptr(et.size, uintptr(newcap))
capmem = roundupsize(capmem)
newcap = int(capmem / et.size)
}
// ...
// 将原有底层数组头字节old.array开始的lenmem长度的内存拷贝到新底层数组指针p上,完成底层数组的复制
memmove(p, old.array, lenmem)

// 底层数组指针变为新指针p,长度不变,容量变为新容量newcap
return slice{p, old.len, newcap}
}
  1. Slice 截取
  • 由于上述growslice的底层实现,在截取后,底层数组不会发生变化
  • 因此如果直接修改元素会影响到公用底层数组的所有slice
  • 仅当某次append执行了growslice后同步影响才会消除
  • 因此获取slice可以使用newSlice :=append([]uint8{}, s[1:2]...)消除底层数组影响
  1. copy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func slicecopy(to, fm slice, width uintptr) int {
if fm.len == 0 || to.len == 0 {
return 0
}

n := fm.len
if to.len < n { // 最终拷贝的数组大小以fm和to两个slice中长度较短的为准
n = to.len
}

if width == 0 { // 元素大小为0
return n
}
// ...
size := uintptr(n) * width
if size == 1 { // 一个字节则直接赋值即可
*(*byte)(to.array) = *(*byte)(fm.array) // known to be a byte pointer
} else {
// 将fm.array开始,长度为size的内存拷贝至to.array上
memmove(to.array, fm.array, size)
}
return n
}
  1. slice的本质是结构体,因此在函数传参时,也是将{arrayPointer, len, cap}这样一个结构体传递。如果函数操作对arrayPinter指向的底层数组操作变化,原slice也会变化;否则,不影响原slice。

Channel

不要用共享内存来通信,而用通信的方式来共享内存

  • chan 的底层结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type hchan struct {
qcount uint // chan中元素的总数
dataqsiz uint // 底层缓冲区循环数组分配的大小
buf unsafe.Pointer // 指向循环数组的指针
elemsize uint16 // 元素大小
closed uint32 // chan是否关闭
elemtype *_type // 元素类型
sendx uint // 已发送元素在循环数组中的下标
recvx uint // 已接收元素在循环数组中的下标
recvq waitq // 等待接收元素的goroutine队列
sendq waitq // 等待发送元素的goroutine队列
lock mutex // 控制chan并发访问的锁, 保护范围包括了hchan的字段和waitq中sudog的字段;
// 因此channel是并发安全的
}

type waitq struct {
first *sudog // sudo表示一个等待发送/接受的goroutine队列,保存了G信息和双向指针等信息
last *sudog
}
  1. chan的创建
  • chan一般情况下都会通过make创建(只读、只写channel一般只用于传参)。make函数在编译期间,会先编译成OMAKE节点,在类型检查阶段,会转化成OMAKECHAN,最终在SSA中间代码生成阶段之前被转换成makechan 或者 makechan64 的函数调用。makechan64用于处理缓冲区大小大于 2 的 32 次方的情况,可以主要关注makechan方法(需要了解go编译方面的知识)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// ...

// 计算所需的buf内存大小
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}

// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected
var c *hchan
switch {
case mem == 0: // 无需缓冲区(struct{}{}类型所占内存为0 或者 size == 0),只需要分配hchan的内存空间
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0: // 元素类型不为指针类型,需要分配hchan+buf的一段连续空间
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default: // 元素为指针类型,单独分配hchan和buf的内存空间
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}

c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)

if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
  1. chan的写入(发送数据)
  • 编译器讲发送数据语句ch <- i,编译器会解析为OSEND节点,并最终转换为chansend1函数;而chansend1函数则直接调用的chansend函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}

if debugChan {
print("chansend: chan=", c, "\n")
}

if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}

if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}

var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}

lock(&c.lock)

if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}

if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}

if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}

if !block {
unlock(&c.lock)
return false
}

// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)

// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
0%