基于Vue+SpringCloudAlibaba微服务电商项目实战-016:构建微服务电商项目智能报警系统替代elk分布式日志收集

1 微服务智能报警系统实现效果演示

今日课程任务

  1. 构建企业级微服务智能报警系统产生背景
  2. 微服务智能报警系统与elk的区别有那些
  3. 智能报警系统整体架构实现原理
  4. 构建Aop拦截系统错误日志写入kafka中
  5. 构建消费者获取kafka异常日志消息
  6. 调用微信模板实现智能报警系统

2 微服务智能报警系统设计原理

分布式日志采集系统elk
“ELK”是三个开源项目的首字母缩写,这三个项目分别是:Elasticsearch、Logstash 和 Kibana。
Elasticsearch 是一个搜索和分析引擎。
Logstash 是服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到诸如 Elasticsearch 等“存储库”中。
Kibana 则可以让用户在 Elasticsearch 中使用图形和图表对数据进行可视化。

传统的elk日志存在哪些缺陷?
搜索日志成本时间长,不如出现错误/异常主动上报自动报警功能的预警系统。

微服务智能报警系统设计思想

  1. 主动上报错误日志,用户调用接口如果出现错误异常的情况下,都会主动提醒给开发者。
  2. 可以使用第三方可视化界面管理分析日志。

3 分布式日志采集系统模块分析

分布式日志采集系统mt_log分析思路

  1. 采用全局或者Aop捕获当前系统出现的异常,将错误的信息转换成json内容投递到MQ服务端;
  2. 定义消费者获取MQ中错误的日志,将错误日志持久化到es/mongodb/数据库中;
  3. 采用主动上报的形式,将错误日志以消息模板或者短信/邮件推送给开发者,减少搜索日志成本;
  4. 可以采用web系统可以直接对数据库的日志实现图像报表的分析。
    在这里插入图片描述

模块构建
mt-shop-service-log-aop --采用全局或者AOP捕获当前系统出现的异常/错误
mt-shop-service-log-mqconsumer --消费者获取MQ中错误的日志,将错误日志持久化到es/mongodb/数据库中,消费者一定要集群加速写日志
mt-shop-service-log-web --对数据库的日志实现图像报表的分析

4 基于全局捕获异常将日志投递到MQ服务器端中

全局捕获异常获取详细方法错误信息参数分析
系统打印的Exception记录包含出现异常代码的类、接口等相关信息,请求参数从HttpServletRequest中获取

分布式日志采集数据库表结构分析
数据库记录错误日志表

CREATE TABLE `mt_error_log` (
  `message_id` varchar(255) NOT NULL DEFAULT '',
  `service_id` varchar(30) DEFAULT NULL,
  `class_name` varchar(255) DEFAULT NULL,
  `method_name` varchar(255) DEFAULT NULL,
  `parameter_content` varchar(255) DEFAULT NULL,
  `errorContent` varchar(255) DEFAULT NULL,
  `create_time` datetime DEFAULT NULL,
  `line_number` int(11) DEFAULT NULL,
`server_ip` varchar(255) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

message_id作为主键id可以防止mq重试导致插入相同的数据,从而调用两次模板消息推送,解决MQ消费者幂等性问题。

创建mt-shop-service-log-aop模块,会员服务引入该模块依赖即可
PoliceGlobalExceptionHandler

@ControllerAdvice
public class PoliceGlobalExceptionHandler extends BaseApiService {

    @Value("${spring.application.name}")
    private String serverIdName;
    @Value("${server.port}")
    private String serverPort;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @ExceptionHandler(RuntimeException.class)
    @ResponseBody
    public BaseResponse<String> errorResult(HttpServletRequest request, Exception ex) throws UnknownHostException {
        // 采用全局捕获异常 拦截系统中的错误,返回友好的提示给客户端
        Map<String, String[]> parameterMap = request.getParameterMap();
        StackTraceElement[] stackTrace = ex.getStackTrace();
        StackTraceElement stackTraceElement = stackTrace[0];
        // 转换成json
        JSONObject jsonObject = JSON.parseObject(JSON.toJSONString(stackTraceElement));
        // 新增参数内容
        jsonObject.put("parameterContent", parameterMap);
        // 新增错误日志
        jsonObject.put("errorContent", ex.getMessage());
        jsonObject.put("serviceId", serverIdName);
        jsonObject.put("createTime", new Date());
        jsonObject.put("messageId", serverIdName + "-" + UUID.randomUUID().toString());
        // 获取ip地址
        jsonObject.put("serverIp", getServerAddress());
        // 投递到kafka中
        send(jsonObject);
        return setResultError("系统错误");
    }

    /**
     * 发送消息的方法
     *
     * @param data 推送数据的data
     */
    private void send(JSONObject data) {
        // topic 名称 key data 消息数据
        kafkaTemplate.send("mayikt-topic-log", null, data.toJSONString());
    }

    private String getServerAddress() throws UnknownHostException {
        InetAddress ip4 = Inet4Address.getLocalHost();
        return ip4.getHostAddress() + ":" + serverPort;
    }
}

5 定义MQ消费者将错误日志统一写入数据库db中

为什么不在AOP类中写日志需要单独通过MQ进行?
主要目的是为了解耦,不影响到主要的业务代码

创建mt-shop-service-log-mqconsumer模块

@Data
public class ErrorLogEntity {
    private Long id;
    /**
     * 服务的id
     */
    private String serviceId;
    /**
     * 类地址
     */
    private String className;
    /**
     * 方法名称
     */
    private String methodName;
    /**
     * 参数内容
     */
    private String parameterContent;
    /**
     * 错误内容
     */
    private String errorContent;
    /**
     * 创建时间
     */
    private Date createTime;
    /**
     * 错误行数
     */
    private int lineNumber;

    /**
     * 消息全局id 主要解决消息幂等问题
     */
    private String messageId;

    private String serverIp;
}
public interface ErrorLogMapper {


    @Insert("INSERT INTO `mt_error_log` VALUES (#{messageId}, #{serviceId}, #{className}, " +
            "#{methodName}, #{parameterContent}, #{errorContent},now(),#{lineNumber},#{serverIp});")
    int insertErrorLog(ErrorLogEntity errorLogEntity);
}
@Component
public class LogConsumer {

    @Autowired
    private ErrorLogMapper errorLogMapper;

    @KafkaListener(topics = "mayikt-topic-log")
    public void receive(ConsumerRecord<?, ?> consumer) throws Exception {
        String json = (String) consumer.value();
        if (StringUtils.isEmpty(json)) {
            return;
        }
        // 存放到数据库或者es、mongdb中
        ErrorLogEntity errorLogEntity = JSON.parseObject(json, ErrorLogEntity.class);
        // 根据消息的全局id主动查询,如果在数据库中已经存在的情况下 不会执行以下流程
        try {
            int result = errorLogMapper.insertErrorLog(errorLogEntity);
            if (result <= 0) {
                throw new Exception("日志插入数据库失败,开始触发MQ重试策略");
            }
        } catch (Exception e) {

        }
        return;
    }
}
@SpringBootApplication
@MapperScan("com.mayikt.log.mapper")
public class AppLogConsumer {
    public static void main(String[] args) {
        SpringApplication.run(AppLogConsumer.class);
    }
}

测试效果:
在这里插入图片描述

6 定义微信消息推送错误日志模板接口

创建消息模板
http://mp.weixin.qq.com/debug/cgi-bin/sandboxinfo?action=showinfo&t=sandbox/index
模板内容

系统服务名称:{{first.DATA}} 
在{{keyword1.DATA}} 出现了错误
服务ip信息:{{keyword2.DATA}}
错误内容:
{{keyword3.DATA}}

mt-shop-service-api-weixin

@Data
@AllArgsConstructor
public class ErrorTemplateDto {
    String serviceId;
    Date errorDate;
    String errorMsg;
    String serverIp;
    String openId;
}
public interface WeiXinErrorTemplateService {

    @PostMapping("/sendErrorTemplate")
    BaseResponse<String> sendErrorTemplate(@RequestBody ErrorTemplateDto errorTemplateDto);
}

mt-shop-service-weixin

@RestController
public class WeiXinErrorTemplateServiceImpl extends BaseApiService implements WeiXinErrorTemplateService {
    @Autowired
    private WxMpProperties wxMpProperties;

    @Override
    public BaseResponse<String> sendErrorTemplate(ErrorTemplateDto errorTemplateDto) {
        String serviceId = errorTemplateDto.getServiceId();
        if (StringUtils.isEmpty(serviceId)) {
            return setResultError("serviceId不能为空");
        }
        Date errorDate = errorTemplateDto.getErrorDate();
        if (errorDate == null) {
            return setResultError("errorDate不能为空");
        }
        String errorMsg = errorTemplateDto.getErrorMsg();
        if (StringUtils.isEmpty(errorMsg)) {
            return setResultError("errorMsg不能为空");
        }
        String openId = errorTemplateDto.getOpenId();
        if (StringUtils.isEmpty(openId)) {
            return setResultError("openId参数不能为空!");
        }
        String serverIp = errorTemplateDto.getServerIp();
        if (StringUtils.isEmpty(serverIp)) {
            return setResultError("serverIp参数不能为空!");
        }
        WxMpTemplateMessage wxMpTemplateMessage = new WxMpTemplateMessage();
        wxMpTemplateMessage.
                setTemplateId("0dz6fPAphJ6gtjjp9gtLfWvMgW4F8QDhsTBuoZMGGLY");
        wxMpTemplateMessage.setToUser(openId);
        List<WxMpTemplateData> data = new ArrayList<>();
        data.add(new WxMpTemplateData("first", serviceId));
        data.add(new WxMpTemplateData("keyword1",
                SimpleDateFormatUtil.getFormatStrByPatternAndDate(new Date())));
        data.add(new WxMpTemplateData("keyword2", serverIp));
        data.add(new WxMpTemplateData("keyword3", errorMsg));
        wxMpTemplateMessage.setUrl("http://www.mayikt.com");
        wxMpTemplateMessage.setData(data);
        try {
            String appId = wxMpProperties.getConfigs().get(0).getAppId();
            WxMpTemplateMsgService templateMsgService = WxMpConfiguration.getMpServices().get(appId).getTemplateMsgService();
            templateMsgService.sendTemplateMsg(wxMpTemplateMessage);
            return setResultSuccess();
        } catch (Exception e) {
            return setResultError("发送失败");
        }
    }
}

测试效果:
在这里插入图片描述

7 分布式日志采集报警系统流程测试

微信openid表

CREATE TABLE `sender_contacts` (
  `id` int(11) NOT NULL,
  `wechat_name` varchar(255) DEFAULT NULL,
  `openid` varchar(255) DEFAULT NULL,
  `availability` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

生成二维码链接提供扫码插入该表,记录发送微信模板消息联系人openid信息。

相关类代码

@Data
public class SenderContactsEntity {
    private Long id;
    private String wechatName;
    private String openId;
    private String availability;
}
public interface SenderContactsMapper {

    @Select("SELECT ID AS ID ,wechat_name AS wechatname,openid as openid , availability as availability from sender_contacts  " +
            "where availability='1' ")
    List<SenderContactsEntity> getSenderContactsOpenId();
}
@FeignClient("mayikt-weixin")
public interface WeiXinErrorTemplateServiceFeign extends WeiXinErrorTemplateService {
}
@Component
public class LogConsumer {

    @Autowired
    private ErrorLogMapper errorLogMapper;
    @Autowired
    private WeiXinErrorTemplateServiceFeign weiXinErrorTemplateServiceFeign;
    @Autowired
    private SenderContactsMapper senderContactsMapper;

    @KafkaListener(topics = "mayikt-topic-log")
    public void receive(ConsumerRecord<?, ?> consumer) throws Exception {
        String json = (String) consumer.value();
        if (StringUtils.isEmpty(json)) {
            return;
        }
        // 存放到数据库或者es、mongdb中
        ErrorLogEntity errorLogEntity = JSON.parseObject(json, ErrorLogEntity.class);
//        String messageId = errorLogEntity.getMessageId();
        // 根据消息的全局id主动查询 ,如果在数据库存在的情况下 不会执行以下流程
        // 插入到数据库中
        try {
            int result = errorLogMapper.insertErrorLog(errorLogEntity);
            if (result <= 0) {
                throw new Exception("日志插入数据库失败,开始触发MQ重试策略");

            }
            List<SenderContactsEntity> openIdList = senderContactsMapper.getSenderContactsOpenId();
            for (SenderContactsEntity sd :
                    openIdList) {
                // 调用微信服务接口 发送消息模板
                String serviceId = errorLogEntity.getServiceId();
                Date createTime = errorLogEntity.getCreateTime();
                String errorMsg = errorLogEntity.getClassName() + ",的" + errorLogEntity.getMethodName() + "方法," + errorLogEntity.getLineNumber()
                        + "行" + "出现了" + errorLogEntity.getErrorContent() + "异常";
                ErrorTemplateDto errorTemplateDto =
                        new ErrorTemplateDto(serviceId, createTime, errorMsg, errorLogEntity.getServerIp(), sd.getOpenId());
                weiXinErrorTemplateServiceFeign.sendErrorTemplate(errorTemplateDto);
            }
            return;

        } catch (Exception e) {
            e.printStackTrace();
            return;
        }
    }
}

测试效果:
在这里插入图片描述