2024-01-18
Original author: hashcon. Original post: https://zhanghaoxin.blog.csdn.net/article/details/51459753

5. Routing module

The step that actually obtains the RouteResultset is the route method of AbstractRouteStrategy:

[Figure: 202401182020419711.png]
The corresponding source code:

    public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL,
                String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException {
    
        /**
         * Logic that runs before routing:
         * global sequence numbers, parent/child table inserts
         */
        if ( beforeRouteProcess(schema, sqlType, origSQL, sc) )
            return null;
    
        /**
         * SQL statement interception
         */
        String stmt = MycatServer.getInstance().getSqlInterceptor().interceptSQL(origSQL, sqlType);
        if (origSQL != stmt && LOGGER.isDebugEnabled()) {
            LOGGER.debug("sql intercepted to " + stmt + " from " + origSQL);
        }
    
        // Corresponds to the checkSQLschema attribute of the schema tag: removes the schema name from the statement
        if (schema.isCheckSQLSchema()) {
            stmt = RouterUtil.removeSchema(stmt, schema.getName());
        }
    
        RouteResultset rrs = new RouteResultset(stmt, sqlType);
    
        /**
         * Optimization: with debug logging on, logging the cache output for load data statements greatly reduces performance
         */
        if (LOGGER.isDebugEnabled() && origSQL.startsWith(LoadData.loadDataHint)) {
            rrs.setCacheAble(false);
        }
    
        /**
         * rrs carries the autocommit state of the ServerConnection so that, when SQL parsing
         * meets select ... for update, the canRunInReadDB property of RouteResultsetNode can be set dynamically
         */
        if (sc != null ) {
            rrs.setAutocommit(sc.isAutocommit());
        }
    
        /**
         * Routing of DDL statements
         */
        if (ServerParse.DDL == sqlType) {
            return RouterUtil.routeToDDLNode(rrs, sqlType, stmt, schema);
        }
    
        /**
         * Check whether the schema is sharded
         */
        if (schema.isNoSharding() && ServerParse.SHOW != sqlType) {
            rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);
        } else {
            RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs);
            if (returnedSet == null) {
                rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool);
            }
        }
    
        return rrs;
    }
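The order of the checks in route can be condensed into a small decision function. This is a simplified sketch, not MyCat code: the RouteTarget enum and the boolean parameters are hypothetical stand-ins for ServerParse.DDL == sqlType, schema.isNoSharding(), ServerParse.SHOW == sqlType, and a non-null result from routeSystemInfo. It leaves out the beforeRouteProcess step, which runs before all of these branches.

```java
// Simplified sketch of the branch order in route(); all names here are hypothetical.
class RouteOrderSketch {
    enum RouteTarget { DDL_NODES, SINGLE_NODE, SYSTEM_INFO, AST_ROUTING }

    static RouteTarget decide(boolean isDdl, boolean noSharding, boolean isShow,
                              boolean systemInfoHandled) {
        if (isDdl) {
            return RouteTarget.DDL_NODES;      // DDL statements are routed first
        }
        if (noSharding && !isShow) {
            return RouteTarget.SINGLE_NODE;    // unsharded schema: go to its single data node
        }
        if (systemInfoHandled) {
            return RouteTarget.SYSTEM_INFO;    // routeSystemInfo produced a result
        }
        return RouteTarget.AST_ROUTING;        // otherwise parse the SQL and route via the AST
    }

    public static void main(String[] args) {
        System.out.println(decide(true, true, false, false));
        System.out.println(decide(false, true, false, false));
        System.out.println(decide(false, true, true, false));
    }
}
```

Note that a SHOW statement on an unsharded schema does not take the single-node shortcut; it falls through to the system-info and AST branches.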

5.3 Logic before routing - detecting child-table inserts and generating global sequence numbers:

AbstractRouteStrategy.java

    /**
     * Processing required before routing:
     * mainly inserts that use a global sequence number, plus child-table inserts
     */
    private boolean beforeRouteProcess(SchemaConfig schema, int sqlType, String origSQL, ServerConnection sc)
            throws SQLNonTransientException {
    
        return RouterUtil.processWithMycatSeq(schema, sqlType, origSQL, sc)
                || (sqlType == ServerParse.INSERT && RouterUtil.processERChildTable(schema, origSQL, sc))
                || (sqlType == ServerParse.INSERT && RouterUtil.processInsert(schema, sqlType, origSQL, sc));
    }

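This relies on a feature of Java's || operator: if the left operand is true, the right operand is never evaluated (short-circuit evaluation). RouterUtil.processWithMycatSeq(schema, sqlType, origSQL, sc) runs first; it checks whether the SQL statement explicitly uses a global sequence number, for example: insert into table1(id,name) values(next value for MYCATSEQ_GLOBAL,'test');
Such a statement is handled by first replacing next value for MYCATSEQ_GLOBAL with an ID produced by the global ID generator; the statement then goes on to AST parsing and routing.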
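To make the rewrite of next value for MYCATSEQ_GLOBAL concrete, here is a minimal sketch of the idea. It is not MyCat's implementation: the class SeqRewriteSketch, the regular expression, and the in-memory AtomicLong counters are all assumptions made for illustration, whereas a real deployment takes IDs from a configured sequence source.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: replace each sequence placeholder with a freshly generated ID.
class SeqRewriteSketch {
    private static final Pattern SEQ = Pattern.compile(
            "next\\s+value\\s+for\\s+MYCATSEQ_(\\w+)", Pattern.CASE_INSENSITIVE);

    // One in-memory counter per sequence name (an assumption for this demo).
    private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>();

    String rewrite(String sql) {
        Matcher m = SEQ.matcher(sql);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String seqName = m.group(1).toUpperCase();
            long id = counters.computeIfAbsent(seqName, k -> new AtomicLong()).incrementAndGet();
            m.appendReplacement(out, Long.toString(id));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        SeqRewriteSketch sketch = new SeqRewriteSketch();
        String sql = "insert into table1(id,name) values(next value for MYCATSEQ_GLOBAL,'test');";
        System.out.println(sketch.rewrite(sql)); // insert into table1(id,name) values(1,'test');
        System.out.println(sketch.rewrite(sql)); // insert into table1(id,name) values(2,'test');
    }
}
```

After the rewrite the statement contains only literal values, so it can be parsed and routed like any other insert.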

If not, (sqlType == ServerParse.INSERT && RouterUtil.processERChildTable(schema, origSQL, sc)) is evaluated next; this method checks whether the statement is an insert into a child table.
Partial code:

    String tableName = StringUtil.getTableName(origSQL).toUpperCase();
    final TableConfig tc = schema.getTables().get(tableName);
    // Check whether this is a child table; if not, the method simply returns false
    if (null != tc && tc.isChildTable()) {
    final RouteResultset rrs = new RouteResultset(origSQL, ServerParse.INSERT);
    String joinKey = tc.getJoinKey();
    // This is an INSERT statement, so parse it into a MySqlInsertStatement
    MySqlInsertStatement insertStmt = (MySqlInsertStatement) (new MySqlStatementParser(origSQL)).parseInsert();
    ......

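Note that every type of SQL statement has a corresponding Druid SQL parser class; the insert statement here, for example, is parsed into a MySqlInsertStatement. The Druid parser is not covered in this section; it is described in detail in the part on AST semantic parsing and routing.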

[Figure: 202401182020424902.png]
Continuing the code above:

    // Check that the statement is complete: find the index of the joinKey column among the parsed columns
        int joinKeyIndex = getJoinKeyIndex(insertStmt.getColumns(), joinKey);
        if (joinKeyIndex == -1) {
            String inf = "joinKey not provided :" + tc.getJoinKey() + "," + insertStmt;
            LOGGER.warn(inf);
            throw new SQLNonTransientException(inf);
        }
        // Child tables do not support multi-row inserts
        if (isMultiInsert(insertStmt)) {
            String msg = "ChildTable multi insert not provided";
            LOGGER.warn(msg);
            throw new SQLNonTransientException(msg);
        }
        // Get the value of the joinKey
        String joinKeyVal = insertStmt.getValues().getValues().get(joinKeyIndex).toString();
    
        String sql = insertStmt.toString();
    
        // try to route by ER parent partion key
        // If this is a second-level child table (its parent has no parent of its own) and the sharding column is exactly the joinKey column, call routeByERParentKey
        RouteResultset theRrs = RouterUtil.routeByERParentKey(sc, schema, ServerParse.INSERT, sql, rrs, tc, joinKeyVal);
        if (theRrs != null) {
            boolean processedInsert=false;
            // Check whether a global sequence number is needed
                  if ( sc!=null && tc.isAutoIncrement()) {
                      String primaryKey = tc.getPrimaryKey();
                      processedInsert=processInsert(sc,schema,ServerParse.INSERT,sql,tc.getName(),primaryKey);
                  }
                  if(processedInsert==false){
                    rrs.setFinishedRoute(true);
                      sc.getSession2().execute(rrs, ServerParse.INSERT);
                  }
            return true;
        }
    
        // route by sql query root parent's datanode
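        // If it is not a second-level child table, or the sharding column is not the joinKey column, the result is null; start an asynchronous task that queries the backend shards for the data node
        // It only has to find which shard holds the parent table's row whose parentKey column has this value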
        final String findRootTBSql = tc.getLocateRTableKeySql().toLowerCase() + joinKeyVal;
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("find root parent's node sql " + findRootTBSql);
        }
    
        ListenableFuture<String> listenableFuture = MycatServer.getInstance().
                getListeningExecutorService().submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                FetchStoreNodeOfChildTableHandler fetchHandler = new FetchStoreNodeOfChildTableHandler();
                return fetchHandler.execute(schema.getName(), findRootTBSql, tc.getRootParent().getDataNodes());
            }
        });
    
    
        Futures.addCallback(listenableFuture, new FutureCallback<String>() {
            @Override
            public void onSuccess(String result) {
                // An empty result means the parent table has no such row: fail
                if (Strings.isNullOrEmpty(result)) {
                    StringBuilder s = new StringBuilder();
                    LOGGER.warn(s.append(sc.getSession2()).append(origSQL).toString() +
                            " err:" + "can't find (root) parent sharding node for sql:" + origSQL);
                    sc.writeErrMessage(ErrorCode.ER_PARSE_ERROR, "can't find (root) parent sharding node for sql:" + origSQL);
                    return;
                }
    
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("found partion node for child table to insert " + result + " sql :" + origSQL);
                }
                // Shard found: perform the insert (as elsewhere, first check whether a global auto-increment ID is needed)
                boolean processedInsert=false;
                      if ( sc!=null && tc.isAutoIncrement()) {
                          try {
                              String primaryKey = tc.getPrimaryKey();
                        processedInsert=processInsert(sc,schema,ServerParse.INSERT,origSQL,tc.getName(),primaryKey);
                    } catch (SQLNonTransientException e) {
                        LOGGER.warn("sequence processInsert error,",e);
                        sc.writeErrMessage(ErrorCode.ER_PARSE_ERROR , "sequence processInsert error," + e.getMessage());
                    }
                      }
                      if(processedInsert==false){
                        RouteResultset executeRrs = RouterUtil.routeToSingleNode(rrs, result, origSQL);
                        sc.getSession2().execute(executeRrs, ServerParse.INSERT);
                      }
    
            }
    
            @Override
            public void onFailure(Throwable t) {
                StringBuilder s = new StringBuilder();
                LOGGER.warn(s.append(sc.getSession2()).append(origSQL).toString() +
                        " err:" + t.getMessage());
                sc.writeErrMessage(ErrorCode.ER_PARSE_ERROR, t.getMessage() + " " + s.toString());
            }
        }, MycatServer.getInstance().
                getListeningExecutorService());
        return true;
    }
    return false;
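The listing above uses Guava's ListenableFuture with Futures.addCallback to look up the parent's data node off the calling thread. The same pattern can be sketched with the JDK's CompletableFuture, which is a substitution made here so the example needs no extra dependency. The class name, the findParentNode function, and the returned description strings are hypothetical; the real code executes the insert on the node that was found or writes an error packet to the client.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

// Hypothetical sketch of "look up the parent's shard asynchronously, then act on the result".
class ParentNodeLookupSketch {

    static CompletableFuture<String> routeChildInsert(String joinKeyVal,
                                                      Function<String, String> findParentNode,
                                                      ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(() -> findParentNode.apply(joinKeyVal), pool) // runs off the caller's thread
                .handle((node, error) -> {
                    if (error != null) {
                        // Plays the role of onFailure: report the cause
                        Throwable cause = error.getCause() != null ? error.getCause() : error;
                        return "error: " + cause.getMessage();
                    }
                    if (node == null || node.isEmpty()) {
                        // The parent row does not exist on any shard
                        return "error: can't find (root) parent sharding node";
                    }
                    return "execute on " + node; // plays the role of routeToSingleNode + execute
                });
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Map<String, String> parentRows = Map.of("1001", "dn2"); // join key value -> data node
            System.out.println(routeChildInsert("1001", parentRows::get, pool).join());
            System.out.println(routeChildInsert("9999", parentRows::get, pool).join());
        } finally {
            pool.shutdown();
        }
    }
}
```

As in the original, the caller does not wait for the lookup: processERChildTable returns true immediately and the callback completes the insert later.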

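If it returns false, (sqlType == ServerParse.INSERT && RouterUtil.processInsert(schema, sqlType, origSQL, sc)) is evaluated next.
This handles ordinary SQL INSERT statements, rewriting the value of the auto-increment primary key column to an ID produced by the built-in global ID generator.
RouterUtil.java: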

    public static boolean processInsert(SchemaConfig schema, int sqlType,
                                            String origSQL, ServerConnection sc) throws SQLNonTransientException {
        String tableName = StringUtil.getTableName(origSQL).toUpperCase();
        TableConfig tableConfig = schema.getTables().get(tableName);
        boolean processedInsert=false;
        // Check whether the table has an auto-increment column
        if (null != tableConfig && tableConfig.isAutoIncrement()) {
            String primaryKey = tableConfig.getPrimaryKey();
            processedInsert=processInsert(sc,schema,sqlType,origSQL,tableName,primaryKey);
        }
        return processedInsert;
    }

This calls processInsert(sc,schema,sqlType,origSQL,tableName,primaryKey):

[Figure: 202401182020429033.png]

    public static boolean processInsert(ServerConnection sc,SchemaConfig schema,
                int sqlType,String origSQL,String tableName,String primaryKey) throws SQLNonTransientException {
    
        int firstLeftBracketIndex = origSQL.indexOf("(");
        int firstRightBracketIndex = origSQL.indexOf(")");
        String upperSql = origSQL.toUpperCase();
        int valuesIndex = upperSql.indexOf("VALUES");
        int selectIndex = upperSql.indexOf("SELECT");
        int fromIndex = upperSql.indexOf("FROM");
        // Reject statements of the form insert into table1 select * from table2
        if(firstLeftBracketIndex < 0) {
            String msg = "invalid sql:" + origSQL;
            LOGGER.warn(msg);
            throw new SQLNonTransientException(msg);
        }
        // Reject batch inserts
        if(selectIndex > 0 &&fromIndex>0&&selectIndex>firstRightBracketIndex&&valuesIndex<0) {
            String msg = "multi insert not provided" ;
            LOGGER.warn(msg);
            throw new SQLNonTransientException(msg);
        }
        // The INSERT statement must provide a column list, because by default MyCat has no knowledge of the table structure
        if(valuesIndex + "VALUES".length() <= firstLeftBracketIndex) {
            throw new SQLSyntaxErrorException("insert must provide ColumnList");
        }
        // If the primary key is not among the fields of the INSERT statement, further processing is needed
        boolean processedInsert=!isPKInFields(origSQL,primaryKey,firstLeftBracketIndex,firstRightBracketIndex);
        if(processedInsert){
            processInsert(sc,schema,sqlType,origSQL,tableName,primaryKey,firstLeftBracketIndex+1,origSQL.indexOf('(',firstRightBracketIndex)+1);
        }
        return processedInsert;
    }
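The isPKInFields call above is not shown in this article. As a rough idea of what such a check involves, the sketch below looks for the primary key in the column list between the first pair of brackets. The class and method names are hypothetical, and MyCat's own implementation may differ, for instance in how it treats quoting.

```java
// Hypothetical sketch: is the primary key named in the INSERT column list?
class PkInColumnsSketch {

    static boolean pkInColumnList(String sql, String primaryKey) {
        int open = sql.indexOf('(');
        int close = sql.indexOf(')');
        if (open < 0 || close < open) {
            throw new IllegalArgumentException("insert must provide a column list: " + sql);
        }
        // Compare each column name, ignoring case, surrounding spaces and backticks
        for (String column : sql.substring(open + 1, close).split(",")) {
            String name = column.trim().replace("`", "");
            if (name.equalsIgnoreCase(primaryKey)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(pkInColumnList("insert into hotnews(title) values('aaa');", "id"));          // false
        System.out.println(pkInColumnList("insert into hotnews(`ID`, title) values(1,'aaa');", "id")); // true
    }
}
```

Comparing whole column names rather than searching for a substring avoids false positives for columns such as id_ref.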

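SQL whose primary key is not among the fields of the INSERT statement has to be rewritten. For example, if the primary key of hotnews is id and the insert statement is: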

    insert into hotnews(title) values('aaa');

it has to be rewritten as:

    insert into hotnews(id, title) values(next value for MYCATSEQ_hotnews,'aaa');

This is implemented by the following function:

    private static void processInsert(ServerConnection sc, SchemaConfig schema, int sqlType, String origSQL,
                String tableName, String primaryKey, int afterFirstLeftBracketIndex, int afterLastLeftBracketIndex) {
    
        int primaryKeyLength = primaryKey.length();
        int insertSegOffset = afterFirstLeftBracketIndex;
        String mycatSeqPrefix = "next value for MYCATSEQ_";
        int mycatSeqPrefixLength = mycatSeqPrefix.length();
        int tableNameLength = tableName.length();
    
        char[] newSQLBuf = new char[origSQL.length() + primaryKeyLength + mycatSeqPrefixLength + tableNameLength + 2];
        origSQL.getChars(0, afterFirstLeftBracketIndex, newSQLBuf, 0);
        primaryKey.getChars(0, primaryKeyLength, newSQLBuf, insertSegOffset);
        insertSegOffset += primaryKeyLength;
        newSQLBuf[insertSegOffset] = ',';
        insertSegOffset++;
        origSQL.getChars(afterFirstLeftBracketIndex, afterLastLeftBracketIndex, newSQLBuf, insertSegOffset);
        insertSegOffset += afterLastLeftBracketIndex - afterFirstLeftBracketIndex;
        mycatSeqPrefix.getChars(0, mycatSeqPrefixLength, newSQLBuf, insertSegOffset);
        insertSegOffset += mycatSeqPrefixLength;
        tableName.getChars(0, tableNameLength, newSQLBuf, insertSegOffset);
        insertSegOffset += tableNameLength;
        newSQLBuf[insertSegOffset] = ',';
        insertSegOffset++;
        origSQL.getChars(afterLastLeftBracketIndex, origSQL.length(), newSQLBuf, insertSegOffset);
        processSQL(sc, schema, new String(newSQLBuf), sqlType);
    }
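The char[] arithmetic above makes two insertions: the primary key plus a comma right after the first left bracket, and the sequence expression plus a comma right after the left bracket of the values list. The sketch below produces the same string with a StringBuilder, which is easier to follow. The class and method names are hypothetical and the queueing step is left out. The example passes the table name in upper case, as the four-argument processInsert does via toUpperCase().

```java
// Hypothetical sketch: the same two insertions as the char[] version, using StringBuilder.
class InsertRewriteSketch {

    static String addSequenceColumn(String sql, String tableName, String primaryKey) {
        int firstLeft = sql.indexOf('(');             // start of the column list
        int firstRight = sql.indexOf(')');            // end of the column list
        int valuesLeft = sql.indexOf('(', firstRight); // start of the values list

        StringBuilder sb = new StringBuilder(sql);
        // Insert at the later position first so the earlier index stays valid
        sb.insert(valuesLeft + 1, "next value for MYCATSEQ_" + tableName + ",");
        sb.insert(firstLeft + 1, primaryKey + ",");
        return sb.toString();
    }

    public static void main(String[] args) {
        String rewritten = addSequenceColumn("insert into hotnews(title) values('aaa');", "HOTNEWS", "id");
        // insert into hotnews(id,title) values(next value for MYCATSEQ_HOTNEWS,'aaa');
        System.out.println(rewritten);
    }
}
```

The preallocated char[] in the original avoids intermediate copies; the StringBuilder form trades that for readability.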

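The final call, processSQL(sc, schema, new String(newSQLBuf), sqlType);, puts the statement into an execution queue.
Taking into account NIO thread throughput and the thread safety of global ID generation, MyCat uses the following flow to execute SQL INSERT statements that need a global ID.
processSQL(sc, schema, new String(newSQLBuf), sqlType):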

    SessionSQLPair sessionSQLPair = new SessionSQLPair(sc.getSession2(), schema, sql, sqlType);
    MycatServer.getInstance().getSequnceProcessor().addNewSql(sessionSQLPair);
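How the sequence processor consumes its queue is not shown here. The following is a generic single-consumer sketch of the idea, under stated assumptions: I/O threads only enqueue, and one worker thread assigns IDs, so the counter needs no locking. The class SequenceQueueSketch, its methods, and the fixed placeholder string are hypothetical and do not reflect MyCat's actual classes.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Generic single-consumer queue sketch; names are hypothetical, not MyCat's.
class SequenceQueueSketch {
    private static final String PLACEHOLDER = "next value for MYCATSEQ_HOTNEWS";

    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();
    private final BlockingQueue<String> executed = new LinkedBlockingQueue<>();
    private long lastId = 0; // touched only by the worker thread

    SequenceQueueSketch() {
        Thread worker = new Thread(this::drain, "sequence-worker");
        worker.setDaemon(true); // do not keep the JVM alive for this demo
        worker.start();
    }

    // Called by I/O threads: enqueue and return immediately
    void addNewSql(String sql) {
        pending.add(sql);
    }

    // Waits up to one second for the next processed statement; returns null on timeout
    String awaitExecuted() {
        try {
            return executed.poll(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // Worker loop: take statements in arrival order and fill in the next ID
    private void drain() {
        try {
            while (true) {
                String sql = pending.take();
                lastId++;
                executed.add(sql.replace(PLACEHOLDER, Long.toString(lastId)));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        SequenceQueueSketch processor = new SequenceQueueSketch();
        processor.addNewSql("insert into hotnews(id,title) values(next value for MYCATSEQ_HOTNEWS,'aaa');");
        processor.addNewSql("insert into hotnews(id,title) values(next value for MYCATSEQ_HOTNEWS,'bbb');");
        System.out.println(processor.awaitExecuted());
        System.out.println(processor.awaitExecuted());
    }
}
```

Because only the worker thread increments the counter, IDs are handed out in queue order without any synchronization on the ID generator itself, and the enqueueing threads never block on ID generation.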

[Figure: 202401182020432954.png]
