1 // ------------------------------------------------
6 // Channel streaming classes. These do the actual
7 // streaming of media between clients.
9 // (c) 2002 peercast.org
11 // ------------------------------------------------
12 // This program is free software; you can redistribute it and/or modify
13 // it under the terms of the GNU General Public License as published by
14 // the Free Software Foundation; either version 2 of the License, or
15 // (at your option) any later version.
17 // This program is distributed in the hope that it will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 // GNU General Public License for more details.
21 // ------------------------------------------------
48 #include "chkMemoryLeak.h"
49 #define DEBUG_NEW new(__FILE__, __LINE__)
53 // -----------------------------------
54 char *Channel::srcTypes[]=
62 // -----------------------------------
63 char *Channel::statusMsgs[]=
// Tests whether `info` describes an "index*.txt" channel. The condition
// requires raw content (ChanInfo::T_RAW), a bitrate of at most 32 and a
// name that starts with "index" and ends with ".txt".
82 bool isIndexTxt(ChanInfo *info)
87 info->contentType == ChanInfo::T_RAW &&
88 info->bitrate <= 32 &&
// 9 == strlen("index") + strlen(".txt"): the length check keeps both
// memcmp ranges below inside the name. `len` is assigned as a side effect.
89 (len = strlen(info->name.cstr())) >= 9 &&
90 !memcmp(info->name.cstr(), "index", 5) &&
91 !memcmp(info->name.cstr()+len-4, ".txt", 4))
// Channel overload of isIndexTxt. The test passes only for a non-null
// channel for which isBroadcasting() is false and whose `info` satisfies
// isIndexTxt(ChanInfo *).
101 bool isIndexTxt(Channel *ch)
103 if(ch && !ch->isBroadcasting() && isIndexTxt(&ch->info))
// Relay limit used for index.txt channels: servMgr->maxRelaysIndexTxt,
// raised to 1 when the configured value is below 1.
// The return expression does not use `ch`.
109 int numMaxRelaysIndexTxt(Channel *ch)
111 return ((servMgr->maxRelaysIndexTxt < 1) ? 1 : servMgr->maxRelaysIndexTxt);
114 int canStreamIndexTxt(Channel *ch)
118 // Not relevant when we are broadcasting the channel ourselves.
119 if(!ch || ch->isBroadcasting())
122 ret = numMaxRelaysIndexTxt(ch) - ch->localRelays();
130 // -----------------------------------
// Reads attribute `arg` of XML node `n` into `str`: the attribute value is
// stored as HTML-typed text and then converted to ASCII.
131 void readXMLString(String &str, XML::Node *n, const char *arg)
134 p = n->findAttr(arg);
137 str.set(p,String::T_HTML);
138 str.convertTo(String::T_ASCII);
143 // -----------------------------------------------------------------------------
144 // Initialise the channel to its default settings of unallocated and reset.
145 // -----------------------------------------------------------------------------
150 channel_id = channel_count++;
152 // -----------------------------------------------------------------------------
// Ends the channel: removes it from the channel manager and terminates its
// thread.
// NOTE(review): the role of `flg` cannot be confirmed from these lines.
153 void Channel::endThread(bool flg)
// The channel is deleted while chanMgr->channellock is held.
177 chanMgr->channellock.on();
178 chanMgr->deleteChannel(this);
179 chanMgr->channellock.off();
181 sys->endThread(&thread);
184 // -----------------------------------------------------------------------------
// Stamps info.lastPlayStart with the current system time.
185 void Channel::resetPlayTime()
187 info.lastPlayStart = sys->getTime();
189 // -----------------------------------------------------------------------------
// Applies a new channel status and keeps `info` in step with it.
// The playing state is sampled before the change (wasPlaying); afterwards
// info.status becomes ChanInfo::S_PLAY, or info.lastPlayEnd is stamped and
// info.status becomes ChanInfo::S_UNKNOWN.
// NOTE(review): presumably selected by the play/stop transition — confirm.
190 void Channel::setStatus(STATUS s)
194 bool wasPlaying = isPlaying();
200 info.status = ChanInfo::S_PLAY;
205 info.lastPlayEnd = sys->getTime();
206 info.status = ChanInfo::S_UNKNOWN;
// A broadcasting channel looks up its hit list by id; addHitList(info)
// follows (presumably when none was found — confirm).
209 if (isBroadcasting())
211 ChanHitList *chl = chanMgr->findHitListByID(info.id);
213 chanMgr->addHitList(info);
// Tell the application layer that the channel info changed.
216 peercastApp->channelUpdate(&info);
221 // -----------------------------------------------------------------------------
222 // Reset channel and make it available
223 // -----------------------------------------------------------------------------
// Puts the channel back into its default state. Among the fields reset here:
// stayConnected and skipCount are cleared, rawData is set to accept HEAD and
// DATA packets, and the tracker-update timestamp is zeroed.
224 void Channel::reset()
237 stayConnected = false;
241 skipCount = 0; //JP-EX
251 rawData.accept = ChanPacket::T_HEAD|ChanPacket::T_DATA;
263 lastTrackerUpdate = 0;
278 // -----------------------------------
// Accepts a packet from the source: any packet whose type is not T_PCP is
// written into rawData with updateReadPos = true.
279 void Channel::newPacket(ChanPacket &pack)
281 if (pack.type != ChanPacket::T_PCP)
283 rawData.writePacket(pack,true);
288 // -----------------------------------
// Returns true when the channel counts as idle: its uptime exceeds
// chanMgr->prefetchTime, it has no local listeners, stayConnected is off
// and its status is not S_BROADCASTING.
289 bool Channel::checkIdle()
291 return ( (info.getUptime() > chanMgr->prefetchTime) && (localListeners() == 0) && (!stayConnected) && (status != S_BROADCASTING));
294 // -----------------------------------
// Reports whether the channel has reached its relay limit.
// canStreamIndexTxt(this) is consulted first (PCRaw relay section).
// The general rule: with a non-zero chanMgr->maxRelaysPerChannel the channel
// is full once localRelays() reaches it; a limit of 0 means never full.
295 bool Channel::isFull()
297 // for PCRaw (relay) start.
300 int ret = canStreamIndexTxt(this);
307 // for PCRaw (relay) end.
309 return chanMgr->maxRelaysPerChannel ? localRelays() >= chanMgr->maxRelaysPerChannel : false;
311 // -----------------------------------
// Number of Servent::T_RELAY streams that servMgr counts for this channel id.
312 int Channel::localRelays()
314 return servMgr->numStreams(info.id,Servent::T_RELAY,true);
316 // -----------------------------------
// Number of Servent::T_DIRECT streams that servMgr counts for this channel id.
317 int Channel::localListeners()
319 return servMgr->numStreams(info.id,Servent::T_DIRECT,true);
322 // -----------------------------------
// Relay total for the channel: the hit count of the channel's hit list
// (looked up by id) is added to the running total `tot`.
323 int Channel::totalRelays()
326 ChanHitList *chl = chanMgr->findHitListByID(info.id);
328 tot += chl->numHits();
331 // -----------------------------------
// Listener total for the channel: starts from localListeners() and adds the
// listener count reported by the channel's hit list (looked up by id).
332 int Channel::totalListeners()
334 int tot = localListeners();
335 ChanHitList *chl = chanMgr->findHitListByID(info.id);
337 tot += chl->numListeners();
344 // -----------------------------------
// Sets the channel up to be fetched from the PeerCast network: source type
// SRC_PEERCAST, source protocol PCP, and a new PeercastSource as sourceData.
345 void Channel::startGet()
347 srcType = SRC_PEERCAST;
349 info.srcProtocol = ChanInfo::SP_PCP;
352 sourceData = new PeercastSource();
356 // -----------------------------------
// Sets the channel up to be read from URL `u` via a new URLSource.
// stayConnected is switched on for URL-sourced channels.
357 void Channel::startURL(const char *u)
363 stayConnected = true;
367 sourceData = new URLSource(u);
372 // -----------------------------------
// Launches the channel thread: Channel::stream becomes the thread function
// and sys->startThread() is asked to run it. The `if` guards the path taken
// when the thread could not be started.
373 void Channel::startStream()
376 thread.func = stream;
377 if (!sys->startThread(&thread))
381 // -----------------------------------
// Sleeps until stream time `time` has been reached, measured from startTime.
382 void Channel::sleepUntil(double time)
// Remaining wait = target time minus the time elapsed since startTime.
384 double sleepTime = time - (sys->getDTime()-startTime);
386 // LOG("sleep %g",sleepTime);
// A single sleep is capped at 60.
389 if (sleepTime > 60) sleepTime = 60;
// Scaled by 1000 and truncated to int for sys->sleep().
391 double sleepMS = sleepTime*1000;
393 sys->sleep((int)sleepMS);
398 // -----------------------------------
// Derives a delay from the number of bytes just read and the channel bitrate.
// NOTE(review): presumably info.bitrate is in kbit/s, making the divisor
// bytes per second and `time` milliseconds — confirm.
// NOTE(review): the divisor (info.bitrate*1024)/8 is 0 when info.bitrate is 0;
// confirm the caller or a surrounding guard rules that out.
399 void Channel::checkReadDelay(unsigned int len)
403 unsigned int time = (len*1000)/((info.bitrate*1024)/8);
411 // -----------------------------------
// Thread entry point for a channel; thread->data carries the Channel.
// It repeatedly runs the channel's source (sourceData->stream) until the
// thread is deactivated, the application is quitting or a finish is requested.
412 THREAD_PROC Channel::stream(ThreadInfo *thread)
416 Channel *ch = (Channel *)thread->data;
418 LOG_CHANNEL("Channel started");
420 while (thread->active && !peercastInst->isQuitting && !thread->finish)
// Look up the hit list for this channel before streaming; addHitList()
// follows (presumably only when none exists yet — confirm).
422 ChanHitList *chl = chanMgr->findHitList(ch->info);
424 chanMgr->addHitList(ch->info);
// Runs until the source stops.
426 ch->sourceData->stream(ch);
428 LOG_CHANNEL("Channel stopped");
// Channels that need not stay connected end here.
431 if (!ch->stayConnected)
433 thread->active = false;
// Otherwise wait before retrying: seconds since the last play end, plus 5.
437 if (!ch->info.lastPlayEnd)
438 ch->info.lastPlayEnd = sys->getTime();
440 unsigned int diff = (sys->getTime()-ch->info.lastPlayEnd) + 5;
442 LOG_DEBUG("Channel sleeping for %d seconds",diff);
// The wait is split into `diff` steps so a reconnect or shutdown is noticed.
443 for(unsigned int i=0; i<diff; i++)
445 if (ch->info.lastPlayEnd == 0) // reconnected
447 if (!thread->active || peercastInst->isQuitting){
448 thread->active = false;
456 LOG_DEBUG("thread.active = %d, thread.finish = %d",
457 ch->thread.active, ch->thread.finish);
// With no finish request the channel is ended directly.
459 if (!thread->finish){
460 ch->endThread(false);
// A helper thread running waitFinish is created with the channel as its data.
// NOTE(review): presumably the branch for a requested finish — confirm.
463 ch->finthread = new ThreadInfo();
464 ch->finthread->func = waitFinish;
465 ch->finthread->data = ch;
466 sys->startThread(ch->finthread);
474 // -----------------------------------
// Helper thread started by Channel::stream; thread->data carries the Channel.
// It loops until the channel thread's finish flag or its own finish flag is
// set, then logs either "channel finish" or "channel restart".
475 THREAD_PROC Channel::waitFinish(ThreadInfo *thread)
477 Channel *ch = (Channel*)thread->data;
478 LOG_DEBUG("Wait channel finish");
480 while(!(ch->thread.finish) && !thread->finish){
484 if (ch->thread.finish){
485 LOG_DEBUG("channel finish");
488 LOG_DEBUG("channel restart");
495 // -----------------------------------
496 bool Channel::acceptGIV(ClientSocket *givSock)
505 // -----------------------------------
// Creates the source socket and opens it to sourceHost.host.
// Throws StreamException on the socket-creation failure path.
506 void Channel::connectFetch()
508 sock = sys->createSocket();
511 throw StreamException("Can`t create socket");
// Tracker and YP hosts get read/write timeouts of 30000, all other hosts
// 5000 (presumably milliseconds — confirm).
513 if (sourceHost.tracker || sourceHost.yp)
515 sock->setReadTimeout(30000);
516 sock->setWriteTimeout(30000);
517 LOG_CHANNEL("Channel using longer timeouts");
519 sock->setReadTimeout(5000);
520 sock->setWriteTimeout(5000);
523 sock->open(sourceHost.host);
528 // -----------------------------------
// Performs the HTTP-style handshake on `sock` that requests this channel
// from the source host, then (for PCP sources) the outgoing PCP handshake.
// A 503 response is passed back to the caller as the value 503.
529 int Channel::handshakeFetch()
532 info.id.toStr(idStr);
535 servMgr->sessionID.toStr(sidStr);
// Request line followed by the stream-position and PCP headers.
537 sock->writeLineF("GET /channel/%s HTTP/1.0",idStr);
538 sock->writeLineF("%s %d",PCX_HS_POS,streamPos);
539 sock->writeLineF("%s %d",PCX_HS_PCP,1);
545 int r = http.readResponse();
547 LOG_CHANNEL("Got response: %d",r);
// Walk the response headers: the position header updates streamPos and
// readICYHeader() gets to update `info` from the header.
549 while (http.nextHeader())
551 char *arg = http.getArgStr();
555 if (http.isHeader(PCX_HS_POS))
556 streamPos = atoi(arg);
558 Servent::readICYHeader(http, info, NULL);
560 LOG_CHANNEL("Channel fetch: %s",http.cmdLine);
// Only 200 and 503 continue past this check.
563 if ((r != 200) && (r != 503))
// When downstreams are not kept, check whether the local buffer is already
// ahead of the position the source reports.
566 if (!servMgr->keepDownstreams) {
567 if (rawData.getLatestPos() > streamPos)
571 AtomStream atom(*sock);
575 Host rhost = sock->host;
577 if (info.srcProtocol == ChanInfo::SP_PCP)
579 // don`t need PCP_CONNECT here
// The last argument marks the peer as trusted when it is a YP or a tracker.
580 Servent::handshakeOutgoingPCP(atom,rhost,remoteID,agent,sourceHost.yp|sourceHost.tracker);
583 if (r == 503) return 503;
588 // -----------------------------------
// Source loop for a channel relayed from the PeerCast network.
// Each pass of the outer loop (1) picks a source host, (2) connects,
// handshakes and reads the stream, (3) tells downstream relays what happened
// and cleans up, then (4) idles before the next attempt.
// Source candidates are tried in this order: a pushed socket, local hit,
// global hit, local tracker, global tracker, the saved tracker, then the
// YP root hosts.
589 void PeercastSource::stream(Channel *ch)
593 bool next_yp = false;
// A tracker remembered from an earlier run enables the saved-tracker path.
594 bool tracker_check = (ch->trackerHit.host.ip != 0);
597 ch->lastStopTime = 0;
600 while (ch->thread.active)
602 ch->skipCount = 0; //JP-EX
603 ch->lastSkipTime = 0;
605 ChanHitList *chl = NULL;
607 ch->sourceHost.init();
// After 3 or more connection failures, with no listeners, no stay-connected
// request and no local broadcast, the channel goes idle.
609 if (connFailCnt >= 3 && (ch->localListeners() == 0) && (!ch->stayConnected) && !ch->isBroadcasting()) {
610 ch->lastIdleTime = sys->getTime();
611 ch->setStatus(Channel::S_IDLE);
613 ch->lastSkipTime = 0;
// Back-date the saved tracker's lastContact by a random 1..30 seconds.
617 if (!servMgr->keepDownstreams && !ch->bumped) {
618 ch->trackerHit.lastContact = sys->getTime() - 30 + (rand() % 30);
621 ch->setStatus(Channel::S_SEARCHING);
622 LOG_CHANNEL("Channel searching for hit..");
// A pushed socket, when used, also supplies the source host.
628 ch->sock = ch->pushSock;
630 ch->sourceHost.host = ch->sock->host;
// Hit-list lookups below run under hitlistlock.
634 chanMgr->hitlistlock.on();
636 chl = chanMgr->findHitList(ch->info);
// Local hit: restricted to hosts matching our own server host.
642 if (!ch->sourceHost.host.ip){
644 chs.matchHost = servMgr->serverHost;
645 chs.waitDelay = MIN_RELAY_RETRY;
646 chs.excludeID = servMgr->sessionID;
647 if (chl->pickSourceHits(chs)){
648 ch->sourceHost = chs.best[0];
649 LOG_DEBUG("use local hit");
653 // else find global hit
654 if (!ch->sourceHost.host.ip)
657 chs.waitDelay = MIN_RELAY_RETRY;
658 chs.excludeID = servMgr->sessionID;
659 if (chl->pickSourceHits(chs)){
660 ch->sourceHost = chs.best[0];
661 LOG_DEBUG("use global hit");
666 // else find local tracker
667 if (!ch->sourceHost.host.ip)
670 chs.matchHost = servMgr->serverHost;
671 chs.waitDelay = MIN_TRACKER_RETRY;
672 chs.excludeID = servMgr->sessionID;
673 chs.trackersOnly = true;
674 if (chl->pickSourceHits(chs)){
675 ch->sourceHost = chs.best[0];
676 LOG_DEBUG("use local tracker");
680 // else find global tracker
681 if (!ch->sourceHost.host.ip)
684 chs.waitDelay = MIN_TRACKER_RETRY;
685 chs.excludeID = servMgr->sessionID;
686 chs.trackersOnly = true;
687 if (chl->pickSourceHits(chs)){
688 ch->sourceHost = chs.best[0];
// A global tracker is also remembered in ch->trackerHit for later passes.
689 tracker_check = true;
690 ch->trackerHit = chs.best[0];
691 LOG_DEBUG("use global tracker");
// Fall back to the saved tracker, at most once every 30 seconds.
696 unsigned int ctime = sys->getTime();
697 if (!ch->sourceHost.host.ip && tracker_check && ch->trackerHit.host.ip){
698 if (ch->trackerHit.lastContact + 30 < ctime){
699 ch->sourceHost = ch->trackerHit;
700 ch->trackerHit.lastContact = ctime;
701 LOG_DEBUG("use saved tracker");
706 chanMgr->hitlistlock.off();
// With keepDownstreams on, downstream relays are sent a QUIT only once the
// stream has been stopped for more than 7 seconds; the hit list is then
// cleared under the lock.
708 if (servMgr->keepDownstreams && ch->lastStopTime && ch->lastStopTime < sys->getTime() - 7) {
709 ch->lastStopTime = 0;
710 LOG_DEBUG("------------ disconnect all downstreams");
712 MemoryStream mem(pack.data,sizeof(pack.data));
713 AtomStream atom(mem);
714 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_OFFAIR);
716 pack.type = ChanPacket::T_PCP;
719 servMgr->broadcastPacket(pack,ch->info.id,ch->remoteID,noID,Servent::T_RELAY);
721 chanMgr->hitlistlock.on();
722 ChanHitList *hl = chanMgr->findHitList(ch->info);
724 hl->clearHits(false);
726 chanMgr->hitlistlock.off();
729 // no trackers found so contact YP
730 if (!tracker_check && !ch->sourceHost.host.ip)
733 if (servMgr->rootHost.isEmpty())
739 if ((!servMgr->rootHost2.isEmpty()) && (numYPTries > numYPTries2))
// Contacts to the first YP are rate-limited by MIN_YP_RETRY.
742 unsigned int ctime=sys->getTime();
743 if ((ctime-chanMgr->lastYPConnect) > MIN_YP_RETRY)
745 ch->sourceHost.host.fromStrName(servMgr->rootHost.cstr(),DEFAULT_PORT);
746 ch->sourceHost.yp = true;
747 chanMgr->lastYPConnect=ctime;
755 // no trackers found so contact YP2
756 if (!tracker_check && !ch->sourceHost.host.ip)
759 if (servMgr->rootHost2.isEmpty())
762 if (numYPTries2 >= 3)
// The second YP has its own rate-limit timestamp.
765 unsigned int ctime=sys->getTime();
766 if ((ctime-chanMgr->lastYPConnect2) > MIN_YP_RETRY)
768 ch->sourceHost.host.fromStrName(servMgr->rootHost2.cstr(),DEFAULT_PORT);
769 ch->sourceHost.yp = true;
770 chanMgr->lastYPConnect2=ctime;
777 if (!tracker_check && !ch->sourceHost.host.ip && !next_yp) break;
// Keep searching while no host was found and the thread is still active.
781 }while((ch->sourceHost.host.ip==0) && (ch->thread.active));
783 if (!ch->sourceHost.host.ip)
785 LOG_ERROR("Channel giving up");
786 ch->setStatus(Channel::S_ERROR);
790 if (ch->sourceHost.yp)
792 LOG_CHANNEL("Channel contacting YP, try %d",numYPTries);
795 LOG_CHANNEL("Channel found hit");
800 if (ch->sourceHost.host.ip)
// Trackers and YPs count as trusted peers.
802 bool isTrusted = ch->sourceHost.tracker | ch->sourceHost.yp;
805 //if (ch->sourceHost.tracker)
806 // peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Contacting tracker, please wait...");
809 ch->sourceHost.host.toStr(ipstr);
812 if (ch->sourceHost.tracker)
814 else if (ch->sourceHost.yp)
821 ch->setStatus(Channel::S_CONNECTING);
822 ch->sourceHost.lastContact = sys->getTime();
826 LOG_CHANNEL("Channel connecting to %s %s",ipstr,type);
830 error = ch->handshakeFetch();
836 throw StreamException("Handshake error");
// A successful tracker handshake resets the failure counter.
837 if (ch->sourceHost.tracker) connFailCnt = 0;
// Automatic relay limit: outgoing bitrate budget divided by 1.3 times the
// channel bitrate, kept between 1 and autoMaxRelaySetting.
839 if (servMgr->autoMaxRelaySetting) //JP-EX
841 double setMaxRelays = ch->info.bitrate?servMgr->maxBitrateOut/(ch->info.bitrate*1.3):0;
842 if ((unsigned int)setMaxRelays == 0)
843 servMgr->maxRelays = 1;
844 else if ((unsigned int)setMaxRelays > servMgr->autoMaxRelaySetting)
845 servMgr->maxRelays = servMgr->autoMaxRelaySetting;
847 servMgr->maxRelays = (unsigned int)setMaxRelays;
850 ch->sourceStream = ch->createSource();
// Blocks for as long as the stream is being read.
852 error = ch->readStream(*ch->sock,ch->sourceStream);
854 throw StreamException("Stream error");
856 error = 0; // no errors, closing normally.
857 // ch->setStatus(Channel::S_CLOSING);
858 ch->setStatus(Channel::S_IDLE);
860 LOG_CHANNEL("Channel closed normally");
861 }catch(StreamException &e)
863 ch->setStatus(Channel::S_ERROR);
864 LOG_ERROR("Channel to %s %s : %s",ipstr,type,e.msg);
// With PCST connections disallowed, a PEERCAST-protocol source ends the thread.
865 if (!servMgr->allowConnectPCST) //JP-EX
867 if (ch->info.srcProtocol == ChanInfo::SP_PEERCAST)
868 ch->thread.active = false;
870 //if (!ch->sourceHost.tracker || ((error != 503) && ch->sourceHost.tracker))
// The failed host is marked dead, except a tracker that answered 503.
871 if (!ch->sourceHost.tracker || (!got503 && ch->sourceHost.tracker))
872 chanMgr->deadHit(ch->sourceHost);
873 if (ch->sourceHost.tracker && error == -1) {
874 LOG_ERROR("can't connect to tracker");
// Record when the stream stopped: the last write time, or the current time
// for an index.txt channel written to within the last 60 seconds.
879 unsigned int ctime = sys->getTime();
880 if (ch->rawData.lastWriteTime) {
881 ch->lastStopTime = ch->rawData.lastWriteTime;
882 if (isIndexTxt(ch) && ctime - ch->lastStopTime < 60)
883 ch->lastStopTime = ctime;
886 if (tracker_check && ch->sourceHost.tracker)
887 ch->trackerHit.lastContact = ctime - 30 + (rand() % 30);
889 // broadcast source host
890 if (!error && ch->sourceHost.host.ip) { // if closed normally
892 MemoryStream mem(pack.data,sizeof(pack.data));
893 AtomStream atom(mem);
894 ch->sourceHost.writeAtoms(atom, ch->info.id);
896 pack.type = ChanPacket::T_PCP;
899 servMgr->broadcastPacket(pack,ch->info.id,ch->remoteID,noID,Servent::T_RELAY);
900 LOG_DEBUG("stream: broadcast sourceHost");
903 // broadcast quit to any connected downstream servents
904 if (!servMgr->keepDownstreams || (ch->sourceHost.tracker && !got503) || !error) {
906 MemoryStream mem(pack.data,sizeof(pack.data));
907 AtomStream atom(mem);
908 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_OFFAIR);
910 pack.type = ChanPacket::T_PCP;
913 servMgr->broadcastPacket(pack,ch->info.id,ch->remoteID,noID,Servent::T_RELAY);
914 LOG_DEBUG("------------ broadcast quit to all downstreams");
916 chanMgr->hitlistlock.on();
917 ChanHitList *hl = chanMgr->findHitList(ch->info);
919 hl->clearHits(false);
921 chanMgr->hitlistlock.off();
// Final status update and flush of the source stream; a StreamException
// raised here is caught and ignored.
925 if (ch->sourceStream)
931 ch->sourceStream->updateStatus(ch);
932 ch->sourceStream->flush(*ch->sock);
934 }catch(StreamException &)
// Detach the stream from the channel before disposing of it.
936 ChannelStream *cs = ch->sourceStream;
937 ch->sourceStream = NULL;
951 LOG_ERROR("Channel not found");
953 if ((ch->sourceHost.yp && !next_yp) || ch->sourceHost.tracker) {
954 chanMgr->hitlistlock.on();
955 ChanHitList *hl = chanMgr->findHitList(ch->info);
959 chanMgr->hitlistlock.off();
// No popup for index.txt channels.
961 if(!isIndexTxt(&ch->info)) // for PCRaw (popup)
962 peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Channel not found");
// Enter the idle state and wait while checkIdle() holds and the thread
// is still active.
970 ch->lastIdleTime = sys->getTime();
971 ch->setStatus(Channel::S_IDLE);
972 ch->skipCount = 0; //JP-EX
973 ch->lastSkipTime = 0;
974 while ((ch->checkIdle()) && (ch->thread.active))
983 // -----------------------------------
// Sets the channel up to be fed over the client socket `cs` using HTTP as
// the source protocol, with a new ICYSource as sourceData. The channel also
// takes the next chanMgr->icyIndex value as its streamIndex.
984 void Channel::startICY(ClientSocket *cs, SRC_TYPE st)
988 cs->setReadTimeout(0); // stay connected even when theres no data coming through
990 info.srcProtocol = ChanInfo::SP_HTTP;
992 streamIndex = ++chanMgr->icyIndex;
994 sourceData = new ICYSource();
998 // -----------------------------------
999 static char *nextMetaPart(char *str,char delim)
1012 // -----------------------------------
// Bounded string copy helper. Characters are taken from `from` until a NUL
// is read or the pre-decremented `max` reaches 0, so at most max-1
// characters pass the loop condition.
// NOTE(review): for max <= 0 the expression --max is non-zero, so the limit
// no longer stops the loop — confirm callers always pass max >= 1.
1013 static void copyStr(char *to,char *from,int max)
1016 while ((c=*from++) && (--max))
1023 // -----------------------------------
// Parses MP3 stream metadata made of name=value parts separated by ';' and
// applies it to a copy of the channel info, which is then passed to
// updateInfo().
1024 void Channel::processMp3Metadata(char *str)
1026 ChanInfo newInfo = info;
// Split off the value after '=' and locate the next part after ';'.
1031 char *arg = nextMetaPart(cmd,'=');
1035 char *next = nextMetaPart(arg,';');
// StreamTitle -> track title, StreamUrl -> track contact; each value is
// unquoted as ASCII and then converted to Unicode.
1037 if (strcmp(cmd,"StreamTitle")==0)
1039 newInfo.track.title.setUnquote(arg,String::T_ASCII);
1040 newInfo.track.title.convertTo(String::T_UNICODE);
1042 }else if (strcmp(cmd,"StreamUrl")==0)
1044 newInfo.track.contact.setUnquote(arg,String::T_ASCII);
1045 newInfo.track.contact.convertTo(String::T_UNICODE);
1052 updateInfo(newInfo);
1055 // -----------------------------------
// Builds a newly allocated <host> XML node describing this hit: ip, hops,
// listener/relay counts, uptime, flags, version and tracker state.
// The "update" attribute is the age of the hit, i.e. current time minus `time`.
1056 XML::Node *ChanHit::createXML()
1062 return new XML::Node("host ip=\"%s\" hops=\"%d\" listeners=\"%d\" relays=\"%d\" uptime=\"%d\" push=\"%d\" relay=\"%d\" direct=\"%d\" cin=\"%d\" stable=\"%d\" version=\"%d\" update=\"%d\" tracker=\"%d\"",
1074 sys->getTime()-time,
1080 // -----------------------------------
// Builds a newly allocated <hits> XML node summarising the hit list; the
// "newest" attribute is the age of the newest hit. Per-hit <host> nodes
// are appended as children.
// NOTE(review): presumably only when `addHits` is set — confirm.
1081 XML::Node *ChanHitList::createXML(bool addHits)
1083 XML::Node *hn = new XML::Node("hits hosts=\"%d\" listeners=\"%d\" relays=\"%d\" firewalled=\"%d\" closest=\"%d\" furthest=\"%d\" newest=\"%d\"",
1090 sys->getTime()-newestHit()
1099 hn->add(h->createXML());
1107 // -----------------------------------
// Builds a newly allocated <relay> XML node with the channel's listener,
// relay and host counts and a status string. The host count comes from the
// channel's hit list and is 0 when there is none.
// NOTE(review): how `showStat` affects the status string is not visible
// in these lines — confirm.
1108 XML::Node *Channel::createRelayXML(bool showStat)
1111 ststr = getStatusStr();
1113 if ((status == S_RECEIVING) || (status == S_BROADCASTING))
1116 ChanHitList *chl = chanMgr->findHitList(info);
1118 return new XML::Node("relay listeners=\"%d\" relays=\"%d\" hosts=\"%d\" status=\"%s\"",
1121 (chl!=NULL)?chl->numHits():0,
1126 // -----------------------------------
// Fills the metadata buffer from an XML document, using a memory stream
// over `data` limited to MAX_DATALEN bytes.
1127 void ChanMeta::fromXML(XML &xml)
1129 MemoryStream tout(data,MAX_DATALEN);
1134 // -----------------------------------
1135 void ChanMeta::fromMem(void *p, int l)
1140 // -----------------------------------
// Appends `l` bytes from `p` to the metadata buffer at offset `len`.
// The copy happens only when the result still fits within MAX_DATALEN.
1141 void ChanMeta::addMem(void *p, int l)
1143 if ((len+l) <= MAX_DATALEN)
1145 memcpy(data+len,p,l);
1149 // -----------------------------------
// Sends a tracker update for this channel (PCP_BCST atom, root group) to
// the Servent::T_COUT connections via servMgr->broadcastPacket().
//   svID  - passed to broadcastPacket() as its fourth argument.
//   force - send even if the last update was 30 seconds ago or less.
// Throws StreamException("Broadcast channel has no hitlist") on the path
// that follows the hit-list lookup.
1150 void Channel::broadcastTrackerUpdate(GnuID &svID, bool force)
1152 unsigned int ctime = sys->getTime();
// Rate limit: at most one update per 30 seconds unless forced.
1154 if (((ctime-lastTrackerUpdate) > 30) || (force))
// The stream writes into pack.data only, so it must be bounded by the size
// of that buffer, not by the size of the whole ChanPacket object.
1158 MemoryStream mem(pack.data,sizeof(pack.data));
1160 AtomStream atom(mem);
1164 ChanHitList *chl = chanMgr->findHitListByID(info.id);
1166 throw StreamException("Broadcast channel has no hitlist");
1168 int numListeners = totalListeners();
1169 int numRelays = totalRelays();
1171 unsigned int oldp = rawData.getOldestPos();
1172 unsigned int newp = rawData.getLatestPos();
// Describe the local node as a hit, including the buffered stream range.
1174 hit.initLocal(numListeners,numRelays,info.numSkips,info.getUptime(),isPlaying(), false, 0, this, oldp,newp);
// Two alternative child counts for the PCP_BCST parent atom.
// NOTE(review): presumably 10 when the two VERSION_EX atoms below are
// written and 8 otherwise — confirm.
1178 atom.writeParent(PCP_BCST,8);
1180 atom.writeParent(PCP_BCST,10);
1182 atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_ROOT);
1183 atom.writeChar(PCP_BCST_HOPS,0);
1184 atom.writeChar(PCP_BCST_TTL,11);
1185 atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
1186 atom.writeInt(PCP_BCST_VERSION,PCP_CLIENT_VERSION);
1187 atom.writeInt(PCP_BCST_VERSION_VP,PCP_CLIENT_VERSION_VP);
1189 atom.writeBytes(PCP_BCST_VERSION_EX_PREFIX,PCP_CLIENT_VERSION_EX_PREFIX,2);
1190 atom.writeShort(PCP_BCST_VERSION_EX_NUMBER,PCP_CLIENT_VERSION_EX_NUMBER);
// Channel sub-tree: id and broadcast id, followed by info, track and hit atoms.
1192 atom.writeParent(PCP_CHAN,4);
1193 atom.writeBytes(PCP_CHAN_ID,info.id.id,16);
1194 atom.writeBytes(PCP_CHAN_BCID,chanMgr->broadcastID.id,16);
1195 info.writeInfoAtoms(atom);
1196 info.writeTrackAtoms(atom);
1197 hit.writeAtoms(atom,info.id);
1201 pack.type = ChanPacket::T_PCP;
1205 int cnt = servMgr->broadcastPacket(pack,noID,servMgr->sessionID,svID,Servent::T_COUT);
1209 LOG_DEBUG("Sent tracker update for %s to %d client(s)",info.name.cstr(),cnt);
1210 lastTrackerUpdate = ctime;
1215 // -----------------------------------
// Forwards `pack` to the channel's source stream when the filters match;
// the result of sourceStream->sendPacket(pack, did) is returned.
//   cid - channel filter: unset matches any channel, otherwise it must equal
//         info.id.
//   sid - when set, it must differ from remoteID.
//   did - passed through to sendPacket().
1216 bool Channel::sendPacketUp(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did)
1219 && (!cid.isSet() || info.id.isSame(cid))
1220 && (!sid.isSet() || !remoteID.isSame(sid))
1223 return sourceStream->sendPacket(pack,did);
1228 // -----------------------------------
// Merges `newInfo` into the channel info. When info.update() reports a
// change on a broadcasting channel, the new info and track atoms are sent to
// the relays (PCP_BCST, relays group) and a tracker update is issued. This
// happens at most once every 30 seconds. The application is notified through
// peercastApp->channelUpdate().
1229 void Channel::updateInfo(ChanInfo &newInfo)
1231 if (info.update(newInfo))
1233 if (isBroadcasting())
1235 unsigned int ctime = sys->getTime();
// Rate limit for metadata broadcasts.
1236 if ((ctime-lastMetaUpdate) > 30)
1238 lastMetaUpdate = ctime;
// The stream writes into pack.data only, so it must be bounded by the size
// of that buffer, not by the size of the whole ChanPacket object.
1242 MemoryStream mem(pack.data,sizeof(pack.data));
1244 AtomStream atom(mem);
// Two alternative child counts for the PCP_BCST parent atom.
// NOTE(review): presumably 10 when the two VERSION_EX atoms below are
// written and 8 otherwise — confirm.
1247 atom.writeParent(PCP_BCST,8);
1249 atom.writeParent(PCP_BCST,10);
1251 atom.writeChar(PCP_BCST_HOPS,0);
1252 atom.writeChar(PCP_BCST_TTL,7);
1253 atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_RELAYS);
1254 atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
1255 atom.writeInt(PCP_BCST_VERSION,PCP_CLIENT_VERSION);
1256 atom.writeInt(PCP_BCST_VERSION_VP,PCP_CLIENT_VERSION_VP);
1258 atom.writeBytes(PCP_BCST_VERSION_EX_PREFIX,PCP_CLIENT_VERSION_EX_PREFIX,2);
1259 atom.writeShort(PCP_BCST_VERSION_EX_NUMBER,PCP_CLIENT_VERSION_EX_NUMBER);
1261 atom.writeBytes(PCP_BCST_CHANID,info.id.id,16);
// Channel sub-tree: id followed by the info and track atoms.
1262 atom.writeParent(PCP_CHAN,3);
1263 atom.writeBytes(PCP_CHAN_ID,info.id.id,16);
1264 info.writeInfoAtoms(atom);
1265 info.writeTrackAtoms(atom);
1268 pack.type = ChanPacket::T_PCP;
1271 servMgr->broadcastPacket(pack,info.id,servMgr->sessionID,noID,Servent::T_RELAY);
1273 broadcastTrackerUpdate(noID);
1277 ChanHitList *chl = chanMgr->findHitList(info);
1281 peercastApp->channelUpdate(&info);
1286 // -----------------------------------
// Creates the ChannelStream that matches the channel's source protocol or,
// for other protocols, its content type, and sets the stream's parent to
// this channel.
// Throws StreamException when a PEERCAST-protocol source is not allowed, or
// when WMA/WMV content arrives over something other than MMS.
1287 ChannelStream *Channel::createSource()
1289 // if (servMgr->relayBroadcast)
1290 // chanMgr->broadcastRelays(NULL,chanMgr->minBroadcastTTL,chanMgr->maxBroadcastTTL);
1293 ChannelStream *source=NULL;
1295 if (info.srcProtocol == ChanInfo::SP_PEERCAST)
1297 LOG_CHANNEL("Channel is Peercast");
// PeercastStream is created only when servMgr->allowConnectPCST is set.
1298 if (servMgr->allowConnectPCST) //JP-EX
1299 source = new PeercastStream();
1301 throw StreamException("Channel is not allowed");
1303 else if (info.srcProtocol == ChanInfo::SP_PCP)
1305 LOG_CHANNEL("Channel is PCP");
1306 PCPStream *pcp = new PCPStream(remoteID);
1309 else if (info.srcProtocol == ChanInfo::SP_MMS)
1311 LOG_CHANNEL("Channel is MMS");
1312 source = new MMSStream();
// Any other protocol: choose the stream class by content type.
1315 switch(info.contentType)
1317 case ChanInfo::T_MP3:
1318 LOG_CHANNEL("Channel is MP3 - meta: %d",icyMetaInterval);
1319 source = new MP3Stream();
1321 case ChanInfo::T_NSV:
1322 LOG_CHANNEL("Channel is NSV");
1323 source = new NSVStream();
1325 case ChanInfo::T_WMA:
1326 case ChanInfo::T_WMV:
1327 throw StreamException("Channel is WMA/WMV - but not MMS");
1329 case ChanInfo::T_OGG:
1330 case ChanInfo::T_OGM:
1331 LOG_CHANNEL("Channel is OGG");
1332 source = new OGGStream();
// Anything else is handled as a raw stream.
1335 LOG_CHANNEL("Channel is Raw");
1336 source = new RawStream();
1341 source->parent = this;
1345 // ------------------------------------------
// Sends a status update upstream when getStatus() has produced a packet
// and the channel is not broadcasting: the packet goes out through
// chanMgr->broadcastPacketUp() for this channel id.
1346 void ChannelStream::updateStatus(Channel *ch)
1349 if (getStatus(ch,pack))
1351 if (!ch->isBroadcasting())
1355 int cnt = chanMgr->broadcastPacketUp(pack,ch->info.id,servMgr->sessionID,noID);
1356 LOG_CHANNEL("Sent channel status update to %d clients",cnt);
1361 // ------------------------------------------
// Decides whether the local status of `ch` should be reported, and if so
// serialises it into `pack` as a PCP_BCST atom for the trackers group.
// A report is due when the host update interval has elapsed, or when the
// listener/relay counts, playing state, firewall state or one of the
// relay/full flags has changed.
1362 bool ChannelStream::getStatus(Channel *ch,ChanPacket &pack)
1364 unsigned int ctime = sys->getTime();
1366 ChanHitList *chl = chanMgr->findHitListByID(ch->info.id);
// Older version of the change detection, kept commented out.
1371 /* int newLocalListeners = ch->localListeners();
1372 int newLocalRelays = ch->localRelays();
1376 (numListeners != newLocalListeners)
1377 || (numRelays != newLocalRelays)
1378 || (ch->isPlaying() != isPlaying)
1379 || (servMgr->getFirewall() != fwState)
1380 || (((ctime-lastUpdate)>chanMgr->hostUpdateInterval) && chanMgr->hostUpdateInterval)
1382 && ((ctime-lastUpdate) > 10)
1386 numListeners = newLocalListeners;
1387 numRelays = newLocalRelays;
1388 isPlaying = ch->isPlaying();
1389 fwState = servMgr->getFirewall();
1394 hit.initLocal(ch->localListeners(),ch->localRelays(),ch->info.numSkips,ch->info.getUptime(),isPlaying, ch->isFull(), ch->info.bitrate, ch);
1395 hit.tracker = ch->isBroadcasting();*/
1397 int newLocalListeners = ch->localListeners();
1398 int newLocalRelays = ch->localRelays();
// With an unchanged playing state, updates and checks are throttled by
// 10-second windows.
1400 if ((ch->isPlaying() == isPlaying)){
1401 if ((ctime-lastUpdate) < 10){
1405 if ((ctime-lastCheckTime) < 10){
1408 lastCheckTime = ctime;
1411 unsigned int oldp = ch->rawData.getOldestPos();
1412 unsigned int newp = ch->rawData.getLatestPos();
1416 // LOG_DEBUG("isPlaying-------------------------------------- %d %d", ch->isPlaying(), isPlaying);
// Describe the local node as a hit, including the buffered stream range.
1418 hit.initLocal(newLocalListeners,newLocalRelays,ch->info.numSkips,ch->info.getUptime(),ch->isPlaying(), ch->isFull(), ch->info.bitrate, ch, oldp, newp);
1419 hit.tracker = ch->isBroadcasting();
// Change detection; a hostUpdateInterval of 0 disables the periodic update.
1421 if ( (((ctime-lastUpdate)>chanMgr->hostUpdateInterval) && chanMgr->hostUpdateInterval)
1422 || (newLocalListeners != numListeners)
1423 || (newLocalRelays != numRelays)
1424 || (ch->isPlaying() != isPlaying)
1425 || (servMgr->getFirewall() != fwState)
1426 || (ch->chDisp.relay != hit.relay)
1427 || (ch->chDisp.relayfull != hit.relayfull)
1428 || (ch->chDisp.chfull != hit.chfull)
1429 || (ch->chDisp.ratefull != hit.ratefull)
// Remember the reported state for the next comparison.
1431 numListeners = newLocalListeners;
1432 numRelays = newLocalRelays;
1433 isPlaying = ch->isPlaying();
1434 fwState = servMgr->getFirewall();
// Auto relay keep: stay connected while relays exist and the firewall state
// is FW_OFF; with autoRelayKeep == 2, release once nobody is connected.
1439 if ((numRelays) && ((servMgr->getFirewall() == ServMgr::FW_OFF) && (servMgr->autoRelayKeep!=0))) //JP-EX
1440 ch->stayConnected = true;
1442 if ((!numRelays && !numListeners) && (servMgr->autoRelayKeep==2)) //JP-EX
1443 ch->stayConnected = false;
1445 MemoryStream pmem(pack.data,sizeof(pack.data));
1446 AtomStream atom(pmem);
// Two alternative child counts for the PCP_BCST parent atom.
// NOTE(review): presumably 10 when the two VERSION_EX atoms below are
// written and 8 otherwise — confirm.
1452 atom.writeParent(PCP_BCST,8);
1454 atom.writeParent(PCP_BCST,10);
1456 atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_TRACKERS);
1457 atom.writeChar(PCP_BCST_HOPS,0);
1458 atom.writeChar(PCP_BCST_TTL,11);
1459 atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
1460 atom.writeInt(PCP_BCST_VERSION,PCP_CLIENT_VERSION);
1461 atom.writeInt(PCP_BCST_VERSION_VP,PCP_CLIENT_VERSION_VP);
1463 atom.writeBytes(PCP_BCST_VERSION_EX_PREFIX,PCP_CLIENT_VERSION_EX_PREFIX,2);
1464 atom.writeShort(PCP_BCST_VERSION_EX_NUMBER,PCP_CLIENT_VERSION_EX_NUMBER);
1466 atom.writeBytes(PCP_BCST_CHANID,ch->info.id.id,16);
1467 hit.writeAtoms(atom,noID);
// The packet length is the number of bytes written to the stream.
1469 pack.len = pmem.pos;
1470 pack.type = ChanPacket::T_PCP;
1475 // -----------------------------------
// Auto-bump check for relayed channels. It applies only when the channel is
// not broadcasting and its source is not a tracker. The bump condition is:
// data has been written before, and the last write is more than 30 seconds
// old.
1476 bool Channel::checkBump()
1478 if (!isBroadcasting() && (!sourceHost.tracker))
1479 if (rawData.lastWriteTime && ((sys->getTime() - rawData.lastWriteTime) > 30))
1481 LOG_ERROR("Channel Auto bumped");
1493 // -----------------------------------
// Main read loop for a channel: reads the stream header, then reads packets
// from `in` through `source` while the thread is active and the application
// is not quitting. Idle, bumped and end-of-stream conditions are logged.
// Broadcasting channels send periodic tracker updates, other channels send
// status updates. After the loop the application is told the channel has
// stopped and source->readEnd() is called.
1494 int Channel::readStream(Stream &in,ChannelStream *source)
1502 source->readHeader(in,this);
1504 peercastApp->channelStart(&info);
1506 rawData.lastWriteTime = 0;
1508 bool wasBroadcasting=false;
1510 unsigned int receiveStartTime = 0;
1514 while (thread.active && !peercastInst->isQuitting)
1518 LOG_DEBUG("Channel idle");
1524 LOG_DEBUG("Channel bumped");
1532 LOG_DEBUG("Channel eof");
1538 error = source->readPacket(in,this);
1543 //if (rawData.writePos > 0)
// Status handling starts once something has been written or skipped.
1544 if (rawData.lastWriteTime > 0 || rawData.lastSkipTime > 0)
1546 if (isBroadcasting())
// Tracker updates follow the configured host update interval.
1548 if ((sys->getTime()-lastTrackerUpdate) >= chanMgr->hostUpdateInterval)
1552 broadcastTrackerUpdate(noID);
1554 wasBroadcasting = true;
1558 /* if (status != Channel::S_RECEIVING){
1559 receiveStartTime = sys->getTime();
1560 } else if (receiveStartTime && receiveStartTime + 10 > sys->getTime()){
1561 chanMgr->hitlistlock.on();
1562 ChanHitList *hl = chanMgr->findHitList(info);
1564 hl->clearHits(true);
1566 chanMgr->hitlistlock.off();
1567 receiveStartTime = 0;
1569 setStatus(Channel::S_RECEIVING);
1572 source->updateStatus(this);
1580 }catch(StreamException &e)
1582 LOG_ERROR("readStream: %s",e.msg);
1586 if (!servMgr->keepDownstreams) {
1587 if (status == Channel::S_RECEIVING){
1588 chanMgr->hitlistlock.on();
1589 ChanHitList *hl = chanMgr->findHitList(info);
1591 hl->clearHits(false);
1593 chanMgr->hitlistlock.off();
1597 // setStatus(S_CLOSING);
1600 if (wasBroadcasting)
1604 broadcastTrackerUpdate(noID,true);
1607 peercastApp->channelStop(&info);
1609 source->readEnd(in,this);
1614 // -----------------------------------
// Checks the stream signature: the first tag read from `in` must be 'PCST',
// otherwise a StreamException is thrown.
1615 void PeercastStream::readHeader(Stream &in,Channel *ch)
1617 if (in.readTag() != 'PCST')
1618 throw StreamException("Not PeerCast stream");
1621 // -----------------------------------
1622 void PeercastStream::readEnd(Stream &,Channel *)
1626 // -----------------------------------
// Reads one PeerCast-format packet from `in` and dispatches on its type:
// HEAD and DATA are stored in the channel and advance streamPos, META
// updates the channel info from embedded XML, and SYNC checks the sequence
// number for skips.
// Throws StreamException once more than 50 skips have been counted.
1627 int PeercastStream::readPacket(Stream &in,Channel *ch)
1633 pack.readPeercast(in);
1635 MemoryStream mem(pack.data,pack.len);
1639 case ChanPacket::T_HEAD:
// NOTE(review): headPack is copied before pack.pos is assigned below, so the
// stored header keeps the pos value left by readPeercast — confirm intended.
1641 ch->headPack = pack;
1642 pack.pos = ch->streamPos;
1643 ch->newPacket(pack);
1644 ch->streamPos+=pack.len;
1646 case ChanPacket::T_DATA:
1647 pack.pos = ch->streamPos;
1648 ch->newPacket(pack);
1649 ch->streamPos+=pack.len;
1651 case ChanPacket::T_META:
1652 ch->insertMeta.fromMem(pack.data,pack.len);
// The <channel> node of the metadata XML is applied to a copy of the info.
1658 XML::Node *n = xml.findNode("channel");
1661 ChanInfo newInfo = ch->info;
1662 newInfo.updateFromXML(n);
1663 ChanHitList *chl = chanMgr->findHitList(ch->info);
// NOTE(review): second updateFromXML(n) on the same node — looks redundant;
// confirm.
1665 newInfo.updateFromXML(n);
1666 ch->updateInfo(newInfo);
1672 case ChanPacket::T_SYNC:
// A sync value that is not exactly syncPos + 1 counts as a skip.
1674 unsigned int s = mem.readLong();
1675 if ((s-ch->syncPos) != 1)
1677 LOG_CHANNEL("Ch.%d SKIP: %d to %d (%d)",ch->index,ch->syncPos,s,ch->info.numSkips);
1680 ch->info.numSkips++;
1681 if (ch->info.numSkips>50)
1682 throw StreamException("Bumped - Too many skips");
1697 // -----------------------------------
// Reads one fixed-size chunk (8192 bytes) of raw data from `in` as a T_DATA
// packet positioned at the channel's current streamPos. The packet is handed
// to the channel, the read delay is applied and streamPos is advanced by the
// packet length.
1698 void ChannelStream::readRaw(Stream &in, Channel *ch)
1702 const int readLen = 8192;
// NOTE(review): pack.data is passed as the source pointer of its own init();
// presumably only to set type/len/pos before the read below — confirm that
// init() tolerates a self-copy.
1704 pack.init(ChanPacket::T_DATA,pack.data,readLen,ch->streamPos);
1705 in.read(pack.data,pack.len);
1706 ch->newPacket(pack);
1707 ch->checkReadDelay(pack.len);
1709 ch->streamPos+=pack.len;
1712 // ------------------------------------------
1713 void RawStream::readHeader(Stream &,Channel *)
1717 // ------------------------------------------
1718 int RawStream::readPacket(Stream &in,Channel *ch)
1724 // ------------------------------------------
1725 void RawStream::readEnd(Stream &,Channel *)
1731 // -----------------------------------
// Initialises this packet from a ChanPacketv; `len` bytes of payload are
// copied from p.data into `data`.
// NOTE(review): no bound check on `len` is visible before the copy — confirm
// it cannot exceed the size of `data`.
1732 void ChanPacket::init(ChanPacketv &p)
1739 memcpy(data, p.data, len);
1741 // -----------------------------------
// Initialises this packet from a type, payload pointer, payload length and
// stream position.
// Throws StreamException when `l` exceeds MAX_DATALEN.
1742 void ChanPacket::init(TYPE t, const void *p, unsigned int l,unsigned int _pos)
1745 if (l > MAX_DATALEN)
1746 throw StreamException("Packet data too large");
1752 // -----------------------------------
// Writes only the payload (`len` bytes of `data`) to `out`.
1753 void ChanPacket::writeRaw(Stream &out)
1755 out.write(data,len);
1757 // -----------------------------------
// Writes the packet in PeerCast stream format. The packet type is mapped to
// a four-character tag (HEAD, META or DATA); packets of type T_UNKNOWN are
// not written. The length is written as a short, followed by the payload.
1758 void ChanPacket::writePeercast(Stream &out)
1760 unsigned int tp = 0;
1763 case T_HEAD: tp = 'HEAD'; break;
1764 case T_META: tp = 'META'; break;
1765 case T_DATA: tp = 'DATA'; break;
1768 if (type != T_UNKNOWN)
1771 out.writeShort(len);
1773 out.write(data,len);
1776 // -----------------------------------
// Reads a packet in PeerCast stream format. The four-character tag selects
// the packet type, and unrecognised tags give T_UNKNOWN. The length is read
// as a short.
// Throws StreamException("Bad ChanPacket") when the length exceeds
// MAX_DATALEN.
1777 void ChanPacket::readPeercast(Stream &in)
1779 unsigned int tp = in.readTag();
1783 case 'HEAD': type = T_HEAD; break;
1784 case 'DATA': type = T_DATA; break;
1785 case 'META': type = T_META; break;
1786 default: type = T_UNKNOWN;
1788 len = in.readShort();
1790 if (len > MAX_DATALEN)
1791 throw StreamException("Bad ChanPacket");
1794 // -----------------------------------
// Copies packets from another buffer into this one.
// Walks buf from firstPos to lastPos (inclusive), indexing the ring with
// i % MAX_PACKETS. A packet is copied when its type passes the `accept`
// mask and its stream position is at or beyond reqPos.
// Returns lastPos - firstPos of this buffer.
1795 int ChanPacketBuffer::copyFrom(ChanPacketBuffer &buf, unsigned int reqPos)
1805 for(unsigned int i=buf.firstPos; i<=buf.lastPos; i++)
1807 //ChanPacket *src = &buf.packets[i%MAX_PACKETS];
1808 ChanPacketv *src = &buf.packets[i%MAX_PACKETS];
1809 if (src->type & accept)
1811 if (src->pos >= reqPos)
1814 //packets[writePos++] = *src;
// NOTE(review): the destination index is writePos itself, not
// writePos % MAX_PACKETS - confirm writePos is reset before this loop.
1815 packets[writePos++].init(*src);
1824 return lastPos-firstPos;
1827 // -----------------------------------
// Looks for a buffered packet at or after stream position spos.
// `bound` is a byte distance derived from the length of packets[0] times
// twice the ring capacity; a packet qualifies only if it lies within `bound`
// bytes after spos.
1828 bool ChanPacketBuffer::findPacket(unsigned int spos, ChanPacket &pack)
1835 unsigned int bound = packets[0].len * ChanPacketBuffer::MAX_PACKETS * 2; // max packets to wait
1836 unsigned int fpos = getStreamPos(firstPos);
1837 unsigned int lpos = getStreamPos(lastPos);
// spos is before the oldest buffered packet, and either the buffered range
// does not wrap (fpos < lpos) or spos is more than `bound` past lpos.
1838 if (spos < fpos && (fpos < lpos || spos > lpos + bound))
1842 for(unsigned int i=firstPos; i<=lastPos; i++)
1844 //ChanPacket &p = packets[i%MAX_PACKETS];
1845 ChanPacketv &p = packets[i%MAX_PACKETS];
1846 if (p.pos >= spos && p.pos - spos <= bound)
1857 // -----------------------------------
// Returns the stream position of the packet stored at lastPos.
// NOTE(review): lines between the signature and this return are not visible.
1858 unsigned int ChanPacketBuffer::getLatestPos()
1863 return getStreamPos(lastPos);
1866 // -----------------------------------
// Returns the stream position of the packet stored at firstPos.
// NOTE(review): lines between the signature and this return are not visible.
1867 unsigned int ChanPacketBuffer::getOldestPos()
1872 return getStreamPos(firstPos);
1875 // -----------------------------------
// Computes a stream position relative to spos, using the stream positions at
// safePos (min) and lastPos (max) as the range.
// NOTE(review): how spos is compared against min/max is not visible here.
1876 unsigned int ChanPacketBuffer::findOldestPos(unsigned int spos)
1878 unsigned int min = getStreamPos(safePos);
1879 unsigned int max = getStreamPos(lastPos);
1890 // -----------------------------------
// Returns the stream position (pos) of the packet in ring slot
// index % MAX_PACKETS.
1891 unsigned int ChanPacketBuffer::getStreamPos(unsigned int index)
1893 return packets[index%MAX_PACKETS].pos;
1895 // -----------------------------------
// Returns the stream position just past the packet in ring slot
// index % MAX_PACKETS, i.e. its pos plus its len.
1896 unsigned int ChanPacketBuffer::getStreamPosEnd(unsigned int index)
1898 return packets[index%MAX_PACKETS].pos+packets[index%MAX_PACKETS].len;
1900 // -----------------------------------
// Stores `pack` in the ring buffer at writePos % MAX_PACKETS and updates the
// buffer bookkeeping (firstPos, safePos, lastWriteTime).
// pack.sync is overwritten with the write index before the packet is stored.
// NOTE(review): the return statements and the use of updateReadPos are on
// lines not visible in this listing.
1901 bool ChanPacketBuffer::writePacket(ChanPacket &pack, bool updateReadPos)
// With keepDownstreams enabled, compare the new packet's position against
// the newest buffered one.
1905 if (servMgr->keepDownstreams) {
1906 unsigned int lpos = getLatestPos();
1907 unsigned int diff = pack.pos - lpos;
// A T_HEAD packet in the newest slot zeroes lpos, which disables the check
// below.
1908 if (packets[lastPos%MAX_PACKETS].type == ChanPacket::T_HEAD) lpos = 0;
// diff == 0 means the same position as the newest packet; a diff above
// 0xfff00000 is what unsigned wrap-around yields when pack.pos is slightly
// behind lpos.
1909 if (lpos && (diff == 0 || diff > 0xfff00000)) {
1910 LOG_DEBUG("* latest pos=%d, pack pos=%d", getLatestPos(), pack.pos);
1911 lastSkipTime = sys->getTime();
1916 if (willSkip()) // too far behind
1918 lastSkipTime = sys->getTime();
1924 pack.sync = writePos;
1925 packets[writePos%MAX_PACKETS].init(pack);
1927 // LOG_DEBUG("packet.len = %d",pack.len);
// Once the ring has been filled, firstPos trails writePos by MAX_PACKETS.
1933 if (writePos >= MAX_PACKETS)
1934 firstPos = writePos-MAX_PACKETS;
// safePos trails writePos by NUM_SAFEPACKETS.
1938 if (writePos >= NUM_SAFEPACKETS)
1939 safePos = writePos - NUM_SAFEPACKETS;
1946 lastWriteTime = sys->getTime();
1954 // -----------------------------------
// Copies the packet at readPos into `pack`, waiting while no unread packet
// is available.
// Throws StreamException("Read too far behind") if readPos has fallen before
// firstPos, and TimeoutException if the wait exceeds 30 units of
// sys->getTime() (presumably seconds - confirm).
1955 void ChanPacketBuffer::readPacket(ChanPacket &pack)
1958 unsigned int tim = sys->getTime();
1960 if (readPos < firstPos)
1961 throw StreamException("Read too far behind");
// Nothing new to read while the read index has caught up with the writer.
1963 while (readPos >= writePos)
1966 if ((sys->getTime() - tim) > 30)
1967 throw TimeoutException();
1970 pack.init(packets[readPos%MAX_PACKETS]);
1977 // -----------------------------------
// Returns true when the writer is at least MAX_PACKETS ahead of the reader,
// i.e. the next write would overwrite a packet that has not been read yet.
1978 bool ChanPacketBuffer::willSkip()
1980 return ((writePos-readPos) >= MAX_PACKETS);
1983 // -----------------------------------
// Writes the channel's stream path, "/stream/<id><ext>", into `str`.
// The extension comes from ChanInfo::getTypeExt for the channel's content
// type.
// NOTE(review): sprintf is unbounded; the caller must supply a buffer large
// enough for the prefix, idStr and the extension.
1984 void Channel::getStreamPath(char *str)
1990 sprintf(str,"/stream/%s%s",idStr,info.getTypeExt(info.contentType));
1995 // -----------------------------------
// Starts a channel search for `info`; the visible line sets the
// searchActive flag.
1996 void ChanMgr::startSearch(ChanInfo &info)
2003 searchActive = true;
2005 // -----------------------------------
// Shuts the channel manager down; logs a debug message first.
// NOTE(review): the actual shutdown calls are not visible in this listing.
2006 void ChanMgr::quit()
2008 LOG_DEBUG("ChanMgr is quitting..");
2011 // -----------------------------------
// Counts channels, starting from the head of the channel list, whose thread
// is active and whose status is S_IDLE.
2012 int ChanMgr::numIdleChannels()
2015 Channel *ch = channel;
2019 if (ch->thread.active)
2020 if (ch->status == Channel::S_IDLE)
2026 // -----------------------------------
// Finds the active S_IDLE channel with the smallest lastIdleTime and clears
// its thread.active flag.
2027 void ChanMgr::closeOldestIdle()
// Start from the largest unsigned value so any real lastIdleTime is smaller.
2029 unsigned int idleTime = (unsigned int)-1;
2030 Channel *ch = channel,*oldest=NULL;
2034 if (ch->thread.active)
2035 if (ch->status == Channel::S_IDLE)
2036 if (ch->lastIdleTime < idleTime)
2039 idleTime = ch->lastIdleTime;
2045 oldest->thread.active = false;
2048 // -----------------------------------
// Calls thread.shutdown() on every channel whose thread is active, starting
// from the head of the channel list.
2049 void ChanMgr::closeAll()
2051 Channel *ch = channel;
2054 if (ch->thread.active)
2055 ch->thread.shutdown();
2059 // -----------------------------------
// Searches the channel list for a channel whose info satisfies
// ChanInfo::matchNameID(info).
2060 Channel *ChanMgr::findChannelByNameID(ChanInfo &info)
2062 Channel *ch = channel;
2066 if (ch->info.matchNameID(info))
2073 // -----------------------------------
// Searches the channel list for a channel whose name equals `n` according
// to stricmp.
2074 Channel *ChanMgr::findChannelByName(const char *n)
2076 Channel *ch = channel;
2080 if (stricmp(ch->info.name,n)==0)
2087 // -----------------------------------
// Looks up a channel by index, starting from the head of the channel list.
// NOTE(review): the matching condition is not visible in this listing.
2088 Channel *ChanMgr::findChannelByIndex(int index)
2091 Channel *ch = channel;
2104 // -----------------------------------
// Searches the channel list for a channel whose mount string equals `str`
// exactly (strcmp, case-sensitive).
2105 Channel *ChanMgr::findChannelByMount(const char *str)
2107 Channel *ch = channel;
2111 if (strcmp(ch->mount,str)==0)
2118 // -----------------------------------
// Searches the channel list for a channel whose info.id is the same as `id`.
2119 Channel *ChanMgr::findChannelByID(GnuID &id)
2121 Channel *ch = channel;
2125 if (ch->info.id.isSame(id))
2131 // -----------------------------------
// Searches the channel list for an active channel whose channel_id equals
// `id`. Inactive channels are not considered.
2132 Channel *ChanMgr::findChannelByChannelID(int id)
2135 Channel *ch = channel;
2138 if (ch->isActive()){
2139 if (ch->channel_id == id){
2147 // -----------------------------------
// Collects channels whose info satisfies ChanInfo::match(info).
// chlist/max are presumably the output array and its capacity - confirm;
// the lines that fill chlist are not visible here.
2148 int ChanMgr::findChannels(ChanInfo &info, Channel **chlist, int max)
2151 Channel *ch = channel;
2155 if (ch->info.match(info))
2165 // -----------------------------------
// Collects channels whose status equals `status`.
// chlist/max are presumably the output array and its capacity - confirm;
// the lines that fill chlist are not visible here.
2166 int ChanMgr::findChannelsByStatus(Channel **chlist, int max, Channel::STATUS status)
2169 Channel *ch = channel;
2173 if (ch->status == status)
2183 // -----------------------------------
// Creates a channel for `info` without a mount point (NULL) and records the
// caller's stayConnected preference on it.
2184 Channel *ChanMgr::createRelay(ChanInfo &info, bool stayConnected)
2186 Channel *c = chanMgr->createChannel(info,NULL);
2189 c->stayConnected = stayConnected;
2195 // -----------------------------------
// Finds the channel described by `info`, creating or reviving it if needed,
// then polls until it is playing with a known content type.
// A "Finding channel..." notification is shown unless `info` describes an
// index.txt channel (isIndexTxt).
2196 Channel *ChanMgr::findAndRelay(ChanInfo &info)
2199 info.id.toStr(idStr);
2200 LOG_CHANNEL("Searching for: %s (%s)",idStr,info.name.cstr());
2202 if(!isIndexTxt(&info)) // for PCRaw (popup)
2203 peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Finding channel...");
// The lookup/creation below runs with a WLockBlock on channellock in scope.
2208 WLockBlock wb(&(chanMgr->channellock));
2212 c = findChannelByNameID(info);
// A newly created channel starts in S_SEARCHING.
2216 c = chanMgr->createChannel(info,NULL);
2219 c->setStatus(Channel::S_SEARCHING);
// Existing channel whose thread is not active: mark it active again and
// reset its play timestamps.
2222 } else if (!(c->thread.active)){
2223 c->thread.active = true;
2224 c->thread.finish = false;
2225 c->info.lastPlayStart = 0; // reconnect
2226 c->info.lastPlayEnd = 0;
2228 c->finthread->finish = true;
2229 c->finthread = NULL;
2231 if (c->status != Channel::S_CONNECTING && c->status != Channel::S_SEARCHING){
2232 c->setStatus(Channel::S_SEARCHING);
// Poll up to 600 times, re-resolving the channel on each pass.
2239 for(int i=0; i<600; i++) // search for 1 minute.
2244 c = findChannelByNameID(info);
2248 // peercastApp->notifyMessage(ServMgr::NT_PEERCAST,"Channel not found");
2253 if (c->isPlaying() && (c->info.contentType!=ChanInfo::T_UNKNOWN))
2263 // -----------------------------------
// NOTE(review): the enclosing function header is not visible in this
// listing; presumably the ChanMgr constructor - confirm.
// Assigns default values to the channel manager's settings.
2270 currFindAndPlayChannel.clear();
2272 broadcastMsg.clear();
2273 broadcastMsgInterval=10;
// Generate a broadcast ID using the PCP broadcast flags.
2275 broadcastID.generate(PCP_BROADCAST_FLAGS);
2280 icyMetaInterval = 8192;
2281 maxRelaysPerChannel = 1;
2285 minBroadcastTTL = 1;
2286 maxBroadcastTTL = 7;
2288 pushTimeout = 60; // 1 minute
2289 pushTries = 5; // 5 times
2290 maxPushHops = 8; // max 8 hops away
2291 maxUptime = 0; // 0 = none
2293 prefetchTime = 10; // n seconds
2295 hostUpdateInterval = 120; // 2 minutes
2307 // -----------------------------------
// Formats the manager-level variable named by `var` into a local buffer and
// writes it to `out`.
// Variables visible here: numHitLists, numChannels, djMessage,
// icyMetaInterval, maxRelaysPerChannel, hostUpdateInterval, broadcastID.
// NOTE(review): strcpy of broadcastMsg into buf is unbounded; buf's size is
// declared on a line not visible here.
2308 bool ChanMgr::writeVariable(Stream &out, const String &var, int index)
2311 if (var == "numHitLists")
2312 sprintf(buf,"%d",numHitLists());
2314 else if (var == "numChannels")
2315 sprintf(buf,"%d",numChannels());
2316 else if (var == "djMessage")
2317 strcpy(buf,broadcastMsg.cstr());
2318 else if (var == "icyMetaInterval")
2319 sprintf(buf,"%d",icyMetaInterval);
2320 else if (var == "maxRelaysPerChannel")
2321 sprintf(buf,"%d",maxRelaysPerChannel);
2322 else if (var == "hostUpdateInterval")
2323 sprintf(buf,"%d",hostUpdateInterval);
2324 else if (var == "broadcastID")
2325 broadcastID.toStr(buf);
2331 out.writeString(buf);
2335 // -----------------------------------
// Formats the channel-level variable named by `var` into a local buffer and
// writes it to `out`.
// Text fields (genre, desc, comment, track.*) are converted to
// String::T_UNICODESAFE before being copied into the buffer.
// NOTE(review): the strcpy/sprintf calls are unbounded; buf's size is
// declared on a line not visible here.
2336 bool Channel::writeVariable(Stream &out, const String &var, int index)
2347 utf8.convertTo(String::T_UNICODESAFE);
2348 strcpy(buf,utf8.cstr());
2350 }else if (var == "bitrate")
2352 sprintf(buf,"%d",info.bitrate);
2354 }else if (var == "srcrate")
// Source rate reported by sourceData, converted with BYTES_TO_KBPS and
// printed with one decimal place.
2358 unsigned int tot = sourceData->getSourceRate();
2359 sprintf(buf,"%.1f",BYTES_TO_KBPS(tot));
2363 }else if (var == "genre")
2366 utf8.convertTo(String::T_UNICODESAFE);
2367 strcpy(buf,utf8.cstr());
2368 }else if (var == "desc")
2371 utf8.convertTo(String::T_UNICODESAFE);
2372 strcpy(buf,utf8.cstr());
2373 }else if (var == "comment")
2375 utf8 = info.comment;
2376 utf8.convertTo(String::T_UNICODESAFE);
2377 strcpy(buf,utf8.cstr());
2378 }else if (var == "uptime")
// Uptime is computed only when lastPlayStart is non-zero.
2381 if (info.lastPlayStart)
2382 uptime.setFromStopwatch(sys->getTime()-info.lastPlayStart);
2385 strcpy(buf,uptime.cstr());
2387 else if (var == "type")
2388 sprintf(buf,"%s",ChanInfo::getTypeStr(info.contentType));
2389 else if (var == "ext")
2390 sprintf(buf,"%s",ChanInfo::getTypeExt(info.contentType));
// URL scheme: "mms://" for WMA/WMV content; "http://" is the other visible
// branch.
2391 else if (var == "proto") {
2392 switch(info.contentType) {
2393 case ChanInfo::T_WMA:
2394 case ChanInfo::T_WMV:
2395 sprintf(buf, "mms://");
2398 sprintf(buf, "http://");
2401 else if (var == "localRelays")
2402 sprintf(buf,"%d",localRelays());
2403 else if (var == "localListeners")
2404 sprintf(buf,"%d",localListeners());
2406 else if (var == "totalRelays")
2407 sprintf(buf,"%d",totalRelays());
2408 else if (var == "totalListeners")
2409 sprintf(buf,"%d",totalListeners());
2411 else if (var == "status")
2412 sprintf(buf,"%s",getStatusStr());
2413 else if (var == "keep")
2414 sprintf(buf,"%s",stayConnected?"Yes":"No");
2415 else if (var == "id")
// All "track.*" variables share one conversion and copy step below.
2417 else if (var.startsWith("track."))
2420 if (var == "track.title")
2421 utf8 = info.track.title;
2422 else if (var == "track.artist")
2423 utf8 = info.track.artist;
2424 else if (var == "track.album")
2425 utf8 = info.track.album;
2426 else if (var == "track.genre")
2427 utf8 = info.track.genre;
2428 else if (var == "track.contactURL")
2429 utf8 = info.track.contact;
2431 utf8.convertTo(String::T_UNICODESAFE);
2432 strcpy(buf,utf8.cstr());
2434 }else if (var == "contactURL")
2435 sprintf(buf,"%s",info.url.cstr());
2436 else if (var == "streamPos")
2437 sprintf(buf,"%d",streamPos);
2438 else if (var == "sourceType")
2439 strcpy(buf,getSrcTypeStr());
2440 else if (var == "sourceProtocol")
2441 strcpy(buf,ChanInfo::getProtocolStr(info.srcProtocol));
2442 else if (var == "sourceURL")
// When no source URL is set, the source host address is printed instead.
2444 if (sourceURL.isEmpty())
2445 sourceHost.host.toStr(buf);
2447 strcpy(buf,sourceURL.cstr());
2449 else if (var == "headPos")
2450 sprintf(buf,"%d",headPack.pos);
2451 else if (var == "headLen")
2452 sprintf(buf,"%d",headPack.len);
2453 else if (var == "numHits")
2455 ChanHitList *chl = chanMgr->findHitListByID(info.id);
2458 // numHits = chl->numHits();
2466 sprintf(buf,"%d",numHits);
2467 } else if (var == "isBroadcast")
2468 strcpy(buf, (type == T_BROADCAST) ? "1":"0");
2472 out.writeString(buf);
2476 // -----------------------------------
// Forwards a tracker update (svID, force) to every channel that is both
// active and broadcasting.
2477 void ChanMgr::broadcastTrackerUpdate(GnuID &svID, bool force)
2479 Channel *c = channel;
2482 if ( c->isActive() && c->isBroadcasting() )
2483 c->broadcastTrackerUpdate(svID,force);
2489 // -----------------------------------
// Offers `pack` to the channels in the list via Channel::sendPacketUp.
// Presumably returns how many channels accepted it - confirm; the counter
// and the return statement are not visible here.
2490 int ChanMgr::broadcastPacketUp(ChanPacket &pack,GnuID &chanID, GnuID &srcID, GnuID &destID)
2494 Channel *c = channel;
2497 if (c->sendPacketUp(pack,chanID,srcID,destID))
2505 // -----------------------------------
// Announces channels as hits, built from this server's host and the current
// firewall / load state.
// NOTE(review): how minTTL/maxTTL constrain `ttl` and when `serv` is used
// versus a general broadcast are on lines not visible in this listing.
2506 void ChanMgr::broadcastRelays(Servent *serv, int minTTL, int maxTTL)
2508 //if ((servMgr->getFirewall() == ServMgr::FW_OFF) || servMgr->serverHost.localIP())
2511 Host sh = servMgr->serverHost;
// push: any firewall state other than FW_OFF.
2512 bool push = (servMgr->getFirewall()!=ServMgr::FW_OFF);
// busy: public-in and out are both full, or relays are full.
2513 bool busy = (servMgr->pubInFull() && servMgr->outFull()) || servMgr->relaysFull();
2514 bool stable = servMgr->totalStreams>0;
2521 Channel *c = channel;
2527 bool tracker = c->isBroadcasting();
2529 int ttl = (c->info.getUptime() / servMgr->relayBroadcast); // 1 hop per N seconds
2537 if (hit.initHit(sh,c,NULL,push,busy,stable,tracker,ttl))
2543 serv->outputPacket(hit,false);
2547 LOG_NETWORK("Sent channel to %d servents, TTL %d",numOut,ttl);
2554 // LOG_NETWORK("Sent %d channels to %d servents",numChans,numOut);
2557 // -----------------------------------
// Sets hostUpdateInterval to `v`.
2558 void ChanMgr::setUpdateInterval(unsigned int v)
2560 hostUpdateInterval = v;
2564 // -----------------------------------
// NOTE(review): the enclosing function header is not visible in this
// listing.
// Builds a PCP broadcast atom inside pack.data: a PCP_BCST parent declared
// with three children (group, sender session ID, and a PCP_MESG parent
// holding the message text), then feeds the atom stream to
// PCPStream::readAtom.
2568 MemoryStream mem(pack.data,sizeof(pack.data));
2569 AtomStream atom(mem);
2570 atom.writeParent(PCP_BCST,3);
2571 atom.writeChar(PCP_BCST_GROUP,PCP_BCST_GROUP_ALL);
2572 atom.writeBytes(PCP_BCST_FROM,servMgr->sessionID.id,16);
2573 atom.writeParent(PCP_MESG,1);
2574 atom.writeString(PCP_MESG_DATA,msg.cstr());
2584 PCPStream::readAtom(atom,bcs);
2585 //int cnt = servMgr->broadcastPacketUp(pack,noID,servMgr->sessionID);
2586 //int cnt = servMgr->broadcastPacketDown(pack,noID,servMgr->sessionID);
2587 //int cnt = chanMgr->broadcastPacketUp(pack,noID,servMgr->sessionID);
2588 //LOG_DEBUG("Sent message to %d clients",cnt);
2590 // -----------------------------------
// Updates the broadcast message when `msg` differs from the current one and
// pushes it into the comment field of every active, broadcasting channel
// via Channel::updateInfo.
2591 void ChanMgr::setBroadcastMsg(String &msg)
2593 if (!msg.isSame(broadcastMsg))
2597 Channel *c = channel;
2600 if (c->isActive() && c->isBroadcasting())
// Work on a copy of the channel info so updateInfo receives the new state.
2602 ChanInfo newInfo = c->info;
2603 newInfo.comment = broadcastMsg;
2604 c->updateInfo(newInfo);
2613 // -----------------------------------
// Removes the hit lists while holding hitlistlock; each list's channel is
// reported to the application with delChannel as it is processed.
2614 void ChanMgr::clearHitLists()
2617 // LOG_DEBUG("clearHitLists HITLISTLOCK ON-------------");
2618 chanMgr->hitlistlock.on();
2621 peercastApp->delChannel(&hitlist->info);
// Remember the successor before the current node is handled.
2623 ChanHitList *next = hitlist->next;
2629 // LOG_DEBUG("clearHitLists HITLISTLOCK OFF-------------");
2630 chanMgr->hitlistlock.off();
2632 // -----------------------------------
// Removes `delchan` from the channel list.
// If the channel still has a source stream, that stream's parent pointer is
// cleared.
// NOTE(review): the inner `next` declaration shadows the outer `next`.
2633 Channel *ChanMgr::deleteChannel(Channel *delchan)
2637 Channel *ch = channel,*prev=NULL,*next=NULL;
2643 Channel *next = ch->next;
2649 if (delchan->sourceStream){
2650 delchan->sourceStream->parent = NULL;
2666 // -----------------------------------
// Creates a new channel for `info` with the given mount string.
// The new channel starts with zeroed play timestamps, info status
// S_UNKNOWN, channel status S_WAIT, type T_ALLOCATED and createdTime set to
// the current time.
2667 Channel *ChanMgr::createChannel(ChanInfo &info, const char *mount)
2680 nc->info.lastPlayStart = 0;
2681 nc->info.lastPlayEnd = 0;
2682 nc->info.status = ChanInfo::S_UNKNOWN;
2684 nc->mount.set(mount);
2685 nc->setStatus(Channel::S_WAIT);
2686 nc->type = Channel::T_ALLOCATED;
2687 nc->info.createdTime = sys->getTime();
2689 LOG_CHANNEL("New channel created");
2694 // -----------------------------------
// Runs ChanHitList::pickHits(chs) over the hit lists, starting from the
// head of the list.
2695 int ChanMgr::pickHits(ChanHitSearch &chs)
2697 ChanHitList *chl = hitlist;
2701 if (chl->pickHits(chs))
2711 // -----------------------------------
// Searches the hit lists for one whose info satisfies
// ChanInfo::matchNameID(info).
2712 ChanHitList *ChanMgr::findHitList(ChanInfo &info)
2714 ChanHitList *chl = hitlist;
2718 if (chl->info.matchNameID(info))
2725 // -----------------------------------
// Searches the hit lists for one whose info.id is the same as `id`.
2726 ChanHitList *ChanMgr::findHitListByID(GnuID &id)
2728 ChanHitList *chl = hitlist;
2732 if (chl->info.id.isSame(id))
2738 // -----------------------------------
// Counts hit lists, starting from the head of the list.
// NOTE(review): the counting condition is not visible in this listing.
2739 int ChanMgr::numHitLists()
2742 ChanHitList *chl = hitlist;
2751 // -----------------------------------
// Allocates a new ChanHitList, links it in front of the current list head,
// stamps its creation time and reports the channel to the application with
// addChannel.
2752 ChanHitList *ChanMgr::addHitList(ChanInfo &info)
2754 ChanHitList *chl = new ChanHitList();
2757 chl->next = hitlist;
2762 chl->info.createdTime = sys->getTime();
2763 peercastApp->addChannel(&chl->info);
2768 // -----------------------------------
// Purges stale hits from every hit list while holding hitlistlock.
// A hit list is examined for removal only when clearDeadHits() on it
// returns 0, the channel is not being broadcast, and no channel with that
// ID exists locally.
2769 void ChanMgr::clearDeadHits(bool clearTrackers)
2771 unsigned int interval;
// Timeout passed to each list: 1200 on a root server, otherwise the host
// update interval plus 120.
2773 if (servMgr->isRoot)
2774 interval = 1200; // mainly for old 0.119 clients
2776 interval = hostUpdateInterval+120;
2778 chanMgr->hitlistlock.on();
2779 ChanHitList *chl = hitlist,*prev = NULL;
2784 if (chl->clearDeadHits(interval,clearTrackers) == 0)
2786 if (!isBroadcasting(chl->info.id))
2788 if (!chanMgr->findChannelByID(chl->info.id))
2790 peercastApp->delChannel(&chl->info);
2792 ChanHitList *next = chl->next;
2808 chanMgr->hitlistlock.off();
2810 // -----------------------------------
// Reports whether the channel with the given ID is broadcasting, as
// answered by that channel's isBroadcasting().
2811 bool ChanMgr::isBroadcasting(GnuID &id)
2813 Channel *ch = findChannelByID(id);
2815 return ch->isBroadcasting();
2819 // -----------------------------------
// Checks the channel list for a channel whose isBroadcasting() is true.
2820 bool ChanMgr::isBroadcasting()
2822 Channel *ch = channel;
2826 if (ch->isBroadcasting())
2834 // -----------------------------------
// Counts channels, starting from the head of the channel list.
// NOTE(review): the counting condition is not visible in this listing.
2835 int ChanMgr::numChannels()
2838 Channel *ch = channel;
2848 // -----------------------------------
// Looks up the hit list for the hit's channel ID.
// Presumably forwards to ChanHitList::deadHit - confirm.
2849 void ChanMgr::deadHit(ChanHit &hit)
2851 ChanHitList *chl = findHitListByID(hit.chanID);
2855 // -----------------------------------
// Looks up the hit list for the hit's channel ID.
// Presumably forwards to ChanHitList::delHit - confirm.
2856 void ChanMgr::delHit(ChanHit &hit)
2858 ChanHitList *chl = findHitListByID(hit.chanID);
2863 // -----------------------------------
// Builds a ChanHit for host `h` on channel `id`; the second remote host
// slot is reset with init() and the tracker flag is copied from the caller.
2864 void ChanMgr::addHit(Host &h,GnuID &id,bool tracker)
2870 hit.rhost[1].init();
2871 hit.tracker = tracker;
2876 // -----------------------------------
// Adds hit `h` to the hit list of its channel and returns what
// ChanHitList::addHit returns.
// Records the current time in lastHit. The hit list is looked up by channel
// ID; addHitList(info) is the visible creation path.
2877 ChanHit *ChanMgr::addHit(ChanHit &h)
2880 lastHit = sys->getTime();
2882 ChanHitList *hl=NULL;
2884 hl = findHitListByID(h.chanID);
2890 hl = addHitList(info);
2895 return hl->addHit(h);
2900 // -----------------------------------
// Thread-info subclass used by findAndPlayChannelProc, which reads its
// `keep` member. NOTE(review): the member list is not visible here.
2901 class ChanFindInfo : public ThreadInfo
2907 // -----------------------------------
// Thread procedure: resolves the requested channel, relaying it when
// necessary, and starts playback.
// `th` is cast to ChanFindInfo to reach the request parameters.
2908 THREAD_PROC findAndPlayChannelProc(ThreadInfo *th)
2910 ChanFindInfo *cfi = (ChanFindInfo *)th;
2916 Channel *ch = chanMgr->findChannelByNameID(info);
// Record which channel this request is for, so a later request can
// supersede it.
2918 chanMgr->currFindAndPlayChannel = info.id;
2921 ch = chanMgr->findAndRelay(info);
2925 // check that a different channel hasn't been selected already.
2926 if (chanMgr->currFindAndPlayChannel.isSame(ch->info.id))
2927 chanMgr->playChannel(ch->info);
2930 ch->stayConnected = cfi->keep;
2936 // -----------------------------------
// Starts a thread running findAndPlayChannelProc for the given channel
// request.
// The ChanFindInfo is heap-allocated here. NOTE(review): where it is freed
// is not visible in this listing.
2937 void ChanMgr::findAndPlayChannel(ChanInfo &info, bool keep)
2939 ChanFindInfo *cfi = new ChanFindInfo;
2942 cfi->func = findAndPlayChannelProc;
2945 sys->startThread(cfi);
2947 // -----------------------------------
// Writes a playlist file for the channel and asks the system to execute it.
// Playlist type by content type: ASX for WMA/WMV, RAM for OGM, SCPLS in the
// remaining branch. The file is placed under servMgr->modulePath or
// peercastApp->getPath().
// NOTE(review): the sprintf calls into fname[256] are unbounded with
// respect to the path length.
2948 void ChanMgr::playChannel(ChanInfo &info)
2951 char str[128],fname[256],idStr[128];
// Base URL of the local server that the playlist entry points at.
2953 sprintf(str,"http://localhost:%d",servMgr->serverHost.port);
2954 info.id.toStr(idStr);
2956 PlayList::TYPE type;
2959 if ((info.contentType == ChanInfo::T_WMA) || (info.contentType == ChanInfo::T_WMV))
2961 type = PlayList::T_ASX;
2962 // WMP seems to have a bug where it doesn't re-read asx files if they have the same name
2963 // so we prepend the channel id to make it unique - NOTE: should be deleted afterwards.
2964 if (servMgr->getModulePath) //JP-EX
2966 peercastApp->getDirectory();
2967 sprintf(fname,"%s/%s.asx",servMgr->modulePath,idStr);
2969 sprintf(fname,"%s/%s.asx",peercastApp->getPath(),idStr);
2970 }else if (info.contentType == ChanInfo::T_OGM)
2972 type = PlayList::T_RAM;
2973 if (servMgr->getModulePath) //JP-EX
2975 peercastApp->getDirectory();
2976 sprintf(fname,"%s/play.ram",servMgr->modulePath);
2978 sprintf(fname,"%s/play.ram",peercastApp->getPath());
2982 type = PlayList::T_SCPLS;
2983 if (servMgr->getModulePath) //JP-EX
2985 peercastApp->getDirectory();
2986 sprintf(fname,"%s/play.pls",servMgr->modulePath);
2988 sprintf(fname,"%s/play.pls",peercastApp->getPath());
// Playlist constructed with (type, 1), then given this channel.
2992 PlayList *pls = new PlayList(type,1);
2993 pls->addChannel(str,info);
2996 LOG_DEBUG("Writing %s",fname);
2998 file.openWriteReplace(fname);
3003 LOG_DEBUG("Executing: %s",fname);
3004 sys->executeFile(fname);
3009 // -----------------------------------
// Constructs a hit list. NOTE(review): the initialisation statements are
// not visible in this listing.
3010 ChanHitList::ChanHitList()
3017 // -----------------------------------
// Deletes the hits held by this list while holding chanMgr->hitlistlock.
// deleteHit() returns the value assigned back to `hit`.
3018 ChanHitList::~ChanHitList()
3020 chanMgr->hitlistlock.on();
3022 hit = deleteHit(hit);
3023 chanMgr->hitlistlock.off();
3025 // -----------------------------------
// Examines both remote host entries (rhost[0] and rhost[1]) and tests
// whether each has the same classType() as `h`.
// NOTE(review): the action taken on a match is not visible here.
3026 void ChanHit::pickNearestIP(Host &h)
3028 for(int i=0; i<2; i++)
3030 if (h.classType() == rhost[i].classType())
3038 // -----------------------------------
// Resets the hit to its default state.
// Status flags default to false, capability flags (recv, cin, direct,
// relay) to true, the "full" flags to false, the extended version prefix to
// two spaces with number 0, and both stream positions to 0.
3039 void ChanHit::init()
3051 dead = tracker = firewalled = stable = yp = false;
3052 recv = cin = direct = relay = true;
3053 relayfull = chfull = ratefull = false;
3062 version_ex_prefix[0] = ' ';
3063 version_ex_prefix[1] = ' ';
3064 version_ex_number = 0;
3072 oldestPos = newestPos = 0;
3077 // -----------------------------------
// Fills this hit with the local server's own state for channel `ch`:
// firewall status, listener count, session ID, capacity flags, version
// information, addresses and upstream host.
// The third int parameter is unnamed and therefore unused.
3078 void ChanHit::initLocal(int numl,int numr,int,int uptm,bool connected,bool isFull,unsigned int bitrate, Channel* ch, unsigned int oldp,unsigned int newp)
3081 firewalled = (servMgr->getFirewall() != ServMgr::FW_OFF);
3082 numListeners = numl;
3085 stable = servMgr->totalStreams>0;
3086 sessionID = servMgr->sessionID;
3089 direct = !servMgr->directFull();
3090 // relay = !servMgr->relaysFull();
3091 cin = !servMgr->controlInFull();
3093 relayfull = servMgr->relaysFull();
// Walk the channels to total the outgoing bitrate in use (allRate) and the
// bitrate counted for other playing channels that have no local relay
// (needRate).
3096 Channel *c = chanMgr->channel;
3098 unsigned int needRate = 0;
3099 unsigned int allRate = 0;
3101 if (c->isPlaying()){
3102 allRate += c->info.bitrate * c->localRelays();
3103 if ((c != ch) && (c->localRelays() == 0)){
3104 if(!isIndexTxt(c)) // for PCRaw (relay)
3106 needRate+=c->info.bitrate;
3111 unsigned int numRelay = servMgr->numStreams(Servent::T_RELAY,false);
3112 int diff = servMgr->maxRelays - numRelay;
3113 if (ch->localRelays()){
3114 if (noRelay > diff){
3122 // ratefull = servMgr->bitrateFull(needRate+bitrate);
// Rate-full when used + counted + this channel's bitrate exceeds the
// configured maximum outgoing bitrate.
3123 ratefull = (servMgr->maxBitrateOut < allRate + needRate + ch->info.bitrate);
// index.txt channels skip the relay-slot checks and use only the channel
// and bitrate limits.
3125 if (!isIndexTxt(ch))
3126 relay = (!relayfull) && (!chfull) && (!ratefull) && (numRelay + noRelay < servMgr->maxRelays);
3128 relay = (!chfull) && (!ratefull); // for PCRaw (relay)
3131 LOG_DEBUG("Reject by relay full");
3134 LOG_DEBUG("Reject by channel full");
3137 LOG_DEBUG("Reject by rate: Max=%d Now=%d Need=%d ch=%d", servMgr->maxBitrateOut, allRate, needRate, ch->info.bitrate);
3140 host = servMgr->serverHost;
3142 version = PCP_CLIENT_VERSION;
3143 version_vp = PCP_CLIENT_VERSION_VP;
// NOTE(review): strncpy of exactly 2 bytes leaves the prefix without a
// terminating NUL.
3145 strncpy(version_ex_prefix, PCP_CLIENT_VERSION_EX_PREFIX,2);
3146 version_ex_number = PCP_CLIENT_VERSION_EX_NUMBER;
3148 version_ex_prefix[0] = ' ';
3149 version_ex_prefix[1] = ' ';
3150 version_ex_number = 0;
3153 status = ch->status;
// rhost[0] is built from the server host; rhost[1] from
// ClientSocket::getIP(NULL) with the same port.
3155 rhost[0] = Host(host.ip,host.port);
3156 rhost[1] = Host(ClientSocket::getIP(NULL),host.port);
// The upstream host is the channel's source host.
3164 uphost.ip = ch->sourceHost.host.ip;
3165 uphost.port = ch->sourceHost.host.port;
3169 // -----------------------------------
// Serialises this hit as a PCP_HOST atom tree.
// The declared child count is 13, plus 1 when a channel ID is included,
// plus 3 when upstream host data is present, plus 2 when an extended
// version number is set.
3170 void ChanHit::writeAtoms(AtomStream &atom,GnuID &chanID)
3172 bool addChan=chanID.isSet();
3173 bool uphostdata=(uphost.ip != 0);
// Pack the capability and status booleans into the flags byte.
3176 if (recv) fl1 |= PCP_HOST_FLAGS1_RECV;
3177 if (relay) fl1 |= PCP_HOST_FLAGS1_RELAY;
3178 if (direct) fl1 |= PCP_HOST_FLAGS1_DIRECT;
3179 if (cin) fl1 |= PCP_HOST_FLAGS1_CIN;
3180 if (tracker) fl1 |= PCP_HOST_FLAGS1_TRACKER;
3181 if (firewalled) fl1 |= PCP_HOST_FLAGS1_PUSH;
3183 atom.writeParent(PCP_HOST,13 + (addChan?1:0) + (uphostdata?3:0) + (version_ex_number?2:0));
3186 atom.writeBytes(PCP_HOST_CHANID,chanID.id,16);
3187 atom.writeBytes(PCP_HOST_ID,sessionID.id,16);
// Two IP/port pairs are written, one per rhost entry.
3188 atom.writeInt(PCP_HOST_IP,rhost[0].ip);
3189 atom.writeShort(PCP_HOST_PORT,rhost[0].port);
3190 atom.writeInt(PCP_HOST_IP,rhost[1].ip);
3191 atom.writeShort(PCP_HOST_PORT,rhost[1].port);
3192 atom.writeInt(PCP_HOST_NUML,numListeners);
3193 atom.writeInt(PCP_HOST_NUMR,numRelays);
3194 atom.writeInt(PCP_HOST_UPTIME,upTime);
3195 atom.writeInt(PCP_HOST_VERSION,version);
3196 atom.writeInt(PCP_HOST_VERSION_VP,version_vp);
3197 if (version_ex_number){
3198 atom.writeBytes(PCP_HOST_VERSION_EX_PREFIX,version_ex_prefix,2);
3199 atom.writeShort(PCP_HOST_VERSION_EX_NUMBER,version_ex_number);
3201 atom.writeChar(PCP_HOST_FLAGS1,fl1);
3202 atom.writeInt(PCP_HOST_OLDPOS,oldestPos);
3203 atom.writeInt(PCP_HOST_NEWPOS,newestPos);
// Upstream host details; these are the 3 extra children counted by
// uphostdata above.
3205 atom.writeInt(PCP_HOST_UPHOST_IP,uphost.ip);
3206 atom.writeInt(PCP_HOST_UPHOST_PORT,uphost.port);
3207 atom.writeInt(PCP_HOST_UPHOST_HOPS,uphostHops);
3210 // -----------------------------------
// Formats the hit-level variable named by `var` into a local buffer and
// writes it to `out`.
// With servMgr->enableGetName set, "rhost0" is wrapped in an HTML <font>
// tag whose colour is chosen on lines not visible here, and a hostname
// lookup is attempted for rhost[0].
// NOTE(review): the strcpy/strcat/sprintf calls are unbounded; buf's size
// is declared on a line not visible here.
3211 bool ChanHit::writeVariable(Stream &out, const String &var)
3215 if (var == "rhost0")
3217 if (servMgr->enableGetName) //JP-EX s
3223 strcpy(buf,"<font color=red>");
3225 strcpy(buf,"<font color=orange>");
3230 strcpy(buf,"<font color=purple>");
3232 strcpy(buf,"<font color=blue>");
3235 strcpy(buf,"<font color=green>");
3239 rhost[0].toStr(buf2);
3243 if (ClientSocket::getHostname(h_name,rhost[0].ip))
3249 strcat(buf,"</font>");
3252 rhost[0].toStr(buf);
3254 else if (var == "rhost1")
3255 rhost[1].toStr(buf);
3256 else if (var == "numHops")
3257 sprintf(buf,"%d",numHops);
3258 else if (var == "numListeners")
3259 sprintf(buf,"%d",numListeners);
3260 else if (var == "numRelays")
3261 sprintf(buf,"%d",numRelays);
3262 else if (var == "uptime")
3265 timeStr.setFromStopwatch(upTime);
3266 strcpy(buf,timeStr.cstr());
3267 }else if (var == "update")
// Time elapsed since this hit's `time` stamp.
3271 timeStr.setFromStopwatch(sys->getTime()-time);
3274 strcpy(buf,timeStr.cstr());
3275 }else if (var == "isFirewalled"){
3276 sprintf(buf,"%d",firewalled?1:0);
3277 }else if (var == "version"){
3278 sprintf(buf,"%d",version);
3279 }else if (var == "agent"){
// Agent string: the extended prefix/number is used when set, otherwise the
// VP version, otherwise the plain version.
3281 if (version_ex_number){
3282 sprintf(buf, "v0.%d(%c%c%04d)", version, version_ex_prefix[0], version_ex_prefix[1], version_ex_number);
3283 } else if (version_vp){
3284 sprintf(buf,"v0.%d(VP%04d)", version, version_vp);
3286 sprintf(buf,"v0.%d", version);
3292 else if (var == "check")
// Builds an HTML link that calls checkip() with the IP of rhost[0].
3295 strcpy(buf, "<a href=\"#\" onclick=\"checkip('");
3296 rhost[0].IPtoStr(buf2);
3298 strcat(buf, "')\">_</a>");
3300 else if (var == "uphost") // tree
3302 else if (var == "uphostHops") // tree
3303 sprintf(buf,"%d",uphostHops);
3304 else if (var == "canRelay") // tree
3305 sprintf(buf, "%d",relay);
3309 out.writeString(buf);
3313 // -----------------------------------
// Sums numListeners over the hits in this list.
// NOTE(review): any filter applied to each hit is not visible here.
3314 int ChanHitList::getTotalListeners()
3321 cnt+=h->numListeners;
3326 // -----------------------------------
// Returns a relay total for this hit list.
// NOTE(review): body not visible in this listing.
3327 int ChanHitList::getTotalRelays()
3339 // -----------------------------------
// Returns a firewalled-hit total for this hit list.
// NOTE(review): body not visible in this listing.
3340 int ChanHitList::getTotalFirewalled()
3354 // -----------------------------------
// Tracker-contact entry point taking connection state, listener count,
// stream count and uptime.
// NOTE(review): body not visible in this listing.
3355 int ChanHitList::contactTrackers(bool connected, int numl, int nums, int uptm)
// Walks the hits in this list. A hit is selected for removal when `flg` is
// true or its numHops is non-zero, so with flg == false the hits with
// numHops == 0 are left alone.
3360 void ChanHitList::clearHits(bool flg)
3362 ChanHit *c = hit, *prev = NULL;
3365 if (flg || (c->numHops != 0)){
3366 ChanHit *next = c->next;
3381 // -----------------------------------
// Removes `ch` from this list's chain of hits.
// Presumably returns the hit that followed it - confirm; the destructor
// assigns the result back to the list head.
3382 ChanHit *ChanHitList::deleteHit(ChanHit *ch)
3384 ChanHit *c = hit,*prev=NULL;
3389 ChanHit *next = c->next;
3405 // -----------------------------------
// Adds or refreshes hit `h` in this list.
// Hits carrying this server's own session ID are not added. Otherwise the
// hit is stamped with the current time, compared against existing hits by
// address and then by session ID, and a new ChanHit is allocated for it.
3406 ChanHit *ChanHitList::addHit(ChanHit &h)
3408 char ip0str[64],ip1str[64];
3409 h.rhost[0].toStr(ip0str);
3410 h.rhost[1].toStr(ip1str);
3412 h.uphost.toStr(uphostStr);
// Two log formats: with and without upstream host details.
3414 LOG_DEBUG("Add hit: F%dT%dR%d %s/%s <- %s(%d)",h.firewalled,h.tracker,h.relay,ip0str,ip1str,uphostStr, h.uphostHops);
3416 LOG_DEBUG("Add hit: F%dT%dR%d %s/%s",h.firewalled,h.tracker,h.relay,ip0str,ip1str);
3419 // don't add our own hits
3420 if (servMgr->sessionID.isSame(h.sessionID))
3424 lastHitTime = sys->getTime();
3425 h.time = lastHitTime;
// An existing hit matches when rhost[0] has the same ip and port and
// rhost[1] either matches too or is not valid on the stored hit.
3430 if ((ch->rhost[0].ip == h.rhost[0].ip) && (ch->rhost[0].port == h.rhost[0].port))
3431 if (((ch->rhost[1].ip == h.rhost[1].ip) && (ch->rhost[1].port == h.rhost[1].port)) || (!ch->rhost[1].isValid()))
3435 if (ch->numHops > 0 && h.numHops == 0)
3436 // downstream hit received as RelayHost
3438 ChanHit *next = ch->next;
3447 // clear hits with same session ID (IP may have changed)
3448 if (h.sessionID.isSet())
3454 if (ch->sessionID.isSame(h.sessionID))
3466 ChanHit *ch = new ChanHit();
3468 ch->chanID = info.id;
3478 // -----------------------------------
// Purges dead or timed-out hits from this list while holding
// chanMgr->hitlistlock.
// A hit qualifies when it is marked dead, or when it is older than
// `timeout` and either clearTrackers is set or the hit is not a tracker.
// NOTE(review): the condition mixes || and && without outer parentheses and
// uses bitwise & on two booleans. It evaluates as
// dead || (timedOut && (...)); confirm that is the intent.
3479 int ChanHitList::clearDeadHits(unsigned int timeout, bool clearTrackers)
3482 unsigned int ctime = sys->getTime();
3484 // LOG_DEBUG("clearDeadHits HITLISTLOCK ON-------------");
3485 chanMgr->hitlistlock.on();
3491 if (ch->dead || ((ctime-ch->time) > timeout) && (clearTrackers || (!clearTrackers & !ch->tracker)))
3493 // ch = deleteHit(ch);
// Firewalled hits get separate handling: the guarded action applies only
// when kickKeepTime is 0 or the hit is older than kickKeepTime.
3495 if (ch->firewalled){
3496 // LOG_DEBUG("kickKeepTime = %d, %d", servMgr->kickKeepTime, ctime-ch->time);
3497 if ( (servMgr->kickKeepTime == 0) || ((ctime-ch->time) > servMgr->kickKeepTime) ){
3501 ch->numListeners = 0;
3514 // LOG_DEBUG("clearDeadHits HITLISTLOCK OFF-------------");
3515 chanMgr->hitlistlock.off();
3520 // -----------------------------------
// Handles a dead-hit report: logs both remote addresses of `h`, then looks
// for stored hits whose rhost[0] and rhost[1] are both the same as the
// report's.
3521 void ChanHitList::deadHit(ChanHit &h)
3523 char ip0str[64],ip1str[64];
3524 h.rhost[0].toStr(ip0str);
3525 h.rhost[1].toStr(ip1str);
3526 LOG_DEBUG("Dead hit: %s/%s",ip0str,ip1str);
3532 if (ch->rhost[0].isSame(h.rhost[0]) && ch->rhost[1].isSame(h.rhost[1]))
3539 // -----------------------------------
// Handles a delete-hit request: logs both remote addresses of `h`, then
// looks for stored hits whose rhost[0] and rhost[1] are both the same as
// the request's.
3540 void ChanHitList::delHit(ChanHit &h)
3542 char ip0str[64],ip1str[64];
3543 h.rhost[0].toStr(ip0str);
3544 h.rhost[1].toStr(ip1str);
3545 LOG_DEBUG("Del hit: %s/%s",ip0str,ip1str);
3551 if (ch->rhost[0].isSame(h.rhost[0]) && ch->rhost[1].isSame(h.rhost[1]))
3559 // -----------------------------------
// Counts hits that have a host IP, are not dead and have a non-zero hop
// count.
3560 int ChanHitList::numHits()
3566 if (ch->host.ip && !ch->dead && ch->numHops)
3573 // -----------------------------------
// Sums numListeners over hits that have a host IP, are not dead and have a
// non-zero hop count.
3574 int ChanHitList::numListeners()
3580 if (ch->host.ip && !ch->dead && ch->numHops)
3581 cnt += ch->numListeners;
3587 // -----------------------------------
// Sums numRelays over hits that have a host IP and are not dead.
3588 int ChanHitList::numRelays()
3594 if (ch->host.ip && !ch->dead)
3595 cnt += ch->numRelays;
3602 // -----------------------------------
// Counts hits that have a host IP, are not dead and are flagged as
// trackers.
3603 int ChanHitList::numTrackers()
3609 if ((ch->host.ip && !ch->dead) && (ch->tracker))
3615 // -----------------------------------
// Counts firewalled hits among those that have a host IP and are not dead.
3616 int ChanHitList::numFirewalled()
3622 if (ch->host.ip && !ch->dead)
3623 cnt += ch->firewalled?1:0;
3628 // -----------------------------------
// Finds the smallest numHops among hits that have a host IP and are not
// dead; the search starts from 10000.
3629 int ChanHitList::closestHit()
3631 unsigned int hop=10000;
3635 if (ch->host.ip && !ch->dead)
3636 if (ch->numHops < hop)
3643 // -----------------------------------
// Finds the largest numHops among hits that have a host IP and are not
// dead.
3644 int ChanHitList::furthestHit()
3650 if (ch->host.ip && !ch->dead)
3651 if (ch->numHops > hop)
3658 // -----------------------------------
// Finds the largest `time` stamp among hits that have a host IP and are
// not dead; the search starts from 0.
3659 unsigned int ChanHitList::newestHit()
3661 unsigned int time=0;
3665 if (ch->host.ip && !ch->dead)
3666 if (ch->time > time)
3673 // -----------------------------------
// Picks the best hit for the search `chs` and appends it to chs.best.
// Candidates must have a host IP, not be dead, not carry the excluded
// session ID, satisfy the wait delay since last contact, have a hop count
// no greater than the current best, and pass the relay/control checks
// unless busy relays/controls are allowed.
// The chosen hit's lastContact is set to the current time.
3674 int ChanHitList::pickHits(ChanHitSearch &chs)
3676 ChanHit best,*bestP=NULL;
3681 unsigned int ctime = sys->getTime();
3686 if (c->host.ip && !c->dead)
3688 if (!chs.excludeID.isSame(c->sessionID))
3689 if ((chs.waitDelay==0) || ((ctime-c->lastContact) >= chs.waitDelay))
3690 if ((c->numHops<=best.numHops)) // (c->time>=best.time))
3691 if (c->relay || (!c->relay && chs.useBusyRelays))
3692 if (c->cin || (!c->cin && chs.useBusyControls))
// Tracker-only search: consider tracker hits only.
3695 if (chs.trackersOnly && c->tracker)
3697 if (chs.matchHost.ip)
// Same first address as the requester and a valid second address: use the
// second (LAN) address.
3699 if ((c->rhost[0].ip == chs.matchHost.ip) && c->rhost[1].isValid())
3703 best.host = best.rhost[1]; // use lan ip
3705 }else if (c->firewalled == chs.useFirewalled)
3709 best.host = best.rhost[0]; // use wan ip
// Normal search: consider non-tracker hits only.
3711 }else if (!chs.trackersOnly && !c->tracker)
3713 if (chs.matchHost.ip)
3715 if ((c->rhost[0].ip == chs.matchHost.ip) && c->rhost[1].isValid())
3719 best.host = best.rhost[1]; // use lan ip
// Taken only when there is no previous pick or the previous pick cannot
// relay.
3721 }else if (c->firewalled == chs.useFirewalled && (!bestP || !bestP->relay))
3725 best.host = best.rhost[0]; // use wan ip
// Store the result only while the search's result array has room.
3735 if (chs.numResults < ChanHitSearch::MAX_RESULTS)
3738 bestP->lastContact = ctime;
3739 chs.best[chs.numResults++] = best;
3749 // -----------------------------------
// Returns 1 when pickHits() finds a hit and the first result has a hop
// count of 0.
3750 int ChanHitList::pickSourceHits(ChanHitSearch &chs)
3752 if (pickHits(chs) && chs.best[0].numHops == 0) return 1;
3757 // -----------------------------------
// Returns the upper-case display name for a content type, or "UNKNOWN" for
// a type with no case of its own.
3758 const char *ChanInfo::getTypeStr(TYPE t)
3762 case T_RAW: return "RAW";
3764 case T_MP3: return "MP3";
3765 case T_OGG: return "OGG";
3766 case T_OGM: return "OGM";
3767 case T_WMA: return "WMA";
3769 case T_MOV: return "MOV";
3770 case T_MPG: return "MPG";
3771 case T_NSV: return "NSV";
3772 case T_WMV: return "WMV";
3774 case T_PLS: return "PLS";
3775 case T_ASX: return "ASX";
3777 default: return "UNKNOWN";
3780 // -----------------------------------
// Returns the upper-case display name for a source protocol, or "UNKNOWN"
// for a protocol with no case of its own.
3781 const char *ChanInfo::getProtocolStr(PROTOCOL t)
3785 case SP_PEERCAST: return "PEERCAST";
3786 case SP_HTTP: return "HTTP";
3787 case SP_FILE: return "FILE";
3788 case SP_MMS: return "MMS";
3789 case SP_PCP: return "PCP";
3790 default: return "UNKNOWN";
3793 // -----------------------------------
// Parses a protocol name into a ChanInfo::PROTOCOL value. The comparison is
// case-insensitive (stricmp) and the names tested are PEERCAST, HTTP, FILE,
// MMS and PCP - the same names getProtocolStr() produces.
// str is passed straight to stricmp, so it has to be a valid C string.
// NOTE(review): the return statement of each branch and the result for an
// unrecognised name are not visible in the reviewed text - presumably the
// matching SP_* value and SP_UNKNOWN; confirm.
3794 ChanInfo::PROTOCOL ChanInfo::getProtocolFromStr(const char *str)
3796 if (stricmp(str,"PEERCAST")==0)
3798 else if (stricmp(str,"HTTP")==0)
3800 else if (stricmp(str,"FILE")==0)
3802 else if (stricmp(str,"MMS")==0)
3804 else if (stricmp(str,"PCP")==0)
3810 // -----------------------------------
// Returns a string for the given content type; PlayList::addChannel() appends
// it to the end of a stream URL, so it is presumably a file extension.
// NOTE(review): the returned strings themselves, and the result for types
// without a case label, are not visible in the reviewed text - confirm.
3811 const char *ChanInfo::getTypeExt(TYPE t)
// T_OGM and T_OGG are adjacent labels, so both types share one result.
3815 case ChanInfo::T_OGM:
3816 case ChanInfo::T_OGG:
3818 case ChanInfo::T_MP3:
3820 case ChanInfo::T_MOV:
3822 case ChanInfo::T_NSV:
3824 case ChanInfo::T_WMV:
3826 case ChanInfo::T_WMA:
3832 // -----------------------------------
// Parses a content-type name into a ChanInfo::TYPE value using a
// case-insensitive comparison (stricmp). readInfoAtoms() and updateFromXML()
// use it to decode the type string they receive.
// str is passed straight to stricmp, so it has to be a valid C string.
// NOTE(review): the return statement of each branch and the result for an
// unrecognised name are not visible in the reviewed text - confirm.
// NOTE(review): MOV and MPG, which getTypeStr() can produce, are not among
// the comparisons visible here - confirm whether that asymmetry is intended.
3833 ChanInfo::TYPE ChanInfo::getTypeFromStr(const char *str)
3835 if (stricmp(str,"MP3")==0)
3837 else if (stricmp(str,"OGG")==0)
3839 else if (stricmp(str,"OGM")==0)
3841 else if (stricmp(str,"RAW")==0)
3843 else if (stricmp(str,"NSV")==0)
3845 else if (stricmp(str,"WMA")==0)
3847 else if (stricmp(str,"WMV")==0)
3849 else if (stricmp(str,"PLS")==0)
3851 else if (stricmp(str,"M3U")==0)
3853 else if (stricmp(str,"ASX")==0)
3858 // -----------------------------------
// Tests this channel against inf by ID or by name: first whether the two IDs
// are the same, then - only when inf carries a non-empty name - whether this
// channel name contains inf.name.
// NOTE(review): the value returned by each test is not visible in the
// reviewed text - presumably true on either hit and false otherwise; confirm.
3859 bool ChanInfo::matchNameID(ChanInfo &inf)
3862 if (id.isSame(inf.id))
3865 if (!inf.name.isEmpty())
3866 if (name.contains(inf.name))
3871 // -----------------------------------
// Tests this channel against the criteria held in inf. A criterion is only
// consulted when inf actually sets it: status when it is not S_UNKNOWN,
// bitrate when it is non-zero, contentType when it is not T_UNKNOWN, name and
// genre when they are non-empty. The ID is compared unconditionally.
// NOTE(review): the statements executed for each test and the final return
// are not visible in the reviewed text, so how the individual results are
// combined cannot be stated here - confirm.
3872 bool ChanInfo::match(ChanInfo &inf)
3876 if (inf.status != S_UNKNOWN)
// Unlike the tests below, this one fires on a mismatch rather than a match.
3878 if (status != inf.status)
3882 if (inf.bitrate != 0)
3884 if (bitrate == inf.bitrate)
3891 if (id.isSame(inf.id))
3896 if (inf.contentType != T_UNKNOWN)
3898 if (contentType == inf.contentType)
3903 if (!inf.name.isEmpty())
// Name and genre are substring tests (contains), not equality tests.
3905 if (name.contains(inf.name))
3910 if (!inf.genre.isEmpty())
3912 if (genre.contains(inf.genre))
3919 // -----------------------------------
// Compares each track field (contact, title, artist, album, genre) with the
// corresponding field of inf using isSame() and copies the fields that
// differ; ChanInfo::update() uses the result to detect a track change.
// NOTE(review): the assignments for title, album and genre, the places where
// changed is set, and the return statement are not visible in the reviewed
// text - presumably changed is returned; confirm.
3920 bool TrackInfo::update(TrackInfo &inf)
3922 bool changed = false;
3924 if (!contact.isSame(inf.contact))
3926 contact = inf.contact;
3930 if (!title.isSame(inf.title))
3936 if (!artist.isSame(inf.artist))
3938 artist = inf.artist;
3942 if (!album.isSame(inf.album))
3948 if (!genre.isSame(inf.genre))
3959 // -----------------------------------
// Merges the fields of info into this ChanInfo. Three checks come first:
// whether info has an ID set, whether it has a name, and whether its
// broadcaster ID matches bcID (a mismatch is logged as an error). After that
// bitrate, contentType, desc, name, comment, genre, url and the track info
// are each compared and taken over when they differ.
// NOTE(review): the statements guarded by the three checks, the places where
// changed is set, and the return statement are not visible in the reviewed
// text - presumably the checks reject the update and changed is returned;
// confirm.
3960 bool ChanInfo::update(ChanInfo &info)
3962 bool changed = false;
3967 if (!info.id.isSet())
3970 // only update from chaninfo that has full name etc..
3971 if (info.name.isEmpty())
3974 // check valid broadcaster key
3977 if (!bcID.isSame(info.bcID))
3979 LOG_ERROR("ChanInfo BC key not valid");
3989 if (bitrate != info.bitrate)
3991 bitrate = info.bitrate;
3995 if (contentType != info.contentType)
3997 contentType = info.contentType;
4001 if (!desc.isSame(info.desc)) //JP-EX
4007 if (!name.isSame(info.name))
4013 if (!comment.isSame(info.comment))
4015 comment = info.comment;
4019 if (!genre.isSame(info.genre))
4025 if (!url.isSame(info.url))
// Track metadata is merged by TrackInfo::update().
4031 if (track.update(info.track))
4037 // -----------------------------------
4038 void ChanInfo::initNameID(const char *n)
4046 // -----------------------------------
// Resets this ChanInfo. The visible statements set the content type and the
// source protocol to their UNKNOWN values.
// NOTE(review): the remaining statements of this function are not visible in
// the reviewed text.
4047 void ChanInfo::init()
4052 contentType = T_UNKNOWN;
4053 srcProtocol = SP_UNKNOWN;
4065 // -----------------------------------
// Fills the track fields from the attributes title, contact, artist, album
// and genre of XML node n. Each value goes through readXMLString(), defined
// earlier in this file, which looks the attribute up on the node and stores
// it after converting it from HTML to ASCII encoding.
// NOTE(review): one statement ahead of the first read is not visible in the
// reviewed text.
4066 void ChanInfo::readTrackXML(XML::Node *n)
4069 readXMLString(track.title,n,"title");
4070 readXMLString(track.contact,n,"contact");
4071 readXMLString(track.artist,n,"artist");
4072 readXMLString(track.album,n,"album");
4073 readXMLString(track.genre,n,"genre");
4075 // -----------------------------------
// Computes how long the channel has been playing: the current system time
// minus lastPlayStart, or 0 when lastPlayStart is zero. The value is expressed
// in whatever unit sys->getTime() uses.
// NOTE(review): the declaration of upt and the return statement are not
// visible in the reviewed text - presumably upt is returned; confirm.
4076 unsigned int ChanInfo::getUptime()
4078 // calculate uptime and cap if requested by settings.
4080 upt = lastPlayStart?(sys->getTime()-lastPlayStart):0;
// A maxUptime of zero means no cap is applied.
4081 if (chanMgr->maxUptime)
4082 if (upt > chanMgr->maxUptime)
4083 upt = chanMgr->maxUptime;
4086 // -----------------------------------
// Returns the time elapsed since createdTime, measured against the current
// system time and expressed in whatever unit sys->getTime() uses.
// NOTE(review): the result is unsigned, so a createdTime later than the
// current time would yield a very large value rather than a negative one.
4087 unsigned int ChanInfo::getAge()
4089 return sys->getTime()-createdTime;
4092 // ------------------------------------------
// Reads numc atoms from the stream and stores the recognised ones in track.
// Each iteration reads one atom header via atom.read(c,d) and dispatches on
// the returned ID: title, creator (stored as artist), URL (stored as contact)
// and album. Every string is read directly into the fixed data buffer of the
// target String, with the buffer size passed as the limit.
// NOTE(review): the declarations of c and d and the handling of unrecognised
// atom IDs are not visible in the reviewed text - confirm that unknown atoms
// are skipped.
4093 void ChanInfo::readTrackAtoms(AtomStream &atom,int numc)
4095 for(int i=0; i<numc; i++)
4098 ID4 id = atom.read(c,d);
4099 if (id == PCP_CHAN_TRACK_TITLE)
4101 atom.readString(track.title.data,sizeof(track.title.data),d);
4102 }else if (id == PCP_CHAN_TRACK_CREATOR)
4104 atom.readString(track.artist.data,sizeof(track.artist.data),d);
4105 }else if (id == PCP_CHAN_TRACK_URL)
4107 atom.readString(track.contact.data,sizeof(track.contact.data),d);
4108 }else if (id == PCP_CHAN_TRACK_ALBUM)
4110 atom.readString(track.album.data,sizeof(track.album.data),d);
4115 // ------------------------------------------
// Reads numc atoms from the stream and stores the recognised ones in this
// ChanInfo: name, bitrate, genre, URL, description, comment and content
// type. Strings are read directly into the fixed data buffer of the target
// String, with the buffer size passed as the limit; the bitrate is read as an
// integer.
// NOTE(review): the declarations of c, d and type and the handling of
// unrecognised atom IDs are not visible in the reviewed text - confirm that
// unknown atoms are skipped.
4116 void ChanInfo::readInfoAtoms(AtomStream &atom,int numc)
4118 for(int i=0; i<numc; i++)
4121 ID4 id = atom.read(c,d);
4122 if (id == PCP_CHAN_INFO_NAME)
4124 atom.readString(name.data,sizeof(name.data),d);
4125 }else if (id == PCP_CHAN_INFO_BITRATE)
4127 bitrate = atom.readInt();
4128 }else if (id == PCP_CHAN_INFO_GENRE)
4130 atom.readString(genre.data,sizeof(genre.data),d);
4131 }else if (id == PCP_CHAN_INFO_URL)
4133 atom.readString(url.data,sizeof(url.data),d);
4134 }else if (id == PCP_CHAN_INFO_DESC)
4136 atom.readString(desc.data,sizeof(desc.data),d);
4137 }else if (id == PCP_CHAN_INFO_COMMENT)
4139 atom.readString(comment.data,sizeof(comment.data),d);
4140 }else if (id == PCP_CHAN_INFO_TYPE)
// The type arrives as a name string and is mapped back to the enum; names
// that getTypeFromStr() does not recognise cannot round-trip.
4143 atom.readString(type,sizeof(type),d);
4144 contentType = ChanInfo::getTypeFromStr(type);
4150 // -----------------------------------
// Writes the channel info as a PCP_CHAN_INFO parent atom followed by its
// child atoms: name, bitrate, genre, URL, description, comment and content
// type (as the name string from getTypeStr()).
4151 void ChanInfo::writeInfoAtoms(AtomStream &atom)
// The 7 equals the number of child atoms written below - presumably the
// child count announced to the reader; keep the two in sync.
4153 atom.writeParent(PCP_CHAN_INFO,7);
4154 atom.writeString(PCP_CHAN_INFO_NAME,name.cstr());
4155 atom.writeInt(PCP_CHAN_INFO_BITRATE,bitrate);
4156 atom.writeString(PCP_CHAN_INFO_GENRE,genre.cstr());
4157 atom.writeString(PCP_CHAN_INFO_URL,url.cstr());
4158 atom.writeString(PCP_CHAN_INFO_DESC,desc.cstr());
4159 atom.writeString(PCP_CHAN_INFO_COMMENT,comment.cstr());
4160 atom.writeString(PCP_CHAN_INFO_TYPE,getTypeStr(contentType));
4163 // -----------------------------------
// Writes the track info as a PCP_CHAN_TRACK parent atom followed by its
// child atoms: title, creator (the artist field), URL (the contact field)
// and album. The track genre is not written.
4164 void ChanInfo::writeTrackAtoms(AtomStream &atom)
// The 4 equals the number of child atoms written below - presumably the
// child count announced to the reader; keep the two in sync.
4166 atom.writeParent(PCP_CHAN_TRACK,4);
4167 atom.writeString(PCP_CHAN_TRACK_TITLE,track.title.cstr());
4168 atom.writeString(PCP_CHAN_TRACK_CREATOR,track.artist.cstr());
4169 atom.writeString(PCP_CHAN_TRACK_URL,track.contact.cstr());
4170 atom.writeString(PCP_CHAN_TRACK_ALBUM,track.album.cstr());
4174 // -----------------------------------
// Builds a channel XML node carrying the attributes name, id, bitrate, type,
// genre, desc, url, uptime, comment, skips, age and bcflags. The text fields
// are copied into locals and converted to T_UNICODESAFE first, so the
// members themselves are left untouched.
// The node is allocated with new.
// NOTE(review): most of the argument list, the declaration of commentUNI and
// the ID formatting are not visible in the reviewed text; who releases the
// returned node is not shown here either - confirm against the callers.
4175 XML::Node *ChanInfo::createChannelXML()
4179 String nameUNI = name;
4180 nameUNI.convertTo(String::T_UNICODESAFE);
4182 String urlUNI = url;
4183 urlUNI.convertTo(String::T_UNICODESAFE);
4185 String genreUNI = genre;
4186 genreUNI.convertTo(String::T_UNICODESAFE);
4188 String descUNI = desc;
4189 descUNI.convertTo(String::T_UNICODESAFE);
4192 commentUNI = comment;
4193 commentUNI.convertTo(String::T_UNICODESAFE);
4199 return new XML::Node("channel name=\"%s\" id=\"%s\" bitrate=\"%d\" type=\"%s\" genre=\"%s\" desc=\"%s\" url=\"%s\" uptime=\"%d\" comment=\"%s\" skips=\"%d\" age=\"%d\" bcflags=\"%d\"",
4203 getTypeStr(contentType),
4215 // -----------------------------------
// Builds a channel XML node for a query. The attribute text is assembled in
// buf: a name attribute and a genre attribute are appended only when the
// respective field is non-empty (both converted to T_HTML on local copies),
// and an id attribute is appended as well. The node is allocated with new.
// NOTE(review): the declaration of buf, the condition around the id
// attribute and the closing quotes are not visible in the reviewed text.
// NOTE(review): strcat performs no bounds checking - confirm that buf is
// large enough for the converted name, genre and id together.
4216 XML::Node *ChanInfo::createQueryXML()
4222 String nameHTML = name;
4223 nameHTML.convertTo(String::T_HTML);
4224 String genreHTML = genre;
4225 genreHTML.convertTo(String::T_HTML);
4228 if (!nameHTML.isEmpty())
4230 strcat(buf," name=\"");
4231 strcat(buf,nameHTML.cstr());
4235 if (!genreHTML.isEmpty())
4237 strcat(buf," genre=\"");
4238 strcat(buf,genreHTML.cstr());
4245 strcat(buf," id=\"");
4251 return new XML::Node("channel %s",buf);
4254 // -----------------------------------
// Builds a reduced channel XML node that carries only the attributes id,
// uptime, skips and age. The node is allocated with new.
// NOTE(review): the argument list is not visible in the reviewed text.
4255 XML::Node *ChanInfo::createRelayChannelXML()
4262 return new XML::Node("channel id=\"%s\" uptime=\"%d\" skips=\"%d\" age=\"%d\"",
4268 }// -----------------------------------
// Builds a track XML node carrying the attributes title, artist, album,
// genre and contact. Each track field is copied into a local and converted
// to T_UNICODESAFE first, so the members themselves are left untouched.
// The node is allocated with new.
// NOTE(review): the argument list is not visible in the reviewed text.
4269 XML::Node *ChanInfo::createTrackXML()
4271 String titleUNI = track.title;
4272 titleUNI.convertTo(String::T_UNICODESAFE);
4274 String artistUNI = track.artist;
4275 artistUNI.convertTo(String::T_UNICODESAFE);
4277 String albumUNI = track.album;
4278 albumUNI.convertTo(String::T_UNICODESAFE);
4280 String genreUNI = track.genre;
4281 genreUNI.convertTo(String::T_UNICODESAFE);
4283 String contactUNI = track.contact;
4284 contactUNI.convertTo(String::T_UNICODESAFE);
4288 return new XML::Node("track title=\"%s\" artist=\"%s\" album=\"%s\" genre=\"%s\" contact=\"%s\"",
4297 // -----------------------------------
4298 void ChanInfo::init(XML::Node *n)
4304 // -----------------------------------
// Updates this ChanInfo from the attributes of XML node n: name, genre, url,
// desc, bitrate, type, id and comment, plus a track child node.
// NOTE(review): the use of br and of the track node tn is not visible in the
// reviewed text - presumably the bitrate is only taken over under some
// condition and the track node is handed to readTrackXML(); confirm.
4305 void ChanInfo::updateFromXML(XML::Node *n)
4307 String typeStr,idStr;
4309 readXMLString(name,n,"name");
4310 readXMLString(genre,n,"genre");
4311 readXMLString(url,n,"url");
4312 readXMLString(desc,n,"desc");
4315 int br = n->findAttrInt("bitrate");
// Type and ID are read into temporaries, so the current contentType and id
// are kept when the string read for them is empty.
4319 readXMLString(typeStr,n,"type");
4320 if (!typeStr.isEmpty())
4321 contentType = getTypeFromStr(typeStr.cstr());
4324 readXMLString(idStr,n,"id");
4325 if (!idStr.isEmpty())
4326 id.fromStr(idStr.cstr());
4328 readXMLString(comment,n,"comment");
4330 XML::Node *tn = n->findNode("track");
4336 // -----------------------------------
4337 void ChanInfo::init(const char *n, GnuID &cid, TYPE tp, int br)
4347 // -----------------------------------
4348 void ChanInfo::init(const char *fn)
4355 // -----------------------------------
// Reads an ASX playlist from the stream. The visible part walks the children
// of the XML root, picks the nodes named entry (case-insensitive), looks up
// their ref child node and reads its href attribute.
// A StreamException raised inside the try block is deliberately swallowed;
// see the TODO on the catch line.
// NOTE(review): the XML parsing call, the loop construct, the null checks on
// rf and hr, and the statement that stores the URL are not visible in the
// reviewed text - confirm.
4356 void PlayList::readASX(Stream &in)
4358 LOG_DEBUG("Reading ASX");
4364 }catch(StreamException &) {} // TODO: eof is NOT handled properly in sockets - always get error at end
4368 XML::Node *n = xml.root->child;
4371 if (stricmp("entry",n->getName())==0)
4373 XML::Node *rf = n->findNode("ref");
4376 char *hr = rf->findAttr("href");
4380 //LOG("asx url %s",hr);
4389 // -----------------------------------
// Reads a SHOUTcast-style PLS playlist line by line. Only lines whose first
// four characters are file (case-insensitive) are considered; in those the
// first = sign is located.
// NOTE(review): the declaration of tmp and what is done with the text after
// the = sign are not visible in the reviewed text - presumably it is added
// as a URL; confirm, including the case where no = sign is found.
4390 void PlayList::readSCPLS(Stream &in)
4393 while (in.readLine(tmp,sizeof(tmp)))
4395 if (strnicmp(tmp,"file",4)==0)
4397 char *p = strstr(tmp,"=");
4403 // -----------------------------------
// Reads a plain playlist from the stream one line at a time until readLine()
// reports no more data.
// NOTE(review): the declaration of tmp and the statement executed per line
// are not visible in the reviewed text - presumably each line is added as a
// URL; confirm.
4404 void PlayList::readPLS(Stream &in)
4407 while (in.readLine(tmp,sizeof(tmp)))
4413 // -----------------------------------
// Writes the playlist in SHOUTcast PLS form: the [playlist] header, the
// number of entries, then a File, Title and Length line per URL, and finally
// the Version=2 line.
4414 void PlayList::writeSCPLS(Stream &out)
4416 out.writeLine("[playlist]");
4418 out.writeLineF("NumberOfEntries=%d",numURLs);
4420 for(int i=0; i<numURLs; i++)
// Entry numbers in the output are 1-based, hence i+1.
4422 out.writeLineF("File%d=%s",i+1,urls[i].cstr());
4423 out.writeLineF("Title%d=%s",i+1,titles[i].cstr());
// Every entry is written with a length of -1.
4424 out.writeLineF("Length%d=-1",i+1);
4426 out.writeLine("Version=2");
4428 // -----------------------------------
// Writes the playlist as plain text, one URL per line. Titles are not
// written.
4429 void PlayList::writePLS(Stream &out)
4431 for(int i=0; i<numURLs; i++)
4432 out.writeLineF("%s",urls[i].cstr());
4434 // -----------------------------------
// Writes the playlist for the RAM format, one URL per line. The visible body
// is the same as that of writePLS().
4435 void PlayList::writeRAM(Stream &out)
4437 for(int i=0; i<numURLs; i++)
4438 out.writeLineF("%s",urls[i].cstr());
4441 // -----------------------------------
// Writes the playlist as an ASX version 3.0 document: one ENTRY element per
// URL, each holding a REF element whose href attribute is the URL. Titles
// are not written.
// NOTE(review): the URL is inserted into the attribute as-is; confirm that
// stored URLs cannot contain characters that need XML escaping.
4442 void PlayList::writeASX(Stream &out)
4444 out.writeLine("<ASX Version=\"3.0\">");
4445 for(int i=0; i<numURLs; i++)
4447 out.writeLine("<ENTRY>");
4448 out.writeLineF("<REF href = \"%s\" />",urls[i].cstr());
4449 out.writeLine("</ENTRY>");
4451 out.writeLine("</ASX>");
4455 // -----------------------------------
// Adds a stream URL for the given channel to the playlist, with the channel
// name as its title. The URL has the form path/stream/ID followed by the
// string getTypeExt() returns for the content type.
// NOTE(review): the declarations of url and idStr are not visible in the
// reviewed text.
4456 void PlayList::addChannel(const char *path, ChanInfo &info)
4462 info.id.toStr(idStr);
// Channels without an ID set are addressed by name instead.
4463 char *nid = info.id.isSet()?idStr:info.name.cstr();
// NOTE(review): sprintf writes into the buffer behind url.cstr() without a
// length limit - confirm that path plus ID or name plus extension always fits.
4465 sprintf(url.cstr(),"%s/stream/%s%s",path,nid,ChanInfo::getTypeExt(info.contentType));
4466 addURL(url.cstr(),info.name);
4469 // -----------------------------------
// Resets the search criteria. The visible defaults are: firewalled hits not
// wanted, not restricted to trackers, and hits with busy relays or busy
// controls allowed.
// NOTE(review): the remaining assignments of this function are not visible
// in the reviewed text.
4470 void ChanHitSearch::init()
4474 useFirewalled = false;
4475 trackersOnly = false;
4476 useBusyRelays = true;
4477 useBusyControls = true;
// Seeding from the system time is disabled.
4481 //seed = sys->getTime();
4485 int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList *chl)
4492 static int base = 0x400;
4493 ChanHit tmpHit[MAX_RESULTS];
4494 static WLock seqLock;
4495 static unsigned int riSequence = 0;
4503 riSequence &= 0xffffff;
4506 ChanHit *hit = chl->hit;
4509 if (hit->host.ip && !hit->dead){
4511 (!exID.isSame(hit->sessionID))
4514 && (!hit->firewalled)
4515 && (hit->numHops != 0)
4517 if ( (hit->rhost[0].ip == host1.ip)
4518 && hit->rhost[1].isValid()
4519 && (host2.ip != hit->rhost[1].ip)
4522 best[0].host = hit->rhost[1];
4525 if ((hit->rhost[0].ip == host2.ip) && hit->rhost[1].isValid()){
4527 best[0].host = hit->rhost[1];
4531 loop = (index / MAX_RESULTS) + 1;
4532 //prob = (float)1 / (float)loop;
4534 //rnd = (float)rand() / (float)RAND_MAX;
4535 rnd = rand() % base;
4536 if (hit->numHops == 1){
4537 if (tmpHit[index % MAX_RESULTS].numHops == 1){
4539 tmpHit[index % MAX_RESULTS] = *hit;
4540 tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
4544 tmpHit[index % MAX_RESULTS] = *hit;
4545 tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
4549 if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob)){
4550 tmpHit[index % MAX_RESULTS] = *hit;
4551 tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
4557 // hit->host.toStr(tmp);
4558 // LOG_DEBUG("TEST %s: %f %f", tmp, rnd, prob);
4564 if (index > MAX_RESULTS){
4570 /* int use[MAX_RESULTS];
4571 memset(use, 0, sizeof(use));
4573 for (i = 0; i < cnt; i++){
4577 for (i = 0; i < cnt; i++){
4580 // LOG_DEBUG("%d",r);
4588 for (i = 0; i < cnt; i++){
4589 // LOG_DEBUG("%d", use[i]);
4590 best[use[i]] = tmpHit[i];
4593 int use[MAX_RESULTS];
4595 for (i = 0; i < cnt; i++) {
4596 use[i] = (i + seq) % cnt;
4599 for (i = 0; i < cnt; i++){
4600 // LOG_DEBUG("%d", use[i]);
4601 best[use[i]] = tmpHit[i];
4603 // for (i = 0; i < cnt; i++){
4605 // best[i].host.toStr(tmp);
4606 // LOG_DEBUG("Relay info: Hops = %d, %s", best[i].numHops, tmp);