在运用 Apache Doris 时,咱们能够经过 Apache Doris FE Web 页面或许 Mysql 协议履行 SQL 句子,可是关于 Apache Doris 背面怎么对 SQL 进行处理,咱们无从所知。本文章内容首要解说 Apache Doris 查询 SQL 在 FE 节点处理原理。Doris 查询句子和市道干流的数据库处理阶段都差不多,需求经过 Parse,Analyze,Optimize,Plan,Schedule,Execute 等阶段。 在 Doris 中,FE 担任查询的 Parse,Analyze,Optimize,Plan, Schedule,BE 担任履行 FE 下发 Plan Fragment

一、前言

在运用 Apache Doris 时,咱们能够经过 Apache Doris FE Web 页面或许 Mysql 协议履行 SQL 句子,可是关于 Apache Doris 背面怎么对 SQL 进行处理,咱们无从所知。本文章内容首要解说 Apache Doris 查询 SQL 在 FE 节点处理原理。Doris 查询句子和市道干流的数据库处理阶段都差不多,需求经过 Parse,Analyze,Optimize,Plan,Schedule,Execute 等阶段。 在 Doris 中,FE 担任查询的 Parse,Analyze,Optimize,Plan, Schedule,BE 担任履行 FE 下发 Plan Fragment

二、名词解释

  • FE:Frontend,即 Doris 的前端节点。首要担任接纳和回来客户端恳求、元数据以及集群管理、查询方案生成等作业。
  • BE:Backend,即 Doris 的后端节点。首要担任数据存储与管理、查询方案履行等作业。
  • slot:核算槽,是一个资源单位, 只有给 task 分配了一个 slot 之后, 这个 task 才能够运转
  • planNode : 逻辑算子
  • planNodeTree: 逻辑履行方案

三、履行流程

在运用 Apache Doris 时,咱们能够经过 Apache Doris FE Web 页面或许 Mysql 协议履行 SQL 句子,可是关于 Apache Doris 背面怎么对 SQL 进行处理,咱们无从所知。本文章内容首要解说 Apache Doris 查询 SQL 在 FE 节点处理原理。Doris 查询句子和市道干流的数据库处理阶段都差不多,需求经过 Parse,Analyze,Optimize,Plan,Schedule,Execute 等阶段。 在 Doris 中,FE 担任查询的 Parse,Analyze,Optimize,Plan, Schedule,BE 担任履行 FE 下发 Plan Fragment

四、Apache Doris 查询原理

(一)SQL 接纳

本文只说 mysql 协议怎么接纳 SQL 句子, 假如感兴趣的同学能够看看 Apache Doris FE Web 的 Rest Api。Apache Doris 兼容 Mysql 协议,用户能够经过 Mysql 客户端和其他支持 Mysql 协议的东西向 Doris 发送查询恳求。MysqlServer Listener() 担任监听客户端发送来的 Mysql 连接恳求,每个连接恳求都被封装成一个 ConnectContext 目标,并被提交给 ConnectScheduler。ConnectScheduler 会保护一个线程池,每个 ConnectContext 会在线程池中由一个 ConnectProcessor 线程处理。

  • MysqlServer 类 Listener 处理:
private class Listener implements Runnable {
        @Override
        public void run(){while (running && serverChannel.isOpen()) {
                SocketChannel clientChannel;
                try {clientChannel = serverChannel.accept();
                    if (clientChannel == null) {continue;}
                    // 构建 ConnectContext 目标
                    ConnectContext context = new ConnectContext(clientChannel);
                    // catelog 日志
                    context.setCatalog(Catalog.getCurrentCatalog());
                    // 向 ExecutorService 提交 new LoopHandler(context) ==>(源码)executor.submit(new LoopHandler(context))
                    if (!scheduler.submit(context)) {LOG.warn("Submit one connect request failed. Client=" + clientChannel.toString());
                        // clear up context
                        context.cleanup();}
                } catch (IOException e) {
                    // ClosedChannelException
                    // AsynchronousCloseException
                    // ClosedByInterruptException
                    // Other IOException, for example "to many open files" ...
                    LOG.warn("Query server encounter exception.", e);
                    try {Thread.sleep(100);
                    } catch (InterruptedException e1) {// Do nothing}
                } catch (Throwable e) {
                    // NotYetBoundException
                    // SecurityException
                    LOG.warn("Query server failed when calling accept.", e);
                }
            }
        }
    }
  • ExecutorService 线程 LoopHandler 处理:
@Override
        public void run() {
            try {
                // Set thread local info
                context.setThreadLocalInfo();
                context.setConnectScheduler(ConnectScheduler.this);
                // authenticate check failed.
                if (!MysqlProto.negotiate(context)) {return;}
                if (registerConnection(context)) {MysqlProto.sendResponsePacket(context);
                } else {context.getState().setError(ErrorCode.ERR_USER_LIMIT_REACHED, "Reach limit of connections");
                    MysqlProto.sendResponsePacket(context);
                    return;
                }
                context.setStartTime();
                ConnectProcessor processor = new ConnectProcessor(context);
                processor.loop();} catch (Exception e) {
                // for unauthorized access such lvs probe request, may cause exception, just log it in debug level
                if (context.getCurrentUserIdentity() != null){LOG.warn("connect processor exception because", e);
                } else {LOG.debug("connect processor exception because", e);
                }
            } finally {unregisterConnection(context);
                context.cleanup();}
        }
  • processOnce(读取 Mysql 客户端的 sql) 办法
// 处理 mysql 的恳求
    public void processOnce()throws IOException {ctx.getState().reset();
        executor = null;
        // 重置 MySQL 协议的序列号
        final MysqlChannel channel = ctx.getMysqlChannel();
        channel.setSequenceId(0);
        // 从通道读取数据包 ==>SQL
        try {packetBuf = channel.fetchOnePacket();
            if (packetBuf == null) {LOG.warn("Null packet received from network. remote: {}", channel.getRemoteHostPortString());
                throw new IOException("Error happened when receiving packet.");
            }
        } catch (AsynchronousCloseException e) {
            // when this happened, timeout checker close this channel
            // killed flag in ctx has been already set, just return
            return;
        }
        // 下发 SQL 
        dispatch();
        // finalize
        finalizeCommand();
        ctx.setCommand(MysqlCommand.COM_SLEEP);
    }

(二)Parse

ConnectProcessor 接纳到 SQL 之后会进行 analyze ,Apache Doris SQL 解析运用的 Parse 是 Java CUP Parser,语法规矩 定义的文件在 sql_parser.cup。

感兴趣的同学能够详细看一下 StatementBase 类

  • analyze 办法, 回来 List (这儿首要是语法解析)
// 解析 origin,回来 list<stmt>
    private List<StatementBase> analyze(String originStmt) throws AnalysisException, DdlException {LOG.debug("the originStmts are: {}", originStmt);
        // 运用 CUP&FLEX 生成的解析器解析句子
        SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode());
        SqlParser parser = new SqlParser(input);
        try {return SqlParserUtils.getMultiStmts(parser);
        } catch (Error e) {throw new AnalysisException("Please check your sql, we meet an error when parsing.", e);
        } catch (AnalysisException | DdlException e) {String errorMessage = parser.getErrorMsg(originStmt);
            LOG.debug("origin stmt: {}; Analyze error message: {}", originStmt, parser.getErrorMsg(originStmt), e);
            if (errorMessage == null) {throw e;} else {throw new AnalysisException(errorMessage, e);
            }
        } catch (Exception e) {// TODO(lingbin): we catch 'Exception' to prevent unexpected error,
            // should be removed this try-catch clause future.
            throw new AnalysisException("Internal Error, maybe syntax error or this is a bug");
        }
    }

因为本文讲述的是查询句子(不同类型会转换成不通 Stmt,比方 InsertStmt, ShowStmt, SetStmt, AlterStmt, AlterTableStmt, CreateTableStmt 等),终究咱们会得到 QueryStmt,originStmt 会转换成 QueryStmt,QueryStmt 通常是用 SelectList, FromClause, wherePredicate, GroupByClause, havingPredicate, OrderByElement, LimitElement 组成

(三)Analyze

SQL 句子被解析成 AST 之后,会被交给 StmtExecutor 。StmtExecutor 会首先对 AST 进行语法和语义剖析,大概会做下面的作业:

  1. 查看并绑定 Cluster, Database, Table, Column 等元信息。
  2. SQL 的合法性查看:窗口函数不能 DISTINCT,HLL 和 Bitmap 列不能 sum, count, where 中不能有 grouping 操作等。
  3. SQL 重写:比方将 select * 扩展成 select 所有列,count distinct 查询重写等。
  4. Table 与 Column 别名处理。
  5. 为 Tuple, Slot, Expr 等分配唯一 ID。
  6. 函数参数的合法性检测。
  7. 表达式替换。
  8. 类型查看,类型转换(BIGINT 和 DECIMAL 比较,BIGINT 类型需求 Cast 成 DECIMAL)。

首要代码:

analyzeAndGenerateQueryPlan 办法 -->  parsedStmt.analyze(analyzer);

(四)Rewrite

  • analyzeAndGenerateQueryPlan 办法(部分代码,此处不做要点解说)

StmtExecutor 在对 AST 进行语法和语义剖析后,会让 ExprRewriter 依据 ExprRewriteRule 进行一次 Rewrite。现在 Doris 的重写规矩比较简单,首要是进行了常量表达式的化简和谓词的简单处理。 常量表达式的化简是指 1 + 1 + 1 重写成 3,1 > 2 重写成 Flase 等。

假如重写后,有部分节点被成功改写,比方, 1 > 2 被改写成 Flase,那么就会再触发一次语法和语义剖析的过程。

关于有子查询的 SQL,StmtRewriter 会进行重写,比方将 where in, where exists 重写成 semi join, where not in, where not exists 重写成 anti join。

if (parsedStmt instanceof QueryStmt || parsedStmt instanceof InsertStmt) {ExprRewriter rewriter = analyzer.getExprRewriter();
            rewriter.reset();
            if (context.getSessionVariable().isEnableFoldConstantByBe()) {parsedStmt.foldConstant(rewriter);
            }
            // explan 标签
            ExplainOptions explainOptions = parsedStmt.getExplainOptions();
            boolean reAnalyze = false;
            parsedStmt.rewriteExprs(rewriter);
            reAnalyze = rewriter.changed();
            if (analyzer.containSubquery()) {parsedStmt = StmtRewriter.rewrite(analyzer, parsedStmt);
                reAnalyze = true;
            }
            if (parsedStmt instanceof SelectStmt) {if (StmtRewriter.rewriteByPolicy(parsedStmt, analyzer)) {reAnalyze = true;}
            }
            if (parsedStmt instanceof SetOperationStmt) {List<SetOperationStmt.SetOperand> operands = ((SetOperationStmt) parsedStmt).getOperands();
                for (SetOperationStmt.SetOperand operand : operands) {if (StmtRewriter.rewriteByPolicy(operand.getQueryStmt(), analyzer)){reAnalyze = true;}
                }
            }
            if (parsedStmt instanceof InsertStmt) {QueryStmt queryStmt = ((InsertStmt) parsedStmt).getQueryStmt();
                if (queryStmt != null && StmtRewriter.rewriteByPolicy(queryStmt, analyzer)) {reAnalyze = true;}
            }
            if (reAnalyze) {
                // 对重写句子进行处理
                List<Type> origResultTypes = Lists.newArrayList();
                for (Expr e : parsedStmt.getResultExprs()) {origResultTypes.add(e.getType());
                }
                List<String> origColLabels =
                        Lists.newArrayList(parsedStmt.getColLabels());
                // 重写句子进行 analyzer
                analyzer = new Analyzer(context.getCatalog(), context);
                // 重写句子 analyzer 信息
                parsedStmt.reset();
                parsedStmt.analyze(analyzer);
                // 康复原始成果类型和列标签
                parsedStmt.castResultExprs(origResultTypes);
                parsedStmt.setColLabels(origColLabels);
                if (LOG.isTraceEnabled()) {LOG.trace("rewrittenStmt:" + parsedStmt.toSql());
                }
                if (explainOptions != null) {parsedStmt.setIsExplain(explainOptions);
                }
            }
        }

(五)SingleNodePlan

经过 parse、Analyze、Rewrite 阶段后,AST 会生成 singleNodePlanner,源码如下:

singleNodePlanner = new SingleNodePlanner(plannerContext);
PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();

单机 Plan 由 SingleNodePlanner 履行,输入是 AST,输出是单机物理履行 Plan, Plan 中每个节点是一个 PlanNode。

SingleNodePlanner 中心使命便是依据 AST 生成 OlapScanNode, AggregationNode, HashJoinNode, SortNode, UnionNode 等。

Doris 在生成单机 Plan 的时分首要进行了以下作业或优化

  1. Slot 物化:指确定一个表达式对应的列需求 Scan 和核算,比方聚合节点的聚合函数表达式和 Group By 表达式需求进行物化

//Slot物化,处理 Base表
analyzer.materializeSlots(queryStmt.getBaseTblResultExprs());
// Slot物化 处理 where 句子的子查询
selectStmt.materializeRequiredSlots(analyzer);
  1. 投影下推:BE 在 Scan 时只会 Scan 必须读取的列
    projectPlanNode(resultSlotIds, root);
  1. 谓词下推:在满意语义正确的前提下将过滤条件尽可能下推到 Scan 节点
    pushDownPredicates(analyzer, selectStmt);
  1. 分区,分桶裁剪:比方建表时依照 UserId 分桶,每个分区 100 个分桶,那么当不包含 or 的 Filter 条件包含 UserId ==xxx 时,Doris 就只会将查询发送 100 个分桶中的一个发送给 BE,能够大大削减不必要的数据读取

  2. Join Reorder:关于 join操作,在确保成果不变的情况,经过规矩核算最优(最少资源)join 操作。

    createCheapestJoinPlan(analyzer, refPlans);
  1. Sort + Limit 优化成 TopN(FE 进行useTopN标识,BE标识履行)
    root = new SortNode(ctx_.getNextNodeId(), root, stmt.getSortInfo(),useTopN, limit == -1, stmt.getOffset());
  1. MaterializedView 挑选:会依据查询需求的列,过滤,排序和 Join 的列,行数,列数等要素挑选最佳的 MaterializedView
    boolean selectFailed = singleNodePlanner.selectMaterializedView(queryStmt, analyzer);
  1. 向量化履行引擎挑选:根据现代CPU的特色与火山模型的履行特色,从头规划列式存储系统的SQL履行引擎,从而提高了CPU在SQL履行时的效率,提高了SQL查询的功能。
    if (VectorizedUtil.isVectorized()) {
            singleNodePlan.convertToVectoriezd();
    }
  1. Runtime Filter Join:Doris 在进行 Hash Join 核算时会在右表构建一个哈希表,左表流式的经过右表的哈希表从而得出 Join 成果。而 RuntimeFilter 便是充分利用了右表的 Hash 表,在右表生成哈希表的时,一起生成一个根据哈希表数据的一个过滤条件,然后下推到左表的数据扫描节点
RuntimeFilterGenerator.generateRuntimeFilters(analyzer, rootFragment.getPlanRoot());

创立 singleNodePlanner 首要代码:createSingleNodePlan()

(六)DistributedPlan

分布式查询方案 PlanFragmentTree ,每个 PlanFragment 是由
PlanNodeTree 的子树 和 Sink 节点组成的。分布式化的目标是最小化数据移动和最大化本地 Scan。

分布式查询方案 PlanFragmentTree ,每个 PlanFragment 是由
PlanNodeTree 的子树 和 Sink 节点组成的。分布式化的目标是最小化数据移动和最大化本地 Scan。

每个 PlanFragment 由 PlanNodeTree 和 Data Sink 组成,咱们从上图的 Plan Fragment 2 能够看出,由 AggregationNode、HashJoinNode 和 DataSink。Plan 分布式化的办法是添加 ExchangeNode,履行方案树会以 ExchangeNode 为鸿沟拆分为 PlanFragment。

ExchangeNode 首要是用于 BE 之间的数据交换与同享,相似 Spark 和 MR 中的 Shuffle。

各个 Fragment 的数据流通和终究的成果发送依靠:DataSink。比方 DataStreamSink 会将一个 Fragment 的数据发送到另一个 Fragment 的 ExchangeNode,ResultSink 会将查询的成果集发送到 FE。

每个 PlanFragment 能够在每个 BE 节点生成 1 个或多个履行实例,不同履行实例处理不同的数据集,经过并发来提高查询功能。

DistributedPlanner 中最首要的作业是决议 Join 的分布式履行策略:Shuffle Join,Bucket Join,Broadcast Join,Colocate Join,和添加 Aggregation 的 Merge 阶段。

决议 Join 的分布式履行策略的逻辑如下:

假如两种表明 Colocate Join 表,且 Join 的 Key 和分桶的 Key 一致,且两张表没有正在数据 balance,就会履行 Colocate Join
假如 Join 的右表比较少,集群节点数较少,核算出的 Broadcast Join 本钱较低,就会挑选 Broadcast Join,不然就会挑选 Shuffle Join。

假如两种表明 Colocate Join 表,且 Join 的 Key 和分桶的 Key 一致,且两张表没有正在数据 balance,就会履行 Colocate Join
假如 Join 的右表比较少,集群节点数较少,核算出的 Broadcast Join 本钱较低,就会挑选 Broadcast Join,不然就会挑选 Shuffle Join。

(七)Schedule

生成了 Plan Fragment Tree 之后,Apache Doris FE 经过 Coordinator 类对 Fragment 进行分配、分发过程,首要触及的办法有:computeScanRangeAssignment()、computeFragmentExecParams()、sendFragment()。

生成了 Plan Fragment Tree 之后,Apache Doris FE 经过 Coordinator 类对 Fragment 进行分配、分发过程,首要触及的办法有:computeScanRangeAssignment()、computeFragmentExecParams()、sendFragment()。

  • computeScanRangeAssignment():首要逻辑对fragment合理分配,尽可能确保每个BE节点的恳求都是均匀。
  • computeFragmentExecParams():处理Fragment履行参数。
  • sendFragment():发送Fragment至BE节点,

(八)Execute

Doris 的查询履行模式 Volcano 模式,不过做了 Batch 的优化,不同的 operator 之间以 RowBatch 的方式传输数据。

BE 的 BackendService 会接纳 FE 的 查询恳求,让 FragmentMgr 进行处理。 FragmentMgr::exec_plan_fragment 会发动一个线程由 PlanFragmentExecutor 具体履行一个 plan fragment。PlanFragmentExecutor 会依据 plan fragment 创立一个 ExecNode 树,FE 每个 PlanNode 都会对应 ExecNode 的一个子类。

PlanFragmentExecutor::get_next_internal 会驱动整个 ExecNode 树的履行,会自顶向下调用每个 ExecNode 的 get_next 办法,终究数据会从 ScanNode 节点产生,向上层节点传递,每个节点都会依照自己的逻辑处理 RowBatch。 PlanFragmentExecutor 在拿到每个 RowBatch 后,假如是中间成果,就会将数据传输给其他 BE 节点,假如是终究成果,就会将数据传输给 FE 节点。

五、参考献文

  • Apache Doris Join原理

    doris.apache.org/zh-CN/advan…

  • Apache Doris 存储层规划

    doris.apache.org/zh-CN/artic…

  • Apache Doris 元数据触及

    doris.apache.org/zh-CN/desig…

  • Apache Doris 查询原理

    blog.bcmeng.com/post/apache…

  • Apache Doris Join原理

  • Apache Doris 存储层规划

  • Apache Doris 元数据触及

  • Apache Doris 查询原理

六、实践共享

  • Apache Doris 在网易互娱的使用实践
  • Apache Doris 在知乎用户画像与实时数据的架构与实践
  • Apache Doris 物化视图与索引在京东的典型使用
  • Apache Doris Join 实现与调优实践

七、总结

本文首要介绍查询 SQL 在 Apache Doris Fe 节点经历 parse、analyze、rewrite、GenerateQueryPlan、schedule、send 等阶段处理。Apache Doris Fe 的 parse、analyze、rewrite 阶段和其他数据库处理过程差不多,本文首要解说的中心是 GenerateQueryPlan、schedule、send 阶段的原理。咱们能够深度了解 Apache Doris Fe 节点对查询 SQL 的优化操作,以及未来遇到相关功能问题不会无从下手。

本文由mdnice多平台发布