diff --git a/Source/Engine/Threading/JobSystem.cpp b/Source/Engine/Threading/JobSystem.cpp new file mode 100644 index 000000000..b878bbdea --- /dev/null +++ b/Source/Engine/Threading/JobSystem.cpp @@ -0,0 +1,200 @@ +// Copyright (c) 2012-2021 Wojciech Figat. All rights reserved. + +#include "JobSystem.h" +#include "IRunnable.h" +#include "Engine/Core/Collections/RingBuffer.h" +#include "Engine/Platform/CPUInfo.h" +#include "Engine/Platform/Thread.h" +#include "Engine/Platform/ConditionVariable.h" +#include "Engine/Engine/EngineService.h" +#include "Engine/Profiler/ProfilerCPU.h" + +class JobSystemService : public EngineService +{ +public: + + JobSystemService() + : EngineService(TEXT("JobSystem"), -800) + { + } + + bool Init() override; + void BeforeExit() override; + void Dispose() override; +}; + +struct JobData +{ + Function Job; + int32 Index; + int32 Count; +}; + +template<> +struct TIsPODType +{ + enum { Value = true }; +}; + +class JobSystemThread : public IRunnable +{ +public: + int32 Index; + +public: + + // [IRunnable] + String ToString() const override + { + return TEXT("JobSystemThread"); + } + + int32 Run() override; + + void AfterWork(bool wasKilled) override + { + Delete(this); + } +}; + +namespace +{ + JobSystemService JobSystemInstance; + Thread* Threads[32] = {}; + int32 ThreadsCount = 0; + volatile int64 ExitFlag = 0; + volatile int64 DoneLabel = 0; + volatile int64 NextLabel = 0; + CriticalSection JobsLocker; + ConditionVariable JobsSignal; + ConditionVariable WaitSignal; + RingBuffer> Jobs; +} + +bool JobSystemService::Init() +{ + ThreadsCount = Math::Min(Platform::GetCPUInfo().LogicalProcessorCount, ARRAY_COUNT(Threads)); + for (int32 i = 0; i < ThreadsCount; i++) + { + auto runnable = New(); + runnable->Index = i; + auto thread = Thread::Create(runnable, String::Format(TEXT("Job System {0}"), i), ThreadPriority::AboveNormal); + if (thread == nullptr) + return true; + Threads[i] = thread; + } + return false; +} + +void JobSystemService::BeforeExit() +{ + Platform::AtomicStore(&ExitFlag, 1); + JobsSignal.NotifyAll(); +} + +void JobSystemService::Dispose() +{ + Platform::AtomicStore(&ExitFlag, 1); + JobsSignal.NotifyAll(); + Platform::Sleep(1); + + for (int32 i = 0; i < ThreadsCount; i++) + { + if (Threads[i] && Threads[i]->IsRunning()) + Threads[i]->Kill(true); + Threads[i] = nullptr; + } +} + +int32 JobSystemThread::Run() +{ + Platform::SetThreadAffinityMask(1 << Index); + + JobData data; + CriticalSection mutex; + while (Platform::AtomicRead(&ExitFlag) == 0) + { + // Try to get a job + JobsLocker.Lock(); + if (Jobs.Count() != 0) + { + auto& front = Jobs.PeekFront(); + data = front; + front.Index++; + if (front.Index == front.Count) + { + Jobs.PopFront(); + } + } + JobsLocker.Unlock(); + + if (data.Job.IsBinded()) + { + // Run job + data.Job(data.Index); + data.Job.Unbind(); + + if (data.Index + 1 == data.Count) + { + // Move forward with the job queue + Platform::InterlockedIncrement(&DoneLabel); + WaitSignal.NotifyAll(); + } + } + else + { + // Wait for signal + mutex.Lock(); + JobsSignal.Wait(mutex); + mutex.Unlock(); + } + } + return 0; +} + +int64 JobSystem::Dispatch(const Function& job, int32 jobCount) +{ + PROFILE_CPU(); + if (jobCount <= 0) + return 0; + + JobData data; + data.Job = job; + data.Index = 0; + data.Count = jobCount; + + JobsLocker.Lock(); + const auto label = Platform::InterlockedIncrement(&NextLabel); + Jobs.PushBack(data); + JobsLocker.Unlock(); + + if (jobCount == 1) + JobsSignal.NotifyOne(); + else + JobsSignal.NotifyAll(); + + return label; +} + +void JobSystem::Wait() +{ + Wait(Platform::AtomicRead(&NextLabel)); +} + +void JobSystem::Wait(int64 label) +{ + PROFILE_CPU(); + + // Early out + if (label <= Platform::AtomicRead(&DoneLabel)) + return; + + // Wait on signal until input label is not yet done + CriticalSection mutex; + while (label > Platform::AtomicRead(&DoneLabel) && Platform::AtomicRead(&ExitFlag) == 0) + { + mutex.Lock(); + WaitSignal.Wait(mutex); + mutex.Unlock(); + } +} diff --git a/Source/Engine/Threading/JobSystem.h b/Source/Engine/Threading/JobSystem.h new file mode 100644 index 000000000..cf9b79073 --- /dev/null +++ b/Source/Engine/Threading/JobSystem.h @@ -0,0 +1,32 @@ +// Copyright (c) 2012-2021 Wojciech Figat. All rights reserved. + +#pragma once + +#include "Engine/Core/Delegate.h" + +/// +/// Lightweight multi-threaded jobs execution scheduler. Uses a pool of threads and supports work-stealing concept. +/// +API_CLASS(Static) class FLAXENGINE_API JobSystem +{ +DECLARE_SCRIPTING_TYPE_MINIMAL(JobSystem); + + /// + /// Dispatches the job for the execution. + /// + /// The job. Argument is an index of the job execution. + /// The job executions count. + /// The label identifying this dispatch. Can be used to wait for the execution end. + API_FUNCTION() static int64 Dispatch(const Function& job, int32 jobCount = 1); + + /// + /// Waits for all dispatched jobs to finish. + /// + API_FUNCTION() static void Wait(); + + /// + /// Waits for all dispatched jobs until a given label to finish (i.e. waits for a Dispatch that returned that label). + /// + /// The label. + API_FUNCTION() static void Wait(int64 label); +};