MetroCollect  2.3.4
SnapStreamInterface.cc
Go to the documentation of this file.
1 //
2 // SnapStreamInterface.cc
3 //
4 // Created on July 27th 2018
5 //
6 // Copyright 2018 CFM (www.cfm.fr)
7 //
8 // Licensed under the Apache License, Version 2.0 (the "License");
9 // you may not use this file except in compliance with the License.
10 // You may obtain a copy of the License at
11 //
12 // http://www.apache.org/licenses/LICENSE-2.0
13 //
14 // Unless required by applicable law or agreed to in writing, software
15 // distributed under the License is distributed on an "AS IS" BASIS,
16 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 // See the License for the specific language governing permissions and
18 // limitations under the License.
19 //
20 
21 #include <algorithm>
22 
23 #include "SnapStreamInterface.h"
24 
25 using namespace std::literals;
26 
27 
28 namespace SnapInterface {
29  SnapStreamInterface::SnapStreamInterface() :
30  metricsController_(*this)
31  { }
32 
33 
34  void SnapStreamInterface::setConfig(const Plugin::Config& cfg) {
37  std::chrono::milliseconds sampling = MetroCollect::MetricsController::defaultSamplingInterval;
42 
43  if (cfg.has_bool_key(std::string(SnapStreamInterface::configKeySendValues)))
44  sendValues = cfg.get_bool(std::string(SnapStreamInterface::configKeySendValues));
45  if (cfg.has_bool_key(std::string(SnapStreamInterface::configKeySendStats)))
46  sendStats = cfg.get_bool(std::string(SnapStreamInterface::configKeySendStats));
47 
48  if (cfg.has_int_key(std::string(SnapStreamInterface::configKeySamplingInterval)))
49  sampling = std::chrono::milliseconds(cfg.get_int(std::string(SnapStreamInterface::configKeySamplingInterval)));
50  if (cfg.has_int_key(std::string(SnapStreamInterface::configKeyProcessingWindowLength)))
51  windowLength = cfg.get_int(std::string(SnapStreamInterface::configKeyProcessingWindowLength));
52  if (cfg.has_int_key(std::string(SnapStreamInterface::configKeyProcessingWindowOverlap)))
53  windowOverlap = cfg.get_int(std::string(SnapStreamInterface::configKeyProcessingWindowOverlap));
54 
55  if (cfg.has_bool_key(std::string(SnapStreamInterface::configKeyConvertToUnitsPerSecond)))
56  convert = cfg.get_bool(std::string(SnapStreamInterface::configKeyConvertToUnitsPerSecond));
57  if (cfg.has_int_key(std::string(SnapStreamInterface::configKeyUnchangedMetricTimeout)))
58  unchangedTimeout = cfg.get_int(std::string(SnapStreamInterface::configKeyUnchangedMetricTimeout));
59 
60  this->metricsController_.setSendValues(sendValues);
61  this->metricsController_.setSendStats(sendStats);
63  this->metricsController_.setProcessingWindow(windowLength, windowOverlap);
65  this->unchangedMetricsTimeout_ = unchangedTimeout;
66  }
67 
68 
69  void SnapStreamInterface::parseSnapMetrics(const std::vector<Plugin::Metric>& metrics) {
70  for (auto& value : *this->requestedMetrics_.sourceInterests())
71  value = false;
72  for (auto& value : this->requestedMetrics_)
74 
76 
77  for (const auto& metric : metrics) {
78  fieldName.clear();
79  for (size_t i = SnapStreamInterface::appPrefix.size(); i < metric.ns().size() - 2; i++)
80  fieldName.push_back(metric.ns()[i].get_value());
81  auto fieldIndexes = this->requestedMetrics_.indexesOfFieldName(fieldName);
82  auto stat = MetroCollect::Statistics::statsFromName(metric.ns()[metric.ns().size() - 2].get_value());
83  for (size_t i : fieldIndexes)
84  this->requestedMetrics_[i] |= MetroCollect::Statistics::statsRequiredForStat(stat);
85  }
86 
87  this->metricsController_.setRequestedMetrics(this->requestedMetrics_);
88  }
89 
90 
91  void SnapStreamInterface::insertAppPrefixToNamespace(std::vector<std::string>& ns) {
92  std::transform(SnapStreamInterface::appPrefix.begin(), SnapStreamInterface::appPrefix.end(), std::inserter(ns, ns.begin()), [&](auto s) { return std::string(s); });
93  }
94 
95  void SnapStreamInterface::createRequestedMetrics(MetricsPackage& package, std::string suffix, std::function<bool(MetroCollect::Statistics::Stats, bool*)> condition) {
96  for (size_t i = 0; i < this->requestedMetrics_.size(); i++) {
97  bool isSecondaryMetric = false;
98  if (condition(this->requestedMetrics_[i], &isSecondaryMetric)) {
99  auto fieldInfo = this->requestedMetrics_.fieldInfoAtIndex(i);
100  this->insertAppPrefixToNamespace(fieldInfo.name);
101  if (!suffix.empty())
102  fieldInfo.name.push_back(suffix);
103  Plugin::Namespace ns(fieldInfo.name);
104  for (const auto& dynamicIndex: fieldInfo.dynamicIndexes)
105  ns[dynamicIndex.index + SnapStreamInterface::appPrefix.size()].set_name(dynamicIndex.description);
106  if (!suffix.empty())
107  ns[ns.size() - 1].set_name(std::string(SnapStreamInterface::statNamespaceDescription));
108 
109  package.computedMetrics.emplace_back(ns, "", "");
110  auto itr = std::lower_bound(this->metricsController_.requestedMetrics().begin(), this->metricsController_.requestedMetrics().end(), i);
111  auto index = std::distance(this->metricsController_.requestedMetrics().begin(), itr);
112  package.metricsTimeoutPointer.push_back(&package.metricsTimeout[index]);
113  package.secondaryMetrics.push_back(isSecondaryMetric);
114  }
115  }
116  }
117 
119  package.metricsToSend.reserve(package.computedMetrics.size());
120  if (this->unchangedMetricsTimeout_ < ((size_t)-1)) {
121  for (size_t i = 0; i < package.metricsTimeout.size(); i++)
122  package.metricsTimeout[i] = (i * this->unchangedMetricsTimeout_) / package.metricsTimeout.size() + 1;
123  }
124  }
125 
126 
127  void SnapStreamInterface::sendMetrics(MetricsPackage& package, std::function<void(MetricsPackage::TimedMetrics&)> copyMetrics) {
128  bool shouldSend = true;
129  if (!package.currentMetrics) {
130  shouldSend = ((bool)package.nextMetrics);
131  package.currentMetrics = std::make_unique<MetricsPackage::TimedMetrics>();
132  package.currentMetrics->values.resize(package.computedMetrics.size());
133  }
134  std::swap(package.currentMetrics, package.nextMetrics);
135 
136  copyMetrics(*package.nextMetrics);
137 
138  if (!shouldSend)
139  return;
140 
141  for (size_t i = 0; i < package.computedMetrics.size(); i++) {
142  if (package.currentMetrics->values[i] != package.nextMetrics->values[i])
143  *package.metricsTimeoutPointer[i] = 1;
144  else if (*package.metricsTimeoutPointer[i] > 3 && package.currentMetrics->values[i] != 0)
145  *package.metricsTimeoutPointer[i] = 2;
146  }
147 
148  package.metricsToSend.clear();
149  for (size_t i = 0; i < package.computedMetrics.size(); i++) {
150  if (*package.metricsTimeoutPointer[i] <= 1 || (*package.metricsTimeoutPointer[i] <= 3 && !package.secondaryMetrics[i])) {
151  package.computedMetrics[i].set_data(package.currentMetrics->values[i]);
152  package.computedMetrics[i].set_timestamp(package.currentMetrics->timestamp);
153  package.metricsToSend.push_back(&package.computedMetrics[i]);
154  }
155  }
156  this->send_metrics(package.metricsToSend);
157 
158  if (this->unchangedMetricsTimeout_ < ((size_t)-1)) {
159  for (size_t i = 0; i < package.metricsTimeout.size(); i++) {
160  if (package.metricsTimeout[i] == 0)
161  package.metricsTimeout[i] = (i * this->unchangedMetricsTimeout_) / package.metricsTimeout.size() + 3;
162  else if(package.metricsTimeout[i] == 2 || package.metricsTimeout[i] == 3)
163  package.metricsTimeout[i] += this->unchangedMetricsTimeout_ - 1;
164  else
165  package.metricsTimeout[i]--;
166  }
167  } else
168  std::fill(package.metricsTimeout.begin(), package.metricsTimeout.end(), ((size_t)-1));
169  }
170 
171 
172  const Plugin::ConfigPolicy SnapStreamInterface::get_config_policy() {
173  Plugin::ConfigPolicy policy;
174  std::vector<std::string> ns;
175  std::vector<std::string> baseNamespace;
176  this->insertAppPrefixToNamespace(baseNamespace);
177  for (size_t i = 0; i < SnapStreamInterface::configKeysInt.size(); i++) {
178  ns = baseNamespace;
179  ns.emplace_back(SnapStreamInterface::configKeysInt[i]);
180  policy.add_rule(ns, Plugin::IntRule{std::string(SnapStreamInterface::configKeysInt[i]), {SnapStreamInterface::configValuesInt[i], false}});
181  }
182  for (size_t i = 0; i < SnapStreamInterface::configKeysBool.size(); i++) {
183  ns = baseNamespace;
184  ns.emplace_back(SnapStreamInterface::configKeysBool[i]);
185  policy.add_rule(ns, Plugin::BoolRule{std::string(SnapStreamInterface::configKeysBool[i]), {SnapStreamInterface::configValuesBool[i], false}});
186  }
187  return policy;
188  }
189 
190  std::vector<Plugin::Metric> SnapStreamInterface::get_metric_types(Plugin::Config cfg) {
191  std::vector<Plugin::Metric> metrics;
192  this->setConfig(cfg);
193 
194  auto fieldsInfo = this->requestedMetrics_.allFieldsInfo();
195  for (auto& fieldInfo : fieldsInfo) {
196  this->insertAppPrefixToNamespace(fieldInfo.name);
197  Plugin::Namespace ns{fieldInfo.name};
198  for (const auto& dynamicIndex: fieldInfo.dynamicIndexes)
199  ns[dynamicIndex.index + SnapStreamInterface::appPrefix.size()].set_name(dynamicIndex.description);
200  ns.add_dynamic_element(std::string(SnapStreamInterface::statNamespaceDescription));
201  ns.add_static_element(std::string(SnapStreamInterface::statNamespaceLastComponent));
202  metrics.emplace_back(ns, fieldInfo.unit, fieldInfo.description);
203  }
204 
205  return metrics;
206  }
207 
209  if (this->valuesPackage_.computedMetrics.size() == 0 && this->statsPackage_.computedMetrics.size() == 0)
210  return;
211 
213  }
214 
215  void SnapStreamInterface::get_metrics_in(std::vector<Plugin::Metric> &metsIn) {
216  if (metsIn.size() == 0)
217  return;
218 
219  this->setConfig(metsIn.front().get_config());
220 
221  this->parseSnapMetrics(metsIn);
222 
223  if (this->metricsController_.sendValues()) {
226  *isSecondaryMetric = false;
227  return value != 0;
228  });
229  this->fillMetricsPackage(this->valuesPackage_);
230  }
231 
232  if (this->metricsController_.sendStats()) {
234  for (size_t statIndex = 0; statIndex < MetroCollect::Statistics::count; statIndex++) {
235  this->createRequestedMetrics(this->statsPackage_, std::string(MetroCollect::Statistics::names[statIndex]), [&](MetroCollect::Statistics::Stats value, bool* isSecondaryMetric) {
237  return value & MetroCollect::Statistics::allStats[statIndex];
238  });
239  }
240  this->fillMetricsPackage(this->statsPackage_);
241  }
242  }
243 
244 
246  this->sendMetrics(this->valuesPackage_, [&](MetricsPackage::TimedMetrics& array) {
247  size_t index = 0;
248  array.timestamp = metricsDiff.endTime();
249  for (size_t metricIndex : this->metricsController_.requestedMetrics()) {
250  array.values[index] = metricsDiff[metricIndex];
251  index++;
252  }
253  });
254  }
255 
257  this->sendMetrics(this->statsPackage_, [&](MetricsPackage::TimedMetrics& array) {
258  size_t index = 0;
259  array.timestamp = metricsStats.min.endTime();
260  metricsStats.forEach([&](const auto& statsMetrics, MetroCollect::Statistics::Stats ) {
261  for (const auto& val : statsMetrics.indexedValues()) {
262  array.values[index] = val.value;
263  index++;
264  }
265  });
266  });
267  }
268 
270  return this->context_cancelled();
271  }
272 }
273 
274 
275 int main(int argc, char* argv[]) {
276  Plugin::Meta meta(Plugin::Type::StreamCollector, std::string(SnapInterface::SnapStreamInterface::appName), SnapInterface::SnapStreamInterface::appVersion, Plugin::RpcType::GRPCStream);
277  meta.concurrency_count = 1;
278  meta.exclusive = false;
279  meta.strategy = Plugin::Strategy::Sticky;
280  meta.cache_ttl = 1ms;
282  Plugin::start_stream_collector(argc, argv, &plugin, meta);
283  return 0;
284 }
MetricsSource::SourceInterests sourceInterests() const noexcept
Returns the source interests.
Definition: MetricsArray.cc:57
static constexpr std::string_view configKeyConvertToUnitsPerSecond
Snap plugin configuration key.
static constexpr std::chrono::milliseconds defaultSamplingInterval
Default parameter option.
static constexpr bool defaultConvertToUnitsPerSecond
Default parameter option.
Struct to store all final metric stats.
std::vector< Plugin::Metric > get_metric_types(Plugin::Config cfg) override final
Returns all metrics that can be fetched by the plugin to Snap.
const std::chrono::system_clock::time_point & endTime() const noexcept
Get time of latest metrics.
void setConfig(const Plugin::Config &cfg)
Configures the plugin and the controller accordingly, unspecified values are reset to default...
void setRequestedMetrics(const MetricsArray< Statistics::Stats > &requestedMetrics) noexcept
Sets the list of request metrics.
Interface class between MetricsController and Snap&#39;s streaming-collector plugin class.
MetroCollect::MetricsArray< MetroCollect::Statistics::Stats > requestedMetrics_
Array of requested metrics.
std::vector< std::string > FieldName
Type used for field names (an array of strings)
Definition: SourceField.h:31
static constexpr std::array configKeysBool
Array of boolean-valued configuration keys.
static constexpr std::array appPrefix
Snap metric name prefix.
MetroCollect::MetricsController metricsController_
MetricsController used to collect statistics.
static constexpr size_t defaultProcessingWindowLength
Default parameter option.
static constexpr std::string_view statNamespaceDescription
Snap metric name description.
static constexpr std::array< int, configKeysInt.size()> configValuesInt
Array of integer-valued configuration default values.
size_t unchangedMetricsTimeout_
Sub-sampling period for constant metrics.
std::vector< bool > secondaryMetrics
Array to mark the most important metric stat (average)
Strcut to store Snap metric objects and metric data to make it faster to send them to Snap...
static constexpr std::string_view configKeyProcessingWindowOverlap
Snap plugin configuration key.
bool sendValues() const noexcept
Returns whether the controller alerts the delegate after collecting new values.
static Stats statsRequiredForStat(Stats stat)
Returns the stats which are required to compute this stat.
static constexpr bool defaultSendStats
Snap plugin configuration default value.
static constexpr size_t defaultUnchangedMetricTimeout
Snap plugin configuration default value.
static constexpr std::string_view statNamespaceLastComponent
Snap metric name category.
Stats
Bitwise type te select statistics.
Definition: Statistics.h:64
void setSamplingInterval(std::chrono::milliseconds interval) noexcept
Sets the metrics sampling interval.
void setSendStats(bool sendStats) noexcept
Sets whether the controller alerts the delegate after computing statistics.
const std::chrono::system_clock::time_point & endTime() const noexcept
Get time of latest metrics.
void collectMetrics()
Launch metric collection loop.
void setConvertToUnitsPerSeconds(bool convertToUnitsPerSecond) noexcept
Sets whether to convert metric differences to units per second.
void stream_metrics() override final
Stream metrics to Snap.
void get_metrics_in(std::vector< Plugin::Metric > &metsIn) override final
Gets the user&#39;s requested metrics from Snap.
const std::vector< size_t > & requestedMetrics() const noexcept
Returns the list of request metrics.
std::vector< size_t > metricsTimeout
Sub-sampling counters for constant metrics.
std::vector< size_t * > metricsTimeoutPointer
Array to match metrics to their sub-sampling counter (one is shared for all statistics) ...
static constexpr std::string_view configKeySendStats
Snap plugin configuration key.
static constexpr std::string_view configKeySendValues
Snap plugin configuration key.
std::vector< Plugin::Metric * > metricsToSend
Array of pointers to Snap metrics to be sent.
static constexpr size_t count
Number of available statistics.
Definition: Statistics.h:57
int main(int argc, char *argv[])
MetroCollect&#39;s main function.
static constexpr std::array< Stats, count > allStats
Array of all available statistics.
Definition: Statistics.h:73
void sendMetrics(MetricsPackage &package, std::function< void(MetricsPackage::TimedMetrics &)> copyMetrics)
Filters and sends metrics form a package to Snap.
std::vector< Plugin::Metric > computedMetrics
Cache for Snap metrics.
static constexpr std::array configKeysInt
Array of integer-valued configuration keys.
void metricsContollerCollectedMetricsStats(const MetroCollect::MetricsController &metricsController, const MetroCollect::MetricsController::MetricsStats &metricsStats) override final
Function called every time the MetricsController has computed statistics.
void createRequestedMetrics(MetricsPackage &package, std::string suffix, std::function< bool(MetroCollect::Statistics::Stats, bool *)> condition)
Configures a MetricsPackage object according to requested metrics.
static constexpr size_t defaultProcessingWindowOverlap
Default parameter option.
void forEach(Function func)
Execute a function for each stats array.
static constexpr std::string_view appName
Snap plugin name.
static constexpr std::string_view configKeyProcessingWindowLength
Snap plugin configuration key.
void parseSnapMetrics(const std::vector< Plugin::Metric > &metrics)
Generate requestedMetrics array from list of Snap metrics.
void setProcessingWindow(size_t length, size_t overlap) noexcept
Sets the processing window parameters used to compute statistics.
const MetricsSource::FieldInfo fieldInfoAtIndex(size_t index) const noexcept
Get details about a specific metric field.
static Stats statsFromName(const std::string_view &statName)
Convert a stat name to a stat value.
Class to fetch metrics from the kernel and store them.
bool sendStats() const noexcept
Returns whether the controller alerts the delegate after computing statistics.
const Plugin::ConfigPolicy get_config_policy() override final
Returns the plugin configuration keys and default values to Snap.
const std::vector< MetricsSource::FieldInfo > allFieldsInfo() const noexcept
Get details about all fields.
std::unique_ptr< TimedMetrics > currentMetrics
Earlier values that may be sent.
void setSendValues(bool sendValues) noexcept
Sets whether the controller alerts the delegate after collecting new values.
static constexpr bool defaultSendValues
Snap plugin configuration default value.
void clear(size_t metricsCount)
Reinitializes the receiver and allocates space for metrics.
std::chrono::system_clock::time_point timestamp
Timestamp of the metrics.
MetricsPackage statsPackage_
MetricsPackage for metrics from computed statistics.
No statistics selected.
Definition: Statistics.h:70
std::unique_ptr< TimedMetrics > nextMetrics
Latest values kept to be sent on the next iteration.
static constexpr std::array< std::string_view, count > names
Names of available statistics.
Definition: Statistics.h:58
void fillMetricsPackage(MetricsPackage &package)
Initializes a MetricsPackage sub-sampling counters.
void insertAppPrefixToNamespace(std::vector< std::string > &ns)
Inserts plugin name prefix into Snap namespace.
bool metricsContollerShouldStopCollectingMetrics(const MetroCollect::MetricsController &metricsController) override final
Function called after each iteration to ask the delegate whether the controller should stop collectin...
std::vector< MetroCollect::DiffValueType > values
Array of metrics.
MetricsStatsArray< DiffValueType > min
Array of minimums.
Class to control the collection of metrics.
static constexpr std::string_view configKeyUnchangedMetricTimeout
Snap plugin configuration key.
Class to compute and store metric values variations.
static constexpr std::array< bool, configKeysBool.size()> configValuesBool
Array of boolean-valued configuration default values.
MetricsPackage valuesPackage_
MetricsPackage for metrics from raw counters variations.
size_t size() const noexcept
Returns the size of the underlying array.
static constexpr int appVersion
Snap plugin version.
static constexpr std::string_view configKeySamplingInterval
Snap plugin configuration key.
void metricsContollerCollectedMetricsValues(const MetroCollect::MetricsController &metricsController, const MetroCollect::MetricsDiffArray &metricsDiff, const MetroCollect::MetricsDataArray &previousMetrics, const MetroCollect::MetricsDataArray &currentMetrics) override final
Function called every time the MetricsController has collected new values.