26struct ThreadPool::ThreadPoolThread :
public Thread
28 ThreadPoolThread (
ThreadPool& p,
size_t stackSize)
29 :
Thread (
"Pool", stackSize), pool (p)
36 if (! pool.runNextJob (*
this))
40 std::atomic<ThreadPoolJob*> currentJob {
nullptr };
43 JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ThreadPoolThread)
55 jassert (pool ==
nullptr || ! pool->
contains (
this));
76 listeners.
add (listener);
81 listeners.
remove (listener);
87 return t->currentJob.load();
111void ThreadPool::createThreads (
int numThreads,
size_t threadStackSize)
114 threads.
add (
new ThreadPoolThread (*
this, threadStackSize));
116 for (
auto*
t : threads)
120void ThreadPool::stopThreads()
122 for (
auto* t : threads)
123 t->signalThreadShouldExit();
125 for (
auto* t : threads)
131 jassert (
job !=
nullptr);
132 jassert (
job->pool ==
nullptr);
134 if (
job->pool ==
nullptr)
137 job->shouldStop =
false;
138 job->isActive =
false;
146 for (
auto*
t : threads)
156 JobStatus runJob()
override {
return job(); }
169 JobStatus runJob()
override {
job();
return ThreadPoolJob::jobHasFinished; }
185 return threads.
size();
212 if (index > 0 && !
job->isActive)
213 jobs.move (index, 0);
227 jobFinishedSignal.
wait (2);
248 job->signalJobShouldExit();
274 for (
int i = jobs.
size(); --i >= 0;)
285 job->signalJobShouldExit();
315 jobFinishedSignal.
wait (20);
326 for (
auto*
job : jobs)
328 s.
add (
job->getJobName());
337 for (
auto*
t : threads)
351 for (
int i = 0; i < jobs.
size(); ++i)
353 if (
auto*
job = jobs[i])
365 job->isActive =
true;
375bool ThreadPool::runNextJob (ThreadPoolThread& thread)
377 if (
auto* job = pickNextJobToRun())
380 thread.currentJob = job;
384 result = job->runJob();
391 thread.currentJob =
nullptr;
393 OwnedArray<ThreadPoolJob> deletionList;
396 const ScopedLock sl (lock);
400 job->isActive =
false;
405 addToDeleteList (deletionList, job);
407 jobFinishedSignal.
signal();
423void ThreadPool::addToDeleteList (OwnedArray<ThreadPoolJob>& deletionList, ThreadPoolJob* job)
const
425 job->shouldStop =
true;
428 if (job->shouldBeDeleted)
429 deletionList.
add (job);
ElementType getUnchecked(int index) const
int size() const noexcept
void removeFirstMatchingValue(ParameterType valueToRemove)
void remove(int indexToRemove)
int indexOf(ParameterType elementToLookFor) const
void add(const ElementType &newElement)
bool contains(ParameterType elementToLookFor) const
void move(int currentIndex, int newIndex) noexcept
int size() const noexcept
ObjectClass * add(ObjectClass *newObject)
void add(String stringToAdd)
static int getNumCpus() noexcept
void signalJobShouldExit()
void setJobName(const String &newName)
String getJobName() const
void addListener(Thread::Listener *)
static ThreadPoolJob * getCurrentThreadPoolJob()
void removeListener(Thread::Listener *)
ThreadPoolJob(const String &name)
void moveJobToFront(const ThreadPoolJob *jobToMove) noexcept
void addJob(ThreadPoolJob *job, bool deleteJobWhenFinished)
int getNumThreads() const noexcept
ThreadPoolJob * getJob(int index) const noexcept
int getNumJobs() const noexcept
bool removeAllJobs(bool interruptRunningJobs, int timeOutMilliseconds, JobSelector *selectedJobsToRemove=nullptr)
StringArray getNamesOfAllJobs(bool onlyReturnActiveJobs) const
bool isJobRunning(const ThreadPoolJob *job) const noexcept
bool setThreadPriorities(int newPriority)
bool removeJob(ThreadPoolJob *job, bool interruptIfRunning, int timeOutMilliseconds)
bool contains(const ThreadPoolJob *job) const noexcept
bool waitForJobToFinish(const ThreadPoolJob *job, int timeOutMilliseconds) const
static Thread *JUCE_CALLTYPE getCurrentThread()
bool wait(int timeOutMilliseconds) const
Thread(const String &threadName, size_t threadStackSize=0)
bool threadShouldExit() const
static uint32 getMillisecondCounter() noexcept
bool wait(int timeOutMilliseconds=-1) const