MYDB项目结构

整体结构

MYDB 分为后端和前端,前后端通过 socket 进行交互。
前端(客户端)的职责很单一,读取用户输入,并发送到后端执行,输出返回结果,并等待下一次输入。
MYDB 后端则需要解析 SQL,如果是合法的 SQL,就尝试执行并返回结果。不包括解析器,MYDB 的后端划分为五个模块,每个模块都又一定的职责,通过接口向其依赖的模块提供方法。五个模块如下:

  1. Transaction Manager (TM)
  2. Data Manager (DM)
  3. Version Manager (VM)
  4. Index Manager (IM)
  5. Table Manager (TBM)

这是五个模块的依赖关系图:

20251018151518

每个模块的职责如下:

  1. TM 通过维护 XID 文件来维护事务的状态,并提供接口供其他模块来查询某个事务的状态。
  2. DM 直接管理数据库 DB 文件和日志文件。DM 的主要职责有:1) 分页管理 DB 文件,并进行缓存;2) 管理日志文件,保证在发生错误时可以根据日志进行恢复;3) 抽象 DB 文件为 DataItem 供上层模块使用,并提供缓存。
  3. VM 基于两段锁协议实现了调度序列的可串行化,并实现了 MVCC 以消除读写阻塞。同时实现了两种隔离级别。
  4. IM 实现了基于 B+ 树的索引,BTW,目前 where 只支持已索引字段。
  5. TBM 实现了对字段和表的管理。同时,解析 SQL 语句,并根据语句操作表。

TM(Transaction Manager) 事务管理器

总结

TM模块主要用于管理事务,包括开始、提交、回滚事务,以及检查事务的状态。在类中需要定义一些常量来管理事务如LEN_XID_HEADER_LENGTH、XID_FIELD_SIZE、FIELD_TRAN_ACTIVE、FIELD_TRAN_COMMITTED、FIELD_TRAN_ABORTED、SUPER_XID和XID_SUFFIX,分别表示XID文件头长度、每个事务的占用长度、事务的三种状态、超级事务、XID文件后缀。

还需定义一个RandomAccessFile 类型的file和一个FileChannel类型的fc,用于操作XID文件。还有一个xidCounter用于记录事务的数量,以及一个Lock类用于保证线程安全。然后会在构造函数中给file和fc赋值,然后调用checkXIDCounter方法检查XID文件是否合法。

begin方法用于开始一个新的事务,commit方法用于提交事务,abort方法用于回滚事务。这三个方法内部都会调用updateXID方法,将事务ID和事务状态写入到XID文件中。begin还会调用另外一个incrXIDCounter方法,用于将XID +1并更新XID Header。

isActive、isCommittedisAborted方法用于检查事务是否处于活动、已提交或已回滚状态。这三个方法内部都会调用checkXID方法,检查XID文件中的事务状态是否与给定的状态相等;close方法用于关闭文件通道和文件。

XID文件

  1. XID 的定义和规则:
    • 每个事务都有一个唯一的事务标识符 XID,从 1 开始递增,并且 XID 0 被特殊定义为超级事务(Super Transaction)。
    • XID 0 用于表示在没有申请事务的情况下进行的操作,其状态永远是 committed。
  2. 事务的状态:
    • 每个事务可以处于三种状态之一:active(正在进行,尚未结束)、committed(已提交)和aborted(已撤销或回滚)。
  3. XID 文件的结构和管理:
    • TransactionManager 负责维护一个 XID 格式的文件,用于记录各个事务的状态。
    • XID 文件中为每个事务分配了一个字节的空间,用来保存其状态。
    • XID 文件的头部保存了一个 8 字节的数字,记录了这个 XID 文件管理的事务的个数。
    • 因此,事务 XID 在文件中的状态存储在 (XID-1)+8 字节的位置处,其中 XID-1 是因为 XID 0(超级事务)的状态不需要记录。

TM接口

TransactionManager中提供了一些接口供其他模块调用,用来创建事务和查询事务的状态;

1
2
3
4
5
6
7
8
9
public interface TransactionManager {
long begin(); //开启事务
void commit(long xid); //提交事务
void abort(long xid); //撤销或回滚事务
boolean isActive(long xid); //查询一个事务的状态是否正在运行
boolean isCommitted(long xid); //查询一个事务的状态是否已经提交
boolean isAborted(long xid); //查询一个事务的状态是否撤销或回滚
void close(); //关闭事务
}

如何实现TM的?

定义常量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class TransactionManagerImpl implements TransactionManager {

// XID文件头长度
static final int LEN_XID_HEADER_LENGTH = 8;
// 每个事务的占用长度
private static final int XID_FIELD_SIZE = 1;

// 事务的三种状态
private static final byte FIELD_TRAN_ACTIVE = 0;//正在进行,尚未结束
private static final byte FIELD_TRAN_COMMITTED = 1;//已提交
private static final byte FIELD_TRAN_ABORTED = 2;//已撤销(回滚)

// 超级事务,永远为commited状态
public static final long SUPER_XID = 0;

// XID文件后缀
static final String XID_SUFFIX = ".xid";

}

checkXIDCounter

在构造函数创建了一个 TransactionManager 之后,首先要对 XID 文件进行校验,以保证这是一个合法的 XID 文件。校验的方式也很简单,通过文件头的 8 字节数字反推文件的理论长度,与文件的实际长度做对比。如果不同则认为 XID 文件不合法。对于校验没有通过的,会直接通过 panic 方法,强制停机。在一些基础模块中出现错误都会如此处理,无法恢复的错误只能直接停机。

20251018154211

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
* 检查XID文件是否合法
* 读取XID_FILE_HEADER中的xidcounter,根据它计算文件的理论长度,对比实际长度
*/
private void checkXIDCounter() {
// 初始化文件长度为0
long fileLen = 0;
try {
// 获取文件的长度,RandomAccessFile在构造函数中赋值
fileLen = file.length();
} catch (IOException e1) {
// 如果出现异常,抛出BadXIDFileException错误
Panic.panic(Error.BadXIDFileException);
}

// 如果文件长度小于XID头部长度,抛出BadXIDFileException错误
if (fileLen < LEN_XID_HEADER_LENGTH) {
Panic.panic(Error.BadXIDFileException);
}

// 分配一个长度为XID头部长度的ByteBuffer
ByteBuffer buf = ByteBuffer.allocate(LEN_XID_HEADER_LENGTH);
try {
// 将文件通道的位置设置为0
fc.position(0);
// 从文件通道读取数据到ByteBuffer
fc.read(buf);
} catch (IOException e) {
// 如果出现异常,抛出错误
Panic.panic(e);
}
// 将ByteBuffer的内容解析为长整型,作为xidCounter
this.xidCounter = Parser.parseLong(buf.array());
// 计算xidCounter+1对应的XID位置
long end = getXidPosition(this.xidCounter + 1);
// 如果计算出的XID位置与文件长度不符,抛出BadXIDFileException错误
if (end != fileLen) {
Panic.panic(Error.BadXIDFileException);
}
}

/**
* 将数组前八位转换成长整数
* @param buf 需要转换的字节数组
* @return 转换后的数据
*/
public static long parseLong(byte[] buf) {
ByteBuffer buffer = ByteBuffer.wrap(buf, 0, 8);
return buffer.getLong();
}

@Test
public void testBufferGetLong(){

// 创建一个包含8个字节的字节数组
//因为long 在Java中占用8个字节,每个字节占用8位,一下数组可以转换成一个long数字
// 00000000 00000000 00000000 00000000 00000000 00000000 00001010 00000001
// 1010 00000001 --> 2561
byte[] byteArray = new byte[]{0, 0, 0, 0, 0, 0, 10, 1};
// 使用ByteBuffer.wrap方法将字节数组包装为一个ByteBuffer对象
ByteBuffer buffer = ByteBuffer.wrap(byteArray);

// 使用getLong方法从ByteBuffer中读取一个长整型数
long longValue = buffer.getLong();

// 输出读取的长整型数
System.out.println("The long value is: " + longValue);
}

getXidPosition

根据事务xid取得其在xid文件中对应的位置

1
2
3
4
// 根据事务xid取得其在xid文件中对应的位置
private long getXidPosition(long xid) {
return LEN_XID_HEADER_LENGTH + (xid - 1) * XID_FIELD_SIZE;
}

begin()

**begin()** 方法会开始一个事务,更具体的,首先设置 xidCounter+1 事务的状态为 active,随后 xidCounter 自增,并更新文件头。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 开始一个事务,并返回XID
public long begin() {
// 锁定计数器,防止并发问题
counterLock.lock();
try {
// xidCounter是当前事务的计数器,每开始一个新的事务,就将其加1
long xid = xidCounter + 1;
// 调用updateXID方法,将新的事务ID和事务状态(这里是活动状态)写入到XID文件中
updateXID(xid, FIELD_TRAN_ACTIVE);
// 调用incrXIDCounter方法,将事务计数器加1,并更新XID文件的头部信息
incrXIDCounter();
// 返回新的事务ID
return xid;
} finally {
// 释放锁
counterLock.unlock();
}
}

updateXid

更新事务ID状态,commit()abort()方法就可以直接借助 updateXID() 方法实现。

20251018154622

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 更新xid事务的状态为status
private void updateXID(long xid, byte status) {
// 获取事务xid在xid文件中对应的位置
long offset = getXidPosition(xid);
// 创建一个长度为XID_FIELD_SIZE的字节数组
byte[] tmp = new byte[XID_FIELD_SIZE];
// 将事务状态设置为status
tmp[0] = status;
// 使用字节数组创建一个ByteBuffer
ByteBuffer buf = ByteBuffer.wrap(tmp);
try {
// 将文件通道的位置设置为offset
fc.position(offset);
// 将ByteBuffer中的数据写入到文件通道
fc.write(buf);
} catch (IOException e) {
// 如果出现异常,调用Panic.panic方法处理
Panic.panic(e);
}
try {
// 强制将文件通道中的所有未写入的数据写入到磁盘
fc.force(false);
} catch (IOException e) {
// 如果出现异常,调用Panic.panic方法处理
Panic.panic(e);
}
}

incrXIDCounter

20251018155117

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 将XID加一,并更新XID Header
private void incrXIDCounter() {
// 事务总数加一
xidCounter++;
// 将新的事务总数转换为字节数组,并用ByteBuffer包装
ByteBuffer buf = ByteBuffer.wrap(Parser.long2Byte(xidCounter));
try {
// 将文件通道的位置设置为0,即文件的开始位置
fc.position(0);
// 将ByteBuffer中的数据写入到文件通道,即更新了XID文件的头部信息
fc.write(buf);
} catch (IOException e) {
// 如果出现异常,调用Panic.panic方法处理
Panic.panic(e);
}
try {
// 强制将文件通道中的所有未写入的数据写入到磁盘
fc.force(false);
} catch (IOException e) {
// 如果出现异常,调用Panic.panic方法处理
Panic.panic(e);
}
}


/**
* 将长整型值写入到字节缓冲区,将其转成为8字节的二进制形式,然后将这个8个字节写入到字节缓冲区
* @param value
* @return
*/
public static byte[] long2Byte(long value) {
return ByteBuffer.allocate(Long.SIZE / Byte.SIZE).putLong(value).array();
}

checkXID

**isActive()、isCommitted() ****isAborted()** 都是检查一个 xid 的状态

20251018155327

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 定义一个方法,接收一个事务ID(xid)和一个状态(status)作为参数
private boolean checkXID(long xid, byte status) {
// 计算事务ID在XID文件中的位置
long offset = getXidPosition(xid);
// 创建一个新的字节缓冲区(ByteBuffer),长度为XID_FIELD_SIZE
ByteBuffer buf = ByteBuffer.wrap(new byte[XID_FIELD_SIZE]);
try {
// 将文件通道的位置设置为offset
fc.position(offset);
// 从文件通道读取数据到字节缓冲区
fc.read(buf);
} catch (IOException e) {
// 如果出现异常,调用Panic.panic方法处理
Panic.panic(e);
}
// 检查字节缓冲区的第一个字节是否等于给定的状态
// 如果等于,返回true,否则返回false
return buf.array()[0] == status;
}

Data Manager (DM) 数据管理器

引用计数缓存框架

原文
WHY NOT LRU?
由于分页管理和数据项(DataItem)管理都涉及缓存,这里设计一个更通用的缓存框架。
看到这里,估计你们也开始犯嘀咕了,为啥使用引用计数策略,而不使用 “极为先进的” LRU 策略呢?
这里首先从缓存的接口设计说起,如果使用 LRU 缓存,那么只需要设计一个 get(key) 接口即可,释放缓存可以在缓存满了之后自动完成。设想这样一个场景:某个时刻缓存满了,缓存驱逐了一个资源,这时上层模块想要将某个资源强制刷回数据源,这个资源恰好是刚刚被驱逐的资源。那么上层模块就发现,这个数据在缓存里消失了,这时候就陷入了一种尴尬的境地:是否有必要做回源操作?

  1. 不回源。由于没法确定缓存被驱逐的时间,更没法确定被驱逐之后数据项是否被修改,这样是极其不安全的
  2. 回源。如果数据项被驱逐时的数据和现在又是相同的,那就是一次无效回源
  3. 放回缓存里,等下次被驱逐时回源。看起来解决了问题,但是此时缓存已经满了,这意味着你还需要驱逐一个资源才能放进去。这有可能会导致缓存抖动问题

当然我们可以记录下资源的最后修改时间,并且让缓存记录下资源被驱逐的时间。但是……

如无必要,无增实体。 —— 奥卡姆剃刀
问题的根源还是,LRU 策略中,资源驱逐不可控,上层模块无法感知。而引用计数策略正好解决了这个问题,只有上层模块主动释放引用,缓存在确保没有模块在使用这个资源了,才会去驱逐资源。
这就是引用计数法了。增加了一个方法 release(key),用于在上册模块不使用某个资源时,释放对资源的引用。当引用归零时,缓存就会驱逐这个资源。
同样,在缓存满了之后,引用计数法无法自动释放缓存,此时应该直接报错(和 JVM 似的,直接 OOM)

引用计数缓存框架是一种通用的缓存策略,与LRU(最近最少使用)相比,它采用了不同的资源管理方式。在引用计数缓存框架中,缓存的释放是由上层模块主动调用释放方法来触发的,而不是被动地由缓存管理器自动驱逐。当某个资源不再被上层模块引用时,通过调用释放方法来释放对该资源的引用。只有当资源的引用计数归零时,缓存才会驱逐该资源。这种方式可以确保缓存中的资源只有在确实不再被使用时才会被释放,避免了不必要的资源驱逐和回源操作

回溯
在数据库中,回源操作通常指的是从磁盘或者其他持久化存储介质中重新加载数据到内存中。这通常发生在数据库系统需要访问的数据不在内存中时。由于内存访问速度远高于磁盘访问速度,数据库系统会尽量将数据保留在内存中以提高访问速度。当需要访问的数据不在内存中时,数据库系统就需要从磁盘中加载数据,这个过程就称为回源操作。
回源操作的性能开销相对较高,因为它涉及到磁盘I/O操作,而磁盘I/O操作通常比内存访问速度慢得多。因此,数据库系统通常会采取各种策略来尽量减少回源操作的次数,例如通过缓存机制、预读取等方式来提高数据在内存中的命中率,以降低对磁盘的访问需求。

如何实现引用计数缓存

AbstractCache<T>

**common**包中定义了一个AbstractCache<T>抽象类,以及两个抽象方法,留给实现类去实现具体的操作;

1
2
3
4
5
6
7
8
/**
* 当资源不在缓存时的获取行为
*/
protected abstract T getForCache(long key) throws Exception;
/**
* 当资源被驱逐时的写回行为
*/
protected abstract void releaseForCache(T obj);
引用计数

除了普通的缓存功能之外,还需要维护另外一个计数。除此之外,为了应付多线程的场景,还需要记录哪些资源从数据源获取中。

  1. private HashMap<Long, T> cache;:这是一个 HashMap 对象,用于存储实际缓存的数据。键是资源的唯一标识符(通常是资源的ID或哈希值),值是缓存的资源对象(类型为 T)。在这个缓存框架中,cache 承担了普通缓存功能,即存储实际的资源数据。
  2. private HashMap<Long, Integer> references;:这是另一个 HashMap 对象,用于记录每个资源的引用个数。键是资源的唯一标识符,值是一个整数,表示该资源当前的引用计数。引用计数表示有多少个模块或线程正在使用特定的资源。通过跟踪引用计数,可以确定何时可以安全地释放资源。
  3. private HashMap<Long, Boolean> getting;:这是第三个 HashMap 对象,用于记录哪些资源当前正在从数据源获取中。键是资源的唯一标识符,值是一个布尔值,表示该资源是否正在被获取中。在多线程环境下,当某个线程尝试从数据源获取资源时,需要标记该资源正在被获取,以避免其他线程重复获取相同的资源。这个 getting 映射用于处理多线程场景下的并发访问问题。
    1
    2
    3
    private HashMap<Long, T> cache;                     // 实际缓存的数据
    private HashMap<Long, Integer> references; // 元素的引用个数
    private HashMap<Long, Boolean> getting; // 正在获取某资源的线程
get()

get()中获取资源,以下流程图不规范,理解大概意思即可

20251018162437

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
//从缓存中获取资源
protected T get(long key) throws Exception {
// 循环直到获取资源
while (true) {
// 获取锁
lock.lock();
if (getting.containsKey(key)) {
// 如果其他线程正在获取这个资源,那么当前线程将等待一毫秒然后继续循环
lock.unlock();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
continue;
}
continue;
}

if (cache.containsKey(key)) {
// 如果资源已经在缓存中,直接返回资源,并增加引用计数
T obj = cache.get(key);
references.put(key, references.get(key) + 1);
lock.unlock();
return obj;
}

// 如果资源不在缓存中,尝试获取资源。如果缓存已满,抛出异常
if (maxResource > 0 && count == maxResource) {
lock.unlock();
throw Error.CacheFullException;
}
count++;
getting.put(key, true);
lock.unlock();
break;
}

// 尝试获取资源
T obj = null;
try {
obj = getForCache(key);
} catch (Exception e) {
lock.lock();
count--;
getting.remove(key);
lock.unlock();
throw e;
}

// 将获取到的资源添加到缓存中,并设置引用计数为1
lock.lock();
getting.remove(key);
cache.put(key, obj);
references.put(key, 1);
lock.unlock();

return obj;
}
release()

释放一个缓存
20251018163454

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 强行释放一个缓存
*/
protected void release(long key) {
lock.lock(); // 获取锁
try {
int ref = references.get(key) - 1; // 获取资源的引用计数并减一
if (ref == 0) { // 如果引用计数为0
T obj = cache.get(key); // 从缓存中获取资源
releaseForCache(obj); // 处理资源的释放
references.remove(key); // 从引用计数的映射中移除资源
cache.remove(key); // 从缓存中移除资源
count--; // 将缓存中的资源计数减一
} else { // 如果引用计数不为0
references.put(key, ref); // 更新资源的引用计数
}
} finally {
lock.unlock(); // 释放锁
}
}
close()

关闭缓存,释放所有缓存信息
20251018163545

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 关闭缓存,写回所有资源
*/
protected void close() {
lock.lock();
try {
//获取所有资源key
Set<Long> keys = cache.keySet();
for (long key : keys) {
//获取缓存
T obj = cache.get(key);
//释放缓存
releaseForCache(obj);
//引用计数移除缓存
references.remove(key);
//实际缓存移除缓存
cache.remove(key);
}
} finally {
//释放锁
lock.unlock();
}
}

共享内存数组

原文

这里得提一个 Java 很蛋疼的地方。
Java 中,将数组看作一个对象,在内存中,也是以对象的形式存储的。而 c、cpp 和 go 之类的语言,数组是用指针来实现的。这就是为什么有一种说法:
只有 Java 有真正的数组
但这对这个项目似乎不是一个好消息。譬如 golang,可以执行下面语句:

1
2
var array1 [10]int64
array2 := array1[5:]

Copy
这种情况下,array2 和 array1 的第五个元素到最后一个元素,是共用同一片内存的,即使这两个数组的长度不同。
这在 Java 中是无法实现的(什么是高级语言啊~)。
在 Java 中,当你执行类似 subArray 的操作时,只会在底层进行一个复制,无法同一片内存。
于是,我写了一个 SubArray 类,来(松散地)规定这个数组的可使用范围:

1
2
3
4
5
6
7
8
9
10
11
12
public class SubArray {
public byte[] raw;
public int start;
public int end;

public SubArray(byte[] raw, int start, int end) {
this.raw = raw;
this.start = start;
this.end = end;
}
}

演示

因为这个数组没啥讲的,通过案例进行演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Test
public void testSubArray(){
//创建一个1~10的数组
byte[] subArray = new byte[10];
for (int i = 0; i < subArray.length; i++) {
subArray[i] = (byte) (i+1);
}
//创建两个SubArray
SubArray sub1 = new SubArray(subArray,3,7);
SubArray sub2 = new SubArray(subArray,6,9);

//修改共享数组数据
sub1.raw[4] = (byte)44;

//打印原始数组
System.out.println("Original Array: ");
printArray(subArray);

//打印共享数组
System.out.println("SubArray1: ");
printSubArray(sub1);
System.out.println("SubArray2: ");
printSubArray(sub2);
}

private void printArray(byte[] array){
System.out.println(Arrays.toString(array));
}

private void printSubArray(SubArray subArray){
for (int i = subArray.start; i <= subArray.end; i++) {
System.out.print(subArray.raw[i] + "\t");
}
System.out.println();
}


-------------------------演示结果----------------------------
Original Array:
[1, 2, 3, 4, 44, 6, 7, 8, 9, 10]
SubArray1:
4 44 6 7 8
SubArray2:
7 8 9 10

数据页的缓存和管理

本节主要内容就是 DM 模块向下对文件系统的抽象部分。DM 将文件系统抽象成页面,每次对文件系统的读写都是以页面为单位的。同样,从文件系统读进来的数据也是以页面为单位进行缓存的。

这里参考大部分数据库的设计,将默认数据页大小定为 8K。如果想要提升向数据库写入大量数据情况下的性能的话,也可以适当增大这个值。

上一节我们已经实现了一个通用的缓存框架,那么这一节我们需要缓存页面,就可以直接借用那个缓存的框架了。但是首先,需要定义出页面的结构。注意这个页面是存储在内存中的,与已经持久化到磁盘的抽象页面有区别。

数据库中实现页面缓存的相关设计和实现。

  1. 页面结构定义
    • 页面(Page)是存储在内存中的数据单元,其结构包括:
      • pageNumber:页面的页号,从1开始计数。
      • data:实际包含的字节数据。
      • dirty:标志着页面是否是脏页面,在缓存驱逐时,脏页面需要被写回磁盘。
      • lock:用于页面的锁。
      • PageCache:保存了一个 PageCache 的引用,方便在拿到 Page 的引用时可以快速对页面的缓存进行释放操作。
1
2
3
4
5
6
7
public class PageImpl implements Page {
private int pageNumber;
private byte[] data;
private boolean dirty;
private Lock lock;
private PageCache pc;
}
  1. 页面缓存接口定义

    • 定义了页面缓存的接口,包括新建页面、获取页面、释放页面、关闭缓存、根据最大页号截断缓存、获取当前页面数量以及刷新页面等方法。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      public interface PageCache {
      int newPage(byte[] initData); // 创建新页面并返回页号
      Page getPage(int pgno) throws Exception; // 根据页号获取页面
      void close(); // 关闭缓存并释放资源
      void release(Page page); // 释放页面资源

      void truncateByBgno(int maxPgno); // 截断超过指定页号的页面
      int getPageNumber(); // 获取当前最大页号
      void flushPage(Page pg); // 强制将页面写入持久存储
      }
  2. 页面缓存实现

    • 页面缓存的具体实现类需要继承抽象缓存框架,并实现具体的 getForCache() 和 releaseForCache() 方法。
    • getForCache() 方法用于从文件中读取页面数据,并将其包裹成 Page 返回。
    • releaseForCache() 方法用于在驱逐页面时决定是否将脏页面写回到文件系统。
  3. 页面写回文件系统

    • 页面缓存在驱逐页面时,根据页面是否是脏页面决定是否将其写回到文件系统。
    • 写回操作使用文件锁来保证写入的原子性和线程安全性。
  4. 新建页面

    • 新建页面时,页面缓存会自增页面数量,并在写入文件系统后返回新建页面的页号。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      //PageCache 还使用了一个 AtomicInteger,来记录了当前打开的数据库文件有多少页。
      //这个数字在数据库文件被打开时就会被计算,并在新建页面时自增。

      public int newPage(byte[] initData) {
      int pgno = pageNumbers.incrementAndGet();
      Page pg = new PageImpl(pgno, initData, null);
      flush(pg); // 新建的页面需要立刻写回
      return pgno;
      }
  5. 限制条件

    • 数据库中不允许同一条数据跨页存储,即单条数据的大小不能超过数据库页面的大小。

getForCache()

获取数据页的页面缓存,并将其包裹成Page
20251018164703

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 根据pageNumber从数据库文件中读取页数据,并包裹成Page
*/
@Override
protected Page getForCache(long key) throws Exception {
// 将key转换为页码
int pgno = (int) key;
// 计算页码对应的偏移量
long offset = PageCacheImpl.pageOffset(pgno);

// 分配一个大小为PAGE_SIZE的ByteBuffer
ByteBuffer buf = ByteBuffer.allocate(PAGE_SIZE);
// 锁定文件,确保线程安全
fileLock.lock();
try {
// 设置文件通道的位置为计算出的偏移量
fc.position(offset);
// 从文件通道读取数据到ByteBuffer
fc.read(buf);
} catch (IOException e) {
// 如果发生异常,调用Panic.panic方法处理
Panic.panic(e);
}
// 无论是否发生异常,都要解锁
fileLock.unlock();
// 使用读取到的数据、页码和当前对象创建一个新的PageImpl对象并返回
return new PageImpl(pgno, buf.array(), this);
}

public PageImpl(int pageNumber, byte[] data, PageCache pc) {
this.pageNumber = pageNumber; // 设置页面的页号
this.data = data; // 设置页面实际包含的字节数据
this.pc = pc; // 设置页面缓存
lock = new ReentrantLock(); // 初始化一个新的可重入锁
}

releaseForCache()

当一个Page对象(页面)不再需要在缓存中保留时,就会调用这个方法。如果这个页面被标记为”dirty”(即,这个页面的内容已经被修改,但还没有写回到磁盘),那么这个方法就会调用flush方法,将这个页面的内容写回到磁盘。

20251018164739

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
protected void releaseForCache(Page pg) {
if (pg.isDirty()) {
flush(pg);
pg.setDirty(false);
}
}

private void flush(Page pg) {
int pgno = pg.getPageNumber(); // 获取Page的页码
long offset = pageOffset(pgno); // 计算Page在文件中的偏移量

fileLock.lock(); // 加锁,确保线程安全
try {
ByteBuffer buf = ByteBuffer.wrap(pg.getData()); // 将Page的数据包装成ByteBuffer
fc.position(offset); // 设置文件通道的位置
fc.write(buf); // 将数据写入到文件中
fc.force(false); // 强制将数据从操作系统的缓存刷新到磁盘
} catch (IOException e) {
Panic.panic(e); // 如果发生异常,调用Panic.panic方法处理
} finally {
fileLock.unlock(); // 最后,无论是否发生异常,都要解锁
}
}

recoverInsert()recoverUpdate()

** recoverInsert() ****recoverUpdate()** 用于在数据库崩溃后重新打开时,恢复例程直接插入数据以及修改数据使用。

20251018164808

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 将raw插入pg中的offset位置,并将pg的offset设置为较大的offset
public static void recoverInsert(Page pg, byte[] raw, short offset) {
pg.setDirty(true); // 将pg的dirty标志设置为true,表示pg的数据已经被修改
System.arraycopy(raw, 0, pg.getData(), offset, raw.length); // 将raw的数据复制到pg的数据中的offset位置

short rawFSO = getFSO(pg.getData()); // 获取pg的当前空闲空间偏移量
if (rawFSO < offset + raw.length) { // 如果当前的空闲空间偏移量小于offset + raw.length
setFSO(pg.getData(), (short) (offset + raw.length)); // 将pg的空闲空间偏移量设置为offset + raw.length
}
}

// 将raw插入pg中的offset位置,不更新update
public static void recoverUpdate(Page pg, byte[] raw, short offset) {
pg.setDirty(true); // 将pg的dirty标志设置为true,表示pg的数据已经被修改
System.arraycopy(raw, 0, pg.getData(), offset, raw.length); // 将raw的数据复制到pg的数据中的offset位置
}

数据页的管理

第一页

数据库文件的第一个,用与做一些特殊用途,比如存储一些元数据,用于启动检查等。在MYDB 中的第一页,只是用来做启动检查。

  1. 每次数据库启动时,会生成一串随机字节,存储在 100~107 字节
  2. 在正常数据库关闭时,会将这串字节拷贝到第一页的 108~115字节
  3. 数据库每次启动时,都会检查第一页两处的字节是否相同;用来判断上次是否正常关闭,是否需要进行数据的恢复流程

启动初始化字节

1
2
3
4
5
6
7
8
9
10
// 设置"ValidCheck"为打开状态
public static void setVcOpen(Page pg) {
pg.setDirty(true);
setVcOpen(pg.getData());
}

private static void setVcOpen(byte[] raw) {
// 随机生成8字节的数据,并拷贝到第一页的 100~107 字节
System.arraycopy(RandomUtil.randomBytes(LEN_VC), 0, raw, OF_VC, LEN_VC);
}

关闭时拷贝字节

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 设置"ValidCheck"为关闭状态
public static void setVcClose(Page pg) {
pg.setDirty(true);
setVcClose(pg.getData());
}

// 设置"ValidCheck"为关闭状态
private static void setVcClose(byte[] raw) {
// 将"ValidCheck"设置为关闭状态
// 通过复制raw数组中的一部分元素来实现
// 具体来说,就是将raw数组中从OF_VC开始的LEN_VC个元素复制到raw数组中从OF_VC+LEN_VC开始的位置
// 即 100~107 拷贝到 108~115
System.arraycopy(raw, OF_VC, raw, OF_VC + LEN_VC, LEN_VC);
}

校验字节

1
2
3
4
5
6
7
8
9
10
// 检查"ValidCheck"是否有效
public static boolean checkVc(Page pg) {
return checkVc(pg.getData());
}

// 检查"ValidCheck"是否有效
private static boolean checkVc(byte[] raw) {
// 比较 100~107 和 108~115 处字节是否相等
return Arrays.equals(Arrays.copyOfRange(raw, OF_VC, OF_VC + LEN_VC), Arrays.copyOfRange(raw, OF_VC + LEN_VC, OF_VC + 2 * LEN_VC));
}

普通页

一个普通页面是以 2字节无符号数起始,因为一个页面最大容量为 8k,而二字节的范围是 0到2^16-1,所以2字节作为初始完全足够表达这一页空闲位置的偏移量。
对于普通页的管理,基本上都是围绕着 **FSO(Free Space Offset)**进行管理的;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 设置空闲空间偏移量
private static void setFSO(byte[] raw, short ofData) {
// 将空闲空间偏移量的值复制到字节数组的指定位置
System.arraycopy(Parser.short2Byte(ofData), 0, raw, OF_FREE, OF_DATA);
}

// 获取pg的空闲空间偏移量
public static short getFSO(Page pg) {
return getFSO(pg.getData()); // 返回pg的数据的空闲空间偏移量
}

// 获取空闲空间偏移量
private static short getFSO(byte[] raw) {
return Parser.parseShort(Arrays.copyOfRange(raw, 0, 2)); // 返回字节数组的前两个字节表示的短整数值
}


// 获取页面的空闲空间大小
public static int getFreeSpace(Page pg) {
return PageCache.PAGE_SIZE - (int) getFSO(pg.getData()); // 返回页面的空闲空间大小
}

insert()

1
2
3
4
5
6
7
8
// 将raw插入pg中,返回插入位置
public static short insert(Page pg, byte[] raw) {
pg.setDirty(true); // 将pg的dirty标志设置为true,表示pg的数据已经被修改
short offset = getFSO(pg.getData()); // 获取pg的空闲空间偏移量
System.arraycopy(raw, 0, pg.getData(), offset, raw.length); // 将raw的数据复制到pg的数据中的offset位置
setFSO(pg.getData(), (short) (offset + raw.length)); // 更新pg的空闲空间偏移量
return offset; // 返回插入位置
}

日志文件

MYDB提供了崩溃后的数据恢复功能,DM层在每次对底层数据进行操作时,都会记录一条日志到磁盘上。在数据库崩溃之后,再次重启时,可以根据日志的内容,恢复数据文件,保证其一致性;

日志读写

日志的二进制文件,按照如下的格式进行排布:

1
[XChecksum][Log1][Log2][Log3]...[LogN][BadTail]

其中 **XChecksum** 是一个四字节的整数,是对后续所有日志计算的校验和。**Log1 ~ LogN** 是常规的日志数据,**BadTail** 是在数据库崩溃时,没有来得及写完的日志数据,这个 **BadTail** 不一定存在。
每条日志的格式如下:

1
2
3
4
[Size][Checksum][Data]
[0, 0, 0, 3] [3, -112, -4, 93] [97, 97, 97]
[0, 0, 0, 3] [14, 40, -23, -38] [98, 98, 98]
[0, 0, 0, 3] [24, -64, -41, 87] [99, 99, 99]

其中,**Size** 是一个四字节整数,标识了 **Data** 段的字节数。**Checksum** 则是该条日志的校验和。
单条文件的校验和
对每条日志进行校验和,就可以得到总文件的校验和了

1
2
3
4
5
6
private int calChecksum(int xCheck, byte[] log) {
for (byte b : log) {
xCheck = xCheck * SEED + b; //seed = 13331
}
return xCheck;
}

日志文件的创建及初始化
在日志文件创建时create()会初始化 [XChecksum] 的字节大小,默认为0;

1
2
3
4
5
6
7
8
9
// class: src/main/java/top/guoziyang/mydb/backend/dm/logger/Logger.java
ByteBuffer buf = ByteBuffer.wrap(Parser.int2Byte(0)); // 将0转换成四字节的数字
try {
fc.position(0);
fc.write(buf); //将其写入到文件
fc.force(false);
} catch (IOException e) {
Panic.panic(e);
}

日志文件创建完需要打开时,会调用open()方法,并读取日志文件的[XChecksum]以及去除BadTail

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void init() {
long size = 0;
try {
size = file.length(); //读取文件大小
} catch (IOException e) {
Panic.panic(e);
}
if (size < 4) { //若文件大小小于4,证明日志文件创建出现问题
Panic.panic(Error.BadLogFileException);
}

ByteBuffer raw = ByteBuffer.allocate(4); //创建一个容量为4的ByteBuffer
try {
fc.position(0);
fc.read(raw); //读取四字节大小的内容
} catch (IOException e) {
Panic.panic(e);
}
int xChecksum = Parser.parseInt(raw.array()); //将其转换成int整数
this.fileSize = size;
this.xChecksum = xChecksum; //赋值给当前对象

checkAndRemoveTail(); //检查是否需要去除BadTail
}

internNext()

画的不规范,大概意思知道即可
20251020184912

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
* 获取下一条日志
*/
private byte[] internNext() {
// 检查当前位置是否已经超过了文件的大小,如果超过了,说明没有更多的日志可以读取,返回 null
if (position + OF_DATA >= fileSize) {
return null;
}

// 创建一个大小为 4 的 ByteBuffer,用于读取日志的大小
ByteBuffer tmp = ByteBuffer.allocate(4);
try {
// 将文件通道的位置设置为当前位置
fc.position(position);
// 从文件通道中读取 4 个字节的数据到 ByteBuffer 中,即Size日志文件的大小
fc.read(tmp);
} catch (IOException e) {
// 如果发生 IO 异常,调用 Panic.panic 方法处理异常
Panic.panic(e);
}

// 使用 Parser.parseInt 方法将读取到的 4 个字节的数据转换为 int 类型,得到日志的大小
int size = Parser.parseInt(tmp.array());

// 检查当前位置加上日志的大小是否超过了文件的大小,如果超过了,说明日志不完整,返回 null
if (position + size + OF_DATA > fileSize) {
return null;
}

// 创建一个大小为 OF_DATA + size 的 ByteBuffer,用于读取完整的日志
ByteBuffer buf = ByteBuffer.allocate(OF_DATA + size);
try {
// 将文件通道的位置设置为当前位置
fc.position(position);
// 从文件通道中读取 OF_DATA + size 个字节的数据到 ByteBuffer 中
// 读取整条日志 [Size][Checksum][Data]
fc.read(buf);
} catch (IOException e) {
// 如果发生 IO 异常,调用 Panic.panic 方法处理异常
Panic.panic(e);
}

// 将 ByteBuffer 中的数据转换为字节数组
byte[] log = buf.array();

// 计算日志数据的校验和
int checkSum1 = calChecksum(0, Arrays.copyOfRange(log, OF_DATA, log.length));
// 从日志中读取校验和
int checkSum2 = Parser.parseInt(Arrays.copyOfRange(log, OF_CHECKSUM, OF_DATA));

// 比较计算得到的校验和和日志中的校验和,如果不相等,说明日志已经被破坏,返回 null
if (checkSum1 != checkSum2) {
return null;
}

// 更新当前位置
position += log.length;

// 返回读取到的日志
return log;
}

log()

向日志文件写入日志时,也是首先将数据包裹成日志格式,写入文件后,再更新文件的校验和,更新校验和时,会刷新缓冲区,保证内容写入磁盘。

20251020185849

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Override
public void log(byte[] data) {
// 解析成一条完整的log日志
byte[] log = wrapLog(data);
ByteBuffer buf = ByteBuffer.wrap(log);
lock.lock();
try {
//写入到指定位置
fc.position(fc.size());
fc.write(buf);
} catch (IOException e) {
Panic.panic(e);
} finally {
lock.unlock();
}
// 更新总校验值
updateXChecksum(log);
}

/**
* 更新总校验值
*/
private void updateXChecksum(byte[] log) {
//计算总校验值
this.xChecksum = calChecksum(this.xChecksum, log);
try {
fc.position(0);
fc.write(ByteBuffer.wrap(Parser.int2Byte(xChecksum)));
fc.force(false);
} catch (IOException e) {
Panic.panic(e);
}
}

/**
* 将数据解析成完整log
*/
private byte[] wrapLog(byte[] data) {
// 使用 calChecksum 方法计算数据的校验和,然后将校验和转换为字节数组
byte[] checksum = Parser.int2Byte(calChecksum(0, data));
// 将数据的长度转换为字节数组
byte[] size = Parser.int2Byte(data.length);
// 使用 Bytes.concat 方法将 size、checksum 和 data 连接成一个新的字节数组,然后返回这个字节数组
return Bytes.concat(size, checksum, data);
}

恢复策略

在MYDB中,有两条规则限制了数据库的操作,以便于恢复日志;

  1. 正在进行的事务,不会读取其他任何未提交的事务产生的数据
  2. 正在进行的事务,不会修改其他任何未提交的事务修改或产生的数据

根据上方的两条规则,MYDB日志的恢复也分为两种:

  1. 通过**redo log**重做所有崩溃时已经完成(**committed 或 aborted**)的事务
  2. 通过**undo log**撤销所有崩溃时未完成(**active****)的事务 **

redo:

  1. 正序扫描事务 T 的所有日志
  2. 如果日志是插入操作 (Ti, I, A, x),就将 x 重新插入 A 位置
  3. 如果日志是更新操作 (Ti, U, A, oldx, newx),就将 A 位置的值设置为 newx

undo:

  1. 倒序扫描事务 T 的所有日志
  2. 如果日志是插入操作 (Ti, I, A, x),就将 A 位置的数据删除
  3. 如果日志是更新操作 (Ti, U, A, oldx, newx),就将 A 位置的值设置为 oldx

    注:对于 redo log 和 undo log,可以学习该文章(图解MySQL-日志篇

日志格式
首先规定两种日志的格式类型:

1
2
3
4
5
6
7
8
9
10
public class Recover {
private static final byte LOG_TYPE_INSERT = 0;
private static final byte LOG_TYPE_UPDATE = 1;

// updateLog:
// [LogType] [XID] [UID] [OldRaw] [NewRaw]

// insertLog:
// [LogType] [XID] [Pgno] [Offset] [Raw]
}

重做所有已完成的事务 redo log
20251020191613

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private static void redoTranscations(TransactionManager tm, Logger lg, PageCache pc) {
// 重置日志文件的读取位置到开始
lg.rewind();
// 循环读取日志文件中的所有日志记录
while (true) {
// 读取下一条日志记录
byte[] log = lg.next();
// 如果读取到的日志记录为空,表示已经读取到日志文件的末尾,跳出循环
if (log == null) break;
// 判断日志记录的类型
if (isInsertLog(log)) {
// 如果是插入日志,解析日志记录,获取插入日志信息
InsertLogInfo li = parseInsertLog(log);
// 获取事务ID
long xid = li.xid;
// 如果当前事务已经提交,进行重做操作
if (!tm.isActive(xid)) {
doInsertLog(pc, log, REDO);
}
} else {
// 如果是更新日志,解析日志记录,获取更新日志信息
UpdateLogInfo xi = parseUpdateLog(log);
// 获取事务ID
long xid = xi.xid;
// 如果当前事务已经提交,进行重做操作
if (!tm.isActive(xid)) {
doUpdateLog(pc, log, REDO);
}
}
}
}

撤销所有未完成的事务 undolog
流程图只是简易表达了意思,详细请查看代码;
20251020192123

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
private static void undoTranscations(TransactionManager tm, Logger lg, PageCache pc) {
// 创建一个用于存储日志的映射,键为事务ID,值为日志列表
Map<Long, List<byte[]>> logCache = new HashMap<>();
// 将日志文件的读取位置重置到开始
lg.rewind();
// 循环读取日志文件中的所有日志记录
while (true) {
// 读取下一条日志记录
byte[] log = lg.next();
// 如果读取到的日志记录为空,表示已经读取到日志文件的末尾,跳出循环
if (log == null) break;
// 判断日志记录的类型
if (isInsertLog(log)) {
// 如果是插入日志,解析日志记录,获取插入日志信息
InsertLogInfo li = parseInsertLog(log);
// 获取事务ID
long xid = li.xid;
// 如果当前事务仍然活跃,将日志记录添加到对应的日志列表中
if (tm.isActive(xid)) {
if (!logCache.containsKey(xid)) {
logCache.put(xid, new ArrayList<>());
}
logCache.get(xid).add(log);
}
} else {
// 如果是更新日志,解析日志记录,获取更新日志信息
UpdateLogInfo xi = parseUpdateLog(log);
// 获取事务ID
long xid = xi.xid;
// 如果当前事务仍然活跃,将日志记录添加到对应的日志列表中
if (tm.isActive(xid)) {
if (!logCache.containsKey(xid)) {
logCache.put(xid, new ArrayList<>());
}
// 将事务id对应的log添加到集合中
logCache.get(xid).add(log);
}
}
}

// 对所有活跃的事务的日志进行倒序撤销
for (Entry<Long, List<byte[]>> entry : logCache.entrySet()) {
List<byte[]> logs = entry.getValue();
for (int i = logs.size() - 1; i >= 0; i--) {
byte[] log = logs.get(i);
// 判断日志记录的类型
if (isInsertLog(log)) {
// 如果是插入日志,进行撤销插入操作
doInsertLog(pc, log, UNDO);
} else {
// 如果是更新日志,进行撤销更新操作
doUpdateLog(pc, log, UNDO);
}
}
// 中止当前事务
tm.abort(entry.getKey());
}
}

doInsertLog()

以上两种事务的insert操作都是通过此方法完成

20251020192948

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private static void doInsertLog(PageCache pc, byte[] log, int flag) {
// 解析日志记录,获取插入日志信息
InsertLogInfo li = parseInsertLog(log);
Page pg = null;
try {
// 根据页码从页面缓存中获取页面,即AbstractCache.get()方法
pg = pc.getPage(li.pgno);
} catch (Exception e) {
// 如果发生异常,调用Panic.panic方法处理
Panic.panic(e);
}
try {
// 如果标志位为UNDO,将数据项设置为无效
if (flag == UNDO) {
DataItem.setDataItemRawInvalid(li.raw);
}
// 在指定的页面和偏移量处插入数据
PageX.recoverInsert(pg, li.raw, li.offset);
} finally {
// 无论是否发生异常,都要释放页面,即AbstractCache.release() 方法
pg.release();
}
}

doUpdateLog()

20251020193013

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private static void doUpdateLog(PageCache pc, byte[] log, int flag) {
int pgno; // 用于存储页面编号
short offset; // 用于存储偏移量
byte[] raw; // 用于存储原始数据

// 根据标志位判断是进行重做操作还是撤销操作
if (flag == REDO) {
// 如果是重做操作,解析日志记录,获取更新日志信息,主要获取新数据
UpdateLogInfo xi = parseUpdateLog(log);
pgno = xi.pgno;
offset = xi.offset;
raw = xi.newRaw;
} else {
// 如果是撤销操作,解析日志记录,获取更新日志信息,主要获取旧数据
UpdateLogInfo xi = parseUpdateLog(log);
pgno = xi.pgno;
offset = xi.offset;
raw = xi.oldRaw;
}

Page pg = null; // 用于存储获取到的页面
try {
// 尝试从页面缓存中获取指定页码的页面
pg = pc.getPage(pgno);
} catch (Exception e) {
// 如果获取页面过程中发生异常,调用Panic.panic方法进行处理
Panic.panic(e);
}

try {
// 在指定的页面和偏移量处插入解析出的数据, 数据页缓存讲解了该方法
PageX.recoverUpdate(pg, raw, offset);
} finally {
// 无论是否发生异常,都要释放页面
pg.release();
}
}

页面索引

基本介绍
这个页面索引的设计用于提高在数据库中进行插入操作时的效率。它缓存了每一页的空闲空间信息,以便在进行插入操作时能够快速找到合适的页面,而无需遍历磁盘或者缓存中的所有页面。
具体来说,页面索引将每一页划分为一定数量的区间(这里是40个区间),并且在数据库启动时,会遍历所有页面,将每个页面的空闲空间信息分配到这些区间中。当需要进行插入操作时,插入操作首先会将所需的空间大小向上取整,然后映射到某一个区间,随后可以直接从该区间中选择任何一页,以满足插入需求。
**PageIndex** 的实现使用了一个数组,数组的每个元素都是一个列表,用于存储具有相同空闲空间大小的页面信息。
**PageIndex** 中获取页面的过程非常简单,只需要根据所需的空间大小计算出区间号,然后直接从对应的列表中取出一个页面即可。
被选择的页面会从 PageIndex 中移除,这意味着同一个页面不允许并发写入。在上层模块使用完页面后,需要将其重新插入 PageIndex,以便其他插入操作能够继续使用。
总的来说,页面索引的设计旨在提高数据库的插入操作效率,通过缓存页面的空闲空间信息,避免了频繁地访问磁盘或者缓存中的页面,从而加速了插入操作的执行。

注:以上内容来自原文跟GPT

PageIndex

1
2
3
4
5
6
7
8
9
public class PageIndex {
// 将一页划成40个区间,每个区间的大小为204字节
private final List<PageInfo>[] lists;
// 每个区间的大小为204字节
private static final int INTERVALS_NO = 40;
private static final int THRESHOLD = PageCache.PAGE_SIZE / INTERVALS_NO; //204

private Lock lock;
}

select(int spaceSize)

根据空闲空间的大小计算所处的编号位置,从PageIndex中获取页面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 根据给定的空间大小选择一个 PageInfo 对象。
*
* @param spaceSize 需要的空间大小
* @return 一个 PageInfo 对象,其空闲空间大于或等于给定的空间大小。如果没有找到合适的 PageInfo,返回 null。
*/
public PageInfo select(int spaceSize) {
lock.lock(); // 获取锁,确保线程安全
try {
int number = spaceSize / THRESHOLD; // 计算需要的空间大小对应的区间编号
// 此处+1主要为了向上取整
/*
1、假需要存储的字节大小为5168,此时计算出来的区间号是25,但是25*204=5100显然是不满足条件的
2、此时向上取整找到 26,而26*204=5304,是满足插入条件的
*/
if (number < INTERVALS_NO) number++; // 如果计算出的区间编号小于总的区间数,编号加一
while (number <= INTERVALS_NO) { // 从计算出的区间编号开始,向上寻找合适的 PageInfo
if (lists[number].size() == 0) { // 如果当前区间没有 PageInfo,继续查找下一个区间
number++;
continue;
}
return lists[number].remove(0); // 如果当前区间有 PageInfo,返回第一个 PageInfo,并从列表中移除
}
return null; // 如果没有找到合适的 PageInfo,返回 null
} finally {
lock.unlock(); // 释放锁
}
}

add()

因为同一个页面是不允许并发写的,在上层模块使用完这个页面之后,需要重新将其插入到PaegIndex;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 根据给定的页面编号和空闲空间大小添加一个 PageInfo 对象。
* @param pgno 页面编号
* @param freeSpace 页面的空闲空间大小
*/
public void add(int pgno, int freeSpace) {
lock.lock(); // 获取锁,确保线程安全
try {
int number = freeSpace / THRESHOLD; // 计算空闲空间大小对应的区间编号
lists[number].add(new PageInfo(pgno, freeSpace)); // 在对应的区间列表中添加一个新的 PageInfo 对象
} finally {
lock.unlock(); // 释放锁
}
}

fillPageIndex()

DataManager 被创建时,需要获取所有页面并填充 PageIndex:

20251020194823

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 填充 PageIndex。
* 遍历从第二页开始的每一页,将每一页的页面编号和空闲空间大小添加到 PageIndex 中。
*/
void fillPageIndex() {
int pageNumber = pc.getPageNumber(); // 获取当前的页面数量
for (int i = 2; i <= pageNumber; i++) { // 从第二页开始,对每一页进行处理
Page pg = null;
try {
pg = pc.getPage(i); // 尝试获取页面
} catch (Exception e) {
Panic.panic(e); // 如果出现异常,处理异常
}
pIndex.add(pg.getPageNumber(), PageX.getFreeSpace(pg)); // 将页面编号和页面的空闲空间大小添加到 PageIndex 中
pg.release(); // 释放页面
}
}

DataItem

基本介绍

DataItem 是一个数据抽象层,它提供了一种在上层模块和底层数据存储之间进行交互的接口。其功能和作用主要包括:

  1. 数据存储和访问:DataItem 存储了数据的具体内容,以及一些相关的元数据信息,如数据的大小、有效标志等。上层模块可以通过 DataItem 对象获取到其中的数据内容,以进行读取、修改或删除等操作。
  2. 数据修改和事务管理:DataItem 提供了一些方法来支持数据的修改操作,并在修改操作前后执行一系列的流程,如保存原始数据、落日志等。这些流程保证了数据修改的原子性和一致性,同时支持事务管理,确保了数据的安全性。
  3. 数据共享和内存管理:DataItem 的数据内容通过 SubArray 对象返回给上层模块,这使得上层模块可以直接访问数据内容而无需进行拷贝。这种数据共享的方式提高了数据的访问效率,同时减少了内存的开销。
  4. 缓存管理:DataItem 对象由底层的 DataManager 缓存管理,通过调用 release() 方法可以释放缓存中的 DataItem 对象,以便回收内存资源,提高系统的性能和效率。

DataItem 提供了一种高层次的数据抽象,隐藏了底层数据存储的细节,为上层模块提供了方便的数据访问和管理接口,同时保证了数据的安全性和一致性。

具体实现

DataItem 中保存的数据,结构如下:

1
[ValidFlag] [DataSize] [Data]

其中 **ValidFlag** 占用 1 字节,标识了该 **DataItem** 是否有效。删除一个 **DataItem**,只需要简单地将其有效位设置为 0。**DataSize** 占用 2 字节,标识了后面 **Data** 的长度。

1
2
3
4
5
6
7
public class DataItemImpl implements DataItem {
private SubArray raw; //原始数据
private byte[] oldRaw; //旧的原始数据
private DataManagerImpl dm; //数据管理器
private long uid; //唯一标识符
private Page pg; //页面对象
}

data()

返回数据项中的数据部分,返回的是原始数据的引用,而不是数据的拷贝

1
2
3
4
5
@Override
public SubArray data() {
// 返回 [data] 部分
return new SubArray(raw.raw, raw.start+OF_DATA, raw.end);
}

before()

在修改数据项之前调用,用于锁定数据项并保存原始数据

1
2
3
4
5
6
7
@Override
public void before() {
wLock.lock();
pg.setDirty(true);
//保存原始数据的副本,以便在需要时进行回滚
System.arraycopy(raw.raw, raw.start, oldRaw, 0, oldRaw.length);
}

unbefore()

在需要撤销修改时调用,用于恢复原始数据并解锁数据项

1
2
3
4
5
@Override
public void unBefore() {
System.arraycopy(oldRaw, 0, raw.raw, raw.start, oldRaw.length);
wLock.unlock();
}

after()

在修改完成之后调用,用于记录日志并解锁数据项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public void after(long xid) {
dm.logDataItem(xid, this);
wLock.unlock();
}

/**
* 创建一个更新日志。
*
* @param xid 事务ID
* @param di DataItem对象
* @return 更新日志,包含日志类型、事务ID、DataItem的唯一标识符、旧原始数据和新原始数据
*/
public static byte[] updateLog(long xid, DataItem di) {
byte[] logType = {LOG_TYPE_UPDATE}; // 创建一个表示日志类型的字节数组,并设置其值为LOG_TYPE_UPDATE
byte[] xidRaw = Parser.long2Byte(xid); // 将事务ID转换为字节数组
byte[] uidRaw = Parser.long2Byte(di.getUid()); // 将DataItem对象的唯一标识符转换为字节数组
byte[] oldRaw = di.getOldRaw(); // 获取DataItem对象的旧原始数据
SubArray raw = di.getRaw(); // 获取DataItem对象的新原始数据
byte[] newRaw = Arrays.copyOfRange(raw.raw, raw.start, raw.end); // 将新原始数据转换为字节数组
return Bytes.concat(logType, xidRaw, uidRaw, oldRaw, newRaw); // 将所有字节数组连接在一起,形成一个完整的更新日志,并返回这个日志
}

release()

在使用完 **DataItem**后,需要调用 release() 释放调 DataItem的缓存

1
2
3
4
@Override
public void release() {
dm.releaseDataItem(this);
}

DM的实现

基本介绍

DataManager(DM)是数据库管理系统中的一层,主要负责底层数据的管理和操作。其功能和作用包括:

  1. 数据缓存和管理:DataManager 实现了对 DataItem 对象的缓存管理,通过缓存管理,可以提高数据的访问效率,并减少对底层存储的频繁访问,从而提高系统的性能。
  2. 数据访问和操作:DataManager 提供了读取、插入和修改等数据操作方法,上层模块可以通过这些方法对数据库中的数据进行操作和管理。
  3. 事务管理:DataManager 支持事务的管理,通过事务管理,可以保证对数据的修改是原子性的,并且在事务提交或回滚时能够保持数据的一致性和完整性。
  4. 日志记录和恢复:DataManager 在数据修改操作前后会执行一系列的流程,包括日志记录和数据恢复等操作,以确保数据的安全性和可靠性,即使在系统崩溃或异常情况下也能够保证数据的完整性。
  5. 页面索引管理:DataManager 中实现了页面索引管理功能,通过页面索引可以快速定位到合适的空闲空间,从而提高数据插入的效率和性能。
  6. 文件初始化和校验:DataManager 在创建和打开数据库文件时,会进行文件的初始化和校验操作,以确保文件的正确性和完整性,同时在文件关闭时会执行相应的清理操作。
  7. 资源管理和释放:DataManager 在关闭时会执行资源的释放和清理操作,包括缓存和日志的关闭,以及页面的释放和页面索引的清理等。

DataManager 在数据库管理系统中扮演着重要的角色,负责底层数据的管理和操作,为上层模块提供了方便的数据访问和操作接口,同时通过事务管理和日志记录等功能保证了数据的安全性和可靠性。

注:以上内容来自GPT

具体实现
DataManagerDM 层直接对外提供方法的类,同时,也实现成 DataItem 对象的缓存。DataItem 存储的 **key**,是由页号和页内偏移组成的一个 8 字节无符号整数,页号和偏移各占 4 字节。

Uid生成以及解析

初始化:假设是从第二个页面开始的,并且偏移量为0
pgno: 2;
offset: 0;

  1. 先通过页面编号以及偏移量生成唯一标识 uid

20251020200630

1
2
3
4
5
6
7
public class Types {
public static long addressToUid(int pgno, short offset) {
long u0 = (long)pgno;
long u1 = (long)offset;
return u0 << 32 | u1; //或运算是全0则0,见1则1
}
}
  1. 从 uid 中提取出偏移量(offset

20251020200806

1
2
3
// 从 uid 中提取出偏移量(offset),这是通过位操作实现的,偏移量是 uid 的低16位
// & 运算:有0则0,全1才1
short offset = (short) (uid & ((1L << 16) - 1));
  1. 将 uid 右移32位,再获取页面编号

20251020200824

1
2
3
4
5
// 将 uid 右移32位,以便接下来提取出页面编号(pgno)
uid >>>= 32;
// 从 uid 中提取出页面编号(pgno),页面编号是 uid 的高32位
// & 运算:有0则0,全1才1
int pgno = (int) (uid & ((1L << 32) - 1));

getForCache()

也是继承自AbstractCache,只需要从 key 中解析出页号,从 pageCache 中获取到页面,再根据偏移,解析出 DataItem 即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
protected DataItem getForCache(long uid) throws Exception {
// 从 uid 中提取出偏移量(offset),这是通过位操作实现的,偏移量是 uid 的低16位
short offset = (short) (uid & ((1L << 16) - 1));
// 将 uid 右移32位,以便接下来提取出页面编号(pgno)
uid >>>= 32;
// 从 uid 中提取出页面编号(pgno),页面编号是 uid 的高32位
int pgno = (int) (uid & ((1L << 32) - 1));
// 使用页面缓存(pc)的 getPage(int pgno) 方法根据页面编号获取一个 Page 对象
Page pg = pc.getPage(pgno);
// 使用 DataItem 接口的静态方法 parseDataItem(Page pg, short offset, DataManagerImpl dm)
// 根据获取到的 Page 对象、偏移量和当前的 DataManagerImpl 对象(this)解析出一个 DataItem 对象,并返回这个对象
return DataItem.parseDataItem(pg, offset, this);
}

releaseForCache()

DataItem 缓存释放,需要将 DataItem 写回数据源,由于对文件的读写是以页为单位进行的,只需要将 DataItem 所在的页 release 即可:

1
2
3
4
@Override
protected void releaseForCache(DataItem di) {
di.page().release();
}

DataManager初始化

对于**DataManager**文件的创建有两种流程,一种是从已有文件创建**DataManager**另外一种是从空文件创建**DataManager**对于两者的不同主要在于第一页的初始化和校验问题:

  1. 从空文件创建首先需要对第一页进行初始化
  2. 而从已有文件创建,则需要对第一页进行校验,来判断是否需要执行恢复流程,并重新对第一页生成随机字节
从空文件创建create()

20251020200953

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 静态方法,用于创建DataManager实例
public static DataManager create(String path, long mem, TransactionManager tm) {
// 创建一个PageCache实例,path是文件路径,mem是内存大小
PageCache pc = PageCache.create(path, mem);
// 创建一个Logger实例,path是文件路径
Logger lg = Logger.create(path);

// 创建一个DataManagerImpl实例,pc是PageCache实例,lg是Logger实例,tm是TransactionManager实例
DataManagerImpl dm = new DataManagerImpl(pc, lg, tm);
// 初始化PageOne
dm.initPageOne();
// 返回创建的DataManagerImpl实例
return dm;
}
从已有文件创建open()

20251020204934

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 静态方法,用于打开已存在的DataManager实例
public static DataManager open(String path, long mem, TransactionManager tm) {
// 打开一个PageCache实例,path是文件路径,mem是内存大小
PageCache pc = PageCache.open(path, mem);
// 打开一个Logger实例,path是文件路径
Logger lg = Logger.open(path);
// 创建一个DataManagerImpl实例,pc是PageCache实例,lg是Logger实例,tm是TransactionManager实例
DataManagerImpl dm = new DataManagerImpl(pc, lg, tm);
// 加载并检查PageOne,如果检查失败,则进行恢复操作
if (!dm.loadCheckPageOne()) {
Recover.recover(tm, lg, pc);
}
// 填充PageIndex,遍历从第二页开始的每一页,将每一页的页面编号和空闲空间大小添加到 PageIndex 中
dm.fillPageIndex();
// 设置PageOne为打开状态
PageOne.setVcOpen(dm.pageOne);
// 将PageOne立即写入到磁盘中,确保PageOne的数据被持久化
dm.pc.flushPage(dm.pageOne);

// 返回创建的DataManagerImpl实例
return dm;
}

DataManager 提供的三个功能

**DM**层主要提供了三个功能供上层使用,分别是读、插入和修改。由于修改是通过读出的 **DataItem** 实现的,也就是说 **DataManager** 只需要 **read()****insert()** 方法;

read()

**read()**** **是根据 UID 从缓存中获取的 **DataItem**,并校验有效位;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
public DataItem read(long uid) throws Exception {
//从缓存页面中读取到DataItemImpl
DataItemImpl di = (DataItemImpl) super.get(uid);
//校验di是否有效
if (!di.isValid()) {
// 无效释放缓存
di.release();
return null;
}
return di;
}

@Override
protected DataItem getForCache(long uid) throws Exception {
// 从 uid 中提取出偏移量(offset),这是通过位操作实现的,偏移量是 uid 的低16位
short offset = (short) (uid & ((1L << 16) - 1));
// 将 uid 右移32位,以便接下来提取出页面编号(pgno)
uid >>>= 32;
// 从 uid 中提取出页面编号(pgno),页面编号是 uid 的高32位
int pgno = (int) (uid & ((1L << 32) - 1));
// 使用页面缓存(pc)的 getPage(int pgno) 方法根据页面编号获取一个 Page 对象
Page pg = pc.getPage(pgno);
// 使用 DataItem 接口的静态方法 parseDataItem(Page pg, short offset, DataManagerImpl dm)
// 根据获取到的 Page 对象、偏移量和当前的 DataManagerImpl 对象(this)解析出一个 DataItem 对象,并返回这个对象
return DataItem.parseDataItem(pg, offset, this);
}
insert()

**pageIndex** 中获取一个足以存储插入内容的页面的页号,获取页面后,首先需要写入插入日志,接着才可以通过 **pageX** 插入数据,并返回插入位置的偏移。最后需要将页面信息重新插入 **pageIndex**
20251020204904

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
@Override
public long insert(long xid, byte[] data) throws Exception {
// 将输入的数据包装成DataItem的原始格式
byte[] raw = DataItem.wrapDataItemRaw(data);
// 如果数据项的大小超过了页面的最大空闲空间,抛出异常
if (raw.length > PageX.MAX_FREE_SPACE) {
throw Error.DataTooLargeException;
}

// 初始化一个页面信息对象
PageInfo pi = null;
// 尝试5次找到一个可以容纳新数据项的页面
for (int i = 0; i < 5; i++) {
// 从页面索引中选择一个可以容纳新数据项的页面
pi = pIndex.select(raw.length);
// 如果找到了合适的页面,跳出循环
if (pi != null) {
break;
} else {
// 如果没有找到合适的页面,创建一个新的页面,并将其添加到页面索引中
int newPgno = pc.newPage(PageX.initRaw());
pIndex.add(newPgno, PageX.MAX_FREE_SPACE);
}
}
// 如果还是没有找到合适的页面,抛出异常
if (pi == null) {
throw Error.DatabaseBusyException;
}

// 初始化一个页面对象
Page pg = null;
// 初始化空闲空间大小为0
int freeSpace = 0;
try {
// 获取页面信息对象中的页面
pg = pc.getPage(pi.pgno);
// 生成插入日志
byte[] log = Recover.insertLog(xid, pg, raw);
// 将日志写入日志文件
logger.log(log);

// 在页面中插入新的数据项,并获取其在页面中的偏移量
short offset = PageX.insert(pg, raw);

// 释放页面
pg.release();
// 返回新插入的数据项的唯一标识符
return Types.addressToUid(pi.pgno, offset);

} finally {
// 将页面重新添加到页面索引中
if (pg != null) {
pIndex.add(pi.pgno, PageX.getFreeSpace(pg));
} else {
pIndex.add(pi.pgno, freeSpace);
}
}
}

/**
* 返回一个完整的 DataItem 结构数据
* dataItem 结构如下:
* [ValidFlag] [DataSize] [Data]
* ValidFlag 1字节,0为合法,1为非法
* DataSize 2字节,标识Data的长度
* @param raw
* @return
*/
public static byte[] wrapDataItemRaw(byte[] raw) {
byte[] valid = new byte[1]; //证明此时为非法数据
byte[] size = Parser.short2Byte((short)raw.length); //计算数据字节大小
return Bytes.concat(valid, size, raw); //拼接DataItem 结构数据
}

/**
* 根据给定的空间大小选择一个 PageInfo 对象。
*
* @param spaceSize 需要的空间大小
* @return 一个 PageInfo 对象,其空闲空间大于或等于给定的空间大小。如果没有找到合适的 PageInfo,返回 null。
*/
public PageInfo select(int spaceSize) {
lock.lock(); // 获取锁,确保线程安全
try {
int number = spaceSize / THRESHOLD; // 计算需要的空间大小对应的区间编号
// 此处+1主要为了向上取整
/*
1、假需要存储的字节大小为5168,此时计算出来的区间号是25,但是25*204=5100显然是不满足条件的
2、此时向上取整找到 26,而26*204=5304,是满足插入条件的
*/
if (number < INTERVALS_NO) number++; // 如果计算出的区间编号小于总的区间数,编号加一
while (number <= INTERVALS_NO) { // 从计算出的区间编号开始,向上寻找合适的 PageInfo
if (lists[number].size() == 0) { // 如果当前区间没有 PageInfo,继续查找下一个区间
number++;
continue;
}
return lists[number].remove(0); // 如果当前区间有 PageInfo,返回第一个 PageInfo,并从列表中移除
}
return null; // 如果没有找到合适的 PageInfo,返回 null
} finally {
lock.unlock(); // 释放锁
}
}

// 定义一个静态方法,用于创建插入日志
public static byte[] insertLog(long xid, Page pg, byte[] raw) {
// 创建一个表示日志类型的字节数组,并设置其值为LOG_TYPE_INSERT
byte[] logTypeRaw = {LOG_TYPE_INSERT};
// 将事务ID转换为字节数组
byte[] xidRaw = Parser.long2Byte(xid);
// 将页面编号转换为字节数组
byte[] pgnoRaw = Parser.int2Byte(pg.getPageNumber());
// 获取页面的第一个空闲空间的偏移量,并将其转换为字节数组
byte[] offsetRaw = Parser.short2Byte(PageX.getFSO(pg));
// 将所有字节数组连接在一起,形成一个完整的插入日志,并返回这个日志
return Bytes.concat(logTypeRaw, xidRaw, pgnoRaw, offsetRaw, raw);
}

// 将raw插入pg中,返回插入位置
public static short insert(Page pg, byte[] raw) {
pg.setDirty(true); // 将pg的dirty标志设置为true,表示pg的数据已经被修改
short offset = getFSO(pg.getData()); // 获取pg的空闲空间偏移量
System.arraycopy(raw, 0, pg.getData(), offset, raw.length); // 将raw的数据复制到pg的数据中的offset位置
setFSO(pg.getData(), (short) (offset + raw.length)); // 更新pg的空闲空间偏移量
return offset; // 返回插入位置
}
close()

DataManager 正常关闭时,需要执行缓存和日志的关闭流程,还需要设置第一页的字节校验:

1
2
3
4
5
6
7
8
9
@Override
public void close() {
super.close();
logger.close();

PageOne.setVcClose(pageOne);
pageOne.release();
pc.close();
}

Version Manager (VM) 版本管理器

2PL 与 MVCC

冲突与 2PL
首先来定义数据库的冲突,暂时不考虑插入操作,只看更新操作(U)和读操作(R),两个操作只要满足下面三个条件,就可以说这两个操作相互冲突:

1
2
3
这两个操作是由不同的事务执行的
这两个操作操作的是同一个数据项
这两个操作至少有一个是更新操作

那么这样,对同一个数据操作的冲突,其实就只有下面这两种情况:

1
2
两个不同事务的 U 操作冲突
两个不同事务的 U、R 操作冲突

那么冲突或者不冲突,意义何在?
作用在于,交换两个互不冲突的操作的顺序,不会对最终的结果造成影响,而交换两个冲突操作的顺序,则是会有影响的。

现在我们先抛开冲突不谈,记得在第四章举的例子吗,在并发情况下,两个事务同时操作 x。假设 x 的初值是 0:

1
2
3
4
5
6
7
8
T1 begin
T2 begin
R1(x) // T1 读到 0
R2(x) // T2 读到 0
U1(0+1) // T1 尝试把 x+1
U2(0+1) // T2 尝试把 x+1
T1 commit
T2 commit

最后 x 的结果是 1,这个结果显然与期望的不符。

VM 的一个很重要的职责,就是实现了调度序列的可串行化。MYDB 采用两段锁协议(2PL)来实现。当采用 2PL 时,如果某个事务 i 已经对 x 加锁,且另一个事务 j 也想操作 x,但是这个操作与事务 i 之前的操作相互冲突的话,事务 j 就会被阻塞。譬如,T1 已经因为 U1(x) 锁定了 x,那么 T2 对 x 的读或者写操作都会被阻塞,T2 必须等待 T1 释放掉对 x 的锁。

由此来看,2PL 确实保证了调度序列的可串行话,但是不可避免地导致了事务间的相互阻塞,甚至可能导致死锁。MYDB 为了提高事务处理的效率,降低阻塞概率,实现了 MVCC。

MVCC

在介绍 MVCC 之前,首先明确记录和版本的概念。

DM 层向上层提供了数据项(Data Item)的概念,VM 通过管理所有的数据项,向上层提供了记录(Entry)的概念。上层模块通过 VM 操作数据的最小单位,就是记录。VM 则在其内部,为每个记录,维护了多个版本(Version)。每当上层模块对某个记录进行修改时,VM 就会为这个记录创建一个新的版本。

MYDB 通过 MVCC,降低了事务的阻塞概率。譬如,T1 想要更新记录 X 的值,于是 T1 需要首先获取 X 的锁,接着更新,也就是创建了一个新的 X 的版本,假设为 x3。假设 T1 还没有释放 X 的锁时,T2 想要读取 X 的值,这时候就不会阻塞,MYDB 会返回一个较老版本的 X,例如 x2。这样最后执行的结果,就等价于,T2 先执行,T1 后执行,调度序列依然是可串行化的。如果 X 没有一个更老的版本,那只能等待 T1 释放锁了。所以只是降低了概率。

还记得我们在恢复策略章节中,为了保证数据的可恢复,VM 层传递到 DM 的操作序列需要满足以下两个规则:

1
2
规定 1:正在进行的事务,不会读取其他任何未提交的事务产生的数据。
规定 2:正在进行的事务,不会修改其他任何未提交的事务修改或产生的数据。

由于 2PL 和 MVCC,我们可以看到,这两个条件都被很轻易地满足了。

前言

tips:
VM是基于两段锁协议实现调度序列的可串行化,并实现了MVCC以消除读写阻塞。同时也实现了两种隔离级别,所以我们还需要明确版本的概念;
DM 层向上层提供了数据项(Data Item)的概念,VM 通过管理所有的数据项,向上层提供了记录(Entry)的概念。上层模块通过 VM 操作数据的最小单位,就是记录。VM 则在其内部,为每个记录,维护了多个版本(Version)。每当上层模块对某个记录进行修改时,VM 就会为这个记录创建一个新的版本。

如何实现版本记录?

Entry格式数据

[XMIN] [XMAX] [DATA]

  1. XMIN 是创建该条记录(版本)的事务编号
  2. **XMAX **则是删除该条记录(版本)的事务编号
  3. **DATA **就是这条记录持有的数据

Entry结构

对于一条记录来说,MYDB 使用 Entry 类维护了其结构。虽然理论上,MVCC 实现了多版本,但是在实现中,VM 并没有提供 Update 操作,对于字段的更新操作由后面的表和字段管理(TBM)实现。所以在 VM 的实现中,一条记录只有一个版本。 由于一条记录存储在一条 Data Item 中,所以 Entry 中保存一个 DataItem 的引用即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class Entry {
// 定义了XMIN的偏移量为0
private static final int OF_XMIN = 0;
// 定义了XMAX的偏移量为XMIN偏移量后的8个字节
private static final int OF_XMAX = OF_XMIN+8;
// 定义了DATA的偏移量为XMAX偏移量后的8个字节
private static final int OF_DATA = OF_XMAX+8;

// uid字段,可能是用来唯一标识一个Entry的
private long uid;
// DataItem对象,用来存储数据的
private DataItem dataItem;
// VersionManager对象,用来管理版本的
private VersionManager vm;

// 静态方法,用来加载一个Entry。它首先从VersionManager中读取数据,然后创建一个新的Entry
public static Entry loadEntry(VersionManager vm, long uid) throws Exception {
DataItem di = ((VersionManagerImpl)vm).dm.read(uid);
return newEntry(vm, di, uid);
}

// 方法,用来移除一个Entry。它通过调用dataItem的release方法来实现
public void remove() {
dataItem.release();
}
}

日志格式操作

wrapEntryRaw()

1
2
3
4
5
6
7
8
9
10
11
/**
* 生成日志格式数据
*/
public static byte[] wrapEntryRaw(long xid, byte[] data) {
// 将事务id转为8字节数组
byte[] xmin = Parser.long2Byte(xid);
// 创建一个空的8字节数组,等待版本修改或删除是才修改
byte[] xmax = new byte[8];
// 拼接成日志格式
return Bytes.concat(xmin, xmax, data);
}

data()

获取记录中持有的数据,也就需要按照上面这个结构来解析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 以拷贝的形式返回内容
public byte[] data() {
// 加锁,确保数据安全
dataItem.rLock();
try {
// 获取日志数据
SubArray sa = dataItem.data();
// 创建一个去除前16字节的数组,因为前16字节表示 xmin and xmax
byte[] data = new byte[sa.end - sa.start - OF_DATA];
// 拷贝数据到data数组上
System.arraycopy(sa.raw, sa.start+OF_DATA, data, 0, data.length);
return data;
} finally {
//释放锁
dataItem.rUnLock();
}
}

setXmax()

当需要对数据进行修改时,就需要设置 xmax的值;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 设置删除版本的事务编号
* @param xid
*/
public void setXmax(long xid) {
// 在修改或删除之前先拷贝好旧数值
dataItem.before();
try {
// 获取需要删除的日志数据
SubArray sa = dataItem.data();
// 将事务编号拷贝到 8~15 处字节
System.arraycopy(Parser.long2Byte(xid), 0, sa.raw, sa.start+OF_XMAX, 8);
} finally {
// 生成一个修改日志
dataItem.after(xid);
}
}

事物的隔离级别

读提交

在数据库中,“读提交”(Read Committed)是一种事务隔离级别,表示在读取数据时,事务只能读取已经提交的事务产生的数据。这意味着当一个事务正在读取数据时,如果其他事务正在修改相同的数据,它只能读取已经被提交的修改,而无法读取尚未提交的修改。
在MYDB中实现读提交,主要为了防止级联回滚与 commit 语义冲突,对每个数据版本(或记录版本),维护了两个关键变量:XMINXMAX

  • XMIN表示创建该版本的事务编号。当一个事务创建了一个新的版本时,XMIN会记录该事务的编号。
  • XMAX表示删除该版本的事务编号。当一个版本被删除时,或者有新版本出现时,XMAX会记录删除该版本的事务编号。
读提交的事务可见性逻辑
  1. 如果版本的XMIN等于当前事务的事务编号,并且XMAX为空(表示尚未被删除),则该版本对当前事务可见。
  2. 或者,如果版本的XMIN对应的事务已经提交,并且XMAX为空(尚未被删除),或者XMAX不是当前事务的事务编号,并且XMAX对应的事务也已经提交,则该版本对当前事务可见。

    在读提交隔离级别下,事务只能看到已经提交的版本,而不能看到尚未提交的版本或被尚未提交的事务删除的版本。这样可以确保读取的数据是稳定和一致的,同时避免了读取到不一致或未提交的数据的可能性。

1
2
3
4
5
6
7
8
(XMIN == Ti and                             // 由Ti创建且
XMAX == NULL // 还未被删除
)
or // 或
(XMIN is commited and // 由一个已提交的事务创建且
(XMAX == NULL or // 尚未删除或
(XMAX != Ti and XMAX is not commited) // 由一个未提交的事务删除
))
readCommited()

20251020211720

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 用来在读提交的隔离级别下,某个记录是否对事务t可见
private static boolean readCommitted(TransactionManager tm, Transaction t, Entry e) {
// 获取事务的ID
long xid = t.xid;
// 获取记录的创建版本号
long xmin = e.getXmin();
// 获取记录的删除版本号
long xmax = e.getXmax();
// 如果记录的创建版本号等于事务的ID并且记录未被删除,则返回true
if (xmin == xid && xmax == 0) return true;

// 如果记录的创建版本已经提交
if (tm.isCommitted(xmin)) {
// 如果记录未被删除,则返回true
if (xmax == 0) return true;
// 如果记录的删除版本号不等于事务的ID
if (xmax != xid) {
// 如果记录的删除版本未提交,则返回true
// 因为没有提交,代表该数据还是上一个版本可见的
if (!tm.isCommitted(xmax)) {
return true;
}
}
}
// 其他情况返回false
return false;
}

可重复读

在数据库中,可重复读(Repeatable Read)是一种事务隔离级别,它解决了读提交隔离级别下的不可重复读问题。在可重复读隔离级别下,一个事务执行期间多次读取同一数据项,可以保证读取到的结果是一致的,不会因为其他事务的并发操作而导致数据的不一致性。
不可重复读问题指的是,在读提交隔离级别下,一个事务在执行过程中多次读取同一数据项,但由于其他事务的并发修改操作,导致每次读取到的数据值不同,出现了不一致的情况。可重复读隔离级别通过更严格的规则来解决这个问题。
在可重复读隔离级别下,事务只能读取它开始时已经提交的事务产生的数据版本。这意味着,在事务开始时已经提交的所有事务所产生的数据对当前事务是可见的,而在事务开始后产生的其他事务所产生的数据对当前事务则是不可见的。这样可以确保事务在执行期间读取到的数据是一致的,不会受到其他事务的影响。

可重复读的事务可见性逻辑
  1. 如果版本的XMIN等于当前事务的事务编号,并且XMAX为空(表示尚未被删除),则该版本对当前事务可见。
  2. 或者,如果版本的XMIN对应的事务已经提交,并且XMIN小于当前事务的事务编号,并且XMIN不在当前事务开始前活跃的事务集合SP(Ti)中,同时XMAX为空(尚未被删除),或者XMAX不是当前事务的事务编号,并且XMAX对应的事务已经提交,并且XMAX大于当前事务的事务编号,或者XMAX在当前事务开始前活跃的事务集合SP(Ti)中,则该版本对当前事务可见。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    (XMIN == Ti and                 // 由Ti创建且
    (XMAX == NULL or // 尚未被删除
    ))
    or // 或
    (XMIN is commited and // 由一个已提交的事务创建且
    XMIN < XID and // 这个事务小于Ti且
    XMIN is not in SP(Ti) and // 这个事务在Ti开始前提交且
    (XMAX == NULL or // 尚未被删除或
    (XMAX != Ti and // 由其他事务删除但是
    (XMAX is not commited or // 这个事务尚未提交或
    XMAX > Ti or // 这个事务在Ti开始之后才开始或
    XMAX is in SP(Ti) // 这个事务在Ti开始前还未提交
    ))))
事务结构

由于可重复读事务的可见性逻辑,需要提供一个结构,用来抽象事务,以保存快照数据;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// vm对一个事务的抽象
public class Transaction {
// 事务的ID
public long xid;
// 事务的隔离级别
public int level;
// 事务的快照,用于存储活跃事务的ID
public Map<Long, Boolean> snapshot;
// 事务执行过程中的错误
public Exception err;
// 标志事务是否自动中止
public boolean autoAborted;

// 创建一个新的事务
public static Transaction newTransaction(long xid, int level, Map<Long, Transaction> active) {
Transaction t = new Transaction();
// 设置事务ID
t.xid = xid;
// 设置事务隔离级别
t.level = level;
// 如果隔离级别不为0,创建快照
if (level != 0) {
t.snapshot = new HashMap<>();
// 将活跃事务的ID添加到快照中
for (Long x : active.keySet()) {
t.snapshot.put(x, true);
}
}
// 返回新创建的事务
return t;
}

// 判断一个事务ID是否在快照中
public boolean isInSnapshot(long xid) {
// 如果事务ID等于超级事务ID,返回false
if (xid == TransactionManagerImpl.SUPER_XID) {
return false;
}
// 否则,检查事务ID是否在快照中
return snapshot.containsKey(xid);
}
}

repeatableRead()

20251020212404

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private static boolean repeatableRead(TransactionManager tm, Transaction t, Entry e) {
// 获取事务的ID
long xid = t.xid;
// 获取条目的创建版本号
long xmin = e.getXmin();
// 获取条目的删除版本号
long xmax = e.getXmax();
// 如果条目的创建版本号等于事务的ID并且条目未被删除,则返回true
if (xmin == xid && xmax == 0) return true;

// 如果条目的创建版本已经提交,并且创建版本号小于事务的ID,并且创建版本号不在事务的快照中
if (tm.isCommitted(xmin) && xmin < xid && !t.isInSnapshot(xmin)) {
// 如果条目未被删除,则返回true
if (xmax == 0) return true;
// 如果条目的删除版本号不等于事务的ID
if (xmax != xid) {
// 如果条目的删除版本未提交,或者删除版本号大于事务的ID,或者删除版本号在事务的快照中,则返回true
if (!tm.isCommitted(xmax) || xmax > xid || t.isInSnapshot(xmax)) {
return true;
}
}
}
// 其他情况返回false
return false;
}

死锁检测

版本跳跃问题

版本跳跃问题是指在多版本并发控制(MVCC)中,一个事务要修改某个数据项时,可能会出现跳过中间版本直接修改最新版本的情况,从而产生逻辑上的错误。解决版本跳跃的关键在于检查最新版本的创建者对当前事务是否可见。如果当前事务要修改的数据已经被另一个事务修改并且对当前事务不可见,就要求当前事务回滚。具体来说,对于事务Ti要修改数据X的情况下,要检查如下两种情况:

  1. 如果另一个事务Tj的事务ID(XID)大于Ti的事务ID,则Tj在时间上晚于Ti开始,因此Ti应该回滚,避免版本跳跃。
  2. 如果Tj在Ti的快照集合(SP(Ti))中,则Tj在Ti开始之前已经提交,但Ti在开始之前并不能看到Tj的修改,因此也应该回滚。
版本跳跃的检查

因为读提交是允许版本跳跃的,可重复读是不允许的,所以只需要检查读提交即可

1
2
3
4
5
6
7
8
9
10
11
12
public static boolean isVersionSkip(TransactionManager tm, Transaction t, Entry e) {
// 获取条目的删除版本号
long xmax = e.getXmax();
// 如果事务的隔离级别为0,即读未提交,那么不跳过该版本,返回false
if (t.level == 0) {
return false;
} else {
// 如果事务的隔离级别不为0,那么检查删除版本是否已提交,并且删除版本号大于事务的ID或者删除版本号在事务的快照中
// 如果满足上述条件,那么跳过该版本,返回true
return tm.isCommitted(xmax) && (xmax > t.xid || t.isInSnapshot(xmax));
}
}
LockTable

上文提到了在基于2PL(两段锁协议)的并发控制中,当一个事务(例如Tj)想要获取某个数据项的锁时,如果该锁已经被其他事务(例如Ti)持有,则Tj会被阻塞,直到Ti释放了该锁。这种等待关系可以被抽象成有向边,比如Tj在等待Ti,可以表示为Tj → Ti。通过记录所有事务之间的等待关系,就可以构建一个有向图,即等待图(Wait-for graph)。在等待图中,如果存在环路,即存在一个事务的等待序列形成了一个闭环,那么就说明存在死锁。因此,检测死锁只需要查看等待图中是否存在环即可。

LockTable基本结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 维护了一个依赖等待图,以进行死锁检测
*/
public class LockTable {
// 某个XID已经获得的资源的UID列表,键是事务ID,值是该事物持有的资源ID列表。
private Map<Long, List<Long>> x2u;
// UID被某个XID持有,键是资源ID,值是持有该资源的事务ID。
private Map<Long, Long> u2x;
// 正在等待UID的XID列表,键是资源ID,值是正在等待该资源的事务ID。
private Map<Long, List<Long>> wait;
// 正在等待资源的XID的锁,键是事务ID,值是该事务的锁对象。
private Map<Long, Lock> waitLock;
// XID正在等待的UID,键是事务ID,值是该事务正在等待的资源ID。
private Map<Long, Long> waitU;
// 一个全局锁,用于同步。
private Lock lock;
}
add()

在每次出现等待的情况时,就尝试向图中增加一条边,并进行死锁检测。如果检测到死锁,就撤销这条边,不允许添加,并撤销该事务。

20251020213617

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 不需要等待则返回null,否则返回锁对象或者会造成死锁则抛出异常
public Lock add(long xid, long uid) throws Exception {
lock.lock(); // 锁定全局锁
try {
// 检查x2u是否已经拥有这个资源
if (isInList(x2u, xid, uid)) {
return null; // 如果已经拥有,直接返回null
}
// 检查UID资源是否已经被其他XID事务持有
if (!u2x.containsKey(uid)) {
u2x.put(uid, xid); // 如果没有被持有,将资源分配给当前事务
putIntoList(x2u, xid, uid); // 将资源添加到事务的资源列表中
return null; // 返回null
}
// 如果资源已经被其他事务持有,将当前事务添加到等待列表中
waitU.put(xid, uid);
putIntoList(wait, uid, xid);
// 检查是否存在死锁
if (hasDeadLock()) {
waitU.remove(xid); // 如果存在死锁,从等待列表中移除当前事务
removeFromList(wait, uid, xid);
throw Error.DeadlockException; // 抛出死锁异常
}
// 如果不存在死锁,为当前事务创建一个新的锁,并锁定它
Lock l = new ReentrantLock();
l.lock();
waitLock.put(xid, l); // 将新的锁添加到等待锁列表中
return l; // 返回新的锁

} finally {
lock.unlock(); // 解锁全局锁
}
}
hasDeadLock() and dfs()

检查是否包含死锁

20251020215311

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private boolean hasDeadLock() {
xidStamp = new HashMap<>(); // 创建一个新的xidStamp哈希映射
stamp = 1; // 将stamp设置为1
for (long xid : x2u.keySet()) { // 遍历所有已经获得资源的事务ID
Integer s = xidStamp.get(xid); // 获取xidStamp中对应事务ID的记录
if (s != null && s > 0) { // 如果记录存在,并且值大于0
continue; // 跳过这个事务ID,继续下一个
}
stamp++; // 将stamp加1
if (dfs(xid)) { // 调用dfs方法进行深度优先搜索
return true; // 如果dfs方法返回true,表示存在死锁,那么hasDeadLock方法也返回true
}
}
return false; // 如果所有的事务ID都被检查过,并且没有发现死锁,那么hasDeadLock方法返回false
}

private boolean dfs(long xid) {
Integer stp = xidStamp.get(xid); // 从xidStamp映射中获取当前事务ID的时间戳
if (stp != null && stp == stamp) { // 如果时间戳存在并且等于全局时间戳
return true; // 存在死锁,返回true
}
if (stp != null && stp < stamp) { // 如果时间戳存在并且小于全局时间戳
return false; // 这个事务ID已经被检查过,并且没有发现死锁,返回false
}
xidStamp.put(xid, stamp); // 将当前事务ID和全局时间戳添加到xidStamp映射中

Long uid = waitU.get(xid); // 从waitU映射中获取当前事务ID正在等待的资源ID
if (uid == null) return false; // 如果资源ID不存在,表示当前事务ID不在等待任何资源,返回false
Long x = u2x.get(uid); // 从u2x映射中获取当前资源ID被哪个事务ID持有
assert x != null; // 断言这个事务ID存在
return dfs(x); // 递归调用dfs方法检查这个事务ID
}

死锁演示

前言
采用一下数据实现死锁模拟:

  1. lockTable.add(1, 1); // 事务1请求资源1
  2. lockTable.add(2, 2); // 事务2请求资源2
  3. lockTable.add(3, 3); // 事务3请求资源3
  4. lockTable.add(1, 2); // 事务1请求资源2
  5. lockTable.add(2, 3); // 事务2请求资源3
  6. lockTable.add(3, 1); // 事务3请求资源1

    在这些数据添加完毕之后,事务1在等待事务2,事务2在等待事务3,事务3又在等待事务1,此时就触发了死锁!

当数据添加完毕之后,LockTable类中的MAP集合对着一下元素:

  1. x2u

    1
    2
    3
    4
    xid			uid
    1 1
    2 2
    3 3
  2. u2x

    1
    2
    3
    4
    uid			xid
    1 1
    2 2
    3 3
  3. wait

    1
    2
    3
    4
    uid			xid
    1 3
    2 1
    3 2
  4. waitU

    1
    2
    3
    4
    xid			uid
    1 2
    2 3
    3 1

    dfs()演示过程
    当上方数据插入装载完成之后,会进行死锁校验,此处只是采用简易代码实现,可以自己根据源码进行学习!

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    第一遍:xidStamp = null,stamp=2
    private boolean dfs(long xid) { //xid = 1
    Integer stp = xidStamp.get(xid); // null
    xidStamp.put(xid, stamp); // 1,2

    Long uid = waitU.get(xid); // uid = 2
    Long x = u2x.get(uid); // x = 2

    return dfs(x); // 将2存入进去
    }

    第二遍:xidStamp = {1=2},stamp=2
    private boolean dfs(long xid) { // xid = 2
    Integer stp = xidStamp.get(xid); // null
    xidStamp.put(xid, stamp); // 2,2

    Long uid = waitU.get(xid); // uid = 3
    Long x = u2x.get(uid); // x = 3

    return dfs(x); // 将3存入进去
    }

    第三遍:xidStamp = {1=2,2=2},stamp=2
    private boolean dfs(long xid) { // xid = 3
    Integer stp = xidStamp.get(xid); // 3
    xidStamp.put(xid, stamp); // 3,2

    Long uid = waitU.get(xid); // uid = 1
    Long x = u2x.get(uid); // x = 1

    return dfs(x); // 将1存入进去
    }

    第四遍:xidStamp = {1=2,2=2,3=2},stamp=2
    private boolean dfs(long xid) { // xid = 1
    Integer stp = xidStamp.get(xid); // 此时就获取到了数据,stp = 2;
    if (stp != null && stp == stamp) { // 此时条件成立,证明存在死锁
    return true; // 存在死锁,返回true
    }
    }


remove()

当一个事务commit或者abort时,就会释放掉它自己持有的锁,并将自身从等待图中删除
20251020215616

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void remove(long xid) {
lock.lock(); // 获取全局锁
try {
List<Long> l = x2u.get(xid); // 从x2u映射中获取当前事务ID已经获得的资源的UID列表
if (l != null) {
while (l.size() > 0) {
Long uid = l.remove(0); // 获取并移除列表中的第一个资源ID
selectNewXID(uid); // 从等待队列中选择一个新的事务ID来占用这个资源
}
}
waitU.remove(xid); // 从waitU映射中移除当前事务ID
x2u.remove(xid); // 从x2u映射中移除当前事务ID
waitLock.remove(xid); // 从waitLock映射中移除当前事务ID

} finally {
lock.unlock(); // 解锁全局锁
}
}
selectNewXID()

20251020215631

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 从等待队列中选择一个xid来占用uid
private void selectNewXID(long uid) {
u2x.remove(uid); // 从u2x映射中移除当前资源ID
List<Long> l = wait.get(uid); // 从wait映射中获取当前资源ID的等待队列
if (l == null) return; // 如果等待队列为空,立即返回
assert l.size() > 0; // 断言等待队列不为空

// 遍历等待队列
while (l.size() > 0) {
long xid = l.remove(0); // 获取并移除队列中的第一个事务ID
// 检查事务ID是否在waitLock映射中
if (!waitLock.containsKey(xid)) {
continue; // 如果不在,跳过这个事务ID,继续下一个
} else {
u2x.put(uid, xid); // 将事务ID和资源ID添加到u2x映射中
Lock lo = waitLock.remove(xid); // 从waitLock映射中移除这个事务ID
waitU.remove(xid); // 从waitU映射中移除这个事务ID
lo.unlock(); // 解锁这个事务ID的锁
break; // 跳出循环
}
}

// 如果等待队列为空,从wait映射中移除当前资源ID
if (l.size() == 0) wait.remove(uid);
}

VM的实现

VM的基本定义

VM 层通过 VersionManager 接口,向上层提供功能,如下:

1
2
3
4
5
6
7
8
9
public interface VersionManager {
byte[] read(long xid, long uid) throws Exception;
long insert(long xid, byte[] data) throws Exception;
boolean delete(long xid, long uid) throws Exception;

long begin(int level);
void commit(long xid) throws Exception;
void abort(long xid);
}

同时,VM 的实现类还被设计为 **Entry**** **的缓存,需要继承** AbstractCache<Entry>**。需要实现的获取到缓存和从缓存释放的方法很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
protected Entry getForCache(long uid) throws Exception {
// 核心还是调用dm.read()方法
Entry entry = Entry.loadEntry(this, uid);
if (entry == null) {
throw Error.NullEntryException;
}
return entry;
}

@Override
protected void releaseForCache(Entry entry) {
entry.remove();
}

具体实现

begin()

开启一个事务,并初始化事务的结构
20251020215950

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Override
public long begin(int level) {
lock.lock(); // 获取锁,防止并发问题
try {
long xid = tm.begin(); // 调用事务管理器的begin方法,开始一个新的事务,并获取事务ID
Transaction t = Transaction.newTransaction(xid, level, activeTransaction); // 创建一个新的事务对象
activeTransaction.put(xid, t); // 将新的事务对象添加到活动事务的映射中
return xid; // 返回新的事务ID
} finally {
lock.unlock(); // 释放锁
}
}

// 创建一个新的事务
public static Transaction newTransaction(long xid, int level, Map<Long, Transaction> active) {
Transaction t = new Transaction();
// 设置事务ID
t.xid = xid;
// 设置事务隔离级别
t.level = level;
// 如果隔离级别不为0,创建快照
if (level != 0) {
t.snapshot = new HashMap<>();
// 将活跃事务的ID添加到快照中
for (Long x : active.keySet()) {
t.snapshot.put(x, true);
}
}
// 返回新创建的事务
return t;
}
commit()

20251020225334

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Override
public void commit(long xid) throws Exception {
lock.lock(); // 获取锁,防止并发问题
Transaction t = activeTransaction.get(xid); // 从活动事务中获取事务对象
lock.unlock(); // 释放锁

try {
if (t.err != null) { // 如果事务已经出错,那么抛出错误
throw t.err;
}
} catch (NullPointerException n) { // 如果事务对象为null,打印事务ID和活动事务的键集,然后抛出异常
System.out.println(xid);
System.out.println(activeTransaction.keySet());
Panic.panic(n);
}

lock.lock(); // 获取锁,防止并发问题
activeTransaction.remove(xid); // 从活动事务中移除这个事务
lock.unlock(); // 释放锁

lt.remove(xid); // 从锁表中移除这个事务的锁
tm.commit(xid); // 调用事务管理器的commit方法,进行事务的提交操作
}

abort()

abort 事务的方法则有两种,手动和自动。手动指的是调用 **abort()** 方法,而自动,则是在事务被检测出出现死锁时,会自动撤销回滚事务;或者出现版本跳跃时,也会自动回滚
20251020225406

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
// 公开的abort方法,用于中止一个事务
public void abort(long xid) {
// 调用内部的abort方法,autoAborted参数为false表示这不是一个自动中止的事务
internAbort(xid, false);
}

// 内部的abort方法,处理事务的中止
private void internAbort(long xid, boolean autoAborted) {
// 获取锁,防止并发问题
lock.lock();
// 从活动事务中获取事务对象
Transaction t = activeTransaction.get(xid);
// 如果这不是一个自动中止的事务,那么从活动事务中移除这个事务
if (!autoAborted) {
activeTransaction.remove(xid);
}
// 释放锁
lock.unlock();

// 如果事务已经被自动中止,那么直接返回,不做任何处理
if (t.autoAborted) return;
// 从锁表中移除这个事务的锁
lt.remove(xid);
// 调用事务管理器的abort方法,进行事务的中止操作
tm.abort(xid);
}
read()

read() 方法读取一个 entry,需要注意判断可见性
20251020215934

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Override
public byte[] read(long xid, long uid) throws Exception {
lock.lock(); // 获取锁,防止并发问题
Transaction t = activeTransaction.get(xid); // 从活动事务中获取事务对象
lock.unlock(); // 释放锁

if (t.err != null) { // 如果事务已经出错,那么抛出错误
throw t.err;
}

Entry entry = null;
try {
entry = super.get(uid); // 尝试获取数据项
} catch (Exception e) {
if (e == Error.NullEntryException) { // 如果数据项不存在,那么返回null
return null;
} else { // 如果出现其他错误,那么抛出错误
throw e;
}
}
try {
// 在事务隔离级别中讲解了该方法
if (Visibility.isVisible(tm, t, entry)) { // 如果数据项对当前事务可见,那么返回数据项的数据
return entry.data();
} else { // 如果数据项对当前事务不可见,那么返回null
return null;
}
} finally {
entry.release(); // 释放数据项
}
}
insert()

将数据包裹成 Entry,然后交给 DM 插入即可

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public long insert(long xid, byte[] data) throws Exception {
lock.lock(); // 获取锁,防止并发问题
Transaction t = activeTransaction.get(xid); // 从活动事务中获取事务对象
lock.unlock(); // 释放锁

if (t.err != null) { // 如果事务已经出错,那么抛出错误
throw t.err;
}

byte[] raw = Entry.wrapEntryRaw(xid, data); // 将事务ID和数据包装成一个新的数据项
return dm.insert(xid, raw); // 调用数据管理器的insert方法,插入新的数据项,并返回数据项的唯一标识符
}
delete()

20251020215919

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
@Override
// 删除一个数据项的方法
public boolean delete(long xid, long uid) throws Exception {
// 获取锁,防止并发问题
lock.lock();
// 从活动事务中获取事务对象
Transaction t = activeTransaction.get(xid);
// 释放锁
lock.unlock();

// 如果事务已经出错,那么抛出错误
if (t.err != null) {
throw t.err;
}
Entry entry = null;
try {
// 尝试获取数据项
entry = super.get(uid);
} catch (Exception e) {
// 如果数据项不存在,那么返回false
if (e == Error.NullEntryException) {
return false;
} else {
// 如果出现其他错误,那么抛出错误
throw e;
}
}
try {
// 如果数据项对当前事务不可见,那么返回false
if (!Visibility.isVisible(tm, t, entry)) {
return false;
}
Lock l = null;
try {
// 尝试为数据项添加锁
l = lt.add(xid, uid);
} catch (Exception e) {
// 如果出现并发更新的错误,那么中止事务,并抛出错误
t.err = Error.ConcurrentUpdateException;
internAbort(xid, true);
t.autoAborted = true;
throw t.err;
}
// 如果成功获取到锁,那么锁定并立即解锁
if (l != null) {
l.lock();
l.unlock();
}

// 如果数据项已经被当前事务删除,那么返回false
if (entry.getXmax() == xid) {
return false;
}

// 如果数据项的版本被跳过,那么中止事务,并抛出错误
if (Visibility.isVersionSkip(tm, t, entry)) {
t.err = Error.ConcurrentUpdateException;
internAbort(xid, true);
t.autoAborted = true;
throw t.err;
}

// 设置数据项的xmax为当前事务的ID,表示数据项被当前事务删除
entry.setXmax(xid);
// 返回true,表示删除操作成功
return true;

} finally {
// 释放数据项
entry.release();
}
}

Index Manager (IM) 索引管理器

前言

IM,即 Index Manager,索引管理器,为 MYDB 提供了基于 B+ 树的聚簇索引。目前 MYDB 只支持基于索引查找数据,不支持全表扫描。感兴趣的同学可以自行实现。

在依赖关系图中可以看到,IM 直接基于 DM,而没有基于 VM。索引的数据被直接插入数据库文件中,而不需要经过版本管理。

本节不赘述 B+ 树算法,更多描述实现。

基本结构

1
2
[LeafFlag][KeyNumber][SiblingUid]
[Son0][Key0][Son1][Key1]...[SonN][KeyN]
  • **[LeafFlag]**:标记该节点是否为叶子节点
  • **[KeyNumber]**:该节点中 key 的个数
  • **[SiblingUid]**:是其兄弟节点存储在 DM 中的 UID,用于实现节点的连接
  • [**SonN] [KeyN]**:后续穿插的子节点,最后一个 Key 始终为 MAX_VALUE,以方便查找

Node具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
public class Node {
static final int IS_LEAF_OFFSET = 0; // 表示该节点是否为叶子节点
static final int NO_KEYS_OFFSET = IS_LEAF_OFFSET + 1; // 表示该节点中key的个数
static final int SIBLING_OFFSET = NO_KEYS_OFFSET + 2; // 表示节点的兄弟节点的UID属性
static final int NODE_HEADER_SIZE = SIBLING_OFFSET + 8; // 表示节点头部的大小的常量

static final int BALANCE_NUMBER = 32; // 节点的平衡因子的常量,一个节点最多可以包含32个key
static final int NODE_SIZE = NODE_HEADER_SIZE + (2 * 8) * (BALANCE_NUMBER * 2 + 2); // 节点的大小


/**
* 设置是否为叶子节点,1表示是叶子节点,0表示非叶子节点
*/
static void setRawIsLeaf(SubArray raw, boolean isLeaf) {
if (isLeaf) {
raw.raw[raw.start + IS_LEAF_OFFSET] = (byte) 1;
} else {
raw.raw[raw.start + IS_LEAF_OFFSET] = (byte) 0;
}
}

/**
* 判断是否为叶子节点
*/
static boolean getRawIfLeaf(SubArray raw) {
return raw.raw[raw.start + IS_LEAF_OFFSET] == (byte) 1;
}

/**
* 设置节点个数
*/
static void setRawNoKeys(SubArray raw, int noKeys) {
System.arraycopy(Parser.short2Byte((short) noKeys), 0, raw.raw, raw.start + NO_KEYS_OFFSET, 2);
}

/**
* 获取节点个数
*/
static int getRawNoKeys(SubArray raw) {
return (int) Parser.parseShort(Arrays.copyOfRange(raw.raw, raw.start + NO_KEYS_OFFSET, raw.start + NO_KEYS_OFFSET + 2));
}

/**
* 设置兄弟节点的uid,占用八个字节
*
* @param raw
* @param sibling
*/
static void setRawSibling(SubArray raw, long sibling) {
System.arraycopy(Parser.long2Byte(sibling), 0, raw.raw, raw.start + SIBLING_OFFSET, 8);
}

/**
* 获取兄弟节点的uid
*
* @param raw
* @return
*/
static long getRawSibling(SubArray raw) {
return Parser.parseLong(Arrays.copyOfRange(raw.raw, raw.start + SIBLING_OFFSET, raw.start + SIBLING_OFFSET + 8));
}

/**
* 设置第k个子节点的UID。
* 注:k 是从0开始的
* @param raw 节点的原始字节数组。
* @param uid 要设置的UID。
* @param kth 子节点的索引。
* raw.start是字节数组的起始位置,NODE_HEADER_SIZE是节点头部的大小,
* kth * (8 * 2)是第k个子节点或键的偏移量。所以,raw.start + NODE_HEADER_SIZE + kth * (8 * 2)
* 就是第k个子节点或键在字节数组中的起始位置。
*/
static void setRawKthSon(SubArray raw, long uid, int kth) {
int offset = raw.start + NODE_HEADER_SIZE + kth * (8 * 2);
System.arraycopy(Parser.long2Byte(uid), 0, raw.raw, offset, 8);
}

/**
* 获取第k个子节点的UID。
*/
static long getRawKthSon(SubArray raw, int kth) {
int offset = raw.start + NODE_HEADER_SIZE + kth * (8 * 2);
return Parser.parseLong(Arrays.copyOfRange(raw.raw, offset, offset + 8));
}

/**
* 设置第k个键的值
*/
static void setRawKthKey(SubArray raw, long key, int kth) {
int offset = raw.start + NODE_HEADER_SIZE + kth * (8 * 2) + 8;
System.arraycopy(Parser.long2Byte(key), 0, raw.raw, offset, 8);
}

/**
* 获取第k个键的值
*/
static long getRawKthKey(SubArray raw, int kth) {
int offset = raw.start + NODE_HEADER_SIZE + kth * (8 * 2) + 8;
return Parser.parseLong(Arrays.copyOfRange(raw.raw, offset, offset + 8));
}

/**
* 从一个节点的原始字节数组中复制一部分数据到另一个节点的原始字节数组中
*/
static void copyRawFromKth(SubArray from, SubArray to, int kth) {
// 计算要复制的数据在源节点的原始字节数组中的起始位置
int offset = from.start + NODE_HEADER_SIZE + kth * (8 * 2);
// 将源节点的原始字节数组中的数据复制到目标节点的原始字节数组中
// 复制的数据包括从起始位置到源节点的原始字节数组的末尾的所有数据
System.arraycopy(from.raw, offset, to.raw, to.start + NODE_HEADER_SIZE, from.end - offset);
}
}

newRootRaw()

1
2
3
4
5
6
7
8
9
10
11
12
[LeafFlag: 0]
[KeyNumber: 2]
[SiblingUid: 0]
[Son0: left][Key0: key][Son1: right][Key1: MAX_VALUE]

注:一个简单的演示

(key)
/ \
/ \
/ \
[left] [right]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 创建一个新的根节点的原始字节数组。
* 这个新的根节点包含两个子节点,它们的键分别是`key`和`Long.MAX_VALUE`,UID分别是`left`和`right`。
*/
static byte[] newRootRaw(long left, long right, long key) {
// 创建一个新的字节数组,大小为节点的大小
SubArray raw = new SubArray(new byte[NODE_SIZE], 0, NODE_SIZE);

//设置节点的基本属性
// 设置节点为非叶子节点
setRawIsLeaf(raw, false);
// 设置节点的键的数量为2
setRawNoKeys(raw, 2);
// 设置节点的兄弟节点的UID为0
setRawSibling(raw, 0);

//设置子节点和键值
// 设置第0个子节点的UID为left
setRawKthSon(raw, left, 0);
// 设置第0个键的值为key
setRawKthKey(raw, key, 0);
// 设置第1个子节点的UID为right
setRawKthSon(raw, right, 1);
// 设置第1个键的值为Long.MAX_VALUE
setRawKthKey(raw, Long.MAX_VALUE, 1);

// 返回新创建的根节点的原始字节数组
return raw.raw;
}

newNilRootRaw()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 创建一个新的空根节点的原始字节数组,这个新的根节点没有子节点和键。
*/
static byte[] newNilRootRaw() {
// 创建一个新的字节数组,大小为节点的大小
SubArray raw = new SubArray(new byte[NODE_SIZE], 0, NODE_SIZE);

// 设置节点为叶子节点
setRawIsLeaf(raw, true);
// 设置节点的键的数量为0
setRawNoKeys(raw, 0);
// 设置节点的兄弟节点的UID为0
setRawSibling(raw, 0);

// 返回新创建的空根节点的原始字节数组
return raw.raw;
}

searchNext()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class SearchNextRes {
long uid;
long siblingUid;
}

/**
* 在B+树的节点中搜索下一个节点的方法
*/
public SearchNextRes searchNext(long key) {
// 获取节点的读锁
dataItem.rLock();
try {
// 创建一个SearchNextRes对象,用于存储搜索结果
SearchNextRes res = new SearchNextRes();
// 获取节点个数
int noKeys = getRawNoKeys(raw);
for (int i = 0; i < noKeys; i++) {
// 获取第i个key的值
long ik = getRawKthKey(raw, i);
// 如果key小于ik,那么找到了下一个节点
if (key < ik) {
// 设置下一个节点的UID
res.uid = getRawKthSon(raw, i);
// 设置兄弟节点的UID为0
res.siblingUid = 0;
// 返回搜索结果
return res;
}
}
// 如果没有找到下一个节点,设置uid为0
res.uid = 0;
// 设置兄弟节点的UID为当前节点的兄弟节点的UID
res.siblingUid = getRawSibling(raw);
// 返回搜索结果
return res;

} finally {
// 释放节点的读锁
dataItem.rUnLock();
}
}

LeafSearchRangeRes()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class LeafSearchRangeRes {
List<Long> uids;
long siblingUid;
}

/**
* 在B+树的叶子节点中搜索一个键值范围的方法
*/
public LeafSearchRangeRes leafSearchRange(long leftKey, long rightKey) {
// 获取数据项的读锁
dataItem.rLock();
try {
// 获取节点中的键的数量
int noKeys = getRawNoKeys(raw);
int kth = 0;
// 找到第一个大于或等于左键的键
while (kth < noKeys) {
long ik = getRawKthKey(raw, kth);
if (ik >= leftKey) {
break;
}
kth++;
}
// 创建一个列表,用于存储所有在键值范围内的子节点的UID
List<Long> uids = new ArrayList<>();
// 遍历所有的键,将所有小于或等于右键的键对应的子节点的UID添加到列表中
while (kth < noKeys) {
long ik = getRawKthKey(raw, kth);
if (ik <= rightKey) {
uids.add(getRawKthSon(raw, kth));
kth++;
} else {
break;
}
}
// 如果所有的键都被遍历过,获取兄弟节点的UID
long siblingUid = 0;
if (kth == noKeys) {
siblingUid = getRawSibling(raw);
}
// 创建一个LeafSearchRangeRes对象,用于存储搜索结果
LeafSearchRangeRes res = new LeafSearchRangeRes();
res.uids = uids;
res.siblingUid = siblingUid;
// 返回搜索结果
return res;
} finally {
// 释放数据项的读锁
dataItem.rUnLock();
}
}

insertAndSplit()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class InsertAndSplitRes {
long siblingUid, newSon, newKey;
}

/**
* 在B+树的节点中插入一个键值对,并在需要时分裂节点。
*/
public InsertAndSplitRes insertAndSplit(long uid, long key) throws Exception {
// 创建一个标志位,用于标记插入操作是否成功
boolean success = false;
// 创建一个异常对象,用于存储在插入或分裂节点时发生的异常
Exception err = null;
// 创建一个InsertAndSplitRes对象,用于存储插入和分裂节点的结果
InsertAndSplitRes res = new InsertAndSplitRes();

// 在数据项上设置一个保存点
dataItem.before();
try {
// 尝试在节点中插入键值对,并获取插入结果
success = insert(uid, key);
// 如果插入失败,设置兄弟节点的UID,并返回结果
if (!success) {
res.siblingUid = getRawSibling(raw);
return res;
}
// 如果需要分裂节点
if (needSplit()) {
try {
// 分裂节点,并获取分裂结果
SplitRes r = split();
// 设置新节点的UID和新键,并返回结果
res.newSon = r.newSon;
res.newKey = r.newKey;
return res;
} catch (Exception e) {
// 如果在分裂节点时发生错误,保存异常并抛出
err = e;
throw e;
}
} else {
// 如果不需要分裂节点,直接返回结果
return res;
}
} finally {
// 如果没有发生错误并且插入成功,提交数据项的修改
if (err == null && success) {
dataItem.after(TransactionManagerImpl.SUPER_XID);
} else {
// 如果发生错误或插入失败,回滚数据项的修改
dataItem.unBefore();
}
}
}


insert()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* 在B+树的节点中插入一个键值对的方法
*/
private boolean insert(long uid, long key) {
// 获取节点中的键的数量
int noKeys = getRawNoKeys(raw);
// 初始化插入位置的索引
int kth = 0;
// 找到第一个大于或等于要插入的键的键的位置
while (kth < noKeys) {
long ik = getRawKthKey(raw, kth);
if (ik < key) {
kth++;
} else {
break;
}
}
// 如果所有的键都被遍历过,并且存在兄弟节点,插入失败
if (kth == noKeys && getRawSibling(raw) != 0) return false;

// 如果节点是叶子节点
if (getRawIfLeaf(raw)) {
// 在插入位置后的所有键和子节点向后移动一位
shiftRawKth(raw, kth);
// 在插入位置插入新的键和子节点的UID
setRawKthKey(raw, key, kth);
setRawKthSon(raw, uid, kth);
// 更新节点中的键的数量
setRawNoKeys(raw, noKeys + 1);
} else {
// 如果节点是非叶子节点
// 获取插入位置的键
long kk = getRawKthKey(raw, kth);
// 在插入位置插入新的键
setRawKthKey(raw, key, kth);
// 在插入位置后的所有键和子节点向后移动一位
shiftRawKth(raw, kth + 1);
// 在插入位置的下一个位置插入原来的键和新的子节点的UID
setRawKthKey(raw, kk, kth + 1);
setRawKthSon(raw, uid, kth + 1);
// 更新节点中的键的数量
setRawNoKeys(raw, noKeys + 1);
}
// 插入成功
return true;
}

split()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class SplitRes {
long newSon, newKey;
}

/**
* 分裂B+树的节点。
* 当一个节点的键的数量达到 `BALANCE_NUMBER * 2` 时,就意味着这个节点已经满了,需要进行分裂操作。
* 分裂操作的目的是将一个满的节点分裂成两个节点,每个节点包含一半的键。
*/
private SplitRes split() throws Exception {
// 创建一个新的字节数组,用于存储新节点的原始数据
SubArray nodeRaw = new SubArray(new byte[NODE_SIZE], 0, NODE_SIZE);
// 设置新节点的叶子节点标志,与原节点相同
setRawIsLeaf(nodeRaw, getRawIfLeaf(raw));
// 设置新节点的键的数量为BALANCE_NUMBER
setRawNoKeys(nodeRaw, BALANCE_NUMBER);
// 设置新节点的兄弟节点的UID,与原节点的兄弟节点的UID相同
setRawSibling(nodeRaw, getRawSibling(raw));
// 从原节点的原始字节数组中复制一部分数据到新节点的原始字节数组中
copyRawFromKth(raw, nodeRaw, BALANCE_NUMBER);
// 在数据管理器中插入新节点的原始数据,并获取新节点的UID
long son = tree.dm.insert(TransactionManagerImpl.SUPER_XID, nodeRaw.raw);
// 更新原节点的键的数量为BALANCE_NUMBER
setRawNoKeys(raw, BALANCE_NUMBER);
// 更新原节点的兄弟节点的UID为新节点的UID
setRawSibling(raw, son);

// 创建一个SplitRes对象,用于存储分裂结果
SplitRes res = new SplitRes();
// 设置新节点的UID
res.newSon = son;
// 设置新键为新节点的第一个键的值
res.newKey = getRawKthKey(nodeRaw, 0);
// 返回分裂结果
return res;
}

Table Manager (TBM) 表结构管理器

本章概述 TBM,即表管理器的实现。TBM 实现了对字段结构和表结构的管理。同时简要介绍 MYDB 使用的类 SQL 语句的解析。

SQL 解析器

**Parser** 实现了对类 **SQL** 语句的结构化解析,将语句中包含的信息封装为对应语句的类,这些类可见 **top.guoziyang.mydb.backend.parser.statement** 包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
<begin statement>
begin [isolation level (read committedrepeatable read)]
begin isolation level read committed

<commit statement>
commit

<abort statement>
abort

<create statement>
create table <table name>
<field name> <field type>
<field name> <field type>
...
<field name> <field type>
[(index <field name list>)]
create table students
id int32,
name string,
age int32,
(index id name)

<drop statement>
drop table <table name>
drop table students

<select statement>
select (*<field name list>) from <table name> [<where statement>]
select * from student where id = 1
select name from student where id > 1 and id < 4
select name, age, id from student where id = 12

<insert statement>
insert into <table name> values <value list>
insert into student values 5 "Zhang Yuanjia" 22

<delete statement>
delete from <table name> <where statement>
delete from student where name = "Zhang Yuanjia"

<update statement>
update <table name> set <field name>=<value> [<where statement>]
update student set name = "ZYJ" where id = 5

<where statement>
where <field name> (><=) <value> [(andor) <field name> (><=) <value>]
where age > 10 or age < 3

<field name> <table name>
[a-zA-Z][a-zA-Z0-9_]*

<field type>
int32 int64 string

<value>
.*

Tokenizer类:

  • Tokenizer类用于对语句进行逐字节解析,根据空白符或者特定的词法规则,将语句切割成多个token。
  • 提供了peek()和pop()方法,方便取出Token进行解析。
  • 具体的切割实现在内部,不在此段内容中赘述。
peek()

20251021163347

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/**
* 获取当前的标记,如果需要的话,会生成新的标记。
*/
public String peek() throws Exception {
if (err != null) {
throw err;
}
if (flushToken) {
String token = null;
try {
token = next();
} catch (Exception e) {
err = e;
throw e;
}
currentToken = token;
flushToken = false;
}
return currentToken;
}

/**
* 获取下一个标记。如果存在错误,将抛出异常。
*/
private String next() throws Exception {
if (err != null) {
throw err; // 如果存在错误,抛出异常
}
return nextMetaState(); // 否则,获取下一个元状态
}

/**
* 获取下一个元状态。元状态可以是一个符号、引号包围的字符串或者一个由字母、数字或下划线组成的标记。
*/
private String nextMetaState() throws Exception {
while (true) {
Byte b = peekByte(); // 获取下一个字节
if (b == null) {
return ""; // 如果没有下一个字节,返回空字符串
}
if (!isBlank(b)) {
break; // 如果下一个字节不是空白字符,跳出循环
}
popByte(); // 否则,跳过这个字节
}
byte b = peekByte(); // 获取下一个字节
if (isSymbol(b)) {
popByte(); // 如果这个字节是一个符号,跳过这个字节
return new String(new byte[]{b}); // 并返回这个符号
} else if (b == '"' || b == '\'') {
return nextQuoteState(); // 如果这个字节是引号,获取下一个引号状态
} else if (isAlphaBeta(b) || isDigit(b)) {
return nextTokenState(); // 如果这个字节是字母、数字或下划线,获取下一个标记状态
} else {
err = Error.InvalidCommandException; // 否则,设置错误状态为无效的命令异常
throw err; // 并抛出异常
}
}

/**
* 获取下一个标记。标记是由字母、数字或下划线组成的字符串。
*/
private String nextTokenState() throws Exception {
StringBuilder sb = new StringBuilder(); // 创建一个StringBuilder,用于存储标记
while (true) {
Byte b = peekByte(); // 获取下一个字节
// 如果没有下一个字节,或者下一个字节不是字母、数字或下划线,那么结束循环
if (b == null || !(isAlphaBeta(b) || isDigit(b) || b == '_')) {
// 如果下一个字节是空白字符,那么跳过这个字节
if (b != null && isBlank(b)) {
popByte();
}
// 返回标记
return sb.toString();
}
// 如果下一个字节是字母、数字或下划线,那么将这个字节添加到StringBuilder中
sb.append(new String(new byte[]{b}));
popByte(); // 跳过这个字节
}
}

/**
* 处理引号状态,即处理被引号包围的字符串。
*/
private String nextQuoteState() throws Exception {
byte quote = peekByte(); // 获取下一个字节,这应该是一个引号
popByte(); // 跳过这个引号
StringBuilder sb = new StringBuilder(); // 创建一个StringBuilder,用于存储被引号包围的字符串
while (true) {
Byte b = peekByte(); // 获取下一个字节
if (b == null) {
err = Error.InvalidCommandException; // 如果没有下一个字节,设置错误状态为无效的命令异常
throw err; // 并抛出异常
}
if (b == quote) {
popByte(); // 如果这个字节是引号,跳过这个字节,并跳出循环
break;
}
sb.append(new String(new byte[]{b})); // 如果这个字节不是引号,将这个字节添加到StringBuilder中
popByte(); // 并跳过这个字节
}
return sb.toString(); // 返回被引号包围的字符串
}

pop()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 将当前的标记设置为需要刷新,这样下次调用peek()时会生成新的标记。
*/
public void pop() {
flushToken = true;
}

/**
* 跳过该字节,指向下一个字节
*/
private void popByte() {
pos++;
if (pos > stat.length) {
pos = stat.length;
}
}

Parser类

  • Parser类直接对外提供了Parse(byte[] statement)方法,用于解析语句。
  • 解析过程核心是调用Tokenizer类来分割Token,并根据词法规则将Token包装成具体的Statement类,并返回。
  • 解析过程相对简单,仅根据第一个Token来区分语句类型,并分别处理。
  • 解析过程自己查看几遍源码即可,这里不多赘述
parse()

20251021163414

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
* 解析输入的字节流,根据不同的标记(token)调用不同的解析方法,生成对应的语句对象。
*/
public static Object Parse(byte[] statement) throws Exception {
Tokenizer tokenizer = new Tokenizer(statement); // 创建一个Tokenizer对象,用于获取标记
String token = tokenizer.peek(); // 获取下一个标记
tokenizer.pop(); // 跳过这个标记

Object stat = null; // 用于存储生成的语句对象
Exception statErr = null; // 用于存储错误信息
try {
// 根据标记的值,调用对应的解析方法
switch (token) {
case "begin":
stat = parseBegin(tokenizer);
break;
case "commit":
stat = parseCommit(tokenizer);
break;
case "abort":
stat = parseAbort(tokenizer);
break;
case "create":
stat = parseCreate(tokenizer);
break;
case "drop":
stat = parseDrop(tokenizer);
break;
case "select":
stat = parseSelect(tokenizer);
break;
case "insert":
stat = parseInsert(tokenizer);
break;
case "delete":
stat = parseDelete(tokenizer);
break;
case "update":
stat = parseUpdate(tokenizer);
break;
case "show":
stat = parseShow(tokenizer);
break;
default:
throw Error.InvalidCommandException; // 如果标记的值不符合预期,抛出异常
}
} catch (Exception e) {
statErr = e; // 如果在解析过程中出现错误,保存错误信息
}
try {
String next = tokenizer.peek(); // 获取下一个标记
// 如果还有未处理的标记,那么抛出异常
if (!"".equals(next)) {
byte[] errStat = tokenizer.errStat();
statErr = new RuntimeException("Invalid statement: " + new String(errStat));
}
} catch (Exception e) {
e.printStackTrace();
byte[] errStat = tokenizer.errStat();
statErr = new RuntimeException("Invalid statement: " + new String(errStat));
}
// 如果存在错误,抛出异常
if (statErr != null) {
throw statErr;
}
// 返回生成的语句对象
return stat;
}

字段与表管理

注意,这里的字段与表管理,不是管理各个条目中不同的字段的数值等信息,而是管理表和字段的结构数据,例如表名、表字段信息和字段索引等。

结构数据

  1. 数据存储结构: 表和字段的信息以二进制形式存储在数据库的 Entry 中。
  2. 字段信息表示: 字段的二进制表示包含字段名(FieldName)、字段类型(TypeName)和索引UID(IndexUid)。
    • 字段名和字段类型以及其他信息都以字节形式的字符串存储。
    • **[FieldName] [TypeName] [IndexUid]**
    • 为了明确字符串的存储边界,采用了一种规定的字符串存储方式,即在字符串数据之前存储了字符串的长度信息。
    • **[StringLength] [StringData]**
  3. 字段类型限定: 字段的类型被限定为 int32、int64 和 string 类型。
  4. 索引表示: 如果字段被索引,则IndexUid指向了索引二叉树的根节点;否则该字段的IndexUid为0。
  5. 读取和解析: 通过唯一标识符(UID)从虚拟内存(VM)中读取字段信息,并根据上述结构解析该信息。

Table

基本定义
对于Table的表结构是如下的:

  • **[TableName] [NextTable] [Field1Uid][Field2Uid]...[FieldNUid]**
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    /**
    * Table 维护了表结构
    * 二进制结构如下:
    * [TableName][NextTable]
    * [Field1Uid][Field2Uid]...[FieldNUid]
    */
    public class Table {
    TableManager tbm; // 表管理器,用于管理数据库表
    long uid; // 表的唯一标识符
    String name; // 表的名称
    byte status; // 表的状态
    long nextUid; // 下一个表的唯一标识符
    List<Field> fields = new ArrayList<>(); // 表的字段列表
    }
createTable()

20251021164132

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 创建一个新的数据库表。
*
* @param tbm 表管理器,用于管理数据库表
* @param nextUid 下一个表的唯一标识符
* @param xid 事务ID
* @param create 创建表的语句
* @return 创建的表
*/
public static Table createTable(TableManager tbm, long nextUid, long xid, Create create) throws Exception {
// 创建一个新的表对象
Table tb = new Table(tbm, create.tableName, nextUid);
// 遍历创建表语句中的所有字段
for (int i = 0; i < create.fieldName.length; i++) {
// 获取字段名和字段类型
String fieldName = create.fieldName[i];
String fieldType = create.fieldType[i];
// 判断该字段是否需要建立索引
boolean indexed = false;
for (int j = 0; j < create.index.length; j++) {
if (fieldName.equals(create.index[j])) {
indexed = true;
break;
}
}
// 创建一个新的字段对象,并添加到表对象中
tb.fields.add(Field.createField(tb, xid, fieldName, fieldType, indexed));
}
// 将表对象的状态持久化到存储系统中,并返回表对象
return tb.persistSelf(xid);
}
persistSelf()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 将Table对象的状态持久化到存储系统中。 [TableName] [NextTable] [Field1Uid][Field2Uid]...[FieldNUid]
* @param xid 事务ID
* @return 当前Table对象
* @throws Exception 如果存在错误
*/
private Table persistSelf(long xid) throws Exception {
// 将表名转换为字节数组
byte[] nameRaw = Parser.string2Byte(name);
// 将下一个uid转换为字节数组
byte[] nextRaw = Parser.long2Byte(nextUid);
// 创建一个空的字节数组,用于存储字段的uid
byte[] fieldRaw = new byte[0];
// 遍历所有的字段
for (Field field : fields) {
// 将字段的uid转换为字节数组,并添加到fieldRaw中
fieldRaw = Bytes.concat(fieldRaw, Parser.long2Byte(field.uid));
}
// 将表名、下一个uid和所有字段的uid插入到存储系统中,返回插入的uid
uid = ((TableManagerImpl) tbm).vm.insert(xid, Bytes.concat(nameRaw, nextRaw, fieldRaw));
// 返回当前Table对象
return this;
}
loadTable()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 这个静态方法用于从数据库中加载一个表
public static Table loadTable(TableManager tbm, long uid) {
// 初始化一个字节数组用于存储从数据库中读取的原始数据
byte[] raw = null;
try {
// 使用表管理器的版本管理器从数据库中读取指定uid的表的原始数据
raw = ((TableManagerImpl) tbm).vm.read(TransactionManagerImpl.SUPER_XID, uid);
} catch (Exception e) {
// 如果在读取过程中发生异常,调用Panic.panic方法处理异常
Panic.panic(e);
}
// 断言原始数据不为空
assert raw != null;
// 创建一个新的表对象
Table tb = new Table(tbm, uid);
// 使用原始数据解析表对象,并返回这个表对象
return tb.parseSelf(raw);
}
parseSelf()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 这个方法用于解析表对象
// [TableName] [NextTable] [Field1Uid][Field2Uid]...[FieldNUid]
private Table parseSelf(byte[] raw) {
// 初始化位置变量
int position = 0;
// 使用Parser.parseString方法解析原始数据中的字符串
ParseStringRes res = Parser.parseString(raw);
// 将解析出的字符串赋值给表的名称
name = res.str;
// 更新位置变量
position += res.next;
// 使用Parser.parseLong方法解析原始数据中的长整数,并赋值给下一个uid
nextUid = Parser.parseLong(Arrays.copyOfRange(raw, position, position + 8));
// 更新位置变量
position += 8;

// 当位置变量小于原始数据的长度时,继续循环
while (position < raw.length) {
// 使用Parser.parseLong方法解析原始数据中的长整数,并赋值给uid
long uid = Parser.parseLong(Arrays.copyOfRange(raw, position, position + 8));
// 更新位置变量
position += 8;
// 使用Field.loadField方法加载字段,并添加到表的字段列表中
fields.add(Field.loadField(this, uid));
}
// 返回当前表对象
return this;
}

Field

基本定义
对于字段结构的定义是如下的:

  • **[FieldName] [TypeName] [IndexUid]**
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    /**
    * field 表示字段信息
    * 二进制格式为:
    * [FieldName][TypeName][IndexUid]
    * 如果field无索引,IndexUid为0
    */
    public class Field {

    // 唯一标识符,用于标识每个Field对象
    long uid;
    // Field对象所属的表
    private Table tb;
    // 字段名,用于标识表中的每个字段
    String fieldName;
    // 字段类型,用于标识字段的数据类型
    String fieldType;
    // 索引,用于标识字段是否有索引,如果索引为0,表示没有索引
    private long index;
    // B+树,用于存储索引,如果字段有索引,这个B+树会被加载
    private BPlusTree bt;

    }
createField()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 创建一个新的Field对象
* @param tb 表对象,Field对象所属的表
* @param xid 事务ID
* @param fieldName 字段名
* @param fieldType 字段类型
* @param indexed 是否创建索引
* @return 返回创建的Field对象
* @throws Exception 如果字段类型无效或者创建B+树索引失败,会抛出异常
*/
public static Field createField(Table tb, long xid, String fieldName, String fieldType, boolean indexed) throws Exception {
typeCheck(fieldType); // 检查字段类型是否有效
Field f = new Field(tb, fieldName, fieldType, 0); // 创建一个新的Field对象
if (indexed) { // 如果需要创建索引
long index = BPlusTree.create(((TableManagerImpl) tb.tbm).dm); // 创建一个新的B+树索引
BPlusTree bt = BPlusTree.load(index, ((TableManagerImpl) tb.tbm).dm); // 加载这个B+树索引
f.index = index; // 设置Field对象的索引
f.bt = bt; // 设置Field对象的B+树
}
f.persistSelf(xid); // 将Field对象持久化到存储中
return f; // 返回创建的Field对象
}

private static void typeCheck(String fieldType) throws Exception {
if (!"int32".equals(fieldType) && !"int64".equals(fieldType) && !"string".equals(fieldType)) {
throw Error.InvalidFieldException;
}
}
persistSelf()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* 将当前Field对象持久化到存储中
*/
private void persistSelf(long xid) throws Exception {
// 将字段名转换为字节数组
byte[] nameRaw = Parser.string2Byte(fieldName);
// 将字段类型转换为字节数组
byte[] typeRaw = Parser.string2Byte(fieldType);
// 将索引转换为字节数组
byte[] indexRaw = Parser.long2Byte(index);
// 将字段名、字段类型和索引的字节数组合并,然后插入到持久化存储中
// 插入成功后,会返回一个唯一的uid,将这个uid设置为当前Field对象的uid
this.uid = ((TableManagerImpl) tb.tbm).vm.insert(xid, Bytes.concat(nameRaw, typeRaw, indexRaw));
}
loadField()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 从持久化存储中加载一个Field对象。
*/
public static Field loadField(Table tb, long uid) {
byte[] raw = null; // 用于存储从持久化存储中读取的原始字节数据
try {
// 从持久化存储中读取uid对应的原始字节数据
raw = ((TableManagerImpl) tb.tbm).vm.read(TransactionManagerImpl.SUPER_XID, uid);
} catch (Exception e) {
// 如果读取过程中出现异常,调用Panic.panic方法处理异常
Panic.panic(e);
}
// 断言原始字节数据不为null,如果为null,那么会抛出AssertionError
assert raw != null;
// 创建一个新的Field对象,并调用parseSelf方法解析原始字节数据
return new Field(uid, tb).parseSelf(raw);
}
parseSelf()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 解析原始字节数组并设置字段名、字段类型和索引
* @param raw 原始字节数组
* @return 返回当前Field对象
*/
private Field parseSelf(byte[] raw) {
int position = 0; // 初始化位置为0
ParseStringRes res = Parser.parseString(raw); // 解析原始字节数组,获取字段名和下一个位置
fieldName = res.str; // 设置字段名
position += res.next; // 更新位置

res = Parser.parseString(Arrays.copyOfRange(raw, position, raw.length)); // 从新的位置开始解析原始字节数组,获取字段类型和下一个位置
fieldType = res.str; // 设置字段类型
position += res.next; // 更新位置

this.index = Parser.parseLong(Arrays.copyOfRange(raw, position, position + 8)); // 从新的位置开始解析原始字节数组,获取索引

if (index != 0) { // 如果索引不为0,说明存在B+树索引
try {
bt = BPlusTree.load(index, ((TableManagerImpl) tb.tbm).dm); // 加载B+树索引
} catch (Exception e) {
Panic.panic(e); // 如果加载失败,抛出异常
}
}
return this; // 返回当前Field对象
}

Where查询条件

parseWhere()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
* 解析 WHERE 子句并返回满足条件的记录的 uid 列表
*/
private List<Long> parseWhere(Where where) throws Exception {
// 初始化搜索范围和标志位
long l0 = 0, r0 = 0, l1 = 0, r1 = 0;
boolean single = false;
Field fd = null;

// 如果 WHERE 子句为空,则搜索所有记录
if (where == null) {
// 寻找第一个有索引的字段
for (Field field : fields) {
if (field.isIndexed()) {
fd = field;
break;
}
}
// 设置搜索范围为整个 uid 空间
l0 = 0;
r0 = Long.MAX_VALUE;
single = true;
} else {
// 如果 WHERE 子句不为空,则根据 WHERE 子句解析搜索范围
// 寻找 WHERE 子句中涉及的字段
for (Field field : fields) {
if (field.fieldName.equals(where.singleExp1.field)) {
// 如果字段没有索引,则抛出异常
if (!field.isIndexed()) {
throw Error.FieldNotIndexedException;
}
fd = field;
break;
}
}
// 如果字段不存在,则抛出异常
if (fd == null) {
throw Error.FieldNotFoundException;
}
// 计算 WHERE 子句的搜索范围
CalWhereRes res = calWhere(fd, where);
l0 = res.l0;
r0 = res.r0;
l1 = res.l1;
r1 = res.r1;
single = res.single;
}

// 在计算出的搜索范围内搜索记录
List<Long> uids = fd.search(l0, r0);
// 如果 WHERE 子句包含 OR 运算符,则需要搜索两个范围,并将结果合并
if (!single) {
List<Long> tmp = fd.search(l1, r1);
uids.addAll(tmp);
}
// 返回搜索结果
return uids;
}

calWhere()

Booter

前言

启动信息管理

  • MYDB的启动信息存储在bt文件中,其中所需的信息只有一个,即头表的UID。
  • Booter类提供了loadupdate两个方法,用于加载和更新启动信息。
  • update方法在修改bt文件内容时,采取了一种保证原子性的策略,即先将内容写入一个临时文件bt_tmp中,然后通过操作系统的重命名操作将临时文件重命名为bt文件。
  • 通过这种方式,利用操作系统重命名文件的原子性,来确保对bt文件的修改操作是原子的,从而保证了启动信息的一致性和正确性。

基本定义

1
2
3
4
5
6
7
8
9
10
11
12

// 记录第一个表的uid
public class Booter {
// 数据库启动信息文件的后缀
public static final String BOOTER_SUFFIX = ".bt";
// 数据库启动信息文件的临时后缀
public static final String BOOTER_TMP_SUFFIX = ".bt_tmp";
// 数据库启动信息文件的路径
String path;
// 数据库启动信息文件
File file;
}

create() and open()

通过创建或打开启动信息文件,来进行数据库的校验

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* 创建一个新的Booter对象
*/
public static Booter create(String path) {
// 删除可能存在的临时文件
removeBadTmp(path);
// 创建一个新的文件对象,文件名是路径加上启动信息文件的后缀
File f = new File(path + BOOTER_SUFFIX);
try {
// 尝试创建新的文件,如果文件已存在,则抛出异常
if (!f.createNewFile()) {
Panic.panic(Error.FileExistsException);
}
} catch (Exception e) {
// 如果创建文件过程中出现异常,则处理异常
Panic.panic(e);
}
// 检查文件是否可读写,如果不可读写,则抛出异常
if (!f.canRead() || !f.canWrite()) {
Panic.panic(Error.FileCannotRWException);
}
// 返回新创建的Booter对象
return new Booter(path, f);
}

/**
* 打开一个已存在的Booter对象
*/
public static Booter open(String path) {
// 删除可能存在的临时文件
removeBadTmp(path);
// 创建一个新的文件对象,文件名是路径加上启动信息文件的后缀
File f = new File(path + BOOTER_SUFFIX);
// 如果文件不存在,则抛出异常
if (!f.exists()) {
Panic.panic(Error.FileNotExistsException);
}
// 检查文件是否可读写,如果不可读写,则抛出异常
if (!f.canRead() || !f.canWrite()) {
Panic.panic(Error.FileCannotRWException);
}
// 返回打开的Booter对象
return new Booter(path, f);
}

/**
* 删除可能存在的临时文件
*/
private static void removeBadTmp(String path) {
// 删除路径加上临时文件后缀的文件
new File(path + BOOTER_TMP_SUFFIX).delete();
}

load()

加载文件启动信息文件

1
2
3
4
5
6
7
8
9
10
public byte[] load() {
byte[] buf = null;
try {
// 读取文件的所有字节
buf = Files.readAllBytes(file.toPath());
} catch (IOException e) {
Panic.panic(e);
}
return buf;
}

update()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* 更新启动信息文件的内容。
*
* @param data 要写入文件的数据
*/
public void update(byte[] data) {
// 创建一个新的临时文件
File tmp = new File(path + BOOTER_TMP_SUFFIX);
try {
// 尝试创建新的临时文件
tmp.createNewFile();
} catch (Exception e) {
// 如果创建文件过程中出现异常,则处理异常
Panic.panic(e);
}
// 检查临时文件是否可读写,如果不可读写,则抛出异常
if (!tmp.canRead() || !tmp.canWrite()) {
Panic.panic(Error.FileCannotRWException);
}
try (FileOutputStream out = new FileOutputStream(tmp)) {
// 将数据写入临时文件
out.write(data);
// 刷新输出流,确保数据被写入文件
out.flush();
} catch (IOException e) {
// 如果写入文件过程中出现异常,则处理异常
Panic.panic(e);
}
try {
// 将临时文件移动到启动信息文件的位置,替换原来的文件
Files.move(tmp.toPath(), new File(path + BOOTER_SUFFIX).toPath(), StandardCopyOption.REPLACE_EXISTING);
} catch (IOException e) {
// 如果移动文件过程中出现异常,则处理异常
Panic.panic(e);
}
// 更新file字段为新的启动信息文件
file = new File(path + BOOTER_SUFFIX);
// 检查新的启动信息文件是否可读写,如果不可读写,则抛出异常
if (!file.canRead() || !file.canWrite()) {
Panic.panic(Error.FileCannotRWException);
}
}

TableManager

基本定义

  • TableManager中的方法直接返回执行结果,比如错误信息或者可读的结果信息的字节数组。
  • 这些方法的实现相对简单,主要是调用(VM)相关的方法来完成数据库操作。
  • 在创建新表时,采用了头插法,即每次创建表都将新表插入到链表的头部。这意味着最新创建的表会成为链表的第一个元素。由于使用了头插法,每次创建表都会改变表链表的头部,因此需要更新Booter文件,以便记录新的头表的UID。
  • 在创建TBM对象时,会初始化表信息
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public interface TableManager {
    BeginRes begin(Begin begin);
    byte[] commit(long xid) throws Exception;
    byte[] abort(long xid);

    byte[] show(long xid);
    byte[] create(long xid, Create create) throws Exception;

    byte[] insert(long xid, Insert insert) throws Exception;
    byte[] read(long xid, Select select) throws Exception;
    byte[] update(long xid, Update update) throws Exception;
    byte[] delete(long xid, Delete delete) throws Exception;
    }

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public class TableManagerImpl implements TableManager {
    VersionManager vm; // 版本管理器,用于管理事务的版本
    DataManager dm; // 数据管理器,用于管理数据的存储和读取
    private Booter booter; // 启动信息管理器,用于管理数据库启动信息
    private Map<String, Table> tableCache; // 表缓存,用于缓存已加载的表,键是表名,值是表对象
    private Map<Long, List<Table>> xidTableCache; // 事务表缓存,用于缓存每个事务修改过的表,键是事务ID,值是表对象列表
    private Lock lock; // 锁,用于同步多线程操作

    TableManagerImpl(VersionManager vm, DataManager dm, Booter booter) {
    this.vm = vm;
    this.dm = dm;
    this.booter = booter;
    this.tableCache = new HashMap<>();
    this.xidTableCache = new HashMap<>();
    lock = new ReentrantLock();
    loadTables();
    }
    }

loadTables()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 加载所有的数据库表。
*/
private void loadTables() {
// 获取第一个表的UID
long uid = firstTableUid();
// 当UID不为0时,表示还有表需要加载
while (uid != 0) {
// 加载表,并获取表的UID
Table tb = Table.loadTable(this, uid);
// 更新UID为下一个表的UID
uid = tb.nextUid;
// 将加载的表添加到表缓存中
tableCache.put(tb.name, tb);
}
}

/**
* 获取 Botter 文件的前八位字节
* @return
*/
private long firstTableUid() {
byte[] raw = booter.load();
return Parser.parseLong(raw);
}

create()

这里主要讲解一下 create方法,其他方法都是调用 VM 层

20251021164705

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Override
public byte[] create(long xid, Create create) throws Exception {
// 加锁,防止多线程并发操作
lock.lock();
try {
// 检查表是否已存在,如果存在则抛出异常
if (tableCache.containsKey(create.tableName)) {
throw Error.DuplicatedTableException;
}
// 创建新的表,并获取表的UID
Table table = Table.createTable(this, firstTableUid(), xid, create);
// 更新第一个表的UID
updateFirstTableUid(table.uid);
// 将新创建的表添加到表缓存中
tableCache.put(create.tableName, table);
// 如果事务表缓存中没有当前事务ID的条目,则添加一个新的条目
if (!xidTableCache.containsKey(xid)) {
xidTableCache.put(xid, new ArrayList<>());
}
// 将新创建的表添加到当前事务的表列表中
xidTableCache.get(xid).add(table);
// 返回创建成功的消息
return ("create " + create.tableName).getBytes();
} finally {
// 解锁
lock.unlock();
}
}

服务端客户端的实现及其通信规则

前言

在MYDB 中传输数据使用了一种特殊的二进制格式,用于客户端和通信端之间的通信。在数据的传输和接受之前,会通过Package进行数据的加密以及解密:

  • **[Flag] [Data]**
  • 若 flag 为 0,表示发送的是数据,那么 data 即为这份数据本身,err 就为空
  • 若 flag 为 1,表示发送的是错误信息,那么 data 为空, err 为错误提示信息
    1
    2
    3
    4
    5
    6
    7
    8
    9
    public class Package {
    byte[] data; // 存放数据信息
    Exception err; // 存放错误提示信息

    public Package(byte[] data, Exception err) {
    this.data = data;
    this.err = err;
    }
    }

Encoder

用于将数据加密成十六进制数据,这样可以避免特殊字符造成的问题,并在信息末尾加上换行符。这样在发送和接受数据时,可以简单使用 BufferedReaderBufferedWrite进行读写数据;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class Encoder {

/**
* 将Package对象编码为字节数组。
* 如果Package对象中的错误信息不为空,将错误信息编码为字节数组,并在字节数组前添加一个字节1。
* 如果Package对象中的错误信息为空,将数据编码为字节数组,并在字节数组前添加一个字节0。
*/
public byte[] encode(Package pkg) {
if (pkg.getErr() != null) {
Exception err = pkg.getErr();
String msg = "Intern server error!";
if (err.getMessage() != null) {
msg = err.getMessage();
}
return Bytes.concat(new byte[]{1}, msg.getBytes());
} else {
return Bytes.concat(new byte[]{0}, pkg.getData());
}
}

/**
* 将字节数组解码为Package对象。
* 如果字节数组的长度小于1,抛出InvalidPkgDataException异常。
* 如果字节数组的第一个字节为0,将字节数组的剩余部分解码为数据,创建一个新的Package对象,其中数据为解码后的数据,错误信息为null。
* 如果字节数组的第一个字节为1,将字节数组的剩余部分解码为错误信息,创建一个新的Package对象,其中数据为null,错误信息为解码后的错误信息。
* 如果字节数组的第一个字节既不是0也不是1,抛出InvalidPkgDataException异常。
*/
public Package decode(byte[] data) throws Exception {
if (data.length < 1) {
throw Error.InvalidPkgDataException;
}
if (data[0] == 0) {
return new Package(Arrays.copyOfRange(data, 1, data.length), null);
} else if (data[0] == 1) {
return new Package(null, new RuntimeException(new String(Arrays.copyOfRange(data, 1, data.length))));
} else {
throw Error.InvalidPkgDataException;
}
}

}

Transporter

编码之后的信息会通过 Transporter类,写入输出流发送出去;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class Transporter {
private Socket socket;
private BufferedReader reader; // 字节缓冲流
private BufferedWriter writer;

public Transporter(Socket socket) throws IOException {
this.socket = socket;
this.reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
this.writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
}

/**
* 发送数据
*/
public void send(byte[] data) throws Exception {
String raw = hexEncode(data);
writer.write(raw);
writer.flush();
}

/**
* 接受数据
*/
public byte[] receive() throws Exception {
String line = reader.readLine();
if(line == null) {
close();
}
return hexDecode(line);
}

public void close() throws IOException {
writer.close();
reader.close();
socket.close();
}

/**
* 将字节数组转换为十六进制字符串
*/
private String hexEncode(byte[] buf) {
return Hex.encodeHexString(buf, true)+"\n";
}

/**
* 将十六进制字符串转换回字节数组
*/
private byte[] hexDecode(String buf) throws DecoderException {
return Hex.decodeHex(buf);
}
}

Packager

Packager 则是 EncoderTransporter 的结合体,直接对外提供 sendreceive 方法: bmn

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Packager {
private Transporter transpoter;
private Encoder encoder;

public Packager(Transporter transpoter, Encoder encoder) {
this.transpoter = transpoter;
this.encoder = encoder;
}

/**
* 将信息编码之后发送
*/
public void send(Package pkg) throws Exception {
byte[] data = encoder.encode(pkg);
transpoter.send(data);
}

/**
* 将数据接收之后解密
*/
public Package receive() throws Exception {
byte[] data = transpoter.receive();
return encoder.decode(data);
}

public void close() throws Exception {
transpoter.close();
}
}

服务端和客户端的实现

Server 和 Client,都是使用了Java 的 socket;这一块内容属于 Java 网络编程的,可以通过 二哥的进阶之路 学习;

Server

**Server**是一个服务器类,主要作用是监听指定的端口号,接受客户端的连接请求,并为每个连接请求创建一个新的线程来处理;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class Server {
private int port;
TableManager tbm;

public Server(int port, TableManager tbm) {
this.port = port;
this.tbm = tbm;
}

public void start() {
// 创建一个ServerSocket对象,用于监听指定的端口
ServerSocket ss = null;
try {
ss = new ServerSocket(port);
} catch (IOException e) {
e.printStackTrace();
return;
}
System.out.println("Server listen to port: " + port);
// 创建一个线程池,用于管理处理客户端连接请求的线程
ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 20, 1L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());
try {
// 无限循环,等待并处理客户端的连接请求
while (true) {
// 接收一个客户端的连接请求
Socket socket = ss.accept();
// 创建一个新的HandleSocket对象,用于处理这个连接请求
Runnable worker = new HandleSocket(socket, tbm);
// 将这个HandleSocket对象提交给线程池,由线程池中的一个线程来执行
tpe.execute(worker);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
// 在最后,无论是否发生异常,都要关闭ServerSocket
try {
ss.close();
} catch (IOException ignored) {
}
}
}
}
HandleSocket

HandleSocket 类实现了 **Runnable**** 接口,在建立连接后初始化 **Packager**,随后就循环接收来自客户端的数据并处理;主要通过 **Executor** 对象来执行 **SQL**语句,在接受、执行SQL语句的过程中发生异常的话,将会结束循环,并关闭 **Executor** **和 **Package**;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class HandleSocket implements Runnable {
private Socket socket;
private TableManager tbm;

public HandleSocket(Socket socket, TableManager tbm) {
this.socket = socket;
this.tbm = tbm;
}

@Override
public void run() {
// 获取远程客户端的地址信息
InetSocketAddress address = (InetSocketAddress) socket.getRemoteSocketAddress();
// 打印客户端的IP地址和端口号
System.out.println("Establish connection: " + address.getAddress().getHostAddress() + ":" + address.getPort());
Packager packager = null;
try {
// 创建一个Transporter对象,用于处理网络传输
Transporter t = new Transporter(socket);
// 创建一个Encoder对象,用于处理数据的编码和解码
Encoder e = new Encoder();
// 创建一个Packager对象,用于处理数据的打包和解包
packager = new Packager(t, e);
} catch (IOException e) {
// 如果在创建Transporter或Encoder时发生异常,打印异常信息并关闭socket
e.printStackTrace();
try {
socket.close();
} catch (IOException e1) {
e1.printStackTrace();
}
return;
}
// 创建一个Executor对象,用于执行SQL语句
Executor exe = new Executor(tbm);
while (true) {
Package pkg = null;
try {
// 从客户端接收数据包
pkg = packager.receive();
} catch (Exception e) {
// 如果在接收数据包时发生异常,结束循环
break;
}
// 获取数据包中的SQL语句
byte[] sql = pkg.getData();
byte[] result = null;
Exception e = null;
try {
// 执行SQL语句,并获取结果
result = exe.execute(sql);
} catch (Exception e1) {
// 如果在执行SQL语句时发生异常,保存异常信息
e = e1;
e.printStackTrace();
}
// 创建一个新的数据包,包含执行结果和可能的异常信息
pkg = new Package(result, e);
try {
// 将数据包发送回客户端
packager.send(pkg);
} catch (Exception e1) {
// 如果在发送数据包时发生异常,打印异常信息并结束循环
e1.printStackTrace();
break;
}
}
// 关闭Executor
exe.close();
try {
// 关闭Packager
packager.close();
} catch (Exception e) {
// 如果在关闭Packager时发生异常,打印异常信息
e.printStackTrace();
}
}

}
Launcher

这个类是服务器的启动入口,这个类解析了命令行参数。很重要的参数就是-open或者-createLauncher根据这两个参数,来决定是创建数据库文件,还是启动一个已有的数据库;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
public class Launcher {
// 定义服务器监听的端口号
public static final int port = 9999;
// 定义默认的内存大小,这里是64MB,用于数据管理器
public static final long DEFALUT_MEM = (1 << 20) * 64;
// 定义一些内存单位,用于解析命令行参数中的内存大小
public static final long KB = 1 << 10; // 1KB
public static final long MB = 1 << 20; // 1MB
public static final long GB = 1 << 30; // 1GB

public static void main(String[] args) throws ParseException {
Options options = new Options();
options.addOption("open", true, "-open DBPath");
options.addOption("create", true, "-create DBPath");
options.addOption("mem", true, "-mem 64MB");
CommandLineParser parser = new DefaultParser();
CommandLine cmd = parser.parse(options, args);

if (cmd.hasOption("open")) {
openDB(cmd.getOptionValue("open"), parseMem(cmd.getOptionValue("mem")));
return;
}
if (cmd.hasOption("create")) {
createDB(cmd.getOptionValue("create"));
return;
}
System.out.println("Usage: launcher (open|create) DBPath");
}

/**
* 创建新的数据库
*
* @param path 数据库路径
*/
private static void createDB(String path) {
// 创建事务管理器
TransactionManager tm = TransactionManager.create(path);
// 创建数据管理器
DataManager dm = DataManager.create(path, DEFALUT_MEM, tm);
// 创建版本管理器
VersionManager vm = new VersionManagerImpl(tm, dm);
// 创建表管理器
TableManager.create(path, vm, dm);
tm.close();
dm.close();
}

/**
* 启动已有的数据库
*/
private static void openDB(String path, long mem) {
// 打开事务管理器
TransactionManager tm = TransactionManager.open(path);
// 打开数据管理器,传入路径、内存大小和事务管理器
DataManager dm = DataManager.open(path, mem, tm);
// 创建版本管理器,传入事务管理器和数据管理器
VersionManager vm = new VersionManagerImpl(tm, dm);
// 打开表管理器,传入路径、版本管理器和数据管理器
TableManager tbm = TableManager.open(path, vm, dm);
// 创建服务器对象,并启动服务器
new Server(port, tbm).start();
}

// 定义一个方法,用于解析命令行参数中的内存大小
private static long parseMem(String memStr) {
// 如果内存大小为空或者为空字符串,那么返回默认的内存大小
if (memStr == null || "".equals(memStr)) {
return DEFALUT_MEM;
}
// 如果内存大小的字符串长度小于2,那么抛出异常
if (memStr.length() < 2) {
Panic.panic(Error.InvalidMemException);
}
// 获取内存大小的单位,即字符串的后两个字符
String unit = memStr.substring(memStr.length() - 2);
// 获取内存大小的数值部分,即字符串的前部分,并转换为数字
long memNum = Long.parseLong(memStr.substring(0, memStr.length() - 2));
// 根据内存单位,计算并返回最终的内存大小
switch (unit) {
case "KB":
return memNum * KB;
case "MB":
return memNum * MB;
case "GB":
return memNum * GB;
// 如果内存单位不是KB、MB或GB,那么抛出异常
default:
Panic.panic(Error.InvalidMemException);
}
// 如果没有匹配到任何情况,那么返回默认的内存大小
return DEFALUT_MEM;
}
}

Client

解析客户输入的内容;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Client {
// RoundTripper实例,用于处理请求的往返传输
private RoundTripper rt;

// 构造函数,接收一个Packager对象作为参数,并创建一个新的RoundTripper实例
public Client(Packager packager) {
this.rt = new RoundTripper(packager);
}

// execute方法,接收一个字节数组作为参数,将其封装为一个Package对象,并通过RoundTripper发送
// 如果响应的Package对象中包含错误,那么抛出这个错误
// 否则,返回响应的Package对象中的数据
public byte[] execute(byte[] stat) throws Exception {
Package pkg = new Package(stat, null);
Package resPkg = rt.roundTrip(pkg);
if(resPkg.getErr() != null) {
throw resPkg.getErr();
}
return resPkg.getData();
}

// close方法,关闭RoundTripper
public void close() {
try {
rt.close();
} catch (Exception e) {
}
}
}
RoundTripper

用于发送请求并接受响应

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class RoundTripper {
private Packager packager;

public RoundTripper(Packager packager) {
this.packager = packager;
}

// 定义一个方法,用于处理请求的往返传输
public Package roundTrip(Package pkg) throws Exception {
// 发送请求包
packager.send(pkg);
// 接收响应包,并返回
return packager.receive();
}

public void close() throws Exception {
packager.close();
}
}
Shell

用于接受用户的输入,并调用Client.execute()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class Shell {
private Client client;

public Shell(Client client) {
this.client = client;
}

// 定义一个运行方法,用于启动客户端的交互式命令行界面
public void run() {
// 创建一个Scanner对象,用于读取用户的输入
Scanner sc = new Scanner(System.in);
try {
// 循环接收用户的输入,直到用户输入"exit"或"quit"
while (true) {
// 打印提示符
System.out.print(":> ");
// 读取用户的输入
String statStr = sc.nextLine();
// 如果用户输入"exit"或"quit",则退出循环
if ("exit".equals(statStr) || "quit".equals(statStr)) {
break;
}
// 尝试执行用户的输入命令,并打印执行结果
try {
// 将用户的输入转换为字节数组,并执行
byte[] res = client.execute(statStr.getBytes());
// 将执行结果转换为字符串,并打印
System.out.println(new String(res));
// 如果在执行过程中发生异常,打印异常信息
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
// 无论是否发生异常,都要关闭Scanner和Client
} finally {
// 关闭Scanner
sc.close();
// 关闭Client
client.close();
}
}
}
Launcher

启动客户端并连接服务器;

1
2
3
4
5
6
7
8
9
10
11
12
public class Launcher {
public static void main(String[] args) throws UnknownHostException, IOException {
Socket socket = new Socket("127.0.0.1", 9999);
Encoder e = new Encoder();
Transporter t = new Transporter(socket);
Packager packager = new Packager(t, e);

Client client = new Client(packager);
Shell shell = new Shell(client);
shell.run();
}
}