dubbo - server的bind过程分析

简介: 开篇这篇文章主要是为了讲清楚dubbo server端在bind过程中整个调用链,之前在dubbo服务发布的流程中已经讲解过在dubbo的服务发布过程中底层最终是通过bind()方法来实现监听的。这篇文章会对bind的过程进行细化讲解,包括核心的Exchangers、HeaderExchanger、HeaderExchangeServer等类。

开篇

这篇文章主要是为了讲清楚dubbo server端在bind过程中整个调用链,之前在dubbo服务发布的流程中已经讲解过在dubbo的服务发布过程中底层最终是通过bind()方法来实现监听的。
这篇文章会对bind的过程进行细化讲解,包括核心的Exchangers、HeaderExchanger、HeaderExchangeServer等类。


调用链

image

说明:

  • 重点关注类Exchangers、HeaderExchanger、HeaderExchangeServer、Transporters、NettyTransporter、NettyServer。

DubboProtocol

public class DubboProtocol extends AbstractProtocol {

   public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        // 省略相关代码
        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }

    private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                serverMap.put(key, createServer(url));
            } else {
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }

    private ExchangeServer createServer(URL url) {
        // 省略相关代码
        ExchangeServer server;
        try {
            // 核心的代码绑定
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }
}

说明:

  • DubboProtocol类的createServer()方法调用Exchangers.bind()进入Exchangers类。


Exchangers & HeaderExchanger

public class Exchangers {

    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {

        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");

        // 绑定过程的入口
        return getExchanger(url).bind(url, handler);
    }


    public static Exchanger getExchanger(URL url) {
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        return getExchanger(type);
    }

    // header=com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchanger
    public static Exchanger getExchanger(String type) {
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }
}



public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}

说明:

  • Exchangers的bind()方法内部调用getExchanger(url).bind()。
  • getExchanger()方法内部返回HeaderExchanger对象,涉及SPI机制。
  • getExchanger(url).bind()执行HeaderExchanger的bind()方法。
  • HeaderExchanger的bind内部返回HeaderExchangeServer对象。
  • HeaderExchangeServer类的构造函数参数是Transporters.bind()返回的server值。
  • Transporters.bind()方法返回NettyServer对象。


HeaderExchangeServer

public class HeaderExchangeServer implements ExchangeServer {

    private final ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1,
            new NamedThreadFactory(
                    "dubbo-remoting-server-heartbeat",
                    true));
    private final Server server;
    // heartbeat timer
    private ScheduledFuture<?> heatbeatTimer;
    // heartbeat timeout (ms), default value is 0 , won't execute a heartbeat.
    private int heartbeat;
    private int heartbeatTimeout;
    private AtomicBoolean closed = new AtomicBoolean(false);

    public HeaderExchangeServer(Server server) {
        if (server == null) {
            throw new IllegalArgumentException("server == null");
        }
        this.server = server;
        this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
        this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        startHeatbeatTimer();
    }

    public void reset(URL url) {
        server.reset(url);
        try {
            if (url.hasParameter(Constants.HEARTBEAT_KEY)
                    || url.hasParameter(Constants.HEARTBEAT_TIMEOUT_KEY)) {
                int h = url.getParameter(Constants.HEARTBEAT_KEY, heartbeat);
                int t = url.getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, h * 3);
                if (t < h * 2) {
                    throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
                }
                if (h != heartbeat || t != heartbeatTimeout) {
                    heartbeat = h;
                    heartbeatTimeout = t;
                    startHeatbeatTimer();
                }
            }
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
    }

    @Deprecated
    public void reset(com.alibaba.dubbo.common.Parameters parameters) {
        reset(getUrl().addParameters(parameters.getParameters()));
    }

    public void send(Object message) throws RemotingException {
        if (closed.get()) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The server " + getLocalAddress() + " is closed!");
        }
        server.send(message);
    }

    public void send(Object message, boolean sent) throws RemotingException {
        if (closed.get()) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The server " + getLocalAddress() + " is closed!");
        }
        server.send(message, sent);
    }
}

说明:

  • HeaderExchangeServer类包含变量Server server,server为NettyServer对象。
  • HeaderExchangeServer对相当于对Server类进行了一层分装,所有对Server层的操作都通过HeaderExchangeServer进行封装并对外提供服务。


Transporters

public class Transporters {

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }

        return getTransporter().bind(url, handler);
    }

    // netty4=com.alibaba.dubbo.remoting.transport.netty4.NettyTransporter
    // netty=com.alibaba.dubbo.remoting.transport.netty.NettyTransporter
    // mina=com.alibaba.dubbo.remoting.transport.mina.MinaTransporter
    public static Transporter getTransporter() {
        return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
    }
}

说明:

  • Transporters.bind()方法通过getTransporter()返回NettyTransporter对象,涉及SPI机制。
  • getTransporter().bind()方法执行到NettyTransporter.bind()方法,返回NettyServer对象。


NettyTransporter

public class NettyTransporter implements Transporter {

    public static final String NAME = "netty";

    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyServer(url, listener);
    }

    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new NettyClient(url, listener);
    }

}



public class NettyServer extends AbstractServer implements Server {

    private Map<String, Channel> channels; // <ip:port, channel>

    private ServerBootstrap bootstrap;

    private org.jboss.netty.channel.Channel channel;

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

    @Override
    protected void doOpen() throws Throwable {

        ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
        ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
        ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
        bootstrap = new ServerBootstrap(channelFactory);

        final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
        channels = nettyHandler.getChannels();

        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                ChannelPipeline pipeline = Channels.pipeline();

                pipeline.addLast("decoder", adapter.getDecoder());
                pipeline.addLast("encoder", adapter.getEncoder());
                pipeline.addLast("handler", nettyHandler);
                return pipeline;
            }
        });
        // bind
        channel = bootstrap.bind(getBindAddress());
    }

    @Override
    protected void doClose() throws Throwable {
        try {
            if (channel != null) {
                // unbind.
                channel.close();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels();
            if (channels != null && channels.size() > 0) {
                for (com.alibaba.dubbo.remoting.Channel channel : channels) {
                    try {
                        channel.close();
                    } catch (Throwable e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            if (bootstrap != null) {
                // release external resource.
                bootstrap.releaseExternalResources();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
        try {
            if (channels != null) {
                channels.clear();
            }
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }

    public Collection<Channel> getChannels() {
        Collection<Channel> chs = new HashSet<Channel>();
        for (Channel channel : this.channels.values()) {
            if (channel.isConnected()) {
                chs.add(channel);
            } else {
                channels.remove(NetUtils.toAddressString(channel.getRemoteAddress()));
            }
        }
        return chs;
    }

    public Channel getChannel(InetSocketAddress remoteAddress) {
        return channels.get(NetUtils.toAddressString(remoteAddress));
    }

    public boolean isBound() {
        return channel.isBound();
    }

}

说明:

  • NettyTransporter的bind()返回NettyServer对象。
  • NettyTransporter的connect()返回NettyClient对象。
  • NettyServer内部绑定的流程都是Netty相关的服务。


GrizzlyTransporter

public class GrizzlyTransporter implements Transporter {

    public static final String NAME = "grizzly";

    public Server bind(URL url, ChannelHandler listener) throws RemotingException {
        return new GrizzlyServer(url, listener);
    }

    public Client connect(URL url, ChannelHandler listener) throws RemotingException {
        return new GrizzlyClient(url, listener);
    }

}



public class GrizzlyServer extends AbstractServer {

    private static final Logger logger = LoggerFactory.getLogger(GrizzlyServer.class);

    private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); // <ip:port, channel>

    private TCPNIOTransport transport;

    public GrizzlyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
    }

    @Override
    protected void doOpen() throws Throwable {
        FilterChainBuilder filterChainBuilder = FilterChainBuilder.stateless();
        filterChainBuilder.add(new TransportFilter());

        filterChainBuilder.add(new GrizzlyCodecAdapter(getCodec(), getUrl(), this));
        filterChainBuilder.add(new GrizzlyHandler(getUrl(), this));
        TCPNIOTransportBuilder builder = TCPNIOTransportBuilder.newInstance();
        ThreadPoolConfig config = builder.getWorkerThreadPoolConfig();
        config.setPoolName(SERVER_THREAD_POOL_NAME).setQueueLimit(-1);
        String threadpool = getUrl().getParameter(Constants.THREADPOOL_KEY, Constants.DEFAULT_THREADPOOL);
        if (Constants.DEFAULT_THREADPOOL.equals(threadpool)) {
            int threads = getUrl().getPositiveParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
            config.setCorePoolSize(threads).setMaxPoolSize(threads)
                    .setKeepAliveTime(0L, TimeUnit.SECONDS);
        } else if ("cached".equals(threadpool)) {
            int threads = getUrl().getPositiveParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
            config.setCorePoolSize(0).setMaxPoolSize(threads)
                    .setKeepAliveTime(60L, TimeUnit.SECONDS);
        } else {
            throw new IllegalArgumentException("Unsupported threadpool type " + threadpool);
        }
        builder.setKeepAlive(true).setReuseAddress(false)
                .setIOStrategy(SameThreadIOStrategy.getInstance());
        transport = builder.build();
        transport.setProcessor(filterChainBuilder.build());
        transport.bind(getBindAddress());
        transport.start();
    }

    @Override
    protected void doClose() throws Throwable {
        try {
            transport.stop();
        } catch (Throwable e) {
            logger.warn(e.getMessage(), e);
        }
    }

    public boolean isBound() {
        return !transport.isStopped();
    }

    public Collection<Channel> getChannels() {
        return channels.values();
    }

    public Channel getChannel(InetSocketAddress remoteAddress) {
        return channels.get(NetUtils.toAddressString(remoteAddress));
    }

    @Override
    public void connected(Channel ch) throws RemotingException {
        channels.put(NetUtils.toAddressString(ch.getRemoteAddress()), ch);
        super.connected(ch);
    }

    @Override
    public void disconnected(Channel ch) throws RemotingException {
        channels.remove(NetUtils.toAddressString(ch.getRemoteAddress()));
        super.disconnected(ch);
    }

}

说明:

  • GrizzlyTransporter和NettyTransporter对象的实现逻辑是一致的。



image



Transport 依赖图



Server 实现类图



Client 实现类图

目录
相关文章
|
应用服务中间件 Dubbo
Dubbo Provider export过程分析
开篇  这篇文章尝试对Dubbo服务发布和调用中关于ServiceImpl->invoker->Exporter层面的过程进行分析,希望能够回答ServiceImpl到Exporter的转化过程。  因为Netty转发部分的逻辑也是一个比较复杂的过程,所以拆解成几篇文章分开讲解,这里我们只关注服务发布过程中对象的转换以及部分调用的过程。
1031 0
|
应用服务中间件 Dubbo
dubbo之Protocol获取适应扩展过程分析
开篇  这篇文章尝试分析Protocol的getAdaptiveExtension过程,和dubbo之ExtensionFactory获取适应扩展过程分析不一样的过程在于ExtensionFactory的扩展类是定义的AdaptiveExtensionFactory,而Protocol的适应扩展是动态生成的。
2632 0
|
Java Spring 应用服务中间件
dubbo之ExtensionFactory获取适应扩展过程分析
开篇  这篇文章尝试分析dubbo中getAdaptiveExtension过程,且以ExtensionFactory作为例子进行分析。  在这篇文章中我们会了解ExtensionFactory的几个对象AdaptiveExtensionFactory、SpringExtensionFactory、SpiExtensionFactory的初始化过程,然后在这个过程中一窥getAdaptiveExtension的全貌。
12637 0
|
Dubbo 应用服务中间件 Java
|
5月前
|
负载均衡 Dubbo 应用服务中间件
微服务技术系列教程(31) - Dubbo-原理及负载均衡分析
微服务技术系列教程(31) - Dubbo-原理及负载均衡分析
54 0
|
5月前
|
Dubbo Java 应用服务中间件
微服务技术系列教程(30) - Dubbo-SpringCloud与Dubbo区别
微服务技术系列教程(30) - Dubbo-SpringCloud与Dubbo区别
46 0
|
5月前
|
Dubbo Java 应用服务中间件
阿里新框架干掉微服务,换下Dubbo,Spring CloudAlibaba王者降临
tm快了,不知不觉中金九银十的秋招已经快结束了,不少同学现在已经拿到offer了吧~现在的面试可是越来越难了,动不动就是“互联网三高”。
阿里新框架干掉微服务,换下Dubbo,Spring CloudAlibaba王者降临
|
4月前
|
Dubbo Java 应用服务中间件
阿里巴巴资深架构师深度解析微服务架构设计之SpringCloud+Dubbo
软件架构是一个包含各种组织的系统组织,这些组件包括Web服务器,应用服务器,数据库,存储,通讯层),它们彼此或和环境存在关系。系统架构的目标是解决利益相关者的关注点。
|
19天前
|
Java fastjson 数据安全/隐私保护
【Dubbo3技术专题】「云原生微服务开发实战」 一同探索和分析研究RPC服务的底层原理和实现
【Dubbo3技术专题】「云原生微服务开发实战」 一同探索和分析研究RPC服务的底层原理和实现
38 0

热门文章

最新文章