博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Dubbo 浅读
阅读量:6292 次
发布时间:2019-06-22

本文共 9108 字,大约阅读时间需要 30 分钟。

最近公司一直在用阿里的开源框架Dubbo,正好上一篇文章也是讲到了RPC的概念,Dubbo听过的兄弟都知道在业界好评还是很高的,不光是设计优雅,文档也很齐全,这次就简单的分享下LZ的解读成果,当然本文章只是浅层次的,着重分析的是Dubbo核心层如何去高效的执行调用远程RPC服务的。

这里要简单跟兄弟们区分下概念,最常见最具代表性也是比较简单的HTTP协议(短连接)与Socket编程(长连接)的区别,这里不再多讲前者,这次主要最涉及到后者。

知识点储备前提:

 JAVA 动态代理(网上很多,这里推荐比较全面的文章:)

JAVA NIO()

这里LZ依然推荐的是带着问题去学习,依旧是Scrum三要素:服务端怎么暴露服务,怎么引用服务,两者怎么实现通讯。

1:服务端怎么暴露服务

ServiceConfig类里export方法有一段

if (delay != null && delay > 0) {            Thread thread = new Thread(new Runnable() {                public void run() {                    try {                        Thread.sleep(delay);                    } catch (Throwable e) {                    }                    doExport();                }            });            thread.setDaemon(true);            thread.setName("DelayExportServiceThread");            thread.start();        } else {            doExport();        }

看上面代码,如果没有设置延迟暴露服务,直接运行doExport();

一路跟踪代码,校验HOST,类型转换等跳过,最终看到下面代码。

Invoker
invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));Exporter
exporter = protocol.export(invoker);exporters.add(exporter);

光看着几行依旧不太明了,首先关注Invoker类,最终又会看到URL类。

关注下URL类会看到一些连接需要的字段信息,大概猜测即为连接所用的信息实体类。

继续往下看proxyFactory.getInvoker(ref, (Class) interfaceClass, url);

public 
Invoker
getInvoker(T proxy, Class
type, URL url) { // TODO Wrapper类不能正确处理带$的类名 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker
(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName, Class
[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }

可以看到此方法根据参数,返回一个Invoker实体。再看protocol.export(invoker);

往下追会看到如下:

     try {            server = Exchangers.bind(url, requestHandler);        } catch (RemotingException e) {            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);        }

再往下追,看到

return getTransporter().bind(url, handler);

发现Transporter有三个实现类:GrizzlyTransporter,MinaTransporter,NettyTransporter 。

看到这些名字也许脑海里会有点概念,哦~这里就开始用Grizzly,Mina,Netty这里开源框架去实现通讯啦~

但是还是不是很彻底,没关系,接着第二个问题

2:怎么引用服务?

ReferenceConfig类init();方法里有段

ref = createProxy(map);我们从这可以看得出有Proxy字样,这就可以确定我们的方向没有走错,继续往下看。

我们又可以看到下面代码:

if (urls.size() == 1) {    invoker = refprotocol.refer(interfaceClass, urls.get(0));} else {    List
> invokers = new ArrayList
>(); URL registryURL = null; for (URL url : urls) { invokers.add(refprotocol.refer(interfaceClass, url)); if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { registryURL = url; // 用了最后一个registry url } } if (registryURL != null) { // 有 注册中心协议的URL // 对有注册中心的Cluster 只用 AvailableCluster URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME); invoker = cluster.join(new StaticDirectory(u, invokers)); } else { // 不是 注册中心的URL invoker = cluster.join(new StaticDirectory(invokers)); }}

这里的代码回顾之前的暴露服务时的Invoker有没有熟悉?是的,这里就是根据URL来得到Invoker实体,

最后有个 getProxy(invoker, interfaces);

@SuppressWarnings("unchecked")    public 
T getProxy(Invoker
invoker, Class
[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }

看到这里就看熟悉了啦~再看InvokerInvocationHandler类,实现InvocationHandler接口,

if (method.getDeclaringClass() == Object.class) {    return method.invoke(invoker, args);}

其实最终还是认出了它的本质。

那么问题来了......

3:两者怎么实现通讯

回到刚刚暴露服务那里,Transporter有三个实现类GrizzlyTransporter,MinaTransporter,NettyTransporter 。

我们以NettyTransporter为例:

 

public Server bind(URL url, ChannelHandler listener) throws RemotingException {        return new NettyServer(url, listener);    }    public Client connect(URL url, ChannelHandler listener) throws RemotingException {        return new NettyClient(url, listener);    }

 

我们跟踪NettyServer看到doOpen()里:

channel = bootstrap.bind(getBindAddress());

这个channel也就是Netty里的channel,终于交给Netty啦。

我们再跟踪NettyClient的时候貌似不是简单的连接,往后跟踪发现

 

cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));

 

这一段就可以看到利用ExecutorService去分发连接。

那么可能看到以上分析还是有点晕乎乎,没有连续感有木有?

是的,其实光分析以上的也不会有太直观的感受,LZ根据这些原理写了个最直观最简单的Demo:

 这是简化过的核心代理层:

import java.io.ObjectInputStream;import java.io.ObjectOutputStream;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.net.ServerSocket;import java.net.Socket;/** * Created by luyichen718 on 14-12-31. */public class RpcDemo {    /**     * 暴露服务     *     * @param service 服务实现     * @param port    服务端口     * @throws Exception     */    public static void export(final Object service, int port) throws Exception {        try {            ServerSocket server = new ServerSocket(port);            //该处采用不间断阻塞式监听端口            //TODO:此处可以用NIO来优化实现            while (true){                final Socket socket = server.accept();                try {                    try {                        ObjectInputStream input = new ObjectInputStream(socket.getInputStream());                        try {                            //也可以加服务注册方式管理服务                            String methodName = input.readUTF();                            Class
[] parameterTypes = (Class
[]) input.readObject(); Object[] arguments = (Object[]) input.readObject(); ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); //TODO:前后可以做监控 try { Method method = service.getClass().getMethod(methodName, parameterTypes); Object result = method.invoke(service, arguments); output.writeObject(result); } catch (Throwable t) { output.writeObject(t); } finally { output.close(); } } finally { input.close(); } } finally { socket.close(); } } catch (Exception e) { e.printStackTrace(); } } } catch (Exception e) { e.printStackTrace(); } } /** * 引用服务 * * @param
接口泛型 * @param interfaceClass 接口类型 * @param host 服务器主机名 * @param port 服务器端口 * @return 远程服务 * @throws Exception */ @SuppressWarnings("unchecked") public static
T reference(final Class
interfaceClass, final String host, final int port) throws Exception { return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class
[]{interfaceClass}, new InvocationHandler() { public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable { Socket socket = new Socket(host, port); try { ObjectOutputStream output = new ObjectOutputStream(socket.getOutputStream()); try { output.writeUTF(method.getName()); output.writeObject(method.getParameterTypes()); output.writeObject(arguments); ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { Object result = input.readObject(); if (result instanceof Throwable) { throw (Throwable) result; } return result; } finally { input.close(); } } finally { output.close(); } } finally { socket.close(); } } }); }}

服务生产者:

public static void main(String[] args) throws Exception {        BizService service = new BizServiceImpl();        RpcDemo.export(service, 6543);    }

服务消费者:

public static void main(String[] args) throws Exception {        BizService service = RpcDemo.reference(BizService.class, "127.0.0.1", 6543);        service.hello();    }

该处的BizService就是一个最简单的业务接口,里面可以实现具体的业务,接口一般单独生产jar包给消费方使用。

可能对于一般读者也是最有价值的了吧,以上晕乎乎的可以先看Demo然后再下载了源码看看分析^_^

这个Demo也算是对一般长连接的最简易实现,当然Dubbo加了很多扩展及监控,还运用了很多Decorator模式,接口设计也很优雅,光看官网的一些接口图都晕乎乎了,所以还是要静下心来慢慢研读。

此处也算是对长连接RPC的实现做了一个小结,欢迎各兄弟交流吐槽~

转载于:https://www.cnblogs.com/Vincentlu/p/4196175.html

你可能感兴趣的文章
PHP版微信权限验证配置,音频文件下载,FFmpeg转码,上传OSS和删除转存服务器本地文件...
查看>>
教程前言 - 回归宣言
查看>>
PHP 7.1是否支持操作符重载?
查看>>
Vue.js 中v-for和v-if一起使用,来判断select中的option为选中项
查看>>
Java中AES加密解密以及签名校验
查看>>
定义内部类 继承 AsyncTask 来实现异步网络请求
查看>>
VC中怎么读取.txt文件
查看>>
如何清理mac系统垃圾
查看>>
企业中最佳虚拟机软件应用程序—Parallels Deskto
查看>>
Nginx配置文件详细说明
查看>>
怎么用Navicat Premium图标编辑器创建表
查看>>
Spring配置文件(2)配置方式
查看>>
MariaDB/Mysql 批量插入 批量更新
查看>>
ItelliJ IDEA开发工具使用—创建一个web项目
查看>>
solr-4.10.4部署到tomcat6
查看>>
切片键(Shard Keys)
查看>>
淘宝API-类目
查看>>
virtualbox 笔记
查看>>
Git 常用命令
查看>>
驰骋工作流引擎三种项目集成开发模式
查看>>