Catalog
  1. 1. 概要
  2. 2. 线程模型
  3. 3. 了解线程类及其工作原理
    1. 3.1. rtc::thread
    2. 3.2. webrtc::ProcessThread
  4. 4. 分析 webrtc 中的线程
    1. 4.1. threaded-ml
    2. 4.2. webrtc_audio_module_rec_thread
    3. 4.3. webrtc_audio_module_play_thread
    4. 4.4. AudioDeviceBufferTimer
    5. 4.5. rtc-low-prio
    6. 4.6. rtp_send_controller
    7. 4.7. AudioEncoder
    8. 4.8. PacerThread
    9. 4.9. ModuleProcessThread
    10. 4.10. signaling_thread
    11. 4.11. worker_thread
    12. 4.12. network_thread
  5. 5. SocketServer
webrtc线程模型

概要

分析一下 webrtc 中的线程,以及各个线程的职能分别是什么。

线程模型

来自官方的说明。WebRtc Native APIs 有两个全局线程: signaling 线程和 worker 线程。这两个线程可以在创建 PeerConnectionFactory 时指定,如果不指定,其内部会创建缺省线程。
Stream 和 PeerConnection 的 API 调用都会被代理到 signaling 线程,应用程序可以在任何线程调用这些接口。

回调函数会在 signaling 线程中执行。所以回调函数应该尽快返回,以免阻塞线程。资源密集型的任务应该放到别的线程中完成。

worker 线程被用来完成资源密集型的任务,比如 data streaming。

下面看一下他们的具体实现,以及其他类型的线程的功能和实现。

了解线程类及其工作原理

rtc::thread

posix 规范下, 使用了系统的 pthread 作为底层的线程对象。

1
2
3
#if defined(WEBRTC_POSIX)
pthread_t thread_ = 0;
#endif

启动线程, 内部实际上创建了一个 pthread 对象,运行指定的函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
bool Thread::Start() {
... 省略部分代码
pthread_attr_t attr;
pthread_attr_init(&attr);

int error_code = pthread_create(&thread_, &attr, PreRun, this);
if (0 != error_code) {
RTC_LOG(LS_ERROR) << "Unable to create pthread, error " << error_code;
thread_ = 0;
return false;
}
... 省略部分代码
}

// 去除了一下平台相关的代码
void* Thread::PreRun(void* pv) {
Thread* thread = static_cast<Thread*>(pv);
ThreadManager::Instance()->SetCurrentThread(thread);
rtc::SetCurrentThreadName(thread->name_.c_str());
thread->Run();
ThreadManager::Instance()->SetCurrentThread(nullptr);
return nullptr;
}

接下来就是真正的线程循环, 可以看到代码中有一个无线循环,不断的 Get 以及 Dispatch。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void Thread::Run() {
ProcessMessages(kForever);
}

bool Thread::ProcessMessages(int cmsLoop) {
// Using ProcessMessages with a custom clock for testing and a time greater
// than 0 doesn't work, since it's not guaranteed to advance the custom
// clock's time, and may get stuck in an infinite loop.
RTC_DCHECK(GetClockForTesting() == nullptr || cmsLoop == 0 ||
cmsLoop == kForever);
int64_t msEnd = (kForever == cmsLoop) ? 0 : TimeAfter(cmsLoop);
int cmsNext = cmsLoop;

while (true) {
#if defined(WEBRTC_MAC)
ScopedAutoReleasePool pool;
#endif
Message msg;
if (!Get(&msg, cmsNext))
return !IsQuitting();
Dispatch(&msg);

if (cmsLoop != kForever) {
cmsNext = static_cast<int>(TimeUntil(msEnd));
if (cmsNext < 0)
return true;
}
}
}

投递任务, 实际上是一个需要执行的函数。 从调用侧发起, 传递一个需要执行的函数 functor, 通过一系列的构造,最后创建一个 Message 对象压入 message_ 队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
  template <
class ReturnT,
typename = typename std::enable_if<std::is_void<ReturnT>::value>::type>
void Invoke(const Location& posted_from, FunctionView<void()> functor) {
InvokeInternal(posted_from, functor);
}

void Thread::InvokeInternal(const Location& posted_from,
rtc::FunctionView<void()> functor) {
TRACE_EVENT2("webrtc", "Thread::Invoke", "src_file", posted_from.file_name(),
"src_func", posted_from.function_name());

class FunctorMessageHandler : public MessageHandler {
public:
explicit FunctorMessageHandler(rtc::FunctionView<void()> functor)
: functor_(functor) {}
void OnMessage(Message* msg) override { functor_(); }

private:
rtc::FunctionView<void()> functor_;
} handler(functor);

Send(posted_from, &handler);
}

void Thread::Send(const Location& posted_from,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata) {

... 省略部分代码

Message msg;
msg.posted_from = posted_from;
msg.phandler = phandler;
msg.message_id = id;
msg.pdata = pdata;

... 省略部分代码

PostTask(
webrtc::ToQueuedTask([msg]() mutable { msg.phandler->OnMessage(&msg); },
[this, &ready, current_thread] {
CritScope cs(&crit_);
ready = true;
current_thread->socketserver()->WakeUp();
}));

... 省略部分代码
}

void Thread::PostTask(std::unique_ptr<webrtc::QueuedTask> task) {
Post(RTC_FROM_HERE, &queued_task_handler_,
/*id=*/0, new ScopedMessageData<webrtc::QueuedTask>(std::move(task)));
}

void Thread::Post(const Location& posted_from,
MessageHandler* phandler,
uint32_t id,
MessageData* pdata,
bool time_sensitive) {
... 省略部分代码
Message msg;
msg.posted_from = posted_from;
msg.phandler = phandler;
msg.message_id = id;
msg.pdata = pdata;
messages_.push_back(msg);
... 省略部分代码
}

执行任务, 就是上文提到的,线程循环中会不断的 Get Message 然后执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void Thread::Dispatch(Message* pmsg) {
TRACE_EVENT2("webrtc", "Thread::Dispatch", "src_file",
pmsg->posted_from.file_name(), "src_func",
pmsg->posted_from.function_name());
int64_t start_time = TimeMillis();
// 这一行会调用 functor
pmsg->phandler->OnMessage(pmsg);
int64_t end_time = TimeMillis();
int64_t diff = TimeDiff(end_time, start_time);
if (diff >= kSlowDispatchLoggingThreshold) {
RTC_LOG(LS_INFO) << "Message took " << diff
<< "ms to dispatch. Posted from: "
<< pmsg->posted_from.ToString();
}
}

webrtc::ProcessThread

它持有一个 PlatformThread 对象,实际上底层也是持有一个 pthread 线程。

1
2
3
4
5
6

void PlatformThread::Start() {
... 省略部分代码
RTC_CHECK_EQ(0, pthread_create(&thread_, &attr, &StartThread, this));
... 省略部分代码
}

线程体会循环调用这个函数。会做这几件事情

  • 执行 module 的 callback
  • 执行 delayed_task_ 队列中的任务
  • 执行 queue_ 队列中的任务

下面说一下这几种情况的逻辑。

执行 module 里的 callback, 通过 GetNextCallbackTime 获取下一次执行 callback 的时间,如果小于当前时间或者需要立即执行,那么就会调用 module 的 callback 函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
bool ProcessThreadImpl::Process() {
... 省略代码
for (ModuleCallback& m : modules_) {
// TODO(tommi): Would be good to measure the time TimeUntilNextProcess
// takes and dcheck if it takes too long (e.g. >=10ms). Ideally this
// operation should not require taking a lock, so querying all modules
// should run in a matter of nanoseconds.
if (m.next_callback == 0)
m.next_callback = GetNextCallbackTime(m.module, now);

if (m.next_callback <= now ||
m.next_callback == kCallProcessImmediately) {
{
TRACE_EVENT2("webrtc", "ModuleProcess", "function",
m.location.function_name(), "file",
m.location.file_name());
m.module->Process();
}
// Use a new 'now' reference to calculate when the next callback
// should occur. We'll continue to use 'now' above for the baseline
// of calculating how long we should wait, to reduce variance.
int64_t new_now = rtc::TimeMillis();
m.next_callback = GetNextCallbackTime(m.module, new_now);
}

if (m.next_callback < next_checkpoint)
next_checkpoint = m.next_callback;
}
... 省略代码

}

执行 delayed_task_ 队列和 queue_ 队列中的任务。 这两个队列中的任务都是用户侧通过接口将任务压入队列,线程会逐一执行这些任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 下面两个接口是压任务
void ProcessThreadImpl::PostTask(std::unique_ptr<QueuedTask> task)
void ProcessThreadImpl::PostDelayedTask(std::unique_ptr<QueuedTask> task, uint32_t milliseconds)

//这里是执行任务
bool ProcessThreadImpl::Process() {
...省略部分代码
while (!delayed_tasks_.empty() && delayed_tasks_.top().run_at_ms <= now) {
queue_.push(delayed_tasks_.top().task);
delayed_tasks_.pop();
}

if (!delayed_tasks_.empty()) {
next_checkpoint =
std::min(next_checkpoint, delayed_tasks_.top().run_at_ms);
}

while (!queue_.empty()) {
QueuedTask* task = queue_.front();
queue_.pop();
lock_.Leave();
if (task->Run()) {
delete task;
}
lock_.Enter();
}
... 省略部分代码
}

下面的 PacerThread 和 ModuleProcessThread 中讲一下他们是怎么使用的。

分析 webrtc 中的线程

Linux 平台为例

threaded-ml

PulseAudio main loop thread?

webrtc_audio_module_rec_thread

音频采集线程

webrtc_audio_module_play_thread

音频播放线程

AudioDeviceBufferTimer

1
2
// init in audio_device_buffer.cc
AudioDeviceBuffer::AudioDeviceBuffer

仅做了非常少的几个操作,而且作业非常短,所以基本上不占用资源。

rtc-low-prio

1
2
3
// media/engine/webrtc_voice_engine.cc
// 只有这里用到了
WebRtcVoiceEngine::StartAecDump

看名字应该是低优先级任务线程。仅有一个人任务被设置了,看起来似乎是输出回音消除模块的信息,由于不开回音消除,所以它没有实际任务,可以说是一个空线程。但是没有开关来控制它是否创建。

rtp_send_controller

1
// call/rtp_transport_controller_send.cc

AudioEncoder

1
2
3
4
5
6
7
是一个 TaskQueueLibevent 对象,内部创建一个 rtc::PlatformThread。 采集线程会将 pcm 数据丢到这个线程中去做后续的处理, 主要是编码工作。
// audio/channel_send.cc
TaskQueueLibeventFactory::CreateTaskQueue
// 实际的代码是这样的
//encoder_queue_(task_queue_factory->CreateTaskQueue(
// "AudioEncoder",
// TaskQueueFactory::Priority::NORMAL))

PacerThread

是一个 webrtc::ProcessThread 对象, 由 rtc::call 创建 。

1
2
3
4
5
6
// scr/call/call.cc
Call* Call::Create(const Call::Config& config) {
return Create(config, Clock::GetRealTimeClock(),
ProcessThread::Create("ModuleProcessThread"),
ProcessThread::Create("PacerThread"));
}

PacerThread 通过注册一个 module 到 ProcessThread 线程来完成相关的任务。最终通过调用 PacedSender::TimeUntilNextProcess() 获取下一次 callback 的间隔。callback 最终会调用 PacedSender::Process 方法来发送数据包。初步了解,这个线程的处理单元对音频基本无效,主要针对视频。主要职责就是一定间隔从队列中取出数据包投递到发送模块中,其目的是为了使得数据平滑的被发送出去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 注册module
process_thread_->RegisterModule(&module_proxy_, RTC_FROM_HERE);

// module
class ModuleProxy : public Module {
public:
explicit ModuleProxy(PacedSender* delegate) : delegate_(delegate) {}

private:
int64_t TimeUntilNextProcess() override {
return delegate_->TimeUntilNextProcess();
}
void Process() override { return delegate_->Process(); }
void ProcessThreadAttached(ProcessThread* process_thread) override {
return delegate_->ProcessThreadAttached(process_thread);
}

PacedSender* const delegate_;
} module_proxy_{this}

ModuleProcessThread

是一个 webrtc::ProcessThread 对象, 由 rtc::call 创建 。

1
2
3
4
5
6
// scr/call/call.cc
Call* Call::Create(const Call::Config& config) {
return Create(config, Clock::GetRealTimeClock(),
ProcessThread::Create("ModuleProcessThread"),
ProcessThread::Create("PacerThread"));
}

ModuleProcessThread 注册了两个模块,分别看一下他们做了什么事情。

1
2
module_process_thread_->RegisterModule(receive_side_cc_.GetRemoteBitrateEstimator(true), RTC_FROM_HERE);
module_process_thread_->RegisterModule(&receive_side_cc_, RTC_FROM_HERE);

第一个是 webrtc::RemoteEstimatorProxy 模块, 他会根据策略发送 feedback 包(内容待分析)。

1
void webrtc::RemoteEstimatorProxy::SendPeriodicFeedbacks()

第二个是 webrtc::ReceiveSideCongestionController (接收测拥塞控制?), 定期执行下面的函数。

1
void RemoteBitrateEstimatorSingleStream::Process()

signaling_thread

根据 webrtc 官方的说明,对 webrtc 的几乎大部分 api 调用,都经过了封装,调用会被投递到 signaling_thread 线程进行处理,并同步的返回结果,并且这个线程主要完成短作业。
是一个 rtc::thread 对象, 可以是用户侧创建的实例,也可以是框架创建默认的实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
rtc::scoped_refptr<PeerConnectionFactoryInterface> CreatePeerConnectionFactory(
rtc::Thread* network_thread,
rtc::Thread* worker_thread,
rtc::Thread* signaling_thread,
... // 省略部分
) {

.... // 省略部分

PeerConnectionFactoryDependencies dependencies;
dependencies.network_thread = network_thread;
dependencies.worker_thread = worker_thread;
dependencies.signaling_thread = signaling_thread;

.... // 省略部分

}


PeerConnectionFactory::PeerConnectionFactory(
PeerConnectionFactoryDependencies dependencies)
: wraps_current_thread_(false),
network_thread_(dependencies.network_thread),
worker_thread_(dependencies.worker_thread),
signaling_thread_(dependencies.signaling_thread),
... // 省略部分
{
if (!network_thread_) {
owned_network_thread_ = rtc::Thread::CreateWithSocketServer();
owned_network_thread_->SetName("pc_network_thread", nullptr);
owned_network_thread_->Start();
network_thread_ = owned_network_thread_.get();
}

if (!worker_thread_) {
owned_worker_thread_ = rtc::Thread::Create();
owned_worker_thread_->SetName("pc_worker_thread", nullptr);
owned_worker_thread_->Start();
worker_thread_ = owned_worker_thread_.get();
}

if (!signaling_thread_) {
signaling_thread_ = rtc::Thread::Current();
if (!signaling_thread_) {
// If this thread isn't already wrapped by an rtc::Thread, create a
// wrapper and own it in this class.
signaling_thread_ = rtc::ThreadManager::Instance()->WrapCurrentThread();
wraps_current_thread_ = true;
}
}
}

如果用户侧创建了对应的线程,并传递给了接口,那框架会直接使用,用户侧没有指定的情况下, signaling_thread_ 默认使用当前线程,也就是调用 CreatePeerConnectionFactory 的线程。 worker 和 netwo 线程则都是创建默认的线程,下面不在赘述。

我们通过 CreatePeerConnection 方法获取到的 PeerConnection 是框架封装过后的一个对象,这个对象的函数调用都被 wrap 过,最终会通过 signaling_thread 去同步的调用对应的接口。

worker_thread

是一个 rtc::thread 对象, 可以是用户侧创建的实例,也可以是框架创建默认的实例。
根据 webrtc 描述,这个线程主要做长作业。

network_thread

是一个 rtc::thread 对象, 可以是用户侧创建的实例,也可以是框架创建默认的实例。

SocketServer

这里还要提一下 SocketServer, 因为 webrtc::Thread 对象会有一个 SocketServer 对象。

Author: 42
Link: http://blog.ikernel.cn/2020/09/10/webrtc%E7%BA%BF%E7%A8%8B%E6%A8%A1%E5%9E%8B/
Copyright Notice: All articles in this blog are licensed under CC BY-NC-SA 4.0 unless stating additionally.

Comment