Essentia  2.1-beta6-dev
ringbufferimpl.h
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2021 Music Technology Group - Universitat Pompeu Fabra
3  *
4  * This file is part of Essentia
5  *
6  * Essentia is free software: you can redistribute it and/or modify it under
7  * the terms of the GNU Affero General Public License as published by the Free
8  * Software Foundation (FSF), either version 3 of the License, or (at your
9  * option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13  * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
14  * details.
15  *
16  * You should have received a copy of the Affero GNU General Public License
17  * version 3 along with this program. If not, see http://www.gnu.org/licenses/
18  */
19 
20 #ifndef ESSENTIA_STREAMING_RINGBUFFERIMPL_H
21 #define ESSENTIA_STREAMING_RINGBUFFERIMPL_H
22 
23 #include "atomic.h"
24 
25 #ifdef OS_WIN32
26 
27 #include <windows.h>
28 
29 class Condition {
30  protected:
31  int waitersCount;
32  CRITICAL_SECTION conditionLock;
33  CRITICAL_SECTION waitersCountLock;
34  HANDLE event;
35 
36  public:
37  Condition() {
38  InitializeCriticalSection(&conditionLock);
39  InitializeCriticalSection(&waitersCountLock);
40  event = CreateEvent (NULL, // no security
41  FALSE, // auto-reset event
42  FALSE, // non-signaled initially
43  NULL); // unnamed
44  waitersCount = 0;
45  }
46 
47  void lock() { EnterCriticalSection(&conditionLock); }
48  void unlock() { LeaveCriticalSection(&conditionLock); }
49 
50  void wait() {
51  EnterCriticalSection(&waitersCountLock);
52  waitersCount++;
53  LeaveCriticalSection(&waitersCountLock);
54 
55  LeaveCriticalSection(&conditionLock);
56 
57  int result = WaitForSingleObject(event, INFINITE);
58 
59  EnterCriticalSection(&waitersCountLock);
60  waitersCount--;
61  LeaveCriticalSection(&waitersCountLock);
62 
63  EnterCriticalSection(&conditionLock);
64  }
65 
66  void signal() {
67  // Avoid race conditions.
68  EnterCriticalSection(&waitersCountLock);
69  bool haveWaiters = waitersCount > 0;
70  LeaveCriticalSection(&waitersCountLock);
71 
72  if (haveWaiters)
73  SetEvent(event);
74  }
75 };
76 
77 
78 #else // OS_WIN32
79 
80 #include <pthread.h>
81 
/**
 * Thin wrapper pairing a pthread mutex with a pthread condition variable.
 *
 * Intended usage: lock(), re-check the predicate in a loop around wait(),
 * then unlock(). wait() must be called with the lock held.
 *
 * The mutex and the condition variable are initialised with default
 * attributes and destroyed by the destructor; the object is not copyable.
 */
class Condition {
 protected:
  pthread_mutex_t pthreadMutex;
  pthread_cond_t pthreadCondition;

 public:
  Condition() {
    pthread_mutex_init(&pthreadMutex, 0);
    pthread_cond_init(&pthreadCondition, 0);
  }

  // Destroys the condition variable and the mutex. No thread may be
  // blocked in wait() or hold the lock when this runs.
  ~Condition() {
    pthread_cond_destroy(&pthreadCondition);
    pthread_mutex_destroy(&pthreadMutex);
  }

  void lock() { pthread_mutex_lock(&pthreadMutex); }
  void unlock() { pthread_mutex_unlock(&pthreadMutex); }
  // Releases the lock while blocked and re-acquires it before returning.
  void wait() { pthread_cond_wait(&pthreadCondition, &pthreadMutex); }
  void signal() { pthread_cond_signal(&pthreadCondition); }

 private:
  // Not copyable: a copied pthread mutex / condition variable is not a
  // usable object. Declared but intentionally left undefined.
  Condition(const Condition&);
  Condition& operator=(const Condition&);
};
99 
100 
101 #endif // OS_WIN32
102 
103 
104 namespace essentia {
105 namespace streaming {
106 
108  public:
110 
113 
114  Atomic _available;
115  Atomic _space;
116 
118 
120 
121  // whether to wait for space (to add data to the buffer)
122  // or for availability of data (when reading data from the buffer)
124  {
127 
128  RingBufferImpl(WaitingCondition c, int bufferSize)
129  : _bufferSize(bufferSize)
130  , _writeIndex(0)
131  , _readIndex(0)
132  , _available(0)
134  , _waitingCondition(c)
135  {
136  _buffer = new Real[_bufferSize];
137  }
138 
140  {
141  delete [] _buffer;
142  }
143 
144  void reset() {
145  _writeIndex = 0;
146  _readIndex = 0;
147  _available = 0;
149  delete[] _buffer;
150  _buffer = new Real[_bufferSize];
151  }
152 
153  void waitAvailable(void)
154  {
155  // this function should only be called if the waiting condition
156  // has been set accordingly
157  assert(_waitingCondition == kAvailable);
158 
159  condition.lock();
160 
161  while (_available == 0)
162  {
163  condition.wait();
164  }
165 
166  condition.unlock();
167  }
168 
169  void waitSpace(void)
170  {
171  // this function should only be called if the waiting condition
172  // has been set accordingly
173  assert(_waitingCondition == kSpace);
174 
175  condition.lock();
176 
177  while (_space == 0)
178  {
179  condition.wait();
180  }
181 
182  condition.unlock();
183  }
184 
185  int add(const Real* inputData, int inputSize)
186  {
187  int size = _space;
188  if (size > inputSize) size = inputSize;
189 
190  if (_writeIndex + size > _bufferSize)
191  {
192  int n = _bufferSize - _writeIndex;
193  memcpy( &_buffer[_writeIndex], inputData, n * sizeof(AudioSample));
194  memcpy( _buffer, &inputData[n], (size - n)*sizeof(AudioSample));
195  _writeIndex = (size - n);
196  } else {
197  memcpy( &_buffer[_writeIndex], inputData, size * sizeof(AudioSample));
198  _writeIndex += size;
199  }
200  _space -= size;
201  _available += size;
202 
203  condition.lock();
205  {
206  // the thread that is using this ringbuffer will be waiting for
207  // data to become available - typically the essentia-part from
208  // a RingBufferInput. we signal the waiting condition here
209  condition.signal();
210  }
211  condition.unlock();
212 
213  return size;
214  }
215 
216  int get(Real* outputData, int outputSize)
217  {
218  int size = _available;
219  if (size > outputSize) size = outputSize;
220 
221  assert(size <= _bufferSize);
222  if (_readIndex + size > _bufferSize)
223  {
224  int n = _bufferSize - _readIndex;
225  memcpy( outputData, &_buffer[_readIndex], n * sizeof(AudioSample));
226  memcpy( &outputData[n], _buffer, (size - n)*sizeof(AudioSample));
227  _readIndex = (size - n);
228  } else {
229  memcpy( outputData, &_buffer[_readIndex], size * sizeof(AudioSample));
230  _readIndex += size;
231  }
232  _available -= size;
233  _space += size;
234 
235  condition.lock();
236  if (_waitingCondition == kSpace)
237  {
238  // the thread that is using this ringbuffer will be waiting for
239  // space in the buffer - typically the essentia-part from
240  // a RingBufferOutput. we signal the waiting condition here
241  condition.signal();
242  }
243  condition.unlock();
244 
245  return size;
246  }
247 
248 };
249 
250 } // namespace streaming
251 } // namespace essentia
252 
253 #endif // ESSENTIA_STREAMING_RINGBUFFERIMPL_H
Definition: ringbufferimpl.h:82
Condition()
Definition: ringbufferimpl.h:88
void signal()
Definition: ringbufferimpl.h:96
void unlock()
Definition: ringbufferimpl.h:94
void wait()
Definition: ringbufferimpl.h:95
void lock()
Definition: ringbufferimpl.h:93
pthread_mutex_t pthreadMutex
Definition: ringbufferimpl.h:84
pthread_cond_t pthreadCondition
Definition: ringbufferimpl.h:85
Definition: ringbufferimpl.h:107
enum essentia::streaming::RingBufferImpl::WaitingCondition _waitingCondition
void waitAvailable(void)
Definition: ringbufferimpl.h:153
Atomic _space
Definition: ringbufferimpl.h:115
int _writeIndex
Definition: ringbufferimpl.h:111
~RingBufferImpl()
Definition: ringbufferimpl.h:139
Real * _buffer
Definition: ringbufferimpl.h:117
int get(Real *outputData, int outputSize)
Definition: ringbufferimpl.h:216
void waitSpace(void)
Definition: ringbufferimpl.h:169
int add(const Real *inputData, int inputSize)
Definition: ringbufferimpl.h:185
Atomic _available
Definition: ringbufferimpl.h:114
void reset()
Definition: ringbufferimpl.h:144
int _readIndex
Definition: ringbufferimpl.h:112
WaitingCondition
Definition: ringbufferimpl.h:124
@ kAvailable
Definition: ringbufferimpl.h:125
@ kSpace
Definition: ringbufferimpl.h:125
RingBufferImpl(WaitingCondition c, int bufferSize)
Definition: ringbufferimpl.h:128
int _bufferSize
Definition: ringbufferimpl.h:109
Condition condition
Definition: ringbufferimpl.h:119
Definition: algorithm.h:28
Real AudioSample
Definition: types.h:349
float Real
Definition: types.h:69
#define NULL
Definition: tnt_i_refvec.h:33