go// DB 表示一个在磁盘上持久化存储的桶集合。
// 所有数据访问都通过事务进行,可以通过 DB 获得事务。
// 如果在调用 Open() 之前访问,DB 上的所有函数将返回 ErrDatabaseNotOpen。
type DB struct {
StrictMode bool // 启用严格模式时,数据库将在每次提交后执行 Check()。如果数据库处于不一致状态,则引发 panic。该标志对性能有很大的影响,因此仅应用于调试目的。
NoSync bool // 设置 NoSync 标志将导致数据库在每次提交后跳过 fsync() 调用。在将数据批量加载到数据库中并且可以在系统故障或数据库损坏时重新启动批量加载时,可以使用此标志。请勿在正常使用中设置此标志。
NoGrowSync bool // 当为 true 时,当扩展数据库时跳过 truncate 调用。仅在非 ext3/ext4 系统上设置此标志是安全的。跳过截断避免了硬盘空间的预分配,并且在重新映射时避免了 truncate() 和 fsync() 系统调用。
MmapFlags int // 如果要快速读取整个数据库,则可以将 MmapFlag 设置为 syscall.MAP_POPULATE,用于 Linux 2.6.23+ 的顺序读取预读。
MaxBatchSize int // MaxBatchSize 是批处理的最大大小。默认值从 Open 中的 DefaultMaxBatchSize 复制。如果 <= 0,则禁用批处理。在调用 Batch 时请勿并发更改。
MaxBatchDelay time.Duration // MaxBatchDelay 是批处理开始前的最大延迟。默认值从 Open 中的 DefaultMaxBatchDelay 复制。如果 <= 0,则 effectively 禁用批处理。在调用 Batch 时请勿并发更改。
AllocSize int // AllocSize 是数据库需要创建新页面时分配的空间量。这样做是为了分摊增长数据文件时 truncate() 和 fsync() 的成本。
path string // 数据库文件路径
file *os.File
lockfile *os.File // 仅 Windows 平台需要
dataref []byte // mmap'ed readonly,写操作会引发 SEGV
data *[maxMapSize]byte
datasz int // 数据大小
filesz int // 当前磁盘上的文件大小
meta0 *meta // meta 数据页
meta1 *meta // meta 数据页
pageSize int // 页面大小
opened bool // 是否已打开数据库
rwtx *Tx // 当前读写事务
txs []*Tx // 事务列表
freelist *freelist // 空闲列表
stats Stats // 统计信息
pagePool sync.Pool
batchMu sync.Mutex // 批处理锁
batch *batch
rwlock sync.Mutex // 仅允许一个写入者
metalock sync.Mutex // 保护 meta 页访问
mmaplock sync.RWMutex // 保护在重新映射期间的 mmap 访问
statlock sync.RWMutex // 保护统计信息访问
ops struct {
writeAt func(b []byte, off int64) (n int, err error)
}
// 只读模式。
// 当为 true 时,Update() 和 Begin(true) 立即返回 ErrDatabaseReadOnly。
readOnly bool
}
go// Options 表示在打开数据库时可以设置的选项。
type Options struct {
// Timeout 是获取文件锁时等待的时间。
// 当设置为零时,将无限等待。此选项仅在 Darwin 和 Linux 上可用。
Timeout time.Duration
// 在内存映射文件之前设置 DB.NoGrowSync 标志。
NoGrowSync bool
// 以只读模式打开数据库。使用 flock(..., LOCK_SH |LOCK_NB) 来获取共享锁(UNIX)。
ReadOnly bool
// 在内存映射文件之前设置 DB.MmapFlags 标志。
MmapFlags int
// InitialMmapSize 是数据库的初始 mmap 大小(以字节为单位)。
// 如果 InitialMmapSize 大到足以容纳数据库的 mmap 大小,那么读事务不会阻塞写事务。
// 如果 <= 0,则初始映射大小为 0。
// 如果 initialMmapSize 小于先前的数据库大小,则不起作用。
InitialMmapSize int
}
// Open 创建并打开指定路径的数据库。
// 如果文件不存在,则将自动创建。
// 如果未提供选项(options为nil),则 Bolt 将使用默认选项打开数据库。
func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
// 创建一个数据库实例,并设置 opened 为 true,表示数据库已打开
var db = &DB{opened: true}
// 如果未提供选项,则使用默认选项
if options == nil {
options = DefaultOptions
}
db.NoGrowSync = options.NoGrowSync
db.MmapFlags = options.MmapFlags
// 设置后续数据库操作的默认值
db.MaxBatchSize = DefaultMaxBatchSize
db.MaxBatchDelay = DefaultMaxBatchDelay
db.AllocSize = DefaultAllocSize
// 根据只读选项设置文件打开标志
flag := os.O_RDWR
if options.ReadOnly {
flag = os.O_RDONLY
db.readOnly = true
}
// 打开数据文件并为元数据写入创建单独的同步处理程序
db.path = path
var err error
if db.file, err = os.OpenFile(db.path, flag|os.O_CREATE, mode); err != nil {
_ = db.close()
return nil, err
}
// 锁定文件,以便使用 Bolt 的其他进程在读写模式下不能同时使用数据库
// 这样做是为了防止两个进程分别写入 meta 页面和 free 页面,导致数据库损坏
// 如果 !options.ReadOnly,则数据库文件以独占锁定(只有一个进程可以获得锁)
// 否则(options.ReadOnly 已设置),使用共享锁定(多个进程可以同时持有锁)
if err := flock(db, mode, !db.readOnly, options.Timeout); err != nil {
_ = db.close()
return nil, err
}
// 默认值用于测试钩子
db.ops.writeAt = db.file.WriteAt
// 如果数据库文件不存在,则进行初始化
if info, err := db.file.Stat(); err != nil {
return nil, err
} else if info.Size() == 0 {
// 初始化具有 meta 页面的新文件
if err := db.init(); err != nil {
return nil, err
}
} else {
// 读取第一个 meta 页面以确定页面大小
var buf [0x1000]byte
if _, err := db.file.ReadAt(buf[:], 0); err == nil {
m := db.pageInBuffer(buf[:], 0).meta()
if err := m.validate(); err != nil {
// 如果无法读取页面大小,我们可以假设它与 OS 相同
// 因为这是首次选择页面大小的方式
// 如果第一个页面无效且此 OS 使用与创建数据库时不同的页面大小,则无法访问数据库
db.pageSize = os.Getpagesize()
} else {
db.pageSize = int(m.pageSize)
}
}
}
// 初始化页面池
db.pagePool = sync.Pool{
New: func() interface{} {
return make([]byte, db.pageSize)
},
}
// 内存映射数据文件
if err := db.mmap(options.InitialMmapSize); err != nil {
_ = db.close()
return nil, err
}
// 读取空闲列表
db.freelist = newFreelist()
db.freelist.read(db.page(db.meta().freelist))
// 标记数据库已打开并返回
return db, nil
}
mmap
go// mmap 打开底层的内存映射文件并初始化 meta 引用。
// minsz 是新 mmap 可以拥有的最小大小。
func (db *DB) mmap(minsz int) error {
db.mmaplock.Lock()
defer db.mmaplock.Unlock()
info, err := db.file.Stat()
if err != nil {
return fmt.Errorf("mmap stat error: %s", err)
} else if int(info.Size()) < db.pageSize*2 {
return fmt.Errorf("file size too small")
}
// 确保大小至少是最小大小。
var size = int(info.Size())
if size < minsz {
size = minsz
}
size, err = db.mmapSize(size)
if err != nil {
return err
}
// 在继续之前解除所有 mmap 引用。
if db.rwtx != nil {
db.rwtx.root.dereference()
}
// 在继续之前取消映射现有数据。
if err := db.munmap(); err != nil {
return err
}
// 将数据文件作为字节切片进行内存映射。
if err := mmap(db, size); err != nil {
return err
}
// 保存对 meta 页面的引用。
db.meta0 = db.page(0).meta()
db.meta1 = db.page(1).meta()
// 验证 meta 页面。只有在两个 meta 页面都无法验证时才返回错误,
// 因为 meta0 无法验证意味着它没有被正确保存,但我们可以使用 meta1 进行恢复。反之亦然。
err0 := db.meta0.validate()
err1 := db.meta1.validate()
if err0 != nil && err1 != nil {
return err0
}
return nil
}
mmap
函数用于打开底层的内存映射文件,并初始化 meta
引用。minsz
是新 mmap 可以拥有的最小大小。meta
页面的引用,并验证这些页面的有效性。munmap
go// munmap 从内存中取消映射数据文件。
func (db *DB) munmap() error {
if err := munmap(db); err != nil {
return fmt.Errorf("unmap error: " + err.Error())
}
return nil
}
munmap
函数用于从内存中取消映射数据文件。munmap
函数并返回可能的错误信息。mmapSize
go// mmapSize 根据当前数据库的大小确定 mmap 的适当大小。
// 最小大小为32KB,每次翻倍直到达到1GB。
// 如果新的 mmap 大小大于允许的最大值,则返回错误。
func (db *DB) mmapSize(size int) (int, error) {
// 从32KB开始翻倍直到1GB。
for i := uint(15); i <= 30; i++ {
if size <= 1<<i {
return 1 << i, nil
}
}
// 验证请求的大小不超过允许的最大值。
if size > maxMapSize {
return 0, fmt.Errorf("mmap too large")
}
// 如果大于1GB,则每次增加1GB。
sz := int64(size)
if remainder := sz % int64(maxMmapStep); remainder > 0 {
sz += int64(maxMmapStep) - remainder
}
// 确保 mmap 大小是页面大小的倍数。
// 这应该总是正确的,因为我们以MB为单位递增。
pageSize := int64(db.pageSize)
if (sz % pageSize) != 0 {
sz = ((sz / pageSize) + 1) * pageSize
}
// 如果超过最大大小,则仅增长到最大大小。
if sz > maxMapSize {
sz = maxMapSize
}
return int(sz), nil
}
mmapSize
函数根据当前数据库的大小确定 mmap 的适当大小。mmap
go// mmap 将数据库的数据文件映射到内存中。
func mmap(db *DB, sz int) error {
// 将数据文件映射到内存。
b, err := syscall.Mmap(int(db.file.Fd()), 0, sz, syscall.PROT_READ, syscall.MAP_SHARED|db.MmapFlags)
if err != nil {
return err
}
// 告知内核 mmap 是随机访问的。
if err := madvise(b, syscall.MADV_RANDOM); err != nil {
return fmt.Errorf("madvise: %s", err)
}
// 保存原始的字节切片并转换为字节数组指针。
db.dataref = b
db.data = (*[maxMapSize]byte)(unsafe.Pointer(&b[0]))
db.datasz = sz
return nil
}
mmap
函数使用 syscall.Mmap
将数据库文件映射到内存中。db.file.Fd()
返回数据库文件的文件描述符。sz
字节。syscall.PROT_READ
表示映射区域可读。syscall.MAP_SHARED
表示对映射区域的写入会反映到文件中,并且此映射区域可能被其他映射它的进程看到。db.MmapFlags
包含其他 mmap 选项。madvise
用于告知内核,映射是随机访问的,这可以提高性能。b
,并将其转换为一个指向字节数组的指针 db.data
,以及记录了映射的大小 db.datasz
。munmap
go// munmap 从内存中取消映射数据文件。
func munmap(db *DB) error {
// 如果没有映射数据,则忽略取消映射。
if db.dataref == nil {
return nil
}
// 使用原始的字节切片取消映射。
err := syscall.Munmap(db.dataref)
db.dataref = nil
db.data = nil
db.datasz = 0
return err
}
munmap
函数用于从内存中取消映射数据文件。syscall.Munmap
函数取消映射,并清空相应的字段。madvise
go// madvise 通知内核有关于内存映射的信息。
// 由于该函数在 darwin 上不可用,因此从 stdlib 复制而来。
func madvise(b []byte, advice int) (err error) {
_, _, e1 := syscall.Syscall(syscall.SYS_MADVISE, uintptr(unsafe.Pointer(&b[0])), uintptr(len(b)), uintptr(advice))
if e1 != 0 {
err = e1
}
return
}
madvise
函数通知内核有关于内存映射的信息。madvise
不可用,因此从标准库复制了该函数。flock
go// flock 在文件描述符上获取一个咨询性的锁。
func flock(db *DB, mode os.FileMode, exclusive bool, timeout time.Duration) error {
var t time.Time
for {
// 如果超出超时时间,则返回错误。
// 这只能在我们尝试过一次 flock 之后发生。
if t.IsZero() {
t = time.Now()
} else if timeout > 0 && time.Since(t) > timeout {
return ErrTimeout
}
flag := syscall.LOCK_SH
if exclusive {
flag = syscall.LOCK_EX
}
// 否则尝试获取独占锁。
err := syscall.Flock(int(db.file.Fd()), flag|syscall.LOCK_NB)
if err == nil {
return nil
} else if err != syscall.EWOULDBLOCK {
return err
}
// 等待一会儿然后再次尝试。
time.Sleep(50 * time.Millisecond)
}
}
flock
函数在文件描述符上获取一个咨询性的锁。mode
表示锁的模式,exclusive
为 true
表示独占锁,为 false
表示共享锁。timeout
表示等待锁的最长时间,如果超过此时间还未获取到锁,则返回超时错误。syscall.Flock
尝试获取锁,如果成功则返回 nil
。syscall.EWOULDBLOCK
表示当前无法获得锁,等待一段时间后再次尝试。funlock
go// funlock 释放文件描述符上的咨询性锁。
func funlock(db *DB) error {
return syscall.Flock(int(db.file.Fd()), syscall.LOCK_UN)
}
funlock
函数释放文件描述符上的咨询性锁,使用 syscall.Flock
并指定 syscall.LOCK_UN
表示释放锁。go// Update 在读写托管事务的上下文中执行一个函数。
// 如果函数没有返回错误,则提交事务。
// 如果函数返回错误,则整个事务将被回滚。
// Update() 方法返回函数中的错误或提交时的错误。
//
// 在函数中尝试手动提交或回滚将引发 panic。
func (db *DB) Update(fn func(*Tx) error) error {
// 开始一个读写事务。
t, err := db.Begin(true)
if err != nil {
return err
}
// 确保在发生 panic 时事务回滚。
defer func() {
if t.db != nil {
t.rollback()
}
}()
// 将事务标记为托管,以便内部函数不能手动提交。
t.managed = true
// 如果函数返回错误,则回滚并返回错误。
err = fn(t)
t.managed = false
if err != nil {
_ = t.Rollback()
return err
}
// 提交事务。
return t.Commit()
}
// View 在托管的只读事务上下文中执行一个函数。
// 从函数返回的任何错误都将从 View() 方法中返回。
//
// 在函数中尝试手动回滚将引发 panic。
func (db *DB) View(fn func(*Tx) error) error {
// 开始一个只读事务。
t, err := db.Begin(false)
if err != nil {
return err
}
// 确保在发生 panic 时事务回滚。
defer func() {
if t.db != nil {
t.rollback()
}
}()
// 将事务标记为托管,以便内部函数不能手动回滚。
t.managed = true
// 如果函数返回错误,则传递该错误。
err = fn(t)
t.managed = false
if err != nil {
_ = t.Rollback()
return err
}
// 手动回滚事务,因为只读事务不需要提交。
if err := t.Rollback(); err != nil {
return err
}
return nil
}
go// meta 获取当前的 meta 页引用。
func (db *DB) meta() *meta {
// 我们必须返回具有最高 txid 且未失败验证的 meta 页引用。
// 否则,在实际上数据库处于一致状态时,可能会导致错误。
// metaA 是 txid 较高的 meta 页。
metaA := db.meta0
metaB := db.meta1
if db.meta1.txid > db.meta0.txid {
metaA = db.meta1
metaB = db.meta0
}
// 如果 metaA 有效,则使用较高的 meta 页。否则,如果 metaB 有效,则回退到上一个。
if err := metaA.validate(); err == nil {
return metaA
} else if err := metaB.validate(); err == nil {
return metaB
}
// 不应该到达这里,因为 meta1 和 meta0 在 mmap() 时已经验证过,并且我们在每次写操作时都执行 fsync()。
panic("bolt.DB.meta(): invalid meta pages")
}
本文作者:yowayimono
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!