diff --git a/README.md b/README.md
index ff15caf44..fdd721af8 100644
--- a/README.md
+++ b/README.md
@@ -23,7 +23,7 @@
-
Blazing fast, lightweight, mobile-enabled, and optimized for advanced GPU processing usecases.
+Blazing fast, mobile-enabled, asynchronous, and optimized for advanced GPU processing usecases.
🔋 [Documentation](https://kompute.cc) 💻 [Blog Post](https://medium.com/@AxSaucedo/machine-learning-and-data-processing-in-the-gpu-with-vulkan-kompute-c9350e5e5d3a) ⌨ [Examples](#more-examples) 💾
@@ -34,6 +34,7 @@
* [Documentation](https://kompute.cc) leveraging doxygen and sphinx
* BYOV: Bring-your-own-Vulkan design to play nice with existing Vulkan applications
* Non-Vulkan core naming conventions to disambiguate Vulkan vs Kompute components
+* Asynchronous processing capabilities with granular multi-queue workload processing
* Fast development cycles with shader tooling, but robust static shader binary bundles for prod
* Explicit relationships for GPU and host memory ownership and memory management
* End-to-end examples for [machine learning 🤖](https://towardsdatascience.com/machine-learning-and-data-processing-in-the-gpu-with-vulkan-kompute-c9350e5e5d3a), [mobile development 📱](https://towardsdatascience.com/gpu-accelerated-machine-learning-in-your-mobile-applications-using-the-android-ndk-vulkan-kompute-1e9da37b7617), [game development 🎮](https://towardsdatascience.com/supercharging-game-development-with-gpu-accelerated-ml-using-vulkan-kompute-the-godot-game-engine-4e75a84ea9f0).
@@ -110,8 +111,10 @@ We are currently developing Vulkan Kompute not to hide the Vulkan SDK interface
### Simple examples
* [Pass shader as raw string](#your-first-kompute)
-* [Create your custom Kompute Operations](#your-custom-kompute-operation)
* [Record batch commands with a Kompute Sequence](#record-batch-commands)
+* [Run Asynchronous Operations](#asynchronous-operations)
+* [Run Parallel Operations Across Multiple GPU Queues](#parallel-operations)
+* [Create your custom Kompute Operations](#your-custom-kompute-operation)
### End-to-end examples
@@ -119,51 +122,6 @@ We are currently developing Vulkan Kompute not to hide the Vulkan SDK interface
* [Android NDK Mobile Kompute ML Application](https://towardsdatascience.com/gpu-accelerated-machine-learning-in-your-mobile-applications-using-the-android-ndk-vulkan-kompute-1e9da37b7617)
* [Game Development Kompute ML in Godot Engine](https://towardsdatascience.com/supercharging-game-development-with-gpu-accelerated-ml-using-vulkan-kompute-the-godot-game-engine-4e75a84ea9f0)
-### Your Custom Kompute Operation
-
-Build your own pre-compiled operations for domain specific workflows. Back to [more examples](#simple-examples)
-
-We also provide tools that allow you to [convert shaders into C++ headers](https://github.com/EthicalML/vulkan-kompute/blob/master/scripts/convert_shaders.py#L40).
-
-```c++
-
-template
-class OpMyCustom : public OpAlgoBase
-{
- public:
- OpMyCustom(std::shared_ptr physicalDevice,
- std::shared_ptr device,
- std::shared_ptr commandBuffer,
- std::vector> tensors)
- : OpAlgoBase(physicalDevice, device, commandBuffer, tensors, "")
- {
- // Perform your custom steps such as reading from a shader file
- this->mShaderFilePath = "shaders/glsl/opmult.comp";
- }
-}
-
-
-int main() {
-
- kp::Manager mgr; // Automatically selects Device 0
-
- // Create 3 tensors of default type float
- auto tensorLhs = std::make_shared(kp::Tensor({ 0., 1., 2. }));
- auto tensorRhs = std::make_shared(kp::Tensor({ 2., 4., 6. }));
- auto tensorOut = std::make_shared(kp::Tensor({ 0., 0., 0. }));
-
- // Create tensors data explicitly in GPU with an operation
- mgr.evalOpDefault({ tensorLhs, tensorRhs, tensorOut });
-
- // Run Kompute operation on the parameters provided with dispatch layout
- mgr.evalOpDefault>(
- { tensorLhs, tensorRhs, tensorOut });
-
- // Prints the output which is { 0, 4, 12 }
- std::cout << fmt::format("Output: {}", tensorOutput.data()) << std::endl;
-}
-```
-
#### Record batch commands
Record commands in a single submit by using a Sequence to send in batch to GPU. Back to [more examples](#simple-examples)
@@ -214,6 +172,206 @@ int main() {
}
```
+#### Asynchronous Operations
+
+You can submit operations asynchronously with the async/await commands in the kp::Manager and kp::Sequence, which provides granularity on waiting on the vk::Fence. Back to [more examples](#simple-examples)
+
+```c++
+int main() {
+
+ // You can allow Kompute to create the Vulkan components, or pass your existing ones
+ kp::Manager mgr; // Selects device 0 unless explicitly requested
+
+ // Creates tensor and initializes GPU memory (below we show more granularity)
+ auto tensor = std::make_shared(kp::Tensor(std::vector(10, 0.0)));
+
+ // Create tensors data explicitly in GPU with an operation
+ mgr.evalOpAsyncDefault({ tensor });
+
+ // Define your shader as a string (using string literals for simplicity)
+ // (You can also pass the raw compiled bytes, or even path to file)
+ std::string shader(R"(
+ #version 450
+
+ layout (local_size_x = 1) in;
+
+ layout(set = 0, binding = 0) buffer b { float pb[]; };
+
+ shared uint sharedTotal[1];
+
+ void main() {
+ uint index = gl_GlobalInvocationID.x;
+
+ sharedTotal[0] = 0;
+
+ // Iterating to simulate longer process
+ for (int i = 0; i < 100000000; i++)
+ {
+ atomicAdd(sharedTotal[0], 1);
+ }
+
+ pb[index] = sharedTotal[0];
+ }
+ )");
+
+ // We can now await for the previous submitted command
+ // The first parameter can be the amount of time to wait
+ // The time provided is in nanoseconds
+ mgr.evalOpAwaitDefault(10000);
+
+ // Run Async Kompute operation on the parameters provided
+ mgr.evalOpAsyncDefault>(
+ { tensor },
+ std::vector(shader.begin(), shader.end()));
+
+ // Here we can do other work
+
+ // When we're ready we can wait
+ // The default wait time is UINT64_MAX
+ mgr.evalOpAwaitDefault();
+
+ // Sync the GPU memory back to the local tensor
+ // We can still run synchronous jobs in our created sequence
+ mgr.evalOpDefault({ tensor });
+
+ // Prints the output: B: { 100000000, ... }
+ std::cout << fmt::format("B: {}",
+ tensor.data()) << std::endl;
+}
+```
+
+#### Parallel Operations
+
+Besides being able to submit asynchronous operations, you can also leverage the underlying GPU compute queues to process operations in parallel.
+
+This will depend on your underlying graphics card, but for example in NVIDIA graphics cards the operations submitted across queues in one family are not parallelizable, but operations submitted across queueFamilies can be parallelizable.
+
+Below we show how you can parallelize operations in an [NVIDIA 1650](http://vulkan.gpuinfo.org/displayreport.php?id=9700#queuefamilies), which has a `GRAPHICS+COMPUTE` family on `index 0`, and `COMPUTE` family on `index 2`.
+
+Back to [more examples](#simple-examples)
+
+```c++
+int main() {
+
+ // In this case we select device 0, and for queues, one queue from familyIndex 0
+ // and one queue from familyIndex 2
+ uint32_t deviceIndex(0);
+ std::vector familyIndeces = {0, 2};
+
+ // We create a manager with device index, and queues by queue family index
+ kp::Manager mgr(deviceIndex, familyIndeces);
+
+ // We need to create explicit sequences with their respective queues
+ // The second parameter is the index in the familyIndex array which is relative
+ // to the vector we created the manager with.
+ mgr.createManagedSequence("queueOne", 0);
+ mgr.createManagedSequence("queueTwo", 1);
+
+ // Creates tensor and initializes GPU memory (below we show more granularity)
+ auto tensorA = std::make_shared(kp::Tensor(std::vector(10, 0.0)));
+ auto tensorB = std::make_shared(kp::Tensor(std::vector(10, 0.0)));
+
+ // We run the first step synchronously on the default sequence
+ mgr.evalOpDefault({ tensorA, tensorB });
+
+ // Define your shader as a string (using string literals for simplicity)
+ // (You can also pass the raw compiled bytes, or even path to file)
+ std::string shader(R"(
+ #version 450
+
+ layout (local_size_x = 1) in;
+
+ layout(set = 0, binding = 0) buffer b { float pb[]; };
+
+ shared uint sharedTotal[1];
+
+ void main() {
+ uint index = gl_GlobalInvocationID.x;
+
+ sharedTotal[0] = 0;
+
+ // Iterating to simulate longer process
+ for (int i = 0; i < 100000000; i++)
+ {
+ atomicAdd(sharedTotal[0], 1);
+ }
+
+ pb[index] = sharedTotal[0];
+ }
+ )");
+
+ // Run the first parallel operation in the `queueOne` sequence
+ mgr.evalOpAsync>(
+ { tensorA },
+ "queueOne",
+ std::vector(shader.begin(), shader.end()));
+
+ // Run the second parallel operation in the `queueTwo` sequence
+ mgr.evalOpAsync>(
+ { tensorB },
+ "queueTwo",
+ std::vector(shader.begin(), shader.end()));
+
+ // Here we can do other work
+
+ // We can now wait for the two parallel tasks to finish
+ mgr.evalOpAwait("queueOne");
+ mgr.evalOpAwait("queueTwo");
+
+ // Sync the GPU memory back to the local tensor
+ mgr.evalOp({ tensorA, tensorB });
+
+ // Prints the output: A: 100000000 B: 100000000
+ std::cout << fmt::format("A: {}, B: {}",
+ tensorA.data()[0], tensorB.data()[0]) << std::endl;
+}
+```
+
+### Your Custom Kompute Operation
+
+Build your own pre-compiled operations for domain specific workflows. Back to [more examples](#simple-examples)
+
+We also provide tools that allow you to [convert shaders into C++ headers](https://github.com/EthicalML/vulkan-kompute/blob/master/scripts/convert_shaders.py#L40).
+
+```c++
+
+template
+class OpMyCustom : public OpAlgoBase
+{
+ public:
+ OpMyCustom(std::shared_ptr physicalDevice,
+ std::shared_ptr device,
+ std::shared_ptr commandBuffer,
+ std::vector> tensors)
+ : OpAlgoBase(physicalDevice, device, commandBuffer, tensors, "")
+ {
+ // Perform your custom steps such as reading from a shader file
+ this->mShaderFilePath = "shaders/glsl/opmult.comp";
+ }
+}
+
+
+int main() {
+
+ kp::Manager mgr; // Automatically selects Device 0
+
+ // Create 3 tensors of default type float
+ auto tensorLhs = std::make_shared(kp::Tensor({ 0., 1., 2. }));
+ auto tensorRhs = std::make_shared(kp::Tensor({ 2., 4., 6. }));
+ auto tensorOut = std::make_shared(kp::Tensor({ 0., 0., 0. }));
+
+ // Create tensors data explicitly in GPU with an operation
+ mgr.evalOpDefault({ tensorLhs, tensorRhs, tensorOut });
+
+ // Run Kompute operation on the parameters provided with dispatch layout
+ mgr.evalOpDefault>(
+ { tensorLhs, tensorRhs, tensorOut });
+
+ // Prints the output which is { 0, 4, 12 }
+ std::cout << fmt::format("Output: {}", tensorOutput.data()) << std::endl;
+}
+```
+
## Components & Architecture
The core architecture of Kompute include the following:
diff --git a/docs/index.rst b/docs/index.rst
index d29f1fbf6..ca8d59398 100755
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -15,7 +15,8 @@ Index
Mobile App Intergration (Android)
Game Engine Integration (Godot Engine)
Converting GLSL/HLSL Shaders to C++ Headers
- Class Reference
- Memory management principles
+ Asynchronous & Parallel Operations
+ Class Documentation and C++ Reference
+ Memory Management Principles
Code Index
diff --git a/docs/overview/async-parallel.rst b/docs/overview/async-parallel.rst
new file mode 100644
index 000000000..0505424d6
--- /dev/null
+++ b/docs/overview/async-parallel.rst
@@ -0,0 +1,108 @@
+
+Asynchronous and Parallel Operations
+====================================
+
+In GPU computing it is possible to have multiple levels of asynchronous and parallel processing of GPU tasks.
+
+It is important to understand the conceptual distinctions of the different terminology when using each of these components.
+
+In this section we will cover the following points:
+
+* Asynchronous operation submission
+* Parallel processing of operations
+
+Asynchronous operation submission
+---------------------------------
+
+As the name implies, this refers to the asynchronous submission of operations. This means that operations can be submitted to the GPU while the C++ / host CPU continues performing other tasks, until the user calls `await` to wait for the operation to finish.
+
+This basically provides further granularity on Vulkan Fences, which is its means to enable the CPU host to know when GPU commands have finished executing.
+
+It is important to note that submitting tasks asynchronously does not mean that they will be executed in parallel. Parallel execution of operations is covered in the following section.
+
+Asynchronous operation submission can be achieved through the kp::Manager, or directly through the kp::Sequence. Below is an example using the Kompute manager.
+
+Async/Await Example
+^^^^^^^^^^^^^^^^^^^^^
+
+Asynchronous job submission is done using `evalOpAsync` and `evalOpAwait` functions.
+
+For simplicity the `evalOpAsyncDefault` and `evalOpAwaitDefault` functions are provided, which can be used similar to the synchronous counterparts (which basically use the default named sequence).
+
+A simple example of asynchronous submission can be found below.
+
+One important thing to bear in mind when using asynchronous submissions is that you should make sure that any overlapping asynchronous functions are run in separate sequences.
+
+The reason why this is important is that the Await function not only waits for the fence, but also runs the `postEval` functions across all operations, which is required for several operations.
+
+.. code-block:: cpp
+ :linenos:
+
+ // You can allow Kompute to create the Vulkan components, or pass your existing ones
+ kp::Manager mgr; // Selects device 0 unless explicitly requested
+
+ // Creates tensor and initializes GPU memory (below we show more granularity)
+ auto tensor = std::make_shared(kp::Tensor(std::vector(10, 0.0)));
+
+ // Create tensors data explicitly in GPU with an operation
+ mgr.evalOpAsyncDefault({ tensor });
+
+ // Define your shader as a string (using string literals for simplicity)
+ // (You can also pass the raw compiled bytes, or even path to file)
+ std::string shader(R"(
+ #version 450
+
+ layout (local_size_x = 1) in;
+
+ layout(set = 0, binding = 0) buffer b { float pb[]; };
+
+ shared uint sharedTotal[1];
+
+ void main() {
+ uint index = gl_GlobalInvocationID.x;
+
+ sharedTotal[0] = 0;
+
+ // Iterating to simulate longer process
+ for (int i = 0; i < 100000000; i++)
+ {
+ atomicAdd(sharedTotal[0], 1);
+ }
+
+ pb[index] = sharedTotal[0];
+ }
+ )");
+
+ // We can now await for the previous submitted command
+ // The first parameter can be the amount of time to wait
+ // The time provided is in nanoseconds
+ mgr.evalOpAwaitDefault(10000);
+
+ // Run Async Kompute operation on the parameters provided
+ mgr.evalOpAsyncDefault>(
+ { tensor },
+ std::vector(shader.begin(), shader.end()));
+
+ // Here we can do other work
+
+ // When we're ready we can wait
+ // The default wait time is UINT64_MAX
+ mgr.evalOpAwaitDefault();
+
+ // Sync the GPU memory back to the local tensor
+ // We can still run synchronous jobs in our created sequence
+ mgr.evalOpDefault({ tensor });
+
+ // Prints the output: B: { 100000000, ... }
+ std::cout << fmt::format("B: {}",
+ tensor.data()) << std::endl;
+
+
+Parallel Operation Submission
+-----------------------------
+
+In order to work with parallel execution of tasks, it is important that you understand some of the core GPU processing limitations, as these can be quite broad and hardware dependent, which means they will vary across NVIDIA, AMD, and other vendors' video cards.
+
+GPUs by default will optimize towards GPU
+
+
diff --git a/docs/overview/reference.rst b/docs/overview/reference.rst
index f2b8eda18..5bffdcddd 100644
--- a/docs/overview/reference.rst
+++ b/docs/overview/reference.rst
@@ -1,5 +1,5 @@
-Reference
+Class Documentation and C++ Reference
========
This section provides a breakdown of the cpp classes and what each of their functions provide. It is partially generated and augomented from the Doxygen autodoc content. You can also go directly to the `raw doxygen docs <../doxygen/annotated.html>`_.
diff --git a/single_include/kompute/Kompute.hpp b/single_include/kompute/Kompute.hpp
index 2718b6599..bd7b3a125 100755
--- a/single_include/kompute/Kompute.hpp
+++ b/single_include/kompute/Kompute.hpp
@@ -18,7 +18,9 @@ static const char* KOMPUTE_LOG_TAG = "KomputeLog";
#ifndef KOMPUTE_VK_API_MINOR_VERSION
#define KOMPUTE_VK_API_MINOR_VERSION 1
#endif // KOMPUTE_VK_API_MINOR_VERSION
-#define KOMPUTE_VK_API_VERSION VK_MAKE_VERSION(KOMPUTE_VK_API_MAJOR_VERSION, KOMPUTE_VK_API_MINOR_VERSION, 0)
+#define KOMPUTE_VK_API_VERSION \
+ VK_MAKE_VERSION( \
+ KOMPUTE_VK_API_MAJOR_VERSION, KOMPUTE_VK_API_MINOR_VERSION, 0)
#endif // KOMPUTE_VK_API_VERSION
// SPDLOG_ACTIVE_LEVEL must be defined before spdlog.h import
@@ -39,7 +41,8 @@ static const char* KOMPUTE_LOG_TAG = "KomputeLog";
#define SPDLOG_DEBUG(message, ...)
#else
#if defined(VK_USE_PLATFORM_ANDROID_KHR)
-#define SPDLOG_DEBUG(message, ...) ((void)__android_log_print(ANDROID_LOG_DEBUG, KOMPUTE_LOG_TAG, message))
+#define SPDLOG_DEBUG(message, ...) \
+ ((void)__android_log_print(ANDROID_LOG_DEBUG, KOMPUTE_LOG_TAG, message))
#else
#define SPDLOG_DEBUG(message, ...) \
std::cout << "DEBUG: " << message << std::endl
@@ -49,7 +52,8 @@ static const char* KOMPUTE_LOG_TAG = "KomputeLog";
#define SPDLOG_INFO(message, ...)
#else
#if defined(VK_USE_PLATFORM_ANDROID_KHR)
-#define SPDLOG_INFO(message, ...) ((void)__android_log_print(ANDROID_LOG_INFO, KOMPUTE_LOG_TAG, message))
+#define SPDLOG_INFO(message, ...) \
+ ((void)__android_log_print(ANDROID_LOG_INFO, KOMPUTE_LOG_TAG, message))
#else
#define SPDLOG_INFO(message, ...) std::cout << "INFO: " << message << std::endl
#endif // VK_USE_PLATFORM_ANDROID_KHR
@@ -58,7 +62,8 @@ static const char* KOMPUTE_LOG_TAG = "KomputeLog";
#define SPDLOG_WARN(message, ...)
#else
#if defined(VK_USE_PLATFORM_ANDROID_KHR)
-#define SPDLOG_WARN(message, ...) ((void)__android_log_print(ANDROID_LOG_INFO, KOMPUTE_LOG_TAG, message))
+#define SPDLOG_WARN(message, ...) \
+ ((void)__android_log_print(ANDROID_LOG_INFO, KOMPUTE_LOG_TAG, message))
#else
#define SPDLOG_WARN(message, ...) \
std::cout << "WARNING: " << message << std::endl
@@ -68,7 +73,8 @@ static const char* KOMPUTE_LOG_TAG = "KomputeLog";
#define SPDLOG_ERROR(message, ...)
#else
#if defined(VK_USE_PLATFORM_ANDROID_KHR)
-#define SPDLOG_ERROR(message, ...) ((void)__android_log_print(ANDROID_LOG_INFO, KOMPUTE_LOG_TAG, message))
+#define SPDLOG_ERROR(message, ...) \
+ ((void)__android_log_print(ANDROID_LOG_INFO, KOMPUTE_LOG_TAG, message))
#else
#define SPDLOG_ERROR(message, ...) \
std::cout << "ERROR: " << message << std::endl
@@ -1033,19 +1039,45 @@ class Sequence
/**
* Begins recording commands for commands to be submitted into the command
* buffer.
+ *
+ * @return Boolean stating whether execution was successful.
*/
bool begin();
+
/**
* Ends the recording and stops recording commands when the record command
* is sent.
+ *
+ * @return Boolean stating whether execution was successful.
*/
bool end();
+
/**
* Eval sends all the recorded and stored operations in the vector of
* operations into the gpu as a submit job with a barrier.
+ *
+ * @return Boolean stating whether execution was successful.
*/
bool eval();
+ /**
+ * Eval Async runs preEval on the recorded operations and submits the
+ * command buffer to the compute queue with a fence, without waiting for it.
+ * evalAwait() must be called afterwards to terminate the sequence correctly.
+ *
+ * @return False if still recording or already running; true once submitted.
+ */
+ bool evalAsync();
+
+ /**
+ * Eval Await waits for the fence to finish processing and then once it
+ * finishes, it runs the postEval of all operations.
+ *
+ * @param waitFor Number of nanoseconds to wait before timing out.
+ * @return Boolean stating whether execution was successful.
+ */
+ bool evalAwait(uint64_t waitFor = UINT64_MAX);
+
/**
* Returns true if the sequence is currently in recording activated.
*
@@ -1053,6 +1085,14 @@ class Sequence
*/
bool isRecording();
+ /**
+ * Returns true if the sequence is currently running - mostly used for async
+ * workloads (e.g. Manager::evalOpAwait checks it before calling evalAwait).
+ *
+ * @return Boolean stating if currently running.
+ */
+ bool isRunning();
+
/**
* Returns true if the sequence has been successfully initialised.
*
@@ -1122,12 +1162,14 @@ class Sequence
std::shared_ptr mCommandBuffer = nullptr;
bool mFreeCommandBuffer = false;
- // Base op objects
+ // -------------- ALWAYS OWNED RESOURCES
+ vk::Fence mFence;
std::vector> mOperations;
// State
bool mIsInit = false;
bool mRecording = false;
+ bool mIsRunning = false;
// Create functions
void createCommandPool();
@@ -1221,19 +1263,25 @@ class Manager
Manager();
/**
- Similar to base constructor but allows the user to provide the device
- they would like to create the resources on.
- */
- Manager(uint32_t physicalDeviceIndex);
+ * Similar to base constructor but allows the user to provide the device
+ * they would like to create the resources on.
+ *
+ * @param physicalDeviceIndex The index of the physical device to use
+ * @param familyQueueIndeces (Optional) List of queue family indices; one
+ * queue is created per entry. When empty, a single queue from the first
+ * compute-capable queue family is used.
+ */
+ Manager(uint32_t physicalDeviceIndex,
+ const std::vector& familyQueueIndeces = {});
/**
* Manager constructor which allows your own vulkan application to integrate
* with the vulkan kompute use.
*
* @param instance Vulkan compute instance to base this application
- * @physicalDevice Vulkan physical device to use for application
- * @device Vulkan logical device to use for all base resources
- * @physicalDeviceIndex Index for vulkan physical device used
+ * @param physicalDevice Vulkan physical device to use for application
+ * @param device Vulkan logical device to use for all base resources
+ * @param physicalDeviceIndex Index for vulkan physical device used
*/
Manager(std::shared_ptr instance,
std::shared_ptr physicalDevice,
@@ -1259,8 +1307,18 @@ class Manager
std::string sequenceName);
/**
- * Operation that adds extra operations to existing or new created
- * sequences.
+ * Create a new managed Kompute sequence so it's available within the
+ * manager.
+ *
+ * @param sequenceName The name for the named sequence to be created
+ * @param queueIndex The queue to use from the available queues
+ * @return Weak pointer to the manager owned sequence resource
+ */
+ std::weak_ptr createManagedSequence(std::string sequenceName,
+ uint32_t queueIndex = 0);
+
+ /**
+ * Function that evaluates operation against named sequence.
*
* @param tensors The tensors to be used in the operation recorded
* @param sequenceName The name of the sequence to be retrieved or created
@@ -1293,8 +1351,7 @@ class Manager
}
/**
- * Operation that adds extra operations to existing or new created
- * sequences.
+ * Function that evaluates operation against default sequence.
*
* @param tensors The tensors to be used in the operation recorded
* @param TArgs Template parameters that will be used to initialise
@@ -1309,6 +1366,103 @@ class Manager
tensors, KP_DEFAULT_SESSION, std::forward(params)...);
}
+ /**
+ * Records an operation in the named sequence and submits it without waiting.
+ * Call evalOpAwait with the same sequence name to wait for completion.
+ * @param tensors The tensors to be used in the operation recorded
+ * @param sequenceName The name of the sequence to be retrieved or created
+ * @param params Template parameters that will be used to initialise
+ * Operation to allow for extensible configurations on initialisation
+ */
+ template
+ void evalOpAsync(std::vector> tensors,
+ std::string sequenceName,
+ TArgs&&... params)
+ {
+ SPDLOG_DEBUG("Kompute Manager evalOpAsync triggered");
+ std::weak_ptr sqWeakPtr =
+ this->getOrCreateManagedSequence(sequenceName);
+
+ std::unordered_map>::iterator
+ found = this->mManagedSequences.find(sequenceName);
+
+ if (found != this->mManagedSequences.end()) {
+ std::shared_ptr sq = found->second;
+
+ SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence BEGIN");
+ sq->begin();
+
+ SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence RECORD");
+ sq->record(tensors, std::forward(params)...);
+
+ SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence END");
+ sq->end();
+
+ SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence EVAL");
+ sq->evalAsync();
+ } else {
+ SPDLOG_ERROR("Kompute Manager evalOpAsync sequence [{}] not found",
+ sequenceName);
+ }
+ SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence SUCCESS");
+ }
+
+ /**
+ * Shorthand for evalOpAsync on the default sequence (KP_DEFAULT_SESSION).
+ * Pair with evalOpAwaitDefault to wait for completion.
+ * @param tensors The tensors to be used in the operation recorded
+ * @param params Template parameters that will be used to initialise
+ * Operation to allow for extensible configurations on initialisation
+ */
+ template
+ void evalOpAsyncDefault(std::vector> tensors,
+ TArgs&&... params)
+ {
+ SPDLOG_DEBUG("Kompute Manager evalOpAsyncDefault triggered");
+ this->evalOpAsync(
+ tensors, KP_DEFAULT_SESSION, std::forward(params)...);
+ }
+
+ /**
+ * Waits for the named sequence to finish; skips the wait if not running.
+ *
+ * @param sequenceName The name of the sequence to wait for termination
+ * @param waitFor Nanoseconds to wait before timing out (default UINT64_MAX)
+ */
+ void evalOpAwait(std::string sequenceName, uint64_t waitFor = UINT64_MAX)
+ {
+ SPDLOG_DEBUG("Kompute Manager evalOpAwait triggered with sequence {}", sequenceName);
+ std::unordered_map>::iterator
+ found = this->mManagedSequences.find(sequenceName);
+
+ if (found != this->mManagedSequences.end()) {
+ if (std::shared_ptr sq = found->second) {
+ SPDLOG_DEBUG("Kompute Manager evalOpAwait running sequence "
+ "Sequence EVAL AWAIT");
+ if (sq->isRunning()) {
+ sq->evalAwait(waitFor);
+ }
+ }
+ SPDLOG_DEBUG(
+ "Kompute Manager evalOpAwait running sequence SUCCESS");
+ } else {
+ SPDLOG_ERROR("Kompute Manager evalOpAwait Sequence not found");
+ }
+ }
+
+ /**
+ * Operation that awaits for default sequence to finish.
+ *
+ * @param waitFor Nanoseconds to wait before timing out; forwarded to
+ * evalOpAwait for the default sequence (KP_DEFAULT_SESSION). Defaults to
+ * UINT64_MAX.
+ */
+ void evalOpAwaitDefault(uint64_t waitFor = UINT64_MAX)
+ {
+ SPDLOG_DEBUG("Kompute Manager evalOpAwaitDefault triggered");
+ this->evalOpAwait(KP_DEFAULT_SESSION, waitFor);
+ }
+
/**
* Function that simplifies the common workflow of tensor creation and
* initialization. It will take the constructor parameters for a Tensor
@@ -1342,13 +1496,14 @@ class Manager
uint32_t mPhysicalDeviceIndex = -1;
std::shared_ptr mDevice = nullptr;
bool mFreeDevice = false;
- uint32_t mComputeQueueFamilyIndex = -1;
- std::shared_ptr mComputeQueue = nullptr;
// -------------- ALWAYS OWNED RESOURCES
std::unordered_map>
mManagedSequences;
+ std::vector mComputeQueueFamilyIndeces;
+ std::vector> mComputeQueues;
+
#if DEBUG
#ifndef KOMPUTE_DISABLE_VK_DEBUG_LAYERS
vk::DebugReportCallbackEXT mDebugReportCallback;
@@ -1358,7 +1513,7 @@ class Manager
// Create functions
void createInstance();
- void createDevice();
+ void createDevice(const std::vector& familyQueueIndeces = {});
};
} // End namespace kp
@@ -1599,7 +1754,7 @@ OpAlgoBase::OpAlgoBase(std::shared_ptr physicalD
std::vector>& tensors)
: OpBase(physicalDevice, device, commandBuffer, tensors, false)
{
- SPDLOG_DEBUG("Kompute OpAlgoBase constructor with params numTensors: {} , shaderFilePath: {}", tensors.size());
+ SPDLOG_DEBUG("Kompute OpAlgoBase constructor with params numTensors: {}", tensors.size());
// The dispatch size is set up based on either explicitly provided template
// parameters or by default it would take the shape and size of the tensors
diff --git a/src/Algorithm.cpp b/src/Algorithm.cpp
index 0138201c7..845036475 100644
--- a/src/Algorithm.cpp
+++ b/src/Algorithm.cpp
@@ -266,8 +266,7 @@ Algorithm::createPipeline(std::vector specializationData)
#ifdef KOMPUTE_CREATE_PIPELINE_RESULT_VALUE
vk::ResultValue pipelineResult =
- this->mDevice->createComputePipeline(*this->mPipelineCache,
-pipelineInfo);
+ this->mDevice->createComputePipeline(*this->mPipelineCache, pipelineInfo);
if (pipelineResult.result != vk::Result::eSuccess) {
throw std::runtime_error("Failed to create pipeline result: " +
diff --git a/src/Manager.cpp b/src/Manager.cpp
index 1debb4c9a..b2739d6be 100644
--- a/src/Manager.cpp
+++ b/src/Manager.cpp
@@ -28,12 +28,13 @@ Manager::Manager()
: Manager(0)
{}
-Manager::Manager(uint32_t physicalDeviceIndex)
+Manager::Manager(uint32_t physicalDeviceIndex,
+ const std::vector& familyQueueIndeces)
{
this->mPhysicalDeviceIndex = physicalDeviceIndex;
this->createInstance();
- this->createDevice();
+ this->createDevice(familyQueueIndeces);
}
Manager::Manager(std::shared_ptr instance,
@@ -98,19 +99,31 @@ Manager::getOrCreateManagedSequence(std::string sequenceName)
this->mManagedSequences.find(sequenceName);
if (found == this->mManagedSequences.end()) {
- std::shared_ptr sq =
- std::make_shared(this->mPhysicalDevice,
- this->mDevice,
- this->mComputeQueue,
- this->mComputeQueueFamilyIndex);
- sq->init();
- this->mManagedSequences.insert({ sequenceName, sq });
- return sq;
+ return this->createManagedSequence(sequenceName);
} else {
return found->second;
}
}
+std::weak_ptr
+Manager::createManagedSequence(std::string sequenceName, uint32_t queueIndex)
+{
+ // NOTE: queueIndex is used unchecked to index mComputeQueues below.
+ SPDLOG_DEBUG("Kompute Manager createManagedSequence with sequenceName: {} "
+ "and queueIndex: {}",
+ sequenceName,
+ queueIndex);
+
+ std::shared_ptr sq =
+ std::make_shared(this->mPhysicalDevice,
+ this->mDevice,
+ this->mComputeQueues[queueIndex],
+ this->mComputeQueueFamilyIndeces[queueIndex]);
+ sq->init();
+ this->mManagedSequences.insert({ sequenceName, sq });
+ return sq;
+}
+
void
Manager::createInstance()
{
@@ -197,7 +210,7 @@ Manager::createInstance()
}
void
-Manager::createDevice()
+Manager::createDevice(const std::vector& familyQueueIndeces)
{
SPDLOG_DEBUG("Kompute Manager creating Device");
@@ -228,45 +241,75 @@ Manager::createDevice()
this->mPhysicalDeviceIndex,
physicalDeviceProperties.deviceName);
- // Find compute queue
- std::vector allQueueFamilyProperties =
- physicalDevice.getQueueFamilyProperties();
+ if (!familyQueueIndeces.size()) {
+ // Find compute queue
+ std::vector allQueueFamilyProperties =
+ physicalDevice.getQueueFamilyProperties();
- this->mComputeQueueFamilyIndex = -1;
- for (uint32_t i = 0; i < allQueueFamilyProperties.size(); i++) {
- vk::QueueFamilyProperties queueFamilyProperties =
- allQueueFamilyProperties[i];
+ uint32_t computeQueueFamilyIndex = -1;
+ for (uint32_t i = 0; i < allQueueFamilyProperties.size(); i++) {
+ vk::QueueFamilyProperties queueFamilyProperties =
+ allQueueFamilyProperties[i];
- if (queueFamilyProperties.queueFlags & vk::QueueFlagBits::eCompute) {
- this->mComputeQueueFamilyIndex = i;
- break;
+ if (queueFamilyProperties.queueFlags &
+ vk::QueueFlagBits::eCompute) {
+ computeQueueFamilyIndex = i;
+ break;
+ }
}
+
+ if (computeQueueFamilyIndex < 0) {
+ throw std::runtime_error("Compute queue is not supported");
+ }
+
+ this->mComputeQueueFamilyIndeces.push_back(computeQueueFamilyIndex);
+ } else {
+ this->mComputeQueueFamilyIndeces = familyQueueIndeces;
}
- if (this->mComputeQueueFamilyIndex < 0) {
- throw std::runtime_error("Compute queue is not supported");
+ std::unordered_map familyQueueCounts;
+ std::unordered_map> familyQueuePriorities;
+ for (const auto& value : this->mComputeQueueFamilyIndeces) {
+ familyQueueCounts[value]++;
+ familyQueuePriorities[value].push_back(1.0f);
}
- const float defaultQueuePriority(0.0f);
- const uint32_t defaultQueueCount(1);
- vk::DeviceQueueCreateInfo deviceQueueCreateInfo(
- vk::DeviceQueueCreateFlags(),
- this->mComputeQueueFamilyIndex,
- defaultQueueCount,
- &defaultQueuePriority);
+ std::unordered_map familyQueueIndexCount;
+ std::vector deviceQueueCreateInfos;
+ for (const auto& familyQueueInfo : familyQueueCounts) {
+ // Setting the device count to 0
+ familyQueueIndexCount[familyQueueInfo.first] = 0;
+
+ // Creating the respective device queue
+ vk::DeviceQueueCreateInfo deviceQueueCreateInfo(
+ vk::DeviceQueueCreateFlags(),
+ familyQueueInfo.first,
+ familyQueueInfo.second,
+ familyQueuePriorities[familyQueueInfo.first].data());
+ deviceQueueCreateInfos.push_back(deviceQueueCreateInfo);
+ }
vk::DeviceCreateInfo deviceCreateInfo(vk::DeviceCreateFlags(),
- 1, // Number of deviceQueueCreateInfo
- &deviceQueueCreateInfo);
+ deviceQueueCreateInfos.size(),
+ deviceQueueCreateInfos.data());
this->mDevice = std::make_shared();
physicalDevice.createDevice(
&deviceCreateInfo, nullptr, this->mDevice.get());
SPDLOG_DEBUG("Kompute Manager device created");
- this->mComputeQueue = std::make_shared();
- this->mDevice->getQueue(
- this->mComputeQueueFamilyIndex, 0, this->mComputeQueue.get());
+ for (const uint32_t& familyQueueIndex : this->mComputeQueueFamilyIndeces) {
+ std::shared_ptr currQueue = std::make_shared();
+
+ this->mDevice->getQueue(familyQueueIndex,
+ familyQueueIndexCount[familyQueueIndex],
+ currQueue.get());
+
+ familyQueueIndexCount[familyQueueIndex]++;
+
+ this->mComputeQueues.push_back(currQueue);
+ }
+
SPDLOG_DEBUG("Kompute Manager compute queue obtained");
}
diff --git a/src/Sequence.cpp b/src/Sequence.cpp
index 7f2c63a44..a03a34afe 100644
--- a/src/Sequence.cpp
+++ b/src/Sequence.cpp
@@ -118,40 +118,86 @@ Sequence::end()
bool
Sequence::eval()
{
- SPDLOG_DEBUG("Kompute sequence compute recording EVAL");
+ SPDLOG_DEBUG("Kompute sequence EVAL BEGIN");
- if (this->isRecording()) {
- SPDLOG_WARN("Kompute Sequence eval called when still recording");
+ bool evalResult = this->evalAsync();
+ if (!evalResult) {
+ SPDLOG_DEBUG("Kompute sequence EVAL FAILURE");
return false;
}
+ evalResult = this->evalAwait();
+
+ SPDLOG_DEBUG("Kompute sequence EVAL SUCCESS");
+
+ return evalResult;
+}
+
+bool
+Sequence::evalAsync()
+{
+ if (this->isRecording()) {
+ SPDLOG_WARN("Kompute Sequence evalAsync called when still recording");
+ return false;
+ }
+ if (this->mIsRunning) {
+ SPDLOG_WARN("Kompute Sequence evalAsync called when an eval async was "
+ "called without successful wait");
+ return false;
+ }
+
+ this->mIsRunning = true;
+
for (size_t i = 0; i < this->mOperations.size(); i++) {
this->mOperations[i]->preEval();
}
- const vk::PipelineStageFlags waitStageMask =
- vk::PipelineStageFlagBits::eTransfer;
vk::SubmitInfo submitInfo(
- 0, nullptr, &waitStageMask, 1, this->mCommandBuffer.get());
+ 0, nullptr, nullptr, 1, this->mCommandBuffer.get());
- vk::Fence fence = this->mDevice->createFence(vk::FenceCreateInfo());
+ this->mFence = this->mDevice->createFence(vk::FenceCreateInfo());
SPDLOG_DEBUG(
"Kompute sequence submitting command buffer into compute queue");
- this->mComputeQueue->submit(1, &submitInfo, fence);
- this->mDevice->waitForFences(1, &fence, VK_TRUE, UINT64_MAX);
- this->mDevice->destroy(fence);
+ this->mComputeQueue->submit(1, &submitInfo, this->mFence);
+
+ return true;
+}
+
+bool
+Sequence::evalAwait(uint64_t waitFor)
+{
+    if (!this->mIsRunning) {
+        SPDLOG_WARN("Kompute Sequence evalAwait called without existing eval");
+        return false;
+    }
+    // waitFor is forwarded unchanged as the waitForFences timeout
+    vk::Result result =
+      this->mDevice->waitForFences(1, &this->mFence, VK_TRUE, waitFor);
+    if (result == vk::Result::eTimeout) {
+        // Submission still pending: keep fence and running state for a retry
+        SPDLOG_WARN("Kompute Sequence evalAwait timed out");
+        return false;
+    }
+    // Only destroy the fence once the wait is over and it is no longer in use
+    this->mDevice->destroy(this->mFence);
     for (size_t i = 0; i < this->mOperations.size(); i++) {
         this->mOperations[i]->postEval();
     }
-    SPDLOG_DEBUG("Kompute sequence EVAL success");
+    this->mIsRunning = false;
     return true;
 }
+bool
+Sequence::isRunning()
+{
+ return this->mIsRunning;
+}
+
bool
Sequence::isRecording()
{
diff --git a/src/include/kompute/Core.hpp b/src/include/kompute/Core.hpp
index dfa83d7a5..72ffa5346 100644
--- a/src/include/kompute/Core.hpp
+++ b/src/include/kompute/Core.hpp
@@ -18,10 +18,11 @@ static const char* KOMPUTE_LOG_TAG = "KomputeLog";
#ifndef KOMPUTE_VK_API_MINOR_VERSION
#define KOMPUTE_VK_API_MINOR_VERSION 1
#endif // KOMPUTE_VK_API_MINOR_VERSION
-#define KOMPUTE_VK_API_VERSION VK_MAKE_VERSION(KOMPUTE_VK_API_MAJOR_VERSION, KOMPUTE_VK_API_MINOR_VERSION, 0)
+#define KOMPUTE_VK_API_VERSION \
+ VK_MAKE_VERSION( \
+ KOMPUTE_VK_API_MAJOR_VERSION, KOMPUTE_VK_API_MINOR_VERSION, 0)
#endif // KOMPUTE_VK_API_VERSION
-
// SPDLOG_ACTIVE_LEVEL must be defined before spdlog.h import
#if !defined(SPDLOG_ACTIVE_LEVEL)
#if DEBUG
@@ -40,7 +41,8 @@ static const char* KOMPUTE_LOG_TAG = "KomputeLog";
#define SPDLOG_DEBUG(message, ...)
#else
#if defined(VK_USE_PLATFORM_ANDROID_KHR)
-#define SPDLOG_DEBUG(message, ...) ((void)__android_log_print(ANDROID_LOG_DEBUG, KOMPUTE_LOG_TAG, message))
+#define SPDLOG_DEBUG(message, ...) \
+ ((void)__android_log_print(ANDROID_LOG_DEBUG, KOMPUTE_LOG_TAG, message))
#else
#define SPDLOG_DEBUG(message, ...) \
std::cout << "DEBUG: " << message << std::endl
@@ -50,7 +52,8 @@ static const char* KOMPUTE_LOG_TAG = "KomputeLog";
#define SPDLOG_INFO(message, ...)
#else
#if defined(VK_USE_PLATFORM_ANDROID_KHR)
-#define SPDLOG_INFO(message, ...) ((void)__android_log_print(ANDROID_LOG_INFO, KOMPUTE_LOG_TAG, message))
+#define SPDLOG_INFO(message, ...) \
+ ((void)__android_log_print(ANDROID_LOG_INFO, KOMPUTE_LOG_TAG, message))
#else
#define SPDLOG_INFO(message, ...) std::cout << "INFO: " << message << std::endl
#endif // VK_USE_PLATFORM_ANDROID_KHR
@@ -59,7 +62,8 @@ static const char* KOMPUTE_LOG_TAG = "KomputeLog";
#define SPDLOG_WARN(message, ...)
#else
#if defined(VK_USE_PLATFORM_ANDROID_KHR)
-#define SPDLOG_WARN(message, ...) ((void)__android_log_print(ANDROID_LOG_INFO, KOMPUTE_LOG_TAG, message))
+#define SPDLOG_WARN(message, ...) \
+    ((void)__android_log_print(ANDROID_LOG_WARN, KOMPUTE_LOG_TAG, message))
#else
#define SPDLOG_WARN(message, ...) \
std::cout << "WARNING: " << message << std::endl
@@ -69,7 +73,8 @@ static const char* KOMPUTE_LOG_TAG = "KomputeLog";
#define SPDLOG_ERROR(message, ...)
#else
#if defined(VK_USE_PLATFORM_ANDROID_KHR)
-#define SPDLOG_ERROR(message, ...) ((void)__android_log_print(ANDROID_LOG_INFO, KOMPUTE_LOG_TAG, message))
+#define SPDLOG_ERROR(message, ...) \
+    ((void)__android_log_print(ANDROID_LOG_ERROR, KOMPUTE_LOG_TAG, message))
#else
#define SPDLOG_ERROR(message, ...) \
std::cout << "ERROR: " << message << std::endl
diff --git a/src/include/kompute/Manager.hpp b/src/include/kompute/Manager.hpp
index e2c7832fd..969af9ac5 100644
--- a/src/include/kompute/Manager.hpp
+++ b/src/include/kompute/Manager.hpp
@@ -25,19 +25,25 @@ class Manager
Manager();
/**
- Similar to base constructor but allows the user to provide the device
- they would like to create the resources on.
- */
- Manager(uint32_t physicalDeviceIndex);
+ * Similar to base constructor but allows the user to provide the device
+ * they would like to create the resources on.
+ *
+ * @param physicalDeviceIndex The index of the physical device to use
+     * @param familyQueueIndeces (Optional) List of queue family indices for
+     * which compute queues should be explicitly allocated; defaults to an
+     * empty list
+ */
+ Manager(uint32_t physicalDeviceIndex,
+ const std::vector& familyQueueIndeces = {});
/**
* Manager constructor which allows your own vulkan application to integrate
* with the vulkan kompute use.
*
* @param instance Vulkan compute instance to base this application
- * @physicalDevice Vulkan physical device to use for application
- * @device Vulkan logical device to use for all base resources
- * @physicalDeviceIndex Index for vulkan physical device used
+ * @param physicalDevice Vulkan physical device to use for application
+ * @param device Vulkan logical device to use for all base resources
+ * @param physicalDeviceIndex Index for vulkan physical device used
*/
Manager(std::shared_ptr instance,
std::shared_ptr physicalDevice,
@@ -63,8 +69,18 @@ class Manager
std::string sequenceName);
/**
- * Operation that adds extra operations to existing or new created
- * sequences.
+ * Create a new managed Kompute sequence so it's available within the
+ * manager.
+ *
+ * @param sequenceName The name for the named sequence to be created
+ * @param queueIndex The queue to use from the available queues
+ * @return Weak pointer to the manager owned sequence resource
+ */
+ std::weak_ptr createManagedSequence(std::string sequenceName,
+ uint32_t queueIndex = 0);
+
+ /**
+ * Function that evaluates operation against named sequence.
*
* @param tensors The tensors to be used in the operation recorded
* @param sequenceName The name of the sequence to be retrieved or created
@@ -97,8 +113,7 @@ class Manager
}
/**
- * Operation that adds extra operations to existing or new created
- * sequences.
+ * Function that evaluates operation against default sequence.
*
* @param tensors The tensors to be used in the operation recorded
* @param TArgs Template parameters that will be used to initialise
@@ -113,6 +128,103 @@ class Manager
tensors, KP_DEFAULT_SESSION, std::forward(params)...);
}
+ /**
+ * Function that evaluates operation against named sequence asynchronously.
+ *
+ * @param tensors The tensors to be used in the operation recorded
+ * @param sequenceName The name of the sequence to be retrieved or created
+ * @param params Template parameters that will be used to initialise
+ * Operation to allow for extensible configurations on initialisation
+ */
+ template
+ void evalOpAsync(std::vector> tensors,
+ std::string sequenceName,
+ TArgs&&... params)
+ {
+ SPDLOG_DEBUG("Kompute Manager evalOpAsync triggered");
+ std::weak_ptr sqWeakPtr =
+ this->getOrCreateManagedSequence(sequenceName);
+
+ std::unordered_map>::iterator
+ found = this->mManagedSequences.find(sequenceName);
+
+ if (found != this->mManagedSequences.end()) {
+ std::shared_ptr sq = found->second;
+
+ SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence BEGIN");
+ sq->begin();
+
+ SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence RECORD");
+ sq->record(tensors, std::forward(params)...);
+
+ SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence END");
+ sq->end();
+
+ SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence EVAL");
+ sq->evalAsync();
+ } else {
+ SPDLOG_ERROR("Kompute Manager evalOpAsync sequence [{}] not found",
+ sequenceName);
+ }
+ SPDLOG_DEBUG("Kompute Manager evalOpAsync running sequence SUCCESS");
+ }
+
+ /**
+ * Operation that evaluates operation against default sequence asynchronously.
+ *
+ * @param tensors The tensors to be used in the operation recorded
+ * @param params Template parameters that will be used to initialise
+ * Operation to allow for extensible configurations on initialisation
+ */
+ template
+ void evalOpAsyncDefault(std::vector> tensors,
+ TArgs&&... params)
+ {
+ SPDLOG_DEBUG("Kompute Manager evalOpAsyncDefault triggered");
+ this->evalOpAsync(
+ tensors, KP_DEFAULT_SESSION, std::forward(params)...);
+ }
+
+ /**
+ * Operation that awaits for named sequence to finish.
+ *
+ * @param sequenceName The name of the sequence to wait for termination
+     * @param waitFor Time in nanoseconds to wait before timing out
+ */
+ void evalOpAwait(std::string sequenceName, uint64_t waitFor = UINT64_MAX)
+ {
+ SPDLOG_DEBUG("Kompute Manager evalOpAwait triggered with sequence {}", sequenceName);
+ std::unordered_map>::iterator
+ found = this->mManagedSequences.find(sequenceName);
+
+ if (found != this->mManagedSequences.end()) {
+ if (std::shared_ptr sq = found->second) {
+ SPDLOG_DEBUG("Kompute Manager evalOpAwait running sequence "
+ "Sequence EVAL AWAIT");
+ if (sq->isRunning()) {
+ sq->evalAwait(waitFor);
+ }
+ }
+ SPDLOG_DEBUG(
+ "Kompute Manager evalOpAwait running sequence SUCCESS");
+ } else {
+ SPDLOG_ERROR("Kompute Manager evalOpAwait Sequence not found");
+ }
+ }
+
+ /**
+ * Operation that awaits for default sequence to finish.
+ *
+     * @param waitFor The amount of time in nanoseconds to wait before timing
+     * out; forwarded unchanged to evalOpAwait for the default sequence
+     * (KP_DEFAULT_SESSION)
+ */
+ void evalOpAwaitDefault(uint64_t waitFor = UINT64_MAX)
+ {
+ SPDLOG_DEBUG("Kompute Manager evalOpAwaitDefault triggered");
+ this->evalOpAwait(KP_DEFAULT_SESSION, waitFor);
+ }
+
/**
* Function that simplifies the common workflow of tensor creation and
* initialization. It will take the constructor parameters for a Tensor
@@ -146,13 +258,14 @@ class Manager
uint32_t mPhysicalDeviceIndex = -1;
std::shared_ptr mDevice = nullptr;
bool mFreeDevice = false;
- uint32_t mComputeQueueFamilyIndex = -1;
- std::shared_ptr mComputeQueue = nullptr;
// -------------- ALWAYS OWNED RESOURCES
std::unordered_map>
mManagedSequences;
+ std::vector mComputeQueueFamilyIndeces;
+ std::vector> mComputeQueues;
+
#if DEBUG
#ifndef KOMPUTE_DISABLE_VK_DEBUG_LAYERS
vk::DebugReportCallbackEXT mDebugReportCallback;
@@ -162,7 +275,7 @@ class Manager
// Create functions
void createInstance();
- void createDevice();
+ void createDevice(const std::vector& familyQueueIndeces = {});
};
} // End namespace kp
diff --git a/src/include/kompute/Sequence.hpp b/src/include/kompute/Sequence.hpp
index 6e4a4ab4f..314de6657 100644
--- a/src/include/kompute/Sequence.hpp
+++ b/src/include/kompute/Sequence.hpp
@@ -45,19 +45,45 @@ class Sequence
/**
* Begins recording commands for commands to be submitted into the command
* buffer.
+ *
+ * @return Boolean stating whether execution was successful.
*/
bool begin();
+
/**
* Ends the recording and stops recording commands when the record command
* is sent.
+ *
+ * @return Boolean stating whether execution was successful.
*/
bool end();
+
/**
* Eval sends all the recorded and stored operations in the vector of
* operations into the gpu as a submit job with a barrier.
+ *
+ * @return Boolean stating whether execution was successful.
*/
bool eval();
+ /**
+ * Eval Async sends all the recorded and stored operations in the vector of
+ * operations into the gpu as a submit job with a barrier. EvalAwait() must
+ * be called after to ensure the sequence is terminated correctly.
+ *
+ * @return Boolean stating whether execution was successful.
+ */
+ bool evalAsync();
+
+ /**
+ * Eval Await waits for the fence to finish processing and then once it
+ * finishes, it runs the postEval of all operations.
+ *
+     * @param waitFor Number of nanoseconds to wait before timing out.
+ * @return Boolean stating whether execution was successful.
+ */
+ bool evalAwait(uint64_t waitFor = UINT64_MAX);
+
/**
* Returns true if the sequence is currently in recording activated.
*
@@ -65,6 +91,14 @@ class Sequence
*/
bool isRecording();
+ /**
+ * Returns true if the sequence is currently running - mostly used for async
+ * workloads.
+ *
+ * @return Boolean stating if currently running.
+ */
+ bool isRunning();
+
/**
* Returns true if the sequence has been successfully initialised.
*
@@ -134,12 +168,14 @@ class Sequence
std::shared_ptr mCommandBuffer = nullptr;
bool mFreeCommandBuffer = false;
- // Base op objects
+ // -------------- ALWAYS OWNED RESOURCES
+ vk::Fence mFence;
std::vector> mOperations;
// State
bool mIsInit = false;
bool mRecording = false;
+ bool mIsRunning = false;
// Create functions
void createCommandPool();
diff --git a/src/include/kompute/operations/OpAlgoBase.hpp b/src/include/kompute/operations/OpAlgoBase.hpp
index 3d1de2992..a32e20011 100644
--- a/src/include/kompute/operations/OpAlgoBase.hpp
+++ b/src/include/kompute/operations/OpAlgoBase.hpp
@@ -162,7 +162,7 @@ OpAlgoBase::OpAlgoBase(std::shared_ptr physicalD
std::vector>& tensors)
: OpBase(physicalDevice, device, commandBuffer, tensors, false)
{
- SPDLOG_DEBUG("Kompute OpAlgoBase constructor with params numTensors: {} , shaderFilePath: {}", tensors.size());
+ SPDLOG_DEBUG("Kompute OpAlgoBase constructor with params numTensors: {}", tensors.size());
// The dispatch size is set up based on either explicitly provided template
// parameters or by default it would take the shape and size of the tensors
diff --git a/test/TestAsyncOperations.cpp b/test/TestAsyncOperations.cpp
new file mode 100644
index 000000000..5c8612fc3
--- /dev/null
+++ b/test/TestAsyncOperations.cpp
@@ -0,0 +1,164 @@
+
+#include "gtest/gtest.h"
+
+#include
+
+#include "kompute/Kompute.hpp"
+
+TEST(TestAsyncOperations, TestManagerParallelExecution)
+{
+ // This test is built for NVIDIA 1650. It assumes:
+ // * Queue family 0 and 2 have compute capabilities
+ // * GPU is able to process parallel shader code across different families
+ uint32_t size = 10;
+
+ uint32_t numParallel = 2;
+
+ std::string shader(R"(
+ #version 450
+
+ layout (local_size_x = 1) in;
+
+ layout(set = 0, binding = 0) buffer b { float pb[]; };
+
+ shared uint sharedTotal[1];
+
+ void main() {
+ uint index = gl_GlobalInvocationID.x;
+
+ sharedTotal[0] = 0;
+
+ for (int i = 0; i < 100000000; i++)
+ {
+ atomicAdd(sharedTotal[0], 1);
+ }
+
+ pb[index] = sharedTotal[0];
+ }
+ )");
+
+ std::vector data(size, 0.0);
+ std::vector resultSync(size, 100000000);
+ std::vector resultAsync(size, 100000000);
+
+ kp::Manager mgr;
+
+ std::vector> inputsSyncB;
+
+ for (uint32_t i = 0; i < numParallel; i++) {
+ inputsSyncB.push_back(std::make_shared(kp::Tensor(data)));
+ }
+
+ mgr.evalOpDefault(inputsSyncB);
+
+ auto startSync = std::chrono::high_resolution_clock::now();
+
+ for (uint32_t i = 0; i < numParallel; i++) {
+ mgr.evalOpDefault>(
+ { inputsSyncB[i] },
+ std::vector(shader.begin(), shader.end()));
+ }
+
+ auto endSync = std::chrono::high_resolution_clock::now();
+ auto durationSync =
+ std::chrono::duration_cast(endSync - startSync)
+ .count();
+
+ mgr.evalOpDefault(inputsSyncB);
+
+ for (uint32_t i = 0; i < numParallel; i++) {
+ EXPECT_EQ(inputsSyncB[i]->data(), resultSync);
+ }
+
+ kp::Manager mgrAsync(0, { 0, 2 });
+
+ std::vector> inputsAsyncB;
+
+ for (uint32_t i = 0; i < numParallel; i++) {
+ inputsAsyncB.push_back(std::make_shared(kp::Tensor(data)));
+ }
+
+ mgrAsync.evalOpDefault(inputsAsyncB);
+
+ for (uint32_t i = 0; i < numParallel; i++) {
+ mgrAsync.createManagedSequence("async" + std::to_string(i), i);
+ }
+
+ auto startAsync = std::chrono::high_resolution_clock::now();
+
+ for (uint32_t i = 0; i < numParallel; i++) {
+ mgrAsync.evalOpAsync>(
+ { inputsAsyncB[i] },
+ "async" + std::to_string(i),
+ std::vector(shader.begin(), shader.end()));
+ }
+
+ for (uint32_t i = 0; i < numParallel; i++) {
+ mgrAsync.evalOpAwait("async" + std::to_string(i));
+ }
+
+ auto endAsync = std::chrono::high_resolution_clock::now();
+ auto durationAsync = std::chrono::duration_cast(
+ endAsync - startAsync)
+ .count();
+
+ mgrAsync.evalOpDefault({ inputsAsyncB });
+
+ for (uint32_t i = 0; i < numParallel; i++) {
+ EXPECT_EQ(inputsAsyncB[i]->data(), resultAsync);
+ }
+
+ // The speedup should be at least 40%
+ EXPECT_LT(durationAsync, durationSync * 0.6);
+}
+
+TEST(TestAsyncOperations, TestManagerAsyncExecution)
+{
+ uint32_t size = 10;
+
+ std::string shader(R"(
+ #version 450
+
+ layout (local_size_x = 1) in;
+
+ layout(set = 0, binding = 0) buffer b { float pb[]; };
+
+ shared uint sharedTotal[1];
+
+ void main() {
+ uint index = gl_GlobalInvocationID.x;
+
+ sharedTotal[0] = 0;
+
+ for (int i = 0; i < 100000000; i++)
+ {
+ atomicAdd(sharedTotal[0], 1);
+ }
+
+ pb[index] = sharedTotal[0];
+ }
+ )");
+
+ std::vector data(size, 0.0);
+ std::vector resultAsync(size, 100000000);
+
+ kp::Manager mgrAsync(0);
+
+ std::vector> inputsAsyncB;
+
+ inputsAsyncB.push_back(std::make_shared(kp::Tensor(data)));
+
+ mgrAsync.evalOpAsyncDefault(inputsAsyncB);
+ mgrAsync.evalOpAwaitDefault();
+
+ mgrAsync.evalOpAsyncDefault>(
+ { inputsAsyncB[0] },
+ std::vector(shader.begin(), shader.end()));
+
+ mgrAsync.evalOpAwaitDefault();
+
+ mgrAsync.evalOpAsyncDefault({ inputsAsyncB });
+ mgrAsync.evalOpAwaitDefault();
+
+ EXPECT_EQ(inputsAsyncB[0]->data(), resultAsync);
+}
diff --git a/test/TestMultipleAlgoExecutions.cpp b/test/TestMultipleAlgoExecutions.cpp
index fb0803690..1b59d3516 100644
--- a/test/TestMultipleAlgoExecutions.cpp
+++ b/test/TestMultipleAlgoExecutions.cpp
@@ -258,10 +258,10 @@ TEST(TestMultipleAlgoExecutions, ManagerEvalMultSourceStrOpCreate)
)");
mgr.evalOpDefault>(
- { tensorInA, tensorInB, tensorOut },
- std::vector(shader.begin(), shader.end()));
+ { tensorInA, tensorInB, tensorOut },
+ std::vector(shader.begin(), shader.end()));
- mgr.evalOpDefault({tensorOut});
+ mgr.evalOpDefault({ tensorOut });
EXPECT_EQ(tensorOut->data(), std::vector({ 0.0, 4.0, 12.0 }));
}
@@ -295,10 +295,10 @@ TEST(TestMultipleAlgoExecutions, ManagerEvalMultSourceStrMgrCreate)
)");
mgr.evalOpDefault>(
- { tensorInA, tensorInB, tensorOut },
- std::vector(shader.begin(), shader.end()));
+ { tensorInA, tensorInB, tensorOut },
+ std::vector(shader.begin(), shader.end()));
- mgr.evalOpDefault({tensorOut});
+ mgr.evalOpDefault({ tensorOut });
EXPECT_EQ(tensorOut->data(), std::vector({ 0.0, 4.0, 12.0 }));
}