1 // ------------------------------------------------
6 // (c) 2002-4 peercast.org
7 // ------------------------------------------------
8 // This program is free software; you can redistribute it and/or modify
9 // it under the terms of the GNU General Public License as published by
10 // the Free Software Foundation; either version 2 of the License, or
11 // (at your option) any later version.
13 // This program is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 // GNU General Public License for more details.
17 // ------------------------------------------------
24 #include "chkMemoryLeak.h"
25 #define DEBUG_NEW new(__FILE__, __LINE__)
29 // ------------------------------------------
// Resets per-connection PCP state. In the lines visible here it clears the
// next-root-packet deadline and sets the accept field of both packet buffers
// to ChanPacket::T_PCP.
// NOTE(review): how the rid parameter is used is not visible in this excerpt.
30 void PCPStream::init(GnuID &rid)
36 nextRootPacket = 0; // 0 seconds (never)
// Inbound and outbound buffers are both configured with the PCP packet type.
39 inData.accept = ChanPacket::T_PCP;
42 outData.accept = ChanPacket::T_PCP;
44 // ------------------------------------------
// Reads two integers from the stream (a length, then a version) and logs the
// version at debug level.
// Throws StreamException with the message Invalid PCP; the condition guarding
// the throw is not visible here (presumably a check on len -- TODO confirm).
45 void PCPStream::readVersion(Stream &in)
47 int len = in.readInt();
50 throw StreamException("Invalid PCP");
52 int ver = in.readInt();
54 LOG_DEBUG("PCP ver: %d",ver);
56 // ------------------------------------------
// Header-reading hook. The only body lines visible in this excerpt are a
// disabled (commented-out) check that the first integer equals PCP_CONNECT.
// The Channel pointer parameter is unnamed and therefore unused.
57 void PCPStream::readHeader(Stream &in,Channel *)
59 // AtomStream atom(in);
61 // if (in.readInt() != PCP_CONNECT)
62 // throw StreamException("Not PCP");
66 // ------------------------------------------
// Queues pack on the outgoing buffer and returns the result of
// outData.writePacket.
// A destination filter runs first: it triggers when destID is neither the
// remote peer ID nor present in routeList. The action taken in that case is not
// visible here (presumably an early return of false -- TODO confirm).
67 bool PCPStream::sendPacket(ChanPacket &pack,GnuID &destID)
70 if (!destID.isSame(remoteID))
71 if (!routeList.contains(destID))
74 return outData.writePacket(pack);
76 // ------------------------------------------
// Drains the outgoing buffer: loops while outData reports pending packets and
// pulls each one into pack.
// NOTE(review): what is done with each packet after readPacket (presumably it
// is written to the stream argument) is not visible in this excerpt.
77 void PCPStream::flush(Stream &in)
80 // send outward packets
81 while (outData.numPending())
83 outData.readPacket(pack);
87 // ------------------------------------------
// Channel-pointer overload: forwards to the BroadcastState overload and returns
// its result. The Channel pointer is unnamed and unused; the declaration of bcs
// is not visible in this excerpt.
88 int PCPStream::readPacket(Stream &in,Channel *)
91 return readPacket(in,bcs);
93 // ------------------------------------------
// Runs one send/receive cycle on the PCP connection.
// The local error code is reassigned as the function moves between phases
// (write, read, general) so that the catch handler at the end can log which
// phase a StreamException came from. The return statement is not visible here
// (presumably the error code is returned -- TODO confirm).
94 int PCPStream::readPacket(Stream &in,BroadcastState &bcs)
96 int error = PCP_ERROR_GENERAL;
// Atom stream layered over the in-memory packet buffer pack.data.
102 MemoryStream mem(pack.data,sizeof(pack.data));
103 AtomStream patom(mem);
106 // send outward packets
107 error = PCP_ERROR_WRITE;
108 if (outData.numPending())
110 outData.readPacket(pack);
113 error = PCP_ERROR_GENERAL;
// When the outgoing buffer reports willSkip(), fail with a combined
// write + skip error code.
115 if (outData.willSkip())
117 error = PCP_ERROR_WRITE+PCP_ERROR_SKIP;
118 throw StreamException("Send too slow");
122 error = PCP_ERROR_READ;
123 // poll for new downward packet
129 id = atom.read(numc,numd);
// Copy the atom just read (header plus payload from the stream) into the packet
// buffer; the return value of writeAtoms becomes the packet length.
132 pack.len = patom.writeAtoms(id, in, numc, numd);
133 pack.type = ChanPacket::T_PCP;
135 //inData.writePacket(pack);
137 error = PCP_ERROR_GENERAL;
139 // process downward packets
140 //if (inData.numPending())
142 //inData.readPacket(pack);
// Re-read the atom header from the in-memory copy and dispatch it.
147 id = patom.read(numc,numd);
149 error = PCPStream::procAtom(patom,id,numc,numd,bcs);
153 throw StreamException("PCP exception");
// Log the exception message together with the phase-specific error code.
159 }catch(StreamException &e)
161 LOG_ERROR("PCP readPacket: %s (%d)",e.msg,error);
167 // ------------------------------------------
// End-of-stream hook. Both parameters are unnamed; the body is not visible in
// this excerpt.
168 void PCPStream::readEnd(Stream &,Channel *)
173 // ------------------------------------------
// Parses the numc child atoms of a push request (target IP, port and channel
// ID), then allocates a servent and starts a GIV towards that host for the
// requested channel. Unrecognised atoms are logged as skipped.
174 void PCPStream::readPushAtoms(AtomStream &atom, int numc,BroadcastState &bcs)
181 for(int i=0; i<numc; i++)
184 ID4 id = atom.read(c,d);
186 if (id == PCP_PUSH_IP)
187 host.ip = atom.readInt();
188 else if (id == PCP_PUSH_PORT)
189 host.port = atom.readShort();
190 else if (id == PCP_PUSH_CHANID)
191 atom.readBytes(chanID.id,16);
194 LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
209 Channel *ch = chanMgr->findChannelByID(chanID);
// NOTE(review): && binds tighter than ||, so this reads as
// isBroadcasting() || (!isFull() && !relaysFull() && id matches).
// Add parentheses if that grouping is intended. A null check on ch is not
// visible in this excerpt -- confirm one guards this line.
211 if (ch->isBroadcasting() || !ch->isFull() && !servMgr->relaysFull() && ch->info.id.isSame(chanID))
212 s = servMgr->allocServent();
214 s = servMgr->allocServent();
219 LOG_DEBUG("GIVing to %s",ipstr);
220 s->initGIV(host,chanID);
225 // ------------------------------------------
// Parses the numc child atoms of a root message and applies each setting:
// host update interval, download URL, version check, time until the next root
// packet, tracker-update request and the root text message. Unrecognised atoms
// are logged as skipped.
226 void PCPStream::readRootAtoms(AtomStream &atom, int numc,BroadcastState &bcs)
230 for(int i=0; i<numc; i++)
233 ID4 id = atom.read(c,d);
235 if (id == PCP_ROOT_UPDINT)
237 int si = atom.readInt();
239 chanMgr->setUpdateInterval(si);
240 LOG_DEBUG("PCP got new host update interval: %ds",si);
241 }else if (id == PCP_ROOT_URL)
// The URL starts from a fixed base; how the string read into loc is combined
// with it is not visible in this excerpt.
243 url = "http://www.peercast.org/";
245 atom.readString(loc.data,sizeof(loc.data),d);
248 }else if (id == PCP_ROOT_CHECKVER)
250 unsigned int newVer = atom.readInt();
// A newer advertised version stores the download URL and notifies the user.
251 if (newVer > PCP_CLIENT_VERSION)
// NOTE(review): strcpy performs no bounds check; confirm downloadURL is large
// enough to hold url.
253 strcpy(servMgr->downloadURL,url.cstr());
254 peercastApp->notifyMessage(ServMgr::NT_UPGRADE,"There is a new version available, please click here to upgrade your client.");
256 LOG_DEBUG("PCP got version check: %d / %d",newVer,PCP_CLIENT_VERSION);
258 }else if (id == PCP_ROOT_NEXT)
260 unsigned int time = atom.readInt();
// Convert the relative delay into an absolute deadline based on the current time.
264 unsigned int ctime = sys->getTime();
265 nextRootPacket = ctime+time;
266 LOG_DEBUG("PCP expecting next root packet in %ds",time);
272 }else if (id == PCP_ROOT_UPDATE)
276 chanMgr->broadcastTrackerUpdate(remoteID,true);
278 }else if ((id == PCP_MESG_ASCII) || (id == PCP_MESG)) // PCP_MESG_ASCII to be deprecated
282 atom.readString(newMsg.data,sizeof(newMsg.data),d);
// Only store and announce the root message when it differs from the current one.
283 if (!newMsg.isSame(servMgr->rootMsg.cstr()))
285 servMgr->rootMsg = newMsg;
286 LOG_DEBUG("PCP got new root mesg: %s",servMgr->rootMsg.cstr());
287 peercastApp->notifyMessage(ServMgr::NT_PEERCAST,servMgr->rootMsg.cstr());
291 LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
297 // ------------------------------------------
// Parses the numc child atoms of a channel packet (type, stream position,
// payload) into pack, logs and counts position skips, writes head and data
// packets into the channel raw-data buffer, and finally records the packet
// position in the broadcast state.
298 void PCPStream::readPktAtoms(Channel *ch,AtomStream &atom,int numc,BroadcastState &bcs)
304 for(int i=0; i<numc; i++)
307 ID4 id = atom.read(c,d);
309 if (id == PCP_CHAN_PKT_TYPE)
311 type = atom.readID4();
// Map the wire type ID onto the ChanPacket type; anything else is T_UNKNOWN.
313 if (type == PCP_CHAN_PKT_HEAD)
314 pack.type = ChanPacket::T_HEAD;
315 else if (type == PCP_CHAN_PKT_DATA)
316 pack.type = ChanPacket::T_DATA;
318 pack.type = ChanPacket::T_UNKNOWN;
320 }else if (id == PCP_CHAN_PKT_POS)
322 pack.pos = atom.readInt();
325 }else if (id == PCP_CHAN_PKT_DATA)
// NOTE(review): the assignment of pack.len is not visible here; confirm it is
// bounded by the size of pack.data before this read.
328 atom.readBytes(pack.data,pack.len);
332 LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
// Difference between the packet position and the channel stream position.
340 int diff = pack.pos - ch->streamPos;
343 LOG_DEBUG("PCP skipping %s%8d (%10d -> %10d) count=%2d",(diff>0)?"+":"",diff,ch->streamPos,pack.pos, ch->skipCount);
// Branch taken when more than 120 seconds have passed since the last recorded
// skip; the body of this branch is not visible in this excerpt.
344 if (ch->lastSkipTime + 120 < sys->getTime()){
347 ch->lastSkipTime = sys->getTime();
348 ch->skipCount++; //JP-EX
// Auto bump is considered only when autoBumpSkipCount is non-zero and the skip
// count exceeds it.
352 if (servMgr->autoBumpSkipCount) //JP-EX
354 if (ch->skipCount > servMgr->autoBumpSkipCount)
356 LOG_DEBUG("Auto bump");
361 if (pack.type == ChanPacket::T_HEAD)
363 LOG_DEBUG("New head packet at %d",pack.pos);
// With keepDownstreams set, renewhead records whether the new head differs
// from the stored one.
// NOTE(review): memcmp covers pack.len bytes only; confirm the stored head
// length is compared as well.
365 if (servMgr->keepDownstreams)
366 renewhead = (memcmp(ch->headPack.data, pack.data, pack.len) != 0);
// NOTE(review): two stream-restart sections follow; the conditions selecting
// between them are only partly visible in this excerpt.
371 // check for stream restart
374 LOG_CHANNEL("PCP resetting stream");
379 if (renewhead || ch->lastStopTime + 30 < sys->getTime()) {
380 // check for stream restart
383 LOG_CHANNEL("PCP resetting stream");
390 ch->rawData.writePacket(pack,true);
391 ch->streamPos = pack.pos+pack.len;
394 }else if (pack.type == ChanPacket::T_DATA)
396 ch->rawData.writePacket(pack,true);
397 ch->streamPos = pack.pos+pack.len;
402 // update this parent packet stream position
// Keeps the smallest non-zero packet position seen so far in bcs.streamPos.
403 if ((pack.pos) && (!bcs.streamPos || (pack.pos < bcs.streamPos)))
404 bcs.streamPos = pack.pos;
407 // -----------------------------------
// Parses the numc child atoms of a host record into hit (addresses, listener
// and relay counts, positions, version fields, flags, IDs, upstream host),
// fills in hop count and servent ID from the broadcast state, then adds or
// removes the hit in the channel manager under hitlistlock.
// NOTE(review): the role of flg is not visible in this excerpt.
408 void PCPStream::readHostAtoms(AtomStream &atom, int numc, BroadcastState &bcs, ChanHit &hit, bool flg)
412 GnuID chanID = bcs.chanID; //use default
// Index into hit.rhost; advanced each time a port atom is read (see below).
416 unsigned int ipNum=0;
418 for(int i=0; i<numc; i++)
421 ID4 id = atom.read(c,d);
423 if (id == PCP_HOST_IP)
425 unsigned int ip = atom.readInt();
426 hit.rhost[ipNum].ip = ip;
427 }else if (id == PCP_HOST_PORT)
429 int port = atom.readShort();
// The port atom completes the current rhost entry and moves to the next one.
// NOTE(review): no bound check on ipNum is visible here; confirm it cannot
// exceed the size of hit.rhost.
430 hit.rhost[ipNum++].port = port;
435 else if (id == PCP_HOST_NUML)
436 hit.numListeners = atom.readInt();
437 else if (id == PCP_HOST_NUMR)
438 hit.numRelays = atom.readInt();
439 else if (id == PCP_HOST_UPTIME)
440 hit.upTime = atom.readInt();
441 else if (id == PCP_HOST_OLDPOS)
442 hit.oldestPos = atom.readInt();
443 else if (id == PCP_HOST_NEWPOS)
444 hit.newestPos = atom.readInt();
445 else if (id == PCP_HOST_VERSION)
446 hit.version = atom.readInt();
447 else if (id == PCP_HOST_VERSION_VP)
448 hit.version_vp = atom.readInt();
449 else if (id == PCP_HOST_VERSION_EX_PREFIX)
450 atom.readBytes(hit.version_ex_prefix,2);
451 else if (id == PCP_HOST_VERSION_EX_NUMBER){
452 hit.version_ex_number = atom.readShort();
454 else if (id == PCP_HOST_FLAGS1)
456 int fl1 = atom.readChar();
// Decode the flag byte into the individual boolean fields of the hit.
458 hit.recv = (fl1 & PCP_HOST_FLAGS1_RECV) !=0;
459 hit.relay = (fl1 & PCP_HOST_FLAGS1_RELAY) !=0;
460 hit.direct = (fl1 & PCP_HOST_FLAGS1_DIRECT) !=0;
461 hit.cin = (fl1 & PCP_HOST_FLAGS1_CIN) !=0;
462 hit.tracker = (fl1 & PCP_HOST_FLAGS1_TRACKER) !=0;
463 hit.firewalled = (fl1 & PCP_HOST_FLAGS1_PUSH) !=0;
466 }else if (id == PCP_HOST_ID)
467 atom.readBytes(hit.sessionID.id,16);
468 else if (id == PCP_HOST_CHANID)
469 atom.readBytes(chanID.id,16);
470 else if (id == PCP_HOST_UPHOST_IP)
471 hit.uphost.ip = atom.readInt();
472 else if (id == PCP_HOST_UPHOST_PORT)
// NOTE(review): this port is read with readInt whereas PCP_HOST_PORT above uses
// readShort; confirm the wire format of the uphost port atom.
473 hit.uphost.port = atom.readInt();
474 else if (id == PCP_HOST_UPHOST_HOPS)
475 hit.uphostHops = atom.readInt();
478 LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
// The first parsed address becomes the primary host of the hit.
483 hit.host = hit.rhost[0];
486 hit.numHops = bcs.numHops;
488 hit.servent_id = bcs.servent_id;
491 // LOG_DEBUG("readHostAtoms HITLISTLOCK ON-------------");
// The hit list is modified between hitlistlock.on() and hitlistlock.off(). The
// condition choosing between addHit and delHit is not visible in this excerpt.
492 chanMgr->hitlistlock.on();
494 chanMgr->addHit(hit);
496 chanMgr->delHit(hit);
497 // LOG_DEBUG("readHostAtoms HITLISTLOCK OFF-------------");
498 chanMgr->hitlistlock.off();
// For a hit one hop away, copy its port into waitPort of the servent found by
// servent ID. A null check on sv is not visible here -- TODO confirm.
501 if (hit.numHops == 1){
502 Servent *sv = servMgr->findServentByServentID(hit.servent_id);
504 // LOG_DEBUG("set servent's waitPort = %d", hit.host.port);
505 sv->waitPort = hit.host.port;
510 // ------------------------------------------
// Parses the numc child atoms of a channel message: stream packets, channel
// info, track info, broadcaster ID and channel ID. It then updates the matching
// hit list, optionally appends an XML record to the channel log file, and
// updates the local channel info when this node is not the broadcaster.
511 void PCPStream::readChanAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
// NOTE(review): the next four lines end with a block-comment terminator and
// chl is declared again right after them, so they appear to belong to a
// disabled section whose opening marker is not visible in this excerpt.
514 ChanHitList *chl=NULL;
517 ch = chanMgr->findChannelByID(bcs.chanID);
518 chl = chanMgr->findHitListByID(bcs.chanID);
523 newInfo = chl->info;*/
526 ChanHitList *chl=NULL;
527 ChanInfo newInfo, chaInfo;
535 for(int i=0; i<numc; i++)
539 ID4 id = atom.read(c,d);
// Packet atoms are handled only when a local channel has been resolved.
541 if ((id == PCP_CHAN_PKT) && (ch))
543 readPktAtoms(ch,atom,c,bcs);
544 }else if (id == PCP_CHAN_INFO)
546 newInfo.readInfoAtoms(atom,c);
548 }else if (id == PCP_CHAN_TRACK)
550 newInfo.readTrackAtoms(atom,c);
552 }else if (id == PCP_CHAN_BCID)
554 atom.readBytes(newInfo.bcID.id,16);
556 }else if (id == PCP_CHAN_KEY) // deprecated
558 atom.readBytes(newInfo.bcID.id,16);
559 newInfo.bcID.id[0] = 0; // clear flags
561 }else if (id == PCP_CHAN_ID)
563 atom.readBytes(newInfo.id.id,16);
// Once the channel ID is known, look up the channel and its hit list.
565 ch = chanMgr->findChannelByID(newInfo.id);
566 chl = chanMgr->findHitListByID(newInfo.id);
570 LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
// Find the hit list for the parsed info, or create one. The conditions around
// these two calls are not visible in this excerpt.
575 chl = chanMgr->findHitList(newInfo);
578 chl = chanMgr->addHitList(newInfo);
582 chl->info.update(newInfo);
// Channel logging is active only when a log file name is configured.
584 if (!servMgr->chanLog.isEmpty())
586 //if (chl->numListeners())
591 file.openWriteAppend(servMgr->chanLog.cstr());
// NOTE(review): rn is allocated with new; its release is not visible in this
// excerpt -- confirm it is freed on every path, including the exception path.
593 XML::Node *rn = new XML::Node("update time=\"%d\"",sys->getTime());
594 XML::Node *n = chl->info.createChannelXML();
595 n->add(chl->createXML(false));
596 n->add(chl->info.createTrackXML());
602 }catch(StreamException &e)
604 LOG_ERROR("Unable to update channel log: %s",e.msg);
// A channel this node is broadcasting keeps its own info; otherwise it takes
// the received info.
611 if (ch && !ch->isBroadcasting())
612 ch->updateInfo(newInfo);
616 // ------------------------------------------
// Parses the numc child atoms of a broadcast message. Each header atom is read
// and also re-written into a fresh packet (with ttl decremented and hop count
// incremented) so the message can be forwarded. Payload atoms are copied into
// that packet and processed locally via readAtom. Afterwards the header is
// logged, loopbacks are rejected, and the rebuilt packet is re-broadcast to the
// targeted groups when ttl and destination allow it.
// Returns PCP_ERROR_BCST+PCP_ERROR_LOOPBACK on loopback; other return paths are
// not visible in this excerpt.
617 int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
// NOTE(review): ver_ex_prefix has no initialiser and is passed to LOG_DEBUG
// further down; confirm that log call is guarded so it only runs after the
// prefix atom was received.
625 char ver_ex_prefix[2];
626 int ver_ex_number = 0;
631 bcs.initPacketSettings();
// Atom stream over the packet buffer, used to rebuild the message for forwarding.
633 MemoryStream pmem(pack.data,sizeof(pack.data));
634 AtomStream patom(pmem);
636 patom.writeParent(PCP_BCST,numc);
638 for(int i=0; i<numc; i++)
641 ID4 id = atom.read(c,d);
643 if (id == PCP_BCST_TTL)
// The forwarded copy carries a ttl one lower than the received value.
645 ttl = atom.readChar()-1;
646 patom.writeChar(id,ttl);
648 }else if (id == PCP_BCST_HOPS)
// The forwarded copy carries a hop count one higher than the received value.
650 bcs.numHops = atom.readChar()+1;
651 patom.writeChar(id,bcs.numHops);
653 }else if (id == PCP_BCST_FROM)
655 atom.readBytes(fromID.id,16);
656 patom.writeBytes(id,fromID.id,16);
// Record the sender ID in routeList; sendPacket consults routeList for
// destination IDs.
658 routeList.add(fromID);
659 }else if (id == PCP_BCST_GROUP)
661 bcs.group = atom.readChar();
662 patom.writeChar(id,bcs.group);
663 }else if (id == PCP_BCST_DEST)
665 atom.readBytes(destID.id,16);
666 patom.writeBytes(id,destID.id,16);
// forMe is set when the destination ID equals this node session ID.
667 bcs.forMe = destID.isSame(servMgr->sessionID);
672 destID.toStr(idstr1);
673 servMgr->sessionID.toStr(idstr2);
675 }else if (id == PCP_BCST_CHANID)
677 atom.readBytes(bcs.chanID.id,16);
678 patom.writeBytes(id,bcs.chanID.id,16);
679 }else if (id == PCP_BCST_VERSION)
681 ver = atom.readInt();
682 patom.writeInt(id,ver);
683 }else if (id == PCP_BCST_VERSION_VP)
685 ver_vp = atom.readInt();
686 patom.writeInt(id,ver_vp);
687 }else if (id == PCP_BCST_VERSION_EX_PREFIX)
689 atom.readBytes(ver_ex_prefix,2);
690 patom.writeBytes(id,ver_ex_prefix,2);
691 }else if (id == PCP_BCST_VERSION_EX_NUMBER)
693 ver_ex_number = atom.readShort();
694 patom.writeShort(id,ver_ex_number);
695 }else if (id == PCP_HOST)
// Host records are parsed here so that missing upstream-host fields can be
// filled in before the record is re-serialised into the forwarded packet.
698 readHostAtoms(atom,c,bcs,hit,false);
699 if (hit.uphost.ip == 0){
700 // LOG_DEBUG("bcs servent_id = %d", bcs.servent_id);
// One hop away: this node own server host is used as the upstream host.
701 if (bcs.numHops == 1){
702 hit.uphost.ip = servMgr->serverHost.ip;
703 hit.uphost.port = servMgr->serverHost.port;
// Otherwise the upstream host is taken from the servent found by servent ID.
// A null check on sv is not visible here -- TODO confirm.
706 Servent *sv = servMgr->findServentByServentID(bcs.servent_id);
708 hit.uphost.ip = sv->getHost().ip;
709 hit.uphost.port = sv->waitPort;
710 hit.uphostHops = bcs.numHops - 1;
// Remember the write position, serialise the hit, then process the written atom
// (the rewind of pmem.pos between the two steps is not visible in this excerpt).
714 int oldPos = pmem.pos;
715 hit.writeAtoms(patom, hit.chanID);
717 r = readAtom(patom,bcs);
719 // copy and process atoms
720 int oldPos = pmem.pos;
721 patom.writeAtoms(id,atom.io,c,d);
723 r = readAtom(patom,bcs);
730 fromID.toStr(fromStr);
734 destID.toStr(destStr);
736 bcs.chanID.toStr(tmp);
// Three log formats exist, depending on which version information is
// available; the selecting conditions are not visible in this excerpt.
741 LOG_DEBUG("PCP bcst: group=%d, hops=%d, ver=%d(%c%c%04d), from=%s, dest=%s ttl=%d",
742 bcs.group,bcs.numHops,ver,ver_ex_prefix[0],ver_ex_prefix[1],ver_ex_number,fromStr,destStr,ttl);
744 LOG_DEBUG("PCP bcst: group=%d, hops=%d, ver=%d(VP%04d), from=%s, dest=%s ttl=%d",bcs.group,bcs.numHops,ver,ver_vp,fromStr,destStr,ttl);
746 LOG_DEBUG("PCP bcst: group=%d, hops=%d, ver=%d, from=%s, dest=%s ttl=%d",bcs.group,bcs.numHops,ver,fromStr,destStr,ttl);
// A broadcast whose sender ID is this node own session ID is a loopback.
750 if (fromID.isSame(servMgr->sessionID))
752 LOG_ERROR("BCST loopback");
753 return PCP_ERROR_BCST+PCP_ERROR_LOOPBACK;
756 // broadcast back out if ttl > 0
757 if ((ttl>0) && (!bcs.forMe))
760 pack.type = ChanPacket::T_PCP;
// Forward according to the group bits of the message. Each call passes remoteID
// and destID along with the packet and channel ID.
762 if (bcs.group & (/*PCP_BCST_GROUP_ROOT|*/PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
764 chanMgr->broadcastPacketUp(pack,bcs.chanID,remoteID,destID);
767 if (bcs.group & (/*PCP_BCST_GROUP_ROOT|*/PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
769 servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_COUT);
772 if (bcs.group & (PCP_BCST_GROUP_RELAYS|PCP_BCST_GROUP_TRACKERS))
774 servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_CIN);
777 if (bcs.group & (PCP_BCST_GROUP_RELAYS))
779 servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_RELAY);
783 // LOG_DEBUG("ttl=%d",ttl);
786 // LOG_DEBUG("ttl=%d",ttl);
792 // ------------------------------------------
// Dispatches one top-level atom by ID to its handler: channel, root, host,
// text message, broadcast, helo, push, ok, quit, or a nested atom container.
// Unknown IDs are logged and skipped. The return statements are not visible in
// this excerpt (presumably an error/status code -- TODO confirm).
793 int PCPStream::procAtom(AtomStream &atom,ID4 id,int numc, int dlen,BroadcastState &bcs)
801 readChanAtoms(atom,numc,bcs);
802 }else if (id == PCP_ROOT)
// Root messages can be rejected; the authorisation condition guarding this
// throw is not visible in this excerpt.
805 throw StreamException("Unauthorized root message");
807 readRootAtoms(atom,numc,bcs);
809 }else if (id == PCP_HOST)
811 readHostAtoms(atom,numc,bcs,hit);
812 Channel *ch = chanMgr->findChannelByID(hit.chanID);
// Kick checks apply when this node broadcasts the channel (or vpDebug is set).
813 if (ch && (ch->isBroadcasting() || servMgr->vpDebug)){
// With autoPort0Kick set: a host one hop away that is firewalled, or that
// neither has the relay flag nor reports any relays, is marked banned.
814 if (servMgr->autoPort0Kick && (hit.numHops == 1) && (hit.firewalled || (!hit.relay && !hit.numRelays))){
816 hit.host.IPtoStr(tmp);
817 LOG_DEBUG("host that can't relay is disconnect: %s", tmp);
818 rBan = PCP_ERROR_BANNED;
// With allowOnlyVP set: a host one hop away without a VP version is marked banned.
820 if (servMgr->allowOnlyVP && (hit.numHops == 1) && !hit.version_vp){
822 hit.host.IPtoStr(tmp);
823 LOG_DEBUG("host that is not VP is disconnect: %s", tmp);
824 rBan = PCP_ERROR_BANNED;
828 }else if ((id == PCP_MESG_ASCII) || (id == PCP_MESG)) // PCP_MESG_ASCII to be deprecated
831 atom.readString(msg.data,sizeof(msg.data),dlen);
832 LOG_DEBUG("PCP got text: %s",msg.cstr());
833 }else if (id == PCP_BCST)
835 r = readBroadcastAtoms(atom,numc,bcs);
836 }else if (id == PCP_HELO)
// The helo contents are skipped; the reply is an OLEH parent atom with one
// child carrying this node session ID.
838 atom.skip(numc,dlen);
839 atom.writeParent(PCP_OLEH,1);
840 atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
841 }else if (id == PCP_PUSH)
844 readPushAtoms(atom,numc,bcs);
845 }else if (id == PCP_OK)
849 }else if (id == PCP_QUIT)
855 }else if (id == PCP_ATOM)
// Container atom: read each child header and process it recursively.
857 for(int i=0; i<numc; i++)
860 ID4 aid = atom.read(nc,nd);
861 int ar = procAtom(atom,aid,nc,nd,bcs);
868 LOG_CHANNEL("PCP skip: %s",id.getString().str());
869 atom.skip(numc,dlen);
879 // ------------------------------------------
// Reads one atom header (ID, child count, data length) from the atom stream and
// returns the result of dispatching it through procAtom.
880 int PCPStream::readAtom(AtomStream &atom,BroadcastState &bcs)
883 ID4 id = atom.read(numc,dlen);
885 return procAtom(atom,id,numc,dlen,bcs);