Ring Daemon 16.0.0
Loading...
Searching...
No Matches
scheduled_executor.cpp
Go to the documentation of this file.
1/*
2 * Copyright (C) 2004-2025 Savoir-faire Linux Inc.
3 *
4 * This program is free software: you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation, either version 3 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <https://www.gnu.org/licenses/>.
16 */
17#include "scheduled_executor.h"
18#include "logger.h"
19
20namespace jami {
21
22std::atomic<uint64_t> task_cookie = {0};
23
25 : name_(name)
26 , running_(std::make_shared<std::atomic<bool>>(true))
27 , thread_([this, is_running = running_] {
28 // The thread needs its own reference of `running_` in case the
29 // scheduler is destroyed within the thread because of a job
30
31 while (*is_running)
32 loop();
33 })
34{}
35
37{
38 stop();
39
40 if (not thread_.joinable()) {
41 return;
42 }
43
44 // Avoid deadlock
45 if (std::this_thread::get_id() == thread_.get_id()) {
46 thread_.detach();
47 } else {
48 thread_.join();
49 }
50}
51
52void
54{
55 std::lock_guard lock(jobLock_);
56 *running_ = false;
57 jobs_.clear();
58 cv_.notify_all();
59}
60
61void
62ScheduledExecutor::run(std::function<void()>&& job,
63 const char* filename, uint32_t linum)
64{
65 std::lock_guard lock(jobLock_);
66 auto now = clock::now();
67 jobs_[now].emplace_back(std::move(job), filename, linum);
68 cv_.notify_all();
69}
70
71std::shared_ptr<Task>
72ScheduledExecutor::schedule(std::function<void()>&& job, time_point t,
73 const char* filename, uint32_t linum)
74{
75 auto ret = std::make_shared<Task>(std::move(job), filename, linum);
76 schedule(ret, t);
77 return ret;
78}
79
80std::shared_ptr<Task>
81ScheduledExecutor::scheduleIn(std::function<void()>&& job, duration dt,
82 const char* filename, uint32_t linum)
83{
84 return schedule(std::move(job), clock::now() + dt,
85 filename, linum);
86}
87
88std::shared_ptr<RepeatedTask>
89ScheduledExecutor::scheduleAtFixedRate(std::function<bool()>&& job,
91 const char* filename, uint32_t linum)
92{
93 auto ret = std::make_shared<RepeatedTask>(std::move(job), filename, linum);
94 reschedule(ret, clock::now(), dt);
95 return ret;
96}
97
98void
99ScheduledExecutor::reschedule(std::shared_ptr<RepeatedTask> task, time_point t, duration dt)
100{
101 const char* filename = task->job().filename;
102 uint32_t linenum = task->job().linum;
103 schedule(std::make_shared<Task>([this, task = std::move(task), t, dt]() mutable {
104 if (task->run(name_.c_str()))
105 reschedule(std::move(task), t + dt, dt);
106 }, filename, linenum),
107 t);
108}
109
110void
111ScheduledExecutor::schedule(std::shared_ptr<Task> task, time_point t)
112{
113 const char* filename = task->job().filename;
114 uint32_t linenum = task->job().linum;
115 std::lock_guard lock(jobLock_);
116 jobs_[t].emplace_back([task = std::move(task), this] { task->run(name_.c_str()); },
117 filename, linenum);
118 cv_.notify_all();
119}
120
121void
122ScheduledExecutor::loop()
123{
124 std::vector<Job> jobs;
125 {
126 std::unique_lock lock(jobLock_);
127 while (*running_ and (jobs_.empty() or jobs_.begin()->first > clock::now())) {
128 if (jobs_.empty())
129 cv_.wait(lock);
130 else {
131 auto nextJob = jobs_.begin()->first;
132 cv_.wait_until(lock, nextJob);
133 }
134 }
135 if (not *running_)
136 return;
137 jobs = std::move(jobs_.begin()->second);
138 jobs_.erase(jobs_.begin());
139 }
140 for (auto& job : jobs) {
141 try {
142 job.fn();
143 } catch (const std::exception& e) {
144 JAMI_ERR("Exception running job: %s", e.what());
145 }
146 }
147}
148
149} // namespace jami
std::shared_ptr< Task > scheduleIn(std::function< void()> &&job, duration dt, const char *filename=CURRENT_FILENAME(), uint32_t linum=CURRENT_LINE())
Schedule job to be run after delay dt.
ScheduledExecutor(const std::string &name_)
void stop()
Stop the scheduler, it is unable to be reversed.
std::shared_ptr< RepeatedTask > scheduleAtFixedRate(std::function< bool()> &&job, duration dt, const char *filename=CURRENT_FILENAME(), uint32_t linum=CURRENT_LINE())
Schedule job to be run every dt, starting now.
std::shared_ptr< Task > schedule(std::function< void()> &&job, time_point t, const char *filename=CURRENT_FILENAME(), uint32_t linum=CURRENT_LINE())
Schedule job to be run at time t.
void run(std::function< void()> &&job, const char *filename=CURRENT_FILENAME(), uint32_t linum=CURRENT_LINE())
Schedule job to be run ASAP.
#define JAMI_ERR(...)
Definition logger.h:218
void emitSignal(Args... args)
Definition ring_signal.h:64
std::atomic< uint64_t > task_cookie