// Copyright 2013 The Flutter Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "flutter/fml/concurrent_message_loop.h" #include #include "flutter/fml/thread.h" #include "flutter/fml/trace_event.h" namespace fml { ConcurrentMessageLoop::ConcurrentMessageLoop(size_t worker_count) : worker_count_(std::max(worker_count, 1ul)) { for (size_t i = 0; i < worker_count_; ++i) { workers_.emplace_back([i, this]() { fml::Thread::SetCurrentThreadName(fml::Thread::ThreadConfig( std::string{"io.worker." + std::to_string(i + 1)})); WorkerMain(); }); } for (const auto& worker : workers_) { worker_thread_ids_.emplace_back(worker.get_id()); } } ConcurrentMessageLoop::~ConcurrentMessageLoop() { Terminate(); for (auto& worker : workers_) { FML_DCHECK(worker.joinable()); worker.join(); } } size_t ConcurrentMessageLoop::GetWorkerCount() const { return worker_count_; } std::shared_ptr ConcurrentMessageLoop::GetTaskRunner() { return std::make_shared(weak_from_this()); } void ConcurrentMessageLoop::PostTask(const fml::closure& task) { if (!task) { return; } std::unique_lock lock(tasks_mutex_); // Don't just drop tasks on the floor in case of shutdown. if (shutdown_) { FML_DLOG(WARNING) << "Tried to post a task to shutdown concurrent message " "loop. The task will be executed on the callers thread."; lock.unlock(); ExecuteTask(task); return; } tasks_.push(task); // Unlock the mutex before notifying the condition variable because that mutex // has to be acquired on the other thread anyway. Waiting in this scope till // it is acquired there is a pessimization. lock.unlock(); tasks_condition_.notify_one(); } void ConcurrentMessageLoop::WorkerMain() { while (true) { std::unique_lock lock(tasks_mutex_); tasks_condition_.wait(lock, [&]() { return !tasks_.empty() || shutdown_ || HasThreadTasksLocked(); }); // Shutdown cannot be read with the task mutex unlocked. bool shutdown_now = shutdown_; fml::closure task; std::vector thread_tasks; if (!tasks_.empty()) { task = tasks_.front(); tasks_.pop(); } if (HasThreadTasksLocked()) { thread_tasks = GetThreadTasksLocked(); FML_DCHECK(!HasThreadTasksLocked()); } // Don't hold onto the mutex while tasks are being executed as they could // themselves try to post more tasks to the message loop. lock.unlock(); TRACE_EVENT0("flutter", "ConcurrentWorkerWake"); // Execute the primary task we woke up for. if (task) { ExecuteTask(task); } // Execute any thread tasks. for (const auto& thread_task : thread_tasks) { ExecuteTask(thread_task); } if (shutdown_now) { break; } } } void ConcurrentMessageLoop::ExecuteTask(const fml::closure& task) { task(); } void ConcurrentMessageLoop::Terminate() { std::scoped_lock lock(tasks_mutex_); shutdown_ = true; tasks_condition_.notify_all(); } void ConcurrentMessageLoop::PostTaskToAllWorkers(const fml::closure& task) { if (!task) { return; } std::scoped_lock lock(tasks_mutex_); for (const auto& worker_thread_id : worker_thread_ids_) { thread_tasks_[worker_thread_id].emplace_back(task); } tasks_condition_.notify_all(); } bool ConcurrentMessageLoop::HasThreadTasksLocked() const { return thread_tasks_.count(std::this_thread::get_id()) > 0; } std::vector ConcurrentMessageLoop::GetThreadTasksLocked() { auto found = thread_tasks_.find(std::this_thread::get_id()); FML_DCHECK(found != thread_tasks_.end()); std::vector pending_tasks; std::swap(pending_tasks, found->second); thread_tasks_.erase(found); return pending_tasks; } ConcurrentTaskRunner::ConcurrentTaskRunner( std::weak_ptr weak_loop) : weak_loop_(std::move(weak_loop)) {} ConcurrentTaskRunner::~ConcurrentTaskRunner() = default; void ConcurrentTaskRunner::PostTask(const fml::closure& task) { if (!task) { return; } if (auto loop = weak_loop_.lock()) { loop->PostTask(task); return; } FML_DLOG(WARNING) << "Tried to post to a concurrent message loop that has already died. " "Executing the task on the callers thread."; task(); } bool ConcurrentMessageLoop::RunsTasksOnCurrentThread() { std::scoped_lock lock(tasks_mutex_); for (const auto& worker_thread_id : worker_thread_ids_) { if (worker_thread_id == std::this_thread::get_id()) { return true; } } return false; } } // namespace fml