1 // ------------------------------------------------
6 // Channel streaming classes. These do the actual
7 // streaming of media between clients.
9 // (c) 2002 peercast.org
11 // ------------------------------------------------
12 // This program is free software; you can redistribute it and/or modify
13 // it under the terms of the GNU General Public License as published by
14 // the Free Software Foundation; either version 2 of the License, or
15 // (at your option) any later version.
17 // This program is distributed in the hope that it will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 // GNU General Public License for more details.
21 // ------------------------------------------------
48 #include "chkMemoryLeak.h"
49 #define DEBUG_NEW new(__FILE__, __LINE__)
53 // -----------------------------------
54 char *Channel::srcTypes[]=
62 // -----------------------------------
63 char *Channel::statusMsgs[]=
82 bool isIndexTxt(ChanInfo *info)
87 info->contentType == ChanInfo::T_RAW &&
88 info->bitrate <= 32 &&
89 (len = strlen(info->name.cstr())) >= 9 &&
90 !memcmp(info->name.cstr(), "index", 5) &&
91 !memcmp(info->name.cstr()+len-4, ".txt", 4))
101 bool isIndexTxt(Channel *ch)
103 if(ch && !ch->isBroadcasting() && isIndexTxt(&ch->info))
109 int numMaxRelaysIndexTxt(Channel *ch)
111 return ((servMgr->maxRelaysIndexTxt < 1) ? 1 : servMgr->maxRelaysIndexTxt);
114 int canStreamIndexTxt(Channel *ch)
118 //
\8e©
\95ª
\82ª
\94z
\90M
\82µ
\82Ä
\82¢
\82é
\8fê
\8d\87\82Í
\8aÖ
\8cW
\82È
\82¢
119 if(!ch || ch->isBroadcasting())
122 ret = numMaxRelaysIndexTxt(ch) - ch->localRelays();
130 // -----------------------------------
131 void readXMLString(String &str, XML::Node *n, const char *arg)
134 p = n->findAttr(arg);
137 str.set(p,String::T_HTML);
138 str.convertTo(String::T_ASCII);
143 // -----------------------------------------------------------------------------
144 // Initialise the channel to its default settings of unallocated and reset.
145 // -----------------------------------------------------------------------------
146 Channel::Channel() : maxRelays(0)
150 channel_id = channel_count++;
152 // -----------------------------------------------------------------------------
153 void Channel::endThread(bool flg)
177 chanMgr->channellock.on();
178 chanMgr->deleteChannel(this);
179 chanMgr->channellock.off();
181 sys->endThread(&thread);
184 // -----------------------------------------------------------------------------
185 void Channel::resetPlayTime()
187 info.lastPlayStart = sys->getTime();
189 // -----------------------------------------------------------------------------
190 void Channel::setStatus(STATUS s)
194 bool wasPlaying = isPlaying();
200 info.status = ChanInfo::S_PLAY;
205 info.lastPlayEnd = sys->getTime();
206 info.status = ChanInfo::S_UNKNOWN;
209 if (isBroadcasting())
211 ChanHitList *chl = chanMgr->findHitListByID(info.id);
213 chanMgr->addHitList(info);
216 peercastApp->channelUpdate(&info);
221 // -----------------------------------------------------------------------------
222 // Reset channel and make it available
223 // -----------------------------------------------------------------------------
224 void Channel::reset()
237 stayConnected = false;
241 skipCount = 0; //JP-EX
251 rawData.accept = ChanPacket::T_HEAD|ChanPacket::T_DATA;
263 lastTrackerUpdate = 0;
278 // -----------------------------------
279 void Channel::newPacket(ChanPacket &pack)
281 if (pack.type != ChanPacket::T_PCP)
283 rawData.writePacket(pack,true);
288 // -----------------------------------
289 bool Channel::checkIdle()
291 return ( (info.getUptime() > chanMgr->prefetchTime) && (localListeners() == 0) && (!stayConnected) && (status != S_BROADCASTING));
294 // -----------------------------------
295 bool Channel::isFull()
297 // for PCRaw (relay) start.
300 int ret = canStreamIndexTxt(this);
307 // for PCRaw (relay) end.
309 //
\83`
\83\83\83\93\83l
\83\8b\8cÅ
\97L
\82Ì
\83\8a\83\8c\81[
\8fã
\8cÀ
\90Ý
\92è
\82ª
\82 \82é
\82©
312 return localRelays() >= maxRelays;
315 return chanMgr->maxRelaysPerChannel ? localRelays() >= chanMgr->maxRelaysPerChannel : false;
318 // -----------------------------------
319 int Channel::localRelays()
321 return servMgr->numStreams(info.id,Servent::T_RELAY,true);
323 // -----------------------------------
324 int Channel::localListeners()
326 return servMgr->numStreams(info.id,Servent::T_DIRECT,true);
329 // -----------------------------------
330 int Channel::totalRelays()
333 ChanHitList *chl = chanMgr->findHitListByID(info.id);
335 tot += chl->numHits();
338 // -----------------------------------
339 int Channel::totalListeners()
341 int tot = localListeners();
342 ChanHitList *chl = chanMgr->findHitListByID(info.id);
344 tot += chl->numListeners();
351 // -----------------------------------
352 void Channel::startGet()
354 srcType = SRC_PEERCAST;
356 info.srcProtocol = ChanInfo::SP_PCP;
359 sourceData = new PeercastSource();
363 // -----------------------------------
364 void Channel::startURL(const char *u)
370 stayConnected = true;
374 sourceData = new URLSource(u);
379 // -----------------------------------
380 void Channel::startStream()
383 thread.func = stream;
384 if (!sys->startThread(&thread))
388 // -----------------------------------
389 void Channel::sleepUntil(double time)
391 double sleepTime = time - (sys->getDTime()-startTime);
393 // LOG("sleep %g",sleepTime);
396 if (sleepTime > 60) sleepTime = 60;
398 double sleepMS = sleepTime*1000;
400 sys->sleep((int)sleepMS);
405 // -----------------------------------
406 void Channel::checkReadDelay(unsigned int len)
410 unsigned int time = (len*1000)/((info.bitrate*1024)/8);
418 // -----------------------------------
419 THREAD_PROC Channel::stream(ThreadInfo *thread)
423 Channel *ch = (Channel *)thread->data;
425 LOG_CHANNEL("Channel started");
427 while (thread->active && !peercastInst->isQuitting && !thread->finish)
429 ChanHitList *chl = chanMgr->findHitList(ch->info);
431 chanMgr->addHitList(ch->info);
433 ch->sourceData->stream(ch);
435 LOG_CHANNEL("Channel stopped");
438 if (!ch->stayConnected)
440 thread->active = false;
444 if (!ch->info.lastPlayEnd)
445 ch->info.lastPlayEnd = sys->getTime();
447 unsigned int diff = (sys->getTime()-ch->info.lastPlayEnd) + 5;
449 LOG_DEBUG("Channel sleeping for %d seconds",diff);
450 for(unsigned int i=0; i<diff; i++)
452 if (ch->info.lastPlayEnd == 0) // reconnected
454 if (!thread->active || peercastInst->isQuitting){
455 thread->active = false;
463 LOG_DEBUG("thread.active = %d, thread.finish = %d",
464 ch->thread.active, ch->thread.finish);
466 if (!thread->finish){
467 ch->endThread(false);
470 ch->finthread = new ThreadInfo();
471 ch->finthread->func = waitFinish;
472 ch->finthread->data = ch;
473 sys->startThread(ch->finthread);
481 // -----------------------------------
482 THREAD_PROC Channel::waitFinish(ThreadInfo *thread)
484 Channel *ch = (Channel*)thread->data;
485 LOG_DEBUG("Wait channel finish");
487 while(!(ch->thread.finish) && !thread->finish){
491 if (ch->thread.finish){
492 LOG_DEBUG("channel finish");
495 LOG_DEBUG("channel restart");
502 // -----------------------------------
503 bool Channel::acceptGIV(ClientSocket *givSock)
512 // -----------------------------------
513 void Channel::connectFetch()
515 sock = sys->createSocket();
518 throw StreamException("Can`t create socket");
520 if (sourceHost.tracker || sourceHost.yp)
522 sock->setReadTimeout(30000);
523 sock->setWriteTimeout(30000);
524 LOG_CHANNEL("Channel using longer timeouts");
526 sock->setReadTimeout(5000);
527 sock->setWriteTimeout(5000);
530 sock->open(sourceHost.host);
535 // -----------------------------------
536 int Channel::handshakeFetch()
539 info.id.toStr(idStr);
542 servMgr->sessionID.toStr(sidStr);
544 sock->writeLineF("GET /channel/%s HTTP/1.0",idStr);
545 sock->writeLineF("%s %d",PCX_HS_POS,streamPos);
546 sock->writeLineF("%s %d",PCX_HS_PCP,1);
547 sock->writeLineF("%s %d",PCX_HS_PORT,servMgr->serverHost.port);
553 int r = http.readResponse();
555 LOG_CHANNEL("Got response: %d",r);
557 while (http.nextHeader())
559 char *arg = http.getArgStr();
563 if (http.isHeader(PCX_HS_POS))
564 streamPos = atoi(arg);
566 Servent::readICYHeader(http, info, NULL);
568 LOG_CHANNEL("Channel fetch: %s",http.cmdLine);
571 if ((r != 200) && (r != 503))
574 if (!servMgr->keepDownstreams) {
575 if (rawData.getLatestPos() > streamPos)
579 AtomStream atom(*sock);
583 Host rhost = sock->host;
585 if (info.srcProtocol == ChanInfo::SP_PCP)
587 // don`t need PCP_CONNECT here
588 Servent::handshakeOutgoingPCP(atom,rhost,remoteID,agent,sourceHost.yp|sourceHost.tracker);
591 if (r == 503) return 503;
596 // -----------------------------------
597 void PeercastSource::stream(Channel *ch)
601 bool next_yp = false;
602 bool tracker_check = (ch->trackerHit.host.ip != 0);
605 ch->lastStopTime = 0;
608 while (ch->thread.active)
610 ch->skipCount = 0; //JP-EX
611 ch->lastSkipTime = 0;
613 ChanHitList *chl = NULL;
615 ch->sourceHost.init();
617 if (connFailCnt >= 3 && (ch->localListeners() == 0) && (!ch->stayConnected) && !ch->isBroadcasting()) {
618 ch->lastIdleTime = sys->getTime();
619 ch->setStatus(Channel::S_IDLE);
621 ch->lastSkipTime = 0;
625 if (!servMgr->keepDownstreams && !ch->bumped) {
626 ch->trackerHit.lastContact = sys->getTime() - 30 + (rand() % 30);
629 ch->setStatus(Channel::S_SEARCHING);
630 LOG_CHANNEL("Channel searching for hit..");
636 ch->sock = ch->pushSock;
638 ch->sourceHost.host = ch->sock->host;
642 chanMgr->hitlistlock.on();
644 chl = chanMgr->findHitList(ch->info);
650 if (!ch->sourceHost.host.ip){
652 chs.matchHost = servMgr->serverHost;
653 chs.waitDelay = MIN_RELAY_RETRY;
654 chs.excludeID = servMgr->sessionID;
655 if (chl->pickSourceHits(chs)){
656 ch->sourceHost = chs.best[0];
657 LOG_DEBUG("use local hit");
661 // else find global hit
662 if (!ch->sourceHost.host.ip)
665 chs.waitDelay = MIN_RELAY_RETRY;
666 chs.excludeID = servMgr->sessionID;
667 if (chl->pickSourceHits(chs)){
668 ch->sourceHost = chs.best[0];
669 LOG_DEBUG("use global hit");
674 // else find local tracker
675 if (!ch->sourceHost.host.ip)
678 chs.matchHost = servMgr->serverHost;
679 chs.waitDelay = MIN_TRACKER_RETRY;
680 chs.excludeID = servMgr->sessionID;
681 chs.trackersOnly = true;
682 if (chl->pickSourceHits(chs)){
683 ch->sourceHost = chs.best[0];
684 LOG_DEBUG("use local tracker");
688 // else find global tracker
689 if (!ch->sourceHost.host.ip)
692 chs.waitDelay = MIN_TRACKER_RETRY;
693 chs.excludeID = servMgr->sessionID;
694 chs.trackersOnly = true;
695 if (chl->pickSourceHits(chs)){
696 ch->sourceHost = chs.best[0];
697 tracker_check = true;
698 ch->trackerHit = chs.best[0];
699 LOG_DEBUG("use global tracker");
704 unsigned int ctime = sys->getTime();
705 if (!ch->sourceHost.host.ip && tracker_check && ch->trackerHit.host.ip){
706 if (ch->trackerHit.lastContact + 30 < ctime){
707 ch->sourceHost = ch->trackerHit;
708 ch->trackerHit.lastContact = ctime;
709 LOG_DEBUG("use saved tracker");
714 chanMgr->hitlistlock.off();
716 if (servMgr->keepDownstreams && ch->lastStopTime && ch->lastStopTime < sys->getTime() - 7) {
717 ch->lastStopTime = 0;
718 LOG_DEBUG("------------ disconnect all downstreams");
720 MemoryStream mem(pack.data,sizeof(pack.data));
721 AtomStream atom(mem);
722 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_OFFAIR);
724 pack.type = ChanPacket::T_PCP;
727 servMgr->broadcastPacket(pack,ch->info.id,ch->remoteID,noID,Servent::T_RELAY);
729 chanMgr->hitlistlock.on();
730 ChanHitList *hl = chanMgr->findHitList(ch->info);
732 hl->clearHits(false);
734 chanMgr->hitlistlock.off();
737 // no trackers found so contact YP
738 if (!tracker_check && !ch->sourceHost.host.ip)
741 if (servMgr->rootHost.isEmpty())
747 if ((!servMgr->rootHost2.isEmpty()) && (numYPTries > numYPTries2))
750 unsigned int ctime=sys->getTime();
751 if ((ctime-chanMgr->lastYPConnect) > MIN_YP_RETRY)
753 ch->sourceHost.host.fromStrName(servMgr->rootHost.cstr(),DEFAULT_PORT);
754 ch->sourceHost.yp = true;
755 chanMgr->lastYPConnect=ctime;
763 // no trackers found so contact YP2
764 if (!tracker_check && !ch->sourceHost.host.ip)
767 if (servMgr->rootHost2.isEmpty())
770 if (numYPTries2 >= 3)
773 unsigned int ctime=sys->getTime();
774 if ((ctime-chanMgr->lastYPConnect2) > MIN_YP_RETRY)
776 ch->sourceHost.host.fromStrName(servMgr->rootHost2.cstr(),DEFAULT_PORT);
777 ch->sourceHost.yp = true;
778 chanMgr->lastYPConnect2=ctime;
785 if (!tracker_check && !ch->sourceHost.host.ip && !next_yp) break;
789 }while((ch->sourceHost.host.ip==0) && (ch->thread.active));
791 if (!ch->sourceHost.host.ip)
793 LOG_ERROR("Channel giving up");
794 ch->setStatus(Channel::S_ERROR);
798 if (ch->sourceHost.yp)
800 LOG_CHANNEL("Channel contacting YP, try %d",numYPTries);
803 LOG_CHANNEL("Channel found hit");
808 if (ch->sourceHost.host.ip)
810 bool isTrusted = ch->sourceHost.tracker | ch->sourceHost.yp;
813 //if (ch->sourceHost.tracker)
814 // peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Contacting tracker, please wait...");
817 ch->sourceHost.host.toStr(ipstr);
820 if (ch->sourceHost.tracker)
822 else if (ch->sourceHost.yp)
829 ch->setStatus(Channel::S_CONNECTING);
830 ch->sourceHost.lastContact = sys->getTime();
834 LOG_CHANNEL("Channel connecting to %s %s",ipstr,type);
838 error = ch->handshakeFetch();
844 throw StreamException("Handshake error");
845 if (ch->sourceHost.tracker) connFailCnt = 0;
847 if (servMgr->autoMaxRelaySetting) //JP-EX
849 double setMaxRelays = ch->info.bitrate?servMgr->maxBitrateOut/(ch->info.bitrate*1.3):0;
850 if ((unsigned int)setMaxRelays == 0)
851 servMgr->maxRelays = 1;
852 else if ((unsigned int)setMaxRelays > servMgr->autoMaxRelaySetting)
853 servMgr->maxRelays = servMgr->autoMaxRelaySetting;
855 servMgr->maxRelays = (unsigned int)setMaxRelays;
858 ch->sourceStream = ch->createSource();
860 error = ch->readStream(*ch->sock,ch->sourceStream);
862 throw StreamException("Stream error");
864 error = 0; // no errors, closing normally.
865 // ch->setStatus(Channel::S_CLOSING);
866 ch->setStatus(Channel::S_IDLE);
868 LOG_CHANNEL("Channel closed normally");
869 }catch(StreamException &e)
871 ch->setStatus(Channel::S_ERROR);
872 LOG_ERROR("Channel to %s %s : %s",ipstr,type,e.msg);
873 if (!servMgr->allowConnectPCST) //JP-EX
875 if (ch->info.srcProtocol == ChanInfo::SP_PEERCAST)
876 ch->thread.active = false;
878 //if (!ch->sourceHost.tracker || ((error != 503) && ch->sourceHost.tracker))
879 if (!ch->sourceHost.tracker || (!got503 && ch->sourceHost.tracker))
880 chanMgr->deadHit(ch->sourceHost);
881 if (ch->sourceHost.tracker && error == -1) {
882 LOG_ERROR("can't connect to tracker");
887 unsigned int ctime = sys->getTime();
888 if (ch->rawData.lastWriteTime) {
889 ch->lastStopTime = ch->rawData.lastWriteTime;
890 if (isIndexTxt(ch) && ctime - ch->lastStopTime < 60)
891 ch->lastStopTime = ctime;
894 if (tracker_check && ch->sourceHost.tracker)
895 ch->trackerHit.lastContact = ctime - 30 + (rand() % 30);
897 // broadcast source host
898 if (!error && ch->sourceHost.host.ip) { // if closed normally
900 MemoryStream mem(pack.data,sizeof(pack.data));
901 AtomStream atom(mem);
902 ch->sourceHost.writeAtoms(atom, ch->info.id);
904 pack.type = ChanPacket::T_PCP;
907 servMgr->broadcastPacket(pack,ch->info.id,ch->remoteID,noID,Servent::T_RELAY);
908 LOG_DEBUG("stream: broadcast sourceHost");
911 // broadcast quit to any connected downstream servents
912 if (!servMgr->keepDownstreams || (ch->sourceHost.tracker && !got503) || !error) {
914 MemoryStream mem(pack.data,sizeof(pack.data));
915 AtomStream atom(mem);
916 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_OFFAIR);
918 pack.type = ChanPacket::T_PCP;
921 servMgr->broadcastPacket(pack,ch->info.id,ch->remoteID,noID,Servent::T_RELAY);
922 LOG_DEBUG("------------ broadcast quit to all downstreams");
924 chanMgr->hitlistlock.on();
925 ChanHitList *hl = chanMgr->findHitList(ch->info);
927 hl->clearHits(false);
929 chanMgr->hitlistlock.off();
933 if (ch->sourceStream)
939 ch->sourceStream->updateStatus(ch);
940 ch->sourceStream->flush(*ch->sock);
942 }catch(StreamException &)
944 ChannelStream *cs = ch->sourceStream;
945 ch->sourceStream = NULL;
959 LOG_ERROR("Channel not found");
961 if ((ch->sourceHost.yp && !next_yp) || ch->sourceHost.tracker) {
962 chanMgr->hitlistlock.on();
963 ChanHitList *hl = chanMgr->findHitList(ch->info);
967 chanMgr->hitlistlock.off();
969 if(!isIndexTxt(&ch->info)) // for PCRaw (popup)
970 peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Channel not found");
978 ch->lastIdleTime = sys->getTime();
979 ch->setStatus(Channel::S_IDLE);
980 ch->skipCount = 0; //JP-EX
981 ch->lastSkipTime = 0;
982 while ((ch->checkIdle()) && (ch->thread.active))
991 // -----------------------------------
992 void Channel::startICY(ClientSocket *cs, SRC_TYPE st)
996 cs->setReadTimeout(0); // stay connected even when theres no data coming through
998 info.srcProtocol = ChanInfo::SP_HTTP;
1000 streamIndex = ++chanMgr->icyIndex;
1002 sourceData = new ICYSource();
1006 // -----------------------------------
1007 static char *nextMetaPart(char *str,char delim)
1020 // -----------------------------------
1021 static void copyStr(char *to,char *from,int max)
1024 while ((c=*from++) && (--max))
1031 // -----------------------------------
1032 void Channel::processMp3Metadata(char *str)
1034 ChanInfo newInfo = info;
1039 char *arg = nextMetaPart(cmd,'=');
1043 char *next = nextMetaPart(arg,';');
1045 if (strcmp(cmd,"StreamTitle")==0)
1047 newInfo.track.title.setUnquote(arg,String::T_ASCII);
1048 newInfo.track.title.convertTo(String::T_UNICODE);
1050 }else if (strcmp(cmd,"StreamUrl")==0)
1052 newInfo.track.contact.setUnquote(arg,String::T_ASCII);
1053 newInfo.track.contact.convertTo(String::T_UNICODE);
1060 updateInfo(newInfo);
1063 // -----------------------------------
1064 XML::Node *ChanHit::createXML()
1070 return new XML::Node("host ip=\"%s\" hops=\"%d\" listeners=\"%d\" relays=\"%d\" uptime=\"%d\" push=\"%d\" relay=\"%d\" direct=\"%d\" cin=\"%d\" stable=\"%d\" version=\"%d\" update=\"%d\" tracker=\"%d\"",
1082 sys->getTime()-time,
1088 // -----------------------------------
1089 XML::Node *ChanHitList::createXML(bool addHits)
1091 XML::Node *hn = new XML::Node("hits hosts=\"%d\" listeners=\"%d\" relays=\"%d\" firewalled=\"%d\" closest=\"%d\" furthest=\"%d\" newest=\"%d\"",
1098 sys->getTime()-newestHit()
1107 hn->add(h->createXML());
1115 // -----------------------------------
1116 XML::Node *Channel::createRelayXML(bool showStat)
1119 ststr = getStatusStr();
1121 if ((status == S_RECEIVING) || (status == S_BROADCASTING))
1124 ChanHitList *chl = chanMgr->findHitList(info);
1126 return new XML::Node("relay listeners=\"%d\" relays=\"%d\" hosts=\"%d\" status=\"%s\"",
1129 (chl!=NULL)?chl->numHits():0,
1134 // -----------------------------------
1135 void ChanMeta::fromXML(XML &xml)
1137 MemoryStream tout(data,MAX_DATALEN);
1142 // -----------------------------------
1143 void ChanMeta::fromMem(void *p, int l)
1148 // -----------------------------------
1149 void ChanMeta::addMem(void *p, int l)
1151 if ((len+l) <= MAX_DATALEN)
1153 memcpy(data+len,p,l);
1157 // -----------------------------------
1158 void Channel::broadcastTrackerUpdate(GnuID &svID, bool force)
1160 unsigned int ctime = sys->getTime();
1162 if (((ctime-lastTrackerUpdate) > 30) || (force))
1166 MemoryStream mem(pack.data,sizeof(pack));
1168 AtomStream atom(mem);
1172 ChanHitList *chl = chanMgr->findHitListByID(info.id);
1174 throw StreamException("Broadcast channel has no hitlist");
1176 int numListeners = totalListeners();
1177 int numRelays = totalRelays();
1179 unsigned int oldp = rawData.getOldestPos();
1180 unsigned int newp = rawData.getLatestPos();
1182 hit.initLocal(numListeners,numRelays,info.numSkips,info.getUptime(),isPlaying(), false, 0, this, oldp,newp);
1186 atom.writeParent(PCP_BCST,8);
1188 atom.writeParent(PCP_BCST,10);
1190 atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_ROOT);
1191 atom.writeChar(PCP_BCST_HOPS,0);
1192 atom.writeChar(PCP_BCST_TTL,11);
1193 atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
1194 atom.writeInt(PCP_BCST_VERSION,PCP_CLIENT_VERSION);
1195 atom.writeInt(PCP_BCST_VERSION_VP,PCP_CLIENT_VERSION_VP);
1197 atom.writeBytes(PCP_BCST_VERSION_EX_PREFIX,PCP_CLIENT_VERSION_EX_PREFIX,2);
1198 atom.writeShort(PCP_BCST_VERSION_EX_NUMBER,PCP_CLIENT_VERSION_EX_NUMBER);
1200 atom.writeParent(PCP_CHAN,4);
1201 atom.writeBytes(PCP_CHAN_ID,info.id.id,16);
1202 atom.writeBytes(PCP_CHAN_BCID,chanMgr->broadcastID.id,16);
1203 info.writeInfoAtoms(atom);
1204 info.writeTrackAtoms(atom);
1205 hit.writeAtoms(atom,info.id);
1209 pack.type = ChanPacket::T_PCP;
1213 int cnt = servMgr->broadcastPacket(pack,noID,servMgr->sessionID,svID,Servent::T_COUT);
1217 LOG_DEBUG("Sent tracker update for %s to %d client(s)",info.name.cstr(),cnt);
1218 lastTrackerUpdate = ctime;
1223 // -----------------------------------
1224 bool Channel::sendPacketUp(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did)
1227 && (!cid.isSet() || info.id.isSame(cid))
1228 && (!sid.isSet() || !remoteID.isSame(sid))
1231 return sourceStream->sendPacket(pack,did);
1236 // -----------------------------------
1237 void Channel::updateInfo(ChanInfo &newInfo)
1239 if (info.update(newInfo))
1241 if (isBroadcasting())
1243 unsigned int ctime = sys->getTime();
1244 if ((ctime-lastMetaUpdate) > 30)
1246 lastMetaUpdate = ctime;
1250 MemoryStream mem(pack.data,sizeof(pack));
1252 AtomStream atom(mem);
1255 atom.writeParent(PCP_BCST,8);
1257 atom.writeParent(PCP_BCST,10);
1259 atom.writeChar(PCP_BCST_HOPS,0);
1260 atom.writeChar(PCP_BCST_TTL,7);
1261 atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_RELAYS);
1262 atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
1263 atom.writeInt(PCP_BCST_VERSION,PCP_CLIENT_VERSION);
1264 atom.writeInt(PCP_BCST_VERSION_VP,PCP_CLIENT_VERSION_VP);
1266 atom.writeBytes(PCP_BCST_VERSION_EX_PREFIX,PCP_CLIENT_VERSION_EX_PREFIX,2);
1267 atom.writeShort(PCP_BCST_VERSION_EX_NUMBER,PCP_CLIENT_VERSION_EX_NUMBER);
1269 atom.writeBytes(PCP_BCST_CHANID,info.id.id,16);
1270 atom.writeParent(PCP_CHAN,3);
1271 atom.writeBytes(PCP_CHAN_ID,info.id.id,16);
1272 info.writeInfoAtoms(atom);
1273 info.writeTrackAtoms(atom);
1276 pack.type = ChanPacket::T_PCP;
1279 servMgr->broadcastPacket(pack,info.id,servMgr->sessionID,noID,Servent::T_RELAY);
1281 broadcastTrackerUpdate(noID);
1285 ChanHitList *chl = chanMgr->findHitList(info);
1289 peercastApp->channelUpdate(&info);
1294 // -----------------------------------
1295 ChannelStream *Channel::createSource()
1297 // if (servMgr->relayBroadcast)
1298 // chanMgr->broadcastRelays(NULL,chanMgr->minBroadcastTTL,chanMgr->maxBroadcastTTL);
1301 ChannelStream *source=NULL;
1303 if (info.srcProtocol == ChanInfo::SP_PEERCAST)
1305 LOG_CHANNEL("Channel is Peercast");
1306 if (servMgr->allowConnectPCST) //JP-EX
1307 source = new PeercastStream();
1309 throw StreamException("Channel is not allowed");
1311 else if (info.srcProtocol == ChanInfo::SP_PCP)
1313 LOG_CHANNEL("Channel is PCP");
1314 PCPStream *pcp = new PCPStream(remoteID);
1317 else if (info.srcProtocol == ChanInfo::SP_MMS)
1319 LOG_CHANNEL("Channel is MMS");
1320 source = new MMSStream();
1323 switch(info.contentType)
1325 case ChanInfo::T_MP3:
1326 LOG_CHANNEL("Channel is MP3 - meta: %d",icyMetaInterval);
1327 source = new MP3Stream();
1329 case ChanInfo::T_NSV:
1330 LOG_CHANNEL("Channel is NSV");
1331 source = new NSVStream();
1333 case ChanInfo::T_WMA:
1334 case ChanInfo::T_WMV:
1335 throw StreamException("Channel is WMA/WMV - but not MMS");
1337 case ChanInfo::T_OGG:
1338 case ChanInfo::T_OGM:
1339 LOG_CHANNEL("Channel is OGG");
1340 source = new OGGStream();
1343 LOG_CHANNEL("Channel is Raw");
1344 source = new RawStream();
1349 source->parent = this;
1353 // ------------------------------------------
1354 void ChannelStream::updateStatus(Channel *ch)
1357 if (getStatus(ch,pack))
1359 if (!ch->isBroadcasting())
1363 int cnt = chanMgr->broadcastPacketUp(pack,ch->info.id,servMgr->sessionID,noID);
1364 LOG_CHANNEL("Sent channel status update to %d clients",cnt);
1369 // ------------------------------------------
1370 bool ChannelStream::getStatus(Channel *ch,ChanPacket &pack)
1372 unsigned int ctime = sys->getTime();
1374 ChanHitList *chl = chanMgr->findHitListByID(ch->info.id);
1379 /* int newLocalListeners = ch->localListeners();
1380 int newLocalRelays = ch->localRelays();
1384 (numListeners != newLocalListeners)
1385 || (numRelays != newLocalRelays)
1386 || (ch->isPlaying() != isPlaying)
1387 || (servMgr->getFirewall() != fwState)
1388 || (((ctime-lastUpdate)>chanMgr->hostUpdateInterval) && chanMgr->hostUpdateInterval)
1390 && ((ctime-lastUpdate) > 10)
1394 numListeners = newLocalListeners;
1395 numRelays = newLocalRelays;
1396 isPlaying = ch->isPlaying();
1397 fwState = servMgr->getFirewall();
1402 hit.initLocal(ch->localListeners(),ch->localRelays(),ch->info.numSkips,ch->info.getUptime(),isPlaying, ch->isFull(), ch->info.bitrate, ch);
1403 hit.tracker = ch->isBroadcasting();*/
1405 int newLocalListeners = ch->localListeners();
1406 int newLocalRelays = ch->localRelays();
1408 if ((ch->isPlaying() == isPlaying)){
1409 if ((ctime-lastUpdate) < 10){
1413 if ((ctime-lastCheckTime) < 10){
1416 lastCheckTime = ctime;
1419 unsigned int oldp = ch->rawData.getOldestPos();
1420 unsigned int newp = ch->rawData.getLatestPos();
1424 // LOG_DEBUG("isPlaying-------------------------------------- %d %d", ch->isPlaying(), isPlaying);
1426 hit.initLocal(newLocalListeners,newLocalRelays,ch->info.numSkips,ch->info.getUptime(),ch->isPlaying(), ch->isFull(), ch->info.bitrate, ch, oldp, newp);
1427 hit.tracker = ch->isBroadcasting();
1429 if ( (((ctime-lastUpdate)>chanMgr->hostUpdateInterval) && chanMgr->hostUpdateInterval)
1430 || (newLocalListeners != numListeners)
1431 || (newLocalRelays != numRelays)
1432 || (ch->isPlaying() != isPlaying)
1433 || (servMgr->getFirewall() != fwState)
1434 || (ch->chDisp.relay != hit.relay)
1435 || (ch->chDisp.relayfull != hit.relayfull)
1436 || (ch->chDisp.chfull != hit.chfull)
1437 || (ch->chDisp.ratefull != hit.ratefull)
1439 numListeners = newLocalListeners;
1440 numRelays = newLocalRelays;
1441 isPlaying = ch->isPlaying();
1442 fwState = servMgr->getFirewall();
1447 if ((numRelays) && ((servMgr->getFirewall() == ServMgr::FW_OFF) && (servMgr->autoRelayKeep!=0))) //JP-EX
1448 ch->stayConnected = true;
1450 if ((!numRelays && !numListeners) && (servMgr->autoRelayKeep==2)) //JP-EX
1451 ch->stayConnected = false;
1453 MemoryStream pmem(pack.data,sizeof(pack.data));
1454 AtomStream atom(pmem);
1460 atom.writeParent(PCP_BCST,8);
1462 atom.writeParent(PCP_BCST,10);
1464 atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_TRACKERS);
1465 atom.writeChar(PCP_BCST_HOPS,0);
1466 atom.writeChar(PCP_BCST_TTL,11);
1467 atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
1468 atom.writeInt(PCP_BCST_VERSION,PCP_CLIENT_VERSION);
1469 atom.writeInt(PCP_BCST_VERSION_VP,PCP_CLIENT_VERSION_VP);
1471 atom.writeBytes(PCP_BCST_VERSION_EX_PREFIX,PCP_CLIENT_VERSION_EX_PREFIX,2);
1472 atom.writeShort(PCP_BCST_VERSION_EX_NUMBER,PCP_CLIENT_VERSION_EX_NUMBER);
1474 atom.writeBytes(PCP_BCST_CHANID,ch->info.id.id,16);
1475 hit.writeAtoms(atom,noID);
1477 pack.len = pmem.pos;
1478 pack.type = ChanPacket::T_PCP;
1483 // -----------------------------------
1484 bool Channel::checkBump()
1486 unsigned int maxIdleTime = 30;
1487 if (isIndexTxt(this)) maxIdleTime = 60;
1489 if (!isBroadcasting() && (!sourceHost.tracker))
1490 if (rawData.lastWriteTime && ((sys->getTime() - rawData.lastWriteTime) > maxIdleTime))
1492 LOG_ERROR("Channel Auto bumped");
1504 // -----------------------------------
1505 int Channel::readStream(Stream &in,ChannelStream *source)
1513 source->readHeader(in,this);
1515 peercastApp->channelStart(&info);
1517 rawData.lastWriteTime = 0;
1519 bool wasBroadcasting=false;
1521 unsigned int receiveStartTime = 0;
1523 unsigned int ptime = 0;
1524 unsigned int upsize = 0;
1528 while (thread.active && !peercastInst->isQuitting)
1532 LOG_DEBUG("Channel idle");
1538 LOG_DEBUG("Channel bumped");
1546 LOG_DEBUG("Channel eof");
1552 error = source->readPacket(in,this);
1557 //if (rawData.writePos > 0)
1558 if (rawData.lastWriteTime > 0 || rawData.lastSkipTime > 0)
1560 if (isBroadcasting())
1562 if ((sys->getTime()-lastTrackerUpdate) >= chanMgr->hostUpdateInterval)
1566 broadcastTrackerUpdate(noID);
1568 wasBroadcasting = true;
1572 /* if (status != Channel::S_RECEIVING){
1573 receiveStartTime = sys->getTime();
1574 } else if (receiveStartTime && receiveStartTime + 10 > sys->getTime()){
1575 chanMgr->hitlistlock.on();
1576 ChanHitList *hl = chanMgr->findHitList(info);
1578 hl->clearHits(true);
1580 chanMgr->hitlistlock.off();
1581 receiveStartTime = 0;
1583 setStatus(Channel::S_RECEIVING);
1586 source->updateStatus(this);
1590 unsigned int t = sys->getTime();
1593 upsize = Servent::MAX_OUTWARD_SIZE;
1596 unsigned int len = source->flushUb(in, upsize);
1601 }catch(StreamException &e)
1603 LOG_ERROR("readStream: %s",e.msg);
1607 if (!servMgr->keepDownstreams) {
1608 if (status == Channel::S_RECEIVING){
1609 chanMgr->hitlistlock.on();
1610 ChanHitList *hl = chanMgr->findHitList(info);
1612 hl->clearHits(false);
1614 chanMgr->hitlistlock.off();
1618 // setStatus(S_CLOSING);
1621 if (wasBroadcasting)
1625 broadcastTrackerUpdate(noID,true);
1628 peercastApp->channelStop(&info);
1630 source->readEnd(in,this);
1635 // -----------------------------------
1636 void PeercastStream::readHeader(Stream &in,Channel *ch)
1638 if (in.readTag() != 'PCST')
1639 throw StreamException("Not PeerCast stream");
1642 // -----------------------------------
1643 void PeercastStream::readEnd(Stream &,Channel *)
1647 // -----------------------------------
1648 int PeercastStream::readPacket(Stream &in,Channel *ch)
1654 pack.readPeercast(in);
1656 MemoryStream mem(pack.data,pack.len);
1660 case ChanPacket::T_HEAD:
1662 ch->headPack = pack;
1663 pack.pos = ch->streamPos;
1664 ch->newPacket(pack);
1665 ch->streamPos+=pack.len;
1667 case ChanPacket::T_DATA:
1668 pack.pos = ch->streamPos;
1669 ch->newPacket(pack);
1670 ch->streamPos+=pack.len;
1672 case ChanPacket::T_META:
1673 ch->insertMeta.fromMem(pack.data,pack.len);
1679 XML::Node *n = xml.findNode("channel");
1682 ChanInfo newInfo = ch->info;
1683 newInfo.updateFromXML(n);
1684 ChanHitList *chl = chanMgr->findHitList(ch->info);
1686 newInfo.updateFromXML(n);
1687 ch->updateInfo(newInfo);
1693 case ChanPacket::T_SYNC:
1695 unsigned int s = mem.readLong();
1696 if ((s-ch->syncPos) != 1)
1698 LOG_CHANNEL("Ch.%d SKIP: %d to %d (%d)",ch->index,ch->syncPos,s,ch->info.numSkips);
1701 ch->info.numSkips++;
1702 if (ch->info.numSkips>50)
1703 throw StreamException("Bumped - Too many skips");
1718 // -----------------------------------
1719 void ChannelStream::readRaw(Stream &in, Channel *ch)
1723 const int readLen = 8192;
1725 pack.init(ChanPacket::T_DATA,pack.data,readLen,ch->streamPos);
1726 in.read(pack.data,pack.len);
1727 ch->newPacket(pack);
1728 ch->checkReadDelay(pack.len);
1730 ch->streamPos+=pack.len;
1733 // ------------------------------------------
1734 void RawStream::readHeader(Stream &,Channel *)
1738 // ------------------------------------------
1739 int RawStream::readPacket(Stream &in,Channel *ch)
1745 // ------------------------------------------
1746 void RawStream::readEnd(Stream &,Channel *)
1752 // -----------------------------------
1753 void ChanPacket::init(ChanPacketv &p)
1757 if (len > MAX_DATALEN)
1758 throw StreamException("Packet data too large");
1762 priority = p.priority;
1763 memcpy(data, p.data, len);
1765 // -----------------------------------
1766 void ChanPacket::init(TYPE t, const void *p, unsigned int l,unsigned int _pos)
1769 if (l > MAX_DATALEN)
1770 throw StreamException("Packet data too large");
1777 // -----------------------------------
1778 void ChanPacket::writeRaw(Stream &out)
1780 out.write(data,len);
1782 // -----------------------------------
1783 void ChanPacket::writePeercast(Stream &out)
1785 unsigned int tp = 0;
1788 case T_HEAD: tp = 'HEAD'; break;
1789 case T_META: tp = 'META'; break;
1790 case T_DATA: tp = 'DATA'; break;
1793 if (type != T_UNKNOWN)
1796 out.writeShort(len);
1798 out.write(data,len);
1801 // -----------------------------------
1802 void ChanPacket::readPeercast(Stream &in)
1804 unsigned int tp = in.readTag();
1808 case 'HEAD': type = T_HEAD; break;
1809 case 'DATA': type = T_DATA; break;
1810 case 'META': type = T_META; break;
1811 default: type = T_UNKNOWN;
1813 len = in.readShort();
1815 if (len > MAX_DATALEN)
1816 throw StreamException("Bad ChanPacket");
1819 // -----------------------------------
1820 int ChanPacketBuffer::copyFrom(ChanPacketBuffer &buf, unsigned int reqPos)
1830 for(unsigned int i=buf.firstPos; i<=buf.lastPos; i++)
1832 //ChanPacket *src = &buf.packets[i%MAX_PACKETS];
1833 ChanPacketv *src = &buf.packets[i%MAX_PACKETS];
1834 if (src->type & accept)
1836 if (src->pos >= reqPos)
1839 //packets[writePos++] = *src;
1840 packets[writePos++].init(*src);
1849 return lastPos-firstPos;
1852 // -----------------------------------
1853 bool ChanPacketBuffer::findPacket(unsigned int spos, ChanPacket &pack)
1860 unsigned int bound = packets[0].len * ChanPacketBuffer::MAX_PACKETS * 2; // max packets to wait
1861 unsigned int fpos = getStreamPos(firstPos);
1862 unsigned int lpos = getStreamPos(lastPos);
1863 if (spos < fpos && (fpos < lpos || spos > lpos + bound))
1867 for(unsigned int i=firstPos; i<=lastPos; i++)
1869 //ChanPacket &p = packets[i%MAX_PACKETS];
1870 ChanPacketv &p = packets[i%MAX_PACKETS];
1871 if (p.pos >= spos && p.pos - spos <= bound)
1882 // -----------------------------------
1883 unsigned int ChanPacketBuffer::getLatestPos()
1888 return getStreamPos(lastPos);
1891 // -----------------------------------
1892 unsigned int ChanPacketBuffer::getOldestPos()
1897 return getStreamPos(firstPos);
1900 // -----------------------------------
1901 unsigned int ChanPacketBuffer::findOldestPos(unsigned int spos)
1903 unsigned int min = getStreamPos(safePos);
1904 unsigned int max = getStreamPos(lastPos);
1915 // -----------------------------------
1916 unsigned int ChanPacketBuffer::getStreamPos(unsigned int index)
1918 return packets[index%MAX_PACKETS].pos;
1920 // -----------------------------------
1921 unsigned int ChanPacketBuffer::getStreamPosEnd(unsigned int index)
1923 return packets[index%MAX_PACKETS].pos+packets[index%MAX_PACKETS].len;
1925 // -----------------------------------
1926 bool ChanPacketBuffer::writePacket(ChanPacket &pack, bool updateReadPos)
1930 if (servMgr->keepDownstreams) {
1931 unsigned int lpos = getLatestPos();
1932 unsigned int diff = pack.pos - lpos;
1933 if (packets[lastPos%MAX_PACKETS].type == ChanPacket::T_HEAD) lpos = 0;
1934 if (lpos && (diff == 0 || diff > 0xfff00000)) {
1935 LOG_DEBUG("* latest pos=%d, pack pos=%d", getLatestPos(), pack.pos);
1936 lastSkipTime = sys->getTime();
1941 if (willSkip()) // too far behind
1943 lastSkipTime = sys->getTime();
1949 pack.sync = writePos;
1950 packets[writePos%MAX_PACKETS].init(pack);
1952 // LOG_DEBUG("packet.len = %d",pack.len);
1958 if (writePos >= MAX_PACKETS)
1959 firstPos = writePos-MAX_PACKETS;
1963 if (writePos >= NUM_SAFEPACKETS)
1964 safePos = writePos - NUM_SAFEPACKETS;
1971 lastWriteTime = sys->getTime();
1979 // -----------------------------------
1980 void ChanPacketBuffer::readPacket(ChanPacket &pack)
1983 unsigned int tim = sys->getTime();
1985 if (readPos < firstPos)
1986 throw StreamException("Read too far behind");
1988 while (readPos >= writePos)
1991 if ((sys->getTime() - tim) > 30)
1992 throw TimeoutException();
1995 pack.init(packets[readPos%MAX_PACKETS]);
2000 // -----------------------------------
2001 void ChanPacketBuffer::readPacketPri(ChanPacket &pack)
2003 unsigned int tim = sys->getTime();
2005 if (readPos < firstPos)
2006 throw StreamException("Read too far behind");
2008 while (readPos >= writePos)
2011 if ((sys->getTime() - tim) > 30)
2012 throw TimeoutException();
2015 ChanPacketv *best = &packets[readPos % MAX_PACKETS];
2016 for (unsigned int i = readPos + 1; i < writePos; i++) {
2017 if (packets[i % MAX_PACKETS].priority > best->priority)
2018 best = &packets[i % MAX_PACKETS];
2021 best->init(packets[readPos % MAX_PACKETS]);
2026 // -----------------------------------
2027 bool ChanPacketBuffer::willSkip()
2029 return ((writePos-readPos) >= MAX_PACKETS);
2032 // -----------------------------------
2033 void Channel::getStreamPath(char *str)
2039 sprintf(str,"/stream/%s%s",idStr,info.getTypeExt(info.contentType));
2044 // -----------------------------------
2045 void ChanMgr::startSearch(ChanInfo &info)
2052 searchActive = true;
2054 // -----------------------------------
2055 void ChanMgr::quit()
2057 LOG_DEBUG("ChanMgr is quitting..");
2060 // -----------------------------------
2061 int ChanMgr::numIdleChannels()
2064 Channel *ch = channel;
2068 if (ch->thread.active)
2069 if (ch->status == Channel::S_IDLE)
2075 // -----------------------------------
2076 void ChanMgr::closeOldestIdle()
2078 unsigned int idleTime = (unsigned int)-1;
2079 Channel *ch = channel,*oldest=NULL;
2083 if (ch->thread.active)
2084 if (ch->status == Channel::S_IDLE)
2085 if (ch->lastIdleTime < idleTime)
2088 idleTime = ch->lastIdleTime;
2094 oldest->thread.active = false;
2097 // -----------------------------------
2098 void ChanMgr::closeAll()
2100 Channel *ch = channel;
2103 if (ch->thread.active)
2104 ch->thread.shutdown();
2108 // -----------------------------------
2109 Channel *ChanMgr::findChannelByNameID(ChanInfo &info)
2111 Channel *ch = channel;
2115 if (ch->info.matchNameID(info))
2122 // -----------------------------------
2123 Channel *ChanMgr::findChannelByName(const char *n)
2125 Channel *ch = channel;
2129 if (stricmp(ch->info.name,n)==0)
2136 // -----------------------------------
2137 Channel *ChanMgr::findChannelByIndex(int index)
2140 Channel *ch = channel;
2153 // -----------------------------------
2154 Channel *ChanMgr::findChannelByMount(const char *str)
2156 Channel *ch = channel;
2160 if (strcmp(ch->mount,str)==0)
2167 // -----------------------------------
2168 Channel *ChanMgr::findChannelByID(GnuID &id)
2170 Channel *ch = channel;
2174 if (ch->info.id.isSame(id))
2180 // -----------------------------------
2181 Channel *ChanMgr::findChannelByChannelID(int id)
2184 Channel *ch = channel;
2187 if (ch->isActive()){
2188 if (ch->channel_id == id){
2196 // -----------------------------------
2197 int ChanMgr::findChannels(ChanInfo &info, Channel **chlist, int max)
2200 Channel *ch = channel;
2204 if (ch->info.match(info))
2214 // -----------------------------------
2215 int ChanMgr::findChannelsByStatus(Channel **chlist, int max, Channel::STATUS status)
2218 Channel *ch = channel;
2222 if (ch->status == status)
2232 // -----------------------------------
2233 Channel *ChanMgr::createRelay(ChanInfo &info, bool stayConnected)
2235 Channel *c = chanMgr->createChannel(info,NULL);
2238 c->stayConnected = stayConnected;
2244 // -----------------------------------
2245 Channel *ChanMgr::findAndRelay(ChanInfo &info)
2248 info.id.toStr(idStr);
2249 LOG_CHANNEL("Searching for: %s (%s)",idStr,info.name.cstr());
2251 if(!isIndexTxt(&info)) // for PCRaw (popup)
2252 peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Finding channel...");
2257 WLockBlock wb(&(chanMgr->channellock));
2261 c = findChannelByNameID(info);
2265 c = chanMgr->createChannel(info,NULL);
2268 c->setStatus(Channel::S_SEARCHING);
2271 } else if (!(c->thread.active)){
2272 c->thread.active = true;
2273 c->thread.finish = false;
2274 c->info.lastPlayStart = 0; // reconnect
2275 c->info.lastPlayEnd = 0;
2277 c->finthread->finish = true;
2278 c->finthread = NULL;
2280 if (c->status != Channel::S_CONNECTING && c->status != Channel::S_SEARCHING){
2281 c->setStatus(Channel::S_SEARCHING);
2288 for(int i=0; i<600; i++) // search for 1 minute.
2293 c = findChannelByNameID(info);
2297 // peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Channel not found");
2302 if (c->isPlaying() && (c->info.contentType!=ChanInfo::T_UNKNOWN))
2312 // -----------------------------------
2319 currFindAndPlayChannel.clear();
2321 broadcastMsg.clear();
2322 broadcastMsgInterval=10;
2324 broadcastID.generate(PCP_BROADCAST_FLAGS);
2329 icyMetaInterval = 8192;
2330 maxRelaysPerChannel = 1;
2334 minBroadcastTTL = 1;
2335 maxBroadcastTTL = 7;
2337 pushTimeout = 60; // 1 minute
2338 pushTries = 5; // 5 times
2339 maxPushHops = 8; // max 8 hops away
2340 maxUptime = 0; // 0 = none
2342 prefetchTime = 10; // n seconds
2344 hostUpdateInterval = 120; // 2 minutes
2356 // -----------------------------------
2357 bool ChanMgr::writeVariable(Stream &out, const String &var, int index)
2360 if (var == "numHitLists")
2361 sprintf(buf,"%d",numHitLists());
2363 else if (var == "numChannels")
2364 sprintf(buf,"%d",numChannels());
2365 else if (var == "djMessage")
2366 strcpy(buf,broadcastMsg.cstr());
2367 else if (var == "icyMetaInterval")
2368 sprintf(buf,"%d",icyMetaInterval);
2369 else if (var == "maxRelaysPerChannel")
2370 sprintf(buf,"%d",maxRelaysPerChannel);
2371 else if (var == "hostUpdateInterval")
2372 sprintf(buf,"%d",hostUpdateInterval);
2373 else if (var == "broadcastID")
2374 broadcastID.toStr(buf);
2380 out.writeString(buf);
2384 // -----------------------------------
2385 bool Channel::writeVariable(Stream &out, const String &var, int index)
2396 utf8.convertTo(String::T_UNICODESAFE);
2397 strcpy(buf,utf8.cstr());
2399 }else if (var == "bitrate")
2401 sprintf(buf,"%d",info.bitrate);
2403 }else if (var == "srcrate")
2407 unsigned int tot = sourceData->getSourceRate();
2408 sprintf(buf,"%.1f",BYTES_TO_KBPS(tot));
2412 }else if (var == "genre")
2415 utf8.convertTo(String::T_UNICODESAFE);
2416 strcpy(buf,utf8.cstr());
2417 }else if (var == "desc")
2420 utf8.convertTo(String::T_UNICODESAFE);
2421 strcpy(buf,utf8.cstr());
2422 }else if (var == "comment")
2424 utf8 = info.comment;
2425 utf8.convertTo(String::T_UNICODESAFE);
2426 strcpy(buf,utf8.cstr());
2427 }else if (var == "uptime")
2430 if (info.lastPlayStart)
2431 uptime.setFromStopwatch(sys->getTime()-info.lastPlayStart);
2434 strcpy(buf,uptime.cstr());
2436 else if (var == "type")
2437 sprintf(buf,"%s",ChanInfo::getTypeStr(info.contentType));
2438 else if (var == "ext")
2439 sprintf(buf,"%s",ChanInfo::getTypeExt(info.contentType));
2440 else if (var == "proto") {
2441 switch(info.contentType) {
2442 case ChanInfo::T_WMA:
2443 case ChanInfo::T_WMV:
2444 sprintf(buf, "mms://");
2447 sprintf(buf, "http://");
2450 else if (var == "localRelays")
2451 sprintf(buf,"%d",localRelays());
2452 else if (var == "localListeners")
2453 sprintf(buf,"%d",localListeners());
2455 else if (var == "totalRelays")
2456 sprintf(buf,"%d",totalRelays());
2457 else if (var == "totalListeners")
2458 sprintf(buf,"%d",totalListeners());
2460 else if (var == "status")
2461 sprintf(buf,"%s",getStatusStr());
2462 else if (var == "keep")
2463 sprintf(buf,"%s",stayConnected?"Yes":"No");
2464 else if (var == "id")
2466 else if (var.startsWith("track."))
2469 if (var == "track.title")
2470 utf8 = info.track.title;
2471 else if (var == "track.artist")
2472 utf8 = info.track.artist;
2473 else if (var == "track.album")
2474 utf8 = info.track.album;
2475 else if (var == "track.genre")
2476 utf8 = info.track.genre;
2477 else if (var == "track.contactURL")
2478 utf8 = info.track.contact;
2480 utf8.convertTo(String::T_UNICODESAFE);
2481 strcpy(buf,utf8.cstr());
2483 }else if (var == "contactURL")
2484 sprintf(buf,"%s",info.url.cstr());
2485 else if (var == "streamPos")
2486 sprintf(buf,"%d",streamPos);
2487 else if (var == "sourceType")
2488 strcpy(buf,getSrcTypeStr());
2489 else if (var == "sourceProtocol")
2490 strcpy(buf,ChanInfo::getProtocolStr(info.srcProtocol));
2491 else if (var == "sourceURL")
2493 if (sourceURL.isEmpty())
2494 sourceHost.host.toStr(buf);
2496 strcpy(buf,sourceURL.cstr());
2498 else if (var == "headPos")
2499 sprintf(buf,"%d",headPack.pos);
2500 else if (var == "headLen")
2501 sprintf(buf,"%d",headPack.len);
2502 else if (var == "numHits")
2504 ChanHitList *chl = chanMgr->findHitListByID(info.id);
2507 // numHits = chl->numHits();
2515 sprintf(buf,"%d",numHits);
2516 } else if (var == "isBroadcast")
2517 strcpy(buf, (type == T_BROADCAST) ? "1":"0");
2521 out.writeString(buf);
2525 // -----------------------------------
2526 void ChanMgr::broadcastTrackerUpdate(GnuID &svID, bool force)
2528 Channel *c = channel;
2531 if ( c->isActive() && c->isBroadcasting() )
2532 c->broadcastTrackerUpdate(svID,force);
2538 // -----------------------------------
2539 int ChanMgr::broadcastPacketUp(ChanPacket &pack,GnuID &chanID, GnuID &srcID, GnuID &destID)
2543 Channel *c = channel;
2546 if (c->sendPacketUp(pack,chanID,srcID,destID))
2554 // -----------------------------------
2555 void ChanMgr::broadcastRelays(Servent *serv, int minTTL, int maxTTL)
2557 //if ((servMgr->getFirewall() == ServMgr::FW_OFF) || servMgr->serverHost.localIP())
2560 Host sh = servMgr->serverHost;
2561 bool push = (servMgr->getFirewall()!=ServMgr::FW_OFF);
2562 bool busy = (servMgr->pubInFull() && servMgr->outFull()) || servMgr->relaysFull();
2563 bool stable = servMgr->totalStreams>0;
2570 Channel *c = channel;
2576 bool tracker = c->isBroadcasting();
2578 int ttl = (c->info.getUptime() / servMgr->relayBroadcast); // 1 hop per N seconds
2586 if (hit.initHit(sh,c,NULL,push,busy,stable,tracker,ttl))
2592 serv->outputPacket(hit,false);
2596 LOG_NETWORK("Sent channel to %d servents, TTL %d",numOut,ttl);
2603 // LOG_NETWORK("Sent %d channels to %d servents",numChans,numOut);
2606 // -----------------------------------
2607 void ChanMgr::setUpdateInterval(unsigned int v)
2609 hostUpdateInterval = v;
2613 // -----------------------------------
2617 MemoryStream mem(pack.data,sizeof(pack.data));
2618 AtomStream atom(mem);
2619 atom.writeParent(PCP_BCST,3);
2620 atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_ALL);
2621 atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
2622 atom.writeParent(PCP_MESG,1);
2623 atom.writeString(PCP_MESG_DATA,msg.cstr());
2633 PCPStream::readAtom(atom,bcs);
2634 //int cnt = servMgr->broadcastPacketUp(pack,noID,servMgr->sessionID);
2635 //int cnt = servMgr->broadcastPacketDown(pack,noID,servMgr->sessionID);
2636 //int cnt = chanMgr->broadcastPacketUp(pack,noID,servMgr->sessionID);
2637 //LOG_DEBUG("Sent message to %d clients",cnt);
2639 // -----------------------------------
2640 void ChanMgr::setBroadcastMsg(String &msg)
2642 if (!msg.isSame(broadcastMsg))
2646 Channel *c = channel;
2649 if (c->isActive() && c->isBroadcasting())
2651 ChanInfo newInfo = c->info;
2652 newInfo.comment = broadcastMsg;
2653 c->updateInfo(newInfo);
2662 // -----------------------------------
2663 void ChanMgr::clearHitLists()
2666 // LOG_DEBUG("clearHitLists HITLISTLOCK ON-------------");
2667 chanMgr->hitlistlock.on();
2670 peercastApp->delChannel(&hitlist->info);
2672 ChanHitList *next = hitlist->next;
2678 // LOG_DEBUG("clearHitLists HITLISTLOCK OFF-------------");
2679 chanMgr->hitlistlock.off();
2681 // -----------------------------------
2682 Channel *ChanMgr::deleteChannel(Channel *delchan)
2686 Channel *ch = channel,*prev=NULL,*next=NULL;
2692 Channel *next = ch->next;
2698 if (delchan->sourceStream){
2699 delchan->sourceStream->parent = NULL;
2715 // -----------------------------------
2716 Channel *ChanMgr::createChannel(ChanInfo &info, const char *mount)
2729 nc->info.lastPlayStart = 0;
2730 nc->info.lastPlayEnd = 0;
2731 nc->info.status = ChanInfo::S_UNKNOWN;
2733 nc->mount.set(mount);
2734 nc->setStatus(Channel::S_WAIT);
2735 nc->type = Channel::T_ALLOCATED;
2736 nc->info.createdTime = sys->getTime();
2738 LOG_CHANNEL("New channel created");
2743 // -----------------------------------
2744 int ChanMgr::pickHits(ChanHitSearch &chs)
2746 ChanHitList *chl = hitlist;
2750 if (chl->pickHits(chs))
2760 // -----------------------------------
2761 ChanHitList *ChanMgr::findHitList(ChanInfo &info)
2763 ChanHitList *chl = hitlist;
2767 if (chl->info.matchNameID(info))
2774 // -----------------------------------
2775 ChanHitList *ChanMgr::findHitListByID(GnuID &id)
2777 ChanHitList *chl = hitlist;
2781 if (chl->info.id.isSame(id))
2787 // -----------------------------------
2788 int ChanMgr::numHitLists()
2791 ChanHitList *chl = hitlist;
2800 // -----------------------------------
2801 ChanHitList *ChanMgr::addHitList(ChanInfo &info)
2803 ChanHitList *chl = new ChanHitList();
2806 chl->next = hitlist;
2811 chl->info.createdTime = sys->getTime();
2812 peercastApp->addChannel(&chl->info);
2817 // -----------------------------------
2818 void ChanMgr::clearDeadHits(bool clearTrackers)
2820 unsigned int interval;
2822 if (servMgr->isRoot)
2823 interval = 1200; // mainly for old 0.119 clients
2825 interval = hostUpdateInterval+120;
2827 chanMgr->hitlistlock.on();
2828 ChanHitList *chl = hitlist,*prev = NULL;
2833 if (chl->clearDeadHits(interval,clearTrackers) == 0)
2835 if (!isBroadcasting(chl->info.id))
2837 if (!chanMgr->findChannelByID(chl->info.id))
2839 peercastApp->delChannel(&chl->info);
2841 ChanHitList *next = chl->next;
2857 chanMgr->hitlistlock.off();
2859 // -----------------------------------
2860 bool ChanMgr::isBroadcasting(GnuID &id)
2862 Channel *ch = findChannelByID(id);
2864 return ch->isBroadcasting();
2868 // -----------------------------------
2869 bool ChanMgr::isBroadcasting()
2871 Channel *ch = channel;
2875 if (ch->isBroadcasting())
2883 // -----------------------------------
2884 int ChanMgr::numChannels()
2887 Channel *ch = channel;
2897 // -----------------------------------
2898 void ChanMgr::deadHit(ChanHit &hit)
2900 ChanHitList *chl = findHitListByID(hit.chanID);
2904 // -----------------------------------
2905 void ChanMgr::delHit(ChanHit &hit)
2907 ChanHitList *chl = findHitListByID(hit.chanID);
2912 // -----------------------------------
2913 void ChanMgr::addHit(Host &h,GnuID &id,bool tracker)
2919 hit.rhost[1].init();
2920 hit.tracker = tracker;
2925 // -----------------------------------
2926 ChanHit *ChanMgr::addHit(ChanHit &h)
2929 lastHit = sys->getTime();
2931 ChanHitList *hl=NULL;
2933 hl = findHitListByID(h.chanID);
2939 hl = addHitList(info);
2944 return hl->addHit(h);
2949 // -----------------------------------
2950 bool ChanMgr::findParentHit(ChanHit &p)
2952 ChanHitList *hl=NULL;
2954 chanMgr->hitlistlock.on();
2956 hl = findHitListByID(p.chanID);
2960 ChanHit *ch = hl->hit;
2963 if (!ch->dead && (ch->rhost[0].ip == p.uphost.ip)
2964 && (ch->rhost[0].port == p.uphost.port))
2966 chanMgr->hitlistlock.off();
2973 chanMgr->hitlistlock.off();
2978 // -----------------------------------
2979 class ChanFindInfo : public ThreadInfo
2985 // -----------------------------------
2986 THREAD_PROC findAndPlayChannelProc(ThreadInfo *th)
2988 ChanFindInfo *cfi = (ChanFindInfo *)th;
2994 Channel *ch = chanMgr->findChannelByNameID(info);
2996 chanMgr->currFindAndPlayChannel = info.id;
2999 ch = chanMgr->findAndRelay(info);
3003 // check that a different channel hasn`t be selected already.
3004 if (chanMgr->currFindAndPlayChannel.isSame(ch->info.id))
3005 chanMgr->playChannel(ch->info);
3008 ch->stayConnected = cfi->keep;
3014 // -----------------------------------
3015 void ChanMgr::findAndPlayChannel(ChanInfo &info, bool keep)
3017 ChanFindInfo *cfi = new ChanFindInfo;
3020 cfi->func = findAndPlayChannelProc;
3023 sys->startThread(cfi);
3025 // -----------------------------------
3026 void ChanMgr::playChannel(ChanInfo &info)
3029 char str[128],fname[256],idStr[128];
3031 sprintf(str,"http://localhost:%d",servMgr->serverHost.port);
3032 info.id.toStr(idStr);
3034 PlayList::TYPE type;
3037 if ((info.contentType == ChanInfo::T_WMA) || (info.contentType == ChanInfo::T_WMV))
3039 type = PlayList::T_ASX;
3040 // WMP seems to have a bug where it doesn`t re-read asx files if they have the same name
3041 // so we prepend the channel id to make it unique - NOTE: should be deleted afterwards.
3042 if (servMgr->getModulePath) //JP-EX
3044 peercastApp->getDirectory();
3045 sprintf(fname,"%s/%s.asx",servMgr->modulePath,idStr);
3047 sprintf(fname,"%s/%s.asx",peercastApp->getPath(),idStr);
3048 }else if (info.contentType == ChanInfo::T_OGM)
3050 type = PlayList::T_RAM;
3051 if (servMgr->getModulePath) //JP-EX
3053 peercastApp->getDirectory();
3054 sprintf(fname,"%s/play.ram",servMgr->modulePath);
3056 sprintf(fname,"%s/play.ram",peercastApp->getPath());
3060 type = PlayList::T_SCPLS;
3061 if (servMgr->getModulePath) //JP-EX
3063 peercastApp->getDirectory();
3064 sprintf(fname,"%s/play.pls",servMgr->modulePath);
3066 sprintf(fname,"%s/play.pls",peercastApp->getPath());
3070 PlayList *pls = new PlayList(type,1);
3071 pls->addChannel(str,info);
3074 LOG_DEBUG("Writing %s",fname);
3076 file.openWriteReplace(fname);
3081 LOG_DEBUG("Executing: %s",fname);
3082 sys->executeFile(fname);
3087 // -----------------------------------
3088 ChanHitList::ChanHitList()
3095 // -----------------------------------
3096 ChanHitList::~ChanHitList()
3098 chanMgr->hitlistlock.on();
3100 hit = deleteHit(hit);
3101 chanMgr->hitlistlock.off();
3103 // -----------------------------------
3104 void ChanHit::pickNearestIP(Host &h)
3106 for(int i=0; i<2; i++)
3108 if (h.classType() == rhost[i].classType())
3116 // -----------------------------------
3117 void ChanHit::init()
3129 dead = tracker = firewalled = stable = yp = false;
3130 recv = cin = direct = relay = true;
3131 relayfull = chfull = ratefull = false;
3140 version_ex_prefix[0] = ' ';
3141 version_ex_prefix[1] = ' ';
3142 version_ex_number = 0;
3151 oldestPos = newestPos = 0;
3156 // -----------------------------------
3157 void ChanHit::initLocal(int numl,int numr,int,int uptm,bool connected,bool isFull,unsigned int bitrate, Channel* ch, unsigned int oldp,unsigned int newp)
3160 firewalled = (servMgr->getFirewall() != ServMgr::FW_OFF);
3161 numListeners = numl;
3164 stable = servMgr->totalStreams>0;
3165 sessionID = servMgr->sessionID;
3168 direct = !servMgr->directFull();
3169 // relay = !servMgr->relaysFull();
3170 cin = !servMgr->controlInFull();
3172 relayfull = servMgr->relaysFull();
3175 Channel *c = chanMgr->channel;
3177 unsigned int needRate = 0;
3178 unsigned int allRate = 0;
3180 if (c->isPlaying()){
3181 allRate += c->info.bitrate * c->localRelays();
3182 if ((c != ch) && (c->localRelays() == 0)){
3183 if(!isIndexTxt(c)) // for PCRaw (relay)
3185 needRate+=c->info.bitrate;
3190 unsigned int numRelay = servMgr->numStreams(Servent::T_RELAY,false);
3191 int diff = servMgr->maxRelays - numRelay;
3192 if (ch->localRelays()){
3193 if (noRelay > diff){
3201 // ratefull = servMgr->bitrateFull(needRate+bitrate);
3202 ratefull = (servMgr->maxBitrateOut < allRate + needRate + ch->info.bitrate);
3204 if (!isIndexTxt(ch))
3205 relay = (!relayfull) && (!chfull) && (!ratefull) && (numRelay + noRelay < servMgr->maxRelays);
3207 relay = (!chfull) && (!ratefull); // for PCRaw (relay)
3210 LOG_DEBUG("Reject by relay full");
3213 LOG_DEBUG("Reject by channel full");
3216 LOG_DEBUG("Reject by rate: Max=%d Now=%d Need=%d ch=%d", servMgr->maxBitrateOut, allRate, needRate, ch->info.bitrate);
3219 host = servMgr->serverHost;
3221 version = PCP_CLIENT_VERSION;
3222 version_vp = PCP_CLIENT_VERSION_VP;
3224 strncpy(version_ex_prefix, PCP_CLIENT_VERSION_EX_PREFIX,2);
3225 version_ex_number = PCP_CLIENT_VERSION_EX_NUMBER;
3227 version_ex_prefix[0] = ' ';
3228 version_ex_prefix[1] = ' ';
3229 version_ex_number = 0;
3232 status = ch->status;
3234 rhost[0] = Host(host.ip,host.port);
3235 rhost[1] = Host(ClientSocket::getIP(NULL),host.port);
3243 uphost.ip = ch->sourceHost.host.ip;
3244 uphost.port = ch->sourceHost.host.port;
3248 // -----------------------------------
3249 void ChanHit::writeAtoms(AtomStream &atom,GnuID &chanID)
3251 bool addChan=chanID.isSet();
3252 bool uphostdata=(uphost.ip != 0);
3255 if (recv) fl1 |= PCP_HOST_FLAGS1_RECV;
3256 if (relay) fl1 |= PCP_HOST_FLAGS1_RELAY;
3257 if (direct) fl1 |= PCP_HOST_FLAGS1_DIRECT;
3258 if (cin) fl1 |= PCP_HOST_FLAGS1_CIN;
3259 if (tracker) fl1 |= PCP_HOST_FLAGS1_TRACKER;
3260 if (firewalled) fl1 |= PCP_HOST_FLAGS1_PUSH;
3262 atom.writeParent(PCP_HOST,13 + (addChan?1:0) + (uphostdata?3:0) + (version_ex_number?2:0));
3265 atom.writeBytes(PCP_HOST_CHANID,chanID.id,16);
3266 atom.writeBytes(PCP_HOST_ID,sessionID.id,16);
3267 atom.writeInt(PCP_HOST_IP,rhost[0].ip);
3268 atom.writeShort(PCP_HOST_PORT,rhost[0].port);
3269 atom.writeInt(PCP_HOST_IP,rhost[1].ip);
3270 atom.writeShort(PCP_HOST_PORT,rhost[1].port);
3271 atom.writeInt(PCP_HOST_NUML,numListeners);
3272 atom.writeInt(PCP_HOST_NUMR,numRelays);
3273 atom.writeInt(PCP_HOST_UPTIME,upTime);
3274 atom.writeInt(PCP_HOST_VERSION,version);
3275 atom.writeInt(PCP_HOST_VERSION_VP,version_vp);
3276 if (version_ex_number){
3277 atom.writeBytes(PCP_HOST_VERSION_EX_PREFIX,version_ex_prefix,2);
3278 atom.writeShort(PCP_HOST_VERSION_EX_NUMBER,version_ex_number);
3280 atom.writeChar(PCP_HOST_FLAGS1,fl1);
3281 atom.writeInt(PCP_HOST_OLDPOS,oldestPos);
3282 atom.writeInt(PCP_HOST_NEWPOS,newestPos);
3284 atom.writeInt(PCP_HOST_UPHOST_IP,uphost.ip);
3285 atom.writeInt(PCP_HOST_UPHOST_PORT,uphost.port);
3286 atom.writeInt(PCP_HOST_UPHOST_HOPS,uphostHops);
3289 // -----------------------------------
3290 bool ChanHit::writeVariable(Stream &out, const String &var)
3294 if (var == "rhost0")
3296 if (servMgr->enableGetName) //JP-EX s
3302 strcpy(buf,"<font color=red>");
3304 strcpy(buf,"<font color=orange>");
3309 strcpy(buf,"<font color=purple>");
3311 strcpy(buf,"<font color=blue>");
3314 strcpy(buf,"<font color=green>");
3318 rhost[0].toStr(buf2);
3322 if (ClientSocket::getHostname(h_name,rhost[0].ip))
3328 strcat(buf,"</font>");
3331 rhost[0].toStr(buf);
3333 else if (var == "rhost1")
3334 rhost[1].toStr(buf);
3335 else if (var == "numHops")
3336 sprintf(buf,"%d",numHops);
3337 else if (var == "numListeners")
3338 sprintf(buf,"%d",numListeners);
3339 else if (var == "numRelays")
3340 sprintf(buf,"%d",numRelays);
3341 else if (var == "uptime")
3344 timeStr.setFromStopwatch(upTime);
3345 strcpy(buf,timeStr.cstr());
3346 }else if (var == "update")
3350 timeStr.setFromStopwatch(sys->getTime()-time);
3353 strcpy(buf,timeStr.cstr());
3354 }else if (var == "isFirewalled"){
3355 sprintf(buf,"%d",firewalled?1:0);
3356 }else if (var == "version"){
3357 sprintf(buf,"%d",version);
3358 }else if (var == "agent"){
3360 if (version_ex_number){
3361 sprintf(buf, "v0.%d(%c%c%04d)", version, version_ex_prefix[0], version_ex_prefix[1], version_ex_number);
3362 } else if (version_vp){
3363 sprintf(buf,"v0.%d(VP%04d)", version, version_vp);
3365 sprintf(buf,"v0.%d", version);
3371 else if (var == "check")
3374 strcpy(buf, "<a href=\"#\" onclick=\"checkip('");
3375 rhost[0].IPtoStr(buf2);
3377 strcat(buf, "')\">_</a>");
3379 else if (var == "uphost") // tree
3381 else if (var == "uphostHops") // tree
3382 sprintf(buf,"%d",uphostHops);
3383 else if (var == "canRelay") // tree
3384 sprintf(buf, "%d",relay);
3388 out.writeString(buf);
3392 // -----------------------------------
3393 int ChanHitList::getTotalListeners()
3400 cnt+=h->numListeners;
3405 // -----------------------------------
3406 int ChanHitList::getTotalRelays()
3418 // -----------------------------------
3419 int ChanHitList::getTotalFirewalled()
3433 // -----------------------------------
3434 int ChanHitList::contactTrackers(bool connected, int numl, int nums, int uptm)
3439 void ChanHitList::clearHits(bool flg)
3441 ChanHit *c = hit, *prev = NULL;
3444 if (flg || (c->numHops != 0)){
3445 ChanHit *next = c->next;
3460 // -----------------------------------
3461 ChanHit *ChanHitList::deleteHit(ChanHit *ch)
3463 ChanHit *c = hit,*prev=NULL;
3468 ChanHit *next = c->next;
3484 // -----------------------------------
3485 ChanHit *ChanHitList::addHit(ChanHit &h)
3487 char ip0str[64],ip1str[64];
3488 h.rhost[0].toStr(ip0str);
3489 h.rhost[1].toStr(ip1str);
3491 h.uphost.toStr(uphostStr);
3493 LOG_DEBUG("Add hit: F%dT%dR%d %s/%s <- %s(%d)",h.firewalled,h.tracker,h.relay,ip0str,ip1str,uphostStr, h.uphostHops);
3495 LOG_DEBUG("Add hit: F%dT%dR%d %s/%s",h.firewalled,h.tracker,h.relay,ip0str,ip1str);
3498 // dont add our own hits
3499 if (servMgr->sessionID.isSame(h.sessionID))
3503 lastHitTime = sys->getTime();
3504 h.time = lastHitTime;
3509 if ((ch->rhost[0].ip == h.rhost[0].ip) && (ch->rhost[0].port == h.rhost[0].port))
3510 if (((ch->rhost[1].ip == h.rhost[1].ip) && (ch->rhost[1].port == h.rhost[1].port)) || (!ch->rhost[1].isValid()))
3514 if (ch->numHops > 0 && h.numHops == 0)
3515 // downstream hit recieved as RelayHost
3517 ChanHit *next = ch->next;
3526 // clear hits with same session ID (IP may have changed)
3527 if (h.sessionID.isSet())
3533 if (ch->sessionID.isSame(h.sessionID))
3545 ChanHit *ch = new ChanHit();
3547 ch->chanID = info.id;
3557 // -----------------------------------
3558 int ChanHitList::clearDeadHits(unsigned int timeout, bool clearTrackers)
3561 unsigned int ctime = sys->getTime();
3563 // LOG_DEBUG("clearDeadHits HITLISTLOCK ON-------------");
3564 chanMgr->hitlistlock.on();
3570 if (ch->dead || ((ctime-ch->time) > timeout) && (clearTrackers || (!clearTrackers & !ch->tracker)))
3572 // ch = deleteHit(ch);
3574 if (ch->firewalled){
3575 // LOG_DEBUG("kickKeepTime = %d, %d", servMgr->kickKeepTime, ctime-ch->time);
3576 if ( (servMgr->kickKeepTime == 0) || ((ctime-ch->time) > servMgr->kickKeepTime) ){
3580 ch->numListeners = 0;
3593 // LOG_DEBUG("clearDeadHits HITLISTLOCK OFF-------------");
3594 chanMgr->hitlistlock.off();
3599 // -----------------------------------
3600 void ChanHitList::deadHit(ChanHit &h)
3602 char ip0str[64],ip1str[64];
3603 h.rhost[0].toStr(ip0str);
3604 h.rhost[1].toStr(ip1str);
3605 LOG_DEBUG("Dead hit: %s/%s",ip0str,ip1str);
3611 if (ch->rhost[0].isSame(h.rhost[0]) && ch->rhost[1].isSame(h.rhost[1]))
3618 // -----------------------------------
3619 void ChanHitList::delHit(ChanHit &h)
3621 char ip0str[64],ip1str[64];
3622 h.rhost[0].toStr(ip0str);
3623 h.rhost[1].toStr(ip1str);
3624 LOG_DEBUG("Del hit: %s/%s",ip0str,ip1str);
3630 if (ch->rhost[0].isSame(h.rhost[0]) && ch->rhost[1].isSame(h.rhost[1]))
3638 // -----------------------------------
3639 int ChanHitList::numHits()
3645 if (ch->host.ip && !ch->dead && ch->numHops)
3652 // -----------------------------------
3653 int ChanHitList::numListeners()
3659 if (ch->host.ip && !ch->dead && ch->numHops)
3660 cnt += ch->numListeners;
3666 // -----------------------------------
3667 int ChanHitList::numRelays()
3673 if (ch->host.ip && !ch->dead)
3674 cnt += ch->numRelays;
3681 // -----------------------------------
3682 int ChanHitList::numTrackers()
3688 if ((ch->host.ip && !ch->dead) && (ch->tracker))
3694 // -----------------------------------
3695 int ChanHitList::numFirewalled()
3701 if (ch->host.ip && !ch->dead)
3702 cnt += ch->firewalled?1:0;
3707 // -----------------------------------
3708 int ChanHitList::closestHit()
3710 unsigned int hop=10000;
3714 if (ch->host.ip && !ch->dead)
3715 if (ch->numHops < hop)
3722 // -----------------------------------
3723 int ChanHitList::furthestHit()
3729 if (ch->host.ip && !ch->dead)
3730 if (ch->numHops > hop)
3737 // -----------------------------------
3738 unsigned int ChanHitList::newestHit()
3740 unsigned int time=0;
3744 if (ch->host.ip && !ch->dead)
3745 if (ch->time > time)
3752 // -----------------------------------
3753 int ChanHitList::pickHits(ChanHitSearch &chs)
3755 ChanHit best,*bestP=NULL;
3760 unsigned int ctime = sys->getTime();
3765 if (c->host.ip && !c->dead)
3767 if (!chs.excludeID.isSame(c->sessionID))
3768 if ((chs.waitDelay==0) || ((ctime-c->lastContact) >= chs.waitDelay))
3769 if ((c->numHops<=best.numHops)) // (c->time>=best.time))
3770 if (c->relay || (!c->relay && chs.useBusyRelays))
3771 if (c->cin || (!c->cin && chs.useBusyControls))
3774 if (chs.trackersOnly && c->tracker)
3776 if (chs.matchHost.ip)
3778 if ((c->rhost[0].ip == chs.matchHost.ip) && c->rhost[1].isValid())
3782 best.host = best.rhost[1]; // use lan ip
3784 }else if (c->firewalled == chs.useFirewalled)
3788 best.host = best.rhost[0]; // use wan ip
3790 }else if (!chs.trackersOnly && !c->tracker)
3792 if (chs.matchHost.ip)
3794 if ((c->rhost[0].ip == chs.matchHost.ip) && c->rhost[1].isValid())
3798 best.host = best.rhost[1]; // use lan ip
3800 }else if (c->firewalled == chs.useFirewalled && (!bestP || !bestP->relay))
3804 best.host = best.rhost[0]; // use wan ip
3814 if (chs.numResults < ChanHitSearch::MAX_RESULTS)
3817 bestP->lastContact = ctime;
3818 chs.best[chs.numResults++] = best;
3828 // -----------------------------------
3829 int ChanHitList::pickSourceHits(ChanHitSearch &chs)
3831 if (pickHits(chs) && chs.best[0].numHops == 0) return 1;
3836 // -----------------------------------
3837 const char *ChanInfo::getTypeStr(TYPE t)
3841 case T_RAW: return "RAW";
3843 case T_MP3: return "MP3";
3844 case T_OGG: return "OGG";
3845 case T_OGM: return "OGM";
3846 case T_WMA: return "WMA";
3848 case T_MOV: return "MOV";
3849 case T_MPG: return "MPG";
3850 case T_NSV: return "NSV";
3851 case T_WMV: return "WMV";
3853 case T_PLS: return "PLS";
3854 case T_ASX: return "ASX";
3856 default: return "UNKNOWN";
3859 // -----------------------------------
3860 const char *ChanInfo::getProtocolStr(PROTOCOL t)
3864 case SP_PEERCAST: return "PEERCAST";
3865 case SP_HTTP: return "HTTP";
3866 case SP_FILE: return "FILE";
3867 case SP_MMS: return "MMS";
3868 case SP_PCP: return "PCP";
3869 default: return "UNKNOWN";
3872 // -----------------------------------
3873 ChanInfo::PROTOCOL ChanInfo::getProtocolFromStr(const char *str)
3875 if (stricmp(str,"PEERCAST")==0)
3877 else if (stricmp(str,"HTTP")==0)
3879 else if (stricmp(str,"FILE")==0)
3881 else if (stricmp(str,"MMS")==0)
3883 else if (stricmp(str,"PCP")==0)
3889 // -----------------------------------
3890 const char *ChanInfo::getTypeExt(TYPE t)
3894 case ChanInfo::T_OGM:
3895 case ChanInfo::T_OGG:
3897 case ChanInfo::T_MP3:
3899 case ChanInfo::T_MOV:
3901 case ChanInfo::T_NSV:
3903 case ChanInfo::T_WMV:
3905 case ChanInfo::T_WMA:
3911 // -----------------------------------
3912 ChanInfo::TYPE ChanInfo::getTypeFromStr(const char *str)
3914 if (stricmp(str,"MP3")==0)
3916 else if (stricmp(str,"OGG")==0)
3918 else if (stricmp(str,"OGM")==0)
3920 else if (stricmp(str,"RAW")==0)
3922 else if (stricmp(str,"NSV")==0)
3924 else if (stricmp(str,"WMA")==0)
3926 else if (stricmp(str,"WMV")==0)
3928 else if (stricmp(str,"PLS")==0)
3930 else if (stricmp(str,"M3U")==0)
3932 else if (stricmp(str,"ASX")==0)
3937 // -----------------------------------
3938 bool ChanInfo::matchNameID(ChanInfo &inf)
3941 if (id.isSame(inf.id))
3944 if (!inf.name.isEmpty())
3945 if (name.contains(inf.name))
3950 // -----------------------------------
3951 bool ChanInfo::match(ChanInfo &inf)
3955 if (inf.status != S_UNKNOWN)
3957 if (status != inf.status)
3961 if (inf.bitrate != 0)
3963 if (bitrate == inf.bitrate)
3970 if (id.isSame(inf.id))
3975 if (inf.contentType != T_UNKNOWN)
3977 if (contentType == inf.contentType)
3982 if (!inf.name.isEmpty())
3984 if (name.contains(inf.name))
3989 if (!inf.genre.isEmpty())
3991 if (genre.contains(inf.genre))
3998 // -----------------------------------
3999 bool TrackInfo::update(TrackInfo &inf)
4001 bool changed = false;
4003 if (!contact.isSame(inf.contact))
4005 contact = inf.contact;
4009 if (!title.isSame(inf.title))
4015 if (!artist.isSame(inf.artist))
4017 artist = inf.artist;
4021 if (!album.isSame(inf.album))
4027 if (!genre.isSame(inf.genre))
4038 // -----------------------------------
4039 bool ChanInfo::update(ChanInfo &info)
4041 bool changed = false;
4046 if (!info.id.isSet())
4049 // only update from chaninfo that has full name etc..
4050 if (info.name.isEmpty())
4053 // check valid broadcaster key
4056 if (!bcID.isSame(info.bcID))
4058 LOG_ERROR("ChanInfo BC key not valid");
4068 if (bitrate != info.bitrate)
4070 bitrate = info.bitrate;
4074 if (contentType != info.contentType)
4076 contentType = info.contentType;
4080 if (!desc.isSame(info.desc)) //JP-EX
4086 if (!name.isSame(info.name))
4092 if (!comment.isSame(info.comment))
4094 comment = info.comment;
4098 if (!genre.isSame(info.genre))
4104 if (!url.isSame(info.url))
4110 if (track.update(info.track))
4116 // -----------------------------------
4117 void ChanInfo::initNameID(const char *n)
4125 // -----------------------------------
4126 void ChanInfo::init()
4131 contentType = T_UNKNOWN;
4132 srcProtocol = SP_UNKNOWN;
4144 // -----------------------------------
4145 void ChanInfo::readTrackXML(XML::Node *n)
4148 readXMLString(track.title,n,"title");
4149 readXMLString(track.contact,n,"contact");
4150 readXMLString(track.artist,n,"artist");
4151 readXMLString(track.album,n,"album");
4152 readXMLString(track.genre,n,"genre");
4154 // -----------------------------------
4155 unsigned int ChanInfo::getUptime()
4157 // calculate uptime and cap if requested by settings.
4159 upt = lastPlayStart?(sys->getTime()-lastPlayStart):0;
4160 if (chanMgr->maxUptime)
4161 if (upt > chanMgr->maxUptime)
4162 upt = chanMgr->maxUptime;
4165 // -----------------------------------
4166 unsigned int ChanInfo::getAge()
4168 return sys->getTime()-createdTime;
4171 // ------------------------------------------
4172 void ChanInfo::readTrackAtoms(AtomStream &atom,int numc)
4174 for(int i=0; i<numc; i++)
4177 ID4 id = atom.read(c,d);
4178 if (id == PCP_CHAN_TRACK_TITLE)
4180 atom.readString(track.title.data,sizeof(track.title.data),d);
4181 }else if (id == PCP_CHAN_TRACK_CREATOR)
4183 atom.readString(track.artist.data,sizeof(track.artist.data),d);
4184 }else if (id == PCP_CHAN_TRACK_URL)
4186 atom.readString(track.contact.data,sizeof(track.contact.data),d);
4187 }else if (id == PCP_CHAN_TRACK_ALBUM)
4189 atom.readString(track.album.data,sizeof(track.album.data),d);
4194 // ------------------------------------------
4195 void ChanInfo::readInfoAtoms(AtomStream &atom,int numc)
4197 for(int i=0; i<numc; i++)
4200 ID4 id = atom.read(c,d);
4201 if (id == PCP_CHAN_INFO_NAME)
4203 atom.readString(name.data,sizeof(name.data),d);
4204 }else if (id == PCP_CHAN_INFO_BITRATE)
4206 bitrate = atom.readInt();
4207 }else if (id == PCP_CHAN_INFO_GENRE)
4209 atom.readString(genre.data,sizeof(genre.data),d);
4210 }else if (id == PCP_CHAN_INFO_URL)
4212 atom.readString(url.data,sizeof(url.data),d);
4213 }else if (id == PCP_CHAN_INFO_DESC)
4215 atom.readString(desc.data,sizeof(desc.data),d);
4216 }else if (id == PCP_CHAN_INFO_COMMENT)
4218 atom.readString(comment.data,sizeof(comment.data),d);
4219 }else if (id == PCP_CHAN_INFO_TYPE)
4222 atom.readString(type,sizeof(type),d);
4223 contentType = ChanInfo::getTypeFromStr(type);
4229 // -----------------------------------
4230 void ChanInfo::writeInfoAtoms(AtomStream &atom)
4232 atom.writeParent(PCP_CHAN_INFO,7);
4233 atom.writeString(PCP_CHAN_INFO_NAME,name.cstr());
4234 atom.writeInt(PCP_CHAN_INFO_BITRATE,bitrate);
4235 atom.writeString(PCP_CHAN_INFO_GENRE,genre.cstr());
4236 atom.writeString(PCP_CHAN_INFO_URL,url.cstr());
4237 atom.writeString(PCP_CHAN_INFO_DESC,desc.cstr());
4238 atom.writeString(PCP_CHAN_INFO_COMMENT,comment.cstr());
4239 atom.writeString(PCP_CHAN_INFO_TYPE,getTypeStr(contentType));
4242 // -----------------------------------
4243 void ChanInfo::writeTrackAtoms(AtomStream &atom)
4245 atom.writeParent(PCP_CHAN_TRACK,4);
4246 atom.writeString(PCP_CHAN_TRACK_TITLE,track.title.cstr());
4247 atom.writeString(PCP_CHAN_TRACK_CREATOR,track.artist.cstr());
4248 atom.writeString(PCP_CHAN_TRACK_URL,track.contact.cstr());
4249 atom.writeString(PCP_CHAN_TRACK_ALBUM,track.album.cstr());
4253 // -----------------------------------
4254 XML::Node *ChanInfo::createChannelXML()
4258 String nameUNI = name;
4259 nameUNI.convertTo(String::T_UNICODESAFE);
4261 String urlUNI = url;
4262 urlUNI.convertTo(String::T_UNICODESAFE);
4264 String genreUNI = genre;
4265 genreUNI.convertTo(String::T_UNICODESAFE);
4267 String descUNI = desc;
4268 descUNI.convertTo(String::T_UNICODESAFE);
4271 commentUNI = comment;
4272 commentUNI.convertTo(String::T_UNICODESAFE);
4278 return new XML::Node("channel name=\"%s\" id=\"%s\" bitrate=\"%d\" type=\"%s\" genre=\"%s\" desc=\"%s\" url=\"%s\" uptime=\"%d\" comment=\"%s\" skips=\"%d\" age=\"%d\" bcflags=\"%d\"",
4282 getTypeStr(contentType),
4294 // -----------------------------------
4295 XML::Node *ChanInfo::createQueryXML()
4301 String nameHTML = name;
4302 nameHTML.convertTo(String::T_HTML);
4303 String genreHTML = genre;
4304 genreHTML.convertTo(String::T_HTML);
4307 if (!nameHTML.isEmpty())
4309 strcat(buf," name=\"");
4310 strcat(buf,nameHTML.cstr());
4314 if (!genreHTML.isEmpty())
4316 strcat(buf," genre=\"");
4317 strcat(buf,genreHTML.cstr());
4324 strcat(buf," id=\"");
4330 return new XML::Node("channel %s",buf);
4333 // -----------------------------------
4334 XML::Node *ChanInfo::createRelayChannelXML()
4341 return new XML::Node("channel id=\"%s\" uptime=\"%d\" skips=\"%d\" age=\"%d\"",
4347 }// -----------------------------------
4348 XML::Node *ChanInfo::createTrackXML()
4350 String titleUNI = track.title;
4351 titleUNI.convertTo(String::T_UNICODESAFE);
4353 String artistUNI = track.artist;
4354 artistUNI.convertTo(String::T_UNICODESAFE);
4356 String albumUNI = track.album;
4357 albumUNI.convertTo(String::T_UNICODESAFE);
4359 String genreUNI = track.genre;
4360 genreUNI.convertTo(String::T_UNICODESAFE);
4362 String contactUNI = track.contact;
4363 contactUNI.convertTo(String::T_UNICODESAFE);
4367 return new XML::Node("track title=\"%s\" artist=\"%s\" album=\"%s\" genre=\"%s\" contact=\"%s\"",
4376 // -----------------------------------
4377 void ChanInfo::init(XML::Node *n)
4383 // -----------------------------------
4384 void ChanInfo::updateFromXML(XML::Node *n)
4386 String typeStr,idStr;
4388 readXMLString(name,n,"name");
4389 readXMLString(genre,n,"genre");
4390 readXMLString(url,n,"url");
4391 readXMLString(desc,n,"desc");
4394 int br = n->findAttrInt("bitrate");
4398 readXMLString(typeStr,n,"type");
4399 if (!typeStr.isEmpty())
4400 contentType = getTypeFromStr(typeStr.cstr());
4403 readXMLString(idStr,n,"id");
4404 if (!idStr.isEmpty())
4405 id.fromStr(idStr.cstr());
4407 readXMLString(comment,n,"comment");
4409 XML::Node *tn = n->findNode("track");
4415 // -----------------------------------
4416 void ChanInfo::init(const char *n, GnuID &cid, TYPE tp, int br)
4426 // -----------------------------------
4427 void ChanInfo::init(const char *fn)
4434 // -----------------------------------
4435 void PlayList::readASX(Stream &in)
4437 LOG_DEBUG("Reading ASX");
4443 }catch(StreamException &) {} // TODO: eof is NOT handled properly in sockets - always get error at end
4447 XML::Node *n = xml.root->child;
4450 if (stricmp("entry",n->getName())==0)
4452 XML::Node *rf = n->findNode("ref");
4455 char *hr = rf->findAttr("href");
4459 //LOG("asx url %s",hr);
4468 // -----------------------------------
4469 void PlayList::readSCPLS(Stream &in)
4472 while (in.readLine(tmp,sizeof(tmp)))
4474 if (strnicmp(tmp,"file",4)==0)
4476 char *p = strstr(tmp,"=");
4482 // -----------------------------------
4483 void PlayList::readPLS(Stream &in)
4486 while (in.readLine(tmp,sizeof(tmp)))
4492 // -----------------------------------
4493 void PlayList::writeSCPLS(Stream &out)
4495 out.writeLine("[playlist]");
4497 out.writeLineF("NumberOfEntries=%d",numURLs);
4499 for(int i=0; i<numURLs; i++)
4501 out.writeLineF("File%d=%s",i+1,urls[i].cstr());
4502 out.writeLineF("Title%d=%s",i+1,titles[i].cstr());
4503 out.writeLineF("Length%d=-1",i+1);
4505 out.writeLine("Version=2");
4507 // -----------------------------------
4508 void PlayList::writePLS(Stream &out)
4510 for(int i=0; i<numURLs; i++)
4511 out.writeLineF("%s",urls[i].cstr());
4513 // -----------------------------------
4514 void PlayList::writeRAM(Stream &out)
4516 for(int i=0; i<numURLs; i++)
4517 out.writeLineF("%s",urls[i].cstr());
4520 // -----------------------------------
4521 void PlayList::writeASX(Stream &out)
4523 out.writeLine("<ASX Version=\"3.0\">");
4524 for(int i=0; i<numURLs; i++)
4526 out.writeLine("<ENTRY>");
4527 out.writeLineF("<REF href = \"%s\" />",urls[i].cstr());
4528 out.writeLine("</ENTRY>");
4530 out.writeLine("</ASX>");
4534 // -----------------------------------
4535 void PlayList::addChannel(const char *path, ChanInfo &info)
4541 info.id.toStr(idStr);
4542 char *nid = info.id.isSet()?idStr:info.name.cstr();
4544 sprintf(url.cstr(),"%s/stream/%s%s",path,nid,ChanInfo::getTypeExt(info.contentType));
4545 addURL(url.cstr(),info.name);
4548 // -----------------------------------
4549 void ChanHitSearch::init()
4553 useFirewalled = false;
4554 trackersOnly = false;
4555 useBusyRelays = true;
4556 useBusyControls = true;
4560 //seed = sys->getTime();
4564 int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList *chl)
4571 static int base = 0x400;
4572 ChanHit tmpHit[MAX_RESULTS];
4573 static WLock seqLock;
4574 static unsigned int riSequence = 0;
4582 riSequence &= 0xffffff;
4585 Servent *s = servMgr->servents;
4587 if (s->serventHit.rhost[0].port && s->type == Servent::T_RELAY
4588 && s->chanID.isSame(chl->info.id)) {
4589 int i = index % MAX_RESULTS;
4590 if (index < MAX_RESULTS
4591 || tmpHit[i].lastSendSeq > s->serventHit.lastSendSeq) {
4592 s->serventHit.lastSendSeq = seq;
4593 tmpHit[i] = s->serventHit;
4594 tmpHit[i].host = s->serventHit.rhost[0];
4601 ChanHit *hit = chl->hit;
4604 if (hit->host.ip && !hit->dead){
4606 (!exID.isSame(hit->sessionID))
4609 && (!hit->firewalled)
4610 && (hit->numHops != 0)
4612 if ( (hit->rhost[0].ip == host1.ip)
4613 && hit->rhost[1].isValid()
4614 && (host2.ip != hit->rhost[1].ip)
4617 best[0].host = hit->rhost[1];
4620 if ((hit->rhost[0].ip == host2.ip) && hit->rhost[1].isValid()){
4622 best[0].host = hit->rhost[1];
4626 loop = (index / MAX_RESULTS) + 1;
4627 //prob = (float)1 / (float)loop;
4629 //rnd = (float)rand() / (float)RAND_MAX;
4630 rnd = rand() % base;
4631 if (hit->numHops == 1){
4633 if (tmpHit[index % MAX_RESULTS].numHops == 1){
4635 tmpHit[index % MAX_RESULTS] = *hit;
4636 tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
4640 tmpHit[index % MAX_RESULTS] = *hit;
4641 tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
4646 if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob) || rnd == 0){
4647 tmpHit[index % MAX_RESULTS] = *hit;
4648 tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
4654 // hit->host.toStr(tmp);
4655 // LOG_DEBUG("TEST %s: %f %f", tmp, rnd, prob);
4661 if (index > MAX_RESULTS){
4667 /* int use[MAX_RESULTS];
4668 memset(use, 0, sizeof(use));
4670 for (i = 0; i < cnt; i++){
4674 for (i = 0; i < cnt; i++){
4677 // LOG_DEBUG("%d",r);
4685 for (i = 0; i < cnt; i++){
4686 // LOG_DEBUG("%d", use[i]);
4687 best[use[i]] = tmpHit[i];
4690 int use[MAX_RESULTS];
4692 for (i = 0; i < cnt; i++) {
4693 use[i] = (i + seq) % cnt;
4696 for (i = 0; i < cnt; i++){
4697 // LOG_DEBUG("%d", use[i]);
4698 best[use[i]] = tmpHit[i];
4700 // for (i = 0; i < cnt; i++){
4702 // best[i].host.toStr(tmp);
4703 // LOG_DEBUG("Relay info: Hops = %d, %s", best[i].numHops, tmp);