OpenShot Audio Library | OpenShotAudio 0.3.2
Loading...
Searching...
No Matches
juce_InterprocessConnection.cpp
1/*
2 ==============================================================================
3
4 This file is part of the JUCE library.
5 Copyright (c) 2017 - ROLI Ltd.
6
7 JUCE is an open source library subject to commercial or open-source
8 licensing.
9
10 The code included in this file is provided under the terms of the ISC license
11 http://www.isc.org/downloads/software-support-policy/isc-license. Permission
12 To use, copy, modify, and/or distribute this software for any purpose with or
13 without fee is hereby granted provided that the above copyright notice and
14 this permission notice appear in all copies.
15
16 JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
17 EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
18 DISCLAIMED.
19
20 ==============================================================================
21*/
22
23namespace juce
24{
25
26struct InterprocessConnection::ConnectionThread : public Thread
27{
28 ConnectionThread (InterprocessConnection& c) : Thread ("JUCE IPC"), owner (c) {}
29 void run() override { owner.runThread(); }
30
31 InterprocessConnection& owner;
32 JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionThread)
33};
34
35//==============================================================================
37 : useMessageThread (callbacksOnMessageThread),
38 magicMessageHeader (magicMessageHeaderNumber)
39{
40 thread.reset (new ConnectionThread (*this));
41}
42
44{
45 callbackConnectionState = false;
46 disconnect();
47 masterReference.clear();
48 thread.reset();
49}
50
51//==============================================================================
53 int portNumber, int timeOutMillisecs)
54{
55 disconnect();
56
57 const ScopedLock sl (pipeAndSocketLock);
58 socket.reset (new StreamingSocket());
59
60 if (socket->connect (hostName, portNumber, timeOutMillisecs))
61 {
62 threadIsRunning = true;
63 connectionMadeInt();
64 thread->startThread();
65 return true;
66 }
67
68 socket.reset();
69 return false;
70}
71
73{
74 disconnect();
75
76 std::unique_ptr<NamedPipe> newPipe (new NamedPipe());
77
78 if (newPipe->openExisting (pipeName))
79 {
80 const ScopedLock sl (pipeAndSocketLock);
81 pipeReceiveMessageTimeout = timeoutMs;
82 initialiseWithPipe (newPipe.release());
83 return true;
84 }
85
86 return false;
87}
88
90{
91 disconnect();
92
93 std::unique_ptr<NamedPipe> newPipe (new NamedPipe());
94
95 if (newPipe->createNewPipe (pipeName, mustNotExist))
96 {
97 const ScopedLock sl (pipeAndSocketLock);
98 pipeReceiveMessageTimeout = timeoutMs;
99 initialiseWithPipe (newPipe.release());
100 return true;
101 }
102
103 return false;
104}
105
107{
108 thread->signalThreadShouldExit();
109
110 {
111 const ScopedLock sl (pipeAndSocketLock);
112 if (socket != nullptr) socket->close();
113 if (pipe != nullptr) pipe->close();
114 }
115
116 thread->stopThread (4000);
117 deletePipeAndSocket();
118 connectionLostInt();
119}
120
121void InterprocessConnection::deletePipeAndSocket()
122{
123 const ScopedLock sl (pipeAndSocketLock);
124 socket.reset();
125 pipe.reset();
126}
127
129{
130 const ScopedLock sl (pipeAndSocketLock);
131
132 return ((socket != nullptr && socket->isConnected())
133 || (pipe != nullptr && pipe->isOpen()))
134 && threadIsRunning;
135}
136
138{
139 {
140 const ScopedLock sl (pipeAndSocketLock);
141
142 if (pipe == nullptr && socket == nullptr)
143 return {};
144
145 if (socket != nullptr && ! socket->isLocal())
146 return socket->getHostName();
147 }
148
149 return IPAddress::local().toString();
150}
151
152//==============================================================================
154{
155 uint32 messageHeader[2] = { ByteOrder::swapIfBigEndian (magicMessageHeader),
156 ByteOrder::swapIfBigEndian ((uint32) message.getSize()) };
157
158 MemoryBlock messageData (sizeof (messageHeader) + message.getSize());
159 messageData.copyFrom (messageHeader, 0, sizeof (messageHeader));
160 messageData.copyFrom (message.getData(), sizeof (messageHeader), message.getSize());
161
162 return writeData (messageData.getData(), (int) messageData.getSize()) == (int) messageData.getSize();
163}
164
165int InterprocessConnection::writeData (void* data, int dataSize)
166{
167 const ScopedLock sl (pipeAndSocketLock);
168
169 if (socket != nullptr)
170 return socket->write (data, dataSize);
171
172 if (pipe != nullptr)
173 return pipe->write (data, dataSize, pipeReceiveMessageTimeout);
174
175 return 0;
176}
177
178//==============================================================================
179void InterprocessConnection::initialiseWithSocket (StreamingSocket* newSocket)
180{
181 jassert (socket == nullptr && pipe == nullptr);
182 socket.reset (newSocket);
183
184 threadIsRunning = true;
185 connectionMadeInt();
186 thread->startThread();
187}
188
189void InterprocessConnection::initialiseWithPipe (NamedPipe* newPipe)
190{
191 jassert (socket == nullptr && pipe == nullptr);
192 pipe.reset (newPipe);
193
194 threadIsRunning = true;
195 connectionMadeInt();
196 thread->startThread();
197}
198
199//==============================================================================
200struct ConnectionStateMessage : public MessageManager::MessageBase
201{
202 ConnectionStateMessage (InterprocessConnection* ipc, bool connected) noexcept
203 : owner (ipc), connectionMade (connected)
204 {}
205
206 void messageCallback() override
207 {
208 if (auto* ipc = owner.get())
209 {
210 if (connectionMade)
211 ipc->connectionMade();
212 else
213 ipc->connectionLost();
214 }
215 }
216
217 WeakReference<InterprocessConnection> owner;
218 bool connectionMade;
219
220 JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionStateMessage)
221};
222
223void InterprocessConnection::connectionMadeInt()
224{
225 if (! callbackConnectionState)
226 {
227 callbackConnectionState = true;
228
229 if (useMessageThread)
230 (new ConnectionStateMessage (this, true))->post();
231 else
233 }
234}
235
236void InterprocessConnection::connectionLostInt()
237{
238 if (callbackConnectionState)
239 {
240 callbackConnectionState = false;
241
242 if (useMessageThread)
243 (new ConnectionStateMessage (this, false))->post();
244 else
246 }
247}
248
249struct DataDeliveryMessage : public Message
250{
251 DataDeliveryMessage (InterprocessConnection* ipc, const MemoryBlock& d)
252 : owner (ipc), data (d)
253 {}
254
255 void messageCallback() override
256 {
257 if (auto* ipc = owner.get())
258 ipc->messageReceived (data);
259 }
260
261 WeakReference<InterprocessConnection> owner;
262 MemoryBlock data;
263};
264
265void InterprocessConnection::deliverDataInt (const MemoryBlock& data)
266{
267 jassert (callbackConnectionState);
268
269 if (useMessageThread)
270 (new DataDeliveryMessage (this, data))->post();
271 else
272 messageReceived (data);
273}
274
275//==============================================================================
276int InterprocessConnection::readData (void* data, int num)
277{
278 if (socket != nullptr)
279 return socket->read (data, num, true);
280
281 if (pipe != nullptr)
282 return pipe->read (data, num, pipeReceiveMessageTimeout);
283
284 jassertfalse;
285 return -1;
286}
287
288bool InterprocessConnection::readNextMessage()
289{
290 uint32 messageHeader[2];
291 auto bytes = readData (messageHeader, sizeof (messageHeader));
292
293 if (bytes == (int) sizeof (messageHeader)
294 && ByteOrder::swapIfBigEndian (messageHeader[0]) == magicMessageHeader)
295 {
296 auto bytesInMessage = (int) ByteOrder::swapIfBigEndian (messageHeader[1]);
297
298 if (bytesInMessage > 0)
299 {
300 MemoryBlock messageData ((size_t) bytesInMessage, true);
301 int bytesRead = 0;
302
303 while (bytesInMessage > 0)
304 {
305 if (thread->threadShouldExit())
306 return false;
307
308 auto numThisTime = jmin (bytesInMessage, 65536);
309 auto bytesIn = readData (addBytesToPointer (messageData.getData(), bytesRead), numThisTime);
310
311 if (bytesIn <= 0)
312 break;
313
314 bytesRead += bytesIn;
315 bytesInMessage -= bytesIn;
316 }
317
318 if (bytesRead >= 0)
319 deliverDataInt (messageData);
320 }
321
322 return true;
323 }
324
325 if (bytes < 0)
326 {
327 if (socket != nullptr)
328 deletePipeAndSocket();
329
330 connectionLostInt();
331 }
332
333 return false;
334}
335
336void InterprocessConnection::runThread()
337{
338 while (! thread->threadShouldExit())
339 {
340 if (socket != nullptr)
341 {
342 auto ready = socket->waitUntilReady (true, 100);
343
344 if (ready < 0)
345 {
346 deletePipeAndSocket();
347 connectionLostInt();
348 break;
349 }
350
351 if (ready == 0)
352 {
353 thread->wait (1);
354 continue;
355 }
356 }
357 else if (pipe != nullptr)
358 {
359 if (! pipe->isOpen())
360 {
361 deletePipeAndSocket();
362 connectionLostInt();
363 break;
364 }
365 }
366 else
367 {
368 break;
369 }
370
371 if (thread->threadShouldExit() || ! readNextMessage())
372 break;
373 }
374
375 threadIsRunning = false;
376}
377
378} // namespace juce
static Type swapIfBigEndian(Type value) noexcept
static IPAddress local(bool IPv6=false) noexcept
virtual void connectionMade()=0
virtual void messageReceived(const MemoryBlock &message)=0
InterprocessConnection(bool callbacksOnMessageThread=true, uint32 magicMessageHeaderNumber=0xf2b49e2c)
bool createPipe(const String &pipeName, int pipeReceiveMessageTimeoutMs, bool mustNotExist=false)
bool connectToPipe(const String &pipeName, int pipeReceiveMessageTimeoutMs)
virtual void connectionLost()=0
bool sendMessage(const MemoryBlock &message)
bool connectToSocket(const String &hostName, int portNumber, int timeOutMillisecs)
void * getData() noexcept
size_t getSize() const noexcept
Thread(const String &threadName, size_t threadStackSize=0)