1 // ------------------------------------------------
6 // Servents are the actual connections between clients. They do the handshaking,
7 // transferring of data and processing of GnuPackets. Each servent has one socket allocated
8 // to it on connect, it uses this to transfer all of its data.
10 // (c) 2002 peercast.org
11 // ------------------------------------------------
12 // This program is free software; you can redistribute it and/or modify
13 // it under the terms of the GNU General Public License as published by
14 // the Free Software Foundation; either version 2 of the License, or
15 // (at your option) any later version.
17 // This program is distributed in the hope that it will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 // GNU General Public License for more details.
21 // ------------------------------------------------
22 // todo: make lan->yp not check firewall
38 #include "chkMemoryLeak.h"
39 #define DEBUG_NEW new(__FILE__, __LINE__)
43 #include "win32/seh.h"
// Timeout applied to direct writes.
// NOTE(review): the unit is not stated here (presumably seconds) - confirm at the point of use.
46 const int DIRECT_WRITE_TIMEOUT = 60;
48 // -----------------------------------
// Table of C strings describing servent status values.
// NOTE(review): presumably indexed by Servent::STATUS, so the entry order must
// track that enum - confirm against the header.
49 char *Servent::statusMsgs[]=
66 // -----------------------------------
// Table of C strings describing servent types.
// NOTE(review): presumably indexed by Servent::TYPE, so the entry order must
// track that enum - confirm against the header.
67 char *Servent::typeMsgs[]=
78 // -----------------------------------
// Returns true when host `h` matches the server manager's F_PRIVATE filter
// or is the localhost address.
79 bool Servent::isPrivate()
82 return servMgr->isFiltered(ServFilter::F_PRIVATE,h) || h.isLocalhost();
84 // -----------------------------------
// Permission check for access class `a`; callers in this file pass
// ALLOW_NETWORK or ALLOW_DATA. The server manager's F_BAN filter is
// consulted for host `h`.
85 bool Servent::isAllowed(int a)
// host is on the ban list (presumably rejected outright - TODO confirm)
89 if (servMgr->isFiltered(ServFilter::F_BAN,h))
95 // -----------------------------------
// Returns the server manager's filter verdict for host `h` and filter flag(s) `f`.
96 bool Servent::isFiltered(int f)
99 return servMgr->isFiltered(f,h);
// Global counter, starting at 1, from which the Servent constructor takes
// each new servent_id (post-incrementing it).
102 int servent_count = 1;
103 // -----------------------------------
// Constructs a servent. Both outgoing packet queues are created with
// MAX_OUTPACKETS, and the servent receives its servent_id from the global
// servent_count counter.
// `index`: presumably this servent's slot number - TODO confirm.
104 Servent::Servent(int index)
105 :outPacketsPri(MAX_OUTPACKETS)
106 ,outPacketsNorm(MAX_OUTPACKETS)
// NOTE(review): plain post-increment of a global with no visible locking -
// confirm servents are never constructed concurrently.
113 servent_id = servent_count++;
119 // -----------------------------------
124 // -----------------------------------
127 thread.active = false;
129 setStatus(S_CLOSING);
133 PCPStream *pcp = pcpStream;
139 chanMgr->hitlistlock.on();
140 ChanHitList *chl = chanMgr->findHitListByID(chanID);
142 ChanHit *chh = chl->hit;
143 ChanHit *prev = NULL;
145 if (chh->servent_id == this->servent_id){
146 if ((servMgr->kickKeepTime != 0) && (chh->firewalled == 1)){
148 chh->numListeners = 0;
153 ChanHit *next = chh->next;
168 chanMgr->hitlistlock.off();
186 if (type != T_SERVER)
193 // -----------------------------------
// Aborts this servent. Clears thread.active, which is the flag the processing
// loops in this file test (`while (thread.active && sock->active())`).
194 void Servent::abort()
196 thread.active = false;
204 // -----------------------------------
// Puts per-connection state back to its defaults so the servent can be reused.
// Among the fields reset: output protocol, connect/ping/packet timestamps,
// login password, priority flag and both outgoing packet queues.
205 void Servent::reset()
219 outputProtocol = ChanInfo::SP_UNKNOWN;
// zero all activity timestamps
228 lastConnect = lastPing = lastPacket = 0;
229 loginPassword.clear();
232 priorityConnect = false;
236 outPacketsNorm.reset();
237 outPacketsPri.reset();
248 // -----------------------------------
// Sends `pack` over this servent's PCP stream if the servent matches the
// given selectors.
//   cid - channel selector: unset matches any channel, otherwise must equal chanID
//   sid - session to exclude: when set, a servent whose remoteID equals it is skipped
//   did - passed through to PCPStream::sendPacket
//   t   - servent type selector (presumably compared against `type` - TODO confirm)
// Returns the result of PCPStream::sendPacket when the conditions hold.
249 bool Servent::sendPacket(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did,Servent::TYPE t)
254 && (!cid.isSet() || chanID.isSame(cid))
255 && (!sid.isSet() || !sid.isSame(remoteID))
// a servent without a PCP stream cannot take the packet
256 && (pcpStream != NULL)
259 return pcpStream->sendPacket(pack,did);
265 // -----------------------------------
// Handles a GIV connection delivered on `givSock`.
// Returns a bool (presumably whether this servent took over the socket - TODO confirm).
266 bool Servent::acceptGIV(ClientSocket *givSock)
276 // -----------------------------------
// Returns, by value, the Host associated with this servent. Within this file
// the result supplies serventHit.host.ip and is formatted for log output.
277 Host Servent::getHost()
287 // -----------------------------------
// Queues GnuPacket `p` for transmission on one of the two outgoing queues:
// outPacketsPri (presumably when `pri` is set - TODO confirm) or outPacketsNorm.
// With servMgr->useFlowControl enabled, the fill level of the normal queue
// is inspected before writing.
// `r` holds the queue write result (presumably what is returned - TODO confirm).
288 bool Servent::outputPacket(GnuPacket &p, bool pri)
294 r = outPacketsPri.write(p);
297 if (servMgr->useFlowControl)
// fill level of the normal queue, as a percentage
299 int per = outPacketsNorm.percentFull();
310 // if in flowcontrol, only allow packets with less of a hop count than already in queue
311 if (p.hops >= outPacketsNorm.findMinHop())
316 r = outPacketsNorm.write(p);
323 // -----------------------------------
// Starts this servent as a server for host `h`: the thread entry point is
// set to serverProc and the thread is launched. A StreamException, including
// the one raised when the thread cannot be started, is caught here and
// logged as "Bad server".
// Returns a bool (presumably false after a caught exception - TODO confirm).
324 bool Servent::initServer(Host &h)
338 thread.func = serverProc;
342 if (!sys->startThread(&thread))
343 throw StreamException("Can`t start thread");
345 }catch(StreamException &e)
347 LOG_ERROR("Bad server: %s",e.msg);
354 // -----------------------------------
// Verifies that this servent is not already in use.
// Throws StreamException("Socket already set") or
// StreamException("Thread already active") (conditions presumably test
// `sock` and `thread.active` - TODO confirm).
355 void Servent::checkFree()
358 throw StreamException("Socket already set");
360 throw StreamException("Thread already active");
362 // -----------------------------------
// Prepares this servent to serve an incoming connection on socket `s` and
// launches its thread with incomingProc as the entry point.
//   s - the accepted client socket
//   a - presumably the allow mask for this connection - TODO confirm
// StreamExceptions are caught here and logged as "INCOMING FAILED"; they do
// not propagate to the caller.
363 void Servent::initIncoming(ClientSocket *s, unsigned int a)
374 thread.func = incomingProc;
375 thread.finish = false;
377 setStatus(S_PROTOCOL);
// log the peer address
380 sock->host.toStr(ipStr);
381 LOG_DEBUG("Incoming from %s",ipStr);
384 if (!sys->startThread(&thread))
385 throw StreamException("Can`t start thread");
386 }catch(StreamException &e)
// an earlier version treated this failure as fatal; that path is disabled
388 //LOG_ERROR("!!FATAL!! Incoming error: %s",e.msg);
389 //servMgr->shutdownTimer = 1;
392 LOG_ERROR("INCOMING FAILED: %s",e.msg);
397 // -----------------------------------
// Launches this servent's thread for an outgoing connection of type `ty`,
// with outgoingProc as the entry point. StreamExceptions (including failure
// to start the thread) are caught here and logged.
398 void Servent::initOutgoing(TYPE ty)
408 thread.func = outgoingProc;
410 if (!sys->startThread(&thread))
411 throw StreamException("Can`t start thread");
413 }catch(StreamException &e)
415 LOG_ERROR("Unable to start outgoing: %s",e.msg);
420 // -----------------------------------
// Starts an outgoing PCP connection to remote host `rh`. The servent must be
// allowed ALLOW_NETWORK access; the thread is then launched with
// outgoingProc as the entry point. Every StreamException raised on the way
// is caught here and logged together with the target address.
421 void Servent::initPCP(Host &rh)
435 if (!isAllowed(ALLOW_NETWORK))
436 throw StreamException("Servent not allowed");
439 thread.func = outgoingProc;
441 LOG_DEBUG("Outgoing to %s",ipStr);
443 if (!sys->startThread(&thread))
444 throw StreamException("Can`t start thread");
446 }catch(StreamException &e)
448 LOG_ERROR("Unable to open connection to %s - %s",ipStr,e.msg);
454 // -----------------------------------
// Prepares this servent to fetch channel data from `host`.
// Requires ALLOW_DATA access; raises StreamException("Servent not allowed") otherwise.
455 void Servent::initChannelFetch(Host &host)
469 if (!isAllowed(ALLOW_DATA))
470 throw StreamException("Servent not allowed");
476 // -----------------------------------
// Starts a GIV connection towards host `h`, launching the thread with
// givProc as the entry point. Requires ALLOW_NETWORK access.
//   id - presumably stored as givID for handshakeGiv - TODO confirm
// StreamExceptions are caught here and logged with the target address.
477 void Servent::initGIV(Host &h, GnuID &id)
491 if (!isAllowed(ALLOW_NETWORK))
492 throw StreamException("Servent not allowed");
497 thread.func = givProc;
501 if (!sys->startThread(&thread))
502 throw StreamException("Can`t start thread");
504 }catch(StreamException &e)
506 LOG_ERROR("GIV error to %s: %s",ipStr,e.msg);
510 // -----------------------------------
// Obtains a fresh socket from `sys` and stores it in `sock`. An error is
// logged when the call is made while the servent is active (per the log text).
// NOTE(review): confirm whether the active case returns early or still
// replaces `sock`.
511 void Servent::createSocket()
514 LOG_ERROR("Servent::createSocket attempt made while active");
516 sock = sys->createSocket();
518 // -----------------------------------
// Status setter. Moving to S_HANDSHAKE, S_CONNECTED or S_LISTENING also
// stamps lastConnect with the current system time.
519 void Servent::setStatus(STATUS s)
525 if ((s == S_HANDSHAKE) || (s == S_CONNECTED) || (s == S_LISTENING))
526 lastConnect = sys->getTime();
531 // -----------------------------------
// Client side of the text (HTTP-style) handshake on `sock`.
// Sends the GNU_PEERCONN request line followed by our headers (agent, PCP
// flag, priority, network ID when set, session ID, OS), then reads the HTTP
// response and its headers.
// Throws StreamException on an unexpected HTTP status, and HTTPException
// 503 when the peer's network ID differs from ours, or 401 (presumably when
// the peer's agent version is not acceptable - TODO confirm the condition).
532 void Servent::handshakeOut()
534 sock->writeLine(GNU_PEERCONN);
538 sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_AGENT);
539 sock->writeLineF("%s %d",PCX_HS_PCP,1);
542 sock->writeLineF("%s %d",PCX_HS_PRIORITY,1);
// advertise our network ID only when one is configured
544 if (networkID.isSet())
546 networkID.toStr(str);
547 sock->writeLineF("%s %s",PCX_HS_NETWORKID,str);
550 servMgr->sessionID.toStr(str);
551 sock->writeLineF("%s %s",PCX_HS_ID,str);
554 sock->writeLineF("%s %s",PCX_HS_OS,peercastApp->getClientTypeOS());
560 int r = http.readResponse();
564 LOG_ERROR("Expected 200, got %d",r);
565 throw StreamException("Unexpected HTTP response");
569 bool versionValid = false;
574 while (http.nextHeader())
// cmdLine is text received from the remote peer: it must be passed as an
// argument, never used as the format string itself (same form as handshakeIn).
576 LOG_DEBUG("%s",http.cmdLine);
578 char *arg = http.getArgStr();
582 if (http.isHeader(HTTP_HS_AGENT))
// NOTE(review): stricmp orders the version text lexicographically, not
// numerically ("0.9" sorts above "0.10") - confirm this is acceptable.
586 if (strnicmp(arg,"PeerCast/",9)==0)
587 versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);
588 }else if (http.isHeader(PCX_HS_NETWORKID))
589 clientID.fromStr(arg);
// peers on a different network are refused
592 if (!clientID.isSame(networkID))
593 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
596 throw HTTPException(HTTP_SC_UNAUTHORIZED,401);
599 sock->writeLine(GNU_OK);
605 // -----------------------------------
// Processing routine for an outgoing channel connection.
// NOTE(review): no contract is documented at this definition - confirm intent with callers.
606 void Servent::processOutChannel()
611 // -----------------------------------
// Server side of the text (HTTP-style) handshake on `sock`.
// Reads the peer's headers (agent/version, network ID, priority flag,
// session ID, OS type), rejects unsuitable peers, then answers GNU_OK
// followed by our own headers.
// Throws StreamException("Servent loopback") when the peer's session ID is
// our own, HTTPException 503 on a network ID mismatch or when public
// incoming connections are full, and HTTPException 403 (presumably for an
// unacceptable agent version - TODO confirm the condition).
612 void Servent::handshakeIn()
620 bool versionValid = false;
621 bool diffRootVer = false;
626 while (http.nextHeader())
628 LOG_DEBUG("%s",http.cmdLine);
630 char *arg = http.getArgStr();
634 if (http.isHeader(HTTP_HS_AGENT))
// NOTE(review): both comparisons are case-insensitive string comparisons
// of the text after "PeerCast/", i.e. lexicographic rather than numeric.
638 if (strnicmp(arg,"PeerCast/",9)==0)
640 versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);
641 diffRootVer = stricmp(arg+9,MIN_ROOTVER)<0;
643 }else if (http.isHeader(PCX_HS_NETWORKID))
645 clientID.fromStr(arg);
647 }else if (http.isHeader(PCX_HS_PRIORITY))
649 priorityConnect = atoi(arg)!=0;
651 }else if (http.isHeader(PCX_HS_ID))
// a peer presenting our own session ID means we connected to ourselves
655 if (id.isSame(servMgr->sessionID))
656 throw StreamException("Servent loopback");
658 }else if (http.isHeader(PCX_HS_OS))
// classify the peer's operating system (case-insensitive match)
660 if (stricmp(arg,PCX_OS_LINUX)==0)
662 else if (stricmp(arg,PCX_OS_WIN32)==0)
664 else if (stricmp(arg,PCX_OS_MACOSX)==0)
666 else if (stricmp(arg,PCX_OS_WINAMP2)==0)
// peers on a different network are refused
672 if (!clientID.isSame(networkID))
673 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
675 // if this is a priority connection and all incoming connections
676 // are full then kill an old connection to make room. Otherwise reject connection.
677 //if (!priorityConnect)
680 if (servMgr->pubInFull())
681 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
685 throw HTTPException(HTTP_SC_FORBIDDEN,403);
// peer accepted: send the OK line and our headers
687 sock->writeLine(GNU_OK);
689 sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_OLDAGENT);
691 if (networkID.isSet())
694 networkID.toStr(idStr);
695 sock->writeLineF("%s %s",PCX_HS_NETWORKID,idStr);
// flow-control and broadcast settings
700 sock->writeLineF("%s %d",PCX_HS_FLOWCTL,servMgr->useFlowControl?1:0);
701 sock->writeLineF("%s %d",PCX_HS_MINBCTTL,chanMgr->minBroadcastTTL);
702 sock->writeLineF("%s %d",PCX_HS_MAXBCTTL,chanMgr->maxBroadcastTTL);
703 sock->writeLineF("%s %d",PCX_HS_RELAYBC,servMgr->relayBroadcast);
704 //sock->writeLine("%s %d",PCX_HS_FULLHIT,2);
// download URL header (presumably sent to peers with an older version, see diffRootVer - TODO confirm)
709 sock->writeString(PCX_HS_DL);
710 sock->writeLine(PCX_DL_URL);
713 sock->writeLineF("%s %s",PCX_HS_MSG,servMgr->rootMsg.cstr());
// report the peer's address as it appears to us
720 sock->writeLineF("%s %s",PCX_HS_REMOTEIP,hostIP);
// drain any remaining headers
726 while (http.nextHeader());
729 // -----------------------------------
// Verifies that `rhost` is reachable and identifies itself as session `rsid`.
// Opens a temporary socket, sends PCP_CONNECT and a PCP_HELO carrying our
// session ID, reads the reply and extracts the remote session ID from it,
// then sends PCP_QUIT.
// StreamExceptions ("Bad ping response", "SIDs don`t match", socket errors)
// are caught and logged here.
// Returns a bool (presumably true only when the ping fully succeeded - TODO confirm).
730 bool Servent::pingHost(Host &rhost,GnuID &rsid)
734 LOG_DEBUG("Ping host %s: trying..",ipstr);
735 ClientSocket *s=NULL;
739 s = sys->createSocket();
// presumably milliseconds (15 s) - TODO confirm the unit of the timeout setters
745 s->setReadTimeout(15000);
746 s->setWriteTimeout(15000);
// minimal outgoing handshake: HELO with a single child, our session ID
752 atom.writeInt(PCP_CONNECT,1);
753 atom.writeParent(PCP_HELO,1);
754 atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
760 ID4 id = atom.read(numc,numd);
// walk the reply's children looking for the remote session ID
763 for(int i=0; i<numc; i++)
766 ID4 pid = atom.read(c,d);
767 if (pid == PCP_SESSIONID)
768 atom.readBytes(sid.id,16,d);
774 LOG_DEBUG("Ping response: %s",id.getString().str());
775 throw StreamException("Bad ping response");
// the host that answered must be the session we expected
778 if (!sid.isSame(rsid))
779 throw StreamException("SIDs don`t match");
782 LOG_DEBUG("Ping host %s: OK",ipstr);
783 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT);
787 }catch(StreamException &e)
789 LOG_DEBUG("Ping host %s: %s",ipstr,e.msg);
805 // -----------------------------------
// Handles a request to stream the channel described by `chanInfo` on `sock`.
// Steps, in order:
//   1. parse the request headers (PCP flag, start position, listen port,
//      icy-metadata, agent, Pragma);
//   2. choose the output protocol;
//   3. look the channel up and work out whether it can be streamed to this peer;
//   4. reply: "not found", "unavailable" (for PCP peers this carries
//      alternative hosts as atoms followed by PCP_QUIT), or the success
//      headers for the chosen protocol.
// For a successful PCP request the incoming PCP handshake is performed and
// the peer is registered as a channel hit.
// Returns a bool (presumably whether streaming should proceed - TODO confirm).
806 bool Servent::handshakeStream(ChanInfo &chanInfo)
813 unsigned int reqPos=0;
814 unsigned short listenPort = 0;
818 while (http.nextHeader())
820 char *arg = http.getArgStr();
824 if (http.isHeader(PCX_HS_PCP))
825 gotPCP = atoi(arg)!=0;
826 else if (http.isHeader(PCX_HS_POS))
828 else if (http.isHeader(PCX_HS_PORT))
829 listenPort = (unsigned short)atoi(arg);
830 else if (http.isHeader("icy-metadata"))
831 addMetadata = atoi(arg) > 0;
832 else if (http.isHeader(HTTP_HS_AGENT))
834 else if (http.isHeader("Pragma"))
836 char *ssc = stristr(arg,"stream-switch-count=");
837 char *so = stristr(arg,"stream-offset");
842 //nsSwitchNum = atoi(ssc+20);
846 LOG_DEBUG("Stream: %s",http.cmdLine);
// a peer that did not send the PCP header gets the older PeerCast protocol
850 if ((!gotPCP) && (outputProtocol == ChanInfo::SP_PCP))
851 outputProtocol = ChanInfo::SP_PEERCAST;
// MMS-sourced or WMA/WMV/ASX content requested over HTTP is served as MMS
853 if (outputProtocol == ChanInfo::SP_HTTP)
855 if ( (chanInfo.srcProtocol == ChanInfo::SP_MMS)
856 || (chanInfo.contentType == ChanInfo::T_WMA)
857 || (chanInfo.contentType == ChanInfo::T_WMV)
858 || (chanInfo.contentType == ChanInfo::T_ASX)
860 outputProtocol = ChanInfo::SP_MMS;
864 bool chanFound=false;
865 bool chanReady=false;
867 ChanHit *sourceHit = NULL;
869 Channel *ch = chanMgr->findChannelByID(chanInfo.id);
// choose the stream start position from the channel's raw data buffer
873 if (reqPos || !isIndexTxt(&chanInfo))
875 streamPos = ch->rawData.findOldestPos(reqPos);
876 //streamPos = ch->rawData.getLatestPos();
879 streamPos = ch->rawData.getLatestPos();
// describe the requesting peer as a hit: its IP as we see it, plus the
// port it advertised; rhost[0] is used for global addresses
882 chanID = chanInfo.id;
883 serventHit.host.ip = getHost().ip;
884 serventHit.host.port = listenPort;
885 if (serventHit.host.globalIP())
886 serventHit.rhost[0] = serventHit.host;
888 serventHit.rhost[1] = serventHit.host;
889 serventHit.chanID = chanID;
892 chanReady = canStream(ch);
// the leading `0 &&` makes this condition always false: the branch is disabled
893 if (0 && !chanReady && ch->isPlaying())
895 if (ch->info.getUptime() > 60
896 && servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
898 sourceHit = &ch->sourceHost; // send source host info
902 // connect "this" host later
903 chanMgr->addHit(serventHit);
907 getHost().toStr(tmp);
908 LOG_DEBUG("Bump channel, hand over sourceHost to %s", tmp);
// try to free a slot by kicking a host that cannot relay, then re-check
911 else if (servMgr->kickUnrelayableHost(chanID, serventHit) != 0)
913 chanReady = canStream(ch);
915 LOG_DEBUG("*** Kicked unrelayable host, but still cannot stream");
// when the channel cannot be streamed the servent becomes T_INCOMING and
// its thread is marked inactive
918 if (!chanReady) type = T_INCOMING;
919 thread.active = chanReady;
920 setStatus(S_CONNECTED);
922 channel_id = ch->channel_id;
925 if (servMgr->isCheckPushStream())
927 if (chanReady == true)
931 if (!h.isLocalhost())
935 if (strstr(agent.cstr(),"PeerCast/0.119") != NULL)
939 LOG_ERROR("Block v0.119 Servent : %s (%s)",strip,agent.cstr());
944 /* ChanHitList *hits[ChanMgr::MAX_HITLISTS];
946 for(int i=0; i<ChanMgr::MAX_HITLISTS; i++)
948 ChanHitList *chl = &chanMgr->hitlists[i];
950 hits[numHits++] = chl;
954 for(int i=0; i<numHits; i++)
956 ChanHitList *chl = hits[i];
959 for (int j=0; j<ChanHitList::MAX_HITS; j++)
961 ChanHit *hit = &chl->hits[j];
962 if (hit->host.isValid() && (h.ip == hit->host.ip))
966 numRelay = hit->numRelays;
971 if ((isfw == true) && (numRelay == 0))
975 LOG_ERROR("Block firewalled Servent : %s",strip);
// NOTE(review): findHitList() can return NULL (the code further down tests
// `chl &&`), but its result is dereferenced on the very next line with no
// check - confirm a hit list is guaranteed to exist at this point.
979 ChanHitList *chl = chanMgr->findHitList(chanInfo);
980 ChanHit *hit = chl->hit;
982 if (hit->host.isValid() && (h.ip == hit->host.ip))
984 if ((hit->firewalled) && (hit->numRelays == 0)){
987 LOG_ERROR("Block firewalled Servent : %s",strip);
1000 // LockBlock lockblock(chanMgr->hitlistlock);
1002 // lockblock.lockon();
1003 ChanHitList *chl = chanMgr->findHitList(chanInfo);
1011 bool result = false;
1014 chanInfo.id.toStr(idStr);
1017 servMgr->sessionID.toStr(sidStr);
1019 Host rhost = sock->host;
1024 AtomStream atom(*sock);
1030 sock->writeLine(HTTP_SC_NOTFOUND);
1031 sock->writeLine("");
1032 LOG_DEBUG("Sending channel not found");
1039 if (outputProtocol == ChanInfo::SP_PCP)
1043 MemoryStream mem(tbuf, sizeof(tbuf));
1044 mem.writeLine(HTTP_SC_UNAVAILABLE);
1045 mem.writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
1047 sock->write(tbuf, mem.getPosition());
1049 handshakeIncomingPCP(atom,rhost,remoteID,agent);
1052 rhost.toStr(ripStr);
1054 LOG_DEBUG("Sending channel unavailable");
1059 AtomStream atom2(mem);
1061 int error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
1064 sourceHit->writeAtoms(atom2,chanInfo.id);
1066 sourceHit->host.toStr(tmp);
1067 LOG_DEBUG("relay info(sourceHit): %s", tmp);
1070 chanMgr->hitlistlock.on();
1072 chl = chanMgr->findHitList(chanInfo);
1074 if (chl && !sourceHit)
1078 // search for up to 8 other hits
1086 // find best hit this network if local IP
1087 if (!rhost.globalIP())
1090 chs.matchHost = servMgr->serverHost;
1092 chs.excludeID = remoteID;
1093 if (chl->pickHits(chs)){
1095 LOG_DEBUG("find best hit this network if local IP");
1099 // find best hit on same network
1103 chs.matchHost = rhost;
1105 chs.excludeID = remoteID;
1106 if (chl->pickHits(chs)){
1108 LOG_DEBUG("find best hit on same network");
1113 // find best hit on other networks
1114 /* if (!best.host.ip)
1118 chs.excludeID = remoteID;
1119 if (chl->pickHits(chs)){
1121 LOG_DEBUG("find best hit on other networks");
1129 best.writeAtoms(atom2,chanInfo.id);
1135 // chanMgr->hitlistlock.on();
1136 int rhcnt = chs.getRelayHost(servMgr->serverHost, rhost, remoteID, chl);
1137 // chanMgr->hitlistlock.off();
1138 for (int i = 0; i < rhcnt; i++){
1139 chs.best[i].writeAtoms(atom2, chanInfo.id);
1140 chs.best[i].host.toStr(tmp);
1141 LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
1142 best.host.ip = chs.best[i].host.ip;
1149 LOG_DEBUG("Sent %d channel hit(s) to %s",cnt,ripStr);
1152 else if (rhost.port)
1154 // find firewalled host
1157 chs.useFirewalled = true;
1158 chs.excludeID = remoteID;
1159 if (chl->pickHits(chs))
1162 int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_RELAY);
1163 LOG_DEBUG("Broadcasted channel push request to %d clients for %s",cnt,ripStr);
1167 // if all else fails, use tracker
1170 // find best tracker on this network if local IP
1171 if (!rhost.globalIP())
1174 chs.matchHost = servMgr->serverHost;
1175 chs.trackersOnly = true;
1176 chs.excludeID = remoteID;
1177 if (chl->pickHits(chs))
1182 // find local tracker
1186 chs.matchHost = rhost;
1187 chs.trackersOnly = true;
1188 chs.excludeID = remoteID;
1189 if (chl->pickHits(chs))
1193 // find global tracker
1197 chs.trackersOnly = true;
1198 chs.excludeID = remoteID;
1199 if (chl->pickHits(chs))
1205 best.writeAtoms(atom2,chanInfo.id);
1206 LOG_DEBUG("Sent 1 tracker hit to %s",ripStr);
1207 }else if (rhost.port)
1209 // find firewalled tracker
1211 chs.useFirewalled = true;
1212 chs.trackersOnly = true;
1213 chs.excludeID = remoteID;
1215 if (chl->pickHits(chs))
1218 int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_CIN);
1219 LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,ripStr);
1228 chanMgr->hitlistlock.off();
1230 // return not available yet code
1231 atom2.writeInt(PCP_QUIT,error);
1232 sock->write(tbuf, mem.getPosition());
1237 // wait disconnect from other host
1239 while(sock->read(c, sizeof(c))){
1242 }catch(StreamException &e){
1243 LOG_DEBUG("RelayInfoWait: %s",e.msg);
1248 LOG_DEBUG("Sending channel unavailable");
1249 sock->writeLine(HTTP_SC_UNAVAILABLE);
1250 sock->writeLine("");
1256 if (chanInfo.contentType != ChanInfo::T_MP3)
1259 if (addMetadata && (outputProtocol == ChanInfo::SP_HTTP)) // winamp mp3 metadata check
1262 sock->writeLine(ICY_OK);
1264 sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT);
1265 sock->writeLineF("icy-name:%s",chanInfo.name.cstr());
1266 sock->writeLineF("icy-br:%d",chanInfo.bitrate);
1267 sock->writeLineF("icy-genre:%s",chanInfo.genre.cstr());
1268 sock->writeLineF("icy-url:%s",chanInfo.url.cstr());
1269 sock->writeLineF("icy-metaint:%d",chanMgr->icyMetaInterval);
1270 sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr);
1272 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3);
1277 sock->writeLine(HTTP_SC_OK);
1279 if ((chanInfo.contentType != ChanInfo::T_ASX) && (chanInfo.contentType != ChanInfo::T_WMV) && (chanInfo.contentType != ChanInfo::T_WMA))
1281 sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT);
1283 sock->writeLine("Accept-Ranges: none");
1285 sock->writeLineF("x-audiocast-name: %s",chanInfo.name.cstr());
1286 sock->writeLineF("x-audiocast-bitrate: %d",chanInfo.bitrate);
1287 sock->writeLineF("x-audiocast-genre: %s",chanInfo.genre.cstr());
1288 sock->writeLineF("x-audiocast-description: %s",chanInfo.desc.cstr());
1289 sock->writeLineF("x-audiocast-url: %s",chanInfo.url.cstr());
1290 sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr);
1294 if (outputProtocol == ChanInfo::SP_HTTP)
1296 switch (chanInfo.contentType)
1298 case ChanInfo::T_OGG:
1299 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XOGG);
1301 case ChanInfo::T_MP3:
1302 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3);
1304 case ChanInfo::T_MOV:
1305 sock->writeLine("Connection: close");
1306 sock->writeLine("Content-Length: 10000000");
1307 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MOV);
1309 case ChanInfo::T_MPG:
1310 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MPG);
1312 case ChanInfo::T_NSV:
1313 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_NSV);
1315 case ChanInfo::T_ASX:
1316 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_ASX);
1318 case ChanInfo::T_WMA:
1319 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_WMA);
1321 case ChanInfo::T_WMV:
1322 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_WMV);
1325 } else if (outputProtocol == ChanInfo::SP_MMS)
1327 sock->writeLine("Server: Rex/9.0.0.2980");
1328 sock->writeLine("Cache-Control: no-cache");
1329 sock->writeLine("Pragma: no-cache");
1330 sock->writeLine("Pragma: client-id=3587303426");
1331 sock->writeLine("Pragma: features=\"broadcast,playlist\"");
1335 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MMS);
1338 sock->writeLine("Content-Type: application/vnd.ms.wms-hdr.asfv1");
1340 sock->writeLineF("Content-Length: %d",ch->headPack.len);
1341 sock->writeLine("Connection: Keep-Alive");
1344 } else if (outputProtocol == ChanInfo::SP_PCP)
1346 sock->writeLineF("%s %d",PCX_HS_POS,streamPos);
1347 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
1349 }else if (outputProtocol == ChanInfo::SP_PEERCAST)
1351 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPEERCAST);
1354 sock->writeLine("");
1359 handshakeIncomingPCP(atom,rhost,remoteID,agent);
1360 atom.writeInt(PCP_OK,0);
1361 if (rhost.globalIP())
1362 serventHit.rhost[0] = rhost;
1364 serventHit.rhost[1] = rhost;
1365 serventHit.sessionID = remoteID;
1366 serventHit.numHops = 1;
1367 chanMgr->addHit(serventHit);
1377 // -----------------------------------
// Sends the GIV request on `sock`: either "GIV /<id>" or a bare "GIV"
// (presumably chosen by whether `id` is set - TODO confirm), followed by the
// empty line that ends the request.
1378 void Servent::handshakeGiv(GnuID &id)
1384 sock->writeLineF("GIV /%s",idstr);
1386 sock->writeLine("GIV");
1388 sock->writeLine("");
1392 // -----------------------------------
// Runs a Gnutella-protocol connection on `sock`.
// After setting up gnuStream and marking the servent S_CONNECTED, loops while
// the thread is active and the socket is alive:
//   - reads and classifies one incoming packet when data is ready, updating
//     the global `stats` counters according to the routing verdict;
//   - sends at most one queued outgoing packet, priority queue first;
//   - pings the peer after a period of silence, and raises TimeoutException
//     once nothing has been received for abortTimeoutSecs;
//   - computes a delay scaled by the bytes moved in that pass.
1393 void Servent::processGnutella()
1397 //if (servMgr->isRoot && !servMgr->needConnections())
1398 if (servMgr->isRoot)
1406 gnuStream.init(sock);
1407 setStatus(S_CONNECTED);
1409 if (!servMgr->isRoot)
1411 chanMgr->broadcastRelays(this, 1, 1);
1414 if ((p=outPacketsNorm.curr()))
1415 gnuStream.sendPacket(*p);
1421 // if (type != T_LOOKUP)
1422 // chanMgr->broadcastRelays(this,chanMgr->minBroadcastTTL,2);
1424 lastPacket = lastPing = sys->getTime();
1425 bool doneBigPing=false;
1427 const unsigned int abortTimeoutSecs = 60; // abort connection after 60 secs of no activity
1428 const unsigned int packetTimeoutSecs = 30; // ping connection after 30 secs of no activity
1430 unsigned int currBytes=0;
1431 unsigned int lastWait=0;
1433 unsigned int lastTotalIn=0,lastTotalOut=0;
1435 while (thread.active && sock->active())
1438 if (sock->readReady())
1440 lastPacket = sys->getTime();
1442 if (gnuStream.readPacket(pack))
1445 sock->host.toStr(ipstr);
1448 GnuStream::R_TYPE ret = GnuStream::R_PROCESS;
// every packet type except PONG goes through duplicate detection
1450 if (pack.func != GNU_FUNC_PONG)
1451 if (servMgr->seenPacket(pack))
1452 ret = GnuStream::R_DUPLICATE;
1454 seenIDs.add(pack.id);
1457 if (ret == GnuStream::R_PROCESS)
1460 ret = gnuStream.processPacket(pack,this,routeID);
// under flow control, packets that would be broadcast are dropped instead
1462 if (flowControl && (ret == GnuStream::R_BROADCAST))
1463 ret = GnuStream::R_DROP;
// act on the verdict and account for it in the statistics
1469 case GnuStream::R_BROADCAST:
1470 if (servMgr->broadcast(pack,this))
1471 stats.add(Stats::NUMBROADCASTED);
1473 stats.add(Stats::NUMDROPPED);
1475 case GnuStream::R_ROUTE:
1476 if (servMgr->route(pack,routeID,NULL))
1477 stats.add(Stats::NUMROUTED);
1479 stats.add(Stats::NUMDROPPED);
1481 case GnuStream::R_ACCEPTED:
1482 stats.add(Stats::NUMACCEPTED);
1484 case GnuStream::R_DUPLICATE:
1485 stats.add(Stats::NUMDUP);
1487 case GnuStream::R_DEAD:
1488 stats.add(Stats::NUMDEAD);
1490 case GnuStream::R_DISCARD:
1491 stats.add(Stats::NUMDISCARDED);
1493 case GnuStream::R_BADVERSION:
1494 stats.add(Stats::NUMOLD);
1496 case GnuStream::R_DROP:
1497 stats.add(Stats::NUMDROPPED);
1502 LOG_NETWORK("packet in: %s-%s, %d bytes, %d hops, %d ttl, from %s",GNU_FUNC_STR(pack.func),GnuStream::getRouteStr(ret),pack.len,pack.hops,pack.ttl,ipstr);
1507 LOG_ERROR("Bad packet");
// send one queued packet per pass; the priority queue is served first
1514 if ((p=outPacketsPri.curr())) // priority packet
1516 gnuStream.sendPacket(*p);
1518 outPacketsPri.next();
1519 } else if ((p=outPacketsNorm.curr())) // or.. normal packet
1521 gnuStream.sendPacket(*p);
1523 outPacketsNorm.next();
// time elapsed since the last packet was received
// NOTE(review): `lpt` is a signed int but is compared below against the
// unsigned timeout constants.
1526 int lpt = sys->getTime()-lastPacket;
1530 if ((sys->getTime()-lastPing) > 15)
1533 lastPing = sys->getTime();
1537 if (lpt > packetTimeoutSecs)
1540 if ((sys->getTime()-lastPing) > packetTimeoutSecs)
1543 lastPing = sys->getTime();
1548 if (lpt > abortTimeoutSecs)
1549 throw TimeoutException();
// bytes moved in each direction since the previous pass
1552 unsigned int totIn = sock->totalBytesIn-lastTotalIn;
1553 unsigned int totOut = sock->totalBytesOut-lastTotalOut;
1555 unsigned int bytes = totIn+totOut;
1557 lastTotalIn = sock->totalBytesIn;
1558 lastTotalOut = sock->totalBytesOut;
1560 const int serventBandwidth = 1000;
// delay grows with the bytes moved in this pass, never below sys->idleSleepTime
1562 int delay = sys->idleSleepTime;
1563 if ((bytes) && (serventBandwidth >= 8))
1564 delay = (bytes*1000)/(serventBandwidth/8); // set delay relative packetsize
1566 if (delay < (int)sys->idleSleepTime)
1567 delay = sys->idleSleepTime;
1568 //LOG("delay %d, in %d, out %d",delay,totIn,totOut);
1576 // -----------------------------------
// Serves a Gnutella connection in the root role. While the thread is
// active and the socket alive, each packet read is handled by type:
//   PING - reply with up to 8 pongs drawn from the newest known servents,
//          starting at a random index and wrapping around;
//   PONG - read port and IP from the payload and add the host to the
//          server manager when both are non-zero and the address is global;
//   HIT  - decode the hit from the payload.
// StreamExceptions are caught and logged with the "Relay:" prefix.
1577 void Servent::processRoot()
1582 gnuStream.init(sock);
1583 setStatus(S_CONNECTED);
// NOTE(review): this local shadows the member `lastConnect`, which
// setStatus() has just updated.
1587 unsigned int lastConnect = sys->getTime();
1589 while (thread.active && sock->active())
1591 if (gnuStream.readPacket(pack))
1594 sock->host.toStr(ipstr);
1596 LOG_NETWORK("packet in: %d from %s",pack.func,ipstr);
1599 if (pack.func == GNU_FUNC_PING) // if ping then pong back some hosts and close
1603 int cnt = servMgr->getNewestServents(hl,32,sock->host);
// random starting point, at most 8 pongs
1606 int start = sys->rnd() % cnt;
1607 int max = cnt>8?8:cnt;
1609 for(int i=0; i<max; i++)
1613 pong.initPong(hl[start],false,pack);
1614 gnuStream.sendPacket(pong);
1617 hl[start].toStr(ipstr);
1619 //LOG_NETWORK("Pong %d: %s",start+1,ipstr);
1620 start = (start+1) % cnt;
1623 sock->host.toStr(str);
1624 LOG_NETWORK("Sent %d pong(s) to %s",max,str);
1627 LOG_NETWORK("No Pongs to send");
1630 }else if (pack.func == GNU_FUNC_PONG) // pong?
1632 MemoryStream pong(pack.data,pack.len);
// payload layout: port first, then IP
1635 port = pong.readShort();
1636 ip = pong.readLong();
1641 if ((ip) && (port) && (h.globalIP()))
1644 LOG_NETWORK("added pong: %d.%d.%d.%d:%d",ip>>24&0xff,ip>>16&0xff,ip>>8&0xff,ip&0xff,port);
1645 servMgr->addHost(h,ServHost::T_SERVENT,sys->getTime());
1648 } else if (pack.func == GNU_FUNC_HIT)
1650 MemoryStream data(pack.data,pack.len);
1652 gnuStream.readHit(data,hit,pack.hops,pack.id);
1655 //if (gnuStream.packetsIn > 5) // die if we get too many packets
// more than 60 time units since this routine started (presumably ends
// the connection - TODO confirm)
1659 if((sys->getTime()-lastConnect > 60))
1664 }catch(StreamException &e)
1666 LOG_ERROR("Relay: %s",e.msg);
1672 // -----------------------------------
// Thread body for a GIV connection. The Servent is taken from thread->data;
// it sends the GIV request and then runs the regular incoming handshake.
// StreamExceptions are logged with the "GIV:" prefix, and the thread is
// ended through sys->endThread().
1673 int Servent::givProcMain(ThreadInfo *thread)
1676 Servent *sv = (Servent*)thread->data;
1679 sv->handshakeGiv(sv->givID);
1680 sv->handshakeIncoming();
1682 }catch(StreamException &e)
1684 LOG_ERROR("GIV: %s",e.msg);
1688 sys->endThread(thread);
1692 // -----------------------------------
// Thread entry point for GIV connections (installed by initGIV). Delegates
// to givProcMain through the SEH_THREAD macro (presumably a structured
// exception handling wrapper from win32/seh.h - TODO confirm).
1693 int Servent::givProc(ThreadInfo *thread)
1695 SEH_THREAD(givProcMain, Servent::givProc);
1698 // -----------------------------------
// PCP handshake for a connection we initiated.
// Sends a HELO atom (agent, version, session ID, plus optional port, ping
// and broadcast ID children), waits for the OLEH reply, and applies what
// the peer reports: our externally visible IP, the firewall status, and the
// disable flag.
//   atom      - atom stream over the connection
//   rhost     - remote host
//   rid       - receives the remote session ID
//   agent     - receives the remote agent string (presumably - TODO confirm)
//   isTrusted - allows our broadcast ID to be sent while broadcasting
// Throws StreamException on an unexpected reply, on loopback (the remote
// session ID equals ours) and when the remote host does not identify itself.
1699 void Servent::handshakeOutgoingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent, bool isTrusted)
1702 bool nonFW = (servMgr->getFirewall() != ServMgr::FW_ON);
1703 bool testFW = (servMgr->getFirewall() == ServMgr::FW_UNKNOWN);
1705 bool sendBCID = isTrusted && chanMgr->isBroadcasting();
// the HELO is assembled in a memory buffer and sent with a single write
1708 MemoryStream mem(tbuf, sizeof(tbuf));
1709 AtomStream atom2(mem);
// the child count must equal the number of atoms written below: three
// fixed ones plus one for each optional atom
1710 atom2.writeParent(PCP_HELO,3 + (testFW?1:0) + (nonFW?1:0) + (sendBCID?1:0));
1711 atom2.writeString(PCP_HELO_AGENT,PCX_AGENT);
1712 atom2.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
1713 atom2.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
1715 atom2.writeShort(PCP_HELO_PORT,servMgr->serverHost.port);
1717 atom2.writeShort(PCP_HELO_PING,servMgr->serverHost.port);
1719 atom2.writeBytes(PCP_HELO_BCID,chanMgr->broadcastID.id,16);
1720 atom.io.write(tbuf, mem.getPosition());
1723 LOG_DEBUG("PCP outgoing waiting for OLEH..");
1726 ID4 id = atom.read(numc,numd);
1729 LOG_DEBUG("PCP outgoing reply: %s",id.getString().str());
1730 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
1731 throw StreamException("Got unexpected PCP response");
1746 // read OLEH response
1747 for(int i=0; i<numc; i++)
1750 ID4 id = atom.read(c,dlen);
1752 if (id == PCP_HELO_AGENT)
1754 atom.readString(arg,sizeof(arg),dlen);
1757 }else if (id == PCP_HELO_REMOTEIP)
1759 thisHost.ip = atom.readInt();
1761 }else if (id == PCP_HELO_PORT)
1763 thisHost.port = atom.readShort();
1765 }else if (id == PCP_HELO_VERSION)
1767 version = atom.readInt();
1769 }else if (id == PCP_HELO_DISABLE)
1771 disable = atom.readInt();
1773 }else if (id == PCP_HELO_SESSIONID)
1775 atom.readBytes(rid.id,16);
1776 if (rid.isSame(servMgr->sessionID))
1777 throw StreamException("Servent loopback");
1781 LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
1788 // update server ip/firewall status
1791 if (thisHost.isValid())
// adopt the IP the peer saw unless an IP has been forced by configuration
1793 if ((servMgr->serverHost.ip != thisHost.ip) && (servMgr->forceIP.isEmpty()))
1796 thisHost.toStr(ipstr);
1797 LOG_DEBUG("Got new ip: %s",ipstr);
1798 servMgr->serverHost.ip = thisHost.ip;
// a non-zero port on a global IP in the reply means we are not firewalled
1801 if (servMgr->getFirewall() == ServMgr::FW_UNKNOWN)
1803 if (thisHost.port && thisHost.globalIP())
1804 servMgr->setFirewall(ServMgr::FW_OFF);
1806 servMgr->setFirewall(ServMgr::FW_ON);
1812 LOG_ERROR("client disabled: %d",disable);
1813 servMgr->isDisabled = true;
1816 servMgr->isDisabled = false;
1824 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
1825 throw StreamException("Remote host not identified");
1828 LOG_DEBUG("PCP Outgoing handshake complete.");
1832 // -----------------------------------
// PCP handshake for a connection initiated by the peer.
// Reads the peer's HELO atom and its children (agent, version, session ID,
// broadcast ID, OS type, port, ping port), optionally tests the peer's
// firewall status by pinging it back, then answers with an OLEH atom.
//   atom  - atom stream over the connection
//   rhost - remote host; its ip/port may be rewritten here
//   rid   - receives the remote session ID
//   agent - remote agent string (logged below)
// Throws StreamException on an unexpected atom, on loopback, when the client
// is banned, when the agent version is below PCP_CLIENT_MINVERSION, or when
// the remote host does not identify itself.
1833 void Servent::handshakeIncomingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent)
1836 ID4 id = atom.read(numc,numd);
1841 LOG_DEBUG("PCP incoming reply: %s",id.getString().str());
1842 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
1843 throw StreamException("Got unexpected PCP response");
// read the HELO children
1862 for(int i=0; i<numc; i++)
1866 ID4 id = atom.read(c,dlen);
1868 if (id == PCP_HELO_AGENT)
1870 atom.readString(arg,sizeof(arg),dlen);
1873 }else if (id == PCP_HELO_VERSION)
1875 version = atom.readInt();
1877 }else if (id == PCP_HELO_SESSIONID)
1879 atom.readBytes(rid.id,16);
1880 if (rid.isSame(servMgr->sessionID))
1881 throw StreamException("Servent loopback");
1883 }else if (id == PCP_HELO_BCID)
1885 atom.readBytes(bcID.id,16);
1887 }else if (id == PCP_HELO_OSTYPE)
1889 osType = atom.readInt();
1890 }else if (id == PCP_HELO_PORT)
1892 rhost.port = atom.readShort();
1893 }else if (id == PCP_HELO_PING)
1895 pingPort = atom.readShort();
1898 LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
1905 LOG_DEBUG("Incoming PCP is %s : v%d", agent.cstr(),version);
// a peer with a non-global address is reported under our own global IP
1908 if (!rhost.globalIP() && servMgr->serverHost.globalIP())
1909 rhost.ip = servMgr->serverHost.ip;
1914 rhost.toStr(ripStr);
1915 LOG_DEBUG("Incoming firewalled test request: %s ", ripStr);
// firewall test: connect back to the port the peer asked to be pinged on
1916 rhost.port = pingPort;
1917 if (!rhost.globalIP() || !pingHost(rhost,rid))
1921 if (servMgr->isRoot)
1925 if (bcID.getFlags() & 1) // private
1927 BCID *bcid = servMgr->findValidBCID(bcID);
// NOTE(review): the inner `bcid &&` is redundant after `!bcid ||`
1928 if (!bcid || (bcid && !bcid->valid))
1930 atom.writeParent(PCP_OLEH,1);
1931 atom.writeInt(PCP_HELO_DISABLE,1);
1932 throw StreamException("Client is banned");
// the OLEH reply is assembled in a memory buffer; the parent announces
// five children, matching the five atoms written below
1940 MemoryStream mem(tbuf, sizeof(tbuf));
1941 AtomStream atom2(mem);
1942 atom2.writeParent(PCP_OLEH,5);
1943 atom2.writeString(PCP_HELO_AGENT,PCX_AGENT);
1944 atom2.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
1945 atom2.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
1946 atom2.writeInt(PCP_HELO_REMOTEIP,rhost.ip);
1947 atom2.writeShort(PCP_HELO_PORT,rhost.port);
// on rejection the buffered OLEH plus a QUIT are flushed before throwing
1951 if (version < PCP_CLIENT_MINVERSION)
1953 atom2.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADAGENT);
1954 atom.io.write(tbuf, mem.getPosition());
1955 throw StreamException("Agent is not valid");
1961 atom2.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
1962 atom.io.write(tbuf, mem.getPosition());
1963 throw StreamException("Remote host not identified");
// a root node appends its root atoms to the reply
1968 if (servMgr->isRoot)
1970 servMgr->writeRootAtoms(atom2,false);
1973 atom.io.write(tbuf, mem.getPosition());
1975 LOG_DEBUG("PCP Incoming handshake complete.");
1979 // -----------------------------------
// Handles an incoming PCP connection on this servent's socket.
// Reads the PCP version, runs the incoming handshake, and then either
// rejects the peer with a PCP_QUIT code (after offering tracker hits or
// broadcasting a push request) or reads PCP packets until an error occurs.
// NOTE(review): suggestOthers is not referenced in the lines reviewed here;
// presumably it gates the tracker suggestions sent on rejection - confirm.
1980 void Servent::processIncomingPCP(bool suggestOthers)
1982 PCPStream::readVersion(*sock);
1985 AtomStream atom(*sock);
1986 Host rhost = sock->host;
// The handshake fills in remoteID and agent, and may update rhost.
1988 handshakeIncomingPCP(atom,rhost,remoteID,agent);
// Rejection criteria: remoteID already has a COUT or CIN connection,
// servMgr reports controlInFull(), or this node is neither root nor
// broadcasting.
1991 bool alreadyConnected = (servMgr->findConnection(Servent::T_COUT,remoteID)!=NULL)
1992 || (servMgr->findConnection(Servent::T_CIN,remoteID)!=NULL);
1993 bool unavailable = servMgr->controlInFull();
1994 bool offair = !servMgr->isRoot && !chanMgr->isBroadcasting();
1999 if (unavailable || alreadyConnected || offair)
// Pick the QUIT code; every code is offset by PCP_ERROR_QUIT.
2003 if (alreadyConnected)
2004 error = PCP_ERROR_QUIT+PCP_ERROR_ALREADYCONNECTED;
2005 else if (unavailable)
2006 error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
2008 error = PCP_ERROR_QUIT+PCP_ERROR_OFFAIR;
2010 error = PCP_ERROR_QUIT;
// At most 8 rounds of tracker selection.
2020 for(int i=0; i<8; i++)
2024 // find best hit on this network
2025 if (!rhost.globalIP())
2028 chs.matchHost = servMgr->serverHost;
2030 chs.excludeID = remoteID;
2031 chs.trackersOnly = true;
2032 chs.useBusyControls = false;
2033 if (chanMgr->pickHits(chs))
2037 // find best hit on same network
2041 chs.matchHost = rhost;
2043 chs.excludeID = remoteID;
2044 chs.trackersOnly = true;
2045 chs.useBusyControls = false;
2046 if (chanMgr->pickHits(chs))
2050 // else find best hit on other networks
2055 chs.excludeID = remoteID;
2056 chs.trackersOnly = true;
2057 chs.useBusyControls = false;
2058 if (chanMgr->pickHits(chs))
// The selected hit is written to the peer with noID as the channel ID.
2067 best.writeAtoms(atom,noID);
2072 LOG_DEBUG("Sent %d tracker(s) to %s",cnt,rstr);
// Only attempted when the remote host has a non-zero port.
2074 else if (rhost.port)
2076 // send push request to best firewalled tracker on other network
2079 chs.excludeID = remoteID;
2080 chs.trackersOnly = true;
2081 chs.useFirewalled = true;
2082 chs.useBusyControls = false;
2083 if (chanMgr->pickHits(chs))
2088 int cnt = servMgr->broadcastPushRequest(best,rhost,noID,Servent::T_CIN);
2089 LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,rstr);
2093 LOG_DEBUG("No available trackers");
2098 LOG_ERROR("Sending QUIT to incoming: %d",error);
2100 atom.writeInt(PCP_QUIT,error);
// Accepted path: acknowledge with PCP_OK, then send a PCP_ROOT atom
// containing an empty PCP_ROOT_UPDATE.
2106 setStatus(S_CONNECTED);
2108 atom.writeInt(PCP_OK,0);
2111 atom.writeParent(PCP_ROOT,1);
2112 atom.writeParent(PCP_ROOT_UPDATE,0);
2114 pcpStream = new PCPStream(remoteID);
// Read packets until readPacket reports an error, the thread is
// deactivated, or the socket reaches EOF.
2118 while (!error && thread.active && !sock->eof())
2120 error = pcpStream->readPacket(*sock,bcs);
// Stop with OFFAIR when this node is not root and has stopped
// broadcasting, and with SHUTDOWN when the application is quitting.
2123 if (!servMgr->isRoot && !chanMgr->isBroadcasting())
2124 error = PCP_ERROR_OFFAIR;
2125 if (peercastInst->isQuitting)
2126 error = PCP_ERROR_SHUTDOWN;
2129 pcpStream->flush(*sock);
// The final QUIT carries the loop's error code offset by PCP_ERROR_QUIT.
2131 error += PCP_ERROR_QUIT;
2132 atom.writeInt(PCP_QUIT,error);
2134 LOG_DEBUG("PCP Incoming to %s closed: %d",rstr,error);
2138 // -----------------------------------
// Thread body for the outgoing control ("COUT") connection.
// thread->data is the Servent that owns the connection. While that
// servent's thread is active, this selects a target host (a pushed socket,
// a tracker hit, or the configured root host), connects, runs the outgoing
// PCP handshake, and reads PCP packets until an error stops the session.
// TimeoutException and StreamException are caught and logged here.
2139 int Servent::outgoingProcMain(ThreadInfo *thread)
2142 LOG_DEBUG("COUT started");
2144 Servent *sv = (Servent*)thread->data;
2148 sv->pcpStream = new PCPStream(noID);
2150 while (sv->thread.active)
2152 sv->setStatus(S_WAIT);
// A target is only looked for while broadcasting with autoServe enabled.
2154 if (chanMgr->isBroadcasting() && servMgr->autoServe)
2164 if (servMgr->rootHost.isEmpty())
// Adopt the pushed socket as this servent's socket; its host is the target.
2169 sv->sock = sv->pushSock;
2170 sv->pushSock = NULL;
2171 bestHit.host = sv->sock->host;
// Tracker hits are taken from the hit list registered under noID.
2177 ChanHitList *chl = chanMgr->findHitListByID(noID);
2180 // find local tracker
2182 chs.matchHost = servMgr->serverHost;
2183 chs.waitDelay = MIN_TRACKER_RETRY;
2184 chs.excludeID = servMgr->sessionID;
2185 chs.trackersOnly = true;
2186 if (!chl->pickHits(chs))
2188 // else find global tracker
2190 chs.waitDelay = MIN_TRACKER_RETRY;
2191 chs.excludeID = servMgr->sessionID;
2192 chs.trackersOnly = true;
2198 bestHit = chs.best[0];
2203 unsigned int ctime = sys->getTime();
// Without a usable hit, fall back to the root host once more than
// MIN_YP_RETRY has elapsed since chanMgr->lastYPConnect.
2205 if ((!bestHit.host.ip) && ((ctime-chanMgr->lastYPConnect) > MIN_YP_RETRY))
2207 bestHit.host.fromStrName(servMgr->rootHost.cstr(),DEFAULT_PORT);
2209 chanMgr->lastYPConnect = ctime;
// Keep searching until a host IP is found or the thread is deactivated.
2213 }while (!bestHit.host.ip && (sv->thread.active));
2216 if (!bestHit.host.ip) // give up
2218 LOG_ERROR("COUT giving up");
2223 bestHit.host.toStr(ipStr);
2229 LOG_DEBUG("COUT to %s: Connecting..",ipStr);
2233 sv->setStatus(S_CONNECTING);
2234 sv->sock = sys->createSocket();
2236 throw StreamException("Unable to create socket");
2237 sv->sock->open(bestHit.host);
2238 sv->sock->connect();
// NOTE(review): 30000 is presumably milliseconds - confirm against
// ClientSocket::setReadTimeout.
2242 sv->sock->setReadTimeout(30000);
2243 AtomStream atom(*sv->sock);
2245 sv->setStatus(S_HANDSHAKE);
2247 Host rhost = sv->sock->host;
2248 atom.writeInt(PCP_CONNECT,1);
2249 handshakeOutgoingPCP(atom,rhost,sv->remoteID,sv->agent,bestHit.yp);
2251 sv->setStatus(S_CONNECTED);
2253 LOG_DEBUG("COUT to %s: OK",ipStr);
2255 sv->pcpStream->init(sv->remoteID);
2258 bcs.servent_id = sv->servent_id;
// Read packets until an error, thread deactivation, socket EOF, or
// autoServe being switched off.
2260 while (!error && sv->thread.active && !sv->sock->eof() && servMgr->autoServe)
2262 error = sv->pcpStream->readPacket(*sv->sock,bcs);
2266 if (!chanMgr->isBroadcasting())
2267 error = PCP_ERROR_OFFAIR;
2268 if (peercastInst->isQuitting)
2269 error = PCP_ERROR_SHUTDOWN;
// Report NOROOT when nextRootPacket is set and the current time is more
// than 30 past it.
2271 if (sv->pcpStream->nextRootPacket)
2272 if (sys->getTime() > (sv->pcpStream->nextRootPacket+30))
2273 error = PCP_ERROR_NOROOT;
2275 sv->setStatus(S_CLOSING);
2277 sv->pcpStream->flush(*sv->sock);
// The final QUIT carries the loop's error code offset by PCP_ERROR_QUIT.
2279 error += PCP_ERROR_QUIT;
2280 atom.writeInt(PCP_QUIT,error);
2282 LOG_ERROR("COUT to %s closed: %d",ipStr,error);
2284 }catch(TimeoutException &e)
2286 LOG_ERROR("COUT to %s: timeout (%s)",ipStr,e.msg);
2287 sv->setStatus(S_TIMEOUT);
2288 }catch(StreamException &e)
2290 LOG_ERROR("COUT to %s: %s",ipStr,e.msg);
2291 sv->setStatus(S_ERROR);
2303 }catch(StreamException &) {}
2305 // don't discard this hit if we caused the disconnect (stopped broadcasting)
2306 if (error != (PCP_ERROR_QUIT+PCP_ERROR_OFFAIR))
2307 chanMgr->deadHit(bestHit);
2315 sys->endThread(thread);
2316 LOG_DEBUG("COUT ended");
2319 // -----------------------------------
// Thread entry point for the outgoing control connection. The body is
// produced by the SEH_THREAD macro, which is given outgoingProcMain.
// NOTE(review): presumably the macro (see win32/seh.h) wraps the call in
// structured-exception handling - confirm.
2320 int Servent::outgoingProc(ThreadInfo *thread)
2322 SEH_THREAD(outgoingProcMain, Servent::outgoingProc);
2324 // -----------------------------------
// Thread body for an accepted incoming connection.
// thread->data is the Servent that owns the socket. Runs
// handshakeIncoming() and logs any HTTPException or StreamException it
// throws; for an HTTPException the exception message is also written back
// to the client as a response line.
2325 int Servent::incomingProcMain(ThreadInfo *thread)
2329 Servent *sv = (Servent*)thread->data;
// Remote address string, used only in the log messages below.
2332 sv->sock->host.toStr(ipStr);
2336 sv->handshakeIncoming();
2337 }catch(HTTPException &e)
2341 sv->sock->writeLine(e.msg);
// NOTE(review): the Basic-auth challenge is presumably sent only for an
// authentication failure - confirm the guarding condition.
2343 sv->sock->writeLine("WWW-Authenticate: Basic realm=\"PeerCast\"");
2344 sv->sock->writeLine("");
// Failures while writing the error response are deliberately ignored.
2345 }catch(StreamException &){}
2346 LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);
2347 }catch(StreamException &e)
2349 LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);
2354 sys->endThread(thread);
2357 // -----------------------------------
// Thread entry point for an incoming connection. The body is produced by
// the SEH_THREAD macro, which is given incomingProcMain.
// NOTE(review): presumably the macro (see win32/seh.h) wraps the call in
// structured-exception handling - confirm.
2358 int Servent::incomingProc(ThreadInfo *thread)
2360 SEH_THREAD(incomingProcMain, Servent::incomingProc);
2362 // -----------------------------------
// Begins servent processing by moving to the S_HANDSHAKE status.
// Throws StreamException("Servent has no socket"); NOTE(review): presumably
// only when sock is NULL - confirm the guarding condition.
2363 void Servent::processServent()
2365 setStatus(S_HANDSHAKE);
2370 throw StreamException("Servent has no socket");
2375 // -----------------------------------
// Serves a channel to the connected client using the servent's
// outputProtocol.
//   doneHandshake - NOTE(review): presumably skips handshakeStream() when
//                   true - confirm.
//   chanInfo      - describes the requested channel; when its id is set it
//                   becomes this servent's chanID.
// Throws StreamException when the channel header does not become ready or
// the channel cannot be found.
2376 void Servent::processStream(bool doneHandshake,ChanInfo &chanInfo)
2380 setStatus(S_HANDSHAKE);
2382 if (!handshakeStream(chanInfo))
2386 if (chanInfo.id.isSet())
2389 chanID = chanInfo.id;
2391 LOG_CHANNEL("Sending channel: %s ",ChanInfo::getProtocolStr(outputProtocol));
2393 if (!waitForChannelHeader(chanInfo))
2394 throw StreamException("Channel not ready");
2396 servMgr->totalStreams++;
2398 Host host = sock->host;
2399 host.port = 0; // force to 0 so we ignore the incoming port
2401 Channel *ch = chanMgr->findChannelByID(chanID);
2403 throw StreamException("Channel not found");
// Dispatch on the output protocol. HTTP uses the metadata-interleaving
// sender when metadata was requested and icyMetaInterval is non-zero.
2405 if (outputProtocol == ChanInfo::SP_HTTP)
2407 if ((addMetadata) && (chanMgr->icyMetaInterval))
2408 sendRawMetaChannel(chanMgr->icyMetaInterval);
2410 sendRawChannel(true,true);
2412 }else if (outputProtocol == ChanInfo::SP_MMS)
2416 sendRawChannel(true,true);
2419 sendRawChannel(true,false);
2422 }else if (outputProtocol == ChanInfo::SP_PCP)
2426 } else if (outputProtocol == ChanInfo::SP_PEERCAST)
2428 sendPeercastChannel();
2432 setStatus(S_CLOSING);
2435 // -----------------------------------------
// NOTE(review): this looks like leftover debug code that reads a hard-coded
// local file ("c://test.mp3") and writes each buffer to the socket - confirm
// whether it is compiled in, and remove it if not needed.
2439 file.openReadOnly("c://test.mp3");
2441 LOG_DEBUG("raw file read");
2446 LOG_DEBUG("send %d",cnt++);
2447 file.read(buf,sizeof(buf));
2448 sock->write(buf,sizeof(buf));
2452 LOG_DEBUG("raw file sent");
2457 // -----------------------------------
// Polls, for at most 30*10 iterations, for the channel identified by
// info.id to be playing with data in its raw buffer.
// NOTE(review): presumably returns true once the channel is ready and false
// on timeout or when the connection goes away - confirm the return values.
2458 bool Servent::waitForChannelHeader(ChanInfo &info)
2460 for(int i=0; i<30*10; i++)
2462 Channel *ch = chanMgr->findChannelByID(info.id);
// Ready: the channel is playing and its raw buffer write position is past 0.
2466 if (ch->isPlaying() && (ch->rawData.writePos>0))
// Checked each iteration: the thread being deactivated or the socket no
// longer being active.
2469 if (!thread.active || !sock->active())
2475 // -----------------------------------
// Streams the channel identified by chanID to the socket as raw packets.
//   sendHead / sendData - NOTE(review): presumably select whether the
//                         header packet and the data packets are sent -
//                         confirm the guarding conditions.
// StreamException is caught and logged inside this function.
2476 void Servent::sendRawChannel(bool sendHead, bool sendData)
// DIRECT_WRITE_TIMEOUT is scaled by 1000 for the socket write timeout.
2481 sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
2483 Channel *ch = chanMgr->findChannelByID(chanID);
2485 throw StreamException("Channel not found");
2487 setStatus(S_CONNECTED);
2489 LOG_DEBUG("Starting Raw stream of %s at %d",ch->info.name.cstr(),streamPos);
// After the header is written, streaming resumes right behind it.
2493 ch->headPack.writeRaw(*sock);
2494 streamPos = ch->headPack.pos + ch->headPack.len;
2495 LOG_DEBUG("Sent %d bytes header ",ch->headPack.len);
2501 unsigned int streamIndex = ch->streamIndex;
2502 unsigned int connectTime = sys->getTime();
2503 unsigned int lastWriteTime = connectTime;
2505 while ((thread.active) && sock->active())
// The channel is looked up again on every pass.
2507 ch = chanMgr->findChannelByID(chanID);
// A changed stream index rewinds streamPos to the header position.
2512 if (streamIndex != ch->streamIndex)
2514 streamIndex = ch->streamIndex;
2515 streamPos = ch->headPack.pos;
2516 LOG_DEBUG("sendRaw got new stream index");
2520 if (ch->rawData.findPacket(streamPos,rawPack))
// A packet whose sync value differs from the expected one is logged as a skip.
2522 if (syncPos != rawPack.sync)
2523 LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
2524 syncPos = rawPack.sync+1;
2526 if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2528 rawPack.writeRaw(*sock);
2529 lastWriteTime = sys->getTime();
2532 if (rawPack.pos < streamPos)
2533 LOG_DEBUG("raw: skip back %d",rawPack.pos-streamPos);
2534 streamPos = rawPack.pos+rawPack.len;
// No packet available but the socket is readable: read one byte, and close
// the socket when readUpto returns 0.
2535 } else if (sock->readReady()) {
2537 int error = sock->readUpto(&c, 1);
2538 if (error == 0) sock->close();
// Give up when nothing has been written for more than DIRECT_WRITE_TIMEOUT.
2542 if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
2543 throw TimeoutException();
2548 }catch(StreamException &e)
2550 LOG_ERROR("Stream channel: %s",e.msg);
2555 // -----------------------------------
// Streams every currently playing channel to the socket as raw packets.
// The IDs of playing channels are collected first, each channel's header is
// written, and then packets from the collected channels are written in turn.
//   sendHead / sendData - NOTE(review): presumably select whether headers
//                         and data are sent - confirm the guarding
//                         conditions.
// StreamException is caught and logged inside this function.
2556 void Servent::sendRawMultiChannel(bool sendHead, bool sendData)
// Per-channel stream state, indexed in parallel with chanIDs.
2560 unsigned int chanStreamIndex[ChanMgr::MAX_CHANNELS];
2561 unsigned int chanStreamPos[ChanMgr::MAX_CHANNELS];
2562 GnuID chanIDs[ChanMgr::MAX_CHANNELS];
// Collect the IDs of all playing channels.
2564 for(int i=0; i<ChanMgr::MAX_CHANNELS; i++)
2566 Channel *ch = &chanMgr->channels[i];
2567 if (ch->isPlaying())
2568 chanIDs[numChanIDs++]=ch->info.id;
2573 setStatus(S_CONNECTED);
// Write each collected channel's header and record where its data starts.
2578 for(int i=0; i<numChanIDs; i++)
2580 Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
2583 LOG_DEBUG("Starting RawMulti stream: %s",ch->info.name.cstr());
2584 ch->headPack.writeRaw(*sock);
2585 chanStreamPos[i] = ch->headPack.pos + ch->headPack.len;
2586 chanStreamIndex[i] = ch->streamIndex;
2587 LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
2596 unsigned int connectTime=sys->getTime();
2598 while ((thread.active) && sock->active())
// NOTE(review): this loop starts at i=1 although the header loop above
// starts at i=0, so the first collected channel never has data sent -
// confirm whether that is intended.
2601 for(int i=1; i<numChanIDs; i++)
2603 Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
// A changed stream index rewinds this channel to its header position.
2606 if (chanStreamIndex[i] != ch->streamIndex)
2608 chanStreamIndex[i] = ch->streamIndex;
2609 chanStreamPos[i] = ch->headPack.pos;
2610 LOG_DEBUG("sendRawMulti got new stream index for chan %d",i);
2614 if (ch->rawData.findPacket(chanStreamPos[i],rawPack))
2616 if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2617 rawPack.writeRaw(*sock);
2620 if (rawPack.pos < chanStreamPos[i])
2621 LOG_DEBUG("raw: skip back %d",rawPack.pos-chanStreamPos[i]);
2622 chanStreamPos[i] = rawPack.pos+rawPack.len;
2625 //LOG("raw at %d: %d %d",streamPos,ch->rawData.getStreamPos(ch->rawData.firstPos),ch->rawData.getStreamPos(ch->rawData.lastPos));
2635 }catch(StreamException &e)
2637 LOG_ERROR("Stream channel: %s",e.msg);
2642 // -----------------------------------
// Streams the channel identified by chanID as raw data with ICY metadata.
// Channel data is accumulated into a buffer and written in chunks of
// exactly `interval` bytes; a StreamTitle/StreamUrl metadata block is
// written when the title or URL has changed.
//   interval - metadata interval in bytes; must be between 1 and
//              sizeof(buf), otherwise StreamException is thrown.
// StreamException is caught and logged inside this function.
2643 void Servent::sendRawMetaChannel(int interval)
2648 Channel *ch = chanMgr->findChannelByID(chanID);
2650 throw StreamException("Channel not found");
// DIRECT_WRITE_TIMEOUT is scaled by 1000 for the socket write timeout.
2652 sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
2654 setStatus(S_CONNECTED);
2656 LOG_DEBUG("Starting Raw Meta stream of %s (metaint: %d) at %d",ch->info.name.cstr(),interval,streamPos);
// Last title and URL sent, so metadata is only re-sent on change.
2659 String lastTitle,lastURL;
2661 int lastMsgTime=sys->getTime();
2667 if ((interval > sizeof(buf)) || (interval < 1))
2668 throw StreamException("Bad ICY Meta Interval value");
2670 unsigned int connectTime = sys->getTime();
2671 unsigned int lastWriteTime = connectTime;
2673 streamPos = 0; // raw meta channel has no header (its MP3)
2675 while ((thread.active) && sock->active())
// The channel is looked up again on every pass.
2677 ch = chanMgr->findChannelByID(chanID);
2683 if (ch->rawData.findPacket(streamPos,rawPack))
// A packet whose sync value differs from the expected one is logged as a skip.
2686 if (syncPos != rawPack.sync)
2687 LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
2688 syncPos = rawPack.sync+1;
2690 MemoryStream mem(rawPack.data,rawPack.len);
2692 if (rawPack.type == ChanPacket::T_DATA)
2695 int len = rawPack.len;
2696 char *p = rawPack.data;
// Copy no more than what still fits before the interval boundary.
2700 if ((bufPos+rl) > interval)
2701 rl = interval-bufPos;
2702 memcpy(&buf[bufPos],p,rl);
// A full interval of audio has been buffered: write it out.
2707 if (bufPos >= interval)
2710 sock->write(buf,interval);
2711 lastWriteTime = sys->getTime();
// Broadcast-message timing, active only when broadcastMsgInterval is non-zero.
2713 if (chanMgr->broadcastMsgInterval)
2714 if ((sys->getTime()-lastMsgTime) >= chanMgr->broadcastMsgInterval)
2717 lastMsgTime = sys->getTime();
// The title is the track title, or the channel comment when one exists and
// showMsg is set.
2720 String *metaTitle = &ch->info.track.title;
2721 if (!ch->info.comment.isEmpty() && (showMsg))
2722 metaTitle = &ch->info.comment;
2725 if (!metaTitle->isSame(lastTitle) || !ch->info.url.isSame(lastURL))
2734 title.convertTo(String::T_META);
2735 url.convertTo(String::T_META);
// Metadata block: one length byte counting 16-byte units, then len*16 bytes.
// NOTE(review): the write sends len*16 bytes from tmp, which can extend
// past the formatted string; this assumes tmp is large enough and
// zero-filled - confirm.
2737 sprintf(tmp,"StreamTitle='%s';StreamUrl='%s';\0",title.cstr(),url.cstr());
2738 int len = ((strlen(tmp) + 15+1) / 16);
2739 sock->writeChar(len);
2740 sock->write(tmp,len*16);
2742 lastTitle = *metaTitle;
2743 lastURL = ch->info.url;
2745 LOG_DEBUG("StreamTitle: %s, StreamURL: %s",lastTitle.cstr(),lastURL.cstr());
2755 streamPos = rawPack.pos + rawPack.len;
// Give up when nothing has been written for more than DIRECT_WRITE_TIMEOUT.
2758 if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
2759 throw TimeoutException();
2764 }catch(StreamException &e)
2766 LOG_ERROR("Stream channel: %s",e.msg);
2769 // -----------------------------------
// Streams the channel identified by chanID in the "PCST" PeerCast format.
// Writes the "PCST" tag, the channel header packet and a T_META packet,
// then writes data and header packets, each preceded by a "SYNC" record.
// Throws StreamException when the channel cannot be found; StreamException
// raised while streaming is caught and logged inside this function.
2770 void Servent::sendPeercastChannel()
2774 setStatus(S_CONNECTED);
2776 Channel *ch = chanMgr->findChannelByID(chanID);
2778 throw StreamException("Channel not found");
2780 LOG_DEBUG("Starting PeerCast stream: %s",ch->info.name.cstr());
2782 sock->writeTag("PCST");
2786 ch->headPack.writePeercast(*sock);
// The metadata packet is built from ch->insertMeta at the channel's
// current stream position.
2788 pack.init(ChanPacket::T_META,ch->insertMeta.data,ch->insertMeta.len,ch->streamPos);
2789 pack.writePeercast(*sock);
2792 unsigned int syncPos=0;
2793 while ((thread.active) && sock->active())
// The channel is looked up again on every pass.
2795 ch = chanMgr->findChannelByID(chanID);
2800 if (ch->rawData.findPacket(streamPos,rawPack))
2802 if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
// SYNC record: tag, two shorts (4 and 0), then the 4 bytes of syncPos.
2804 sock->writeTag("SYNC");
2805 sock->writeShort(4);
2806 sock->writeShort(0);
2807 sock->write(&syncPos,4);
2810 rawPack.writePeercast(*sock);
2812 streamPos = rawPack.pos + rawPack.len;
2818 }catch(StreamException &e)
2820 LOG_ERROR("Stream channel: %s",e.msg);
2824 //WLock canStreamLock;
2826 // -----------------------------------
// Streams the channel identified by chanID to the peer as PCP atoms and
// processes PCP packets arriving from that peer on the same socket.
// Sends the channel info (plus the header packet when sendHeader is set),
// then loops: forwards header/data packets, flushes queued outgoing
// packets, and reads incoming packets. On exit the stream is flushed and a
// PCP_QUIT atom is sent.
// Throws StreamException when the channel cannot be found.
2827 void Servent::sendPCPChannel()
2829 bool skipCheck = false;
2830 unsigned int ptime = 0;
2831 int npacket = 0, upsize = 0;
2833 Channel *ch = chanMgr->findChannelByID(chanID);
2835 throw StreamException("Channel not found");
2837 AtomStream atom(*sock);
2839 pcpStream = new PCPStream(remoteID);
2845 LOG_DEBUG("Starting PCP stream of channel at %d",streamPos);
2848 // setStatus(S_CONNECTED);
2850 //canStreamLock.on();
2851 //thread.active = canStream(ch);
2852 //setStatus(S_CONNECTED);
2853 //canStreamLock.off();
// PCP_CHAN parent with 3 children, plus one more when the header is sent.
2860 atom.writeParent(PCP_CHAN,3 + ((sendHeader)?1:0));
2861 atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
2862 ch->info.writeInfoAtoms(atom);
2863 ch->info.writeTrackAtoms(atom);
// Header packet: type, position and data.
2866 atom.writeParent(PCP_CHAN_PKT,3);
2867 atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
2868 atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
2869 atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
2872 streamPos = ch->headPack.pos+ch->headPack.len;
2873 LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
2877 unsigned int streamIndex = ch->streamIndex;
// Packets are serialised into pbuf through atom2 and then written to the
// socket in one call.
2880 char pbuf[ChanPacket::MAX_DATALEN*3];
2881 MemoryStream mems(pbuf,sizeof(pbuf));
2882 AtomStream atom2(mems);
2884 while (thread.active)
// Re-looked-up on every pass; this declaration shadows the outer ch.
2887 Channel *ch = chanMgr->findChannelByID(chanID);
// A changed stream index rewinds streamPos to the header position.
2892 if (streamIndex != ch->streamIndex)
2894 streamIndex = ch->streamIndex;
2895 streamPos = ch->headPack.pos;
2896 LOG_DEBUG("sendPCPStream got new stream index");
2901 if (ch->rawData.findPacket(streamPos,rawPack))
// The packet found lies beyond the expected position: data was skipped.
2903 if ((streamPos < rawPack.pos) && !rawPack.skip){
2906 getHost().IPtoStr(tmp);
2907 LOG_NETWORK("##### send skipping ##### %d (%d, %d) -> %s", (rawPack.pos - streamPos), streamPos, rawPack.pos, tmp);
// A second skip with the same timestamp as the previous one jumps to the
// latest buffered position.
2909 if (sys->getTime() == lastSkipTime) {
2910 LOG_DEBUG("##### skip all buffer");
2911 streamPos = ch->rawData.getLatestPos();
2915 lastSkipTime = sys->getTime();
// Header packets are written to the socket directly.
2922 if (rawPack.type == ChanPacket::T_HEAD)
2924 atom2.writeParent(PCP_CHAN,2);
2925 atom2.writeBytes(PCP_CHAN_ID,chanID.id,16);
2926 atom2.writeParent(PCP_CHAN_PKT,3);
2927 atom2.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
2928 atom2.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
2929 atom2.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
2931 sock->write(pbuf, mems.getPosition());
2932 }else if (rawPack.type == ChanPacket::T_DATA)
2934 atom2.writeParent(PCP_CHAN,2);
2935 atom2.writeBytes(PCP_CHAN_ID,chanID.id,16);
2936 atom2.writeParent(PCP_CHAN_PKT,3);
2937 atom2.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_DATA);
2938 atom2.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
2939 atom2.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
// Data packets use either the buffering write, which also refreshes the
// skip statistics from sock->bufList, or a plain write.
// NOTE(review): the condition choosing between the two is not shown here -
// confirm.
2942 sock->bufferingWrite(pbuf, mems.getPosition());
2943 lastSkipTime = sock->bufList.lastSkipTime;
2944 lastSkipCount = sock->bufList.skipCount;
2946 sock->write(pbuf, mems.getPosition());
2950 if (rawPack.pos < streamPos)
2951 LOG_DEBUG("pcp: skip back %d",rawPack.pos-streamPos);
2953 //LOG_DEBUG("Sending %d-%d (%d,%d,%d)",rawPack.pos,rawPack.pos+rawPack.len,ch->streamPos,ch->rawData.getLatestPos(),ch->rawData.getOldestPos());
2955 streamPos = rawPack.pos+rawPack.len;
2958 throw StreamException("Channel not found");
// NOTE(review): a bufferingWrite with no payload presumably drains data
// that is still buffered - confirm.
2962 sock->bufferingWrite(NULL, 0);
2963 lastSkipTime = sock->bufList.lastSkipTime;
2964 lastSkipCount = sock->bufList.skipCount;
2967 bcs.servent_id = servent_id;
2968 // error = pcpStream->readPacket(*sock,bcs);
2970 unsigned int t = sys->getTime();
// Budgets for incoming packets and outgoing bytes.
2973 npacket = MAX_PROC_PACKETS;
2974 upsize = MAX_OUTWARD_SIZE;
2977 int len = pcpStream->flushUb(*sock, upsize);
// Read incoming packets while the budget lasts and the socket is readable.
2980 while (npacket > 0 && sock->readReady()) {
2982 error = pcpStream->readPacket(*sock,bcs);
2984 throw StreamException("PCP exception");
2991 LOG_DEBUG("PCP channel stream closed normally.");
2993 }catch(StreamException &e)
2995 LOG_ERROR("Stream channel: %s",e.msg);
// Flush and send PCP_QUIT; failures while doing so are ignored.
3000 pcpStream->flush(*sock);
3001 atom.writeInt(PCP_QUIT,error);
3002 }catch(StreamException &) {}
3006 // -----------------------------------
// Thread body for a listening server servent.
// thread->data is the Servent that owns the listening socket. While the
// thread and socket are active it accepts client connections, rejects
// multicast source addresses, and hands each accepted socket to a newly
// allocated Servent. StreamException is caught and logged.
3007 int Servent::serverProcMain(ThreadInfo *thread)
3012 Servent *sv = (Servent*)thread->data;
3017 throw StreamException("Server has no socket");
3019 sv->setStatus(S_LISTENING);
3023 sv->sock->host.toStr(servIP);
3025 if (servMgr->isRoot)
3026 LOG_DEBUG("Root Server started: %s",servIP);
3028 LOG_DEBUG("Server started: %s",servIP);
3031 while ((thread->active) && (sv->sock->active()))
// Accept only while the connections active on this port are below maxServIn.
3033 if (servMgr->numActiveOnPort(sv->sock->host.port) < servMgr->maxServIn)
3035 ClientSocket *cs = sv->sock->accept();
3037 // Exclude invalid source addresses (IPv4 multicast).
// Matches when the upper four bits of the top byte of ip are 0xE.
3038 if (cs && (((cs->host.ip >> 24) & 0xF0) == 0xE0))
3043 LOG_ERROR("reject incoming multicast address: %s", ip);
3044 peercastApp->notifyMessage(ServMgr::NT_PEERCAST, "reject multicast address");
3048 LOG_DEBUG("accepted incoming");
3049 Servent *ns = servMgr->allocServent();
// The new servent takes over the accepted socket with this server's port,
// network ID and allow mask.
3052 servMgr->lastIncoming = sys->getTime();
3053 ns->servPort = sv->sock->host.port;
3054 ns->networkID = servMgr->networkID;
3055 ns->initIncoming(cs,sv->allow);
3057 LOG_ERROR("Out of servents");
3062 }catch(StreamException &e)
3064 LOG_ERROR("Server Error: %s:%d",e.msg,e.err);
3068 LOG_DEBUG("Server stopped");
3071 sys->endThread(thread);
3075 // -----------------------------------
// Thread entry point for a listening server. The body is produced by the
// SEH_THREAD macro, which is given serverProcMain.
// NOTE(review): presumably the macro (see win32/seh.h) wraps the call in
// structured-exception handling - confirm.
3076 int Servent::serverProc(ThreadInfo *thread)
3078 SEH_THREAD(serverProcMain, Servent::serverProc);
3081 // -----------------------------------
3082 bool Servent::writeVariable(Stream &s, const String &var)
3087 strcpy(buf,getTypeStr());
3088 else if (var == "status")
3089 strcpy(buf,getStatusStr());
3090 else if (var == "address")
3092 if (servMgr->enableGetName) //JP-EX s
3094 getHost().toStr(buf);
3099 /* ChanHitList *hits[ChanMgr::MAX_HITLISTS];
3101 for(int i=0; i<ChanMgr::MAX_HITLISTS; i++)
3103 ChanHitList *chl = &chanMgr->hitlists[i];
3105 hits[numHits++] = chl;
3108 ishit = isfw = false;
3112 for(int k=0; k<numHits; k++)
3114 ChanHitList *chl = hits[k];
3117 for (int j=0; j<ChanHitList::MAX_HITS; j++)
3119 ChanHit *hit = &chl->hits[j];
3120 if (hit->host.isValid() && (h.ip == hit->host.ip))
3123 if (hit->firewalled)
3125 numRelay += hit->numRelays;
3137 strcat(buf,"<font color=red>");
3139 strcat(buf,"<font color=orange>");
3142 strcat(buf,"<font color=green>");
3146 if (ClientSocket::getHostname(h_name,h.ip))
3154 strcat(buf,"</font>");
3160 bool isRelay = true;
3162 ChanHitList *chl = chanMgr->findHitListByID(chanID);
3164 ChanHit *hit = chl->hit;
3166 if (hit->host.isValid() && (h.ip == hit->host.ip)){
3167 isfw = hit->firewalled;
3168 isRelay = hit->relay;
3169 numRelay = hit->numRelays;
3178 strcat(buf,"<font color=red>");
3180 strcat(buf,"<font color=orange>");
3185 strcpy(buf,"<font color=purple>");
3187 strcpy(buf,"<font color=blue>");
3190 strcpy(buf,"<font color=green>");
3195 if (ClientSocket::getHostname(h_name,sizeof(h_name),h.ip)) //JP-MOD (buffer-overflow countermeasure)
3201 strcat(buf,"</font>");
3204 getHost().toStr(buf);
3206 else if (var == "agent")
3207 strcpy(buf,agent.cstr());
3208 else if (var == "bitrate")
3212 unsigned int tot = sock->bytesInPerSec+sock->bytesOutPerSec;
3213 sprintf(buf,"%.1f",BYTES_TO_KBPS(tot));
3216 }else if (var == "uptime")
3220 uptime.setFromStopwatch(sys->getTime()-lastConnect);
3223 strcpy(buf,uptime.cstr());
3224 }else if (var.startsWith("gnet."))
3227 float ctime = (float)(sys->getTime()-lastConnect);
3228 if (var == "gnet.packetsIn")
3229 sprintf(buf,"%d",gnuStream.packetsIn);
3230 else if (var == "gnet.packetsInPerSec")
3231 sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsIn)/ctime:0);
3232 else if (var == "gnet.packetsOut")
3233 sprintf(buf,"%d",gnuStream.packetsOut);
3234 else if (var == "gnet.packetsOutPerSec")
3235 sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsOut)/ctime:0);
3236 else if (var == "gnet.normQueue")
3237 sprintf(buf,"%d",outPacketsNorm.numPending());
3238 else if (var == "gnet.priQueue")
3239 sprintf(buf,"%d",outPacketsPri.numPending());
3240 else if (var == "gnet.flowControl")
3241 sprintf(buf,"%d",flowControl?1:0);
3242 else if (var == "gnet.routeTime")
3244 int nr = seenIDs.numUsed();
3245 unsigned int tim = sys->getTime()-seenIDs.getOldest();
3248 tstr.setFromStopwatch(tim);
3251 strcpy(buf,tstr.cstr());