1 // ------------------------------------------------
6 // Servents are the actual connections between clients. They do the handshaking,
7 // transfering of data and processing of GnuPackets. Each servent has one socket allocated
8 // to it on connect, it uses this to transfer all of its data.
10 // (c) 2002 peercast.org
11 // ------------------------------------------------
12 // This program is free software; you can redistribute it and/or modify
13 // it under the terms of the GNU General Public License as published by
14 // the Free Software Foundation; either version 2 of the License, or
15 // (at your option) any later version.
17 // This program is distributed in the hope that it will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 // GNU General Public License for more details.
21 // ------------------------------------------------
22 // todo: make lan->yp not check firewall
38 #include "chkMemoryLeak.h"
39 #define DEBUG_NEW new(__FILE__, __LINE__)
44 const int DIRECT_WRITE_TIMEOUT = 60;
46 // -----------------------------------
47 char *Servent::statusMsgs[]=
64 // -----------------------------------
65 char *Servent::typeMsgs[]=
76 // -----------------------------------
77 bool Servent::isPrivate()
80 return servMgr->isFiltered(ServFilter::F_PRIVATE,h) || h.isLocalhost();
82 // -----------------------------------
83 bool Servent::isAllowed(int a)
87 if (servMgr->isFiltered(ServFilter::F_BAN,h))
93 // -----------------------------------
94 bool Servent::isFiltered(int f)
97 return servMgr->isFiltered(f,h);
100 int servent_count = 1;
101 // -----------------------------------
102 Servent::Servent(int index)
103 :outPacketsPri(MAX_OUTPACKETS)
104 ,outPacketsNorm(MAX_OUTPACKETS)
111 servent_id = servent_count++;
117 // -----------------------------------
122 // -----------------------------------
125 thread.active = false;
127 setStatus(S_CLOSING);
131 PCPStream *pcp = pcpStream;
137 chanMgr->hitlistlock.on();
138 ChanHitList *chl = chanMgr->findHitListByID(chanID);
140 ChanHit *chh = chl->hit;
141 ChanHit *prev = NULL;
143 if (chh->servent_id == this->servent_id){
144 if ((servMgr->kickKeepTime != 0) && (chh->firewalled == 1)){
146 chh->numListeners = 0;
151 ChanHit *next = chh->next;
166 chanMgr->hitlistlock.off();
184 if (type != T_SERVER)
191 // -----------------------------------
192 void Servent::abort()
194 thread.active = false;
202 // -----------------------------------
203 void Servent::reset()
217 outputProtocol = ChanInfo::SP_UNKNOWN;
226 lastConnect = lastPing = lastPacket = 0;
227 loginPassword.clear();
230 priorityConnect = false;
234 outPacketsNorm.reset();
235 outPacketsPri.reset();
244 // -----------------------------------
245 bool Servent::sendPacket(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did,Servent::TYPE t)
250 && (!cid.isSet() || chanID.isSame(cid))
251 && (!sid.isSet() || !sid.isSame(remoteID))
252 && (pcpStream != NULL)
255 return pcpStream->sendPacket(pack,did);
261 // -----------------------------------
262 bool Servent::acceptGIV(ClientSocket *givSock)
272 // -----------------------------------
273 Host Servent::getHost()
283 // -----------------------------------
284 bool Servent::outputPacket(GnuPacket &p, bool pri)
290 r = outPacketsPri.write(p);
293 if (servMgr->useFlowControl)
295 int per = outPacketsNorm.percentFull();
306 // if in flowcontrol, only allow packets with less of a hop count than already in queue
307 if (p.hops >= outPacketsNorm.findMinHop())
312 r = outPacketsNorm.write(p);
319 // -----------------------------------
320 bool Servent::initServer(Host &h)
334 thread.func = serverProc;
338 if (!sys->startThread(&thread))
339 throw StreamException("Can`t start thread");
341 }catch(StreamException &e)
343 LOG_ERROR("Bad server: %s",e.msg);
350 // -----------------------------------
351 void Servent::checkFree()
354 throw StreamException("Socket already set");
356 throw StreamException("Thread already active");
358 // -----------------------------------
359 void Servent::initIncoming(ClientSocket *s, unsigned int a)
370 thread.func = incomingProc;
371 thread.finish = false;
373 setStatus(S_PROTOCOL);
376 sock->host.toStr(ipStr);
377 LOG_DEBUG("Incoming from %s",ipStr);
380 if (!sys->startThread(&thread))
381 throw StreamException("Can`t start thread");
382 }catch(StreamException &e)
384 //LOG_ERROR("!!FATAL!! Incoming error: %s",e.msg);
385 //servMgr->shutdownTimer = 1;
388 LOG_ERROR("INCOMING FAILED: %s",e.msg);
393 // -----------------------------------
394 void Servent::initOutgoing(TYPE ty)
404 thread.func = outgoingProc;
406 if (!sys->startThread(&thread))
407 throw StreamException("Can`t start thread");
409 }catch(StreamException &e)
411 LOG_ERROR("Unable to start outgoing: %s",e.msg);
416 // -----------------------------------
417 void Servent::initPCP(Host &rh)
431 if (!isAllowed(ALLOW_NETWORK))
432 throw StreamException("Servent not allowed");
435 thread.func = outgoingProc;
437 LOG_DEBUG("Outgoing to %s",ipStr);
439 if (!sys->startThread(&thread))
440 throw StreamException("Can`t start thread");
442 }catch(StreamException &e)
444 LOG_ERROR("Unable to open connection to %s - %s",ipStr,e.msg);
450 // -----------------------------------
451 void Servent::initChannelFetch(Host &host)
465 if (!isAllowed(ALLOW_DATA))
466 throw StreamException("Servent not allowed");
472 // -----------------------------------
473 void Servent::initGIV(Host &h, GnuID &id)
487 if (!isAllowed(ALLOW_NETWORK))
488 throw StreamException("Servent not allowed");
493 thread.func = givProc;
497 if (!sys->startThread(&thread))
498 throw StreamException("Can`t start thread");
500 }catch(StreamException &e)
502 LOG_ERROR("GIV error to %s: %s",ipStr,e.msg);
506 // -----------------------------------
507 void Servent::createSocket()
510 LOG_ERROR("Servent::createSocket attempt made while active");
512 sock = sys->createSocket();
514 // -----------------------------------
515 void Servent::setStatus(STATUS s)
521 if ((s == S_HANDSHAKE) || (s == S_CONNECTED) || (s == S_LISTENING))
522 lastConnect = sys->getTime();
527 // -----------------------------------
528 void Servent::handshakeOut()
530 sock->writeLine(GNU_PEERCONN);
534 sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_AGENT);
535 sock->writeLineF("%s %d",PCX_HS_PCP,1);
538 sock->writeLineF("%s %d",PCX_HS_PRIORITY,1);
540 if (networkID.isSet())
542 networkID.toStr(str);
543 sock->writeLineF("%s %s",PCX_HS_NETWORKID,str);
546 servMgr->sessionID.toStr(str);
547 sock->writeLineF("%s %s",PCX_HS_ID,str);
550 sock->writeLineF("%s %s",PCX_HS_OS,peercastApp->getClientTypeOS());
556 int r = http.readResponse();
560 LOG_ERROR("Expected 200, got %d",r);
561 throw StreamException("Unexpected HTTP response");
565 bool versionValid = false;
570 while (http.nextHeader())
572 LOG_DEBUG(http.cmdLine);
574 char *arg = http.getArgStr();
578 if (http.isHeader(HTTP_HS_AGENT))
582 if (strnicmp(arg,"PeerCast/",9)==0)
583 versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);
584 }else if (http.isHeader(PCX_HS_NETWORKID))
585 clientID.fromStr(arg);
588 if (!clientID.isSame(networkID))
589 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
592 throw HTTPException(HTTP_SC_UNAUTHORIZED,401);
595 sock->writeLine(GNU_OK);
601 // -----------------------------------
602 void Servent::processOutChannel()
607 // -----------------------------------
608 void Servent::handshakeIn()
616 bool versionValid = false;
617 bool diffRootVer = false;
622 while (http.nextHeader())
624 LOG_DEBUG("%s",http.cmdLine);
626 char *arg = http.getArgStr();
630 if (http.isHeader(HTTP_HS_AGENT))
634 if (strnicmp(arg,"PeerCast/",9)==0)
636 versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);
637 diffRootVer = stricmp(arg+9,MIN_ROOTVER)<0;
639 }else if (http.isHeader(PCX_HS_NETWORKID))
641 clientID.fromStr(arg);
643 }else if (http.isHeader(PCX_HS_PRIORITY))
645 priorityConnect = atoi(arg)!=0;
647 }else if (http.isHeader(PCX_HS_ID))
651 if (id.isSame(servMgr->sessionID))
652 throw StreamException("Servent loopback");
654 }else if (http.isHeader(PCX_HS_OS))
656 if (stricmp(arg,PCX_OS_LINUX)==0)
658 else if (stricmp(arg,PCX_OS_WIN32)==0)
660 else if (stricmp(arg,PCX_OS_MACOSX)==0)
662 else if (stricmp(arg,PCX_OS_WINAMP2)==0)
668 if (!clientID.isSame(networkID))
669 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
671 // if this is a priority connection and all incoming connections
672 // are full then kill an old connection to make room. Otherwise reject connection.
673 //if (!priorityConnect)
676 if (servMgr->pubInFull())
677 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
681 throw HTTPException(HTTP_SC_FORBIDDEN,403);
683 sock->writeLine(GNU_OK);
685 sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_OLDAGENT);
687 if (networkID.isSet())
690 networkID.toStr(idStr);
691 sock->writeLineF("%s %s",PCX_HS_NETWORKID,idStr);
696 sock->writeLineF("%s %d",PCX_HS_FLOWCTL,servMgr->useFlowControl?1:0);
697 sock->writeLineF("%s %d",PCX_HS_MINBCTTL,chanMgr->minBroadcastTTL);
698 sock->writeLineF("%s %d",PCX_HS_MAXBCTTL,chanMgr->maxBroadcastTTL);
699 sock->writeLineF("%s %d",PCX_HS_RELAYBC,servMgr->relayBroadcast);
700 //sock->writeLine("%s %d",PCX_HS_FULLHIT,2);
705 sock->writeString(PCX_HS_DL);
706 sock->writeLine(PCX_DL_URL);
709 sock->writeLineF("%s %s",PCX_HS_MSG,servMgr->rootMsg.cstr());
716 sock->writeLineF("%s %s",PCX_HS_REMOTEIP,hostIP);
722 while (http.nextHeader());
725 // -----------------------------------
726 bool Servent::pingHost(Host &rhost,GnuID &rsid)
730 LOG_DEBUG("Ping host %s: trying..",ipstr);
731 ClientSocket *s=NULL;
735 s = sys->createSocket();
741 s->setReadTimeout(15000);
742 s->setWriteTimeout(15000);
748 atom.writeInt(PCP_CONNECT,1);
749 atom.writeParent(PCP_HELO,1);
750 atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
756 ID4 id = atom.read(numc,numd);
759 for(int i=0; i<numc; i++)
762 ID4 pid = atom.read(c,d);
763 if (pid == PCP_SESSIONID)
764 atom.readBytes(sid.id,16,d);
770 LOG_DEBUG("Ping response: %s",id.getString().str());
771 throw StreamException("Bad ping response");
774 if (!sid.isSame(rsid))
775 throw StreamException("SIDs don`t match");
778 LOG_DEBUG("Ping host %s: OK",ipstr);
779 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT);
783 }catch(StreamException &e)
785 LOG_DEBUG("Ping host %s: %s",ipstr,e.msg);
801 // -----------------------------------
802 bool Servent::handshakeStream(ChanInfo &chanInfo)
809 unsigned int reqPos=0;
813 while (http.nextHeader())
815 char *arg = http.getArgStr();
819 if (http.isHeader(PCX_HS_PCP))
820 gotPCP = atoi(arg)!=0;
821 else if (http.isHeader(PCX_HS_POS))
823 else if (http.isHeader("icy-metadata"))
824 addMetadata = atoi(arg) > 0;
825 else if (http.isHeader(HTTP_HS_AGENT))
827 else if (http.isHeader("Pragma"))
829 char *ssc = stristr(arg,"stream-switch-count=");
830 char *so = stristr(arg,"stream-offset");
835 //nsSwitchNum = atoi(ssc+20);
839 LOG_DEBUG("Stream: %s",http.cmdLine);
843 if ((!gotPCP) && (outputProtocol == ChanInfo::SP_PCP))
844 outputProtocol = ChanInfo::SP_PEERCAST;
846 if (outputProtocol == ChanInfo::SP_HTTP)
848 if ( (chanInfo.srcProtocol == ChanInfo::SP_MMS)
849 || (chanInfo.contentType == ChanInfo::T_WMA)
850 || (chanInfo.contentType == ChanInfo::T_WMV)
851 || (chanInfo.contentType == ChanInfo::T_ASX)
853 outputProtocol = ChanInfo::SP_MMS;
857 bool chanFound=false;
858 bool chanReady=false;
860 ChanHit *sourceHit = NULL;
862 Channel *ch = chanMgr->findChannelByID(chanInfo.id);
866 if (reqPos || !isIndexTxt(&chanInfo))
868 streamPos = ch->rawData.findOldestPos(reqPos);
869 //streamPos = ch->rawData.getLatestPos();
872 streamPos = ch->rawData.getLatestPos();
875 chanID = chanInfo.id;
877 chanReady = canStream(ch);
880 if (servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
882 sourceHit = &ch->sourceHost; // send source host info
884 if (ch->info.getUptime() > 60) // if stable
886 // connect "this" host later
890 nh.rhost[0] = getHost();
895 getHost().toStr(tmp);
896 LOG_DEBUG("Bump channel, hand over sourceHost to %s", tmp);
899 else if (servMgr->kickUnrelayableHost(chanID, this) != 0)
901 chanReady = canStream(ch);
903 LOG_DEBUG("Kicked unrelayable host, but still cannot stream");
906 if (!chanReady) type = T_INCOMING;
907 thread.active = chanReady;
908 setStatus(S_CONNECTED);
910 channel_id = ch->channel_id;
913 if (servMgr->isCheckPushStream())
915 if (chanReady == true)
919 if (!h.isLocalhost())
923 if (strstr(agent.cstr(),"PeerCast/0.119") != NULL)
927 LOG_ERROR("Block v0.119 Servent : %s (%s)",strip,agent.cstr());
932 /* ChanHitList *hits[ChanMgr::MAX_HITLISTS];
934 for(int i=0; i<ChanMgr::MAX_HITLISTS; i++)
936 ChanHitList *chl = &chanMgr->hitlists[i];
938 hits[numHits++] = chl;
942 for(int i=0; i<numHits; i++)
944 ChanHitList *chl = hits[i];
947 for (int j=0; j<ChanHitList::MAX_HITS; j++)
949 ChanHit *hit = &chl->hits[j];
950 if (hit->host.isValid() && (h.ip == hit->host.ip))
954 numRelay = hit->numRelays;
959 if ((isfw == true) && (numRelay == 0))
963 LOG_ERROR("Block firewalled Servent : %s",strip);
967 ChanHitList *chl = chanMgr->findHitList(chanInfo);
968 ChanHit *hit = chl->hit;
970 if (hit->host.isValid() && (h.ip == hit->host.ip))
972 if ((hit->firewalled) && (hit->numRelays == 0)){
975 LOG_ERROR("Block firewalled Servent : %s",strip);
988 // LockBlock lockblock(chanMgr->hitlistlock);
990 // lockblock.lockon();
991 ChanHitList *chl = chanMgr->findHitList(chanInfo);
1002 chanInfo.id.toStr(idStr);
1005 servMgr->sessionID.toStr(sidStr);
1007 Host rhost = sock->host;
1012 AtomStream atom(*sock);
1018 sock->writeLine(HTTP_SC_NOTFOUND);
1019 sock->writeLine("");
1020 LOG_DEBUG("Sending channel not found");
1027 if (outputProtocol == ChanInfo::SP_PCP)
1031 MemoryStream mem(tbuf, sizeof(tbuf));
1032 mem.writeLine(HTTP_SC_UNAVAILABLE);
1033 mem.writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
1035 sock->write(tbuf, mem.getPosition());
1037 handshakeIncomingPCP(atom,rhost,remoteID,agent);
1040 rhost.toStr(ripStr);
1042 LOG_DEBUG("Sending channel unavailable");
1047 AtomStream atom2(mem);
1049 int error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
1051 chanMgr->hitlistlock.on();
1053 chl = chanMgr->findHitList(chanInfo);
1059 // search for up to 8 other hits
1067 // find best hit this network if local IP
1068 if (!rhost.globalIP())
1071 chs.matchHost = servMgr->serverHost;
1073 chs.excludeID = remoteID;
1074 if (chl->pickHits(chs)){
1076 LOG_DEBUG("find best hit this network if local IP");
1080 // find best hit on same network
1084 chs.matchHost = rhost;
1086 chs.excludeID = remoteID;
1087 if (chl->pickHits(chs)){
1089 LOG_DEBUG("find best hit on same network");
1094 // find best hit on other networks
1095 /* if (!best.host.ip)
1099 chs.excludeID = remoteID;
1100 if (chl->pickHits(chs)){
1102 LOG_DEBUG("find best hit on other networks");
1110 best.writeAtoms(atom2,chanInfo.id);
1116 sourceHit->writeAtoms(atom2, chanInfo.id);
1117 chs.best[i].host.toStr(tmp);
1118 LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
1119 best.host.ip = sourceHit->host.ip;
1124 // chanMgr->hitlistlock.on();
1125 int cnt = chs.getRelayHost(servMgr->serverHost, rhost, remoteID, chl);
1126 // chanMgr->hitlistlock.off();
1127 for (int i = 0; i < cnt; i++){
1128 chs.best[i].writeAtoms(atom2, chanInfo.id);
1129 chs.best[i].host.toStr(tmp);
1130 LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
1131 best.host.ip = chs.best[i].host.ip;
1137 LOG_DEBUG("Sent %d channel hit(s) to %s",cnt,ripStr);
1140 else if (rhost.port)
1142 // find firewalled host
1145 chs.useFirewalled = true;
1146 chs.excludeID = remoteID;
1147 if (chl->pickHits(chs))
1150 int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_RELAY);
1151 LOG_DEBUG("Broadcasted channel push request to %d clients for %s",cnt,ripStr);
1155 // if all else fails, use tracker
1158 // find best tracker on this network if local IP
1159 if (!rhost.globalIP())
1162 chs.matchHost = servMgr->serverHost;
1163 chs.trackersOnly = true;
1164 chs.excludeID = remoteID;
1165 if (chl->pickHits(chs))
1170 // find local tracker
1174 chs.matchHost = rhost;
1175 chs.trackersOnly = true;
1176 chs.excludeID = remoteID;
1177 if (chl->pickHits(chs))
1181 // find global tracker
1185 chs.trackersOnly = true;
1186 chs.excludeID = remoteID;
1187 if (chl->pickHits(chs))
1193 best.writeAtoms(atom2,chanInfo.id);
1194 LOG_DEBUG("Sent 1 tracker hit to %s",ripStr);
1195 }else if (rhost.port)
1197 // find firewalled tracker
1199 chs.useFirewalled = true;
1200 chs.trackersOnly = true;
1201 chs.excludeID = remoteID;
1203 if (chl->pickHits(chs))
1206 int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_CIN);
1207 LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,ripStr);
1216 chanMgr->hitlistlock.off();
1218 // return not available yet code
1219 atom2.writeInt(PCP_QUIT,error);
1220 sock->write(tbuf, mem.getPosition());
1225 // wait disconnect from other host
1227 while(sock->read(c, sizeof(c))){
1230 }catch(StreamException &e){
1231 LOG_DEBUG("RelayInfoWait: %s",e.msg);
1236 LOG_DEBUG("Sending channel unavailable");
1237 sock->writeLine(HTTP_SC_UNAVAILABLE);
1238 sock->writeLine("");
1244 if (chanInfo.contentType != ChanInfo::T_MP3)
1247 if (addMetadata && (outputProtocol == ChanInfo::SP_HTTP)) // winamp mp3 metadata check
1250 sock->writeLine(ICY_OK);
1252 sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT);
1253 sock->writeLineF("icy-name:%s",chanInfo.name.cstr());
1254 sock->writeLineF("icy-br:%d",chanInfo.bitrate);
1255 sock->writeLineF("icy-genre:%s",chanInfo.genre.cstr());
1256 sock->writeLineF("icy-url:%s",chanInfo.url.cstr());
1257 sock->writeLineF("icy-metaint:%d",chanMgr->icyMetaInterval);
1258 sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr);
1260 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3);
1265 sock->writeLine(HTTP_SC_OK);
1267 if ((chanInfo.contentType != ChanInfo::T_ASX) && (chanInfo.contentType != ChanInfo::T_WMV) && (chanInfo.contentType != ChanInfo::T_WMA))
1269 sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT);
1271 sock->writeLine("Accept-Ranges: none");
1273 sock->writeLineF("x-audiocast-name: %s",chanInfo.name.cstr());
1274 sock->writeLineF("x-audiocast-bitrate: %d",chanInfo.bitrate);
1275 sock->writeLineF("x-audiocast-genre: %s",chanInfo.genre.cstr());
1276 sock->writeLineF("x-audiocast-description: %s",chanInfo.desc.cstr());
1277 sock->writeLineF("x-audiocast-url: %s",chanInfo.url.cstr());
1278 sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr);
1282 if (outputProtocol == ChanInfo::SP_HTTP)
1284 switch (chanInfo.contentType)
1286 case ChanInfo::T_OGG:
1287 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XOGG);
1289 case ChanInfo::T_MP3:
1290 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3);
1292 case ChanInfo::T_MOV:
1293 sock->writeLine("Connection: close");
1294 sock->writeLine("Content-Length: 10000000");
1295 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MOV);
1297 case ChanInfo::T_MPG:
1298 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MPG);
1300 case ChanInfo::T_NSV:
1301 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_NSV);
1303 case ChanInfo::T_ASX:
1304 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_ASX);
1306 case ChanInfo::T_WMA:
1307 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_WMA);
1309 case ChanInfo::T_WMV:
1310 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_WMV);
1313 } else if (outputProtocol == ChanInfo::SP_MMS)
1315 sock->writeLine("Server: Rex/9.0.0.2980");
1316 sock->writeLine("Cache-Control: no-cache");
1317 sock->writeLine("Pragma: no-cache");
1318 sock->writeLine("Pragma: client-id=3587303426");
1319 sock->writeLine("Pragma: features=\"broadcast,playlist\"");
1323 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MMS);
1326 sock->writeLine("Content-Type: application/vnd.ms.wms-hdr.asfv1");
1328 sock->writeLineF("Content-Length: %d",ch->headPack.len);
1329 sock->writeLine("Connection: Keep-Alive");
1332 } else if (outputProtocol == ChanInfo::SP_PCP)
1334 sock->writeLineF("%s %d",PCX_HS_POS,streamPos);
1335 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
1337 }else if (outputProtocol == ChanInfo::SP_PEERCAST)
1339 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPEERCAST);
1342 sock->writeLine("");
1347 handshakeIncomingPCP(atom,rhost,remoteID,agent);
1348 atom.writeInt(PCP_OK,0);
1358 // -----------------------------------
1359 void Servent::handshakeGiv(GnuID &id)
1365 sock->writeLineF("GIV /%s",idstr);
1367 sock->writeLine("GIV");
1369 sock->writeLine("");
1373 // -----------------------------------
1374 void Servent::processGnutella()
1378 //if (servMgr->isRoot && !servMgr->needConnections())
1379 if (servMgr->isRoot)
1387 gnuStream.init(sock);
1388 setStatus(S_CONNECTED);
1390 if (!servMgr->isRoot)
1392 chanMgr->broadcastRelays(this, 1, 1);
1395 if ((p=outPacketsNorm.curr()))
1396 gnuStream.sendPacket(*p);
1402 // if (type != T_LOOKUP)
1403 // chanMgr->broadcastRelays(this,chanMgr->minBroadcastTTL,2);
1405 lastPacket = lastPing = sys->getTime();
1406 bool doneBigPing=false;
1408 const unsigned int abortTimeoutSecs = 60; // abort connection after 60 secs of no activitiy
1409 const unsigned int packetTimeoutSecs = 30; // ping connection after 30 secs of no activity
1411 unsigned int currBytes=0;
1412 unsigned int lastWait=0;
1414 unsigned int lastTotalIn=0,lastTotalOut=0;
1416 while (thread.active && sock->active())
1419 if (sock->readReady())
1421 lastPacket = sys->getTime();
1423 if (gnuStream.readPacket(pack))
1426 sock->host.toStr(ipstr);
1429 GnuStream::R_TYPE ret = GnuStream::R_PROCESS;
1431 if (pack.func != GNU_FUNC_PONG)
1432 if (servMgr->seenPacket(pack))
1433 ret = GnuStream::R_DUPLICATE;
1435 seenIDs.add(pack.id);
1438 if (ret == GnuStream::R_PROCESS)
1441 ret = gnuStream.processPacket(pack,this,routeID);
1443 if (flowControl && (ret == GnuStream::R_BROADCAST))
1444 ret = GnuStream::R_DROP;
1450 case GnuStream::R_BROADCAST:
1451 if (servMgr->broadcast(pack,this))
1452 stats.add(Stats::NUMBROADCASTED);
1454 stats.add(Stats::NUMDROPPED);
1456 case GnuStream::R_ROUTE:
1457 if (servMgr->route(pack,routeID,NULL))
1458 stats.add(Stats::NUMROUTED);
1460 stats.add(Stats::NUMDROPPED);
1462 case GnuStream::R_ACCEPTED:
1463 stats.add(Stats::NUMACCEPTED);
1465 case GnuStream::R_DUPLICATE:
1466 stats.add(Stats::NUMDUP);
1468 case GnuStream::R_DEAD:
1469 stats.add(Stats::NUMDEAD);
1471 case GnuStream::R_DISCARD:
1472 stats.add(Stats::NUMDISCARDED);
1474 case GnuStream::R_BADVERSION:
1475 stats.add(Stats::NUMOLD);
1477 case GnuStream::R_DROP:
1478 stats.add(Stats::NUMDROPPED);
1483 LOG_NETWORK("packet in: %s-%s, %d bytes, %d hops, %d ttl, from %s",GNU_FUNC_STR(pack.func),GnuStream::getRouteStr(ret),pack.len,pack.hops,pack.ttl,ipstr);
1488 LOG_ERROR("Bad packet");
1495 if ((p=outPacketsPri.curr())) // priority packet
1497 gnuStream.sendPacket(*p);
1499 outPacketsPri.next();
1500 } else if ((p=outPacketsNorm.curr())) // or.. normal packet
1502 gnuStream.sendPacket(*p);
1504 outPacketsNorm.next();
1507 int lpt = sys->getTime()-lastPacket;
1511 if ((sys->getTime()-lastPing) > 15)
1514 lastPing = sys->getTime();
1518 if (lpt > packetTimeoutSecs)
1521 if ((sys->getTime()-lastPing) > packetTimeoutSecs)
1524 lastPing = sys->getTime();
1529 if (lpt > abortTimeoutSecs)
1530 throw TimeoutException();
1533 unsigned int totIn = sock->totalBytesIn-lastTotalIn;
1534 unsigned int totOut = sock->totalBytesOut-lastTotalOut;
1536 unsigned int bytes = totIn+totOut;
1538 lastTotalIn = sock->totalBytesIn;
1539 lastTotalOut = sock->totalBytesOut;
1541 const int serventBandwidth = 1000;
1543 int delay = sys->idleSleepTime;
1544 if ((bytes) && (serventBandwidth >= 8))
1545 delay = (bytes*1000)/(serventBandwidth/8); // set delay relative packetsize
1547 if (delay < (int)sys->idleSleepTime)
1548 delay = sys->idleSleepTime;
1549 //LOG("delay %d, in %d, out %d",delay,totIn,totOut);
1557 // -----------------------------------
1558 void Servent::processRoot()
1563 gnuStream.init(sock);
1564 setStatus(S_CONNECTED);
1568 unsigned int lastConnect = sys->getTime();
1570 while (thread.active && sock->active())
1572 if (gnuStream.readPacket(pack))
1575 sock->host.toStr(ipstr);
1577 LOG_NETWORK("packet in: %d from %s",pack.func,ipstr);
1580 if (pack.func == GNU_FUNC_PING) // if ping then pong back some hosts and close
1584 int cnt = servMgr->getNewestServents(hl,32,sock->host);
1587 int start = sys->rnd() % cnt;
1588 int max = cnt>8?8:cnt;
1590 for(int i=0; i<max; i++)
1594 pong.initPong(hl[start],false,pack);
1595 gnuStream.sendPacket(pong);
1598 hl[start].toStr(ipstr);
1600 //LOG_NETWORK("Pong %d: %s",start+1,ipstr);
1601 start = (start+1) % cnt;
1604 sock->host.toStr(str);
1605 LOG_NETWORK("Sent %d pong(s) to %s",max,str);
1608 LOG_NETWORK("No Pongs to send");
1611 }else if (pack.func == GNU_FUNC_PONG) // pong?
1613 MemoryStream pong(pack.data,pack.len);
1616 port = pong.readShort();
1617 ip = pong.readLong();
1622 if ((ip) && (port) && (h.globalIP()))
1625 LOG_NETWORK("added pong: %d.%d.%d.%d:%d",ip>>24&0xff,ip>>16&0xff,ip>>8&0xff,ip&0xff,port);
1626 servMgr->addHost(h,ServHost::T_SERVENT,sys->getTime());
1629 } else if (pack.func == GNU_FUNC_HIT)
1631 MemoryStream data(pack.data,pack.len);
1633 gnuStream.readHit(data,hit,pack.hops,pack.id);
1636 //if (gnuStream.packetsIn > 5) // die if we get too many packets
1640 if((sys->getTime()-lastConnect > 60))
1645 }catch(StreamException &e)
1647 LOG_ERROR("Relay: %s",e.msg);
1653 // -----------------------------------
1654 int Servent::givProc(ThreadInfo *thread)
1657 Servent *sv = (Servent*)thread->data;
1660 sv->handshakeGiv(sv->givID);
1661 sv->handshakeIncoming();
1663 }catch(StreamException &e)
1665 LOG_ERROR("GIV: %s",e.msg);
1669 sys->endThread(thread);
1673 // -----------------------------------
1674 void Servent::handshakeOutgoingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent, bool isTrusted)
1677 bool nonFW = (servMgr->getFirewall() != ServMgr::FW_ON);
1678 bool testFW = (servMgr->getFirewall() == ServMgr::FW_UNKNOWN);
1680 bool sendBCID = isTrusted && chanMgr->isBroadcasting();
1683 MemoryStream mem(tbuf, sizeof(tbuf));
1684 AtomStream atom2(mem);
1685 atom2.writeParent(PCP_HELO,3 + (testFW?1:0) + (nonFW?1:0) + (sendBCID?1:0));
1686 atom2.writeString(PCP_HELO_AGENT,PCX_AGENT);
1687 atom2.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
1688 atom2.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
1690 atom2.writeShort(PCP_HELO_PORT,servMgr->serverHost.port);
1692 atom2.writeShort(PCP_HELO_PING,servMgr->serverHost.port);
1694 atom2.writeBytes(PCP_HELO_BCID,chanMgr->broadcastID.id,16);
1695 atom.io.write(tbuf, mem.getPosition());
1698 LOG_DEBUG("PCP outgoing waiting for OLEH..");
1701 ID4 id = atom.read(numc,numd);
1704 LOG_DEBUG("PCP outgoing reply: %s",id.getString().str());
1705 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
1706 throw StreamException("Got unexpected PCP response");
1721 // read OLEH response
1722 for(int i=0; i<numc; i++)
1725 ID4 id = atom.read(c,dlen);
1727 if (id == PCP_HELO_AGENT)
1729 atom.readString(arg,sizeof(arg),dlen);
1732 }else if (id == PCP_HELO_REMOTEIP)
1734 thisHost.ip = atom.readInt();
1736 }else if (id == PCP_HELO_PORT)
1738 thisHost.port = atom.readShort();
1740 }else if (id == PCP_HELO_VERSION)
1742 version = atom.readInt();
1744 }else if (id == PCP_HELO_DISABLE)
1746 disable = atom.readInt();
1748 }else if (id == PCP_HELO_SESSIONID)
1750 atom.readBytes(rid.id,16);
1751 if (rid.isSame(servMgr->sessionID))
1752 throw StreamException("Servent loopback");
1756 LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
1763 // update server ip/firewall status
1766 if (thisHost.isValid())
1768 if ((servMgr->serverHost.ip != thisHost.ip) && (servMgr->forceIP.isEmpty()))
1771 thisHost.toStr(ipstr);
1772 LOG_DEBUG("Got new ip: %s",ipstr);
1773 servMgr->serverHost.ip = thisHost.ip;
1776 if (servMgr->getFirewall() == ServMgr::FW_UNKNOWN)
1778 if (thisHost.port && thisHost.globalIP())
1779 servMgr->setFirewall(ServMgr::FW_OFF);
1781 servMgr->setFirewall(ServMgr::FW_ON);
1787 LOG_ERROR("client disabled: %d",disable);
1788 servMgr->isDisabled = true;
1791 servMgr->isDisabled = false;
1799 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
1800 throw StreamException("Remote host not identified");
1803 LOG_DEBUG("PCP Outgoing handshake complete.");
1807 // -----------------------------------
1808 void Servent::handshakeIncomingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent)
1811 ID4 id = atom.read(numc,numd);
1816 LOG_DEBUG("PCP incoming reply: %s",id.getString().str());
1817 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
1818 throw StreamException("Got unexpected PCP response");
1837 for(int i=0; i<numc; i++)
1841 ID4 id = atom.read(c,dlen);
1843 if (id == PCP_HELO_AGENT)
1845 atom.readString(arg,sizeof(arg),dlen);
1848 }else if (id == PCP_HELO_VERSION)
1850 version = atom.readInt();
1852 }else if (id == PCP_HELO_SESSIONID)
1854 atom.readBytes(rid.id,16);
1855 if (rid.isSame(servMgr->sessionID))
1856 throw StreamException("Servent loopback");
1858 }else if (id == PCP_HELO_BCID)
1860 atom.readBytes(bcID.id,16);
1862 }else if (id == PCP_HELO_OSTYPE)
1864 osType = atom.readInt();
1865 }else if (id == PCP_HELO_PORT)
1867 rhost.port = atom.readShort();
1868 }else if (id == PCP_HELO_PING)
1870 pingPort = atom.readShort();
1873 LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
1880 LOG_DEBUG("Incoming PCP is %s : v%d", agent.cstr(),version);
1883 if (!rhost.globalIP() && servMgr->serverHost.globalIP())
1884 rhost.ip = servMgr->serverHost.ip;
1889 rhost.toStr(ripStr);
1890 LOG_DEBUG("Incoming firewalled test request: %s ", ripStr);
1891 rhost.port = pingPort;
1892 if (!rhost.globalIP() || !pingHost(rhost,rid))
1896 if (servMgr->isRoot)
1900 if (bcID.getFlags() & 1) // private
1902 BCID *bcid = servMgr->findValidBCID(bcID);
1903 if (!bcid || (bcid && !bcid->valid))
1905 atom.writeParent(PCP_OLEH,1);
1906 atom.writeInt(PCP_HELO_DISABLE,1);
1907 throw StreamException("Client is banned");
1915 MemoryStream mem(tbuf, sizeof(tbuf));
1916 AtomStream atom2(mem);
1917 atom2.writeParent(PCP_OLEH,5);
1918 atom2.writeString(PCP_HELO_AGENT,PCX_AGENT);
1919 atom2.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
1920 atom2.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
1921 atom2.writeInt(PCP_HELO_REMOTEIP,rhost.ip);
1922 atom2.writeShort(PCP_HELO_PORT,rhost.port);
1926 if (version < PCP_CLIENT_MINVERSION)
1928 atom2.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADAGENT);
1929 atom.io.write(tbuf, mem.getPosition());
1930 throw StreamException("Agent is not valid");
1936 atom2.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
1937 atom.io.write(tbuf, mem.getPosition());
1938 throw StreamException("Remote host not identified");
1943 if (servMgr->isRoot)
1945 servMgr->writeRootAtoms(atom2,false);
1948 atom.io.write(tbuf, mem.getPosition());
1950 LOG_DEBUG("PCP Incoming handshake complete.");
1954 // -----------------------------------
1955 void Servent::processIncomingPCP(bool suggestOthers)
1957 PCPStream::readVersion(*sock);
1960 AtomStream atom(*sock);
1961 Host rhost = sock->host;
1963 handshakeIncomingPCP(atom,rhost,remoteID,agent);
1966 bool alreadyConnected = (servMgr->findConnection(Servent::T_COUT,remoteID)!=NULL)
1967 || (servMgr->findConnection(Servent::T_CIN,remoteID)!=NULL);
1968 bool unavailable = servMgr->controlInFull();
1969 bool offair = !servMgr->isRoot && !chanMgr->isBroadcasting();
1974 if (unavailable || alreadyConnected || offair)
1978 if (alreadyConnected)
1979 error = PCP_ERROR_QUIT+PCP_ERROR_ALREADYCONNECTED;
1980 else if (unavailable)
1981 error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
1983 error = PCP_ERROR_QUIT+PCP_ERROR_OFFAIR;
1985 error = PCP_ERROR_QUIT;
1995 for(int i=0; i<8; i++)
1999 // find best hit on this network
2000 if (!rhost.globalIP())
2003 chs.matchHost = servMgr->serverHost;
2005 chs.excludeID = remoteID;
2006 chs.trackersOnly = true;
2007 chs.useBusyControls = false;
2008 if (chanMgr->pickHits(chs))
2012 // find best hit on same network
2016 chs.matchHost = rhost;
2018 chs.excludeID = remoteID;
2019 chs.trackersOnly = true;
2020 chs.useBusyControls = false;
2021 if (chanMgr->pickHits(chs))
2025 // else find best hit on other networks
2030 chs.excludeID = remoteID;
2031 chs.trackersOnly = true;
2032 chs.useBusyControls = false;
2033 if (chanMgr->pickHits(chs))
2042 best.writeAtoms(atom,noID);
2047 LOG_DEBUG("Sent %d tracker(s) to %s",cnt,rstr);
2049 else if (rhost.port)
2051 // send push request to best firewalled tracker on other network
2054 chs.excludeID = remoteID;
2055 chs.trackersOnly = true;
2056 chs.useFirewalled = true;
2057 chs.useBusyControls = false;
2058 if (chanMgr->pickHits(chs))
2063 int cnt = servMgr->broadcastPushRequest(best,rhost,noID,Servent::T_CIN);
2064 LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,rstr);
2068 LOG_DEBUG("No available trackers");
2073 LOG_ERROR("Sending QUIT to incoming: %d",error);
2075 atom.writeInt(PCP_QUIT,error);
2081 setStatus(S_CONNECTED);
2083 atom.writeInt(PCP_OK,0);
2086 atom.writeParent(PCP_ROOT,1);
2087 atom.writeParent(PCP_ROOT_UPDATE,0);
2089 pcpStream = new PCPStream(remoteID);
2093 while (!error && thread.active && !sock->eof())
2095 error = pcpStream->readPacket(*sock,bcs);
2098 if (!servMgr->isRoot && !chanMgr->isBroadcasting())
2099 error = PCP_ERROR_OFFAIR;
2100 if (peercastInst->isQuitting)
2101 error = PCP_ERROR_SHUTDOWN;
2104 pcpStream->flush(*sock);
2106 error += PCP_ERROR_QUIT;
2107 atom.writeInt(PCP_QUIT,error);
2109 LOG_DEBUG("PCP Incoming to %s closed: %d",rstr,error);
2113 // -----------------------------------
2114 int Servent::outgoingProc(ThreadInfo *thread)
2117 LOG_DEBUG("COUT started");
2119 Servent *sv = (Servent*)thread->data;
2123 sv->pcpStream = new PCPStream(noID);
2125 while (sv->thread.active)
2127 sv->setStatus(S_WAIT);
2129 if (chanMgr->isBroadcasting() && servMgr->autoServe)
2139 if (servMgr->rootHost.isEmpty())
2144 sv->sock = sv->pushSock;
2145 sv->pushSock = NULL;
2146 bestHit.host = sv->sock->host;
2152 ChanHitList *chl = chanMgr->findHitListByID(noID);
2155 // find local tracker
2157 chs.matchHost = servMgr->serverHost;
2158 chs.waitDelay = MIN_TRACKER_RETRY;
2159 chs.excludeID = servMgr->sessionID;
2160 chs.trackersOnly = true;
2161 if (!chl->pickHits(chs))
2163 // else find global tracker
2165 chs.waitDelay = MIN_TRACKER_RETRY;
2166 chs.excludeID = servMgr->sessionID;
2167 chs.trackersOnly = true;
2173 bestHit = chs.best[0];
2178 unsigned int ctime = sys->getTime();
2180 if ((!bestHit.host.ip) && ((ctime-chanMgr->lastYPConnect) > MIN_YP_RETRY))
2182 bestHit.host.fromStrName(servMgr->rootHost.cstr(),DEFAULT_PORT);
2184 chanMgr->lastYPConnect = ctime;
2188 }while (!bestHit.host.ip && (sv->thread.active));
2191 if (!bestHit.host.ip) // give up
2193 LOG_ERROR("COUT giving up");
2198 bestHit.host.toStr(ipStr);
2204 LOG_DEBUG("COUT to %s: Connecting..",ipStr);
2208 sv->setStatus(S_CONNECTING);
2209 sv->sock = sys->createSocket();
2211 throw StreamException("Unable to create socket");
2212 sv->sock->open(bestHit.host);
2213 sv->sock->connect();
2217 sv->sock->setReadTimeout(30000);
2218 AtomStream atom(*sv->sock);
2220 sv->setStatus(S_HANDSHAKE);
2222 Host rhost = sv->sock->host;
2223 atom.writeInt(PCP_CONNECT,1);
2224 handshakeOutgoingPCP(atom,rhost,sv->remoteID,sv->agent,bestHit.yp);
2226 sv->setStatus(S_CONNECTED);
2228 LOG_DEBUG("COUT to %s: OK",ipStr);
2230 sv->pcpStream->init(sv->remoteID);
2233 bcs.servent_id = sv->servent_id;
2235 while (!error && sv->thread.active && !sv->sock->eof() && servMgr->autoServe)
2237 error = sv->pcpStream->readPacket(*sv->sock,bcs);
2241 if (!chanMgr->isBroadcasting())
2242 error = PCP_ERROR_OFFAIR;
2243 if (peercastInst->isQuitting)
2244 error = PCP_ERROR_SHUTDOWN;
2246 if (sv->pcpStream->nextRootPacket)
2247 if (sys->getTime() > (sv->pcpStream->nextRootPacket+30))
2248 error = PCP_ERROR_NOROOT;
2250 sv->setStatus(S_CLOSING);
2252 sv->pcpStream->flush(*sv->sock);
2254 error += PCP_ERROR_QUIT;
2255 atom.writeInt(PCP_QUIT,error);
2257 LOG_ERROR("COUT to %s closed: %d",ipStr,error);
2259 }catch(TimeoutException &e)
2261 LOG_ERROR("COUT to %s: timeout (%s)",ipStr,e.msg);
2262 sv->setStatus(S_TIMEOUT);
2263 }catch(StreamException &e)
2265 LOG_ERROR("COUT to %s: %s",ipStr,e.msg);
2266 sv->setStatus(S_ERROR);
2278 }catch(StreamException &) {}
2280 // don`t discard this hit if we caused the disconnect (stopped broadcasting)
2281 if (error != (PCP_ERROR_QUIT+PCP_ERROR_OFFAIR))
2282 chanMgr->deadHit(bestHit);
2290 sys->endThread(thread);
2291 LOG_DEBUG("COUT ended");
2294 // -----------------------------------
2295 int Servent::incomingProc(ThreadInfo *thread)
2299 Servent *sv = (Servent*)thread->data;
2302 sv->sock->host.toStr(ipStr);
2306 sv->handshakeIncoming();
2307 }catch(HTTPException &e)
2311 sv->sock->writeLine(e.msg);
2313 sv->sock->writeLine("WWW-Authenticate: Basic realm=\"PeerCast\"");
2314 sv->sock->writeLine("");
2315 }catch(StreamException &){}
2316 LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);
2317 }catch(StreamException &e)
2319 LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);
2324 sys->endThread(thread);
2327 // -----------------------------------
2328 void Servent::processServent()
2330 setStatus(S_HANDSHAKE);
2335 throw StreamException("Servent has no socket");
2340 // -----------------------------------
2341 void Servent::processStream(bool doneHandshake,ChanInfo &chanInfo)
2345 setStatus(S_HANDSHAKE);
2347 if (!handshakeStream(chanInfo))
2351 if (chanInfo.id.isSet())
2354 chanID = chanInfo.id;
2356 LOG_CHANNEL("Sending channel: %s ",ChanInfo::getProtocolStr(outputProtocol));
2358 if (!waitForChannelHeader(chanInfo))
2359 throw StreamException("Channel not ready");
2361 servMgr->totalStreams++;
2363 Host host = sock->host;
2364 host.port = 0; // force to 0 so we ignore the incoming port
2366 Channel *ch = chanMgr->findChannelByID(chanID);
2368 throw StreamException("Channel not found");
2370 if (outputProtocol == ChanInfo::SP_HTTP)
2372 if ((addMetadata) && (chanMgr->icyMetaInterval))
2373 sendRawMetaChannel(chanMgr->icyMetaInterval);
2375 sendRawChannel(true,true);
2377 }else if (outputProtocol == ChanInfo::SP_MMS)
2381 sendRawChannel(true,true);
2384 sendRawChannel(true,false);
2387 }else if (outputProtocol == ChanInfo::SP_PCP)
2391 } else if (outputProtocol == ChanInfo::SP_PEERCAST)
2393 sendPeercastChannel();
2397 setStatus(S_CLOSING);
2400 // -----------------------------------------
2404 file.openReadOnly("c://test.mp3");
2406 LOG_DEBUG("raw file read");
2411 LOG_DEBUG("send %d",cnt++);
2412 file.read(buf,sizeof(buf));
2413 sock->write(buf,sizeof(buf));
2417 LOG_DEBUG("raw file sent");
2422 // -----------------------------------
2423 bool Servent::waitForChannelHeader(ChanInfo &info)
2425 for(int i=0; i<30*10; i++)
2427 Channel *ch = chanMgr->findChannelByID(info.id);
2431 if (ch->isPlaying() && (ch->rawData.writePos>0))
2434 if (!thread.active || !sock->active())
2440 // -----------------------------------
2441 void Servent::sendRawChannel(bool sendHead, bool sendData)
2446 sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
2448 Channel *ch = chanMgr->findChannelByID(chanID);
2450 throw StreamException("Channel not found");
2452 setStatus(S_CONNECTED);
2454 LOG_DEBUG("Starting Raw stream of %s at %d",ch->info.name.cstr(),streamPos);
2458 ch->headPack.writeRaw(*sock);
2459 streamPos = ch->headPack.pos + ch->headPack.len;
2460 LOG_DEBUG("Sent %d bytes header ",ch->headPack.len);
2466 unsigned int streamIndex = ch->streamIndex;
2467 unsigned int connectTime = sys->getTime();
2468 unsigned int lastWriteTime = connectTime;
2470 while ((thread.active) && sock->active())
2472 ch = chanMgr->findChannelByID(chanID);
2477 if (streamIndex != ch->streamIndex)
2479 streamIndex = ch->streamIndex;
2480 streamPos = ch->headPack.pos;
2481 LOG_DEBUG("sendRaw got new stream index");
2485 if (ch->rawData.findPacket(streamPos,rawPack))
2487 if (syncPos != rawPack.sync)
2488 LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
2489 syncPos = rawPack.sync+1;
2491 if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2493 rawPack.writeRaw(*sock);
2494 lastWriteTime = sys->getTime();
2497 if (rawPack.pos < streamPos)
2498 LOG_DEBUG("raw: skip back %d",rawPack.pos-streamPos);
2499 streamPos = rawPack.pos+rawPack.len;
2500 } else if (sock->readReady()) {
2502 int error = sock->readUpto(&c, 1);
2503 if (error == 0) sock->close();
2507 if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
2508 throw TimeoutException();
2513 }catch(StreamException &e)
2515 LOG_ERROR("Stream channel: %s",e.msg);
2520 // -----------------------------------
2521 void Servent::sendRawMultiChannel(bool sendHead, bool sendData)
2525 unsigned int chanStreamIndex[ChanMgr::MAX_CHANNELS];
2526 unsigned int chanStreamPos[ChanMgr::MAX_CHANNELS];
2527 GnuID chanIDs[ChanMgr::MAX_CHANNELS];
2529 for(int i=0; i<ChanMgr::MAX_CHANNELS; i++)
2531 Channel *ch = &chanMgr->channels[i];
2532 if (ch->isPlaying())
2533 chanIDs[numChanIDs++]=ch->info.id;
2538 setStatus(S_CONNECTED);
2543 for(int i=0; i<numChanIDs; i++)
2545 Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
2548 LOG_DEBUG("Starting RawMulti stream: %s",ch->info.name.cstr());
2549 ch->headPack.writeRaw(*sock);
2550 chanStreamPos[i] = ch->headPack.pos + ch->headPack.len;
2551 chanStreamIndex[i] = ch->streamIndex;
2552 LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
2561 unsigned int connectTime=sys->getTime();
2563 while ((thread.active) && sock->active())
2566 for(int i=1; i<numChanIDs; i++)
2568 Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
2571 if (chanStreamIndex[i] != ch->streamIndex)
2573 chanStreamIndex[i] = ch->streamIndex;
2574 chanStreamPos[i] = ch->headPack.pos;
2575 LOG_DEBUG("sendRawMulti got new stream index for chan %d",i);
2579 if (ch->rawData.findPacket(chanStreamPos[i],rawPack))
2581 if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2582 rawPack.writeRaw(*sock);
2585 if (rawPack.pos < chanStreamPos[i])
2586 LOG_DEBUG("raw: skip back %d",rawPack.pos-chanStreamPos[i]);
2587 chanStreamPos[i] = rawPack.pos+rawPack.len;
2590 //LOG("raw at %d: %d %d",streamPos,ch->rawData.getStreamPos(ch->rawData.firstPos),ch->rawData.getStreamPos(ch->rawData.lastPos));
2600 }catch(StreamException &e)
2602 LOG_ERROR("Stream channel: %s",e.msg);
2607 // -----------------------------------
2608 void Servent::sendRawMetaChannel(int interval)
2613 Channel *ch = chanMgr->findChannelByID(chanID);
2615 throw StreamException("Channel not found");
2617 sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
2619 setStatus(S_CONNECTED);
2621 LOG_DEBUG("Starting Raw Meta stream of %s (metaint: %d) at %d",ch->info.name.cstr(),interval,streamPos);
2624 String lastTitle,lastURL;
2626 int lastMsgTime=sys->getTime();
2632 if ((interval > sizeof(buf)) || (interval < 1))
2633 throw StreamException("Bad ICY Meta Interval value");
2635 unsigned int connectTime = sys->getTime();
2636 unsigned int lastWriteTime = connectTime;
2638 streamPos = 0; // raw meta channel has no header (its MP3)
2640 while ((thread.active) && sock->active())
2642 ch = chanMgr->findChannelByID(chanID);
2648 if (ch->rawData.findPacket(streamPos,rawPack))
2651 if (syncPos != rawPack.sync)
2652 LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
2653 syncPos = rawPack.sync+1;
2655 MemoryStream mem(rawPack.data,rawPack.len);
2657 if (rawPack.type == ChanPacket::T_DATA)
2660 int len = rawPack.len;
2661 char *p = rawPack.data;
2665 if ((bufPos+rl) > interval)
2666 rl = interval-bufPos;
2667 memcpy(&buf[bufPos],p,rl);
2672 if (bufPos >= interval)
2675 sock->write(buf,interval);
2676 lastWriteTime = sys->getTime();
2678 if (chanMgr->broadcastMsgInterval)
2679 if ((sys->getTime()-lastMsgTime) >= chanMgr->broadcastMsgInterval)
2682 lastMsgTime = sys->getTime();
2685 String *metaTitle = &ch->info.track.title;
2686 if (!ch->info.comment.isEmpty() && (showMsg))
2687 metaTitle = &ch->info.comment;
2690 if (!metaTitle->isSame(lastTitle) || !ch->info.url.isSame(lastURL))
2699 title.convertTo(String::T_META);
2700 url.convertTo(String::T_META);
2702 sprintf(tmp,"StreamTitle='%s';StreamUrl='%s';\0",title.cstr(),url.cstr());
2703 int len = ((strlen(tmp) + 15+1) / 16);
2704 sock->writeChar(len);
2705 sock->write(tmp,len*16);
2707 lastTitle = *metaTitle;
2708 lastURL = ch->info.url;
2710 LOG_DEBUG("StreamTitle: %s, StreamURL: %s",lastTitle.cstr(),lastURL.cstr());
2720 streamPos = rawPack.pos + rawPack.len;
2723 if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
2724 throw TimeoutException();
2729 }catch(StreamException &e)
2731 LOG_ERROR("Stream channel: %s",e.msg);
2734 // -----------------------------------
2735 void Servent::sendPeercastChannel()
2739 setStatus(S_CONNECTED);
2741 Channel *ch = chanMgr->findChannelByID(chanID);
2743 throw StreamException("Channel not found");
2745 LOG_DEBUG("Starting PeerCast stream: %s",ch->info.name.cstr());
2747 sock->writeTag("PCST");
2751 ch->headPack.writePeercast(*sock);
2753 pack.init(ChanPacket::T_META,ch->insertMeta.data,ch->insertMeta.len,ch->streamPos);
2754 pack.writePeercast(*sock);
2757 unsigned int syncPos=0;
2758 while ((thread.active) && sock->active())
2760 ch = chanMgr->findChannelByID(chanID);
2765 if (ch->rawData.findPacket(streamPos,rawPack))
2767 if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2769 sock->writeTag("SYNC");
2770 sock->writeShort(4);
2771 sock->writeShort(0);
2772 sock->write(&syncPos,4);
2775 rawPack.writePeercast(*sock);
2777 streamPos = rawPack.pos + rawPack.len;
2783 }catch(StreamException &e)
2785 LOG_ERROR("Stream channel: %s",e.msg);
2789 //WLock canStreamLock;
2791 // -----------------------------------
2792 void Servent::sendPCPChannel()
2794 bool skipCheck = false;
2796 Channel *ch = chanMgr->findChannelByID(chanID);
2798 throw StreamException("Channel not found");
2800 AtomStream atom(*sock);
2802 pcpStream = new PCPStream(remoteID);
2808 LOG_DEBUG("Starting PCP stream of channel at %d",streamPos);
2811 // setStatus(S_CONNECTED);
2813 //canStreamLock.on();
2814 //thread.active = canStream(ch);
2815 //setStatus(S_CONNECTED);
2816 //canStreamLock.off();
2823 atom.writeParent(PCP_CHAN,3 + ((sendHeader)?1:0));
2824 atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
2825 ch->info.writeInfoAtoms(atom);
2826 ch->info.writeTrackAtoms(atom);
2829 atom.writeParent(PCP_CHAN_PKT,3);
2830 atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
2831 atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
2832 atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
2835 streamPos = ch->headPack.pos+ch->headPack.len;
2836 LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
2840 unsigned int streamIndex = ch->streamIndex;
2843 char pbuf[ChanPacket::MAX_DATALEN*3];
2844 MemoryStream mems(pbuf,sizeof(pbuf));
2845 AtomStream atom2(mems);
2847 while (thread.active)
2850 Channel *ch = chanMgr->findChannelByID(chanID);
2855 if (streamIndex != ch->streamIndex)
2857 streamIndex = ch->streamIndex;
2858 streamPos = ch->headPack.pos;
2859 LOG_DEBUG("sendPCPStream got new stream index");
2864 if (ch->rawData.findPacket(streamPos,rawPack))
2866 if ((streamPos < rawPack.pos) && !rawPack.skip){
2869 getHost().IPtoStr(tmp);
2870 LOG_NETWORK("##### send skipping ##### %d (%d, %d) -> %s", (rawPack.pos - streamPos), streamPos, rawPack.pos, tmp);
2872 if (sys->getTime() == lastSkipTime) {
2873 LOG_DEBUG("##### skip all buffer");
2874 streamPos = ch->rawData.getLatestPos();
2878 lastSkipTime = sys->getTime();
2885 if (rawPack.type == ChanPacket::T_HEAD)
2887 atom2.writeParent(PCP_CHAN,2);
2888 atom2.writeBytes(PCP_CHAN_ID,chanID.id,16);
2889 atom2.writeParent(PCP_CHAN_PKT,3);
2890 atom2.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
2891 atom2.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
2892 atom2.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
2894 sock->write(pbuf, mems.getPosition());
2895 }else if (rawPack.type == ChanPacket::T_DATA)
2897 atom2.writeParent(PCP_CHAN,2);
2898 atom2.writeBytes(PCP_CHAN_ID,chanID.id,16);
2899 atom2.writeParent(PCP_CHAN_PKT,3);
2900 atom2.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_DATA);
2901 atom2.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
2902 atom2.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
2905 sock->bufferingWrite(pbuf, mems.getPosition());
2906 lastSkipTime = sock->bufList.lastSkipTime;
2907 lastSkipCount = sock->bufList.skipCount;
2909 sock->write(pbuf, mems.getPosition());
2913 if (rawPack.pos < streamPos)
2914 LOG_DEBUG("pcp: skip back %d",rawPack.pos-streamPos);
2916 //LOG_DEBUG("Sending %d-%d (%d,%d,%d)",rawPack.pos,rawPack.pos+rawPack.len,ch->streamPos,ch->rawData.getLatestPos(),ch->rawData.getOldestPos());
2918 streamPos = rawPack.pos+rawPack.len;
2921 throw StreamException("Channel not found");
2925 sock->bufferingWrite(NULL, 0);
2926 lastSkipTime = sock->bufList.lastSkipTime;
2927 lastSkipCount = sock->bufList.skipCount;
2930 bcs.servent_id = servent_id;
2931 // error = pcpStream->readPacket(*sock,bcs);
2933 error = pcpStream->readPacket(*sock,bcs);
2935 throw StreamException("PCP exception");
2936 } while (sock->readReady() || pcpStream->outData.numPending());
2942 LOG_DEBUG("PCP channel stream closed normally.");
2944 }catch(StreamException &e)
2946 LOG_ERROR("Stream channel: %s",e.msg);
2951 pcpStream->flush(*sock);
2952 atom.writeInt(PCP_QUIT,error);
2953 }catch(StreamException &) {}
2957 // -----------------------------------
2958 int Servent::serverProc(ThreadInfo *thread)
2963 Servent *sv = (Servent*)thread->data;
2968 throw StreamException("Server has no socket");
2970 sv->setStatus(S_LISTENING);
2974 sv->sock->host.toStr(servIP);
2976 if (servMgr->isRoot)
2977 LOG_DEBUG("Root Server started: %s",servIP);
2979 LOG_DEBUG("Server started: %s",servIP);
2982 while ((thread->active) && (sv->sock->active()))
2984 if (servMgr->numActiveOnPort(sv->sock->host.port) < servMgr->maxServIn)
2986 ClientSocket *cs = sv->sock->accept();
2989 LOG_DEBUG("accepted incoming");
2990 Servent *ns = servMgr->allocServent();
2993 servMgr->lastIncoming = sys->getTime();
2994 ns->servPort = sv->sock->host.port;
2995 ns->networkID = servMgr->networkID;
2996 ns->initIncoming(cs,sv->allow);
2998 LOG_ERROR("Out of servents");
3003 }catch(StreamException &e)
3005 LOG_ERROR("Server Error: %s:%d",e.msg,e.err);
3009 LOG_DEBUG("Server stopped");
3012 sys->endThread(thread);
3016 // -----------------------------------
3017 bool Servent::writeVariable(Stream &s, const String &var)
3022 strcpy(buf,getTypeStr());
3023 else if (var == "status")
3024 strcpy(buf,getStatusStr());
3025 else if (var == "address")
3027 if (servMgr->enableGetName) //JP-EX s
3029 getHost().toStr(buf);
3034 /* ChanHitList *hits[ChanMgr::MAX_HITLISTS];
3036 for(int i=0; i<ChanMgr::MAX_HITLISTS; i++)
3038 ChanHitList *chl = &chanMgr->hitlists[i];
3040 hits[numHits++] = chl;
3043 ishit = isfw = false;
3047 for(int k=0; k<numHits; k++)
3049 ChanHitList *chl = hits[k];
3052 for (int j=0; j<ChanHitList::MAX_HITS; j++)
3054 ChanHit *hit = &chl->hits[j];
3055 if (hit->host.isValid() && (h.ip == hit->host.ip))
3058 if (hit->firewalled)
3060 numRelay += hit->numRelays;
3072 strcat(buf,"<font color=red>");
3074 strcat(buf,"<font color=orange>");
3077 strcat(buf,"<font color=green>");
3081 if (ClientSocket::getHostname(h_name,h.ip))
3089 strcat(buf,"</font>");
3095 bool isRelay = true;
3097 ChanHitList *chl = chanMgr->findHitListByID(chanID);
3099 ChanHit *hit = chl->hit;
3101 if (hit->host.isValid() && (h.ip == hit->host.ip)){
3102 isfw = hit->firewalled;
3103 isRelay = hit->relay;
3104 numRelay = hit->numRelays;
3113 strcat(buf,"<font color=red>");
3115 strcat(buf,"<font color=orange>");
3120 strcpy(buf,"<font color=purple>");
3122 strcpy(buf,"<font color=blue>");
3125 strcpy(buf,"<font color=green>");
3130 if (ClientSocket::getHostname(h_name,h.ip))
3136 strcat(buf,"</font>");
3139 getHost().toStr(buf);
3141 else if (var == "agent")
3142 strcpy(buf,agent.cstr());
3143 else if (var == "bitrate")
3147 unsigned int tot = sock->bytesInPerSec+sock->bytesOutPerSec;
3148 sprintf(buf,"%.1f",BYTES_TO_KBPS(tot));
3151 }else if (var == "uptime")
3155 uptime.setFromStopwatch(sys->getTime()-lastConnect);
3158 strcpy(buf,uptime.cstr());
3159 }else if (var.startsWith("gnet."))
3162 float ctime = (float)(sys->getTime()-lastConnect);
3163 if (var == "gnet.packetsIn")
3164 sprintf(buf,"%d",gnuStream.packetsIn);
3165 else if (var == "gnet.packetsInPerSec")
3166 sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsIn)/ctime:0);
3167 else if (var == "gnet.packetsOut")
3168 sprintf(buf,"%d",gnuStream.packetsOut);
3169 else if (var == "gnet.packetsOutPerSec")
3170 sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsOut)/ctime:0);
3171 else if (var == "gnet.normQueue")
3172 sprintf(buf,"%d",outPacketsNorm.numPending());
3173 else if (var == "gnet.priQueue")
3174 sprintf(buf,"%d",outPacketsPri.numPending());
3175 else if (var == "gnet.flowControl")
3176 sprintf(buf,"%d",flowControl?1:0);
3177 else if (var == "gnet.routeTime")
3179 int nr = seenIDs.numUsed();
3180 unsigned int tim = sys->getTime()-seenIDs.getOldest();
3183 tstr.setFromStopwatch(tim);
3186 strcpy(buf,tstr.cstr());