本章记录 Go 语言的数据结构及基础包学习笔记,结合源码进行理解。

数据结构

Slice

type slice struct {
	array unsafe.Pointer
	len   int
	cap   int
}

在源码中可以看出,slice(切片) 本身是一个 struct(结构体),其工作机制是对 array(数组) 指针的封装。

在复制或传递切片的过程中,对其底层数组的引用也会一并复制,使用时需注意⚠️修改新切片会影响到其他使用该底层数组的切片,如有需求,使用 copy 进行复制。

切片的 len 属性表示当前元素的长度,cap 表示底层数组的长度。因为切片是可变长的,避免因为 append 而频繁申请内存,需要预留空间。

s := []int{1, 2, 3, 4}
new := s[0:3:3]

切片下标的使用是左闭右开的,第三个参数表示新切片的 cap 大小,如果不传则默认为原切片的 cap 大小

if cap < old.cap {
panic(errorString("growslice: cap out of range"))
}

切片不支持缩容。

扩容策略

func growslice(et *_type, old slice, cap int) slice {
	......
}

切片在扩容时会判断 cap 是否已经大于 1024,如果没有,会按二倍扩容,超过则增长因子变为 1.25,直至大于所需扩容长度。

append 的数据长度超过 double old cap,则新 cap 会是数据长度(偶数)或数据长度(奇数)+1。

扩容必定会导致内存拷贝。

Range

s := []int{1, 2, 3, 4}
for key, value := range s {
	......
}

使用 range 遍历切片时,value 是值传递,每次遍历都会重新对 value 赋值,在使用时应注意⚠️!

Map

Map 是一种很常见的数据结构,用于存储一些 key/value pair(键值对)

所有的 key 都是不同的,通过给定的 key 可以在常数时间 O(1) 复杂度内查找、更新或删除对应 value

底层一般都是通过 hash table(哈希表) 来实现。对于给定的 key,一般先进行 hash 操作,然后相对哈希表的长度取模,将 key 映射到指定地方

哈希函数

又称为散列函数,是一种从任何一种数据中创建小的数字“指纹”的方法。散列函数把消息或数据压缩成摘要,使数据量变小,将数据格式固定下来。

加密散列函数

每个加密散列函数都是一个散列函数,但不是每个散列函数都是都是加密散列。加密散列函数注重安全,难以通过散列函数的输出结果,回推出函数的输入数据。相应的代价就是速度比较于非加密散列函数要慢很多

非加密散列函数多用于查找

memhash(Go 哈希函数)

通过种子(seed)与预设的 hashkey 值的混合,产生初始 h 值

使用 unsafe.Pointer 对字符串进行类型转换,转为 byte 数组,而后使用或运算对数组中的每一位进行运算,得到相应位数的值

使用 h 对上一步得到的值进行异或运算

然后是各种位运算......

哈希冲突

链表数组法

每个键值对表长取模,如果结果相同,用链表的方式依次向后插入。

开放地址法

一旦发生冲突,就把位置往后移,直到找到一个空的位置。

哈希表的扩容策略

装载因子表示哈希表的填充程度,填充的越满,冲突的概率越大,填充的越少,空间浪费的越大。

当达到装载因子到达某个阈值,就需要动态的增大哈希表的长度。但当哈希表的长度发生变化时,每个 key 在哈希表中的对应下标索引需要全部重新计算,不能直接拷贝过去。这样做会导致时间复杂度太高,O(n),数据量大了性能就会很差。

Redis 的做法是保证触发扩容时也保证 O(1) 的插入速度,旧哈希表的内容分多次拷贝到新哈希表。

Go map 实现

func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
  ......
	if h.flags&hashWriting != 0 {
		throw("concurrent map read and map write")
	}
	......
}

不是线程安全的,禁止并发读写。

	// Maximum number of key/elem pairs a bucket can hold.
	bucketCntBits = 3
	bucketCnt     = 1 << bucketCntBits

一个桶(bucket)最多可以装8个键值对。

	// Maximum average load of a bucket that triggers growth is 6.5.
	// Represent as loadFactorNum/loadFactDen, to allow integer math.
	loadFactorNum = 13
	loadFactorDen = 2

装载因子阈值为 6.5。

  // Maximum key or elem size to keep inline (instead of mallocing per element).
	// Must fit in a uint8.
	// Fast versions cannot handle big elems - the cutoff size for
	// fast versions in cmd/compile/internal/gc/walk.go must be at most this elem.
	maxKeySize  = 128
	maxElemSize = 128

nevacuate 是扩容迁移的计数器。

桶内最大存储 128 字节的值,超过时将存储指针。

存储键值对的方式是将 key 放在一起,紧接着将 value 放在一起,为了节约内存对齐的内存消耗。

内存对齐:CPU 每次获取定长的字节块(字),若访问到未对齐的内存,将会导致 CPU 进行两次访问,并且花费额外的时间周期来处理对齐及运算,所以编辑器会处理内存对齐问题。

使用链表的方式控制哈希冲突,在桶中会存有 overflow bucket 指针。

bmap 为最小粒度挂载,可以减少对象数量,减轻管理内存的负担,利于 gc。

没有缩容(shrink),内存只会越用越多。

哈希表以 2 为底,大小始终为 2 的指数倍,取余运算(hash mod 2^b)可以简化为(hash & (2^B-1))。

mapexpra 用于存储 bucketkeyvalue 都不包含指针的情况,会将 bmapoverflow 放入 hmap.extra.overflow 中,这样可以避免 gc 的扫描。

扩容

func growWork(t *maptype, h *hmap, bucket uintptr) {
	// make sure we evacuate the oldbucket corresponding
	// to the bucket we're about to use
	evacuate(t, h, bucket&h.oldbucketmask())

	// evacuate one more oldbucket to make progress on growing
	if h.growing() {
		evacuate(t, h, h.nevacuate)
	}
}

nevacuate 是扩容迁移的计数器。

当哈希表达到阈值,就会触发增量扩容:标记当前为扩容状态,在 insertdelete 时搬迁一个桶,如果正在扩容中,就再搬一个。

扩容会建立一个原有大小 2 倍的新的表,并将旧的 bucket 搬到新的表之后,并不会将旧的 bucketoldbucket 中删除,而是加上一个已删除的标记。这正是由于迁移工作是逐步完成的,只有当所有的 bucket 都从旧表迁移到新表之后,才会将 oldbucket 释放掉。

查找

type bmap struct {
	// tophash generally contains the top byte of the hash value
	// for each key in this bucket. If tophash[0] < minTopHash,
	// tophash[0] is a bucket evacuation state instead.
	tophash [bucketCnt]uint8
	// Followed by bucketCnt keys and then bucketCnt elems.
	// NOTE: packing all the keys together and then all the elems together makes the
	// code a bit more complicated than alternating key/elem/key/elem/... but it allows
	// us to eliminate padding which would be needed for, e.g., map[int64]int8.
	// Followed by an overflow pointer.
}
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))

bmapbucket 的结构体,显式的存储了桶中所有的 tophash 用于快速试错,keysvaluesoverflow bucket 通过 dataOffset 获取内存地址。

Go 通过将 hash 值的高位(前 8 位)在 bucket 中查找 key,而后通过高位循环对比 tophash 中的,用于快速试错,如果 hash 值一致,则对比 key 值。未找到则去 overflow bucket 中寻找。

map 为什么是无序的

因为哈希表会触发扩容,从而导致 map 的排序不准确,所以 Go 语言在设计遍历 map 时加入随机数,避免依赖于 map 的排序,导致扩容后的错误结果。

线程安全的 map

Go 1.9 以后,官方增加了一个线程安全的 map: sync.map

为什么不改变原有的 map?faq 中提到不想降低大多数程序的性能,来增加少数程序的安全性

Channel

Go 语言的并发模型是基于 CSP 理论实现的,Go 语言鼓励“不要通过共享内存来通信,我们应该使用通信来共享内存”,channel 正是 Go 对于消息通道的实现。

CSP(Communicating Sequential Processes)是由 Tony·Hoare 提出的并发理论,Go 语言实现了其中的并发执行体(goroutine) 与消息通道(channel)来构建并发模型。

使用

特性

线程安全、先进先出(队列)

类型

无缓存,会阻塞

有缓存,写入未满或读取未空前不会阻塞

Close

  • 关闭时会唤醒所有阻塞中的 goroutine,并发送一条零值消息
  • 重复关闭会 panic
  • 关闭未初始化的 channel 会 panic
  • 向已关闭的 channel 发送消息会 panic
  • 从已关闭的 channel 中读取消息不会 panic

Select

  • 可使用 select 同时监听多个 channel 的消息
  • 若有多个,随即执行其中一个

原理

环形队列

内存上没有环形的结构,因此环形队列实际上是通过数组的线性空间来实现的。当数据到了尾部之后将转回 0 位置。

lock

基于锁实现线程安全

源码

type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

qcount 表示当前队列中剩余元素个数,len 函数取值字段。

dataqsiz 表示环形队列长度,即创建有缓存 channel 时 cap 参数。

buf 队列数组指针。

sendx 队列下标,用于指示元素写入时的位置。

recv 队列下标,用于指示读取元素时的位置。

recvq 等待读取消息的 goroutine 队列。

sendq 等待写入消息的 goroutine 队列。

type waitq struct {
	first *sudog
	last  *sudog
}

Goroutine 队列的链表实现,sudog 是对 g 的封装。

sudogelem 属性是 channel 的数据。

Init

const (
	maxAlign  = 8
	hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
	debugChan = false
)

hchanSize 计算出 hchan struct 所需基本内存大小。

func makechan(t *chantype, size int) *hchan {
  ......
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}
  ......
}

根据 channel 类型进行内存申请,及根据有无指针,申请的 ring buffer 是否为连续内存。

Send

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
	......
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
	
	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}
	
	if !block {
		unlock(&c.lock)
		return false
	}
}

chansend 中的 block 参数用于验证是否阻塞,只有在使用 select 发送时才会为 false ,用途是可以向 nil channel 发送数据,以及遇到阻塞时直接跳过,不会保存 goroutine。

recvq 存在元素时,说明队列已空并且有 goroutine 在阻塞等待数据,所以直接发送,节省时间。

队列有位置则写入队列,没有则阻塞,保存 goroutine,放入 sendq 队列中。

环形队列的实现,判断当前位置已到达最后,将 sendx 索引重置为 0。

Recv

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ......
}

流程类似于发送:

select 的接收动作不会阻塞以及可以从 nil 中读取。

sendq 中存在元素时,直接获取。

当队列中有元素时,直接获取。

如果都没有就阻塞住,保存 goroutine 放入 recvq 队列中。

Close

func closechan(c *hchan) {
  ......
	// release all readers
	// release all writers (they will panic)
  ......
}

closechan 操作会唤醒 recvq 中的所有 goroutine,并发送一条零值消息。也会唤醒所有 sendq 中的 goroutine,他们会 panic

String

Go 语言中的字符串是一个只读的字节数组切片。

Go 使用 + 作为字符串的连接符,在编译期就识别并转换为 addstr

字符串虽然和字节数组的内容一样,但是字符串是只读的,不能通过下标或者其他形式来改变其内存存储的数据,而字节数组中的内容都是可读写的,所以两种类型转换时也是需要内存拷贝的。

type stringStruct struct {
	str unsafe.Pointer
	len int
}

string在底层实现上是引用类型,但是因为string不允许修改,只能生成新的对象,在逻辑上和值类型无差别。

Context

type Context interface {
	Deadline() (deadline time.Time, ok bool)
	Done() <-chan struct{}
	Err() error
	Value(key interface{}) interface{}
}

设计原理

在 Goroutine 构成的树形结构中对信号进行同步以减少计算资源的浪费是 context.Context 的最大作用。

我们可能会创建多个 Goroutine 来处理一次请求,而 context.Context 的作用就是在不同 Goroutine 之间同步请求特定数据、取消信号以及处理请求的截止日期。

默认 Context

context 包中最常用的方法还是 context.Backgroundcontext.TODO,这两个方法都会返回预先初始化好的私有变量 backgroundtodo,它们会在同一个 Go 程序中被复用:

func Background() Context {
	return background
}

func TODO() Context {
	return todo
}

var (
	background = new(emptyCtx)
	todo       = new(emptyCtx)
)

// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
	return
}

func (*emptyCtx) Done() <-chan struct{} {
	return nil
}

func (*emptyCtx) Err() error {
	return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
	return nil
}

从源码中可以看出 context.Backgroundcontext.TODO 只有语义上的区别。

取消信号

context.WithCancel 函数能够从 context.Context 中衍生出一个新的子上下文并返回用于取消该上下文的函数(CancelFunc)。一旦我们执行返回的取消函数,当前上下文以及它的子上下文都会被取消,所有的 Goroutine 都会同步收到这一取消信号。

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	c := newCancelCtx(parent)
	propagateCancel(parent, &c)
	return &c, func() { c.cancel(true, Canceled) }
}

除了 context.WithCancel 之外,context 包的另外两个函数 context.WithDeadlinecontext.WithTimeout 也都能创建可以被取消的计数器上下文 context.timerCtx

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
	return WithDeadline(parent, time.Now().Add(timeout))
}

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	if cur, ok := parent.Deadline(); ok && cur.Before(d) {
		// The current deadline is already sooner than the new one.
		return WithCancel(parent)
	}
	c := &timerCtx{
		cancelCtx: newCancelCtx(parent),
		deadline:  d,
	}
	propagateCancel(parent, c)
	dur := time.Until(d)
	if dur <= 0 {
		c.cancel(true, DeadlineExceeded) // deadline has already passed
		return c, func() { c.cancel(false, Canceled) }
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.err == nil {
		c.timer = time.AfterFunc(dur, func() {
			c.cancel(true, DeadlineExceeded)
		})
	}
	return c, func() { c.cancel(true, Canceled) }
}

Context 传值

context.WithValue 函数能从父上下文中衍生出 context.valueCtx 类型的子上下文。

func WithValue(parent Context, key, val interface{}) Context {
	if parent == nil {
		panic("cannot create context from nil parent")
	}
	if key == nil {
		panic("nil key")
	}
	if !reflectlite.TypeOf(key).Comparable() {
		panic("key is not comparable")
	}
	return &valueCtx{parent, key, val}
}

valueCtx 会通过组合使用 parent 的方法,只重写了 Value 方法。

type valueCtx struct {
	Context
	key, val interface{}
}

func (c *valueCtx) Value(key interface{}) interface{} {
	if c.key == key {
		return c.val
	}
	return c.Context.Value(key)
}

Value 方法会在父上下文中查找,直到找到值或返回 nil,所以虽然 valueCtx 只能存储一个值,但我们可以通过子上下文存储多个值,在查找时也不需要多余操作。

Knative 如何利用 valueCtx 进行传值

knative 在 init 阶段注册初始化闭包,在有需要时调用放入 Context 中。

key 通过一个可比较的空 struct 实现,value 在取出后转换类型。