跳至主要內容

KWDB SQL 执行流程分析

悟空约 1480 字大约 5 分钟...

KWDB SQL 执行流程分析

kaiwudb sql 执行流程分析.png
kaiwudb sql 执行流程分析.png

1. 整体架构

Client (PostgreSQL wire protocol)
    │
    ▼
[pgwire] 网络层 (协议解析)
    │
    ▼
[connExecutor] 会话状态机
    │
    ▼
[planner/optimizer] 查询规划
    │
    ▼
[DistSQL] 分布式执行
    │
    ▼
[KV Layer] → RocksDB + kwdbts2 (C++ 时序引擎)

KWDB 基于 CockroachDB 构建,遵循经典的分层 SQL 引擎架构。


2. 网络层 — pgwire

入口文件: kwbase/pkg/sql/pgwire/conn.go

连接入口

Server.ServeConn()           // 接收 TCP 连接
  └─> conn.serveImpl()       // 每个连接一个 goroutine
        └─> processCommandsAsync()  // 读取 pgwire 消息
              ├─> handleSimpleQuery()   // 简单查询协议 (SQL 直接发送)
              │      └─> parser.ParseWithInt()  // ★ SQL 解析
              │      └─> stmtBuf.Push(ExecStmt{...})
              │
              └─> handleParse()         // 扩展协议 - Parse 步骤

简单查询处理

// pgwire/conn.go:843
func (c *conn) handleSimpleQuery(ctx, buf, ...) error {
    query, err := buf.GetString()              // 从网络读取 SQL 文本
    c.parser.ParseWithInt(query, ...)           // [1] PARSE — parser/parse.go
    c.stmtBuf.Push(ctx, sql.ExecStmt{...})       // [2] 推入语句缓冲区
}

扩展协议处理

// pgwire/conn.go:1300
func (c *conn) handleParse(...) {
    query, err := buf.GetString()              // 读取 SQL 文本
    c.parser.ParseWithInt(query, ...)           // [1] PARSE
    // [2] 推送 PreparedStatement 元数据到 stmtBuf
}

3. SQL 解析

入口文件: kwbase/pkg/sql/parser/parse.go

解析流程

parser.Parse(sql)
  → scanOneStmt()           // 词法分析 (scan.go)
  → lexer.Run()             // 将 token 输入 yacc parser
  → parserImpl.Parse()      // yacc 语法分析 (sql.y)
  → Statements []Statement   // 返回 AST

关键文件

文件职责
parser/parse.goParser.Parse(sql string) -> Statements — 顶层解析入口
parser/scan.goscanner — 将 SQL 文本转为 token(关键字、标识符、字面量等)
parser/lexer.golexer — 将 scanner 的 token 输入 yacc parser,处理占位符
parser/sql.yyacc/bison 语法定义 — 生成 SQL 语法分析器

Statement 结构

解析得到的 Statement 包含:

  • AST tree.Statement — 解析后 AST 的根节点
  • SQL string — 原始 SQL 文本
  • NumPlaceholders int$N 占位符数量

4. 会话执行器 — connExecutor

入口文件: kwbase/pkg/sql/conn_executor.go

主事件循环

// conn_executor.go:1593
func (ex *connExecutor) run(ctx, ...) error {
    ex.activate(ctx, ...)
    for {
        if err := ex.execCmd(ex.Ctx()); err != nil {
            return err
        }
    }
}

命令分发

// conn_executor.go:1761
func (ex *connExecutor) execCmd(ctx) error {
    cmd, pos, err := ex.stmtBuf.CurCmd()   // 获取当前命令
    switch tcmd := cmd.(type) {
    case ExecStmt:
        ex.execStmt(ctx, curStmt, stmtRes, nil)   // 常规语句
    case ExecPortal:
        ex.execPortal(ctx, portal, stmtRes)        // Portal 执行
    case DrainRequest, Sync, CancelRequest...:
        // 其他命令类型
    }
}

语句执行 — execStmt()

入口文件: kwbase/pkg/sql/conn_executor_exec.go

// conn_executor_exec.go:221
func (ex *connExecutor) execStmt(ctx, stmt, res, pinfo) (ev, payload, err) {
    switch ex.machine.CurState().(type) {
    case stateNoTxn:
        ex.execStmtInNoTxnState(ctx, stmt)         // 自动提交隐式事务
    case stateOpen:
        ex.execStmtInOpenState(ctx, stmt, res, pinfo) // ★ 主要执行路径
    case stateAborted:
        ex.execStmtInAbortedState(ctx, stmt, res)
    case stateCommitWait:
        ex.execStmtInCommitWaitState(stmt, res)
    }
}

在打开事务状态中执行

// conn_executor_exec.go:371
func (ex *connExecutor) execStmtInOpenState(ctx, stmt, res, pinfo) (ev, payload, err) {
    // [1] 特殊语句(无需规划): BEGIN, COMMIT, ROLLBACK, SAVEPOINT 等

    // [2] 设置 planner
    p := &ex.planner
    ex.resetPlanner(ctx, p, ex.state.mu.txn, stmtTS)

    // [3] 分发到执行引擎
    if txn.ReadCommitted && !implicitTxn {
        ex.dispatchReadCommittedStmtToExecutionEngine(ctx, p, res)  // RC 隔离级别
    } else {
        ex.dispatchToExecutionEngine(ctx, p, res)                    // ★ 正常路径
    }
}

5. 查询规划 — Optimizer

入口文件: kwbase/pkg/sql/plan_opt.go

入口函数

// conn_executor_exec.go:965
func (ex *connExecutor) dispatchToExecutionEngine(ctx, planner, res) error {
    err := ex.makeExecPlan(ctx, planner)          // [1] 生成执行计划
    // ... 设置结果列、缓冲 ...
    ex.execWithDistSQLEngine(ctx, planner, ..., distributePlan, ...)  // [2] 运行计划
}

makeOptimizerPlan()

// plan_opt.go:202
func (p *planner) makeOptimizerPlan(ctx) error {
    opc := &p.optPlanningCtx
    opc.reset()

    execMemo, layerType, err := opc.buildExecMemo(ctx)   // [A] 构建优化器 memo
    execFactory := makeExecFactory(p)                    // [B] 创建 node 工厂
    bld := execbuilder.New(&execFactory, execMemo, ...)
    plan, err := bld.Build(true)                         // [C] 生成 exec.Node 树
    p.curPlan = *result
    return nil
}

Optimizer 详细流程

AST (tree.Statement)
  │
  ▼
optbuilder.Build()            // 从 AST 构建逻辑计划
  │
  ▼
xform.Optimizer.Run()        // 规则优化 + 代价优化
  │
  ▼
memo.Memo                     // 优化后的内部表示
  │
  ▼
execbuilder.Build()          // 转为 exec.Node 执行树

关键文件

文件职责
sql/planner.goplanner 结构体 — 持有事务、semaCtx、curPlan
sql/plan_opt.gomakeOptimizerPlan() — 构建优化器 memo 和执行计划
sql/opt/optbuilder/optbuilder — 从 AST 构建逻辑计划表达式
sql/opt/xform/xform.Optimizer — 基于规则和代价的查询优化器
sql/opt/memo/memo.Memo — 优化器内部表示
sql/opt/exec/execbuilder/builder.goexecbuilder.Builder — 将 memo 转为 exec.Node 树
sql/opt/exec/factory.goexec.Factory 接口 — 创建 exec.Node 对象的工厂

6. 分布式执行 — DistSQL

入口文件: kwbase/pkg/sql/distsql_running.go

PlanAndRun

// distsql_running.go:1574
func (dsp *DistSQLPlanner) PlanAndRun(ctx, evalCtx, planCtx, txn, plan, recv, ...) {
    physPlan := dsp.GetPhysPlan(ctx, planCtx, plan, recv, stmt)  // [1] 创建物理计划
    return dsp.Run(planCtx, txn, physPlan, recv, evalCtx, nil)   // [2] 执行
}

// distsql_running.go:1600
func (dsp *DistSQLPlanner) GetPhysPlan(...) *PhysicalPlan {
    physPlan, err := dsp.createPlanForNode(planCtx, plan)   // planNode → PhysicalPlan
    dsp.FinalizePlan(planCtx, &physPlan)
    return &physPlan
}

物理计划执行

// distsql_running.go:597
func (dsp *DistSQLPlanner) Run(planCtx, txn, physPlan, recv, evalCtx, ...) {
    // [1] 在节点上调度 flows(可能远程)
    // [2] 设置节点间 rpc 流
    // [3] 在每个节点启动处理器
    // [4] 通过 recv 流回行
}

关键文件

文件职责
sql/distsql_running.goDistSQLPlanner.PlanAndRun(), Run(), GetPhysPlan()
sql/distsql_physical_planner.go物理规划,flow 调度
sql/execinfra/处理器接口,flow 基础设施

7. 完整调用链

TCP 连接
  │
  └─> pgwire.Server.ServeConn()                    [pgwire/server.go:513]
        │
        └─> conn.serveImpl()                       [pgwire/conn.go:286]
              │
              ├─> Authentication (authPipe)
              │
              └─> conn.processCommandsAsync()      [pgwire/conn.go:658]
                    [goroutine: 读取 pgwire 消息]
                    │
                    ├─> handleSimpleQuery()         [pgwire/conn.go:843]
                    │      ├─> parser.ParseWithInt()     ★ 解析
                    │      └─> stmtBuf.Push(ExecStmt{})
                    │
                    └─> handleParse()               [pgwire/conn.go:1300]
                           ├─> parser.ParseWithInt()     ★ 解析
                           └─> stmtBuf.Push(...)

connExecutor.run()                               [conn_executor.go:1593]
  [事件循环]
  │
  └─> connExecutor.execCmd()                      [conn_executor.go:1761]
        │ stmtBuf.CurCmd() → ExecStmt / ExecPortal / ...
        ▼
  └─> connExecutor.execStmt()                     [conn_executor_exec.go:221]
        │ [根据事务状态分发]
        ▼
  └─> connExecutor.execStmtInOpenState()         [conn_executor_exec.go:371]
        │ [特殊语句: BEGIN/COMMIT/ROLLBACK]
        ▼
  └─> connExecutor.dispatchToExecutionEngine()   [conn_executor_exec.go:965]
        │
        ▼
  └─> connExecutor.makeExecPlan()                [conn_executor_exec.go:1166]
        │
        ▼
  └─> planner.makeOptimizerPlan()                [plan_opt.go:202]
        │
        ├─> opc.buildExecMemo()                    ★ 优化
        │     └─> memo.Memo (优化后的逻辑计划)
        │
        ├─> makeExecFactory(p)
        │
        └─> execbuilder.New().Build()
              └─> planNode 树
        ▼
  └─> connExecutor.execWithDistSQLEngine()       [conn_executor_exec.go:1281]
        │  [决定: 本地还是分布式执行]
        ▼
  └─> DistSQLPlanner.PlanAndRun()                [distsql_running.go:1574]
        │
        ├─> GetPhysPlan()                         → PhysicalPlan
        │
        └─> Run()                                → 启动 flows,流式结果
        ▼
KV Layer → Storage Engine (RocksDB) / kwdbts2 (C++ 时序引擎)
        ▼
结果通过 DistSQLReceiver 流式返回
        ▼
pgwire conn.writerState 发送结果到客户端

8. 关键文件速查

阶段文件路径关键函数
网络层入口sql/pgwire/server.goServer.ServeConn()
连接处理sql/pgwire/conn.goconn.serveImpl(), handleSimpleQuery()
SQL 解析sql/parser/parse.goParser.Parse()
词法分析sql/parser/scan.goscanner
语法分析sql/parser/sql.yyacc 语法定义
会话管理sql/conn_executor.goconnExecutor.run(), execCmd()
语句执行sql/conn_executor_exec.goexecStmt(), execStmtInOpenState()
查询优化sql/plan_opt.gomakeOptimizerPlan()
逻辑计划构建sql/opt/optbuilder/optbuilder
优化器sql/opt/xform/xform.Optimizer
执行计划构建sql/opt/exec/execbuilder/builder.goexecbuilder.Builder
分布式执行sql/distsql_running.goDistSQLPlanner.PlanAndRun()
事务状态机sql/conn_fsm.go状态机定义

9. 代码目录结构

kwbase/pkg/
├── sql/
│   ├── parser/           # SQL 词法/语法分析
│   │   ├── parse.go      # 解析入口
│   │   ├── scan.go       # 词法分析器
│   │   ├── lexer.go      # lexer
│   │   └── sql.y         # yacc 语法
│   ├── pgwire/          # PostgreSQL wire 协议
│   │   ├── server.go    # 服务器入口
│   │   └── conn.go      # 连接处理
│   ├── conn_executor.go # 会话执行器主循环
│   ├── conn_executor_exec.go  # 语句执行
│   ├── planner.go       # planner 结构体
│   ├── plan_opt.go      # 查询优化入口
│   ├── opt/             # 优化器
│   │   ├── optbuilder/  # 逻辑计划构建
│   │   ├── xform/       # 规则/代价优化
│   │   ├── memo/        # memo 内部表示
│   │   └── exec/        # 执行计划构建
│   └── distsql_running.go  # 分布式执行
├── server/              # 服务器实现
├── storage/             # 存储层抽象
└── kv/                  # KV 接口实现
评论
  • 按正序
  • 按倒序
  • 按热度