MapReduce.php 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. <?php
  2. /**
  3. * CakePHP(tm) : Rapid Development Framework (https://cakephp.org)
  4. * Copyright (c) Cake Software Foundation, Inc. (https://cakefoundation.org)
  5. *
  6. * Licensed under The MIT License
  7. * For full copyright and license information, please see the LICENSE.txt
  8. * Redistributions of files must retain the above copyright notice.
  9. *
  10. * @copyright Copyright (c) Cake Software Foundation, Inc. (https://cakefoundation.org)
  11. * @link https://cakephp.org CakePHP(tm) Project
  12. * @license https://opensource.org/licenses/mit-license.php MIT License
  13. */
  14. namespace Cake\Collection\Iterator;
  15. use ArrayIterator;
  16. use IteratorAggregate;
  17. use LogicException;
  18. use Traversable;
  19. /**
  20. * Implements a simplistic version of the popular Map-Reduce algorithm. Acts
  21. * like an iterator for the original passed data after each result has been
  22. * processed, thus offering a transparent wrapper for results coming from any
  23. * source.
  24. */
  25. class MapReduce implements IteratorAggregate
  26. {
  27. /**
  28. * Holds the shuffled results that were emitted from the map
  29. * phase
  30. *
  31. * @var array
  32. */
  33. protected $_intermediate = [];
  34. /**
  35. * Holds the results as emitted during the reduce phase
  36. *
  37. * @var array
  38. */
  39. protected $_result = [];
  40. /**
  41. * Whether the Map-Reduce routine has been executed already on the data
  42. *
  43. * @var bool
  44. */
  45. protected $_executed = false;
  46. /**
  47. * Holds the original data that needs to be processed
  48. *
  49. * @var \Traversable|null
  50. */
  51. protected $_data;
  52. /**
  53. * A callable that will be executed for each record in the original data
  54. *
  55. * @var callable
  56. */
  57. protected $_mapper;
  58. /**
  59. * A callable that will be executed for each intermediate record emitted during
  60. * the Map phase
  61. *
  62. * @var callable|null
  63. */
  64. protected $_reducer;
  65. /**
  66. * Count of elements emitted during the Reduce phase
  67. *
  68. * @var int
  69. */
  70. protected $_counter = 0;
  71. /**
  72. * Constructor
  73. *
  74. * ### Example:
  75. *
  76. * Separate all unique odd and even numbers in an array
  77. *
  78. * ```
  79. * $data = new \ArrayObject([1, 2, 3, 4, 5, 3]);
  80. * $mapper = function ($value, $key, $mr) {
  81. * $type = ($value % 2 === 0) ? 'even' : 'odd';
  82. * $mr->emitIntermediate($value, $type);
  83. * };
  84. *
  85. * $reducer = function ($numbers, $type, $mr) {
  86. * $mr->emit(array_unique($numbers), $type);
  87. * };
  88. * $results = new MapReduce($data, $mapper, $reducer);
  89. * ```
  90. *
  91. * Previous example will generate the following result:
  92. *
  93. * ```
  94. * ['odd' => [1, 3, 5], 'even' => [2, 4]]
  95. * ```
  96. *
  97. * @param \Traversable $data the original data to be processed
  98. * @param callable $mapper the mapper callback. This function will receive 3 arguments.
  99. * The first one is the current value, second the current results key and third is
  100. * this class instance so you can call the result emitters.
  101. * @param callable|null $reducer the reducer callback. This function will receive 3 arguments.
  102. * The first one is the list of values inside a bucket, second one is the name
  103. * of the bucket that was created during the mapping phase and third one is an
  104. * instance of this class.
  105. */
  106. public function __construct(Traversable $data, callable $mapper, callable $reducer = null)
  107. {
  108. $this->_data = $data;
  109. $this->_mapper = $mapper;
  110. $this->_reducer = $reducer;
  111. }
  112. /**
  113. * Returns an iterator with the end result of running the Map and Reduce
  114. * phases on the original data
  115. *
  116. * @return \ArrayIterator
  117. */
  118. public function getIterator()
  119. {
  120. if (!$this->_executed) {
  121. $this->_execute();
  122. }
  123. return new ArrayIterator($this->_result);
  124. }
  125. /**
  126. * Appends a new record to the bucket labelled with $key, usually as a result
  127. * of mapping a single record from the original data.
  128. *
  129. * @param mixed $val The record itself to store in the bucket
  130. * @param string $bucket the name of the bucket where to put the record
  131. * @return void
  132. */
  133. public function emitIntermediate($val, $bucket)
  134. {
  135. $this->_intermediate[$bucket][] = $val;
  136. }
  137. /**
  138. * Appends a new record to the final list of results and optionally assign a key
  139. * for this record.
  140. *
  141. * @param mixed $val The value to be appended to the final list of results
  142. * @param string|null $key and optional key to assign to the value
  143. * @return void
  144. */
  145. public function emit($val, $key = null)
  146. {
  147. $this->_result[$key === null ? $this->_counter : $key] = $val;
  148. $this->_counter++;
  149. }
  150. /**
  151. * Runs the actual Map-Reduce algorithm. This is iterate the original data
  152. * and call the mapper function for each , then for each intermediate
  153. * bucket created during the Map phase call the reduce function.
  154. *
  155. * @return void
  156. * @throws \LogicException if emitIntermediate was called but no reducer function
  157. * was provided
  158. */
  159. protected function _execute()
  160. {
  161. $mapper = $this->_mapper;
  162. foreach ($this->_data as $key => $val) {
  163. $mapper($val, $key, $this);
  164. }
  165. $this->_data = null;
  166. if (!empty($this->_intermediate) && empty($this->_reducer)) {
  167. throw new LogicException('No reducer function was provided');
  168. }
  169. $reducer = $this->_reducer;
  170. foreach ($this->_intermediate as $key => $list) {
  171. $reducer($list, $key, $this);
  172. }
  173. $this->_intermediate = [];
  174. $this->_executed = true;
  175. }
  176. }