AstarParallel.cs 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. #if UNITY_WEBGL && !UNITY_EDITOR
  2. #define SINGLE_THREAD
  3. #endif
  4. using System.Collections.Generic;
  5. using System.Threading;
  6. namespace Pathfinding.Util {
  /// <summary>
  /// Helper for parallelizing tasks.
  /// More specifically this class is useful if the tasks need some large and slow to initialize 'scratch pad'.
  /// Using this class you can initialize a scratch pad per thread and then use the appropriate one in the task
  /// callback (which includes a thread index).
  ///
  /// Any exception that is thrown in the worker threads will be propagated out to the caller of the <see cref="Run"/> method.
  /// </summary>
  public class ParallelWorkQueue<T> {
    /// <summary>
    /// Callback to run for each item in the queue.
    /// The callback takes the item as the first parameter and the thread index as the second parameter.
    /// The thread index is in the range [0, <see cref="threadCount"/>).
    /// Must be assigned before the sequence returned by <see cref="Run"/> is enumerated.
    /// </summary>
    public System.Action<T, int> action;

    /// <summary>
    /// Number of threads to use.
    /// Always 1 when SINGLE_THREAD is defined, otherwise never larger than the number of items
    /// that were in the queue when this instance was constructed.
    /// </summary>
    public readonly int threadCount;

    /// <summary>Queue of items. Also used as the lock object that guards every access from the worker threads.</summary>
    readonly Queue<T> queue;

    /// <summary>Number of items that were in the queue when this instance was constructed</summary>
    readonly int initialCount;

    #if !SINGLE_THREAD
    /// <summary>One event per worker thread. Each event is set when the corresponding worker has exited.</summary>
    ManualResetEvent[] waitEvents;

    /// <summary>
    /// Exception thrown by a worker thread, if any.
    /// If several workers fail, the one that was stored last is the one that gets reported.
    /// </summary>
    System.Exception innerException;
    #endif

    /// <summary>Creates a work queue which will process all items that are currently in the given queue.</summary>
    /// <param name="queue">Items to process. Must not be modified by the caller until <see cref="Run"/> has completed.</param>
    /// <exception cref="System.ArgumentNullException">If queue is null.</exception>
    public ParallelWorkQueue (Queue<T> queue) {
      if (queue == null) throw new System.ArgumentNullException("queue");

      this.queue = queue;
      initialCount = queue.Count;
      #if SINGLE_THREAD
      threadCount = 1;
      #else
      // Never use more threads than there are items, but use at least one thread if there is any work at all
      threadCount = System.Math.Min(initialCount, System.Math.Max(1, AstarPath.CalculateThreadCount(ThreadCount.AutomaticHighLoad)));
      #endif
    }

    /// <summary>
    /// Execute the tasks.
    /// This is an iterator: no work is started until the returned sequence is enumerated, and the sequence
    /// must be enumerated to the end for the work to be awaited and for worker exceptions to be reported.
    /// </summary>
    /// <param name="progressTimeoutMillis">This iterator will yield approximately every progressTimeoutMillis milliseconds.
    /// This can be used to e.g show a progress bar.</param>
    /// <returns>
    /// A sequence of progress values: the number of items that have been taken out of the queue so far.
    /// When SINGLE_THREAD is defined one value is yielded after every processed item instead.
    /// </returns>
    /// <exception cref="System.InvalidOperationException">
    /// If the queue has been modified since the constructor ran, or if there are items to process but no <see cref="action"/> has been assigned.
    /// </exception>
    public IEnumerable<int> Run (int progressTimeoutMillis) {
      if (initialCount != queue.Count) throw new System.InvalidOperationException("Queue has been modified since the constructor");

      // Return early if there are no items in the queue.
      // This is important because WaitHandle.WaitAll with an array of length zero
      // results in weird behaviour (Microsoft's .Net returns false, Mono returns true
      // and the documentation says it should throw an exception).
      if (initialCount == 0) yield break;

      // Fail with a descriptive error here instead of a NullReferenceException from inside a worker
      if (action == null) throw new System.InvalidOperationException("No action has been assigned to the work queue");

      #if SINGLE_THREAD
      // WebGL does not support multithreading so we will do everything on the main thread instead
      for (int i = 0; i < initialCount; i++) {
        action(queue.Dequeue(), 0);
        yield return i + 1;
      }
      #else
      // Fire up a bunch of threads to process the queue in parallel
      waitEvents = new ManualResetEvent[threadCount];
      for (int i = 0; i < waitEvents.Length; i++) {
        waitEvents[i] = new ManualResetEvent(false);
        #if NETFX_CORE
        // Need to make a copy here, otherwise it may refer to some other index when the task actually runs.
        int threadIndex = i;
        System.Threading.Tasks.Task.Run(() => RunTask(threadIndex));
        #else
        ThreadPool.QueueUserWorkItem(threadIndex => RunTask((int)threadIndex), i);
        #endif
      }

      // Report progress until every worker has signalled that it has exited
      while (!WaitHandle.WaitAll(waitEvents, progressTimeoutMillis)) {
        int remaining;
        lock (queue) remaining = queue.Count;
        yield return initialCount - remaining;
      }

      // All workers have called Set on their event by now, so nothing will touch the events again
      // and the underlying OS handles can be released instead of being left for the finalizer.
      // This is deliberately not done if the caller abandons the enumeration early, because the
      // workers are then still running and will call Set on these events when they exit.
      // The cast is used because WaitHandle.Dispose() is not public on all framework profiles.
      for (int i = 0; i < waitEvents.Length; i++) {
        ((System.IDisposable)waitEvents[i]).Dispose();
      }

      // The stored exception is thrown as-is so that callers can catch it by its original type.
      // Note that throwing it from here replaces its stack trace.
      if (innerException != null) throw innerException;
      #endif
    }

    #if !SINGLE_THREAD
    /// <summary>
    /// Worker loop. Takes items from the queue and runs <see cref="action"/> on them until the queue is empty.
    /// If the action throws, the exception is stored and the queue is cleared so that the other workers stop too.
    /// The event for this worker is always set when the method exits.
    /// </summary>
    /// <param name="threadIndex">Index of this worker. Passed on to the action and used to pick the event to signal.</param>
    void RunTask (int threadIndex) {
      try {
        while (true) {
          T item;
          lock (queue) {
            if (queue.Count == 0) return;
            item = queue.Dequeue();
          }
          // Run the callback outside the lock so that the workers do not block each other
          action(item, threadIndex);
        }
      } catch (System.Exception e) {
        innerException = e;
        // Stop the remaining threads
        lock (queue) queue.Clear();
      } finally {
        waitEvents[threadIndex].Set();
      }
    }
    #endif
  }
  97. }