/* ****************************************************************************
 * Copyright 2019 Open Systems Development BV                                 *
 *                                                                            *
 * Permission is hereby granted, free of charge, to any person obtaining a    *
 * copy of this software and associated documentation files (the "Software"), *
 * to deal in the Software without restriction, including without limitation  *
 * the rights to use, copy, modify, merge, publish, distribute, sublicense,   *
 * and/or sell copies of the Software, and to permit persons to whom the      *
 * Software is furnished to do so, subject to the following conditions:       *
 *                                                                            *
 * The above copyright notice and this permission notice shall be included in *
 * all copies or substantial portions of the Software.                        *
 *                                                                            *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR *
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,   *
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL    *
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER *
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING    *
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER        *
 * DEALINGS IN THE SOFTWARE.                                                  *
 * ***************************************************************************/
#include "transqueue.h"

#include "dcxmlconfig.h"
#include "log.h"
#include "threadcontext.h"
#include "ormxmlwriter.h"

// NOTE(review): the names of these three system headers were lost in the
// copy of the file under review. They are restored here from the Qt types
// this file actually uses - confirm against version control.
#include <QMutexLocker>
#include <QSharedPointer>
#include <QString>

using namespace osdev::components;

// NOTE(review): the template argument of QSharedPointer was lost in the copy
// of the file under review. ORMRelData is assumed throughout this file; it
// must match the declarations in transqueue.h - confirm before merging.

/*!
 * \brief Creates the queue and wires the internal timer to the processing slot.
 *        The default time-to-live is read from DCXmlConfig::timeToLive().
 * \param _parent The QObject parent, passed on to QObject.
 */
TransQueue::TransQueue( QObject *_parent )
    : QObject( _parent )
    , m_TTL( DCXmlConfig::Instance().timeToLive() )
    , m_queueTimer()
    , m_queueMutex()
    , m_queue()
{
    // Connect the timer to the internal timeout-slot.
    connect( &m_queueTimer, &QTimer::timeout, this, &TransQueue::slotProcessQueue );
}

/*!
 * \brief Configures the queue timer as a single-shot timer with the given interval.
 * \param milli_seconds The timer interval in milliseconds.
 */
void TransQueue::setTimeOut( int milli_seconds )
{
    m_queueTimer.setSingleShot( true );
    m_queueTimer.setInterval( milli_seconds );
}

/*!
 * \brief Offers a transaction to the queue, depending on its TTL:
 *        - TTL == -1 : the TTL is set to the configured default and the
 *                      transaction is queued.
 *        - TTL ==  0 : the transaction is dumped to disk instead of queued.
 *        - TTL  >  0 : the TTL is decremented and the transaction is queued.
 *        Any other TTL value is neither queued nor dumped.
 *        For every non-null transaction startProcessing() is called afterwards.
 *        A null pointer is ignored.
 * \param pData The transaction to handle.
 */
void TransQueue::setTransaction( const QSharedPointer<ORMRelData>& pData )
{
    QMutexLocker mutLock( &m_queueMutex );

    if( pData )
    {
        if( pData->TTL() == -1 )
        {
            pData->setTTL( m_TTL );
            m_queue.enqueue( pData );
        }
        else if( pData->TTL() == 0 )
        {
            if( !this->dumpToDisk( pData ) )
            {
                LogError( "[TransQueue::setTransaction]", QString( "There was a problem dumping the data disk. That is why we dump it to the LogFile : %1" ).arg( pData->asString() ) );
            }
        }
        else if( pData->TTL() > 0 )
        {
            pData->decTTL();
            m_queue.enqueue( pData );
        }

        this->startProcessing();
    }
}

/*!
 * \brief Reports whether the queue timer is currently running.
 * \return True if the timer is active, false otherwise.
 */
bool TransQueue::processing() const
{
    return m_queueTimer.isActive();
}

/*!
 * \brief Starts processing the queue.
 * \param force If true, the timer is stopped and the queue is processed
 *              immediately. If false, the timer is started unless it is
 *              already running.
 */
void TransQueue::startProcessing( bool force )
{
    if( force )
    {
        m_queueTimer.stop();
        /*
         * The timer will be started after slotProcessQueue is done processing.
         * Timer is implemented as a singleshot, preventing timeout() signals
         * to pile up due to the processingtime of the slot.
         */
        this->slotProcessQueue();
    }
    else
    {
        if( !m_queueTimer.isActive() )
        {
            m_queueTimer.start();
        }
        else
        {
            // Do nothing. timeout will trigger...
        }
    }
}

/*!
 * \brief Stops the queue timer, but only when forced.
 * \param force If true and the timer is active, the timer is stopped.
 *              If false, this call has no effect.
 */
void TransQueue::stopProcessing( bool force )
{
    if( force && m_queueTimer.isActive() )
    {
        m_queueTimer.stop();
    }
}

/*!
 * \brief Takes at most one transaction from the queue and emits
 *        signalProcessData() for it. The timer is restarted as long as
 *        transactions remain in the queue and stopped when it is empty.
 */
void TransQueue::slotProcessQueue()
{
    LogDebug( "[TransQueue::slotProcessQueue()]", QString("Starting processing the queue.") );

    if( !m_queue.isEmpty() )
    {
        LogDebug( "[TransQueue::slotProcessQueue()]", QString("Number of entries in the queue before : %1").arg( m_queue.size() ) );

        auto pData = m_queue.dequeue();
        ThreadContextScope tcs(pData->traceId());
        emit signalProcessData( pData );

        LogDebug( "[TransQueue::slotProcessQueue()]", QString("Number of entries in the queue after : %1").arg( m_queue.size() ) );
        LogInfo( "[TransQueue::slotProcessQueue()]", QString("TTL Of package : %1").arg( pData->TTL() ) );
    }

    // Timer is intentionally set as a singleshot. We have to restart it
    // every time this slot is called.
    if( m_queue.size() > 0 )
    {
        LogInfo( "[TransQueue::slotProcessQueue()]", QString( "Number of transactions in Queue : %1" ).arg( m_queue.size() ) );
        m_queueTimer.start();
    }
    else
    {
        LogInfo( "[TransQueue::slotProcessQueue()]", "Transaction Queue is Empty." );
        // Just to be sure.....
        m_queueTimer.stop();
    }
}

/*!
 * \brief Writes a transaction file to the path configured in
 *        DCXmlConfig::transactionPath().
 * \param data The transaction to dump. A null pointer is rejected.
 * \return True if the writer reported success. False if data is null or
 *         the write failed; in the latter case a warning holding the
 *         timestamp and the writer content is logged.
 */
bool TransQueue::dumpToDisk( const QSharedPointer<ORMRelData>& data ) const
{
    if( !data )
    {
        return false;
    }

    // NOTE(review): the writer is only given the target path here; 'data' is
    // not handed to it in this function - confirm this is intended.
    const QString strLogFile = DCXmlConfig::Instance().transactionPath();
    OrmXmlWriter oWriter( strLogFile.toStdString() );

    // NOTE(review): assumes writeToDisk() returns true on success - confirm
    // against OrmXmlWriter.
    if( oWriter.writeToDisk() )
    {
        return true;
    }

    // Both place markers are present so that each arg() lands in its own slot.
    LogWarning( "[TransQueue::dumpToDisk]", QString( "Datafile failed to save to disk. Timestamp : %1. Content : %2" ).arg( data->getMainTableContainer()->timeStamp() ).arg( QString( oWriter.asString().c_str() ) ) );
    return false;
}