diff --git a/Csocket.cpp b/Csocket.cpp index 02e7363a..011c9796 100644 --- a/Csocket.cpp +++ b/Csocket.cpp @@ -1,6 +1,6 @@ /** @file * -* Copyright (c) 1999-2009 Jim Hull +* Copyright (c) 1999-2011 Jim Hull * All rights reserved * * Redistribution and use in source and binary forms, with or without modification, @@ -475,6 +475,20 @@ bool InitSSL( ECompType eCompressionType ) return( true ); } +void CSAdjustTVTimeout( struct timeval & tv, long iTimeoutMS ) +{ + if( iTimeoutMS >= 0 ) + { + long iCurTimeout = tv.tv_usec / 1000; + iCurTimeout += tv.tv_sec * 1000; + if( iCurTimeout > iTimeoutMS ) + { + tv.tv_sec = iTimeoutMS / 1000; + tv.tv_usec = iTimeoutMS % 1000; + } + } +} + void SSLErrors( const char *filename, u_int iLineNum ) { unsigned long iSSLError = 0; @@ -595,7 +609,141 @@ const CS_STRING & CCron::GetName() const { return( m_sName ); } void CCron::SetName( const CS_STRING & sName ) { m_sName = sName; } void CCron::RunJob() { CS_DEBUG( "This should be overridden" ); } -Csock::Csock( int itimeout ) +bool CSMonitorFD::GatherFDsForSelect( std::map< int, short > & miiReadyFds, long & iTimeoutMS ) +{ + iTimeoutMS = -1; // don't bother changing anything in the default implementation + for( std::map< int, short >::iterator it = m_miiMonitorFDs.begin(); it != m_miiMonitorFDs.end(); ++it ) + { + miiReadyFds[it->first] = it->second; + } + return( m_bEnabled ); +} + +bool CSMonitorFD::CheckFDs( const std::map< int, short > & miiReadyFds ) +{ + std::map< int, short > miiTriggerdFds; + for( std::map< int, short >::iterator it = m_miiMonitorFDs.begin(); it != m_miiMonitorFDs.end(); ++it ) + { + std::map< int, short >::const_iterator itFD = miiReadyFds.find( it->first ); + if( itFD != miiReadyFds.end() ) + miiTriggerdFds[itFD->first] = itFD->second; + } + if( miiTriggerdFds.size() ) + return( FDsThatTriggered( miiTriggerdFds ) ); + return( m_bEnabled ); +} + +CSockCommon::~CSockCommon() +{ + // delete any left over crons + CleanupCrons(); + CleanupFDMonitors(); +} + +void CSockCommon::CleanupCrons() +{ + for( size_t a = 0; a < m_vcCrons.size(); a++ ) + CS_Delete( m_vcCrons[a] ); + m_vcCrons.clear(); +} + +void CSockCommon::CleanupFDMonitors() +{ + for( size_t a = 0; a < m_vcMonitorFD.size(); a++ ) + CS_Delete( m_vcMonitorFD[a] ); + m_vcMonitorFD.clear(); +} + +void CSockCommon::CheckFDs( const std::map< int, short > & miiReadyFds ) +{ + for( size_t uMon = 0; uMon < m_vcMonitorFD.size(); ++uMon ) + { + if( !m_vcMonitorFD[uMon]->IsEnabled() || !m_vcMonitorFD[uMon]->CheckFDs( miiReadyFds ) ) + m_vcMonitorFD.erase( m_vcMonitorFD.begin() + uMon-- ); + } +} + +void CSockCommon::AssignFDs( std::map< int, short > & miiReadyFds, struct timeval * tvtimeout ) +{ + for( size_t uMon = 0; uMon < m_vcMonitorFD.size(); ++uMon ) + { + long iTimeoutMS = -1; + if( m_vcMonitorFD[uMon]->IsEnabled() && m_vcMonitorFD[uMon]->GatherFDsForSelect( miiReadyFds, iTimeoutMS ) ) + { + CSAdjustTVTimeout( *tvtimeout, iTimeoutMS ); + } + else + { + CS_Delete( m_vcMonitorFD[uMon] ); + m_vcMonitorFD.erase( m_vcMonitorFD.begin() + uMon-- ); + } + } +} + + +void CSockCommon::Cron() +{ + time_t iNow = 0; + + for( vector::size_type a = 0; a < m_vcCrons.size(); a++ ) + { + CCron *pcCron = m_vcCrons[a]; + + if ( !pcCron->isValid() ) + { + CS_Delete( pcCron ); + m_vcCrons.erase( m_vcCrons.begin() + a-- ); + } else + pcCron->run( iNow ); + } +} + +void CSockCommon::AddCron( CCron * pcCron ) +{ + m_vcCrons.push_back( pcCron ); +} + +void CSockCommon::DelCron( const CS_STRING & sName, bool bDeleteAll, bool bCaseSensitive ) +{ + for( u_int a = 0; a < m_vcCrons.size(); a++ ) + { + int (*Cmp)(const char *, const char *) = ( bCaseSensitive ? strcmp : strcasecmp ); + if ( Cmp( m_vcCrons[a]->GetName().c_str(), sName.c_str() ) == 0 ) + { + m_vcCrons[a]->Stop(); + CS_Delete( m_vcCrons[a] ); + m_vcCrons.erase( m_vcCrons.begin() + a-- ); + if( !bDeleteAll ) + break; + } + } +} + +void CSockCommon::DelCron( u_int iPos ) +{ + if ( iPos < m_vcCrons.size() ) + { + m_vcCrons[iPos]->Stop(); + CS_Delete( m_vcCrons[iPos] ); + m_vcCrons.erase( m_vcCrons.begin() + iPos ); + } +} + +void CSockCommon::DelCronByAddr( CCron *pcCron ) +{ + for( u_int a = 0; a < m_vcCrons.size(); a++ ) + { + if ( m_vcCrons[a] == pcCron ) + { + m_vcCrons[a]->Stop(); + CS_Delete( m_vcCrons[a] ); + m_vcCrons.erase( m_vcCrons.begin() + a ); + return; + } + } +} + +Csock::Csock( int itimeout ) : CSockCommon() { #ifdef HAVE_LIBSSL m_pCerVerifyCB = NULL; @@ -603,7 +751,7 @@ Csock::Csock( int itimeout ) Init( "", 0, itimeout ); } -Csock::Csock( const CS_STRING & sHostname, u_short iport, int itimeout ) +Csock::Csock( const CS_STRING & sHostname, u_short iport, int itimeout ) : CSockCommon() { #ifdef HAVE_LIBSSL m_pCerVerifyCB = NULL; @@ -638,9 +786,6 @@ Csock::~Csock() CloseSocksFD(); - // delete any left over crons - for( vector::size_type i = 0; i < m_vcCrons.size(); i++ ) - CS_Delete( m_vcCrons[i] ); } void Csock::CloseSocksFD() @@ -668,7 +813,9 @@ void Csock::Dereference() m_ssl_ctx = NULL; #endif /* HAVE_LIBSSL */ + // don't delete and erase, just erase since they were moved to the copied sock m_vcCrons.clear(); + m_vcMonitorFD.clear(); Close( CLT_DEREFERENCE ); } @@ -735,15 +882,10 @@ void Csock::Copy( const Csock & cCopy ) #endif /* HAVE_LIBSSL */ - if( !m_vcCrons.empty() ) - { - for( u_long a = 0; a < m_vcCrons.size(); a++ ) - { - CS_Delete( m_vcCrons[a] ); - } - m_vcCrons.clear(); - } + CleanupCrons(); + CleanupFDMonitors(); m_vcCrons = cCopy.m_vcCrons; + m_vcMonitorFD = cCopy.m_vcMonitorFD; m_eConState = cCopy.m_eConState; m_sBindHost = cCopy.m_sBindHost; @@ -2128,67 +2270,6 @@ void Csock::SetRate( u_int iBytes, unsigned long long iMilliseconds ) u_int Csock::GetRateBytes() { return( m_iMaxBytes ); } unsigned long long Csock::GetRateTime() { return( m_iMaxMilliSeconds ); } -void Csock::Cron() -{ - time_t iNow = 0; - - for( vector::size_type a = 0; a < m_vcCrons.size(); a++ ) - { - CCron *pcCron = m_vcCrons[a]; - - if ( !pcCron->isValid() ) - { - CS_Delete( pcCron ); - m_vcCrons.erase( m_vcCrons.begin() + a-- ); - } else - pcCron->run( iNow ); - } -} - -void Csock::AddCron( CCron * pcCron ) -{ - m_vcCrons.push_back( pcCron ); -} - -void Csock::DelCron( const CS_STRING & sName, bool bDeleteAll, bool bCaseSensitive ) -{ - for( u_int a = 0; a < m_vcCrons.size(); a++ ) - { - int (*Cmp)(const char *, const char *) = ( bCaseSensitive ? strcmp : strcasecmp ); - if ( Cmp( m_vcCrons[a]->GetName().c_str(), sName.c_str() ) == 0 ) - { - m_vcCrons[a]->Stop(); - CS_Delete( m_vcCrons[a] ); - m_vcCrons.erase( m_vcCrons.begin() + a-- ); - if( !bDeleteAll ) - break; - } - } -} - -void Csock::DelCron( u_int iPos ) -{ - if ( iPos < m_vcCrons.size() ) - { - m_vcCrons[iPos]->Stop(); - CS_Delete( m_vcCrons[iPos] ); - m_vcCrons.erase( m_vcCrons.begin() + iPos ); - } -} - -void Csock::DelCronByAddr( CCron *pcCron ) -{ - for( u_int a = 0; a < m_vcCrons.size(); a++ ) - { - if ( m_vcCrons[a] == pcCron ) - { - m_vcCrons[a]->Stop(); - CS_Delete( m_vcCrons[a] ); - m_vcCrons.erase( m_vcCrons.begin() + a ); - return; - } - } -} void Csock::EnableReadLine() { m_bEnableReadLine = true; } void Csock::DisableReadLine() { @@ -2263,8 +2344,15 @@ int Csock::GetAddrInfo( const CS_STRING & sHostname, CSSockAddr & csSockAddr ) int iFamily = AF_INET; #ifdef HAVE_IPV6 - // as of ares 1.6.0 if it fails on af_inet6, it falls back to af_inet, this code was here in the previous Csocket version, just adding the comment as a reminder +#if ARES_VERSION >= CREATE_ARES_VER( 1, 7, 5 ) + // as of ares 1.7.5, it falls back to af_inet only when AF_UNSPEC is specified + // so this can finally let the code flow through as anticipated :) + iFamily = csSockAddr.GetAFRequire(); +#else + // as of ares 1.6.0 if it fails on af_inet6, it falls back to af_inet, + // this code was here in the previous Csocket version, just adding the comment as a reminder iFamily = csSockAddr.GetAFRequire() == CSSockAddr::RAF_ANY ? AF_INET6 : csSockAddr.GetAFRequire(); +#endif /* CREATE_ARES_VER( 1, 7, 5 ) */ #endif /* HAVE_IPV6 */ ares_gethostbyname( m_pARESChannel, sHostname.c_str(), iFamily, AresHostCallback, this ); } @@ -2498,3 +2586,940 @@ void Csock::Init( const CS_STRING & sHostname, u_short uPort, int itimeout ) #endif /* HAVE_C_ARES */ } +////////////////////////// CSocketManager ////////////////////////// +CSocketManager::CSocketManager() : std::vector(), CSockCommon() +{ + m_errno = SUCCESS; + m_iCallTimeouts = millitime(); + m_iSelectWait = 100000; // Default of 100 milliseconds + m_iBytesRead = 0; + m_iBytesWritten = 0; +} + +CSocketManager::~CSocketManager() +{ + clear(); +} + +void CSocketManager::clear() +{ + while ( this->size() ) + DelSock( 0 ); +} + +void CSocketManager::Cleanup() +{ + CleanupCrons(); + CleanupFDMonitors(); + clear(); +} + +Csock * CSocketManager::GetSockObj( const CS_STRING & sHostname, u_short uPort, int iTimeout ) +{ + return( new Csock( sHostname, uPort, iTimeout ) ); +} + +bool CSocketManager::Connect( const CSConnection & cCon, Csock * pcSock ) +{ + // create the new object + if ( !pcSock ) + pcSock = GetSockObj( cCon.GetHostname(), cCon.GetPort(), cCon.GetTimeout() ); + else + { + pcSock->SetHostName( cCon.GetHostname() ); + pcSock->SetPort( cCon.GetPort() ); + pcSock->SetTimeout( cCon.GetTimeout() ); + } + + if( cCon.GetAFRequire() != CSSockAddr::RAF_ANY ) + pcSock->SetAFRequire( cCon.GetAFRequire() ); + + // make it NON-Blocking IO + pcSock->BlockIO( false ); + + // bind the vhost + pcSock->SetBindHost( cCon.GetBindHost() ); + +#ifdef HAVE_LIBSSL + pcSock->SetSSL( cCon.GetIsSSL() ); + if( cCon.GetIsSSL() ) + { + if( !cCon.GetPemLocation().empty() ) + { + pcSock->SetPemLocation( cCon.GetPemLocation() ); + pcSock->SetPemPass( cCon.GetPemPass() ); + } + if( !cCon.GetCipher().empty() ) + pcSock->SetCipher( cCon.GetCipher() ); + } +#endif /* HAVE_LIBSSL */ + + pcSock->SetType( Csock::OUTBOUND ); + + pcSock->SetConState( Csock::CST_START ); + AddSock( pcSock, cCon.GetSockName() ); + return( true ); +} + +bool CSocketManager::Listen( const CSListener & cListen, Csock * pcSock, u_short *piRandPort ) +{ + if ( !pcSock ) + pcSock = GetSockObj( "", 0 ); + + pcSock->BlockIO( false ); + if( cListen.GetAFRequire() != CSSockAddr::RAF_ANY ) + { + pcSock->SetAFRequire( cListen.GetAFRequire() ); +#ifdef HAVE_IPV6 + if( cListen.GetAFRequire() == CSSockAddr::RAF_INET6 ) + pcSock->SetIPv6( true ); +#endif /* HAVE_IPV6 */ + } +#ifdef HAVE_IPV6 + else + { + pcSock->SetIPv6( true ); + } +#endif /* HAVE_IPV6 */ +#ifdef HAVE_LIBSSL + pcSock->SetSSL( cListen.GetIsSSL() ); + if( ( cListen.GetIsSSL() ) && ( !cListen.GetPemLocation().empty() ) ) + { + pcSock->SetPemLocation( cListen.GetPemLocation() ); + pcSock->SetPemPass( cListen.GetPemPass() ); + pcSock->SetCipher( cListen.GetCipher() ); + pcSock->SetRequireClientCertFlags( cListen.GetRequireClientCertFlags() ); + } +#endif /* HAVE_LIBSSL */ + + if( piRandPort ) + *piRandPort = 0; + + if ( pcSock->Listen( cListen.GetPort(), cListen.GetMaxConns(), cListen.GetBindHost(), cListen.GetTimeout() ) ) + { + AddSock( pcSock, cListen.GetSockName() ); + if( ( piRandPort ) && ( cListen.GetPort() == 0 ) ) + { + cs_sock_t iSock = pcSock->GetSock(); + + if ( iSock == CS_INVALID_SOCK ) + { + CS_DEBUG( "Failed to attain a valid file descriptor" ); + pcSock->Close(); + return( false ); + } + struct sockaddr_in mLocalAddr; + socklen_t mLocalLen = sizeof( mLocalAddr ); + getsockname( iSock, (struct sockaddr *) &mLocalAddr, &mLocalLen ); + *piRandPort = ntohs( mLocalAddr.sin_port ); + } + return( true ); + } + + CS_Delete( pcSock ); + return( false ); +} + + +bool CSocketManager::HasFDs() const +{ + return( this->size() || m_vcMonitorFD.size() ); +} + +void CSocketManager::Loop() +{ + for( u_int a = 0; a < this->size(); a++ ) + { + Csock *pcSock = (*this)[a]; + + if ( ( pcSock->GetType() != Csock::OUTBOUND ) || ( pcSock->GetConState() == Csock::CST_OK ) ) + continue; + if ( pcSock->GetConState() == Csock::CST_DNS ) + { + if ( pcSock->DNSLookup( Csock::DNS_VHOST ) == ETIMEDOUT ) + { + pcSock->SockError( EDOM ); + DelSock( a-- ); + continue; + } + } + + if ( pcSock->GetConState() == Csock::CST_BINDVHOST ) + { + if ( !pcSock->SetupVHost() ) + { + pcSock->SockError( GetSockError() ); + DelSock( a-- ); + continue; + } + } + + if ( pcSock->GetConState() == Csock::CST_DESTDNS ) + { + if ( pcSock->DNSLookup( Csock::DNS_DEST ) == ETIMEDOUT ) + { + pcSock->SockError( EADDRNOTAVAIL ); + DelSock( a-- ); + continue; + } + } + if ( pcSock->GetConState() == Csock::CST_CONNECT ) + { + if ( !pcSock->Connect( pcSock->GetBindHost(), true ) ) + { + if ( GetSockError() == ECONNREFUSED ) + pcSock->ConnectionRefused(); + else + pcSock->SockError( GetSockError() ); + + DelSock( a-- ); + continue; + } + } +#ifdef HAVE_LIBSSL + if( pcSock->GetConState() == Csock::CST_CONNECTSSL ) + { + if ( pcSock->GetSSL() ) + { + if ( !pcSock->ConnectSSL() ) + { + if ( GetSockError() == ECONNREFUSED ) + pcSock->ConnectionRefused(); + else + pcSock->SockError( GetSockError() == 0 ? ECONNABORTED : GetSockError() ); + + DelSock( a-- ); + continue; + } + } + } +#endif /* HAVE_LIBSSL */ + } + + std::map mpeSocks; + Select( mpeSocks ); + + switch( m_errno ) + { + case SUCCESS: + { + for( std::map::iterator itSock = mpeSocks.begin(); itSock != mpeSocks.end(); itSock++ ) + { + Csock * pcSock = itSock->first; + EMessages iErrno = itSock->second; + + if ( iErrno == SUCCESS ) + { + // read in data + // if this is a + int iLen = 0; + + if ( pcSock->GetSSL() ) + iLen = pcSock->GetPending(); + + if ( iLen <= 0 ) + iLen = CS_BLOCKSIZE; + + CSCharBuffer cBuff( iLen ); + + cs_ssize_t bytes = pcSock->Read( cBuff(), iLen ); + + if ( bytes != Csock::READ_TIMEDOUT && bytes != Csock::READ_CONNREFUSED && bytes != Csock::READ_ERR && !pcSock->IsConnected() ) + { + pcSock->SetIsConnected( true ); + pcSock->Connected(); + } + + switch( bytes ) + { + case Csock::READ_EOF: + { + DelSockByAddr( pcSock ); + break; + } + + case Csock::READ_ERR: + { + pcSock->SockError( GetSockError() ); + DelSockByAddr( pcSock ); + break; + } + + case Csock::READ_EAGAIN: + break; + + case Csock::READ_CONNREFUSED: + pcSock->ConnectionRefused(); + DelSockByAddr( pcSock ); + break; + + case Csock::READ_TIMEDOUT: + pcSock->Timeout(); + DelSockByAddr( pcSock ); + break; + + default: + { + if ( Csock::TMO_READ & pcSock->GetTimeoutType() ) + pcSock->ResetTimer(); // reset the timeout timer + + pcSock->ReadData( cBuff(), bytes ); // Call ReadData() before PushBuff() so that it is called before the ReadLine() event - LD 07/18/05 + pcSock->PushBuff( cBuff(), bytes ); + break; + } + } + + } else if ( iErrno == SELECT_ERROR ) + { + // a socket came back with an error + // usually means it was closed + DelSockByAddr( pcSock ); + } + } + break; + } + + case SELECT_TIMEOUT: + case SELECT_TRYAGAIN: + case SELECT_ERROR: + default : + break; + } + + unsigned long long iMilliNow = millitime(); + if ( ( iMilliNow - m_iCallTimeouts ) >= 1000 ) + { + m_iCallTimeouts = iMilliNow; + // call timeout on all the sockets that recieved no data + for( unsigned int i = 0; i < this->size(); i++ ) + { + if ( (*this)[i]->GetConState() != Csock::CST_OK ) + continue; + + if ( (*this)[i]->CheckTimeout( (time_t)(iMilliNow / 1000 ) ) ) + DelSock( i-- ); + } + } + // run any Manager Crons we may have + Cron(); +} + +void CSocketManager::DynamicSelectLoop( u_long iLowerBounds, u_long iUpperBounds, time_t iMaxResolution ) +{ + SetSelectTimeout( iLowerBounds ); + if( m_errno == SELECT_TIMEOUT ) + { // only do this if the previous call to select was a timeout + time_t iNow = time( NULL ); + u_long iSelectTimeout = GetDynamicSleepTime( iNow, iMaxResolution ); + iSelectTimeout *= 1000000; + iSelectTimeout = std::max( iLowerBounds, iSelectTimeout ); + iSelectTimeout = std::min( iSelectTimeout, iUpperBounds ); + if( iLowerBounds != iSelectTimeout ) + SetSelectTimeout( iSelectTimeout ); + } + Loop(); +} + +void CSocketManager::AddSock( Csock *pcSock, const CS_STRING & sSockName ) +{ + pcSock->SetSockName( sSockName ); + this->push_back( pcSock ); +} + +Csock * CSocketManager::FindSockByRemotePort( u_short iPort ) +{ + for( unsigned int i = 0; i < this->size(); i++ ) + { + if ( (*this)[i]->GetRemotePort() == iPort ) + return( (*this)[i] ); + } + + return( NULL ); +} + +Csock * CSocketManager::FindSockByLocalPort( u_short iPort ) +{ + for( unsigned int i = 0; i < this->size(); i++ ) + if ( (*this)[i]->GetLocalPort() == iPort ) + return( (*this)[i] ); + + return( NULL ); +} + +Csock * CSocketManager::FindSockByName( const CS_STRING & sName ) +{ + std::vector::iterator it; + std::vector::iterator it_end = this->end(); + for( it = this->begin(); it != it_end; it++ ) + if ( (*it)->GetSockName() == sName ) + return( *it ); + + return( NULL ); +} + +Csock * CSocketManager::FindSockByFD( cs_sock_t iFD ) +{ + for( unsigned int i = 0; i < this->size(); i++ ) + if ( ( (*this)[i]->GetRSock() == iFD ) || ( (*this)[i]->GetWSock() == iFD ) ) + return( (*this)[i] ); + + return( NULL ); +} + +std::vector CSocketManager::FindSocksByName( const CS_STRING & sName ) +{ + std::vector vpSocks; + + for( unsigned int i = 0; i < this->size(); i++ ) + if ( (*this)[i]->GetSockName() == sName ) + vpSocks.push_back( (*this)[i] ); + + return( vpSocks ); +} + +std::vector CSocketManager::FindSocksByRemoteHost( const CS_STRING & sHostname ) +{ + std::vector vpSocks; + + for( unsigned int i = 0; i < this->size(); i++ ) + if ( (*this)[i]->GetHostName() == sHostname ) + vpSocks.push_back( (*this)[i] ); + + return( vpSocks ); +} + +void CSocketManager::DelSockByAddr( Csock *pcSock ) +{ + for( u_int a = 0; a < this->size(); a++ ) + { + if ( pcSock == (*this)[a] ) + { + DelSock( a ); + return; + } + } +} +void CSocketManager::DelSock( u_int iPos ) +{ + if ( iPos >= this->size() ) + { + CS_DEBUG( "Invalid Sock Position Requested! [" << iPos << "]" ); + return; + } + + Csock * pSock = (*this)[iPos]; + + if( pSock->GetCloseType() != Csock::CLT_DEREFERENCE ) + { + if ( pSock->IsConnected() ) + pSock->Disconnected(); // only call disconnected event if connected event was called (IE IsConnected was set) + + m_iBytesRead += pSock->GetBytesRead(); + m_iBytesWritten += pSock->GetBytesWritten(); + } + + CS_Delete( pSock ); + this->erase( this->begin() + iPos ); +} + +bool CSocketManager::SwapSockByIdx( Csock *pNewSock, u_long iOrginalSockIdx ) +{ + if( iOrginalSockIdx >= this->size() ) + { + CS_DEBUG( "Invalid Sock Position Requested! [" << iOrginalSockIdx << "]" ); + return( false ); + } + + Csock *pSock = (*this)[iOrginalSockIdx]; + pNewSock->Copy( *pSock ); + pSock->Dereference(); + (*this)[iOrginalSockIdx] = (Csock *)pNewSock; + this->push_back( (Csock *)pSock ); // this allows it to get cleaned up + return( true ); +} + +bool CSocketManager::SwapSockByAddr( Csock *pNewSock, Csock *pOrigSock ) +{ + for( u_long a = 0; a < this->size(); a++ ) + { + if( (*this)[a] == pOrigSock ) + return( SwapSockByIdx( pNewSock, a ) ); + } + return( false ); +} + +unsigned long long CSocketManager::GetBytesRead() const +{ + // Start with the total bytes read from destroyed sockets + unsigned long long iRet = m_iBytesRead; + + // Add in the outstanding bytes read from active sockets + for( u_int a = 0; a < this->size(); a++ ) + iRet += (*this)[a]->GetBytesRead(); + + return( iRet ); +} + +unsigned long long CSocketManager::GetBytesWritten() const +{ + // Start with the total bytes written to destroyed sockets + unsigned long long iRet = m_iBytesWritten; + + // Add in the outstanding bytes written to active sockets + for( u_int a = 0; a < this->size(); a++ ) + iRet += (*this)[a]->GetBytesWritten(); + + return( iRet ); +} + +void CSocketManager::FDSetCheck( int iFd, std::map< int, short > & miiReadyFds, ECheckType eType ) +{ + std::map< int, short >::iterator it = miiReadyFds.find( iFd ); + if( it != miiReadyFds.end() ) + it->second = (short)(it->second | eType ); // TODO need to figure out why |= throws 'short int' from 'int' may alter its value + else + miiReadyFds[iFd] = eType; +} +bool CSocketManager::FDHasCheck( int iFd, std::map< int, short > & miiReadyFds, ECheckType eType ) +{ + std::map< int, short >::iterator it = miiReadyFds.find( iFd ); + if( it != miiReadyFds.end() ) + return( (it->second & eType) ); + return( false ); +} + +int CSocketManager::Select( std::map< int, short > & miiReadyFds, struct timeval *tvtimeout) +{ + AssignFDs( miiReadyFds, tvtimeout ); +#ifdef CSOCK_USE_POLL + if( miiReadyFds.empty() ) + return( select( 0, NULL, NULL, NULL, tvtimeout ) ); + + struct pollfd * pFDs = (struct pollfd *)malloc( sizeof( struct pollfd ) * miiReadyFds.size() ); + size_t uCurrPoll = 0; + for( std::map< int, short >::iterator it = miiReadyFds.begin(); it != miiReadyFds.end(); ++it, ++uCurrPoll ) + { + short iEvents = 0; + if( it->second & ECT_Read ) + iEvents |= POLLIN; + if( it->second & ECT_Write ) + iEvents |= POLLOUT; + pFDs[uCurrPoll].fd = it->first; + pFDs[uCurrPoll].events = iEvents; + pFDs[uCurrPoll].revents = 0; + } + int iTimeout = (int)(tvtimeout->tv_usec / 1000); + iTimeout += (int)(tvtimeout->tv_sec * 1000); + size_t uMaxFD = miiReadyFds.size(); + int iRet = poll( pFDs, uMaxFD, iTimeout ); + miiReadyFds.clear(); + for( uCurrPoll = 0; uCurrPoll < uMaxFD; ++uCurrPoll ) + { + short iEvents = 0; + if( (pFDs[uCurrPoll].revents & (POLLIN|POLLERR|POLLHUP|POLLNVAL) ) ) + iEvents |= ECT_Read; + if( (pFDs[uCurrPoll].revents & POLLOUT ) ) + iEvents |= ECT_Write; + std::map< int, short >::iterator it = miiReadyFds.find( pFDs[uCurrPoll].fd ); + if( it != miiReadyFds.end() ) + it->second |= iEvents; + else + miiReadyFds[pFDs[uCurrPoll].fd] = iEvents; + } + free( pFDs ); +#else + fd_set rfds, wfds; + TFD_ZERO( &rfds ); + TFD_ZERO( &wfds ); + bool bHasWrite = false; + int iHighestFD = 0; + for( std::map< int, short >::iterator it = miiReadyFds.begin(); it != miiReadyFds.end(); ++it ) + { + iHighestFD = std::max( it->first, iHighestFD ); + if( it->second & ECT_Read ) + { + TFD_SET( it->first, &rfds ); + } + if( it->second & ECT_Write ) + { + bHasWrite = true; + TFD_SET( it->first, &wfds ); + } + } + + int iRet = select( iHighestFD + 1, &rfds, ( bHasWrite ? &wfds : NULL ), NULL, tvtimeout ); + if( iRet <= 0 ) + miiReadyFds.clear(); + else + { + for( std::map< int, short >::iterator it = miiReadyFds.begin(); it != miiReadyFds.end(); ++it ) + { + if( (it->second & ECT_Read) && !TFD_ISSET( it->first, &rfds ) ) + it->second &= ~ECT_Read; + if( (it->second & ECT_Write) && !TFD_ISSET( it->first, &wfds ) ) + it->second &= ~ECT_Write; + } + } +#endif /* CSOCK_USE_POLL */ + + return( iRet ); +} + +void CSocketManager::Select( std::map & mpeSocks ) +{ + mpeSocks.clear(); + struct timeval tv; + + std::map< int, short > miiReadyFds; + tv.tv_sec = m_iSelectWait / 1000000; + tv.tv_usec = m_iSelectWait % 1000000; + u_int iQuickReset = 1000; + if ( m_iSelectWait == 0 ) + iQuickReset = 0; + + bool bHasAvailSocks = false; + unsigned long long iNOW = 0; + for( unsigned int i = 0; i < this->size(); i++ ) + { + Csock *pcSock = (*this)[i]; + + Csock::ECloseType eCloseType = pcSock->GetCloseType(); + + if( eCloseType == Csock::CLT_NOW || eCloseType == Csock::CLT_DEREFERENCE || ( eCloseType == Csock::CLT_AFTERWRITE && pcSock->GetWriteBuffer().empty() ) ) + { + DelSock( i-- ); // close any socks that have requested it + continue; + } + else + pcSock->Cron(); // call the Cron handler here + + cs_sock_t & iRSock = pcSock->GetRSock(); + cs_sock_t & iWSock = pcSock->GetWSock(); +#ifndef CSOCK_USE_POLL + if( iRSock > FD_SETSIZE || iWSock > FD_SETSIZE ) + { + CS_DEBUG( "FD is larger than select() can handle" ); + DelSock( i-- ); + continue; + } +#endif /* CSOCK_USE_POLL */ + +#ifdef HAVE_C_ARES + ares_channel pChannel = pcSock->GetAresChannel(); + if( pChannel ) + { + ares_socket_t aiAresSocks[1]; + aiAresSocks[0] = ARES_SOCKET_BAD; + int iSockMask = ares_getsock( pChannel, aiAresSocks, 1 ); + if( ARES_GETSOCK_READABLE( iSockMask, 0 ) ) + FDSetCheck( aiAresSocks[0], miiReadyFds, ECT_Read ); + if( ARES_GETSOCK_WRITABLE( iSockMask, 0 ) ) + FDSetCheck( aiAresSocks[0], miiReadyFds, ECT_Write ); + // let ares drop the timeout if it has something timing out sooner then whats in tv currently + ares_timeout( pChannel, &tv, &tv ); + } +#endif /* HAVE_C_ARES */ + + pcSock->AssignFDs( miiReadyFds, &tv ); + + if ( pcSock->GetConState() != Csock::CST_OK ) + continue; + + bHasAvailSocks = true; + + bool bIsReadPaused = pcSock->IsReadPaused(); + if ( bIsReadPaused ) + { + pcSock->ReadPaused(); + bIsReadPaused = pcSock->IsReadPaused(); // re-read it again, incase it changed status) + } + if ( iRSock == CS_INVALID_SOCK || iWSock == CS_INVALID_SOCK ) + { + SelectSock( mpeSocks, SUCCESS, pcSock ); + continue; // invalid sock fd + } + + if( pcSock->GetType() != Csock::LISTENER ) + { + bool bHasWriteBuffer = !pcSock->GetWriteBuffer().empty(); + + if ( !bIsReadPaused ) + FDSetCheck( iRSock, miiReadyFds, ECT_Read ); + + if( pcSock->AllowWrite( iNOW ) && ( !pcSock->IsConnected() || bHasWriteBuffer ) ) + { + if( !pcSock->IsConnected() ) + { // set the write bit if not connected yet + FDSetCheck( iWSock, miiReadyFds, ECT_Write ); + } + else if( bHasWriteBuffer && !pcSock->GetSSL() ) + { // always set the write bit if there is data to send when NOT ssl + FDSetCheck( iWSock, miiReadyFds, ECT_Write ); + } + else if( bHasWriteBuffer && pcSock->GetSSL() && pcSock->SslIsEstablished() ) + { // ONLY set the write bit if there is data to send and the SSL handshake is finished + FDSetCheck( iWSock, miiReadyFds, ECT_Write ); + } + } + + if( pcSock->GetSSL() && !pcSock->SslIsEstablished() && bHasWriteBuffer ) + { // if this is an unestabled SSL session with data to send ... try sending it + // do this here, cause otherwise ssl will cause a small + // cpu spike waiting for the handshake to finish + // resend this data + if ( !pcSock->Write( "" ) ) + { + pcSock->Close(); + } + // warning ... setting write bit in here causes massive CPU spinning on invalid SSL servers + // http://bugs.debian.org/cgi-bin/bugreport.cgi?bug=631590 + // however, we can set the select WAY down and it will retry quickly, but keep it from spinning at 100% + tv.tv_usec = iQuickReset; + tv.tv_sec = 0; + } + } + else + { + FDSetCheck( iRSock, miiReadyFds, ECT_Read ); + } + + if( pcSock->GetSSL() && pcSock->GetType() != Csock::LISTENER ) + { + if ( ( pcSock->GetPending() > 0 ) && ( !pcSock->IsReadPaused() ) ) + SelectSock( mpeSocks, SUCCESS, pcSock ); + } + } + + // old fashion select, go fer it + int iSel; + + if( !mpeSocks.empty() ) // .1 ms pause to see if anything else is ready (IE if there is SSL data pending, don't wait too long) + { + tv.tv_usec = iQuickReset; + tv.tv_sec = 0; + } + else if ( !this->empty() && !bHasAvailSocks ) + { + tv.tv_usec = iQuickReset; + tv.tv_sec = 0; + } + + iSel = Select( miiReadyFds, &tv ); + + if ( iSel == 0 ) + { + if ( mpeSocks.empty() ) + m_errno = SELECT_TIMEOUT; + else + m_errno = SUCCESS; +#ifdef HAVE_C_ARES + // run through ares channels and process timeouts + for( u_long uSock = 0; uSock < this->size(); ++uSock ) + { + Csock *pcSock = this->at( uSock ); + ares_channel pChannel = pcSock->GetAresChannel(); + if( pChannel ) + ares_process_fd( pChannel, ARES_SOCKET_BAD, ARES_SOCKET_BAD ); + } +#endif /* HAVE_C_ARES */ + + return; + } + + if ( ( iSel == -1 ) && ( errno == EINTR ) ) + { + if ( mpeSocks.empty() ) + m_errno = SELECT_TRYAGAIN; + else + m_errno = SUCCESS; + + return; + } + else if ( iSel == -1 ) + { + if ( mpeSocks.empty() ) + m_errno = SELECT_ERROR; + else + m_errno = SUCCESS; + + return; + } + else + { + m_errno = SUCCESS; + } + + CheckFDs( miiReadyFds ); + + // find out wich one is ready + for( unsigned int i = 0; i < this->size(); i++ ) + { + Csock *pcSock = (*this)[i]; + +#ifdef HAVE_C_ARES + ares_channel pChannel = pcSock->GetAresChannel(); + if( pChannel ) + { + ares_socket_t aiAresSocks[1]; + aiAresSocks[0] = ARES_SOCKET_BAD; + ares_getsock( pChannel, aiAresSocks, 1 ); + if( FDHasCheck( aiAresSocks[0], miiReadyFds, ECT_Read ) || FDHasCheck( aiAresSocks[0], miiReadyFds, ECT_Write ) ) + ares_process_fd( pChannel, aiAresSocks[0], aiAresSocks[0] ); + } +#endif /* HAVE_C_ARES */ + pcSock->CheckFDs( miiReadyFds ); + + if ( pcSock->GetConState() != Csock::CST_OK ) + continue; + + cs_sock_t & iRSock = pcSock->GetRSock(); + cs_sock_t & iWSock = pcSock->GetWSock(); + EMessages iErrno = SUCCESS; + + if ( iRSock == CS_INVALID_SOCK || iWSock == CS_INVALID_SOCK ) + { + // trigger a success so it goes through the normal motions + // and an error is produced + SelectSock( mpeSocks, SUCCESS, pcSock ); + continue; // watch for invalid socks + } + + if ( FDHasCheck( iWSock, miiReadyFds, ECT_Write ) ) + { + if ( iSel > 0 ) + { + iErrno = SUCCESS; + if ( ( !pcSock->GetWriteBuffer().empty() ) && ( pcSock->IsConnected() ) ) + { // write whats in the socks send buffer + if ( !pcSock->Write( "" ) ) + { + // write failed, sock died :( + iErrno = SELECT_ERROR; + } + } + } else + iErrno = SELECT_ERROR; + + SelectSock( mpeSocks, iErrno, pcSock ); + + } + else if ( FDHasCheck( iRSock, miiReadyFds, ECT_Read ) ) + { + if ( iSel > 0 ) + iErrno = SUCCESS; + else + iErrno = SELECT_ERROR; + + if ( pcSock->GetType() != Csock::LISTENER ) + SelectSock( mpeSocks, iErrno, pcSock ); + else // someone is coming in! + { + CS_STRING sHost; + u_short port; + cs_sock_t inSock = pcSock->Accept( sHost, port ); + + if ( inSock != CS_INVALID_SOCK ) + { + if ( Csock::TMO_ACCEPT & pcSock->GetTimeoutType() ) + pcSock->ResetTimer(); // let them now it got dinged + + // if we have a new sock, then add it + Csock *NewpcSock = (Csock *)pcSock->GetSockObj( sHost, port ); + + if ( !NewpcSock ) + NewpcSock = GetSockObj( sHost, port ); + + NewpcSock->BlockIO( false ); + NewpcSock->SetType( Csock::INBOUND ); + NewpcSock->SetRSock( inSock ); + NewpcSock->SetWSock( inSock ); + NewpcSock->SetIPv6( pcSock->GetIPv6() ); + + bool bAddSock = true; +#ifdef HAVE_LIBSSL + // + // is this ssl ? + if ( pcSock->GetSSL() ) + { + NewpcSock->SetCipher( pcSock->GetCipher() ); + NewpcSock->SetPemLocation( pcSock->GetPemLocation() ); + NewpcSock->SetPemPass( pcSock->GetPemPass() ); + NewpcSock->SetRequireClientCertFlags( pcSock->GetRequireClientCertFlags() ); + bAddSock = NewpcSock->AcceptSSL(); + } + +#endif /* HAVE_LIBSSL */ + if ( bAddSock ) + { + // set the name of the listener + NewpcSock->SetParentSockName( pcSock->GetSockName() ); + NewpcSock->SetRate( pcSock->GetRateBytes(), pcSock->GetRateTime() ); + if ( NewpcSock->GetSockName().empty() ) + { + std::stringstream s; + s << sHost << ":" << port; + AddSock( NewpcSock, s.str() ); + + } else + AddSock( NewpcSock, NewpcSock->GetSockName() ); + } else + CS_Delete( NewpcSock ); + } +#ifdef _WIN32 + else if( GetSockError() != WSAEWOULDBLOCK ) +#else /* _WIN32 */ + else if( GetSockError() != EAGAIN ) +#endif /* _WIN32 */ + { + pcSock->SockError( GetSockError() ); + } + } + } + } +} + +time_t CSocketManager::GetDynamicSleepTime( time_t iNow, time_t iMaxResolution ) const +{ + time_t iNextRunTime = iNow + iMaxResolution; + std::vector::const_iterator it; + // This is safe, because we don't modify the vector. + std::vector::const_iterator it_end = this->end(); + + for (it = this->begin(); it != it_end; it++) + { + Csock* pSock = *it; + + if( pSock->GetConState() != Csock::CST_OK ) + iNextRunTime = iNow; // this is in a nebulous state, need to let it proceed like normal + + time_t iTimeoutInSeconds = pSock->GetTimeout(); + if( iTimeoutInSeconds > 0 ) + { + time_t iNextTimeout = pSock->GetNextCheckTimeout( iNow ); + iNextRunTime = std::min( iNextRunTime, iNextTimeout ); + } + + const std::vector & vCrons = pSock->GetCrons(); + std::vector::const_iterator cit; + std::vector::const_iterator cit_end = vCrons.end(); + for (cit = vCrons.begin(); cit != cit_end; cit++) + iNextRunTime = std::min( iNextRunTime, (*cit)->GetNextRun() ); + } + std::vector::const_iterator cit; + std::vector::const_iterator cit_end = m_vcCrons.end(); + for (cit = m_vcCrons.begin(); cit != cit_end; cit++) + iNextRunTime = std::min( iNextRunTime, (*cit)->GetNextRun() ); + + if( iNextRunTime < iNow ) + return( 0 ); // smallest unit possible + return( std::min( iNextRunTime - iNow, iMaxResolution ) ); +} + +void CSocketManager::SelectSock( std::map & mpeSocks, EMessages eErrno, Csock * pcSock ) +{ + if ( mpeSocks.find( pcSock ) != mpeSocks.end() ) + return; + + mpeSocks[pcSock] = eErrno; +} + diff --git a/Csocket.h b/Csocket.h index 18f6528f..df44d472 100644 --- a/Csocket.h +++ b/Csocket.h @@ -1,6 +1,6 @@ /** * -* Copyright (c) 1999-2009 Jim Hull +* Copyright (c) 1999-2011 Jim Hull * All rights reserved * * Redistribution and use in source and binary forms, with or without modification, @@ -278,6 +278,9 @@ enum ECompType CT_RLE = 2 }; +//! adjusts tv with a new timeout if iTimeoutMS is smaller +void CSAdjustTVTimeout( struct timeval & tv, long iTimeoutMS ); + void SSLErrors( const char *filename, u_int iLineNum ); /** @@ -401,6 +404,99 @@ private: CS_STRING m_sName; }; +/** + * @class CSMonitorFD + * @brief class to tie sockets to for monitoring by Csocket at either the Csock or TSockManager + */ +class CSMonitorFD +{ +public: + CSMonitorFD() { m_bEnabled = true; } + virtual ~CSMonitorFD() {} + + /** + * @brief called before select, typically you don't need to reimplement this just add sockets via Add and let the default implementation have its way + * @param miiReadyFds fill with fd's to monitor and the associated bit to check them for (@see CSockManager::ECheckType) + * @param iTimeoutMS the timeout to change to, setting this to -1 (the default) + * @return returning false will remove this from monitoring. The same effect can be had by setting m_bEnabled to false as it is returned from this + */ + virtual bool GatherFDsForSelect( std::map< int, short > & miiReadyFds, long & iTimeoutMS ); + + /** + * @brief called when there are fd's belonging to this class that have triggered + * @param miiReadyFds the map of fd's with the bits that triggered them (@see CSockManager::ECheckType) + * @return returning false will remove this from monitoring + */ + virtual bool FDsThatTriggered( const std::map< int, short > & miiReadyFds ) { return( true ); } + + /** + * @brief gets called to diff miiReadyFds with m_miiMonitorFDs, and calls FDsThatTriggered when appropriate. Typically you don't need to reimplement this. + * @param miiReadyFds the map of all triggered fd's, not just the fd's from this class + * @return returning false will remove this from monitoring + */ + virtual bool CheckFDs( const std::map< int, short > & miiReadyFds ); + + /** + * @brief adds a file descriptor to be monitored + * @param iFD the file descriptor + * @param iMonitorEvents bitset of events to monitor for (@see CSockManager::ECheckType) + */ + void Add( int iFD, short iMonitorEvents ) { m_miiMonitorFDs[iFD] = iMonitorEvents; } + //! removes this fd from monitoring + void Remove( int iFD ) { m_miiMonitorFDs.erase( iFD ); } + //! causes this monitor to be removed + void DisableMonitor() { m_bEnabled = false; } + + bool IsEnabled() const { return( m_bEnabled ); } + +protected: + std::map< int, short > m_miiMonitorFDs; + bool m_bEnabled; +}; + +/** + * @class CSockCommon + * @brief simple class to share common code to both TSockManager and Csock + */ +class CSockCommon +{ +public: + CSockCommon() {} + virtual ~CSockCommon(); + + void CleanupCrons(); + void CleanupFDMonitors(); + + //! returns a const reference to the crons associated to this socket + const std::vector & GetCrons() const { return( m_vcCrons ); } + //! This has a garbage collecter, and is used internall to call the jobs + virtual void Cron(); + + //! insert a newly created cron + virtual void AddCron( CCron * pcCron ); + /** + * @brief deletes a cron by name + * @param sName the name of the cron + * @param bDeleteAll delete all crons that match sName + * @param bCaseSensitive use strcmp or strcasecmp + */ + virtual void DelCron( const CS_STRING & sName, bool bDeleteAll = true, bool bCaseSensitive = true ); + //! delete cron by idx + virtual void DelCron( u_int iPos ); + //! delete cron by address + virtual void DelCronByAddr( CCron *pcCron ); + + void CheckFDs( const std::map< int, short > & miiReadyFds ); + void AssignFDs( std::map< int, short > & miiReadyFds, struct timeval * tvtimeout ); + + //! add an FD set to monitor + void MonitorFD( CSMonitorFD * pMonitorFD ) { m_vcMonitorFD.push_back( pMonitorFD ); } + +protected: + std::vector m_vcCrons; + std::vector m_vcMonitorFD; +}; + #ifdef HAVE_LIBSSL typedef int (*FPCertVerifyCB)( int, X509_STORE_CTX * ); #endif /* HAVE_LIBSSL */ @@ -414,7 +510,7 @@ typedef int (*FPCertVerifyCB)( int, X509_STORE_CTX * ); * @see TSocketManager * @author Jim Hull */ -class Csock +class Csock : public CSockCommon { public: //! default constructor, sets a timeout of 60 seconds @@ -447,7 +543,7 @@ public: { OUTBOUND = 0, //!< outbound connection LISTENER = 1, //!< a socket accepting connections - INBOUND = 2 //!< an inbound connection, passed from LISTENER + INBOUND = 2 //!< an inbound connection, passed from LISTENER }; enum EFRead @@ -515,13 +611,13 @@ public: /** * WriteSelect on this socket - * Only good if JUST using this socket, otherwise use the TSocketManager + * Only good if JUST using this socket, otherwise use the CSocketManager */ virtual int WriteSelect(); /** * ReadSelect on this socket - * Only good if JUST using this socket, otherwise use the TSocketManager + * Only good if JUST using this socket, otherwise use the CSocketManager */ virtual int ReadSelect(); @@ -785,22 +881,6 @@ public: u_int GetRateBytes(); unsigned long long GetRateTime(); - //! This has a garbage collecter, and is used internall to call the jobs - virtual void Cron(); - - //! insert a newly created cron - virtual void AddCron( CCron * pcCron ); - /** - * @brief deletes a cron by name - * @param sName the name of the cron - * @param bDeleteAll delete all crons that match sName - * @param bCaseSensitive use strcmp or strcasecmp - */ - virtual void DelCron( const CS_STRING & sName, bool bDeleteAll = true, bool bCaseSensitive = true ); - //! delete cron by idx - virtual void DelCron( u_int iPos ); - //! delete cron by address - virtual void DelCronByAddr( CCron *pcCron ); /** * Override these functions for an easy interface when using the Socket Manager @@ -888,7 +968,7 @@ public: */ virtual void ConnectionRefused() {} /** - * This gets called every iteration of TSocketManager::Select() if the socket is ReadPaused + * This gets called every iteration of CSocketManager::Select() if the socket is ReadPaused */ virtual void ReadPaused() {} @@ -991,8 +1071,6 @@ public: //! returns true if this socket can write its data, primarily used with rate shaping, initialize iNOW to 0 and it sets it on the first call bool AllowWrite( unsigned long long & iNOW ) const; - //! returns a const reference to the crons associated to this socket - const std::vector & GetCrons() const { return( m_vcCrons ); } void SetSkipConnect( bool b ) { m_bSkipConnect = b; } @@ -1045,7 +1123,6 @@ private: #endif /* HAVE_LIBSSL */ - std::vector m_vcCrons; //! Create the socket cs_sock_t CreateSocket( bool bListen = false ); @@ -1244,7 +1321,7 @@ public: #endif /* HAVE_LIBSSL */ /** -* @class TSocketManager +* @class CSocketManager * @brief Best class to use to interact with the sockets * * handles SSL and NON Blocking IO @@ -1266,38 +1343,15 @@ public: * @author Jim Hull */ -template -class TSocketManager : public std::vector +class CSocketManager : public std::vector, public CSockCommon { public: - TSocketManager() : std::vector() - { - m_errno = SUCCESS; - m_iCallTimeouts = millitime(); - m_iSelectWait = 100000; // Default of 100 milliseconds - m_iBytesRead = 0; - m_iBytesWritten = 0; - } + CSocketManager(); + virtual ~CSocketManager(); + virtual void clear(); + virtual void Cleanup(); - virtual ~TSocketManager() - { - Cleanup(); - } - - void clear() - { - while ( this->size() ) - DelSock( 0 ); - } - - virtual void Cleanup() - { - for( u_int a = 0; a < m_vcCrons.size(); a++ ) - CS_Delete( m_vcCrons[a] ); - - m_vcCrons.clear(); - clear(); - } + virtual Csock * GetSockObj( const CS_STRING & sHostname, u_short uPort, int iTimeout = 60 ); enum EMessages { @@ -1314,290 +1368,20 @@ public: * @param pcSock the socket used for the connectiong, can be NULL * @return true on success */ - bool Connect( const CSConnection & cCon, T * pcSock = NULL ) - { - // create the new object - if ( !pcSock ) - pcSock = new T( cCon.GetHostname(), cCon.GetPort(), cCon.GetTimeout() ); - else - { - pcSock->SetHostName( cCon.GetHostname() ); - pcSock->SetPort( cCon.GetPort() ); - pcSock->SetTimeout( cCon.GetTimeout() ); - } + bool Connect( const CSConnection & cCon, Csock * pcSock = NULL ); - if( cCon.GetAFRequire() != CSSockAddr::RAF_ANY ) - pcSock->SetAFRequire( cCon.GetAFRequire() ); + virtual bool Listen( const CSListener & cListen, Csock * pcSock = NULL, u_short *piRandPort = NULL ); - // make it NON-Blocking IO - pcSock->BlockIO( false ); - - // bind the vhost - pcSock->SetBindHost( cCon.GetBindHost() ); - -#ifdef HAVE_LIBSSL - pcSock->SetSSL( cCon.GetIsSSL() ); - if( cCon.GetIsSSL() ) - { - if( !cCon.GetPemLocation().empty() ) - { - pcSock->SetPemLocation( cCon.GetPemLocation() ); - pcSock->SetPemPass( cCon.GetPemPass() ); - } - if( !cCon.GetCipher().empty() ) - pcSock->SetCipher( cCon.GetCipher() ); - } -#endif /* HAVE_LIBSSL */ - - pcSock->SetType( T::OUTBOUND ); - - pcSock->SetConState( T::CST_START ); - AddSock( pcSock, cCon.GetSockName() ); - return( true ); - } - - virtual bool Listen( const CSListener & cListen, T * pcSock = NULL, u_short *piRandPort = NULL ) - { - if ( !pcSock ) - pcSock = new T(); - - pcSock->BlockIO( false ); - if( cListen.GetAFRequire() != CSSockAddr::RAF_ANY ) - { - pcSock->SetAFRequire( cListen.GetAFRequire() ); -#ifdef HAVE_IPV6 - if( cListen.GetAFRequire() == CSSockAddr::RAF_INET6 ) - pcSock->SetIPv6( true ); -#endif /* HAVE_IPV6 */ - } -#ifdef HAVE_IPV6 - else - { - pcSock->SetIPv6( true ); - } -#endif /* HAVE_IPV6 */ -#ifdef HAVE_LIBSSL - pcSock->SetSSL( cListen.GetIsSSL() ); - if( ( cListen.GetIsSSL() ) && ( !cListen.GetPemLocation().empty() ) ) - { - pcSock->SetPemLocation( cListen.GetPemLocation() ); - pcSock->SetPemPass( cListen.GetPemPass() ); - pcSock->SetCipher( cListen.GetCipher() ); - pcSock->SetRequireClientCertFlags( cListen.GetRequireClientCertFlags() ); - } -#endif /* HAVE_LIBSSL */ - - if( piRandPort ) - *piRandPort = 0; - - if ( pcSock->Listen( cListen.GetPort(), cListen.GetMaxConns(), cListen.GetBindHost(), cListen.GetTimeout() ) ) - { - AddSock( pcSock, cListen.GetSockName() ); - if( ( piRandPort ) && ( cListen.GetPort() == 0 ) ) - { - cs_sock_t iSock = pcSock->GetSock(); - - if ( iSock == CS_INVALID_SOCK ) - { - CS_DEBUG( "Failed to attain a valid file descriptor" ); - pcSock->Close(); - return( false ); - } - struct sockaddr_in mLocalAddr; - socklen_t mLocalLen = sizeof( mLocalAddr ); - getsockname( iSock, (struct sockaddr *) &mLocalAddr, &mLocalLen ); - *piRandPort = ntohs( mLocalAddr.sin_port ); - } - return( true ); - } - - CS_Delete( pcSock ); - return( false ); - } + //! simple method to see if there are file descriptors being processed, useful to know if all the work is done in the manager + bool HasFDs() const; /** * Best place to call this class for running, all the call backs are called. * You should through this in your main while loop (long as its not blocking) * all the events are called as needed. */ - virtual void Loop() - { - for( u_int a = 0; a < this->size(); a++ ) - { - T *pcSock = (*this)[a]; - - if ( ( pcSock->GetType() != T::OUTBOUND ) || ( pcSock->GetConState() == T::CST_OK ) ) - continue; - if ( pcSock->GetConState() == T::CST_DNS ) - { - if ( pcSock->DNSLookup( T::DNS_VHOST ) == ETIMEDOUT ) - { - pcSock->SockError( EDOM ); - DelSock( a-- ); - continue; - } - } - - if ( pcSock->GetConState() == T::CST_BINDVHOST ) - { - if ( !pcSock->SetupVHost() ) - { - pcSock->SockError( GetSockError() ); - DelSock( a-- ); - continue; - } - } - - if ( pcSock->GetConState() == T::CST_DESTDNS ) - { - if ( pcSock->DNSLookup( T::DNS_DEST ) == ETIMEDOUT ) - { - pcSock->SockError( EADDRNOTAVAIL ); - DelSock( a-- ); - continue; - } - } - if ( pcSock->GetConState() == T::CST_CONNECT ) - { - if ( !pcSock->Connect( pcSock->GetBindHost(), true ) ) - { - if ( GetSockError() == ECONNREFUSED ) - pcSock->ConnectionRefused(); - else - pcSock->SockError( GetSockError() ); - - DelSock( a-- ); - continue; - } - } -#ifdef HAVE_LIBSSL - if( pcSock->GetConState() == T::CST_CONNECTSSL ) - { - if ( pcSock->GetSSL() ) - { - if ( !pcSock->ConnectSSL() ) - { - if ( GetSockError() == ECONNREFUSED ) - pcSock->ConnectionRefused(); - else - pcSock->SockError( GetSockError() == 0 ? ECONNABORTED : GetSockError() ); - - DelSock( a-- ); - continue; - } - } - } -#endif /* HAVE_LIBSSL */ - } - - std::map mpeSocks; - Select( mpeSocks ); - - switch( m_errno ) - { - case SUCCESS: - { - for( typename std::map::iterator itSock = mpeSocks.begin(); itSock != mpeSocks.end(); itSock++ ) - { - T * pcSock = itSock->first; - EMessages iErrno = itSock->second; - - if ( iErrno == SUCCESS ) - { - // read in data - // if this is a - int iLen = 0; - - if ( pcSock->GetSSL() ) - iLen = pcSock->GetPending(); - - if ( iLen <= 0 ) - iLen = CS_BLOCKSIZE; - - CSCharBuffer cBuff( iLen ); - - cs_ssize_t bytes = pcSock->Read( cBuff(), iLen ); - - if ( bytes != T::READ_TIMEDOUT && bytes != T::READ_CONNREFUSED && bytes != T::READ_ERR && !pcSock->IsConnected() ) - { - pcSock->SetIsConnected( true ); - pcSock->Connected(); - } - - switch( bytes ) - { - case T::READ_EOF: - { - DelSockByAddr( pcSock ); - break; - } - - case T::READ_ERR: - { - pcSock->SockError( GetSockError() ); - DelSockByAddr( pcSock ); - break; - } - - case T::READ_EAGAIN: - break; - - case T::READ_CONNREFUSED: - pcSock->ConnectionRefused(); - DelSockByAddr( pcSock ); - break; - - case T::READ_TIMEDOUT: - pcSock->Timeout(); - DelSockByAddr( pcSock ); - break; - - default: - { - if ( T::TMO_READ & pcSock->GetTimeoutType() ) - pcSock->ResetTimer(); // reset the timeout timer - - pcSock->ReadData( cBuff(), bytes ); // Call ReadData() before PushBuff() so that it is called before the ReadLine() event - LD 07/18/05 - pcSock->PushBuff( cBuff(), bytes ); - break; - } - } - - } else if ( iErrno == SELECT_ERROR ) - { - // a socket came back with an error - // usually means it was closed - DelSockByAddr( pcSock ); - } - } - break; - } - - case SELECT_TIMEOUT: - case SELECT_TRYAGAIN: - case SELECT_ERROR: - default : - break; - } - - unsigned long long iMilliNow = millitime(); - if ( ( iMilliNow - m_iCallTimeouts ) >= 1000 ) - { - m_iCallTimeouts = iMilliNow; - // call timeout on all the sockets that recieved no data - for( unsigned int i = 0; i < this->size(); i++ ) - { - if ( (*this)[i]->GetConState() != T::CST_OK ) - continue; - - if ( (*this)[i]->CheckTimeout( iMilliNow / 1000 ) ) - DelSock( i-- ); - } - } - // run any Manager Crons we may have - Cron(); - } + virtual void Loop(); /** * @brief this is similar to loop, except that it dynamically adjusts the select time based on jobs and timeouts in sockets @@ -1614,154 +1398,35 @@ public: * @param iUpperBounds the upper bounds to use in MICROSECONDS * @param iMaxResolution the maximum time to calculate overall in seconds */ - void DynamicSelectLoop( u_long iLowerBounds, u_long iUpperBounds, time_t iMaxResolution = 3600 ) - { - SetSelectTimeout( iLowerBounds ); - if( m_errno == SELECT_TIMEOUT ) - { // only do this if the previous call to select was a timeout - time_t iNow = time( NULL ); - u_long iSelectTimeout = GetDynamicSleepTime( iNow, iMaxResolution ); - iSelectTimeout *= 1000000; - iSelectTimeout = std::max( iLowerBounds, iSelectTimeout ); - iSelectTimeout = std::min( iSelectTimeout, iUpperBounds ); - if( iLowerBounds != iSelectTimeout ) - SetSelectTimeout( iSelectTimeout ); - } - Loop(); - } + void DynamicSelectLoop( u_long iLowerBounds, u_long iUpperBounds, time_t iMaxResolution = 3600 ); /** * Make this method virtual, so you can override it when a socket is added. * Assuming you might want to do some extra stuff */ - virtual void AddSock( T *pcSock, const CS_STRING & sSockName ) - { - pcSock->SetSockName( sSockName ); - this->push_back( pcSock ); - } + virtual void AddSock( Csock *pcSock, const CS_STRING & sSockName ); //! returns a pointer to the FIRST sock found by port or NULL on no match - virtual T * FindSockByRemotePort( u_short iPort ) - { - for( unsigned int i = 0; i < this->size(); i++ ) - { - if ( (*this)[i]->GetRemotePort() == iPort ) - return( (*this)[i] ); - } - - return( NULL ); - } + virtual Csock * FindSockByRemotePort( u_short iPort ); //! returns a pointer to the FIRST sock found by port or NULL on no match - virtual T * FindSockByLocalPort( u_short iPort ) - { - for( unsigned int i = 0; i < this->size(); i++ ) - if ( (*this)[i]->GetLocalPort() == iPort ) - return( (*this)[i] ); - - return( NULL ); - } + virtual Csock * FindSockByLocalPort( u_short iPort ); //! returns a pointer to the FIRST sock found by name or NULL on no match - virtual T * FindSockByName( const CS_STRING & sName ) - { - typename std::vector::iterator it; - typename std::vector::iterator it_end = this->end(); - for( it = this->begin(); it != it_end; it++ ) - if ( (*it)->GetSockName() == sName ) - return( *it ); - - return( NULL ); - } + virtual Csock * FindSockByName( const CS_STRING & sName ); //! returns a pointer to the FIRST sock found by filedescriptor or NULL on no match - virtual T * FindSockByFD( cs_sock_t iFD ) - { - for( unsigned int i = 0; i < this->size(); i++ ) - if ( ( (*this)[i]->GetRSock() == iFD ) || ( (*this)[i]->GetWSock() == iFD ) ) - return( (*this)[i] ); + virtual Csock * FindSockByFD( cs_sock_t iFD ); - return( NULL ); - } - - virtual std::vector FindSocksByName( const CS_STRING & sName ) - { - std::vector vpSocks; - - for( unsigned int i = 0; i < this->size(); i++ ) - if ( (*this)[i]->GetSockName() == sName ) - vpSocks.push_back( (*this)[i] ); - - return( vpSocks ); - } + virtual std::vector FindSocksByName( const CS_STRING & sName ); //! returns a vector of pointers to socks with sHostname as being connected - virtual std::vector FindSocksByRemoteHost( const CS_STRING & sHostname ) - { - std::vector vpSocks; - - for( unsigned int i = 0; i < this->size(); i++ ) - if ( (*this)[i]->GetHostName() == sHostname ) - vpSocks.push_back( (*this)[i] ); - - return( vpSocks ); - } + virtual std::vector FindSocksByRemoteHost( const CS_STRING & sHostname ); //! return the last known error as set by this class int GetErrno() { return( m_errno ); } - //! add a cronjob at the manager level - virtual void AddCron( CCron *pcCron ) - { - m_vcCrons.push_back( pcCron ); - } - /** - * @brief deletes a cron by name - * @param sName the name of the cron - * @param bDeleteAll delete all crons that match sName - * @param bCaseSensitive use strcmp or strcasecmp - */ - virtual void DelCron( const CS_STRING & sName, bool bDeleteAll = true, bool bCaseSensitive = true ) - { - for( u_int a = 0; a < m_vcCrons.size(); a++ ) - { - int (*Cmp)(const char *, const char *) = ( bCaseSensitive ? strcmp : strcasecmp ); - if ( Cmp( m_vcCrons[a]->GetName().c_str(), sName.c_str() ) == 0 ) - { - m_vcCrons[a]->Stop(); - CS_Delete( m_vcCrons[a] ); - m_vcCrons.erase( m_vcCrons.begin() + a-- ); - if( !bDeleteAll ) - break; - } - } - } - - //! delete cron by idx - virtual void DelCron( u_int iPos ) - { - if ( iPos < m_vcCrons.size() ) - { - m_vcCrons[iPos]->Stop(); - CS_Delete( m_vcCrons[iPos] ); - m_vcCrons.erase( m_vcCrons.begin() + iPos ); - } - } - //! delete cron by address - virtual void DelCronByAddr( CCron *pcCron ) - { - for( u_int a = 0; a < m_vcCrons.size(); a++ ) - { - if ( m_vcCrons[a] == pcCron ) - { - m_vcCrons[a]->Stop(); - CS_Delete( m_vcCrons[a] ); - m_vcCrons.erase( m_vcCrons.begin() + a ); - return; - } - } - } //! Get the Select Timeout in MICROSECONDS ( 1000 == 1 millisecond ) u_long GetSelectTimeout() { return( m_iSelectWait ); } @@ -1769,50 +1434,18 @@ public: //! Setting this to 0 will cause no timeout to happen, Select() will return instantly void SetSelectTimeout( u_long iTimeout ) { m_iSelectWait = iTimeout; } - std::vector & GetCrons() { return( m_vcCrons ); } - //! Delete a sock by addr //! its position is looked up //! the socket is deleted, the appropriate call backs are peformed //! and its instance is removed from the manager - virtual void DelSockByAddr( T *pcSock ) - { - for( u_int a = 0; a < this->size(); a++ ) - { - if ( pcSock == (*this)[a] ) - { - DelSock( a ); - return; - } - } - } + virtual void DelSockByAddr( Csock *pcSock ); + //! Delete a sock by position in the vector //! the socket is deleted, the appropriate call backs are peformed //! and its instance is removed from the manager //! deleting in a loop can be tricky, be sure you watch your position. //! ie for( u_int a = 0; a < size(); a++ ) DelSock( a-- ); - virtual void DelSock( u_int iPos ) - { - if ( iPos >= this->size() ) - { - CS_DEBUG( "Invalid Sock Position Requested! [" << iPos << "]" ); - return; - } - - T * pSock = (*this)[iPos]; - - if( pSock->GetCloseType() != T::CLT_DEREFERENCE ) - { - if ( pSock->IsConnected() ) - pSock->Disconnected(); // only call disconnected event if connected event was called (IE IsConnected was set) - - m_iBytesRead += pSock->GetBytesRead(); - m_iBytesWritten += pSock->GetBytesWritten(); - } - - CS_Delete( pSock ); - this->erase( this->begin() + iPos ); - } + virtual void DelSock( u_int iPos ); /** * @brief swaps out a sock with a copy of the original sock @@ -1820,21 +1453,7 @@ public: * @param iOrginalSockIdx the position in this sockmanager of the original sock * @return true on success */ - virtual bool SwapSockByIdx( Csock *pNewSock, u_long iOrginalSockIdx ) - { - if( iOrginalSockIdx >= this->size() ) - { - CS_DEBUG( "Invalid Sock Position Requested! [" << iOrginalSockIdx << "]" ); - return( false ); - } - - Csock *pSock = (*this)[iOrginalSockIdx]; - pNewSock->Copy( *pSock ); - pSock->Dereference(); - (*this)[iOrginalSockIdx] = (T *)pNewSock; - this->push_back( (T *)pSock ); // this allows it to get cleaned up - return( true ); - } + virtual bool SwapSockByIdx( Csock *pNewSock, u_long iOrginalSockIdx ); /** * @brief swaps out a sock with a copy of the original sock @@ -1842,140 +1461,28 @@ public: * @param pOrigSock the address of the original socket * @return true on success */ - virtual bool SwapSockByAddr( Csock *pNewSock, Csock *pOrigSock ) - { - for( u_long a = 0; a < this->size(); a++ ) - { - if( (*this)[a] == pOrigSock ) - return( SwapSockByIdx( pNewSock, a ) ); - } - return( false ); - } + virtual bool SwapSockByAddr( Csock *pNewSock, Csock *pOrigSock ); //! Get the bytes read from all sockets current and past - unsigned long long GetBytesRead() const - { - // Start with the total bytes read from destroyed sockets - unsigned long long iRet = m_iBytesRead; - - // Add in the outstanding bytes read from active sockets - for( u_int a = 0; a < this->size(); a++ ) - iRet += (*this)[a]->GetBytesRead(); - - return( iRet ); - } + unsigned long long GetBytesRead() const; //! Get the bytes written to all sockets current and past - unsigned long long GetBytesWritten() const - { - // Start with the total bytes written to destroyed sockets - unsigned long long iRet = m_iBytesWritten; + unsigned long long GetBytesWritten() const; - // Add in the outstanding bytes written to active sockets - for( u_int a = 0; a < this->size(); a++ ) - iRet += (*this)[a]->GetBytesWritten(); - - return( iRet ); - } - -protected: //! this is a strict wrapper around C-api select(). Added in the event you need to do special work here enum ECheckType { - eCheckRead = 1, - eCheckWrite = 2 + ECT_Read = 1, + ECT_Write = 2 }; - void FDSetCheck( int iFd, std::map< int, short > & miiReadyFds, ECheckType eType ) - { - std::map< int, short >::iterator it = miiReadyFds.find( iFd ); - if( it != miiReadyFds.end() ) - it->second |= eType; - else - miiReadyFds[iFd] = eType; - } - bool FDHasCheck( int iFd, std::map< int, short > & miiReadyFds, ECheckType eType ) - { - std::map< int, short >::iterator it = miiReadyFds.find( iFd ); - if( it != miiReadyFds.end() ) - return( (it->second & eType) ); - return( false ); - } - virtual int Select( std::map< int, short > & miiReadyFds, struct timeval *tvtimeout) - { -#ifdef CSOCK_USE_POLL - if( miiReadyFds.empty() ) - return( select( 0, NULL, NULL, NULL, tvtimeout ) ); + void FDSetCheck( int iFd, std::map< int, short > & miiReadyFds, ECheckType eType ); + bool FDHasCheck( int iFd, std::map< int, short > & miiReadyFds, ECheckType eType ); - struct pollfd * pFDs = (struct pollfd *)malloc( sizeof( struct pollfd ) * miiReadyFds.size() ); - size_t uCurrPoll = 0; - for( std::map< int, short >::iterator it = miiReadyFds.begin(); it != miiReadyFds.end(); ++it, ++uCurrPoll ) - { - short iEvents = 0; - if( it->second & eCheckRead ) - iEvents |= POLLIN; - if( it->second & eCheckWrite ) - iEvents |= POLLOUT; - pFDs[uCurrPoll].fd = it->first; - pFDs[uCurrPoll].events = iEvents; - pFDs[uCurrPoll].revents = 0; - } - int iTimeout = (int)(tvtimeout->tv_usec / 1000); - iTimeout += (int)(tvtimeout->tv_sec * 1000); - size_t uMaxFD = miiReadyFds.size(); - int iRet = poll( pFDs, uMaxFD, iTimeout ); - miiReadyFds.clear(); - for( uCurrPoll = 0; uCurrPoll < uMaxFD; ++uCurrPoll ) - { - short iEvents = 0; - if( (pFDs[uCurrPoll].revents & (POLLIN|POLLERR|POLLHUP|POLLNVAL) ) ) - iEvents |= eCheckRead; - if( (pFDs[uCurrPoll].revents & POLLOUT ) ) - iEvents |= eCheckWrite; - std::map< int, short >::iterator it = miiReadyFds.find( pFDs[uCurrPoll].fd ); - if( it != miiReadyFds.end() ) - it->second |= iEvents; - else - miiReadyFds[pFDs[uCurrPoll].fd] = iEvents; - } - free( pFDs ); -#else - fd_set rfds, wfds; - TFD_ZERO( &rfds ); - TFD_ZERO( &wfds ); - bool bHasWrite = false; - int iHighestFD = 0; - for( std::map< int, short >::iterator it = miiReadyFds.begin(); it != miiReadyFds.end(); ++it ) - { - iHighestFD = std::max( it->first, iHighestFD ); - if( it->second & eCheckRead ) - { - TFD_SET( it->first, &rfds ); - } - if( it->second & eCheckWrite ) - { - bHasWrite = true; - TFD_SET( it->first, &wfds ); - } - } +protected: - int iRet = select( iHighestFD + 1, &rfds, ( bHasWrite ? &wfds : NULL ), NULL, tvtimeout ); - if( iRet <= 0 ) - miiReadyFds.clear(); - else - { - for( std::map< int, short >::iterator it = miiReadyFds.begin(); it != miiReadyFds.end(); ++it ) - { - if( (it->second & eCheckRead) && !TFD_ISSET( it->first, &rfds ) ) - it->second &= ~eCheckRead; - if( (it->second & eCheckWrite) && !TFD_ISSET( it->first, &wfds ) ) - it->second &= ~eCheckWrite; - } - } -#endif /* CSOCK_USE_POLL */ + virtual int Select( std::map< int, short > & miiReadyFds, struct timeval *tvtimeout); - return( iRet ); - } private: /** * fills a map of socks to a message for check @@ -1983,391 +1490,40 @@ private: * each struct contains the socks error * @see GetErrno() */ - virtual void Select( std::map & mpeSocks ) - { - mpeSocks.clear(); - struct timeval tv; + void Select( std::map & mpeSocks ); - std::map< int, short > miiReadyFds; - tv.tv_sec = m_iSelectWait / 1000000; - tv.tv_usec = m_iSelectWait % 1000000; - u_int iQuickReset = 1000; - if ( m_iSelectWait == 0 ) - iQuickReset = 0; - - bool bHasAvailSocks = false; - unsigned long long iNOW = 0; - for( unsigned int i = 0; i < this->size(); i++ ) - { - T *pcSock = (*this)[i]; - - Csock::ECloseType eCloseType = pcSock->GetCloseType(); - - if( eCloseType == T::CLT_NOW || eCloseType == T::CLT_DEREFERENCE || ( eCloseType == T::CLT_AFTERWRITE && pcSock->GetWriteBuffer().empty() ) ) - { - DelSock( i-- ); // close any socks that have requested it - continue; - } - else - pcSock->Cron(); // call the Cron handler here - - cs_sock_t & iRSock = pcSock->GetRSock(); - cs_sock_t & iWSock = pcSock->GetWSock(); -#ifndef CSOCK_USE_POLL - if( iRSock > FD_SETSIZE || iWSock > FD_SETSIZE ) - { - CS_DEBUG( "FD is larger than select() can handle" ); - DelSock( i-- ); - continue; - } -#endif /* CSOCK_USE_POLL */ - -#ifdef HAVE_C_ARES - ares_channel pChannel = pcSock->GetAresChannel(); - if( pChannel ) - { - ares_socket_t aiAresSocks[1]; - aiAresSocks[0] = ARES_SOCKET_BAD; - int iSockMask = ares_getsock( pChannel, aiAresSocks, 1 ); - if( ARES_GETSOCK_READABLE( iSockMask, 0 ) ) - FDSetCheck( aiAresSocks[0], miiReadyFds, eCheckRead ); - if( ARES_GETSOCK_WRITABLE( iSockMask, 0 ) ) - FDSetCheck( aiAresSocks[0], miiReadyFds, eCheckWrite ); - // let ares drop the timeout if it has something timing out sooner then whats in tv currently - ares_timeout( pChannel, &tv, &tv ); - } -#endif /* HAVE_C_ARES */ - - if ( pcSock->GetConState() != T::CST_OK ) - continue; - - bHasAvailSocks = true; - - bool bIsReadPaused = pcSock->IsReadPaused(); - if ( bIsReadPaused ) - { - pcSock->ReadPaused(); - bIsReadPaused = pcSock->IsReadPaused(); // re-read it again, incase it changed status) - } - if ( iRSock == CS_INVALID_SOCK || iWSock == CS_INVALID_SOCK ) - { - SelectSock( mpeSocks, SUCCESS, pcSock ); - continue; // invalid sock fd - } - - if( pcSock->GetType() != T::LISTENER ) - { - bool bHasWriteBuffer = !pcSock->GetWriteBuffer().empty(); - - if ( !bIsReadPaused ) - FDSetCheck( iRSock, miiReadyFds, eCheckRead ); - - if( pcSock->AllowWrite( iNOW ) && ( !pcSock->IsConnected() || bHasWriteBuffer ) ) - { - if( !pcSock->IsConnected() ) - { // set the write bit if not connected yet - FDSetCheck( iWSock, miiReadyFds, eCheckWrite ); - } - else if( bHasWriteBuffer && !pcSock->GetSSL() ) - { // always set the write bit if there is data to send when NOT ssl - FDSetCheck( iWSock, miiReadyFds, eCheckWrite ); - } - else if( bHasWriteBuffer && pcSock->GetSSL() && pcSock->SslIsEstablished() ) - { // ONLY set the write bit if there is data to send and the SSL handshake is finished - FDSetCheck( iWSock, miiReadyFds, eCheckWrite ); - } - } - - if( pcSock->GetSSL() && !pcSock->SslIsEstablished() && bHasWriteBuffer ) - { // if this is an unestabled SSL session with data to send ... try sending it - // do this here, cause otherwise ssl will cause a small - // cpu spike waiting for the handshake to finish - // resend this data - if ( !pcSock->Write( "" ) ) - { - pcSock->Close(); - } - // warning ... setting write bit in here causes massive CPU spinning on invalid SSL servers - // http://bugs.debian.org/cgi-bin/bugreport.cgi?bug=631590 - // however, we can set the select WAY down and it will retry quickly, but keep it from spinning at 100% - tv.tv_usec = iQuickReset; - tv.tv_sec = 0; - } - } - else - { - FDSetCheck( iRSock, miiReadyFds, eCheckRead ); - } - - if( pcSock->GetSSL() && pcSock->GetType() != Csock::LISTENER ) - { - if ( ( pcSock->GetPending() > 0 ) && ( !pcSock->IsReadPaused() ) ) - SelectSock( mpeSocks, SUCCESS, pcSock ); - } - } - - // old fashion select, go fer it - int iSel; - - if( !mpeSocks.empty() ) // .1 ms pause to see if anything else is ready (IE if there is SSL data pending, don't wait too long) - { - tv.tv_usec = iQuickReset; - tv.tv_sec = 0; - } - else if ( !this->empty() && !bHasAvailSocks ) - { - tv.tv_usec = iQuickReset; - tv.tv_sec = 0; - } - - iSel = Select( miiReadyFds, &tv ); - if ( iSel == 0 ) - { - if ( mpeSocks.empty() ) - m_errno = SELECT_TIMEOUT; - else - m_errno = SUCCESS; -#ifdef HAVE_C_ARES - // run through ares channels and process timeouts - for( u_long uSock = 0; uSock < this->size(); ++uSock ) - { - T *pcSock = this->at( uSock ); - ares_channel pChannel = pcSock->GetAresChannel(); - if( pChannel ) - ares_process_fd( pChannel, ARES_SOCKET_BAD, ARES_SOCKET_BAD ); - } -#endif /* HAVE_C_ARES */ - - return; - } - - if ( ( iSel == -1 ) && ( errno == EINTR ) ) - { - if ( mpeSocks.empty() ) - m_errno = SELECT_TRYAGAIN; - else - m_errno = SUCCESS; - - return; - } - else if ( iSel == -1 ) - { - if ( mpeSocks.empty() ) - m_errno = SELECT_ERROR; - else - m_errno = SUCCESS; - - return; - } - else - { - m_errno = SUCCESS; - } - - // find out wich one is ready - for( unsigned int i = 0; i < this->size(); i++ ) - { - T *pcSock = (*this)[i]; - -#ifdef HAVE_C_ARES - ares_channel pChannel = pcSock->GetAresChannel(); - if( pChannel ) - { - ares_socket_t aiAresSocks[1]; - aiAresSocks[0] = ARES_SOCKET_BAD; - ares_getsock( pChannel, aiAresSocks, 1 ); - if( FDHasCheck( aiAresSocks[0], miiReadyFds, eCheckRead ) || FDHasCheck( aiAresSocks[0], miiReadyFds, eCheckWrite ) ) - ares_process_fd( pChannel, aiAresSocks[0], aiAresSocks[0] ); - } -#endif /* HAVE_C_ARES */ - - if ( pcSock->GetConState() != T::CST_OK ) - continue; - - cs_sock_t & iRSock = pcSock->GetRSock(); - cs_sock_t & iWSock = pcSock->GetWSock(); - EMessages iErrno = SUCCESS; - - if ( iRSock == CS_INVALID_SOCK || iWSock == CS_INVALID_SOCK ) - { - // trigger a success so it goes through the normal motions - // and an error is produced - SelectSock( mpeSocks, SUCCESS, pcSock ); - continue; // watch for invalid socks - } - - if ( FDHasCheck( iWSock, miiReadyFds, eCheckWrite ) ) - { - if ( iSel > 0 ) - { - iErrno = SUCCESS; - if ( ( !pcSock->GetWriteBuffer().empty() ) && ( pcSock->IsConnected() ) ) - { // write whats in the socks send buffer - if ( !pcSock->Write( "" ) ) - { - // write failed, sock died :( - iErrno = SELECT_ERROR; - } - } - } else - iErrno = SELECT_ERROR; - - SelectSock( mpeSocks, iErrno, pcSock ); - - } - else if ( FDHasCheck( iRSock, miiReadyFds, eCheckRead ) ) - { - if ( iSel > 0 ) - iErrno = SUCCESS; - else - iErrno = SELECT_ERROR; - - if ( pcSock->GetType() != T::LISTENER ) - SelectSock( mpeSocks, iErrno, pcSock ); - else // someone is coming in! - { - CS_STRING sHost; - u_short port; - cs_sock_t inSock = pcSock->Accept( sHost, port ); - - if ( inSock != CS_INVALID_SOCK ) - { - if ( T::TMO_ACCEPT & pcSock->GetTimeoutType() ) - pcSock->ResetTimer(); // let them now it got dinged - - // if we have a new sock, then add it - T *NewpcSock = (T *)pcSock->GetSockObj( sHost, port ); - - if ( !NewpcSock ) - NewpcSock = new T( sHost, port ); - - NewpcSock->BlockIO( false ); - NewpcSock->SetType( T::INBOUND ); - NewpcSock->SetRSock( inSock ); - NewpcSock->SetWSock( inSock ); - NewpcSock->SetIPv6( pcSock->GetIPv6() ); - - bool bAddSock = true; -#ifdef HAVE_LIBSSL - // - // is this ssl ? - if ( pcSock->GetSSL() ) - { - NewpcSock->SetCipher( pcSock->GetCipher() ); - NewpcSock->SetPemLocation( pcSock->GetPemLocation() ); - NewpcSock->SetPemPass( pcSock->GetPemPass() ); - NewpcSock->SetRequireClientCertFlags( pcSock->GetRequireClientCertFlags() ); - bAddSock = NewpcSock->AcceptSSL(); - } - -#endif /* HAVE_LIBSSL */ - if ( bAddSock ) - { - // set the name of the listener - NewpcSock->SetParentSockName( pcSock->GetSockName() ); - NewpcSock->SetRate( pcSock->GetRateBytes(), pcSock->GetRateTime() ); - if ( NewpcSock->GetSockName().empty() ) - { - std::stringstream s; - s << sHost << ":" << port; - AddSock( NewpcSock, s.str() ); - - } else - AddSock( NewpcSock, NewpcSock->GetSockName() ); - } else - CS_Delete( NewpcSock ); - } -#ifdef _WIN32 - else if( GetSockError() != WSAEWOULDBLOCK ) -#else /* _WIN32 */ - else if( GetSockError() != EAGAIN ) -#endif /* _WIN32 */ - { - pcSock->SockError( GetSockError() ); - } - } - } - } - } - - time_t GetDynamicSleepTime( time_t iNow, time_t iMaxResolution = 3600 ) const - { - time_t iNextRunTime = iNow + iMaxResolution; - typename std::vector::const_iterator it; - // This is safe, because we don't modify the vector. - typename std::vector::const_iterator it_end = this->end(); - - for (it = this->begin(); it != it_end; it++) - { - T* pSock = *it; - - if( pSock->GetConState() != T::CST_OK ) - iNextRunTime = iNow; // this is in a nebulous state, need to let it proceed like normal - - time_t iTimeoutInSeconds = pSock->GetTimeout(); - if( iTimeoutInSeconds > 0 ) - { - time_t iNextTimeout = pSock->GetNextCheckTimeout( iNow ); - iNextRunTime = std::min( iNextRunTime, iNextTimeout ); - } - - const std::vector & vCrons = pSock->GetCrons(); - std::vector::const_iterator cit; - std::vector::const_iterator cit_end = vCrons.end(); - for (cit = vCrons.begin(); cit != cit_end; cit++) - iNextRunTime = std::min( iNextRunTime, (*cit)->GetNextRun() ); - } - std::vector::const_iterator cit; - std::vector::const_iterator cit_end = m_vcCrons.end(); - for (cit = m_vcCrons.begin(); cit != cit_end; cit++) - iNextRunTime = std::min( iNextRunTime, (*cit)->GetNextRun() ); - - if( iNextRunTime < iNow ) - return( 0 ); // smallest unit possible - return( std::min( iNextRunTime - iNow, iMaxResolution ) ); - } + time_t GetDynamicSleepTime( time_t iNow, time_t iMaxResolution = 3600 ) const; //! internal use only - virtual void SelectSock( std::map & mpeSocks, EMessages eErrno, T * pcSock ) - { - if ( mpeSocks.find( pcSock ) != mpeSocks.end() ) - return; - - mpeSocks[pcSock] = eErrno; - } - - //! these crons get ran and checked in Loop() - virtual void Cron() - { - time_t iNow = 0; - for( unsigned int a = 0; a < m_vcCrons.size(); a++ ) - { - CCron *pcCron = m_vcCrons[a]; - - if ( !pcCron->isValid() ) - { - CS_Delete( pcCron ); - m_vcCrons.erase( m_vcCrons.begin() + a-- ); - } else - pcCron->run( iNow ); - } - } + virtual void SelectSock( std::map & mpeSocks, EMessages eErrno, Csock * pcSock ); //////// // Connection State Functions /////////// // members - EMessages m_errno; - std::vector m_vcCrons; - unsigned long long m_iCallTimeouts; - unsigned long long m_iBytesRead; - unsigned long long m_iBytesWritten; - u_long m_iSelectWait; + EMessages m_errno; + unsigned long long m_iCallTimeouts; + unsigned long long m_iBytesRead; + unsigned long long m_iBytesWritten; + u_long m_iSelectWait; }; -//! basic socket class -typedef TSocketManager CSocketManager; +/** + * @class TSocketManager + * @brief ease of use templated socket manager + */ +template +class TSocketManager : public CSocketManager +{ +public: + TSocketManager() : CSocketManager() {} + virtual ~TSocketManager() {} + virtual T * GetSockObj( const CS_STRING & sHostname, u_short uPort, int iTimeout = 60 ) + { + return( new T( sHostname, uPort, iTimeout ) ); + } +}; #ifndef _NO_CSOCKET_NS } diff --git a/Socket.cpp b/Socket.cpp index 9dc0bbcc..4c110b6b 100644 --- a/Socket.cpp +++ b/Socket.cpp @@ -16,7 +16,7 @@ unsigned int CSockManager::GetAnonConnectionCount(const CString &sIP) const { unsigned int ret = 0; for (it = begin(); it != end(); ++it) { - CZNCSock *pSock = *it; + Csock *pSock = *it; // Logged in CClients have "USR::" as their sockname if (pSock->GetType() == Csock::INBOUND && pSock->GetRemoteIP() == sIP && pSock->GetSockName().Left(5) != "USR::") { diff --git a/modules/extra/shell.cpp b/modules/extra/shell.cpp index 9e58309e..860b073e 100644 --- a/modules/extra/shell.cpp +++ b/modules/extra/shell.cpp @@ -50,7 +50,7 @@ public: } virtual ~CShellMod() { - vector vSocks = m_pManager->FindSocksByName("SHELL"); + vector vSocks = m_pManager->FindSocksByName("SHELL"); for (unsigned int a = 0; a < vSocks.size(); a++) { m_pManager->DelSockByAddr(vSocks[a]);