diff --git b/.gitignore a/.gitignore new file mode 100644 index 0000000..0ff047c --- /dev/null +++ a/.gitignore @@ -0,0 +1,2 @@ +build/ +CMakeLists.txt.user diff --git b/CMakeLists.txt a/CMakeLists.txt new file mode 100644 index 0000000..da7537c --- /dev/null +++ a/CMakeLists.txt @@ -0,0 +1,25 @@ +cmake_minimum_required(VERSION 3.0) + +# Check to see where cmake is located. +if( IS_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/cmake ) + LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/cmake) +elseif( IS_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/../cmake ) + LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/../cmake) +else() + return() +endif() + +# Check to see if there is versioning information available +if(IS_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/osdev_versioning/cmake) + LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/osdev_versioning/cmake) + include(osdevversion) +endif() + +include(projectheader) +project_header(osdev_transqueue) + +add_subdirectory(src) +add_subdirectory(tests) + +# include(packaging) +# package_component() diff --git b/README.md a/README.md new file mode 100644 index 0000000..e69de29 --- /dev/null +++ a/README.md diff --git b/src/CMakeLists.txt a/src/CMakeLists.txt new file mode 100644 index 0000000..4ccb86d --- /dev/null +++ a/src/CMakeLists.txt @@ -0,0 +1,38 @@ +cmake_minimum_required(VERSION 3.0) +LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/../../../cmake) +include(projectheader) +project_header(transqueue) + +find_package( Qt5Core REQUIRED ) +find_package( Qt5Sql REQUIRED ) + +include_directories( SYSTEM + ${Qt5Core_INCLUDE_DIRS} + ${CMAKE_CURRENT_SOURCE_DIR}/../datatypes + ${CMAKE_CURRENT_SOURCE_DIR}/../logutils + ${CMAKE_CURRENT_SOURCE_DIR}/../config + ${CMAKE_CURRENT_SOURCE_DIR}/../pugixml +) + +include(compiler) + +set(SRC_LIST + ${CMAKE_CURRENT_SOURCE_DIR}/transqueue.cpp +) + +include(qtmoc) +create_mocs( SRC_LIST MOC_LIST + ${CMAKE_CURRENT_SOURCE_DIR}/transqueue.h +) + +include(library) +add_libraries( + ${Qt5Core_LIBRARIES} + datatypes + logutils + config + pugixml +) + +include(installation) +install_component() diff --git b/src/transqueue.cpp a/src/transqueue.cpp new file mode 100644 index 0000000..18cd68d --- /dev/null +++ a/src/transqueue.cpp @@ -0,0 +1,163 @@ +/* **************************************************************************** + * Copyright 2019 Open Systems Development BV * + * * + * Permission is hereby granted, free of charge, to any person obtaining a * + * copy of this software and associated documentation files (the "Software"), * + * to deal in the Software without restriction, including without limitation * + * the rights to use, copy, modify, merge, publish, distribute, sublicense, * + * and/or sell copies of the Software, and to permit persons to whom the * + * Software is furnished to do so, subject to the following conditions: * + * * + * The above copyright notice and this permission notice shall be included in * + * all copies or substantial portions of the Software. * + * * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL * + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER * + * DEALINGS IN THE SOFTWARE. * + * ***************************************************************************/ + +#include "transqueue.h" + +#include "dcxmlconfig.h" +#include "log.h" +#include "threadcontext.h" +#include "ormxmlwriter.h" + +#include +#include +#include + +using namespace osdev::components; + +TransQueue::TransQueue( QObject *_parent ) + : QObject( _parent ) + , m_TTL( DCXmlConfig::Instance().timeToLive() ) + , m_queueTimer() + , m_queueMutex() + , m_queue() +{ + // Connect the timer to the internal timeout-slot. + connect( &m_queueTimer, &QTimer::timeout, this, &TransQueue::slotProcessQueue ); +} + +void TransQueue::setTimeOut( int milli_seconds ) +{ + m_queueTimer.setSingleShot( true ); + m_queueTimer.setInterval( milli_seconds ); +} + +void TransQueue::setTransaction( const QSharedPointer& pData ) +{ + QMutexLocker mutLock( &m_queueMutex ); + if( pData ) + { + if( pData->TTL() == -1 ) + { + pData->setTTL( m_TTL ); + m_queue.enqueue( pData ); + } + else if( pData->TTL() == 0 ) + { + if( !this->dumpToDisk( pData ) ) + { + LogError( "[TransQueue::setTransaction]", + QString( "There was a problem dumping the data disk. That is why we dump it to the LogFile : %1" ).arg( pData->asString() ) ); + } + + } + else if( pData->TTL() > 0 ) + { + pData->decTTL(); + m_queue.enqueue( pData ); + } + + + this->startProcessing(); + } +} + +bool TransQueue::processing() const +{ + return m_queueTimer.isActive(); +} + +void TransQueue::startProcessing( bool force ) +{ + if( force ) + { + m_queueTimer.stop(); + /* + * The timer will be started after slotProcessQueue is done processing. + * Timer is implemented as a singleshot, preventing timeout() signals + * to pile up due to the processingtime of the slot. + */ + this->slotProcessQueue(); + } + else + { + if( !m_queueTimer.isActive() ) + { + m_queueTimer.start(); + } + else + { + // Do nothing. timeout will trigger... + } + } +} + +void TransQueue::stopProcessing( bool force ) +{ + if( force && m_queueTimer.isActive() ) + { + m_queueTimer.stop(); + } +} + +void TransQueue::slotProcessQueue() +{ + LogDebug( "[TransQueue::slotProcessQueue()]", QString("Starting processing the queue.") ); + if( !m_queue.isEmpty() ) + { + LogDebug( "[TransQueue::slotProcessQueue()]", QString("Number of entries in the queue before : %1").arg( m_queue.size() ) ); + auto pData = m_queue.dequeue(); + ThreadContextScope tcs(pData->traceId()); + emit signalProcessData( pData ); + LogDebug( "[TransQueue::slotProcessQueue()]", QString("Number of entries in the queue after : %1").arg( m_queue.size() ) ); + LogInfo( "[TransQueue::slotProcessQueue()]", QString("TTL Of package : %1").arg( pData->TTL() ) ); + } + + // Timer is intentionally set as a singleshot. We have to restart it + // every time this slot is called. + if( m_queue.size() > 0 ) + { + LogInfo( "[TransQueue::slotProcessQueue()]", QString( "Number of transactions in Queue : %1" ).arg( m_queue.size() ) ); + m_queueTimer.start(); + } + else + { + LogInfo( "[TransQueue::slotProcessQueue()]", "Transaction Queue is Empty." ); + // Just to be sure..... + m_queueTimer.stop(); + } +} + +bool TransQueue::dumpToDisk( const QSharedPointer& data ) const +{ + if( data ) + { + QString strLogFile = DCXmlConfig::Instance().transactionPath(); + OrmXmlWriter oWriter( strLogFile.toStdString() ); + if( oWriter.writeToDisk() ) + { + LogWarning( "[TransQueue::dumpToDisk]", + QString( "Datafile failed to save to disk. Content : %2" ).arg( data->getMainTableContainer()->timeStamp() ).arg( QString( oWriter.asString().c_str() ) ) ); + } + + } + return false; +} diff --git b/src/transqueue.h a/src/transqueue.h new file mode 100644 index 0000000..aad635c --- /dev/null +++ a/src/transqueue.h @@ -0,0 +1,145 @@ +/* **************************************************************************** + * Copyright 2019 Open Systems Development BV * + * * + * Permission is hereby granted, free of charge, to any person obtaining a * + * copy of this software and associated documentation files (the "Software"), * + * to deal in the Software without restriction, including without limitation * + * the rights to use, copy, modify, merge, publish, distribute, sublicense, * + * and/or sell copies of the Software, and to permit persons to whom the * + * Software is furnished to do so, subject to the following conditions: * + * * + * The above copyright notice and this permission notice shall be included in * + * all copies or substantial portions of the Software. * + * * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL * + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER * + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING * + * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER * + * DEALINGS IN THE SOFTWARE. * + * ***************************************************************************/ + +#ifndef OSDEV_COMPONENTS_TRANSQUEUE_H +#define OSDEV_COMPONENTS_TRANSQUEUE_H + +#include "ormreldata.h" + +#include +#include +#include +#include +#include + +/* + * _________________________________________ + * / If voting could change the system, it \ + * | would be illegal. If not voting could | + * \ change the system, it would be illegal. / + * ----------------------------------------- + * \ + * \ + * .--. + * |o_o | + * |:_/ | + * // \ \ + * (| | ) + * /'\_ _/`\ + * \___)=(___/ + * + */ + +namespace osdev { +namespace components { +/*! + * \class TransQueue + * \brief The TransQueue class implements a transaction queue which will + * catch rejected transactions from the ORM layer and offers them + * back through the ETL-layer. An ORMRelPackage is "tagged" with + * a transacion-id, which will keep track on how many times an + * ORMRelPackage enters the queue. If the "TTL_iD (Time_To_live)" + * reaches zero, the package is dropped and the ID is removed from + * the administration. + */ +class TransQueue : public QObject +{ + Q_OBJECT + +public: + /*! + * \brief Represents a queue of transactions. + * \param parent The parent QObject instance. + */ + explicit TransQueue( QObject *parent = nullptr ); + + // Deleted copy- and move constructors + TransQueue( const TransQueue& ) = delete; + TransQueue( const TransQueue&& ) = delete; + TransQueue& operator=( const TransQueue& ) = delete; + TransQueue& operator=( const TransQueue&& ) = delete; + + /*! + * \brief Sets the TimeOut. + * \param milliseconds The timeout to set in milliseconds. + */ + void setTimeOut( int milliseconds = 1 ); + + /*! + * \return Whether this instance is currently processing transactions. + */ + bool processing() const; + + /*! + * \brief Starts processing of transactions. + * \param force Forces start of processing. + */ + void startProcessing( bool force = false ); + + /*! + * \brief Stops processing of transactions. + * \param force Forces stop of processing. + */ + void stopProcessing( bool force = false ); + + /*! + * \return The number of currently outstanding transactions. + */ + int transactions() const { return m_queue.size(); } + +private: + /*! + * \brief This method will write the Contents of the data structure + * to a transaction file for archiving and traceability purposes. + * Not really decided on the format yet, but this will evolve over time. + * \param data - The datastructure we want to write to a transaction Logfile. + * Send as pointer. After writing to disk, the object have to be deleted. + */ + bool dumpToDisk( const QSharedPointer& data ) const; + + int m_TTL; ///< Time to Live start value. + QTimer m_queueTimer; ///< Timer controlling the processing of the queue. + QMutex m_queueMutex; ///< Mutex to prevent race conditions. Although QQueue is thread safe, we have to make sure *we* are. + QQueue> m_queue; ///< The actual FIFO taking an ORMRelData pointer as input. + +signals: + void signalProcessData( QSharedPointer pData ); + +public slots: + /*! + * \brief Sets (adds) a transaction to the queue. + * \param pData The transaction data to set. + */ + void setTransaction( const QSharedPointer& pData ); + + +private slots: + /*! + * \brief slotProcessQueue + */ + void slotProcessQueue(); +}; + +} /* End namespace components */ +} /* End namespace osdev */ + +#endif /* OSDEV_COMPONENTS_TRANSQUEUE_H */ diff --git b/tests/CMakeLists.txt a/tests/CMakeLists.txt new file mode 100644 index 0000000..3635320 --- /dev/null +++ a/tests/CMakeLists.txt @@ -0,0 +1,30 @@ +cmake_minimum_required(VERSION 3.0) +LIST(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/../cmake) + +include(projectheader) +project_header(test_logutils) + +include_directories( SYSTEM + ${CMAKE_CURRENT_SOURCE_DIR}/../../src +) + +include(compiler) +set(SRC_LIST +) + +# add_executable( ${PROJECT_NAME} +# ${SRC_LIST} +# ) + +# target_link_libraries( +# ${PROJECT_NAME} +# ) + +# set_target_properties( ${PROJECT_NAME} PROPERTIES +# RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/bin +# LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/lib +# ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/archive +# ) + +# include(installation) +# install_application()