From 48805e16399751f3df54ca1dcfd8ad404a82a84f Mon Sep 17 00:00:00 2001 From: Alejandro Saucedo Date: Thu, 15 Oct 2020 09:25:16 +0100 Subject: [PATCH] Updated to function for async --- single_include/kompute/Kompute.hpp | 98 ++++++++++++++++++- src/Sequence.cpp | 21 ++-- src/include/kompute/Manager.hpp | 33 ++++--- src/include/kompute/Sequence.hpp | 9 +- src/include/kompute/operations/OpAlgoBase.hpp | 2 +- 5 files changed, 138 insertions(+), 25 deletions(-) diff --git a/single_include/kompute/Kompute.hpp b/single_include/kompute/Kompute.hpp index 2718b6599..ee8705df5 100755 --- a/single_include/kompute/Kompute.hpp +++ b/single_include/kompute/Kompute.hpp @@ -1033,19 +1033,42 @@ class Sequence /** * Begins recording commands for commands to be submitted into the command * buffer. + * + * @return Boolean stating whether execution was successful. */ bool begin(); + /** * Ends the recording and stops recording commands when the record command * is sent. + * + * @return Boolean stating whether execution was successful. */ bool end(); + /** * Eval sends all the recorded and stored operations in the vector of * operations into the gpu as a submit job with a barrier. + * + * @return Boolean stating whether execution was successful. */ bool eval(); + /** + * Eval Async sends all the recorded and stored operations in the vector of operations into the gpu as a submit job with a barrier. EvalAwait() must be called after to ensure the sequence is terminated correctly. + * + * @return Boolean stating whether execution was successful. + */ + bool evalAsync(); + + /** + * Eval Await waits for the fence to finish processing and then once it finishes, it runs the postEval of all operations. + * + * @param waitFor Number of milliseconds to wait before timing out. + * @return Boolean stating whether execution was successful. + */ + bool evalAwait(uint64_t waitFor = UINT64_MAX); + /** * Returns true if the sequence is currently in recording activated. * @@ -1053,6 +1076,13 @@ class Sequence */ bool isRecording(); + /** + * Returns true if the sequence is currently running - mostly used for async workloads. + * + * @return Boolean stating if currently running. + */ + bool isRunning(); + /** * Returns true if the sequence has been successfully initialised. * @@ -1122,12 +1152,14 @@ class Sequence std::shared_ptr mCommandBuffer = nullptr; bool mFreeCommandBuffer = false; - // Base op objects + // -------------- ALWAYS OWNED RESOURCES + vk::Fence mFence; std::vector> mOperations; // State bool mIsInit = false; bool mRecording = false; + bool mIsRunning = false; // Create functions void createCommandPool(); @@ -1292,6 +1324,68 @@ class Manager SPDLOG_DEBUG("Kompute Manager evalOp running sequence SUCCESS"); } + /** + * Operation that adds extra operations to existing or new created + * sequences. + * + * @param tensors The tensors to be used in the operation recorded + * @param sequenceName The name of the sequence to be retrieved or created + * @param params Template parameters that will be used to initialise + * Operation to allow for extensible configurations on initialisation + */ + template + void evalOpAsync(std::vector> tensors, + std::string sequenceName, + TArgs&&... params) + { + SPDLOG_DEBUG("Kompute Manager evalOpAsync triggered"); + std::weak_ptr sqWeakPtr = + this->getOrCreateManagedSequence(sequenceName); + + if (std::shared_ptr sq = sqWeakPtr.lock()) { + SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence BEGIN"); + sq->begin(); + + SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence RECORD"); + sq->record(tensors, std::forward(params)...); + + SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence END"); + sq->end(); + + SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence EVAL"); + sq->evalAsync(); + } + SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence SUCCESS"); + } + + /** + * Operation that adds extra operations to existing or new created + * sequences. + * + * @param sequenceName The name of the sequence to wait for termination + * @param waitFor The amount of time to wait before timing out + */ + template + void evalOpAwait(std::string sequenceName, uint64_t waitFor = UINT64_MAX) + { + SPDLOG_DEBUG("Kompute Manager evalOpAwait triggered"); + std::unordered_map>::iterator found = + this->mManagedSequences.find(sequenceName); + + if (found != this->mManagedSequences.end()) { + if (std::shared_ptr sq = found->second) { + SPDLOG_DEBUG("Kompute Manager evalOpAwait running sequence Sequence EVAL AWAIT"); + if (sq->isRunning()) { + sq->evalAwait(waitFor); + } + } + SPDLOG_DEBUG("Kompute Manager evalOpAwait running sequence SUCCESS"); + } + else { + SPDLOG_ERROR("Sequence not found"); + } + } + /** * Operation that adds extra operations to existing or new created * sequences. @@ -1599,7 +1693,7 @@ OpAlgoBase::OpAlgoBase(std::shared_ptr physicalD std::vector>& tensors) : OpBase(physicalDevice, device, commandBuffer, tensors, false) { - SPDLOG_DEBUG("Kompute OpAlgoBase constructor with params numTensors: {} , shaderFilePath: {}", tensors.size()); + SPDLOG_DEBUG("Kompute OpAlgoBase constructor with params numTensors: {}", tensors.size()); // The dispatch size is set up based on either explicitly provided template // parameters or by default it would take the shape and size of the tensors diff --git a/src/Sequence.cpp b/src/Sequence.cpp index 6fd9773e2..9038db2a7 100644 --- a/src/Sequence.cpp +++ b/src/Sequence.cpp @@ -140,12 +140,12 @@ Sequence::evalAsync() SPDLOG_WARN("Kompute Sequence evalAsync called when still recording"); return false; } - if (this->mEvalBusy) { + if (this->mIsRunning) { SPDLOG_WARN("Kompute Sequence evalAsync called when an eval async was called without successful wait"); return false; } - this->mEvalBusy = true; + this->mIsRunning = true; for (size_t i = 0; i < this->mOperations.size(); i++) { this->mOperations[i]->preEval(); @@ -154,7 +154,7 @@ Sequence::evalAsync() const vk::PipelineStageFlags waitStageMask = vk::PipelineStageFlagBits::eTransfer & vk::PipelineStageFlagBits::eComputeShader; vk::SubmitInfo submitInfo( - 0, nullptr, &waitStageMask, 1, this->mCommandBuffer.get()); + 0, nullptr, nullptr, 1, this->mCommandBuffer.get()); this->mFence = this->mDevice->createFence(vk::FenceCreateInfo()); @@ -162,12 +162,14 @@ Sequence::evalAsync() "Kompute sequence submitting command buffer into compute queue"); this->mComputeQueue->submit(1, &submitInfo, this->mFence); + + return true; } bool Sequence::evalAwait(uint64_t waitFor) { - if (!this->mEvalBusy) { + if (!this->mIsRunning) { SPDLOG_WARN("Kompute Sequence evalAwait called without existing eval"); return false; } @@ -177,7 +179,7 @@ Sequence::evalAwait(uint64_t waitFor) if (result == vk::Result::eTimeout) { SPDLOG_WARN("Kompute Sequence evalAwait timed out"); - this->mEvalBusy = false; + this->mIsRunning = false; return false; } @@ -185,7 +187,14 @@ Sequence::evalAwait(uint64_t waitFor) this->mOperations[i]->postEval(); } - this->mEvalBusy = false; + this->mIsRunning = false; + + return true; +} + +bool +Sequence::isRunning() { + return this->mIsRunning; } bool diff --git a/src/include/kompute/Manager.hpp b/src/include/kompute/Manager.hpp index 7f95aa9ab..e795bbf2e 100644 --- a/src/include/kompute/Manager.hpp +++ b/src/include/kompute/Manager.hpp @@ -102,7 +102,7 @@ class Manager * * @param tensors The tensors to be used in the operation recorded * @param sequenceName The name of the sequence to be retrieved or created - * @param TArgs Template parameters that will be used to initialise + * @param params Template parameters that will be used to initialise * Operation to allow for extensible configurations on initialisation */ template @@ -110,49 +110,52 @@ class Manager std::string sequenceName, TArgs&&... params) { - SPDLOG_DEBUG("Kompute Manager evalOp triggered"); + SPDLOG_DEBUG("Kompute Manager evalOpAsync triggered"); std::weak_ptr sqWeakPtr = this->getOrCreateManagedSequence(sequenceName); if (std::shared_ptr sq = sqWeakPtr.lock()) { - SPDLOG_DEBUG("Kompute Manager evalOp running sequence BEGIN"); + SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence BEGIN"); sq->begin(); - SPDLOG_DEBUG("Kompute Manager evalOp running sequence RECORD"); + SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence RECORD"); sq->record(tensors, std::forward(params)...); - SPDLOG_DEBUG("Kompute Manager evalOp running sequence END"); + SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence END"); sq->end(); - SPDLOG_DEBUG("Kompute Manager evalOp running sequence EVAL"); + SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence EVAL"); sq->evalAsync(); } - SPDLOG_DEBUG("Kompute Manager evalOp running sequence SUCCESS"); + SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence SUCCESS"); } /** * Operation that adds extra operations to existing or new created * sequences. * - * @param tensors The tensors to be used in the operation recorded - * @param sequenceName The name of the sequence to be retrieved or created - * @param TArgs Template parameters that will be used to initialise - * Operation to allow for extensible configurations on initialisation + * @param sequenceName The name of the sequence to wait for termination + * @param waitFor The amount of time to wait before timing out */ - template + template void evalOpAwait(std::string sequenceName, uint64_t waitFor = UINT64_MAX) { SPDLOG_DEBUG("Kompute Manager evalOpAwait triggered"); std::unordered_map>::iterator found = - this->mManagedSequences.find(sequenceName); + this->mManagedSequences.find(sequenceName); - if (found == this->mManagedSequences.end()) { + if (found != this->mManagedSequences.end()) { if (std::shared_ptr sq = found->second) { SPDLOG_DEBUG("Kompute Manager evalOpAwait running sequence Sequence EVAL AWAIT"); - sq->evalAwait(waitFor); + if (sq->isRunning()) { + sq->evalAwait(waitFor); + } } SPDLOG_DEBUG("Kompute Manager evalOpAwait running sequence SUCCESS"); } + else { + SPDLOG_ERROR("Sequence not found"); + } } /** diff --git a/src/include/kompute/Sequence.hpp b/src/include/kompute/Sequence.hpp index 749fdbeeb..2d9ef8e51 100644 --- a/src/include/kompute/Sequence.hpp +++ b/src/include/kompute/Sequence.hpp @@ -88,6 +88,13 @@ class Sequence */ bool isRecording(); + /** + * Returns true if the sequence is currently running - mostly used for async workloads. + * + * @return Boolean stating if currently running. + */ + bool isRunning(); + /** * Returns true if the sequence has been successfully initialised. * @@ -164,7 +171,7 @@ class Sequence // State bool mIsInit = false; bool mRecording = false; - bool mEvalBusy = false; + bool mIsRunning = false; // Create functions void createCommandPool(); diff --git a/src/include/kompute/operations/OpAlgoBase.hpp b/src/include/kompute/operations/OpAlgoBase.hpp index 3d1de2992..a32e20011 100644 --- a/src/include/kompute/operations/OpAlgoBase.hpp +++ b/src/include/kompute/operations/OpAlgoBase.hpp @@ -162,7 +162,7 @@ OpAlgoBase::OpAlgoBase(std::shared_ptr physicalD std::vector>& tensors) : OpBase(physicalDevice, device, commandBuffer, tensors, false) { - SPDLOG_DEBUG("Kompute OpAlgoBase constructor with params numTensors: {} , shaderFilePath: {}", tensors.size()); + SPDLOG_DEBUG("Kompute OpAlgoBase constructor with params numTensors: {}", tensors.size()); // The dispatch size is set up based on either explicitly provided template // parameters or by default it would take the shape and size of the tensors