2024-01-18  阅读(52)
原文作者:hashcon 原文地址: https://zhanghaoxin.blog.csdn.net/article/details/50733289

3. 连接模块

202401182020277371.png

3.4 FrontendConnection前端连接

202401182020282132.png
构造方法:

    public FrontendConnection(NetworkChannel channel) throws IOException {
            super(channel);
            InetSocketAddress localAddr = (InetSocketAddress) channel.getLocalAddress();
            InetSocketAddress remoteAddr = null;
            if (channel instanceof SocketChannel) {
                remoteAddr = (InetSocketAddress) ((SocketChannel) channel).getRemoteAddress();  
    
            } else if (channel instanceof AsynchronousSocketChannel) {
                remoteAddr = (InetSocketAddress) ((AsynchronousSocketChannel) channel).getRemoteAddress();
            }
    
            this.host = remoteAddr.getHostString();
            this.port = localAddr.getPort();
            this.localPort = remoteAddr.getPort();
            this.handler = new FrontendAuthenticator(this);
        }

FrontendConnection是对前端连接channel的封装,接受NetworkChannel作为参数构造。前端连接建立,需要先验证其权限,所以,handler首先设置为FrontendAuthenticator
等到验证成功,handler会被设置成FrontendCommandHandler。
下面来看和FrontendConnection相关的Handler:

202401182020285623.png
FrontendCommandHandler会先解析请求类型,之后调用不同的方法处理不同类型的请求。例如,FrontendQueryHandler会解析query类型的sql请求语句:

     @Override
        public void handle(byte[] data)
        {
            if(source.getLoadDataInfileHandler()!=null&&source.getLoadDataInfileHandler().isStartLoadData())
            {
                MySQLMessage mm = new MySQLMessage(data);
                int  packetLength = mm.readUB3();
                if(packetLength+4==data.length)
                {
                    source.loadDataInfileData(data);
                }
                return;
            }
            switch (data[4])
            {
                case MySQLPacket.COM_INIT_DB:
                    commands.doInitDB();
                    source.initDB(data);
                    break;
                case MySQLPacket.COM_QUERY:
                    commands.doQuery();
                    source.query(data);
                    break;
                case MySQLPacket.COM_PING:
                    commands.doPing();
                    source.ping();
                    break;
                case MySQLPacket.COM_QUIT:
                    commands.doQuit();
                    source.close("quit cmd");
                    break;
                case MySQLPacket.COM_PROCESS_KILL:
                    commands.doKill();
                    source.kill(data);
                    break;
                case MySQLPacket.COM_STMT_PREPARE:
                    commands.doStmtPrepare();
                    source.stmtPrepare(data);
                    break;
                case MySQLPacket.COM_STMT_EXECUTE:
                    commands.doStmtExecute();
                    source.stmtExecute(data);
                    break;
                case MySQLPacket.COM_STMT_CLOSE:
                    commands.doStmtClose();
                    source.stmtClose(data);
                    break;
                case MySQLPacket.COM_HEARTBEAT:
                    commands.doHeartbeat();
                    source.heartbeat(data);
                    break;
                default:
                         commands.doOther();
                         source.writeErrMessage(ErrorCode.ER_UNKNOWN_COM_ERROR,
                                 "Unknown command");
    
            }
        }

FrontendCommandHandler会调用FrontendConnection合适的方法解析处理不同的请求,例如它的initDB(byte[] data)方法:

        public void initDB(byte[] data) {
    
            MySQLMessage mm = new MySQLMessage(data);
            mm.position(5);
            String db = mm.readString();
    
            // 检查schema的有效性
            if (db == null || !privileges.schemaExists(db)) {
                writeErrMessage(ErrorCode.ER_BAD_DB_ERROR, "Unknown database '" + db + "'");
                return;
            }
    
            if (!privileges.userExists(user, host)) {
                writeErrMessage(ErrorCode.ER_ACCESS_DENIED_ERROR, "Access denied for user '" + user + "'");
                return;
            }
    
            Set<String> schemas = privileges.getUserSchemas(user);
            if (schemas == null || schemas.size() == 0 || schemas.contains(db)) {
                this.schema = db;
                write(writeToBuffer(OkPacket.OK, allocate()));
            } else {
                String s = "Access denied for user '" + user + "' to database '" + db + "'";
                writeErrMessage(ErrorCode.ER_DBACCESS_DENIED_ERROR, s);
            }
        }

方法调用:

202401182020288834.png
通过查看可以发现,在command packet被解析出是initDB类型的请求时(其实就是用户发送的查询语句为“use XXX”),会调用此方法进行处理,同时,这些方法都是被RW线程执行的。
此方法从FrontedPrivilege中验证用户是否有权限访问这个逻辑库,如果有就把当前连接的逻辑库设为用户请求的逻辑库。
其他方法与handler也是相似的关系,可以看出,FrontendConnection组合了多种封装的handler来处理不同的请求的不同阶段。至于各种handler,会在之后sql解析,sql路由,协议实现等模块详细介绍。

3.4.1 ServerConnection服务端连接

前端连接包括ServerConnection(服务端连接)和ManagerConnection(管理端连接)。前端链接不会直接创建,而是通过工厂创建:
工厂方法:

    @Override
        protected FrontendConnection getConnection(NetworkChannel channel) throws IOException {
            SystemConfig sys = MycatServer.getInstance().getConfig().getSystem();
            ServerConnection c = new ServerConnection(channel);
            MycatServer.getInstance().getConfig().setSocketParams(c, true);
            c.setPrivileges(MycatPrivileges.instance());
            c.setQueryHandler(new ServerQueryHandler(c));
            c.setLoadDataInfileHandler(new ServerLoadDataInfileHandler(c));
            // c.setPrepareHandler(new ServerPrepareHandler(c));
            c.setTxIsolation(sys.getTxIsolation());
            c.setSession2(new NonBlockingSession(c));
            return c;
        }

可以看出,每个新的ServerConnection都会绑定一个新的ServerQueryHandler负责处理sql指令,一个ServerLoadDataInfileHandler负责处理文件载入命令,一个session负责处理事务
下面是相关的类图

202401182020295115.png
这里的所有独立的handler里面都是static方法,可供其他类直接调用。每个ServerConnection都会有一个NonBlockingSession来处理。
这里说下连接、会话、逻辑库、MyCat实例的关系(与MySQL里面的连接、会话、数据库、MySQL实例的关系不太一样);首先每个MyCat实例都管理多个数据库。连接是针对MyCat实例建立的,并且,MyCat的连接(AbstractConnection)是不可复用的,在close方法会关闭连接并清理使用的资源。但是缓存资源(buffer)是可以复用的。比如,在一个前端连接长时间空闲时或者出现异常时,会被清理掉。每个连接会拥有一个session来处理事务,保存会话信息。
这里,每个连接拥有一个会话。每个连接中的方法,被RW线程执行,相当于与RW线程绑定。RW线程是可以复用的,这里相当于MySQL中的连接是可以复用的(连接池)。
Session.java:

    public interface Session {
    
        /**
         * 取得源端连接
         */
        FrontendConnection getSource();
    
        /**
         * 取得当前目标端数量
         */
        int getTargetCount();
    
        /**
         * 开启一个会话执行
         */
        void execute(RouteResultset rrs, int type);
    
        /**
         * 提交一个会话执行
         */
        void commit();
    
        /**
         * 回滚一个会话执行
         */
        void rollback();
    
        /**
         * 取消一个正在执行中的会话
         * 
         * @param sponsor
         *            如果发起者为null,则表示由自己发起。
         */
        void cancel(FrontendConnection sponsor);
    
        /**
         * 终止会话,必须在关闭源端连接后执行该方法。
         */
        void terminate();
    
    }

下面我们着重研究它的实现类NonBlockingSession:
首先,取得源端连接方法FrontendConnection getSource();,其实就是NonBlockingSession在创建时就已绑定一个连接,谁会调用这个方法取得源端链接呢?

202401182020298006.png
可以发现,主要有各种查询的handler还有SQLengine会去调用。因为处理无论返回什么结果,都需要返回给源端。
int getTargetCount();取得当前目标端数量。根据目标端的数量不同会用不同的handler处理转发SQL和合并结果。

    @Override
        public void execute(RouteResultset rrs, int type) {
            // 清理之前处理用的资源
            clearHandlesResources();
            if (LOGGER.isDebugEnabled()) {
                StringBuilder s = new StringBuilder();
                LOGGER.debug(s.append(source).append(rrs).toString() + " rrs ");
            }
    
            // 检查路由结果是否为空
            RouteResultsetNode[] nodes = rrs.getNodes();
            if (nodes == null || nodes.length == 0 || nodes[0].getName() == null
                    || nodes[0].getName().equals("")) {
                //如果为空,则表名有误,提示客户端
                source.writeErrMessage(ErrorCode.ER_NO_DB_ERROR,
                        "No dataNode found ,please check tables defined in schema:"
                                + source.getSchema());
                return;
            }
            //如果路由结果个数为1,则为单点查询或事务
            if (nodes.length == 1) {
                //使用SingleNodeHandler处理单点查询或事务
                singleNodeHandler = new SingleNodeHandler(rrs, this);
                try {
                    singleNodeHandler.execute();
                } catch (Exception e) {
                    LOGGER.warn(new StringBuilder().append(source).append(rrs), e);
                    source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
                }
            } else {
                //如果路由结果>1,则为多点查询或事务
                boolean autocommit = source.isAutocommit();
                SystemConfig sysConfig = MycatServer.getInstance().getConfig()
                        .getSystem();
                //mutiNodeLimitType没有用。。。
                int mutiNodeLimitType = sysConfig.getMutiNodeLimitType();
                //使用multiNodeHandler处理多点查询或事务
                multiNodeHandler = new MultiNodeQueryHandler(type, rrs, autocommit,
                        this);
    
                try {
                    multiNodeHandler.execute();
                } catch (Exception e) {
                    LOGGER.warn(new StringBuilder().append(source).append(rrs), e);
                    source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
                }
            }
        }

每次一个Session执行SQL时,会先清理handler使用的资源。SingleNodeHandler与multiNodeHandler之后会讲。这里的handler我们之后会在每个模块去讲,Session之后也还会提到,敬请期待

阅读全文
  • 点赞