public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializable>>
This class is the internal grid communication manager, placed as a layer of indirection between
IgniteKernal and CommunicationSpi.
The IO manager controls the CommunicationSpi, which in turn exchanges data between Ignite nodes.
The communication manager provides a rich API for exchanging data between a pair of cluster nodes. Two types
of communication are available: message-based and file-based.
Both support sending data to an arbitrary topic on the remote node (see GridTopic for
additional information about Ignite topics).
Message and GridTopic are used to provide a topic-based messaging protocol
between cluster nodes. All messages used for data exchange can be divided into two general types:
internal and user messages.
Internal message communication is used by the Ignite Kernal. Refer to the following methods:
- sendToGridTopic(ClusterNode, GridTopic, Message, byte)
- sendOrderedMessage(ClusterNode, Object, Message, byte, long, boolean)
- addMessageListener(Object, GridMessageListener)
User message communication is exposed directly through the IgniteMessaging API and provides
users with topic-based message exchange among the nodes of the cluster defined
by ClusterGroup. Refer to the following methods:
- sendToCustomTopic(ClusterNode, Object, Message, byte)
- addUserMessageListener(Object, IgniteBiPredicate, UUID)
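The topic-based protocol described above can be pictured with a small stand-alone model. This is only a sketch under stated assumptions: `TopicDispatcherSketch`, its `Listener` interface and the string topics are hypothetical names invented for illustration and are not part of the Ignite API. It shows listeners registered per topic and messages delivered only to the listeners of the addressed topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical stand-in for a topic-based dispatcher; NOT the Ignite implementation. */
class TopicDispatcherSketch {
    /** Hypothetical listener shape: receives the sender ID and the message. */
    interface Listener {
        void onMessage(String senderId, Object msg);
    }

    /** Listeners grouped by topic. */
    private final Map<Object, List<Listener>> listeners = new ConcurrentHashMap<>();

    /** Registers a listener for the given topic. */
    void addListener(Object topic, Listener lsnr) {
        listeners.computeIfAbsent(topic, t -> new CopyOnWriteArrayList<>()).add(lsnr);
    }

    /** Removes all listeners of the topic; returns true if any were registered. */
    boolean removeListener(Object topic) {
        return listeners.remove(topic) != null;
    }

    /** Delivers the message to the topic's listeners; returns how many were notified. */
    int deliver(String senderId, Object topic, Object msg) {
        List<Listener> lsnrs = listeners.get(topic);

        if (lsnrs == null)
            return 0; // Nobody listens on this topic in this model.

        for (Listener l : lsnrs)
            l.onMessage(senderId, msg);

        return lsnrs.size();
    }

    /** Small scenario: one internal-style and one user-style topic. */
    static List<String> demo() {
        TopicDispatcherSketch d = new TopicDispatcherSketch();
        List<String> received = new ArrayList<>();

        d.addListener("internal-topic", (from, m) -> received.add("internal:" + m));
        d.addListener("user-topic", (from, m) -> received.add("user:" + m));

        d.deliver("node-1", "user-topic", "hello");      // Reaches the user listener only.
        d.deliver("node-1", "unknown-topic", "dropped"); // No listener registered.
        d.removeListener("user-topic");
        d.deliver("node-1", "user-topic", "late");       // Listener already removed.

        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model a message sent to a topic without listeners is simply ignored; how the real manager treats such messages is not stated in this page.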
Sending or receiving binary data (represented by a File) over a SocketChannel is only
possible when the built-in TcpCommunicationSpi implementation of the Communication SPI is used and
both the local and remote nodes support the CHANNEL_COMMUNICATION feature.
To ensure that the remote node satisfies all conditions, the fileTransmissionSupported(ClusterNode)
method must be called before sending data.
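The conditions listed above amount to a conjunction that is checked before any file is sent. The following is a minimal sketch of that check, assuming a simplified model in which each node advertises its features as a set of strings; `TransmissionSupportSketch`, `canTransmitFiles` and `send` are hypothetical names and do not mirror the signature of fileTransmissionSupported(ClusterNode).

```java
import java.util.Collections;
import java.util.Set;

/** Hypothetical model of the pre-send capability check; NOT the Ignite implementation. */
class TransmissionSupportSketch {
    /** Feature name taken from the text; representing it as a string is an assumption. */
    static final String CHANNEL_COMMUNICATION = "CHANNEL_COMMUNICATION";

    /** All three conditions from the text must hold before a file may be sent. */
    static boolean canTransmitFiles(boolean tcpSpiInUse, Set<String> localFeatures, Set<String> remoteFeatures) {
        return tcpSpiInUse
            && localFeatures.contains(CHANNEL_COMMUNICATION)
            && remoteFeatures.contains(CHANNEL_COMMUNICATION);
    }

    /** Guarded send: check first, then decide whether a transmission may be opened. */
    static String send(boolean tcpSpiInUse, Set<String> local, Set<String> remote) {
        return canTransmitFiles(tcpSpiInUse, local, remote) ? "file transmission" : "not supported";
    }

    public static void main(String[] args) {
        Set<String> channel = Collections.singleton(CHANNEL_COMMUNICATION);
        Set<String> none = Collections.emptySet();

        System.out.println(send(true, channel, channel));  // All conditions hold.
        System.out.println(send(true, channel, none));     // Remote node lacks the feature.
        System.out.println(send(false, channel, channel)); // Another SPI implementation is used.
    }
}
```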
It is possible to receive a set of files on a particular topic (any GridTopic) on the remote node.
A transmission handler for the desired topic must be registered before a transmission sender is opened to it.
The following methods are used to register handlers and open new transmissions:
- addTransmissionHandler(Object, TransmissionHandler)
- removeTransmissionHandler(Object)
- openTransmissionSender(UUID, Object)
Each transmission sender opens a new transmission session to the remote node before sending files over it
(see the description of GridIoManager.TransmissionSender for details). The GridIoManager.TransmissionSender
sends all files within a single session synchronously, one by one.
NOTE. It is important to call the close() method or use a try-with-resources statement to release all resources once you are done with the transmission session. This ensures that all resources on the remote node are released properly (i.e. transmission handlers are closed).
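The session lifecycle described above (register a handler, open a sender, send files one by one, close) can be sketched with a self-contained model. Everything here is an assumption made for illustration: `TransmissionSessionSketch`, its `Sender` class and the event log are hypothetical, and the handler check is done locally, whereas in Ignite the handler lives on the remote node. The point of the sketch is the try-with-resources pattern that guarantees close() is called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of a transmission session; NOT the Ignite implementation. */
class TransmissionSessionSketch {
    /** Topics with a registered handler (stand-in for the state of the remote node). */
    private final Set<Object> handlers = new HashSet<>();

    /** Ordered log of what happened, kept only for demonstration. */
    final List<String> events = new ArrayList<>();

    /** Registers a handler for the topic. */
    void addHandler(Object topic) {
        handlers.add(topic);
    }

    /** Opens a sender; in this model it fails if no handler was registered first. */
    Sender openSender(Object topic) {
        if (!handlers.contains(topic))
            throw new IllegalStateException("No handler registered for topic: " + topic);

        events.add("open:" + topic);

        return new Sender(topic);
    }

    /** Session object; AutoCloseable so that try-with-resources releases it. */
    final class Sender implements AutoCloseable {
        private final Object topic;
        private boolean closed;

        Sender(Object topic) {
            this.topic = topic;
        }

        /** Sends one file synchronously; the call returns before the next file starts. */
        void send(String fileName) {
            if (closed)
                throw new IllegalStateException("Session is closed");

            events.add("send:" + fileName);
        }

        /** Releases the session; safe to call more than once. */
        @Override public void close() {
            if (!closed) {
                closed = true;
                events.add("close:" + topic);
            }
        }
    }

    /** Handler first, then a session that is closed automatically. */
    static List<String> demo() {
        TransmissionSessionSketch io = new TransmissionSessionSketch();

        io.addHandler("files");

        try (Sender snd = io.openSender("files")) {
            for (String f : Arrays.asList("a.bin", "b.bin"))
                snd.send(f);
        }

        return io.events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because the sender is declared in the try header, close() runs even if one of the send calls throws, which is the behaviour the note above asks for.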
See also: TcpCommunicationSpi, IgniteMessaging, TransmissionHandler

| Modifier and Type | Class and Description |
|---|---|
| class | GridIoManager.TransmissionSender - Class represents an implementation of transmission file writer. |
Nested classes/interfaces inherited from interface GridComponent: GridComponent.DiscoveryDataExchangeType

| Modifier and Type | Field and Description |
|---|---|
| static String | COMM_METRICS - IO communication metrics registry name. |
| static byte | DIRECT_PROTO_VER - Direct protocol version. |
| static String | DIRECT_PROTO_VER_ATTR - Direct protocol version attribute name. |
| static MessageFactory[] | EMPTY - Empty array of message factories. |
| static int | MAX_CLOSED_TOPICS - Max closed topics to store. |
| static String | OUTBOUND_MSG_QUEUE_CNT - Outbound message queue size metric name. |
| static String | RCVD_BYTES_CNT - Received bytes count metric name. |
| static String | RCVD_MSGS_CNT - Received messages count metric name. |
| static String | SENT_BYTES_CNT - Sent bytes count metric name. |
| static String | SENT_MSG_CNT - Sent messages count metric name. |
Fields inherited from class GridManagerAdapter: ctx, log

| Constructor and Description |
|---|
| GridIoManager(GridKernalContext ctx) |
| Modifier and Type | Method and Description |
|---|---|
| void | addDisconnectListener(GridDisconnectListener lsnr) |
| void | addMessageListener(GridTopic topic, GridMessageListener lsnr) |
| void | addMessageListener(Object topic, GridMessageListener lsnr) |
| void | addTransmissionHandler(Object topic, TransmissionHandler hnd) |
| void | addUserMessageListener(@Nullable Object topic, @Nullable IgniteBiPredicate<UUID,?> p) |
| void | addUserMessageListener(@Nullable Object topic, @Nullable IgniteBiPredicate<UUID,?> p, UUID nodeId) |
| boolean | checkNodeLeft(UUID nodeId, IgniteCheckedException sndErr, boolean ping) |
| static @Nullable Byte | currentPolicy() |
| void | dumpStats() - Dumps SPI stats to diagnostic logs in case TcpCommunicationSpi is used, no-op otherwise. |
| boolean | fileTransmissionSupported(ClusterNode node) - This method must be used prior to opening a GridIoManager.TransmissionSender by calling openTransmissionSender(UUID, Object) to ensure that remote and local nodes fully support a direct SocketChannel connection to transfer data. |
| MessageFormatter | formatter() |
| int | getOutboundMessagesQueueSize() - Gets outbound messages queue size. |
| long | getReceivedBytesCount() - Gets received bytes count. |
| int | getReceivedMessagesCount() - Gets received messages count. |
| long | getSentBytesCount() - Gets sent bytes count. |
| int | getSentMessagesCount() - Gets sent messages count. |
| MessageFactory | messageFactory() |
| void | onKernalStart0() |
| void | onKernalStop0(boolean cancel) |
| GridIoManager.TransmissionSender | openTransmissionSender(UUID remoteId, Object topic) |
| void | printMemoryStats() - Prints memory statistics (sizes of internal structures, etc.). |
| void | removeDisconnectListener(GridDisconnectListener lsnr) |
| boolean | removeMessageListener(GridTopic topic) |
| boolean | removeMessageListener(GridTopic topic, @Nullable GridMessageListener lsnr) |
| boolean | removeMessageListener(Object topic) |
| boolean | removeMessageListener(Object topic, @Nullable GridMessageListener lsnr) |
| void | removeTransmissionHandler(Object topic) |
| void | removeUserMessageListener(@Nullable Object topic, IgniteBiPredicate<UUID,?> p) |
| void | resetMetrics() - Resets metrics for this manager. |
| void | runIoTest(long warmup, long duration, int threads, long latencyLimit, int rangesCnt, int payLoadSize, boolean procFromNioThread, List<ClusterNode> nodes) |
| void | sendGeneric(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc) |
| IgniteInternalFuture<List<IgniteIoTestMessage>> | sendIoTest(ClusterNode node, byte[] payload, boolean procFromNioThread) |
| IgniteInternalFuture | sendIoTest(List<ClusterNode> nodes, byte[] payload, boolean procFromNioThread) |
| void | sendOrderedMessage(ClusterNode node, Object topic, Message msg, byte plc, long timeout, boolean skipOnTimeout) |
| void | sendOrderedMessage(ClusterNode node, Object topic, Message msg, byte plc, long timeout, boolean skipOnTimeout, IgniteInClosure<IgniteException> ackC) |
| void | sendToCustomTopic(ClusterNode node, Object topic, Message msg, byte plc) |
| void | sendToCustomTopic(UUID nodeId, Object topic, Message msg, byte plc) |
| void | sendToGridTopic(ClusterNode node, GridTopic topic, Message msg, byte plc) |
| void | sendToGridTopic(ClusterNode node, GridTopic topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackC) |
| void | sendToGridTopic(Collection<? extends ClusterNode> nodes, GridTopic topic, Message msg, byte plc) |
| void | sendToGridTopic(UUID nodeId, GridTopic topic, Message msg, byte plc) |
| void | sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg, @Nullable Object topic, boolean ordered, long timeout, boolean async) - Sends a peer deployable user message. |
| void | start() - Starts grid component. |
| void | stop(boolean cancel) - Stops grid component. |
Methods inherited from class GridManagerAdapter: assertParameter, collectGridNodeData, collectJoiningNodeData, discoveryDataType, enabled, getSpi, getSpi, getSpis, inject, onAfterSpiStart, onBeforeSpiStart, onDisconnected, onGridDataReceived, onJoiningNodeDataReceived, onKernalStart, onKernalStop, onReconnected, startInfo, startSpi, stopInfo, stopSpi, toString, validateNode, validateNode

public static final String COMM_METRICS
public static final String OUTBOUND_MSG_QUEUE_CNT
public static final String SENT_MSG_CNT
public static final String SENT_BYTES_CNT
public static final String RCVD_MSGS_CNT
public static final String RCVD_BYTES_CNT
public static final MessageFactory[] EMPTY
public static final int MAX_CLOSED_TOPICS
public static final String DIRECT_PROTO_VER_ATTR
public static final byte DIRECT_PROTO_VER

public GridIoManager(GridKernalContext ctx)
Parameters:
ctx - Grid kernal context.

public MessageFactory messageFactory()
public MessageFormatter formatter()
public void resetMetrics()
public void start() throws IgniteCheckedException
Throws:
IgniteCheckedException - Thrown in case of any errors.

public IgniteInternalFuture sendIoTest(List<ClusterNode> nodes, byte[] payload, boolean procFromNioThread)
Parameters:
nodes - Nodes.
payload - Payload.
procFromNioThread - If true message is processed from NIO thread.

public IgniteInternalFuture<List<IgniteIoTestMessage>> sendIoTest(ClusterNode node, byte[] payload, boolean procFromNioThread)
Parameters:
node - Node.
payload - Payload.
procFromNioThread - If true message is processed from NIO thread.

public void runIoTest(long warmup, long duration, int threads, long latencyLimit, int rangesCnt, int payLoadSize, boolean procFromNioThread, List<ClusterNode> nodes)
Parameters:
warmup - Warmup duration in milliseconds.
duration - Test duration in milliseconds.
threads - Thread count.
latencyLimit - Max latency in nanoseconds.
rangesCnt - Ranges count in resulting histogram.
payLoadSize - Payload size in bytes.
procFromNioThread - True to process requests in NIO threads.
nodes - Nodes participating in test.

public void onKernalStart0() throws IgniteCheckedException
Overrides:
onKernalStart0 in class GridManagerAdapter<CommunicationSpi<Serializable>>
Throws:
IgniteCheckedException - If failed.

public void onKernalStop0(boolean cancel)
Overrides:
onKernalStop0 in class GridManagerAdapter<CommunicationSpi<Serializable>>
Parameters:
cancel - Cancel flag.

public void stop(boolean cancel) throws IgniteCheckedException
Parameters:
cancel - If true, then all ongoing tasks or jobs for relevant components need to be cancelled.
Throws:
IgniteCheckedException - Thrown in case of any errors.

public static @Nullable Byte currentPolicy()
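
The latencyLimit and rangesCnt parameters of runIoTest describe a latency histogram, but this page does not say how latencies are assigned to ranges. The sketch below shows one plausible scheme, purely as an assumption: equal-width ranges from 0 up to latencyLimit, plus one extra overflow bucket for anything at or above the limit. `LatencyHistogramSketch` and `histogram` are hypothetical names, not Ignite code.

```java
/** Hypothetical equal-width latency histogram; the bucketing scheme is an assumption. */
class LatencyHistogramSketch {
    /**
     * Counts latencies per range.
     *
     * @param latenciesNanos Measured latencies in nanoseconds (assumed non-negative).
     * @param latencyLimit Upper bound of the last regular range, in nanoseconds.
     * @param rangesCnt Number of equal-width ranges below the limit.
     * @return Array of rangesCnt + 1 counters; the last one is the assumed overflow bucket.
     */
    static long[] histogram(long[] latenciesNanos, long latencyLimit, int rangesCnt) {
        long[] res = new long[rangesCnt + 1];

        for (long lat : latenciesNanos) {
            // Latencies at or above the limit go to the overflow bucket.
            int idx = lat >= latencyLimit ? rangesCnt : (int)(lat * rangesCnt / latencyLimit);

            res[idx]++;
        }

        return res;
    }

    public static void main(String[] args) {
        long[] counts = histogram(new long[] {0, 499, 500, 999, 1000, 5000}, 1000, 2);

        System.out.println(java.util.Arrays.toString(counts));
    }
}
```

With a limit of 1000 ns and two ranges, each range is 500 ns wide, so the six sample values above fall two per bucket.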
public boolean checkNodeLeft(UUID nodeId, IgniteCheckedException sndErr, boolean ping) throws IgniteClientDisconnectedCheckedException
Parameters:
nodeId - Node ID.
sndErr - Send error.
ping - True to try pinging the node.
Returns:
True if node left.
Throws:
IgniteClientDisconnectedCheckedException - If ping failed.

public GridIoManager.TransmissionSender openTransmissionSender(UUID remoteId, Object topic)
Parameters:
remoteId - The remote node to connect to.
topic - The remote topic to connect to.

public void addTransmissionHandler(Object topic, TransmissionHandler hnd)
Parameters:
topic - The GridTopic to register handler to.
hnd - Handler which will handle file upload requests.

public void removeTransmissionHandler(Object topic)
Parameters:
topic - The topic to erase handler from.

public boolean fileTransmissionSupported(ClusterNode node)
This method must be used prior to opening a GridIoManager.TransmissionSender by calling
openTransmissionSender(UUID, Object) to ensure that remote and local nodes
fully support a direct SocketChannel connection to transfer data.
Parameters:
node - Remote node to check.
Returns:
true if a file can be sent over socket channel directly.

public void sendToCustomTopic(UUID nodeId, Object topic, Message msg, byte plc) throws IgniteCheckedException
Parameters:
nodeId - Id of destination node.
topic - Topic to send the message to.
msg - Message to send.
plc - Type of processing.
Throws:
IgniteCheckedException - Thrown in case of any errors.

public void sendToGridTopic(UUID nodeId, GridTopic topic, Message msg, byte plc) throws IgniteCheckedException
Parameters:
nodeId - Id of destination node.
topic - Topic to send the message to.
msg - Message to send.
plc - Type of processing.
Throws:
IgniteCheckedException - Thrown in case of any errors.

public void sendToCustomTopic(ClusterNode node, Object topic, Message msg, byte plc) throws IgniteCheckedException
Parameters:
node - Destination node.
topic - Topic to send the message to.
msg - Message to send.
plc - Type of processing.
Throws:
IgniteCheckedException - Thrown in case of any errors.

public void sendToGridTopic(ClusterNode node, GridTopic topic, Message msg, byte plc) throws IgniteCheckedException
Parameters:
node - Destination node.
topic - Topic to send the message to.
msg - Message to send.
plc - Type of processing.
Throws:
IgniteCheckedException - Thrown in case of any errors.

public void sendGeneric(ClusterNode node, Object topic, int topicOrd, Message msg, byte plc) throws IgniteCheckedException
Parameters:
node - Destination node.
topic - Topic to send the message to.
topicOrd - GridTopic enumeration ordinal.
msg - Message to send.
plc - Type of processing.
Throws:
IgniteCheckedException - Thrown in case of any errors.

public void sendOrderedMessage(ClusterNode node, Object topic, Message msg, byte plc, long timeout, boolean skipOnTimeout) throws IgniteCheckedException
Parameters:
node - Destination node.
topic - Topic to send the message to.
msg - Message to send.
plc - Type of processing.
timeout - Timeout to keep a message on receiving queue.
skipOnTimeout - Whether message can be skipped on timeout.
Throws:
IgniteCheckedException - Thrown in case of any errors.

public void sendToGridTopic(ClusterNode node, GridTopic topic, Message msg, byte plc, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException
Parameters:
node - Destination node.
topic - Topic to send the message to.
msg - Message to send.
plc - Type of processing.
ackC - Ack closure.
Throws:
IgniteCheckedException - Thrown in case of any errors.

public void sendToGridTopic(Collection<? extends ClusterNode> nodes, GridTopic topic, Message msg, byte plc) throws IgniteCheckedException
Parameters:
nodes - Destination nodes.
topic - Topic to send the message to.
msg - Message to send.
plc - Type of processing.
Throws:
IgniteCheckedException - Thrown in case of any errors.

public void sendOrderedMessage(ClusterNode node, Object topic, Message msg, byte plc, long timeout, boolean skipOnTimeout, IgniteInClosure<IgniteException> ackC) throws IgniteCheckedException
Parameters:
node - Destination node.
topic - Topic to send the message to.
msg - Message to send.
plc - Type of processing.
timeout - Timeout to keep a message on receiving queue.
skipOnTimeout - Whether message can be skipped on timeout.
ackC - Ack closure.
Throws:
IgniteCheckedException - Thrown in case of any errors.

public void sendUserMessage(Collection<? extends ClusterNode> nodes, Object msg, @Nullable Object topic, boolean ordered, long timeout, boolean async) throws IgniteCheckedException
Parameters:
nodes - Destination nodes.
msg - Message to send.
topic - Message topic to use.
ordered - Whether the message is ordered.
timeout - Message timeout in milliseconds for ordered messages.
async - Async flag.
Throws:
IgniteCheckedException - Thrown in case of any errors.

public void addUserMessageListener(@Nullable Object topic, @Nullable IgniteBiPredicate<UUID,?> p)

public void addUserMessageListener(@Nullable Object topic, @Nullable IgniteBiPredicate<UUID,?> p, UUID nodeId)
Parameters:
topic - Topic to subscribe to.
p - Message predicate.

public void removeUserMessageListener(@Nullable Object topic, IgniteBiPredicate<UUID,?> p)
Parameters:
topic - Topic to unsubscribe from.
p - Message predicate.

public void addMessageListener(GridTopic topic, GridMessageListener lsnr)
Parameters:
topic - Listener's topic.
lsnr - Listener to add.

public void addDisconnectListener(GridDisconnectListener lsnr)
Parameters:
lsnr - Listener to add.

public void removeDisconnectListener(GridDisconnectListener lsnr)
Parameters:
lsnr - Listener to remove.

public void addMessageListener(Object topic, GridMessageListener lsnr)
Parameters:
topic - Listener's topic.
lsnr - Listener to add.

public boolean removeMessageListener(GridTopic topic)
Parameters:
topic - Message topic.

public boolean removeMessageListener(Object topic)
Parameters:
topic - Message topic.

public boolean removeMessageListener(GridTopic topic, @Nullable GridMessageListener lsnr)
Parameters:
topic - Listener's topic.
lsnr - Listener to remove.

public boolean removeMessageListener(Object topic, @Nullable GridMessageListener lsnr)
Parameters:
topic - Listener's topic.
lsnr - Listener to remove.

public int getSentMessagesCount()
public long getSentBytesCount()
public int getReceivedMessagesCount()
public long getReceivedBytesCount()
public int getOutboundMessagesQueueSize()
public void dumpStats()
public void printMemoryStats()
Specified by:
printMemoryStats in interface GridComponent
Overrides:
printMemoryStats in class GridManagerAdapter<CommunicationSpi<Serializable>>
Ignite Database and Caching Platform : ver. 2.8.1 Release Date : May 21 2020