herald  2.0.0
aggregates.h
1 // Copyright 2021 Herald Project Contributors
2 // SPDX-License-Identifier: Apache-2.0
3 //
4 
5 #ifndef HERALD_AGGREGATES_H
6 #define HERALD_AGGREGATES_H
7 
#include <algorithm>
#include <array>
#include <cstddef>
#include <map>
#include <type_traits>
#include <utility>
#include <variant>
#include <vector>
// #include <iostream>

#include "ranges.h"
14 
15 namespace herald {
16 namespace analysis {
17 namespace aggregates {
18 
/// Count aggregate: the number of values mapped during the first run.
///
/// Requires a single pass (runs == 1). Values mapped during any later run
/// (requested by other, multi-pass aggregates) are ignored so they are not
/// counted twice.
struct Count {
  static constexpr int runs = 1;

  // run starts at 1 so map() is well defined even before beginRun() is called.
  Count() : count(0), run(1) {}
  ~Count() = default;

  /// Record which pass over the data is starting (1 indexed).
  void beginRun(int thisRun) {
    run = thisRun;
  }

  /// Count one value; the value itself is irrelevant.
  template <typename ValT>
  void map(ValT /* value */) {
    if (run > 1) return; // already counted everything on the first run

    ++count;
  }

  /// @return the number of values counted, as a double.
  double reduce() {
    return static_cast<double>(count);
  }

  /// Return to the freshly-constructed state (zero count, first run).
  void reset() {
    count = 0;
    run = 1;
  }

private:
  int count;
  int run;
};
48 
/// Mean aggregate: arithmetic mean of the values mapped during the first run.
///
/// Values are converted to double before summing. Returns 0.0 when no values
/// were mapped, matching the empty-data behaviour of Variance and Median.
struct Mean {
  static constexpr int runs = 1;

  Mean() : count(0), run(1), sum(0.0) {}
  ~Mean() = default;

  /// Record which pass over the data is starting (1 indexed).
  void beginRun(int thisRun) {
    run = thisRun;
  }

  /// Accumulate one value (first run only, so later passes don't double count).
  template <typename ValT>
  void map(ValT value) {
    if (run > 1) return;

    sum += static_cast<double>(value);
    ++count;
  }

  /// @return sum / count, or 0.0 if nothing was mapped (avoids 0/0 = NaN).
  double reduce() {
    if (count < 1) {
      return 0.0;
    }
    return sum / count;
  }

  /// Return to the freshly-constructed state.
  void reset() {
    count = 0;
    run = 1;
    sum = 0.0;
  }

private:
  int count;
  int run;
  double sum;
};
81 
/// Mode aggregate: the most frequently mapped value (compared after conversion
/// to double) from the first run.
///
/// Because counts are kept in an ordered std::map and only a strictly larger
/// count replaces the current best, ties resolve to the smallest value.
/// Returns 0.0 when no values were mapped.
struct Mode {
  static constexpr int runs = 1;

  Mode() : run(1), counts() {}
  ~Mode() = default;

  /// Record which pass over the data is starting (1 indexed).
  void beginRun(int thisRun) {
    run = thisRun;
  }

  /// Tally one value (first run only, so later passes don't double count).
  template <typename ValT>
  void map(ValT value) {
    if (run > 1) return;

    // operator[] value-initialises a missing count to 0 before incrementing.
    ++counts[static_cast<double>(value)];
  }

  /// @return the value with the highest tally (smallest on ties), or 0.0 if empty.
  double reduce() {
    double largest = 0.0;
    int largestCount = 0;
    for (const auto& [candidate, tally] : counts) {
      if (tally > largestCount) {
        largestCount = tally;
        largest = candidate;
      }
    }
    return largest;
  }

  /// Return to the freshly-constructed state.
  void reset() {
    run = 1;
    counts.clear();
  }

private:
  int run;
  std::map<double,int> counts; // value converted to double, and int count for each
};
126 
/// Variance aggregate: two-pass sample variance (divides by n - 1).
///
/// Run 1 sums the values to obtain the mean. Run 2 sums squared deviations
/// from that mean. With fewer than two values the sample variance is
/// undefined, and 0.0 is returned instead of NaN.
struct Variance {
  static constexpr int runs = 2;

  Variance() : count(0), run(1), sum(0.0), mean(0.0) {}
  ~Variance() = default;

  /// Record which pass is starting (1 indexed). Entering run 2 fixes the mean
  /// from run 1 and clears the accumulators for the squared deviations.
  void beginRun(int thisRun) {
    run = thisRun;
    if (2 == run) {
      // Guard against 0/0 when run 1 saw no data.
      mean = (count > 0) ? sum / count : 0.0;
      sum = 0.0;
      count = 0;
    }
  }

  /// Accumulate one value: its raw value in run 1, its squared deviation after.
  template <typename ValT>
  void map(ValT value) {
    const double dv = static_cast<double>(value);
    if (1 == run) {
      sum += dv;
    } else {
      const double deviation = dv - mean;
      sum += deviation * deviation;
    }
    ++count;
  }

  /// @return sum of squared deviations / (count - 1), or 0.0 when count < 2.
  double reduce() {
    if (count < 2) {
      return 0.0; // single value (0/0) or empty data: variance undefined
    }
    return sum / (count - 1); // Sample variance
  }

  /// Return to the freshly-constructed state.
  void reset() {
    count = 0;
    run = 1;
    sum = 0.0;
    mean = 0.0;
  }

private:
  int count;
  int run;
  double sum;
  double mean;
};
176 
177 
/// Median aggregate: streaming median of the values mapped during the first run.
///
/// The values are split into a lower half and an upper half, each held in a
/// fixed-size array (no dynamic allocation). The halves are kept balanced so
/// that the lower half holds as many values as the upper half, or one more.
/// The median is then either the largest lower value or the average of the
/// two middle values.
///
/// At most 2 * capacity - 1 (19) values are retained. When that limit is
/// reached, the smallest and largest retained values are discarded as a pair
/// before the next value is added, which keeps the retained set centred. The
/// result is exact for up to 19 values and an approximation beyond that.
/// Returns 0.0 when no values were mapped.
struct Median {
  static constexpr int runs = 1;

  Median() : run(1), lowerCount(0), upperCount(0), lower(), upper() {}
  ~Median() = default;

  /// Record which pass over the data is starting (1 indexed).
  void beginRun(int thisRun) {
    run = thisRun;
  }

  /// Insert one value (first run only, so later passes don't double count).
  template <typename ValT>
  void map(ValT value) {
    if (run > 1) {
      return;
    }
    const double dv = static_cast<double>(value);

    if (lowerCount + upperCount >= 2 * capacity - 1) {
      // Storage is full: drop the current extremes as a pair. Both halves are
      // non-empty here and the balance invariant is preserved.
      takeAt(lower, lowerCount, indexOfMin(lower, lowerCount));
      takeAt(upper, upperCount, indexOfMax(upper, upperCount));
    }

    if (0 == lowerCount || dv <= lower[indexOfMax(lower, lowerCount)]) {
      lower[lowerCount++] = dv;
    } else {
      upper[upperCount++] = dv;
    }

    // Restore: lowerCount == upperCount || lowerCount == upperCount + 1.
    if (lowerCount > upperCount + 1) {
      const double moved = takeAt(lower, lowerCount, indexOfMax(lower, lowerCount));
      upper[upperCount++] = moved;
    } else if (upperCount > lowerCount) {
      const double moved = takeAt(upper, upperCount, indexOfMin(upper, upperCount));
      lower[lowerCount++] = moved;
    }
  }

  /// @return the median of the retained values, or 0.0 if none were mapped.
  double reduce() {
    if (0 == lowerCount) {
      return 0.0; // empty data check
    }
    const double lowerMax = lower[indexOfMax(lower, lowerCount)];
    if (lowerCount > upperCount) {
      return lowerMax; // odd number of values: lower half holds the middle one
    }
    return (lowerMax + upper[indexOfMin(upper, upperCount)]) / 2.0;
  }

  /// Return to the freshly-constructed state.
  void reset() {
    run = 1;
    lowerCount = 0;
    upperCount = 0;
  }

private:
  static constexpr int capacity = 10;
  using Store = std::array<double, capacity>;

  int run;
  int lowerCount; // number of valid entries at the front of `lower`
  int upperCount; // number of valid entries at the front of `upper`
  Store lower;    // values <= median (unordered)
  Store upper;    // values >= median (unordered)

  /// Index of the smallest of the first `count` entries (count must be > 0).
  static int indexOfMin(const Store& from, int count) {
    return static_cast<int>(std::min_element(from.begin(), from.begin() + count) - from.begin());
  }

  /// Index of the largest of the first `count` entries (count must be > 0).
  static int indexOfMax(const Store& from, int count) {
    return static_cast<int>(std::max_element(from.begin(), from.begin() + count) - from.begin());
  }

  /// Remove and return entry `idx`; order is irrelevant, so the last entry fills the gap.
  static double takeAt(Store& from, int& count, int idx) {
    const double taken = from[idx];
    from[idx] = from[count - 1];
    --count;
    return taken;
  }
};
279 
280 
282 struct Gaussian {
283  static constexpr int runs = 1;
284 
285  Gaussian() : run(1), distribution() {}
286  ~Gaussian() = default;
287 
288  void beginRun(int thisRun) {
289  run = thisRun;
290  }
291 
292  template <typename ValT>
293  void map(ValT value) {
294  if (run > 1) {
295  return;
296  }
297  double dv = (double)value;
298  distribution.add(dv);
299  }
300 
301  double reduce() {
302  return distribution.mean();
303  }
304 
305  void reset() {
306  run = 1;
307  distribution.reset();
308  }
309 
310  const Distribution& model() const {
311  return distribution;
312  }
313 
314 private:
315  int run;
316  Distribution distribution;
317 };
318 
319 
320 
321 
322 
324 
325 template <typename... Aggs>
326 struct aggregate {
327  aggregate(Aggs... configuredAggregates) : aggregates() {
328  addAggregate<Aggs...>(configuredAggregates...);
329  }
330  ~aggregate() = default;
331 
332  template <typename SampleListT,
333  typename SampleT = typename std::remove_cv<typename SampleListT::value_type>::type,
334  typename ValT = typename std::remove_cv<typename SampleT::value_type>::type,
335  std::size_t MaxSize = SampleListT::max_size
336  >
337  friend auto operator|(SampleListT& from, aggregate<Aggs...> me) -> aggregate<Aggs...> {
338  // determine number of runs
339  int maxRuns = 1;
340  for (auto& agg : me.aggregates) {
341  std::visit([&maxRuns](auto&& arg) {
342  if (arg.runs > maxRuns) {
343  maxRuns = arg.runs;
344  }
345  }, agg);
346  }
347 
348  // loop over all incoming data, calling each aggregate each time
349  for (int run = 1;run <= maxRuns;run++) {
350  for (auto& agg : me.aggregates) {
351  std::visit([&run](auto&& arg) {
352  arg.beginRun(run);
353  // std::cout << "Beggining run " << run << std::endl;
354  }, agg);
355  }
356 
357  for (auto& v : from) {
358  for (auto& agg : me.aggregates) {
359  std::visit([&v](auto&& arg) {
360  // std::cout << "Sample taken: " << v.taken.secondsSinceUnixEpoch() << std::endl;
361  arg.map(v);
362  }, agg);
363  }
364  }
365  }
366 
367  // return me so we can then do get<Agg>()
368  return me;
369  }
370 
371  template <typename Coll, typename Pred>
373  // determine number of runs
374  int maxRuns = 1;
375  for (auto& agg : me.aggregates) {
376  std::visit([&maxRuns](auto&& arg) {
377  if (arg.runs > maxRuns) {
378  maxRuns = arg.runs;
379  }
380  }, agg);
381  }
382 
383  // loop over all incoming data, calling each aggregate each time
384  for (int run = 1;run <= maxRuns;run++) {
385  for (auto& agg : me.aggregates) {
386  std::visit([&run](auto&& arg) {
387  arg.beginRun(run);
388  // std::cout << "Beggining run " << run << std::endl;
389  }, agg);
390  }
391 
392  while (!from.ended()) {
393  auto& v = *from;
394  for (auto& agg : me.aggregates) {
395  std::visit([&v](auto&& arg) {
396  // std::cout << "Sample taken: " << v.taken.secondsSinceUnixEpoch() << std::endl;
397  arg.map(v);
398  }, agg);
399  }
400  ++from;
401  }
402  }
403 
404  // return me so we can then do get<Agg>()
405  return me;
406  }
407 
408  template <typename ValT>
409  friend auto operator|(herald::analysis::views::view<ValT> from, aggregate<Aggs...> me) -> aggregate<Aggs...> {
410  // determine number of runs
411  int maxRuns = 1;
412  for (auto& agg : me.aggregates) {
413  std::visit([&maxRuns](auto&& arg) {
414  if (arg.runs > maxRuns) {
415  maxRuns = arg.runs;
416  }
417  }, agg);
418  }
419 
420  // loop over all incoming data, calling each aggregate each time
421  for (int run = 1;run <= maxRuns;run++) {
422  for (auto& agg : me.aggregates) {
423  std::visit([&run](auto&& arg) {
424  arg.beginRun(run);
425  // std::cout << "Beggining run " << run << std::endl;
426  }, agg);
427  }
428 
429  for (auto& v : from) {
430  for (auto& agg : me.aggregates) {
431  std::visit([&v](auto&& arg) {
432  // std::cout << "Sample taken: " << v.taken.secondsSinceUnixEpoch() << std::endl;
433  arg.map(v);
434  }, agg);
435  }
436  }
437  }
438 
439  // return me so we can then do get<Agg>()
440  return me;
441  }
442 
443  template <typename Agg>
444  Agg& get() {
445  Agg& retval = std::get<0>(aggregates.front());
446  for (auto& agg : aggregates) {
447  // See https://en.cppreference.com/w/cpp/utility/variant/visit
448  std::visit([&retval](auto&& arg) {
449  using T = std::decay_t<decltype(arg)>;
450  if constexpr (std::is_same_v<Agg,T>) {
451  retval = arg;
452  }
453  }, agg);
454  }
455  return retval;
456  }
457 
458 private:
459  std::vector<std::variant<Aggs...>> aggregates;
460 
461  template <typename Last>
462  void addAggregate(Last last) {
463  aggregates.emplace_back(std::move(last));
464  }
465 
466  template <typename First, typename Second, typename... Remaining>
467  void addAggregate(First first, Second second, Remaining... remaining) {
468  aggregates.emplace_back(std::move(first));
469  addAggregate<Second, Remaining...>(second, remaining...);
470  }
471 };
472 
473 
474 
475 
476 
477 
478 
479 
483 template <typename... Aggs>
484 struct summarise {
485  summarise() : aggregates() {
486  // Initialise members of aggregates, one per aggregate type requested
487  addAggregate<Aggs...>();
488  }
489  ~summarise() = default;
490 
491  template <typename ValT>
492  friend auto operator|(herald::analysis::views::view<ValT>& from, summarise<Aggs...> me) -> summarise<Aggs...> {
493  // determine number of runs
494  int maxRuns = 1;
495  for (auto& agg : me.aggregates) {
496  std::visit([&maxRuns](auto&& arg) {
497  if (arg.runs > maxRuns) {
498  maxRuns = arg.runs;
499  }
500  }, agg);
501  }
502 
503  // loop over all incoming data, calling each aggregate each time
504  for (int run = 1;run <= maxRuns;run++) {
505  for (auto& agg : me.aggregates) {
506  std::visit([&run](auto&& arg) {
507  arg.beginRun(run);
508  }, agg);
509  }
510 
511  for (auto& v : from) {
512  for (auto& agg : me.aggregates) {
513  std::visit([&v](auto&& arg) {
514  arg.map(v);
515  }, agg);
516  }
517  }
518  }
519 
520  // return me so we can then do get<Agg>()
521  return me;
522  }
523 
524  template <typename Agg>
525  double get() {
526  double result = 0.0;
527  for (auto& agg : aggregates) {
528  // See https://en.cppreference.com/w/cpp/utility/variant/visit
529  std::visit([&result](auto&& arg) {
530  using T = std::decay_t<decltype(arg)>;
531  if constexpr (std::is_same_v<Agg,T>) {
532  result = arg.reduce();
533  }
534  }, agg);
535  }
536  return result;
537  }
538 
539 private:
540  std::vector<std::variant<Aggs...>> aggregates;
541 
542  template <typename Last>
543  void addAggregate() {
544  aggregates.emplace_back(Last());
545  }
546 
547  template <typename First, typename Second, typename... Remaining>
548  void addAggregate() {
549  aggregates.emplace_back(First());
550  addAggregate<Second, Remaining...>();
551  }
552 };
553 
554 
555 } // end namespace aggregates
556 }
557 }
558 
559 #endif
Definition: distribution.h:16
void add(double x) noexcept
Add a single occurrence of a value.
void reset() noexcept
Reset this instance to its initial state.
const double mean() const noexcept
return the mean
Acts as a non-global memory arena for arbitrary classes.
Definition: aggregates.h:15
Definition: aggregates.h:19
Gaussian.
Definition: aggregates.h:282
Definition: aggregates.h:49
Definition: aggregates.h:178
Definition: aggregates.h:82
Definition: aggregates.h:127
A Variadic aggregation function requiring aggregations to be prior initialised (i....
Definition: aggregates.h:326
Definition: aggregates.h:484
Definition: ranges.h:215