Fifo.vala 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. namespace Invercargill.DataStructures {
  2. public class Fifo<T> : Enumerable<T>, Queue<T> {
  3. public bool is_unblocked { get { return !is_blocking; } }
  4. private bool is_blocking = true;
  5. private FifoItem<T>* first_item = null;
  6. private FifoItem<T>* last_item = null;
  7. public override int? peek_count() {
  8. return null;
  9. }
  10. public override EnumerableInfo get_info() {
  11. return new EnumerableInfo.infer_ultimate (this, EnumerableCategory.EXTERNAL);
  12. }
  13. public void unblock() {
  14. mutex.lock();
  15. is_blocking = false;
  16. cond.broadcast();
  17. mutex.unlock();
  18. }
  19. public void push(owned T item) {
  20. mutex.lock();
  21. FifoItem<T>* fifo_item = new FifoItem<T>() {
  22. item = item,
  23. };
  24. if(first_item == null) {
  25. first_item = fifo_item;
  26. last_item = fifo_item;
  27. }
  28. else {
  29. last_item->next_item = fifo_item;
  30. last_item = fifo_item;
  31. }
  32. cond.broadcast();
  33. mutex.unlock();
  34. }
  35. public void push_all(Enumerable<T> items) {
  36. mutex.lock();
  37. foreach (var item in items) {
  38. FifoItem<T>* fifo_item = new FifoItem<T>() {
  39. item = item,
  40. };
  41. if(first_item == null) {
  42. first_item = fifo_item;
  43. last_item = fifo_item;
  44. }
  45. else {
  46. last_item->next_item = fifo_item;
  47. last_item = fifo_item;
  48. }
  49. }
  50. cond.broadcast();
  51. mutex.unlock();
  52. }
  53. public T pop() throws SequenceError {
  54. mutex.lock();
  55. while(is_blocking && first_item == null){
  56. cond.wait(mutex);
  57. }
  58. var item = first_item;
  59. if(item != null) {
  60. first_item = item->next_item;
  61. }
  62. mutex.unlock();
  63. if(item == null) {
  64. throw new SequenceError.NO_ELEMENTS("No elements in unblocked queue to pop");
  65. }
  66. var result = item->item;
  67. delete item;
  68. return result;
  69. }
  70. public T peek() throws SequenceError {
  71. var item = first_item;
  72. if(item == null) {
  73. throw new SequenceError.NO_ELEMENTS("No elements in queue");
  74. }
  75. return item->item;
  76. }
  77. public void push_start(owned T item) {
  78. mutex.lock();
  79. FifoItem<T>* fifo_item = new FifoItem<T>() {
  80. next_item = first_item,
  81. item = item,
  82. };
  83. if(first_item == null) {
  84. last_item = fifo_item;
  85. }
  86. first_item = fifo_item;
  87. cond.broadcast();
  88. mutex.unlock();
  89. }
  90. private bool has_next() {
  91. mutex.lock();
  92. while(is_blocking && first_item == null){
  93. cond.wait(mutex);
  94. }
  95. var state = first_item != null;
  96. mutex.unlock();
  97. return state;
  98. }
  99. private T get_next() {
  100. mutex.lock();
  101. var item = first_item;
  102. first_item = item->next_item;
  103. mutex.unlock();
  104. var result = item->item;
  105. delete item;
  106. return result;
  107. }
  108. private Cond cond = Cond ();
  109. private Mutex mutex = Mutex ();
  110. public override Tracker<T> get_tracker () {
  111. return new LambdaTracker<T>(
  112. () => has_next(),
  113. () => get_next());
  114. }
  115. ~Fifo() {
  116. var n = first_item;
  117. while(n != null) {
  118. var c = n;
  119. n = n->next_item;
  120. delete c;
  121. }
  122. }
  123. [Compact]
  124. private class FifoItem<T> {
  125. public T item;
  126. public FifoItem<T>* next_item;
  127. }
  128. }
  129. }