MetroCollect  2.3.4
MetricsController.cc
Go to the documentation of this file.
1 //
2 // MetricsController.cc
3 //
4 // Created on July 24th 2018
5 //
6 // Copyright 2018 CFM (www.cfm.fr)
7 //
8 // Licensed under the Apache License, Version 2.0 (the "License");
9 // you may not use this file except in compliance with the License.
10 // You may obtain a copy of the License at
11 //
12 // http://www.apache.org/licenses/LICENSE-2.0
13 //
14 // Unless required by applicable law or agreed to in writing, software
15 // distributed under the License is distributed on an "AS IS" BASIS,
16 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 // See the License for the specific language governing permissions and
18 // limitations under the License.
19 //
20 
21 #include <algorithm>
22 #include <cmath>
23 #include <thread>
24 
25 #if GPERFTOOLS_CPU_PROFILE
26 #include <gperftools/profiler.h>
27 #endif
28 
29 #include "MetricsController.h"
30 
31 
32 namespace MetroCollect {
// Constructor fragment: member-initializer list followed by the constructor body.
// NOTE(review): the signature line and listing lines 36-37 are not present in this
// listing, so the initializer list shown here is incomplete.
34  delegate_(delegate),
35  sourceInterests_(MetricsSource::makeSourceInterests(false)),
38  isCollecting_(false),
39  convertToUnitsPerSecond_(MetricsController::defaultConvertToUnitsPerSecond),
40  processingWindowLength_(MetricsController::defaultProcessingWindowLength),
41  processingWindowOverlap_(MetricsController::defaultProcessingWindowOverlap),
// Both raw-value arrays and the diff buffer are built from the same sourceInterests_ object.
42  currentMetrics_(std::make_unique<MetricsDataArray>(this->sourceInterests_)),
43  previousMetrics_(std::make_unique<MetricsDataArray>(this->sourceInterests_)),
44  metricsValues_(MetricsController::defaultProcessingWindowLength, MetricsDiffArray(this->sourceInterests_))
45  {
// Goes through the setter so that unitsPerSecondFactor_ is derived from the default interval.
46  this->setSamplingInterval(MetricsController::defaultSamplingInterval);
47  }
48 
49 
// Returns the stored delegate_ reference (the signature line is missing from this listing).
51  return this->delegate_;
52  }
53 
54 
55  bool MetricsController::isCollecting() const noexcept {
56  return this->isCollecting_;
57  }
58 
59 
60  bool MetricsController::sendValues() const noexcept {
61  return this->sendValues_;
62  }
63 
64  bool MetricsController::sendStats() const noexcept {
65  return this->sendStats_;
66  }
67 
68  const std::vector<size_t>& MetricsController::requestedMetrics() const noexcept {
69  return this->requestedMetrics_;
70  }
71 
72  std::chrono::milliseconds MetricsController::samplingInterval() const noexcept {
73  return this->samplingInterval_;
74  }
75 
// Returns the stored processingWindowLength_ (the signature line is missing from this listing).
77  return this->processingWindowLength_;
78  }
79 
// Returns the stored processingWindowOverlap_ (the signature line is missing from this listing).
81  return this->processingWindowOverlap_;
82  }
83 
// Returns the stored convertToUnitsPerSecond_ flag (the signature line is missing from this listing).
85  return this->convertToUnitsPerSecond_;
86  }
87 
88 
89  void MetricsController::setSendValues(bool sendValues) noexcept {
90  this->sendValues_ = sendValues;
91  }
92 
93  void MetricsController::setSendStats(bool sendStats) noexcept {
94  this->sendStats_ = sendStats;
95  }
96 
// Rebuilds the per-statistic index lists and requestedMetrics_ from `requestedMetrics`.
// The call is ignored while a collection is running.
// NOTE(review): the signature line and every `case` label of the switch below are
// missing from this listing; the comments describe only the visible statements.
98  if (this->isCollecting_)
99  return;
100 
// `indexes` is rebuilt for each entry i; `allIndexes` accumulates the union over all entries.
101  std::set<size_t> indexes;
102  std::set<size_t> allIndexes;
103  *this->sourceInterests_ = *requestedMetrics.sourceInterests();
104  for (size_t i = 0; i < Statistics::extraCount; i++) {
105  indexes.clear();
// Keep metric j when its requested stats share at least one bit with StatsDependencies[i].
106  for (size_t j = 0; j < requestedMetrics.size(); j++) {
107  if (requestedMetrics[j] & Statistics::StatsDependencies[i]) {
108  indexes.insert(j);
109  allIndexes.insert(j);
110  }
111  }
112 
// Each branch hands `indexes` to the array(s) named in it; the labels selecting the
// branches are not visible here.
113  switch (Statistics::StatsIndex(i)) {
115  this->metricsStatsIntermediate_.minIndexes.setIndexes(indexes);
116  this->metricsStats_.min.setIndexes(indexes);
117  break;
119  this->metricsStatsIntermediate_.maxIndexes.setIndexes(indexes);
120  this->metricsStats_.max.setIndexes(indexes);
121  break;
123  this->metricsStats_.average.setIndexes(indexes);
124  break;
126  this->metricsStats_.stdDev.setIndexes(indexes);
127  break;
129  this->metricsStatsIntermediate_.sum.setIndexes(indexes);
130  break;
132  this->metricsStatsIntermediate_.sumSquared.setIndexes(indexes);
133  break;
134  }
135  }
136 
// std::set iterates in ascending order, so requestedMetrics_ ends up sorted and duplicate-free.
137  this->requestedMetrics_.clear();
138  for (size_t i : allIndexes)
139  this->requestedMetrics_.push_back(i);
140  }
141 
142  void MetricsController::setSamplingInterval(std::chrono::milliseconds interval) noexcept {
143  if (this->isCollecting_)
144  return;
145 
146  this->samplingInterval_ = interval;
147  if (interval != 0ms)
148  this->unitsPerSecondFactor_ = 1000.0 / interval.count();
149  else
150  this->unitsPerSecondFactor_ = 1.0;
151  }
152 
153  void MetricsController::setProcessingWindow(size_t length, size_t overlap) noexcept {
154  if (isCollecting_)
155  return;
156 
157  if (length != this->processingWindowLength_) {
158  this->processingWindowLength_ = length;
159  this->metricsValues_.reset(length, MetricsDiffArray(this->sourceInterests_));
160  }
161 
162  if (overlap >= length)
163  overlap = length - 1;
164  if (overlap != this->processingWindowOverlap_)
165  this->processingWindowOverlap_ = overlap;
166  }
167 
168  void MetricsController::setConvertToUnitsPerSeconds(bool convertToUnitsPerSecond) noexcept {
169  if (isCollecting_)
170  return;
171 
172  this->convertToUnitsPerSecond_ = convertToUnitsPerSecond;
173  }
174 
175 
// Collection loop body. Returns immediately when a collection is already running.
// NOTE(review): the signature line and listing lines 198, 204 and 206 are missing
// from this listing; the comments describe only the visible statements.
177  if (this->isCollecting_)
178  return;
179 #if GPERFTOOLS_CPU_PROFILE
180  ProfilerStart("/tmp/aa.prof");
181 #endif
182  this->isCollecting_ = true;
183 
// sendStats_ is read once here and re-read only after a full window has been processed.
184  bool cachedSendStats = this->sendStats_;
185  size_t processingWindowIndex = 0;
186  auto startTime = std::chrono::steady_clock::now();
187 
// Initial sample, taken before the loop so the first updateMetrics() has data to swap into "previous".
188  this->currentMetrics_->updateData();
189  while (true) {
// Sleep for what is left of the sampling interval after the time already spent since startTime.
190  std::this_thread::sleep_for(this->samplingInterval_ - (std::chrono::steady_clock::now() - startTime));
191  startTime = std::chrono::steady_clock::now();
192 
193  this->updateMetrics();
194  if (cachedSendStats)
195  this->updateIterativeStats();
196 
// NOTE(review): listing line 198 (the statement guarded by this `if`) is missing here.
197  if (this->sendValues_)
199 
200  processingWindowIndex++;
// A full window has been collected once the counter reaches processingWindowLength_.
201  if (processingWindowIndex >= this->processingWindowLength_) {
202  if (cachedSendStats) {
203  this->computeFinalStats();
// NOTE(review): listing line 204 is missing here.
205  }
// NOTE(review): listing line 206 is missing; the block closed at listing line 215 opens there.
// The visible part of that block clears the buffer and stats, lowers isCollecting_ and returns.
207  this->metricsValues_.reset();
208  if (cachedSendStats)
209  this->resetIterativeStats();
210  this->isCollecting_ = false;
211 #if GPERFTOOLS_CPU_PROFILE
212  ProfilerStop();
213 #endif
214  return;
215  }
// Start the next window: the counter restarts at the overlap.
// Presumably moveBegin() keeps the last `overlap` samples; confirm against CircularArray.
216  this->metricsValues_.moveBegin(this->processingWindowOverlap_);
217  processingWindowIndex = this->processingWindowOverlap_;
218 
// The reset uses the old cached flag; only afterwards is sendStats_ re-read for the next window.
219  if (cachedSendStats)
220  this->resetIterativeStats();
221  cachedSendStats = this->sendStats_;
222 #if GPERFTOOLS_CPU_PROFILE
223  ProfilerFlush();
224 #endif
225  }
226  }
227  }
228 
229  bool MetricsController::isMetricNull(size_t index) const {
230  return ((*this->currentMetrics_)[index] == 0 && (*this->previousMetrics_)[index] == 0);
231  }
232 
233 
// Takes a new sample and stores its difference with the previous one
// (the signature line is missing from this listing).
// The pointer swap turns the last sample into "previous"; the other array is then refreshed in place.
235  std::swap(this->currentMetrics_, this->previousMetrics_);
236  this->currentMetrics_->updateData();
// Presumably moveEnd(1) makes room for one more element, which back() then refers to; confirm against CircularArray.
237  this->metricsValues_.moveEnd(1);
// The factor is unitsPerSecondFactor_ when conversion is enabled, 1.0 otherwise.
238  this->metricsValues_.back().computeDiff(*this->currentMetrics_, *this->previousMetrics_, this->convertToUnitsPerSecond_ ? this->unitsPerSecondFactor_ : 1.0);
239  }
240 
// Folds the newest diff sample into the intermediate statistics
// (the signature line is missing from this listing).
242  auto& newPoint = this->metricsValues_.back();
243  auto newPointAbsoluteIndex = this->metricsValues_.absoluteIndex(this->metricsValues_.size() - 1);
244 
// minIndexes/maxIndexes hold, per metric, the absolute buffer index of the current extreme
// sample rather than the value itself; the index is replaced when the new sample beats it.
245  for (auto& val : this->metricsStatsIntermediate_.minIndexes.indexedValues()) {
246  if (newPoint[val.index] < this->metricsValues_.atAbsoluteIndex(val.value)[val.index])
247  val.value = newPointAbsoluteIndex;
248  }
249 
250  for (auto& val : this->metricsStatsIntermediate_.maxIndexes.indexedValues()) {
251  if (newPoint[val.index] > this->metricsValues_.atAbsoluteIndex(val.value)[val.index])
252  val.value = newPointAbsoluteIndex;
253  }
254 
// Running sum and running sum of squares, per metric.
255  for (auto& val : this->metricsStatsIntermediate_.sum.indexedValues()) {
256  val.value += newPoint[val.index];
257  }
258 
259  for (auto& val : this->metricsStatsIntermediate_.sumSquared.indexedValues()) {
260  val.value += newPoint[val.index] * newPoint[val.index];
261  }
262  }
263 
// Turns the intermediate statistics into the final min / max / average / stdDev arrays
// (the signature line is missing from this listing).
// Every stats array is stamped with the timestamps of the first and last buffered samples.
265  this->metricsStats_.forEach([&](auto& array, Statistics::Stats ) {
266  array.setStartTime(this->metricsValues_.front().timestamp());
267  array.setEndTime(this->metricsValues_.back().timestamp());
268  });
269 
// Min and max are read back through the absolute buffer indexes tracked during the window.
270  for (const auto& val : this->metricsStatsIntermediate_.minIndexes.indexedValues())
271  this->metricsStats_.min[val.index] = this->metricsValues_.atAbsoluteIndex(val.value)[val.index];
272 
273  for (const auto& val : this->metricsStatsIntermediate_.maxIndexes.indexedValues())
274  this->metricsStats_.max[val.index] = this->metricsValues_.atAbsoluteIndex(val.value)[val.index];
275 
// Average = running sum / processingWindowLength_.
276  for (const auto& val : this->metricsStatsIntermediate_.sum.indexedValues())
277  this->metricsStats_.average[val.index] = val.value / static_cast<double>(this->processingWindowLength_);
278 
// Variance is computed as mean(x^2) - mean(x)^2 using the averages stored just above.
// Values below 1e-4 are reported as a standard deviation of 0; presumably this guards
// against slightly negative variances caused by floating-point rounding (confirm intent).
279  for (const auto& val : this->metricsStats_.average.indexedValues()) {
280  double variance = this->metricsStatsIntermediate_.sumSquared[val.index] / static_cast<double>(this->processingWindowLength_) - val.value * val.value;
281  if (variance < 1e-4)
282  this->metricsStats_.stdDev[val.index] = 0;
283  else
284  this->metricsStats_.stdDev[val.index] = sqrt(variance);
285  }
286  }
287 
// Re-bases the intermediate statistics on the samples still held in metricsValues_
// (the signature line is missing from this listing).
289  auto firstIndex = this->metricsValues_.absoluteIndex(0);
// Empty buffer: point min/max at the first slot and zero both sums.
290  if (this->metricsValues_.size() == 0) {
291  for (auto& val : this->metricsStatsIntermediate_.minIndexes.indexedValues())
292  val.value = firstIndex;
293  for (auto& val : this->metricsStatsIntermediate_.maxIndexes.indexedValues())
294  val.value = firstIndex;
295  for (auto& val : this->metricsStatsIntermediate_.sum.indexedValues())
296  val.value = 0;
297  for (auto& val : this->metricsStatsIntermediate_.sumSquared.indexedValues())
298  val.value = 0;
299  }
300  else {
// A tracked minimum is kept when its sample is still in the buffer; otherwise the
// remaining samples are rescanned, starting from the first one.
301  for (auto& val : this->metricsStatsIntermediate_.minIndexes.indexedValues()) {
302  if (!this->metricsValues_.absoluteIndexIsInBounds(val.value)) {
303  val.value = firstIndex;
304  for (size_t i = 0; i < this->metricsValues_.size(); i++) {
305  if (this->metricsValues_[i][val.index] < this->metricsValues_.atAbsoluteIndex(val.value)[val.index])
306  val.value = this->metricsValues_.absoluteIndex(i);
307  }
308  }
309  }
310 
// Same treatment for the tracked maximum.
311  for (auto& val : this->metricsStatsIntermediate_.maxIndexes.indexedValues()) {
312  if (!this->metricsValues_.absoluteIndexIsInBounds(val.value)) {
313  val.value = firstIndex;
314  for (size_t i = 0; i < this->metricsValues_.size(); i++) {
315  if (this->metricsValues_[i][val.index] > this->metricsValues_.atAbsoluteIndex(val.value)[val.index])
316  val.value = this->metricsValues_.absoluteIndex(i);
317  }
318  }
319  }
320 
// Two ways to refresh the sums. This branch recomputes them from the samples in the buffer;
// it is taken when overlap <= length - overlap. Assuming size() equals the overlap at this
// point (TODO confirm), that is the case where the buffer holds no more samples than were dropped.
321  if (this->processingWindowOverlap_ * 2 <= this->processingWindowLength_) {
322  for (auto& val : this->metricsStatsIntermediate_.sum.indexedValues()) {
323  val.value = 0;
324  for (size_t i = 0; i < this->metricsValues_.size(); i++)
325  val.value += this->metricsValues_[i][val.index];
326  }
327 
328  for (auto& val : this->metricsStatsIntermediate_.sumSquared.indexedValues()) {
329  val.value = 0;
330  for (size_t i = 0; i < this->metricsValues_.size(); i++)
331  val.value += this->metricsValues_[i][val.index] * this->metricsValues_[i][val.index];
332  }
333  }
// Otherwise the (length - overlap) samples at indexes -1 .. overlap - length are subtracted
// from the running sums. Presumably negative indexes address elements located before the
// buffer's begin; confirm against CircularArray.
// NOTE(review): `overlap - length` is an unsigned subtraction that wraps before the cast to
// long; the loop bound relies on that conversion producing the intended negative value.
334  else {
335  for (auto& val : this->metricsStatsIntermediate_.sum.indexedValues()) {
336  for (long i = -1; i >= static_cast<long>(this->processingWindowOverlap_ - this->processingWindowLength_); i--)
337  val.value -= this->metricsValues_[i][val.index];
338  }
339 
340  for (auto& val : this->metricsStatsIntermediate_.sumSquared.indexedValues()) {
341  for (long i = -1; i >= static_cast<long>(this->processingWindowOverlap_ - this->processingWindowLength_); i--)
342  val.value -= this->metricsValues_[i][val.index] * this->metricsValues_[i][val.index];
343  }
344  }
345  }
346  }
347 
348 
349  Statistics::Stats Statistics::statsFromName(const std::string_view& statName) {
350  if (statName == Statistics::nameStatsAll)
351  return StatsAll;
352  auto itr = std::find(Statistics::names.begin(), Statistics::names.end(), statName);
353  size_t index = std::distance(Statistics::names.begin(), itr);
354  if (index < Statistics::names.size())
355  return allStats[index];
356  else
357  return StatsNone;
358  }
359 
360 
// Adds to `stat` every statistic whose dependency mask intersects it, then returns the result
// (the signature line is missing from this listing).
// `stat` is updated inside the loop, so later iterations also see the bits added by earlier ones.
362  for (size_t i = 0; i < count; i++) {
363  if (stat & StatsDependencies[i])
364  stat |= allStats[i];
365  }
366  return stat;
367  }
368 }
static constexpr std::chrono::milliseconds defaultSamplingInterval
Default parameter option.
static constexpr bool defaultConvertToUnitsPerSecond
Default parameter option.
void computeFinalStats()
Compute final stats at the end of processing window.
void updateIterativeStats()
Update iterative stats with new MetricsDiffArray.
MetricsStatsArray< DiffValueType > average
Array of averages.
bool convertToUnitsPerSeconds() const noexcept
Returns whether to convert metric differences to units per second.
StatsIndex
Associates a unique index to each statistic.
Definition: Statistics.h:45
std::chrono::milliseconds samplingInterval() const noexcept
Returns the metrics sampling interval.
void setRequestedMetrics(const MetricsArray< Statistics::Stats > &requestedMetrics) noexcept
Sets the list of requested metrics.
bool sendStats_
Whether to compute stats and give them to the delegate.
static constexpr size_t defaultProcessingWindowLength
Default parameter option.
MetricsStatsArray< DiffValueType > sumSquared
Current sum of all values squared.
std::unique_ptr< MetricsDataArray > currentMetrics_
Pointer to latest metric raw values.
size_t processingWindowLength() const noexcept
Returns the length of the processing window used to compute statistics.
virtual void metricsContollerCollectedMetricsValues(const MetricsController &metricsController, const MetricsDiffArray &metricsDiff, const MetricsDataArray &previousMetrics, const MetricsDataArray &currentMetrics)=0
Function called every time the MetricsController has collected new values.
MetricsControllerDelegate & delegate() const noexcept
Returns the controller's delegate.
bool sendValues() const noexcept
Returns whether the controller alerts the delegate after collecting new values.
static Stats statsRequiredForStat(Stats stat)
Returns the stats which are required to compute this stat.
Sum of squares of values (intermediate statistic)
Definition: Statistics.h:53
Generic class to store and manage metrics.
Definition: MetricsArray.h:40
Stats
Bitwise type to select statistics.
Definition: Statistics.h:64
void setSamplingInterval(std::chrono::milliseconds interval) noexcept
Sets the metrics sampling interval.
void setSendStats(bool sendStats) noexcept
Sets whether the controller alerts the delegate after computing statistics.
static constexpr bool defaultSendValues
Default parameter option.
void collectMetrics()
Launch metric collection loop.
void setConvertToUnitsPerSeconds(bool convertToUnitsPerSecond) noexcept
Sets whether to convert metric differences to units per second.
const std::vector< size_t > & requestedMetrics() const noexcept
Returns the list of requested metrics.
std::vector< size_t > requestedMetrics_
Indexes of metrics requested to be collected.
MetricsStats metricsStats_
Final statistics arrays.
All statistics selected.
Definition: Statistics.h:71
static constexpr size_t count
Number of available statistics.
Definition: Statistics.h:57
static constexpr std::array< Stats, count > allStats
Array of all available statistics.
Definition: Statistics.h:73
Sum of values (intermediate statistic)
Definition: Statistics.h:52
static constexpr bool defaultSendStats
Default parameter option.
static constexpr size_t extraCount
Number of statistics and intermediate values.
Definition: Statistics.h:79
bool sendValues_
Whether to give values to the delegate.
size_t processingWindowOverlap() const noexcept
Returns the overlap of the processing window used to compute statistics.
bool isMetricNull(size_t index) const
Checks whether a metric is zero.
MetricsStatsArray< size_t > maxIndexes
Current indexes of maximums.
const std::vector< IndexedValue > & indexedValues() const noexcept
Returns the requested metric indexes and values.
size_t processingWindowLength_
Length of processing window used to compute statistics.
MetricsController(MetricsControllerDelegate &delegate) noexcept
Construct a new Metrics Controller object.
static constexpr size_t defaultProcessingWindowOverlap
Default parameter option.
MetricsStatsArray< DiffValueType > stdDev
Array of standard deviations.
SourceInterests makeSourceInterests(bool value)
Construct a new shared pointer to a SourceInterests object.
MetricsControllerDelegate & delegate_
Delegate to alert when something happens.
void resetIterativeStats()
Reset iterative stats to begin new processing window.
void forEach(Function func)
Execute a function for each stats array.
void setProcessingWindow(size_t length, size_t overlap) noexcept
Sets the processing window parameters used to compute statistics.
MetricsStatsIntermediate metricsStatsIntermediate_
Intermediate statistics arrays.
static constexpr std::string_view nameStatsAll
Definition: Statistics.h:59
static Stats statsFromName(const std::string_view &statName)
Convert a stat name to a stat value.
double unitsPerSecondFactor_
Factor to convert metric differences to units per second.
std::unique_ptr< MetricsDataArray > previousMetrics_
Pointer to earlier metric raw values.
MetricsStatsArray< DiffValueType > sum
Current sum of all values.
bool sendStats() const noexcept
Returns whether the controller alerts the delegate after computing statistics.
CircularArray::CircularArray< MetricsDiffArray > metricsValues_
CircularArray holding all metric differences, used to compute statistics.
void setSendValues(bool sendValues) noexcept
Sets whether the controller alerts the delegate after collecting new values.
void updateMetrics()
Fetch latest metrics from source and compute their variations.
No statistics selected.
Definition: Statistics.h:70
static constexpr std::array< std::string_view, count > names
Names of available statistics.
Definition: Statistics.h:58
std::chrono::milliseconds samplingInterval_
Metrics sampling interval.
Abstract delegate of MetricsController.
virtual bool metricsContollerShouldStopCollectingMetrics(const MetricsController &metricsController)=0
Function called after each iteration to ask the delegate whether the controller should stop collecting metrics.
MetricsStatsArray< DiffValueType > min
Array of minimums.
Class to compute and store metric values variations.
MetricsStatsArray< DiffValueType > max
Array of maximums.
size_t processingWindowOverlap_
Overlap of processing window used to compute statistics.
bool isCollecting_
Whether the receiver is collecting metrics.
static constexpr std::array< StatsDependency, extraCount > StatsDependencies
Array of all statistics dependencies.
Definition: Statistics.h:92
bool convertToUnitsPerSecond_
Whether to convert metric differences to units per second.
virtual void metricsContollerCollectedMetricsStats(const MetricsController &metricsController, const MetricsController::MetricsStats &metricsStats)=0
Function called every time the MetricsController has computed statistics.
bool isCollecting() const noexcept
Returns whether the controller is collecting metrics.
MetricsStatsArray< size_t > minIndexes
Current indexes of minimums.