跳至主要內容

RocketMQ Reactor多线程模型流程图

悟空约 1030 字大约 3 分钟...

graph TD
    A[客户端请求] --> B[Reactor主线程<br/>NettyBoss_%d<br/>线程数: 1]
    B --> C{操作系统类型}
    C -->|Linux| D[EpollEventLoopGroup]
    C -->|其他| E[NioEventLoopGroup]
    D --> F[监听TCP连接请求]
    E --> F
    F --> G[建立连接<br/>创建SocketChannel<br/>注册到Selector]

    G --> H[Reactor线程池<br/>NettyServerEPOLLSelector_%d_%d<br/>线程数: 3]
    H --> I[监听网络数据]
    I --> J[接收网络数据]

    J --> K[Worker线程池<br/>NettyServerCodecThread_%d<br/>线程数: 8]
    K --> L[SSL验证]
    L --> M[协议编解码]
    M --> N[空闲检查]
    N --> O[网络连接管理]
    O --> P[解析RemotingCommand]

    P --> Q[根据请求码查找Processor]
    Q --> R{请求码类型}

    R -->|SEND_MESSAGE| S[发送消息线程池<br/>sendMessageExecutor]
    R -->|PULL_MESSAGE| T[拉取消息线程池<br/>pullMessageExecutor]
    R -->|ACK_MESSAGE| U[确认消息线程池<br/>ackMessageExecutor]
    R -->|HEART_BEAT| V[心跳处理线程池<br/>heartbeatExecutor]
    R -->|QUERY_MESSAGE| W[查询消息线程池<br/>queryMessageExecutor]
    R -->|其他| X[默认处理器线程池<br/>defaultExecutor]

    S --> Y[SendMessageProcessor<br/>处理发送消息业务]
    T --> Z[PullMessageProcessor<br/>处理拉取消息业务]
    U --> AA[AckMessageProcessor<br/>处理确认消息业务]
    V --> BB[ClientManageProcessor<br/>处理心跳业务]
    W --> CC[QueryMessageProcessor<br/>处理查询消息业务]
    X --> DD[DefaultProcessor<br/>处理默认业务]

    Y --> EE[业务处理完成]
    Z --> EE
    AA --> EE
    BB --> EE
    CC --> EE
    DD --> EE

    EE --> FF[构建响应]
    FF --> GG[编码响应]
    GG --> HH[发送响应给客户端]

    style B fill:#e1f5fe
    style H fill:#f3e5f5
    style K fill:#e8f5e8
    style S fill:#fff3e0
    style T fill:#fff3e0
    style U fill:#fff3e0
    style V fill:#fff3e0
    style W fill:#fff3e0
    style X fill:#fff3e0

线程池层次结构

graph TB
    subgraph "Reactor主线程层"
        A1[NettyBoss_%d<br/>1个线程<br/>职责: 连接建立]
    end

    subgraph "Reactor线程池层"
        A2[NettyServerEPOLLSelector_%d_%d<br/>3个线程<br/>职责: 网络I/O处理]
    end

    subgraph "Worker线程池层"
        A3[NettyServerCodecThread_%d<br/>8个线程<br/>职责: 协议处理]
    end

    subgraph "业务线程池层"
        A4[sendMessageExecutor<br/>发送消息线程池]
        A5[pullMessageExecutor<br/>拉取消息线程池]
        A6[ackMessageExecutor<br/>确认消息线程池]
        A7[heartbeatExecutor<br/>心跳处理线程池]
        A8[queryMessageExecutor<br/>查询消息线程池]
        A9[其他业务线程池]
    end

    A1 --> A2
    A2 --> A3
    A3 --> A4
    A3 --> A5
    A3 --> A6
    A3 --> A7
    A3 --> A8
    A3 --> A9

    style A1 fill:#e1f5fe
    style A2 fill:#f3e5f5
    style A3 fill:#e8f5e8
    style A4 fill:#fff3e0
    style A5 fill:#fff3e0
    style A6 fill:#fff3e0
    style A7 fill:#fff3e0
    style A8 fill:#fff3e0
    style A9 fill:#fff3e0

关键配置参数

graph LR
    subgraph "NettyServerConfig配置"
        B1[serverSelectorThreads: 3<br/>Reactor线程池大小]
        B2[serverWorkerThreads: 8<br/>Worker线程池大小]
        B3[serverCallbackExecutorThreads: 0<br/>回调线程池大小]
        B4[serverOnewaySemaphoreValue: 256<br/>单向请求信号量]
        B5[serverAsyncSemaphoreValue: 64<br/>异步请求信号量]
    end

    subgraph "BrokerController业务线程池"
        B6[sendMessageExecutor<br/>发送消息线程池]
        B7[pullMessageExecutor<br/>拉取消息线程池]
        B8[ackMessageExecutor<br/>确认消息线程池]
        B9[heartbeatExecutor<br/>心跳处理线程池]
        B10[queryMessageExecutor<br/>查询消息线程池]
    end

    B1 --> B6
    B2 --> B7
    B3 --> B8
    B4 --> B9
    B5 --> B10

    style B1 fill:#e1f5fe
    style B2 fill:#f3e5f5
    style B3 fill:#e8f5e8
    style B4 fill:#fff3e0
    style B5 fill:#fff3e0
    style B6 fill:#e1f5fe
    style B7 fill:#f3e5f5
    style B8 fill:#e8f5e8
    style B9 fill:#fff3e0
    style B10 fill:#fff3e0

异常处理流程

graph TD
    A[请求处理] --> B{线程池是否满?}
    B -->|是| C[RejectedExecutionException]
    C --> D[返回SYSTEM_BUSY响应]
    D --> E[记录警告日志]

    B -->|否| F{业务处理是否异常?}
    F -->|是| G[捕获异常]
    G --> H[返回SYSTEM_ERROR响应]
    H --> I[记录错误日志]

    F -->|否| J[正常处理完成]
    J --> K[返回成功响应]

    style C fill:#ffebee
    style G fill:#ffebee
    style D fill:#fff3e0
    style H fill:#fff3e0
    style K fill:#e8f5e8

2. 主节点(Master)与从节点(Slave)的关系

机制说明
数据同步主节点异步/同步复制消息到从节点
(通过 brokerRole=ASYNC_MASTER/SYNC_MASTER/SLAVE 配置)
故障接管主节点宕机时,从节点不会自动升主
(需运维干预或依赖 RocketMQ 5.0 的 Dledger 自动选主)
读扩散消费者默认从主节点读
高负载时可配置 consumeFromWhere=CONSUME_FROM_SLAVE 从从节点读

⚠️ 注意

  • 主从切换非自动:传统架构需人工介入(如重启 Broker 切换角色);
  • 5.0 改进:Dledger 模式支持基于 Raft 协议自动选主(类似 Kafka Controller)。
评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v3.3.0