FARGOS/VISTA Object Management Environment Core
Streaming filter that consumes input data from multiple sources and outputs blocks in sorted order. More...
#include <OrderedInput.hpp>
Public Member Functions | |
OrderedMultipleInputFilter (size_t maxReorderFromSource=0) | |
Create an OrderedMultipleInputFilter. More... | |
virtual | ~OrderedMultipleInputFilter () |
void | setThreadingEnabled (bool enabled) |
Set threading enabled flag. More... | |
bool | getThreadingEnabled () const OME_ALWAYS_INLINE |
Retrieve threading enabled flag. More... | |
virtual void | noteNextKey (uint64_t key, int64_t blockCount, uint64_t nextKeyFromAlternateSource) |
User exit to note next input sort key. More... | |
virtual int | processInputBlock (OrderedInputBlock *block)=0 |
Abstract interface to process block. More... | |
virtual void | nowAtEndOfFile (OrderedInputSource *source) |
User-exit called when end-of-file is detected on an input source. More... | |
uint_fast32_t | inputSourceTotal () const OME_ALWAYS_INLINE OME_ALWAYS_OPTIMIZE("-O3") |
Return total number of registered input sources. More... | |
virtual uint_fast32_t | getTotalBlocksPending () const |
Interface to return total amount of pending work. More... | |
virtual uint_fast32_t | getTotalBlocksInProgress () const |
Interface to return total amount of work currently being processed. More... | |
bool | isQueueEmpty () const OME_ALWAYS_INLINE OME_ALWAYS_OPTIMIZE("-O3") |
Return Boolean indication of queue being empty. More... | |
uint_fast32_t | purgeQueue () |
Completely purge any pending input blocks. More... | |
void | addInputSource (OrderedInputSource *source, ssize_t preloadBy=-1) |
Register a new input source. More... | |
int | preloadQueue () |
Preload the priority queue with the initial set of blocks. Only the first block from each source is loaded, which means the main priority queue size remains limited by the number of distinct input sources. Actual reordering within a given input stream is handled by the OrderedInputSourceProxyWithReordering class. More... | |
int64_t | processQueue (uint32_t *stopFlagPtr, const struct timespec *maxWaitTime=&OrderedInputSource::WAIT_YEAR_MAX) OME_ALWAYS_OPTIMIZE("-O3") |
Process input blocks until all OrderedInputSource objects are drained of available data or a specified point in time is reached. More... | |
int64_t | processInputFiles (uint32_t *stopFlag, const struct timespec *maxWaitTime=&OrderedInputSource::WAIT_YEAR_MAX) |
Process data from the registered collection of OrderedInputSource objects. This is a convenience cover that invokes preloadQueue() and then processQueue(). Due to the invocation of preloadQueue(), it should be called only once. If it returns early due to the maximum wait time having been reached, processQueue() should be called to continue processing. More... | |
virtual int64_t | processWorkInProgress (uint32_t *stopFlagPtr, const struct timespec *maxWaitTime=&OrderedInputSource::WAIT_YEAR_MAX) |
Interface to complete processing any background work that was previously initiated via processQueue() but has not yet finished. More... | |
Protected Attributes | |
OrderedInputPriorityQueue | priorityQueue |
std::vector< OrderedInputSource * > | sourceFiles |
std::set< OrderedInputSource * > | activeInputSources |
size_t | maxReorderedElements |
uint_fast16_t | activeSourceCount |
bool | forceCopy |
bool | threadingEnabled |
Streaming filter that consumes input data from multiple sources and outputs blocks in sorted order.
The processQueue() method processes all registered inputs in order. There is a processInputFiles() convenience method that calls preloadQueue() first before invoking processQueue().
If reordered blocks are to be tolerated, the class OrderedInputSourceProxyWithReordering is used as an intermediary for a given input and reorders any out-of-order content from that single source.
The basic flow is to call OrderedInputSource::getNextInputBlock() on an OrderedInputSource (or subclass) object to retrieve an OrderedInputBlock and eventually return it via an OrderedInputSource::recoverInputBlock() call.
A retrieved block is processed by an ordered sequence of two virtual function calls: noteNextKey() and processInputBlock().
The noteNextKey() call is often used to note the source time of an input block and set the simulated clock time.
The processInputBlock() routine must be implemented by a subclass and performs the application-specific work on the retrieved data.
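The two-step callback sequence described above can be illustrated with a minimal sketch. The types below are simplified stand-ins and not the contents of OrderedInput.hpp: only the names noteNextKey(), processInputBlock(), and sortKey come from this page, while BlockSketch, FilterSketch, ReplaySketch, the payload member, and the deliver() helper are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Stand-in for OrderedInputBlock. Only sortKey is named on this page;
// the payload member is a hypothetical addition for illustration.
struct BlockSketch {
    uint64_t sortKey;
    std::string payload;
};

// Stand-in for the two user exits of OrderedMultipleInputFilter.
// The real class has many more members; this mirrors only the call order.
class FilterSketch {
public:
    virtual ~FilterSketch() = default;

    virtual void noteNextKey(uint64_t key, int64_t blockCount,
                             uint64_t nextKeyFromAlternateSource) {
        (void)key; (void)blockCount; (void)nextKeyFromAlternateSource;
    }
    virtual int processInputBlock(BlockSketch *block) = 0;

    // Hypothetical helper: invokes noteNextKey() first, then
    // processInputBlock(), and returns the latter's result.
    int deliver(BlockSketch *block, int64_t blockCount, uint64_t nextKey) {
        assert(block != nullptr);
        noteNextKey(block->sortKey, blockCount, nextKey);
        return processInputBlock(block);
    }
};

// Example subclass: tracks a simulated clock and collects payloads.
class ReplaySketch : public FilterSketch {
public:
    uint64_t simulatedClock = 0;
    std::vector<std::string> seen;

    void noteNextKey(uint64_t key, int64_t, uint64_t) override {
        simulatedClock = key;  // set simulated time before processing
    }
    int processInputBlock(BlockSketch *block) override {
        seen.push_back(block->payload);
        return 0;  // 0 = continue processing
    }
};
```

In this sketch the simulated clock is always updated before the block's payload is examined, which is the property the ordered pair of calls is meant to provide.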
|
inline |
Create an OrderedMultipleInputFilter.
maxReorderFromSource | is an optional parameter that indicates how many out-of-order blocks can be tolerated and correctly reordered from a single source. |
References activeSourceCount, forceCopy, maxReorderedElements, and threadingEnabled.
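To make the meaning of a reorder tolerance concrete, here is one common way to absorb a bounded amount of disorder from a single stream: a min-heap that holds back up to N keys before releasing the smallest. This is only a sketch of the general technique and is not taken from OrderedInputSourceProxyWithReordering; the names ReorderBufferSketch and reorderSketch are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <queue>
#include <vector>

// Hypothetical bounded reorder buffer. It holds back up to maxReorder keys,
// so a key that arrives up to maxReorder positions late is still emitted
// in sorted order. With maxReorder == 0 it is a pass-through.
class ReorderBufferSketch {
public:
    explicit ReorderBufferSketch(size_t maxReorder) : limit(maxReorder) {}

    // Accept one key; append to out every key that is now safe to emit.
    void push(uint64_t key, std::vector<uint64_t> &out) {
        heap.push(key);
        while (heap.size() > limit) {
            out.push_back(heap.top());
            heap.pop();
        }
        assert(heap.size() <= limit);  // never holds back more than the limit
    }

    // Emit everything still held back, smallest first.
    void flush(std::vector<uint64_t> &out) {
        while (!heap.empty()) {
            out.push_back(heap.top());
            heap.pop();
        }
    }

private:
    size_t limit;
    std::priority_queue<uint64_t, std::vector<uint64_t>,
                        std::greater<uint64_t>> heap;
};

// Convenience wrapper: run a whole input sequence through the buffer.
std::vector<uint64_t> reorderSketch(const std::vector<uint64_t> &in,
                                    size_t maxReorder) {
    ReorderBufferSketch buffer(maxReorder);
    std::vector<uint64_t> out;
    for (uint64_t key : in) {
        buffer.push(key, out);
    }
    buffer.flush(out);
    return out;
}
```

The trade-off in this approach is latency: a larger tolerance corrects more disorder but delays every key by that many arrivals.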
|
inlinevirtual |
References purgeQueue().
|
inline |
Register a new input source.
source | specifies the OrderedInputSource |
preloadBy | is an optional parameter that specifies the number of out-of-order blocks to be tolerated from this source. If negative, the default is used. |
References activeInputSources, OrderedInputSourceProxyWithReordering::dropProxy(), io(), LOG_COMPONENT_CERR, LOG_ENDLINE, maxReorderedElements, sourceFiles, and OrderedInputSourceProxyWithReordering::totalBlocksPending().
|
inline |
Retrieve threading enabled flag.
References threadingEnabled.
|
inlinevirtual |
Interface to return total amount of work currently being processed.
|
inlinevirtual |
Interface to return total amount of pending work.
|
inline |
Return total number of registered input sources.
References OrderedInputPriorityQueue::getQueueLength(), and priorityQueue.
|
inline |
Return Boolean indication of queue being empty.
References OrderedInputPriorityQueue::getQueueLength(), and priorityQueue.
|
inlinevirtual |
User exit to note next input sort key.
Typically the key value represents nanoseconds. This routine can be overridden by a user's subclass to pick up the key to be used prior to a processInputBlock() call.
key | is the key for the current block |
blockCount | is a relative count of blocks |
nextKeyFromAlternateSource | specifies the next earliest key from a different source. If there is no other active source, then a value of ~0 will be specified. |
Referenced by processQueue().
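An override of noteNextKey() typically needs to do two things with its arguments: turn the key into a time value and check for the "no other source" sentinel. The helpers below are a sketch under the assumption that keys are nanoseconds, which this page describes as typical rather than guaranteed. The names NO_ALTERNATE_SOURCE, keyToTimespec, and gapToAlternateSource are hypothetical and not part of the documented API.

```cpp
#include <cassert>
#include <cstdint>
#include <time.h>

// Hypothetical name for the documented sentinel value ~0.
constexpr uint64_t NO_ALTERNATE_SOURCE = ~static_cast<uint64_t>(0);

// Convert a key into a timespec, assuming the key counts nanoseconds.
inline struct timespec keyToTimespec(uint64_t keyNanos) {
    struct timespec ts;
    ts.tv_sec = static_cast<time_t>(keyNanos / 1000000000ULL);
    ts.tv_nsec = static_cast<long>(keyNanos % 1000000000ULL);
    assert(ts.tv_nsec >= 0 && ts.tv_nsec < 1000000000L);
    return ts;
}

// Report how far ahead the next key from a different source lies.
// Returns false when no other source is active; gapNanos is then untouched.
inline bool gapToAlternateSource(uint64_t key,
                                 uint64_t nextKeyFromAlternateSource,
                                 uint64_t &gapNanos) {
    if (nextKeyFromAlternateSource == NO_ALTERNATE_SOURCE) {
        return false;
    }
    gapNanos = (nextKeyFromAlternateSource >= key)
                   ? nextKeyFromAlternateSource - key
                   : 0;
    return true;
}
```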
|
inlinevirtual |
User-exit called when end-of-file is detected on an input source.
source | specifies the input source being read |
The default implementation calls OrderedInputSource::noteEOFread().
Referenced by preloadQueue(), and processQueue().
|
inline |
Preload the priority queue with the initial set of blocks. Only the first block from each source is loaded, which means the main priority queue size remains limited by the number of distinct input sources. Actual reordering within a given input stream is handled by the OrderedInputSourceProxyWithReordering class.
References activeInputSources, activeSourceCount, OrderedInputSource::InputBlockReturn::block, forceCopy, nowAtEndOfFile(), OrderedInputSource::InputBlockReturn::obtainedLen, OME_EXPECT_TRUE, priorityQueue, OrderedInputPriorityQueue::pushBlock(), and sourceFiles.
Referenced by processInputFiles().
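The reason the priority queue stays bounded by the number of sources can be seen in a generic k-way merge. The sketch below uses std::priority_queue over plain integer keys; it illustrates the technique only and is not the OrderedInputPriorityQueue implementation. The function name mergeSketch is hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Merge several individually sorted key streams into one sorted stream.
// Only the current head of each source sits in the heap, so the heap
// never holds more entries than there are sources.
std::vector<uint64_t> mergeSketch(
        const std::vector<std::vector<uint64_t>> &sources) {
    using Head = std::pair<uint64_t, size_t>;  // (key, source index)
    std::priority_queue<Head, std::vector<Head>, std::greater<Head>> heap;
    std::vector<size_t> cursor(sources.size(), 0);

    // Preload step: take the first element from each non-empty source.
    for (size_t s = 0; s < sources.size(); ++s) {
        if (!sources[s].empty()) {
            heap.push(Head(sources[s][0], s));
            cursor[s] = 1;
        }
    }

    // Process step: emit the smallest head, then refill from that source.
    std::vector<uint64_t> out;
    while (!heap.empty()) {
        assert(heap.size() <= sources.size());
        Head head = heap.top();
        heap.pop();
        out.push_back(head.first);
        size_t s = head.second;
        if (cursor[s] < sources[s].size()) {
            heap.push(Head(sources[s][cursor[s]], s));
            ++cursor[s];
        }
    }
    return out;
}
```

An empty source contributes nothing during the preload step, which loosely corresponds to end-of-file being detected before any block is queued.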
|
pure virtual |
Abstract interface to process block.
block | points at the block to be processed |
Return values
1 | continue processing, but do not call OrderedInputSource::recoverInputBlock(). Ownership has been transferred and the block will be recovered later. |
0 | continue processing |
-1 | stop processing this input |
-2 | stop processing all input |
Referenced by processQueue().
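The four return values can be easier to work with when given names. The sketch below shows how a caller loop might interpret them; the enumerator names, DispatchState, and applyResult are hypothetical and do not appear in the library. This page does not say whether a block is recovered when -1 or -2 is returned, so the sketch only records the stop request in those cases.

```cpp
#include <cassert>

// Hypothetical names for the documented processInputBlock() return values.
enum BlockResult : int {
    STOP_ALL_INPUT = -2,
    STOP_THIS_INPUT = -1,
    CONTINUE = 0,
    CONTINUE_BLOCK_RETAINED = 1
};

// Hypothetical bookkeeping for a caller loop.
struct DispatchState {
    unsigned recovered = 0;      // blocks handed back to their source now
    unsigned retained = 0;       // blocks whose ownership moved to the handler
    bool sourceStopped = false;  // stop reading the current input
    bool allStopped = false;     // stop reading every input
};

// Apply one return code to the loop state.
inline void applyResult(int rc, DispatchState &state) {
    switch (rc) {
    case CONTINUE_BLOCK_RETAINED:
        ++state.retained;   // handler is responsible for recovery later
        break;
    case CONTINUE:
        ++state.recovered;  // caller returns the block to its source
        break;
    case STOP_THIS_INPUT:
        state.sourceStopped = true;
        break;
    case STOP_ALL_INPUT:
        state.allStopped = true;
        break;
    default:
        // Values outside the documented set are treated as a
        // programming error in this sketch.
        assert(false && "undocumented return value");
        break;
    }
}
```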
|
inline |
Process data from the registered collection of OrderedInputSource objects. This is a convenience cover that invokes preloadQueue() and then processQueue(). Due to the invocation of preloadQueue(), it should be called only once. If it returns early due to the maximum wait time having been reached, processQueue() should be called to continue processing.
stopFlag | points to a memory region which will be watched; processing will stop if it is set to a nonzero value. A null pointer can be passed if this is not needed. |
maxWaitTime | specifies the maximum amount of time to wait for input from a source. Normally, this is specified as an absolute point in time; however, if the number of seconds is less than that representing a single day, the timespec is interpreted as being an offset from the earliest time in the queue. |
References OrderedInputPriorityQueue::getQueueLength(), io(), LOG_COMPONENT_CERR, LOG_ENDLINE, preloadQueue(), priorityQueue, and processQueue().
|
inline |
Process input blocks until all OrderedInputSource objects are drained of available data or a specified point in time is reached.
stopFlagPtr | points to a memory region which will be watched; processing will stop if it is set to a nonzero value. A null pointer can be passed if this is not needed. |
maxWaitTime | specifies the maximum amount of time to wait for input from a source. Normally, this is specified as an absolute point in time; however, if the number of seconds is less than that representing a single day, the timespec is interpreted as being an offset from the earliest time in the queue. |
EOF across all inputs can be detected by calling isQueueEmpty().
For simulation purposes, relative constraints process the data as quickly as possible. If other events must be processed exactly as they would have happened in real time, the best approach is to use a relative time constraint for the first call to processQueue() and absolute times for the subsequent calls.
References activeInputSources, activeSourceCount, OrderedInputSource::InputBlockReturn::block, forceCopy, OrderedInputPriorityQueue::getQueueLength(), OrderedInputBlock::inputFile, io(), LOG_COMPONENT_CERR, LOG_ENDLINE, noteNextKey(), nowAtEndOfFile(), OME_EXPECT_FALSE, OME_EXPECT_TRUE, OME_PREFETCH, OrderedInputPriorityQueue::peekNextBlock(), priorityQueue, processInputBlock(), purgeQueue(), OrderedInputPriorityQueue::pushBlock(), OrderedInputPriorityQueue::removeNextBlock(), and OrderedInputBlock::sortKey.
Referenced by processInputFiles().
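The absolute-versus-relative interpretation of maxWaitTime can be expressed as a small function. This sketch assumes sort keys are nanoseconds since the epoch, which this page describes as typical but not required, and it is not the library's own code. The names SECONDS_PER_DAY, NANOS_PER_SECOND, and resolveDeadlineNanos are hypothetical.

```cpp
#include <cassert>
#include <cstdint>
#include <time.h>

constexpr int64_t SECONDS_PER_DAY = 24 * 60 * 60;
constexpr uint64_t NANOS_PER_SECOND = 1000000000ULL;

// Resolve maxWaitTime into an absolute deadline in nanoseconds.
// A value under one day is treated as an offset from the earliest key
// in the queue; anything larger is taken as an absolute point in time.
inline uint64_t resolveDeadlineNanos(const struct timespec &maxWait,
                                     uint64_t earliestKeyNanos) {
    assert(maxWait.tv_sec >= 0);
    assert(maxWait.tv_nsec >= 0 && maxWait.tv_nsec < 1000000000L);
    const uint64_t waitNanos =
        static_cast<uint64_t>(maxWait.tv_sec) * NANOS_PER_SECOND +
        static_cast<uint64_t>(maxWait.tv_nsec);
    if (maxWait.tv_sec < SECONDS_PER_DAY) {
        return earliestKeyNanos + waitNanos;  // relative offset
    }
    return waitNanos;                         // absolute time
}
```

Under this reading, a five-second timespec on the first call yields a deadline five seconds past the earliest queued key, and later calls can pass absolute times derived from that deadline.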
|
inlinevirtual |
Interface to complete processing any background work that was previously initiated via processQueue() but has not yet finished.
stopFlagPtr | points to a memory region which will be watched; processing will stop if it is set to a nonzero value. A null pointer can be passed if this is not needed. |
maxWaitTime | specifies the maximum amount of time to wait for input from a source. |
|
inline |
Completely purge any pending input blocks.
References OrderedInputPriorityQueue::getQueueLength(), OrderedInputBlock::inputFile, OrderedInputPriorityQueue::popNextBlock(), and priorityQueue.
Referenced by processQueue(), and ~OrderedMultipleInputFilter().
|
inline |
Set threading enabled flag.
enabled | is a Boolean specifying the flag value. |
References threadingEnabled.
|
protected |
Referenced by addInputSource(), preloadQueue(), and processQueue().
|
protected |
Referenced by OrderedMultipleInputFilter(), preloadQueue(), and processQueue().
|
protected |
Referenced by OrderedMultipleInputFilter(), preloadQueue(), and processQueue().
|
protected |
Referenced by addInputSource(), and OrderedMultipleInputFilter().
|
protected |
Referenced by inputSourceTotal(), isQueueEmpty(), preloadQueue(), processInputFiles(), processQueue(), and purgeQueue().
|
protected |
Referenced by addInputSource(), and preloadQueue().
|
protected |
Referenced by getThreadingEnabled(), OrderedMultipleInputFilter(), and setThreadingEnabled().
Generated: Tue Jul 28 2020 16:03:27