AnyCollect  1.1.2
Controller.cc
Go to the documentation of this file.
1 //
2 // Controller.cc
3 //
4 // Created on October 25th 2018
5 //
6 // Copyright 2018 CFM (www.cfm.fr)
7 //
8 // Licensed under the Apache License, Version 2.0 (the "License");
9 // you may not use this file except in compliance with the License.
10 // You may obtain a copy of the License at
11 //
12 // http://www.apache.org/licenses/LICENSE-2.0
13 //
14 // Unless required by applicable law or agreed to in writing, software
15 // distributed under the License is distributed on an "AS IS" BASIS,
16 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 // See the License for the specific language governing permissions and
18 // limitations under the License.
19 //
20 
21 #include <thread>
22 
23 #if GPERFTOOLS_CPU_PROFILE
24 #include <gperftools/profiler.h>
25 #endif
26 
27 #include "Config.h"
28 #include "Controller.h"
29 
30 
31 namespace AnyCollect {
33  delegate_(delegate),
34  isCollecting_(false)
35  {
36  this->setSamplingInterval(Controller::defaultSamplingInterval);
37  }
38 
39 
41  return this->delegate_;
42  }
43 
44 
45  bool Controller::isCollecting() const noexcept {
46  return this->isCollecting_;
47  }
48 
49  std::chrono::seconds Controller::samplingInterval() const noexcept {
50  return this->samplingInterval_;
51  }
52 
53 
54  void Controller::loadConfigFromFile(const std::string& configPath) {
55  if (this->isCollecting_)
56  return;
57 
58  Config config = Config{configPath};
59  this->sources_.clear();
60  this->expressions_.clear();
61  this->matchers_.clear();
62  size_t fileIndex = 0;
63 
64  for (const auto& file : config.files) {
65  for (const auto& path : file.paths) {
67  for (const auto& p : paths)
68  this->sources_.push_back(std::make_shared<Source>(p));
69  }
70  for (const auto& expression : file.expressions) {
71  this->expressions_.push_back(std::make_shared<Expression>(expression.regex));
72  for (const auto& metric : expression.metrics) {
73  this->matchers_.push_back(std::make_shared<Matcher>(metric));
74  this->expressions_.back()->matchers().push_back(this->matchers_.back());
75  }
76  for (auto itr = this->sources_.begin() + fileIndex; itr != this->sources_.end(); itr++)
77  (*itr)->expressions().push_back(this->expressions_.back());
78  }
79  fileIndex = this->sources_.size();
80  }
81 
82  for (const auto& command : config.commands) {
83  this->sources_.push_back(std::make_shared<Source>(command.program, command.arguments));
84  for (const auto& expression : command.expressions) {
85  this->expressions_.push_back(std::make_shared<Expression>(expression.regex));
86  for (const auto& metric : expression.metrics) {
87  this->matchers_.push_back(std::make_shared<Matcher>(metric));
88  this->expressions_.back()->matchers().push_back(this->matchers_.back());
89  }
90  this->sources_.back()->expressions().push_back(this->expressions_.back());
91  }
92  }
93  }
94 
95 
96  void Controller::setSamplingInterval(std::chrono::seconds interval) noexcept {
97  if (this->isCollecting_)
98  return;
99 
100  this->samplingInterval_ = interval;
101  if (interval != 0s)
102  this->unitsPerSecondFactor_ = 1.0 / interval.count();
103  else
104  this->unitsPerSecondFactor_ = 1.0;
105  }
106 
107 
108  std::vector<const Metric*> Controller::availableMetrics() noexcept {
109  if (this->isCollecting_ || this->sources_.empty() || this->expressions_.empty() || this->matchers_.empty())
110  return {};
111 
112  this->roundKey_ += 10;
113 
114  this->updatedMetrics_.clear();
115  this->updateSources();
116  this->computeMatches();
117 
118  this->updatedMetrics_.clear();
119  for (const auto& itr : this->metrics_)
120  this->updatedMetrics_.push_back(&itr.second);
121 
122  this->roundKey_ += 10;
123  auto availableMetrics = std::move(this->updatedMetrics_);
124  this->updatedMetrics_.clear();
125  return availableMetrics;
126  }
127 
128  void Controller::collectMetrics() noexcept {
129  if (this->isCollecting_ || this->sources_.empty() || this->expressions_.empty() || this->matchers_.empty())
130  return;
131 #if GPERFTOOLS_CPU_PROFILE
132  ProfilerStart("/tmp/aa.prof");
133 #endif
134  this->isCollecting_ = true;
135  this->roundKey_ = 0;
136  auto startTime = std::chrono::steady_clock::now();
137 
138  this->updatedMetrics_.clear();
139  this->updateSources();
140  this->computeMatches();
141 
142  while (true) {
143  std::this_thread::sleep_for(this->samplingInterval_ - (std::chrono::steady_clock::now() - startTime));
144  startTime = std::chrono::steady_clock::now();
145 
146  this->updatedMetrics_.clear();
147  this->updateSources();
148  this->computeMatches();
149 
152  this->isCollecting_ = false;
153 #if GPERFTOOLS_CPU_PROFILE
154  ProfilerStop();
155 #endif
156  return;
157  }
158 #if GPERFTOOLS_CPU_PROFILE
159  ProfilerFlush();
160 #endif
161  }
162  }
163 
164 
165  void Controller::updateSources() noexcept {
166  for (auto& source : this->sources_)
167  source->update();
168  }
169 
170  void Controller::computeMatches() noexcept {
171  for (const auto& source : this->sources_) {
172  auto begin = source->begin();
173  while(begin != source->end()) {
174  auto end = source->getLine(begin);
175  if (begin != end) {
176  for (const auto& expression : source->expressions()) {
177  auto& match = expression->apply(begin, end);
178  if (!match.empty()) {
179  for (const auto& matcher : expression->matchers())
180  this->parseData(*source, match, *matcher);
181  }
182  }
183  }
184  if (end != source->end())
185  begin = end + 1;
186  else
187  break;
188  }
189  }
190  this->roundKey_++;
191  }
192 
193  void Controller::parseData(const Source& source, const std::cmatch& match, const Matcher& matcher) noexcept {
194  auto value = matcher.getValue(match, source.pathParts());
195  if (!value.has_value())
196  return;
197  auto newMetric = matcher.getMetric(match, source.pathParts());
198  if (!newMetric.has_value())
199  return;
200 
201  auto itr = this->metrics_.find(newMetric.value().key());
202  bool isNew = (itr == this->metrics_.end());
203  if (isNew)
204  itr = this->metrics_.insert_or_assign(this->metrics_.begin(), newMetric.value().key(), std::move(newMetric.value()));
205 
206  Metric& metric = itr->second;
207  isNew = (isNew || (metric.roundKey() != this->roundKey_ - 1));
208  if (metric.roundKey() != this->roundKey_) {
209  metric.setNewValue(value.value(), matcher.computeRate(), matcher.convertToUnitsPerSecond() ? this->unitsPerSecondFactor_ : 1.0);
210  if ((!isNew || !matcher.computeRate()))
211  this->updatedMetrics_.push_back(&metric);
212  } else {
213  metric.updateValue(value.value(), matcher.convertToUnitsPerSecond() ? this->unitsPerSecondFactor_ : 1.0);
214  }
215  metric.setTimestamp(source.timestamp());
216  metric.setRoundKey(this->roundKey_);
217  }
218 
219 }
void loadConfigFromFile(const std::string &configPath)
Configures sources, expressions and matchers according to config file.
Definition: Controller.cc:54
void setTimestamp(std::chrono::system_clock::time_point timestamp) noexcept
Sets the timestamp of the metric.
Definition: Metric.cc:140
bool isCollecting() const noexcept
Returns whether the controller is collecting metrics.
Definition: Controller.cc:45
void setNewValue(double value, bool computeRate, double unitsPerSecondFactor=1.0) noexcept
Sets a new value to the metric.
Definition: Metric.cc:124
void setSamplingInterval(std::chrono::seconds interval) noexcept
Sets the metrics sampling interval.
Definition: Controller.cc:96
virtual void contollerCollectedMetrics(const Controller &controller, const std::vector< const Metric *> &metrics)=0
Function called every time the Controller has collected metrics.
std::map< size_t, Metric > metrics_
Map associating keys to their metric.
Definition: Controller.h:62
std::vector< std::shared_ptr< Matcher > > matchers_
Array of matchers.
Definition: Controller.h:61
void parseData(const Source &source, const std::cmatch &match, const Matcher &matcher) noexcept
Updates metrics from a match.
Definition: Controller.cc:193
static constexpr std::chrono::seconds defaultSamplingInterval
Default parameter option.
Definition: Controller.h:48
ControllerDelegate & delegate_
Delegate to alert when something happens.
Definition: Controller.h:52
std::vector< Config::command > commands
Definition: Config.h:96
Controller(ControllerDelegate &delegate) noexcept
Construct a new Controller object.
Definition: Controller.cc:32
Class used to represent a source (file or command output) from which metrics can be matched...
Definition: Source.h:39
Struct used to represent the JSON configuration file.
Definition: Config.h:35
std::chrono::seconds samplingInterval() const noexcept
Returns the metrics sampling interval.
Definition: Controller.cc:49
static std::vector< std::string > filePathsMatchingGlobbingPattern(const std::string &pattern) noexcept
Returns all file paths that match a pattern as defined by POSIX.
Definition: Source.cc:51
Class used to represent a matcher, that is a way to convert expressions matches into a metric...
Definition: Matcher.h:40
void updateSources() noexcept
Updates sources (fetches file contents and executes commands)
Definition: Controller.cc:165
bool isCollecting_
Whether the receiver is collecting metrics.
Definition: Controller.h:54
size_t roundKey() const noexcept
Returns the key of the collection iteration which created the metric.
Definition: Metric.cc:103
size_t roundKey_
Metric collection iteration unique identifier.
Definition: Controller.h:57
std::vector< std::shared_ptr< Source > > sources_
Array of sources.
Definition: Controller.h:59
std::vector< const Metric * > updatedMetrics_
Array of pointers to the iteration&#39;s metrics.
Definition: Controller.h:63
void collectMetrics() noexcept
Launch metric collection loop.
Definition: Controller.cc:128
ControllerDelegate & delegate() const noexcept
Returns the controller&#39;s delegate.
Definition: Controller.cc:40
std::vector< Config::file > files
Definition: Config.h:95
std::vector< std::shared_ptr< Expression > > expressions_
Array of expressions.
Definition: Controller.h:60
Class used to represent a metric object (with a name, unit, tags, a value and a timestamp) ...
Definition: Metric.h:33
Abstract delegate of Controller.
Definition: Controller.h:142
void setRoundKey(size_t roundKey) noexcept
Sets the round key of the metric.
Definition: Metric.cc:144
std::vector< const Metric * > availableMetrics() noexcept
Returns the array of all currently matching metrics on the system, without their values.
Definition: Controller.cc:108
void computeMatches() noexcept
For each line of each source, executes the source&#39;s expressions to find matches.
Definition: Controller.cc:170
virtual bool contollerShouldStopCollectingMetrics(const Controller &controller)=0
Function called after each iteration to ask the delegate whether the controller should stop collectin...
std::chrono::seconds samplingInterval_
Metrics sampling interval.
Definition: Controller.h:55
void updateValue(double value, double unitsPerSecondFactor=1.0) noexcept
Adds a new value to the current value.
Definition: Metric.cc:133