伪原创 api(保证系统线上问题最好推送通知(一)_光明网(组图))

优采云 发布时间: 2022-02-12 13:03

  伪原创 api(保证系统线上问题最好推送通知(一)_光明网(组图))

  前言

  为了保证系统的稳定性,公司增加了很多监控,比如:接口响应时间、cpu使用情况、内存使用情况、错误日志等,如果系统出现异常情况,相关人员会通过邮件通知,让大家第一时间解决系统隐藏问题。另外,我们这里有个不成文的规定,网上的问题最好当天解决,除非有非常难的问题。

  1.原因

  一个周一早上,我去公司上班,查了邮箱,收到了老板转发的邮件,让我在网上追查一个NPE(NullPointException)问题。

  电子邮件是通过 sentry 发送的。通过点击邮件中的相关链接,我们可以直接跳转到哨兵的详情页面。在这个页面上,显示了很多关键信息,比如:操作时间、请求的接口、错误码位置、错误信息、请求经过了哪些链接等等。确实是居家旅行和查虫的良药。有了这些,小案子一眼就能找到原因。

  不费吹灰之力,我就访问了NPE的哨兵错误页面(其实就是双击鼠标完成的)。果然上面有很多关键信息,一眼就能看出NPE的具体代码位置:

  notify.setName(CurrentUser.getCurrent().getUserName());

  剧情进展的这么顺利,我有点不好意思。

  根据类名和代码行号,我很快在idea里找到了那行代码,好像不是我写的,这下可以放心了,不用自责了。于是查看了那行的代码修改记录,最后修改的是XXX。

  什么?是他?

  一个月前他就辞职了,看来这个无头公案也没办法问,只能自己找原因了。

  当时心里的操作系统是:代码不兼容。

  你为什么这么说?

  这行代码其实很简单,就是从当前用户上下文中获取用户名,然后设置到notify实体的inUserName字段中,最后将notify数据保存到数据库中。

  该字段表示添加推送通知的人员。一般情况下是没用的,主要是网上出现问题的时候有地方可以追根溯源。如果有错误的情况,你可以被清除。

  顺便说一下,这里所说的推送通知和mq中的消息是不一样的。前者指的是websocket长连接推送的实时通知。在我们这边很多业务场景中,页面功能完成后,会实时发送通知给指定的人。用户,以便用户及时处理相关文件,例如:您有需要审批的审批文件,请及时处理。

  CurrentUser 收录一个 ThreadLocal 对象,该对象负责保存当前线程的用户上下文信息。当然,为了保证可以从线程池中的用户上下文中获取正确的用户信息,这里使用了阿里的TransmittableThreadLocal。伪代码如下:

  @Data

public class CurrentUser {

    private static final TransmittableThreadLocal THREA_LOCAL = new TransmittableThreadLocal();

    

    private String id;

    private String userName;

    private String password;

    private String phone;

    ...

    

    public statis void set(CurrentUser user) {

      THREA_LOCAL.set(user);

    }

    

    public static void getCurrent() {

      return THREA_LOCAL.get();

    }

}

  为什么用阿里的TransmittableThreadLocal而不是普通的ThreadLocal?在线程池中,由于线程会被多次复用,所以无法从普通的ThreadLocal中获取到正确的用户信息。父线程中的参数不能传给子线程,TransmittableThreadLocal很好的解决了这个问题。

  然后在项目中定义一个全局的spring mvc*敏*感*词*,专门设置用户上下文为ThreadLocal。伪代码如下:

  public class UserInterceptor extends HandlerInterceptorAdapter {

   

   @Override  

   public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {

      CurrentUser user = getUser(request);

      if(Objects.nonNull(user)) {

         CurrentUser.set(user);

      }

   } 

}

  当用户请求我们的接口时,首先会触发*敏*感*词*,它会根据用户cookie中的token调用调用接口获取redis中的用户信息。如果能获取到,说明用户已经登录,用户信息设置到CurrentUser类的ThreadLocal中。

  接下来在api服务的下层,也就是在业务层的方法中,通过CurrentUser.getCurrent()可以方便的获取到想要的用户上下文信息;方法。

  

  这个用户系统的思路很好,但是深入使用后发现了一个小插曲:

  api服务和mq消费者服务都指业务层,业务层中的方法都可以直接被两个服务调用。

  我们都知道用户需要登录api服务,但是mq消费服务不需要登录。

  

  如果业务中的某个方法首先是为api开发的,CurrentUser.getCurrent();在获取用户上下文的方法中深入使用。但是后来一个新帅哥也在mq consumer中调用了那个方法,没找到这个小机制,会被骗,会​​出现找不到用户上下文的问题。

  

  所以当时我的第一个想法是:代码不兼容,因为这种问题以前也有过。

  要解决这个问题,其实很简单。只需要先判断是否可以从CurrentUser获取用户信息。如果没有,则获取配置的系统用户信息。伪代码如下:

  @Autowired

private BusinessConfig businessConfig;

CurrentUser user = CurrentUser.getCurrent();

if(Objects.nonNull(user)) {

   entity.setUserId(user.getUserId());

   entity.setUserName(user.getUserName());

} else {

   entity.setUserId(businessConfig.getDefaultUserId());

   entity.setUserName(businessConfig.getDefaultUserName());

}

  这种简单无污染的代码,只加一两处就可以了。

  但是如果有多个地方在获取用户信息,是否需要在每个地方都写相同的判断逻辑呢?对于有抱负的程序员来说,这种简单的重复是写代码时的禁忌。如何更优雅地解决问题?

  答案将在文章之后揭晓。

  从表面上看,这个NPE问题已经有了答案。根据以往的经验,由于代码中没有进行兼容性处理,所以在mq消费者服务中获取的用户信息是空的,当调用它的方法对空对象时,会出现NPE。

  2.第一次反转

  不过这个回答好像有点草率,会不会有什么机制?

  于是我在项目中全局搜索了CurrentUser.set关键字,居然找到了一个机制。

  剧情第一次反转。

  有个地方写了一个rocketmq AOP*敏*感*词*,伪代码如下:

  @Aspect

@Component

public class RocketMqAspect {

   @Pointcut("execution(* onMessage(..)&&@within(org.apache.rocketmq.spring.annotation.RocketMQMessageListener))")

   public void pointcut() {

   

   }

   ...

   @Around(value="pointcut")

   public void around(ProceedingJoinPoint point) throws Throwable {

      if(point.getArgs().length == 1 && point.getArgs()[0] instanceof MessageExt) {

         Message message = (Message)point.getArgs()[0];

         String userId = message.getUserProperty("userId");

         String userName = message.getUserProperty("userName");

         if(StringUtils.notEmpty(userId) && StringUtils.notEmpty(userName))  {

             CurrentUser user = new CurrentUser();

             user.setUserId(userId);

             user.setUserName(userName);

             CurrentUser.set(user);

         }

      }

      

      ...

   }

}

  它拦截所有mq消费者中的onMessage方法,在方法执行前从userProperty获取用户信息,创建一个用户对象,并将其设置到用户上下文中。

  温馨提醒,免得有些朋友拿着葫芦画踩坑。上面的伪代码只给出了设置用户上下文的关键代码。用完后,没有给出删除用户上下文的代码。有兴趣的朋友可以私聊我。

  既然有地方可以获取用户信息,我想肯定有地方可以设置。这时,我突然意识到自己有成为侦探的潜质,因为后来我真的发现了。

  你惊讶吗,你惊讶吗?

  另一个同事自己定制了一个RocketMQTemplate。伪代码如下:

  public class MyRocketMQTemplate extends RocketMQTemplate {

    

    @Override

    public void asyncSend(String destnation, Meassage message, SendCallback sendCallback, long timeout, int delayLevel) {

      

      MessageBuilder builder = withPayload(message.getPayLoad());

      CurrentUser user = CurrentUser.getCurrent();

      builder.setHeader("userId", user.getUserId());

      builder.setHeader("userName", user.getUserName());

      

      super.asyncSend(destnation,message,sendCallback,timeout,delayLevel);

    }

}

  这段代码的主要作用是在mq生产者发送异步消息之前,将当前用户上下文信息设置到mq消息的header中,以便mq consumer通过userProperty获取,其本质也是从header中获取的。从...获取。

  

  这个设计巧妙,完美解决了mq消费者无法通过CurrentUser.getCurrent();获取用户信息的问题。

  至此,线索突然被切断,毫无进展。

  我再次检查了服务器日志。确认有问题的mq消息头信息中没有userId和userName字段。

  莫非mq生产者没有把用户信息塞到header里?这是需要严重怀疑的地方。

  因为mq生产者是另外一个团队写的代码,所以EOA(登录系统)回调他们的系统时,会向我们发送mq消息通知我们登录状态。

  EOA是第三方系统,用户系统与我们无关。所以在另外一个团队的回调接口中,无法获取当前登录的用户信息,AOP*敏*感*词*也无法自动将用户信息插入到header中,这样自然无法被mq消费者使用。

  

  这么一想,还真是有道理。

  3.第二次反转

  但真的是这样吗?

  我们寄予厚望,向他们发送了一封电子邮件,并请他们帮助查找问题。

  很快,他们就回了邮件。

  但是他们说:它已经在本地测试过,并且运行良好。

  剧情就这样第二次反转了。

  在这一点上我有点好奇,他们是如何将用户信息填充到标题中的。抱着“学习的心态”,我让他们一起检查相关代码。

  在他们发送 mq 消息之前,他们会调用一个 UserUtil 工具来注入用户。工具类的伪代码如下:

  @Component

public class UserUtil{

    @Value("${susan.userId}")

    private String userId;

    @Value("${susan.userName}")

    private String userName;

    public void injectUser() {

        CurrentUser user = new CurrentUser();

        user.setUserId(userId);

        user.setUserName(userName);

        CurrentUser.set(user);

    }

}

  好吧,不得不承认,这确实可以解决header中传入用户信息的问题,比之前手动判断用户信息是否为空要优雅得多,因为注入的用户信息肯定不是空的。

  

  折腾了半天,NPE问题还是没有解决。

  回头仔细查看了自定义的RocketMQTemplate类,发现里面覆盖了方法:asyncSend,里面收录了5个参数。当他们向我们推送消息时,调用的 asyncSend 只传递了 3 个参数。

  一下子,问题有了新的进展。会不会是他们调错了界面?

  应该调用具有 5 个参数的方法,但他们实际上调用了具有 3 个参数的方法。

  这将解释它。

  4.第三次反转

  终于,我有了一些想法,欣喜若狂,准备开始验证刚才的猜想。

  但事实证明,我真的高兴得太早了,马上就被扇了耳光。

  这一次是最快的逆转。

  这是怎么回事?

  本来我还以为是另外一个团队的人在发送mq消息的时候调错了方法。他们应该调用带有 5 个参数的 asyncSend 方法,但他们的代码实际上调用了带有 3 个参数的同名方法。

  为了防止不法同事发生。抱着认真的态度,我仔细查看了 RocketMQTemplate 类的所有方法,该类是由 RocketMQ 框架提供的。

  偶然发现了一些不连贯的关系,伪代码如下:

  public void asyncSend(String destination, Message message, SendCallback sendCallback, long timeout, int delayLevel) {

  if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {

      log.error("asyncSend failed. destination:{}, message is null ", destination);

      throw new IllegalArgumentException("`message` and `message.payload` cannot be null");

    }

    try {

        org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,

            charset, destination, message);

        if (delayLevel > 0) {

            rocketMsg.setDelayTimeLevel(delayLevel);

        }

        producer.send(rocketMsg, sendCallback, timeout);

    } catch (Exception e) {

        log.info("asyncSend failed. destination:{}, message:{} ", destination, message);

        throw new MessagingException(e.getMessage(), e);

    }

}

    

public void asyncSend(String destination, Message message, SendCallback sendCallback, long timeout) {

    asyncSend(destination,message,sendCallback,timeout,0);

}

public void asyncSend(String destination, Message message, SendCallback sendCallback) {

    asyncSend(destination, message, sendCallback, producer.getSendMsgTimeout());

}

public void asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout) {

     Message message = this.doConvert(payload, null, null);

     asyncSend(destination, message, sendCallback, timeout);

}

public void asyncSend(String destination, Object payload, SendCallback sendCallback) {

    asyncSend(destination, payload, sendCallback, producer.getSendMsgTimeout());

}

  这背后有一个大秘密。这些同名的方法目标相同,最终都会调用5个参数的asyncSend方法。

  这样,如果在一个子类中重写 5 个 asyncSend 方法,就相当于重写了所有 asyncSend 方法。

  

  再次证明他们是对的。

  温馨提示,有些类的重载方法会互相调用。如果在子类中重写底层重载方法,则相当于重写所有重载方法。

  头痛,回到第一方。

  5.第四次反转

  在这一点上,我有点失落。

  但是,当遇到在线问题不知道该怎么办时,多查看日志是一个好习惯。

  没想到会报什么,没想到又查了日志。

  发生了第四次逆转。

  这次抱着试试看的心态,根据messageID查看了mq生产者的日志,找到了一条消息的发送日志。

  这一次,我的眼睛很敏锐,我发现了一个小细节:时间不对。

  此日志显示消息是在 2021-05-21 发送的,而实际上 mq 消费者在 2021-05-28 处理它。

  这条消息在一周内被消费了?

  明显不是。

  我有点敬畏。回去用messageID查看mq消费者的日志,发现该消息实际上被消费了6次。前5次竟然是同一天,都是2021-05-21,都失败了。另一次是2021-05-28,处理成功。

  为什么同一条消息在同一天被消费了 5 次?

  如果你熟悉rocketmq,你一定知道它支持重试机制。

  如果mq消费者消息处理失败,可以在业务代码中抛出异常。然后框架层捕获异常并返回ConsumeConcurrentlyStatus.RECONSUME_LATER,rocketmq会自动将消息放入重试队列。

  

  流程图如下:

  

  这样mq消费者可以在下次再消费消息直到达到一定次数(这里我们配置5次),rocketmq会将消息发送到死信队列。

  

  流程图如下:

  

  之后不再消费。

  为什么最后要花不止一次?

  最后一条消息不能被其他mq生产者发送,因为messageID是唯一的,其他生产者不能生成相同的messageID。

  那么,只有一种可能,那就是人为地发送消息。

  在查看在线日志时,时间、messageID、traceID、记录数这几个维度非常重要。

  6.真相

  后来才发现,确实是人发的信息。

  一周前,网上有用户因为EOA页面回调接口失败(重试也失败),导致修改审核状态失败。审计订单在EOA系统中已经审核通过,但是mq消费者在处理审计订单时发现状态还是pending,没有完成后续流程就直接返回,导致审计订单数据异常.

  为了解决这个问题,我们首先修改了当时在线审核订单的状态。接下来,在rocketmq后台手动发送消息。由于当时在rocketmq后台看不到header信息,所以发送消息时忽略header,直接将消息发送到指定topic。

  注意,手动发送mq消息时,一定要注意是否需要在header中设置相关参数,尤其是rocketmq,否则可能会出现问题。

  mq消费者消费完消息后,审计订单正常完成流程。当时测试一起测试,数据库状态正常。

  大家以为没有问题,但是大家忽略了一个小细节:就是正常的业务逻辑处理完之后,会向指定的用户发送一个websocket通知。但是这个功能是已经离开的同事添加的新逻辑,其他人都不知道。从手动发送消息的人的角度来看,他是对的,因为他不知道新功能的存在。

  由于这行代码是最后一行代码,和前面的代码不在同一个东西里,所以即使出现问题,也不会影响正常的业务逻辑。

  所以这个NPE问题的影响很小,只是商家没有收到一定的通知。

  一个好习惯是把与核心业务逻辑无关的代码放在事务之外,防止问题影响到主流程。

  说实话,有时候遇到线上问题对我们来说不一定是坏事。通过这次线上问题定位,我熟悉了公司更多的新功能,从其他同事那里学到了一些好的想法,总结了一些经验和教训。这是一个难得的提升自己的机会。

  最后一句话(请注意,不要白嫖我)

  码字不易,但要做到并珍惜。您的认可是我坚持的最大动力。我要求一键连续三个链接:点赞、转发和观看。

0 个评论

要回复文章请先登录注册


官方客服QQ群

微信人工客服

QQ人工客服


线