Browse Source

Brought threads to C++11

Revamped thread communication
Fixed dynamic thread Add/Remove
Added unit-test for threads !!!!
undefined
Benjamin ‘Touky’ Huet Sam Hocevar <sam@hocevar.net> 9 years ago
parent
commit
825401d997
12 changed files with 626 additions and 191 deletions
  1. +1
    -1
      doc/samples/meshviewer/meshviewer.cpp
  2. +2
    -2
      doc/tutorial/11_fractal.cpp
  3. +7
    -5
      src/lol/base/array.h
  4. +3
    -0
      src/lol/base/features.h
  5. +58
    -19
      src/lol/sys/thread.h
  6. +7
    -7
      src/lol/sys/threadtypes.h
  7. +1
    -0
      src/lol/sys/timer.h
  8. +162
    -63
      src/sys/thread.cpp
  9. +200
    -85
      src/sys/threadbase.h
  10. +9
    -7
      src/sys/threadtypes.cpp
  11. +5
    -0
      src/sys/timer.cpp
  12. +171
    -2
      src/t/sys/thread.cpp

+ 1
- 1
doc/samples/meshviewer/meshviewer.cpp View File

@@ -238,7 +238,7 @@ void MeshViewer::TickGame(float seconds)
{
for (ThreadJob* job : result)
{
if (job->GetJobType() == ThreadJobType::WORK_SUCCESSED)
if (job->GetJobType() == ThreadJobType::WORK_SUCCEEDED)
{
MeshViewerLoadJob* mvjob = static_cast<MeshViewerLoadJob*>(job);
mvjob->RetrieveResult(this);


+ 2
- 2
doc/tutorial/11_fractal.cpp View File

@@ -127,7 +127,7 @@ public:
#if LOL_FEATURE_THREADS
/* Spawn worker threads and wait for their readiness. */
for (int i = 0; i < MAX_THREADS; i++)
m_threads[i] = new thread(std::bind(&Fractal::DoWorkHelper, this));
m_threads[i] = new thread(std::bind(&Fractal::DoWorkHelper, this, std::placeholders::_1));
for (int i = 0; i < MAX_THREADS; i++)
m_spawnqueue.pop();
#endif
@@ -321,7 +321,7 @@ public:
}

#if LOL_FEATURE_THREADS
void DoWorkHelper()
void DoWorkHelper(thread *inst)
{
m_spawnqueue.push(0);
for ( ; ; )


+ 7
- 5
src/lol/base/array.h View File

@@ -267,12 +267,12 @@ public:
return false;
}

bool RemoveSwapItem(T const &x)
bool remove_swap_item(T const &x)
{
ptrdiff_t idx = find(x);
if (idx != INDEX_NONE)
{
RemoveSwap(idx);
remove_swap(idx);
return true;
}
return false;
@@ -329,7 +329,7 @@ public:
m_count -= todelete;
}

void RemoveSwap(ptrdiff_t pos, ptrdiff_t todelete = 1)
void remove_swap(ptrdiff_t pos, ptrdiff_t todelete = 1)
{
ASSERT(todelete >= 0);
ASSERT(pos - todelete >= -m_count - 1 && pos + todelete <= m_count,
@@ -400,8 +400,9 @@ public:
inline void Push(T const &x) { push(x); }
inline bool PushUnique(T const &x) { return push_unique(x); }
inline T Pop() { return pop(); }
inline void Swap(ptrdiff_t pos1, ptrdiff_t pos2) { return swap(pos1, pos2); }
inline void Remove(ptrdiff_t pos, ptrdiff_t todelete = 1) { return remove(pos, todelete); }
inline void Swap(ptrdiff_t pos1, ptrdiff_t pos2) { swap(pos1, pos2); }
inline void Remove(ptrdiff_t pos, ptrdiff_t todelete = 1) { remove(pos, todelete); }
inline void RemoveSwap(ptrdiff_t pos, ptrdiff_t todelete = 1) { remove_swap(pos, todelete); }
inline void Empty() { empty(); }
inline element_t& Last() { return last(); }
inline element_t *Data() { return data(); }
@@ -411,6 +412,7 @@ public:
inline bool InsertUnique(T const &x, ptrdiff_t pos) { return insert_unique(x, pos); }
inline ptrdiff_t Find(T const &x) { return find(x); }
inline bool RemoveItem(T const &x) { return remove_item(x); }
inline bool RemoveSwapItem(T const &x) { return remove_swap_item(x); }
inline void Resize(ptrdiff_t item_count, element_t e = element_t()) { return resize(item_count, e); }
inline void Reserve(ptrdiff_t toreserve) { return reserve(toreserve); }
inline ptrdiff_t Count() const { return count(); }


+ 3
- 0
src/lol/base/features.h View File

@@ -51,6 +51,9 @@
#undef LOL_FEATURE_CXX11_NULLPTR
#undef LOL_FEATURE_CXX11_TEMPLATE_ALIASES
#undef LOL_FEATURE_CXX11_SFINAE_FOR_CTORS
#undef LOL_FEATURE_CXX11_THREADS /* Touky: Is it really needed ? */

#define LOL_FEATURE_CXX11_THREADS 1 /* Touky: This should be available everywhere */

/* Features supported by GCC */
#if defined __GNUC__


+ 58
- 19
src/lol/sys/thread.h View File

@@ -41,8 +41,22 @@ public:
class thread : thread_base
{
public:
thread(std::function<void(void)> fn) : thread_base(fn) {}
virtual ~thread() {}
thread(std::function<void(thread*)> fn)
: thread_base(std::bind(&thread::do_trampoline, this)),
m_thread_function(fn)
{
Init();
}
virtual ~thread()
{ }

protected:
static void do_trampoline(thread *that)
{
that->m_thread_function(that);
}

std::function<void(thread*)> m_thread_function;
};

//ThreadStatus ----------------------------------------------------------------
@@ -52,6 +66,8 @@ struct ThreadStatusBase : public StructSafeEnum
{
NOTHING,
THREAD_STARTED,
THREAD_WORKING,
THREAD_IDLE,
THREAD_STOPPED,
};
protected:
@@ -71,7 +87,7 @@ struct ThreadJobTypeBase : public StructSafeEnum
{
NONE,
WORK_TODO,
WORK_SUCCESSED,
WORK_SUCCEEDED,
WORK_FAILED,
WORK_DONE,
THREAD_STOP
@@ -81,7 +97,7 @@ protected:
{
enum_map[NONE] = "NONE";
enum_map[WORK_TODO] = "WORK_TODO";
enum_map[WORK_SUCCESSED] = "WORK_SUCCESSED";
enum_map[WORK_SUCCEEDED] = "WORK_SUCCEEDED";
enum_map[WORK_FAILED] = "WORK_FAILED";
enum_map[WORK_DONE] = "WORK_DONE";
enum_map[THREAD_STOP] = "THREAD_STOP";
@@ -98,6 +114,7 @@ class ThreadJob
protected:
inline ThreadJob(ThreadJobType type) : m_type(type) {}
public:
char const *GetName() { return "<ThreadJob>"; }
inline ThreadJob() : m_type(ThreadJobType::NONE) {}
virtual ~ThreadJob() {}

@@ -115,40 +132,62 @@ class BaseThreadManager : public Entity
{
public:
char const *GetName() { return "<BaseThreadManager>"; }
BaseThreadManager(int thread_count);
BaseThreadManager(int thread_count, int thread_min);
~BaseThreadManager();
BaseThreadManager(int thread_max);
BaseThreadManager(int thread_max, int thread_min);
virtual ~BaseThreadManager();

//Base setup
void Setup(int thread_max);
void Setup(int thread_max, int thread_min);

//Initialize, Ticker::Ref and start the thread
bool Start();
//Stop the threads
bool Stop();

private:
//Work stuff
bool AddThreadWork(ThreadJob* job);

protected:
//Thread addition
void AddThreads(int nb);
void StopThreads(int nb);
//Thread count management
void AdjustThreadCount(int count);
void CleanAddedThread(bool wait = false);
void CleanRemovedThread(bool wait = false);

//Work stuff
bool AddWork(ThreadJob* job, bool force = false);
int GetDispatchCount();
int GetDispatchedCount();

//Dispatch job to threads
void DispatchJob(ThreadJob* job);
void DispatchJob(array<ThreadJob*> const& jobs);
//Fetch Results
bool FetchResult(array<ThreadJob*>& results);
//Base thread work function
void BaseThreadWork();
void BaseThreadWork(thread* inst);

virtual void TickGame(float seconds);
//Default behaviour : delete the job result
virtual void TreatResult(ThreadJob* result) { delete(result); }

/* Worker threads */
int m_thread_count;
int m_thread_min;
array<thread*> m_threads;
array<ThreadJob*> m_job_dispatch;
private:
/* Jobs */
array<ThreadJob*> m_job_dispatch;
int m_job_dispatched = 0;

#if LOL_FEATURE_THREADS
queue<ThreadStatus> m_spawnqueue, m_donequeue;
/* Worker threads */
int m_thread_max = 0;
int m_thread_min = 0;
int m_thread_active = 0;
array<thread*> m_threads;
Timer m_thread_added_timer;
int m_thread_added = 0;
Timer m_thread_removed_timer;
int m_thread_removed = 0;

queue<ThreadStatus> m_statusqueue;
queue<thread*> m_spawnqueue, m_donequeue;
#endif //LOL_FEATURE_THREADS
queue<ThreadJob*> m_jobqueue;
queue<ThreadJob*> m_resultqueue;


+ 7
- 7
src/lol/sys/threadtypes.h View File

@@ -26,15 +26,15 @@ class DefaultThreadManager : public BaseThreadManager
{
public:
char const *GetName() { return "<DefaultThreadManager>"; }
DefaultThreadManager(int thread_count)
: BaseThreadManager(thread_count, thread_count)
DefaultThreadManager(int thread_max)
: BaseThreadManager(thread_max, thread_max)
{ }
DefaultThreadManager(int thread_count, int thread_min)
: BaseThreadManager(thread_count, thread_min)
DefaultThreadManager(int thread_max, int thread_min)
: BaseThreadManager(thread_max, thread_min)
{ }

//Work stuff
bool AddJob(ThreadJob* job);
void AddJob(ThreadJob* job);
bool GetWorkResult(array<ThreadJob*>& results);

protected:
@@ -115,8 +115,8 @@ class AsyncImageLoader : public BaseThreadManager
{
public:
char const *GetName() { return "<AsyncImageLoader>"; }
AsyncImageLoader(int thread_count)
: BaseThreadManager(thread_count, 0)
AsyncImageLoader(int thread_max)
: BaseThreadManager(thread_max, 0)
{
m_dummy_image.DummyFill();
}


+ 1
- 0
src/lol/sys/timer.h View File

@@ -26,6 +26,7 @@ public:
Timer();
~Timer();

void Reset();
float Get();
float Poll();
void Wait(float seconds);


+ 162
- 63
src/sys/thread.cpp View File

@@ -17,16 +17,12 @@ namespace lol
{

//BaseThreadManager -----------------------------------------------------------
BaseThreadManager::BaseThreadManager(int thread_count)
{
m_thread_min = thread_count;
m_thread_count = thread_count;
}
BaseThreadManager::BaseThreadManager(int thread_max) : BaseThreadManager(thread_max, thread_max)
{ }

BaseThreadManager::BaseThreadManager(int thread_min, int thread_count)
BaseThreadManager::BaseThreadManager(int thread_max, int thread_min)
{
m_thread_min = thread_min;
m_thread_count = thread_count;
Setup(thread_max, thread_min);
}

BaseThreadManager::~BaseThreadManager()
@@ -34,72 +30,158 @@ BaseThreadManager::~BaseThreadManager()
Stop();
}

//Initialize, Ticker::Ref and start the thread
//Base Setup ------------------------------------------------------------------
void BaseThreadManager::Setup(int thread_max)
{
Setup(thread_max, thread_max);
}
void BaseThreadManager::Setup(int thread_max, int thread_min)
{
#if LOL_FEATURE_THREADS
m_thread_max = thread_max;
m_thread_min = thread_min;
#endif //LOL_FEATURE_THREADS
}

//Initialize, Ticker::Ref and start the thread --------------------------------
bool BaseThreadManager::Start()
{
#if LOL_FEATURE_THREADS
ASSERT(!!m_thread_max, "Thread count shouldn't be zero");

if (m_threads.count() > 0)
return false;

//Add minimum threads
m_threads.reserve(m_thread_count);
AddThreads(m_thread_count /*FIX THAT (TOUKY) m_thread_min*/);
m_threads.reserve(m_thread_max);
//AddThreads(m_thread_min);
AdjustThreadCount(m_thread_min);
#endif //LOL_FEATURE_THREADS

return true;
}

//Stop the threads
//Stop the threads ------------------------------------------------------------
bool BaseThreadManager::Stop()
{
#if LOL_FEATURE_THREADS
if (m_threads.count() <= 0)
return false;

//Stop all threads
StopThreads((int)m_threads.count());
//End all threads
//RemoveThreads((int)m_threads.count());
AdjustThreadCount(0);
CleanAddedThread(true);
CleanRemovedThread(true);
#endif //LOL_FEATURE_THREADS

return true;
}

//----
void BaseThreadManager::AddThreads(int nb)
//Work stuff ------------------------------------------------------------------
bool BaseThreadManager::AddThreadWork(ThreadJob* job)
{
return m_jobqueue.try_push(job);
}

//-----------------------------------------------------------------------------
void BaseThreadManager::AdjustThreadCount(int count)
{
//Don't add threads if not availables
#if LOL_FEATURE_THREADS
//Spawn worker threads and ...
for (int i = 0; i < nb; i++)
m_threads << new thread(std::bind(&BaseThreadManager::BaseThreadWork, this));
int actual_count = (int)m_threads.count() - m_thread_removed;
int diff = count - actual_count;

//... Wait for their readiness.
for (int i = 0; i < m_thread_count; i++)
m_spawnqueue.pop();
for (int i = 0; i < lol::abs(diff); i++)
{
if (diff > 0)
{
//Spawn worker threads and ...
m_threads << new thread(std::bind(&BaseThreadManager::BaseThreadWork, this, std::placeholders::_1));
m_thread_added++;
m_thread_added_timer.Reset();
}
else
{
//Signal worker threads for completion and ...
m_jobqueue.push(new ThreadJob(ThreadJobType::THREAD_STOP));
m_thread_removed++;
m_thread_removed_timer.Reset();
}
}
#endif //LOL_FEATURE_THREADS
}

//----
void BaseThreadManager::StopThreads(int nb)
//-----------------------------------------------------------------------------
void BaseThreadManager::CleanAddedThread(bool wait)
{
//Don't stop threads if not availables
#if LOL_FEATURE_THREADS
//Signal worker threads for completion and ...
ThreadJob stop_job(ThreadJobType::THREAD_STOP);
for (int i = 0; i < nb; i++)
m_jobqueue.push(&stop_job);
//... Wait for their readiness.
while (m_thread_added > 0)
{
thread* inst = nullptr;
bool found = false;
if (wait)
found = !!(inst = m_spawnqueue.pop());
else
found = m_spawnqueue.try_pop(inst);
if (found)
m_thread_added--;
else
break;
}
//Assert if spawning took too much time
//ASSERT(!(m_thread_added > 0 && m_thread_added_timer.Poll() > 5.f));
#endif //LOL_FEATURE_THREADS
}

//-----------------------------------------------------------------------------
void BaseThreadManager::CleanRemovedThread(bool wait)
{
#if LOL_FEATURE_THREADS
//... Wait for them to quit.
for (int i = 0; i < nb; i++)
m_donequeue.pop();
while (m_thread_removed > 0)
{
thread* inst = nullptr;
bool found = false;
if (wait)
found = !!(inst = m_donequeue.pop());
else
found = m_donequeue.try_pop(inst);
if (found)
{
m_thread_removed--;
m_threads.remove_swap_item(inst);
delete inst;
}
else
break;
}
//Assert if stopping took too much time
//ASSERT(!(m_thread_removed > 0 && m_thread_removed_timer.Poll() > 5.f));
#endif //LOL_FEATURE_THREADS
}

//Work stuff
bool BaseThreadManager::AddWork(ThreadJob* job, bool force)
//-----------------------------------------------------------------------------
int BaseThreadManager::GetDispatchCount()
{
if (!force)
return m_jobqueue.try_push(job);
m_jobqueue.push(job);
return true;
return (int)m_job_dispatch.count();
}
int BaseThreadManager::GetDispatchedCount()
{
return m_job_dispatched;
}

//-----------------------------------------------------------------------------
void BaseThreadManager::DispatchJob(ThreadJob* job)
{
m_job_dispatch << job;
}
void BaseThreadManager::DispatchJob(array<ThreadJob*> const& jobs)
{
m_job_dispatch += jobs;
}

//----
//-----------------------------------------------------------------------------
bool BaseThreadManager::FetchResult(array<ThreadJob*>& results)
{
ThreadJob* result;
@@ -108,45 +190,54 @@ bool BaseThreadManager::FetchResult(array<ThreadJob*>& results)
return results.count() > 0;
}

//Base thread work function
void BaseThreadManager::BaseThreadWork()
//Base thread work function ---------------------------------------------------
void BaseThreadManager::BaseThreadWork(thread* inst)
{
ThreadJob* job = nullptr;
#if LOL_FEATURE_THREADS
//Register that the thread has started
m_spawnqueue.push(ThreadStatus::THREAD_STARTED);
for ( ; ; )
#endif //LOL_FEATURE_THREADS
m_statusqueue.push(ThreadStatus::THREAD_STARTED);
m_spawnqueue.push(inst);
for (;;)
#else //LOL_FEATURE_THREADS
while (m_jobqueue.try_pop(job))
#endif
{
//Try to retrieve a job
ThreadJob* job = m_jobqueue.pop();
#if LOL_FEATURE_THREADS
//Retrieve a job
job = m_jobqueue.pop();
//Stop thread
if (job->GetJobType() == ThreadJobType::THREAD_STOP)
{
delete job;
break;
}
//Or work
else
#else //LOL_FEATURE_THREADS
if (!job)
return;
#endif //LOL_FEATURE_THREADS
if (*job == ThreadJobType::WORK_TODO)
{
#if LOL_FEATURE_THREADS
m_statusqueue.push(ThreadStatus::THREAD_WORKING);
#endif //LOL_FEATURE_THREADS
if (job->DoWork())
job->SetJobType(ThreadJobType::WORK_SUCCESSED);
job->SetJobType(ThreadJobType::WORK_SUCCEEDED);
else
job->SetJobType(ThreadJobType::WORK_FAILED);
m_resultqueue.push(job);
#if LOL_FEATURE_THREADS
m_statusqueue.push(ThreadStatus::THREAD_IDLE);
#endif //LOL_FEATURE_THREADS
}
}
#if LOL_FEATURE_THREADS
//Register that the thread has stopped
m_donequeue.push(ThreadStatus::THREAD_STOPPED);
m_statusqueue.push(ThreadStatus::THREAD_STOPPED);
m_donequeue.push(inst);
#endif //LOL_FEATURE_THREADS
}

//----
//-----------------------------------------------------------------------------
void BaseThreadManager::TickGame(float seconds)
{
Entity::TickGame(seconds);
@@ -155,12 +246,16 @@ void BaseThreadManager::TickGame(float seconds)
Start();

//Dispatch work task
while (m_job_dispatch.count() > 0 && AddWork(m_job_dispatch.last()))
m_job_dispatch.pop();
while (m_job_dispatch.count() > 0 && AddThreadWork(m_job_dispatch[0]))
{
m_job_dispatch.remove(0);
//Keep track of added jobs
m_job_dispatched++;
}

//Execute one task per frame if thread are not available
#if !LOL_FEATURE_THREADS
BaseThreadWork();
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
BaseThreadWork(nullptr);
#endif // !LOL_FEATURE_THREADS

array<ThreadJob*> result;
@@ -171,17 +266,21 @@ void BaseThreadManager::TickGame(float seconds)
{
ThreadJob* job = result[i];
TreatResult(job);
//Remove job from count as it has been treated
m_job_dispatched--;
}
}

//TODO: TOUKY: FIX THAT
/*
//Resize thread count if needed
if (m_threads.count() > m_jobqueue.count() && m_threads.count() > m_thread_min)
StopThreads((int)(m_threads.Count() - m_thread_min));
else if (m_threads.count() < m_jobqueue.count())
AddThreads((int)(lol::min(m_jobqueue.count(), (ptrdiff_t)m_thread_count) - m_threads.count()));
*/
#if LOL_FEATURE_THREADS
ThreadStatus status;
while (m_statusqueue.try_pop(status))
m_thread_active += status == ThreadStatus::THREAD_IDLE ? -1 : +1;

AdjustThreadCount(lol::clamp(m_job_dispatched, m_thread_min, m_thread_max));
CleanAddedThread();
CleanRemovedThread();
#endif //LOL_FEATURE_THREADS
}

} /* namespace lol */

+ 200
- 85
src/sys/threadbase.h View File

@@ -18,7 +18,13 @@
// -----------------------------------
//

#if defined HAVE_PTHREAD_H
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
/* Nothing */
#elif LOL_FEATURE_CXX11_THREADS
# include <thread>
# include <mutex>
# include <condition_variable>
#elif defined HAVE_PTHREAD_H
# include <pthread.h>
#elif defined _XBOX
# include <xtl.h>
@@ -38,60 +44,90 @@ namespace lol
class mutex_base
{
public:
//-------------------------------------------------------------------------
mutex_base()
{
#if LOL_FEATURE_THREADS
#if defined HAVE_PTHREAD_H
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
/* Nothing */
#elif LOL_FEATURE_CXX11_THREADS
/* Nothing */
#elif defined HAVE_PTHREAD_H
pthread_mutex_init(&m_mutex, nullptr);
#elif defined _WIN32
InitializeCriticalSection(&m_mutex);
#endif
#endif //LOL_FEATURE_THREADS
}

//-------------------------------------------------------------------------
~mutex_base()
{
#if LOL_FEATURE_THREADS
#if defined HAVE_PTHREAD_H
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
/* Nothing */
#elif LOL_FEATURE_CXX11_THREADS
/* Nothing */
#elif defined HAVE_PTHREAD_H
pthread_mutex_destroy(&m_mutex);
#elif defined _WIN32
DeleteCriticalSection(&m_mutex);
#endif
#endif //LOL_FEATURE_THREADS
}

//Will block the thread if another has already locked ---------------------
void lock()
{
#if LOL_FEATURE_THREADS
#if defined HAVE_PTHREAD_H
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
/* Nothing */
#elif LOL_FEATURE_CXX11_THREADS
m_mutex.lock();
#elif defined HAVE_PTHREAD_H
pthread_mutex_lock(&m_mutex);
#elif defined _WIN32
EnterCriticalSection(&m_mutex);
#endif
#endif //LOL_FEATURE_THREADS
}

//Will not block if another thread has already locked ---------------------
bool try_lock()
{
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
/* Nothing */
#elif LOL_FEATURE_CXX11_THREADS
return m_mutex.try_lock();
#elif defined HAVE_PTHREAD_H
return !pthread_mutex_trylock(&m_mutex);
#elif defined _WIN32
return !!TryEnterCriticalSection(&m_mutex);
#endif
}

//-------------------------------------------------------------------------
void unlock()
{
#if LOL_FEATURE_THREADS
#if defined HAVE_PTHREAD_H
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
/* Nothing */
#elif LOL_FEATURE_CXX11_THREADS
m_mutex.unlock();
#elif defined HAVE_PTHREAD_H
pthread_mutex_unlock(&m_mutex);
#elif defined _WIN32
LeaveCriticalSection(&m_mutex);
#endif
#endif //LOL_FEATURE_THREADS
}

//-------------------------------------------------------------------------
private:
#if LOL_FEATURE_THREADS
#if defined HAVE_PTHREAD_H
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
/* Nothing */
#elif LOL_FEATURE_CXX11_THREADS
std::mutex m_mutex;
#elif defined HAVE_PTHREAD_H
pthread_mutex_t m_mutex;
#elif defined _WIN32
CRITICAL_SECTION m_mutex;
#endif
#endif //LOL_FEATURE_THREADS
};

//A FIFO queue for threads ----------------------------------------------------
template<typename T, int N>
class queue_base
{
@@ -99,8 +135,11 @@ public:
queue_base()
{
m_start = m_count = 0;
#if LOL_FEATURE_THREADS
#if defined HAVE_PTHREAD_H
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
/* Nothing */
#elif LOL_FEATURE_CXX11_THREADS
/* Nothing */
#elif defined HAVE_PTHREAD_H
m_poppers = m_pushers = 0;
pthread_mutex_init(&m_mutex, nullptr);
pthread_cond_init(&m_empty_cond, nullptr);
@@ -110,13 +149,15 @@ public:
m_full_sem = CreateSemaphore(nullptr, 0, CAPACITY, nullptr);
InitializeCriticalSection(&m_mutex);
#endif
#endif //LOL_FEATURE_THREADS
}

~queue_base()
{
#if LOL_FEATURE_THREADS
#if defined HAVE_PTHREAD_H
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
/* Nothing */
#elif LOL_FEATURE_CXX11_THREADS
/* Nothing */
#elif defined HAVE_PTHREAD_H
pthread_cond_destroy(&m_empty_cond);
pthread_cond_destroy(&m_full_cond);
pthread_mutex_destroy(&m_mutex);
@@ -125,13 +166,18 @@ public:
CloseHandle(m_full_sem);
DeleteCriticalSection(&m_mutex);
#endif
#endif //LOL_FEATURE_THREADS
}

//Will block the thread if another has already locked ---------------------
void push(T value)
{
#if LOL_FEATURE_THREADS
#if defined HAVE_PTHREAD_H
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
/* Nothing */
#elif LOL_FEATURE_CXX11_THREADS
/* Wait for the mutex availability or non-fullness */
std::unique_lock<std::mutex> uni_lock(m_mutex);
m_full_cond.wait(uni_lock, [&]{return m_count < CAPACITY; });
#elif defined HAVE_PTHREAD_H
pthread_mutex_lock(&m_mutex);
/* If queue is full, wait on the "full" cond var. */
m_pushers++;
@@ -142,14 +188,16 @@ public:
WaitForSingleObject(m_empty_sem, INFINITE);
EnterCriticalSection(&m_mutex);
#endif
#endif //LOL_FEATURE_THREADS

/* Push value */
m_values[(m_start + m_count) % CAPACITY] = value;
m_count++;
do_push(value); /* Push value */

#if LOL_FEATURE_THREADS
#if defined HAVE_PTHREAD_H
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
/* Nothing */
#elif LOL_FEATURE_CXX11_THREADS
/* Release lock and notify empty condition var (in that order) */
uni_lock.unlock();
m_empty_cond.notify_one();
#elif defined HAVE_PTHREAD_H
/* If there were poppers waiting, signal the "empty" cond var. */
if (m_poppers)
pthread_cond_signal(&m_empty_cond);
@@ -158,15 +206,31 @@ public:
LeaveCriticalSection(&m_mutex);
ReleaseSemaphore(m_full_sem, 1, nullptr);
#endif
#endif //LOL_FEATURE_THREADS
}

//Will not block if another has already locked ----------------------------
bool try_push(T value)
{
#if LOL_FEATURE_THREADS
#if defined HAVE_PTHREAD_H
pthread_mutex_lock(&m_mutex);
/* If queue is full, wait on the "full" cond var. */
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
if (m_count == CAPACITY)
return false;
#elif LOL_FEATURE_CXX11_THREADS
/* Same as Push(), except .... */
std::unique_lock<std::mutex> uni_lock(m_mutex, std::try_to_lock);
/* Bail on fail try_lock fail */
if (!uni_lock.owns_lock())
return false;
/* Bail on max CAPACITY */
if (m_count == CAPACITY)
{
uni_lock.unlock();
return false;
}
#elif defined HAVE_PTHREAD_H
/* Bail on fail try_lock */
if (pthread_mutex_trylock(&m_mutex))
return false;
/* Bail on max CAPACITY */
if (m_count == CAPACITY)
{
pthread_mutex_unlock(&m_mutex);
@@ -178,14 +242,16 @@ public:
return false;
EnterCriticalSection(&m_mutex);
#endif
#endif //LOL_FEATURE_THREADS

/* Push value */
m_values[(m_start + m_count) % CAPACITY] = value;
m_count++;
do_push(value); /* Push value */

#if LOL_FEATURE_THREADS
#if defined HAVE_PTHREAD_H
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
/* Nothing */
#elif LOL_FEATURE_CXX11_THREADS
/* Release lock and notify empty condition var (in that order) */
uni_lock.unlock();
m_empty_cond.notify_one();
#elif defined HAVE_PTHREAD_H
/* If there were poppers waiting, signal the "empty" cond var. */
if (m_poppers)
pthread_cond_signal(&m_empty_cond);
@@ -194,15 +260,20 @@ public:
LeaveCriticalSection(&m_mutex);
ReleaseSemaphore(m_full_sem, 1, nullptr);
#endif
#endif //LOL_FEATURE_THREADS

return true;
}

//Will block the thread if another has already locked ---------------------
T pop()
{
#if LOL_FEATURE_THREADS
#if defined HAVE_PTHREAD_H
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
ASSERT(0, "Pop should only be used with threads. Use try_pop instead.");
#elif LOL_FEATURE_CXX11_THREADS
/* Wait for the mutex availability or non-emptiness */
std::unique_lock<std::mutex> uni_lock(m_mutex);
m_empty_cond.wait(uni_lock, [&]{return m_count > 0; });
#elif defined HAVE_PTHREAD_H
pthread_mutex_lock(&m_mutex);
/* Wait until there is something in the queue. Be careful, we
* could get woken up but another thread may have eaten the
@@ -215,20 +286,16 @@ public:
WaitForSingleObject(m_full_sem, INFINITE);
EnterCriticalSection(&m_mutex);
#endif
#endif //LOL_FEATURE_THREADS

#if !LOL_FEATURE_THREADS
if (m_count == 0)
return T(0);
#endif //!LOL_FEATURE_THREADS
T ret = do_pop(); /* Pop value */

/* Pop value */
T ret = m_values[m_start];
m_start = (m_start + 1) % CAPACITY;
m_count--;
#if LOL_FEATURE_THREADS
#if defined HAVE_PTHREAD_H
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
/* Nothing */
#elif LOL_FEATURE_CXX11_THREADS
/* Release lock and notify full condition var (in that order) */
uni_lock.unlock();
m_full_cond.notify_one();
#elif defined HAVE_PTHREAD_H
/* If there were pushers waiting, signal the "full" cond var. */
if (m_pushers)
pthread_cond_signal(&m_full_cond);
@@ -237,15 +304,29 @@ public:
LeaveCriticalSection(&m_mutex);
ReleaseSemaphore(m_empty_sem, 1, nullptr);
#endif
#endif //LOL_FEATURE_THREADS

return ret;
}

//Will not block if another has already locked ----------------------------
bool try_pop(T &ret)
{
#if LOL_FEATURE_THREADS
#if defined HAVE_PTHREAD_H
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
if (m_count == 0)
return false;
#elif LOL_FEATURE_CXX11_THREADS
/* Same as Pop(), except .... */
std::unique_lock<std::mutex> uni_lock(m_mutex, std::try_to_lock);
/* Bail on fail try_lock fail */
if (!uni_lock.owns_lock())
return false;
/* Bail on zero count */
if (m_count == 0)
{
uni_lock.unlock();
return false;
}
#elif defined HAVE_PTHREAD_H
pthread_mutex_lock(&m_mutex);
if (m_count == 0)
{
@@ -258,20 +339,16 @@ public:
return false;
EnterCriticalSection(&m_mutex);
#endif
#endif //LOL_FEATURE_THREADS

#if !LOL_FEATURE_THREADS
if (m_count == 0)
return false;
#endif //!LOL_FEATURE_THREADS
ret = do_pop(); /* Pop value */

/* Pop value */
ret = m_values[m_start];
m_start = (m_start + 1) % CAPACITY;
m_count--;
#if LOL_FEATURE_THREADS
#if defined HAVE_PTHREAD_H
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
/* Nothing */
#elif LOL_FEATURE_CXX11_THREADS
/* Release lock and notify full condition var (in that order) */
uni_lock.unlock();
m_full_cond.notify_one();
#elif defined HAVE_PTHREAD_H
/* If there were pushers waiting, signal the "full" cond var. */
if (m_pushers)
pthread_cond_signal(&m_full_cond);
@@ -280,17 +357,37 @@ public:
LeaveCriticalSection(&m_mutex);
ReleaseSemaphore(m_empty_sem, 1, nullptr);
#endif
#endif //LOL_FEATURE_THREADS

return true;
}

//Inner methods for actual update -----------------------------------------
private:
void do_push(T &value)
{
m_values[(m_start + m_count) % CAPACITY] = value;
m_count++;
}

T& do_pop()
{
size_t idx = m_start;
m_start = (m_start + 1) % CAPACITY;
m_count--;
return m_values[idx];
}

//-------------------------------------------------------------------------
private:
static size_t const CAPACITY = N;
T m_values[CAPACITY];
size_t m_start, m_count;
#if LOL_FEATURE_THREADS
#if defined HAVE_PTHREAD_H
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
/* Nothing */
#elif LOL_FEATURE_CXX11_THREADS
std::mutex m_mutex;
std::condition_variable m_empty_cond, m_full_cond;
#elif defined HAVE_PTHREAD_H
size_t m_poppers, m_pushers;
pthread_mutex_t m_mutex;
pthread_cond_t m_empty_cond, m_full_cond;
@@ -298,17 +395,23 @@ private:
HANDLE m_empty_sem, m_full_sem;
CRITICAL_SECTION m_mutex;
#endif
#endif //LOL_FEATURE_THREADS
};

//Base class for threads ------------------------------------------------------
class thread_base
{
public:
thread_base(std::function<void(void)> function)
: m_function(function)
: m_function(function)
{ }

void Init()
{
#if LOL_FEATURE_THREADS
#if defined HAVE_PTHREAD_H
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
/* Nothing */
#elif LOL_FEATURE_CXX11_THREADS
m_thread = std::thread(trampoline, this);
#elif defined HAVE_PTHREAD_H
/* Set the joinable attribute for systems who don't play nice */
pthread_attr_t attr;
pthread_attr_init(&attr);
@@ -318,38 +421,50 @@ public:
m_thread = CreateThread(nullptr, 0, (LPTHREAD_START_ROUTINE)trampoline,
this, 0, &m_tid);
#endif
#endif //LOL_FEATURE_THREADS
}

virtual ~thread_base()
{
#if LOL_FEATURE_THREADS
#if defined HAVE_PTHREAD_H
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
/* Nothing */
#elif LOL_FEATURE_CXX11_THREADS
m_thread.join();
#elif defined HAVE_PTHREAD_H
pthread_join(m_thread, nullptr);
#elif defined _WIN32
WaitForSingleObject(m_thread, INFINITE);
#endif
#endif //LOL_FEATURE_THREADS
}

private:
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
/* Nothing */
#elif LOL_FEATURE_CXX11_THREADS
static void trampoline(thread_base *that)
{
that->m_function();
}
#else
static void *trampoline(void *data)
{
thread_base *that = (thread_base *)data;
that->m_function();
return nullptr;
}
#endif

std::function<void(void)> m_function;

#if LOL_FEATURE_THREADS
#if defined HAVE_PTHREAD_H
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
/* Nothing */
#elif LOL_FEATURE_CXX11_THREADS
std::thread m_thread;
#elif defined HAVE_PTHREAD_H
pthread_t m_thread;
#elif defined _WIN32
HANDLE m_thread;
DWORD m_tid;
#endif
#endif //LOL_FEATURE_THREADS
};

} /* namespace lol */


+ 9
- 7
src/sys/threadtypes.cpp View File

@@ -16,9 +16,9 @@
using namespace lol;

//DefaultThreadManager --------------------------------------------------------
bool DefaultThreadManager::AddJob(ThreadJob* job)
void DefaultThreadManager::AddJob(ThreadJob* job)
{
return AddWork(job);
DispatchJob(job);
}
bool DefaultThreadManager::GetWorkResult(array<ThreadJob*>& results)
{
@@ -32,6 +32,7 @@ class FileUpdateTesterJob : public ThreadJob
{
friend class FileUpdateTester;
public:
char const *GetName() { return "<FileUpdateTesterJob>"; }
FileUpdateTesterJob()
: ThreadJob(ThreadJobType::NONE) { }
FileUpdateTesterJob(String path)
@@ -82,7 +83,7 @@ FileUpdateTester::~FileUpdateTester()
//File interface --------------------------------------------------------------
FileUpdateTester::Status* FileUpdateTester::RegisterFile(String const& path)
{
m_job_dispatch << new FileUpdateTesterJob(path);
DispatchJob(new FileUpdateTesterJob(path));
m_files[path] = new FileUpdateTester::Status();
return m_files[path];
}
@@ -115,7 +116,7 @@ void FileUpdateTester::TickGame(float seconds)
{
super::TickGame(seconds);

if (!m_job_dispatch.count() && m_job_done.count())
if (!GetDispatchCount() && m_job_done.count())
{
if (m_frame_count++ < m_frame_skip)
return;
@@ -125,7 +126,7 @@ void FileUpdateTester::TickGame(float seconds)
for (String key : keys)
m_files[key]->SetUpdated(false);

m_job_dispatch = m_job_done;
DispatchJob(m_job_done);
m_job_done.empty();
}
}
@@ -147,6 +148,7 @@ void FileUpdateTester::TreatResult(ThreadJob* result)
class AsyncImageJob : public ThreadJob
{
public:
char const *GetName() { return "<AsyncImageJob>"; }
AsyncImageJob()
: ThreadJob(ThreadJobType::NONE)
{
@@ -180,7 +182,7 @@ Image* AsyncImageLoader::Load(const lol::String& path)
//Link the two
m_images[path] = image;
//Add job
AddWork(job);
DispatchJob(job);
//return Dummy image
return image;
}
@@ -202,7 +204,7 @@ void AsyncImageLoader::TreatResult(ThreadJob* result)
AsyncImageJob* job = dynamic_cast<AsyncImageJob*>(result);
ASSERT(job);
//Copy image if work is OK
if (job->GetJobType() == ThreadJobType::WORK_SUCCESSED)
if (job->GetJobType() == ThreadJobType::WORK_SUCCEEDED)
{
Image* src = m_images[job->GetPath()];
m_images.remove(job->GetPath());


+ 5
- 0
src/sys/timer.cpp View File

@@ -152,6 +152,11 @@ Timer::~Timer()
delete data;
}

void Timer::Reset()
{
(void)data->GetSeconds(true);
}

float Timer::Get()
{
return data->GetSeconds(true);


+ 171
- 2
src/t/sys/thread.cpp View File

@@ -20,9 +20,178 @@ namespace lol

lolunit_declare_fixture(thread_test)
{
void SetUp() {}
//FileUpdateTesterJob ---------------------------------------------------------
class UnitTestJob : public ThreadJob
{
friend class UnitTestThreadManager;
public:
char const *GetName() { return "<UnitTestJob>"; }
UnitTestJob() : ThreadJob(ThreadJobType::WORK_TODO)
{ }
bool IsDone()
{
return m_done;
}

protected:
virtual bool DoWork()
{
Timer timer;
m_done = false;
Log::Info(Line("%s: STARTED WORK"), GetName());
timer.Wait(2.f);
Log::Info(Line("%s: ENDED WORK"), GetName());
m_done = true;
return true;
}
bool m_done = false;
};
//Unit test thread manager
class UnitTestThreadManager : public BaseThreadManager
{
typedef BaseThreadManager super;

struct UnitTestStatusBase : public StructSafeEnum
{
enum Type
{
NOT_QUEUED,
QUEUED,
RETRIEVED,
DONE,
};
protected:
virtual bool BuildEnumMap(map<int64_t, String>& enum_map)
{
enum_map[NOT_QUEUED] = "NOT_QUEUED";
enum_map[QUEUED] = "QUEUED";
enum_map[RETRIEVED] = "RETRIEVED";
enum_map[DONE] = "DONE";
return true;
}
};
typedef SafeEnum<UnitTestStatusBase> UnitTestStatus;

public:
char const *GetName() { return "<UnitTestThreadManager>"; }
UnitTestThreadManager() : BaseThreadManager(4, 1)
{ }
virtual ~UnitTestThreadManager()
{ }

void AddJob(ThreadJob* job)
{
Log::Info(Line("%s DISPATCHING JOB %s"), GetName(), job->GetName());
DispatchJob(job);
}
bool GetWorkResult(array<ThreadJob*>& results)
{
results += m_job_result;
m_job_result.Empty();
Log::Info(Line("%s GETWORKRESULT (%i)"), GetName(), results.count());
return results.Count() > 0;
}

virtual void TickGame(float seconds)
{
switch (m_status.ToScalar())
{
case UnitTestStatus::NOT_QUEUED:
if (!!GetDispatchCount())
{
Log::Info(Line("%s TICKGAME %s"), GetName(), m_status.ToString().C());
m_status = UnitTestStatus::QUEUED;
}
break;
case UnitTestStatus::QUEUED:
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
if (!GetDispatchedCount())
#else //LOL_FEATURE_THREADS
if (GetDispatchedCount())
#endif
{
Log::Info(Line("%s TICKGAME %s"), GetName(), m_status.ToString().C());
m_status = UnitTestStatus::RETRIEVED;
}
break;
case UnitTestStatus::RETRIEVED:
if (m_job_result.count() == 4)
{
Log::Info(Line("%s TICKGAME %s"), GetName(), m_status.ToString().C());
m_status = UnitTestStatus::DONE;
}
break;
default:
break;
}
//Debug error fix-up
#if !LOL_BUILD_RELEASE
m_tickstate = STATE_PRETICK_GAME;
#endif
super::TickGame(seconds);
}
bool IsDone() { return m_status == UnitTestStatus::DONE; }
int Test_GetDispatchCount() { return GetDispatchCount(); }
int Test_GetDispatchedCount() { return GetDispatchedCount(); }

protected:
virtual void TreatResult(ThreadJob* result) { m_job_result << result; }
array<ThreadJob*> m_job_result;
UnitTestStatus m_status;
};
UnitTestThreadManager m_manager;

void SetUp()
{
}

void TearDown()
{
}

void TearDown() {}
lolunit_declare_test(threads)
{
Log::Info(Line("%s START"), m_manager.GetName());
//Start threads manager
m_manager.Start();
Log::Info(Line("%s STARTED"), m_manager.GetName());

UnitTestJob job[4];
lolunit_assert_equal(0, m_manager.Test_GetDispatchCount());
m_manager.AddJob(&job[0]);
lolunit_assert_equal(1, m_manager.Test_GetDispatchCount());
lolunit_assert_equal(false, job[0].IsDone());
bool dispatch_check = true;
while (!m_manager.IsDone())
{
m_manager.TickGame(1.f / 60.f);
if (dispatch_check)
{
lolunit_assert_equal(0, m_manager.Test_GetDispatchCount());
#if !defined(LOL_FEATURE_THREADS) || !LOL_FEATURE_THREADS
lolunit_assert_equal(0, m_manager.Test_GetDispatchedCount());
#else //LOL_FEATURE_THREADS
lolunit_assert_equal(1, m_manager.Test_GetDispatchedCount());
#endif
m_manager.AddJob(&job[1]);
m_manager.AddJob(&job[2]);
m_manager.AddJob(&job[3]);
dispatch_check = false;
}
}
lolunit_assert_equal(true, job[0].IsDone());
lolunit_assert_equal(true, job[1].IsDone());
lolunit_assert_equal(true, job[2].IsDone());
lolunit_assert_equal(true, job[3].IsDone());
array<ThreadJob*> results;
m_manager.GetWorkResult(results);
lolunit_assert_equal(4, results.count());

Log::Info(Line("%s STOP"), m_manager.GetName());
//Stop manager
m_manager.Stop();
Log::Info(Line("%s STOPPED"), m_manager.GetName());
}

lolunit_declare_test(queue_try_push)
{


Loading…
Cancel
Save