sentinel整合feign
前言
- sentinel整合feign, 主要处理限流熔断等异常的处理
- 主要实现能够正常判断出是限流还是熔断等导致
- Spring Cloud Alibaba: 2.2.1.RELEASE
- sentinel: 1.7.1
- 以及总结了一下我的处理思路
一、未处理的时候返回

二、操作步骤
1.引入库
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
2.编写代码
服务提供者:
package com.chaim.common.sentinel.provider;
import com.alibaba.csp.sentinel.adapter.spring.webmvc.callback.BlockExceptionHandler;
import com.alibaba.csp.sentinel.slots.block.BlockException;
import com.alibaba.csp.sentinel.slots.block.authority.AuthorityException;
import com.alibaba.csp.sentinel.slots.block.degrade.DegradeException;
import com.alibaba.csp.sentinel.slots.block.flow.FlowException;
import com.alibaba.csp.sentinel.slots.block.flow.param.ParamFlowException;
import com.alibaba.csp.sentinel.slots.system.SystemBlockException;
import com.alibaba.fastjson.JSONObject;
import com.chaim.common.core.util.CommonResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import static com.chaim.common.core.constant.HttpStatus.SentinelConstants.*;
/**
* 由于:
* 服务提供者在经过sentinel限流等操作后, 返回数据不符合我们要求, 同时消费者无法区分是限流还是熔断等操作(FeignException.errorStatus)
* 故:
* 实现BlockExceptionHandler, 对返回数据进行重写, 从而符合我们的要求
* 注:
* 返回状态码需为 STATUS, 否者消费者那边不会进行处理
*
* @author Chaim
* @date 2021/11/4 15:11
*/
@Component
@Slf4j
public class MyBlockExceptionHandler implements BlockExceptionHandler {
# 这个只是我这边的定义
public static final int STATUS = 606;
@Override
public void handle(HttpServletRequest httpServletRequest, HttpServletResponse response, BlockException ex) throws Exception {
CommonResult commonResult = CommonResult.fail(DATA_LOGIC_FAIL, "FEIGN_SENTINEL_FAIL");
if (ex instanceof FlowException) {
log.debug("限流: ", ex);
commonResult = CommonResult.fail(DATA_CURRENT_LIMITING_FAIL, "请稍后再试!");
} else if (ex instanceof DegradeException) {
log.debug("降级", ex);
commonResult = CommonResult.fail(DATA_RELEGATION_FAIL, "请稍后再试!");
} else if (ex instanceof ParamFlowException) {
log.debug("热点参数限流", ex);
commonResult = CommonResult.fail(HOTSPOT_PARAMETER_FAIL, "请稍后再试!");
} else if (ex instanceof SystemBlockException) {
log.debug("系统规则(负载/...不满足要求)", ex);
commonResult = CommonResult.fail(SYSTEM_RULES_FAIL, "请稍后再试!");
} else if (ex instanceof AuthorityException) {
log.debug("授权规则不通过", ex);
commonResult = CommonResult.fail(AUTHORIZATION_RULES_FAIL, "请稍后再试!");
}
// http状态码
response.setStatus(STATUS);
response.setCharacterEncoding("utf-8");
response.setHeader("Content-Type", "application/json;charset=utf-8");
response.setContentType("application/json;charset=utf-8");
response.getWriter().write(JSONObject.toJSONString(commonResult));
}
}
服务消费者:
feign:
# 为feign整合sentinel
sentinel:
enabled: true
@FeignClient(name = "test", fallbackFactory = TestServiceFallbackFactory.class)
public interface TestFeignClient extends TestService {}
package com.chaim.feign.ali.fallback.factory;
import com.alibaba.fastjson.JSONObject;
import com.chaim.common.core.util.CommonResult;
import com.chaim.common.sentinel.consumer.FallbackFactoryFlowException;
import com.chaim.feign.ali.POJO.DTO.*;
import com.chaim.feign.ali.service.TestService;
import feign.hystrix.FallbackFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import static com.chaim.common.core.constant.HttpStatus.SentinelConstants.DATA_LOGIC_FAIL;
/**
* 指定feign的FallbackFactory, 对调用失败的接口, 进行逻辑处理
*
* @author Chaim
* @date 2021/11/3 14:46
*/
@Component
@Slf4j
public class TestServiceFallbackFactory implements FallbackFactory<TestService> {
@Override
public TestService create(Throwable throwable) {
return new TestService() {
@Override
public CommonResult test(TestScanPayDTO testScanPayDTO) {
CommonResult commonResult = CommonResult.fail(DATA_LOGIC_FAIL, "FEIGN_SENTINEL_FAIL");
try {
String message = throwable.getMessage();
log.debug("sentinel 限流熔断等: {}", message);
JSONObject jsonObject = JSONObject.parseObject(message);
JSONObject body = jsonObject.getJSONObject("body");
return CommonResult.fail(body.getInteger("code"), body.getString("message"));
} catch (Exception e) {
log.error("异常_FallbackFactoryFlowException: ", e);
return commonResult;
}
}
};
}
}
package com.chaim.common.sentinel.consumer;
import com.alibaba.fastjson.JSONObject;
import com.chaim.common.sentinel.provider.MyBlockExceptionHandler;
import feign.*;
import feign.codec.ErrorDecoder;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import static feign.Util.RETRY_AFTER;
import static feign.Util.checkNotNull;
import static java.util.Locale.US;
import static java.util.concurrent.TimeUnit.SECONDS;
/**
* 由于:
* 当服务提供者通过sentinel进行限流等操作, 消费者这边无法捕获到对应的异常
* 故:
* 实现 ErrorDecoder, 对服务提供者返回的异常进行处理
* 注:
* 只会处理sentinel限制后的操作, 同时 response status == MyBlockExceptionHandler.STATUS, 才进行处理, 其余依旧通过sentinel进行处理
*
* @author Chaim
* @date 2021/11/5 12:20
*/
@Component
public class MyErrorDecoder implements ErrorDecoder {
private final RetryAfterDecoder retryAfterDecoder = new RetryAfterDecoder();
@Override
public Exception decode(String methodKey, Response response) {
FeignException exception = errorStatus(methodKey, response);
Date retryAfter = retryAfterDecoder.apply(firstOrNull(response.headers(), RETRY_AFTER));
if (retryAfter != null) {
return new RetryableException(
response.status(),
exception.getMessage(),
response.request().httpMethod(),
exception,
retryAfter,
response.request());
}
return exception;
}
public static FeignException errorStatus(String methodKey, Response response) {
byte[] body = {};
try {
if (response.body() != null) {
body = Util.toByteArray(response.body().asInputStream());
}
} catch (IOException ignored) { // NOPMD
}
Map<String, Object> map = new HashMap<>();
map.put("status", response.status());
if (response.reason() != null) {
map.put("reason", response.reason());
}
map.put("method", response.request().httpMethod());
map.put("url", response.request().url());
map.put("methodKey", methodKey);
map.put("body", new String(body));
// 只对返回码为606进行处理
if (response.status() == MyBlockExceptionHandler.STATUS) {
return new FeignException.FeignServerException(response.status(), JSONObject.toJSONString(map), response.request(), body);
}
return FeignException.errorStatus(methodKey, response);
}
private <T> T firstOrNull(Map<String, Collection<T>> map, String key) {
if (map.containsKey(key) && !map.get(key).isEmpty()) {
return map.get(key).iterator().next();
}
return null;
}
/**
* 由于ErrorDecoder中的方法无法直接使用, 故复制一份出来
*/
static class RetryAfterDecoder {
static final DateFormat RFC822_FORMAT =
new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss 'GMT'", US);
private final DateFormat rfc822Format;
RetryAfterDecoder() {
this(RFC822_FORMAT);
}
RetryAfterDecoder(DateFormat rfc822Format) {
this.rfc822Format = checkNotNull(rfc822Format, "rfc822Format");
}
protected long currentTimeMillis() {
return System.currentTimeMillis();
}
/**
* returns a date that corresponds to the first time a request can be retried.
*
* @param retryAfter String in
* <a href="https://tools.ietf.org/html/rfc2616#section-14.37" >Retry-After format</a>
*/
public Date apply(String retryAfter) {
if (retryAfter == null) {
return null;
}
if (retryAfter.matches("^[0-9]+\\.?0*$")) {
retryAfter = retryAfter.replaceAll("\\.0*$", "");
long deltaMillis = SECONDS.toMillis(Long.parseLong(retryAfter));
return new Date(currentTimeMillis() + deltaMillis);
}
synchronized (rfc822Format) {
try {
return rfc822Format.parse(retryAfter);
} catch (ParseException ignored) {
return null;
}
}
}
}
}
3.效果

三、处理思路
①: 第一步
首先分析一下, 按我的流程分析: sentinel限流是对服务提供者做的操作
那么限流之后肯定是有一个返回的, 返回的结果是什么了?
postman直接请求服务提供者, 返回状态码429, 因为sentinel是对提供者做的限流

然后我们在通过消费者来调用提供者:

对应的源码是在: feign-core: feign.SynchronousMethodHandler.executeAndDecode, (正常处理: response.status() >= 200 && response.status() < 300)

②: 第二步
通过上面的我们可以确定了, 只要处理提供者的返回状态码, 就可以解决问题了!
于是就有了:
// 以前的版本好像是 UrlBlockHandler
public class MyBlockExceptionHandler implements BlockExceptionHandler
到这一步, 其实我们只需要将response.setStatus(200) 设置成200, 就到此结束!
③: 第三步
但是我们可能想, 限流了就是限流了嘛, 标个200不符合我的出发思路, 继续处理
这是看下消费者的日志:

服务提供者返回的body已经符合了我的想法.
程序到这一步, 就由feign接管处理, 看了一下源码feign是没有处理sentinel返回的异常类型(feign是根据状态码进行的处理)
对应的处理源码: feign-core: feign.FeignException.errorStatus

④: 第四步
这里就得说一下
FallbackFactory
public class TestServiceFallbackFactory implements FallbackFactory<TestService>
这里能拿到Throwable
但是就目前的Throwable也不符合我们的要求:
feign.FeignException: [606] during [POST] to [http://***/pay/scan] [TestFeignClient#123(TestPayDTO)]: [{"code":5504,"message":"请稍后再试!"}]
⑤: 第五步
于是就有了:
public class MyErrorDecoder implements ErrorDecoder
那我就从写一下throwableMessage 让符合我的要求
但是我只想处理sentinel我定义的状态为606的
于是就有了:
if (response.status() == MyBlockExceptionHandler.STATUS) {
return new FeignException.FeignServerException(response.status(), JSONObject.toJSONString(map), response.request(), body);
}
// 不是我定义的606, 还是走 FeignException.errorStatus
return FeignException.errorStatus(methodKey, response);
后面只需要对应处理 throwable.getMessage() 即可!