阻塞队列与Protobuf的Udp通信 - 基于Cat的代理(Agent)项目拆解

  created  by  鱼鱼 {{tag}}
创建于 2020年07月19日 20:45:47 最后修改于 2020年07月19日 21:58:27

CAT介绍

    CAT是美团点评的一个基于Java开发的异常和性能监控项目,github地址:https://github.com/dianping/cat。本篇文章不是对CAT本身的源码拆解,而是基于本人依赖CAT client开发的代理项目进行拆解,但是并不会纰漏任何技术细节。

    CAT当前已有很多不同语言的Client,当然暂且是不

CAT大致原理和使用

    CAT本身是通过CAT client收集数据并上报至CAT server,server会进行并,共有六种常见数据格式:Transaction、Event、Problem、Metric、HeartBeat、调用链标记,其实如果不考虑复杂的处理(譬如Metric是可以基于指标生成折线图,Problem可以根据具体的异常类型追溯到相应的会话Track)除去Transaction剩余的数据格式都可以理解为特殊的Event。即CAT的基本消息类型只有两类:Transaction(会话)和Event(事件),原则上CAT会存储每次会话的事件内容,基于本地文件和数据库的二进制存储,并在需要的时候展示出来。以下是一个请求为例的CAT使用例子,我们在Java中一般使用AOP进行CAT事件上报,此处针对具体的例子直接调用CAT的相关方法:

  //获取用户信息 
    @GetMappting("user/{id}")
     public User test(@PathVariable Integer uid, @RequestParam(required = false) String fields) throws InterruptedException {
                   //开启会话1
            Transaction transaction = Cat.newTransaction("request","/user");
            //上报事件
            Cat.logEvent("userId",uid.toString());
            if(null==fields||fields.equals("")){
                Cat.logError("null fields",new Exception());
                transaction.complete();
            }
            Cat.logEvent("fields",fields);
                    //开启会话2
            Transaction transaction1 = Cat.newTransaction("dao","getUserById");
            User user = userDao.getUserById();
            //结束会话2
            transaction1.setStatus(Transaction.SUCCESS);
            transaction1.complete();
            Cat.logEvent("return",user.toString());
            transaction.setStatus(Transaction.SUCCESS);
            transaction.complete();
            return user;
     }

    当我们发出一次合法请求后,Cat中就会有相应的记录(需要通过messageId查询,UI不提供无异常的track logview查询):

    以上Cat的事件上报并没有传入当前所在会话的对象或是任何信息,因为Cat的会话概念是与线程共存的(基于ThreadLocal),我们无法指定地将事件“塞入”会话中。

基于Cat client的Agent(代理)

    Cat目前仍有较大的局限性,尤其是Cat本身不能跨语言,Cat目前仅支持Java、C、C++、Python、Go、Node.js几种语言的客户端,所以本人基于Cat构建了一个基本的代理服务,使用语言无关的http或是udp、tcp进行通信。同时也要保持高性能和非 阻塞的特性,封装Client使其变为其他应用(例如php的应用或是浏览器)的server。需要改造的主要是:

  • 改造异常的定义,使其能脱离Java进行自定义异常;

  • 改进Transaction的定义,想办法兼容线程对应messageId生成的策略;

  • 调整信息收集方式,在内存中手动维护每一条未结束的Transaction,同时确保消息具有顺序性;

  • 使用Netty服务器封装并兼容udp,提高并发程度,避免 阻塞业务进程。

阻塞队列和消息模型设计

     阻塞队列其实是普通队列的扩展,在普通队列的功能上多了出队和入队的 阻塞方法:put和take,前者在队列满时 阻塞直到队列有空间入队,后者在队列空时 阻塞直到队列有新元素出队。由于上文中提到了Cat的message的唯一性与线程相关联,因此利用 阻塞队列的这一特性可以很好的构建Cat Agent:

Udp与Protobuf

    基于性能和日志系统的特性考虑,可使用基于Protobuf类型的udp通信,虽然Protobuf在实施上很麻烦:需要手动定义他的数据结构。好在基于以上模型的消息类型也可以用一种数据结构直接定义,并使用webFlux开放udp端口:

UdpServer.create()
                    .handle((in, out) -> {
                        in.receive()
                                .asByteArray()
                                .subscribe();
                        return Flux.never();
                    })
                    .host(InetAddress.getLocalHost().getHostAddress())
                    .port(udpPort)
                    .doOnBound(conn -> conn
                            .addHandler("decoder", udpDecoderHandler)
                    )
                    .bindNow(Duration.ofSeconds(30));

定义数据类型:

syntax = "proto3";
option java_package = "xxx.cat";
option java_outer_classname = "CatProto";
message CatMsg {
    string id = 1;
    //0 transaction, 1 event ,2 metric ,3 exception, 4 error , 5 endTransaction
    int32 msgType = 2;
    //transaction: id+(pid)+name+type;
    //event: (id)+name+type;
    //metric: (id)+name+metric;
    //exception/error: (id)+type+name+error+errorT
    //endTransaction: id;
    string type = 3;
    string name = 4;
    int32  metric = 5;
    string traceInfo = 6;
    string pid = 7;
}
评论区
评论
{{comment.creator}}
{{comment.createTime}} {{comment.index}}楼
评论

阻塞队列与Protobuf的Udp通信 - 基于Cat的代理(Agent)项目拆解

阻塞队列与Protobuf的Udp通信 - 基于Cat的代理(Agent)项目拆解

CAT介绍

    CAT是美团点评的一个基于Java开发的异常和性能监控项目,github地址:https://github.com/dianping/cat。本篇文章不是对CAT本身的源码拆解,而是基于本人依赖CAT client开发的代理项目进行拆解,但是并不会纰漏任何技术细节。

    CAT当前已有很多不同语言的Client,当然暂且是不

CAT大致原理和使用

    CAT本身是通过CAT client收集数据并上报至CAT server,server会进行并,共有六种常见数据格式:Transaction、Event、Problem、Metric、HeartBeat、调用链标记,其实如果不考虑复杂的处理(譬如Metric是可以基于指标生成折线图,Problem可以根据具体的异常类型追溯到相应的会话Track)除去Transaction剩余的数据格式都可以理解为特殊的Event。即CAT的基本消息类型只有两类:Transaction(会话)和Event(事件),原则上CAT会存储每次会话的事件内容,基于本地文件和数据库的二进制存储,并在需要的时候展示出来。以下是一个请求为例的CAT使用例子,我们在Java中一般使用AOP进行CAT事件上报,此处针对具体的例子直接调用CAT的相关方法:

  //获取用户信息 
    @GetMappting("user/{id}")
     public User test(@PathVariable Integer uid, @RequestParam(required = false) String fields) throws InterruptedException {
                   //开启会话1
            Transaction transaction = Cat.newTransaction("request","/user");
            //上报事件
            Cat.logEvent("userId",uid.toString());
            if(null==fields||fields.equals("")){
                Cat.logError("null fields",new Exception());
                transaction.complete();
            }
            Cat.logEvent("fields",fields);
                    //开启会话2
            Transaction transaction1 = Cat.newTransaction("dao","getUserById");
            User user = userDao.getUserById();
            //结束会话2
            transaction1.setStatus(Transaction.SUCCESS);
            transaction1.complete();
            Cat.logEvent("return",user.toString());
            transaction.setStatus(Transaction.SUCCESS);
            transaction.complete();
            return user;
     }

    当我们发出一次合法请求后,Cat中就会有相应的记录(需要通过messageId查询,UI不提供无异常的track logview查询):

    以上Cat的事件上报并没有传入当前所在会话的对象或是任何信息,因为Cat的会话概念是与线程共存的(基于ThreadLocal),我们无法指定地将事件“塞入”会话中。

基于Cat client的Agent(代理)

    Cat目前仍有较大的局限性,尤其是Cat本身不能跨语言,Cat目前仅支持Java、C、C++、Python、Go、Node.js几种语言的客户端,所以本人基于Cat构建了一个基本的代理服务,使用语言无关的http或是udp、tcp进行通信。同时也要保持高性能和非 阻塞的特性,封装Client使其变为其他应用(例如php的应用或是浏览器)的server。需要改造的主要是:

阻塞队列和消息模型设计

     阻塞队列其实是普通队列的扩展,在普通队列的功能上多了出队和入队的 阻塞方法:put和take,前者在队列满时 阻塞直到队列有空间入队,后者在队列空时 阻塞直到队列有新元素出队。由于上文中提到了Cat的message的唯一性与线程相关联,因此利用 阻塞队列的这一特性可以很好的构建Cat Agent:

Udp与Protobuf

    基于性能和日志系统的特性考虑,可使用基于Protobuf类型的udp通信,虽然Protobuf在实施上很麻烦:需要手动定义他的数据结构。好在基于以上模型的消息类型也可以用一种数据结构直接定义,并使用webFlux开放udp端口:

UdpServer.create()
                    .handle((in, out) -> {
                        in.receive()
                                .asByteArray()
                                .subscribe();
                        return Flux.never();
                    })
                    .host(InetAddress.getLocalHost().getHostAddress())
                    .port(udpPort)
                    .doOnBound(conn -> conn
                            .addHandler("decoder", udpDecoderHandler)
                    )
                    .bindNow(Duration.ofSeconds(30));

定义数据类型:

syntax = "proto3";
option java_package = "xxx.cat";
option java_outer_classname = "CatProto";
message CatMsg {
    string id = 1;
    //0 transaction, 1 event ,2 metric ,3 exception, 4 error , 5 endTransaction
    int32 msgType = 2;
    //transaction: id+(pid)+name+type;
    //event: (id)+name+type;
    //metric: (id)+name+metric;
    //exception/error: (id)+type+name+error+errorT
    //endTransaction: id;
    string type = 3;
    string name = 4;
    int32  metric = 5;
    string traceInfo = 6;
    string pid = 7;
}

阻塞队列与Protobuf的Udp通信 - 基于Cat的代理(Agent)项目拆解2020-07-19鱼鱼

{{commentTitle}}

评论   ctrl+Enter 发送评论