#include "ormtransqueue.h"

#include <QCoreApplication>   // QCoreApplication::processEvents()

#include "log.h"

namespace osdev {
namespace caelus {

/// Builds an empty transaction queue together with its guarding mutex and a
/// (not yet started) polling timer with a 1000 ms interval.
///
/// @param _parent  QObject parent, forwarded to the QObject base class.
///
/// NOTE(review): the mutex is allocated with `new` and no matching `delete`
/// is visible in this file - confirm that the destructor (declared elsewhere)
/// releases it.
OrmTransQueue::OrmTransQueue( QObject *_parent )
    : QObject( _parent )
    , m_pQueueTimer( nullptr )
    , m_pQueueMutex( nullptr )
    , m_ormQueue()
{
    m_pQueueMutex = new QMutex( QMutex::NonRecursive );

    setTimeOut( 1000 );
}

/// Sets the polling interval of the queue timer.
///
/// On the first call the timer is created (as a child of this object, in
/// repeating mode) and its timeout() signal is wired to slotProcessQueue()
/// through a queued connection. The timer is NOT started here; that is done
/// by startProcessing().
///
/// @param mseconds  Timer interval in milliseconds.
void OrmTransQueue::setTimeOut( int mseconds )
{
    if( nullptr == m_pQueueTimer )
    {
        m_pQueueTimer = new QTimer( this );
        m_pQueueTimer->setSingleShot( false );

        // Connect signal / slot..
        connect( m_pQueueTimer, SIGNAL( timeout() ),
                 this, SLOT( slotProcessQueue() ),
                 Qt::QueuedConnection );
    }

    // Only the interval is updated; the running state of the timer is untouched.
    m_pQueueTimer->setInterval( mseconds );
}

/// @return true while the queue timer is active.
bool OrmTransQueue::processing() const
{
    return m_pQueueTimer->isActive();
}

/// Starts processing the queue.
///
/// @param force  When true the timer is stopped and the queue is processed
///               immediately on the calling thread. When false the timer is
///               started if it is not already running.
void OrmTransQueue::startProcessing( bool force )
{
    if( force )
    {
        m_pQueueTimer->stop();
        this->slotProcessQueue();
    }
    else if( !m_pQueueTimer->isActive() )
    {
        m_pQueueTimer->start();
    }
}

/// Stops the queue timer.
///
/// Only a forced stop is acted upon; with force == false this function does
/// nothing.
///
/// @param force  When true and the timer is running, the timer is stopped.
void OrmTransQueue::stopProcessing( bool force )
{
    // The original condition tested !isActive(), which meant stop() was only
    // ever called on a timer that was already stopped.
    if( force && m_pQueueTimer->isActive() )
    {
        m_pQueueTimer->stop();
    }
}

/// Appends a transaction to the queue and makes sure processing is started.
///
/// The queue mutex is held for the whole call, including the call to
/// startProcessing().
///
/// @param pData  Transaction to enqueue; the pointer is stored as-is.
/// @return Always true.
bool OrmTransQueue::setTransaction( ORMRelData *pData )
{
    QMutexLocker mutLock( m_pQueueMutex ); // << Later on we should do a conditional wait

    LogDebug( "[OrmTransQueue::setTransaction]", "Adding transaction to the transaction queue : " );
    m_ormQueue.enqueue( pData );
    LogDebug( "[OrmTransQueue::setTransaction]",
              QString( "Number of transactions in the queue : %1" ).arg( m_ormQueue.size() ) );

    this->startProcessing();

    return true; // << Rethink about it..
}

/// Takes at most one transaction from the queue per invocation.
///
/// The timer is stopped on entry and re-armed on exit when work remains. The
/// queue is only touched while the mutex is held; if the mutex cannot be
/// acquired the timer is re-armed so the attempt is repeated on the next tick.
///
/// NOTE(review): the dequeued transaction is currently discarded because the
/// signalProcessData() emit is commented out - confirm who owns pData.
/// NOTE(review): QCoreApplication::processEvents() runs while the
/// non-recursive mutex is held; a handler on this thread that ends up in
/// setTransaction() would block on that mutex.
void OrmTransQueue::slotProcessQueue()
{
    m_pQueueTimer->stop();

    LogInfo( "[OrmTransQueue::slotProcessQueue]", "Start processing the transaction queue." );

    // Decided under the lock so the queue is never read unsynchronised.
    bool workPending = false;

    if( m_pQueueMutex->tryLock() )
    {
        LogDebug( "[OrmTransQueue::slotProcessQueue]", "!!!!!!!!!!!!!!!!!!!!! Lock Acquired !!!!!!!!!!!!!!!" );

        if( !m_ormQueue.isEmpty() )
        {
            LogInfo( "[OrmTransQueue::slotProcessQueue]",
                     QString( "{ Before Emit } Number of transactions in the queue : %1 " ).arg( m_ormQueue.size() ) );

            ORMRelData *pData = m_ormQueue.dequeue();
            Q_UNUSED( pData );

            LogDebug( "[OrmTransQueue::slotProcessQueue]",
                      QString( "{ Before Emit, after dequeue} Number of transactions in the queue : %1 " ).arg( m_ormQueue.size() ) );

            // emit signalProcessData( pData );

            LogDebug( "[OrmTransQueue::slotProcessQueue]",
                      QString( "{ After Emit, after dequeue} Number of transactions in the queue : %1 " ).arg( m_ormQueue.size() ) );

            QCoreApplication::processEvents();
        }

        workPending = !m_ormQueue.isEmpty();

        m_pQueueMutex->unlock();
        LogDebug( "[OrmTransQueue::slotProcessQueue]", "!!!!!!!!!!!!!!!!!!!!! Lock Freed !!!!!!!!!!!!!!!" );
    }
    else
    {
        LogDebug( "[OrmTransQueue::slotProcessQueue]", "!!!!!!!!!!!!!!!!!!!!! No Lock !!!!!!!!!!!!!!!" );

        // Someone else holds the queue right now; try again on the next tick
        // instead of inspecting the queue without the lock.
        workPending = true;
    }

    LogInfo( "[OrmTransQueue::slotProcessQueue]", "Done processing the transaction queue." );

    if( workPending )
    {
        m_pQueueTimer->start();
    }
}

}   /* End namespace caelus */
}   /* End namespace osdev */