声明式微服务框架 SpringHoppin

#

zhangqi

2023/08/16

这是我的剑来框架的微服务版,不知道程序员们听没有听过“声明式”的概念。 实际上,SQL语句就是最典型的声明式编程,而我们常用的Feign也是一种声明式调用。 这很优雅,我们不需要关心程序的执行过程,只需要关注程序的输入和输出。 那么,服务可以声明吗?在回答这个问题之前,你首先要了解诸多概念,以便了解SpringHoppin是如何工作的。

SpringCloud是一个用于构建分布式系统的开发框架。 它提供了一系列的组件和工具,帮助开发者快速构建、部署和管理分布式应用。 SpringCloud包含了诸如服务注册与发现、配置中心、网关、负载均衡、断路器、链路追踪等功能,使得开发者能够更轻松地构建和管理复杂的微服务架构。 通过使用SpringCloud,开发者可以更好地实现服务之间的解耦、弹性扩展和容错处理,从而提高系统的稳定性和可靠性。

SpringHoppin是完全基于SpringCloud的思想,以SpringBoot作为脚手架,构建出了SpringCloud的雏形框架。 基于该框架开发了服务注册与发现(HoppinCore)、远程服务调用(HoppinRPC)、负载均衡(HoppinLoadBalance)、熔断器(HoppinCircuitBreaker)等功能。

模块功能
SpringCloud
SpringHoppin

服务注册与发现

Eureka,Zookeeper,Nacos

HoppinCore

声明式调用

Feign

HoppinHttpApi

远程服务调用

Dubbo

HoppinRPC

负载均衡

Ribbon

HoppinLoadBalance

容错保护

Hystrix

HoppinCircuitBreaker

网关

SpringCloudZuul,SpringCloudGateway

hoppinzq-gateway

配置中心

SpringCloudConfig,Nacos

未实现

消息总线与消息队列

SpringCloudBus,SpringCloudStream+kafka

未实现

链路跟踪

SpringCloudSleuth+Zipkin

已添加支持,本项目未体现

安全认证与授权

SpringSecurity

HoppinRPC,hoppinzq-gateway,zauth

目前该框架的各个功能还没拆出来,代码十分臃肿,完成度我给个61.6%,但是核心功能已经完备,也经过几个小项目的实战检验。 但是好在前期工作做的好,框架设计的足够健壮,使得该项目拓展性极高。

我等不及了,快端上来罢!

你可以直接在gitee上拉取该项目。它的结构如下:

/hoppinzqs-registration-center [项目目录]
 │
 ├─hoppinzq-common ------------------[公共模块]
 │
 ├─hoppinzq-service -------------------[服务端模块]
 │  └─... -------------------[引入该模块,那么你的SpringBoot项目就是SpringHoppin项目了,可以开启服务注册,熔断器等功能]
 │
 │─hoppinzq-client -------------------[客户端模块]
 │  └─... -------------------[引入该模块,可以开启远程服务调用,负载均衡等功能]
 │
 │─hoppinzq-gateway -------------------[网关模块]└─... -------------------[因为依赖Redis,主分支暂时移除]
                            

全部配置项一览

 

################ zqservice配置 #######################
zq-server:
  # 本服务的IP或者你本地的IP(通用配置项)
  ip: 127.0.0.1
  # 注册中心所在地址,注意,如果你没有启动本地注册中心,直接使用我的注册中心(客户端配置项)
  center-addr: http://150.158.28.40:8801/service
  # 服务注册用户的用户名。(通用配置项)
  username: zhangqi
  # 服务注册用户的密码。(通用配置项)
  password: 123456
  # 服务调用凭证,用以屏蔽用户名密码。(可选配置项)
  secret-id: zhangqi
  secret-key: 123456
  # 服务注册是否总是重试,该参数为true时,配置的重试次数将不起作用(客户端配置项,暂时弃用)
  always-retry: false
  # 服务注册重试次数,仅当服务注册alwaysRetry为false时生效(客户端配置项,暂时弃用)
  retry-count: 10
  # 服务注册重试间隔/ms(客户端配置项,暂时弃用)
  retry-time: 10000
  # 是否严格(注册中心配置项)
  is-strict: false

############ 以下配置项先不用管,而且还没有格式化 ##############
zqServerCenter:
  # 注册中心路径,同上
  addr: http://150.158.28.40:8801/service

zqClient:
  # 服务调用用户名,要调用其他服务需要验证授权
  userName: zhangqi
  # 服务调用密码
  password: 123456
  # 注册中心所在地址
  centerAddr: http://150.158.28.40:8801/service
  # 统一权限服务地址
  authAddr: http://150.158.28.40:8804/service

# redis缓存时间/s
zqRedis:
  csdnBlogTimeout: 60
zqApiStore:
  # ZQ网关统一上传文件路径
  fileUploadPath: E:\\baby_img
  isAuth: false
# 索引库位置
lucene:
  spiderIndex: D:\\index

                                

开始使用

  1. 1、去gitee上拉取该项目。

  2. 2、可以新建一个模块,若你的项目要转为SpringHoppin项目,则首先要在该模块的pom中引入依赖:

    
    <dependency>
        <groupId>com.hoppinzq</groupId>
        <artifactId>hoppinzq-service</artifactId>
        <version>1.0</version>
    </dependency>
    

  3. 3、在启动类上添加注解@Hoppin声明这个服务是SpringHoppin服务,只有这样,你才能使用框架提供的所有功能。 启动类的源码应该如下:

  4.  
    @Hoppin
    @SpringBootApplication
    public class TestCenterApplication {
    
        public static void main(String[] args) throws Exception {
            SpringApplication.run(TestCenterApplication.class, args);
        }
    }
  5. 4、配置文件也不用写了,直接用hoppinzq-testcore那个就行,然后你就可以启动我们的SpringHoppin 项目了,是不是很兴奋?在启动前你得要先了解SpringHoppin的服务是指的什么,什么能被注册到注册中心。

          在SpringHoppin中的服务是指的类,而SpringCloud中的服务是指的模块。在一个模块的启动类上添加注解@Hoppin就可以把这个模块 里所有被@ServiceRegister注解修饰的类抽象为服务。每个SpringHoppin模块都是一个极简的注册中心,能暴露自己能提供的服务。

          注册中心则是将SpringHoppin模块里所有暴露的服务整合起来统一对外开放,注册中心提供对服务的包装、服务的增删改查和服务的配置和管理。 并且与服务提供方建立心跳连接,以确保服务可用。

  6. 5、大家在访问服务注册的路径前,先添加一个服务类并添加@ServiceRegister注解,然后重启项目再访问本地的服务注册中心。服务类的源代码如下:

    TestService接口服务类:

     
    public interface TestService {
        String test(String msg);
    }

    TestService接口服务实现类:

     
    @ServiceRegister(title = "测试服务",description = "这是测试服务,仅仅是一个demo")
    public class TestServiceImpl implements TestService{
    
        @Override
        @ServiceRegisterMethod(title = "测试方法",description = "略略略",isShow = true)
        public String test(String msg) {
            return "test:"+msg;
        }
    }

    然后启动项目,打印信息如下:

    6、上面说了SpringHoppin的服务是指的服务类,而不是模块。若要指定一个类是服务范畴,则在该类上添加注解@ServiceRegister,把需要暴露的方法上, 添加@ServiceRegisterMethod注解,并配上适当的描述。若要配合SpringHoppin的远程服务调用,需要让服务类为某接口的实现类。 最后被@ServiceRegister注解修饰的服务类也会被加入SpringIOC容器中管理,你可以通过@Autowired注解在任何其他模块中依赖注入。

  7. 7、访问hoppinzq-testcore自带的注册中心路径http://127.0.0.1:8801/service,发现TestServiceImpl服务类注册成功, 在SpringHoppin下,万物皆可注册中心!

注册中心

什么是服务注册和发现?

      现在的微服务架构将各个模块拆分的很细,虽然大部分模块之间通过消息中间件进行解耦,但是还是存在相当的模块与模块之间的服务调用。那么开发人员怎么知道哪些模块有哪些功能? 怎么知道该去如何与模块交互?传参风格又是怎样的? 这就要求开发人员编写文档,以提供本模块的功能和接口细节。
      那么这部分工作能不能交给程序去提供?没错,这就是服务注册————程序可以通过反射解析接口, 把接口的相关信息、注解信息、类信息、模块配置信息等一并上传到一个地方,这个地方就叫做注册中心。
      开发人员在注册中心查找可用的服务,按照约定的调用规则就可以调用该服务了,这就是服务发现。

      用非程序员的话去解释:比如我要买房子。首先建筑公司A建造了一批新房子,我知道A有房子吗?No!除非A主动告诉我,那么这样做就很麻烦了。 A建造了一批房子,难道A要去每个人的耳边说:我建造了一批新房子,你买不买?很显然这不合理。
      正确的做法是,A建造了一批房子,把这个信息发给中介或者发布在某个公告栏里。这个过程就是服务注册,中介充当注册中心。我要买房子,不是直接跟A交互, 而且先去公告栏里看看或者问问中介,房子在哪?价格是多少?A的联系方式是什么?这就是服务发现。
      A与中介会时刻保持联系,以防A突然破产或者中介离职导致房源信息不可用,这就是服务治理。
      当然A在跟中介谈的时候明确表示不让失信人员购买,这时,我们在对接中介买A房子的时候,中介会先检查我们的身份证,这就是身份验证。然后拿着我们的身份证去 查征信是不是失信人员,这就是授权验证。这就是一个基本的服务注册和发现的流程。

  1. 1、在SpringHoppin中,每一个模块都是一个极简的注册中心,只不过该注册中心只能暴露自己的服务。要想声明一个注册中心很简单,在启动类上添加 @HoppinCore注解即可,现在我们把hoppinzq-testcore声明为注册中心,修改后的启动类如下:

     
    @HoppinCore
    @Hoppin
    @SpringBootApplication
    public class TestCenterApplication {
    
        public static void main(String[] args) throws Exception {
            SpringApplication.run(TestCenterApplication.class, args);
        }
    }
    

    启动项目,发现注册中心已经启动成功!

    那么注册中心与普通的SpringHoppin项目有什么区别吗?因为你刚才提到了 每一个模块都是一个极简的注册中心。它们的区别有很多,首先注册中心无法注册自己;其次,注册中心拥有对外部服务进行增删改查和管理的能力; 最后,若一个模块被声明为注册中心,那么就会额外开辟定时线程。其他模块在注册服务到注册中心的时候,会额外注册一个心跳服务, 注册中心通过定时线程来进行心跳检测,以及时对注册的服务进行监控。心跳服务会返回给注册中心注册方的一些信息,你可以通过实现HeartbeatService接口定制返回值。

    你可能回去马上访问那个地址了,你会发现怎么多了一个RegisterServiceImpl服务类?通过观察这个类的方法不难看出,这个类的所有方法都是在提供服务注册和管理 的能力,这个类是注册中心内置的注册服务,其他服务需要借助这个服务类注册并管理自己的服务。

  2. 2、外部服务注册:新建一个模块hoppinzq-test1,启动类上加只要加@Hoppin 即可。配置文件要改一下,端口号改成8802,并添加我们的注册中心的路径,如下:

     
    zq-server:
      name: hoppinzq-test1
      username: zhangqi
      password: 123456
      prefix: /service
      ip: 127.0.0.1
      center-addr: http://127.0.0.1:8801/service   # 注册中心路径
    
    test1模块新建一个服务接口
     
    public interface Test1Service {
        String sayTest(String msg);
    }
    
    和它的实现类
     
    @ServiceRegister(title = "测试1", description = "这是测试1的描述")
    public class Test1ServiceImpl implements Test1Service{
        @Override
        @ServiceRegisterMethod(title = "方法测试",description = "打印test1")
        public String sayTest(String msg) {
            return "test1:"+msg;
        }
    }
    

    3、先启动注册中心,然后在启动test1(若先启动test1,test1会连不上注册中心,会重试10次,每次60s) ,观察两个服务的打印,若注册成功,访问注册中心的路径http://127.0.0.1:8801/service,即可发现test1内的服务注册成功

    4、只关闭注册中心,发现test1模块一直在尝试重连,直到注册中心启动成功,test1的服务重新注册。

    5、只关闭test1模块,发现注册中心也在尝试重连,此时test1的服务为不可用状态。 当一段时间重连不上的时候,注册中心会直接移除test1模块注册的服务。
    核心代码如下:

     
        @PostConstruct
        public void init() {
            isCore=true;
            startTime=System.currentTimeMillis();
            String serviceAddress="http://"+ zqServerConfig.getIp() +":"+ zqServerConfig.getPort()+ zqServerConfig.getPrefix();
            logger.debug("注册中心启动成功,注册中心路径:"+serviceAddress);
            ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
            startHeartbeat(executorService);
            startMessage(executorService);
            startServiceListen();
        }
    
        private static void startHeartbeat(ScheduledExecutorService executorService){
            Map heartbeatHeader=ServiceStore.heartbeatHeader;
            heartbeatHeader.put(heartHeader,1);
            // 每10秒心跳一次
            executorService.scheduleAtFixedRate(() -> {
                if (ServiceStore.heartbeatService.size() > 0) {
                    for (ServiceWrapper heart : ServiceStore.heartbeatService) {
                        if (heart.isAvailable()) {
                            String serviceID=heart.getId();
                            try {
                                String serviceIP=heart.getServiceMessage().getServiceIP();
    //                            InetAddress inetAddress = InetAddress.getByName(serviceIP);
    //                            boolean isReachable = inetAddress.isReachable(2000);
    //                            if(!isReachable) {
    //                                throw new RemotingException(new RuntimeException("服务连接失败"));
    //                            }.
                                String serviceHeartAddress="http://"+ serviceIP +":"+heart.getServiceMessage().getServicePort()+heart.getServiceMessage().getServicePrefix();
                                logger.debug("开始心跳:服务路径"+serviceHeartAddress);
                                HeartbeatService service = ServiceProxyFactory.createProxy(HeartbeatService.class, serviceHeartAddress,heartbeatHeader);
                                service.areYouOk();
                            } catch (Exception ex) {
                                ex.printStackTrace();
                                //heart.setAvailable(Boolean.FALSE);
                                List<ServiceWrapperRPC> serviceWrapperRPCS= RPCServiceStore.serviceWrapperRPCList;
                                synchronized (serviceWrapperRPCS) {
                                    Iterator<ServiceWrapperRPC> iterator = serviceWrapperRPCS.iterator();
                                    while (iterator.hasNext()) {
                                        ServiceWrapperRPC outerService = iterator.next();
                                        if (serviceID.equals(outerService.getId())) {
                                            outerService.setAvailable(false);
                                        }
                                    }
                                }
                                logger.error("检测到id为:" + serviceID + "的服务已不可用,尝试重新连接");
                            }
                        }
                    }
                }
            }, 0, 10, TimeUnit.SECONDS);
    
            //每60s检查并移除不可用服务
            executorService.scheduleAtFixedRate(() -> {
                if (ServiceStore.heartbeatService.size() > 0) {
                    List<ServiceWrapperRPC> serviceWrapperRPCS=RPCServiceStore.serviceWrapperRPCList;
                    synchronized (serviceWrapperRPCS) {
                        String serviceID;
                        Iterator<ServiceWrapperRPC> iterator = serviceWrapperRPCS.iterator();
                        while (iterator.hasNext()) {
                            ServiceWrapperRPC outerService = iterator.next();
                            if (!outerService.getAvailable()) {
                                serviceID=outerService.getId();
                                iterator.remove();
                                List<ServiceWrapper> heartbeatService= ServiceStore.heartbeatService;
                                for(int i=0;i<heartbeatService.size();i++){
                                    ServiceWrapper heart=heartbeatService.get(i);
                                    if(serviceID.equals(heart.getId())){
                                        logger.error("检测到id为:" + serviceID + "的服务已不可用,将移除该服务!");
                                        logger.debug("服务移除成功!服务ID:"+serviceID);
                                        heartbeatService.remove(i);
                                        i--;
                                    }
                                }
                            }
                        }
                    }
                }
            }, 0, 60, TimeUnit.SECONDS);
        }
    
        private static void startMessage(ScheduledExecutorService executorService){
            //每10s消费一次消息队列里的消息
            executorService.scheduleAtFixedRate(() -> {
                if(MessageBuffer.count==0){
    
                }
                for (int i=0;i<MessageBuffer.count;i++) {
                    try {
                        MessageBean messageBean = MessageBuffer.take();
                        MessageHandler handler=messageBean.getMessageHandler();
                        handler.handle(messageBean);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, 0, 10, TimeUnit.SECONDS);
        }
    
        private static void startServiceListen(){
            EventServiceListener eventListener = new EventServiceListener() {
                @Override
                public void onServiceChange(List<ServiceWrapperRPC> list) {
                    System.err.println("服务列表改变: " + list);
                }
            };
            // 创建守护线程来监听服务的变化
            ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 5000, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>());
            // 允许核心线程超时
            executor.allowCoreThreadTimeOut(true);
            Thread thread=new Thread(new ServiceChangeListener(eventListener), "监听服务列表守护线程-" + System.currentTimeMillis());
            thread.setDaemon(true);
            executor.execute(thread);
        }

    6、注册中心暴露了一个接口,用来给前端展示 http://127.0.0.1:8801/service/serviceList

远程服务调用

不知道大家服务与服务之间调用是如何实现的呢?我们日常开发一般是前后端通过Restful API接口交互, 随着微服务模块的细分,前端需要的数据可能无法由一个模块提供了,比如:我要查一个商品的信息,假设这个接口转发到商品模块, 商品模块需要再去调用库存模块、产地模块、价格模块、推荐模块、用户行为模块等,然后把这些数据拼起来返回给前端。整个过程虽然对前端是透明的,但是后台各个模块之间 的调用还是很麻烦的。我们且不谈这里面可能出现的问题,我们先要解决的问题是:后台各个模块之间如何调用?

答案很简单,网络传输呗。你也可以模拟Http请求,反正你服务能接收我的参数并处理,然后返回给我就行了。
SpringCloud给出了它的答案:声明式调用Feign+注册中心Eureka。
阿里的答案是Dubbo+注册中心Zookeeper。
我的答案是HoppinRPC+注册中心HoppinCore。
Feign里面最简单,最好集成的,因为人家是SpringCloud自家人,是权威。但正因为Feign是SpringCloud的组件,想用得先会SpringCloud,学习成本比较高。 而且会导致各个模块之间还要去统一SpringCloud的版本和依赖。Feign的拓展性不高,由于各个服务之间的差异巨大,尤其在用户认证、 用户鉴权上,Feign无法对这些服务的调用做有效的统一,导致这部分逻辑得分到各个模块上去。且Feign的性能较差,有一道经典的面试题就是说的“为什么Feign第一次调用很慢?”。
Dubbo是最难的,也是最强大的,毕竟人家是阿里的产品,而且不光能远程调用Java服务,甚至支持跨语言服务调用,我就很欣赏这个特性。缺点就是难,当然用起来不难,就是架构、学习、配置这些难。
HoppinRPC是最不可靠的(笑),它只是天马行空的产物,当然它可以完成预期的工作就是了。而且它的拓展性极高,支持自定义用户认证和用户授权,调用过程优雅,不失为一个学习对象。

  1. 1、如何使用HoppinRPC呢?首先启动类上添加@Hoppin注解,你可能感受到一丝诡谲。你在想,我本地的服务 是如何被注册到注册中心hoppinzq-testcore上的呢?没错,就是通过HoppinRPC,另外服务心跳也是这么调用的。服务注册到注册中心的源代码在 com.hoppinzq.service.servlet.SpringProxyServlet中的registerServiceIntoCore()方法中实现的。

    public void registerServiceIntoCore(){
        ZqServerConfig zqServerConfig =this.getPropertyBean();
        TaskStore.taskQueue.push(new RetryRegisterService(zqServerConfig.getRetryCount(), zqServerConfig.getRetryTime(), zqServerConfig.getAlwaysRetry()) {
            @Override
            protected Object toDo() throws RemotingException {
                UserPrincipal upp = new UserPrincipal(zqServerConfig.getUserName(), zqServerConfig.getPassword());
                List<ServiceWrapper> serviceWrappers=modWrapper();
                List<ServiceWrapperRPC> serviceWrapperRPCS=new ArrayList<>();
                RegisterService registerService = ServiceProxyFactory.createProxy(RegisterService.class, zqServerConfig.getServerCenter(), upp);
                System.err.println("****************************");
                System.err.println(registerService.sayHello(zqServerConfig.getUserName())+",连接注册中心成功!");
                for(ServiceWrapper serviceWrapper:serviceWrappers){
                    ServiceWrapperRPC serviceWrapperRPC=serviceWrapper.toRPCBean();
                    serviceWrapperRPCS.add(serviceWrapperRPC);
                }
                registerService.insertServices(serviceWrapperRPCS);
                System.err.println("****************************");
                logger.info("向注册中心注册服务成功!");
                return true;
            }
        });
    }
    

  2. 2、新建一个模块叫hoppinzq-testrpc,同样的启动类上添加@Hoppin注解, 配置文件复制hoppinzq-test1的就行,把端口号改成8803,服务名zq-server.name 改成hoppinzq-testrpc,其他的不变。

    3、把hoppinzq-test1模块的Test1Service接口文件复制粘贴到hoppinzq-testrpc模块下,启动类上代码如下。

     
    @Hoppin
    @SpringBootApplication
    public class TestRPCApplication {
    
        public static void main(String[] args) throws Exception {
            SpringApplication.run(TestRPCApplication.class, args);
            //用户认证于鉴权,你先写死这两个就行
            UserPrincipal upp = new UserPrincipal("zhangqi", "123456");
            //传入接口类对象,服务所在地址,test1Service服务所在的地址通过注册中心可以看到是http://127.0.0.1:8802/service
            Test1Service test1Service = ServiceProxyFactory.createProxy(Test1Service.class, "http://127.0.0.1:8802/service", upp);
            /调用
            System.err.println(test1Service.sayTest("123"));
            //I did it,Yes!
        }
    }

    4、整个调用过程的流程是这样的:

    1. 1、首先我会通过代理工厂,创建服务接口的代理对象。这个对象一定要缓存,因为后面用到了 反射。由于反射比较耗时间,而且一个服务接口会调用无数次,因此缓存是十分有必要的。可以看出,我的第一次调用也是很耗时,就跟Feign一样。

    2. 2、由于接口的实现类在远程服务提供方,而不是在本地。如果真要调用一个本地未实现的接口类的方法,那肯定要报错,这就是为什么要使用 代理,我通过代理把这个调用给拦截了。类比aop,我把方法拦截了,我还不执行joinPoint.proceed(),那你这个方法永远不会执行。 不执行就不报错,我还能拿到你方法对象和传参。

    3. 3、我们很骚的拿到了要调用的方法对象和传参,而且不让程序执行那个肯定要报错的方法,因为那个方法的实现类在远程服务器。由于HoppinRPC能拿到注册到注册中心 的服务名和服务提供者的URI。我就把自己的用户信息,调用的类、方法、传参等包装起来,然后序列化这个包装类(类内的对象必须实现序列化接口)。 通过URI和网络传输,我就把序列化的包装对象传输到远程服务器,也就是服务提供者那里。

    4. 4、服务提供者拿到序列化的内容,经过反序列化就可以得到我包装的参数,通过用户信息进行用户认证和权限认证。 认证完成后,通过类、方法、参数列表进行反射调用自己的实现类。

    5. 5、最后把实现类的响应或者捕获的异常再包装序列化,最后通过网络传输传到调用方(客户端),调用方反序列化即可拿到响应,返回即可。

用户认证与权限认证

首先要理解用户认证与权限认证是两个不同的功能:用户认证一般是验证用户输入的用户名密码是否有效,验证你是这个账号的所有者; 而权限认证则是验证已认证的用户的权限,在操作某些功能的时候,往往需要验证用户有调用它的权限,一般是vip会员权限、超级管理员权限这些。

在SpringHoppin中拥有两套鉴权逻辑:一是应用于前后端交互的权限校验,这层是在网关进行校验,主要用来校验操作是否需要用户有未登录权限、登录权限或者vip权限。 这跟后台管理系统稍稍有些区别,目前我是基于cookie与统一权限认证平台来实现的。统一认证平台的目的是我有很多网站,我只期望用户一次注册,就能全站通用,并通过 签发一次性ucode解决跨域的cookie写入问题。总有人跟我说cookie过时了,靓仔无语,因为cookie本质上就是一个key为"cookie"的请求头。 而且cookie机制异常丰富,比请求头灵活,且自带超时和跨域机制,前后端都有现成的类库去操作cookie。总之,你只要能把签发的token或者jwt传给后台就行了。 传给后台之后,网关通过方法注解与redis或者jwt里的用户信息里的权限字段一比对,从而可以判断用户是否有权限调用。如评论接口需要登录权限,如果未登录用户调用,网关会在 自定义响应头内写入重定向信息,由zjax(zq定制ajax,适配hoppinzq-gateway)解析并携带当前url重定向到统一权限认证平台。
二是应用于RPC调用的权限认证,我借鉴了SpringSecurity的思想,所以下面是很有建设性的思路:

  1. 1、这个是在@ServiceRegister注解上配置的,注解源代码如下:

     
    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Service
    public @interface ServiceRegister {
        @AliasFor(
            annotation = Service.class
        )
        String value() default "";
        String title() default "";
        String description() default "";
        int timeout() default 5000;
        //全局身份验证方式
        Class<? extends AuthenticationProvider> authentication() default SimpleUserCheckAuthenticator.class;
        //全局权限验证方式
        Class<? extends AuthorizationProvider> authorization() default AuthenticationNotCheckAuthorizer.class;
        //全局服务跟踪参数方式
        Class<? extends ModificationManager> modification() default NotModificationManager.class;
        //服务调用所使用的序列化方式
        Class<? extends CustomSerializer> serializer() default HessionSerializer.class;
        RegisterType registerType() default RegisterType.AUTO;
    
        enum RegisterType {
            /**
             * 项目启动自动注册
             */
            AUTO,
            /**
             * 不会随着项目启动注册,声明服务需要手动注册。
             * 服务仍然存储到一个集合内,但是没有注册,你可以在合适的时机将其手动注册(通过注册中心的RegisterServer内方法)
             */
            NOT_AUTO
        }
    }
    可以看到标记的两个方法参数,分别是身份验证authentication,权限验证authorization。身份验证的注解的默认 值是SimpleUserCheckAuthenticator,这个是内置的,会验证用户是否是服务提供者。权限验证是AuthenticationNotCheckAuthorizer,这个也是内置的, 默认通过用户验证就会有权限。
    首先注解是在方法层面上的,所以每个方法都可以通过注解配置它独特的身份验证和权限验证方式。

    然后你就可以自定义你自己的鉴权逻辑了,你只想要实现AuthenticationProviderAuthorizationProvider接口,并重写里面的方法即可。用户认证和权限认证交由业务的实现 类去实现,并抛出我给的两个异常AuthenticationFailedException和AuthorizationFailedException表示校验失败,然后后面我去处理。
    我来教大家实现一个自定义的用户校验。
    首先在hoppinzq-test1模块里,新建一个类CheckIsHoppinzq,使其实现序列化接口和AuthenticationProvider接口

     
    public class CheckIsHoppinzq implements AuthenticationProvider, Serializable {
    
        @Override
        public void authenticate(InvocationRequest hoppinInvocationRequest) throws AuthenticationFailedException {
            if (hoppinInvocationRequest.getCredentials() != null && hoppinInvocationRequest.getCredentials() instanceof UserPrincipal) {
                //获取调用方凭证
                UserPrincipal upp = (UserPrincipal) hoppinInvocationRequest.getCredentials();
                if ("hoppinzq".equals(upp.getSecretId()))
                    AuthenticationContext.setPrincipal(upp);
                else
                    throw new AuthenticationFailedException("身份验证失败");
            } else
                throw new AuthenticationFailedException("缺少凭据");
        }
    }
    可以看出这个自定义方法是校验服务调用方的SecretId是不是hoppinzq,是的话用户认证通过,并放到ThreadLocal里,可能其他地方要用。不是hoppinzq的话,抛给客户端异常。

    还记得Test1ServiceImpl吗?把里面的@ServiceRegister注解加点东西:

     
    @ServiceRegister(title = "测试1", description = "这是测试1的描述",authentication = CheckIsHoppinzq.class)
    public class Test1ServiceImpl implements Test1Service{
        @Override
        @ServiceRegisterMethod(title = "方法测试",description = "打印test1")
        public String sayTest(String msg) {
            return "test1:"+msg;
        }
    }

    我们重启这个test1模块,然后启动testrpc模块,发现报错了。

    道理很简单,服务提供者改变了它们的用户认证逻辑,我们只要在配置文件里配置这个secret-id,或者在启动类追加upp.setSecretId("hoppinzq");就可以继续调用了。懂?

序列化与反序列化

在计算机网络七层架构中,直接为用户提供服务的是处于最上层的应用层。而应用层协议是调用双方约定好的标准,是数据解析的规则,比如常用的超文本传输协议http协议和应用于文件传输的ftp协议。 rpc的通信协议可以基于http,但它是一个比较独特的远程调用方式,需要对数据传输格式进行严格的设计。 我的这个rpc框架的规则是对自定义请求头HoppinInvocationRequest 和自定义响应头HoppinInvocationResponse的网络传输。

具体是服务调用方将一些参数封装在本框架的自定义请求头HoppinInvocationRequest,然后通过网络传输传到服务提供方。 服务提供方需要拿到数据后,去调用本地服务。然后将响应、响应头、状态码等参数封装在本框架的自定义响应头HoppinInvocationResponse并通过网络传输传到服务调用方。服务调用方 将解析这些数据并返回数据或异常。

那么现在的问题是两个自定义的类是如何通过网络传输呢?我们知道类是比较抽象的信息载体,如果我们可以用语言把它描述出来或者写到纸上,别人通过你的描述还能在脑海里生成这个类,这就是序列化与反序列化。 我们知道网络传输本质是原始比特流的传输,也就是0和1。如果我们能把实体类转为0和1,那就可以通过网络传输了,这个转换过程就是序列化。客户端拿到这些二进制,能够把这个类给还原出来,这个过程就是反序列化。

为了提供高质量的数据服务,和实现与上下游各系统进行良好的对接,类与二进制之间的转换尤为重要,表现在转换的效率和转换数据的大小。 由于转换过程是可以高度自定义的,这个自定义过程或者规则就是序列化协议。
现在的序列化协议很多,有Java自带的Serializer,有便于理解的JSON,也有高性能跨语言的Hession2.0等。本项目采用了Hesson2.0作为默认的序列化框架,当然你也可以自己拓展。

关于为什么要选择Hession2.0

首先dubbo的序列化框架就是用的Hession2.0,阿里内部在选择序列化框架的时候势必经过了深思熟虑和辩论,肯定是最优解,无脑用就行了。
Hession2.0的性能高,而且跨语言。Java自带的Serializer只能用于Java之间的服务调用,而Hession是序列化框架,势必可以为其他语言添加支持。 这就很完美了,我服务调用双方不需要规定用什么语言,只需要用你语言的Hession框架就可以解析。假设我写了一个c++程序,集成了ffmpeg和opencv并提供了一些基于Hession协议的rpc接口, 我直接就能通过Java和Hession调用这些服务接口,而无需通过JNI或者控制台去调用。这很酷,跟HTTP调用的思想一模一样。
实际上JSON或XML也很不错,因为易于理解和调试,当你反序列化报错的时候,你完全可以直接发现是什么问题。而不是盯着一堆看不懂的Stream发呆。

在本框架中,服务提供者和调用方无需关注序列化框架,只需关注实现业务功能即可,但有的程序员可能想拓展或者使用其他的序列化框架,请往下看。

  1. 本框架的序列化代码在com.hoppinzq.service.serializer包中,提供了三种序列化方式: HessionSerializerJdkSerializerJsonCustomSerializer。其中JsonCustomSerializer没有任何内容, 因为我不确定你喜欢使用Gson、Jackson还是FastJson。当然,FastJson yyds。

    1、首先你的序列化类必须实现CustomSerializer接口。并重写下面两个方法:

    void serialize(Object obj, OutputStream outputStream) throws IOException;

    序列化方法,将指定对象obj的序列化二进制流写入outputStream

    <T> T deserialize(InputStream inputStream, Class<T> clazz) throws IOException, ClassNotFoundException;

    反序列化方法,将inputStream内的二进制流强制转换为指定的类

    byte[] serialize(Object obj) throws IOException;

    (已弃用,不方便本框架调用)序列化方法,返回指定对象的原始比特流

    <T> T deserialize(byte[] bytes, Class<T> clazz) throws IOException, ClassNotFoundException;

    (已弃用,不方便本框架调用)反序列化方法,将原始比特性强制转换为指定类

  2. 2、服务类上在@ServiceRegister注解内声明你自己的序列化处理类即可。

    @ServiceRegister(title = "测试服务", description = "用来测试RPC的服务",serializer = HessionSerializer.class)
    public class TestServiceImpl implements TestService{}

    建议使用我的默认的序列化处理类HessionSerializer,你可以修改HessionSerializer的代码,以实现序列化行为或者自定义操作。比如你想在序列化前do something。 如果不是特别要求,不是很建议自己去实现CustomSerializer接口并实现自己的序列化方式,因为序列化方式是包装在服务包装类ServiceWrapper,一旦这个类解析出问题导致没有正确解析到你自定义的序列化方式,那么默认会使用 序列化处理类HessionSerializer对异常进行包装并响应给客户端。但是客户端可能工作正常,会按照预期自定义的序列化方式工作,由于双方的序列化协议不一致,就会导致解析失败。 由于我无法预知服务端/客户端的所有问题,先尽量保证使用序列化处理类HessionSerializer。

    拓展:使用序列化和反序列化解决对象的浅拷贝问题

    有的时候,你想复制一个对象,我们知道用Java简单的赋值实际上是引用传递,只是把原来对象的地址赋给了你。你操作这个对象都会改变原来的对象。
    实际上Java提供了Cloneable接口用于拷贝对象,也就是Object的clone()方法,然而需要你重写代码,这里就可以用序列化和反序列化了。
    序列化和反序列化就是一个简单的解决方式,比如JSON序列化。我将对象转JSON实际是对象转字符串,字符串是重新new的。然后反序列化就是字符串转对象,这个对象自然也是重新new的,就规避了引用传递。总之,要拷贝的对象一定是要重新new的。

消息队列

这块太多太杂了,我想有空再写,大到服务注册、热插拔,小到日志记录全是写在内部的消息队列,由线程池去处理。
我们的消息体很简单,每个消息体持有一批数据message和一个处理接口的实现messageHandler。每种消息会按照其独有的处理类去处理:

public class MessageBean {
    private String messageName;
    private Object message;
    private MessageEnum messageType;
    private MessageHandler messageHandler;
}
我们的消息缓冲也简单,线程池和消息缓冲是消费者和生产者的关系,我们只需将待办消息写入缓冲,由线程池去持有消息并使用其消息接口的实现类去处理消息,这就是核心功能,。缓冲代码如下:
 
/**
 * @author:ZhangQi
 * 本地服务的消息缓冲
 **/
public class MessageBuffer {
    private static Lock lock = new ReentrantLock();
    private static Condition notFull = lock.newCondition();
    private static Condition notEmpty = lock.newCondition();
    //缓冲,先放1024个消息
    public static MessageBean[] items = new MessageBean[1024];
    public static int putptr/*写索引*/,
            takeptr/*读索引*/,
            count/*队列中存在的数据个数*/;

    /**
     * 向队列内写入消息
     * @param x
     * @throws InterruptedException
     */
    public static void put(MessageBean x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) {
                notFull.await();
            }
            items[putptr] = x;
            if (++putptr == items.length) {
                putptr = 0;
            }
            ++count;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 从队列获取消息
     * @return
     * @throws InterruptedException
     */
    public static MessageBean take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                notEmpty.await();
            }
            MessageBean x = items[takeptr];
            if (++takeptr == items.length) {
                takeptr = 0;
            }
            --count;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

拦截器和过滤器

1、过滤器由于设计的不好,已暂时弃用,但是源代码和思想得到了保留。

2、hoppinRPC的拦截器接口如下:

 
public interface RPCInterceptorInterface {
    /**
     * 在验证用户前执行自定义操作
     * @param request 请求体
     * @param serviceWrapper 服务包装类
     */
    void beforeAuthentication(HoppinInvocationRequest request, ServiceWrapper serviceWrapper);

    /**
     * 在执行rpc调用的方法实现类前执行自定义操作
     * @param request 请求体
     * @param serviceWrapper 服务包装类
     * @param rpcBean rpc调用细节
     */
    void beforeMethod(HoppinInvocationRequest request,ServiceWrapper serviceWrapper,RPCBean rpcBean);

    /**
     * 在执行rpc调用的方法实现类后执行自定义操作
     * @param response 响应体
     * @param serviceWrapper 服务包装类
     * @param rpcBean rpc调用细节
     */
    void after(HoppinInvocationResponse response, ServiceWrapper serviceWrapper, RPCBean rpcBean);

    /**
     * 在执行rpc报错后(在处理完异常后)执行自定义操作
     * @param response 响应体
     * @param serviceWrapper 服务包装类
     * @param rpcBean rpc调用细节
     */
    void exception(HoppinInvocationResponse response, ServiceWrapper serviceWrapper, RPCBean rpcBean,Exception exception);

    /**
     * 在整个调用过程完毕后执行自定义操作
     * @param request 请求体
     * @param response 响应体
     * @param serviceWrapper 服务包装类
     * @param rpcBean rpc调用细节
     * @param logBean 服务调用日志
     */
    void end(HoppinInvocationRequest request, HoppinInvocationResponse response, ServiceWrapper serviceWrapper, RPCBean rpcBean, ServiceLogBean logBean);
}

你只需编写一个类实现该接口,然后在该类上使用注解@RPCInterceptor(10)即可,其中10表示执行顺序。执行顺序按从大到小排序。

异常处理

1、使用注解@ExceptionCatch声明在方法或者类上,声明需要捕获的异常和异常处理接口的实现类。

 
@Target({ElementType.METHOD,ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(ExceptionContainer.class)
public @interface ExceptionCatch {
    int code() default 500;
    Class<? extends Exception> value();
    Class<? extends ExceptionHandler> handler() default DefaultExceptionHandler.class;
}

2、我们提供了一个专门用来捕获rpc异常的注解供参考。

 
/**
 * @author:ZhangQi
 * 服务调用方只对rpc抛出的异常进行默认处理
 **/
@Target({ElementType.TYPE,ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@ExceptionContainer({
    @ExceptionCatch(value = RemotingException.class , handler = DefaultExceptionHandler.class),
    @ExceptionCatch(value = AuthenticationFailedException.class, handler = DefaultExceptionHandler.class),
    @ExceptionCatch(value = AuthorizationFailedException.class, handler = DefaultExceptionHandler.class)
})
public @interface ExceptionCatchRemote {

}

网络架构

rpc的网络架构是基于bio(阻塞式io),如果你阅读过hoppinzq-client模块的源代码,会发现很多nio(同步非阻塞io)和aio(异步io)的内容,这表明了我的挣扎。 我很清楚bio永远不会是最佳选择,但是nio学习成本太高了,一个简单的通讯代码多到令人发指。我们将在之后的时间尝试使用netty来改善这一情况, 这是目前最大的工作量,我们需要重写rpc会话、负载均衡等功能,但是改变是很有必要的。

负载均衡

负载均衡模块按我的设计是无法实现的,感谢Eureka给我带来的灵感。首先,我这个负载均衡是设计在客户端,也就是在调用前,我会使用 负载均衡算法获得一个URI,而URI是怎么准确的获得的呢?按照我之前的设计思路,服务类是最基础的注册单位。这就导致了很多问题——你无法知道两个相同的类是否是由 不同地址或不同端口的同一个服务注册的,更别说一个接口类可以有多个实现类了。而SpringCloud里,最基础的注册单位是一个模块。每个模块都有一个name值。Ribbon认为,拥有同一个name值的服务都视为同一个模块, 可以对这些拥有相同name的模块进行负载均衡。于是我及时在配置文件里增补了服务名,也就是name这个字段,用以负载均衡。
第二个问题是,由于负载均衡是发生在客户端,客户端如何去判断该对哪些服务进行负载均衡呢?这里我借鉴了Eureka的思想,即每个注册到Eureka上的服务都会同步,并保留Eureka 上注册的所有服务信息。那么这个问题就很好解决了,我只要启动负载均衡功能,我就开辟一个定时线程池,每隔一段时间去同步注册中心的所有服务到本地,从而实现在本地就可以精准的 筛选出,本次调用能负载均衡的服务。
尽管这个问题不好描述,但我仍然为当时的小伎俩而沾沾自喜。

1、新建一个模块hoppinzq-test2,其中这个模块直接把hoppinzq-test1的全部代码和配置文件复制粘贴就行了,端口号改为8804,你只需保证配置文件的name跟hoppinzq-test1模块一致就行了。 然后Test1ServiceImpl类里的sayHello方法改一下,让其return "test2:"+msg;

2、我们就在hoppinzq-testrpc模块下测试,启动类上再添加@EnableHoppinBalance注解,开启负载均衡模块。注意负载均衡注解一定要加在客户端也就是服务调用方。修改后的代码如下:

 
@EnableHoppinBalance
@Hoppin
@SpringBootApplication
public class TestRPCApplication {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(TestRPCApplication.class, args);
        UserPrincipal upp = new UserPrincipal("zhangqi", "123456");
        //这里不用写注册中心的URL了,而是写服务名,注意
        Test1Service test1Service = ServiceProxyFactory.createProxy(Test1Service.class, upp,"hoppinzq-test1");
        System.err.println(test1Service.sayTest("123"));
        System.err.println(test1Service.sayTest("123"));
        System.err.println(test1Service.sayTest("123"));
        Test1Service test1Service2 = ServiceProxyFactory.createProxy(Test1Service.class, upp,"hoppinzq-test1");
        System.err.println(test1Service2.sayTest("123"));
        System.err.println(test1Service2.sayTest("123"));
    }
}

负载均衡模块同样开放3个接口,用于测试和输出给前端

熔断器

熔断器是一种服务保护策略,我们知道调用其他模块提供的服务可能是不可靠的。 当多个模块之间调用的时候,若有一个模块因为某些原因可能响应的很慢或者报错,这就导致下游的服务都卡在那里。 这时候就要请出我们的熔断器了,可以及时对这些异常服务进行熔断或者降级,防止故障在整个系统中的传播,避免系统的崩溃。熔断器既可以作用于服务端,防止某些服务或者上游服务不可靠。也可以作用于客户端,直接对调用过程可能由于服务端挂掉等原因及时做出响应。
熔断器的设计和思路就比较清晰了,我们只需写一个aop,将服务方法环绕,自定义一些配置和熔断策略就行了,以下就是我的熔断器设计的流程图。

1、要使用熔断器,在hoppinzq-test1模块的启动类前加@EnableHoppinBreaker注解。

2、我们在hoppinzq-test1模块的TestService添加一个方法

 
public interface Test1Service {
    String sayTest(String msg);
    String getAdd(int i, int j);
}
相应的实现类的方法
 
@Override
    @ServiceRegisterMethod(title = "方法测试2",description = "打印i+j")
    @CircuitBreaker(threshold = 2,timeout = 2000,aliveTime = 3000,fallback = "getAddFallback")
    public String getAdd(int i, int j){
        //若j<0,报错除数不能为0;若j为0,手动抛出异常;若j>0,随眠jms模拟超时操作
        if(j<0){
            i=j/0;
        }
        if(j==0){
            throw new RuntimeException("j不能是0");
        }
        try{
            Thread.sleep(j);
        }catch (Exception ex){}
        return "i+j="+(i+j);
    }
    //上面方法报错或者超时,直接执行熔断方法
    public String getAddFallback(){
        return "服务正在忙碌";
    }

大家可以看这个方法,首先熔断器注解@CircuitBreaker(threshold = 2,timeout = 2000,aliveTime = 3000,fallback = "getAddFallback")需要加在方法上 其中threshold表示熔断器启动阈值,也就是说报错两次就会启动熔断器;timeout表示超时时长,若该方法的响应超过2000ms,直接启动熔断器。aliveTime表示熔断器存活时间, 我认为服务的不稳定是偶发现象,可能过一会就正常了,正常就不应该走熔断器,就给熔断器配置了一个存活时间;fallback是熔断器执行的逻辑,可以传一个方法名,意思就是一旦熔断器启动,就不执行原来的方法而是执行fallback的方法。

3、hoppinzq-testrpc模块的Test1Service别忘了加String getAdd(int i, int j);方法哦。然后我们修改启动类里的代码:

 
@Hoppin
@SpringBootApplication
public class TestRPCApplication {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(TestRPCApplication.class, args);
        UserPrincipal upp = new UserPrincipal("zhangqi", "123456");
        Test1Service test1Service = ServiceProxyFactory.createProxy(Test1Service.class, "http://127.0.0.1:8802/service", upp);
        System.err.println(test1Service.getAdd(1,7));
        System.err.println(test1Service.getAdd(1,-1));
        System.err.println(test1Service.getAdd(1,0));
    }
}

首先第一个getAdd是一定能通过的,第二个则不会,因为j为-1的时候会报除数不能为0,第三个抛出自定义异常,但是由于第二个引起的报错导致了熔断器被启动,所以第三次直接进入熔断器。 就可以得到预期的结果:

4、熔断器模块同样开放3个接口,用于监控和输出给前端

其他

1、支持链路跟踪的traceId和自定义请求头。

 
UserPrincipal upp = new UserPrincipal("zhangqi", "123456");
        Map headers=new HashMap();
        headers.put("traceId","asd");
        Test1Service test1Service = ServiceProxyFactory
                .createProxy(Test1Service.class, "http://127.0.0.1:8802/service", upp,headers);

2、内部是有日志的,只不过是记录在内存中,你需要将其异步地记录到数据库中,你的任何一次调用都可以观察到打印。 日志内部接口:http://127.0.0.1:8801/service/showLogs

3、服务端在业务开发的时候,可以从ThreadLocal中取得的参数,包括:

(UserPrincipal) AuthenticationContext.getPrincipal()

获取调用方凭证(用户名、密码、SecretId、SecretKey)

(ServiceLogBean) LogContext.getPrincipal()

日志,调用信息、传参、细节等信息

(RPCBean) RPCContext.getPrincipal();

RPC调用细节

使用这些变量后你无需关注内存泄漏等问题,我将在合适的时机将其清理。其次,服务调用者无法获取上面的这些变量,这些信息对于服务的调用方应该是透明的。 但对于服务注册方或服务开发者而言,把这些数据暴露给他们是很有必要的,他们可以感知是谁调用了他们的服务,并输出日志来关注服务质量和调试等。

4、若不想暴露注册中心,设置zq-server.show-center=false

5、还有很多细节,你还可以继承或实现很多类来重写你的方法,我还预留了相当多方法,用于执行自定义操作。不过现在不一一展示了,但注释都写了。快来体验吧!