基于Vue+SpringCloudAlibaba微服务电商项目实战-016:构建微服务电商项目智能报警系统替代elk分布式日志收集
016:构建微服务电商项目智能报警系统替代elk分布式日志收集
1 微服务智能报警系统实现效果演示
今日课程任务
- 构建企业级微服务智能报警系统产生背景
- 微服务智能报警系统与elk的区别有那些
- 智能报警系统整体架构实现原理
- 构建Aop拦截系统错误日志写入kafka中
- 构建消费者获取kafka异常日志消息
- 调用微信模板实现智能报警系统
2 微服务智能报警系统设计原理
分布式日志采集系统elk
“ELK”是三个开源项目的首字母缩写,这三个项目分别是:Elasticsearch、Logstash 和 Kibana。
Elasticsearch 是一个搜索和分析引擎。
Logstash 是服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到诸如 Elasticsearch 等“存储库”中。
Kibana 则可以让用户在 Elasticsearch 中使用图形和图表对数据进行可视化。
传统的elk日志存在哪些缺陷?
搜索日志成本时间长,不如出现错误/异常主动上报自动报警功能的预警系统。
微服务智能报警系统设计思想
- 主动上报错误日志,用户调用接口如果出现错误异常的情况下,都会主动提醒给开发者。
- 可以使用第三方可视化界面管理分析日志。
3 分布式日志采集系统模块分析
分布式日志采集系统mt_log分析思路
- 采用全局或者Aop捕获当前系统出现的异常,将错误的信息转换成json内容投递到MQ服务端;
- 定义消费者获取MQ中错误的日志,将错误日志持久化到es/mongodb/数据库中;
- 采用主动上报的形式,将错误日志以消息模板或者短信/邮件推送给开发者,减少搜索日志成本;
- 可以采用web系统可以直接对数据库的日志实现图像报表的分析。

模块构建
mt-shop-service-log-aop --采用全局或者AOP捕获当前系统出现的异常/错误
mt-shop-service-log-mqconsumer --消费者获取MQ中错误的日志,将错误日志持久化到es/mongodb/数据库中,消费者一定要集群加速写日志
mt-shop-service-log-web --对数据库的日志实现图像报表的分析
4 基于全局捕获异常将日志投递到MQ服务器端中
全局捕获异常获取详细方法错误信息参数分析
系统打印的Exception记录包含出现异常代码的类、接口等相关信息,请求参数从HttpServletRequest中获取
分布式日志采集数据库表结构分析
数据库记录错误日志表
CREATE TABLE `mt_error_log` (
`message_id` varchar(255) NOT NULL DEFAULT '',
`service_id` varchar(30) DEFAULT NULL,
`class_name` varchar(255) DEFAULT NULL,
`method_name` varchar(255) DEFAULT NULL,
`parameter_content` varchar(255) DEFAULT NULL,
`errorContent` varchar(255) DEFAULT NULL,
`create_time` datetime DEFAULT NULL,
`line_number` int(11) DEFAULT NULL,
`server_ip` varchar(255) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
message_id作为主键id可以防止mq重试导致插入相同的数据,从而调用两次模板消息推送,解决MQ消费者幂等性问题。
创建mt-shop-service-log-aop模块,会员服务引入该模块依赖即可
PoliceGlobalExceptionHandler
@ControllerAdvice
public class PoliceGlobalExceptionHandler extends BaseApiService {
@Value("${spring.application.name}")
private String serverIdName;
@Value("${server.port}")
private String serverPort;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@ExceptionHandler(RuntimeException.class)
@ResponseBody
public BaseResponse<String> errorResult(HttpServletRequest request, Exception ex) throws UnknownHostException {
// 采用全局捕获异常 拦截系统中的错误,返回友好的提示给客户端
Map<String, String[]> parameterMap = request.getParameterMap();
StackTraceElement[] stackTrace = ex.getStackTrace();
StackTraceElement stackTraceElement = stackTrace[0];
// 转换成json
JSONObject jsonObject = JSON.parseObject(JSON.toJSONString(stackTraceElement));
// 新增参数内容
jsonObject.put("parameterContent", parameterMap);
// 新增错误日志
jsonObject.put("errorContent", ex.getMessage());
jsonObject.put("serviceId", serverIdName);
jsonObject.put("createTime", new Date());
jsonObject.put("messageId", serverIdName + "-" + UUID.randomUUID().toString());
// 获取ip地址
jsonObject.put("serverIp", getServerAddress());
// 投递到kafka中
send(jsonObject);
return setResultError("系统错误");
}
/**
* 发送消息的方法
*
* @param data 推送数据的data
*/
private void send(JSONObject data) {
// topic 名称 key data 消息数据
kafkaTemplate.send("mayikt-topic-log", null, data.toJSONString());
}
private String getServerAddress() throws UnknownHostException {
InetAddress ip4 = Inet4Address.getLocalHost();
return ip4.getHostAddress() + ":" + serverPort;
}
}
5 定义MQ消费者将错误日志统一写入数据库db中
为什么不在AOP类中写日志需要单独通过MQ进行?
主要目的是为了解耦,不影响到主要的业务代码
创建mt-shop-service-log-mqconsumer模块
@Data
public class ErrorLogEntity {
private Long id;
/**
* 服务的id
*/
private String serviceId;
/**
* 类地址
*/
private String className;
/**
* 方法名称
*/
private String methodName;
/**
* 参数内容
*/
private String parameterContent;
/**
* 错误内容
*/
private String errorContent;
/**
* 创建时间
*/
private Date createTime;
/**
* 错误行数
*/
private int lineNumber;
/**
* 消息全局id 主要解决消息幂等问题
*/
private String messageId;
private String serverIp;
}
public interface ErrorLogMapper {
@Insert("INSERT INTO `mt_error_log` VALUES (#{messageId}, #{serviceId}, #{className}, " +
"#{methodName}, #{parameterContent}, #{errorContent},now(),#{lineNumber},#{serverIp});")
int insertErrorLog(ErrorLogEntity errorLogEntity);
}
@Component
public class LogConsumer {
@Autowired
private ErrorLogMapper errorLogMapper;
@KafkaListener(topics = "mayikt-topic-log")
public void receive(ConsumerRecord<?, ?> consumer) throws Exception {
String json = (String) consumer.value();
if (StringUtils.isEmpty(json)) {
return;
}
// 存放到数据库或者es、mongdb中
ErrorLogEntity errorLogEntity = JSON.parseObject(json, ErrorLogEntity.class);
// 根据消息的全局id主动查询,如果在数据库中已经存在的情况下 不会执行以下流程
try {
int result = errorLogMapper.insertErrorLog(errorLogEntity);
if (result <= 0) {
throw new Exception("日志插入数据库失败,开始触发MQ重试策略");
}
} catch (Exception e) {
}
return;
}
}
@SpringBootApplication
@MapperScan("com.mayikt.log.mapper")
public class AppLogConsumer {
public static void main(String[] args) {
SpringApplication.run(AppLogConsumer.class);
}
}
测试效果:

6 定义微信消息推送错误日志模板接口
创建消息模板
http://mp.weixin.qq.com/debug/cgi-bin/sandboxinfo?action=showinfo&t=sandbox/index
模板内容
系统服务名称:{{first.DATA}}
在{{keyword1.DATA}} 出现了错误
服务ip信息:{{keyword2.DATA}}
错误内容:
{{keyword3.DATA}}
mt-shop-service-api-weixin
@Data
@AllArgsConstructor
public class ErrorTemplateDto {
String serviceId;
Date errorDate;
String errorMsg;
String serverIp;
String openId;
}
public interface WeiXinErrorTemplateService {
@PostMapping("/sendErrorTemplate")
BaseResponse<String> sendErrorTemplate(@RequestBody ErrorTemplateDto errorTemplateDto);
}
mt-shop-service-weixin
@RestController
public class WeiXinErrorTemplateServiceImpl extends BaseApiService implements WeiXinErrorTemplateService {
@Autowired
private WxMpProperties wxMpProperties;
@Override
public BaseResponse<String> sendErrorTemplate(ErrorTemplateDto errorTemplateDto) {
String serviceId = errorTemplateDto.getServiceId();
if (StringUtils.isEmpty(serviceId)) {
return setResultError("serviceId不能为空");
}
Date errorDate = errorTemplateDto.getErrorDate();
if (errorDate == null) {
return setResultError("errorDate不能为空");
}
String errorMsg = errorTemplateDto.getErrorMsg();
if (StringUtils.isEmpty(errorMsg)) {
return setResultError("errorMsg不能为空");
}
String openId = errorTemplateDto.getOpenId();
if (StringUtils.isEmpty(openId)) {
return setResultError("openId参数不能为空!");
}
String serverIp = errorTemplateDto.getServerIp();
if (StringUtils.isEmpty(serverIp)) {
return setResultError("serverIp参数不能为空!");
}
WxMpTemplateMessage wxMpTemplateMessage = new WxMpTemplateMessage();
wxMpTemplateMessage.
setTemplateId("0dz6fPAphJ6gtjjp9gtLfWvMgW4F8QDhsTBuoZMGGLY");
wxMpTemplateMessage.setToUser(openId);
List<WxMpTemplateData> data = new ArrayList<>();
data.add(new WxMpTemplateData("first", serviceId));
data.add(new WxMpTemplateData("keyword1",
SimpleDateFormatUtil.getFormatStrByPatternAndDate(new Date())));
data.add(new WxMpTemplateData("keyword2", serverIp));
data.add(new WxMpTemplateData("keyword3", errorMsg));
wxMpTemplateMessage.setUrl("http://www.mayikt.com");
wxMpTemplateMessage.setData(data);
try {
String appId = wxMpProperties.getConfigs().get(0).getAppId();
WxMpTemplateMsgService templateMsgService = WxMpConfiguration.getMpServices().get(appId).getTemplateMsgService();
templateMsgService.sendTemplateMsg(wxMpTemplateMessage);
return setResultSuccess();
} catch (Exception e) {
return setResultError("发送失败");
}
}
}
测试效果:

7 分布式日志采集报警系统流程测试
微信openid表
CREATE TABLE `sender_contacts` (
`id` int(11) NOT NULL,
`wechat_name` varchar(255) DEFAULT NULL,
`openid` varchar(255) DEFAULT NULL,
`availability` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
生成二维码链接提供扫码插入该表,记录发送微信模板消息联系人openid信息。
相关类代码
@Data
public class SenderContactsEntity {
private Long id;
private String wechatName;
private String openId;
private String availability;
}
public interface SenderContactsMapper {
@Select("SELECT ID AS ID ,wechat_name AS wechatname,openid as openid , availability as availability from sender_contacts " +
"where availability='1' ")
List<SenderContactsEntity> getSenderContactsOpenId();
}
@FeignClient("mayikt-weixin")
public interface WeiXinErrorTemplateServiceFeign extends WeiXinErrorTemplateService {
}
@Component
public class LogConsumer {
@Autowired
private ErrorLogMapper errorLogMapper;
@Autowired
private WeiXinErrorTemplateServiceFeign weiXinErrorTemplateServiceFeign;
@Autowired
private SenderContactsMapper senderContactsMapper;
@KafkaListener(topics = "mayikt-topic-log")
public void receive(ConsumerRecord<?, ?> consumer) throws Exception {
String json = (String) consumer.value();
if (StringUtils.isEmpty(json)) {
return;
}
// 存放到数据库或者es、mongdb中
ErrorLogEntity errorLogEntity = JSON.parseObject(json, ErrorLogEntity.class);
// String messageId = errorLogEntity.getMessageId();
// 根据消息的全局id主动查询 ,如果在数据库存在的情况下 不会执行以下流程
// 插入到数据库中
try {
int result = errorLogMapper.insertErrorLog(errorLogEntity);
if (result <= 0) {
throw new Exception("日志插入数据库失败,开始触发MQ重试策略");
}
List<SenderContactsEntity> openIdList = senderContactsMapper.getSenderContactsOpenId();
for (SenderContactsEntity sd :
openIdList) {
// 调用微信服务接口 发送消息模板
String serviceId = errorLogEntity.getServiceId();
Date createTime = errorLogEntity.getCreateTime();
String errorMsg = errorLogEntity.getClassName() + ",的" + errorLogEntity.getMethodName() + "方法," + errorLogEntity.getLineNumber()
+ "行" + "出现了" + errorLogEntity.getErrorContent() + "异常";
ErrorTemplateDto errorTemplateDto =
new ErrorTemplateDto(serviceId, createTime, errorMsg, errorLogEntity.getServerIp(), sd.getOpenId());
weiXinErrorTemplateServiceFeign.sendErrorTemplate(errorTemplateDto);
}
return;
} catch (Exception e) {
e.printStackTrace();
return;
}
}
}
测试效果:
