Linkerd2 proxy tap 学习笔记

点击查看目录

作者:哗啦啦 mesh 团队,热衷于 kubernetes、devops、apollo、istio、linkerd、openstack、calico 等领域技术。

概述

Linkerd2 由控制平面数据平面组成:

  • 控制平面是在一个专门的 Kubernetes 命名空间(默认是 linkerd)中运行的一组服务,这些服务共同实现了聚合遥测数据、提供一组面向用户的 API、向数据平面提供控制指令等功能。

  • 数据平面由一组用 Rust 编写的轻量级代理组成,它们安装在服务的每个 pod 中。它通过initContainer配置iptables来接管 Pod 的所有出入流量。它对服务毫无侵入,服务本身不需要修改任何代码,甚至可以将它添加到正在运行的服务中。

以下是官方的架构示意图:

proxy-destination
proxy-destination

tap 是 Linkerd2 的一个非常有特色的功能,它可以随时抓取某资源的实时流量。有效的利用该功能可以非常方便的监控服务的请求流量情况,协助调试服务。

tap 相关的功能组件如下:

  • web/CLI: 发起 tap 请求,展示 tap 监控结果
  • tap: 将来自web/CLI的tap请求转为gRPC请求并发至proxy组件,将proxy回复的tap事件回复给web/CLI
  • proxy: 处理 tap 请求,从经过的 request/response 数据中获取需要的信息,组成 tap 事件上报

前两者逻辑相对简单,此处主要关注 proxy 与 tap 组件交互相关的一些逻辑,简单分析 proxy 内部的运行逻辑。

注:本文基于Linkerd2 stable-2.6.0版本,linkerd-proxy v2.76.0版本。

初始化

首先是初始化,在build_proxy_task中:

let (tap_layer, tap_grpc, tap_daemon) = tap::new();

进入tap::new()

    let (daemon, register, subscribe) = daemon::new();
    let layer = Layer::new(register);
    let server = Server::new(subscribe);
    (layer, server, daemon)

此处创建了如下 3 个对象:

  • tap_layer 用于后续的 inboundoutbound 逻辑,及后续请求处理
  • tap_grpc 用于 TapServer 创建,处理 tap 组件的 grpc 请求
  • tap_daemon 则作为任务正常运行,负责黏合 layer 与 grpc

下面分别介绍。

tap_daemon

进入daemon::new()

    let (svc_tx, svc_rx) = mpsc::channel(super::REGISTER_CHANNEL_CAPACITY);
    let (tap_tx, tap_rx) = mpsc::channel(super::TAP_CAPACITY);

    let daemon = Daemon {
        svc_rx,
        svcs: Vec::default(),

        tap_rx,
        taps: Vec::default(),
    };

    (daemon, Register(svc_tx), Subscribe(tap_tx))

注意此处分别创建 svc 通道和 tap 通道,并且将两通道的接收端都存于tap_daemon中,然后将 svc 的发送端定义为新类型Register的实例,将 tap 的发送端定义为新类型Subscribe的实例。

在主逻辑中,通过 tokio 框架执行 daemon 任务:

        tokio::spawn(tap_daemon.map_err(|_| ()).in_current_span());

Daemon::poll中(见linkerd2-proxy/linkerd/app/core/src/tap/daemon.rs:60,保留骨干):

        // 只保留未完成的 tap
        self.taps.retain(|t| t.can_tap_more());
        for idx in (0..self.svcs.len()).rev() {
            // 剔除状态不对的 svc
            if self.svcs[idx].poll_ready().is_err() {
                self.svcs.swap_remove(idx);
            }
        }

        // 获取 svc(即 tap_layer 中创建的 tap2_tx)
        while let Ok(Async::Ready(Some(mut svc))) = self.svc_rx.poll() {
            for tap in &self.taps {
                // 将 tap 发至 tap2_tx
                let err = svc.try_send(tap.clone()).err();
            }

            if !dropped {
                // 保存 tap2_tx
                self.svcs.push(svc);
            }
        }

        // 获取打包的 tap 及一个一次性的信号通知通道 ack
        while let Ok(Async::Ready(Some((tap, ack)))) = self.tap_rx.poll() {
            for idx in (0..self.svcs.len()).rev() {
                // 将 tap 发至 tap2_tx
                let err = self.svcs[idx].try_send(tap.clone()).err();
                if err.map(|e| e.is_disconnected()).unwrap_or(false) {
                    // 报错就干掉这个 tap2_tx
                    self.svcs.swap_remove(idx);
                }
            }
            // 保存 tap
            self.taps.push(tap);
            // 发送信号,通知 grpc 线程,tap 已就绪
            let _ = ack.send(());
        }
        // 标记任务未完成,会放回队列等待下一次执行
        Ok(Async::NotReady)

这段逻辑主要将 grpc 那边来的 tap 送到 layer,从而将前面的 layer 和 grpc 部分的逻辑串了起来。

tap_layer

layer相关逻辑主要在初始化和后续实际处理请求那。其创建会用到刚才生成的Register的实例,并用于之后的Stack::call(见linkerd2-proxy/linkerd/app/core/src/tap/service.rs:96)中:

        let inspect = target.clone();
        let inner = self.inner.call(target);
        let tap_rx = self.registry.register();
        MakeFuture {
            inner,
            next: Some((tap_rx, inspect)),
        }

registry.register()中(见linkerd2-proxy/linkerd/app/core/src/tap/daemon.rs:148):

        // 再创建一个通道,将其命名为:tap2_tx, tap2_rx
        let (tx, rx) = mpsc::channel(super::TAP_CAPACITY);
        // 将 tap2_tx 塞入最开始的 svc_tx
        if let Err(_) = self.0.try_send(tx) {
            debug!("failed to register service");
        }
        rx

在后续生成的Service中,首先看poll_ready(见linkerd2-proxy/linkerd/app/core/src/tap/service.rs:150):

        // 此处的 tap_rx 实际上是 tap2_rx.
        // 从该通道取出所有 tap,存到 self.taps
        while let Ok(Async::Ready(Some(t))) = self.tap_rx.poll() {
            self.taps.push(t);
        }
        // 只保留还需要继续 tap 的对象
        self.taps.retain(|t| t.can_tap_more());

        self.inner.poll_ready()

接着是call(见linkerd2-proxy/linkerd/app/core/src/tap/service.rs:161):

        let mut req_taps = Vec::new();
        let mut rsp_taps = Vec::new();

        for t in &mut self.taps {
            // 对 req 调用 Tap::tap 接口,获取请求信息,并生成 TapRequestPayload 结构和 TapResponse 结构
            if let Some((req_tap, rsp_tap)) = t.tap(&req, &self.inspect) {
                req_taps.push(req_tap);
                rsp_taps.push(rsp_tap);
            }
        }

        // 将 tap 请求与原始 req 请求体合为 Payload 对象,并替代原始请求体
        // 在后续从请求体中获取数据时,调用 tap 的接口(data/eos/fail)对请求数据进行处理
        let req = req.map(move |inner| Payload {
            inner,
            taps: req_taps,
        });

        let inner = self.inner.call(req);

        ResponseFuture {
            inner,
            taps: rsp_taps,
        }

ResponseFuture::poll中(见linkerd2-proxy/linkerd/app/core/src/tap/service.rs:200):

        // 实际服务回复的 future,调用 poll 拿到实际 response
        match self.inner.poll() {
            Ok(Async::NotReady) => Ok(Async::NotReady),
            Ok(Async::Ready(rsp)) => {
                // 对 response 调用 TapResponse::tap 获取信息
                let taps = self.taps.drain(..).map(|t| t.tap(&rsp)).collect();
                let rsp = rsp.map(move |inner| {
                    let mut body = Payload { inner, taps };
                    if body.is_end_stream() {
                        body.eos(None);
                    }
                    body
                });
                Ok(Async::Ready(rsp))
            }
            Err(e) => {
                for tap in self.taps.drain(..) {
                    tap.fail(&e);
                }
                Err(e)
            }
        }

前面这些逻辑,其中出现了 2 个 tap 接口,一个是Tap::tap,一个是TapResponse::tap,这俩的核心作用都是从请求或回复数据中获取需要的 tap 信息,然后发往某个通道,细节下面再讲。

tap_grpc

tap_grpcServer::new(subscribe)生成,实现了api::server::Tap这个 grpc server,响应observe这个 method 请求。该请求来自 Linkerd2 的 tap 组件。

收到请求后:

  1. 解析请求参数:limit match extract
  2. 生成一个唯一 idbase_id
  3. 创建一个用于传递api::TapEvent事件的通道,用其发送端events_tx构造一个Tap对象
  4. 调用subscribe.subscribe(tap),得到一个SubscribeFuture任务
  5. 构造任务ResponseFuture

在任务ResponseFuturepoll中,会先执行SubscribeFuture::poll(见linkerd2-proxy/linkerd/app/core/src/tap/daemon.rs:178):

        loop {
            self.0 = match self.0 {
                FutState::Subscribe {
                    ref mut tap,
                    ref mut tap_tx,
                } => {
                    // 此处的 tap_tx 即为最开始创建的 tap 通道发送端,待其就绪后再往下走
                    try_ready!(tap_tx.poll_ready().map_err(|_| super::iface::NoCapacity));

                    // tap 为上面步骤 3 处构造
                    let tap = tap.take().expect("tap must be set");
                    // 构造一个一次性通道
                    let (tx, rx) = oneshot::channel();
                    // 将其发送端与 tap 打包发送到 tap 通道
                    tap_tx
                        .try_send((tap, tx))
                        .map_err(|_| super::iface::NoCapacity)?;

                    FutState::Pending(rx)
                }
                FutState::Pending(ref mut rx) => {
                    // 从接收端获取到信号,表示所依赖任务执行完毕
                    return rx.poll().map_err(|_| super::iface::NoCapacity);
                }
            }
        }

SubscribeFuture::poll执行完毕,会接着构造一个ResponseStream流,并包装成 grpc 结果返回。

ResponseStream::poll中(见linkerd2-proxy/linkerd/app/core/src/tap/grpc/server.rs:225):

        // 限制判断
        self.shared = self.shared.take().and_then(|shared| {
            if shared.is_under_limit() {
                Some(shared)
            } else {
                None
            }
        });

        // 从事件通道获取事件并返回给 stream 流,发给 grpc 客户端,即 tap 组件
        self.events_rx.poll().or_else(|_| Ok(None.into()))

由此看出,每个 grpc 请求会对应着一个Tap对象,往Tap.shard.events_tx中发送事件,则该事件会最终发到 grpc 请求方。

Tap::tap(见linkerd2-proxy/linkerd/app/core/src/tap/grpc/server.rs:267):

        let shared = self.shared.upgrade()?;
        // 判断是否需要 tap
        if !shared.match_.matches(req, inspect) {
            return None;
        }

        // 省略若干从 req 中提取事件信息...

        // tap 事件
        let event = api::TapEvent {
            event: Some(api::tap_event::Event::Http(api::tap_event::Http {
                event: Some(api::tap_event::http::Event::RequestInit(init)),
            })),
            ..base_event.clone()
        };

        // 发送事件
        let mut events_tx = shared.events_tx.clone();
        events_tx.try_send(event).ok()?;

        let tap = TapTx { id, tx: events_tx };

        let req = TapRequestPayload {
            tap: tap.clone(),
            base_event: base_event.clone(),
        };
        let rsp = TapResponse {
            tap,
            base_event,
            request_init_at,
            extract_headers,
        };
        Some((req, rsp))

图示

proxy-tap
proxy-tap

总结

至此,以上 3 个不同的角色互相合作,实现了:

  1. Linkerd2 的 tap 组件下发 tap 请求
  2. proxy 向所有流量请求中插入 tap 请求
  3. 抓取到 tap 数据后,上报至 Linkerd2 的 tap 组件

编辑本页