// Ignore the filter for the moment
if( message.isReliable() || fast == null ) {
- reliable.broadcast( adapter, buffer, true );
+ // Don't need to copy the data because message protocol is already
+ // giving us a fresh buffer
+ reliable.broadcast( adapter, buffer, true, false );
} else {
- fast.broadcast( adapter, buffer, false );
+ fast.broadcast( adapter, buffer, false, false );
}
}
/**
* Dispatches the data to all endpoints managed by the
- * kernel that match the specified endpoint filter..
+ * kernel that match the specified endpoint filter..
+ * If 'copy' is true then the implementation will copy the byte buffer
+ * before delivering it to endpoints. This allows the caller to reuse
+ * the data buffer. Though it is important that the buffer not be changed
+ * by another thread while this call is running.
+ * Only the bytes from data.position() to data.remaining() are sent.
*/
- public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable );
+ public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable,
+ boolean copy );
/**
* Returns true if there are waiting envelopes.
}
}
- public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable )
+ public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable,
+ boolean copy )
{
if( !reliable )
throw new UnsupportedOperationException( "Unreliable send not supported by this kernel." );
- // Copy the data just once
- byte[] copy = new byte[data.remaining()];
- System.arraycopy(data.array(), data.position(), copy, 0, data.remaining());
+ if( copy )
+ {
+ // Copy the data just once
+ byte[] temp = new byte[data.remaining()];
+ System.arraycopy(data.array(), data.position(), temp, 0, data.remaining());
+ data = ByteBuffer.wrap(temp);
+ }
// Hand it to all of the endpoints that match our routing
for( NioEndpoint p : endpoints.values() ) {
if( filter != null && !filter.apply(p) )
continue;
- // Give it the data
- p.send( data, false, false );
+ // Give it the data... but let each endpoint track their
+ // own completion over the shared array of bytes by
+ // duplicating it
+ p.send( data.duplicate(), false, false );
}
// Wake up the selector so it can reinitialize its
// efficiently done as change requests... or simply
// keeping a thread-safe set of endpoints with pending
// writes. For most cases, it shouldn't matter.
- for( Map.Entry<NioEndpoint,SelectionKey> e : endpointKeys.entrySet() ) {
- if( e.getKey().hasPending() )
+ for( Map.Entry<NioEndpoint,SelectionKey> e : endpointKeys.entrySet() ) {
+ if( e.getKey().hasPending() ) {
e.getValue().interestOps(SelectionKey.OP_WRITE);
+ }
}
}
}
c.write( current );
-
+
// If we wrote all of that packet then we need to remove it
if( current.remaining() == 0 ) {
p.removePending();
* Dispatches the data to all endpoints managed by the
* kernel. 'routing' is currently ignored.
*/
- public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable )
+ public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable,
+ boolean copy )
{
if( reliable )
throw new UnsupportedOperationException( "Reliable send not supported by this kernel." );
+ // We ignore the copy flag because we know all outbound traffic
+ // goes instantly.
+
// Hand it to all of the endpoints that match our routing
for( UdpEndpoint p : socketEndpoints.values() ) {
// Does it match the filter?