KWDB SQL 执行流程分析
约 1480 字大约 5 分钟...
KWDB SQL 执行流程分析

1. 整体架构
Client (PostgreSQL wire protocol)
│
▼
[pgwire] 网络层 (协议解析)
│
▼
[connExecutor] 会话状态机
│
▼
[planner/optimizer] 查询规划
│
▼
[DistSQL] 分布式执行
│
▼
[KV Layer] → RocksDB + kwdbts2 (C++ 时序引擎)
KWDB 基于 CockroachDB 构建,遵循经典的分层 SQL 引擎架构。
2. 网络层 — pgwire
入口文件: kwbase/pkg/sql/pgwire/conn.go
连接入口
Server.ServeConn() // 接收 TCP 连接
└─> conn.serveImpl() // 每个连接一个 goroutine
└─> processCommandsAsync() // 读取 pgwire 消息
├─> handleSimpleQuery() // 简单查询协议 (SQL 直接发送)
│ └─> parser.ParseWithInt() // ★ SQL 解析
│ └─> stmtBuf.Push(ExecStmt{...})
│
└─> handleParse() // 扩展协议 - Parse 步骤
简单查询处理
// pgwire/conn.go:843
func (c *conn) handleSimpleQuery(ctx, buf, ...) error {
query, err := buf.GetString() // 从网络读取 SQL 文本
c.parser.ParseWithInt(query, ...) // [1] PARSE — parser/parse.go
c.stmtBuf.Push(ctx, sql.ExecStmt{...}) // [2] 推入语句缓冲区
}
扩展协议处理
// pgwire/conn.go:1300
func (c *conn) handleParse(...) {
query, err := buf.GetString() // 读取 SQL 文本
c.parser.ParseWithInt(query, ...) // [1] PARSE
// [2] 推送 PreparedStatement 元数据到 stmtBuf
}
3. SQL 解析
入口文件: kwbase/pkg/sql/parser/parse.go
解析流程
parser.Parse(sql)
→ scanOneStmt() // 词法分析 (scan.go)
→ lexer.Run() // 将 token 输入 yacc parser
→ parserImpl.Parse() // yacc 语法分析 (sql.y)
→ Statements []Statement // 返回 AST
关键文件
| 文件 | 职责 |
|---|---|
parser/parse.go | Parser.Parse(sql string) -> Statements — 顶层解析入口 |
parser/scan.go | scanner — 将 SQL 文本转为 token(关键字、标识符、字面量等) |
parser/lexer.go | lexer — 将 scanner 的 token 输入 yacc parser,处理占位符 |
parser/sql.y | yacc/bison 语法定义 — 生成 SQL 语法分析器 |
Statement 结构
解析得到的 Statement 包含:
AST tree.Statement— 解析后 AST 的根节点SQL string— 原始 SQL 文本NumPlaceholders int—$N占位符数量
4. 会话执行器 — connExecutor
入口文件: kwbase/pkg/sql/conn_executor.go
主事件循环
// conn_executor.go:1593
func (ex *connExecutor) run(ctx, ...) error {
ex.activate(ctx, ...)
for {
if err := ex.execCmd(ex.Ctx()); err != nil {
return err
}
}
}
命令分发
// conn_executor.go:1761
func (ex *connExecutor) execCmd(ctx) error {
cmd, pos, err := ex.stmtBuf.CurCmd() // 获取当前命令
switch tcmd := cmd.(type) {
case ExecStmt:
ex.execStmt(ctx, curStmt, stmtRes, nil) // 常规语句
case ExecPortal:
ex.execPortal(ctx, portal, stmtRes) // Portal 执行
case DrainRequest, Sync, CancelRequest...:
// 其他命令类型
}
}
语句执行 — execStmt()
入口文件: kwbase/pkg/sql/conn_executor_exec.go
// conn_executor_exec.go:221
func (ex *connExecutor) execStmt(ctx, stmt, res, pinfo) (ev, payload, err) {
switch ex.machine.CurState().(type) {
case stateNoTxn:
ex.execStmtInNoTxnState(ctx, stmt) // 自动提交隐式事务
case stateOpen:
ex.execStmtInOpenState(ctx, stmt, res, pinfo) // ★ 主要执行路径
case stateAborted:
ex.execStmtInAbortedState(ctx, stmt, res)
case stateCommitWait:
ex.execStmtInCommitWaitState(stmt, res)
}
}
在打开事务状态中执行
// conn_executor_exec.go:371
func (ex *connExecutor) execStmtInOpenState(ctx, stmt, res, pinfo) (ev, payload, err) {
// [1] 特殊语句(无需规划): BEGIN, COMMIT, ROLLBACK, SAVEPOINT 等
// [2] 设置 planner
p := &ex.planner
ex.resetPlanner(ctx, p, ex.state.mu.txn, stmtTS)
// [3] 分发到执行引擎
if txn.ReadCommitted && !implicitTxn {
ex.dispatchReadCommittedStmtToExecutionEngine(ctx, p, res) // RC 隔离级别
} else {
ex.dispatchToExecutionEngine(ctx, p, res) // ★ 正常路径
}
}
5. 查询规划 — Optimizer
入口文件: kwbase/pkg/sql/plan_opt.go
入口函数
// conn_executor_exec.go:965
func (ex *connExecutor) dispatchToExecutionEngine(ctx, planner, res) error {
err := ex.makeExecPlan(ctx, planner) // [1] 生成执行计划
// ... 设置结果列、缓冲 ...
ex.execWithDistSQLEngine(ctx, planner, ..., distributePlan, ...) // [2] 运行计划
}
makeOptimizerPlan()
// plan_opt.go:202
func (p *planner) makeOptimizerPlan(ctx) error {
opc := &p.optPlanningCtx
opc.reset()
execMemo, layerType, err := opc.buildExecMemo(ctx) // [A] 构建优化器 memo
execFactory := makeExecFactory(p) // [B] 创建 node 工厂
bld := execbuilder.New(&execFactory, execMemo, ...)
plan, err := bld.Build(true) // [C] 生成 exec.Node 树
p.curPlan = *result
return nil
}
Optimizer 详细流程
AST (tree.Statement)
│
▼
optbuilder.Build() // 从 AST 构建逻辑计划
│
▼
xform.Optimizer.Run() // 规则优化 + 代价优化
│
▼
memo.Memo // 优化后的内部表示
│
▼
execbuilder.Build() // 转为 exec.Node 执行树
关键文件
| 文件 | 职责 |
|---|---|
sql/planner.go | planner 结构体 — 持有事务、semaCtx、curPlan |
sql/plan_opt.go | makeOptimizerPlan() — 构建优化器 memo 和执行计划 |
sql/opt/optbuilder/ | optbuilder — 从 AST 构建逻辑计划表达式 |
sql/opt/xform/ | xform.Optimizer — 基于规则和代价的查询优化器 |
sql/opt/memo/ | memo.Memo — 优化器内部表示 |
sql/opt/exec/execbuilder/builder.go | execbuilder.Builder — 将 memo 转为 exec.Node 树 |
sql/opt/exec/factory.go | exec.Factory 接口 — 创建 exec.Node 对象的工厂 |
6. 分布式执行 — DistSQL
入口文件: kwbase/pkg/sql/distsql_running.go
PlanAndRun
// distsql_running.go:1574
func (dsp *DistSQLPlanner) PlanAndRun(ctx, evalCtx, planCtx, txn, plan, recv, ...) {
physPlan := dsp.GetPhysPlan(ctx, planCtx, plan, recv, stmt) // [1] 创建物理计划
return dsp.Run(planCtx, txn, physPlan, recv, evalCtx, nil) // [2] 执行
}
// distsql_running.go:1600
func (dsp *DistSQLPlanner) GetPhysPlan(...) *PhysicalPlan {
physPlan, err := dsp.createPlanForNode(planCtx, plan) // planNode → PhysicalPlan
dsp.FinalizePlan(planCtx, &physPlan)
return &physPlan
}
物理计划执行
// distsql_running.go:597
func (dsp *DistSQLPlanner) Run(planCtx, txn, physPlan, recv, evalCtx, ...) {
// [1] 在节点上调度 flows(可能远程)
// [2] 设置节点间 rpc 流
// [3] 在每个节点启动处理器
// [4] 通过 recv 流回行
}
关键文件
| 文件 | 职责 |
|---|---|
sql/distsql_running.go | DistSQLPlanner.PlanAndRun(), Run(), GetPhysPlan() |
sql/distsql_physical_planner.go | 物理规划,flow 调度 |
sql/execinfra/ | 处理器接口,flow 基础设施 |
7. 完整调用链
TCP 连接
│
└─> pgwire.Server.ServeConn() [pgwire/server.go:513]
│
└─> conn.serveImpl() [pgwire/conn.go:286]
│
├─> Authentication (authPipe)
│
└─> conn.processCommandsAsync() [pgwire/conn.go:658]
[goroutine: 读取 pgwire 消息]
│
├─> handleSimpleQuery() [pgwire/conn.go:843]
│ ├─> parser.ParseWithInt() ★ 解析
│ └─> stmtBuf.Push(ExecStmt{})
│
└─> handleParse() [pgwire/conn.go:1300]
├─> parser.ParseWithInt() ★ 解析
└─> stmtBuf.Push(...)
connExecutor.run() [conn_executor.go:1593]
[事件循环]
│
└─> connExecutor.execCmd() [conn_executor.go:1761]
│ stmtBuf.CurCmd() → ExecStmt / ExecPortal / ...
▼
└─> connExecutor.execStmt() [conn_executor_exec.go:221]
│ [根据事务状态分发]
▼
└─> connExecutor.execStmtInOpenState() [conn_executor_exec.go:371]
│ [特殊语句: BEGIN/COMMIT/ROLLBACK]
▼
└─> connExecutor.dispatchToExecutionEngine() [conn_executor_exec.go:965]
│
▼
└─> connExecutor.makeExecPlan() [conn_executor_exec.go:1166]
│
▼
└─> planner.makeOptimizerPlan() [plan_opt.go:202]
│
├─> opc.buildExecMemo() ★ 优化
│ └─> memo.Memo (优化后的逻辑计划)
│
├─> makeExecFactory(p)
│
└─> execbuilder.New().Build()
└─> planNode 树
▼
└─> connExecutor.execWithDistSQLEngine() [conn_executor_exec.go:1281]
│ [决定: 本地还是分布式执行]
▼
└─> DistSQLPlanner.PlanAndRun() [distsql_running.go:1574]
│
├─> GetPhysPlan() → PhysicalPlan
│
└─> Run() → 启动 flows,流式结果
▼
KV Layer → Storage Engine (RocksDB) / kwdbts2 (C++ 时序引擎)
▼
结果通过 DistSQLReceiver 流式返回
▼
pgwire conn.writerState 发送结果到客户端
8. 关键文件速查
| 阶段 | 文件路径 | 关键函数 |
|---|---|---|
| 网络层入口 | sql/pgwire/server.go | Server.ServeConn() |
| 连接处理 | sql/pgwire/conn.go | conn.serveImpl(), handleSimpleQuery() |
| SQL 解析 | sql/parser/parse.go | Parser.Parse() |
| 词法分析 | sql/parser/scan.go | scanner |
| 语法分析 | sql/parser/sql.y | yacc 语法定义 |
| 会话管理 | sql/conn_executor.go | connExecutor.run(), execCmd() |
| 语句执行 | sql/conn_executor_exec.go | execStmt(), execStmtInOpenState() |
| 查询优化 | sql/plan_opt.go | makeOptimizerPlan() |
| 逻辑计划构建 | sql/opt/optbuilder/ | optbuilder |
| 优化器 | sql/opt/xform/ | xform.Optimizer |
| 执行计划构建 | sql/opt/exec/execbuilder/builder.go | execbuilder.Builder |
| 分布式执行 | sql/distsql_running.go | DistSQLPlanner.PlanAndRun() |
| 事务状态机 | sql/conn_fsm.go | 状态机定义 |
9. 代码目录结构
kwbase/pkg/
├── sql/
│ ├── parser/ # SQL 词法/语法分析
│ │ ├── parse.go # 解析入口
│ │ ├── scan.go # 词法分析器
│ │ ├── lexer.go # lexer
│ │ └── sql.y # yacc 语法
│ ├── pgwire/ # PostgreSQL wire 协议
│ │ ├── server.go # 服务器入口
│ │ └── conn.go # 连接处理
│ ├── conn_executor.go # 会话执行器主循环
│ ├── conn_executor_exec.go # 语句执行
│ ├── planner.go # planner 结构体
│ ├── plan_opt.go # 查询优化入口
│ ├── opt/ # 优化器
│ │ ├── optbuilder/ # 逻辑计划构建
│ │ ├── xform/ # 规则/代价优化
│ │ ├── memo/ # memo 内部表示
│ │ └── exec/ # 执行计划构建
│ └── distsql_running.go # 分布式执行
├── server/ # 服务器实现
├── storage/ # 存储层抽象
└── kv/ # KV 接口实现
