// for RTSPListenerSocket object, this ProcessEvent will be called
// Foreever, RTSPListenerSocket is also a Task object.
// it drived from Task and EventContext.
// Each Socket is derived from EventContext.
void TCPListenerSocket::ProcessEvent(int /*eventBits*/)
{
//we are executing on the same thread as every other
//socket, so whatever you do here has to be fast.
struct sockaddr_in addr;
#if __Win32__ || __osf__ || __sgi__ || __hpux__
int size = sizeof(addr);
#else
socklen_t size = sizeof(addr);
#endif
Task* theTask = NULL;
TCPSocket* theSocket = NULL;
//fSocket data member of TCPSocket.
int osSocket = accept(fFileDesc, (struct sockaddr*)&addr, &size);
//test osSocket = -1;
if (osSocket == -1)
{
//take a look at what this error is.
int acceptError = OSThread::GetErrno();
if (acceptError == EAGAIN)
{
//If it's EAGAIN, there's nothing on the listen queue right now,
//so modwatch and return
// EAGIN error occur, retrigger request event
// i.e., readd to select readset for the fd relative to
// req of this object
this->RequestEvent(EV_RE);
return;
}
//test acceptError = ENFILE;
//test acceptError = EINTR;
//test acceptError = ENOENT;
//if these error gets returned, we're out of file desciptors,
//the server is going to be failing on sockets, logs, qtgroups and qtuser auth file accesses and movie files. The server is not functional.
if (acceptError == EMFILE || acceptError == ENFILE)
{
// severe error occur, exit program.
#ifndef __Win32__
QTSSModuleUtils::LogErrorStr(qtssFatalVerbosity, "Out of File Descriptors. Set max connections lower and check for competing usage from other processes. Exiting.");
#endif
exit (EXIT_FAILURE);
}
else
{
// other errors occur, cleanup the task, and set disconnect of thesocket
char errStr[256];
errStr[sizeof(errStr) -1] = 0;
qtss_snprintf(errStr, sizeof(errStr) -1, "accept error = %d '%s' on socket. Clean up and continue.", acceptError, strerror(acceptError));
WarnV( (acceptError == 0), errStr);
theTask = this->GetSessionTask(&theSocket);
if (theTask == NULL)
{
close(osSocket);
}
else
{
theTask->Signal(Task::kKillEvent); // just clean up the task
}
if (theSocket)
theSocket->fState &= ~kConnected; // turn off connected state
return;
}
}
// here a new connect request accepted. further action done below.
// get theTask, theSocket, theTask is a RTSPSession Task.
// theSocket is a TCPSocket
// for understanding easily, the source codes of GetSessionTask below.
// Task* RTSPListenerSocket::GetSessionTask(TCPSocket** outSocket)
// {
// Assert(outSocket != NULL);
// // when the server is behing a round robin DNS, the client needs to knwo the IP address ot the server
// // so that it can direct the "POST" half of the connection to the same machine when tunnelling RTSP thru HTTP
// Bool16 doReportHTTPConnectionAddress = QTSServerInterface::GetServer()->GetPrefs()->GetDoReportHTTPConnectionAddress();
// RTSPSession* theTask = NEW RTSPSession(doReportHTTPConnectionAddress);
// *outSocket = theTask->GetSocket(); // out socket is not attached to a unix socket yet.
// if (this->OverMaxConnections(0))
// this->SlowDown();
// else
// this->RunNormal();
// return theTask;
// }
theTask = this->GetSessionTask(&theSocket);
if (theTask == NULL)
{ //this should be a disconnect. do an ioctl call?
close(osSocket);
if (theSocket)
theSocket->fState &= ~kConnected; // turn off connected state
}
else
{
Assert(osSocket != EventContext::kInvalidFileDesc);
//set options on the socket
//we are a server, always disable nagle algorithm
int one = 1;
int err = ::setsockopt(osSocket, IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof(int));
AssertV(err == 0, OSThread::GetErrno());
err = ::setsockopt(osSocket, SOL_SOCKET, SO_KEEPALIVE, (char*)&one, sizeof(int));
AssertV(err == 0, OSThread::GetErrno());
int sndBufSize = 96L * 1024L;
err = ::setsockopt(osSocket, SOL_SOCKET, SO_SNDBUF, (char*)&sndBufSize, sizeof(int));
AssertV(err == 0, OSThread::GetErrno());
//setup the socket. When there is data on the socket,
//theTask will get an kReadEvent event
theSocket->Set(osSocket, &addr);
theSocket->InitNonBlocking(osSocket);
// theSocket (TCPSocket) associate with theTask (RTSPSession Task)
theSocket->SetTask(theTask);
// add theSocket's fd to select readset.
theSocket->RequestEvent(EV_RE);
// transfer thsTask to a RTSP task processing threads
theTask->SetThreadPicker(Task::GetBlockingTaskThreadPicker()); //The RTSP Task processing threads
}
if (fSleepBetweenAccepts)
{
// We are at our maximum supported sockets
// slow down so we have time to process the active ones (we will respond with errors or service).
// wake up and execute again after sleeping. The timer must be reset each time through
//qtss_printf("TCPListenerSocket slowing down\n");
this->SetIdleTimer(kTimeBetweenAcceptsInMsec); //sleep 1 second
}
else
{
// sleep until there is a read event outstanding (another client wants to connect)
//qtss_printf("TCPListenerSocket normal speed\n");
this->RequestEvent(EV_RE);
}
fOutOfDescriptors = false; // always false for now we don't properly handle this elsewhere in the code
}