封面来源:碧蓝航线 铁血、音符 & 誓言 活动CG
本文参考:尚硅谷Dubbo教程(dubbo经典之作) 雷丰阳老师主讲
参考资料:Dubbo 官方文档
1. 基本知识
1.1 分布式基础理论
什么是分布式系统
《分布式系统原理与范型》定义:
“分布式系统是若干独立计算机的集合,这些计算机对于用户来说就像单个相关系统”。
分布式系统(distributed system)是建立在网络之上的软件系统。
随着互联网的发展,网站应用的规模不断扩大,常规的垂直应用架构已无法应对,分布式服务架构以及流动计算架构势在必行,亟需一个治理系统确保架构有条不紊的演进。
发展演变
Dubbo官网背景介绍:背景
单一应用架构
当网站流量很小时,只需一个应用,将所有功能都部署在一起,以减少部署节点和成本。此时,用于简化增删改查工作量的数据访问框架(ORM)是关键。
垂直应用架构
当访问量逐渐增大,单一应用增加机器带来的加速度越来越小,提升效率的方法之一是将应用拆成互不相干的几个应用,以提升效率。此时,用于加速前端页面开发的 Web 框架(MVC)是关键。
这种模式下存在以下问题:
1、界面和业务逻辑无法实现分离
2、应用不可能完全独立,大量的应用之间需要交互
分布式服务架构
当垂直应用越来越多,应用之间交互不可避免,将核心业务抽取出来,作为独立的服务,逐渐形成稳定的服务中心,使前端应用能更快速的响应多变的市场需求。 此时,用于提高业务复用及整合的分布式服务框架(RPC)是关键。
流动计算架构
当服务越来越多,容量的评估,小服务资源的浪费等问题逐渐显现,此时需增加一个调度中心基于访问压力实时管理集群容量,提高集群利用率。此时,用于提高机器利用率的资源调度和治理中心(SOA)是关键。
RPC
参考链接:RPC原理解析 、 你应该知道的RPC原理
RPC【Remote Procedure Call】:远程过程调用,是一种进程间通信方式,是一种技术的思想,而不是规范。它允许程序调用另一个地址空间(通常是共享网络的另一台机器上)的过程或函数,而不用程序员显式编码这个远程调用的细节。即程序员无论是调用本地的还是远程的函数,本质上编写的调用代码基本相同。
也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数或方法,由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。
为什么要用RPC呢?就是无法在一个进程内,甚至一个计算机内通过本地调用的方式完成的需求,比如不同的系统间的通讯,甚至不同的组织间的通讯,由于计算能力需要横向扩展,需要在多台机器组成的集群上部署应用。RPC就是要像调用本地的函数一样去调远程函数。
RPC的两个核心模块:通讯、序列化。
RPC步骤时序图:
1、客户端(client)以本地调用方式(即以接口的方式)调用服务;
2、客户端存根(client stub)接收到调用后,负责将方法、参数等组装成能够进行网络传输的消息体(将消息体对象序列化为二进制);
3、客户端通过 sockets 将消息发送到服务端;
4、服务端存根(server stub)收到消息后进行解码(将消息对象反序列化);
5、服务端存根(server stub)根据解码结果调用本地的服务;
6、本地服务执行并将结果返回给服务端存根(server stub);
7、服务端存根( server stub)将返回结果打包成消息(将结果消息对象序列化);
8、服务端(server)通过 sockets 将消息发送到客户端;
9、客户端存根(client stub)接收到结果消息,并进行解码(将结果消息发序列化);
10、客户端(client)得到最终结果。
RPC 的目标是要把 2、3、4、7、8、9 这些步骤都封装起来。
注意:无论是何种类型的数据,最终都需要转换成二进制流在网络上进行传输,数据的发送方需要将对象转换为二进制流,而数据的接收方则需要把二进制流再恢复为对象。
影响一个 RPC 框架的性能有两点:
1、能否快速在各个服务器之间建立起连接
2、能否快速序列化与反序列化
市面上的 RPC 框架有很多,比如:Dubbo、gRPC、Thrift、HSF(High Speed Service Framework)等。
1.2 Dubbo 基本概念
Dubbo |ˈdʌbəʊ| 官网:Dubbo
Dubbo 提供了六大核心能力:面向接口代理的高性能RPC调用,智能容错和负载均衡,服务自动注册和发现,高度可扩展能力,运行期流量调度,可视化的服务治理与运维。
Dubbo 的设计架构:
服务提供者(Provider) :暴露服务的服务提供方,服务提供者在启动时,向注册中心注册自己提供的服务。
服务消费者(Consumer) :调用远程服务的服务消费方,服务消费者在启动时,向注册中心订阅自己所需的服务,服务消费者,从提供者地址列表中,基于软负载均衡算法,选一台提供者进行调用,如果调用失败,再选另一台调用。
注册中心(Registry) :注册中心返回服务提供者地址列表给消费者,如果有变更,注册中心将基于长连接推送变更数据给消费者
监控中心(Monitor) :服务消费者和提供者,在内存中累计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心
1.3 Dubbo 与注册中心
为什么要使用 Dubbo?或者说 Dubbo 是为了解决什么呢?
在分布式服务架构里,A 服务器上的程序想要调用 B 服务器上的一段代码,就需要进行远程过程调用。不同服务之间如何进行通信、如何传递数据、如何调用?这就需要用到 Dubbo,而不需要我们自己解决。
Dubbo 可以实现服务自动注册与发现,这是什么意思呢?
假设网站规模已经变得很大,用户业务在服务器 1 号、2 号、3 号和 4 号上,支付业务在服务器 9 号、10 号 和 11 号。这时候前端订单 web 想要调用支付业务,那么 RPC 框架怎么知道支付业务在哪些服务器上呢?又或者,9 号服务器突然炸了,但是要调用支付业务,RPC 框架又要怎么自动发现出问题的服务器呢?
我们可以引入一种机制——注册中心!
为了能够动态感知到各个服务的状态,我们可以将所有服务都注册到注册中心里,也包括前端的程序。这个时候注册中心就相当于维护了一个清单,这个清单里有每个服务的信息(比如这些服务都在哪些服务器上)。
假设支付业务所在的 9 号服务器炸了,那么注册中心就会更改支付业务的信息。当需要调用支付业务时,Dubbo(或者说 RPC 框架)就要先问下注册中心,支付业务都在哪些服务器上啊。注册中心一看服务清单,就告诉 RPC 框架,这时候支付业务处在 10 号和 11 号服务器上。这时候,RPC 框架就会随机选择 10 号或 11 号服务器,或者根据负载均衡选择一个请求量较小的服务器,与这个服务器建立通信,传递数据,进行远程调用(我们可以把注册中心当成现实生活中的婚介网或者婚介所)。
为了使用 Dubbo,我们得先安装一个注册中心,Dubbo 支持的注册中心有很多,我们在此选择 ZooKeeper 作为注册中心。
具体使用与整合 SpringBoot 参考【SpringBoot 高级应用】一文。
简单来说,使用了 Dubbo 可以利用接口在一个 Module 中调用另一个 Module 的接口实现类的方法。在项目部署后,这两个 Module 很有可能会在两个服务器上。
2. Dubbo 配置
2.1 配置文件覆盖策略
在使用 Dubbo 时,需要进行一些配置,这些配置都书写在配置文件中,但是具体可以书写哪些配置标签,这些标签又有什么用,标签内又有哪些属性,可以参考官方文档。
在官方文档中,有【配置】和【Schema 配置参考手册】 两栏,在这两栏中就可以看到相关的配置和配置解释。
在官方文档的【Dubbo 配置】中,有名为【属性配置】的一栏,可以看到 Dubbo 配置文件的覆盖策略(重写与优先级):
优先级从高到低:
JVM -D 参数:当你部署或者启动应用时,它可以轻易地重写配置,比如,改变 Dubbo 协议端口;
XML:XML 中的当前配置会重写 dubbo.properties 中的;
Properties:默认配置,仅仅作用于以上两者没有配置时。
1、如果在 classpath 下有超过一个 dubbo.properties 文件,比如,两个 jar 包都各自包含了 dubbo.properties,dubbo 将随机选择一个加载,并且打印错误日志。
2、如果 id 没有在 protocol 中配置,将使用 name 作为默认属性。
2.2 启动检查
在开发中,会有一个服务提供者,还有一个服务消费者,它们都会被注册到注册中心中,消费者会从注册中心中获取提供者的调用地址。那如果注册中心中没有提供者,但是启动了消费者,那么启动期间消费者 默认 就会报错,消费者会发现它需要的服务不在注册中心中。
在官方文档中,【用法示例】有一栏名为【启动时检查】。
Dubbo 缺省会在启动时检查依赖的服务是否可用,不可用时会抛出异常,阻止 Spring 初始化完成,以便上线时,能及早发现问题,默认 check="true",将开启检查。
也可以通过 check="false" 关闭检查。比如,测试时,有些服务不关心,或者出现了循环依赖,必须有一方先启动。
在 SpringBoot 中可以这样配置:
1 2 3 dubbo.reference.check = false
dubbo.consumer.check = false
dubbo.registry.check = false
dubbo.reference.check=false,强制改变所有 reference 的 check 值,就算配置中有声明,也会被覆盖。
dubbo.consumer.check=false,是设置 check 的缺省值,如果配置中有显式的声明,如:<dubbo:reference check="true"/>,不会受影响。
dubbo.registry.check=false,前面两个都是指订阅成功,但提供者列表是否为空是否报错,如果注册订阅失败时,也允许启动,需使用此选项,将在后台定时重试。简单来说,就是检查注册中心,如果设置为 false,就算没有注册中心也不会报错,等到注册中心启动了,就会连接上注册中心。
2.3 超时设置
服务消费者在引用服务提供方时,可能由于网络等原因,服务提供方执行一个方法用了很长时间,导致大量线程都在此阻塞,就会引起性能下降。为了解决这个问题,我们可以指定超时设置 timeout。
指定了超时设置后,一个方法在指定的时间内没有返回,那么就会终止方法,不让线程大量阻塞。
超时设置的单位是毫秒。
在官方文档中,可以查看【schema 参考配置手册】,在 <dubbo:reference> 中的 timeout 属性是可选的,默认使用 <dubbo:consumer> 的 timeout。而 <dubbo:consumer> 的 timeout 默认值 是 1000 毫秒,也就是 1 秒。
与启动检查一样,在 SpringBoot 中可以这样配置:
1 2 3 dubbo.consumer.timeout = 3000
dubbo.registry.timeout = 3000
dubbo.provider.timeout = 3000
如果需要配置某个接口的超时设置,可以配置 @Reference 的 timeout 属性。
为了便于了解,可以看下 XML 中不同粒度配置的覆盖关系。
以 timeout 为例,下图显示了配置的查找顺序,其它 retries、loadbalance、actives 等类似:
方法级优先,接口级次之,全局配置再次之。
如果级别一样,则消费方优先,提供方次之(顾客是上帝 😋)。
其中,服务提供方配置,通过 URL 经由注册中心传递给消费方。
关于覆盖关系,简单来说就两点:精确优先和消费者优先。
官方文档建议由服务提供方设置超时,因为一个方法需要执行多长时间,服务提供方更清楚,如果一个消费方同时引用多个服务,就不需要关心每个服务的超时设置。
理论上 ReferenceConfig 中除了 interface 这一项,其他所有配置项都可以缺省不配置,框架会自动使用 ConsumerConfig、ServiceConfig、ProviderConfig 等提供的缺省配置。
2.4 重试次数
前面说了超时设置,超时设置一般会与重试次数搭配使用。
当某个服务由于各种原因,比如网络原因,导致了超时,造成远程调用失败,可以通过设置重试次数来多尝试几次。
retries 属性表示远程服务调用重试次数,不包括第一次调用,不需要重试就设置为0,仅在 cluster 为 failback / failover时有效。
在官方文档中,可以查看【schema 参考配置手册】,在 <dubbo:reference> 中的 retries 属性是可选的,默认使用 <dubbo:consumer> 的 retries。而 <dubbo:consumer> 的 retries 默认值 是 2。
如果设置 retries = 3,假设第一次调用超时了,那么 还会 尝试调用 3 次。
假设某一业务的服务提供者有 3 个,且设置了超时,并将重试次数设置为 3。服务消费者在调用提供者的方法时产生了超时,服务消费者不会在某一个服务提供者上“吊死”,会尝试其他几个服务提供者。
一般来说,我们会在幂等操作的方法上设置重试次数,而不会在非幂等操作的方法上设置重试次数。
那么什么是幂等,什么又是非幂等?
所谓幂等,就是 就是一个操作,不论执行多少次,产生的效果和返回的结果都是一样的 ,比如查询、删除、修改。
而非幂等就是一个操作,每执行一次,都会产生新的效果或返回新的结果,比如添加。
那要怎么实现幂等呢?可以使用 MVCC 方案、去重表、悲观锁、token 机制、全局唯一 ID 等方法,具体就不在此讨论了。
2.5 多版本
当某一个接口出现了不兼容的升级, 我们不能让系统的所有用户都用上新功能,因为新功能很有可能不稳定,进而导致整个系统不稳定。
可以让系统的一部分用户先用上新功能,另一部分用户仍然使用功能的老版本,当新功能的版本都稳定了,才将另一部分用户使用的老版本全部替换成新版本。
这也叫作灰度发布,百度百科对【灰度发布】是这样介绍的:
在 Dubbo 中,需要使用 version 来指定版本。官方文档是这么使用的:
老版本服务提供者配置:
1 < dubbo:service interface = "com.foo.BarService" version = "1.0.0" />
新版本服务提供者配置:
1 < dubbo:service interface = "com.foo.BarService" version = "2.0.0" />
老版本服务消费者配置:
1 < dubbo:reference id = "barService" interface = "com.foo.BarService" version = "1.0.0" />
新版本服务消费者配置:
1 < dubbo:reference id = "barService" interface = "com.foo.BarService" version = "2.0.0" />
在 Dubbo 2.2.0 以上版本,可以指定随机使用版本:
1 < dubbo:reference id = "barService" interface = "com.foo.BarService" version = "*" />
在 SpringBoot 中,可以配置 @Reference 注解的 version 属性值。
2.6 本地存根
官方文档如是说:
远程服务后,客户端通常只剩下接口,而实现全在服务器端,但提供方有些时候想在客户端也执行部分逻辑,比如:做 ThreadLocal 缓存,提前验证参数,调用失败后伪造容错数据等等,此时就需要在 API 中带上 Stub,客户端生成 Proxy 实例,会把 Proxy 通过构造函数传给 Stub,然后把 Stub 暴露给用户,Stub 可以决定要不要去调 Proxy。
简单来说:消费者利用接口调用提供者的方法前,想进行一些其他操作,比如判断某些值是否符合条件,当符合条件时才调用提供者的方法,不符合条件又有另一套逻辑,这就需要用到本地存根。
那要怎么实现本地存根呢?
首先需要在编写接口的实现方法,在实现方法内要提供一个构造方法,比如像官方文档中一样:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package com.foo ;
public class BarServiceStub implements BarService {
private final BarService barService ;
// 构造函数传入真正的远程代理对象
public BarServiceStub ( BarService barService ){
this . barService = barService;
}
public String sayHello ( String name ) {
// 此代码在客户端执行, 你可以在客户端做 ThreadLocal 本地缓存,或预先验证参数是否合法,等等
try {
return barService . sayHello (name); // 调用代理对象的方法
} catch ( Exception e ) {
// 你可以容错,可以做任何AOP拦截事项
return "容错数据" ;
}
}
}
然后还需要进行配置,比如:
1 < dubbo:service interface = "com.foo.BarService" stub = "true" />
或者
1 < dubbo:service interface = "com.foo.BarService" stub = "true" />
在 SpringBoot 中,可以配置 @Reference 注解的 stub 属性值。
2.7 整合 SpringBoot 的方式
首先是【SpringBoot 高级应用】一文中采用的方法:导入 dubbo-starter,在 application.properties 中配置属性,使用 @Service 注解暴露服务,使用 @Reference 注解引用服务。
如果想要做到方法级别的精确配置,还可以继续使用 dubbo.xml,这时候就不在 application.properties 配置文件中进行配置,也不需要使用 @Service 注解,但是需要在主启动类上使用注解 @ImportResource 导入 dubbo.xml 配置文件。如:
1 2 3 4 5 6 7 @ SpringBootApplication
@ ImportResource ( locations = "classpath:provider.xml" )
public class ConsumerServerApplication {
public static void main ( String [] args ) {
SpringApplication . run ( ConsumerServerApplication . class , args);
}
}
除了以上两种方式外,还可以使用注解 API,简单来说就是编写配置类,然后将每一个组件手动创建到容器中。可以参考官方文档中【配置】一栏中的【注解配置】。
3. 高可用
3.1 ZK 宕机与 Dubbo 直连
现象:ZooKeeper 注册中心宕机,还可以消费 Dubbo 暴露的服务。
原因:
1、健壮性:
监控中心宕掉不影响使用,只是丢失部分采样数据
数据库宕掉后,注册中心仍能通过缓存提供服务列表查询,但不能注册新服务
注册中心对等集群,任意一台宕掉后,将自动切换到另一台
注册中心全部宕掉后,服务提供者和服务消费者仍能通过本地缓存通讯
服务提供者无状态,任意一台宕掉后, 不影响使用
服务提供者全部宕掉后,服务消费者应用将无法使用,并无限次重连等待服务提供者恢复
2、高可用:通过设计,减少系统不能提供服务的时间
其实在使用时,可以绕过注册中心,直接使用 Dubbo 直连。也就是说,没有注册中心,Dubbo 调用也是没有问题的。
直连配置也很简单,只需要在 注解上进行配置 url 属性值即可,如:
1 2 @ Reference ( url = "127.0.0.1:20882" )
TicketService ticketService ;
3.2 负载均衡机制
在集群负载均衡时,Dubbo 提供了多种均衡策略,缺省为 random 随机调用。
负载均衡策略
Random LoadBalance
随机,按权重设置随机概率。
在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。
RoundRobin LoadBalance
轮循,按公约后的权重设置轮循比率。
存在慢的提供者累积请求的问题。比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。
LeastActive LoadBalance
最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。
使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。
ConsistentHash LoadBalance
一致性 Hash,相同参数的请求总是发到同一提供者。
当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。算法参见:http://en.wikipedia.org/wiki/Consistent_hashing
缺省只对第一个参数 Hash,如果要修改,请配置:
1 < dubbo:parameter key = "hash.arguments" value = "0, 1" />
缺省用 160 份虚拟节点,如果要修改,请配置:
1 < dubbo:parameter key = "hash.nodes" value = "320" />
Dubbo 默认 采用的是基于权重的 随机 负载均衡机制。
我们搜索 LoadBalance 接口,可以看到 @SPI 接口,就明白了默认采用的什么策略:
1 2 3 4 5 @ SPI ( "random" )
public interface LoadBalance {
@ Adaptive ({ "loadbalance" })
< T > Invoker < T > select ( List < Invoker < T >> var1 , URL var2 , Invocation var3 ) throws RpcException ;
}
搜索 LoadBalance 接口的实现类,可以看到一个名为 AbstractLoadBalance 的抽象类,再看这个抽象类的子类,就可以看到四种负载均衡机制:
如果想要修改 Dubbo 的负载均衡策略,可以参考官方文档【用法示例】的【负载均衡】:
服务端服务级别
1 < dubbo:service interface = "..." loadbalance = "roundrobin" />
客户端服务级别
1 < dubbo:reference interface = "..." loadbalance = "roundrobin" />
服务端方法级别
1 2 3 < dubbo:service interface = "..." >
< dubbo:method name = "..." loadbalance = "roundrobin" />
</ dubbo:service >
客户端方法级别
1 2 3 < dubbo:reference interface = "..." >
< dubbo:method name = "..." loadbalance = "roundrobin" />
</ dubbo:reference >
在 SpringBoot 中,可以设置 @Reference 或 @Service 注解的 loadbalance 属性。
1 @ Reference ( loadbalance = "roundrobin" )
3.3 服务降级
什么是服务降级
当服务器压力剧增的情况下,根据实际业务情况及流量,对一些服务和页面有策略的不处理或换种简单的方式处理,从而释放服务器资源以保证核心交易正常运作或高效运作。
可以通过服务降级功能临时屏蔽某个出错的非关键服务,并定义降级后的返回策略。
官方文档有:
向注册中心写入动态配置覆盖规则:
1 2 3 RegistryFactory registryFactory = ExtensionLoader . getExtensionLoader ( RegistryFactory . class ). getAdaptiveExtension ();
Registry registry = registryFactory . getRegistry ( URL . valueOf ( "zookeeper://10.20.153.10:2181" ));
registry . register ( URL . valueOf ( "override://0.0.0.0/com.foo.BarService?category=configurators&dynamic=false&application=foo&mock=force:return+null" ));
其中:
1、mock=force:return+null 表示消费方对该服务的方法调用都直接返回 null 值,不发起远程调用。用来屏蔽不重要服务不可用时对调用方的影响。(屏蔽)
2、还可以改为 mock=fail:return+null 表示消费方对该服务的方法调用在失败后,再返回 null 值,不抛异常。用来容忍不重要服务不稳定时对调用方的影响。(容错)
以上两种可以在 dubbo-admin 中服务治理的消费者中对服务进行设置。
服务降级在 2.2.0 以上版本支持。
3.4 服务容错
当集群调用失败时,Dubbo 提供了多种容错方案,缺省为 failover 重试。
集群容错模式
Failover Cluster
失败自动切换,当出现失败,重试其它服务器。通常用于读操作,但重试会带来更长延迟。可通过 retries="2" 来设置重试次数(不含第一次)。该配置为缺省配置。
重试次数配置如下:
1 < dubbo:service retries = "2" />
或
1 < dubbo:reference retries = "2" />
或
1 2 3 < dubbo:reference >
< dubbo:method name = "findFoo" retries = "2" />
</ dubbo:reference >
Failfast Cluster
快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。
Failsafe Cluster
失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。
Failback Cluster
失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。
Forking Cluster
并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks="2" 来设置最大并行数。
Broadcast Cluster
广播调用所有提供者,逐个调用,任意一台报错则报错。通常用于通知所有提供者更新缓存或日志等本地资源信息。
服务容错在 2.1.0 开始支持。
集群模式配置
按照以下示例在服务提供方和消费方配置集群模式:
1 < dubbo:service cluster = "failsafe" />
或
1 < dubbo:reference cluster = "failsafe" />
3.5 Hystrix
要实现服务容错,我们一般与 Hystrix 进行整合。
Hystrix 旨在通过控制那些访问远程系统、服务和第三方库的节点,从而对延迟和故障提供更强大的容错能力。Hystrix 具备拥有回退机制和断路器功能的线程和信号隔离,请求缓存和请求打包,以
及监控和配置等功能。
1、配置 spring-cloud-starter-netflix-hystrix
SpringBoot 官方提供了对 Hystrix 的集成,直接在 pom.xml 里加入依赖:
1 2 3 4 5 < dependency >
< groupld >org.springframework.cloud</ groupld >v
< artifactld >spring-cloud-starter-netflix-hystrix</ artifactld >v
< version >1.4.4.RELEASE</ version >
</ dependency >
然后在 Application 类上增加 @EnableHystrix 来启用hystrix starter:
1 2 3 @ SpringBootApplication
@ EnableHystrix
public class ProviderApplication {}
2、配置 Provider 端
在 Dubbo 的 Provider (服务提供者实现类的方法)上增加 @HystrixCommand 配置,这样子调用就会经过 Hystrix 代理。
1 2 3 4 5 6 7 8 9 10 11 12 @ Service ( version = "1.0.0" )
public class HelloServiceImpl implements HelloService {
@ HystrixCommand ( commandProperties = {
@ HystrixProperty ( name = "circuitBreaker.requestVolumeThreshold" , value = "10" ) ,
@ HystrixProperty ( name = "execution.isolation.thread.timeoutInMilliseconds" , value = "2000" ) })
@ Override
public String sayHello ( String name ) {
// System.out.println("async provider received: " + name);
// return "annotation: hello, " + name;
throw new RuntimeException ( "Exception to show hystrix enabled." );
}
}
3、在服务消费者调用服务提供者的方法上添加 @HystrixCommand 注解并指定 fallbackMethod 属性,重写 fallbackMethod 指定的方法。这样当出现服务容错时,就会调用指定的方法。
1 2 3 4 5 6 7 8 9 10 @ Reference ( version = "1.0.0" )
private HelloService demoService ;
@ HystrixCommand ( fallbackMethod = "reliable" )
public String doSayHello ( String name) {
return demoService . sayHello (name);
}
public String reliable ( String name) {
return "hystrix fallback value" ;
}
4. Dubbo 原理
4.1 RPC 和 Netty 原理
RPC 原理
1、客户端(client)以本地调用方式(即以接口的方式)调用服务;
2、客户端存根(client stub)接收到调用后,负责将方法、参数等组装成能够进行网络传输的消息体(将消息体对象序列化为二进制);
3、客户端通过 sockets 将消息发送到服务端;
4、服务端存根( server stub)收到消息后进行解码(将消息对象反序列化);
5、服务端存根( server stub)根据解码结果调用本地的服务;
6、本地服务执行并将结果返回给服务端存根( server stub);
7、服务端存根( server stub)将返回结果打包成消息(将结果消息对象序列化);
8、服务端(server)通过 sockets 将消息发送到客户端;
9、客户端存根(client stub)接收到结果消息,并进行解码(将结果消息发序列化);
10、客户端(client)得到最终结果。
RPC的目标是要把 2、3、4、7、8、9 这些步骤都封装起来,这些细节对用户来说是透明的,不可见的。
Netty 通信原理
Netty 是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。它极大地简化并简化了 TCP 和 UDP 套接字服务器等网络编程。
参考链接:Java中BIO和NIO
BIO(Blocking IO):
NIO(Non-Blocking IO):
Selector 一般称为选择器,也可以翻译为多路复用器。
Connect(连接就绪)、Accept(接受就绪)、Read(读就绪)、Write (写就绪)
Netty 基本原理:
4.2 框架设计
在官方文档的【开发指南】的【框架设计】一栏中,有这样一幅图:
这幅图就很好的解释了 Dubbo 的整体设计。
官方文档对这幅图也做了很好的解释:
图中左边淡蓝背景的为服务消费方使用的接口,右边淡绿色背景的为服务提供方使用的接口,位于中轴线上的为双方都用到的接口。
图中从下至上分为十层,各层均为单向依赖,右边的黑色箭头代表层之间的依赖关系,每一层都可以剥离上层被复用,其中,Service 和 Config 层为 API,其它各层均为 SPI。
图中绿色小块的为扩展接口,蓝色小块为实现类,图中只显示用于关联各层的实现类。
图中蓝色虚线为初始化过程,即启动时组装链,红色实线为方法调用过程,即运行时调时链,紫色三角箭头为继承,可以把子类看作父类的同一个节点,线上的文字为调用的方法。
各层说明
config 配置层 :对外配置接口,以 ServiceConfig, ReferenceConfig 为中心,可以直接初始化配置类,也可以通过 spring 解析配置生成配置类
proxy 服务代理层 :服务接口透明代理,生成服务的客户端 Stub 和服务器端 Skeleton, 以 ServiceProxy 为中心,扩展接口为 ProxyFactory
registry 注册中心层 :封装服务地址的注册与发现,以服务 URL 为中心,扩展接口为 RegistryFactory, Registry, RegistryService
cluster 路由层 :封装多个提供者的路由及负载均衡,并桥接注册中心,以 Invoker 为中心,扩展接口为 Cluster, Directory, Router, LoadBalance
monitor 监控层 :RPC 调用次数和调用时间监控,以 Statistics 为中心,扩展接口为 MonitorFactory, Monitor, MonitorService
protocol 远程调用层 :封装 RPC 调用,以 Invocation, Result 为中心,扩展接口为 Protocol, Invoker, Exporter
exchange 信息交换层 :封装请求响应模式,同步转异步,以 Request, Response 为中心,扩展接口为 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer
transport 网络传输层 :抽象 mina 和 netty 为统一接口,以 Message 为中心,扩展接口为 Channel, Transporter, Client, Server, Codec
serialize 数据序列化层 :可复用的一些工具,扩展接口为 Serialization, ObjectInput, ObjectOutput, ThreadPool
以上内容均摘自 Dubbo 官方文档【开发指南】的【框架设计】一栏,其他信息可以参考官方文档。
4.3 标签解析
标签解析简单来说就是配置文件(XML)的解析。
这个配置文件也是 Spring 的配置文件,启动的时候也是以 Spring 的方式来加载启动。Spring 在解析配置文件的标签时,都会用到一个主接口 BeanDefinitionParser:
1 2 3 4 public interface BeanDefinitionParser {
@ Nullable
BeanDefinition parse ( Element var1 , ParserContext var2 );
}
再看下这个接口的继承树,可以看到这样一个实现类 DubboBeanDefinitionParser:
这个类就是 Dubbo 的标签解析器,在这个类中,有一个名为 parse() 的方法,这个方法就是用来解析标签的。
在解析不同的标签时,参数 beanClass 传入的值也不一样。
1 2 3 4 5 private static BeanDefinition parse ( Element element ,
ParserContext parserContext ,
Class < ? > beanClass , boolean required) {
// ...
}
那为什么会这样呢?
DubboBeanDefinitionParser 类有一个构造方法,构造方法有一个参数就是 beanClass:
1 2 3 4 public DubboBeanDefinitionParser ( Class < ? > beanClass , boolean required) {
this . beanClass = beanClass ;
this . required = required ;
}
那这个构造方法是怎么执行的呢?
在 DubboNamespaceHandler 类中执行一个初始化 init() 方法,真是因为执行了这个初始化方法,才导致 beanClass 参数的多样性:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class DubboNamespaceHandler extends NamespaceHandlerSupport {
public DubboNamespaceHandler () {
}
public void init () {
this . registerBeanDefinitionParser ( "application" , new DubboBeanDefinitionParser ( ApplicationConfig . class , true ));
this . registerBeanDefinitionParser ( "module" , new DubboBeanDefinitionParser ( ModuleConfig . class , true ));
this . registerBeanDefinitionParser ( "registry" , new DubboBeanDefinitionParser ( RegistryConfig . class , true ));
this . registerBeanDefinitionParser ( "config-center" , new DubboBeanDefinitionParser ( ConfigCenterBean . class , true ));
this . registerBeanDefinitionParser ( "metadata-report" , new DubboBeanDefinitionParser ( MetadataReportConfig . class , true ));
this . registerBeanDefinitionParser ( "monitor" , new DubboBeanDefinitionParser ( MonitorConfig . class , true ));
this . registerBeanDefinitionParser ( "metrics" , new DubboBeanDefinitionParser ( MetricsConfig . class , true ));
this . registerBeanDefinitionParser ( "provider" , new DubboBeanDefinitionParser ( ProviderConfig . class , true ));
this . registerBeanDefinitionParser ( "consumer" , new DubboBeanDefinitionParser ( ConsumerConfig . class , true ));
this . registerBeanDefinitionParser ( "protocol" , new DubboBeanDefinitionParser ( ProtocolConfig . class , true ));
this . registerBeanDefinitionParser ( "service" , new DubboBeanDefinitionParser ( ServiceBean . class , true ));
this . registerBeanDefinitionParser ( "reference" , new DubboBeanDefinitionParser ( ReferenceBean . class , false ));
this . registerBeanDefinitionParser ( "annotation" , new AnnotationBeanDefinitionParser ());
}
static {
Version . checkDuplicate ( DubboNamespaceHandler . class );
}
}
简单来说:
解析配置文件的目的就是让配置文件的每个标签的每个属性值都解析出来,然后保存到标签对应的 xxxConfig 类中。
只不过需要注意的是,<dubbo:service> 和 <dubbo:reference> 标签对应的类是 xxxBean。
4.4 服务暴露
前面说到 <dubbo:service> 标签对应的类是 ServiceBean,研究研究这个类:
1 2 3 4 5 6 7 8 9 public class ServiceBean < T > extends ServiceConfig < T > implements
InitializingBean ,
DisposableBean ,
ApplicationContextAware ,
ApplicationListener < ContextRefreshedEvent >,
BeanNameAware ,
ApplicationEventPublisherAware {
// ...
}
这个类实现了很多接口,这里面有两个接口需要注意。一个名为 InitializingBean,另一个名为 ApplicationListener。
当组件创建完对象以后,会调用 InitializingBean 中的 afterPropertiesSet() 方法(属性设置完以后回调方法)。
ApplicationListener 接口的泛型是 ContextRefreshedEvent,就是说当整个 IoC 容器都刷新完成(IoC 容器内所有对象都创建完以后),会回调 ApplicationListener 接口中的 onApplicationEvent() 方法。
简单来说: ServiceBean 会在容器创建完对象以后调用 afterPropertiesSet() 方法,还会在 IoC 容器启动完以后调用 onApplicationEvent() 方法。
执行 afterPropertiesSet() 方法相当于将我们配置的 <dubbo:service> 内的信息都保存起来。
执行 onApplicationEvent() 方法时,如果接口的方法是需要暴露的,但还没暴露,并且不是延迟暴露,就会调用 export() 方法(服务暴露方法)。
执行 export() 方法时,会检查判断并获取信息,但是最后会执行一个名为 doExport() 方法,这个方法用来执行暴露。
在 doExport() 方法中,同样会进行检查判断并获取信息,最后会执行一个名为 doExportUrls() 的方法,这个方法会执行暴露 URL 地址。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private void doExportUrls () {
// 加载注册中心信息
List < URL > registryURLs = this . loadRegistries ( true );
// 获取暴露的协议与端口
Iterator var2 = this . protocols . iterator ();
while ( var2 . hasNext () ) {
ProtocolConfig protocolConfig = (ProtocolConfig) var2 . next ();
String pathKey =
URL . buildKey ((String) this
. getContextPath (protocolConfig). map ((p) -> {
return p + "/" + this . path ;
}). orElse ( this . path ), this . group , this . version );
ProviderModel providerModel =
new ProviderModel (pathKey , this . ref , this . interfaceClass ) ;
ApplicationModel . initProviderModel (pathKey, providerModel);
this . doExportUrlsFor1Protocol (protocolConfig, registryURLs);
}
}
在 doExportUrls() 方法中最后有一个名为 doExportUrlsFor1Protocol() 的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private void doExportUrlsFor1Protocol ( ProtocolConfig protocolConfig , List < URL > registryURLs) {
// 省略其他代码
Invoker < ? > invoker = PROXY_FACTORY . getInvoker ( this . ref , this . interfaceClass , registryURL . addParameterAndEncoded ( "export" , url . toFullString ()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker (invoker , this ) ;
Exporter < ? > exporter = protocol . export (wrapperInvoker);
this . exporters . add (exporter);
}
}
} else {
Invoker < ? > invoker = PROXY_FACTORY . getInvoker ( this . ref , this . interfaceClass , url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker (invoker , this ) ;
Exporter < ? > exporter = protocol . export (wrapperInvoker);
this . exporters . add (exporter);
// 省略其他代码
}
在上述代码中,利用 PROXY_FACTORY (代理工厂)获取到包含接口和接口实现类的 Invoker 对象(执行者)。这个执行者其实就是相当于将实现类和 URL 包装了一下。
然后还会把 Invoker 对象包装成 DelegateProviderMetaDataInvoker 对象。
再然后会执行 protocol.export(wrapperInvoker) 将我们的执行者进行暴露。
这个 protocol 是啥?在源代码中有:
1 2 3 4 5 // 基于 Java 的 SPI 机制
private static final Protocol protocol =
(Protocol) ExtensionLoader
. getExtensionLoader ( Protocol . class )
. getAdaptiveExtension ();
由于会将信息注册到注册中心,因此来看看 RegistryProtocol 类中的服务暴露方法 export():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public < T > Exporter < T > export ( Invoker < T > originInvoker) throws RpcException {
URL registryUrl = this . getRegistryUrl (originInvoker);
URL providerUrl = this . getProviderUrl (originInvoker);
URL overrideSubscribeUrl = this . getSubscribedOverrideUrl (providerUrl);
RegistryProtocol . OverrideListener overrideSubscribeListener = new RegistryProtocol . OverrideListener (overrideSubscribeUrl, originInvoker);
this . overrideListeners . put (overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = this . overrideUrlWithConfig (providerUrl, overrideSubscribeListener);
// 进行本地暴露
RegistryProtocol . ExporterChangeableWrapper < T > exporter = this . doLocalExport (originInvoker, providerUrl);
Registry registry = this . getRegistry (originInvoker);
URL registeredProviderUrl = this . getRegisteredProviderUrl (providerUrl, registryUrl);
// 在提供者消费者注册表中注册提供者
// originInvoker: 提供者 registryUrl: 注册中心地址
// registeredProviderUrl: 提供者 URL 地址
ProviderInvokerWrapper < T > providerInvokerWrapper = ProviderConsumerRegTable . registerProvider (originInvoker, registryUrl, registeredProviderUrl);
boolean register = registeredProviderUrl . getParameter ( "register" , true );
if (register) {
this . register (registryUrl, registeredProviderUrl);
providerInvokerWrapper . setReg ( true );
}
registry . subscribe (overrideSubscribeUrl, overrideSubscribeListener);
exporter . setRegisterUrl (registeredProviderUrl);
exporter . setSubscribeUrl (overrideSubscribeUrl);
return new RegistryProtocol . DestroyableExporter (exporter);
}
在 doLocalExport() 方法中有:
1 2 3 4 5 6 7 8 private < T > RegistryProtocol . ExporterChangeableWrapper < T > doLocalExport ( Invoker < T > originInvoker , URL providerUrl) {
String key = this . getCacheKey (originInvoker);
return ( RegistryProtocol . ExporterChangeableWrapper ) this . bounds . computeIfAbsent (key, (s) -> {
Invoker < ? > invokerDelegate = new RegistryProtocol. InvokerDelegate (originInvoker, providerUrl);
// 再进行服务暴露
return new RegistryProtocol. ExporterChangeableWrapper ( this . protocol . export (invokerDelegate), originInvoker);
});
}
由于我们使用了 Dubbo 协议,在上一步执行服务暴露时就会来到 DubboProtocol 类中的服务暴露方法 export():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public < T > Exporter < T > export ( Invoker < T > invoker) throws RpcException {
URL url = invoker . getUrl ();
String key = serviceKey (url) ;
// 将 url 地址包装成 DubboExporter
DubboExporter < T > exporter = new DubboExporter (invoker , key , this . exporterMap ) ;
this . exporterMap . put (key, exporter);
Boolean isStubSupportEvent = url . getParameter ( "dubbo.stub.event" , false );
Boolean isCallbackservice = url . getParameter ( "is_callback_service" , false );
if (isStubSupportEvent && ! isCallbackservice) {
String stubServiceMethods = url . getParameter ( "dubbo.stub.event.methods" );
if (stubServiceMethods != null && stubServiceMethods . length () != 0 ) {
this . stubServiceMethodsMap . put ( url . getServiceKey (), stubServiceMethods);
} else if ( this . logger . isWarnEnabled () ) {
this . logger . warn ( new IllegalStateException ( "consumer [" + url . getParameter ( "interface" ) + "], has set stubproxy support event ,but no stub methods founded." ));
}
}
this . openServer (url); // 打开服务器
this . optimizeSerialization (url);
return exporter ;
}
上述方法最后有一个 openServer() 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private void openServer ( URL url) {
String key = url . getAddress (); // 先获取 URL 地址
boolean isServer = url . getParameter ( "isserver" , true );
if (isServer) {
ExchangeServer server = (ExchangeServer) this . serverMap . get (key);
if (server == null ) {
synchronized ( this ) {
server = (ExchangeServer) this . serverMap . get (key);
if (server == null ) {
// 创建服务器
this . serverMap . put (key, this . createServer (url));
}
}
} else {
server . reset (url);
}
}
}
看一手 createServer() 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private ExchangeServer createServer ( URL url) {
url = URLBuilder . from (url). addParameterIfAbsent ( "channel.readonly.sent" , Boolean . TRUE . toString ()). addParameterIfAbsent ( "heartbeat" , String . valueOf ( 60000 )). addParameter ( "codec" , "dubbo" ). build ();
String str = url . getParameter ( "server" , "netty" );
if (str != null && str . length () > 0 && ! ExtensionLoader . getExtensionLoader ( Transporter . class ). hasExtension (str) ) {
throw new RpcException ( "Unsupported server type: " + str + ", url: " + url) ;
} else {
ExchangeServer server ;
try {
// 绑定服务器和请求处理器
server = Exchangers . bind (url, this . requestHandler );
} catch ( RemotingException var5 ) {
throw new RpcException ( "Fail to start server(url: " + url + ") " + var5 . getMessage (), var5) ;
}
str = url . getParameter ( "client" );
if (str != null && str . length () > 0 ) {
Set < String > supportedTypes = ExtensionLoader . getExtensionLoader ( Transporter . class ). getSupportedExtensions ();
if ( ! supportedTypes . contains (str) ) {
throw new RpcException ( "Unsupported client type: " + str) ;
}
}
return server ;
}
}
上述方法出现了一个名为 bind() 的方法,如果我们一层一层进入这些方法,我们就会看到 Netty 的底层。
对于 openServer() 方法来说就是创建服务器暴露服务,相当于启动 Netty 服务器,然后监听 20880 端口。
回到 RegistryProtocol 类中的服务暴露方法 export(),这个方法中还有一个名为 registerProvider() 的方法,进入这个方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 public static < T > ProviderInvokerWrapper < T > registerProvider ( Invoker < T > invoker , URL registryUrl , URL providerUrl) {
ProviderInvokerWrapper < T > wrapperInvoker = new ProviderInvokerWrapper (invoker , registryUrl , providerUrl) ;
String serviceUniqueName = providerUrl . getServiceKey ();
ConcurrentMap < Invoker , ProviderInvokerWrapper > invokers = (ConcurrentMap) providerInvokers . get (serviceUniqueName);
if (invokers == null ) {
providerInvokers . putIfAbsent (serviceUniqueName, new ConcurrentHashMap ());
// 服务提供者执行者
invokers = (ConcurrentMap) providerInvokers . get (serviceUniqueName);
}
// invokers 添加真正的服务
invokers . put (invoker, wrapperInvoker);
return wrapperInvoker ;
}
这个方法相当于保存了一些信息,在这个方法所在类中有这样两个成员变量:
1 2 3 4 5 public class ProviderConsumerRegTable {
public static ConcurrentHashMap < String , ConcurrentMap < Invoker , ProviderInvokerWrapper >> providerInvokers = new ConcurrentHashMap () ;
public static ConcurrentHashMap < String , Set < ConsumerInvokerWrapper >> consumerInvokers = new ConcurrentHashMap () ;
// 省略其他代码
}
这两个成员变量相当于保存了每个 URL 地址对应的服务提供者执行器 ProviderInvokerWrapper 和服务消费者执行器 ConsumerInvokerWrapper。而在执行器里才有真正的服务对象,即:接口的实现类。
至此,服务就暴露完成了。
Dubbo 服务暴露过程
具体服务暴露过程可以参考这篇文章:Dubbo原理和源码解析之服务暴露
4.5 服务引用
<dubbo:service> 标签是用来做服务暴露的,而 <dubbo:reference> 标签就是用来做服务引用的(它俩也对应着注解 @Service 和 @Reference)。
与 <dubbo:service> 标签一样,<dubbo:reference> 标签也对应着一个解析类,名为 ReferenceBean。
这个类比较特殊,它实现了 FactoryBean,它是 Spring 的工厂 Bean。当我们需要获取 <dubbo:reference> 标签中配置的 interface 属性值时,需要通过依赖注入的方式(@Autowried)获取,这个时候会就前往 Spring 的容器中获取相关的 Bean。
那怎么获取呢?
由于 ReferenceBean 是一个工厂 Bean,就会调用 getObject() 方法:
1 2 3 public Object getObject () {
return this . get ();
}
这个方法返回的对象,就会作为标签配置返回的对象。
在 get() 方法中有:
1 2 3 4 5 6 7 8 9 10 11 12 public synchronized T get () {
this . checkAndUpdateSubConfigs ();
if ( this . destroyed ) {
throw new IllegalStateException ( "The invoker of ReferenceConfig(" + this . url + ") has already destroyed!" ) ;
} else {
if ( this . ref == null ) { // 如果引用为 null
this . init (); // 进行初始化
}
return this . ref ;
}
}
再看下 init() 方法是个怎样的流程。在这个方法中前面大部分都是些信息检查、获取属性等操作,但是这个方法中也有一个很重要的方法:
1 2 3 4 5 6 7 8 9 10 private void init () {
// 省略部分代码
this . ref = this . createProxy (map); // 创建代理对象
String serviceKey
= URL . buildKey ( this . interfaceName , this . group , this . version );
ApplicationModel . initConsumerModel (serviceKey,
this . buildConsumerModel (serviceKey,
attributes));
this . initialized = true ;
}
createProxy() 方法就是创建代理对象的方法,源码中传入了 map,这个 map 保存了标签的配置。
在 createProxy() 方法中,有这样一段代码:
1 2 3 4 5 if ( this . urls . size () == 1 ) {
// 引用的协议远程引用 interfaceClass 接口
this . invoker = REF_PROTOCOL . refer ( this . interfaceClass ,
(URL) this . urls . get ( 0 ));
}
interfaceClass 接口也就是我们在标签 <dubbo:reference> 配置的 interface 属性值。
urls 中保存了注册中心的地址,相当于会从注册中心中获取远程接口。
那 refer() 方法是怎么引用的呢?
调用 REF_PROTOCOL.refer() 方法时,会先前往注册协议类 RegistryProtocol 中执行 refer() 方法。
进入 RegistryProtocol 类中的 refer() 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 public < T > Invoker < T > refer ( Class < T > type , URL url) throws RpcException {
url = URLBuilder . from (url). setProtocol ( url . getParameter ( "registry" , "dubbo" )). removeParameter ( "registry" ). build ();
// 根据注册中心地址得到注册中心信息
Registry registry = this . registryFactory . getRegistry (url);
if ( RegistryService . class . equals (type) ) {
return this . proxyFactory . getInvoker (registry, type, url);
} else {
// 获取注册中心服务参数
Map < String , String > qs = StringUtils . parseQueryString ( url . getParameterAndDecoded ( "refer" ));
String group = (String) qs . get ( "group" );
return group == null || group . length () <= 0 || CommonConstants . COMMA_SPLIT_PATTERN . split (group). length <= 1 && ! "*" . equals (group) ? this . doRefer ( this . cluster , registry, type, url) : this . doRefer ( this . getMergeableCluster (), registry, type, url);
}
}
RegistryProtocol 类中的 refer() 方法中还有一个名为 doRefer() 的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private < T > Invoker < T > doRefer ( Cluster cluster , Registry registry , Class < T > type , URL url) {
RegistryDirectory < T > directory = new RegistryDirectory (type , url) ;
directory . setRegistry (registry);
directory . setProtocol ( this . protocol );
Map < String , String > parameters = new HashMap ( directory . getUrl (). getParameters () ) ;
URL subscribeUrl = new URL ( "consumer" , (String) parameters . remove ( "register.ip" ), 0 , type . getName (), parameters) ;
if ( ! "*" . equals ( url . getServiceInterface ()) && url . getParameter ( "register" , true ) ) {
directory . setRegisteredConsumerUrl ( this . getRegisteredConsumerUrl (subscribeUrl, url));
registry . register ( directory . getRegisteredConsumerUrl ());
}
directory . buildRouterChain (subscribeUrl);
// 在注册中心中订阅服务提供者为我们提供的服务
directory . subscribe ( subscribeUrl . addParameter ( "category" , "providers,configurators,routers" ));
Invoker invoker = cluster . join (directory);
ProviderConsumerRegTable . registerConsumer (invoker, url, subscribeUrl, directory);
return invoker ;
}
由于我们使用了 dubbo 的协议,订阅服务的同时会来到 DubboProtocol 类中的 refer() 方法。但是 DubboProtocol 中并没有 refer() 方法,去它的父类看看:
1 2 3 4 5 6 public < T > Invoker < T > refer ( Class < T > type , URL url) throws RpcException {
return new AsyncToSyncInvoker ( this . protocolBindingRefer (type, url) ) ;
}
protected abstract < T > Invoker < T > protocolBindingRefer ( Class < T > var1 , URL var2)
throws RpcException ;
进入 DubboProtocol 中看下重写的 protocolBindingRefer() 方法:
1 2 3 4 5 6 7 8 9 10 public < T > Invoker < T > protocolBindingRefer ( Class < T > serviceType , URL url) throws RpcException {
this . optimizeSerialization (url);
// Dubbo 想要远程引用谁?serviceType
// serviceType 在哪?url
DubboInvoker < T > invoker = new DubboInvoker (serviceType , url ,
this . getClients (url),
this . invokers ) ;
this . invokers . add (invoker);
return invoker ;
}
在这里也有一个很重要的方法 —— getClients() 方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private ExchangeClient [] getClients ( URL url) {
boolean useShareConnect = false ;
int connections = url . getParameter ( "connections" , 0 );
List < ReferenceCountExchangeClient > shareClients = null ;
if (connections == 0 ) {
useShareConnect = true ;
String shareConnectionsStr = url . getParameter ( "shareconnections" , (String) null );
connections = Integer . parseInt ( StringUtils . isBlank (shareConnectionsStr) ? ConfigUtils . getProperty ( "shareconnections" , "1" ) : shareConnectionsStr);
shareClients = this . getSharedClient (url, connections);
}
// 根据连接数创建客户端
ExchangeClient [] clients = new ExchangeClient [connections] ;
for ( int i = 0 ; i < clients . length ; ++ i) {
if (useShareConnect) {
// 获取共享的客户端
clients[i] = (ExchangeClient) shareClients . get (i);
} else {
clients[i] = this . initClient (url);
}
}
return clients ;
}
看看如何获取共享客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 private List < ReferenceCountExchangeClient > getSharedClient ( URL url , int connectNum) {
String key = url . getAddress (); // 获取 URL 地址
List < ReferenceCountExchangeClient > clients = (List) this . referenceClientMap . get (key);
if ( this . checkClientCanUse (clients) ) {
this . batchClientRefIncr (clients);
return clients ;
} else {
this . locks . putIfAbsent (key, new Object ());
synchronized ( this . locks . get (key) ) {
clients = (List) this . referenceClientMap . get (key);
if ( this . checkClientCanUse (clients) ) {
this . batchClientRefIncr (clients);
return clients ;
} else {
connectNum = Math . max (connectNum, 1 );
if ( CollectionUtils . isEmpty (clients) ) {
clients = this . buildReferenceCountExchangeClientList (url, connectNum);
this . referenceClientMap . put (key, clients);
} else {
for ( int i = 0 ; i < clients . size (); ++ i) {
ReferenceCountExchangeClient referenceCountExchangeClient = (ReferenceCountExchangeClient) clients . get (i);
if (referenceCountExchangeClient != null && ! referenceCountExchangeClient . isClosed () ) {
referenceCountExchangeClient . incrementAndGetCount ();
} else {
// 初始化客户端
clients . set (i, this . buildReferenceCountExchangeClient (url));
}
}
}
this . locks . remove (key);
return clients ;
}
}
}
}
查看 buildReferenceCountExchangeClient() 方法:
1 2 3 4 5 private ReferenceCountExchangeClient buildReferenceCountExchangeClient ( URL url) {
// 初始化客户端
ExchangeClient exchangeClient = this . initClient (url);
return new ReferenceCountExchangeClient (exchangeClient) ;
}
看看 initClient() 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 private ExchangeClient initClient ( URL url) {
String str = url . getParameter ( "client" , url . getParameter ( "server" , "netty" ));
url = url . addParameter ( "codec" , "dubbo" );
url = url . addParameterIfAbsent ( "heartbeat" , String . valueOf ( 60000 ));
if (str != null && str . length () > 0 && ! ExtensionLoader . getExtensionLoader ( Transporter . class ). hasExtension (str) ) {
throw new RpcException ( "Unsupported client type: " + str + ", supported client type is " + StringUtils . join ( ExtensionLoader . getExtensionLoader ( Transporter . class ). getSupportedExtensions (), " " ) ) ;
} else {
try {
Object client ;
if ( url . getParameter ( "lazy" , false ) ) {
client = new LazyConnectExchangeClient (url , this . requestHandler ) ;
} else {
// 进行连接
client = Exchangers . connect (url, this . requestHandler );
}
return (ExchangeClient)client ;
} catch ( RemotingException var5 ) {
throw new RpcException ( "Fail to create remoting client for service(" + url + "): " + var5 . getMessage (), var5) ;
}
}
}
进行连接的 Exchangers.connect() 方法:
1 2 3 4 5 6 7 8 9 10 public static ExchangeClient connect ( URL url , ExchangeHandler handler) throws RemotingException {
if (url == null ) {
throw new IllegalArgumentException ( "url == null" ) ;
} else if (handler == null ) {
throw new IllegalArgumentException ( "handler == null" ) ;
} else {
url = url . addParameterIfAbsent ( "codec" , "exchange" );
return getExchanger (url) . connect (url, handler);
}
}
方法最后的 connect() 方法:
1 2 3 4 public ExchangeClient connect ( URL url , ExchangeHandler handler) throws RemotingException {
// 返回时调用传输器的连接方法
return new HeaderExchangeClient ( Transporters . connect (url, new ChannelHandler []{ new DecodeHandler ( new HeaderExchangeHandler (handler))}), true ) ;
}
Transporters.connect() 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public static Client connect ( URL url , ChannelHandler ... handlers ) throws RemotingException {
if (url == null ) {
throw new IllegalArgumentException ( "url == null" ) ;
} else {
Object handler ;
if (handlers != null && handlers . length != 0 ) {
if ( handlers . length == 1 ) {
handler = handlers[ 0 ] ;
} else {
handler = new ChannelHandlerDispatcher (handlers) ;
}
} else {
handler = new ChannelHandlerAdapter () ;
}
// 拿到传输器的连接
return getTransporter () . connect (url, (ChannelHandler)handler);
}
}
点击最后的 connect() 方法可以进入一个名为 Transporter 的接口。
实现 Transporter 接口的类:
点击 Netty 传输器类,可以查看到 Netty 的底层:
1 2 3 4 public Client connect ( URL url , ChannelHandler listener) throws RemotingException {
// 创建 Netty 客户端,url 地址监听一个端口
return new NettyClient (url , listener) ;
}
这样的话,就成功包装好了 DubboInvoker 对象:
1 2 3 4 5 6 7 8 9 10 public < T > Invoker < T > protocolBindingRefer ( Class < T > serviceType , URL url) throws RpcException {
this . optimizeSerialization (url);
// Dubbo 想要远程引用谁?serviceType
// serviceType 在哪?url
DubboInvoker < T > invoker = new DubboInvoker (serviceType , url ,
this . getClients (url),
this . invokers ) ;
this . invokers . add (invoker);
return invoker ;
}
最后,在类中成功获取到 Invoker 对象:
1 2 3 4 5 6 7 8 9 private < T > Invoker < T > doRefer ( Cluster cluster , Registry registry , Class < T > type , URL url) {
// 省略其他代码
// 获取 Invoker 对象
Invoker invoker = cluster . join (directory);
// 将 invoker 注册到提供者消费者注册表
// subscribeUrl:消费者消费的服务的地址
ProviderConsumerRegTable . registerConsumer (invoker, url, subscribeUrl, directory);
return invoker ;
}
提供者消费者注册表 ProviderConsumerRegTable:
1 2 3 4 5 public class ProviderConsumerRegTable {
public static ConcurrentHashMap < String , ConcurrentMap < Invoker , ProviderInvokerWrapper >> providerInvokers = new ConcurrentHashMap () ;
public static ConcurrentHashMap < String , Set < ConsumerInvokerWrapper >> consumerInvokers = new ConcurrentHashMap () ;
// 省略其他代码
}
这样就知道了注册表中提供者的 URL 地址对应的是什么,消费者的 URL 地址对应的又是哪个代理(代理对象 invoker)。
到此,init() 就得到了 invoker 代理对象,成功创建了对象 ref。
1 2 3 4 5 6 7 8 9 10 private void init () {
// 省略部分代码
this . ref = this . createProxy (map); // 创建代理对象
String serviceKey
= URL . buildKey ( this . interfaceName , this . group , this . version );
ApplicationModel . initConsumerModel (serviceKey,
this . buildConsumerModel (serviceKey,
attributes));
this . initialized = true ;
}
流程总结
纵观整个流程,可以得出以下这张图:
4.6 服务调用
前面分析了如何进行服务引用,最终会创建一个代理对象,那这个代理对象是如何进行方法调用的呢?
在 Dubbo 官方文档的【调用链】一栏有这样一幅图,这张图就很好地解释了调用过程:
具体分析
得到代理对象后,执行 InvokerInvocationHandler 类中的 invoke() 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 public Object invoke ( Object proxy , Method method , Object [] args) throws Throwable {
String methodName = method . getName (); // 方法信息
Class < ? > [] parameterTypes = method . getParameterTypes (); // 方法参数信息
if ( method . getDeclaringClass () == Object . class ) {
return method . invoke ( this . invoker , args);
} else if ( "toString" . equals (methodName) && parameterTypes . length == 0 ) {
return this . invoker . toString ();
} else if ( "hashCode" . equals (methodName) && parameterTypes . length == 0 ) {
return this . invoker . hashCode ();
} else {
return "equals" . equals (methodName) && parameterTypes . length == 1 ? this . invoker . equals (args[ 0 ]) : this . invoker . invoke ( new RpcInvocation (method, args)). recreate ();
}
}
将方法和参数封装成 RpcInvocation 对象:
1 2 3 public RpcInvocation ( Method method , Object [] arguments) {
this ((Method)method , ( Object [])arguments , (Map) null ) ;
}
对象封装之后,执行最后的 invoke() 方法,这个方法有很多重写。
先执行 MockClusterInvoker 类的 invoke() 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public Result invoke ( Invocation invocation) throws RpcException {
Result result = null ;
String value = this . directory . getUrl (). getMethodParameter ( invocation . getMethodName (), "mock" , Boolean . FALSE . toString ()). trim ();
if ( value . length () != 0 && ! value . equalsIgnoreCase ( "false" ) ) {
if ( value . startsWith ( "force" ) ) {
if ( logger . isWarnEnabled () ) {
logger . warn ( "force-mock: " + invocation . getMethodName () + " force-mock enabled , url : " + this . directory . getUrl ());
}
result = this . doMockInvoke (invocation, (RpcException) null );
} else {
try {
// no mock
result = this . invoker . invoke (invocation);
} catch ( RpcException var5 ) {
if ( var5 . isBiz () ) {
throw var5 ;
}
if ( logger . isWarnEnabled () ) {
logger . warn ( "fail-mock: " + invocation . getMethodName () + " fail-mock enabled , url : " + this . directory . getUrl (), var5);
}
result = this . doMockInvoke (invocation, var5);
}
}
} else {
result = this . invoker . invoke (invocation);
}
return result ;
}
MockClusterInvoker 类的 invoke() 方法又会执行 AbstractClusterInvoker 类的 invoke() 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 public Result invoke ( Invocation invocation) throws RpcException {
this . checkWhetherDestroyed ();
Map < String , String > contextAttachments = RpcContext . getContext (). getAttachments ();
if (contextAttachments != null && contextAttachments . size () != 0 ) {
((RpcInvocation)invocation) . addAttachments (contextAttachments);
}
// list() 在注册中心中找到我们想要执行的方法到底有几个 Invoker
List < Invoker < T >> invokers = this . list (invocation);
// 获取到负载均衡机制
LoadBalance loadbalance = this . initLoadBalance (invokers, invocation);
RpcUtils . attachInvocationIdIfAsync ( this . getUrl (), invocation);
return this . doInvoke (invocation, invokers, loadbalance);
}
最后执行 doInvoker() 方法,这个方法有很多重写。以 FailoverClusterInvoker 类举例,在这个类的 duInvoker() 方法中有这样一段代码:
1 2 3 4 5 6 7 8 9 public Result doInvoke ( Invocation invocation , List < Invoker < T >> invokers , LoadBalance loadbalance) throws RpcException {
// 省略其他代码
// 根据负载均衡策略选择一个 Invoker
Invoker < T > invoker = this . select (loadbalance, invocation, copyInvokers, invoked);
invoked . add (invoker);
RpcContext . getContext (). setInvokers (invoked);
// 省略其他代码
Result result = invoker . invoke (invocation);
}
执行 invoke() 方法会进入到各种 Filter 中,相当于各层的 Filter 进行层层过滤。
最终执行 AbstractInvoker 类中的 doInvoke() 方法,并来到 DubboInvoker 类中执行 duInvoke() 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 protected Result doInvoke ( Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation)invocation ;
String methodName = RpcUtils . getMethodName (invocation);
inv . setAttachment ( "path" , this . getUrl (). getPath ());
inv . setAttachment ( "version" , this . version );
ExchangeClient currentClient ;
if ( this . clients . length == 1 ) {
// 获取 Netty 客户端
currentClient = this . clients [ 0 ] ;
} else {
currentClient = this . clients [ this . index . getAndIncrement () % this . clients . length ] ;
}
try {
boolean isOneway = RpcUtils . isOneway ( this . getUrl (), invocation);
int timeout = this . getUrl (). getMethodPositiveParameter (methodName, "timeout" , 1000 );
if (isOneway) {
boolean isSent = this . getUrl (). getMethodParameter (methodName, "sent" , false );
currentClient . send (inv, isSent);
return AsyncRpcResult . newDefaultAsyncResult (invocation);
} else {
AsyncRpcResult asyncRpcResult = new AsyncRpcResult (inv) ;
// 客户端发起请求,获取请求结果并返回
CompletableFuture < Object > responseFuture = currentClient . request (inv, timeout);
asyncRpcResult . subscribeTo (responseFuture);
FutureContext . getContext (). setCompatibleFuture (responseFuture);
return asyncRpcResult ;
}
} catch ( TimeoutException var9 ) {
throw new RpcException ( 2 , "Invoke remote method timeout. method: " + invocation . getMethodName () + ", provider: " + this . getUrl () + ", cause: " + var9 . getMessage (), var9) ;
} catch ( RemotingException var10 ) {
throw new RpcException ( 1 , "Failed to invoke remote method: " + invocation . getMethodName () + ", provider: " + this . getUrl () + ", cause: " + var10 . getMessage (), var10) ;
}
}
获取请求结果时,可能会遇到超时,如果超时会默认重试一次。
如果最终成功获取到请求结果,执行各种 invoke() 方法。
DubboInvoker 类的 doInvoke() 方法执行完之后,可以得到想要的结果,还会进行编码或解码,最终返回。
结果返回之后,我们的调用也就结束了! 🎊
结语
到此,Dubbo 就真的入门了。
还有一点需要注意,Dubbo 只是一个 RPC 框架,它解决了 RPC 相关的问题,但分布式中还有其他的问题,这些其他的问题就需要用其他的框架或技术来解决了。
RPC 框架 — Dubbo 入门完