herald  2.0.0
coordinator.h
1 // Copyright 2021 Herald Project Contributors
2 // SPDX-License-Identifier: Apache-2.0
3 //
4 
5 #ifndef HERALD_COORDINATOR_H
6 #define HERALD_COORDINATOR_H
7 
8 #include "../context.h"
9 #include "activities.h"
10 #include "../data/sensor_logger.h"
11 
12 #include <map>
13 #include <vector>
14 #include <algorithm>
15 #include <iterator>
16 #include <optional>
17 
18 namespace herald {
19 
21 namespace engine {
22 
38 template <typename ContextT>
39 class Coordinator {
40 public:
42  Coordinator(ContextT& ctx)
43  : context(ctx),
44  providers(),
45  running(false)
46  HLOGGERINIT(ctx,"engine","coordinator")
47  {}
48 
49  ~Coordinator() = default;
50 
52  template <typename SensorT>
53  void add(SensorT& sensor) {
54  HTDBG("Adding sensor");
55  auto prov = sensor.coordinationProvider();
56  if (prov.has_value()) {
57  HTDBG("Sensor has Provider implementation");
58  providers.push_back(prov.value());
59  }
60  }
62  template <typename SensorT>
63  void remove(SensorT& sensor)
64  {
65  // TODO support remove
66  }
67 
69  void start() {
70  HTDBG("Start called");
71  // Clear feature providers
72  featureProviders.clear();
73  // Fetch feature providers
74  for (auto prov: providers) {
75  auto myFeatures = prov.get().connectionsProvided();
76  for (auto feature : myFeatures) {
77  featureProviders.emplace(feature,prov);
78  }
79  }
80  running = true;
81  HTDBG("Start returning");
82  }
83 
85  void iteration() {
86  if (!running) {
87  HTDBG("Coordinator not running. Returning from iteration having done nothing.");
88  return;
89  }
90  HTDBG("################# ITERATION #################");
91  // HTDBG("Entered iteration");
92  // Create empty list of required prereqs per provider
93  std::map<std::reference_wrapper<CoordinationProvider>,std::vector<PrioritisedPrerequisite>> assignPrereqs;
94  for (auto& prov : providers) {
95  assignPrereqs.emplace(prov,std::vector<PrioritisedPrerequisite>());
96  }
97  // HTDBG("Completed initialisation of provider prerequisities containers");
98  // HTDBG(" - Provider count: {}", providers.size());
99 
100  std::vector<PrioritisedPrerequisite> connsRequired;
101  // Loop over providers and ask for feature pre-requisites
102  for (auto& prov : providers) {
103  auto myConns = prov.get().requiredConnections();
104  std::copy(myConns.begin(),myConns.end(),
105  std::back_insert_iterator<std::vector<PrioritisedPrerequisite>>(connsRequired));
106  }
107  // HTDBG(std::to_string(connsRequired.size()));
108  // HTDBG("Retrieved providers' current prerequisites");
109  // TODO de-duplicate pre-reqs
110  // Now link required prereqs to each provider
111  for (auto& p : connsRequired) {
112  auto el = featureProviders.find(std::get<0>(p)); // find provider for given prereq by feature tag
113  if (featureProviders.end() != el) {
114  assignPrereqs[el->second].push_back(p);
115  }
116  }
117  // HTDBG("Linked pre-reqs to their providers");
118 
119  // // Some debug checks here
120  // int cnt = 0;
121  // for (auto& ass : assignPrereqs) {
122  // // HTDBG("assign prereqs number {} has this many prereqs to fill {}", cnt, ass.second.size());
123  // cnt++;
124  // }
125 
126  // Communicate with relevant feature providers and request features for targets (in descending priority order)
127  // - Includes removal of previous features no longer needed
128  std::vector<PrioritisedPrerequisite> provisioned;
129  for (auto& prov : assignPrereqs) {
130  // TODO sort by descending priority before passing on
131 
132  // FOR PLATFORMS WITH STD::FUTURE AND STD::ASYNC
133  // std::future<void> fut = std::async(std::launch::async,
134  // &CoordinationProvider::provision, prov.first,
135  // //prov.first->provision(
136  // prov.second,[&provisioned] (
137  // const std::vector<PrioritisedPrerequisite> myProvisioned) -> void {
138  // std::copy(myProvisioned.begin(),myProvisioned.end(),
139  // std::back_insert_iterator<std::vector<PrioritisedPrerequisite>>(provisioned));
140  // });
141  // fut.get(); // waits for callback // TODO wait with timeout
142 
143  // FOR OTHER PLATFORMS (E.g. ZEPHYR):-
144  std::vector<PrioritisedPrerequisite> myProvisioned = prov.first.get().provision(prov.second);
145  std::copy(myProvisioned.begin(),myProvisioned.end(),
146  std::back_insert_iterator<std::vector<PrioritisedPrerequisite>>(provisioned));
147  }
148  // HTDBG("All pre-requisities requests sent and responses received");
149  // TODO do the above asynchronously and await callback or timeout for all
150 
151  // For each which are now present, ask for activities (in descending priority order)
152  for (auto& prov : providers) {
153  auto maxActs = prov.get().requiredActivities();
154  // TODO sort by descending priority before actioning
155  for (auto& act : maxActs) {
156  std::string san("Activity ");
157  san += act.name;
158  HTDBG(san);
159  // HTDBG("Checking next desired activity for prereqs being satisfied");
160  // Filter requested by provisioned
161  bool allFound = true;
162  for (auto& pre : act.prerequisites) {
163  bool myFound = false;
164  for (auto& exists : provisioned) {
165  if (std::get<0>(pre) == std::get<0>(exists) &&
166  std::get<1>(pre) == std::get<2>(exists)) {
167  myFound = true;
168  }
169  }
170  allFound = allFound & myFound;
171  if (myFound) {
172  HTDBG(" - Prereq satisfied");
173  } else {
174  HTDBG(" - Prereq NOT SATISFIED");
175  }
176  }
177  // Carry out activities with completion callbacks passed in
178  if (allFound) {
179  HTDBG("All satisfied, calling activity");
180  // do activity
181 
182  // FOR PLATFORMS WITH STD::ASYNC
183  // act.executor(act,[this] (Activity act, std::optional<Activity> followOn) -> void {
184  // // TODO handle result
185  // // TODO Carry out any follow up activities
186  // HTDBG("Activity completion callback called");
187  // });
188 
189  // FOR PLATFORMS WITHOUT
190  std::optional<Activity> followOn = act.executor(act);
191  // TODO carry out follow on activity until no more follow ons (or max follow on number hit)
192  }
193  }
194  }
195  // HTDBG("Leaving iteration");
196  HTDBG("################# END #################");
197  }
199  void stop() {
200  running = false;
201  }
202 
203 private:
204  ContextT& context;
205 
206  std::vector<std::reference_wrapper<CoordinationProvider>> providers;
207  std::map<FeatureTag,std::reference_wrapper<CoordinationProvider>> featureProviders;
208 
209  bool running;
210 
211  HLOGGER(ContextT);
212 };
213 
214 // /** Comparator for less than (use in maps) **/
215 bool operator<(const std::reference_wrapper<CoordinationProvider> first, const std::reference_wrapper<CoordinationProvider> second);
216 
217 }
218 }
219 
220 #endif
Coordinates all connection and activities used across all sensors within Herald.
Definition: coordinator.h:39
Coordinator(ContextT &ctx)
Default constructor. Receives a configured platform-specific context instance.
Definition: coordinator.h:42
void stop()
Closes out any existing connections/activities.
Definition: coordinator.h:199
void start()
Prepares for iterations to be called (may pre-emptively make calls)
Definition: coordinator.h:69
void iteration()
Execute an iteration of activity, according to settings.
Definition: coordinator.h:85
void remove(SensorT &sensor)
Remove from iteration planning.
Definition: coordinator.h:63
void add(SensorT &sensor)
Introspect and include in iteration planning.
Definition: coordinator.h:53
Acts as a non-global memory arena for arbitrary classes.
Definition: aggregates.h:15