WebRTC服务质量(01)- Qos概述
WebRTC服务质量(02)- RTP协议
WebRTC服务质量(03)- RTCP协议
WebRTC服务质量(04)- 重传机制(01) RTX NACK概述
WebRTC服务质量(05)- 重传机制(02) NACK判断丢包
WebRTC服务质量(06)- 重传机制(03) NACK找到真正的丢包
WebRTC服务质量(07)- 重传机制(04) 接收NACK消息
WebRTC服务质量(08)- 重传机制(05) RTX机制
WebRTC服务质量(09)- Pacer机制(01) 流程概述
WebRTC服务质量(10)- Pacer机制(02) RoundRobinPacketQueue
WebRTC服务质量(11)- Pacer机制(03) IntervalBudget
WebRTC服务质量(12)- Pacer机制(04) 向Pacer中插入数据
一、前言:
先想想,我们其实已经有了目标码率了,为什么还要控制码率?
因为,我们获取的目标码率是以秒为单位的,比如每秒钟发送300k,这是我们通过webrtc的网络拥塞控制可以获得的。 但是,具体到每个时间分片,我们前面看了pacer是5ms作为一个时间分片,IntervalBudget
就是控制每个时间分片内应该发送的数据大小。
二、概念:
IntervalBudget
是 WebRTC 中的一个数据流速率控制工具,目的是在固定时间间隔内,根据目标码率动态管理和分配可以发送的字节数(预算字节)。它类似一个预算系统,平衡上一时间间隔的欠载(underuse)和过载(overuse)情况,确保整体码率维持在目标值,同时允许一定程度的积累和补偿。
详细分主要作用是:
- 维持目标码率(
target_rate_kbps
) 根据目标速率(如 300 kbps),动态调整每段时间的字节发送目标。 - 处理过载和欠载 若某次时间片中没有充分利用发送预算,是否允许将未用的预算累积到后续时间片(由
can_build_up_underuse
决定)。 - 预算限制 限制过载和欠载的值,最高不能超过特定的预算范围(
max_bytes_in_budget_
)。 - 字节消耗管理 随着数据发送,会减少预算,维持运行中实时的发送和控制。
三、类定义:
class IntervalBudget {public:explicit IntervalBudget(int initial_target_rate_kbps);IntervalBudget(int initial_target_rate_kbps, bool can_build_up_underuse);// 用于设置目标码率void set_target_rate_kbps(int target_rate_kbps);// TODO(tschumim): Unify IncreaseBudget and UseBudget to one function.// 用于计算我们这个时间分片,有多少数据可以发送void IncreaseBudget(int64_t delta_time_ms);void UseBudget(size_t bytes);size_t bytes_remaining() const;double budget_ratio() const;int target_rate_kbps() const;private:// 通过带宽评估算法评估出的目标码率(也就是1s内发送多少数据)int target_rate_kbps_;// 在budget中最大可以存放多少字节int64_t max_bytes_in_budget_;// 在一个时间分片内,还有多少数据可以发送int64_t bytes_remaining_;bool can_build_up_underuse_;
};
四、Pacer中两个重要的IntervalBudget:
IntervalBudget media_budget_; // 用于计算可以发送媒体的数据量IntervalBudget padding_budget_; // 用于计算可以发送padding的数据量
五、设置目标码率:
要使用IntervalBudget,就需要先设置目标码率。两种设置目标码率的途径:
- 周期执行
RtpTransportControllerSend::UpdateControllerWithTimeInterval
; - 通过收到对方发送来的transport-cc,然后使用
OnTransportPacketsFeedback
函数来分析,并更新目标码率;
5.1、调用UpdateControllerWithTimeInterval过程:
红框中的都属于RepeatingTask
,进行任务的反复执行。
// 周期执行当前任务
bool RepeatingTaskBase::Run() {// 执行当前任务TimeDelta delay = RunClosure();// ...// 然后再把this打包成任务,放入线程的任务队列当中,之后这个线程又会执行Run,反复如此// delay.ms()是每隔多长时间执行这个任务task_queue_->PostDelayedTask(absl::WrapUnique(this), delay.ms());return false;
}// 在其父类 RepeatingTaskBase 当中会通过Run来执行任务
template <class Closure>
class RepeatingTaskImpl final : public RepeatingTaskBase {public:RepeatingTaskImpl(TaskQueueBase* task_queue,TimeDelta first_delay,Closure&& closure,Clock* clock): RepeatingTaskBase(task_queue, first_delay, clock),closure_(std::forward<Closure>(closure)) {static_assert(std::is_same<TimeDelta, typename std::result_of<decltype (&Closure::operator())(Closure)>::type>::value, "");}private:// 执行当前任务TimeDelta RunClosure() override { return closure_(); }typename std::remove_const<typename std::remove_reference<Closure>::type>::type closure_;
};
5.2、根据Transport-cc执行:
当我们收到transport-cc反馈后,会执行下面函数:
// 每当我们收到transport-cc之后,就会调用下面的方法
void RtpTransportControllerSend::OnTransportFeedback(const rtcp::TransportFeedback& feedback) {feedback_demuxer_.OnTransportFeedback(feedback);auto feedback_time = Timestamp::Millis(clock_->TimeInMilliseconds());// 生成一个匿名函数任务,这个任务当中通过 OnTransportPacketsFeedback 来解析transport-cc中的内容// 解析完这个内容之后,就会计算我们当前的带宽是多少,拿到带宽就可以为budget设置目标码率了// 将返回值(也就是目标码率)传给PostUpdates,这里面会设置给budget目标码率task_queue_.PostTask([this, feedback, feedback_time]() {RTC_DCHECK_RUN_ON(&task_queue_);absl::optional<TransportPacketsFeedback> feedback_msg =transport_feedback_adapter_.ProcessTransportFeedback(feedback, feedback_time);if (feedback_msg && controller_) {PostUpdates(controller_->OnTransportPacketsFeedback(*feedback_msg));}pacer()->UpdateOutstandingData(transport_feedback_adapter_.GetOutstandingData());});
}
就是解析出人家反馈给我们的内容,根据这个计算我们自己的目标码率,并设置给budget;
六、使用media_budget:
看下ProcessPackets如何使用media_budget的:
// 周期处理包的发送
void PacingController::ProcessPackets() {if (mode_ == ProcessMode::kPeriodic) {// 将前面设置给pacer的目标码率设置给media_budgetmedia_budget_.set_target_rate_kbps(target_rate.kbps());UpdateBudgetWithElapsedTime(elapsed_time);} else {media_rate_ = target_rate;}
}
其中,elapsed_time
就是逝去的时间,在UpdateBudgetWithElapsedTime
当中会使用:
/*** 使用逝去的时间更新media_budget_*/
void PacingController::UpdateBudgetWithElapsedTime(TimeDelta delta) {if (mode_ == ProcessMode::kPeriodic) {// 获取较小者(kMaxProcessingInterval是30ms)delta = std::min(kMaxProcessingInterval, delta);// 将delta时间传给media_budgetmedia_budget_.IncreaseBudget(delta.ms());padding_budget_.IncreaseBudget(delta.ms());} else {media_debt_ -= std::min(media_debt_, media_rate_ * delta);padding_debt_ -= std::min(padding_debt_, padding_rate_ * delta);}
}
再看看传进去IncreaseBudget
怎么使用的:
void IntervalBudget::IncreaseBudget(int64_t delta_time_ms) {// target_rate_kbps_是我们之前传入的目标码率// 加入我们现在目标码率target_rate_kbps_是300,delta是10ms,那么bytes约等于300字节,// 也就是说我们接下来这段时间可以发送的数据是300字节int64_t bytes = target_rate_kbps_ * delta_time_ms / 8;if (bytes_remaining_ < 0 || can_build_up_underuse_) {// We overused last interval, compensate this interval.// 这种情况说明过载了(比如上次要求发100,最终发了150,多发了50)// 那么我们就用本次应该发送的数据量减去(因为bytes_remaining_为负数,下面看着是加)上次多出来的50,就是本次要发送的// 当然如果大于我们budget可以发送的最大值max_bytes_in_budget_,那么,只能允许发送max_bytes_in_budget_bytes_remaining_ = std::min(bytes_remaining_ + bytes, max_bytes_in_budget_);} else {// If we underused last interval we can't use it this interval.// 如果没有过载,使用当前计算的结果bytes_remaining_ = std::min(bytes, max_bytes_in_budget_);}
}
我尝试再好好解释下:
IncreaseBudget
增加新的字节预算,用于下一时间片发送。
核心逻辑:
-
计算新增预算字节:
bytes = target_rate_kbps_ * delta_time_ms / 8
此预算决定时间间隔
delta_time_ms
内允许发送的字节数。 -
判断是否存在欠载积累(
bytes_remaining_
为负值),以及是否允许保留欠载(can_build_up_underuse_
)。两种情况:- 允许欠载/欠载存在:补偿上一次过载,同时加入当前新增预算,更新剩余字节。
- 不允许欠载/无欠载:仅允许当前时间段的预算。
-
使用
std::min()
限制最终预算,确保预算总值不超过max_bytes_in_budget_
。
示例:
-
目标码率:300 kbps,时间段:10 ms:
bytes = 300 * 10 / 8 = 375 字节
-
欠载情况:假设
bytes_remaining_ = -100
且can_build_up_underuse = true
:bytes_remaining_ = min(-100 + 375, max_bytes_in_budget_)= min(275, 375)= 275 字节
-
正常情况:没有欠载,直接按当前时间段计算:
bytes_remaining_ = min(375, 375) = 375 字节
七、总结:
本文主要介绍了Pacer模块中怎么根据目标码率,然后通过IntervalBudget
来完成动态调整每段时间的字节预算,使整体码率稳定在设计的目标值附近,同时平衡欠载和过载的情况。