sentinel整合feign


前言

  1. sentinel整合feign, 主要处理限流熔断等异常的处理
  2. 主要实现能够正常判断出是限流还是熔断等导致
  3. Spring Cloud Alibaba: 2.2.1.RELEASE
  4. sentinel: 1.7.1
  5. 以及总结了一下我的处理思路

一、未处理的时候返回

在这里插入图片描述

二、操作步骤

1.引入库

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

2.编写代码

服务提供者:

package com.chaim.common.sentinel.provider;

import com.alibaba.csp.sentinel.adapter.spring.webmvc.callback.BlockExceptionHandler;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.authority.AuthorityException;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeException;
import com.alibaba.csp.sentinel.slots.block.flow.FlowException;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowException;
import com.alibaba.csp.sentinel.slots.system.SystemBlockException;
import com.alibaba.fastjson.JSONObject;
import com.chaim.common.core.util.CommonResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;


import static com.chaim.common.core.constant.HttpStatus.SentinelConstants.*;

/**
 * 由于:
 * 服务提供者在经过sentinel限流等操作后, 返回数据不符合我们要求, 同时消费者无法区分是限流还是熔断等操作(FeignException.errorStatus)
 * 故:
 * 实现BlockExceptionHandler, 对返回数据进行重写, 从而符合我们的要求
 * 注:
 * 返回状态码需为 STATUS, 否者消费者那边不会进行处理
 *
 * @author Chaim
 * @date 2021/11/4 15:11
 */
@Component
@Slf4j
public class MyBlockExceptionHandler implements BlockExceptionHandler {
    # 这个只是我这边的定义
    public static final int STATUS = 606;

    @Override
    public void handle(HttpServletRequest httpServletRequest, HttpServletResponse response, BlockException ex) throws Exception {
        CommonResult commonResult = CommonResult.fail(DATA_LOGIC_FAIL, "FEIGN_SENTINEL_FAIL");
        if (ex instanceof FlowException) {
            log.debug("限流: ", ex);
            commonResult = CommonResult.fail(DATA_CURRENT_LIMITING_FAIL, "请稍后再试!");
        } else if (ex instanceof DegradeException) {
            log.debug("降级", ex);
            commonResult = CommonResult.fail(DATA_RELEGATION_FAIL, "请稍后再试!");
        } else if (ex instanceof ParamFlowException) {
            log.debug("热点参数限流", ex);
            commonResult = CommonResult.fail(HOTSPOT_PARAMETER_FAIL, "请稍后再试!");
        } else if (ex instanceof SystemBlockException) {
            log.debug("系统规则(负载/...不满足要求)", ex);
            commonResult = CommonResult.fail(SYSTEM_RULES_FAIL, "请稍后再试!");
        } else if (ex instanceof AuthorityException) {
            log.debug("授权规则不通过", ex);
            commonResult = CommonResult.fail(AUTHORIZATION_RULES_FAIL, "请稍后再试!");
        }
        // http状态码
        response.setStatus(STATUS);
        response.setCharacterEncoding("utf-8");
        response.setHeader("Content-Type", "application/json;charset=utf-8");
        response.setContentType("application/json;charset=utf-8");
        response.getWriter().write(JSONObject.toJSONString(commonResult));
    }
}

服务消费者:

feign:
  # 为feign整合sentinel
  sentinel:
    enabled: true
@FeignClient(name = "test", fallbackFactory = TestServiceFallbackFactory.class)
public interface TestFeignClient extends TestService {}
package com.chaim.feign.ali.fallback.factory;

import com.alibaba.fastjson.JSONObject;
import com.chaim.common.core.util.CommonResult;
import com.chaim.common.sentinel.consumer.FallbackFactoryFlowException;
import com.chaim.feign.ali.POJO.DTO.*;
import com.chaim.feign.ali.service.TestService;
import feign.hystrix.FallbackFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import static com.chaim.common.core.constant.HttpStatus.SentinelConstants.DATA_LOGIC_FAIL;

/**
 * 指定feign的FallbackFactory, 对调用失败的接口, 进行逻辑处理
 *
 * @author Chaim
 * @date 2021/11/3 14:46
 */
@Component
@Slf4j
public class TestServiceFallbackFactory implements FallbackFactory<TestService> {

    @Override
    public TestService create(Throwable throwable) {
        return new TestService() {
            @Override
            public CommonResult test(TestScanPayDTO testScanPayDTO) {
                CommonResult commonResult = CommonResult.fail(DATA_LOGIC_FAIL, "FEIGN_SENTINEL_FAIL");
                try {
                    String message = throwable.getMessage();
                    log.debug("sentinel 限流熔断等: {}", message);
                    JSONObject jsonObject = JSONObject.parseObject(message);
                    JSONObject body = jsonObject.getJSONObject("body");
                    return CommonResult.fail(body.getInteger("code"), body.getString("message"));
                } catch (Exception e) {
                    log.error("异常_FallbackFactoryFlowException: ", e);
                    return commonResult;
                }
            }
        };
    }
}

package com.chaim.common.sentinel.consumer;

import com.alibaba.fastjson.JSONObject;
import com.chaim.common.sentinel.provider.MyBlockExceptionHandler;
import feign.*;
import feign.codec.ErrorDecoder;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import static feign.Util.RETRY_AFTER;
import static feign.Util.checkNotNull;
import static java.util.Locale.US;
import static java.util.concurrent.TimeUnit.SECONDS;

/**
 * 由于:
 * 当服务提供者通过sentinel进行限流等操作, 消费者这边无法捕获到对应的异常
 * 故:
 * 实现 ErrorDecoder, 对服务提供者返回的异常进行处理
 * 注:
 * 只会处理sentinel限制后的操作, 同时 response status == MyBlockExceptionHandler.STATUS, 才进行处理, 其余依旧通过sentinel进行处理
 *
 * @author Chaim
 * @date 2021/11/5 12:20
 */
@Component
public class MyErrorDecoder implements ErrorDecoder {
    private final RetryAfterDecoder retryAfterDecoder = new RetryAfterDecoder();

    @Override
    public Exception decode(String methodKey, Response response) {
        FeignException exception = errorStatus(methodKey, response);
        Date retryAfter = retryAfterDecoder.apply(firstOrNull(response.headers(), RETRY_AFTER));
        if (retryAfter != null) {
            return new RetryableException(
                    response.status(),
                    exception.getMessage(),
                    response.request().httpMethod(),
                    exception,
                    retryAfter,
                    response.request());
        }
        return exception;
    }

    public static FeignException errorStatus(String methodKey, Response response) {
        byte[] body = {};
        try {
            if (response.body() != null) {
                body = Util.toByteArray(response.body().asInputStream());
            }
        } catch (IOException ignored) { // NOPMD
        }

        Map<String, Object> map = new HashMap<>();
        map.put("status", response.status());
        if (response.reason() != null) {
            map.put("reason", response.reason());
        }
        map.put("method", response.request().httpMethod());
        map.put("url", response.request().url());
        map.put("methodKey", methodKey);
        map.put("body", new String(body));

        // 只对返回码为606进行处理
        if (response.status() == MyBlockExceptionHandler.STATUS) {
            return new FeignException.FeignServerException(response.status(), JSONObject.toJSONString(map), response.request(), body);
        }
        return FeignException.errorStatus(methodKey, response);
    }

    private <T> T firstOrNull(Map<String, Collection<T>> map, String key) {
        if (map.containsKey(key) && !map.get(key).isEmpty()) {
            return map.get(key).iterator().next();
        }
        return null;
    }

    /**
     * 由于ErrorDecoder中的方法无法直接使用, 故复制一份出来
     */
    static class RetryAfterDecoder {

        static final DateFormat RFC822_FORMAT =
                new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss 'GMT'", US);
        private final DateFormat rfc822Format;

        RetryAfterDecoder() {
            this(RFC822_FORMAT);
        }

        RetryAfterDecoder(DateFormat rfc822Format) {
            this.rfc822Format = checkNotNull(rfc822Format, "rfc822Format");
        }

        protected long currentTimeMillis() {
            return System.currentTimeMillis();
        }

        /**
         * returns a date that corresponds to the first time a request can be retried.
         *
         * @param retryAfter String in
         *                   <a href="https://tools.ietf.org/html/rfc2616#section-14.37" >Retry-After format</a>
         */
        public Date apply(String retryAfter) {
            if (retryAfter == null) {
                return null;
            }
            if (retryAfter.matches("^[0-9]+\\.?0*$")) {
                retryAfter = retryAfter.replaceAll("\\.0*$", "");
                long deltaMillis = SECONDS.toMillis(Long.parseLong(retryAfter));
                return new Date(currentTimeMillis() + deltaMillis);
            }
            synchronized (rfc822Format) {
                try {
                    return rfc822Format.parse(retryAfter);
                } catch (ParseException ignored) {
                    return null;
                }
            }
        }
    }

}

3.效果

该处使用的url网络请求的数据。


三、处理思路

①: 第一步

首先分析一下, 按我的流程分析: sentinel限流是对服务提供者做的操作
那么限流之后肯定是有一个返回的, 返回的结果是什么了?
postman直接请求服务提供者, 返回状态码429, 因为sentinel是对提供者做的限流
在这里插入图片描述
然后我们在通过消费者来调用提供者:
在这里插入图片描述
对应的源码是在: feign-core: feign.SynchronousMethodHandler.executeAndDecode, (正常处理: response.status() >= 200 && response.status() < 300)
在这里插入图片描述


②: 第二步

通过上面的我们可以确定了, 只要处理提供者的返回状态码, 就可以解决问题了!
于是就有了:

// 以前的版本好像是 UrlBlockHandler
public class MyBlockExceptionHandler implements BlockExceptionHandler

到这一步, 其实我们只需要将response.setStatus(200) 设置成200, 就到此结束!


③: 第三步

但是我们可能想, 限流了就是限流了嘛, 标个200不符合我的出发思路, 继续处理
这是看下消费者的日志:

在这里插入图片描述
服务提供者返回的body已经符合了我的想法.

程序到这一步, 就由feign接管处理, 看了一下源码feign是没有处理sentinel返回的异常类型(feign是根据状态码进行的处理)

对应的处理源码: feign-core: feign.FeignException.errorStatus
在这里插入图片描述


④: 第四步

这里就得说一下
FallbackFactory

public class TestServiceFallbackFactory implements FallbackFactory<TestService>

这里能拿到Throwable
但是就目前的Throwable也不符合我们的要求:

feign.FeignException: [606] during [POST] to [http://***/pay/scan] [TestFeignClient#123(TestPayDTO)]: [{"code":5504,"message":"请稍后再试!"}]

⑤: 第五步

于是就有了:

public class MyErrorDecoder implements ErrorDecoder

那我就从写一下throwableMessage 让符合我的要求
但是我只想处理sentinel我定义的状态为606的
于是就有了:

if (response.status() == MyBlockExceptionHandler.STATUS) {
    return new FeignException.FeignServerException(response.status(), JSONObject.toJSONString(map), response.request(), body);
}
// 不是我定义的606, 还是走 FeignException.errorStatus
return FeignException.errorStatus(methodKey, response);

后面只需要对应处理 throwable.getMessage() 即可!