Class DataStreamerImpl<K,V>
- java.lang.Object
-
- org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl<K,V>
-
- All Implemented Interfaces:
AutoCloseable,Comparable<Delayed>,Delayed,IgniteDataStreamer<K,V>
public class DataStreamerImpl<K,V> extends Object implements IgniteDataStreamer<K,V>, Delayed
Data streamer implementation.
-
-
Nested Class Summary
Nested Classes Modifier and Type Class Description protected static classDataStreamerImpl.IsolatedUpdaterIsolated receiver which only loads entry initial value.
-
Field Summary
Fields Modifier and Type Field Description static StringWRN_INCONSISTENT_UPDATES-
Fields inherited from interface org.apache.ignite.IgniteDataStreamer
DFLT_MAX_PARALLEL_OPS, DFLT_PARALLEL_OPS_MULTIPLIER, DFLT_PER_NODE_BUFFER_SIZE, DFLT_PER_THREAD_BUFFER_SIZE, DFLT_UNLIMIT_TIMEOUT
-
-
Constructor Summary
Constructors Constructor Description DataStreamerImpl(GridKernalContext ctx, @Nullable String cacheName, DelayQueue<DataStreamerImpl<K,V>> flushQ)
-
Method Summary
All Methods Instance Methods Concrete Methods Modifier and Type Method Description IgniteFuture<?>addData(Collection<? extends Map.Entry<K,V>> entries)Adds data for streaming on remote node.IgniteFuture<?>addData(Map.Entry<K,V> entry)Adds data for streaming on remote node.IgniteFuture<?>addData(Map<K,V> entries)Adds data for streaming on remote node.IgniteFuture<?>addData(K key, V val)Adds data for streaming on remote node.IgniteFuture<?>addDataInternal(Collection<? extends DataStreamerEntry> entries)IgniteFuture<?>addDataInternal(Collection<? extends DataStreamerEntry> entries, boolean useThreadBuffer)IgniteFuture<?>addDataInternal(KeyCacheObject key, CacheObject val)booleanallowOverwrite()Gets flag enabling overwriting existing values in cache.voidallowOverwrite(boolean allow)Sets flag enabling overwriting existing values in cache.longautoFlushFrequency()Gets automatic flush frequency.voidautoFlushFrequency(long autoFlushFreq)Sets automatic flush frequency.@Nullable StringcacheName()Name of cache to stream data to.CacheObjectContextcacheObjectContext()voidclose()Closes data streamer.voidclose(boolean cancel)Streams any remaining data and closes this streamer.voidcloseEx(boolean cancel)intcompareTo(Delayed o)protected @NotNull IgniteCacheFutureImplcreateDataLoadFuture()Creates data load future and register its as active future.voiddeployClass(Class<?> depCls)Optional deploy class for peer deployment.voidflush()Streams any remaining data, but doesn't close the streamer.IgniteFuture<?>future()Gets future for this streaming process.longgetDelay(TimeUnit unit)IgniteInternalFuture<?>internalFuture()voidioPolicyResolver(IgniteClosure<ClusterNode,Byte> ioPlcRslvr)booleankeepBinary()Gets flag indicating that objects should be kept in binary format when passed to the stream receiver.voidkeepBinary(boolean keepBinary)Sets flag indicating that objects should be kept in binary format when passes to the steam receiver.intmaxRemapCount()voidmaxRemapCount(int maxRemapCnt)voidonDisconnected(IgniteFuture<?> reconnectFut)intperNodeBufferSize()Gets size of per node key-value pairs buffer.voidperNodeBufferSize(int bufSize)Sets size of per node key-value pairs buffer.intperNodeParallelOperations()Gets maximum number of parallel stream operations for a single node.voidperNodeParallelOperations(int parallelOps)Sets maximum number of parallel stream operations for a single node.intperThreadBufferSize()Gets buffer size set byIgniteDataStreamer.perThreadBufferSize(int).voidperThreadBufferSize(int size)Allows to set buffer size for thread in case of stream byIgniteDataStreamer.addData(Object, Object)call.voidreceiver(StreamReceiver<K,V> rcvr)Sets custom stream receiver to this data streamer.IgniteFuture<?>removeData(K key)Adds key for removal on remote node.IgniteFuture<?>removeDataInternal(KeyCacheObject key)booleanskipStore()Gets flag indicating that write-through behavior should be disabled for data streaming.voidskipStore(boolean skipStore)Sets flag indicating that write-through behavior should be disabled for data streaming.longtimeout()Gets timeout set byIgniteDataStreamer.timeout(long).voidtimeout(long timeout)Sets the timeout that is used in the following cases: any data addition method can be blocked when all per node parallel operations are exhausted.StringtoString()voidtryFlush()Flushes every internal buffer if buffer was flushed before passed in threshold.
-
-
-
Field Detail
-
WRN_INCONSISTENT_UPDATES
public static final String WRN_INCONSISTENT_UPDATES
- See Also:
- Constant Field Values
-
-
Constructor Detail
-
DataStreamerImpl
public DataStreamerImpl(GridKernalContext ctx, @Nullable @Nullable String cacheName, DelayQueue<DataStreamerImpl<K,V>> flushQ)
- Parameters:
ctx- Grid kernal context.cacheName- Cache name.flushQ- Flush queue.
-
-
Method Detail
-
perThreadBufferSize
public void perThreadBufferSize(int size)
Allows to set buffer size for thread in case of stream byIgniteDataStreamer.addData(Object, Object)call.- Specified by:
perThreadBufferSizein interfaceIgniteDataStreamer<K,V>- Parameters:
size- Size of buffer.
-
perThreadBufferSize
public int perThreadBufferSize()
Gets buffer size set byIgniteDataStreamer.perThreadBufferSize(int).- Specified by:
perThreadBufferSizein interfaceIgniteDataStreamer<K,V>- Returns:
- Buffer size.
-
cacheObjectContext
public CacheObjectContext cacheObjectContext()
- Returns:
- Cache object context.
-
future
public IgniteFuture<?> future()
Gets future for this streaming process. This future completes whenever methodIgniteDataStreamer.close(boolean)completes. By attaching listeners to this future it is possible to get asynchronous notifications for completion of this streaming process.- Specified by:
futurein interfaceIgniteDataStreamer<K,V>- Returns:
- Future for this streaming process.
-
internalFuture
public IgniteInternalFuture<?> internalFuture()
- Returns:
- Internal future.
-
deployClass
public void deployClass(Class<?> depCls)
Optional deploy class for peer deployment. All classes added by a data streamer must be class-loadable from the same class-loader. Ignite will make the best effort to detect the most suitable class-loader for data loading. However, in complex cases, where compound or deeply nested class-loaders are used, it is best to specify a deploy class which can be any class loaded by the class-loader for given data.- Specified by:
deployClassin interfaceIgniteDataStreamer<K,V>- Parameters:
depCls- Any class loaded by the class-loader for given data.
-
receiver
public void receiver(StreamReceiver<K,V> rcvr)
Sets custom stream receiver to this data streamer.Disables
IgniteDataStreamer.allowOverwrite(boolean)and setsIgniteDataStreamer.allowOverwrite()returningtrue.- Specified by:
receiverin interfaceIgniteDataStreamer<K,V>- Parameters:
rcvr- Stream receiver.
-
allowOverwrite
public boolean allowOverwrite()
Gets flag enabling overwriting existing values in cache. Data streamer will perform better if this flag is disabled.This flag is disabled by default (default is
false).- Specified by:
allowOverwritein interfaceIgniteDataStreamer<K,V>- Returns:
Trueif overwriting is allowed or if receiver is changed byIgniteDataStreamer.receiver(StreamReceiver).Falseotherwise.
-
allowOverwrite
public void allowOverwrite(boolean allow)
Sets flag enabling overwriting existing values in cache. Data streamer will perform better if this flag is disabled. Note that when this flag isfalse, updates will not be propagated to the cache store (i.e.IgniteDataStreamer.skipStore()flag will be set totrueimplicitly).This flag is disabled by default (default is
false).The flag has no effect when custom cache receiver set using
IgniteDataStreamer.receiver(StreamReceiver)method.- Specified by:
allowOverwritein interfaceIgniteDataStreamer<K,V>- Parameters:
allow- Flag value.
-
skipStore
public boolean skipStore()
Gets flag indicating that write-through behavior should be disabled for data streaming. Default isfalse.- Specified by:
skipStorein interfaceIgniteDataStreamer<K,V>- Returns:
- Skip store flag.
-
skipStore
public void skipStore(boolean skipStore)
Sets flag indicating that write-through behavior should be disabled for data streaming. Default isfalse.- Specified by:
skipStorein interfaceIgniteDataStreamer<K,V>- Parameters:
skipStore- Skip store flag.
-
keepBinary
public boolean keepBinary()
Gets flag indicating that objects should be kept in binary format when passed to the stream receiver. Default isfalse.- Specified by:
keepBinaryin interfaceIgniteDataStreamer<K,V>- Returns:
- Skip store flag.
-
keepBinary
public void keepBinary(boolean keepBinary)
Sets flag indicating that objects should be kept in binary format when passes to the steam receiver. Default isfalse.- Specified by:
keepBinaryin interfaceIgniteDataStreamer<K,V>- Parameters:
keepBinary- Keep binary flag.
-
cacheName
@Nullable public @Nullable String cacheName()
Name of cache to stream data to.- Specified by:
cacheNamein interfaceIgniteDataStreamer<K,V>- Returns:
- Cache name or
nullfor default cache.
-
perNodeBufferSize
public int perNodeBufferSize()
Gets size of per node key-value pairs buffer.- Specified by:
perNodeBufferSizein interfaceIgniteDataStreamer<K,V>- Returns:
- Per node buffer size.
-
perNodeBufferSize
public void perNodeBufferSize(int bufSize)
Sets size of per node key-value pairs buffer.This method should be called prior to
IgniteDataStreamer.addData(Object, Object)call.If not provided, default value is
IgniteDataStreamer.DFLT_PER_NODE_BUFFER_SIZE.- Specified by:
perNodeBufferSizein interfaceIgniteDataStreamer<K,V>- Parameters:
bufSize- Per node buffer size.
-
perNodeParallelOperations
public int perNodeParallelOperations()
Gets maximum number of parallel stream operations for a single node.- Specified by:
perNodeParallelOperationsin interfaceIgniteDataStreamer<K,V>- Returns:
- Maximum number of parallel stream operations for a single node.
-
perNodeParallelOperations
public void perNodeParallelOperations(int parallelOps)
Sets maximum number of parallel stream operations for a single node.This method should be called prior to
IgniteDataStreamer.addData(Object, Object)call.If not provided, default value is calculated as follows
IgniteDataStreamer.DFLT_PARALLEL_OPS_MULTIPLIER*DATA_STREAMER_POOL_SIZE_ON_REMOTE_NODE.- Specified by:
perNodeParallelOperationsin interfaceIgniteDataStreamer<K,V>- Parameters:
parallelOps- Maximum number of parallel stream operations for a single node.- See Also:
IgniteConfiguration.getDataStreamerThreadPoolSize()
-
timeout
public void timeout(long timeout)
Sets the timeout that is used in the following cases:- any data addition method can be blocked when all per node parallel operations are exhausted. The timeout defines the max time you will be blocked waiting for a permit to add a chunk of data into the streamer;
- Total timeout time for
IgniteDataStreamer.flush()operation; - Total timeout time for
IgniteDataStreamer.close()operation.
- Specified by:
timeoutin interfaceIgniteDataStreamer<K,V>- Parameters:
timeout- Timeout in milliseconds.
-
timeout
public long timeout()
Gets timeout set byIgniteDataStreamer.timeout(long).- Specified by:
timeoutin interfaceIgniteDataStreamer<K,V>- Returns:
- Timeout in milliseconds.
-
autoFlushFrequency
public long autoFlushFrequency()
Gets automatic flush frequency. Essentially, this is the time after which the streamer will make an attempt to submit all data added so far to remote nodes. Note that there is no guarantee that data will be delivered after this concrete attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.If set to
0, automatic flush is disabled.Automatic flush is disabled by default (default value is
0).- Specified by:
autoFlushFrequencyin interfaceIgniteDataStreamer<K,V>- Returns:
- Flush frequency or
0if automatic flush is disabled. - See Also:
IgniteDataStreamer.flush()
-
autoFlushFrequency
public void autoFlushFrequency(long autoFlushFreq)
Sets automatic flush frequency. Essentially, this is the time after which the streamer will make an attempt to submit all data added so far to remote nodes. Note that there is no guarantee that data will be delivered after this concrete attempt (e.g., it can fail when topology is changing), but it won't be lost anyway.If set to
0, automatic flush is disabled.Automatic flush is disabled by default (default value is
0).- Specified by:
autoFlushFrequencyin interfaceIgniteDataStreamer<K,V>- Parameters:
autoFlushFreq- Flush frequency or0to disable automatic flush.- See Also:
IgniteDataStreamer.flush()
-
addData
public IgniteFuture<?> addData(Map<K,V> entries) throws IllegalStateException
Adds data for streaming on remote node. This method can be called from multiple threads in parallel to speed up streaming if needed.Note that streamer will stream data concurrently by multiple internal threads, so the data may get to remote nodes in different order from which it was added to the streamer. The data may not be sent until
IgniteDataStreamer.flush()orIgniteDataStreamer.close()are called.Note: if
IgniteDataStreamer.allowOverwrite()set tofalse(by default) then data streamer will not overwrite existing cache entries for better performance (to change, setIgniteDataStreamer.allowOverwrite(boolean)totrue)- Specified by:
addDatain interfaceIgniteDataStreamer<K,V>- Parameters:
entries- Map to be streamed.- Returns:
- Future for this stream operation.
Note: It may never complete unless
IgniteDataStreamer.flush()orIgniteDataStreamer.close()are explicitly called. - Throws:
IllegalStateException- If grid has been concurrently stopped orIgniteDataStreamer.close(boolean)has already been called on streamer.- See Also:
IgniteDataStreamer.allowOverwrite()
-
addData
public IgniteFuture<?> addData(Collection<? extends Map.Entry<K,V>> entries)
Adds data for streaming on remote node. This method can be called from multiple threads in parallel to speed up streaming if needed.Note that streamer will stream data concurrently by multiple internal threads, so the data may get to remote nodes in different order from which it was added to the streamer. The data may not be sent until
IgniteDataStreamer.flush()orIgniteDataStreamer.close()are called.Note: if
IgniteDataStreamer.allowOverwrite()set tofalse(by default) then data streamer will not overwrite existing cache entries for better performance (to change, setIgniteDataStreamer.allowOverwrite(boolean)totrue)- Specified by:
addDatain interfaceIgniteDataStreamer<K,V>- Parameters:
entries- Collection of entries to be streamed.- Returns:
- Future for this stream operation.
Note: It may never complete unless
IgniteDataStreamer.flush()orIgniteDataStreamer.close()are explicitly called. - See Also:
IgniteDataStreamer.allowOverwrite()
-
addDataInternal
public IgniteFuture<?> addDataInternal(KeyCacheObject key, CacheObject val)
- Parameters:
key- Key.val- Value.- Returns:
- Future.
-
removeDataInternal
public IgniteFuture<?> removeDataInternal(KeyCacheObject key)
- Parameters:
key- Key.- Returns:
- Future.
-
addDataInternal
public IgniteFuture<?> addDataInternal(Collection<? extends DataStreamerEntry> entries)
- Parameters:
entries- Entries.- Returns:
- Future.
-
addDataInternal
public IgniteFuture<?> addDataInternal(Collection<? extends DataStreamerEntry> entries, boolean useThreadBuffer)
- Parameters:
entries- Entries.useThreadBuffer-- Returns:
- Future.
-
createDataLoadFuture
@NotNull protected @NotNull IgniteCacheFutureImpl createDataLoadFuture()
Creates data load future and register its as active future.- Returns:
- Data load future.
-
addData
public IgniteFuture<?> addData(Map.Entry<K,V> entry)
Adds data for streaming on remote node. This method can be called from multiple threads in parallel to speed up streaming if needed.Note that streamer will stream data concurrently by multiple internal threads, so the data may get to remote nodes in different order from which it was added to the streamer. The data may not be sent until
IgniteDataStreamer.flush()orIgniteDataStreamer.close()are called.Note: if
IgniteDataStreamer.allowOverwrite()set tofalse(by default) then data streamer will not overwrite existing cache entries for better performance (to change, setIgniteDataStreamer.allowOverwrite(boolean)totrue)- Specified by:
addDatain interfaceIgniteDataStreamer<K,V>- Parameters:
entry- Entry.- Returns:
- Future for this operation.
Note: It may never complete unless
IgniteDataStreamer.flush()orIgniteDataStreamer.close()are explicitly called. - See Also:
IgniteDataStreamer.allowOverwrite()
-
addData
public IgniteFuture<?> addData(K key, V val)
Adds data for streaming on remote node. This method can be called from multiple threads in parallel to speed up streaming if needed.Note that streamer will stream data concurrently by multiple internal threads, so the data may get to remote nodes in different order from which it was added to the streamer. The data may not be sent until
IgniteDataStreamer.flush()orIgniteDataStreamer.close()are called.Note: if
IgniteDataStreamer.allowOverwrite()set tofalse(by default) then data streamer will not overwrite existing cache entries for better performance (to change, setIgniteDataStreamer.allowOverwrite(boolean)totrue)- Specified by:
addDatain interfaceIgniteDataStreamer<K,V>- Parameters:
key- Key.val- Value ornullif respective entry must be removed from cache.- Returns:
- Future for this operation.
Note: It may never complete unless
IgniteDataStreamer.flush()orIgniteDataStreamer.close()are explicitly called. - See Also:
IgniteDataStreamer.allowOverwrite()
-
removeData
public IgniteFuture<?> removeData(K key)
Adds key for removal on remote node. Equivalent toaddData(key, null).- Specified by:
removeDatain interfaceIgniteDataStreamer<K,V>- Parameters:
key- Key.- Returns:
- Future for this operation.
Note: It may never complete unless
IgniteDataStreamer.flush()orIgniteDataStreamer.close()are explicitly called.
-
ioPolicyResolver
public void ioPolicyResolver(IgniteClosure<ClusterNode,Byte> ioPlcRslvr)
- Parameters:
ioPlcRslvr- IO policy resolver.
-
flush
public void flush() throws javax.cache.CacheExceptionStreams any remaining data, but doesn't close the streamer. Data can be still added after flush is finished. This method blocks and doesn't allow to add any data until all data is streamed.If another thread is already performing flush, this method will block, wait for another thread to complete flush and exit. If you don't want to wait in this case, use
IgniteDataStreamer.tryFlush()method.Note that #flush() guarantees completion of all futures returned by
IgniteDataStreamer.addData(Object, Object), listeners should be tracked separately.- Specified by:
flushin interfaceIgniteDataStreamer<K,V>- Throws:
javax.cache.CacheException- If failed to load data from buffer.- See Also:
IgniteDataStreamer.tryFlush()
-
tryFlush
public void tryFlush() throws IgniteInterruptedExceptionFlushes every internal buffer if buffer was flushed before passed in threshold.Does not wait for result and does not fail on errors assuming that this method should be called periodically.
- Specified by:
tryFlushin interfaceIgniteDataStreamer<K,V>- Throws:
IgniteInterruptedException- If thread has been interrupted.- See Also:
IgniteDataStreamer.flush()
-
close
public void close(boolean cancel) throws javax.cache.CacheExceptionDescription copied from interface:IgniteDataStreamerStreams any remaining data and closes this streamer.- Specified by:
closein interfaceIgniteDataStreamer<K,V>- Parameters:
cancel-Trueto close with cancellation.- Throws:
javax.cache.CacheException- If failed.
-
closeEx
public void closeEx(boolean cancel) throws IgniteCheckedException- Parameters:
cancel-Trueto close with cancellation.- Throws:
IgniteCheckedException- If failed.
-
onDisconnected
public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException
- Parameters:
reconnectFut- Reconnect future.- Throws:
IgniteCheckedException- If failed.
-
close
public void close() throws javax.cache.CacheExceptionCloses data streamer. This method is identical to callingclose(false)method.The method is invoked automatically on objects managed by the
try-with-resourcesstatement.- Specified by:
closein interfaceAutoCloseable- Specified by:
closein interfaceIgniteDataStreamer<K,V>- Throws:
javax.cache.CacheException- If failed to close data streamer.
-
maxRemapCount
public int maxRemapCount()
- Returns:
- Max remap count.
-
maxRemapCount
public void maxRemapCount(int maxRemapCnt)
- Parameters:
maxRemapCnt- New max remap count.
-
compareTo
public int compareTo(Delayed o)
- Specified by:
compareToin interfaceComparable<K>
-
-