点击查看目录
作者:哗啦啦 mesh 团队,热衷于 kubernetes、devops、apollo、istio、linkerd、openstack、calico 等领域技术。
概述
Linkerd2 由控制平面
和数据平面
组成:
-
控制平面
是在一个专门的 Kubernetes 命名空间(默认是 linkerd)中运行的一组服务,这些服务共同实现了聚合遥测数据、提供一组面向用户的 API、向数据平面
提供控制指令等功能。 -
数据平面
由一组用 Rust 编写的轻量级代理组成,它们安装在服务的每个 pod 中。它通过initContainer
配置iptables
来接管 Pod 的所有出入流量。它对服务毫无侵入,服务本身不需要修改任何代码,甚至可以将它添加到正在运行
的服务中。
以下是官方的架构示意图:
tap 是 Linkerd2 的一个非常有特色的功能,它可以随时抓取某资源的实时流量。有效的利用该功能可以非常方便的监控服务的请求流量情况,协助调试服务。
tap 相关的功能组件如下:
- web/CLI: 发起 tap 请求,展示 tap 监控结果
- tap: 将来自web/CLI的tap请求转为gRPC请求并发至proxy组件,将proxy回复的tap事件回复给web/CLI
- proxy: 处理 tap 请求,从经过的 request/response 数据中获取需要的信息,组成 tap 事件上报
前两者逻辑相对简单,此处主要关注 proxy 与 tap 组件交互相关的一些逻辑,简单分析 proxy 内部的运行逻辑。
注:本文基于
Linkerd2
stable-2.6.0
版本,linkerd-proxy
v2.76.0
版本。
初始化
首先是初始化,在build_proxy_task
中:
let (tap_layer, tap_grpc, tap_daemon) = tap::new();
进入tap::new()
:
let (daemon, register, subscribe) = daemon::new();
let layer = Layer::new(register);
let server = Server::new(subscribe);
(layer, server, daemon)
此处创建了如下 3 个对象:
tap_layer
用于后续的inbound
和outbound
逻辑,及后续请求处理tap_grpc
用于TapServer
创建,处理 tap 组件的 grpc 请求tap_daemon
则作为任务正常运行,负责黏合 layer 与 grpc
下面分别介绍。
tap_daemon
进入daemon::new()
:
let (svc_tx, svc_rx) = mpsc::channel(super::REGISTER_CHANNEL_CAPACITY);
let (tap_tx, tap_rx) = mpsc::channel(super::TAP_CAPACITY);
let daemon = Daemon {
svc_rx,
svcs: Vec::default(),
tap_rx,
taps: Vec::default(),
};
(daemon, Register(svc_tx), Subscribe(tap_tx))
注意此处分别创建 svc 通道和 tap 通道,并且将两通道的接收端都存于tap_daemon
中,然后将 svc 的发送端定义为新类型Register
的实例,将 tap 的发送端定义为新类型Subscribe
的实例。
在主逻辑中,通过 tokio 框架执行 daemon 任务:
tokio::spawn(tap_daemon.map_err(|_| ()).in_current_span());
在Daemon::poll
中(见linkerd2-proxy/linkerd/app/core/src/tap/daemon.rs:60
,保留骨干):
// 只保留未完成的 tap
self.taps.retain(|t| t.can_tap_more());
for idx in (0..self.svcs.len()).rev() {
// 剔除状态不对的 svc
if self.svcs[idx].poll_ready().is_err() {
self.svcs.swap_remove(idx);
}
}
// 获取 svc(即 tap_layer 中创建的 tap2_tx)
while let Ok(Async::Ready(Some(mut svc))) = self.svc_rx.poll() {
for tap in &self.taps {
// 将 tap 发至 tap2_tx
let err = svc.try_send(tap.clone()).err();
}
if !dropped {
// 保存 tap2_tx
self.svcs.push(svc);
}
}
// 获取打包的 tap 及一个一次性的信号通知通道 ack
while let Ok(Async::Ready(Some((tap, ack)))) = self.tap_rx.poll() {
for idx in (0..self.svcs.len()).rev() {
// 将 tap 发至 tap2_tx
let err = self.svcs[idx].try_send(tap.clone()).err();
if err.map(|e| e.is_disconnected()).unwrap_or(false) {
// 报错就干掉这个 tap2_tx
self.svcs.swap_remove(idx);
}
}
// 保存 tap
self.taps.push(tap);
// 发送信号,通知 grpc 线程,tap 已就绪
let _ = ack.send(());
}
// 标记任务未完成,会放回队列等待下一次执行
Ok(Async::NotReady)
这段逻辑主要将 grpc 那边来的 tap 送到 layer,从而将前面的 layer 和 grpc 部分的逻辑串了起来。
tap_layer
layer
相关逻辑主要在初始化和后续实际处理请求那。其创建会用到刚才生成的Register
的实例,并用于之后的Stack::call
(见linkerd2-proxy/linkerd/app/core/src/tap/service.rs:96
)中:
let inspect = target.clone();
let inner = self.inner.call(target);
let tap_rx = self.registry.register();
MakeFuture {
inner,
next: Some((tap_rx, inspect)),
}
在registry.register()
中(见linkerd2-proxy/linkerd/app/core/src/tap/daemon.rs:148
):
// 再创建一个通道,将其命名为:tap2_tx, tap2_rx
let (tx, rx) = mpsc::channel(super::TAP_CAPACITY);
// 将 tap2_tx 塞入最开始的 svc_tx
if let Err(_) = self.0.try_send(tx) {
debug!("failed to register service");
}
rx
在后续生成的Service
中,首先看poll_ready
(见linkerd2-proxy/linkerd/app/core/src/tap/service.rs:150
):
// 此处的 tap_rx 实际上是 tap2_rx.
// 从该通道取出所有 tap,存到 self.taps
while let Ok(Async::Ready(Some(t))) = self.tap_rx.poll() {
self.taps.push(t);
}
// 只保留还需要继续 tap 的对象
self.taps.retain(|t| t.can_tap_more());
self.inner.poll_ready()
接着是call
(见linkerd2-proxy/linkerd/app/core/src/tap/service.rs:161
):
let mut req_taps = Vec::new();
let mut rsp_taps = Vec::new();
for t in &mut self.taps {
// 对 req 调用 Tap::tap 接口,获取请求信息,并生成 TapRequestPayload 结构和 TapResponse 结构
if let Some((req_tap, rsp_tap)) = t.tap(&req, &self.inspect) {
req_taps.push(req_tap);
rsp_taps.push(rsp_tap);
}
}
// 将 tap 请求与原始 req 请求体合为 Payload 对象,并替代原始请求体
// 在后续从请求体中获取数据时,调用 tap 的接口(data/eos/fail)对请求数据进行处理
let req = req.map(move |inner| Payload {
inner,
taps: req_taps,
});
let inner = self.inner.call(req);
ResponseFuture {
inner,
taps: rsp_taps,
}
在ResponseFuture::poll
中(见linkerd2-proxy/linkerd/app/core/src/tap/service.rs:200
):
// 实际服务回复的 future,调用 poll 拿到实际 response
match self.inner.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(rsp)) => {
// 对 response 调用 TapResponse::tap 获取信息
let taps = self.taps.drain(..).map(|t| t.tap(&rsp)).collect();
let rsp = rsp.map(move |inner| {
let mut body = Payload { inner, taps };
if body.is_end_stream() {
body.eos(None);
}
body
});
Ok(Async::Ready(rsp))
}
Err(e) => {
for tap in self.taps.drain(..) {
tap.fail(&e);
}
Err(e)
}
}
前面这些逻辑,其中出现了 2 个 tap 接口,一个是Tap::tap
,一个是TapResponse::tap
,这俩的核心作用都是从请求或回复数据中获取需要的 tap 信息,然后发往某个通道,细节下面再讲。
tap_grpc
tap_grpc
由Server::new(subscribe)
生成,实现了api::server::Tap
这个 grpc server,响应observe
这个 method 请求。该请求来自 Linkerd2 的 tap 组件。
收到请求后:
- 解析请求参数:
limit
match
extract
- 生成一个唯一 id
base_id
- 创建一个用于传递
api::TapEvent
事件的通道,用其发送端events_tx
构造一个Tap
对象 - 调用
subscribe.subscribe(tap)
,得到一个SubscribeFuture
任务 - 构造任务
ResponseFuture
在任务ResponseFuture
的poll
中,会先执行SubscribeFuture::poll
(见linkerd2-proxy/linkerd/app/core/src/tap/daemon.rs:178
):
loop {
self.0 = match self.0 {
FutState::Subscribe {
ref mut tap,
ref mut tap_tx,
} => {
// 此处的 tap_tx 即为最开始创建的 tap 通道发送端,待其就绪后再往下走
try_ready!(tap_tx.poll_ready().map_err(|_| super::iface::NoCapacity));
// tap 为上面步骤 3 处构造
let tap = tap.take().expect("tap must be set");
// 构造一个一次性通道
let (tx, rx) = oneshot::channel();
// 将其发送端与 tap 打包发送到 tap 通道
tap_tx
.try_send((tap, tx))
.map_err(|_| super::iface::NoCapacity)?;
FutState::Pending(rx)
}
FutState::Pending(ref mut rx) => {
// 从接收端获取到信号,表示所依赖任务执行完毕
return rx.poll().map_err(|_| super::iface::NoCapacity);
}
}
}
SubscribeFuture::poll
执行完毕,会接着构造一个ResponseStream
流,并包装成 grpc 结果返回。
在ResponseStream::poll
中(见linkerd2-proxy/linkerd/app/core/src/tap/grpc/server.rs:225
):
// 限制判断
self.shared = self.shared.take().and_then(|shared| {
if shared.is_under_limit() {
Some(shared)
} else {
None
}
});
// 从事件通道获取事件并返回给 stream 流,发给 grpc 客户端,即 tap 组件
self.events_rx.poll().or_else(|_| Ok(None.into()))
由此看出,每个 grpc 请求会对应着一个Tap
对象,往Tap.shard.events_tx
中发送事件,则该事件会最终发到 grpc 请求方。
看Tap::tap
(见linkerd2-proxy/linkerd/app/core/src/tap/grpc/server.rs:267
):
let shared = self.shared.upgrade()?;
// 判断是否需要 tap
if !shared.match_.matches(req, inspect) {
return None;
}
// 省略若干从 req 中提取事件信息...
// tap 事件
let event = api::TapEvent {
event: Some(api::tap_event::Event::Http(api::tap_event::Http {
event: Some(api::tap_event::http::Event::RequestInit(init)),
})),
..base_event.clone()
};
// 发送事件
let mut events_tx = shared.events_tx.clone();
events_tx.try_send(event).ok()?;
let tap = TapTx { id, tx: events_tx };
let req = TapRequestPayload {
tap: tap.clone(),
base_event: base_event.clone(),
};
let rsp = TapResponse {
tap,
base_event,
request_init_at,
extract_headers,
};
Some((req, rsp))
图示
总结
至此,以上 3 个不同的角色互相合作,实现了:
- Linkerd2 的 tap 组件下发 tap 请求
- proxy 向所有流量请求中插入 tap 请求
- 抓取到 tap 数据后,上报至 Linkerd2 的 tap 组件