-->
侧边栏壁纸
博主头像
断钩鱼 博主等级

行动起来,活在当下

  • 累计撰写 28 篇文章
  • 累计创建 34 个标签
  • 累计收到 0 条评论

目 录CONTENT

文章目录

Dubbo进阶使用

halt
2021-11-07 / 0 评论 / 0 点赞 / 1389 阅读 / 0 字

Dubbo进阶

高级用法官方文档总览:http://dubbo.apache.org/zh/docs/advanced/

涉及代码:https://e.coding.net/duangouyu/demo/dubbo_demo.git

SPI机制

Java中的spi机制

SPI(Service Provider Interface),是JDK内置的一种服务提供发现机制,可以用来启用框架扩展和替换组件,主要是被框架的开发人员使用,比如java.sql.Driver接口,其他不同厂商可以针对同一接口做出不同的实现,MySQL和PostgreSQL都有不同的实现提供给用户,而Java的SPI机制可以为某个接口寻找服务实现。Java中SPI机制主要思想是将装配的控制权移到程序之外,在模块化设计中这个机制尤其重要,其核心思想就是解耦

SPI与API区别:

  • API是调用并用于实现目标的类、接口、方法等的描述;
  • SPI是扩展和实现以实现目标的类、接口、方法等的描述;

换句话说,API 为操作提供特定的类、方法,SPI 通过操作来符合特定的类、方法。

使用

  1. 由上层服务定义接口或标准 interface
  2. 开发者实现接口,并在 resources/META-INF/services/${interface_name}中添加具体实现的 全限定名
  3. 使用 java.util.ServiceLoader#load(java.lang.Class<S>) 方法来加载

demo:

接口和实现应属于不同的模块

例如:我写了一个工具包,有一个功能产生的数据默认放在了内存,比如集群环境需要共享这份数据或者其他需求,需要放入redis中,那么你就可以使用我定义的接口,来实现一个将产生的数据存入redis的实现,并将实现类的全限定名放入 resources/META-INF/services/${interface_name} 文件里,我就可以通过spi机制发现并使用,但是如果spi没有发现就使用默认的实现

在代码实现中为了方便我没有分包

  1. 创建接口和实现类

    子类会继承父类的方法,所以接口中给默认实现即可

    package top.mengshuo.service.jdk;
    
    /**
     * @author mengshuo
     * @since 2021-11-03
     */
    public interface FileService {
    
        /**
         * 输出
         */
        default void printClassName(){
            System.out.println("全限定名: "+this.getClass().getName());
        }
    }
    
    package top.mengshuo.service.jdk.impl;
    
    import top.mengshuo.service.jdk.FileService;
    
    /**
     * @author mengshuo
     * @since 2021-11-03
     */
    public class ImageFileServiceImpl implements FileService {
    
    }
    
    package top.mengshuo.service.jdk.impl;
    
    import top.mengshuo.service.jdk.FileService;
    
    /**
     * @author mengshuo
     * @since 2021-11-03
     */
    public class VideoFileServiceImpl implements FileService {
    
    
    }
    
    
  2. 添加实现配置

    resources 目录下添加 META-INF/services/top.mengshuo.service.jdk.FileService 文件并写入实现类全限定名

    top.mengshuo.service.jdk.impl.ImageFileServiceImpl
    top.mengshuo.service.jdk.impl.VideoFileServiceImpl
    

    图示

  3. 加载并测试

    package top.mengshuo;
    
    import top.mengshuo.service.jdk.FileService;
    
    import java.util.ServiceLoader;
    
    /**
     * @author mengshuo
     * @since 2021-11-03
     */
    public class Application {
    
        public static void main(String[] args) {
            ServiceLoader<FileService> fileServices = ServiceLoader.load(FileService.class);
            fileServices.forEach(FileService::printClassName);
        }
    }
    
    

    结果

不足

1.不能按需加载,需要遍历所有的实现,并实例化,然后在循环中才能找到我们需要的实现。如果不想用某些实现类,或者某些类实例化很耗时,它也被载入并实例化了,这就造成了浪费。

2.获取某个实现类的方式不够灵活,只能通过 Iterator ( 迭代器 ) 形式获取,不能根据某个参数来获取对应的实现类。

3.多个并发多线程使用 ServiceLoader 类的实例是不安全的。

Dubbo中的SPI机制

具体支持的扩展&用法:https://dubbo.apache.org/zh/docs/references/spis/

使用

  1. 由上层服务定义接口或标准 interface

  2. 开发者实现接口,并在 resources/META-INF/dubbo/${interface_name}中添加key=具体实现的 全限定名 也可以直接写全限定名 但是就不能进行动态选择

  3. 使用 org.apache.dubbo.common.extension.ExtensionDirector#getExtensionLoader 方法来加载,例如:

    ApplicationModel.defaultModel().getExtensionLoader(FileService.class)
    

    org.apache.dubbo.common.extension.ExtensionLoader#getExtensionLoader 已经弃用

弃用

demo:

  1. 创建接口和实现类

    package top.mengshuo.service.dubbo;
    
    /**
     * @author mengshuo
     * @since 2021-11-03
     */
    public interface FileService {
    
        /**
         * 输出
         */
        default void printClassName(){
            System.out.println("全限定名: "+this.getClass().getName());
        }
    }
    
    package top.mengshuo.service.dubbo.impl;
    
    import top.mengshuo.service.dubbo.FileService;
    
    /**
     * @author mengshuo
     * @since 2021-11-03
     */
    public class ImageFileServiceImpl implements FileService {
    
    }
    
    package top.mengshuo.service.dubbo.impl;
    
    import top.mengshuo.service.dubbo.FileService;
    
    /**
     * @author mengshuo
     * @since 2021-11-03
     */
    public class VideoFileServiceImpl implements FileService {
    
    
    }
    
  2. 添加实现配置

    resources 目录下添加 META-INF/dubbo/top.mengshuo.service.dubbo.FileService 文件并写入key=实现类全限定名

    imageService=top.mengshuo.service.dubbo.impl.ImageFileServiceImpl
    videoService=top.mengshuo.service.dubbo.impl.VideoFileServiceImpl
    

    图示

  3. 加载并测试

    package top.mengshuo;
    
    import org.apache.dubbo.common.extension.ExtensionLoader;
    import org.apache.dubbo.rpc.model.ApplicationModel;
    import top.mengshuo.service.dubbo.FileService;
    
    import java.util.Set;
    
    /**
     * @author mengshuo
     * @since 2021-11-03
     */
    public class Application {
    
        public static void main(String[] args) {
    
            ExtensionLoader<FileService> extensionLoader = ApplicationModel.defaultModel().getExtensionLoader(FileService.class);
            Set<FileService> extensions = extensionLoader.getSupportedExtensionInstances();
            extensions.forEach(FileService::printClassName);
    
        }
    }
    
    

    结果

动态选择拓展点

  1. 对上面的代码进行稍微改造即可实现动态选择拓展点

    @SPI("imageService") 指定默认的拓展点

    @Adaptive 启用动态选择拓展点

    url参数 也是必须的

    package top.mengshuo.service.dubbo;
    
    import org.apache.dubbo.common.URL;
    import org.apache.dubbo.common.extension.Adaptive;
    import org.apache.dubbo.common.extension.SPI;
    
    /**
     * @author mengshuo
     * @since 2021-11-03
     */
    @SPI("imageService")
    public interface FileService {
    
        /**
         * 输出
         */
        default void printClassName() {
            System.out.println("全限定名: " + this.getClass().getName());
        }
    
        /**
         * 输出
         *
         * @param url 根据url中的key选择对应的类去执行
         */
        @Adaptive
        default void adaptiveUrl(URL url) {
            System.out.println("全限定名: " + this.getClass().getName() + "\nurl: " + url);
        }
    }
    
    
  2. 测试

    根据url选择 格式:url?接口名=key 接口名的规则 首字母小写 其他大写字母小写且以 . 分割

    使用 ApplicationModel.defaultModel().getAdaptiveExtension(class); 方法进行加载

    package top.mengshuo;
    
    import org.apache.dubbo.common.URL;
    import org.apache.dubbo.rpc.model.ApplicationModel;
    import top.mengshuo.service.dubbo.FileService;
    
    /**
     * @author mengshuo
     * @since 2021-11-03
     */
    public class Application {
    
        public static void main(String[] args) {
            FileService service = ApplicationModel.defaultModel().getAdaptiveExtension(FileService.class);
            service.adaptiveUrl(URL.valueOf("https://test.com?file.service=videoService"));
        }
    }
    
    

    指定keyw为videoService

    不指定key 默认选择imageService

Dubbo中定义拦截器

dubbo的拦截器也是基于dubbo的spi机制来加载

使用

  1. 创建过滤器类 实现 org.apache.dubbo.rpc.Filter 接口

  2. 添加注解 org.apache.dubbo.common.extension.Activate 指定 group 分组 消费者或提供者

  3. META-INF\dubbo\下创建org.apache.dubbo.rpc.Filter 文件, 填写 key=过滤器全限定名

    官方示例:https://dubbo.apache.org/zh/docs/references/spis/filter/

  1. 创建实现类, 并添加注解

    package top.mengshuo.dubbo.filter;
    
    
    import org.apache.dubbo.common.constants.CommonConstants;
    import org.apache.dubbo.common.extension.Activate;
    import org.apache.dubbo.rpc.*;
    
    /**
     * @author mengshuo
     * @since 2021-11-04
     */
    @Activate(group = {CommonConstants.CONSUMER})
    public class LogFilter implements Filter {
    
        /**
         * Always call invoker.invoke() in the implementation to hand over the request to the next filter node.
         *
         * @param invoker
         * @param invocation
         */
        @Override
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            long startTime = 0;
            try {
                startTime = System.currentTimeMillis();
                return invoker.invoke(invocation);
            } finally {
                System.out.println("请求耗时:" + (System.currentTimeMillis() - startTime)+"ms");
            }
        }
    }
    
    
  2. 创建META-INF/dubbo/org.apache.dubbo.rpc.Filter文件

    log=top.mengshuo.dubbo.filter.LogFilter
    
  3. 测试

    将项目install 到本地仓库 用之前springboot整合dubbo的项目引入dubbo-filter 依赖 测试请求

    测试结果:

    测试结果

负载均衡

在集群负载均衡时,Dubbo 提供了多种均衡策略(包括随机、轮询、最少活跃调用数、一致性 Hash),缺省为random随机调用。

详见官方文档: http://dubbo.apache.org/zh/docs/advanced/loadbalance/

负载均衡基本配置

在消费者或者提供者一方设置即可

  1. 消费者端负载均衡

    @DubboReference(loadbalance = LoadbalanceRules.RANDOM)
    
  2. 生产者端负载均衡

    @DubboService(loadbalance = LoadbalanceRules.RANDOM)
    
  3. 修改服务提供者方法

    package top.mengshuo.serivce.impl;
    
    import org.apache.dubbo.common.constants.LoadbalanceRules;
    import org.apache.dubbo.config.annotation.DubboService;
    import org.springframework.beans.factory.annotation.Value;
    import top.mengshuo.base.service.HelloService;
    
    /**
     * @author mengshuo
     * @since 2021-11-01
     */
    @DubboService
    public class HelloServiceImpl implements HelloService {
    
        @Value("${dubbo.application.name}")
        private String applicationName;
    
        /**
         * 简单demo方法
         *
         * @param name name
         * @return res
         */
        @Override
        public String simpleMethod(String name) {
            return applicationName + ": hello " + name;
        }
    }
    
    
  4. 修改启动配置进行测试

    添加启动参数 并复制为多份,启动多个服务提供者

    示例

    测试

    测试结果

自定义负载均衡

  1. 实现 org.apache.dubbo.rpc.cluster.LoadBalance

  2. 创建 META-INF/dubbo/org.apache.dubbo.rpc.cluster.LoadBalance文件 写入配置

    官方文档:https://dubbo.apache.org/zh/docs/references/spis/load-balance/

src
 |-main
    |-java
        |-com
            |-xxx
                |-XxxLoadBalance.java (实现LoadBalance接口)
    |-resources
        |-META-INF
            |-dubbo
                |-org.apache.dubbo.rpc.cluster.LoadBalance (纯文本文件,内容为:xxx=com.xxx.XxxLoadBalance)
  1. 实现类

    package top.mengshuo.loadbalance;
    
    import org.apache.dubbo.common.URL;
    import org.apache.dubbo.common.extension.Adaptive;
    import org.apache.dubbo.rpc.Invocation;
    import org.apache.dubbo.rpc.Invoker;
    import org.apache.dubbo.rpc.RpcException;
    import org.apache.dubbo.rpc.cluster.LoadBalance;
    import org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance;
    
    import java.util.List;
    import java.util.Optional;
    
    /**
     * @author mengshuo
     * @since 2021-11-06
     */
    public class MyLoadBalance implements LoadBalance {
        /**
         * select one invoker in list.
         *
         * @param invokers   invokers.
         * @param url        refer url
         * @param invocation invocation.
         * @return selected invoker.
         */
        @Override
        public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
    
            System.out.println("自定义负载均衡");
    
            return invokers.get(0);
    
        }
    }
    
  2. 配置文件

    myLoadBalance=top.mengshuo.loadbalance.MyLoadBalance
    
  3. 测试

    调试

    结果

自定义线程池

介绍

dubbo 中的线程池也是基于dubbo spi机制来实现的 , 默认有四个实现,默认使用 FixedThreadPool

ThreadPool

  • fixed 固定大小线程池,启动时建立线程,不关闭,一直持有。(缺省)
  • cached 缓存线程池,空闲一分钟自动删除,需要时重建。
  • limited 可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。
  • eager 优先创建Worker线程池。在任务数量大于corePoolSize但是小于maximumPoolSize时,优先创建Worker来处理任务。当任务数量大于maximumPoolSize时,将任务放入阻塞队列中。阻塞队列充满时抛出RejectedExecutionException。(相比于cached:cached在任务数量超过maximumPoolSize时直接抛出异常而不是将任务放入阻塞队列)

引用:http://dubbo.apache.org/zh/docs/advanced/thread-model/

dubbo默认提供的线程池

使用

文档:http://dubbo.apache.org/zh/docs/references/spis/threadpool/

示例

自定义一个带简单监控的线程池

  1. 添加依赖

    <dependency>
        <groupId>org.apache.dubbo</groupId>
        <artifactId>dubbo-common</artifactId>
        <version>${dubbo.version}</version>
    </dependency>
    
  2. 自定义线程池

    每秒输出一次线程池状态,并且在占用率超出阈值时发出警报

    package top.mengshuo.threadpool;
    
    import lombok.extern.log4j.Log4j2;
    import org.apache.dubbo.common.URL;
    
    import org.apache.dubbo.common.threadpool.support.fixed.FixedThreadPool;
    
    
    import java.util.Map;
    import java.util.concurrent.*;
    
    /**
     * @author mengshuo
     * @since 2021-11-08
     */
    @Log4j2
    public class WatchingThreadPool extends FixedThreadPool implements Runnable {
    
        private static final Map<URL, ThreadPoolExecutor> THREAD_POOL_MAP = new ConcurrentHashMap<>();
        /**
         * 线程使用率临界阈值
         */
        private static final double ALARM_PERCENT = 0.9;
    
        public WatchingThreadPool() {
            // 定时执行 3秒一次
            Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this, 1, 1, TimeUnit.SECONDS);
        }
    
        @Override
        public Executor getExecutor(URL url) {
    
            // 从父类中创建线程池
            Executor executor = super.getExecutor(url);
            if (executor instanceof ThreadPoolExecutor) {
                THREAD_POOL_MAP.put(url, (ThreadPoolExecutor) executor);
            }
            return executor;
        }
    
        /**
         * @see Thread#run()
         */
        @Override
        public void run() {
            for (Map.Entry<URL, ThreadPoolExecutor> entry : THREAD_POOL_MAP.entrySet()) {
    
                URL url = entry.getKey();
                ThreadPoolExecutor executor = entry.getValue();
    
                // 已使用的线程数量
                int activeCount = executor.getActiveCount();
                // 线程总量
                int poolSize = executor.getCorePoolSize();
    
                // 已使用
                double used = activeCount / (poolSize * 1.0);
                log.info("线程池使用状态:[{}/{}]:{}%", activeCount, poolSize, used * 100);
                if (used > ALARM_PERCENT) {
                    log.warn("warning: IP:{}, 线程池使用状态:[{}/{}]:{}%", url.getIp(), activeCount, poolSize, used * 100);
                }
    
            }
    
        }
    }
    
    
  3. 添加spi配置

    创建 META-INF/dubbo/org.apache.dubbo.common.threadpool.ThreadPool 文件

    watching=top.mengshuo.threadpool.WatchingThreadPool
    
  4. 引入自定义线程池依赖包,并添加配置

    dubbo:
      provider:
        threadpool: watching
    
  5. 测试

    @GetMapping("/threadPool/watching")
    public String threadPoolTest() throws Exception{
        for (int i = 0; i < 1500; i++) {
            Thread.sleep(5);
            new Thread(()-> System.out.println(this.helloService.sayHello("threadPool",1000L))).start();
        }
        return "success";
    }
    

    结果

异步执行

官方文档:https://dubbo.apache.org/zh/docs/advanced/async-execute-on-provider/

  1. 使用 CompletableFuture 使用这种方法可以轻松达到consumer 和 provider 的双异步模式 最后由 CompletableFuture 即可获取到结果
  2. 使用 AsyncContext 这种方式的服务消费者会阻塞等待,直到结果返回, 若是需要双异步,则需要进行RpcContext异步调用

使用

CompletableFuture 模式

  1. 添加接口

        /**
         * 异步执行 CompletableFuture 模式
         *
         * @param name name
         * @return res
         */
        CompletableFuture<String> asyncFutureSayHello(String name);
    
  2. 添加实现类

    官方文档中的 RpcContext.getContext() 已经弃用 可以使用他的子类 RpcContext.getServiceContext()

    RpcContext.getServiceContext() 用于在整个调用中传递环境参数。

    RpcContext.getServerContext() 获取服务器端上下文

        /**
         * 异步执行 CompletableFuture 模式
         *
         * @param name name
         * @return res
         */
        @Override
        public CompletableFuture<String> asyncFutureSayHello(String name) {
            RpcContext savedContext = RpcContext.getServiceContext();
            savedContext.setAttachment("test","测试数据");
            // 建议为supplyAsync提供自定义线程池,避免使用JDK公用线程池
            CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                System.out.println(savedContext.getAttachment("test"));
                // 异步执行的逻辑
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return applicationName + ": hello " + name;
            });
            System.out.println("可以执行其他操作");
            // 为Future添加回调 当异步调用执行完成后会调用此方法 无论成功与失败
            future.whenComplete((retValue, exception) -> {
                if (exception == null) {
                    System.out.println("异步执行正常结束");
                } else {
                    System.out.println("需要进行异常处理");
                }
            });
    
            return future;
        }
    
  3. 添加api接口测试

    这里需要注意:因为需要等待结果返回 dubbo默认超时1s 而线程阻塞了10s 所以这一进行了超时配置,

    其实这样达到了双异步模式 此时的consumer & provider都为异步模式

    @DubboReference(methods = {
         @Method(name = "asyncSayHello", timeout = 50000)
    })
    
        /**
         * 服务提供者端的异步执行
         *
         * @param name name
         * @return res
         * @throws ExecutionException   异常
         * @throws InterruptedException 异常
         */
        @GetMapping("/provider/async/{name}")
        public String asyncHello(@PathVariable String name) throws ExecutionException, InterruptedException {
    
            CompletableFuture<String> future = this.helloService.asyncSayHello(name);
            System.out.println("可以执行其他操作");
            // 阻塞直到结果返回
            return future.get();
        }
    
    
  4. 测试结果

    consumer端

    provider端

AsyncContext 模式

这种方式consumer会阻塞代码执行直到provider执行结束

  1. 添加接口方法

        /**
         * AsyncContext  模式
         *
         * @param name name
         * @return res
         */
        String asyncContextSayHello(String name);
    
  2. 添加实现方法

        /**
         * AsyncContext  模式
         *
         * @param name name
         * @return res
         */
        @Override
        public String asyncContextSayHello(String name) {
            final AsyncContext asyncContext = RpcContext.startAsync();
            new Thread(() -> {
                // 如果要使用上下文,则必须要放在第一句执行
                asyncContext.signalContextSwitch();
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 写回响应
                System.out.println("我后出现");
                asyncContext.write("Hello " + name + ", response from provider.");
            }).start();
            System.out.println("我先出现");
            return null;
        }
    
  3. 测试

    provider

    consumer

异步调用

当服务提供者操作耗时较长时可以开启异步调用方式进行操作

使用

官方使用有三种:http://dubbo.apache.org/zh/docs/advanced/async-call/

  1. 使用 CompletableFuture 签名的接口 需要服务提供者提供异步执行 参照上面

  2. 使用 RpcContext

  3. 重载服务接口

    目前仅支持dubbo协议 这种方法使用起来就是第一种方法 就是接口中重载了一个默认实现,Dubbo 官方提供 compiler hacker,编译期自动重写同步方法

这里使用 第二种进行操作

CompletableFuture 模式

这个模式需要服务提供者为 CompletableFuture 异步执行

其实上面已经说过了,CompletableFuture 异步执行模式 不会阻塞 consumer端 的执行 ,所以这里就等同于CompletableFuture 异步执行的测试

可以添加一个监听,用于异常后的处理

    /**
     * 服务提供者端的异步执行 Future 模式
     *
     * @param name name
     * @return res
     * @throws ExecutionException   异常
     * @throws InterruptedException 异常
     */
    @GetMapping("/provider/future/async/{name}")
    public String providerFutureAsync(@PathVariable String name) throws ExecutionException, InterruptedException {

        CompletableFuture<String> future = this.helloService.asyncFutureSayHello(name);
        System.out.println("可以执行其他操作");
        // 为Future添加回调 当异步调用执行完成后会调用此方法 无论成功与失败
        future.whenComplete((retValue, exception) -> {
            if (exception == null) {
                System.out.println("异步调用正常结束");
            } else {
                System.out.println("需要进行异常处理");
            }
        });
        // 阻塞直到结果返回
        return future.get();

    }

RpcContext 模式

  1. 创建服务提供者 模拟耗时操作

    // 接口
    /**
     * 阻塞方发执行指定时间
     *
     * @param name      name
     * @param sleepTime 阻塞时长
     * @return res
     */
    String sayHello(String name, Long sleepTime);
    
    // 实现类
    
    /**
     * 阻塞方发执行指定时间
     *
     * @param name      name
     * @param sleepTime 阻塞时长
     * @return res
     */
    @Override
    public String sayHello(String name, Long sleepTime) {
        try {
            Thread.sleep(sleepTime);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "hello "+name;
    }
    
  2. 创建服务提供者 进行异步调用

    需要使用注解表示方法启用异步操作@DubboReference(methods = {@Method(name = "sayHello", async = true, timeout = 50000)})

    @Method 等同于 dubbo:reference 标签下的 dubbo:method 的使用 注解与标签的对应关系官方有说明,但是点进去404..... 不过也能猜出来

    @GetMapping("/async/{name}/{sleepTime}")
    public String asyncHello(@PathVariable String name, @PathVariable Long sleepTime) 
        throws ExecutionException, InterruptedException {
    
        // 此调用会立即返回null
        String res = this.helloService.sayHello(name, sleepTime);
        System.out.println("程序立即返回: " + res);
        // 拿到调用的Future引用,当结果返回后,会被通知和设置到此Future
        CompletableFuture<String> helloFuture = RpcContext.getServiceContext().getCompletableFuture();
        System.out.println("在这里可以执行其他业务");
        // 为Future添加回调 当异步调用执行完成后会调用此方法 无论成功与失败
        helloFuture.whenComplete((retValue, exception) -> {
            if (exception == null) {
                System.out.println("异步调用正常结束");
            } else {
                System.out.println("需要进行异常处理");
            }
        });
        // 会在get方法持续阻塞 直到获取到结果
        return helloFuture.get();
    }
    
  3. 测试

    结果

    响应

异步调用&执行的总结

注意

Provider 端异步执行和 Consumer 端异步调用是相互独立的,你可以任意正交组合两端配置

  • Consumer同步 - Provider同步
  • Consumer异步 - Provider同步
  • Consumer同步 - Provider异步
  • Consumer异步 - Provider异步

引用自官方文档:http://dubbo.apache.org/zh/docs/advanced/async-execute-on-provider/

  1. Consumer同步 - Provider同步

    默认就是这种模式

  2. Consumer异步 - Provider同步

    这种就是异步调用 感觉还是 CompletableFuture 模式 比较好用 provider 如果需要同步那么直接使用 future.get() 即可,解放了 provider

    RpcContext 模式 则会阻塞到执行完成返回

  3. Consumer同步 - Provider异步

    这种就是异步调用 感觉还是 CompletableFuture 模式 比较好用 provider 如果需要同步那么直接使用 future.get() 即可,解放了 provider

    跟上面类似 在那一端等待不是等

  4. Consumer异步 - Provider异步

    这种就肯定是 CompletableFuture 模式 好用了 通过上面也能看出了

路由规则

新路由规则:http://dubbo.apache.org/zh/docs/advanced/routing-rule/

旧路由规则:http://dubbo.apache.org/zh/docs/advanced/routing-rule-deprecated/

dubbo admin 使用手册 http://dubbo.apache.org/zh/docs/v2.7/admin/

准备工作

  1. 安装dubbo admin http://dubbo.apache.org/zh/docs/v2.7/admin/install/admin-console/

  2. 改造代码

    package top.mengshuo.serivce.impl;
    
    import org.apache.dubbo.config.annotation.DubboService;
    import org.springframework.beans.factory.annotation.Value;
    import top.mengshuo.base.service.HelloService;
    
    /**
     * @author mengshuo
     * @since 2021-11-01
     */
    @DubboService
    public class HelloServiceImpl implements HelloService {
    
        @Value("${base.name}")
        private String baseName;
    
        /**
         * 简单demo方法
         *
         * @param name name
         * @return res
         */
        @Override
        public String simpleMethod(String name) {
            return baseName + ": hello " + name;
        }
    }
    
    
  3. 添加启动参数

    --base.name=local_server3 --dubbo.protocol.port=20883 依此类推 方便观察

    修改启动配置

  4. 启动DubboAdmin 访问 http://localhost:8080

条件路由

支持以服务或 Consumer 应用为粒度配置路由规则。

简介

  • 应用粒度

    # app1的消费者只能消费所有端口为20880的服务实例
    # app2的消费者只能消费所有端口为20881的服务实例
    ---
    scope: application
    force: true
    runtime: true
    enabled: true
    key: governance-conditionrouter-consumer
    conditions:
      - application=app1 => address=*:20880
      - application=app2 => address=*:20881
    ...
    
  • 服务粒度

    # DemoService的sayHello方法只能消费所有端口为20880的服务实例
    # DemoService的sayHi方法只能消费所有端口为20881的服务实例
    ---
    scope: service
    force: true
    runtime: true
    enabled: true
    key: org.apache.dubbo.samples.governance.api.DemoService
    conditions:
      - method=sayHello => address=*:20880
      - method=sayHi => address=*:20881
    ...
    

规则详解

各字段含义

  • scope
    

    表示路由规则的作用粒度,scope的取值会决定key的取值。必填

    • service 服务粒度
    • application 应用粒度
  • Key
    

    明确规则体作用在哪个服务或应用。必填

    • scope=service时,key取值为[:][:]的组合
    • scope=application时,key取值为application名称
  • enabled=true 当前路由规则是否生效,可不填,缺省生效。

  • force=false 当路由结果为空时,是否强制执行,如果不强制执行,路由结果为空的路由规则将自动失效,可不填,缺省为 false

  • runtime=false 是否在每次调用时执行路由规则,否则只在提供者地址列表变更时预先执行并缓存结果,调用时直接从缓存中获取路由结果。如果用了参数路由,必须设为 true,需要注意设置会影响调用的性能,可不填,缺省为 false

  • priority=1 路由规则的优先级,用于排序,优先级越大越靠前执行,可不填,缺省为 0

  • conditions 定义具体的路由规则内容。必填

Conditions规则体

conditions部分是规则的主体,由1到任意多条规则组成,下面我们就每个规则的配置语法做详细说明:

  1. 格式
  • => 之前的为消费者匹配条件,所有参数和消费者的 URL 进行对比,当消费者满足匹配条件时,对该消费者执行后面的过滤规则。
  • => 之后为提供者地址列表的过滤条件,所有参数和提供者的 URL 进行对比,消费者最终只拿到过滤后的地址列表。
  • 如果匹配条件为空,表示对所有消费方应用,如:=> host != 10.20.153.11
  • 如果过滤条件为空,表示禁止访问,如:host = 10.20.153.10 =>
  1. 表达式

参数支持:

  • 服务调用信息,如:method, argument 等,暂不支持参数路由
  • URL 本身的字段,如:protocol, host, port 等
  • 以及 URL 上的所有参数,如:application, organization 等

条件支持:

  • 等号 = 表示"匹配",如:host = 10.20.153.10
  • 不等号 != 表示"不匹配",如:host != 10.20.153.10

值支持:

  • 以逗号 , 分隔多个值,如:host != 10.20.153.10,10.20.153.11
  • 以星号 * 结尾,表示通配,如:host != 10.20.*
  • 以美元符 $ 开头,表示引用消费者参数,如:host = $host
  1. Condition示例
  • 排除预发布机:
=> host != 172.22.3.91
  • 白名单:
register.ip != 10.20.153.10,10.20.153.11 =>

注意

一个服务只能有一条白名单规则,否则两条规则交叉,就都被筛选掉了

  • 黑名单:
register.ip = 10.20.153.10,10.20.153.11 =>
  • 服务寄宿在应用上,只暴露一部分的机器,防止整个集群挂掉:
=> host = 172.22.3.1*,172.22.3.2*
  • 为重要应用提供额外的机器:
application != kylin => host != 172.22.3.95,172.22.3.96
  • 读写分离:
method = find*,list*,get*,is* => host = 172.22.3.94,172.22.3.95,172.22.3.96
method != find*,list*,get*,is* => host = 172.22.3.97,172.22.3.98
  • 前后台分离:
application = bops => host = 172.22.3.91,172.22.3.92,172.22.3.93
application != bops => host = 172.22.3.94,172.22.3.95,172.22.3.96
  • 隔离不同机房网段:
host != 172.22.3.* => host != 172.22.3.*
  • 提供者与消费者部署在同集群内,本机只访问本机的服务:
=> host = $host

测试

  1. 创建条件路由

    defaultt_consumer禁止访问端口为 20881 的服务提供者

    规则名对应着配置 key

    按照应用配置

  2. 测试路由规则

    测试条件路由

标签路由规则

简介

标签路由通过将某一个或多个服务的提供者划分到同一个分组,约束流量只在指定分组中流转,从而实现流量隔离的目的,可以作为蓝绿发布、灰度发布等场景的能力基础。

Provider

标签主要是指对Provider端应用实例的分组,目前有两种方式可以完成实例分组,分别是动态规则打标静态规则打标,其中动态规则相较于静态规则优先级更高,而当两种规则同时存在且出现冲突时,将以动态规则为准。

  • 动态规则打标,可随时在服务治理控制台下发标签归组规则

    # governance-tagrouter-provider应用增加了两个标签分组tag1和tag2
    # tag1包含一个实例 127.0.0.1:20880
    # tag2包含一个实例 127.0.0.1:20881
    ---
      force: false
      runtime: true
      enabled: true
      key: governance-tagrouter-provider
      tags:
        - name: tag1
          addresses: ["127.0.0.1:20880"]
        - name: tag2
          addresses: ["127.0.0.1:20881"]
     ...
    
  • 静态打标

    注解同理

    <dubbo:provider tag="tag1"/>
    

    or

    <dubbo:service tag="tag1"/>
    

    or

    java -jar xxx-provider.jar -Ddubbo.provider.tag={the tag you want, may come from OS ENV}
    

Consumer

RpcContext.getContext().setAttachment(Constants.REQUEST_TAG_KEY,"tag1");

请求标签的作用域为每一次 invocation,使用 attachment 来传递请求标签,注意保存在 attachment 中的值将会在一次完整的远程调用中持续传递,得益于这样的特性,我们只需要在起始调用时,通过一行代码的设置,达到标签的持续传递。

目前仅仅支持 hardcoding 的方式设置 requestTag。注意到 RpcContext 是线程绑定的,优雅的使用 TagRouter 特性,建议通过 servlet 过滤器(在 web 环境下),或者定制的 SPI 过滤器设置 requestTag。

规则详解

格式

  • Key明确规则体作用到哪个应用。必填

  • enabled=true 当前路由规则是否生效,可不填,缺省生效。

  • force=false 当路由结果为空时,是否强制执行,如果不强制执行,路由结果为空的路由规则将自动失效,可不填,缺省为 false

  • runtime=false 是否在每次调用时执行路由规则,否则只在提供者地址列表变更时预先执行并缓存结果,调用时直接从缓存中获取路由结果。如果用了参数路由,必须设为 true,需要注意设置会影响调用的性能,可不填,缺省为 false

  • priority=1 路由规则的优先级,用于排序,优先级越大越靠前执行,可不填,缺省为 0

  • tags
    

    定义具体的标签分组内容,可定义任意n(n>=1)个标签并为每个标签指定实例列表。

    必填

    • name, 标签名称
  • addresses, 当前标签包含的实例列表

降级约定

  1. request.tag=tag1 时优先选择 标记了tag=tag1 的 provider。若集群中不存在与请求标记对应的服务,默认将降级请求 tag为空的provider;如果要改变这种默认行为,即找不到匹配tag1的provider返回异常,需设置request.tag.force=true
  2. request.tag未设置时,只会匹配tag为空的provider。即使集群中存在可用的服务,若 tag 不匹配也就无法调用,这与约定1不同,携带标签的请求可以降级访问到无标签的服务,但不携带标签/携带其他种类标签的请求永远无法访问到其他标签的服务。

测试

  1. 创建标签规则

    创建标签规则

  2. consumer添加代码

    package top.mengshuo.controller;
    
    import org.apache.dubbo.config.annotation.DubboReference;
    
    import org.apache.dubbo.rpc.Constants;
    import org.apache.dubbo.rpc.RpcContext;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PathVariable;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import top.mengshuo.base.service.HelloService;
    
    /**
     * @author mengshuo
     * @since 2021-11-01
     */
    @RestController
    @RequestMapping("/hello")
    public class HelloController {
    
        @DubboReference
        private HelloService helloService;
    
        @GetMapping("/{name}")
        public String hello(@PathVariable String name){
            RpcContext.getServiceContext().setAttachment("dubbo.tag","tag1");
            return this.helloService.simpleMethod(name);
        }
    }
    
    
  3. 测试

    因为这里上面的条件路由已经将 20881 排除了 所以只会有 20883

    结果

0
博主关闭了所有页面的评论