您现在的位置是:亿华云 > 人工智能
《一起玩Dubbo》系列四之服务如何被调用
亿华云2025-10-03 02:18:44【人工智能】1人已围观
简介了解过rpc的大概都听过,rpc就是为了解决远程方法的本地调用的难题的,其实说穿了,就是为了解决方法在被调用到远程服被执行的流程问题,那么这个流程到底是怎么样的呢?同样的,我继续在 dubbo流程图
了解过rpc的起玩大概都听过,rpc就是服务为了解决远程方法的本地调用的难题的,其实说穿了,何被就是调用为了解决方法在被调用到远程服被执行的流程问题,那么这个流程到底是起玩怎么样的呢?
同样的,我继续在 dubbo流程图 中继续绘画我的服务流程
首先是根据文章一起玩dubbo,先入个门搭建起demo,何被包括注册中心、调用服务消费方和服务提供方,起玩接下来来撸撸整个过程
这边为了方便解说,服务先直接给个demo
这是何被服务提供方
public class DemoServiceImpl implements DemoService { @Override public String sayHello(String name) { System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress()); return "Hello " + name + ", response from provider: " + RpcContext.getContext().getLocalAddress(); } }这是服务消费方
public class Consumer { public static void main(String[] args) { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{ "META-INF/spring" + "/dubbo-demo-consumer.xml"}); context.start(); DemoService demoService = (DemoService) context.getBean("demoService"); while (true) { try { Thread.sleep(1000); String hello = demoService.sayHello("world"); System.out.println(hello); } catch (Throwable throwable) { throwable.printStackTrace(); } } } }我断点了下这里
image-20210714015553406
走到服务消费方的最底层可以看到
在开始分析细节之前我们先在大脑风暴下大致流程
一次调用过程需要经历哪些步骤?
不用看dubbo代码都可以大概猜到:
要知道远程服务的地址, 把要调用的调用方法的具体信息告诉远程服务,让远程服务解析这些信息 远程服务根据这些信息找到对应的起玩实现类,进行调用,服务调用完了 调用结果原路返回,高防服务器何被然后客户端解析响应第一点,我们通过前几篇文章已经知道,消费方在发起调用的时候已经知晓了远程服务的地址
那么要调用的方法的具体信息包括哪些呢?
客户端肯定要告诉服务方调用的哪个接口,所以需要方法名、方法的参数类型、方法的参数值,然后有可能存在多个版本的情况,所以还得带上版本号,有这些数据后,服务方就可以精准的调用具体的方法了。
我这边将上面调用的例子先贴出来
mdata也就是我上面说的那些数据。
看到这个Request这里,应该就清楚了远程调用的基本原理了。
这个时候很容易就想到另一个问题,消费方和提供方是如何通信的?
消费方和提供方如何通信?
其实很简单,就是消费方和提供方通过协议进行了通信罢了,云南idc服务商dubbo的协议属于很常见的header+body 形式,而且也有特殊的字符 0xdabb,用来解决 TCP 网络粘包问题的。这种header是固定长度的,然后header里面填写 body 的长度是比较常见的做法,包括我司的游戏框架也是用这种模式。
我们可以看看dubbo协议的鬼样
可以看到,协议分为协议头和协议体,16 字节的头部主要携带了魔法数,也就是之前说的 0xdabb,然后一些请求的设置,消息体的长度等等,16 字节之后就是协议体了,包括协议版本、接口名字、亿华云计算接口版本、方法名字等等。
看到这里又很容易的引申出另一个问题了,协议是如何序列化的?
协议的序列化?
序列化的概念其实也简答, 在消费方先把Java对象转换为字节序列,这个过程也被称为对象的序列化,然后在服务方又把字节序列恢复为Java对象,这个过程称为对象的反序列化。
dubbo默认使用的是 hessian2 序列化协议,hessian2是阿里对于hessian进行行了修改的版本,应该还不错。
大致总结下,消费方发起调用,在那一刻,实际调用的是代理类,代理类最终调用的是Client,Client将 Java 的对象序列化生成协议体,然后通过网络传输给服务方,服务方Server接到这个请求之后,分发给业务线程池,由业务线程调用具体的实现方法。
先see see官网图吧
分析下消费方的调用链路
我们先看看服务消费方的调用逻辑,大家可以对着我这张图来
好了,我继续说
可以看到调用的接口生成的代理类是
而在invoke的时候会先释放掉部分不需要拦截的方法啦,比如toString什么的,这样正常吧,这些方法确实不需要拦截的嘛
看看RpcInvocation是什么
可以看到生成的 RpcInvocation 包含了方法名、参数类和参数值什么的。
接下来往里进一步看看MockClusterInvoker#invoke 代码,先解释下为啥会进来了MockClusterInvoker,看过文章 想学dubbo的看过来,2万字整理服务引入流程 应该可以理解这个过程,这个过程可以认为是套娃吧,A套B,B套C,一直套到最外层的invoker就是MockClusterInvoker,如果不理解这个过程可以往回看我的文章,很肝却很实用
这里可以看到就是判断配置里面有没有配置mock,mock 的话后续再展开说,继续看看this.invoker.invoke 的实现,实际上会调用 AbstractClusterInvoker#invoker
这里倒是涉及到了一个模板方法的设计模式,其实很简单,就是在抽象类中定好代码的执行骨架,之后将具体的实现延迟到子类中,由子类来决定逻辑,这样可以在不改变整体执行步骤的情况下修改步骤里面的实现,减少了重复的代码,也利于扩展,符合了开闭原则。
接下来看看
做了啥,这一步算是比较重要吧,单独拎出来讲讲
这里其实就是先由路由过滤一波,然后返回invoker
继续看看doInvoke的流程,我们默认使用的是 FailoverClusterInvoker,也就是失败自动切换的容错方式,
这里说说为啥默认是这个哦,其实从实际应用上来说,失败后自动切换下个服务实例还是比较符合场景的,如果想替换其他模式可以在xml里边配置
那我们继续看看那doInvoke的实现
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); String methodName = RpcUtils.getMethodName(invocation); int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers. Set<String> providers = new HashSet<String>(len); // 重试次数 for (int i = 0; i < len; i++) { //Reselect before retry to avoid a change of candidate `invokers`. //NOTE: if `invokers` changed, then `invoked` also lose accuracy. if (i > 0) { checkWhetherDestroyed(); copyinvokers = list(invocation); // check again checkInvokers(copyinvokers, invocation); } // 通过负载选择了一个invoker Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); // 上下文保留了调用过的invoker RpcContext.getContext().setInvokers((List) invoked); try { // 发起调用 Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + methodName + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method " + methodName + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le); }这个调用稍微总结一下就是FailoverClusterInvoker 拿到 Directory 返回的 Invoker 列表,并且经过路由之后,通过LoadBalance 从 Invoker 列表中选择一个 Invoker,也就是负载均衡啦,最后FailoverClusterInvoker会将参数传给选择出的那个 Invoker 实例的 invoke 方法,进行真正的远程调用。
后面发起调用的这个 invoke 又是调用抽象类中的 invoke 然后再调用子类的 doInvoker,抽象类中的方法很简单我就不展示了,我们直接看子类 DubboInvoker 的 doInvoke 方法。
protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; // 选择client if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { // 是否异步 boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); // 是否oneway方式发送,也就是需不需要返回值 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); // 超时时间 int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 不需要返回值 if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); // 协议发送 currentClient.send(inv, isSent); // future直接是Null RpcContext.getContext().setFuture(null); // 返回空的结果 return new RpcResult(); } else if (isAsync) { // 异步发送 ResponseFuture future = currentClient.request(inv, timeout); // 设置future RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); // 返回空结果 return new RpcResult(); } else { // 同步发送 RpcContext.getContext().setFuture(null); // 直接调用了future.get去等待 return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }这里可以看到调用的方式有三种,分别是 oneway、异步、同步,我分别说说
oneway是比较常见的方式了,就是当我们不关心请求是否发送成功的情况下,就用 oneway 的方式发送,这种方式消耗最小。 异步调用,我们可以看到其实 Dubbo 天然支持异步的,client 发送请求之后会得到一个 ResponseFuture,然后把 future 包装一下塞到上下文中,这样用户就可以从上下文中拿到这个 future,然后调用方可以做了一波操作之后再调用 future.whenComplete什么的异步做点什么。 同步调用,Dubbo 底层也帮我们做了,可以看到在 Dubbo 源码中就调用了 future.get,所以给我们的感觉就是我调用了这个接口的方法之后就阻塞住了,必须要等待结果到了之后才能返回,所以就是同步的。那么这个回调是怎么做的?
其实很简单的,就是在调用的时候生成一个唯一的id,将回调和这个id缓存起来,然后将这个id传递到服务方,服务方在处理好业务后将结果和这个id重新发回到消费方,消费方拿到回调触发即可。
我们看看代码层面的
看看DefaultFuture是什么
看到啦,里边生成了唯一id,然后放到FUTURES这个并发容器里边,我们看看用的地方
这里比较清楚了吧,在收到返回的协议后将future拿出来去触发,基于这种思路,很多做回调的都可以用这种设计思路。
到这里服务消费方怎么去触发rpc的这个行为基本上就到这了,其实还是很清晰的,先是起服订阅的时候层层封装了invoker,然后搞出了一个代理对象注入到我们的接口中,然后在调用接口的时候就一个个调用invoker啦,最后就是发协议给服务提供方。
爱了爱了,简单清晰的逻辑。
接下来说说服务提供方的调用流程。
分析下提供方的调用电路
同样的,我们先看看服务提供方的调用链
这个流程也是特别长的,我这边只拎几个重点出来,先看下HeaderExchangeHandler,handleRequest
这里很容易理解啦,就是把request对象中的data取出来传到DubboProtocol.requestHandler中,这个data就是前面的解码后的DecodeableRpcInvocation对象它是Invocation接口的一个实现,我们可以看看里边有啥
可以看到调用信息都在这里啦,接下来就简单了,根据这些参数拿到对应的对象反射调用下就可以了,接下来看看DubboProtocol比较核心的reply方法
@Override public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; // 根据调用的参数拿到对应的invoker,其实就是之前服务暴露的时候有说过的Exporter里边取的 Invoker<?> invoker = getInvoker(channel, inv); // 这里边是对callback回来的一些处理,先不管 if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || methodsStr.indexOf(",") == -1) { hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true; break; } } } if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored." + " please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); // 最后invoke一下啦 return invoker.invoke(inv); } throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); }getInvoker的逻辑也简单,之前的文章服务暴露有说过这个过程啦,其实就是从一个DubboProtocol.exporterMap内找到一个Exporter,再从里边取出invoker,那么key是啥呢,key其实是由URL生成的serviceKey,此时通过Invocation中的信息就可还原该serviceKey并且找到对应的Exporter和Invoker。再看看之前提过的 JavassistProxyFactory,这是一个给提供方的服务对象生成代理的工厂类
这个也说过啦,调用invoker.invoke时,通过反射调用最终的服务实现执行相关逻辑,入口就是这里了。因为这块之前的文章比较详细的说过,这里就不重复了。
到了这一步,调用就已经技术了,我们再看看调用结束后怎么将结果返回给服务消费方。
调用结束后,服务提供方方就会创建一个Response对象返回给服务消费方,那么自然在执行服务实现时会出现两种结果:成功和失败
如果成功的话,则把返回值设置到Response的result中,Response的status设置成OK
如果失败,把失败异常设置到Response的errorMessage中,status设置成SERVICE_ERROR
我们会回到HeaderExchangeHandler.received中的代码来看看,在handleRequest之后,调用channel.send把Response发送到客户端,这个channel封装客户端-服务端通信链路,最终会调用Netty框架,把响应写回到客户端。
惯例总结下
终于将调用这个过程说完啦,其实思路还是比较清晰的,不过最好是自己全程断点细看下啦,可以学到很多东西的。
说说后续安排:
SPI dubbo中的AOP机制 服务治理 ....等好几个模块,最后就是带大家撸一个RPC框架了,还是那句话,想学dubbo的可以持续关注这一系列。
很赞哦!(9)
下一篇: 提高数据中心温度,减少排放