Class DistributedMetaStorageImpl
- java.lang.Object
-
- org.apache.ignite.internal.processors.GridProcessorAdapter
-
- org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl
-
- All Implemented Interfaces:
GridComponent, IgniteChangeGlobalStateSupport, GridProcessor, DistributedMetaStorage, ReadableDistributedMetaStorage
public class DistributedMetaStorageImpl extends GridProcessorAdapter implements DistributedMetaStorage, IgniteChangeGlobalStateSupport
Implementation of DistributedMetaStorage based on MetaStorage for persistence and discovery SPI for communication.
It is based on the existing local metastorage API for persistent clusters (in-memory clusters and client nodes store data in memory). Write and remove operations use the Discovery SPI to send updates to the cluster, which guarantees the order of updates and that all existing (alive) nodes have handled the update message.
To find out which node has the latest data, distributed metastorage has a "version" value (DistributedMetaStorageVersion), which is basically the pair <number of all updates, hash of all updates>. The first element of the pair is the value of getUpdatesCount().
The whole update history up to some point in the past is stored along with the data, so when an outdated node connects to the cluster it receives all the missing data and applies it locally. Listeners are also invoked after such updates. If not enough history is stored, or the joining node is clear, the node receives a snapshot of distributed metastorage (usually called fullData in code), so there won't be inconsistencies.
- See Also:
DistributedMetaStorageUpdateMessage, DistributedMetaStorageUpdateAckMessage
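The version pair described above can be pictured with a small stand-alone model. This is only a sketch: the class VersionSketch, its methods and the hash fold are hypothetical and are not taken from DistributedMetaStorageVersion. It shows why a pair of update count and order-sensitive hash lets two nodes check that they applied the same updates in the same order.

```java
import java.util.Objects;

/** Hypothetical model of the <number of updates, hash of updates> pair; not the Ignite class. */
final class VersionSketch {
    /** Number of updates applied so far (the role getUpdatesCount() plays in the text). */
    final long updatesCount;

    /** Running hash over all applied updates, in application order. */
    final long hash;

    private VersionSketch(long updatesCount, long hash) {
        this.updatesCount = updatesCount;
        this.hash = hash;
    }

    /** Version of a metastorage that has never been updated. */
    static VersionSketch initial() {
        return new VersionSketch(0L, 1L);
    }

    /** Version after one more update; a remove is modelled as a null value. */
    VersionSketch next(String key, String valOrNull) {
        // Assumption: an arbitrary order-sensitive fold, chosen only for illustration.
        long updateHash = 31L * key.hashCode() + Objects.hashCode(valOrNull);
        return new VersionSketch(updatesCount + 1, 31L * hash + updateHash);
    }

    /** True when both elements of the pair match. */
    boolean sameAs(VersionSketch other) {
        return updatesCount == other.updatesCount && hash == other.hash;
    }

    public static void main(String[] args) {
        VersionSketch a = initial().next("k1", "v1").next("k2", "v2");
        VersionSketch b = initial().next("k1", "v1").next("k2", "v2");
        System.out.println("same version: " + a.sameAs(b));
    }
}
```

Because the fold depends on the order of updates, two nodes with the same update count but a different update sequence end up with different hashes in this model.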
-
-
Nested Class Summary
-
Nested classes/interfaces inherited from interface org.apache.ignite.internal.GridComponent
GridComponent.DiscoveryDataExchangeType
-
-
Field Summary
static long DFLT_MAX_HISTORY_BYTES
    Default upper bound of history size in bytes.
static String DISTRIBUTED_METASTORE_VIEW
    Name of the system view for a system MetaStorage.
static String DISTRIBUTED_METASTORE_VIEW_DESC
    Description of the system view for a MetaStorage.
-
Fields inherited from class org.apache.ignite.internal.processors.GridProcessorAdapter
ctx, diagnosticLog, log
-
Fields inherited from interface org.apache.ignite.internal.processors.metastorage.DistributedMetaStorage
IGNITE_INTERNAL_KEY_PREFIX
-
-
Constructor Summary
DistributedMetaStorageImpl(GridKernalContext ctx)
-
Method Summary
void collectGridNodeData(DiscoveryDataBag dataBag)
    Collects discovery data on nodes already in grid on receiving TcpDiscoveryNodeAddedMessage.
void collectJoiningNodeData(DiscoveryDataBag dataBag)
    Collects discovery data on joining node before sending TcpDiscoveryJoinRequestMessage request.
boolean compareAndRemove(@NotNull String key, @NotNull Serializable expVal)
    Remove value from distributed metastorage but only if current value matches the expected one.
boolean compareAndSet(@NotNull String key, @Nullable Serializable expVal, @NotNull Serializable newVal)
    Write value into distributed metastorage but only if current value matches the expected one.
GridFutureAdapter<Boolean> compareAndSetAsync(@NotNull String key, @Nullable Serializable expVal, @NotNull Serializable newVal)
    Write value into distributed metastorage asynchronously but only if current value matches the expected one.
@Nullable GridComponent.DiscoveryDataExchangeType discoveryDataType()
    Gets unique component type to distinguish components providing discovery data.
Future<?> flush()
long getUpdatesCount()
    Get the total number of updates (write/remove) that metastorage ever had.
void inMemoryReadyForRead()
    Executed roughly at the same time as onMetaStorageReadyForRead(ReadOnlyMetastorage).
void iterate(@NotNull String keyPrefix, @NotNull BiConsumer<String,? super Serializable> cb)
    Iterate over all values corresponding to the keys with given prefix.
void listen(@NotNull Predicate<String> keyPred, DistributedMetaStorageListener<?> lsnr)
    Add listener on data updates.
void onActivate(GridKernalContext kctx)
    Called when cluster is performing activation.
void onDeActivate(GridKernalContext kctx)
    Called when cluster is performing deactivation.
void onDisconnected(IgniteFuture<?> reconnectFut)
    Client disconnected callback.
void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data)
    Receives discovery data object from remote nodes (called on new node during discovery process).
void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData discoData)
    Method is called on nodes that are already in grid (not on joining node).
void onKernalStop(boolean cancel)
    Callback to notify that kernal is about to stop.
IgniteInternalFuture<?> onReconnected(boolean clusterRestarted)
    Client reconnected callback.
<T extends Serializable> T read(@NotNull String key)
    Get value by the key.
void remove(@NotNull String key)
    Remove value from distributed metastorage.
GridFutureAdapter<?> removeAsync(@NotNull String key)
    Remove value from distributed metastorage asynchronously.
void start()
    Starts grid component.
void suspend(IgniteInternalFuture<?> compFut)
@Nullable IgniteNodeValidationResult validateNode(ClusterNode node, DiscoveryDataBag.JoiningNodeDiscoveryData discoData)
    Validates that new node can join grid topology, this method is called on coordinator node before new node joins topology.
void write(@NotNull String key, @NotNull Serializable val)
    Write value into distributed metastorage.
GridFutureAdapter<?> writeAsync(@NotNull String key, @NotNull Serializable val)
    Write value into distributed metastorage asynchronously.
-
Methods inherited from class org.apache.ignite.internal.processors.GridProcessorAdapter
assertParameter, onKernalStart, printMemoryStats, stop, toString, validateNode
-
-
-
-
Field Detail
-
DFLT_MAX_HISTORY_BYTES
public static final long DFLT_MAX_HISTORY_BYTES
Default upper bound of history size in bytes.
- See Also:
- Constant Field Values
-
DISTRIBUTED_METASTORE_VIEW
public static final String DISTRIBUTED_METASTORE_VIEW
Name of the system view for a system MetaStorage.
-
DISTRIBUTED_METASTORE_VIEW_DESC
public static final String DISTRIBUTED_METASTORE_VIEW_DESC
Description of the system view for a MetaStorage.
- See Also:
- Constant Field Values
-
-
Constructor Detail
-
DistributedMetaStorageImpl
public DistributedMetaStorageImpl(GridKernalContext ctx)
- Parameters:
ctx - Kernal context.
-
-
Method Detail
-
start
public void start() throws IgniteCheckedException
Starts grid component.
Creates all required listeners.
- Specified by:
start in interface GridComponent
- Overrides:
start in class GridProcessorAdapter
- Throws:
IgniteCheckedException - Throws in case of any errors.
-
onKernalStop
public void onKernalStop(boolean cancel)
Callback to notify that kernal is about to stop.
For a persistent cluster it will stop the async worker.
- Specified by:
onKernalStop in interface GridComponent
- Overrides:
onKernalStop in class GridProcessorAdapter
- Parameters:
cancel - If true then ignore worker's queue and finish it as fast as possible. Otherwise just wait until queue is empty and worker has completed its job.
-
inMemoryReadyForRead
public void inMemoryReadyForRead()
Executed roughly at the same time as onMetaStorageReadyForRead(ReadOnlyMetastorage).
-
onActivate
public void onActivate(GridKernalContext kctx)
Called when cluster is performing activation.
- Specified by:
onActivate in interface IgniteChangeGlobalStateSupport
- Parameters:
kctx - Kernal context.
-
onDeActivate
public void onDeActivate(GridKernalContext kctx)
Called when cluster is performing deactivation.
For persistent nodes, waits until worker's queue is empty and worker has completed its job.
- Specified by:
onDeActivate in interface IgniteChangeGlobalStateSupport
- Parameters:
kctx - Kernal context.
-
getUpdatesCount
public long getUpdatesCount()
Get the total number of updates (write/remove) that metastorage ever had.
- Specified by:
getUpdatesCount in interface ReadableDistributedMetaStorage
-
read
@Nullable public <T extends Serializable> T read(@NotNull String key) throws IgniteCheckedException
Get value by the key. Should be consistent for all nodes.
- Specified by:
read in interface ReadableDistributedMetaStorage
- Parameters:
key - The key.
- Returns:
- Value associated with the key.
- Throws:
IgniteCheckedException - If reading or unmarshalling failed.
-
write
public void write(@NotNull String key, @NotNull Serializable val) throws IgniteCheckedException
Write value into distributed metastorage.
- Specified by:
write in interface DistributedMetaStorage
- Parameters:
key - The key.
val - Value to write. Must not be null.
- Throws:
IgniteCheckedException - In case of marshalling error or some other unexpected exception.
-
writeAsync
public GridFutureAdapter<?> writeAsync(@NotNull String key, @NotNull Serializable val) throws IgniteCheckedException
Write value into distributed metastorage asynchronously.
- Specified by:
writeAsync in interface DistributedMetaStorage
- Parameters:
key - The key.
val - Value to write. Must not be null.
- Returns:
- Future with the operation result.
- Throws:
IgniteCheckedException - In case of marshalling error or some other unexpected exception.
-
removeAsync
public GridFutureAdapter<?> removeAsync(@NotNull String key) throws IgniteCheckedException
Remove value from distributed metastorage asynchronously.
- Specified by:
removeAsync in interface DistributedMetaStorage
- Parameters:
key - The key.
- Returns:
- Future with the operation result.
- Throws:
IgniteCheckedException - In case of marshalling error or some other unexpected exception.
-
remove
public void remove(@NotNull String key) throws IgniteCheckedException
Remove value from distributed metastorage.
- Specified by:
remove in interface DistributedMetaStorage
- Parameters:
key - The key.
- Throws:
IgniteCheckedException - In case of marshalling error or some other unexpected exception.
-
compareAndSet
public boolean compareAndSet(@NotNull String key, @Nullable Serializable expVal, @NotNull Serializable newVal) throws IgniteCheckedException
Write value into distributed metastorage but only if current value matches the expected one.
- Specified by:
compareAndSet in interface DistributedMetaStorage
- Parameters:
key - The key.
expVal - Expected value. Might be null.
newVal - Value to write. Must not be null.
- Returns:
True if expected value matched the actual one and write was completed successfully. False otherwise.
- Throws:
IgniteCheckedException - In case of marshalling error or some other unexpected exception.
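The conditional write and remove semantics can be illustrated with an in-memory stand-in. This is a sketch only: CasSketch is a hypothetical class, not the Ignite implementation, and it assumes that a null expected value means "no value is currently stored for the key". The increment method shows the usual optimistic retry loop built on top of compareAndSet.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical in-memory stand-in for the conditional operations; single JVM only. */
final class CasSketch {
    private final Map<String, Serializable> data = new HashMap<>();

    /** Writes newVal only if the current value equals expVal (null is assumed to mean "absent"). */
    synchronized boolean compareAndSet(String key, Serializable expVal, Serializable newVal) {
        Objects.requireNonNull(newVal, "newVal");

        if (!Objects.equals(data.get(key), expVal))
            return false;

        data.put(key, newVal);
        return true;
    }

    /** Removes the key only if the current value equals expVal, which must not be null. */
    synchronized boolean compareAndRemove(String key, Serializable expVal) {
        Objects.requireNonNull(expVal, "expVal");

        if (!expVal.equals(data.get(key)))
            return false;

        data.remove(key);
        return true;
    }

    synchronized Serializable read(String key) {
        return data.get(key);
    }

    /** Optimistic retry loop: re-reads and retries until the conditional write succeeds. */
    long increment(String key) {
        while (true) {
            Long cur = (Long) read(key);
            Long next = (cur == null) ? 1L : cur + 1L;

            if (compareAndSet(key, cur, next))
                return next;
        }
    }

    public static void main(String[] args) {
        CasSketch ms = new CasSketch();
        System.out.println(ms.compareAndSet("k", null, "v1"));
        System.out.println(ms.increment("counter"));
    }
}
```

The retry loop never overwrites a value it has not seen: if another writer changes the key between the read and the conditional write, the write is rejected and the loop starts over with the fresh value.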
-
compareAndSetAsync
public GridFutureAdapter<Boolean> compareAndSetAsync(@NotNull String key, @Nullable Serializable expVal, @NotNull Serializable newVal) throws IgniteCheckedException
Write value into distributed metastorage asynchronously but only if current value matches the expected one.
- Specified by:
compareAndSetAsync in interface DistributedMetaStorage
- Parameters:
key - The key.
expVal - Expected value. Might be null.
newVal - Value to write. Must not be null.
- Returns:
True if expected value matched the actual one and write was completed successfully. False otherwise.
- Throws:
IgniteCheckedException - In case of marshalling error or some other unexpected exception.
-
compareAndRemove
public boolean compareAndRemove(@NotNull String key, @NotNull Serializable expVal) throws IgniteCheckedException
Remove value from distributed metastorage but only if current value matches the expected one.
- Specified by:
compareAndRemove in interface DistributedMetaStorage
- Parameters:
key - The key.
expVal - Expected value. Must not be null.
- Returns:
True if expected value matched the actual one and remove was completed successfully. False otherwise.
- Throws:
IgniteCheckedException - In case of marshalling error or some other unexpected exception.
-
iterate
public void iterate(@NotNull String keyPrefix, @NotNull BiConsumer<String,? super Serializable> cb) throws IgniteCheckedException
Iterate over all values corresponding to the keys with given prefix. It is guaranteed that iteration will be executed in ascending keys order.
- Specified by:
iterate in interface ReadableDistributedMetaStorage
- Parameters:
keyPrefix - Prefix for the keys that will be iterated.
cb - Callback that will be applied to all <key, value> pairs.
- Throws:
IgniteCheckedException - If reading or unmarshalling failed.
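Prefix iteration in ascending key order can be modelled with a sorted map. The sketch below is a hypothetical stand-in (PrefixIterationSketch is not an Ignite class) and says nothing about how the real storage is organised. It relies on the fact that, in natural String order, all keys sharing a prefix form one contiguous range that starts at the first key greater than or equal to the prefix.

```java
import java.io.Serializable;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

/** Hypothetical in-memory model of prefix iteration in ascending key order. */
final class PrefixIterationSketch {
    /** TreeMap keeps keys sorted in natural String order. */
    private final TreeMap<String, Serializable> data = new TreeMap<>();

    void write(String key, Serializable val) {
        data.put(key, val);
    }

    /** Applies cb to every <key, value> pair whose key starts with keyPrefix, in ascending key order. */
    void iterate(String keyPrefix, BiConsumer<String, ? super Serializable> cb) {
        // Start at the first key >= prefix and stop at the first key without the prefix.
        for (Map.Entry<String, Serializable> e : data.tailMap(keyPrefix, true).entrySet()) {
            if (!e.getKey().startsWith(keyPrefix))
                break;

            cb.accept(e.getKey(), e.getValue());
        }
    }

    public static void main(String[] args) {
        PrefixIterationSketch ms = new PrefixIterationSketch();
        ms.write("cfg.b", 2);
        ms.write("cfg.a", 1);
        ms.write("other", 3);

        ms.iterate("cfg.", (k, v) -> System.out.println(k + " = " + v));
    }
}
```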
-
listen
public void listen(@NotNull Predicate<String> keyPred, DistributedMetaStorageListener<?> lsnr)
Add listener on data updates. Updates happen in two cases:
- Some node invoked write or remove. Listeners are invoked after the update operation is already completed.
- Node is just started and not ready for write yet. In this case listeners are invoked for every key with new value (retrieved from the cluster) or already existing value if there were no updates for given key. This guarantees that all listeners are invoked for all updates in case of failover.
- Specified by:
listen in interface ReadableDistributedMetaStorage
- Parameters:
keyPred - Predicate to check whether this listener should be invoked on given key update or not.
lsnr - Listener object.
- See Also:
DistributedMetaStorageListener
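The first case above (a listener selected by a key predicate and invoked after the update has been applied) can be sketched with an in-memory stand-in. ListenerSketch and its nested UpdateListener interface are hypothetical names used only for this illustration; the second case, replaying values on node start, is not modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Hypothetical in-memory model of predicate-filtered update listeners. */
final class ListenerSketch {
    /** Hypothetical callback shape: key plus old and new value (null means "no value"). */
    interface UpdateListener {
        void onUpdate(String key, String oldVal, String newVal);
    }

    private static final class Registration {
        final Predicate<String> keyPred;
        final UpdateListener lsnr;

        Registration(Predicate<String> keyPred, UpdateListener lsnr) {
            this.keyPred = keyPred;
            this.lsnr = lsnr;
        }
    }

    private final Map<String, String> data = new HashMap<>();
    private final List<Registration> regs = new ArrayList<>();

    void listen(Predicate<String> keyPred, UpdateListener lsnr) {
        regs.add(new Registration(keyPred, lsnr));
    }

    void write(String key, String val) {
        String old = data.put(key, val); // Apply the update first...
        notifyListeners(key, old, val);  // ...then invoke listeners.
    }

    void remove(String key) {
        String old = data.remove(key);
        notifyListeners(key, old, null);
    }

    private void notifyListeners(String key, String oldVal, String newVal) {
        for (Registration r : regs) {
            if (r.keyPred.test(key))
                r.lsnr.onUpdate(key, oldVal, newVal);
        }
    }

    public static void main(String[] args) {
        ListenerSketch ms = new ListenerSketch();
        ms.listen(k -> k.startsWith("cfg."), (k, o, n) -> System.out.println(k + ": " + o + " -> " + n));
        ms.write("cfg.timeout", "10");
        ms.write("unrelated", "x");
    }
}
```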
-
discoveryDataType
@Nullable public GridComponent.DiscoveryDataExchangeType discoveryDataType()
Gets unique component type to distinguish components providing discovery data. Must return non-null value if component implements any of methods GridComponent.collectJoiningNodeData(DiscoveryDataBag) or GridComponent.collectGridNodeData(DiscoveryDataBag).
- Specified by:
discoveryDataType in interface GridComponent
- Overrides:
discoveryDataType in class GridProcessorAdapter
- Returns:
- Unique component type for discovery data exchange.
-
collectJoiningNodeData
public void collectJoiningNodeData(DiscoveryDataBag dataBag)
Collects discovery data on joining node before sending TcpDiscoveryJoinRequestMessage request.
- Specified by:
collectJoiningNodeData in interface GridComponent
- Overrides:
collectJoiningNodeData in class GridProcessorAdapter
- Parameters:
dataBag - container object to store discovery data in.
-
validateNode
@Nullable public IgniteNodeValidationResult validateNode(ClusterNode node, DiscoveryDataBag.JoiningNodeDiscoveryData discoData)
Validates that new node can join grid topology. This method is called on coordinator node before new node joins topology.
If local node is client then method should do nothing. It is expected that this method is invoked on coordinator node, but there might be exceptions to this. Validation rules:
- Do not join node that has no distributed metastorage if feature is supported in current topology and distributed metastorage has already been used (getUpdatesCount() is not zero).
- Do not join node that has updates count greater than on local node and hasn't provided enough history to apply it to the cluster.
- Do not join node if its distributed metastorage version hash differs from the local one. In such cases node is probably from different cluster or has some inconsistent data.
- Specified by:
validateNode in interface GridComponent
- Overrides:
validateNode in class GridProcessorAdapter
- Parameters:
node - Joining node.
discoData - Joining node discovery data.
- Returns:
- Validation result or null in case of success.
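The three validation rules can be written down as a small decision function. This is a simplified sketch, not the actual validation code: JoinValidationSketch, JoiningData and the returned messages are hypothetical, and for brevity the hash rule is applied only when both sides have the same update count (comparing versions at different counts would require replaying history, which is out of scope here).

```java
/** Hypothetical, simplified model of the join validation rules. */
final class JoinValidationSketch {
    /** What a joining node is assumed to report; a null reference means "no distributed metastorage". */
    static final class JoiningData {
        final long updatesCount; // Number of updates the joining node has applied.
        final int historySize;   // Number of most recent updates it sent along.
        final long hash;         // Hash element of its version.

        JoiningData(long updatesCount, int historySize, long hash) {
            this.updatesCount = updatesCount;
            this.historySize = historySize;
            this.hash = hash;
        }
    }

    /** Returns a rejection reason, or null when the node may join. */
    static String validate(boolean featureSupported, long locUpdatesCount, long locHash, JoiningData joining) {
        if (joining == null) {
            // Rule 1: no metastorage on the joining node, but the cluster already uses it.
            return featureSupported && locUpdatesCount != 0
                ? "Joining node has no distributed metastorage"
                : null;
        }

        long ahead = joining.updatesCount - locUpdatesCount;

        // Rule 2: joining node is ahead and did not send enough history to catch the cluster up.
        if (ahead > 0 && joining.historySize < ahead)
            return "Joining node is ahead but provided too little history";

        // Rule 3 (simplified): same update count but different hash means diverged data.
        if (ahead == 0 && joining.hash != locHash)
            return "Distributed metastorage version hash differs";

        return null;
    }

    public static void main(String[] args) {
        System.out.println(validate(true, 5, 42, new JoiningData(8, 2, 7)));
        System.out.println(validate(true, 5, 42, new JoiningData(5, 0, 42)));
    }
}
```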
-
onJoiningNodeDataReceived
public void onJoiningNodeDataReceived(DiscoveryDataBag.JoiningNodeDiscoveryData discoData)
Method is called on nodes that are already in grid (not on joining node). It receives discovery data from joining node.
Since validateNode(ClusterNode, DiscoveryDataBag.JoiningNodeDiscoveryData) has already been invoked we can be sure that joining node has valid discovery data. Current method does something meaningful only if joining node has bigger distributed metastorage version, in this case all required updates will be applied.
- Specified by:
onJoiningNodeDataReceived in interface GridComponent
- Overrides:
onJoiningNodeDataReceived in class GridProcessorAdapter
- Parameters:
discoData - DiscoveryDataBag.JoiningNodeDiscoveryData interface to retrieve discovery data of joining node.
-
collectGridNodeData
public void collectGridNodeData(DiscoveryDataBag dataBag)
Collects discovery data on nodes already in grid on receiving TcpDiscoveryNodeAddedMessage.
Does nothing on client nodes. Also does nothing if the feature is not supported on some node in topology. Otherwise it fills the data bag with data required for the joining node so it can be consistent with the cluster. There are 2 main cases: local node has enough history to send only updates or it doesn't. In the first case the history is collected, otherwise whole distributed metastorage (fullData) is collected along with available history. Goal of collecting history in the second case is to allow all nodes in cluster to have the same history so connection of new server will always give the same result.
- Specified by:
collectGridNodeData in interface GridComponent
- Overrides:
collectGridNodeData in class GridProcessorAdapter
- Parameters:
dataBag - container object to store discovery data in.
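The two cases (send only the missing updates, or send fullData together with the available history) can be sketched as follows. HistoryOrSnapshotSketch, Payload and the update numbering are assumptions made for this illustration and do not reflect the real data bag contents: updates are numbered from 1, and the local history list holds the most recent updates with its last element being update number locUpdatesCount.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of choosing between "history only" and "fullData plus history". */
final class HistoryOrSnapshotSketch {
    /** What would be sent to the joining node in this model. */
    static final class Payload {
        final boolean fullData;     // True when a full snapshot has to be sent.
        final List<String> updates; // History entries sent along.

        Payload(boolean fullData, List<String> updates) {
            this.fullData = fullData;
            this.updates = updates;
        }
    }

    static Payload collect(long locUpdatesCount, List<String> history, long joiningUpdatesCount) {
        // Joining node is not behind: nothing to send.
        if (joiningUpdatesCount >= locUpdatesCount)
            return new Payload(false, new ArrayList<>());

        long firstInHistory = locUpdatesCount - history.size() + 1; // Oldest retained update number.
        long firstMissing = joiningUpdatesCount + 1;                // First update the joining node lacks.

        if (firstMissing >= firstInHistory) {
            // Enough history: send only the updates the joining node is missing.
            int from = (int) (firstMissing - firstInHistory);
            return new Payload(false, new ArrayList<>(history.subList(from, history.size())));
        }

        // Not enough history: send the snapshot plus all available history.
        return new Payload(true, new ArrayList<>(history));
    }

    public static void main(String[] args) {
        List<String> history = Arrays.asList("u8", "u9", "u10");
        Payload p = collect(10, history, 8);
        System.out.println(p.fullData + " " + p.updates);
    }
}
```

Sending the available history even in the snapshot case matches the stated goal: every node ends up with the same history, so a later join gives the same result regardless of which node answers.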
-
onDisconnected
public void onDisconnected(IgniteFuture<?> reconnectFut)
Client disconnected callback.
- Specified by:
onDisconnected in interface GridComponent
- Overrides:
onDisconnected in class GridProcessorAdapter
- Parameters:
reconnectFut - Reconnect future.
-
onReconnected
public IgniteInternalFuture<?> onReconnected(boolean clusterRestarted)
Client reconnected callback.
- Specified by:
onReconnected in interface GridComponent
- Overrides:
onReconnected in class GridProcessorAdapter
- Parameters:
clusterRestarted - Cluster restarted flag.
- Returns:
- Future to wait before completing reconnect future.
-
onGridDataReceived
public void onGridDataReceived(DiscoveryDataBag.GridDiscoveryData data)
Receives discovery data object from remote nodes (called on new node during discovery process).
Applies received updates if they are present in response.
- Specified by:
onGridDataReceived in interface GridComponent
- Overrides:
onGridDataReceived in class GridProcessorAdapter
- Parameters:
data - Grid discovery data.
-
flush
public Future<?> flush()
- Returns:
- Future which will be completed when all the updates prior to the pause are processed.
-
suspend
public void suspend(IgniteInternalFuture<?> compFut)
- Parameters:
compFut - Future which should be completed when worker may proceed with updates.
-
-