1 // ------------------------------------------------
6 // Servents are the actual connections between clients. They do the handshaking,
7 // transfering of data and processing of GnuPackets. Each servent has one socket allocated
8 // to it on connect, it uses this to transfer all of its data.
10 // (c) 2002 peercast.org
11 // ------------------------------------------------
12 // This program is free software; you can redistribute it and/or modify
13 // it under the terms of the GNU General Public License as published by
14 // the Free Software Foundation; either version 2 of the License, or
15 // (at your option) any later version.
17 // This program is distributed in the hope that it will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 // GNU General Public License for more details.
21 // ------------------------------------------------
22 // todo: make lan->yp not check firewall
38 #include "chkMemoryLeak.h"
39 #define DEBUG_NEW new(__FILE__, __LINE__)
44 const int DIRECT_WRITE_TIMEOUT = 60;
46 // -----------------------------------
47 char *Servent::statusMsgs[]=
64 // -----------------------------------
65 char *Servent::typeMsgs[]=
76 // -----------------------------------
77 bool Servent::isPrivate()
80 return servMgr->isFiltered(ServFilter::F_PRIVATE,h) || h.isLocalhost();
82 // -----------------------------------
83 bool Servent::isAllowed(int a)
87 if (servMgr->isFiltered(ServFilter::F_BAN,h))
93 // -----------------------------------
94 bool Servent::isFiltered(int f)
97 return servMgr->isFiltered(f,h);
100 int servent_count = 1;
101 // -----------------------------------
102 Servent::Servent(int index)
103 :outPacketsPri(MAX_OUTPACKETS)
104 ,outPacketsNorm(MAX_OUTPACKETS)
111 servent_id = servent_count++;
117 // -----------------------------------
122 // -----------------------------------
125 thread.active = false;
127 setStatus(S_CLOSING);
131 PCPStream *pcp = pcpStream;
137 chanMgr->hitlistlock.on();
138 ChanHitList *chl = chanMgr->findHitListByID(chanID);
140 ChanHit *chh = chl->hit;
141 ChanHit *prev = NULL;
143 if (chh->servent_id == this->servent_id){
144 if ((servMgr->kickKeepTime != 0) && (chh->firewalled == 1)){
146 chh->numListeners = 0;
151 ChanHit *next = chh->next;
166 chanMgr->hitlistlock.off();
184 if (type != T_SERVER)
191 // -----------------------------------
192 void Servent::abort()
194 thread.active = false;
202 // -----------------------------------
203 void Servent::reset()
217 outputProtocol = ChanInfo::SP_UNKNOWN;
226 lastConnect = lastPing = lastPacket = 0;
227 loginPassword.clear();
230 priorityConnect = false;
234 outPacketsNorm.reset();
235 outPacketsPri.reset();
246 // -----------------------------------
247 bool Servent::sendPacket(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did,Servent::TYPE t)
252 && (!cid.isSet() || chanID.isSame(cid))
253 && (!sid.isSet() || !sid.isSame(remoteID))
254 && (pcpStream != NULL)
257 return pcpStream->sendPacket(pack,did);
263 // -----------------------------------
264 bool Servent::acceptGIV(ClientSocket *givSock)
274 // -----------------------------------
275 Host Servent::getHost()
285 // -----------------------------------
286 bool Servent::outputPacket(GnuPacket &p, bool pri)
292 r = outPacketsPri.write(p);
295 if (servMgr->useFlowControl)
297 int per = outPacketsNorm.percentFull();
308 // if in flowcontrol, only allow packets with less of a hop count than already in queue
309 if (p.hops >= outPacketsNorm.findMinHop())
314 r = outPacketsNorm.write(p);
321 // -----------------------------------
322 bool Servent::initServer(Host &h)
336 thread.func = serverProc;
340 if (!sys->startThread(&thread))
341 throw StreamException("Can`t start thread");
343 }catch(StreamException &e)
345 LOG_ERROR("Bad server: %s",e.msg);
352 // -----------------------------------
353 void Servent::checkFree()
356 throw StreamException("Socket already set");
358 throw StreamException("Thread already active");
360 // -----------------------------------
361 void Servent::initIncoming(ClientSocket *s, unsigned int a)
372 thread.func = incomingProc;
373 thread.finish = false;
375 setStatus(S_PROTOCOL);
378 sock->host.toStr(ipStr);
379 LOG_DEBUG("Incoming from %s",ipStr);
382 if (!sys->startThread(&thread))
383 throw StreamException("Can`t start thread");
384 }catch(StreamException &e)
386 //LOG_ERROR("!!FATAL!! Incoming error: %s",e.msg);
387 //servMgr->shutdownTimer = 1;
390 LOG_ERROR("INCOMING FAILED: %s",e.msg);
395 // -----------------------------------
396 void Servent::initOutgoing(TYPE ty)
406 thread.func = outgoingProc;
408 if (!sys->startThread(&thread))
409 throw StreamException("Can`t start thread");
411 }catch(StreamException &e)
413 LOG_ERROR("Unable to start outgoing: %s",e.msg);
418 // -----------------------------------
419 void Servent::initPCP(Host &rh)
433 if (!isAllowed(ALLOW_NETWORK))
434 throw StreamException("Servent not allowed");
437 thread.func = outgoingProc;
439 LOG_DEBUG("Outgoing to %s",ipStr);
441 if (!sys->startThread(&thread))
442 throw StreamException("Can`t start thread");
444 }catch(StreamException &e)
446 LOG_ERROR("Unable to open connection to %s - %s",ipStr,e.msg);
452 // -----------------------------------
453 void Servent::initChannelFetch(Host &host)
467 if (!isAllowed(ALLOW_DATA))
468 throw StreamException("Servent not allowed");
474 // -----------------------------------
475 void Servent::initGIV(Host &h, GnuID &id)
489 if (!isAllowed(ALLOW_NETWORK))
490 throw StreamException("Servent not allowed");
495 thread.func = givProc;
499 if (!sys->startThread(&thread))
500 throw StreamException("Can`t start thread");
502 }catch(StreamException &e)
504 LOG_ERROR("GIV error to %s: %s",ipStr,e.msg);
508 // -----------------------------------
509 void Servent::createSocket()
512 LOG_ERROR("Servent::createSocket attempt made while active");
514 sock = sys->createSocket();
516 // -----------------------------------
517 void Servent::setStatus(STATUS s)
523 if ((s == S_HANDSHAKE) || (s == S_CONNECTED) || (s == S_LISTENING))
524 lastConnect = sys->getTime();
529 // -----------------------------------
530 void Servent::handshakeOut()
532 sock->writeLine(GNU_PEERCONN);
536 sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_AGENT);
537 sock->writeLineF("%s %d",PCX_HS_PCP,1);
540 sock->writeLineF("%s %d",PCX_HS_PRIORITY,1);
542 if (networkID.isSet())
544 networkID.toStr(str);
545 sock->writeLineF("%s %s",PCX_HS_NETWORKID,str);
548 servMgr->sessionID.toStr(str);
549 sock->writeLineF("%s %s",PCX_HS_ID,str);
552 sock->writeLineF("%s %s",PCX_HS_OS,peercastApp->getClientTypeOS());
558 int r = http.readResponse();
562 LOG_ERROR("Expected 200, got %d",r);
563 throw StreamException("Unexpected HTTP response");
567 bool versionValid = false;
572 while (http.nextHeader())
574 LOG_DEBUG(http.cmdLine);
576 char *arg = http.getArgStr();
580 if (http.isHeader(HTTP_HS_AGENT))
584 if (strnicmp(arg,"PeerCast/",9)==0)
585 versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);
586 }else if (http.isHeader(PCX_HS_NETWORKID))
587 clientID.fromStr(arg);
590 if (!clientID.isSame(networkID))
591 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
594 throw HTTPException(HTTP_SC_UNAUTHORIZED,401);
597 sock->writeLine(GNU_OK);
603 // -----------------------------------
604 void Servent::processOutChannel()
609 // -----------------------------------
610 void Servent::handshakeIn()
618 bool versionValid = false;
619 bool diffRootVer = false;
624 while (http.nextHeader())
626 LOG_DEBUG("%s",http.cmdLine);
628 char *arg = http.getArgStr();
632 if (http.isHeader(HTTP_HS_AGENT))
636 if (strnicmp(arg,"PeerCast/",9)==0)
638 versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);
639 diffRootVer = stricmp(arg+9,MIN_ROOTVER)<0;
641 }else if (http.isHeader(PCX_HS_NETWORKID))
643 clientID.fromStr(arg);
645 }else if (http.isHeader(PCX_HS_PRIORITY))
647 priorityConnect = atoi(arg)!=0;
649 }else if (http.isHeader(PCX_HS_ID))
653 if (id.isSame(servMgr->sessionID))
654 throw StreamException("Servent loopback");
656 }else if (http.isHeader(PCX_HS_OS))
658 if (stricmp(arg,PCX_OS_LINUX)==0)
660 else if (stricmp(arg,PCX_OS_WIN32)==0)
662 else if (stricmp(arg,PCX_OS_MACOSX)==0)
664 else if (stricmp(arg,PCX_OS_WINAMP2)==0)
670 if (!clientID.isSame(networkID))
671 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
673 // if this is a priority connection and all incoming connections
674 // are full then kill an old connection to make room. Otherwise reject connection.
675 //if (!priorityConnect)
678 if (servMgr->pubInFull())
679 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
683 throw HTTPException(HTTP_SC_FORBIDDEN,403);
685 sock->writeLine(GNU_OK);
687 sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_OLDAGENT);
689 if (networkID.isSet())
692 networkID.toStr(idStr);
693 sock->writeLineF("%s %s",PCX_HS_NETWORKID,idStr);
698 sock->writeLineF("%s %d",PCX_HS_FLOWCTL,servMgr->useFlowControl?1:0);
699 sock->writeLineF("%s %d",PCX_HS_MINBCTTL,chanMgr->minBroadcastTTL);
700 sock->writeLineF("%s %d",PCX_HS_MAXBCTTL,chanMgr->maxBroadcastTTL);
701 sock->writeLineF("%s %d",PCX_HS_RELAYBC,servMgr->relayBroadcast);
702 //sock->writeLine("%s %d",PCX_HS_FULLHIT,2);
707 sock->writeString(PCX_HS_DL);
708 sock->writeLine(PCX_DL_URL);
711 sock->writeLineF("%s %s",PCX_HS_MSG,servMgr->rootMsg.cstr());
718 sock->writeLineF("%s %s",PCX_HS_REMOTEIP,hostIP);
724 while (http.nextHeader());
727 // -----------------------------------
728 bool Servent::pingHost(Host &rhost,GnuID &rsid)
732 LOG_DEBUG("Ping host %s: trying..",ipstr);
733 ClientSocket *s=NULL;
737 s = sys->createSocket();
743 s->setReadTimeout(15000);
744 s->setWriteTimeout(15000);
750 atom.writeInt(PCP_CONNECT,1);
751 atom.writeParent(PCP_HELO,1);
752 atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
758 ID4 id = atom.read(numc,numd);
761 for(int i=0; i<numc; i++)
764 ID4 pid = atom.read(c,d);
765 if (pid == PCP_SESSIONID)
766 atom.readBytes(sid.id,16,d);
772 LOG_DEBUG("Ping response: %s",id.getString().str());
773 throw StreamException("Bad ping response");
776 if (!sid.isSame(rsid))
777 throw StreamException("SIDs don`t match");
780 LOG_DEBUG("Ping host %s: OK",ipstr);
781 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT);
785 }catch(StreamException &e)
787 LOG_DEBUG("Ping host %s: %s",ipstr,e.msg);
803 // -----------------------------------
804 bool Servent::handshakeStream(ChanInfo &chanInfo)
811 unsigned int reqPos=0;
812 unsigned short listenPort = 0;
816 while (http.nextHeader())
818 char *arg = http.getArgStr();
822 if (http.isHeader(PCX_HS_PCP))
823 gotPCP = atoi(arg)!=0;
824 else if (http.isHeader(PCX_HS_POS))
826 else if (http.isHeader(PCX_HS_PORT))
827 listenPort = (unsigned short)atoi(arg);
828 else if (http.isHeader("icy-metadata"))
829 addMetadata = atoi(arg) > 0;
830 else if (http.isHeader(HTTP_HS_AGENT))
832 else if (http.isHeader("Pragma"))
834 char *ssc = stristr(arg,"stream-switch-count=");
835 char *so = stristr(arg,"stream-offset");
840 //nsSwitchNum = atoi(ssc+20);
844 LOG_DEBUG("Stream: %s",http.cmdLine);
848 if ((!gotPCP) && (outputProtocol == ChanInfo::SP_PCP))
849 outputProtocol = ChanInfo::SP_PEERCAST;
851 if (outputProtocol == ChanInfo::SP_HTTP)
853 if ( (chanInfo.srcProtocol == ChanInfo::SP_MMS)
854 || (chanInfo.contentType == ChanInfo::T_WMA)
855 || (chanInfo.contentType == ChanInfo::T_WMV)
856 || (chanInfo.contentType == ChanInfo::T_ASX)
858 outputProtocol = ChanInfo::SP_MMS;
862 bool chanFound=false;
863 bool chanReady=false;
865 ChanHit *sourceHit = NULL;
867 Channel *ch = chanMgr->findChannelByID(chanInfo.id);
871 if (reqPos || !isIndexTxt(&chanInfo))
873 streamPos = ch->rawData.findOldestPos(reqPos);
874 //streamPos = ch->rawData.getLatestPos();
877 streamPos = ch->rawData.getLatestPos();
880 chanID = chanInfo.id;
881 serventHit.rhost[0].ip = getHost().ip;
882 serventHit.rhost[0].port = listenPort;
883 serventHit.host = serventHit.rhost[0];
884 serventHit.chanID = chanID;
887 chanReady = canStream(ch);
888 if (/*0 && */!chanReady)
890 if (servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
892 sourceHit = &ch->sourceHost; // send source host info
894 if (listenPort && ch->info.getUptime() > 60) // if stable
896 // connect "this" host later
897 chanMgr->addHit(serventHit);
901 getHost().toStr(tmp);
902 LOG_DEBUG("Bump channel, hand over sourceHost to %s", tmp);
905 else if (servMgr->kickUnrelayableHost(chanID, serventHit) != 0)
907 chanReady = canStream(ch);
909 LOG_DEBUG("*** Kicked unrelayable host, but still cannot stream");
912 if (!chanReady) type = T_INCOMING;
913 thread.active = chanReady;
914 setStatus(S_CONNECTED);
916 channel_id = ch->channel_id;
919 if (servMgr->isCheckPushStream())
921 if (chanReady == true)
925 if (!h.isLocalhost())
929 if (strstr(agent.cstr(),"PeerCast/0.119") != NULL)
933 LOG_ERROR("Block v0.119 Servent : %s (%s)",strip,agent.cstr());
938 /* ChanHitList *hits[ChanMgr::MAX_HITLISTS];
940 for(int i=0; i<ChanMgr::MAX_HITLISTS; i++)
942 ChanHitList *chl = &chanMgr->hitlists[i];
944 hits[numHits++] = chl;
948 for(int i=0; i<numHits; i++)
950 ChanHitList *chl = hits[i];
953 for (int j=0; j<ChanHitList::MAX_HITS; j++)
955 ChanHit *hit = &chl->hits[j];
956 if (hit->host.isValid() && (h.ip == hit->host.ip))
960 numRelay = hit->numRelays;
965 if ((isfw == true) && (numRelay == 0))
969 LOG_ERROR("Block firewalled Servent : %s",strip);
973 ChanHitList *chl = chanMgr->findHitList(chanInfo);
974 ChanHit *hit = chl->hit;
976 if (hit->host.isValid() && (h.ip == hit->host.ip))
978 if ((hit->firewalled) && (hit->numRelays == 0)){
981 LOG_ERROR("Block firewalled Servent : %s",strip);
994 // LockBlock lockblock(chanMgr->hitlistlock);
996 // lockblock.lockon();
997 ChanHitList *chl = chanMgr->findHitList(chanInfo);
1005 bool result = false;
1008 chanInfo.id.toStr(idStr);
1011 servMgr->sessionID.toStr(sidStr);
1013 Host rhost = sock->host;
1018 AtomStream atom(*sock);
1024 sock->writeLine(HTTP_SC_NOTFOUND);
1025 sock->writeLine("");
1026 LOG_DEBUG("Sending channel not found");
1033 if (outputProtocol == ChanInfo::SP_PCP)
1037 MemoryStream mem(tbuf, sizeof(tbuf));
1038 mem.writeLine(HTTP_SC_UNAVAILABLE);
1039 mem.writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
1041 sock->write(tbuf, mem.getPosition());
1043 handshakeIncomingPCP(atom,rhost,remoteID,agent);
1046 rhost.toStr(ripStr);
1048 LOG_DEBUG("Sending channel unavailable");
1053 AtomStream atom2(mem);
1055 int error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
1057 chanMgr->hitlistlock.on();
1059 chl = chanMgr->findHitList(chanInfo);
1065 // search for up to 8 other hits
1073 // find best hit this network if local IP
1074 if (!rhost.globalIP())
1077 chs.matchHost = servMgr->serverHost;
1079 chs.excludeID = remoteID;
1080 if (chl->pickHits(chs)){
1082 LOG_DEBUG("find best hit this network if local IP");
1086 // find best hit on same network
1090 chs.matchHost = rhost;
1092 chs.excludeID = remoteID;
1093 if (chl->pickHits(chs)){
1095 LOG_DEBUG("find best hit on same network");
1100 // find best hit on other networks
1101 /* if (!best.host.ip)
1105 chs.excludeID = remoteID;
1106 if (chl->pickHits(chs)){
1108 LOG_DEBUG("find best hit on other networks");
1116 best.writeAtoms(atom2,chanInfo.id);
1122 sourceHit->writeAtoms(atom2, chanInfo.id);
1123 sourceHit->host.toStr(tmp);
1124 LOG_DEBUG("relay info(sourceHit): %s", tmp);
1125 best.host.ip = sourceHit->host.ip;
1130 // chanMgr->hitlistlock.on();
1131 int cnt = chs.getRelayHost(servMgr->serverHost, rhost, remoteID, chl);
1132 // chanMgr->hitlistlock.off();
1133 for (int i = 0; i < cnt; i++){
1134 chs.best[i].writeAtoms(atom2, chanInfo.id);
1135 chs.best[i].host.toStr(tmp);
1136 LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
1137 best.host.ip = chs.best[i].host.ip;
1143 LOG_DEBUG("Sent %d channel hit(s) to %s",cnt,ripStr);
1146 else if (rhost.port)
1148 // find firewalled host
1151 chs.useFirewalled = true;
1152 chs.excludeID = remoteID;
1153 if (chl->pickHits(chs))
1156 int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_RELAY);
1157 LOG_DEBUG("Broadcasted channel push request to %d clients for %s",cnt,ripStr);
1161 // if all else fails, use tracker
1164 // find best tracker on this network if local IP
1165 if (!rhost.globalIP())
1168 chs.matchHost = servMgr->serverHost;
1169 chs.trackersOnly = true;
1170 chs.excludeID = remoteID;
1171 if (chl->pickHits(chs))
1176 // find local tracker
1180 chs.matchHost = rhost;
1181 chs.trackersOnly = true;
1182 chs.excludeID = remoteID;
1183 if (chl->pickHits(chs))
1187 // find global tracker
1191 chs.trackersOnly = true;
1192 chs.excludeID = remoteID;
1193 if (chl->pickHits(chs))
1199 best.writeAtoms(atom2,chanInfo.id);
1200 LOG_DEBUG("Sent 1 tracker hit to %s",ripStr);
1201 }else if (rhost.port)
1203 // find firewalled tracker
1205 chs.useFirewalled = true;
1206 chs.trackersOnly = true;
1207 chs.excludeID = remoteID;
1209 if (chl->pickHits(chs))
1212 int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_CIN);
1213 LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,ripStr);
1222 chanMgr->hitlistlock.off();
1224 // return not available yet code
1225 atom2.writeInt(PCP_QUIT,error);
1226 sock->write(tbuf, mem.getPosition());
1231 // wait disconnect from other host
1233 while(sock->read(c, sizeof(c))){
1236 }catch(StreamException &e){
1237 LOG_DEBUG("RelayInfoWait: %s",e.msg);
1242 LOG_DEBUG("Sending channel unavailable");
1243 sock->writeLine(HTTP_SC_UNAVAILABLE);
1244 sock->writeLine("");
1250 if (chanInfo.contentType != ChanInfo::T_MP3)
1253 if (addMetadata && (outputProtocol == ChanInfo::SP_HTTP)) // winamp mp3 metadata check
1256 sock->writeLine(ICY_OK);
1258 sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT);
1259 sock->writeLineF("icy-name:%s",chanInfo.name.cstr());
1260 sock->writeLineF("icy-br:%d",chanInfo.bitrate);
1261 sock->writeLineF("icy-genre:%s",chanInfo.genre.cstr());
1262 sock->writeLineF("icy-url:%s",chanInfo.url.cstr());
1263 sock->writeLineF("icy-metaint:%d",chanMgr->icyMetaInterval);
1264 sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr);
1266 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3);
1271 sock->writeLine(HTTP_SC_OK);
1273 if ((chanInfo.contentType != ChanInfo::T_ASX) && (chanInfo.contentType != ChanInfo::T_WMV) && (chanInfo.contentType != ChanInfo::T_WMA))
1275 sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT);
1277 sock->writeLine("Accept-Ranges: none");
1279 sock->writeLineF("x-audiocast-name: %s",chanInfo.name.cstr());
1280 sock->writeLineF("x-audiocast-bitrate: %d",chanInfo.bitrate);
1281 sock->writeLineF("x-audiocast-genre: %s",chanInfo.genre.cstr());
1282 sock->writeLineF("x-audiocast-description: %s",chanInfo.desc.cstr());
1283 sock->writeLineF("x-audiocast-url: %s",chanInfo.url.cstr());
1284 sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr);
1288 if (outputProtocol == ChanInfo::SP_HTTP)
1290 switch (chanInfo.contentType)
1292 case ChanInfo::T_OGG:
1293 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XOGG);
1295 case ChanInfo::T_MP3:
1296 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3);
1298 case ChanInfo::T_MOV:
1299 sock->writeLine("Connection: close");
1300 sock->writeLine("Content-Length: 10000000");
1301 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MOV);
1303 case ChanInfo::T_MPG:
1304 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MPG);
1306 case ChanInfo::T_NSV:
1307 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_NSV);
1309 case ChanInfo::T_ASX:
1310 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_ASX);
1312 case ChanInfo::T_WMA:
1313 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_WMA);
1315 case ChanInfo::T_WMV:
1316 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_WMV);
1319 } else if (outputProtocol == ChanInfo::SP_MMS)
1321 sock->writeLine("Server: Rex/9.0.0.2980");
1322 sock->writeLine("Cache-Control: no-cache");
1323 sock->writeLine("Pragma: no-cache");
1324 sock->writeLine("Pragma: client-id=3587303426");
1325 sock->writeLine("Pragma: features=\"broadcast,playlist\"");
1329 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MMS);
1332 sock->writeLine("Content-Type: application/vnd.ms.wms-hdr.asfv1");
1334 sock->writeLineF("Content-Length: %d",ch->headPack.len);
1335 sock->writeLine("Connection: Keep-Alive");
1338 } else if (outputProtocol == ChanInfo::SP_PCP)
1340 sock->writeLineF("%s %d",PCX_HS_POS,streamPos);
1341 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
1343 }else if (outputProtocol == ChanInfo::SP_PEERCAST)
1345 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPEERCAST);
1348 sock->writeLine("");
1353 handshakeIncomingPCP(atom,rhost,remoteID,agent);
1354 atom.writeInt(PCP_OK,0);
1364 // -----------------------------------
1365 void Servent::handshakeGiv(GnuID &id)
1371 sock->writeLineF("GIV /%s",idstr);
1373 sock->writeLine("GIV");
1375 sock->writeLine("");
1379 // -----------------------------------
1380 void Servent::processGnutella()
1384 //if (servMgr->isRoot && !servMgr->needConnections())
1385 if (servMgr->isRoot)
1393 gnuStream.init(sock);
1394 setStatus(S_CONNECTED);
1396 if (!servMgr->isRoot)
1398 chanMgr->broadcastRelays(this, 1, 1);
1401 if ((p=outPacketsNorm.curr()))
1402 gnuStream.sendPacket(*p);
1408 // if (type != T_LOOKUP)
1409 // chanMgr->broadcastRelays(this,chanMgr->minBroadcastTTL,2);
1411 lastPacket = lastPing = sys->getTime();
1412 bool doneBigPing=false;
1414 const unsigned int abortTimeoutSecs = 60; // abort connection after 60 secs of no activitiy
1415 const unsigned int packetTimeoutSecs = 30; // ping connection after 30 secs of no activity
1417 unsigned int currBytes=0;
1418 unsigned int lastWait=0;
1420 unsigned int lastTotalIn=0,lastTotalOut=0;
1422 while (thread.active && sock->active())
1425 if (sock->readReady())
1427 lastPacket = sys->getTime();
1429 if (gnuStream.readPacket(pack))
1432 sock->host.toStr(ipstr);
1435 GnuStream::R_TYPE ret = GnuStream::R_PROCESS;
1437 if (pack.func != GNU_FUNC_PONG)
1438 if (servMgr->seenPacket(pack))
1439 ret = GnuStream::R_DUPLICATE;
1441 seenIDs.add(pack.id);
1444 if (ret == GnuStream::R_PROCESS)
1447 ret = gnuStream.processPacket(pack,this,routeID);
1449 if (flowControl && (ret == GnuStream::R_BROADCAST))
1450 ret = GnuStream::R_DROP;
1456 case GnuStream::R_BROADCAST:
1457 if (servMgr->broadcast(pack,this))
1458 stats.add(Stats::NUMBROADCASTED);
1460 stats.add(Stats::NUMDROPPED);
1462 case GnuStream::R_ROUTE:
1463 if (servMgr->route(pack,routeID,NULL))
1464 stats.add(Stats::NUMROUTED);
1466 stats.add(Stats::NUMDROPPED);
1468 case GnuStream::R_ACCEPTED:
1469 stats.add(Stats::NUMACCEPTED);
1471 case GnuStream::R_DUPLICATE:
1472 stats.add(Stats::NUMDUP);
1474 case GnuStream::R_DEAD:
1475 stats.add(Stats::NUMDEAD);
1477 case GnuStream::R_DISCARD:
1478 stats.add(Stats::NUMDISCARDED);
1480 case GnuStream::R_BADVERSION:
1481 stats.add(Stats::NUMOLD);
1483 case GnuStream::R_DROP:
1484 stats.add(Stats::NUMDROPPED);
1489 LOG_NETWORK("packet in: %s-%s, %d bytes, %d hops, %d ttl, from %s",GNU_FUNC_STR(pack.func),GnuStream::getRouteStr(ret),pack.len,pack.hops,pack.ttl,ipstr);
1494 LOG_ERROR("Bad packet");
1501 if ((p=outPacketsPri.curr())) // priority packet
1503 gnuStream.sendPacket(*p);
1505 outPacketsPri.next();
1506 } else if ((p=outPacketsNorm.curr())) // or.. normal packet
1508 gnuStream.sendPacket(*p);
1510 outPacketsNorm.next();
1513 int lpt = sys->getTime()-lastPacket;
1517 if ((sys->getTime()-lastPing) > 15)
1520 lastPing = sys->getTime();
1524 if (lpt > packetTimeoutSecs)
1527 if ((sys->getTime()-lastPing) > packetTimeoutSecs)
1530 lastPing = sys->getTime();
1535 if (lpt > abortTimeoutSecs)
1536 throw TimeoutException();
1539 unsigned int totIn = sock->totalBytesIn-lastTotalIn;
1540 unsigned int totOut = sock->totalBytesOut-lastTotalOut;
1542 unsigned int bytes = totIn+totOut;
1544 lastTotalIn = sock->totalBytesIn;
1545 lastTotalOut = sock->totalBytesOut;
1547 const int serventBandwidth = 1000;
1549 int delay = sys->idleSleepTime;
1550 if ((bytes) && (serventBandwidth >= 8))
1551 delay = (bytes*1000)/(serventBandwidth/8); // set delay relative packetsize
1553 if (delay < (int)sys->idleSleepTime)
1554 delay = sys->idleSleepTime;
1555 //LOG("delay %d, in %d, out %d",delay,totIn,totOut);
1563 // -----------------------------------
1564 void Servent::processRoot()
1569 gnuStream.init(sock);
1570 setStatus(S_CONNECTED);
1574 unsigned int lastConnect = sys->getTime();
1576 while (thread.active && sock->active())
1578 if (gnuStream.readPacket(pack))
1581 sock->host.toStr(ipstr);
1583 LOG_NETWORK("packet in: %d from %s",pack.func,ipstr);
1586 if (pack.func == GNU_FUNC_PING) // if ping then pong back some hosts and close
1590 int cnt = servMgr->getNewestServents(hl,32,sock->host);
1593 int start = sys->rnd() % cnt;
1594 int max = cnt>8?8:cnt;
1596 for(int i=0; i<max; i++)
1600 pong.initPong(hl[start],false,pack);
1601 gnuStream.sendPacket(pong);
1604 hl[start].toStr(ipstr);
1606 //LOG_NETWORK("Pong %d: %s",start+1,ipstr);
1607 start = (start+1) % cnt;
1610 sock->host.toStr(str);
1611 LOG_NETWORK("Sent %d pong(s) to %s",max,str);
1614 LOG_NETWORK("No Pongs to send");
1617 }else if (pack.func == GNU_FUNC_PONG) // pong?
1619 MemoryStream pong(pack.data,pack.len);
1622 port = pong.readShort();
1623 ip = pong.readLong();
1628 if ((ip) && (port) && (h.globalIP()))
1631 LOG_NETWORK("added pong: %d.%d.%d.%d:%d",ip>>24&0xff,ip>>16&0xff,ip>>8&0xff,ip&0xff,port);
1632 servMgr->addHost(h,ServHost::T_SERVENT,sys->getTime());
1635 } else if (pack.func == GNU_FUNC_HIT)
1637 MemoryStream data(pack.data,pack.len);
1639 gnuStream.readHit(data,hit,pack.hops,pack.id);
1642 //if (gnuStream.packetsIn > 5) // die if we get too many packets
1646 if((sys->getTime()-lastConnect > 60))
1651 }catch(StreamException &e)
1653 LOG_ERROR("Relay: %s",e.msg);
1659 // -----------------------------------
1660 int Servent::givProc(ThreadInfo *thread)
1663 Servent *sv = (Servent*)thread->data;
1666 sv->handshakeGiv(sv->givID);
1667 sv->handshakeIncoming();
1669 }catch(StreamException &e)
1671 LOG_ERROR("GIV: %s",e.msg);
1675 sys->endThread(thread);
1679 // -----------------------------------
1680 void Servent::handshakeOutgoingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent, bool isTrusted)
1683 bool nonFW = (servMgr->getFirewall() != ServMgr::FW_ON);
1684 bool testFW = (servMgr->getFirewall() == ServMgr::FW_UNKNOWN);
1686 bool sendBCID = isTrusted && chanMgr->isBroadcasting();
1689 MemoryStream mem(tbuf, sizeof(tbuf));
1690 AtomStream atom2(mem);
1691 atom2.writeParent(PCP_HELO,3 + (testFW?1:0) + (nonFW?1:0) + (sendBCID?1:0));
1692 atom2.writeString(PCP_HELO_AGENT,PCX_AGENT);
1693 atom2.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
1694 atom2.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
1696 atom2.writeShort(PCP_HELO_PORT,servMgr->serverHost.port);
1698 atom2.writeShort(PCP_HELO_PING,servMgr->serverHost.port);
1700 atom2.writeBytes(PCP_HELO_BCID,chanMgr->broadcastID.id,16);
1701 atom.io.write(tbuf, mem.getPosition());
1704 LOG_DEBUG("PCP outgoing waiting for OLEH..");
1707 ID4 id = atom.read(numc,numd);
1710 LOG_DEBUG("PCP outgoing reply: %s",id.getString().str());
1711 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
1712 throw StreamException("Got unexpected PCP response");
1727 // read OLEH response
1728 for(int i=0; i<numc; i++)
1731 ID4 id = atom.read(c,dlen);
1733 if (id == PCP_HELO_AGENT)
1735 atom.readString(arg,sizeof(arg),dlen);
1738 }else if (id == PCP_HELO_REMOTEIP)
1740 thisHost.ip = atom.readInt();
1742 }else if (id == PCP_HELO_PORT)
1744 thisHost.port = atom.readShort();
1746 }else if (id == PCP_HELO_VERSION)
1748 version = atom.readInt();
1750 }else if (id == PCP_HELO_DISABLE)
1752 disable = atom.readInt();
1754 }else if (id == PCP_HELO_SESSIONID)
1756 atom.readBytes(rid.id,16);
1757 if (rid.isSame(servMgr->sessionID))
1758 throw StreamException("Servent loopback");
1762 LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
1769 // update server ip/firewall status
1772 if (thisHost.isValid())
1774 if ((servMgr->serverHost.ip != thisHost.ip) && (servMgr->forceIP.isEmpty()))
1777 thisHost.toStr(ipstr);
1778 LOG_DEBUG("Got new ip: %s",ipstr);
1779 servMgr->serverHost.ip = thisHost.ip;
1782 if (servMgr->getFirewall() == ServMgr::FW_UNKNOWN)
1784 if (thisHost.port && thisHost.globalIP())
1785 servMgr->setFirewall(ServMgr::FW_OFF);
1787 servMgr->setFirewall(ServMgr::FW_ON);
1793 LOG_ERROR("client disabled: %d",disable);
1794 servMgr->isDisabled = true;
1797 servMgr->isDisabled = false;
1805 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
1806 throw StreamException("Remote host not identified");
1809 LOG_DEBUG("PCP Outgoing handshake complete.");
1813 // -----------------------------------
1814 void Servent::handshakeIncomingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent)
1817 ID4 id = atom.read(numc,numd);
1822 LOG_DEBUG("PCP incoming reply: %s",id.getString().str());
1823 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
1824 throw StreamException("Got unexpected PCP response");
1843 for(int i=0; i<numc; i++)
1847 ID4 id = atom.read(c,dlen);
1849 if (id == PCP_HELO_AGENT)
1851 atom.readString(arg,sizeof(arg),dlen);
1854 }else if (id == PCP_HELO_VERSION)
1856 version = atom.readInt();
1858 }else if (id == PCP_HELO_SESSIONID)
1860 atom.readBytes(rid.id,16);
1861 if (rid.isSame(servMgr->sessionID))
1862 throw StreamException("Servent loopback");
1864 }else if (id == PCP_HELO_BCID)
1866 atom.readBytes(bcID.id,16);
1868 }else if (id == PCP_HELO_OSTYPE)
1870 osType = atom.readInt();
1871 }else if (id == PCP_HELO_PORT)
1873 rhost.port = atom.readShort();
1874 }else if (id == PCP_HELO_PING)
1876 pingPort = atom.readShort();
1879 LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
1886 LOG_DEBUG("Incoming PCP is %s : v%d", agent.cstr(),version);
1889 if (!rhost.globalIP() && servMgr->serverHost.globalIP())
1890 rhost.ip = servMgr->serverHost.ip;
1895 rhost.toStr(ripStr);
1896 LOG_DEBUG("Incoming firewalled test request: %s ", ripStr);
1897 rhost.port = pingPort;
1898 if (!rhost.globalIP() || !pingHost(rhost,rid))
1902 if (servMgr->isRoot)
1906 if (bcID.getFlags() & 1) // private
1908 BCID *bcid = servMgr->findValidBCID(bcID);
1909 if (!bcid || (bcid && !bcid->valid))
1911 atom.writeParent(PCP_OLEH,1);
1912 atom.writeInt(PCP_HELO_DISABLE,1);
1913 throw StreamException("Client is banned");
1921 MemoryStream mem(tbuf, sizeof(tbuf));
1922 AtomStream atom2(mem);
1923 atom2.writeParent(PCP_OLEH,5);
1924 atom2.writeString(PCP_HELO_AGENT,PCX_AGENT);
1925 atom2.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
1926 atom2.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
1927 atom2.writeInt(PCP_HELO_REMOTEIP,rhost.ip);
1928 atom2.writeShort(PCP_HELO_PORT,rhost.port);
1932 if (version < PCP_CLIENT_MINVERSION)
1934 atom2.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADAGENT);
1935 atom.io.write(tbuf, mem.getPosition());
1936 throw StreamException("Agent is not valid");
1942 atom2.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
1943 atom.io.write(tbuf, mem.getPosition());
1944 throw StreamException("Remote host not identified");
1949 if (servMgr->isRoot)
1951 servMgr->writeRootAtoms(atom2,false);
1954 atom.io.write(tbuf, mem.getPosition());
1956 LOG_DEBUG("PCP Incoming handshake complete.");
1960 // -----------------------------------
1961 void Servent::processIncomingPCP(bool suggestOthers)
1963 PCPStream::readVersion(*sock);
1966 AtomStream atom(*sock);
1967 Host rhost = sock->host;
1969 handshakeIncomingPCP(atom,rhost,remoteID,agent);
1972 bool alreadyConnected = (servMgr->findConnection(Servent::T_COUT,remoteID)!=NULL)
1973 || (servMgr->findConnection(Servent::T_CIN,remoteID)!=NULL);
1974 bool unavailable = servMgr->controlInFull();
1975 bool offair = !servMgr->isRoot && !chanMgr->isBroadcasting();
1980 if (unavailable || alreadyConnected || offair)
1984 if (alreadyConnected)
1985 error = PCP_ERROR_QUIT+PCP_ERROR_ALREADYCONNECTED;
1986 else if (unavailable)
1987 error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
1989 error = PCP_ERROR_QUIT+PCP_ERROR_OFFAIR;
1991 error = PCP_ERROR_QUIT;
2001 for(int i=0; i<8; i++)
2005 // find best hit on this network
2006 if (!rhost.globalIP())
2009 chs.matchHost = servMgr->serverHost;
2011 chs.excludeID = remoteID;
2012 chs.trackersOnly = true;
2013 chs.useBusyControls = false;
2014 if (chanMgr->pickHits(chs))
2018 // find best hit on same network
2022 chs.matchHost = rhost;
2024 chs.excludeID = remoteID;
2025 chs.trackersOnly = true;
2026 chs.useBusyControls = false;
2027 if (chanMgr->pickHits(chs))
2031 // else find best hit on other networks
2036 chs.excludeID = remoteID;
2037 chs.trackersOnly = true;
2038 chs.useBusyControls = false;
2039 if (chanMgr->pickHits(chs))
2048 best.writeAtoms(atom,noID);
2053 LOG_DEBUG("Sent %d tracker(s) to %s",cnt,rstr);
2055 else if (rhost.port)
2057 // send push request to best firewalled tracker on other network
2060 chs.excludeID = remoteID;
2061 chs.trackersOnly = true;
2062 chs.useFirewalled = true;
2063 chs.useBusyControls = false;
2064 if (chanMgr->pickHits(chs))
2069 int cnt = servMgr->broadcastPushRequest(best,rhost,noID,Servent::T_CIN);
2070 LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,rstr);
2074 LOG_DEBUG("No available trackers");
2079 LOG_ERROR("Sending QUIT to incoming: %d",error);
2081 atom.writeInt(PCP_QUIT,error);
2087 setStatus(S_CONNECTED);
2089 atom.writeInt(PCP_OK,0);
2092 atom.writeParent(PCP_ROOT,1);
2093 atom.writeParent(PCP_ROOT_UPDATE,0);
2095 pcpStream = new PCPStream(remoteID);
2099 while (!error && thread.active && !sock->eof())
2101 error = pcpStream->readPacket(*sock,bcs);
2104 if (!servMgr->isRoot && !chanMgr->isBroadcasting())
2105 error = PCP_ERROR_OFFAIR;
2106 if (peercastInst->isQuitting)
2107 error = PCP_ERROR_SHUTDOWN;
2110 pcpStream->flush(*sock);
2112 error += PCP_ERROR_QUIT;
2113 atom.writeInt(PCP_QUIT,error);
2115 LOG_DEBUG("PCP Incoming to %s closed: %d",rstr,error);
2119 // -----------------------------------
2120 int Servent::outgoingProc(ThreadInfo *thread)
2123 LOG_DEBUG("COUT started");
2125 Servent *sv = (Servent*)thread->data;
2129 sv->pcpStream = new PCPStream(noID);
2131 while (sv->thread.active)
2133 sv->setStatus(S_WAIT);
2135 if (chanMgr->isBroadcasting() && servMgr->autoServe)
2145 if (servMgr->rootHost.isEmpty())
2150 sv->sock = sv->pushSock;
2151 sv->pushSock = NULL;
2152 bestHit.host = sv->sock->host;
2158 ChanHitList *chl = chanMgr->findHitListByID(noID);
2161 // find local tracker
2163 chs.matchHost = servMgr->serverHost;
2164 chs.waitDelay = MIN_TRACKER_RETRY;
2165 chs.excludeID = servMgr->sessionID;
2166 chs.trackersOnly = true;
2167 if (!chl->pickHits(chs))
2169 // else find global tracker
2171 chs.waitDelay = MIN_TRACKER_RETRY;
2172 chs.excludeID = servMgr->sessionID;
2173 chs.trackersOnly = true;
2179 bestHit = chs.best[0];
2184 unsigned int ctime = sys->getTime();
2186 if ((!bestHit.host.ip) && ((ctime-chanMgr->lastYPConnect) > MIN_YP_RETRY))
2188 bestHit.host.fromStrName(servMgr->rootHost.cstr(),DEFAULT_PORT);
2190 chanMgr->lastYPConnect = ctime;
2194 }while (!bestHit.host.ip && (sv->thread.active));
2197 if (!bestHit.host.ip) // give up
2199 LOG_ERROR("COUT giving up");
2204 bestHit.host.toStr(ipStr);
2210 LOG_DEBUG("COUT to %s: Connecting..",ipStr);
2214 sv->setStatus(S_CONNECTING);
2215 sv->sock = sys->createSocket();
2217 throw StreamException("Unable to create socket");
2218 sv->sock->open(bestHit.host);
2219 sv->sock->connect();
2223 sv->sock->setReadTimeout(30000);
2224 AtomStream atom(*sv->sock);
2226 sv->setStatus(S_HANDSHAKE);
2228 Host rhost = sv->sock->host;
2229 atom.writeInt(PCP_CONNECT,1);
2230 handshakeOutgoingPCP(atom,rhost,sv->remoteID,sv->agent,bestHit.yp);
2232 sv->setStatus(S_CONNECTED);
2234 LOG_DEBUG("COUT to %s: OK",ipStr);
2236 sv->pcpStream->init(sv->remoteID);
2239 bcs.servent_id = sv->servent_id;
2241 while (!error && sv->thread.active && !sv->sock->eof() && servMgr->autoServe)
2243 error = sv->pcpStream->readPacket(*sv->sock,bcs);
2247 if (!chanMgr->isBroadcasting())
2248 error = PCP_ERROR_OFFAIR;
2249 if (peercastInst->isQuitting)
2250 error = PCP_ERROR_SHUTDOWN;
2252 if (sv->pcpStream->nextRootPacket)
2253 if (sys->getTime() > (sv->pcpStream->nextRootPacket+30))
2254 error = PCP_ERROR_NOROOT;
2256 sv->setStatus(S_CLOSING);
2258 sv->pcpStream->flush(*sv->sock);
2260 error += PCP_ERROR_QUIT;
2261 atom.writeInt(PCP_QUIT,error);
2263 LOG_ERROR("COUT to %s closed: %d",ipStr,error);
2265 }catch(TimeoutException &e)
2267 LOG_ERROR("COUT to %s: timeout (%s)",ipStr,e.msg);
2268 sv->setStatus(S_TIMEOUT);
2269 }catch(StreamException &e)
2271 LOG_ERROR("COUT to %s: %s",ipStr,e.msg);
2272 sv->setStatus(S_ERROR);
2284 }catch(StreamException &) {}
2286 // don`t discard this hit if we caused the disconnect (stopped broadcasting)
2287 if (error != (PCP_ERROR_QUIT+PCP_ERROR_OFFAIR))
2288 chanMgr->deadHit(bestHit);
2296 sys->endThread(thread);
2297 LOG_DEBUG("COUT ended");
2300 // -----------------------------------
2301 int Servent::incomingProc(ThreadInfo *thread)
2305 Servent *sv = (Servent*)thread->data;
2308 sv->sock->host.toStr(ipStr);
2312 sv->handshakeIncoming();
2313 }catch(HTTPException &e)
2317 sv->sock->writeLine(e.msg);
2319 sv->sock->writeLine("WWW-Authenticate: Basic realm=\"PeerCast\"");
2320 sv->sock->writeLine("");
2321 }catch(StreamException &){}
2322 LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);
2323 }catch(StreamException &e)
2325 LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);
2330 sys->endThread(thread);
2333 // -----------------------------------
2334 void Servent::processServent()
2336 setStatus(S_HANDSHAKE);
2341 throw StreamException("Servent has no socket");
2346 // -----------------------------------
2347 void Servent::processStream(bool doneHandshake,ChanInfo &chanInfo)
2351 setStatus(S_HANDSHAKE);
2353 if (!handshakeStream(chanInfo))
2357 if (chanInfo.id.isSet())
2360 chanID = chanInfo.id;
2362 LOG_CHANNEL("Sending channel: %s ",ChanInfo::getProtocolStr(outputProtocol));
2364 if (!waitForChannelHeader(chanInfo))
2365 throw StreamException("Channel not ready");
2367 servMgr->totalStreams++;
2369 Host host = sock->host;
2370 host.port = 0; // force to 0 so we ignore the incoming port
2372 Channel *ch = chanMgr->findChannelByID(chanID);
2374 throw StreamException("Channel not found");
2376 if (outputProtocol == ChanInfo::SP_HTTP)
2378 if ((addMetadata) && (chanMgr->icyMetaInterval))
2379 sendRawMetaChannel(chanMgr->icyMetaInterval);
2381 sendRawChannel(true,true);
2383 }else if (outputProtocol == ChanInfo::SP_MMS)
2387 sendRawChannel(true,true);
2390 sendRawChannel(true,false);
2393 }else if (outputProtocol == ChanInfo::SP_PCP)
2397 } else if (outputProtocol == ChanInfo::SP_PEERCAST)
2399 sendPeercastChannel();
2403 setStatus(S_CLOSING);
2406 // -----------------------------------------
2410 file.openReadOnly("c://test.mp3");
2412 LOG_DEBUG("raw file read");
2417 LOG_DEBUG("send %d",cnt++);
2418 file.read(buf,sizeof(buf));
2419 sock->write(buf,sizeof(buf));
2423 LOG_DEBUG("raw file sent");
2428 // -----------------------------------
2429 bool Servent::waitForChannelHeader(ChanInfo &info)
2431 for(int i=0; i<30*10; i++)
2433 Channel *ch = chanMgr->findChannelByID(info.id);
2437 if (ch->isPlaying() && (ch->rawData.writePos>0))
2440 if (!thread.active || !sock->active())
2446 // -----------------------------------
2447 void Servent::sendRawChannel(bool sendHead, bool sendData)
2452 sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
2454 Channel *ch = chanMgr->findChannelByID(chanID);
2456 throw StreamException("Channel not found");
2458 setStatus(S_CONNECTED);
2460 LOG_DEBUG("Starting Raw stream of %s at %d",ch->info.name.cstr(),streamPos);
2464 ch->headPack.writeRaw(*sock);
2465 streamPos = ch->headPack.pos + ch->headPack.len;
2466 LOG_DEBUG("Sent %d bytes header ",ch->headPack.len);
2472 unsigned int streamIndex = ch->streamIndex;
2473 unsigned int connectTime = sys->getTime();
2474 unsigned int lastWriteTime = connectTime;
2476 while ((thread.active) && sock->active())
2478 ch = chanMgr->findChannelByID(chanID);
2483 if (streamIndex != ch->streamIndex)
2485 streamIndex = ch->streamIndex;
2486 streamPos = ch->headPack.pos;
2487 LOG_DEBUG("sendRaw got new stream index");
2491 if (ch->rawData.findPacket(streamPos,rawPack))
2493 if (syncPos != rawPack.sync)
2494 LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
2495 syncPos = rawPack.sync+1;
2497 if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2499 rawPack.writeRaw(*sock);
2500 lastWriteTime = sys->getTime();
2503 if (rawPack.pos < streamPos)
2504 LOG_DEBUG("raw: skip back %d",rawPack.pos-streamPos);
2505 streamPos = rawPack.pos+rawPack.len;
2506 } else if (sock->readReady()) {
2508 int error = sock->readUpto(&c, 1);
2509 if (error == 0) sock->close();
2513 if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
2514 throw TimeoutException();
2519 }catch(StreamException &e)
2521 LOG_ERROR("Stream channel: %s",e.msg);
2526 // -----------------------------------
2527 void Servent::sendRawMultiChannel(bool sendHead, bool sendData)
2531 unsigned int chanStreamIndex[ChanMgr::MAX_CHANNELS];
2532 unsigned int chanStreamPos[ChanMgr::MAX_CHANNELS];
2533 GnuID chanIDs[ChanMgr::MAX_CHANNELS];
2535 for(int i=0; i<ChanMgr::MAX_CHANNELS; i++)
2537 Channel *ch = &chanMgr->channels[i];
2538 if (ch->isPlaying())
2539 chanIDs[numChanIDs++]=ch->info.id;
2544 setStatus(S_CONNECTED);
2549 for(int i=0; i<numChanIDs; i++)
2551 Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
2554 LOG_DEBUG("Starting RawMulti stream: %s",ch->info.name.cstr());
2555 ch->headPack.writeRaw(*sock);
2556 chanStreamPos[i] = ch->headPack.pos + ch->headPack.len;
2557 chanStreamIndex[i] = ch->streamIndex;
2558 LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
2567 unsigned int connectTime=sys->getTime();
2569 while ((thread.active) && sock->active())
2572 for(int i=1; i<numChanIDs; i++)
2574 Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
2577 if (chanStreamIndex[i] != ch->streamIndex)
2579 chanStreamIndex[i] = ch->streamIndex;
2580 chanStreamPos[i] = ch->headPack.pos;
2581 LOG_DEBUG("sendRawMulti got new stream index for chan %d",i);
2585 if (ch->rawData.findPacket(chanStreamPos[i],rawPack))
2587 if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2588 rawPack.writeRaw(*sock);
2591 if (rawPack.pos < chanStreamPos[i])
2592 LOG_DEBUG("raw: skip back %d",rawPack.pos-chanStreamPos[i]);
2593 chanStreamPos[i] = rawPack.pos+rawPack.len;
2596 //LOG("raw at %d: %d %d",streamPos,ch->rawData.getStreamPos(ch->rawData.firstPos),ch->rawData.getStreamPos(ch->rawData.lastPos));
2606 }catch(StreamException &e)
2608 LOG_ERROR("Stream channel: %s",e.msg);
2613 // -----------------------------------
2614 void Servent::sendRawMetaChannel(int interval)
2619 Channel *ch = chanMgr->findChannelByID(chanID);
2621 throw StreamException("Channel not found");
2623 sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
2625 setStatus(S_CONNECTED);
2627 LOG_DEBUG("Starting Raw Meta stream of %s (metaint: %d) at %d",ch->info.name.cstr(),interval,streamPos);
2630 String lastTitle,lastURL;
2632 int lastMsgTime=sys->getTime();
2638 if ((interval > sizeof(buf)) || (interval < 1))
2639 throw StreamException("Bad ICY Meta Interval value");
2641 unsigned int connectTime = sys->getTime();
2642 unsigned int lastWriteTime = connectTime;
2644 streamPos = 0; // raw meta channel has no header (its MP3)
2646 while ((thread.active) && sock->active())
2648 ch = chanMgr->findChannelByID(chanID);
2654 if (ch->rawData.findPacket(streamPos,rawPack))
2657 if (syncPos != rawPack.sync)
2658 LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
2659 syncPos = rawPack.sync+1;
2661 MemoryStream mem(rawPack.data,rawPack.len);
2663 if (rawPack.type == ChanPacket::T_DATA)
2666 int len = rawPack.len;
2667 char *p = rawPack.data;
2671 if ((bufPos+rl) > interval)
2672 rl = interval-bufPos;
2673 memcpy(&buf[bufPos],p,rl);
2678 if (bufPos >= interval)
2681 sock->write(buf,interval);
2682 lastWriteTime = sys->getTime();
2684 if (chanMgr->broadcastMsgInterval)
2685 if ((sys->getTime()-lastMsgTime) >= chanMgr->broadcastMsgInterval)
2688 lastMsgTime = sys->getTime();
2691 String *metaTitle = &ch->info.track.title;
2692 if (!ch->info.comment.isEmpty() && (showMsg))
2693 metaTitle = &ch->info.comment;
2696 if (!metaTitle->isSame(lastTitle) || !ch->info.url.isSame(lastURL))
2705 title.convertTo(String::T_META);
2706 url.convertTo(String::T_META);
2708 sprintf(tmp,"StreamTitle='%s';StreamUrl='%s';\0",title.cstr(),url.cstr());
2709 int len = ((strlen(tmp) + 15+1) / 16);
2710 sock->writeChar(len);
2711 sock->write(tmp,len*16);
2713 lastTitle = *metaTitle;
2714 lastURL = ch->info.url;
2716 LOG_DEBUG("StreamTitle: %s, StreamURL: %s",lastTitle.cstr(),lastURL.cstr());
2726 streamPos = rawPack.pos + rawPack.len;
2729 if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
2730 throw TimeoutException();
2735 }catch(StreamException &e)
2737 LOG_ERROR("Stream channel: %s",e.msg);
2740 // -----------------------------------
2741 void Servent::sendPeercastChannel()
2745 setStatus(S_CONNECTED);
2747 Channel *ch = chanMgr->findChannelByID(chanID);
2749 throw StreamException("Channel not found");
2751 LOG_DEBUG("Starting PeerCast stream: %s",ch->info.name.cstr());
2753 sock->writeTag("PCST");
2757 ch->headPack.writePeercast(*sock);
2759 pack.init(ChanPacket::T_META,ch->insertMeta.data,ch->insertMeta.len,ch->streamPos);
2760 pack.writePeercast(*sock);
2763 unsigned int syncPos=0;
2764 while ((thread.active) && sock->active())
2766 ch = chanMgr->findChannelByID(chanID);
2771 if (ch->rawData.findPacket(streamPos,rawPack))
2773 if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2775 sock->writeTag("SYNC");
2776 sock->writeShort(4);
2777 sock->writeShort(0);
2778 sock->write(&syncPos,4);
2781 rawPack.writePeercast(*sock);
2783 streamPos = rawPack.pos + rawPack.len;
2789 }catch(StreamException &e)
2791 LOG_ERROR("Stream channel: %s",e.msg);
2795 //WLock canStreamLock;
2797 // -----------------------------------
2798 void Servent::sendPCPChannel()
2800 bool skipCheck = false;
2801 unsigned int ptime = 0;
2802 int npacket = 0, upsize = 0;
2804 Channel *ch = chanMgr->findChannelByID(chanID);
2806 throw StreamException("Channel not found");
2808 AtomStream atom(*sock);
2810 pcpStream = new PCPStream(remoteID);
2816 LOG_DEBUG("Starting PCP stream of channel at %d",streamPos);
2819 // setStatus(S_CONNECTED);
2821 //canStreamLock.on();
2822 //thread.active = canStream(ch);
2823 //setStatus(S_CONNECTED);
2824 //canStreamLock.off();
2831 atom.writeParent(PCP_CHAN,3 + ((sendHeader)?1:0));
2832 atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
2833 ch->info.writeInfoAtoms(atom);
2834 ch->info.writeTrackAtoms(atom);
2837 atom.writeParent(PCP_CHAN_PKT,3);
2838 atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
2839 atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
2840 atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
2843 streamPos = ch->headPack.pos+ch->headPack.len;
2844 LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
2848 unsigned int streamIndex = ch->streamIndex;
2851 char pbuf[ChanPacket::MAX_DATALEN*3];
2852 MemoryStream mems(pbuf,sizeof(pbuf));
2853 AtomStream atom2(mems);
2855 while (thread.active)
2858 Channel *ch = chanMgr->findChannelByID(chanID);
2863 if (streamIndex != ch->streamIndex)
2865 streamIndex = ch->streamIndex;
2866 streamPos = ch->headPack.pos;
2867 LOG_DEBUG("sendPCPStream got new stream index");
2872 if (ch->rawData.findPacket(streamPos,rawPack))
2874 if ((streamPos < rawPack.pos) && !rawPack.skip){
2877 getHost().IPtoStr(tmp);
2878 LOG_NETWORK("##### send skipping ##### %d (%d, %d) -> %s", (rawPack.pos - streamPos), streamPos, rawPack.pos, tmp);
2880 if (sys->getTime() == lastSkipTime) {
2881 LOG_DEBUG("##### skip all buffer");
2882 streamPos = ch->rawData.getLatestPos();
2886 lastSkipTime = sys->getTime();
2893 if (rawPack.type == ChanPacket::T_HEAD)
2895 atom2.writeParent(PCP_CHAN,2);
2896 atom2.writeBytes(PCP_CHAN_ID,chanID.id,16);
2897 atom2.writeParent(PCP_CHAN_PKT,3);
2898 atom2.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
2899 atom2.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
2900 atom2.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
2902 sock->write(pbuf, mems.getPosition());
2903 }else if (rawPack.type == ChanPacket::T_DATA)
2905 atom2.writeParent(PCP_CHAN,2);
2906 atom2.writeBytes(PCP_CHAN_ID,chanID.id,16);
2907 atom2.writeParent(PCP_CHAN_PKT,3);
2908 atom2.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_DATA);
2909 atom2.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
2910 atom2.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
2913 sock->bufferingWrite(pbuf, mems.getPosition());
2914 lastSkipTime = sock->bufList.lastSkipTime;
2915 lastSkipCount = sock->bufList.skipCount;
2917 sock->write(pbuf, mems.getPosition());
2921 if (rawPack.pos < streamPos)
2922 LOG_DEBUG("pcp: skip back %d",rawPack.pos-streamPos);
2924 //LOG_DEBUG("Sending %d-%d (%d,%d,%d)",rawPack.pos,rawPack.pos+rawPack.len,ch->streamPos,ch->rawData.getLatestPos(),ch->rawData.getOldestPos());
2926 streamPos = rawPack.pos+rawPack.len;
2929 throw StreamException("Channel not found");
2933 sock->bufferingWrite(NULL, 0);
2934 lastSkipTime = sock->bufList.lastSkipTime;
2935 lastSkipCount = sock->bufList.skipCount;
2938 bcs.servent_id = servent_id;
2939 // error = pcpStream->readPacket(*sock,bcs);
2941 unsigned int t = sys->getTime();
2944 npacket = MAX_PROC_PACKETS;
2945 upsize = MAX_OUTWARD_SIZE;
2948 int len = pcpStream->flushUb(*sock, upsize);
2951 while (npacket > 0 && sock->readReady()) {
2953 error = pcpStream->readPacket(*sock,bcs);
2955 throw StreamException("PCP exception");
2962 LOG_DEBUG("PCP channel stream closed normally.");
2964 }catch(StreamException &e)
2966 LOG_ERROR("Stream channel: %s",e.msg);
2971 pcpStream->flush(*sock);
2972 atom.writeInt(PCP_QUIT,error);
2973 }catch(StreamException &) {}
2977 // -----------------------------------
2978 int Servent::serverProc(ThreadInfo *thread)
2983 Servent *sv = (Servent*)thread->data;
2988 throw StreamException("Server has no socket");
2990 sv->setStatus(S_LISTENING);
2994 sv->sock->host.toStr(servIP);
2996 if (servMgr->isRoot)
2997 LOG_DEBUG("Root Server started: %s",servIP);
2999 LOG_DEBUG("Server started: %s",servIP);
3002 while ((thread->active) && (sv->sock->active()))
3004 if (servMgr->numActiveOnPort(sv->sock->host.port) < servMgr->maxServIn)
3006 ClientSocket *cs = sv->sock->accept();
3009 LOG_DEBUG("accepted incoming");
3010 Servent *ns = servMgr->allocServent();
3013 servMgr->lastIncoming = sys->getTime();
3014 ns->servPort = sv->sock->host.port;
3015 ns->networkID = servMgr->networkID;
3016 ns->initIncoming(cs,sv->allow);
3018 LOG_ERROR("Out of servents");
3023 }catch(StreamException &e)
3025 LOG_ERROR("Server Error: %s:%d",e.msg,e.err);
3029 LOG_DEBUG("Server stopped");
3032 sys->endThread(thread);
3036 // -----------------------------------
3037 bool Servent::writeVariable(Stream &s, const String &var)
3042 strcpy(buf,getTypeStr());
3043 else if (var == "status")
3044 strcpy(buf,getStatusStr());
3045 else if (var == "address")
3047 if (servMgr->enableGetName) //JP-EX s
3049 getHost().toStr(buf);
3054 /* ChanHitList *hits[ChanMgr::MAX_HITLISTS];
3056 for(int i=0; i<ChanMgr::MAX_HITLISTS; i++)
3058 ChanHitList *chl = &chanMgr->hitlists[i];
3060 hits[numHits++] = chl;
3063 ishit = isfw = false;
3067 for(int k=0; k<numHits; k++)
3069 ChanHitList *chl = hits[k];
3072 for (int j=0; j<ChanHitList::MAX_HITS; j++)
3074 ChanHit *hit = &chl->hits[j];
3075 if (hit->host.isValid() && (h.ip == hit->host.ip))
3078 if (hit->firewalled)
3080 numRelay += hit->numRelays;
3092 strcat(buf,"<font color=red>");
3094 strcat(buf,"<font color=orange>");
3097 strcat(buf,"<font color=green>");
3101 if (ClientSocket::getHostname(h_name,h.ip))
3109 strcat(buf,"</font>");
3115 bool isRelay = true;
3117 ChanHitList *chl = chanMgr->findHitListByID(chanID);
3119 ChanHit *hit = chl->hit;
3121 if (hit->host.isValid() && (h.ip == hit->host.ip)){
3122 isfw = hit->firewalled;
3123 isRelay = hit->relay;
3124 numRelay = hit->numRelays;
3133 strcat(buf,"<font color=red>");
3135 strcat(buf,"<font color=orange>");
3140 strcpy(buf,"<font color=purple>");
3142 strcpy(buf,"<font color=blue>");
3145 strcpy(buf,"<font color=green>");
3150 if (ClientSocket::getHostname(h_name,sizeof(h_name),h.ip)) //JP-MOD(BOF
\91Î
\8dô)
3156 strcat(buf,"</font>");
3159 getHost().toStr(buf);
3161 else if (var == "agent")
3162 strcpy(buf,agent.cstr());
3163 else if (var == "bitrate")
3167 unsigned int tot = sock->bytesInPerSec+sock->bytesOutPerSec;
3168 sprintf(buf,"%.1f",BYTES_TO_KBPS(tot));
3171 }else if (var == "uptime")
3175 uptime.setFromStopwatch(sys->getTime()-lastConnect);
3178 strcpy(buf,uptime.cstr());
3179 }else if (var.startsWith("gnet."))
3182 float ctime = (float)(sys->getTime()-lastConnect);
3183 if (var == "gnet.packetsIn")
3184 sprintf(buf,"%d",gnuStream.packetsIn);
3185 else if (var == "gnet.packetsInPerSec")
3186 sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsIn)/ctime:0);
3187 else if (var == "gnet.packetsOut")
3188 sprintf(buf,"%d",gnuStream.packetsOut);
3189 else if (var == "gnet.packetsOutPerSec")
3190 sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsOut)/ctime:0);
3191 else if (var == "gnet.normQueue")
3192 sprintf(buf,"%d",outPacketsNorm.numPending());
3193 else if (var == "gnet.priQueue")
3194 sprintf(buf,"%d",outPacketsPri.numPending());
3195 else if (var == "gnet.flowControl")
3196 sprintf(buf,"%d",flowControl?1:0);
3197 else if (var == "gnet.routeTime")
3199 int nr = seenIDs.numUsed();
3200 unsigned int tim = sys->getTime()-seenIDs.getOldest();
3203 tstr.setFromStopwatch(tim);
3206 strcpy(buf,tstr.cstr());