Essentia  2.1-beta6-dev
phantombuffer_impl.h
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2021 Music Technology Group - Universitat Pompeu Fabra
3  *
4  * This file is part of Essentia
5  *
6  * Essentia is free software: you can redistribute it and/or modify it under
7  * the terms of the GNU Affero General Public License as published by the Free
8  * Software Foundation (FSF), either version 3 of the License, or (at your
9  * option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13  * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
14  * details.
15  *
16  * You should have received a copy of the Affero GNU General Public License
17  * version 3 along with this program. If not, see http://www.gnu.org/licenses/
18  */
19 
20 #ifndef ESSENTIA_PHANTOMBUFFER_IMPL_H
21 #define ESSENTIA_PHANTOMBUFFER_IMPL_H
22 
23 #include "streamingalgorithm.h"
24 
25 namespace essentia {
26 namespace streaming {
27 
28 template <typename T>
29 const std::vector<T>& PhantomBuffer<T>::readView(ReaderID id) const {
30  return _readView[id];
31 }
32 
33 
34 template <typename T>
36  // add read window & view, just at where our write window is
37  Window w;
38  if (!startFromZero) {
39  w.end = w.begin = _writeWindow.begin;
40  }
41  _readWindow.push_back(w);
42 
43  ReaderID id = _readWindow.size() - 1; // index of last one
44 
45  _readView.push_back(RogueVector<T>());
46  updateReadView(id);
47 
48  return id;
49 }
50 
51 template <typename T>
53  _readView.erase(_readView.begin() + id);
54  _readWindow.erase(_readWindow.begin() + id);
55 }
56 
57 
58 template <typename T>
60  return _readWindow.size();
61 }
62 
63 
68 template <typename T>
69 bool PhantomBuffer<T>::acquireForRead(ReaderID id, int requested) {
70 
71  //DEBUG_NL("acquire " << requested << " for read (id: " << id << "), (" << availableForRead(id) << " available)");
72 
73  // we can afford to have phantomSize + 1 here, because either:
74  // 1) we're strictly before the phantom zone (from at least 1 token), so no pb
75  // 2) we're just at the beginning of the phantom zone, but in that case we
76  // should have been relocated to the beginning of the buffer
77  if (requested > (_phantomSize + 1)) {
78  // warning: this could cause a buffer to block, we need to reallocate or throw an exception here
79  std::ostringstream msg;
80  msg << "acquireForRead: Requested number of tokens (" << requested << ") > phantom size (" << _phantomSize << ")";
81  msg << " in " << _parent->fullName() << " → " << _parent->sinks()[id]->fullName();
82  throw EssentiaException(msg);
83  }
84 
85  MutexLocker lock(mutex); NOWARN_UNUSED(lock);
86  if (availableForRead(id) < requested) return false;
87 
88  _readWindow[id].end = _readWindow[id].begin + requested;
89  updateReadView(id);
90 
91  return true;
92 }
93 
100 template <typename T>
102 
103  //DEBUG_NL("acquire " << requested << " for write... (" << availableForWrite() << " available)");
104 
105  if (requested > (_phantomSize + 1)) {
106  // warning: this could cause a buffer to block, we need to reallocate or throw an exception here
107  std::ostringstream msg;
108  msg << "acquireForWrite: Requested number of tokens (" << requested << ") > phantom size (" << _phantomSize << ")";
109  msg << " in " << _parent->fullName();
110  throw EssentiaException(msg);
111  }
112 
113  MutexLocker lock(mutex); NOWARN_UNUSED(lock);
114  if (availableForWrite() < requested) return false;
115 
116  _writeWindow.end = _writeWindow.begin + requested;
117  updateWriteView();
118 
119  return true;
120 }
121 
122 template <typename T>
124  MutexLocker lock(mutex); NOWARN_UNUSED(lock);
125 
126  // error checking:
127  if (released > _writeWindow.end - _writeWindow.begin) {
128  std::ostringstream msg;
129  msg << _parent->fullName() << ": releasing too many tokens (write access): "
130  << released << " instead of " << _writeWindow.end - _writeWindow.begin << " max allowed";
131  throw EssentiaException(msg);
132  }
133 
134  // replicate from the beginning to the phantom zone if necessary
135  if (_writeWindow.begin < _phantomSize) {
136  T* first = &_buffer[_writeWindow.begin];
137  T* last = &_buffer[(std::min)(_writeWindow.begin + released, _phantomSize)];
138  T* result = &_buffer[_writeWindow.begin + _bufferSize];
139  fastcopy(result, first, last-first);
140  }
141  // replicate from the phantom zone to the beginning if necessary
142  else if (_writeWindow.end > _bufferSize) {
143  int beginIdx = (std::max)(_writeWindow.begin, (int)_bufferSize);
144  T* first = &_buffer[beginIdx];
145  T* last = &_buffer[_writeWindow.end];
146  T* result = &_buffer[beginIdx - _bufferSize];
147  fastcopy(result, first, last-first);
148  }
149 
150  _writeWindow.begin += released;
151  relocateWriteWindow();
152  updateWriteView();
153 
154  //DEBUG_NL(" - total written tokens: " << _writeWindow.total(_bufferSize));
155 }
156 
157 template <typename T>
159  MutexLocker lock(mutex); NOWARN_UNUSED(lock);
160  Window& w = _readWindow[id];
161 
162  // error checking:
163  if (released > w.end - w.begin) {
164  std::ostringstream msg;
165  msg << _parent->fullName() << ": releasing too many tokens (read access): "
166  << released << " instead of " << w.end - w.begin << " max allowed";
167  throw EssentiaException(msg);
168  }
169 
170  w.begin += released;
171  relocateReadWindow(id);
172  updateReadView(id);
173 
174  //DEBUG_NL(" - total read tokens: " << w.total(_bufferSize));
175 }
176 
177 
179 
180 
181 template <typename T>
183  // only do it when necessary
184  if (_writeWindow.turn < 1000000)
185  return;
186 
187  // get maximum number of turns we can substract
188  int m = _writeWindow.turn;
189 
190  for (uint i=0; i<_readWindow.size(); i++) {
191  m = (std::min)(m, _readWindow[i].turn);
192  }
193  // substract turns
194  _writeWindow.turn -= m;
195  for (uint i=0; i<_readWindow.size(); i++) {
196  _readWindow[i].turn -= m;
197  }
198 }
199 
200 template <typename T>
202  const RogueVector<T>& vconst = static_cast<const RogueVector<T>&>(readView(id));
203  RogueVector<T>& v = const_cast<RogueVector<T>&>(vconst);
204  v.setData(&_buffer[0] + _readWindow[id].begin);
205  v.setSize(_readWindow[id].end - _readWindow[id].begin);
206 }
207 
208 template <typename T>
210  _writeView.setData(&_buffer[0] + _writeWindow.begin);
211  _writeView.setSize(_writeWindow.end - _writeWindow.begin);
212 }
213 
214 
215 // mutex should be locked before entering this function
216 // make sure it doesn't overflow
224 template <typename T>
226  //relocateReadWindow(id); // this call should be useless, but it's a safety guard to have it
227 
228  int theoretical = _writeWindow.total(_bufferSize) - _readWindow[id].total(_bufferSize);
229  int contiguous = _bufferSize + _phantomSize - _readWindow[id].begin;
230 
231  /*
232  DEBUG_NL("avail for read: " << _readWindow[id].total(_bufferSize)
233  << " write: " << _writeWindow.total(_bufferSize)
234  << " final: " << min(theoretical, contiguous));
235  */
236 
237  return (std::min)(theoretical, contiguous);
238 }
239 
247 template <typename T>
248 int PhantomBuffer<T>::availableForWrite(bool contiguous) const {
249  //relocateWriteWindow(); // this call should be useless, but it's a safety guard to have it
250 
251  int minTotal = _bufferSize;
252  if (!_readWindow.empty()) { // someone is connected, take its value instead of bufferSize
253  minTotal = _readWindow.begin()->total(_bufferSize);
254  }
255 
256  //DEBUG_PLAIN(_writeWindow.total(_bufferSize) << " read:");
257 
258  // for each read window, find the one that is the latest, as it is the one
259  // that the write window should not overtake.
260  for (uint i=0; i<_readWindow.size(); i++) {
261  const Window& w = _readWindow[i];
262  minTotal = (std::min)(minTotal, w.total(_bufferSize));
263  }
264 
265  int theoretical = minTotal - _writeWindow.total(_bufferSize) + _bufferSize;
266  if (!contiguous) {
267  return theoretical;
268  }
269 
270  int ncontiguous = _bufferSize + _phantomSize - _writeWindow.begin;
271  return (std::min)(theoretical, ncontiguous);
272 }
273 
274 // reposition pointer if we're in the phantom zone
275 template <typename T>
277  if (_writeWindow.begin >= _bufferSize) {
278  _writeWindow.begin -= _bufferSize;
279  _writeWindow.end -= _bufferSize;
280  _writeWindow.turn++;
281  //resetTurns();
282  }
283 }
284 
285 // reposition pointer if we're in the phantom zone
286 template <typename T>
288  Window& w = _readWindow[id];
289  if (w.begin >= _bufferSize) {
290  w.begin -= _bufferSize;
291  w.end -= _bufferSize;
292  w.turn++;
293  //resetTurns();
294  }
295 }
296 
297 template <typename T>
299  // we don't need to clear the buffer, because when new data is written to the
300  // buffer, it will overwrite the old data, and no one can read the old data
301  // until new data is written
302  //_buffer.clear();
303  _writeWindow = Window();
304  for (int i=0; i<(int)_readWindow.size(); i++) {
305  _readWindow[i] = Window();
306  }
307 }
308 
309 } // namespace streaming
310 } // namespace essentia
311 
312 #endif // ESSENTIA_PHANTOMBUFFER_IMPL_H
Definition: types.h:77
Definition: threading.h:51
Definition: roguevector.h:30
void setSize(size_t size)
Definition: roguevector.h:82
void setData(T *data)
Definition: roguevector.h:79
void removeReader(ReaderID id)
Definition: phantombuffer_impl.h:52
bool acquireForRead(ReaderID id, int requested)
Definition: phantombuffer_impl.h:69
int availableForRead(ReaderID id) const
Definition: phantombuffer_impl.h:225
bool acquireForWrite(int requested)
Definition: phantombuffer_impl.h:101
void updateReadView(ReaderID id)
Definition: phantombuffer_impl.h:201
void resetTurns()
Definition: phantombuffer_impl.h:182
void relocateReadWindow(ReaderID id)
Definition: phantombuffer_impl.h:287
int numberReaders() const
Definition: phantombuffer_impl.h:59
const std::vector< T > & readView(ReaderID id) const
Definition: phantombuffer_impl.h:29
ReaderID addReader(bool startFromZero=false)
Definition: phantombuffer_impl.h:35
int availableForWrite(bool contiguous=true) const
Definition: phantombuffer_impl.h:248
void relocateWriteWindow()
Definition: phantombuffer_impl.h:276
void updateWriteView()
Definition: phantombuffer_impl.h:209
void reset()
Definition: phantombuffer_impl.h:298
void releaseForWrite(int released)
Definition: phantombuffer_impl.h:123
void releaseForRead(ReaderID id, int released)
Definition: phantombuffer_impl.h:158
Definition: phantombuffer.h:33
int begin
Definition: phantombuffer.h:35
int turn
Definition: phantombuffer.h:37
int end
Definition: phantombuffer.h:36
int total(int bufferSize) const
Definition: phantombuffer.h:41
#define NOWARN_UNUSED(expr)
Definition: essentiautil.h:42
Definition: algorithm.h:28
void fastcopy(T *dest, const T *src, int n)
Definition: essentiautil.h:180
int ReaderID
Definition: types.h:344
unsigned int uint
Definition: types.h:49