1 // ------------------------------------------------
6 // Servents are the actual connections between clients. They do the handshaking,
7 // transferring of data and processing of GnuPackets. Each servent has one socket allocated
8 // to it on connect, it uses this to transfer all of its data.
10 // (c) 2002 peercast.org
11 // ------------------------------------------------
12 // This program is free software; you can redistribute it and/or modify
13 // it under the terms of the GNU General Public License as published by
14 // the Free Software Foundation; either version 2 of the License, or
15 // (at your option) any later version.
17 // This program is distributed in the hope that it will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 // GNU General Public License for more details.
21 // ------------------------------------------------
22 // todo: make lan->yp not check firewall
38 #include "chkMemoryLeak.h"
39 #define DEBUG_NEW new(__FILE__, __LINE__)
43 #include "win32/seh.h"
46 const int DIRECT_WRITE_TIMEOUT = 60;
48 // -----------------------------------
// Table of status message strings for Servent.
// NOTE(review): the initializer is not visible here; presumably one entry per
// STATUS value, in enum order - confirm.
49 char *Servent::statusMsgs[]=
66 // -----------------------------------
// Table of type message strings for Servent.
// NOTE(review): the initializer is not visible here; presumably one entry per
// TYPE value, in enum order - confirm.
67 char *Servent::typeMsgs[]=
78 // -----------------------------------
// Reports whether host `h` counts as private: true when servMgr's F_PRIVATE
// filter matches it, or when it is localhost.
// NOTE(review): `h` is obtained on a line not shown here - presumably this
// servent's remote host; confirm.
79 bool Servent::isPrivate()
82 return servMgr->isFiltered(ServFilter::F_PRIVATE,h) || h.isLocalhost();
84 // -----------------------------------
// Permission check for the allow flag(s) passed in `a`; callers in this file
// throw StreamException("Servent not allowed") when it returns false.
// The visible code consults servMgr's F_BAN filter for host `h`.
// NOTE(review): presumably a banned host is refused outright and `a` is then
// tested against an allow mask - confirm against the full body.
85 bool Servent::isAllowed(int a)
89 if (servMgr->isFiltered(ServFilter::F_BAN,h))
95 // -----------------------------------
// Returns servMgr->isFiltered(f,h), i.e. whether filter flag(s) `f` match
// host `h`.
// NOTE(review): `h` is obtained on a line not shown here - confirm its source.
96 bool Servent::isFiltered(int f)
99 return servMgr->isFiltered(f,h);
// Source of Servent::servent_id values: the Servent constructor assigns
// servent_count++ to each new instance, so ids start at 1.
// NOTE(review): no locking is visible around that increment - confirm that
// servents are only constructed from one thread.
102 int servent_count = 1;
103 // -----------------------------------
// Constructs a servent. Both outgoing packet queues (priority and normal) are
// built with MAX_OUTPACKETS, and the instance takes the next value of the
// global servent_count as its servent_id.
// NOTE(review): `index` is not referenced on the visible lines.
104 Servent::Servent(int index)
105 :outPacketsPri(MAX_OUTPACKETS)
106 ,outPacketsNorm(MAX_OUTPACKETS)
113 servent_id = servent_count++;
119 // -----------------------------------
124 // -----------------------------------
127 thread.active = false;
129 setStatus(S_CLOSING);
133 PCPStream *pcp = pcpStream;
139 chanMgr->hitlistlock.on();
140 ChanHitList *chl = chanMgr->findHitListByID(chanID);
142 ChanHit *chh = chl->hit;
143 ChanHit *prev = NULL;
145 if (chh->servent_id == this->servent_id){
146 if ((servMgr->kickKeepTime != 0) && (chh->firewalled == 1)){
148 chh->numListeners = 0;
153 ChanHit *next = chh->next;
168 chanMgr->hitlistlock.off();
186 if (type != T_SERVER)
193 // -----------------------------------
// Aborts this servent. The visible step clears thread.active, which is the
// flag the processing loops in this file (e.g. processGnutella, processRoot)
// test on every pass.
194 void Servent::abort()
196 thread.active = false;
204 // -----------------------------------
// Puts the servent's fields back to their idle values. Visible here:
//  - outputProtocol becomes ChanInfo::SP_UNKNOWN
//  - lastConnect, lastPing and lastPacket are zeroed
//  - loginPassword is cleared and priorityConnect is dropped
//  - both outgoing packet queues are reset
205 void Servent::reset()
219 outputProtocol = ChanInfo::SP_UNKNOWN;
228 lastConnect = lastPing = lastPacket = 0;
229 loginPassword.clear();
232 priorityConnect = false;
236 outPacketsNorm.reset();
237 outPacketsPri.reset();
248 // -----------------------------------
// Sends `pack` over this servent's PCP stream if the servent matches the
// given filters. Conditions visible here (all must hold):
//  - cid is unset, or equals this servent's chanID
//  - sid is unset, or differs from this servent's remoteID
//  - pcpStream is non-NULL
// On a match the result of pcpStream->sendPacket(pack,did) is returned.
// NOTE(review): `t` is not referenced on the visible lines (presumably it is
// compared with the servent type), and the non-matching return value is not
// shown - confirm.
249 bool Servent::sendPacket(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did,Servent::TYPE t)
254 && (!cid.isSet() || chanID.isSame(cid))
255 && (!sid.isSet() || !sid.isSame(remoteID))
256 && (pcpStream != NULL)
259 return pcpStream->sendPacket(pack,did);
265 // -----------------------------------
266 bool Servent::acceptGIV(ClientSocket *givSock)
276 // -----------------------------------
277 Host Servent::getHost()
287 // -----------------------------------
// Queues Gnutella packet `p` for sending on this servent.
// The packet is written to either outPacketsPri or outPacketsNorm and `r`
// receives the result of that write. When servMgr->useFlowControl is set the
// fill percentage of the normal queue is read first.
// NOTE(review): `pri` presumably selects the priority queue, and `r` is
// presumably the return value - confirm against the full body.
288 bool Servent::outputPacket(GnuPacket &p, bool pri)
294 r = outPacketsPri.write(p);
297 if (servMgr->useFlowControl)
299 int per = outPacketsNorm.percentFull();
310 // if in flowcontrol, only allow packets with less of a hop count than already in queue
311 if (p.hops >= outPacketsNorm.findMinHop())
316 r = outPacketsNorm.write(p);
323 // -----------------------------------
// Starts this servent as a server: serverProc becomes the thread entry point
// and the thread is started through sys->startThread().
// A failed thread start raises StreamException("Can`t start thread"); the
// StreamException handler in this function logs it as "Bad server: ...".
// NOTE(review): the use of `h` and the return values are on lines not shown.
324 bool Servent::initServer(Host &h)
338 thread.func = serverProc;
342 if (!sys->startThread(&thread))
343 throw StreamException("Can`t start thread");
345 }catch(StreamException &e)
347 LOG_ERROR("Bad server: %s",e.msg);
354 // -----------------------------------
// Guard that throws StreamException with "Socket already set" or
// "Thread already active".
// NOTE(review): the conditions are on lines not shown; judging by the messages
// they presumably test that `sock` is already set and that thread.active is
// true - confirm.
355 void Servent::checkFree()
358 throw StreamException("Socket already set");
360 throw StreamException("Thread already active");
362 // -----------------------------------
// Prepares this servent for an incoming connection and starts incomingProc on
// its thread. Visible steps: thread.func/thread.finish are set, status goes to
// S_PROTOCOL, the remote host (sock->host) is logged, then the thread is
// started.
// A failed thread start raises StreamException("Can`t start thread"); the
// handler in this function logs it as "INCOMING FAILED: ...".
// NOTE(review): `s` and `a` are not referenced on the visible lines -
// presumably `s` becomes `sock` and `a` the allow mask; confirm.
363 void Servent::initIncoming(ClientSocket *s, unsigned int a)
374 thread.func = incomingProc;
375 thread.finish = false;
377 setStatus(S_PROTOCOL);
380 sock->host.toStr(ipStr);
381 LOG_DEBUG("Incoming from %s",ipStr);
383 if (!sys->startThread(&thread))
384 throw StreamException("Can`t start thread");
385 }catch(StreamException &e)
// The two lines below are an older, disabled reaction to this failure.
387 //LOG_ERROR("!!FATAL!! Incoming error: %s",e.msg);
388 //servMgr->shutdownTimer = 1;
391 LOG_ERROR("INCOMING FAILED: %s",e.msg);
396 // -----------------------------------
// Starts outgoingProc on this servent's thread.
// A failed thread start raises StreamException("Can`t start thread"); the
// handler in this function logs it as "Unable to start outgoing: ...".
// NOTE(review): `ty` is not referenced on the visible lines - presumably it is
// stored as the servent type; confirm.
397 void Servent::initOutgoing(TYPE ty)
407 thread.func = outgoingProc;
409 if (!sys->startThread(&thread))
410 throw StreamException("Can`t start thread");
412 }catch(StreamException &e)
414 LOG_ERROR("Unable to start outgoing: %s",e.msg);
419 // -----------------------------------
// Starts an outgoing PCP connection: after an isAllowed(ALLOW_NETWORK) check,
// outgoingProc is installed as the thread entry point, the target is logged
// and the thread is started.
// Both "Servent not allowed" and "Can`t start thread" are raised as
// StreamException; the handler in this function logs them as
// "Unable to open connection to <ip> - <msg>".
// NOTE(review): `rh` is not referenced on the visible lines - presumably it is
// the host formatted into ipStr; confirm.
420 void Servent::initPCP(Host &rh)
434 if (!isAllowed(ALLOW_NETWORK))
435 throw StreamException("Servent not allowed");
438 thread.func = outgoingProc;
440 LOG_DEBUG("Outgoing to %s",ipStr);
442 if (!sys->startThread(&thread))
443 throw StreamException("Can`t start thread");
445 }catch(StreamException &e)
447 LOG_ERROR("Unable to open connection to %s - %s",ipStr,e.msg);
453 // -----------------------------------
// Prepares this servent for fetching channel data from `host`.
// Throws StreamException("Servent not allowed") when isAllowed(ALLOW_DATA)
// fails.
// NOTE(review): whether that exception is caught inside this function is not
// visible here - confirm before relying on it propagating.
454 void Servent::initChannelFetch(Host &host)
468 if (!isAllowed(ALLOW_DATA))
469 throw StreamException("Servent not allowed");
475 // -----------------------------------
// Starts a GIV (push) connection: after an isAllowed(ALLOW_NETWORK) check,
// givProc is installed as the thread entry point and the thread is started.
// Both "Servent not allowed" and "Can`t start thread" are raised as
// StreamException; the handler in this function logs them as
// "GIV error to <ip>: <msg>".
// NOTE(review): `h` and `id` are not referenced on the visible lines -
// presumably they become the socket host and givID; confirm.
476 void Servent::initGIV(Host &h, GnuID &id)
490 if (!isAllowed(ALLOW_NETWORK))
491 throw StreamException("Servent not allowed");
496 thread.func = givProc;
500 if (!sys->startThread(&thread))
501 throw StreamException("Can`t start thread");
503 }catch(StreamException &e)
505 LOG_ERROR("GIV error to %s: %s",ipStr,e.msg);
509 // -----------------------------------
// Gives this servent a fresh socket from sys->createSocket().
// An error is logged when the call is made "while active".
// NOTE(review): the condition guarding the log line is not shown (presumably
// `sock` is already set) - confirm, and confirm whether the old socket is
// released before being overwritten.
510 void Servent::createSocket()
513 LOG_ERROR("Servent::createSocket attempt made while active");
515 sock = sys->createSocket();
517 // -----------------------------------
// Changes the servent status to `s`.
// Entering S_HANDSHAKE, S_CONNECTED or S_LISTENING also stamps lastConnect
// with the current sys->getTime().
// NOTE(review): the assignment of the status itself is on a line not shown.
518 void Servent::setStatus(STATUS s)
524 if ((s == S_HANDSHAKE) || (s == S_CONNECTED) || (s == S_LISTENING))
525 lastConnect = sys->getTime();
530 // -----------------------------------
// Client side of the Gnutella-style connect handshake.
// Visible sequence:
//  1. Send GNU_PEERCONN followed by headers: agent, PCP flag, priority,
//     network id (only when networkID is set), session id and OS type.
//  2. Read the HTTP response; a non-matching code is logged as
//     "Expected 200, got <r>" and raises StreamException.
//  3. Walk the response headers: the agent header sets versionValid, the
//     network-id header fills clientID.
//  4. Raise HTTPException 503 when clientID differs from networkID, or 401
//     (condition not shown, presumably !versionValid - confirm).
//  5. Send GNU_OK.
531 void Servent::handshakeOut()
533 sock->writeLine(GNU_PEERCONN);
537 sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_AGENT);
538 sock->writeLineF("%s %d",PCX_HS_PCP,1);
541 sock->writeLineF("%s %d",PCX_HS_PRIORITY,1);
543 if (networkID.isSet())
545 networkID.toStr(str);
546 sock->writeLineF("%s %s",PCX_HS_NETWORKID,str);
549 servMgr->sessionID.toStr(str);
550 sock->writeLineF("%s %s",PCX_HS_ID,str);
553 sock->writeLineF("%s %s",PCX_HS_OS,peercastApp->getClientTypeOS());
559 int r = http.readResponse();
563 LOG_ERROR("Expected 200, got %d",r);
564 throw StreamException("Unexpected HTTP response");
568 bool versionValid = false;
573 while (http.nextHeader())
// NOTE(review): the header line received from the peer is passed as the
// format string itself. handshakeIn logs the same data as
// LOG_DEBUG("%s",http.cmdLine); if LOG_DEBUG is printf-style, a '%' in the
// header is interpreted as a conversion here - consider using the "%s" form.
575 LOG_DEBUG(http.cmdLine);
577 char *arg = http.getArgStr();
581 if (http.isHeader(HTTP_HS_AGENT))
// Version check is a case-insensitive string comparison of whatever follows
// "PeerCast/" against MIN_CONNECTVER, not a numeric comparison.
585 if (strnicmp(arg,"PeerCast/",9)==0)
586 versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);
587 }else if (http.isHeader(PCX_HS_NETWORKID))
588 clientID.fromStr(arg);
591 if (!clientID.isSame(networkID))
592 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
595 throw HTTPException(HTTP_SC_UNAUTHORIZED,401);
598 sock->writeLine(GNU_OK);
604 // -----------------------------------
605 void Servent::processOutChannel()
610 // -----------------------------------
// Server side of the Gnutella-style connect handshake.
// Visible sequence:
//  1. Walk the request headers: agent (sets versionValid / diffRootVer),
//     network id (clientID), priority flag, peer session id (a match with our
//     own sessionID raises StreamException("Servent loopback")) and OS type.
//  2. Reject with HTTPException 503 when clientID differs from networkID or
//     when servMgr->pubInFull(); reject with 403 on a further condition that
//     is not shown (presumably !versionValid - confirm).
//  3. Reply with GNU_OK and the response headers: agent, network id (when
//     set), flow control, min/max broadcast TTL, relay-broadcast setting and,
//     on conditions not shown, download URL, root message and remote IP.
//  4. Drain the peer's remaining headers.
611 void Servent::handshakeIn()
619 bool versionValid = false;
620 bool diffRootVer = false;
625 while (http.nextHeader())
627 LOG_DEBUG("%s",http.cmdLine);
629 char *arg = http.getArgStr();
633 if (http.isHeader(HTTP_HS_AGENT))
637 if (strnicmp(arg,"PeerCast/",9)==0)
// Both checks are case-insensitive string comparisons of the text after
// "PeerCast/", not numeric version comparisons.
639 versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);
640 diffRootVer = stricmp(arg+9,MIN_ROOTVER)<0;
642 }else if (http.isHeader(PCX_HS_NETWORKID))
644 clientID.fromStr(arg);
646 }else if (http.isHeader(PCX_HS_PRIORITY))
648 priorityConnect = atoi(arg)!=0;
650 }else if (http.isHeader(PCX_HS_ID))
654 if (id.isSame(servMgr->sessionID))
655 throw StreamException("Servent loopback");
657 }else if (http.isHeader(PCX_HS_OS))
659 if (stricmp(arg,PCX_OS_LINUX)==0)
661 else if (stricmp(arg,PCX_OS_WIN32)==0)
663 else if (stricmp(arg,PCX_OS_MACOSX)==0)
665 else if (stricmp(arg,PCX_OS_WINAMP2)==0)
671 if (!clientID.isSame(networkID))
672 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
674 // if this is a priority connection and all incoming connections
675 // are full then kill an old connection to make room. Otherwise reject connection.
// The priority test below is disabled, so the pubInFull() rejection is not
// gated on it in the visible code.
676 //if (!priorityConnect)
679 if (servMgr->pubInFull())
680 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
684 throw HTTPException(HTTP_SC_FORBIDDEN,403);
686 sock->writeLine(GNU_OK);
// Note: the reply advertises PCX_OLDAGENT, whereas handshakeOut sends
// PCX_AGENT.
688 sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_OLDAGENT);
690 if (networkID.isSet())
693 networkID.toStr(idStr);
694 sock->writeLineF("%s %s",PCX_HS_NETWORKID,idStr);
699 sock->writeLineF("%s %d",PCX_HS_FLOWCTL,servMgr->useFlowControl?1:0);
700 sock->writeLineF("%s %d",PCX_HS_MINBCTTL,chanMgr->minBroadcastTTL);
701 sock->writeLineF("%s %d",PCX_HS_MAXBCTTL,chanMgr->maxBroadcastTTL);
702 sock->writeLineF("%s %d",PCX_HS_RELAYBC,servMgr->relayBroadcast);
703 //sock->writeLine("%s %d",PCX_HS_FULLHIT,2);
708 sock->writeString(PCX_HS_DL);
709 sock->writeLine(PCX_DL_URL);
712 sock->writeLineF("%s %s",PCX_HS_MSG,servMgr->rootMsg.cstr());
719 sock->writeLineF("%s %s",PCX_HS_REMOTEIP,hostIP);
// Empty-bodied loop: consumes any headers the peer sends after our reply.
725 while (http.nextHeader());
728 // -----------------------------------
// Connects to `rhost` and checks that it answers a PCP HELO with the session
// id `rsid`.
// Visible sequence: create a socket with 15000 read/write timeouts, send
// PCP_CONNECT and a PCP_HELO parent carrying our session id, read the reply
// atom and its `numc` children, and copy the PCP_SESSIONID child into `sid`.
// StreamException is raised for "Bad ping response" and when `sid` differs
// from `rsid`; the handler in this function logs the failure at debug level.
// On success PCP_QUIT is sent.
// NOTE(review): return values and socket cleanup are on lines not shown -
// presumably true on success, false on failure; confirm, and confirm that `s`
// is closed/deleted on every path.
729 bool Servent::pingHost(Host &rhost,GnuID &rsid)
733 LOG_DEBUG("Ping host %s: trying..",ipstr);
734 ClientSocket *s=NULL;
738 s = sys->createSocket();
744 s->setReadTimeout(15000);
745 s->setWriteTimeout(15000);
751 atom.writeInt(PCP_CONNECT,1);
752 atom.writeParent(PCP_HELO,1);
753 atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
759 ID4 id = atom.read(numc,numd);
762 for(int i=0; i<numc; i++)
765 ID4 pid = atom.read(c,d);
766 if (pid == PCP_SESSIONID)
767 atom.readBytes(sid.id,16,d);
773 LOG_DEBUG("Ping response: %s",id.getString().str());
774 throw StreamException("Bad ping response");
777 if (!sid.isSame(rsid))
778 throw StreamException("SIDs don`t match");
781 LOG_DEBUG("Ping host %s: OK",ipstr);
782 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT);
786 }catch(StreamException &e)
788 LOG_DEBUG("Ping host %s: %s",ipstr,e.msg);
804 // -----------------------------------
// Answers an incoming request to stream the channel described by `chanInfo`
// on this servent's socket.
// Visible stages:
//  1. Parse request headers: PCP flag, requested position, listen port,
//     icy-metadata flag, user agent and the "Pragma" stream options.
//  2. Adjust outputProtocol: SP_PCP falls back to SP_PEERCAST when the peer
//     did not announce PCP; SP_HTTP becomes SP_MMS for MMS sources or
//     WMA/WMV/ASX content.
//  3. Look the channel up by id, choose streamPos, fill in serventHit and ask
//     canStream() whether it can be served; if not, try
//     servMgr->kickUnrelayableHost() and ask again.
//  4. When servMgr->isCheckPushStream() is on, block "PeerCast/0.119" agents
//     and firewalled hosts that relay nothing (both are logged as errors).
//  5. Reply with "not found", with "unavailable" (for PCP: alternative relay
//     or tracker hits plus a PCP_QUIT atom, then wait for the peer to
//     disconnect), or with the success headers for the chosen protocol.
//  6. For a successful PCP reply, complete the PCP handshake and register
//     serventHit with chanMgr.
// NOTE(review): the return value (presumably `result`) and the conditions
// selecting each reply are on lines not shown - confirm.
805 bool Servent::handshakeStream(ChanInfo &chanInfo)
812 unsigned int reqPos=0;
813 unsigned short listenPort = 0;
817 while (http.nextHeader())
819 char *arg = http.getArgStr();
823 if (http.isHeader(PCX_HS_PCP))
824 gotPCP = atoi(arg)!=0;
825 else if (http.isHeader(PCX_HS_POS))
827 else if (http.isHeader(PCX_HS_PORT))
828 listenPort = (unsigned short)atoi(arg);
829 else if (http.isHeader("icy-metadata"))
830 addMetadata = atoi(arg) > 0;
831 else if (http.isHeader(HTTP_HS_AGENT))
833 else if (http.isHeader("Pragma"))
835 char *ssc = stristr(arg,"stream-switch-count=");
836 char *so = stristr(arg,"stream-offset");
841 //nsSwitchNum = atoi(ssc+20);
845 LOG_DEBUG("Stream: %s",http.cmdLine);
// Protocol fallback: a peer that asked for PCP but did not send the PCP
// header is served with the older PeerCast protocol.
849 if ((!gotPCP) && (outputProtocol == ChanInfo::SP_PCP))
850 outputProtocol = ChanInfo::SP_PEERCAST;
852 if (outputProtocol == ChanInfo::SP_HTTP)
854 if ( (chanInfo.srcProtocol == ChanInfo::SP_MMS)
855 || (chanInfo.contentType == ChanInfo::T_WMA)
856 || (chanInfo.contentType == ChanInfo::T_WMV)
857 || (chanInfo.contentType == ChanInfo::T_ASX)
859 outputProtocol = ChanInfo::SP_MMS;
863 bool chanFound=false;
864 bool chanReady=false;
866 ChanHit *sourceHit = NULL;
868 Channel *ch = chanMgr->findChannelByID(chanInfo.id);
// Start position: oldest buffered position at or after reqPos when a position
// was requested or the channel is not an index.txt channel; otherwise the
// latest position.
872 if (reqPos || !isIndexTxt(&chanInfo))
874 streamPos = ch->rawData.findOldestPos(reqPos);
875 //streamPos = ch->rawData.getLatestPos();
878 streamPos = ch->rawData.getLatestPos();
881 chanID = chanInfo.id;
// Describe the requesting host as a hit: rhost[0] holds a global address,
// rhost[1] a non-global one.
882 serventHit.host.ip = getHost().ip;
883 serventHit.host.port = listenPort;
884 if (serventHit.host.globalIP())
885 serventHit.rhost[0] = serventHit.host;
887 serventHit.rhost[1] = serventHit.host;
888 serventHit.chanID = chanID;
891 chanReady = canStream(ch);
// The leading `0 &&` makes this condition always false, so the
// "bump channel" branch below is disabled.
892 if (0 && !chanReady && ch->isPlaying())
894 if (ch->info.getUptime() > 60
895 && servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
897 sourceHit = &ch->sourceHost; // send source host info
901 // connect "this" host later
902 chanMgr->addHit(serventHit);
906 getHost().toStr(tmp);
907 LOG_DEBUG("Bump channel, hand over sourceHost to %s", tmp);
910 else if (servMgr->kickUnrelayableHost(chanID, serventHit) != 0)
912 chanReady = canStream(ch);
914 LOG_DEBUG("*** Kicked unrelayable host, but still cannot stream");
// A servent that cannot be streamed to is reclassified as T_INCOMING and its
// thread.active flag is set to chanReady (false in that case).
917 if (!chanReady) type = T_INCOMING;
918 thread.active = chanReady;
919 setStatus(S_CONNECTED);
921 channel_id = ch->channel_id;
924 if (servMgr->isCheckPushStream())
926 if (chanReady == true)
930 if (!h.isLocalhost())
934 if (strstr(agent.cstr(),"PeerCast/0.119") != NULL)
938 LOG_ERROR("Block v0.119 Servent : %s (%s)",strip,agent.cstr());
// Older, disabled implementation of the firewalled-host check; the terminator
// of this block comment is on a line not shown.
943 /* ChanHitList *hits[ChanMgr::MAX_HITLISTS];
945 for(int i=0; i<ChanMgr::MAX_HITLISTS; i++)
947 ChanHitList *chl = &chanMgr->hitlists[i];
949 hits[numHits++] = chl;
953 for(int i=0; i<numHits; i++)
955 ChanHitList *chl = hits[i];
958 for (int j=0; j<ChanHitList::MAX_HITS; j++)
960 ChanHit *hit = &chl->hits[j];
961 if (hit->host.isValid() && (h.ip == hit->host.ip))
965 numRelay = hit->numRelays;
970 if ((isfw == true) && (numRelay == 0))
974 LOG_ERROR("Block firewalled Servent : %s",strip);
// Current implementation: find this host's entry in the channel's hit list.
// NOTE(review): `hit` is NULL when `chl` is NULL; the NULL test / loop that
// protects the dereference below is on a line not shown - confirm.
978 ChanHitList *chl = chanMgr->findHitList(chanInfo);
979 ChanHit *hit = (chl ? chl->hit : NULL);
981 if (hit->host.isValid() && (h.ip == hit->host.ip))
983 if ((hit->firewalled) && (hit->numRelays == 0)){
986 LOG_ERROR("Block firewalled Servent : %s",strip);
999 // LockBlock lockblock(chanMgr->hitlistlock);
1001 // lockblock.lockon();
1002 ChanHitList *chl = chanMgr->findHitList(chanInfo);
1010 bool result = false;
1013 chanInfo.id.toStr(idStr);
1016 servMgr->sessionID.toStr(sidStr);
1018 Host rhost = sock->host;
1023 AtomStream atom(*sock);
// Reply: channel not found.
1029 sock->writeLine(HTTP_SC_NOTFOUND);
1030 sock->writeLine("");
1031 LOG_DEBUG("Sending channel not found");
// Reply: channel unavailable. For PCP the HTTP status is built in `tbuf` and
// sent in one write, then the PCP handshake runs so that alternative hosts can
// be sent as atoms.
1038 if (outputProtocol == ChanInfo::SP_PCP)
1042 MemoryStream mem(tbuf, sizeof(tbuf));
1043 mem.writeLine(HTTP_SC_UNAVAILABLE);
1044 mem.writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
1046 sock->write(tbuf, mem.getPosition());
1048 handshakeIncomingPCP(atom,rhost,remoteID,agent);
1051 rhost.toStr(ripStr);
1053 LOG_DEBUG("Sending channel unavailable");
1058 AtomStream atom2(mem);
1060 int error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
1063 sourceHit->writeAtoms(atom2,chanInfo.id);
1065 sourceHit->host.toStr(tmp);
1066 LOG_DEBUG("relay info(sourceHit): %s", tmp);
// The hit list is searched with chanMgr->hitlistlock held; the matching off()
// call is further down, before the QUIT atom is written.
1069 chanMgr->hitlistlock.on();
1071 chl = chanMgr->findHitList(chanInfo);
1073 if (chl && !sourceHit)
1077 // search for up to 8 other hits
1085 // find best hit this network if local IP
1086 if (!rhost.globalIP())
1089 chs.matchHost = servMgr->serverHost;
1091 chs.excludeID = remoteID;
1092 if (chl->pickHits(chs)){
1094 LOG_DEBUG("find best hit this network if local IP");
1098 // find best hit on same network
1102 chs.matchHost = rhost;
1104 chs.excludeID = remoteID;
1105 if (chl->pickHits(chs)){
1107 LOG_DEBUG("find best hit on same network");
1112 // find best hit on other networks
1113 /* if (!best.host.ip)
1117 chs.excludeID = remoteID;
1118 if (chl->pickHits(chs)){
1120 LOG_DEBUG("find best hit on other networks");
1128 best.writeAtoms(atom2,chanInfo.id);
1134 // chanMgr->hitlistlock.on();
1135 int rhcnt = chs.getRelayHost(servMgr->serverHost, rhost, remoteID, chl);
1136 // chanMgr->hitlistlock.off();
1137 for (int i = 0; i < rhcnt; i++){
1138 chs.best[i].writeAtoms(atom2, chanInfo.id);
1139 chs.best[i].host.toStr(tmp);
1140 LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
1141 best.host.ip = chs.best[i].host.ip;
1148 LOG_DEBUG("Sent %d channel hit(s) to %s",cnt,ripStr);
1151 else if (rhost.port)
1153 // find firewalled host
1156 chs.useFirewalled = true;
1157 chs.excludeID = remoteID;
1158 if (chl->pickHits(chs))
1161 int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_RELAY);
1162 LOG_DEBUG("Broadcasted channel push request to %d clients for %s",cnt,ripStr);
1166 // if all else fails, use tracker
1169 // find best tracker on this network if local IP
1170 if (!rhost.globalIP())
1173 chs.matchHost = servMgr->serverHost;
1174 chs.trackersOnly = true;
1175 chs.excludeID = remoteID;
1176 if (chl->pickHits(chs))
1181 // find local tracker
1185 chs.matchHost = rhost;
1186 chs.trackersOnly = true;
1187 chs.excludeID = remoteID;
1188 if (chl->pickHits(chs))
1192 // find global tracker
1196 chs.trackersOnly = true;
1197 chs.excludeID = remoteID;
1198 if (chl->pickHits(chs))
1204 best.writeAtoms(atom2,chanInfo.id);
1205 LOG_DEBUG("Sent 1 tracker hit to %s",ripStr);
1206 }else if (rhost.port)
1208 // find firewalled tracker
1210 chs.useFirewalled = true;
1211 chs.trackersOnly = true;
1212 chs.excludeID = remoteID;
1214 if (chl->pickHits(chs))
1217 int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_CIN);
1218 LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,ripStr);
1227 chanMgr->hitlistlock.off();
1229 // return not available yet code
1230 atom2.writeInt(PCP_QUIT,error);
1231 sock->write(tbuf, mem.getPosition());
1236 // wait disconnect from other host
1238 while(sock->read(c, sizeof(c))){
1241 }catch(StreamException &e){
1242 LOG_DEBUG("RelayInfoWait: %s",e.msg);
// Non-PCP "unavailable" reply: a plain HTTP status line and an empty line.
1247 LOG_DEBUG("Sending channel unavailable");
1248 sock->writeLine(HTTP_SC_UNAVAILABLE);
1249 sock->writeLine("");
// Reply: success. ICY-style headers go to clients that asked for MP3 metadata
// over HTTP; otherwise a regular HTTP status with x-audiocast headers is sent.
1255 if (chanInfo.contentType != ChanInfo::T_MP3)
1258 if (addMetadata && (outputProtocol == ChanInfo::SP_HTTP)) // winamp mp3 metadata check
1261 sock->writeLine(ICY_OK);
1263 sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT);
1264 sock->writeLineF("icy-name:%s",chanInfo.name.cstr());
1265 sock->writeLineF("icy-br:%d",chanInfo.bitrate);
1266 sock->writeLineF("icy-genre:%s",chanInfo.genre.cstr());
1267 sock->writeLineF("icy-url:%s",chanInfo.url.cstr());
1268 sock->writeLineF("icy-metaint:%d",chanMgr->icyMetaInterval);
1269 sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr);
1271 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3);
1276 sock->writeLine(HTTP_SC_OK);
1278 if ((chanInfo.contentType != ChanInfo::T_ASX) && (chanInfo.contentType != ChanInfo::T_WMV) && (chanInfo.contentType != ChanInfo::T_WMA))
1280 sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT);
1282 sock->writeLine("Accept-Ranges: none");
1284 sock->writeLineF("x-audiocast-name: %s",chanInfo.name.cstr());
1285 sock->writeLineF("x-audiocast-bitrate: %d",chanInfo.bitrate);
1286 sock->writeLineF("x-audiocast-genre: %s",chanInfo.genre.cstr());
1287 sock->writeLineF("x-audiocast-description: %s",chanInfo.desc.cstr());
1288 sock->writeLineF("x-audiocast-url: %s",chanInfo.url.cstr());
1289 sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr);
// Protocol-specific headers.
1293 if (outputProtocol == ChanInfo::SP_HTTP)
1295 switch (chanInfo.contentType)
1297 case ChanInfo::T_MOV:
1298 sock->writeLine("Connection: close");
1299 sock->writeLine("Content-Length: 10000000");
1302 sock->writeLineF("%s %s",HTTP_HS_CONTENT,chanInfo.getMIMEType());
1303 } else if (outputProtocol == ChanInfo::SP_MMS)
// The MMS reply sends a fixed "Rex/9.0.0.2980" server string and a fixed
// client-id pragma.
1305 sock->writeLine("Server: Rex/9.0.0.2980");
1306 sock->writeLine("Cache-Control: no-cache");
1307 sock->writeLine("Pragma: no-cache");
1308 sock->writeLine("Pragma: client-id=3587303426");
1309 sock->writeLine("Pragma: features=\"broadcast,playlist\"");
1313 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MMS);
1316 if (agent.contains("Android"))
1318 LOG_DEBUG("INFO: Android client detected.");
1319 sock->writeLineF("%s %s", HTTP_HS_CONTENT, MIME_WMV);
1322 sock->writeLine("Content-Type: application/vnd.ms.wms-hdr.asfv1");
1324 sock->writeLineF("Content-Length: %d",ch->headPack.len);
1325 sock->writeLine("Connection: Keep-Alive");
1329 } else if (outputProtocol == ChanInfo::SP_PCP)
1331 sock->writeLineF("%s %d",PCX_HS_POS,streamPos);
1332 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
1334 }else if (outputProtocol == ChanInfo::SP_PEERCAST)
1336 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPEERCAST);
// The empty line ends the header section.
1339 sock->writeLine("");
// PCP handshake on success: acknowledge with PCP_OK and register the peer as a
// one-hop hit for this channel.
1344 handshakeIncomingPCP(atom,rhost,remoteID,agent);
1345 atom.writeInt(PCP_OK,0);
1346 if (rhost.globalIP())
1347 serventHit.rhost[0] = rhost;
1349 serventHit.rhost[1] = rhost;
1350 serventHit.sessionID = remoteID;
1351 serventHit.numHops = 1;
1352 chanMgr->addHit(serventHit);
1362 // -----------------------------------
// Sends the GIV request line on this servent's socket, followed by an empty
// line. Two forms are visible: "GIV /<idstr>" and a bare "GIV".
// NOTE(review): the condition choosing between them is not shown - presumably
// whether `id` is set; confirm.
1363 void Servent::handshakeGiv(GnuID &id)
1369 sock->writeLineF("GIV /%s",idstr);
1371 sock->writeLine("GIV");
1373 sock->writeLine("");
1377 // -----------------------------------
// Main loop for a Gnutella connection on this servent's socket.
// After gnuStream is initialised and the status set to S_CONNECTED, the loop
// runs while thread.active is set and the socket is active. Each pass:
//  - reads a packet when the socket has data, marks it as a duplicate when
//    servMgr has seen it (PONG packets skip that check), otherwise lets
//    gnuStream.processPacket() classify it, then acts on the result and
//    updates the matching statistics counter
//  - sends one queued outgoing packet, priority queue first
//  - refreshes lastPing after periods without packets and throws
//    TimeoutException once no packet has arrived for abortTimeoutSecs
//  - derives a delay from the bytes moved since the previous pass, never
//    below sys->idleSleepTime
// NOTE(review): the ping sends and the sleep that consumes `delay` are on
// lines not shown.
1378 void Servent::processGnutella()
1382 //if (servMgr->isRoot && !servMgr->needConnections())
1383 if (servMgr->isRoot)
1391 gnuStream.init(sock);
1392 setStatus(S_CONNECTED);
1394 if (!servMgr->isRoot)
1396 chanMgr->broadcastRelays(this, 1, 1);
1399 if ((p=outPacketsNorm.curr()))
1400 gnuStream.sendPacket(*p);
1406 // if (type != T_LOOKUP)
1407 // chanMgr->broadcastRelays(this,chanMgr->minBroadcastTTL,2);
1409 lastPacket = lastPing = sys->getTime();
1410 bool doneBigPing=false;
1412 const unsigned int abortTimeoutSecs = 60; // abort connection after 60 secs of no activity
1413 const unsigned int packetTimeoutSecs = 30; // ping connection after 30 secs of no activity
1415 unsigned int currBytes=0;
1416 unsigned int lastWait=0;
1418 unsigned int lastTotalIn=0,lastTotalOut=0;
1420 while (thread.active && sock->active())
1423 if (sock->readReady())
1425 lastPacket = sys->getTime();
1427 if (gnuStream.readPacket(pack))
1430 sock->host.toStr(ipstr);
1433 GnuStream::R_TYPE ret = GnuStream::R_PROCESS;
1435 if (pack.func != GNU_FUNC_PONG)
1436 if (servMgr->seenPacket(pack))
1437 ret = GnuStream::R_DUPLICATE;
1439 seenIDs.add(pack.id);
1442 if (ret == GnuStream::R_PROCESS)
1445 ret = gnuStream.processPacket(pack,this,routeID);
// Under flow control, packets that would have been broadcast are dropped.
1447 if (flowControl && (ret == GnuStream::R_BROADCAST))
1448 ret = GnuStream::R_DROP;
1454 case GnuStream::R_BROADCAST:
1455 if (servMgr->broadcast(pack,this))
1456 stats.add(Stats::NUMBROADCASTED);
1458 stats.add(Stats::NUMDROPPED);
1460 case GnuStream::R_ROUTE:
1461 if (servMgr->route(pack,routeID,NULL))
1462 stats.add(Stats::NUMROUTED);
1464 stats.add(Stats::NUMDROPPED);
1466 case GnuStream::R_ACCEPTED:
1467 stats.add(Stats::NUMACCEPTED);
1469 case GnuStream::R_DUPLICATE:
1470 stats.add(Stats::NUMDUP);
1472 case GnuStream::R_DEAD:
1473 stats.add(Stats::NUMDEAD);
1475 case GnuStream::R_DISCARD:
1476 stats.add(Stats::NUMDISCARDED);
1478 case GnuStream::R_BADVERSION:
1479 stats.add(Stats::NUMOLD);
1481 case GnuStream::R_DROP:
1482 stats.add(Stats::NUMDROPPED);
1487 LOG_NETWORK("packet in: %s-%s, %d bytes, %d hops, %d ttl, from %s",GNU_FUNC_STR(pack.func),GnuStream::getRouteStr(ret),pack.len,pack.hops,pack.ttl,ipstr);
1492 LOG_ERROR("Bad packet");
// Outgoing side: at most one packet per pass, priority queue before normal.
1499 if ((p=outPacketsPri.curr())) // priority packet
1501 gnuStream.sendPacket(*p);
1503 outPacketsPri.next();
1504 } else if ((p=outPacketsNorm.curr())) // or.. normal packet
1506 gnuStream.sendPacket(*p);
1508 outPacketsNorm.next();
// Seconds since the last packet was received.
// NOTE(review): `lpt` is a signed int but is compared below with the unsigned
// timeout constants, so it is converted to unsigned for those comparisons.
1511 int lpt = sys->getTime()-lastPacket;
1515 if ((sys->getTime()-lastPing) > 15)
1518 lastPing = sys->getTime();
1522 if (lpt > packetTimeoutSecs)
1525 if ((sys->getTime()-lastPing) > packetTimeoutSecs)
1528 lastPing = sys->getTime();
1533 if (lpt > abortTimeoutSecs)
1534 throw TimeoutException();
// Traffic since the previous pass, used to scale the delay.
1537 unsigned int totIn = sock->totalBytesIn-lastTotalIn;
1538 unsigned int totOut = sock->totalBytesOut-lastTotalOut;
1540 unsigned int bytes = totIn+totOut;
1542 lastTotalIn = sock->totalBytesIn;
1543 lastTotalOut = sock->totalBytesOut;
1545 const int serventBandwidth = 1000;
1547 int delay = sys->idleSleepTime;
// NOTE(review): bytes*1000 is computed in unsigned int and can wrap for very
// large per-pass byte counts; the result is then stored in a signed int.
1548 if ((bytes) && (serventBandwidth >= 8))
1549 delay = (bytes*1000)/(serventBandwidth/8); // set delay relative packetsize
1551 if (delay < (int)sys->idleSleepTime)
1552 delay = sys->idleSleepTime;
1553 //LOG("delay %d, in %d, out %d",delay,totIn,totOut);
1561 // -----------------------------------
// Gnutella packet loop for a root-mode connection.
// While thread.active is set and the socket is active, each packet read is
// handled by function:
//  - PING: reply with up to 8 PONGs chosen from the newest servents known to
//    servMgr (up to 32 are fetched), starting at a random index and wrapping
//  - PONG: read port and IP from the payload and add the host to servMgr when
//    both are non-zero and the host is global
//  - HIT: parse the hit via gnuStream.readHit()
// StreamException is caught in this function and logged as "Relay: ...".
// NOTE(review): the local `lastConnect` declared here hides the member of the
// same name that setStatus() updates; the action taken after 60 seconds is on
// a line not shown.
1562 void Servent::processRoot()
1567 gnuStream.init(sock);
1568 setStatus(S_CONNECTED);
1572 unsigned int lastConnect = sys->getTime();
1574 while (thread.active && sock->active())
1576 if (gnuStream.readPacket(pack))
1579 sock->host.toStr(ipstr);
1581 LOG_NETWORK("packet in: %d from %s",pack.func,ipstr);
1584 if (pack.func == GNU_FUNC_PING) // if ping then pong back some hosts and close
1588 int cnt = servMgr->getNewestServents(hl,32,sock->host);
// NOTE(review): `% cnt` requires cnt != 0; the guard is on a line not shown
// (the "No Pongs to send" branch below suggests one exists) - confirm.
1591 int start = sys->rnd() % cnt;
1592 int max = cnt>8?8:cnt;
1594 for(int i=0; i<max; i++)
1598 pong.initPong(hl[start],false,pack);
1599 gnuStream.sendPacket(pong);
1602 hl[start].toStr(ipstr);
1604 //LOG_NETWORK("Pong %d: %s",start+1,ipstr);
1605 start = (start+1) % cnt;
1608 sock->host.toStr(str);
1609 LOG_NETWORK("Sent %d pong(s) to %s",max,str);
1612 LOG_NETWORK("No Pongs to send");
1615 }else if (pack.func == GNU_FUNC_PONG) // pong?
1617 MemoryStream pong(pack.data,pack.len);
1620 port = pong.readShort();
1621 ip = pong.readLong();
1626 if ((ip) && (port) && (h.globalIP()))
1629 LOG_NETWORK("added pong: %d.%d.%d.%d:%d",ip>>24&0xff,ip>>16&0xff,ip>>8&0xff,ip&0xff,port);
1630 servMgr->addHost(h,ServHost::T_SERVENT,sys->getTime());
1633 } else if (pack.func == GNU_FUNC_HIT)
1635 MemoryStream data(pack.data,pack.len);
1637 gnuStream.readHit(data,hit,pack.hops,pack.id);
1640 //if (gnuStream.packetsIn > 5) // die if we get too many packets
1644 if((sys->getTime()-lastConnect > 60))
1649 }catch(StreamException &e)
1651 LOG_ERROR("Relay: %s",e.msg);
1657 // -----------------------------------
// Thread body for a GIV connection. The Servent is recovered from
// thread->data; it sends the GIV request for its givID and then runs the
// normal incoming handshake on the same socket.
// StreamException is caught here and logged as "GIV: ..."; the thread is then
// ended through sys->endThread().
// NOTE(review): the return value and any servent cleanup are on lines not
// shown.
1658 int Servent::givProcMain(ThreadInfo *thread)
1661 Servent *sv = (Servent*)thread->data;
1664 sv->handshakeGiv(sv->givID);
1665 sv->handshakeIncoming();
1667 }catch(StreamException &e)
1669 LOG_ERROR("GIV: %s",e.msg);
1673 sys->endThread(thread);
1677 // -----------------------------------
// Thread entry point installed by initGIV(); delegates to givProcMain through
// the SEH_THREAD macro.
// NOTE(review): SEH_THREAD comes from "win32/seh.h" and is not shown here -
// presumably it wraps the call in a structured-exception handler; confirm.
1678 int Servent::givProc(ThreadInfo *thread)
1680 SEH_THREAD(givProcMain, Servent::givProc);
1683 // -----------------------------------
// Client side of the PCP HELO/OLEH handshake.
// Visible sequence:
//  1. Build a PCP_HELO parent in a local buffer (agent, version, session id,
//     plus optional port / ping port / broadcast id) and send it in a single
//     write. The child count passed to writeParent is 3 plus one for each of
//     testFW, nonFW and sendBCID.
//  2. Read the reply; an unexpected atom is answered with PCP_QUIT
//     (BADRESPONSE) and raises StreamException.
//  3. Read the OLEH children: agent, our IP and port as seen by the peer,
//     version, disable flag and the peer's session id (written into `rid`).
//     A session id equal to our own raises StreamException("Servent loopback").
//  4. Update servMgr from the reply: adopt the reported IP when no forceIP is
//     configured, resolve an FW_UNKNOWN firewall state, set isDisabled.
//  5. If the peer is not identified, send PCP_QUIT (NOTIDENTIFIED) and raise
//     StreamException.
// NOTE(review): `rhost` is not referenced on the visible lines.
1684 void Servent::handshakeOutgoingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent, bool isTrusted)
1687 bool nonFW = (servMgr->getFirewall() != ServMgr::FW_ON);
1688 bool testFW = (servMgr->getFirewall() == ServMgr::FW_UNKNOWN);
// The broadcast id is only sent to trusted peers while we are broadcasting.
1690 bool sendBCID = isTrusted && chanMgr->isBroadcasting();
1693 MemoryStream mem(tbuf, sizeof(tbuf));
1694 AtomStream atom2(mem);
1695 atom2.writeParent(PCP_HELO,3 + (testFW?1:0) + (nonFW?1:0) + (sendBCID?1:0));
1696 atom2.writeString(PCP_HELO_AGENT,PCX_AGENT);
1697 atom2.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
1698 atom2.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
1700 atom2.writeShort(PCP_HELO_PORT,servMgr->serverHost.port);
1702 atom2.writeShort(PCP_HELO_PING,servMgr->serverHost.port);
1704 atom2.writeBytes(PCP_HELO_BCID,chanMgr->broadcastID.id,16);
1705 atom.io.write(tbuf, mem.getPosition());
1708 LOG_DEBUG("PCP outgoing waiting for OLEH..");
1711 ID4 id = atom.read(numc,numd);
1714 LOG_DEBUG("PCP outgoing reply: %s",id.getString().str());
1715 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
1716 throw StreamException("Got unexpected PCP response");
1731 // read OLEH response
1732 for(int i=0; i<numc; i++)
1735 ID4 id = atom.read(c,dlen);
1737 if (id == PCP_HELO_AGENT)
1739 atom.readString(arg,sizeof(arg),dlen);
1742 }else if (id == PCP_HELO_REMOTEIP)
1744 thisHost.ip = atom.readInt();
1746 }else if (id == PCP_HELO_PORT)
1748 thisHost.port = atom.readShort();
1750 }else if (id == PCP_HELO_VERSION)
1752 version = atom.readInt();
1754 }else if (id == PCP_HELO_DISABLE)
1756 disable = atom.readInt();
1758 }else if (id == PCP_HELO_SESSIONID)
1760 atom.readBytes(rid.id,16);
1761 if (rid.isSame(servMgr->sessionID))
1762 throw StreamException("Servent loopback");
1766 LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
1773 // update server ip/firewall status
1776 if (thisHost.isValid())
1778 if ((servMgr->serverHost.ip != thisHost.ip) && (servMgr->forceIP.isEmpty()))
1781 thisHost.toStr(ipstr);
1782 LOG_DEBUG("Got new ip: %s",ipstr);
1783 servMgr->serverHost.ip = thisHost.ip;
// A non-zero port on a global address in the reply is taken to mean we are
// not firewalled.
1786 if (servMgr->getFirewall() == ServMgr::FW_UNKNOWN)
1788 if (thisHost.port && thisHost.globalIP())
1789 servMgr->setFirewall(ServMgr::FW_OFF);
1791 servMgr->setFirewall(ServMgr::FW_ON);
1797 LOG_ERROR("client disabled: %d",disable);
1798 servMgr->isDisabled = true;
1801 servMgr->isDisabled = false;
1809 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
1810 throw StreamException("Remote host not identified");
1813 LOG_DEBUG("PCP Outgoing handshake complete.");
1817 // -----------------------------------
// Server side of the PCP HELO/OLEH handshake.
// Visible sequence:
//  1. Read the first atom; an unexpected one is answered with PCP_QUIT
//     (BADRESPONSE) and raises StreamException.
//  2. Read the HELO children: agent, version, session id (into `rid`; a match
//     with our own raises StreamException("Servent loopback")), broadcast id,
//     OS type, port (into rhost.port) and ping port.
//  3. Replace a non-global remote IP with our own server IP when ours is
//     global.
//  4. For a firewall test request, set rhost.port to pingPort and ping the
//     peer back with pingHost().
//  5. On a root node, a broadcast id whose flags have bit 0 set ("private")
//     must be known and valid to servMgr; otherwise an OLEH carrying
//     PCP_HELO_DISABLE is sent and StreamException("Client is banned") raised.
//  6. Build the PCP_OLEH reply (agent, session id, version, remote IP, port)
//     in a local buffer. A version below PCP_CLIENT_MINVERSION, or an
//     unidentified peer, appends PCP_QUIT, sends the buffer and raises
//     StreamException. Otherwise root atoms are appended on a root node and
//     the buffer is sent.
1818 void Servent::handshakeIncomingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent)
1821 ID4 id = atom.read(numc,numd);
1826 LOG_DEBUG("PCP incoming reply: %s",id.getString().str());
1827 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
1828 throw StreamException("Got unexpected PCP response");
1847 for(int i=0; i<numc; i++)
1851 ID4 id = atom.read(c,dlen);
1853 if (id == PCP_HELO_AGENT)
1855 atom.readString(arg,sizeof(arg),dlen);
1858 }else if (id == PCP_HELO_VERSION)
1860 version = atom.readInt();
1862 }else if (id == PCP_HELO_SESSIONID)
1864 atom.readBytes(rid.id,16);
1865 if (rid.isSame(servMgr->sessionID))
1866 throw StreamException("Servent loopback");
1868 }else if (id == PCP_HELO_BCID)
1870 atom.readBytes(bcID.id,16);
1872 }else if (id == PCP_HELO_OSTYPE)
1874 osType = atom.readInt();
1875 }else if (id == PCP_HELO_PORT)
1877 rhost.port = atom.readShort();
1878 }else if (id == PCP_HELO_PING)
1880 pingPort = atom.readShort();
1883 LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
1890 LOG_DEBUG("Incoming PCP is %s : v%d", agent.cstr(),version);
1893 if (!rhost.globalIP() && servMgr->serverHost.globalIP())
1894 rhost.ip = servMgr->serverHost.ip;
1899 rhost.toStr(ripStr);
1900 LOG_DEBUG("Incoming firewalled test request: %s ", ripStr);
1901 rhost.port = pingPort;
1902 if (!rhost.globalIP() || !pingHost(rhost,rid))
1906 if (servMgr->isRoot)
1910 if (bcID.getFlags() & 1) // private
1912 BCID *bcid = servMgr->findValidBCID(bcID);
// The `bcid &&` in the second operand is redundant: it is only evaluated when
// bcid is non-NULL.
1913 if (!bcid || (bcid && !bcid->valid))
1915 atom.writeParent(PCP_OLEH,1);
1916 atom.writeInt(PCP_HELO_DISABLE,1);
1917 throw StreamException("Client is banned");
1925 MemoryStream mem(tbuf, sizeof(tbuf));
1926 AtomStream atom2(mem);
1927 atom2.writeParent(PCP_OLEH,5);
1928 atom2.writeString(PCP_HELO_AGENT,PCX_AGENT);
1929 atom2.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
1930 atom2.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
1931 atom2.writeInt(PCP_HELO_REMOTEIP,rhost.ip);
1932 atom2.writeShort(PCP_HELO_PORT,rhost.port);
1936 if (version < PCP_CLIENT_MINVERSION)
1938 atom2.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADAGENT);
1939 atom.io.write(tbuf, mem.getPosition());
1940 throw StreamException("Agent is not valid");
1946 atom2.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
1947 atom.io.write(tbuf, mem.getPosition());
1948 throw StreamException("Remote host not identified");
1953 if (servMgr->isRoot)
1955 servMgr->writeRootAtoms(atom2,false);
1958 atom.io.write(tbuf, mem.getPosition());
1960 LOG_DEBUG("PCP Incoming handshake complete.");
1964 // -----------------------------------
// Serves an incoming PCP control connection on this servent's socket.
// Visible sequence:
//  1. Read the PCP version and run the incoming HELO/OLEH handshake.
//  2. Refuse the peer when it is already connected (a T_COUT or T_CIN
//     connection with the same remoteID exists), when control-in slots are
//     full, or when this node is neither root nor broadcasting. Before the
//     PCP_QUIT atom is sent, up to 8 tracker hits are offered; if none is
//     found and the peer gave a port, a push request is broadcast to a
//     firewalled tracker instead.
//  3. Otherwise mark S_CONNECTED, send PCP_OK (plus a PCP_ROOT /
//     PCP_ROOT_UPDATE pair on a condition not shown), create the PCPStream and
//     read packets until an error, thread shutdown or end of stream.
//  4. Flush the stream, send PCP_QUIT with the final error code and log it.
// NOTE(review): `suggestOthers` is not referenced on the visible lines -
// presumably it gates the tracker suggestions; confirm.
1965 void Servent::processIncomingPCP(bool suggestOthers)
1967 PCPStream::readVersion(*sock);
1970 AtomStream atom(*sock);
1971 Host rhost = sock->host;
1973 handshakeIncomingPCP(atom,rhost,remoteID,agent);
1976 bool alreadyConnected = (servMgr->findConnection(Servent::T_COUT,remoteID)!=NULL)
1977 || (servMgr->findConnection(Servent::T_CIN,remoteID)!=NULL);
1978 bool unavailable = servMgr->controlInFull();
1979 bool offair = !servMgr->isRoot && !chanMgr->isBroadcasting();
1984 if (unavailable || alreadyConnected || offair)
// Error precedence: already connected, then unavailable, then off air.
1988 if (alreadyConnected)
1989 error = PCP_ERROR_QUIT+PCP_ERROR_ALREADYCONNECTED;
1990 else if (unavailable)
1991 error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
1993 error = PCP_ERROR_QUIT+PCP_ERROR_OFFAIR;
1995 error = PCP_ERROR_QUIT;
2005 for(int i=0; i<8; i++)
2009 // find best hit on this network
2010 if (!rhost.globalIP())
2013 chs.matchHost = servMgr->serverHost;
2015 chs.excludeID = remoteID;
2016 chs.trackersOnly = true;
2017 chs.useBusyControls = false;
2018 if (chanMgr->pickHits(chs))
2022 // find best hit on same network
2026 chs.matchHost = rhost;
2028 chs.excludeID = remoteID;
2029 chs.trackersOnly = true;
2030 chs.useBusyControls = false;
2031 if (chanMgr->pickHits(chs))
2035 // else find best hit on other networks
2040 chs.excludeID = remoteID;
2041 chs.trackersOnly = true;
2042 chs.useBusyControls = false;
2043 if (chanMgr->pickHits(chs))
2052 best.writeAtoms(atom,noID);
2057 LOG_DEBUG("Sent %d tracker(s) to %s",cnt,rstr);
2059 else if (rhost.port)
2061 // send push request to best firewalled tracker on other network
2064 chs.excludeID = remoteID;
2065 chs.trackersOnly = true;
2066 chs.useFirewalled = true;
2067 chs.useBusyControls = false;
2068 if (chanMgr->pickHits(chs))
2073 int cnt = servMgr->broadcastPushRequest(best,rhost,noID,Servent::T_CIN);
2074 LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,rstr);
2078 LOG_DEBUG("No available trackers");
2083 LOG_ERROR("Sending QUIT to incoming: %d",error);
2085 atom.writeInt(PCP_QUIT,error);
2091 setStatus(S_CONNECTED);
2093 atom.writeInt(PCP_OK,0);
2096 atom.writeParent(PCP_ROOT,1);
2097 atom.writeParent(PCP_ROOT_UPDATE,0);
// NOTE(review): raw `new`; the matching delete is not on the visible lines -
// confirm where pcpStream is released.
2099 pcpStream = new PCPStream(remoteID);
2103 while (!error && thread.active && !sock->eof())
2105 error = pcpStream->readPacket(*sock,bcs);
// Going off air or application shutdown ends the session with its own code.
2108 if (!servMgr->isRoot && !chanMgr->isBroadcasting())
2109 error = PCP_ERROR_OFFAIR;
2110 if (peercastInst->isQuitting)
2111 error = PCP_ERROR_SHUTDOWN;
2114 pcpStream->flush(*sock);
2116 error += PCP_ERROR_QUIT;
2117 atom.writeInt(PCP_QUIT,error);
2119 LOG_DEBUG("PCP Incoming to %s closed: %d",rstr,error);
2123 // -----------------------------------
// Thread body for the outgoing control ("COUT") connection.
// thread->data is the owning Servent. While that servent's thread is
// active it selects a tracker (or falls back to the configured root host),
// connects, performs the outgoing PCP handshake and reads packets until an
// error, socket EOF, thread shutdown or autoServe being switched off.
// The thread is ended through sys->endThread(thread).
2124 int Servent::outgoingProcMain(ThreadInfo *thread)
2127 LOG_DEBUG("COUT started");
2129 Servent *sv = (Servent*)thread->data;
2133 sv->pcpStream = new PCPStream(noID);
2135 while (sv->thread.active)
2137 sv->setStatus(S_WAIT);
2139 if (chanMgr->isBroadcasting() && servMgr->autoServe)
2149 if (servMgr->rootHost.isEmpty())
// A pending push socket is adopted as the connection and the target host
// is taken from it.
// NOTE(review): the guarding condition is presumably a non-null pushSock - confirm.
2154 sv->sock = sv->pushSock;
2155 sv->pushSock = NULL;
2156 bestHit.host = sv->sock->host;
2162 ChanHitList *chl = chanMgr->findHitListByID(noID);
2165 // find local tracker
2167 chs.matchHost = servMgr->serverHost;
2168 chs.waitDelay = MIN_TRACKER_RETRY;
2169 chs.excludeID = servMgr->sessionID;
2170 chs.trackersOnly = true;
2171 if (!chl->pickHits(chs))
2173 // else find global tracker
2175 chs.waitDelay = MIN_TRACKER_RETRY;
2176 chs.excludeID = servMgr->sessionID;
2177 chs.trackersOnly = true;
2183 bestHit = chs.best[0];
2188 unsigned int ctime = sys->getTime();
// No tracker found: fall back to the root host, but only when more than
// MIN_YP_RETRY has elapsed since chanMgr->lastYPConnect.
2190 if ((!bestHit.host.ip) && ((ctime-chanMgr->lastYPConnect) > MIN_YP_RETRY))
2192 bestHit.host.fromStrName(servMgr->rootHost.cstr(),DEFAULT_PORT);
2194 chanMgr->lastYPConnect = ctime;
2198 }while (!bestHit.host.ip && (sv->thread.active));
2201 if (!bestHit.host.ip) // give up
2203 LOG_ERROR("COUT giving up");
2208 bestHit.host.toStr(ipStr);
2214 LOG_DEBUG("COUT to %s: Connecting..",ipStr);
2218 sv->setStatus(S_CONNECTING);
2219 sv->sock = sys->createSocket();
2221 throw StreamException("Unable to create socket");
2222 sv->sock->open(bestHit.host);
2223 sv->sock->connect();
// NOTE(review): 30000 is presumably milliseconds (30 s) - confirm against
// ClientSocket::setReadTimeout.
2227 sv->sock->setReadTimeout(30000);
2228 AtomStream atom(*sv->sock);
2230 sv->setStatus(S_HANDSHAKE);
2232 Host rhost = sv->sock->host;
2233 atom.writeInt(PCP_CONNECT,1);
2234 handshakeOutgoingPCP(atom,rhost,sv->remoteID,sv->agent,bestHit.yp);
2236 sv->setStatus(S_CONNECTED);
2238 LOG_DEBUG("COUT to %s: OK",ipStr);
2240 sv->pcpStream->init(sv->remoteID);
2243 bcs.servent_id = sv->servent_id;
2245 while (!error && sv->thread.active && !sv->sock->eof() && servMgr->autoServe)
2247 error = sv->pcpStream->readPacket(*sv->sock,bcs);
// Local state overrides the read result: no longer broadcasting, or the
// application is quitting.
2251 if (!chanMgr->isBroadcasting())
2252 error = PCP_ERROR_OFFAIR;
2253 if (peercastInst->isQuitting)
2254 error = PCP_ERROR_SHUTDOWN;
// When nextRootPacket is set and the current time is more than 30 past it,
// the connection is closed with PCP_ERROR_NOROOT.
2256 if (sv->pcpStream->nextRootPacket)
2257 if (sys->getTime() > (sv->pcpStream->nextRootPacket+30))
2258 error = PCP_ERROR_NOROOT;
2260 sv->setStatus(S_CLOSING);
2262 sv->pcpStream->flush(*sv->sock);
// PCP_ERROR_QUIT is added to the reason code before it is sent.
2264 error += PCP_ERROR_QUIT;
2265 atom.writeInt(PCP_QUIT,error);
2267 LOG_ERROR("COUT to %s closed: %d",ipStr,error);
2269 }catch(TimeoutException &e)
2271 LOG_ERROR("COUT to %s: timeout (%s)",ipStr,e.msg);
2272 sv->setStatus(S_TIMEOUT);
2273 }catch(StreamException &e)
2275 LOG_ERROR("COUT to %s: %s",ipStr,e.msg);
2276 sv->setStatus(S_ERROR);
2288 }catch(StreamException &) {}
2290 // don't discard this hit if we caused the disconnect (stopped broadcasting)
2291 if (error != (PCP_ERROR_QUIT+PCP_ERROR_OFFAIR))
2292 chanMgr->deadHit(bestHit);
2300 sys->endThread(thread);
2301 LOG_DEBUG("COUT ended");
2304 // -----------------------------------
// Thread entry point for the outgoing control connection; hands off to
// outgoingProcMain through the SEH_THREAD macro.
2305 int Servent::outgoingProc(ThreadInfo *thread)
2307 SEH_THREAD(outgoingProcMain, Servent::outgoingProc);
2309 // -----------------------------------
// Thread body for an accepted incoming connection.
// thread->data is the Servent handling the connection. Runs
// handshakeIncoming() and logs any HTTPException or StreamException with
// the peer address; the thread is ended through sys->endThread(thread).
2310 int Servent::incomingProcMain(ThreadInfo *thread)
2314 Servent *sv = (Servent*)thread->data;
// The peer address is formatted up front so the error paths can log it.
2317 sv->sock->host.toStr(ipStr);
2321 sv->handshakeIncoming();
2322 }catch(HTTPException &e)
// The HTTP error text is sent back to the client; a failure while writing
// that reply is swallowed by the inner catch.
// NOTE(review): the WWW-Authenticate header is presumably only sent for a
// 401 response - confirm against the full body.
2326 sv->sock->writeLine(e.msg);
2328 sv->sock->writeLine("WWW-Authenticate: Basic realm=\"PeerCast\"");
2329 sv->sock->writeLine("");
2330 }catch(StreamException &){}
2331 LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);
2332 }catch(StreamException &e)
2334 LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);
2339 sys->endThread(thread);
2342 // -----------------------------------
// Thread entry point for an incoming connection; hands off to
// incomingProcMain through the SEH_THREAD macro.
2343 int Servent::incomingProc(ThreadInfo *thread)
2345 SEH_THREAD(incomingProcMain, Servent::incomingProc);
2347 // -----------------------------------
// Moves this servent into the S_HANDSHAKE state.
// Throws StreamException ("Servent has no socket") when there is no socket
// to work with.
2348 void Servent::processServent()
2350 setStatus(S_HANDSHAKE);
2355 throw StreamException("Servent has no socket");
2360 // -----------------------------------
// Streams a channel to the connected client.
// Performs the stream handshake via handshakeStream(chanInfo), records the
// channel ID, waits for the channel header and then dispatches on
// outputProtocol to the matching send routine. Status ends at S_CLOSING.
// Throws StreamException ("Channel not ready" / "Channel not found").
// NOTE(review): doneHandshake presumably skips the handshake step when
// true - confirm against the full body.
2361 void Servent::processStream(bool doneHandshake,ChanInfo &chanInfo)
2365 setStatus(S_HANDSHAKE);
2367 if (!handshakeStream(chanInfo))
2371 if (chanInfo.id.isSet())
2374 chanID = chanInfo.id;
2376 LOG_CHANNEL("Sending channel: %s ",ChanInfo::getProtocolStr(outputProtocol));
2378 if (!waitForChannelHeader(chanInfo))
2379 throw StreamException("Channel not ready");
2381 servMgr->totalStreams++;
2383 Host host = sock->host;
2384 host.port = 0; // force to 0 so we ignore the incoming port
2386 Channel *ch = chanMgr->findChannelByID(chanID);
2388 throw StreamException("Channel not found");
// Plain HTTP clients get the metadata-interleaved stream only when metadata
// was requested (addMetadata) and an ICY interval is configured.
2390 if (outputProtocol == ChanInfo::SP_HTTP)
2392 if ((addMetadata) && (chanMgr->icyMetaInterval))
2393 sendRawMetaChannel(chanMgr->icyMetaInterval);
2395 sendRawChannel(true,true);
// MMS: either header plus data, or header only (selecting condition not
// shown here).
2397 }else if (outputProtocol == ChanInfo::SP_MMS)
2401 sendRawChannel(true,true);
2404 sendRawChannel(true,false);
2407 }else if (outputProtocol == ChanInfo::SP_PCP)
2411 } else if (outputProtocol == ChanInfo::SP_PEERCAST)
2413 sendPeercastChannel();
2417 setStatus(S_CLOSING);
2420 // -----------------------------------------
2424 file.openReadOnly("c://test.mp3");
2426 LOG_DEBUG("raw file read");
2431 LOG_DEBUG("send %d",cnt++);
2432 file.read(buf,sizeof(buf));
2433 sock->write(buf,sizeof(buf));
2437 LOG_DEBUG("raw file sent");
2442 // -----------------------------------
// Polls for the channel identified by info.id to become streamable.
// Makes up to 30*10 attempts; the channel counts as ready once it is
// playing and its raw data buffer has a non-zero write position. Polling
// stops early when this servent's thread is deactivated or the socket is
// no longer active.
// Callers treat a false return as "Channel not ready".
2443 bool Servent::waitForChannelHeader(ChanInfo &info)
2445 for(int i=0; i<30*10; i++)
// The channel is looked up again on every attempt.
2447 Channel *ch = chanMgr->findChannelByID(info.id);
2451 if (ch->isPlaying() && (ch->rawData.writePos>0))
2454 if (!thread.active || !sock->active())
2460 // -----------------------------------
// Sends channel chanID to the socket as a raw byte stream.
// Writes the channel's head packet and then forwards T_DATA / T_HEAD
// packets from the channel's raw data buffer while the thread and socket
// stay active. StreamException is caught and logged here.
// Throws StreamException ("Channel not found") before streaming starts.
// NOTE(review): sendHead / sendData presumably gate the header write and
// the data loop respectively - confirm against the full body.
2461 void Servent::sendRawChannel(bool sendHead, bool sendData)
2466 sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
2468 Channel *ch = chanMgr->findChannelByID(chanID);
2470 throw StreamException("Channel not found");
2472 setStatus(S_CONNECTED);
2474 LOG_DEBUG("Starting Raw stream of %s at %d",ch->info.name.cstr(),streamPos);
// After the header, streaming resumes right behind the head packet.
2478 ch->headPack.writeRaw(*sock);
2479 streamPos = ch->headPack.pos + ch->headPack.len;
2480 LOG_DEBUG("Sent %d bytes header ",ch->headPack.len);
2486 unsigned int streamIndex = ch->streamIndex;
2487 unsigned int connectTime = sys->getTime();
2488 unsigned int lastWriteTime = connectTime;
2490 while ((thread.active) && sock->active())
// The channel is looked up again on every iteration.
2492 ch = chanMgr->findChannelByID(chanID);
// A changed stream index restarts reading from the head packet position.
2497 if (streamIndex != ch->streamIndex)
2499 streamIndex = ch->streamIndex;
2500 streamPos = ch->headPack.pos;
2501 LOG_DEBUG("sendRaw got new stream index");
2505 if (ch->rawData.findPacket(streamPos,rawPack))
// Each packet's sync value is expected to be one past the previous one; a
// mismatch is logged as a skip.
2507 if (syncPos != rawPack.sync)
2508 LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
2509 syncPos = rawPack.sync+1;
2511 if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2513 rawPack.writeRaw(*sock);
2514 lastWriteTime = sys->getTime();
2517 if (rawPack.pos < streamPos)
2518 LOG_DEBUG("raw: skip back %d",rawPack.pos-streamPos);
2519 streamPos = rawPack.pos+rawPack.len;
// No packet available: if the socket is readable, read one byte; a read
// result of 0 closes the socket.
2520 } else if (sock->readReady()) {
2522 int error = sock->readUpto(&c, 1);
2523 if (error == 0) sock->close();
// Give up when nothing has been written for longer than
// DIRECT_WRITE_TIMEOUT, measured with sys->getTime().
2527 if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
2528 throw TimeoutException();
2533 }catch(StreamException &e)
2535 LOG_ERROR("Stream channel: %s",e.msg);
2540 // -----------------------------------
// Sends raw data of several channels over this servent's single socket.
// Collects the IDs of all currently playing channels, writes each
// channel's head packet, then repeatedly forwards T_DATA / T_HEAD packets
// per channel while the thread and socket stay active. StreamException is
// caught and logged here.
// NOTE(review): sendHead / sendData are not referenced in the lines shown
// here - confirm how they gate the two phases.
2541 void Servent::sendRawMultiChannel(bool sendHead, bool sendData)
// Per-channel state, indexed in parallel with chanIDs.
2545 unsigned int chanStreamIndex[ChanMgr::MAX_CHANNELS];
2546 unsigned int chanStreamPos[ChanMgr::MAX_CHANNELS];
2547 GnuID chanIDs[ChanMgr::MAX_CHANNELS];
2549 for(int i=0; i<ChanMgr::MAX_CHANNELS; i++)
2551 Channel *ch = &chanMgr->channels[i];
2552 if (ch->isPlaying())
2553 chanIDs[numChanIDs++]=ch->info.id;
2558 setStatus(S_CONNECTED);
// Header phase: one head packet per collected channel, starting at index 0.
2563 for(int i=0; i<numChanIDs; i++)
2565 Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
2568 LOG_DEBUG("Starting RawMulti stream: %s",ch->info.name.cstr());
2569 ch->headPack.writeRaw(*sock);
2570 chanStreamPos[i] = ch->headPack.pos + ch->headPack.len;
2571 chanStreamIndex[i] = ch->streamIndex;
2572 LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
2581 unsigned int connectTime=sys->getTime();
2583 while ((thread.active) && sock->active())
// NOTE(review): this loop starts at i=1 although the header loop starts at
// i=0, so data for chanIDs[0] is never forwarded here - confirm whether
// that is intentional.
2586 for(int i=1; i<numChanIDs; i++)
2588 Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
// A changed stream index restarts this channel from its head packet position.
2591 if (chanStreamIndex[i] != ch->streamIndex)
2593 chanStreamIndex[i] = ch->streamIndex;
2594 chanStreamPos[i] = ch->headPack.pos;
2595 LOG_DEBUG("sendRawMulti got new stream index for chan %d",i);
2599 if (ch->rawData.findPacket(chanStreamPos[i],rawPack))
2601 if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2602 rawPack.writeRaw(*sock);
2605 if (rawPack.pos < chanStreamPos[i])
2606 LOG_DEBUG("raw: skip back %d",rawPack.pos-chanStreamPos[i]);
2607 chanStreamPos[i] = rawPack.pos+rawPack.len;
2610 //LOG("raw at %d: %d %d",streamPos,ch->rawData.getStreamPos(ch->rawData.firstPos),ch->rawData.getStreamPos(ch->rawData.lastPos));
2620 }catch(StreamException &e)
2622 LOG_ERROR("Stream channel: %s",e.msg);
2627 // -----------------------------------
// Sends channel chanID as a raw stream with a metadata block inserted after
// every `interval` bytes of audio data.
// interval: number of stream bytes between metadata blocks; must be at
//           least 1 and no larger than the local staging buffer.
// Throws StreamException ("Channel not found", "Bad ICY Meta Interval
// value") before streaming starts; StreamException raised while streaming
// is caught and logged here.
2628 void Servent::sendRawMetaChannel(int interval)
2633 Channel *ch = chanMgr->findChannelByID(chanID);
2635 throw StreamException("Channel not found");
2637 sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
2639 setStatus(S_CONNECTED);
2641 LOG_DEBUG("Starting Raw Meta stream of %s (metaint: %d) at %d",ch->info.name.cstr(),interval,streamPos);
// The last title/URL sent are remembered so metadata is only re-sent when
// one of them changes.
2644 String lastTitle,lastURL;
2646 int lastMsgTime=sys->getTime();
2652 if ((interval > sizeof(buf)) || (interval < 1))
2653 throw StreamException("Bad ICY Meta Interval value");
2655 unsigned int connectTime = sys->getTime();
2656 unsigned int lastWriteTime = connectTime;
2658 streamPos = 0; // raw meta channel has no header (its MP3)
2660 while ((thread.active) && sock->active())
// The channel is looked up again on every iteration.
2662 ch = chanMgr->findChannelByID(chanID);
2668 if (ch->rawData.findPacket(streamPos,rawPack))
// Each packet's sync value is expected to be one past the previous one; a
// mismatch is logged as a skip.
2671 if (syncPos != rawPack.sync)
2672 LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
2673 syncPos = rawPack.sync+1;
2675 MemoryStream mem(rawPack.data,rawPack.len);
2677 if (rawPack.type == ChanPacket::T_DATA)
2680 int len = rawPack.len;
2681 char *p = rawPack.data;
// Packet bytes are staged in buf, clamped so buf never holds more than
// `interval` bytes.
2685 if ((bufPos+rl) > interval)
2686 rl = interval-bufPos;
2687 memcpy(&buf[bufPos],p,rl);
// A full interval of audio is written out, followed by the metadata handling.
2692 if (bufPos >= interval)
2695 sock->write(buf,interval);
2696 lastWriteTime = sys->getTime();
// Broadcast-message timer: restarts once broadcastMsgInterval has elapsed.
2698 if (chanMgr->broadcastMsgInterval)
2699 if ((sys->getTime()-lastMsgTime) >= chanMgr->broadcastMsgInterval)
2702 lastMsgTime = sys->getTime();
// The title shown is the track title, or the channel comment when one is
// set and showMsg is true.
2705 String *metaTitle = &ch->info.track.title;
2706 if (!ch->info.comment.isEmpty() && (showMsg))
2707 metaTitle = &ch->info.comment;
2710 if (!metaTitle->isSame(lastTitle) || !ch->info.url.isSame(lastURL))
2719 title.convertTo(String::T_META);
2720 url.convertTo(String::T_META);
// Metadata block: one length byte counting 16-byte units (string length
// plus one terminator byte, rounded up), then len*16 bytes taken from tmp.
// NOTE(review): sprintf is unbounded and bytes past the formatted string
// are sent as padding; this assumes tmp is large enough and was zero-filled
// beforehand - confirm. The trailing \0 in the format string is redundant.
2722 sprintf(tmp,"StreamTitle='%s';StreamUrl='%s';\0",title.cstr(),url.cstr());
2723 int len = ((strlen(tmp) + 15+1) / 16);
2724 sock->writeChar(len);
2725 sock->write(tmp,len*16);
2727 lastTitle = *metaTitle;
2728 lastURL = ch->info.url;
2730 LOG_DEBUG("StreamTitle: %s, StreamURL: %s",lastTitle.cstr(),lastURL.cstr());
2740 streamPos = rawPack.pos + rawPack.len;
// Give up when nothing has been written for longer than
// DIRECT_WRITE_TIMEOUT, measured with sys->getTime().
2743 if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
2744 throw TimeoutException();
2749 }catch(StreamException &e)
2751 LOG_ERROR("Stream channel: %s",e.msg);
2754 // -----------------------------------
// Sends channel chanID in the "PCST" tagged packet format.
// Writes the PCST tag, the channel's head packet and a T_META packet built
// from ch->insertMeta, then forwards T_DATA / T_HEAD packets, each preceded
// by a SYNC record, while the thread and socket stay active.
// Throws StreamException ("Channel not found") before streaming starts;
// StreamException raised while streaming is caught and logged here.
2755 void Servent::sendPeercastChannel()
2759 setStatus(S_CONNECTED);
2761 Channel *ch = chanMgr->findChannelByID(chanID);
2763 throw StreamException("Channel not found");
2765 LOG_DEBUG("Starting PeerCast stream: %s",ch->info.name.cstr());
2767 sock->writeTag("PCST");
2771 ch->headPack.writePeercast(*sock);
2773 pack.init(ChanPacket::T_META,ch->insertMeta.data,ch->insertMeta.len,ch->streamPos);
2774 pack.writePeercast(*sock);
2777 unsigned int syncPos=0;
2778 while ((thread.active) && sock->active())
// The channel is looked up again on every iteration.
2780 ch = chanMgr->findChannelByID(chanID);
2785 if (ch->rawData.findPacket(streamPos,rawPack))
2787 if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
// SYNC record: tag, two shorts (4 and 0), then the first 4 bytes of syncPos
// exactly as stored in memory.
// NOTE(review): assumes unsigned int is 4 bytes and that the receiver
// shares the sender's byte order - confirm.
2789 sock->writeTag("SYNC");
2790 sock->writeShort(4);
2791 sock->writeShort(0);
2792 sock->write(&syncPos,4);
2795 rawPack.writePeercast(*sock);
2797 streamPos = rawPack.pos + rawPack.len;
2803 }catch(StreamException &e)
2805 LOG_ERROR("Stream channel: %s",e.msg);
2809 //WLock canStreamLock;
2811 // -----------------------------------
// Streams channel chanID to a PCP peer as atoms.
// Sends the channel description (and the head packet when sendHeader is
// set), then forwards T_HEAD / T_DATA packets wrapped in PCP_CHAN atoms
// while the thread is active, interleaved with flushing queued outbound
// data and reading packets from the peer. On exit the stream is flushed
// and a PCP_QUIT atom carrying the error code is written.
// Throws StreamException ("Channel not found") before streaming starts;
// StreamException raised while streaming is caught and logged here.
2812 void Servent::sendPCPChannel()
2814 bool skipCheck = false;
2815 unsigned int ptime = 0;
2816 int npacket = 0, upsize = 0;
2818 Channel *ch = chanMgr->findChannelByID(chanID);
2820 throw StreamException("Channel not found");
2822 AtomStream atom(*sock);
2824 pcpStream = new PCPStream(remoteID);
2830 LOG_DEBUG("Starting PCP stream of channel at %d",streamPos);
2833 // setStatus(S_CONNECTED);
2835 //canStreamLock.on();
2836 //thread.active = canStream(ch);
2837 //setStatus(S_CONNECTED);
2838 //canStreamLock.off();
// Channel description: a PCP_CHAN parent with 3 children, plus one more
// when the header packet is included.
2845 atom.writeParent(PCP_CHAN,3 + ((sendHeader)?1:0));
2846 atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
2847 ch->info.writeInfoAtoms(atom);
2848 ch->info.writeTrackAtoms(atom);
2851 atom.writeParent(PCP_CHAN_PKT,3);
2852 atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
2853 atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
2854 atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
// After the header, streaming resumes right behind the head packet.
2857 streamPos = ch->headPack.pos+ch->headPack.len;
2858 LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
2862 unsigned int streamIndex = ch->streamIndex;
// Packets are serialized into the local buffer pbuf through atom2 and then
// handed to the socket in a single write call.
2865 char pbuf[ChanPacket::MAX_DATALEN*3];
2866 MemoryStream mems(pbuf,sizeof(pbuf));
2867 AtomStream atom2(mems);
2869 while (thread.active)
// The channel is looked up again on every iteration; this declaration
// shadows the outer ch.
2872 Channel *ch = chanMgr->findChannelByID(chanID);
// A changed stream index restarts reading from the head packet position.
2877 if (streamIndex != ch->streamIndex)
2879 streamIndex = ch->streamIndex;
2880 streamPos = ch->headPack.pos;
2881 LOG_DEBUG("sendPCPStream got new stream index");
2886 if (ch->rawData.findPacket(streamPos,rawPack))
// Gap between the expected position and the packet found (and the packet is
// not flagged as a skip): log it. If a skip was already recorded at the
// same sys->getTime() value, jump straight to the latest buffered position.
2888 if ((streamPos < rawPack.pos) && !rawPack.skip){
2891 getHost().IPtoStr(tmp);
2892 LOG_NETWORK("##### send skipping ##### %d (%d, %d) -> %s", (rawPack.pos - streamPos), streamPos, rawPack.pos, tmp);
2894 if (sys->getTime() == lastSkipTime) {
2895 LOG_DEBUG("##### skip all buffer");
2896 streamPos = ch->rawData.getLatestPos();
2900 lastSkipTime = sys->getTime();
// Head packets are always sent with a direct socket write.
2907 if (rawPack.type == ChanPacket::T_HEAD)
2909 atom2.writeParent(PCP_CHAN,2);
2910 atom2.writeBytes(PCP_CHAN_ID,chanID.id,16);
2911 atom2.writeParent(PCP_CHAN_PKT,3);
2912 atom2.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
2913 atom2.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
2914 atom2.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
2916 sock->write(pbuf, mems.getPosition());
2917 }else if (rawPack.type == ChanPacket::T_DATA)
2919 atom2.writeParent(PCP_CHAN,2);
2920 atom2.writeBytes(PCP_CHAN_ID,chanID.id,16);
2921 atom2.writeParent(PCP_CHAN_PKT,3);
2922 atom2.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_DATA);
2923 atom2.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
2924 atom2.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
// Data packets go out either through the socket's buffering writer, whose
// skip statistics are mirrored into this servent, or through a direct write
// (the selecting condition is not shown here).
2927 sock->bufferingWrite(pbuf, mems.getPosition());
2928 lastSkipTime = sock->bufList.lastSkipTime;
2929 lastSkipCount = sock->bufList.skipCount;
2931 sock->write(pbuf, mems.getPosition());
2935 if (rawPack.pos < streamPos)
2936 LOG_DEBUG("pcp: skip back %d",rawPack.pos-streamPos);
2938 //LOG_DEBUG("Sending %d-%d (%d,%d,%d)",rawPack.pos,rawPack.pos+rawPack.len,ch->streamPos,ch->rawData.getLatestPos(),ch->rawData.getOldestPos());
2940 streamPos = rawPack.pos+rawPack.len;
2943 throw StreamException("Channel not found");
// bufferingWrite is also called with no new payload, and the skip
// statistics are refreshed afterwards.
2947 sock->bufferingWrite(NULL, 0);
2948 lastSkipTime = sock->bufList.lastSkipTime;
2949 lastSkipCount = sock->bufList.skipCount;
2952 bcs.servent_id = servent_id;
2953 // error = pcpStream->readPacket(*sock,bcs);
2955 unsigned int t = sys->getTime();
// Budgets for inbound packet processing (npacket) and outbound flushing (upsize).
2958 npacket = MAX_PROC_PACKETS;
2959 upsize = MAX_OUTWARD_SIZE;
2962 int len = pcpStream->flushUb(*sock, upsize);
2965 while (npacket > 0 && sock->readReady()) {
2967 error = pcpStream->readPacket(*sock,bcs);
2969 throw StreamException("PCP exception");
2976 LOG_DEBUG("PCP channel stream closed normally.");
2978 }catch(StreamException &e)
2980 LOG_ERROR("Stream channel: %s",e.msg);
// Best-effort goodbye: flush and send PCP_QUIT; failures here are ignored.
2985 pcpStream->flush(*sock);
2986 atom.writeInt(PCP_QUIT,error);
2987 }catch(StreamException &) {}
2991 // -----------------------------------
// Thread body for a listening server servent.
// thread->data is the Servent that owns the listening socket. While the
// thread and socket are active it accepts connections, rejects IPv4
// multicast source addresses, consults the IP blacklist and graylist for
// non-loopback peers, and hands each accepted socket to a newly allocated
// Servent. StreamException is caught and logged; the thread is ended
// through sys->endThread(thread).
2992 int Servent::serverProcMain(ThreadInfo *thread)
2997 Servent *sv = (Servent*)thread->data;
3002 throw StreamException("Server has no socket");
3004 sv->setStatus(S_LISTENING);
3008 sv->sock->host.toStr(servIP);
3010 if (servMgr->isRoot)
3011 LOG_DEBUG("Root Server started: %s",servIP);
3013 LOG_DEBUG("Server started: %s",servIP);
3016 while ((thread->active) && (sv->sock->active()))
// New connections are only accepted while the number of active servents on
// this port is below servMgr->maxServIn.
3018 if (servMgr->numActiveOnPort(sv->sock->host.port) < servMgr->maxServIn)
3020 ClientSocket *cs = sv->sock->accept();
3022 // Exclude invalid source addresses (IPv4 multicast)
// Top nibble of the first octet equal to 0xE, i.e. 224.0.0.0/4.
3023 if (cs && (((cs->host.ip >> 24) & 0xF0) == 0xE0))
3028 LOG_ERROR("reject incoming multicast address: %s", ip);
3029 peercastApp->notifyMessage(ServMgr::NT_PEERCAST, "reject multicast address");
3033 // countermeasure against DoS Atk
// 0x7F000001 is 127.0.0.1.
3034 if (cs->host.ip != (0x7F000001)) // bypass loopback
3037 addrCont clientAddr(cs->host.ip);
// The blacklist lookup runs under the list's lock; a match is logged as
// REFUSED, anything else as ACCEPT.
3038 servMgr->IP_blacklist->lock();
3039 if (servMgr->IP_blacklist->find(clientAddr))
3042 servMgr->IP_blacklist->unlock();
3044 LOG_DEBUG("REFUSED: %d.%d.%d.%d", (cs->host.ip >> 24), (cs->host.ip >> 16) & 0xFF, (cs->host.ip >> 8) & 0xFF, cs->host.ip & 0xFF);
3051 servMgr->IP_blacklist->unlock();
3052 LOG_DEBUG("ACCEPT: %d.%d.%d.%d", (cs->host.ip >> 24), (cs->host.ip >> 16) & 0xFF, (cs->host.ip >> 8) & 0xFF, cs->host.ip & 0xFF);
// Graylist bookkeeping under its lock: an address already listed has its
// entry incremented, otherwise the address is appended.
3056 servMgr->IP_graylist->lock();
3058 if (servMgr->IP_graylist->find(clientAddr, &idx))
3061 ++(servMgr->IP_graylist->at(idx));
3062 LOG_DEBUG("UPDATE: %d.%d.%d.%d", (cs->host.ip >> 24), (cs->host.ip >> 16) & 0xFF, (cs->host.ip >> 8) & 0xFF, cs->host.ip & 0xFF);
3066 servMgr->IP_graylist->push_back(clientAddr);
3067 LOG_DEBUG("GRAYED: %d.%d.%d.%d", (cs->host.ip >> 24), (cs->host.ip >> 16) & 0xFF, (cs->host.ip >> 8) & 0xFF, cs->host.ip & 0xFF);
3069 servMgr->IP_graylist->unlock();
3072 LOG_DEBUG("accepted incoming");
// The accepted socket is handed to a fresh servent, which inherits this
// listener's port, the manager's network ID and this listener's allow mask.
3073 Servent *ns = servMgr->allocServent();
3076 servMgr->lastIncoming = sys->getTime();
3077 ns->servPort = sv->sock->host.port;
3078 ns->networkID = servMgr->networkID;
3079 ns->initIncoming(cs,sv->allow);
3081 LOG_ERROR("Out of servents");
3086 }catch(StreamException &e)
3088 LOG_ERROR("Server Error: %s:%d",e.msg,e.err);
3092 LOG_DEBUG("Server stopped");
3095 sys->endThread(thread);
3099 // -----------------------------------
// Thread entry point for a listening server servent; hands off to
// serverProcMain through the SEH_THREAD macro.
3100 int Servent::serverProc(ThreadInfo *thread)
3102 SEH_THREAD(serverProcMain, Servent::serverProc);
3105 // -----------------------------------
3106 bool Servent::writeVariable(Stream &s, const String &var)
3111 strcpy(buf,getTypeStr());
3112 else if (var == "status")
3113 strcpy(buf,getStatusStr());
3114 else if (var == "address")
3116 if (servMgr->enableGetName) //JP-EX s
3118 getHost().toStr(buf);
3123 /* ChanHitList *hits[ChanMgr::MAX_HITLISTS];
3125 for(int i=0; i<ChanMgr::MAX_HITLISTS; i++)
3127 ChanHitList *chl = &chanMgr->hitlists[i];
3129 hits[numHits++] = chl;
3132 ishit = isfw = false;
3136 for(int k=0; k<numHits; k++)
3138 ChanHitList *chl = hits[k];
3141 for (int j=0; j<ChanHitList::MAX_HITS; j++)
3143 ChanHit *hit = &chl->hits[j];
3144 if (hit->host.isValid() && (h.ip == hit->host.ip))
3147 if (hit->firewalled)
3149 numRelay += hit->numRelays;
3161 strcat(buf,"<font color=red>");
3163 strcat(buf,"<font color=orange>");
3166 strcat(buf,"<font color=green>");
3170 if (ClientSocket::getHostname(h_name,h.ip))
3178 strcat(buf,"</font>");
3184 bool isRelay = true;
3186 ChanHitList *chl = chanMgr->findHitListByID(chanID);
3188 ChanHit *hit = chl->hit;
3190 if (hit->host.isValid() && (h.ip == hit->host.ip)){
3191 isfw = hit->firewalled;
3192 isRelay = hit->relay;
3193 numRelay = hit->numRelays;
3202 strcat(buf,"<font color=red>");
3204 strcat(buf,"<font color=orange>");
3209 strcpy(buf,"<font color=purple>");
3211 strcpy(buf,"<font color=blue>");
3214 strcpy(buf,"<font color=green>");
3219 if (ClientSocket::getHostname(h_name,sizeof(h_name),h.ip)) //JP-MOD (buffer-overflow countermeasure)
3225 strcat(buf,"</font>");
3228 getHost().toStr(buf);
3230 else if (var == "agent")
3231 strcpy(buf,agent.cstr());
3232 else if (var == "bitrate")
3236 unsigned int tot = sock->bytesInPerSec+sock->bytesOutPerSec;
3237 sprintf(buf,"%.1f",BYTES_TO_KBPS(tot));
3240 }else if (var == "uptime")
3244 uptime.setFromStopwatch(sys->getTime()-lastConnect);
3247 strcpy(buf,uptime.cstr());
3248 }else if (var.startsWith("gnet."))
3251 float ctime = (float)(sys->getTime()-lastConnect);
3252 if (var == "gnet.packetsIn")
3253 sprintf(buf,"%d",gnuStream.packetsIn);
3254 else if (var == "gnet.packetsInPerSec")
3255 sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsIn)/ctime:0);
3256 else if (var == "gnet.packetsOut")
3257 sprintf(buf,"%d",gnuStream.packetsOut);
3258 else if (var == "gnet.packetsOutPerSec")
3259 sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsOut)/ctime:0);
3260 else if (var == "gnet.normQueue")
3261 sprintf(buf,"%d",outPacketsNorm.numPending());
3262 else if (var == "gnet.priQueue")
3263 sprintf(buf,"%d",outPacketsPri.numPending());
3264 else if (var == "gnet.flowControl")
3265 sprintf(buf,"%d",flowControl?1:0);
3266 else if (var == "gnet.routeTime")
3268 int nr = seenIDs.numUsed();
3269 unsigned int tim = sys->getTime()-seenIDs.getOldest();
3272 tstr.setFromStopwatch(tim);
3275 strcpy(buf,tstr.cstr());