1 // ------------------------------------------------
6 // (c) 2002-4 peercast.org
7 // ------------------------------------------------
8 // This program is free software; you can redistribute it and/or modify
9 // it under the terms of the GNU General Public License as published by
10 // the Free Software Foundation; either version 2 of the License, or
11 // (at your option) any later version.
13 // This program is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 // GNU General Public License for more details.
17 // ------------------------------------------------
24 #include "chkMemoryLeak.h"
25 #define DEBUG_NEW new(__FILE__, __LINE__)
29 // ------------------------------------------
// Initializes per-connection PCP stream state.
// Visible effects: resets nextRootPacket to 0 and sets the accept field of
// both inData and outData to ChanPacket::T_PCP.
// NOTE(review): rid is presumably recorded as the remote ID; that assignment
// is not visible in this listing -- confirm.
30 void PCPStream::init(GnuID &rid)
36 nextRootPacket = 0; // 0 seconds (never)
39 inData.accept = ChanPacket::T_PCP;
42 outData.accept = ChanPacket::T_PCP;
44 // ------------------------------------------
// Reads the PCP version field from the stream.
// Two ints are consumed from in: the first is stored in len, the second is the
// version number, which is logged at debug level.
// Throws StreamException (Invalid PCP).
// NOTE(review): the condition guarding the throw is not visible in this
// listing -- presumably a check on len; confirm.
45 void PCPStream::readVersion(Stream &in)
47 int len = in.readInt();
50 throw StreamException("Invalid PCP");
52 int ver = in.readInt();
54 LOG_DEBUG("PCP ver: %d",ver);
56 // ------------------------------------------
// Header-read hook for PCP streams; the Channel argument is unnamed.
// The only body lines visible in this listing are a disabled PCP_CONNECT
// check (kept below), so this appears to do nothing -- confirm.
57 void PCPStream::readHeader(Stream &in,Channel *)
59 // AtomStream atom(in);
61 // if (in.readInt() != PCP_CONNECT)
62 // throw StreamException("Not PCP");
66 // ------------------------------------------
// Writes pack into the outbound buffer (outData) for delivery toward destID.
// destID is first compared with remoteID and, when different, looked up in
// routeList.
// NOTE(review): the statement executed when destID matches neither is not
// visible in this listing (presumably an early return of false) -- confirm.
// Returns the result of outData.writePacket(pack).
67 bool PCPStream::sendPacket(ChanPacket &pack,GnuID &destID)
70 if (!destID.isSame(remoteID))
71 if (!routeList.contains(destID))
74 return outData.writePacket(pack);
76 // ------------------------------------------
// Drains the outbound buffer: while outData reports pending packets, each one
// is read into pack.
// NOTE(review): the declaration of pack and the line that uses it after the
// read (presumably writing it to in) are not visible here -- confirm.
77 void PCPStream::flush(Stream &in)
80 // send outward packets
81 while (outData.numPending())
83 outData.readPacket(pack);
88 // ------------------------------------------
// Size-limited variant of flush().
// Pending outbound packets are read with outData.readPacketPri() and the
// running byte count (len + pack.len) is compared against size; a debug line
// reports how many packets were counted in skip.
// NOTE(review): the bodies of both branches and the return statement are not
// visible in this listing. Presumably packets that fit are written to in and
// added to len, the rest increment skip, and len is returned -- confirm.
89 unsigned int PCPStream::flushUb(Stream &in, unsigned int size)
92 unsigned int len = 0, skip = 0;
94 while (outData.numPending())
96 outData.readPacketPri(pack);
98 if (size >= len + pack.len) {
106 LOG_DEBUG("PCPStream::flushUb: skip %d packets", skip);
111 // ------------------------------------------
// Channel-based overload; the Channel argument is unnamed.
// Forwards to readPacket(Stream&, BroadcastState&) and returns its result.
// NOTE(review): the declaration of bcs is not visible here -- presumably a
// local BroadcastState; confirm.
112 int PCPStream::readPacket(Stream &in,Channel *)
116 return readPacket(in,bcs);
118 // ------------------------------------------
// Performs one send/receive cycle on a PCP connection.
//
// The local error variable is updated before each phase so that the catch
// handler at the end can log which phase failed:
//   1. PCP_ERROR_WRITE   - one pending outbound packet is read from outData.
//   2. PCP_ERROR_WRITE+PCP_ERROR_SKIP - set just before throwing
//      StreamException (Send too slow) when outData.willSkip() is true.
//   3. PCP_ERROR_READ    - an atom header is read and copied into pack
//      through patom, which writes into pack.data.
//   4. The copied atom is read back from patom and dispatched to procAtom(),
//      whose result replaces error.
// A StreamException is caught and logged together with the current error.
// NOTE(review): the try opener, the declarations of pack/atom/id/numc/numd,
// the guard on the second throw and the return statement are not visible in
// this listing. Presumably error is the return value -- confirm.
119 int PCPStream::readPacket(Stream &in,BroadcastState &bcs)
121 int error = PCP_ERROR_GENERAL;
// patom writes into (and later reads back from) the buffer of pack.
127 MemoryStream mem(pack.data,sizeof(pack.data));
128 AtomStream patom(mem);
131 // send outward packets
132 error = PCP_ERROR_WRITE;
133 if (outData.numPending())
135 outData.readPacket(pack);
138 error = PCP_ERROR_GENERAL;
140 if (outData.willSkip())
142 error = PCP_ERROR_WRITE+PCP_ERROR_SKIP;
143 throw StreamException("Send too slow");
147 error = PCP_ERROR_READ;
148 // poll for new downward packet
154 id = atom.read(numc,numd);
157 pack.len = patom.writeAtoms(id, in, numc, numd);
158 pack.type = ChanPacket::T_PCP;
160 //inData.writePacket(pack);
162 error = PCP_ERROR_GENERAL;
164 // process downward packets
165 //if (inData.numPending())
167 //inData.readPacket(pack);
172 id = patom.read(numc,numd);
174 error = PCPStream::procAtom(patom,id,numc,numd,bcs);
// NOTE(review): the condition guarding this throw is not visible here.
178 throw StreamException("PCP exception");
184 }catch(StreamException &e)
186 LOG_ERROR("PCP readPacket: %s (%d)",e.msg,error);
192 // ------------------------------------------
// End-of-stream hook; both parameters are unnamed.
// NOTE(review): no body lines are visible in this listing -- presumably empty.
193 void PCPStream::readEnd(Stream &,Channel *)
198 // ------------------------------------------
// Handles a push request made of numc child atoms.
// Recognized children fill in the target host (PCP_PUSH_IP, PCP_PUSH_PORT)
// and the 16-byte channel ID (PCP_PUSH_CHANID); any other child is logged as
// skipped. Afterwards the channel is looked up by ID, a servent is allocated
// and initGIV(host, chanID) is called on it.
// NOTE(review): declarations (host, chanID, c, d, s, ipstr), the skip call
// for unknown atoms and the guards around the servent allocation are not
// visible in this listing.
199 void PCPStream::readPushAtoms(AtomStream &atom, int numc,BroadcastState &bcs)
206 for(int i=0; i<numc; i++)
209 ID4 id = atom.read(c,d);
211 if (id == PCP_PUSH_IP)
212 host.ip = atom.readInt();
213 else if (id == PCP_PUSH_PORT)
214 host.port = atom.readShort();
215 else if (id == PCP_PUSH_CHANID)
216 atom.readBytes(chanID.id,16);
219 LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
// NOTE(review): findChannelByID may return null; the null check that
// presumably precedes the dereference below is not visible here -- confirm.
234 Channel *ch = chanMgr->findChannelByID(chanID);
// Operator precedence: && binds tighter than ||, so this reads as
// isBroadcasting OR (not full AND relays not full AND id matches).
236 if (ch->isBroadcasting() || !ch->isFull() && !servMgr->relaysFull() && ch->info.id.isSame(chanID))
237 s = servMgr->allocServent();
239 s = servMgr->allocServent();
244 LOG_DEBUG("GIVing to %s",ipstr);
245 s->initGIV(host,chanID);
250 // ------------------------------------------
// Handles the numc child atoms of a root message.
//   PCP_ROOT_UPDINT   - reads an int and passes it to
//                       chanMgr->setUpdateInterval().
//   PCP_ROOT_URL      - resets url to the peercast.org base and reads a string
//                       into loc (presumably appended to url -- not visible).
//   PCP_ROOT_CHECKVER - compares the received version with
//                       PCP_CLIENT_VERSION; when newer, copies url into
//                       servMgr->downloadURL and raises an upgrade notice.
//   PCP_ROOT_NEXT     - sets nextRootPacket to the current time plus the
//                       received number of seconds.
//   PCP_ROOT_UPDATE   - calls chanMgr->broadcastTrackerUpdate(remoteID,true).
//   PCP_MESG(_ASCII)  - reads a message; when it differs from
//                       servMgr->rootMsg it is stored and shown to the user.
//   anything else     - logged as skipped.
// NOTE(review): declarations (url, loc, newMsg, c, d) and several guards are
// not visible in this listing.
251 void PCPStream::readRootAtoms(AtomStream &atom, int numc,BroadcastState &bcs)
255 for(int i=0; i<numc; i++)
258 ID4 id = atom.read(c,d);
260 if (id == PCP_ROOT_UPDINT)
262 int si = atom.readInt();
264 chanMgr->setUpdateInterval(si);
265 LOG_DEBUG("PCP got new host update interval: %ds",si);
266 }else if (id == PCP_ROOT_URL)
268 url = "http://www.peercast.org/";
270 atom.readString(loc.data,sizeof(loc.data),d);
273 }else if (id == PCP_ROOT_CHECKVER)
275 unsigned int newVer = atom.readInt();
276 if (newVer > PCP_CLIENT_VERSION)
// NOTE(review): strcpy does no bounds checking and the size of downloadURL
// is not visible here -- confirm url always fits.
278 strcpy(servMgr->downloadURL,url.cstr());
279 peercastApp->notifyMessage(ServMgr::NT_UPGRADE,"There is a new version available, please click here to upgrade your client.");
281 LOG_DEBUG("PCP got version check: %d / %d",newVer,PCP_CLIENT_VERSION);
283 }else if (id == PCP_ROOT_NEXT)
285 unsigned int time = atom.readInt();
289 unsigned int ctime = sys->getTime();
290 nextRootPacket = ctime+time;
291 LOG_DEBUG("PCP expecting next root packet in %ds",time);
297 }else if (id == PCP_ROOT_UPDATE)
301 chanMgr->broadcastTrackerUpdate(remoteID,true);
303 }else if ((id == PCP_MESG_ASCII) || (id == PCP_MESG)) // PCP_MESG_ASCII to be deprecated
307 atom.readString(newMsg.data,sizeof(newMsg.data),d);
// Only notify when the message actually changed.
308 if (!newMsg.isSame(servMgr->rootMsg.cstr()))
310 servMgr->rootMsg = newMsg;
311 LOG_DEBUG("PCP got new root mesg: %s",servMgr->rootMsg.cstr());
312 peercastApp->notifyMessage(ServMgr::NT_PEERCAST,servMgr->rootMsg.cstr());
316 LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
322 // ------------------------------------------
// Reads one channel packet (numc child atoms) for channel ch and feeds it
// into the channel's raw data buffer.
//
// Parsing: PCP_CHAN_PKT_TYPE selects pack.type (T_HEAD, T_DATA or
// T_UNKNOWN), PCP_CHAN_PKT_POS sets pack.pos, PCP_CHAN_PKT_DATA reads
// pack.len bytes into pack.data; other children are logged as skipped.
//
// After parsing: the difference between pack.pos and ch->streamPos is
// computed; the visible lines log a position skip, maintain
// ch->lastSkipTime / ch->skipCount and log an auto bump when skipCount
// exceeds servMgr->autoBumpSkipCount (unless disabled for tracker sources).
// Head and data packets are written to ch->rawData and ch->streamPos is
// advanced to pack.pos + pack.len. Finally bcs.streamPos is lowered to
// pack.pos when pack.pos is non-zero and smaller (or bcs.streamPos is 0).
// NOTE(review): declarations (pack, type, c, d, renewhead), the line that
// sets pack.len, and most guards and branch bodies are not visible in this
// listing; the summary above covers only what can be seen.
323 void PCPStream::readPktAtoms(Channel *ch,AtomStream &atom,int numc,BroadcastState &bcs)
329 for(int i=0; i<numc; i++)
332 ID4 id = atom.read(c,d);
334 if (id == PCP_CHAN_PKT_TYPE)
336 type = atom.readID4();
338 if (type == PCP_CHAN_PKT_HEAD)
339 pack.type = ChanPacket::T_HEAD;
340 else if (type == PCP_CHAN_PKT_DATA)
341 pack.type = ChanPacket::T_DATA;
343 pack.type = ChanPacket::T_UNKNOWN;
345 }else if (id == PCP_CHAN_PKT_POS)
347 pack.pos = atom.readInt();
350 }else if (id == PCP_CHAN_PKT_DATA)
// NOTE(review): pack.len is presumably set from d just above; no check
// against sizeof(pack.data) is visible here -- confirm.
353 atom.readBytes(pack.data,pack.len);
357 LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
// Signed distance between the received position and the expected one.
365 int diff = pack.pos - ch->streamPos;
368 LOG_DEBUG("PCP skipping %s%8d (%10d -> %10d) count=%2d",(diff>0)?"+":"",diff,ch->streamPos,pack.pos, ch->skipCount);
// More than 120 seconds since the last recorded skip.
369 if (ch->lastSkipTime + 120 < sys->getTime()){
372 ch->lastSkipTime = sys->getTime();
373 ch->skipCount++; //JP-EX
377 if (servMgr->autoBumpSkipCount) //JP-EX
379 if ((ch->skipCount > servMgr->autoBumpSkipCount) && !(servMgr->disableAutoBumpIfDirect && ch->sourceHost.tracker)) //JP-MOD
381 LOG_DEBUG("Auto bump");
386 if (pack.type == ChanPacket::T_HEAD)
388 LOG_DEBUG("New head packet at %d",pack.pos);
// With keepDownstreams, the head counts as renewed only when its bytes
// differ from the stored head packet.
390 if (servMgr->keepDownstreams)
391 renewhead = (memcmp(ch->headPack.data, pack.data, pack.len) != 0);
396 // check for stream restart
399 LOG_CHANNEL("PCP resetting stream");
404 if (renewhead || ch->lastStopTime + 30 < sys->getTime()) {
405 // check for stream restart
408 LOG_CHANNEL("PCP resetting stream");
415 ch->rawData.writePacket(pack,true);
416 ch->streamPos = pack.pos+pack.len;
419 }else if (pack.type == ChanPacket::T_DATA)
421 ch->rawData.writePacket(pack,true);
422 ch->streamPos = pack.pos+pack.len;
427 // update this parent packet stream position
428 if ((pack.pos) && (!bcs.streamPos || (pack.pos < bcs.streamPos)))
429 bcs.streamPos = pack.pos;
432 // -----------------------------------
// Parses numc host child atoms into hit and, depending on flg, records the
// result in the channel manager hit list.
//
// Parsing notes:
//   - PCP_HOST_IP writes into hit.rhost[ipNum]; PCP_HOST_PORT writes the port
//     into the same slot and then advances ipNum, so each IP/port pair fills
//     one rhost entry.
//   - numListeners is capped at 10; numRelays is compared against 100.
//   - PCP_HOST_FLAGS1 is a bit field decoded into the recv/relay/direct/cin/
//     tracker/firewalled booleans.
//   - PCP_HOST_CHANID overrides the default channel ID taken from bcs.
//   - PCP_HOST_CLAP_PP with bit 0 set raises an NT_APPLAUSE notification when
//     the channel is found and is broadcasting.
//   - Unknown children are logged as skipped.
//
// After parsing: hit.host is set to rhost[0] and numHops/servent_id are
// copied from bcs. When flg is true and bcs.ttl is non-zero the hit list lock
// is taken around addHit/delHit. For a hit with numHops == 1 whose servent
// is found and whose host IP matches, the servent waitPort and serventHit
// are refreshed while keeping the previous lastSendSeq.
// NOTE(review): declarations (c, d, sjis), the choice between addHit and
// delHit, and several assignments are not visible in this listing.
433 void PCPStream::readHostAtoms(AtomStream &atom, int numc, BroadcastState &bcs, ChanHit &hit, bool flg)
437 GnuID chanID = bcs.chanID; //use default
441 unsigned int ipNum=0;
443 for(int i=0; i<numc; i++)
446 ID4 id = atom.read(c,d);
448 if (id == PCP_HOST_IP)
450 unsigned int ip = atom.readInt();
451 hit.rhost[ipNum].ip = ip;
452 }else if (id == PCP_HOST_PORT)
454 int port = atom.readShort();
// NOTE(review): no bound on ipNum is visible here and the size of rhost is
// not shown -- confirm that repeated port atoms cannot index past the array.
455 hit.rhost[ipNum++].port = port;
460 else if (id == PCP_HOST_NUML)
462 hit.numListeners = atom.readInt();
463 if (hit.numListeners > 10)
464 hit.numListeners = 10;
466 else if (id == PCP_HOST_NUMR)
468 hit.numRelays = atom.readInt();
// NOTE(review): the statement guarded by this check is not visible here
// (presumably a clamp to 100, mirroring numListeners) -- confirm.
469 if (hit.numRelays > 100)
472 else if (id == PCP_HOST_UPTIME)
473 hit.upTime = atom.readInt();
474 else if (id == PCP_HOST_OLDPOS)
475 hit.oldestPos = atom.readInt();
476 else if (id == PCP_HOST_NEWPOS)
477 hit.newestPos = atom.readInt();
478 else if (id == PCP_HOST_VERSION)
479 hit.version = atom.readInt();
480 else if (id == PCP_HOST_VERSION_VP)
481 hit.version_vp = atom.readInt();
482 else if (id == PCP_HOST_VERSION_EX_PREFIX)
483 atom.readBytes(hit.version_ex_prefix,2);
484 else if (id == PCP_HOST_VERSION_EX_NUMBER){
485 hit.version_ex_number = atom.readShort();
487 else if (id == PCP_HOST_FLAGS1)
489 int fl1 = atom.readChar();
491 hit.recv = (fl1 & PCP_HOST_FLAGS1_RECV) !=0;
492 hit.relay = (fl1 & PCP_HOST_FLAGS1_RELAY) !=0;
493 hit.direct = (fl1 & PCP_HOST_FLAGS1_DIRECT) !=0;
494 hit.cin = (fl1 & PCP_HOST_FLAGS1_CIN) !=0;
495 hit.tracker = (fl1 & PCP_HOST_FLAGS1_TRACKER) !=0;
496 hit.firewalled = (fl1 & PCP_HOST_FLAGS1_PUSH) !=0;
499 }else if (id == PCP_HOST_ID)
500 atom.readBytes(hit.sessionID.id,16);
501 else if (id == PCP_HOST_CHANID)
502 atom.readBytes(chanID.id,16);
503 else if (id == PCP_HOST_UPHOST_IP)
504 hit.uphost.ip = atom.readInt();
505 else if (id == PCP_HOST_UPHOST_PORT)
// The uphost port is read with readInt, unlike PCP_HOST_PORT above which
// uses readShort.
506 hit.uphost.port = atom.readInt();
507 else if (id == PCP_HOST_UPHOST_HOPS)
508 hit.uphostHops = atom.readInt();
509 else if (id == PCP_HOST_CLAP_PP){ //JP-MOD
510 hit.clap_pp = atom.readInt();
511 if (hit.clap_pp & 1){
512 Channel *c = chanMgr->findChannelByID(chanID);
513 if(c && c->isBroadcasting()){
516 sjis.convertTo(String::T_SJIS);
517 peercastApp->notifyMessage(ServMgr::NT_APPLAUSE, sjis);
522 LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
// The first remote host entry is treated as the primary host of the hit.
527 hit.host = hit.rhost[0];
530 hit.numHops = bcs.numHops;
532 hit.servent_id = bcs.servent_id;
534 if (flg && (bcs.ttl != 0)){
535 // LOG_DEBUG("readHostAtoms HITLISTLOCK ON-------------");
536 chanMgr->hitlistlock.on();
538 chanMgr->addHit(hit);
540 chanMgr->delHit(hit);
541 // LOG_DEBUG("readHostAtoms HITLISTLOCK OFF-------------");
542 chanMgr->hitlistlock.off();
545 if (hit.numHops == 1){
546 Servent *sv = servMgr->findServentByServentID(hit.servent_id);
547 if (sv && sv->getHost().ip == hit.host.ip){
548 // LOG_DEBUG("set servent's waitPort = %d", hit.host.port);
549 sv->waitPort = hit.host.port;
// Carry over the previous send sequence before replacing the stored hit.
550 hit.lastSendSeq = sv->serventHit.lastSendSeq;
551 sv->serventHit = hit;
556 // ------------------------------------------
// Parses numc channel child atoms, updating channel info and the hit list.
//
//   PCP_CHAN_PKT   - when a channel is known, forwarded to readPktAtoms().
//   PCP_CHAN_INFO  - read into newInfo via readInfoAtoms().
//   PCP_CHAN_TRACK - read into newInfo via readTrackAtoms().
//   PCP_CHAN_BCID  - 16 bytes read into newInfo.bcID.
//   PCP_CHAN_KEY   - same as BCID, then the first byte is cleared.
//   PCP_CHAN_ID    - 16 bytes read into newInfo.id, then ch and chl are
//                    looked up by that ID.
//   anything else  - logged as skipped.
//
// After parsing, the hit list is found (or added) for newInfo and its info is
// updated. When servMgr->chanLog is non-empty an XML update record is
// appended to that file; a StreamException there is logged, not rethrown.
// Finally a known, non-broadcasting channel gets updateInfo(newInfo).
// NOTE(review): several declarations and guards are not visible in this
// listing. The first four body lines below appear to be the tail of a
// disabled block (they end with a block-comment terminator whose opener is
// not shown), which would explain the repeated chl declaration -- confirm.
// Ownership of the XML nodes created for the log is also not visible here.
557 void PCPStream::readChanAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
560 ChanHitList *chl=NULL;
563 ch = chanMgr->findChannelByID(bcs.chanID);
564 chl = chanMgr->findHitListByID(bcs.chanID);
569 newInfo = chl->info;*/
572 ChanHitList *chl=NULL;
573 ChanInfo newInfo, chaInfo;
581 for(int i=0; i<numc; i++)
585 ID4 id = atom.read(c,d);
587 if ((id == PCP_CHAN_PKT) && (ch))
589 readPktAtoms(ch,atom,c,bcs);
590 }else if (id == PCP_CHAN_INFO)
592 newInfo.readInfoAtoms(atom,c);
594 }else if (id == PCP_CHAN_TRACK)
596 newInfo.readTrackAtoms(atom,c);
598 }else if (id == PCP_CHAN_BCID)
600 atom.readBytes(newInfo.bcID.id,16);
602 }else if (id == PCP_CHAN_KEY) // deprecated
604 atom.readBytes(newInfo.bcID.id,16);
605 newInfo.bcID.id[0] = 0; // clear flags
607 }else if (id == PCP_CHAN_ID)
609 atom.readBytes(newInfo.id.id,16);
611 ch = chanMgr->findChannelByID(newInfo.id);
612 chl = chanMgr->findHitListByID(newInfo.id);
616 LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
621 chl = chanMgr->findHitList(newInfo);
624 chl = chanMgr->addHitList(newInfo);
628 chl->info.update(newInfo);
630 if (!servMgr->chanLog.isEmpty())
632 //if (chl->numListeners())
637 file.openWriteAppend(servMgr->chanLog.cstr());
639 XML::Node *rn = new XML::Node("update time=\"%d\"",sys->getTime());
640 XML::Node *n = chl->info.createChannelXML();
641 n->add(chl->createXML(false));
642 n->add(chl->info.createTrackXML());
648 }catch(StreamException &e)
650 LOG_ERROR("Unable to update channel log: %s",e.msg);
// Locally broadcast channels keep their own info; only relayed ones are
// updated from the network.
657 if (ch && !ch->isBroadcasting())
658 ch->updateInfo(newInfo);
662 // ------------------------------------------
// Processes a broadcast atom with numc children and, when appropriate,
// forwards a re-encoded copy to other connections.
//
// While parsing, each child is both interpreted and re-written through patom
// into pack.data (starting with a PCP_BCST parent of numc children):
//   PCP_BCST_TTL    - stored in bcs.ttl as the received value minus 1.
//   PCP_BCST_HOPS   - stored in bcs.numHops as the received value plus 1.
//   PCP_BCST_FROM   - sender ID; also added to routeList.
//   PCP_BCST_GROUP  - stored in bcs.group.
//   PCP_BCST_DEST   - destination ID; bcs.forMe is set when it equals
//                     servMgr->sessionID.
//   PCP_BCST_CHANID / VERSION / VERSION_VP / VERSION_EX_* - stored and copied.
//   PCP_HOST        - parsed with readHostAtoms(flg=false); a zero uphost is
//                     filled in from the local server host (one hop) or from
//                     the sending servent; the hit is then validated, written
//                     back through patom and processed with readAtom(), or
//                     reported in the Invalid bcst debug lines.
//   anything else   - copied with patom.writeAtoms() and processed with
//                     readAtom().
//
// After parsing: a summary is logged; a broadcast whose sender ID equals the
// local session ID returns PCP_ERROR_BCST+PCP_ERROR_LOOPBACK. When bcs.ttl is
// positive and the packet is not addressed to this node, pack is forwarded
// according to the bcs.group bits (upstream, T_COUT, T_CIN, T_RELAY).
// NOTE(review): many declarations, guards, else-branches and the normal
// return statement are not visible in this listing.
663 int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
671 char ver_ex_prefix[2];
672 int ver_ex_number = 0;
677 bcs.initPacketSettings();
679 MemoryStream pmem(pack.data,sizeof(pack.data));
680 AtomStream patom(pmem);
682 patom.writeParent(PCP_BCST,numc);
684 for(int i=0; i<numc; i++)
687 ID4 id = atom.read(c,d);
689 if (id == PCP_BCST_TTL)
691 bcs.ttl = atom.readChar()-1;
692 patom.writeChar(id,bcs.ttl);
693 }else if (id == PCP_BCST_HOPS)
695 bcs.numHops = atom.readChar()+1;
696 patom.writeChar(id,bcs.numHops);
698 }else if (id == PCP_BCST_FROM)
700 atom.readBytes(fromID.id,16);
701 patom.writeBytes(id,fromID.id,16);
703 routeList.add(fromID);
704 }else if (id == PCP_BCST_GROUP)
706 bcs.group = atom.readChar();
707 patom.writeChar(id,bcs.group);
708 }else if (id == PCP_BCST_DEST)
710 atom.readBytes(destID.id,16);
711 patom.writeBytes(id,destID.id,16);
712 bcs.forMe = destID.isSame(servMgr->sessionID);
717 destID.toStr(idstr1);
718 servMgr->sessionID.toStr(idstr2);
720 }else if (id == PCP_BCST_CHANID)
722 atom.readBytes(bcs.chanID.id,16);
723 patom.writeBytes(id,bcs.chanID.id,16);
724 }else if (id == PCP_BCST_VERSION)
726 ver = atom.readInt();
727 patom.writeInt(id,ver);
728 }else if (id == PCP_BCST_VERSION_VP)
730 ver_vp = atom.readInt();
731 patom.writeInt(id,ver_vp);
732 }else if (id == PCP_BCST_VERSION_EX_PREFIX)
734 atom.readBytes(ver_ex_prefix,2);
735 patom.writeBytes(id,ver_ex_prefix,2);
736 }else if (id == PCP_BCST_VERSION_EX_NUMBER)
738 ver_ex_number = atom.readShort();
739 patom.writeShort(id,ver_ex_number);
740 }else if (id == PCP_HOST)
743 readHostAtoms(atom,c,bcs,hit,false);
// NOTE(review): sv may be null if the servent is not found; the guards
// around the dereferences below are not visible in this listing -- confirm.
744 Servent *sv = servMgr->findServentByServentID(bcs.servent_id);
745 if (hit.uphost.ip == 0){
746 // LOG_DEBUG("bcs servent_id = %d", bcs.servent_id);
747 if (bcs.numHops == 1){
748 hit.uphost.ip = servMgr->serverHost.ip;
749 hit.uphost.port = servMgr->serverHost.port;
752 //Servent *sv = servMgr->findServentByServentID(bcs.servent_id);
754 hit.uphost.ip = sv->getHost().ip;
755 hit.uphost.port = sv->waitPort;
756 hit.uphostHops = bcs.numHops - 1;
761 ((hit.numHops == 1 && (hit.rhost[0].ip == sv->getHost().ip
762 && hit.uphost.ip == servMgr->serverHost.ip && hit.uphost.port == servMgr->serverHost.port)
763 || (hit.rhost[1].localIP() && hit.rhost[1].ip == sv->getHost().ip))
764 || chanMgr->findParentHit(hit)))
766 int oldPos = pmem.pos;
767 hit.writeAtoms(patom, hit.chanID);
769 r = readAtom(patom,bcs);
771 char tmp[80], tmp2[80], tmp3[80];
772 hit.uphost.toStr(tmp);
773 hit.host.toStr(tmp2);
774 sv->getHost().toStr(tmp3);
775 LOG_DEBUG("### Invalid bcst: hops=%d, l/r = %d/%d, ver=%d(VP%04d), ttl=%d",
776 bcs.numHops,hit.numListeners, hit.numRelays, ver,ver_vp,bcs.ttl);
777 LOG_DEBUG("### %s <- %s <- sv(%s)",
782 // copy and process atoms
783 int oldPos = pmem.pos;
784 patom.writeAtoms(id,atom.io,c,d);
786 r = readAtom(patom,bcs);
793 fromID.toStr(fromStr);
797 destID.toStr(destStr);
799 bcs.chanID.toStr(tmp);
804 LOG_DEBUG("PCP bcst: group=%d, hops=%d, ver=%d(%c%c%04d), from=%s, dest=%s ttl=%d",
805 bcs.group,bcs.numHops,ver,ver_ex_prefix[0],ver_ex_prefix[1],ver_ex_number,fromStr,destStr,bcs.ttl);
807 LOG_DEBUG("PCP bcst: group=%d, hops=%d, ver=%d(VP%04d), from=%s, dest=%s ttl=%d",bcs.group,bcs.numHops,ver,ver_vp,fromStr,destStr,bcs.ttl);
809 LOG_DEBUG("PCP bcst: group=%d, hops=%d, ver=%d, from=%s, dest=%s ttl=%d",bcs.group,bcs.numHops,ver,fromStr,destStr,bcs.ttl);
// A broadcast that originated from this node has looped back; reject it.
813 if (fromID.isSame(servMgr->sessionID))
815 LOG_ERROR("BCST loopback");
816 return PCP_ERROR_BCST+PCP_ERROR_LOOPBACK;
819 // broadcast back out if ttl > 0
820 if ((bcs.ttl>0) && (!bcs.forMe))
823 pack.type = ChanPacket::T_PCP;
825 if (bcs.group & (/*PCP_BCST_GROUP_ROOT|*/PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
// Priority decreases as the hop count grows.
827 pack.priority = 11 - bcs.numHops;
828 chanMgr->broadcastPacketUp(pack,bcs.chanID,remoteID,destID);
831 if (bcs.group & (/*PCP_BCST_GROUP_ROOT|*/PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
833 servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_COUT);
836 if (bcs.group & (PCP_BCST_GROUP_RELAYS|PCP_BCST_GROUP_TRACKERS))
838 servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_CIN);
841 if (bcs.group & (PCP_BCST_GROUP_RELAYS))
843 servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_RELAY);
847 // LOG_DEBUG("ttl=%d",ttl);
850 // LOG_DEBUG("ttl=%d",ttl);
856 // ------------------------------------------
// Dispatches one already-read atom header (id, numc children, dlen data
// bytes) to the matching handler.
//
//   (first branch)  - readChanAtoms(); the id test is not visible here,
//                     presumably PCP_CHAN.
//   PCP_ROOT        - readRootAtoms(), or StreamException (Unauthorized root
//                     message) under a guard that is not visible here.
//   PCP_HOST        - readHostAtoms(); for a channel that is broadcasting (or
//                     when servMgr->vpDebug is set) a one-hop host can be
//                     flagged with rBan = PCP_ERROR_BANNED, either because it
//                     cannot relay (autoPort0Kick) or because it reports no VP
//                     version (allowOnlyVP).
//   PCP_MESG(_ASCII)- text message, read and logged.
//   PCP_BCST        - readBroadcastAtoms(); its result is stored in r.
//   PCP_HELO        - content skipped, then a PCP_OLEH reply carrying the
//                     local session ID is written to the same atom stream.
//   PCP_PUSH        - readPushAtoms().
//   PCP_OK/PCP_QUIT - handling not visible in this listing.
//   PCP_ATOM        - container: each child is read and processed
//                     recursively through procAtom().
//   anything else   - logged and skipped.
// NOTE(review): declarations (r, rBan, hit, msg, tmp, nc, nd), how rBan and
// the recursive result ar feed into the return value, and the return
// statement itself are not visible in this listing.
857 int PCPStream::procAtom(AtomStream &atom,ID4 id,int numc, int dlen,BroadcastState &bcs)
865 readChanAtoms(atom,numc,bcs);
866 }else if (id == PCP_ROOT)
869 throw StreamException("Unauthorized root message");
871 readRootAtoms(atom,numc,bcs);
873 }else if (id == PCP_HOST)
875 readHostAtoms(atom,numc,bcs,hit);
876 Channel *ch = chanMgr->findChannelByID(hit.chanID);
877 if (ch && (ch->isBroadcasting() || servMgr->vpDebug)){
878 if (servMgr->autoPort0Kick && (hit.numHops == 1) && (hit.firewalled || (!hit.relay && !hit.numRelays))){
880 hit.host.IPtoStr(tmp);
881 LOG_DEBUG("host that can't relay is disconnect: %s", tmp);
882 rBan = PCP_ERROR_BANNED;
884 if (servMgr->allowOnlyVP && (hit.numHops == 1) && !hit.version_vp){
886 hit.host.IPtoStr(tmp);
887 LOG_DEBUG("host that is not VP is disconnect: %s", tmp);
888 rBan = PCP_ERROR_BANNED;
892 }else if ((id == PCP_MESG_ASCII) || (id == PCP_MESG)) // PCP_MESG_ASCII to be deprecated
895 atom.readString(msg.data,sizeof(msg.data),dlen);
896 LOG_DEBUG("PCP got text: %s",msg.cstr());
897 }else if (id == PCP_BCST)
899 r = readBroadcastAtoms(atom,numc,bcs);
900 }else if (id == PCP_HELO)
902 atom.skip(numc,dlen);
903 atom.writeParent(PCP_OLEH,1);
904 atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
905 }else if (id == PCP_PUSH)
908 readPushAtoms(atom,numc,bcs);
909 }else if (id == PCP_OK)
913 }else if (id == PCP_QUIT)
919 }else if (id == PCP_ATOM)
921 for(int i=0; i<numc; i++)
924 ID4 aid = atom.read(nc,nd);
925 int ar = procAtom(atom,aid,nc,nd,bcs);
932 LOG_CHANNEL("PCP skip: %s",id.getString().str());
933 atom.skip(numc,dlen);
943 // ------------------------------------------
// Reads the next atom header from atom (ID, child count, data length) and
// hands it to procAtom(), returning the result of procAtom().
// NOTE(review): the declarations of numc and dlen are not visible in this
// listing.
944 int PCPStream::readAtom(AtomStream &atom,BroadcastState &bcs)
947 ID4 id = atom.read(numc,dlen);
949 return procAtom(atom,id,numc,dlen,bcs);