FARGOS/VISTA Object Management Environment Core
..
|
Intermediary I/O processing object for performing multi-threaded receive-and-process operations on a flow of messages. More...
#include <io_processor.hpp>
Public Types | |
enum | ThreadMode { NONE =0, READ_THREAD =1, PROCESS_THREAD =2, PROCESS_DURING_READ =4, BOTH_THREADS =READ_THREAD | PROCESS_THREAD, SEPARATE_READ_AND_PROCESS_THREADS =BOTH_THREADS, READ_AND_PROCESS_ON_SAME_THREAD =READ_THREAD | PROCESS_DURING_READ } |
Mask to select threading modes. More... | |
enum | BlockMode { PACKET =1, _PACKET_HAS_SOURCE =2, _PACKET_HAS_RECEIVE_TIME =4, _PACKET_HAS_META_DATA =_PACKET_HAS_SOURCE | _PACKET_HAS_RECEIVE_TIME, _NOT_SOCKET =128, CONTIGUOUS_BYTE_STREAM =0, CONTIGUOUS_FILE_STREAM =_NOT_SOCKET, PACKET_WITH_SOURCE =PACKET | _PACKET_HAS_SOURCE, PACKET_WITH_TIME =PACKET | _PACKET_HAS_RECEIVE_TIME, PACKET_WITH_SOURCE_AND_TIME =PACKET | _PACKET_HAS_SOURCE | _PACKET_HAS_RECEIVE_TIME } |
Blocking mode. More... | |
Public Member Functions | |
int | submitOrProcessBlock (SharedBufferAllocRecord *rec) |
Submission routine that will either directly invoke the processing routine or notify the processing thread that it is available. More... | |
IO_Processor (SharedMemoryVariableNode *parentNode, BufferRegion *mgr, IO_processBlockFP b_func, OS_HANDLE_TYPE h, ThreadMode tMode=NONE, void *userData=nullptr, BlockMode bMode=PACKET, IO_receiveBlockFP r_func=recvConsume, IO_consumeFP c_func=doConsumeLoop, IO_processFP p_func=doProcessLoop) | |
Constructor for IO_Processor. More... | |
virtual | ~IO_Processor () |
size_t | dataOffset () const OME_ALWAYS_INLINE |
Return number of bytes reserved for meta data on each data element. More... | |
unsigned char * | bufferAddress (SharedBufferAllocRecord *rec, size_t *bufferLen=nullptr) const OME_ALWAYS_INLINE |
Return physical address of a buffer within the context of the local process' address space. More... | |
IO_metaBlock_header * | bufferHeaderAddress (SharedBufferAllocRecord *rec, size_t *headerLen=nullptr) const OME_ALWAYS_INLINE |
Return physical address of meta block header within the context of the local process' address space. More... | |
const char * | getLabel () const OME_ALWAYS_INLINE NONNULL_RETURN |
void | setLabel (const char *name) OME_ALWAYS_INLINE NONNULL_CLASS_PARAMETERS(2) |
void | setPacketsProcessedIncrement (int32_t incVal) |
void | setReadAttempts (uint32_t count) OME_ALWAYS_INLINE |
Set read attempts before blocking. More... | |
uint32_t | getReadAttempts () const OME_ALWAYS_INLINE |
Get limit on read attempts before blocking. More... | |
void | setReadTimeout (uint32_t count) OME_ALWAYS_INLINE |
Set read timeout. More... | |
uint32_t | getReadTimeout () const OME_ALWAYS_INLINE |
Get read timeout. More... | |
void | setExtraData (void *data) OME_ALWAYS_INLINE |
Set extra information value. More... | |
void * | getExtraData () const OME_ALWAYS_INLINE |
Retrieve extra information value. More... | |
void | setMaxPacketSize (size_t bytes) OME_ALWAYS_INLINE |
Set MTU. More... | |
void | setProcessRoutine (IO_processBlockFP func) OME_ALWAYS_INLINE |
Set processing routine. More... | |
void | setProcessLoopRoutine (IO_processFP func) OME_ALWAYS_INLINE |
Set processing loop routine. More... | |
void | setConsumeLoopRoutine (IO_consumeFP func) OME_ALWAYS_INLINE |
Set consume loop routine. More... | |
void | setConsumeRoutine (IO_receiveBlockFP func) OME_ALWAYS_INLINE |
Set consume routine. More... | |
uint32_t | getThreadMode () const OME_ALWAYS_INLINE |
Get requested thread modes. More... | |
int | setThreadMode (ThreadMode mode) |
Set threading mode. More... | |
void | setBlockingMode (BlockMode mode) |
Set block delivery mode. More... | |
int | stopThread (uint32_t modes) |
Request stop. More... | |
int | interruptThread (uint32_t modes, bool force=false) |
Interrupt blocked thread. More... | |
int | waitForThreadStart (uint32_t mode) |
Wait for threads to start. More... | |
int | waitForThreadExit (uint32_t modes) |
Wait for threads to terminate. More... | |
int | waitForDataToProcess (bool alreadyLocked=false) |
Wait for data to arrive. More... | |
int | waitForDataToProcessOrUntil (const struct timespec *maxWaitUntil, bool alreadyLocked=false) |
Wait for data to arrive or until a point in time. More... | |
int | noteDataToProcess (bool alreadyLocked=false) |
Note new data has arrived. More... | |
Static Public Member Functions | |
static ssize_t | recvConsume (SharedBufferAllocRecord *rec, class IO_Processor *controller) |
static int | doConsumeLoop (IO_Processor *controller) |
Standard consume loop to receive incoming data. More... | |
static int | doProcessLoop (IO_Processor *controller) |
Standard processing loop to process data on separate thread; works in conjunction with doConsumeLoop(). More... | |
Public Attributes | |
BufferRegion * | bfrManager |
buffer region More... | |
IO_Processor_Statistics * | statistics |
statistics More... | |
TimedMutex * | mutex |
TimedCondition * | condition |
IO_processFP | processRoutine |
IO_processBlockFP | processBlockRoutine |
IO_consumeFP | consumeRoutine |
IO_receiveBlockFP | recvRoutine |
void * | extraData |
arbitrary extra data More... | |
pthread_t | consumeThreadID |
pthread_t | processThreadID |
OS_HANDLE_TYPE | descriptor |
uint32_t | descriptorFlags |
int32_t | packetsProcessedIncrement |
char | label [24] |
size_t | maxPacketSize |
max to receive in one go More... | |
unsigned int | lastReadTimeout |
unsigned char | threadStartedState |
unsigned char | currentThreadState |
unsigned char | desiredThreadState |
unsigned char | stopRequested |
unsigned char | blockingMode |
Intermediary I/O processing object for performing multi-threaded receive-and-process operations on a flow of messages.
Blocking mode.
Mask to select threading modes.
IO_Processor::IO_Processor (SharedMemoryVariableNode *parentNode, BufferRegion *mgr, IO_processBlockFP b_func, OS_HANDLE_TYPE h, ThreadMode tMode=NONE, void *userData=nullptr, BlockMode bMode=PACKET, IO_receiveBlockFP r_func=recvConsume, IO_consumeFP c_func=doConsumeLoop, IO_processFP p_func=doProcessLoop)
Constructor for IO_Processor.
References _PACKET_HAS_META_DATA, bfrManager, condition, consumeThreadID, currentThreadState, descriptor, descriptorFlags, desiredThreadState, BufferRegion::getBlockSize(), SharedMemoryVariable::getName(), INVALID_HANDLE_VALUE, label, lastReadTimeout, maxPacketSize, mutex, NONE, packetsProcessedIncrement, processThreadID, safe_strcpy, setBlockingMode(), setConsumeLoopRoutine(), setConsumeRoutine(), setExtraData(), setProcessLoopRoutine(), setProcessRoutine(), setThreadMode(), statistics, stopRequested, threadStartedState, and waitForThreadStart().
|
virtual |
References condition, mutex, and statistics.
|
inline |
Return physical address of a buffer within the context of the local process' address space.
References bfrManager, BufferRegion::blockAddress(), dataOffset(), OME_EXPECT_TRUE, and SharedBufferAllocRecord_32::usedLen.
Referenced by ReplumbAndLog::acceptData(), Extract_And_Process_Document_Stream::addIOblockThenProcess(), LogManager::bufferAddress(), LogManager::commitLogRecord(), HTTPstatusLog::emitLineAsHTTPevent(), HTTPstatusLog::forwardBuffer(), HTTPnotificationPublisher::forwardToClients(), processPacketFromSourceUsingClass(), processPacketUsingClass(), and HTTPembeddedServerBase::readHTTPstream().
|
inline |
Return physical address of meta block header within the context of the local process' address space.
References bfrManager, BufferRegion::blockAddress(), and dataOffset().
Referenced by processPacketFromSourceUsingClass(), and recvConsume().
|
inline |
Return number of bytes reserved for meta data on each data element.
References _PACKET_HAS_META_DATA, and blockingMode.
Referenced by bufferAddress(), and bufferHeaderAddress().
|
static |
Standard consume loop to receive incoming data.
Returns: 0 is always returned; this is subject to change.
References BufferRegion::allocateBlock(), bfrManager, blockingMode, IO_Processor_Statistics::bytesRead, CONTIGUOUS_BYTE_STREAM, CONTIGUOUS_FILE_STREAM, maxPacketSize, PACKET, READ_THREAD, recvRoutine, BufferRegion::returnBlock(), statistics, stopRequested, submitOrProcessBlock(), and SharedBufferAllocRecord_32::usedLen.
|
static |
Standard processing loop to process data on separate thread; works in conjunction with doConsumeLoop().
Returns: 0 is always returned; this is subject to change.
References bfrManager, IO_Processor_Statistics::bytesProcessed, BufferRegion::getActiveListHead(), mutex, OME_EXPECT_FALSE, IO_Processor_Statistics::packetsProcessed, packetsProcessedIncrement, IO_Processor_Statistics::packetsRead, PROCESS_THREAD, processBlockRoutine, BufferRegion::returnBlock(), statistics, stopRequested, TimedMutex::unlock(), SharedBufferAllocRecord_32::usedLen, waitForBufferAllocRecordToBeReady(), and waitForDataToProcess().
|
inline |
Retrieve extra information value.
References extraData.
Referenced by ReplumbAndLog::acceptData(), HTTPnotificationPublisher::forwardToClients(), processPacketFromSourceUsingClass(), processPacketUsingClass(), and HTTPembeddedServerBase::readHTTPstream().
|
inline |
References label.
|
inline |
Get limit on read attempts before blocking.
References IO_Processor_Statistics::readAttemptsBeforeBlocking, and statistics.
Referenced by recvConsume().
|
inline |
Get read timeout.
References IO_Processor_Statistics::maxReadTimeout, and statistics.
Referenced by recvConsume().
|
inline |
int IO_Processor::interruptThread (uint32_t modes, bool force=false)
Interrupt blocked thread.
References consumeThreadID, currentThreadState, PROCESS_THREAD, processThreadID, READ_THREAD, and SIGIO.
Referenced by stopThread().
int IO_Processor::noteDataToProcess | ( | bool | alreadyLocked = false | ) |
Note new data has arrived.
References condition, TimedCondition::isSleeping(), mutex, IO_Processor_Statistics::packetsProcessed, IO_Processor_Statistics::packetsRead, TimedCondition::postCondition(), statistics, and TimedMutex::unlock().
Referenced by HTTPstatusLog::emitLineAsHTTPevent(), stopThread(), and submitOrProcessBlock().
|
static |
Normal receive/consume routine suitable for handling datagram and stream sockets as well as files.
References _NOT_SOCKET, _PACKET_HAS_META_DATA, _PACKET_HAS_RECEIVE_TIME, _PACKET_HAS_SOURCE, blockingMode, bufferHeaderAddress(), descriptor, descriptorFlags, EAGAIN, EINTR, errno, IO_metaBlock_header::fromAddress, IO_metaBlock_header::fromLen, getCurrentTime(), getReadAttempts(), getReadTimeout(), IO_metaBlock_header::headerLen, htons, lastReadTimeout, maxPacketSize, OS_SOCKET_TYPE, IO_Processor_Statistics::productiveReadSpins, READ_THREAD, IO_metaBlock_header::receiveTimeNanoseconds, IO_metaBlock_header::receiveTimeSeconds, SOCKET_CAST, statistics, stopRequested, TimeWithNanoseconds::time_nanosec, and TimeWithNanoseconds::time_sec.
void IO_Processor::setBlockingMode | ( | BlockMode | mode | ) |
|
inline |
|
inline |
|
inline |
|
inline |
References label, and safe_strcpy.
|
inline |
Set MTU.
References maxPacketSize.
|
inline |
References packetsProcessedIncrement.
|
inline |
|
inline |
Set processing routine.
References processBlockRoutine.
Referenced by HTTPstatusLog::becomeEventStream(), and IO_Processor().
|
inline |
Set read attempts before blocking.
References IO_Processor_Statistics::readAttemptsBeforeBlocking, and statistics.
Referenced by ReplumbAndLog::ReplumbAndLog().
|
inline |
Set read timeout.
References IO_Processor_Statistics::maxReadTimeout, and statistics.
int IO_Processor::setThreadMode | ( | ThreadMode | mode | ) |
Set threading mode.
References consumeThreadID, currentThreadState, desiredThreadState, NULL, PROCESS_DURING_READ, READ_THREAD, and stopRequested.
Referenced by HTTPstatusLog::becomeEventStream(), IO_Processor(), and ReplumbAndLog::ReplumbAndLog().
int IO_Processor::stopThread | ( | uint32_t | modes | ) |
Request stop.
References currentThreadState, desiredThreadState, interruptThread(), noteDataToProcess(), PROCESS_THREAD, READ_THREAD, and stopRequested.
Referenced by HTTPstatusLog::becomeEventStream(), HTTP_SessionRecord::closeConnection(), LogManager::killProcessingThread(), HTTPembeddedServerBase::readHTTPstream(), and LogManager::~LogManager().
int IO_Processor::submitOrProcessBlock | ( | SharedBufferAllocRecord * | rec | ) |
Submission routine that will either directly invoke the processing routine or notify the processing thread that it is available.
Not normally used directly, but available for special use cases.
References bfrManager, IO_Processor_Statistics::bytesProcessed, currentThreadState, mutex, noteDataToProcess(), IO_Processor_Statistics::packetsProcessed, packetsProcessedIncrement, IO_Processor_Statistics::packetsRead, PROCESS_DURING_READ, BufferRegion::returnBlock(), and statistics.
Referenced by doConsumeLoop(), and LogManager::writeDataToBuffer().
int IO_Processor::waitForDataToProcess | ( | bool | alreadyLocked = false | ) |
Wait for data to arrive.
References condition, mutex, IO_Processor_Statistics::packetsProcessed, IO_Processor_Statistics::packetsRead, PROCESS_THREAD, statistics, stopRequested, and TimedMutex::unlock().
Referenced by doProcessLoop(), and waitForDataToProcessOrUntil().
int IO_Processor::waitForDataToProcessOrUntil (const struct timespec *maxWaitUntil, bool alreadyLocked=false)
Wait for data to arrive or until a point in time.
References condition, mutex, OME_EXPECT_FALSE, IO_Processor_Statistics::packetsProcessed, IO_Processor_Statistics::packetsRead, PROCESS_THREAD, statistics, stopRequested, TimedMutex::unlock(), and waitForDataToProcess().
int IO_Processor::waitForThreadExit | ( | uint32_t | modes | ) |
Wait for threads to terminate.
References consumeThreadID, PROCESS_THREAD, processThreadID, READ_THREAD, and threadStartedState.
Referenced by HTTPstatusLog::becomeEventStream(), LogManager::closeLog(), and LogManager::~LogManager().
int IO_Processor::waitForThreadStart | ( | uint32_t | mode | ) |
Wait for threads to start.
References currentThreadState, desiredThreadState, OME_YIELD_THREAD, PROCESS_THREAD, READ_THREAD, and threadStartedState.
Referenced by HTTPstatusLog::becomeEventStream(), IO_Processor(), and LogManager::LogManager().
BufferRegion* IO_Processor::bfrManager |
unsigned char IO_Processor::blockingMode |
Referenced by dataOffset(), doConsumeLoop(), processPacketFromSourceUsingClass(), recvConsume(), and setBlockingMode().
TimedCondition* IO_Processor::condition |
Referenced by IO_Processor(), noteDataToProcess(), waitForDataToProcess(), waitForDataToProcessOrUntil(), and ~IO_Processor().
IO_consumeFP IO_Processor::consumeRoutine |
Referenced by setConsumeLoopRoutine().
pthread_t IO_Processor::consumeThreadID |
Referenced by interruptThread(), IO_Processor(), setThreadMode(), and waitForThreadExit().
unsigned char IO_Processor::currentThreadState |
OS_HANDLE_TYPE IO_Processor::descriptor |
Referenced by HTTPstatusLog::addHTTPsession(), HTTPstatusLog::becomeEventStream(), HTTP_SessionRecord::closeConnection(), LogManager::closeLog(), LogManager::commitLogRecord(), HTTP_SessionRecord::default_OPTIONS(), HTTPstatusLog::forwardBuffer(), HTTP_SessionRecord::handleHTTPrequest(), IO_Processor(), HTTPembeddedServerBase::readHTTPstream(), recvConsume(), HTTPstatusLog::removeHTTPsession(), and HTTP_SessionRecord::sendResponse().
uint32_t IO_Processor::descriptorFlags |
Referenced by IO_Processor(), and recvConsume().
unsigned char IO_Processor::desiredThreadState |
void* IO_Processor::extraData |
arbitrary extra data
Referenced by getExtraData(), and setExtraData().
char IO_Processor::label[24] |
Referenced by getLabel(), IO_Processor(), and setLabel().
unsigned int IO_Processor::lastReadTimeout |
Referenced by IO_Processor(), and recvConsume().
size_t IO_Processor::maxPacketSize |
max to receive in one go
Referenced by doConsumeLoop(), IO_Processor(), recvConsume(), and setMaxPacketSize().
TimedMutex* IO_Processor::mutex |
int32_t IO_Processor::packetsProcessedIncrement |
Referenced by doProcessLoop(), IO_Processor(), setPacketsProcessedIncrement(), and submitOrProcessBlock().
IO_processBlockFP IO_Processor::processBlockRoutine |
Referenced by doProcessLoop(), and setProcessRoutine().
IO_processFP IO_Processor::processRoutine |
Referenced by setProcessLoopRoutine().
pthread_t IO_Processor::processThreadID |
Referenced by interruptThread(), IO_Processor(), and waitForThreadExit().
IO_receiveBlockFP IO_Processor::recvRoutine |
Referenced by doConsumeLoop(), and setConsumeRoutine().
IO_Processor_Statistics* IO_Processor::statistics |
statistics
Referenced by HTTPstatusLog::becomeEventStream(), doConsumeLoop(), doProcessLoop(), HTTPstatusLog::emitLineAsHTTPevent(), getReadAttempts(), getReadTimeout(), IO_Processor(), noteDataToProcess(), recvConsume(), setReadAttempts(), setReadTimeout(), submitOrProcessBlock(), waitForDataToProcess(), waitForDataToProcessOrUntil(), LogManager::writeDataToBuffer(), and ~IO_Processor().
unsigned char IO_Processor::stopRequested |
unsigned char IO_Processor::threadStartedState |
Referenced by IO_Processor(), waitForThreadExit(), and waitForThreadStart().
Generated: Tue Jul 28 2020 16:03:27
Support Information |