Essentia  2.1-beta5-dev
sinkproxy.h
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2006-2016 Music Technology Group - Universitat Pompeu Fabra
3  *
4  * This file is part of Essentia
5  *
6  * Essentia is free software: you can redistribute it and/or modify it under
7  * the terms of the GNU Affero General Public License as published by the Free
8  * Software Foundation (FSF), either version 3 of the License, or (at your
9  * option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful, but WITHOUT
12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13  * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
14  * details.
15  *
16  * You should have received a copy of the Affero GNU General Public License
17  * version 3 along with this program. If not, see http://www.gnu.org/licenses/
18  */
19 
20 #ifndef ESSENTIA_SINKPROXY_H
21 #define ESSENTIA_SINKPROXY_H
22 
23 #include "sourcebase.h"
24 #include "multiratebuffer.h"
25 
26 namespace essentia {
27 namespace streaming {
28 
29 
30 class SinkProxyBase : public SinkBase {
31  protected:
33 
34  public:
35  SinkProxyBase(Algorithm* parent = 0, const std::string& name = "unnamed") :
36  SinkBase(parent, name), _proxiedSink(0) {}
37 
38  SinkProxyBase(const std::string& name) : SinkBase(name), _proxiedSink(0) {}
39 
41  E_DEBUG(EMemory, "Deleting SinkProxy " << fullName());
42  if (_proxiedSink) essentia::streaming::detach(*this, *_proxiedSink);
43  }
44 
45 
46  const void* buffer() const {
47  if (!_source)
48  throw EssentiaException("SinkProxy ", fullName(), " is not currently connected to another Source");
49 
50  return _source->buffer();
51  }
52 
53  void* buffer() {
54  if (!_source)
55  throw EssentiaException("SinkProxy ", fullName(), " is not currently connected to another Source");
56 
57  return _source->buffer();
58  }
59 
60  void setId(ReaderID id) {
61  SinkBase::setId(id);
62  if (_proxiedSink) _proxiedSink->setId(id);
63  }
64 
66  SinkBase::setSource(source);
67  if (_proxiedSink) _proxiedSink->setSource(source);
68  }
69 
70 
71  //---- StreamConnector interface hijacking for proxies ----------------------------------------//
72 
73  inline void acquire() { StreamConnector::acquire(); }
74 
75  virtual bool acquire(int n) {
76  throw EssentiaException("Cannot acquire for SinkProxy ", fullName(), ": you need to call acquire() on the Sink which is proxied by it");
77  }
78 
79  virtual int acquireSize() const {
80  if (!_proxiedSink)
81  throw EssentiaException("Cannot call ::acquireSize() on SinkProxy ", fullName(), " because it is not attached");
82 
83  return _proxiedSink->acquireSize();
84  }
85 
86  inline void release() { StreamConnector::release(); }
87 
88  virtual void release(int n) {
89  throw EssentiaException("Cannot release for SinkProxy ", fullName(), ": you need to call release() on the Sink which is proxied by it");
90  }
91 
92  virtual int releaseSize() const {
93  if (!_proxiedSink)
94  throw EssentiaException("Cannot call ::releaseSize() on SinkProxy ", fullName(), " because it is not attached");
95 
96  return _proxiedSink->releaseSize();
97  }
98 
99  //---------------------------------------------------------------------------------------------//
100 
101 
102  // TODO: deprecate (?)
104  if (!_proxiedSink) return;
105 
106  E_DEBUG(EConnectors, " " << fullName() << "::updateProxiedSink: " << _proxiedSink->fullName()
107  << "::setSource(" << (_source ? _source->fullName() : "0") << ")");
108  _proxiedSink->setSource(_source);
109  // for this to work we need to have called Source::connect(Sink) before Sink::connect(Source)
110  // we should always get the same id as this sink
111  E_DEBUG(EConnectors, " " << fullName() << "::updateProxiedSink: " << _proxiedSink->fullName()
112  << "::setId(" << _id << ")");
113  _proxiedSink->setId(_id);
114 
115  // propagate (explicitly instead of implicit recursive with virtual func, dirty but works ok atm)
116  SinkProxyBase* psink = dynamic_cast<SinkProxyBase*>(_proxiedSink);
117  if (psink) {
118  E_DEBUG(EConnectors, " SinkProxy::updateProxiedSink: " << psink->fullName() << "::updateProxiedSink()");
119  psink->updateProxiedSink();
120  }
121  }
122 
123  void detach() {
124  if (_proxiedSink) essentia::streaming::detach(*this, *_proxiedSink);
125  }
126 
127 
128 protected:
132  void attach(SinkBase* sink) {
133  // TODO: make sure the sink we're attaching to is not connected to anyone
134  checkSameTypeAs(*sink);
135 
136  if (_proxiedSink) {
137  // make sure the sink we're attaching to is not connected to anyone
138  // Note: this doesn't prevent to connect it after being attached, but that
139  // already limits possible damage
140  std::ostringstream msg;
141  msg << "Could not attach SinkProxy " << fullName() << " to " << sink->fullName()
142  << " because it is already attached to " << _proxiedSink->fullName();
143  throw EssentiaException(msg);
144  }
145 
146  E_DEBUG(EConnectors, " SinkProxy::attach: " << fullName() << "::_proxiedSink = " << sink->fullName());
147  _proxiedSink = sink;
148  }
149 
150  void detach(SinkBase* sink) {
151  if (sink != _proxiedSink) {
152  E_WARNING("Cannot detach SinkProxy " << fullName() << " from " << sink->fullName() << " as they are not attached");
153  return;
154  }
155 
156  E_DEBUG(EConnectors, " SinkProxy::detach: " << fullName() << "::_proxiedSink = 0");
157  _proxiedSink = 0;
158 
159  // look inside the source(proxy) to see whether we need to remove an ID or not
160  // -> no, because the source proxy only knows about the sink proxy as a normal sink,
161  // it is still connected and the reader ID is still valid. Whenever we attach a new sink
162  // to this proxy, it will reuse the reader ID
163  /*
164  if (_source) {
165  _source->disconnect(*this);
166  }
167  */
168  }
169 
170  friend void attach(SinkProxyBase& proxy, SinkBase& innerSink);
171  friend void detach(SinkProxyBase& proxy, SinkBase& innerSink);
172 
173 };
174 
175 template <typename TokenType>
176 class Source;
177 
178 template <typename TokenType>
179 class SinkProxy : public SinkProxyBase {
180  USE_TYPE_INFO(TokenType);
181 
182  public:
183 
184  SinkProxy(Algorithm* parent = 0, const std::string& name = "unnamed") :
186 
187  SinkProxy(const std::string& name) : SinkProxyBase(name) {}
188 
189 
190 
191  //---- Buffer access methods ----------------------------------------//
192 
194  return *static_cast<const MultiRateBuffer<TokenType>*>(SinkProxyBase::buffer());
195  }
196 
198  return *static_cast<MultiRateBuffer<TokenType>*>(SinkProxyBase::buffer());
199  }
200 
201 
203  checkSameTypeAs(source);
204  if (_source)
205  throw EssentiaException("You cannot connect more than one Source to a Sink: ", fullName());
206 
207  _source = &source;
208  E_DEBUG(EConnectors, "SinkProxy: sink " << fullName() << " now has source " << source.fullName());
209  // for this to work we need to have called Source::connect(Sink) before Sink::connect(Source) so that
210  // we already have an ID in the buffer readers' list
212  }
213 
214 
216  _source = 0;
218  }
219 
220  virtual const void* getTokens() const {
221  throw EssentiaException("Cannot get tokens for SinkProxy ", fullName(),
222  ": you need to call getTokens() on the Sink which is proxied by it");
223  }
224 
225  virtual const void* getFirstToken() const {
226  throw EssentiaException("Cannot get first token for SinkProxy ", fullName(),
227  ": you need to call getFirstToken() on the Sink which is proxied by it");
228  }
229 
230 
231  virtual int available() const {
232  return buffer().availableForRead(_id);
233  }
234 
235  virtual void reset() {}
236 
237 };
238 
239 inline void attach(SinkProxyBase& proxy, SinkBase& innerSink) {
240  E_DEBUG(EConnectors, "Attaching SinkProxy " << proxy.fullName() << " to " << innerSink.fullName());
241  // check types here to have a more informative error message in case it fails
242  if (!sameType(proxy, innerSink)) {
243  std::ostringstream msg;
244  msg << "Cannot attach SinkProxy " << proxy.fullName() << " (type: " << nameOfType(proxy) << ") to "
245  << innerSink.fullName() << " (type: " << nameOfType(innerSink) << ")";
246  throw EssentiaException(msg);
247  }
248  proxy.attach(&innerSink);
249  innerSink.attachProxy(&proxy);
250 }
251 
252 
253 inline void operator>>(SinkProxyBase& proxy, SinkBase& innerSink) {
254  attach(proxy, innerSink);
255 }
256 
257 inline void detach(SinkProxyBase& proxy, SinkBase& innerSink) {
258  E_DEBUG(EConnectors, "Detaching SinkProxy " << proxy.fullName() << " from " << innerSink.fullName());
259  proxy.detach(&innerSink);
260  innerSink.detachProxy(&proxy);
261 }
262 
263 
264 } // namespace streaming
265 } // namespace essentia
266 
267 
268 #endif // ESSENTIA_SINKPROXY_H
void detach()
Definition: sinkproxy.h:123
Definition: debugging.h:43
SinkProxyBase(Algorithm *parent=0, const std::string &name="unnamed")
Definition: sinkproxy.h:35
void detach(SinkProxyBase &proxy, SinkBase &innerSink)
Definition: sinkproxy.h:257
virtual void reset()
Definition: sinkproxy.h:235
MultiRateBuffer< TokenType > & buffer()
Definition: sinkproxy.h:197
const Algorithm * parent() const
Definition: connector.h:53
int ReaderID
Definition: types.h:343
virtual const void * getFirstToken() const
Definition: sinkproxy.h:225
void disconnect(SourceBase &source)
Definition: sinkproxy.h:215
void updateProxiedSink()
Definition: sinkproxy.h:103
Definition: sourcebase.h:52
virtual void setId(ReaderID id)
void setId(ReaderID id)
Definition: sinkproxy.h:60
const SourceBase * source() const
Definition: sinkbase.h:75
#define E_WARNING(msg)
Definition: debugging.h:164
virtual int acquireSize() const
Definition: streamconnector.h:69
std::string nameOfType(const std::type_info &type)
virtual void release(int n)
Definition: sinkproxy.h:88
bool acquire()
Definition: streamconnector.h:49
const MultiRateBuffer< TokenType > & buffer() const
Definition: sinkproxy.h:193
bool sameType(const std::type_info &t1, const std::type_info &t2)
Definition: types.h:259
void release()
Definition: sinkproxy.h:86
Definition: sinkproxy.h:179
void attach(SinkBase *sink)
Definition: sinkproxy.h:132
void release()
Definition: streamconnector.h:59
Definition: sinkbase.h:52
SinkProxy(const std::string &name)
Definition: sinkproxy.h:187
void operator>>(SourceBase &source, DevNullConnector devnull)
Definition: devnull.h:81
Definition: debugging.h:48
SourceBase * _source
Definition: sinkbase.h:54
SinkProxyBase(const std::string &name)
Definition: sinkproxy.h:38
virtual bool acquire(int n)
Definition: sinkproxy.h:75
std::string fullName() const
const void * buffer() const
Definition: sinkproxy.h:46
#define USE_TYPE_INFO(TokenType)
Definition: types.h:331
void * buffer()
Definition: sinkproxy.h:53
virtual int available() const
Definition: sinkproxy.h:231
virtual int releaseSize() const
Definition: sinkproxy.h:92
Definition: sink.h:31
Definition: algorithm.h:28
virtual const void * getTokens() const
Definition: sinkproxy.h:220
void detachProxy(SinkProxyBase *sproxy)
Definition: types.h:76
SinkProxy(Algorithm *parent=0, const std::string &name="unnamed")
Definition: sinkproxy.h:184
void connect(SourceBase &source)
Definition: sinkproxy.h:202
Definition: streamingalgorithm.h:140
~SinkProxyBase()
Definition: sinkproxy.h:40
void setSource(SourceBase *source)
Definition: sinkproxy.h:65
virtual int releaseSize() const
Definition: streamconnector.h:74
void checkSameTypeAs(const TypeProxy &obj) const
Definition: types.h:301
virtual void setSource(SourceBase *source)
const std::string & name() const
Definition: types.h:283
SinkBase * _proxiedSink
Definition: sinkproxy.h:32
void attachProxy(SinkProxyBase *sproxy)
void detach(SinkBase *sink)
Definition: sinkproxy.h:150
virtual int acquireSize() const
Definition: sinkproxy.h:79
#define E_DEBUG(module, msg)
Definition: debugging.h:157
void acquire()
Definition: sinkproxy.h:73
Definition: sinkproxy.h:30
ReaderID _id
Definition: sinkbase.h:55