26struct InterprocessConnection::ConnectionThread :
public Thread
28 ConnectionThread (InterprocessConnection& c) :
Thread (
"JUCE IPC"), owner (c) {}
29 void run()
override { owner.runThread(); }
31 InterprocessConnection& owner;
32 JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionThread)
40 thread.reset (
new ConnectionThread (*
this));
45 callbackConnectionState =
false;
47 masterReference.clear();
62 threadIsRunning =
true;
64 thread->startThread();
81 pipeReceiveMessageTimeout = timeoutMs;
82 initialiseWithPipe (
newPipe.release());
98 pipeReceiveMessageTimeout = timeoutMs;
99 initialiseWithPipe (
newPipe.release());
108 thread->signalThreadShouldExit();
112 if (socket !=
nullptr) socket->close();
113 if (pipe !=
nullptr) pipe->close();
116 thread->stopThread (4000);
117 deletePipeAndSocket();
121void InterprocessConnection::deletePipeAndSocket()
132 return ((socket !=
nullptr && socket->isConnected())
133 || (pipe !=
nullptr && pipe->isOpen()))
142 if (pipe ==
nullptr && socket ==
nullptr)
145 if (socket !=
nullptr && ! socket->isLocal())
146 return socket->getHostName();
165int InterprocessConnection::writeData (
void* data,
int dataSize)
169 if (socket !=
nullptr)
170 return socket->write (data, dataSize);
173 return pipe->write (data, dataSize, pipeReceiveMessageTimeout);
179void InterprocessConnection::initialiseWithSocket (StreamingSocket* newSocket)
181 jassert (socket ==
nullptr && pipe ==
nullptr);
182 socket.reset (newSocket);
184 threadIsRunning =
true;
186 thread->startThread();
189void InterprocessConnection::initialiseWithPipe (NamedPipe* newPipe)
191 jassert (socket ==
nullptr && pipe ==
nullptr);
192 pipe.reset (newPipe);
194 threadIsRunning =
true;
196 thread->startThread();
200struct ConnectionStateMessage :
public MessageManager::MessageBase
202 ConnectionStateMessage (InterprocessConnection* ipc,
bool connected) noexcept
203 : owner (ipc), connectionMade (connected)
206 void messageCallback()
override
208 if (
auto* ipc = owner.get())
211 ipc->connectionMade();
213 ipc->connectionLost();
217 WeakReference<InterprocessConnection> owner;
220 JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionStateMessage)
223void InterprocessConnection::connectionMadeInt()
225 if (! callbackConnectionState)
227 callbackConnectionState =
true;
229 if (useMessageThread)
230 (
new ConnectionStateMessage (
this,
true))->post();
236void InterprocessConnection::connectionLostInt()
238 if (callbackConnectionState)
240 callbackConnectionState =
false;
242 if (useMessageThread)
243 (
new ConnectionStateMessage (
this,
false))->post();
249struct DataDeliveryMessage :
public Message
251 DataDeliveryMessage (InterprocessConnection* ipc,
const MemoryBlock& d)
252 : owner (ipc), data (d)
255 void messageCallback()
override
257 if (
auto* ipc = owner.get())
258 ipc->messageReceived (data);
261 WeakReference<InterprocessConnection> owner;
265void InterprocessConnection::deliverDataInt (
const MemoryBlock& data)
267 jassert (callbackConnectionState);
269 if (useMessageThread)
270 (
new DataDeliveryMessage (
this, data))->post();
276int InterprocessConnection::readData (
void* data,
int num)
278 if (socket !=
nullptr)
279 return socket->read (data, num,
true);
282 return pipe->read (data, num, pipeReceiveMessageTimeout);
288bool InterprocessConnection::readNextMessage()
290 uint32 messageHeader[2];
291 auto bytes = readData (messageHeader,
sizeof (messageHeader));
293 if (bytes == (
int)
sizeof (messageHeader)
298 if (bytesInMessage > 0)
300 MemoryBlock messageData ((
size_t) bytesInMessage,
true);
303 while (bytesInMessage > 0)
305 if (thread->threadShouldExit())
308 auto numThisTime = jmin (bytesInMessage, 65536);
309 auto bytesIn = readData (addBytesToPointer (messageData.getData(), bytesRead), numThisTime);
314 bytesRead += bytesIn;
315 bytesInMessage -= bytesIn;
319 deliverDataInt (messageData);
327 if (socket !=
nullptr)
328 deletePipeAndSocket();
336void InterprocessConnection::runThread()
338 while (! thread->threadShouldExit())
340 if (socket !=
nullptr)
342 auto ready = socket->waitUntilReady (
true, 100);
346 deletePipeAndSocket();
357 else if (pipe !=
nullptr)
359 if (! pipe->isOpen())
361 deletePipeAndSocket();
371 if (thread->threadShouldExit() || ! readNextMessage())
375 threadIsRunning =
false;
static Type swapIfBigEndian(Type value) noexcept
static IPAddress local(bool IPv6=false) noexcept
virtual void connectionMade()=0
virtual void messageReceived(const MemoryBlock &message)=0
String getConnectedHostName() const
InterprocessConnection(bool callbacksOnMessageThread=true, uint32 magicMessageHeaderNumber=0xf2b49e2c)
bool createPipe(const String &pipeName, int pipeReceiveMessageTimeoutMs, bool mustNotExist=false)
bool connectToPipe(const String &pipeName, int pipeReceiveMessageTimeoutMs)
virtual ~InterprocessConnection()
virtual void connectionLost()=0
bool sendMessage(const MemoryBlock &message)
bool connectToSocket(const String &hostName, int portNumber, int timeOutMillisecs)
void * getData() noexcept
size_t getSize() const noexcept
Thread(const String &threadName, size_t threadStackSize=0)