void serialize(Object obj, OutputStream outputStream) throws IOException;
序列化方法,将指定对象obj的序列化二进制流写入outputStream
这是我的剑来框架的微服务版,不知道程序员们听没有听过“声明式”的概念。 实际上,SQL语句就是最典型的声明式编程,而我们常用的Feign也是一种声明式调用。 这很优雅,我们不需要关心程序的执行过程,只需要关注程序的输入和输出。 那么,服务可以声明吗?在回答这个问题之前,你首先要了解诸多概念,以便了解SpringHoppin是如何工作的。
SpringCloud是一个用于构建分布式系统的开发框架。 它提供了一系列的组件和工具,帮助开发者快速构建、部署和管理分布式应用。 SpringCloud包含了诸如服务注册与发现、配置中心、网关、负载均衡、断路器、链路追踪等功能,使得开发者能够更轻松地构建和管理复杂的微服务架构。 通过使用SpringCloud,开发者可以更好地实现服务之间的解耦、弹性扩展和容错处理,从而提高系统的稳定性和可靠性。
SpringHoppin是完全基于SpringCloud的思想,以SpringBoot作为脚手架,构建出了SpringCloud的雏形框架。 基于该框架开发了服务注册与发现(HoppinCore)、远程服务调用(HoppinRPC)、负载均衡(HoppinLoadBalance)、熔断器(HoppinCircuitBreaker)等功能。
模块功能 |
SpringCloud |
SpringHoppin |
---|---|---|
服务注册与发现 |
Eureka,Zookeeper,Nacos |
HoppinCore |
声明式调用 |
Feign |
HoppinHttpApi |
远程服务调用 |
Dubbo |
HoppinRPC |
负载均衡 |
Ribbon |
HoppinLoadBalance |
容错保护 |
Hystrix |
HoppinCircuitBreaker |
网关 |
SpringCloudZuul,SpringCloudGateway |
hoppinzq-gateway |
配置中心 |
SpringCloudConfig,Nacos |
未实现 |
消息总线与消息队列 |
SpringCloudBus,SpringCloudStream+kafka |
未实现 |
链路跟踪 |
SpringCloudSleuth+Zipkin |
已添加支持,本项目未体现 |
安全认证与授权 |
SpringSecurity |
HoppinRPC,hoppinzq-gateway,zauth |
目前该框架的各个功能还没拆出来,代码十分臃肿,完成度我给个61.6%,但是核心功能已经完备,也经过几个小项目的实战检验。 但是好在前期工作做的好,框架设计的足够健壮,使得该项目拓展性极高。
我等不及了,快端上来罢!
你可以直接在gitee上拉取该项目。它的结构如下:
/hoppinzqs-registration-center [项目目录]
│
├─hoppinzq-common ------------------[公共模块]
│
├─hoppinzq-service -------------------[服务端模块]
│ └─... -------------------[引入该模块,那么你的SpringBoot项目就是SpringHoppin项目了,可以开启服务注册,熔断器等功能]
│
│─hoppinzq-client -------------------[客户端模块]
│ └─... -------------------[引入该模块,可以开启远程服务调用,负载均衡等功能]
│
│─hoppinzq-gateway -------------------[网关模块]
│ └─... -------------------[因为依赖Redis,主分支暂时移除]
全部配置项一览
################ zqservice配置 #######################
zq-server:
# 本服务的IP或者你本地的IP(通用配置项)
ip: 127.0.0.1
# 注册中心所在地址,注意,如果你没有启动本地注册中心,直接使用我的注册中心(客户端配置项)
center-addr: http://150.158.28.40:8801/service
# 服务注册用户的用户名。(通用配置项)
username: zhangqi
# 服务注册用户的密码。(通用配置项)
password: 123456
# 服务调用凭证,用以屏蔽用户名密码。(可选配置项)
secret-id: zhangqi
secret-key: 123456
# 服务注册是否总是重试,该参数为true时,配置的重试次数将不起作用(客户端配置项,暂时弃用)
always-retry: false
# 服务注册重试次数,仅当服务注册alwaysRetry为false时生效(客户端配置项,暂时弃用)
retry-count: 10
# 服务注册重试间隔/ms(客户端配置项,暂时弃用)
retry-time: 10000
# 是否严格(注册中心配置项)
is-strict: false
############ 以下配置项先不用管,而且还没有格式化 ##############
zqServerCenter:
# 注册中心路径,同上
addr: http://150.158.28.40:8801/service
zqClient:
# 服务调用用户名,要调用其他服务需要验证授权
userName: zhangqi
# 服务调用密码
password: 123456
# 注册中心所在地址
centerAddr: http://150.158.28.40:8801/service
# 统一权限服务地址
authAddr: http://150.158.28.40:8804/service
# redis缓存时间/s
zqRedis:
csdnBlogTimeout: 60
zqApiStore:
# ZQ网关统一上传文件路径
fileUploadPath: E:\\baby_img
isAuth: false
# 索引库位置
lucene:
spiderIndex: D:\\index
1、去gitee上拉取该项目。
2、可以新建一个模块,若你的项目要转为SpringHoppin项目,则首先要在该模块的pom中引入依赖:
<dependency>
<groupId>com.hoppinzq</groupId>
<artifactId>hoppinzq-service</artifactId>
<version>1.0</version>
</dependency>
3、在启动类上添加注解@Hoppin,声明这个服务是SpringHoppin服务,只有这样,你才能使用框架提供的所有功能。 启动类的源码应该如下:
@Hoppin
@SpringBootApplication
public class TestCenterApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(TestCenterApplication.class, args);
}
}
4、配置文件也不用写了,直接用hoppinzq-testcore那个就行,然后你就可以启动我们的SpringHoppin 项目了,是不是很兴奋?在启动前你得要先了解SpringHoppin的服务是指的什么,什么能被注册到注册中心。
在SpringHoppin中的服务是指的类,而SpringCloud中的服务是指的模块。在一个模块的启动类上添加注解@Hoppin就可以把这个模块 里所有被@ServiceRegister注解修饰的类抽象为服务。每个SpringHoppin模块都是一个极简的注册中心,能暴露自己能提供的服务。
注册中心则是将SpringHoppin模块里所有暴露的服务整合起来统一对外开放,注册中心提供对服务的包装、服务的增删改查和服务的配置和管理。 并且与服务提供方建立心跳连接,以确保服务可用。
5、大家在访问服务注册的路径前,先添加一个服务类并添加@ServiceRegister注解,然后重启项目再访问本地的服务注册中心。服务类的源代码如下:
TestService接口服务类:
public interface TestService {
String test(String msg);
}
TestService接口服务实现类:
@ServiceRegister(title = "测试服务",description = "这是测试服务,仅仅是一个demo")
public class TestServiceImpl implements TestService{
@Override
@ServiceRegisterMethod(title = "测试方法",description = "略略略",isShow = true)
public String test(String msg) {
return "test:"+msg;
}
}
然后启动项目,打印信息如下:
6、上面说了SpringHoppin的服务是指的服务类,而不是模块。若要指定一个类是服务范畴,则在该类上添加注解@ServiceRegister,把需要暴露的方法上, 添加@ServiceRegisterMethod注解,并配上适当的描述。若要配合SpringHoppin的远程服务调用,需要让服务类为某接口的实现类。 最后被@ServiceRegister注解修饰的服务类也会被加入SpringIOC容器中管理,你可以通过@Autowired注解在任何其他模块中依赖注入。
7、访问hoppinzq-testcore自带的注册中心路径http://127.0.0.1:8801/service,发现TestServiceImpl服务类注册成功, 在SpringHoppin下,万物皆可注册中心!
下面简单介绍这些组件的使用方式,实现方式和细节
什么是服务注册和发现?
现在的微服务架构将各个模块拆分的很细,虽然大部分模块之间通过消息中间件进行解耦,但是还是存在相当的模块与模块之间的服务调用。那么开发人员怎么知道哪些模块有哪些功能? 怎么知道该去如何与模块交互?传参风格又是怎样的? 这就要求开发人员编写文档,以提供本模块的功能和接口细节。
那么这部分工作能不能交给程序去提供?没错,这就是服务注册————程序可以通过反射解析接口, 把接口的相关信息、注解信息、类信息、模块配置信息等一并上传到一个地方,这个地方就叫做注册中心。
开发人员在注册中心查找可用的服务,按照约定的调用规则就可以调用该服务了,这就是服务发现。用非程序员的话去解释:比如我要买房子。首先建筑公司A建造了一批新房子,我知道A有房子吗?No!除非A主动告诉我,那么这样做就很麻烦了。 A建造了一批房子,难道A要去每个人的耳边说:我建造了一批新房子,你买不买?很显然这不合理。
正确的做法是,A建造了一批房子,把这个信息发给中介或者发布在某个公告栏里。这个过程就是服务注册,中介充当注册中心。我要买房子,不是直接跟A交互, 而且先去公告栏里看看或者问问中介,房子在哪?价格是多少?A的联系方式是什么?这就是服务发现。
A与中介会时刻保持联系,以防A突然破产或者中介离职导致房源信息不可用,这就是服务治理。
当然A在跟中介谈的时候明确表示不让失信人员购买,这时,我们在对接中介买A房子的时候,中介会先检查我们的身份证,这就是身份验证。然后拿着我们的身份证去 查征信是不是失信人员,这就是授权验证。这就是一个基本的服务注册和发现的流程。
1、在SpringHoppin中,每一个模块都是一个极简的注册中心,只不过该注册中心只能暴露自己的服务。要想声明一个注册中心很简单,在启动类上添加 @HoppinCore注解即可,现在我们把hoppinzq-testcore声明为注册中心,修改后的启动类如下:
@HoppinCore
@Hoppin
@SpringBootApplication
public class TestCenterApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(TestCenterApplication.class, args);
}
}
启动项目,发现注册中心已经启动成功!
那么注册中心与普通的SpringHoppin项目有什么区别吗?因为你刚才提到了 每一个模块都是一个极简的注册中心。它们的区别有很多,首先注册中心无法注册自己;其次,注册中心拥有对外部服务进行增删改查和管理的能力; 最后,若一个模块被声明为注册中心,那么就会额外开辟定时线程。其他模块在注册服务到注册中心的时候,会额外注册一个心跳服务, 注册中心通过定时线程来进行心跳检测,以及时对注册的服务进行监控。心跳服务会返回给注册中心注册方的一些信息,你可以通过实现HeartbeatService接口定制返回值。
你可能回去马上访问那个地址了,你会发现怎么多了一个RegisterServiceImpl服务类?通过观察这个类的方法不难看出,这个类的所有方法都是在提供服务注册和管理 的能力,这个类是注册中心内置的注册服务,其他服务需要借助这个服务类注册并管理自己的服务。
2、外部服务注册:新建一个模块hoppinzq-test1,启动类上加只要加@Hoppin 即可。配置文件要改一下,端口号改成8802,并添加我们的注册中心的路径,如下:
zq-server:
name: hoppinzq-test1
username: zhangqi
password: 123456
prefix: /service
ip: 127.0.0.1
center-addr: http://127.0.0.1:8801/service # 注册中心路径
test1模块新建一个服务接口
public interface Test1Service {
String sayTest(String msg);
}
和它的实现类
@ServiceRegister(title = "测试1", description = "这是测试1的描述")
public class Test1ServiceImpl implements Test1Service{
@Override
@ServiceRegisterMethod(title = "方法测试",description = "打印test1")
public String sayTest(String msg) {
return "test1:"+msg;
}
}
3、先启动注册中心,然后在启动test1(若先启动test1,test1会连不上注册中心,会重试10次,每次60s) ,观察两个服务的打印,若注册成功,访问注册中心的路径http://127.0.0.1:8801/service,即可发现test1内的服务注册成功
4、只关闭注册中心,发现test1模块一直在尝试重连,直到注册中心启动成功,test1的服务重新注册。
5、只关闭test1模块,发现注册中心也在尝试重连,此时test1的服务为不可用状态。
当一段时间重连不上的时候,注册中心会直接移除test1模块注册的服务。
核心代码如下:
@PostConstruct
public void init() {
isCore=true;
startTime=System.currentTimeMillis();
String serviceAddress="http://"+ zqServerConfig.getIp() +":"+ zqServerConfig.getPort()+ zqServerConfig.getPrefix();
logger.debug("注册中心启动成功,注册中心路径:"+serviceAddress);
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
startHeartbeat(executorService);
startMessage(executorService);
startServiceListen();
}
private static void startHeartbeat(ScheduledExecutorService executorService){
Map heartbeatHeader=ServiceStore.heartbeatHeader;
heartbeatHeader.put(heartHeader,1);
// 每10秒心跳一次
executorService.scheduleAtFixedRate(() -> {
if (ServiceStore.heartbeatService.size() > 0) {
for (ServiceWrapper heart : ServiceStore.heartbeatService) {
if (heart.isAvailable()) {
String serviceID=heart.getId();
try {
String serviceIP=heart.getServiceMessage().getServiceIP();
// InetAddress inetAddress = InetAddress.getByName(serviceIP);
// boolean isReachable = inetAddress.isReachable(2000);
// if(!isReachable) {
// throw new RemotingException(new RuntimeException("服务连接失败"));
// }.
String serviceHeartAddress="http://"+ serviceIP +":"+heart.getServiceMessage().getServicePort()+heart.getServiceMessage().getServicePrefix();
logger.debug("开始心跳:服务路径"+serviceHeartAddress);
HeartbeatService service = ServiceProxyFactory.createProxy(HeartbeatService.class, serviceHeartAddress,heartbeatHeader);
service.areYouOk();
} catch (Exception ex) {
ex.printStackTrace();
//heart.setAvailable(Boolean.FALSE);
List<ServiceWrapperRPC> serviceWrapperRPCS= RPCServiceStore.serviceWrapperRPCList;
synchronized (serviceWrapperRPCS) {
Iterator<ServiceWrapperRPC> iterator = serviceWrapperRPCS.iterator();
while (iterator.hasNext()) {
ServiceWrapperRPC outerService = iterator.next();
if (serviceID.equals(outerService.getId())) {
outerService.setAvailable(false);
}
}
}
logger.error("检测到id为:" + serviceID + "的服务已不可用,尝试重新连接");
}
}
}
}
}, 0, 10, TimeUnit.SECONDS);
//每60s检查并移除不可用服务
executorService.scheduleAtFixedRate(() -> {
if (ServiceStore.heartbeatService.size() > 0) {
List<ServiceWrapperRPC> serviceWrapperRPCS=RPCServiceStore.serviceWrapperRPCList;
synchronized (serviceWrapperRPCS) {
String serviceID;
Iterator<ServiceWrapperRPC> iterator = serviceWrapperRPCS.iterator();
while (iterator.hasNext()) {
ServiceWrapperRPC outerService = iterator.next();
if (!outerService.getAvailable()) {
serviceID=outerService.getId();
iterator.remove();
List<ServiceWrapper> heartbeatService= ServiceStore.heartbeatService;
for(int i=0;i<heartbeatService.size();i++){
ServiceWrapper heart=heartbeatService.get(i);
if(serviceID.equals(heart.getId())){
logger.error("检测到id为:" + serviceID + "的服务已不可用,将移除该服务!");
logger.debug("服务移除成功!服务ID:"+serviceID);
heartbeatService.remove(i);
i--;
}
}
}
}
}
}
}, 0, 60, TimeUnit.SECONDS);
}
private static void startMessage(ScheduledExecutorService executorService){
//每10s消费一次消息队列里的消息
executorService.scheduleAtFixedRate(() -> {
if(MessageBuffer.count==0){
}
for (int i=0;i<MessageBuffer.count;i++) {
try {
MessageBean messageBean = MessageBuffer.take();
MessageHandler handler=messageBean.getMessageHandler();
handler.handle(messageBean);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, 0, 10, TimeUnit.SECONDS);
}
private static void startServiceListen(){
EventServiceListener eventListener = new EventServiceListener() {
@Override
public void onServiceChange(List<ServiceWrapperRPC> list) {
System.err.println("服务列表改变: " + list);
}
};
// 创建守护线程来监听服务的变化
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 5000, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
// 允许核心线程超时
executor.allowCoreThreadTimeOut(true);
Thread thread=new Thread(new ServiceChangeListener(eventListener), "监听服务列表守护线程-" + System.currentTimeMillis());
thread.setDaemon(true);
executor.execute(thread);
}
6、注册中心暴露了一个接口,用来给前端展示 http://127.0.0.1:8801/service/serviceList
不知道大家服务与服务之间调用是如何实现的呢?我们日常开发一般是前后端通过Restful API接口交互, 随着微服务模块的细分,前端需要的数据可能无法由一个模块提供了,比如:我要查一个商品的信息,假设这个接口转发到商品模块, 商品模块需要再去调用库存模块、产地模块、价格模块、推荐模块、用户行为模块等,然后把这些数据拼起来返回给前端。整个过程虽然对前端是透明的,但是后台各个模块之间 的调用还是很麻烦的。我们且不谈这里面可能出现的问题,我们先要解决的问题是:后台各个模块之间如何调用?
答案很简单,网络传输呗。你也可以模拟Http请求,反正你服务能接收我的参数并处理,然后返回给我就行了。
SpringCloud给出了它的答案:声明式调用Feign+注册中心Eureka。
阿里的答案是Dubbo+注册中心Zookeeper。
我的答案是HoppinRPC+注册中心HoppinCore。
Feign里面最简单,最好集成的,因为人家是SpringCloud自家人,是权威。但正因为Feign是SpringCloud的组件,想用得先会SpringCloud,学习成本比较高。
而且会导致各个模块之间还要去统一SpringCloud的版本和依赖。Feign的拓展性不高,由于各个服务之间的差异巨大,尤其在用户认证、
用户鉴权上,Feign无法对这些服务的调用做有效的统一,导致这部分逻辑得分到各个模块上去。且Feign的性能较差,有一道经典的面试题就是说的“为什么Feign第一次调用很慢?”。
Dubbo是最难的,也是最强大的,毕竟人家是阿里的产品,而且不光能远程调用Java服务,甚至支持跨语言服务调用,我就很欣赏这个特性。缺点就是难,当然用起来不难,就是架构、学习、配置这些难。
HoppinRPC是最不可靠的(笑),它只是天马行空的产物,当然它可以完成预期的工作就是了。而且它的拓展性极高,支持自定义用户认证和用户授权,调用过程优雅,不失为一个学习对象。
1、如何使用HoppinRPC呢?首先启动类上添加@Hoppin注解,你可能感受到一丝诡谲。你在想,我本地的服务 是如何被注册到注册中心hoppinzq-testcore上的呢?没错,就是通过HoppinRPC,另外服务心跳也是这么调用的。服务注册到注册中心的源代码在 com.hoppinzq.service.servlet.SpringProxyServlet中的registerServiceIntoCore()方法中实现的。
public void registerServiceIntoCore(){
ZqServerConfig zqServerConfig =this.getPropertyBean();
TaskStore.taskQueue.push(new RetryRegisterService(zqServerConfig.getRetryCount(), zqServerConfig.getRetryTime(), zqServerConfig.getAlwaysRetry()) {
@Override
protected Object toDo() throws RemotingException {
UserPrincipal upp = new UserPrincipal(zqServerConfig.getUserName(), zqServerConfig.getPassword());
List<ServiceWrapper> serviceWrappers=modWrapper();
List<ServiceWrapperRPC> serviceWrapperRPCS=new ArrayList<>();
RegisterService registerService = ServiceProxyFactory.createProxy(RegisterService.class, zqServerConfig.getServerCenter(), upp);
System.err.println("****************************");
System.err.println(registerService.sayHello(zqServerConfig.getUserName())+",连接注册中心成功!");
for(ServiceWrapper serviceWrapper:serviceWrappers){
ServiceWrapperRPC serviceWrapperRPC=serviceWrapper.toRPCBean();
serviceWrapperRPCS.add(serviceWrapperRPC);
}
registerService.insertServices(serviceWrapperRPCS);
System.err.println("****************************");
logger.info("向注册中心注册服务成功!");
return true;
}
});
}
2、新建一个模块叫hoppinzq-testrpc,同样的启动类上添加@Hoppin注解, 配置文件复制hoppinzq-test1的就行,把端口号改成8803,服务名zq-server.name 改成hoppinzq-testrpc,其他的不变。
3、把hoppinzq-test1模块的Test1Service接口文件复制粘贴到hoppinzq-testrpc模块下,启动类上代码如下。
@Hoppin
@SpringBootApplication
public class TestRPCApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(TestRPCApplication.class, args);
//用户认证于鉴权,你先写死这两个就行
UserPrincipal upp = new UserPrincipal("zhangqi", "123456");
//传入接口类对象,服务所在地址,test1Service服务所在的地址通过注册中心可以看到是http://127.0.0.1:8802/service
Test1Service test1Service = ServiceProxyFactory.createProxy(Test1Service.class, "http://127.0.0.1:8802/service", upp);
/调用
System.err.println(test1Service.sayTest("123"));
//I did it,Yes!
}
}
4、整个调用过程的流程是这样的:
1、首先我会通过代理工厂,创建服务接口的代理对象。这个对象一定要缓存,因为后面用到了 反射。由于反射比较耗时间,而且一个服务接口会调用无数次,因此缓存是十分有必要的。可以看出,我的第一次调用也是很耗时,就跟Feign一样。
2、由于接口的实现类在远程服务提供方,而不是在本地。如果真要调用一个本地未实现的接口类的方法,那肯定要报错,这就是为什么要使用 代理,我通过代理把这个调用给拦截了。类比aop,我把方法拦截了,我还不执行joinPoint.proceed(),那你这个方法永远不会执行。 不执行就不报错,我还能拿到你方法对象和传参。
3、我们很骚的拿到了要调用的方法对象和传参,而且不让程序执行那个肯定要报错的方法,因为那个方法的实现类在远程服务器。由于HoppinRPC能拿到注册到注册中心 的服务名和服务提供者的URI。我就把自己的用户信息,调用的类、方法、传参等包装起来,然后序列化这个包装类(类内的对象必须实现序列化接口)。 通过URI和网络传输,我就把序列化的包装对象传输到远程服务器,也就是服务提供者那里。
4、服务提供者拿到序列化的内容,经过反序列化就可以得到我包装的参数,通过用户信息进行用户认证和权限认证。 认证完成后,通过类、方法、参数列表进行反射调用自己的实现类。
5、最后把实现类的响应或者捕获的异常再包装序列化,最后通过网络传输传到调用方(客户端),调用方反序列化即可拿到响应,返回即可。
首先要理解用户认证与权限认证是两个不同的功能:用户认证一般是验证用户输入的用户名密码是否有效,验证你是这个账号的所有者; 而权限认证则是验证已认证的用户的权限,在操作某些功能的时候,往往需要验证用户有调用它的权限,一般是vip会员权限、超级管理员权限这些。
在SpringHoppin中拥有两套鉴权逻辑:一是应用于前后端交互的权限校验,这层是在网关进行校验,主要用来校验操作是否需要用户有未登录权限、登录权限或者vip权限。
这跟后台管理系统稍稍有些区别,目前我是基于cookie与统一权限认证平台来实现的。统一认证平台的目的是我有很多网站,我只期望用户一次注册,就能全站通用,并通过
签发一次性ucode解决跨域的cookie写入问题。总有人跟我说cookie过时了,靓仔无语,因为cookie本质上就是一个key为"cookie"的请求头。
而且cookie机制异常丰富,比请求头灵活,且自带超时和跨域机制,前后端都有现成的类库去操作cookie。总之,你只要能把签发的token或者jwt传给后台就行了。
传给后台之后,网关通过方法注解与redis或者jwt里的用户信息里的权限字段一比对,从而可以判断用户是否有权限调用。如评论接口需要登录权限,如果未登录用户调用,网关会在
自定义响应头内写入重定向信息,由zjax(zq定制ajax,适配hoppinzq-gateway)解析并携带当前url重定向到统一权限认证平台。
二是应用于RPC调用的权限认证,我借鉴了SpringSecurity的思想,所以下面是很有建设性的思路:
1、这个是在@ServiceRegister注解上配置的,注解源代码如下:
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Service
public @interface ServiceRegister {
@AliasFor(
annotation = Service.class
)
String value() default "";
String title() default "";
String description() default "";
int timeout() default 5000;
//全局身份验证方式
Class<? extends AuthenticationProvider> authentication() default SimpleUserCheckAuthenticator.class;
//全局权限验证方式
Class<? extends AuthorizationProvider> authorization() default AuthenticationNotCheckAuthorizer.class;
//全局服务跟踪参数方式
Class<? extends ModificationManager> modification() default NotModificationManager.class;
//服务调用所使用的序列化方式
Class<? extends CustomSerializer> serializer() default HessionSerializer.class;
RegisterType registerType() default RegisterType.AUTO;
enum RegisterType {
/**
* 项目启动自动注册
*/
AUTO,
/**
* 不会随着项目启动注册,声明服务需要手动注册。
* 服务仍然存储到一个集合内,但是没有注册,你可以在合适的时机将其手动注册(通过注册中心的RegisterServer内方法)
*/
NOT_AUTO
}
}
可以看到标记的两个方法参数,分别是身份验证authentication,权限验证authorization。身份验证的注解的默认
值是SimpleUserCheckAuthenticator,这个是内置的,会验证用户是否是服务提供者。权限验证是AuthenticationNotCheckAuthorizer,这个也是内置的,
默认通过用户验证就会有权限。
然后你就可以自定义你自己的鉴权逻辑了,你只想要实现AuthenticationProvider和AuthorizationProvider接口,并重写里面的方法即可。用户认证和权限认证交由业务的实现
类去实现,并抛出我给的两个异常AuthenticationFailedException和AuthorizationFailedException表示校验失败,然后后面我去处理。
我来教大家实现一个自定义的用户校验。
首先在hoppinzq-test1模块里,新建一个类CheckIsHoppinzq,使其实现序列化接口和AuthenticationProvider接口
public class CheckIsHoppinzq implements AuthenticationProvider, Serializable {
@Override
public void authenticate(InvocationRequest hoppinInvocationRequest) throws AuthenticationFailedException {
if (hoppinInvocationRequest.getCredentials() != null && hoppinInvocationRequest.getCredentials() instanceof UserPrincipal) {
//获取调用方凭证
UserPrincipal upp = (UserPrincipal) hoppinInvocationRequest.getCredentials();
if ("hoppinzq".equals(upp.getSecretId()))
AuthenticationContext.setPrincipal(upp);
else
throw new AuthenticationFailedException("身份验证失败");
} else
throw new AuthenticationFailedException("缺少凭据");
}
}
可以看出这个自定义方法是校验服务调用方的SecretId是不是hoppinzq,是的话用户认证通过,并放到ThreadLocal里,可能其他地方要用。不是hoppinzq的话,抛给客户端异常。
还记得Test1ServiceImpl吗?把里面的@ServiceRegister注解加点东西:
@ServiceRegister(title = "测试1", description = "这是测试1的描述",authentication = CheckIsHoppinzq.class)
public class Test1ServiceImpl implements Test1Service{
@Override
@ServiceRegisterMethod(title = "方法测试",description = "打印test1")
public String sayTest(String msg) {
return "test1:"+msg;
}
}
我们重启这个test1模块,然后启动testrpc模块,发现报错了。
道理很简单,服务提供者改变了它们的用户认证逻辑,我们只要在配置文件里配置这个secret-id,或者在启动类追加upp.setSecretId("hoppinzq");就可以继续调用了。懂?
在计算机网络七层架构中,直接为用户提供服务的是处于最上层的应用层。而应用层协议是调用双方约定好的标准,是数据解析的规则,比如常用的超文本传输协议http协议和应用于文件传输的ftp协议。 rpc的通信协议可以基于http,但它是一个比较独特的远程调用方式,需要对数据传输格式进行严格的设计。 我的这个rpc框架的规则是对自定义请求头HoppinInvocationRequest 和自定义响应头HoppinInvocationResponse的网络传输。
具体是服务调用方将一些参数封装在本框架的自定义请求头HoppinInvocationRequest,然后通过网络传输传到服务提供方。 服务提供方需要拿到数据后,去调用本地服务。然后将响应、响应头、状态码等参数封装在本框架的自定义响应头HoppinInvocationResponse并通过网络传输传到服务调用方。服务调用方 将解析这些数据并返回数据或异常。
那么现在的问题是两个自定义的类是如何通过网络传输呢?我们知道类是比较抽象的信息载体,如果我们可以用语言把它描述出来或者写到纸上,别人通过你的描述还能在脑海里生成这个类,这就是序列化与反序列化。 我们知道网络传输本质是原始比特流的传输,也就是0和1。如果我们能把实体类转为0和1,那就可以通过网络传输了,这个转换过程就是序列化。客户端拿到这些二进制,能够把这个类给还原出来,这个过程就是反序列化。
为了提供高质量的数据服务,和实现与上下游各系统进行良好的对接,类与二进制之间的转换尤为重要,表现在转换的效率和转换数据的大小。
由于转换过程是可以高度自定义的,这个自定义过程或者规则就是序列化协议。
现在的序列化协议很多,有Java自带的Serializer,有便于理解的JSON,也有高性能跨语言的Hession2.0等。本项目采用了Hesson2.0作为默认的序列化框架,当然你也可以自己拓展。
关于为什么要选择Hession2.0
首先dubbo的序列化框架就是用的Hession2.0,阿里内部在选择序列化框架的时候势必经过了深思熟虑和辩论,肯定是最优解,无脑用就行了。
Hession2.0的性能高,而且跨语言。Java自带的Serializer只能用于Java之间的服务调用,而Hession是序列化框架,势必可以为其他语言添加支持。 这就很完美了,我服务调用双方不需要规定用什么语言,只需要用你语言的Hession框架就可以解析。假设我写了一个c++程序,集成了ffmpeg和opencv并提供了一些基于Hession协议的rpc接口, 我直接就能通过Java和Hession调用这些服务接口,而无需通过JNI或者控制台去调用。这很酷,跟HTTP调用的思想一模一样。
实际上JSON或XML也很不错,因为易于理解和调试,当你反序列化报错的时候,你完全可以直接发现是什么问题。而不是盯着一堆看不懂的Stream发呆。
在本框架中,服务提供者和调用方无需关注序列化框架,只需关注实现业务功能即可,但有的程序员可能想拓展或者使用其他的序列化框架,请往下看。
本框架的序列化代码在com.hoppinzq.service.serializer包中,提供了三种序列化方式: HessionSerializer,JdkSerializer和JsonCustomSerializer。其中JsonCustomSerializer没有任何内容, 因为我不确定你喜欢使用Gson、Jackson还是FastJson。当然,FastJson yyds。
1、首先你的序列化类必须实现CustomSerializer接口。并重写下面两个方法:
序列化方法,将指定对象obj的序列化二进制流写入outputStream
反序列化方法,将inputStream内的二进制流强制转换为指定的类
(已弃用,不方便本框架调用)序列化方法,返回指定对象的原始比特流
(已弃用,不方便本框架调用)反序列化方法,将原始比特性强制转换为指定类
2、服务类上在@ServiceRegister注解内声明你自己的序列化处理类即可。
@ServiceRegister(title = "测试服务", description = "用来测试RPC的服务",serializer = HessionSerializer.class)
public class TestServiceImpl implements TestService{}
建议使用我的默认的序列化处理类HessionSerializer,你可以修改HessionSerializer的代码,以实现序列化行为或者自定义操作。比如你想在序列化前do something。 如果不是特别要求,不是很建议自己去实现CustomSerializer接口并实现自己的序列化方式,因为序列化方式是包装在服务包装类ServiceWrapper,一旦这个类解析出问题导致没有正确解析到你自定义的序列化方式,那么默认会使用 序列化处理类HessionSerializer对异常进行包装并响应给客户端。但是客户端可能工作正常,会按照预期自定义的序列化方式工作,由于双方的序列化协议不一致,就会导致解析失败。 由于我无法预知服务端/客户端的所有问题,先尽量保证使用序列化处理类HessionSerializer。
拓展:使用序列化和反序列化解决对象的浅拷贝问题
有的时候,你想复制一个对象,我们知道用Java简单的赋值实际上是引用传递,只是把原来对象的地址赋给了你。你操作这个对象都会改变原来的对象。
实际上Java提供了Cloneable接口用于拷贝对象,也就是Object的clone()方法,然而需要你重写代码,这里就可以用序列化和反序列化了。
序列化和反序列化就是一个简单的解决方式,比如JSON序列化。我将对象转JSON实际是对象转字符串,字符串是重新new的。然后反序列化就是字符串转对象,这个对象自然也是重新new的,就规避了引用传递。总之,要拷贝的对象一定是要重新new的。
这块太多太杂了,我想有空再写,大到服务注册、热插拔,小到日志记录全是写在内部的消息队列,由线程池去处理。
我们的消息体很简单,每个消息体持有一批数据message和一个处理接口的实现messageHandler。每种消息会按照其独有的处理类去处理:
public class MessageBean {
private String messageName;
private Object message;
private MessageEnum messageType;
private MessageHandler messageHandler;
}
我们的消息缓冲也简单,线程池和消息缓冲是消费者和生产者的关系,我们只需将待办消息写入缓冲,由线程池去持有消息并使用其消息接口的实现类去处理消息,这就是核心功能,。缓冲代码如下:
/**
* @author:ZhangQi
* 本地服务的消息缓冲
**/
public class MessageBuffer {
private static Lock lock = new ReentrantLock();
private static Condition notFull = lock.newCondition();
private static Condition notEmpty = lock.newCondition();
//缓冲,先放1024个消息
public static MessageBean[] items = new MessageBean[1024];
public static int putptr/*写索引*/,
takeptr/*读索引*/,
count/*队列中存在的数据个数*/;
/**
* 向队列内写入消息
* @param x
* @throws InterruptedException
*/
public static void put(MessageBean x) throws InterruptedException {
lock.lock();
try {
while (count == items.length) {
notFull.await();
}
items[putptr] = x;
if (++putptr == items.length) {
putptr = 0;
}
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
/**
* 从队列获取消息
* @return
* @throws InterruptedException
*/
public static MessageBean take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await();
}
MessageBean x = items[takeptr];
if (++takeptr == items.length) {
takeptr = 0;
}
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
1、过滤器由于设计的不好,已暂时弃用,但是源代码和思想得到了保留。
2、hoppinRPC的拦截器接口如下:
public interface RPCInterceptorInterface {
/**
* 在验证用户前执行自定义操作
* @param request 请求体
* @param serviceWrapper 服务包装类
*/
void beforeAuthentication(HoppinInvocationRequest request, ServiceWrapper serviceWrapper);
/**
* 在执行rpc调用的方法实现类前执行自定义操作
* @param request 请求体
* @param serviceWrapper 服务包装类
* @param rpcBean rpc调用细节
*/
void beforeMethod(HoppinInvocationRequest request,ServiceWrapper serviceWrapper,RPCBean rpcBean);
/**
* 在执行rpc调用的方法实现类后执行自定义操作
* @param response 响应体
* @param serviceWrapper 服务包装类
* @param rpcBean rpc调用细节
*/
void after(HoppinInvocationResponse response, ServiceWrapper serviceWrapper, RPCBean rpcBean);
/**
* 在执行rpc报错后(在处理完异常后)执行自定义操作
* @param response 响应体
* @param serviceWrapper 服务包装类
* @param rpcBean rpc调用细节
*/
void exception(HoppinInvocationResponse response, ServiceWrapper serviceWrapper, RPCBean rpcBean,Exception exception);
/**
* 在整个调用过程完毕后执行自定义操作
* @param request 请求体
* @param response 响应体
* @param serviceWrapper 服务包装类
* @param rpcBean rpc调用细节
* @param logBean 服务调用日志
*/
void end(HoppinInvocationRequest request, HoppinInvocationResponse response, ServiceWrapper serviceWrapper, RPCBean rpcBean, ServiceLogBean logBean);
}
你只需编写一个类实现该接口,然后在该类上使用注解@RPCInterceptor(10)即可,其中10表示执行顺序。执行顺序按从大到小排序。
1、使用注解@ExceptionCatch声明在方法或者类上,声明需要捕获的异常和异常处理接口的实现类。
@Target({ElementType.METHOD,ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(ExceptionContainer.class)
public @interface ExceptionCatch {
int code() default 500;
Class<? extends Exception> value();
Class<? extends ExceptionHandler> handler() default DefaultExceptionHandler.class;
}
2、我们提供了一个专门用来捕获rpc异常的注解供参考。
/**
* @author:ZhangQi
* 服务调用方只对rpc抛出的异常进行默认处理
**/
@Target({ElementType.TYPE,ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@ExceptionContainer({
@ExceptionCatch(value = RemotingException.class , handler = DefaultExceptionHandler.class),
@ExceptionCatch(value = AuthenticationFailedException.class, handler = DefaultExceptionHandler.class),
@ExceptionCatch(value = AuthorizationFailedException.class, handler = DefaultExceptionHandler.class)
})
public @interface ExceptionCatchRemote {
}
rpc的网络架构是基于bio(阻塞式io),如果你阅读过hoppinzq-client模块的源代码,会发现很多nio(同步非阻塞io)和aio(异步io)的内容,这表明了我的挣扎。 我很清楚bio永远不会是最佳选择,但是nio学习成本太高了,一个简单的通讯代码多到令人发指。我们将在之后的时间尝试使用netty来改善这一情况, 这是目前最大的工作量,我们需要重写rpc会话、负载均衡等功能,但是改变是很有必要的。
负载均衡模块按我的设计是无法实现的,感谢Eureka给我带来的灵感。首先,我这个负载均衡是设计在客户端,也就是在调用前,我会使用
负载均衡算法获得一个URI,而URI是怎么准确的获得的呢?按照我之前的设计思路,服务类是最基础的注册单位。这就导致了很多问题——你无法知道两个相同的类是否是由
不同地址或不同端口的同一个服务注册的,更别说一个接口类可以有多个实现类了。而SpringCloud里,最基础的注册单位是一个模块。每个模块都有一个name值。Ribbon认为,拥有同一个name值的服务都视为同一个模块,
可以对这些拥有相同name的模块进行负载均衡。于是我及时在配置文件里增补了服务名,也就是name这个字段,用以负载均衡。
第二个问题是,由于负载均衡是发生在客户端,客户端如何去判断该对哪些服务进行负载均衡呢?这里我借鉴了Eureka的思想,即每个注册到Eureka上的服务都会同步,并保留Eureka
上注册的所有服务信息。那么这个问题就很好解决了,我只要启动负载均衡功能,我就开辟一个定时线程池,每隔一段时间去同步注册中心的所有服务到本地,从而实现在本地就可以精准的
筛选出,本次调用能负载均衡的服务。
尽管这个问题不好描述,但我仍然为当时的小伎俩而沾沾自喜。
1、新建一个模块hoppinzq-test2,其中这个模块直接把hoppinzq-test1的全部代码和配置文件复制粘贴就行了,端口号改为8804,你只需保证配置文件的name跟hoppinzq-test1模块一致就行了。 然后Test1ServiceImpl类里的sayHello方法改一下,让其return "test2:"+msg;
2、我们就在hoppinzq-testrpc模块下测试,启动类上再添加@EnableHoppinBalance注解,开启负载均衡模块。注意负载均衡注解一定要加在客户端也就是服务调用方。修改后的代码如下:
@EnableHoppinBalance
@Hoppin
@SpringBootApplication
public class TestRPCApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(TestRPCApplication.class, args);
UserPrincipal upp = new UserPrincipal("zhangqi", "123456");
//这里不用写注册中心的URL了,而是写服务名,注意
Test1Service test1Service = ServiceProxyFactory.createProxy(Test1Service.class, upp,"hoppinzq-test1");
System.err.println(test1Service.sayTest("123"));
System.err.println(test1Service.sayTest("123"));
System.err.println(test1Service.sayTest("123"));
Test1Service test1Service2 = ServiceProxyFactory.createProxy(Test1Service.class, upp,"hoppinzq-test1");
System.err.println(test1Service2.sayTest("123"));
System.err.println(test1Service2.sayTest("123"));
}
}
负载均衡模块同样开放3个接口,用于测试和输出给前端
熔断器是一种服务保护策略,我们知道调用其他模块提供的服务可能是不可靠的。
当多个模块之间调用的时候,若有一个模块因为某些原因可能响应的很慢或者报错,这就导致下游的服务都卡在那里。
这时候就要请出我们的熔断器了,可以及时对这些异常服务进行熔断或者降级,防止故障在整个系统中的传播,避免系统的崩溃。熔断器既可以作用于服务端,防止某些服务或者上游服务不可靠。也可以作用于客户端,直接对调用过程可能由于服务端挂掉等原因及时做出响应。
熔断器的设计和思路就比较清晰了,我们只需写一个aop,将服务方法环绕,自定义一些配置和熔断策略就行了,以下就是我的熔断器设计的流程图。
1、要使用熔断器,在hoppinzq-test1模块的启动类前加@EnableHoppinBreaker注解。
2、我们在hoppinzq-test1模块的TestService添加一个方法
public interface Test1Service {
String sayTest(String msg);
String getAdd(int i, int j);
}
相应的实现类的方法
@Override
@ServiceRegisterMethod(title = "方法测试2",description = "打印i+j")
@CircuitBreaker(threshold = 2,timeout = 2000,aliveTime = 3000,fallback = "getAddFallback")
public String getAdd(int i, int j){
//若j<0,报错除数不能为0;若j为0,手动抛出异常;若j>0,随眠jms模拟超时操作
if(j<0){
i=j/0;
}
if(j==0){
throw new RuntimeException("j不能是0");
}
try{
Thread.sleep(j);
}catch (Exception ex){}
return "i+j="+(i+j);
}
//上面方法报错或者超时,直接执行熔断方法
public String getAddFallback(){
return "服务正在忙碌";
}
大家可以看这个方法,首先熔断器注解@CircuitBreaker(threshold = 2,timeout = 2000,aliveTime = 3000,fallback = "getAddFallback")需要加在方法上 其中threshold表示熔断器启动阈值,也就是说报错两次就会启动熔断器;timeout表示超时时长,若该方法的响应超过2000ms,直接启动熔断器。aliveTime表示熔断器存活时间, 我认为服务的不稳定是偶发现象,可能过一会就正常了,正常就不应该走熔断器,就给熔断器配置了一个存活时间;fallback是熔断器执行的逻辑,可以传一个方法名,意思就是一旦熔断器启动,就不执行原来的方法而是执行fallback的方法。
3、hoppinzq-testrpc模块的Test1Service别忘了加String getAdd(int i, int j);方法哦。然后我们修改启动类里的代码:
@Hoppin
@SpringBootApplication
public class TestRPCApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(TestRPCApplication.class, args);
UserPrincipal upp = new UserPrincipal("zhangqi", "123456");
Test1Service test1Service = ServiceProxyFactory.createProxy(Test1Service.class, "http://127.0.0.1:8802/service", upp);
System.err.println(test1Service.getAdd(1,7));
System.err.println(test1Service.getAdd(1,-1));
System.err.println(test1Service.getAdd(1,0));
}
}
首先第一个getAdd是一定能通过的,第二个则不会,因为j为-1的时候会报除数不能为0,第三个抛出自定义异常,但是由于第二个引起的报错导致了熔断器被启动,所以第三次直接进入熔断器。 就可以得到预期的结果:
4、熔断器模块同样开放3个接口,用于监控和输出给前端
1、支持链路跟踪的traceId和自定义请求头。
UserPrincipal upp = new UserPrincipal("zhangqi", "123456");
Map headers=new HashMap();
headers.put("traceId","asd");
Test1Service test1Service = ServiceProxyFactory
.createProxy(Test1Service.class, "http://127.0.0.1:8802/service", upp,headers);
2、内部是有日志的,只不过是记录在内存中,你需要将其异步地记录到数据库中,你的任何一次调用都可以观察到打印。 日志内部接口:http://127.0.0.1:8801/service/showLogs
3、服务端在业务开发的时候,可以从ThreadLocal中取得的参数,包括:
获取调用方凭证(用户名、密码、SecretId、SecretKey)
日志,调用信息、传参、细节等信息
RPC调用细节
4、若不想暴露注册中心,设置zq-server.show-center=false
5、还有很多细节,你还可以继承或实现很多类来重写你的方法,我还预留了相当多方法,用于执行自定义操作。不过现在不一一展示了,但注释都写了。快来体验吧!