1 // ------------------------------------------------
6 // (c) 2002-4 peercast.org
7 // ------------------------------------------------
8 // This program is free software; you can redistribute it and/or modify
9 // it under the terms of the GNU General Public License as published by
10 // the Free Software Foundation; either version 2 of the License, or
11 // (at your option) any later version.
13 // This program is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 // GNU General Public License for more details.
17 // ------------------------------------------------
24 #include "chkMemoryLeak.h"
25 #define DEBUG_NEW new(__FILE__, __LINE__)
30 // ------------------------------------------
31 void PCPStream::init(GnuID &rid)
37 nextRootPacket = 0; // 0 seconds (never)
40 inData.accept = ChanPacket::T_PCP;
43 outData.accept = ChanPacket::T_PCP;
45 // ------------------------------------------
46 void PCPStream::readVersion(Stream &in)
48 int len = in.readInt();
51 throw StreamException("Invalid PCP");
53 int ver = in.readInt();
55 LOG_DEBUG("PCP ver: %d",ver);
57 // ------------------------------------------
58 void PCPStream::readHeader(Stream &in,Channel *)
60 // AtomStream atom(in);
62 // if (in.readInt() != PCP_CONNECT)
63 // throw StreamException("Not PCP");
67 // ------------------------------------------
68 bool PCPStream::sendPacket(ChanPacket &pack,GnuID &destID)
71 if (!destID.isSame(remoteID))
72 if (!routeList.contains(destID))
75 return outData.writePacket(pack);
77 // ------------------------------------------
78 void PCPStream::flush(Stream &in)
81 // send outward packets
82 while (outData.numPending())
84 outData.readPacket(pack);
89 // ------------------------------------------
90 unsigned int PCPStream::flushUb(Stream &in, unsigned int size)
93 unsigned int len = 0, skip = 0;
95 while (outData.numPending())
97 outData.readPacketPri(pack);
99 if (size >= len + pack.len) {
107 LOG_DEBUG("PCPStream::flushUb: skip %d packets", skip);
112 // ------------------------------------------
113 int PCPStream::readPacket(Stream &in,Channel *)
117 return readPacket(in,bcs);
119 // ------------------------------------------
120 int PCPStream::readPacket(Stream &in,BroadcastState &bcs)
122 int error = PCP_ERROR_GENERAL;
128 MemoryStream mem(pack.data,sizeof(pack.data));
129 AtomStream patom(mem);
132 // send outward packets
133 error = PCP_ERROR_WRITE;
134 if (outData.numPending())
136 outData.readPacket(pack);
139 error = PCP_ERROR_GENERAL;
141 if (outData.willSkip())
143 error = PCP_ERROR_WRITE+PCP_ERROR_SKIP;
144 throw StreamException("Send too slow");
148 error = PCP_ERROR_READ;
149 // poll for new downward packet
155 id = atom.read(numc,numd);
158 pack.len = patom.writeAtoms(id, in, numc, numd);
159 pack.type = ChanPacket::T_PCP;
161 //inData.writePacket(pack);
163 error = PCP_ERROR_GENERAL;
165 // process downward packets
166 //if (inData.numPending())
168 //inData.readPacket(pack);
173 id = patom.read(numc,numd);
175 error = PCPStream::procAtom(patom,id,numc,numd,bcs);
179 throw StreamException("PCP exception");
185 }catch(StreamException &e)
187 LOG_ERROR("PCP readPacket: %s (%d)",e.msg,error);
193 // ------------------------------------------
194 void PCPStream::readEnd(Stream &,Channel *)
199 // ------------------------------------------
200 void PCPStream::readPushAtoms(AtomStream &atom, int numc,BroadcastState &bcs)
207 for(int i=0; i<numc; i++)
210 ID4 id = atom.read(c,d);
212 if (id == PCP_PUSH_IP)
213 host.ip = atom.readInt();
214 else if (id == PCP_PUSH_PORT)
215 host.port = atom.readShort();
216 else if (id == PCP_PUSH_CHANID)
217 atom.readBytes(chanID.id,16);
220 LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
235 Channel *ch = chanMgr->findChannelByID(chanID);
237 if (ch->isBroadcasting() || !ch->isFull() && !servMgr->relaysFull() && ch->info.id.isSame(chanID))
238 s = servMgr->allocServent();
240 s = servMgr->allocServent();
245 LOG_DEBUG("GIVing to %s",ipstr);
246 s->initGIV(host,chanID);
251 // ------------------------------------------
252 void PCPStream::readRootAtoms(AtomStream &atom, int numc,BroadcastState &bcs)
256 for(int i=0; i<numc; i++)
259 ID4 id = atom.read(c,d);
261 if (id == PCP_ROOT_UPDINT)
263 int si = atom.readInt();
265 chanMgr->setUpdateInterval(si);
266 LOG_DEBUG("PCP got new host update interval: %ds",si);
267 }else if (id == PCP_ROOT_URL)
269 url = "http://www.peercast.org/";
271 atom.readString(loc.data,sizeof(loc.data),d);
274 }else if (id == PCP_ROOT_CHECKVER)
276 unsigned int newVer = atom.readInt();
277 if (newVer > PCP_CLIENT_VERSION)
279 strcpy(servMgr->downloadURL,url.cstr());
280 peercastApp->notifyMessage(ServMgr::NT_UPGRADE,"There is a new version available, please click here to upgrade your client.");
282 LOG_DEBUG("PCP got version check: %d / %d",newVer,PCP_CLIENT_VERSION);
284 }else if (id == PCP_ROOT_NEXT)
286 unsigned int time = atom.readInt();
290 unsigned int ctime = sys->getTime();
291 nextRootPacket = ctime+time;
292 LOG_DEBUG("PCP expecting next root packet in %ds",time);
298 }else if (id == PCP_ROOT_UPDATE)
302 chanMgr->broadcastTrackerUpdate(remoteID,true);
304 }else if ((id == PCP_MESG_ASCII) || (id == PCP_MESG)) // PCP_MESG_ASCII to be depreciated
308 atom.readString(newMsg.data,sizeof(newMsg.data),d);
309 if (!newMsg.isSame(servMgr->rootMsg.cstr()))
311 servMgr->rootMsg = newMsg;
312 LOG_DEBUG("PCP got new root mesg: %s",servMgr->rootMsg.cstr());
313 peercastApp->notifyMessage(ServMgr::NT_PEERCAST,servMgr->rootMsg.cstr());
317 LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
323 // ------------------------------------------
324 void PCPStream::readPktAtoms(Channel *ch,AtomStream &atom,int numc,BroadcastState &bcs)
330 for(int i=0; i<numc; i++)
333 ID4 id = atom.read(c,d);
335 if (id == PCP_CHAN_PKT_TYPE)
337 type = atom.readID4();
339 if (type == PCP_CHAN_PKT_HEAD)
340 pack.type = ChanPacket::T_HEAD;
341 else if (type == PCP_CHAN_PKT_DATA)
342 pack.type = ChanPacket::T_DATA;
344 pack.type = ChanPacket::T_UNKNOWN;
346 }else if (id == PCP_CHAN_PKT_POS)
348 pack.pos = atom.readInt();
351 }else if (id == PCP_CHAN_PKT_DATA)
354 atom.readBytes(pack.data,pack.len);
358 LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
366 int diff = pack.pos - ch->streamPos;
369 LOG_DEBUG("PCP skipping %s%8d (%10d -> %10d) count=%2d",(diff>0)?"+":"",diff,ch->streamPos,pack.pos, ch->skipCount);
370 if (ch->lastSkipTime + 120 < sys->getTime()){
373 ch->lastSkipTime = sys->getTime();
374 ch->skipCount++; //JP-EX
378 if (servMgr->autoBumpSkipCount) //JP-EX
380 if ((ch->skipCount > servMgr->autoBumpSkipCount) && !(servMgr->disableAutoBumpIfDirect && ch->sourceHost.tracker)) //JP-MOD
382 LOG_DEBUG("Auto bump");
387 if (pack.type == ChanPacket::T_HEAD)
389 LOG_DEBUG("New head packet at %d",pack.pos);
391 if (servMgr->keepDownstreams)
392 renewhead = (memcmp(ch->headPack.data, pack.data, pack.len) != 0);
397 // check for stream restart
400 LOG_CHANNEL("PCP resetting stream");
405 if (renewhead || ch->lastStopTime + 30 < sys->getTime()) {
406 // check for stream restart
409 LOG_CHANNEL("PCP resetting stream");
416 ch->rawData.writePacket(pack,true);
417 ch->streamPos = pack.pos+pack.len;
420 }else if (pack.type == ChanPacket::T_DATA)
422 ch->rawData.writePacket(pack,true);
423 ch->streamPos = pack.pos+pack.len;
428 // update this parent packet stream position
429 if ((pack.pos) && (!bcs.streamPos || (pack.pos < bcs.streamPos)))
430 bcs.streamPos = pack.pos;
433 // -----------------------------------
434 void PCPStream::readHostAtoms(AtomStream &atom, int numc, BroadcastState &bcs, ChanHit &hit, bool flg)
438 GnuID chanID = bcs.chanID; //use default
442 unsigned int ipNum=0;
444 for(int i=0; i<numc; i++)
447 ID4 id = atom.read(c,d);
449 if (id == PCP_HOST_IP)
451 unsigned int ip = atom.readInt();
452 hit.rhost[ipNum].ip = ip;
453 }else if (id == PCP_HOST_PORT)
455 int port = atom.readShort();
456 hit.rhost[ipNum++].port = port;
461 else if (id == PCP_HOST_NUML)
463 hit.numListeners = atom.readInt();
464 if (hit.numListeners > 10)
465 hit.numListeners = 10;
467 else if (id == PCP_HOST_NUMR)
469 hit.numRelays = atom.readInt();
470 if (hit.numRelays > 100)
473 else if (id == PCP_HOST_UPTIME)
474 hit.upTime = atom.readInt();
475 else if (id == PCP_HOST_OLDPOS)
476 hit.oldestPos = atom.readInt();
477 else if (id == PCP_HOST_NEWPOS)
478 hit.newestPos = atom.readInt();
479 else if (id == PCP_HOST_VERSION)
480 hit.version = atom.readInt();
481 else if (id == PCP_HOST_VERSION_VP)
482 hit.version_vp = atom.readInt();
483 else if (id == PCP_HOST_VERSION_EX_PREFIX)
484 atom.readBytes(hit.version_ex_prefix,2);
485 else if (id == PCP_HOST_VERSION_EX_NUMBER){
486 hit.version_ex_number = atom.readShort();
488 else if (id == PCP_HOST_FLAGS1)
490 int fl1 = atom.readChar();
492 hit.recv = (fl1 & PCP_HOST_FLAGS1_RECV) !=0;
493 hit.relay = (fl1 & PCP_HOST_FLAGS1_RELAY) !=0;
494 hit.direct = (fl1 & PCP_HOST_FLAGS1_DIRECT) !=0;
495 hit.cin = (fl1 & PCP_HOST_FLAGS1_CIN) !=0;
496 hit.tracker = (fl1 & PCP_HOST_FLAGS1_TRACKER) !=0;
497 hit.firewalled = (fl1 & PCP_HOST_FLAGS1_PUSH) !=0;
500 }else if (id == PCP_HOST_ID)
501 atom.readBytes(hit.sessionID.id,16);
502 else if (id == PCP_HOST_CHANID)
503 atom.readBytes(chanID.id,16);
504 else if (id == PCP_HOST_UPHOST_IP)
505 hit.uphost.ip = atom.readInt();
506 else if (id == PCP_HOST_UPHOST_PORT)
507 hit.uphost.port = atom.readInt();
508 else if (id == PCP_HOST_UPHOST_HOPS)
509 hit.uphostHops = atom.readInt();
510 else if (id == PCP_HOST_CLAP_PP){ //JP-MOD
511 hit.clap_pp = atom.readInt();
512 if (hit.clap_pp & 1){
513 Channel *c = chanMgr->findChannelByID(chanID);
514 if(c && c->isBroadcasting()){
517 sjis.convertTo(String::T_SJIS);
518 peercastApp->notifyMessage(ServMgr::NT_APPLAUSE, sjis);
523 LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
528 hit.host = hit.rhost[0];
531 hit.numHops = bcs.numHops;
533 hit.servent_id = bcs.servent_id;
535 if (flg && (bcs.ttl != 0)){
536 // LOG_DEBUG("readHostAtoms HITLISTLOCK ON-------------");
537 chanMgr->hitlistlock.on();
539 chanMgr->addHit(hit);
541 chanMgr->delHit(hit);
542 // LOG_DEBUG("readHostAtoms HITLISTLOCK OFF-------------");
543 chanMgr->hitlistlock.off();
546 if (hit.numHops == 1){
547 Servent *sv = servMgr->findServentByServentID(hit.servent_id);
548 if (sv && sv->getHost().ip == hit.host.ip){
549 // LOG_DEBUG("set servent's waitPort = %d", hit.host.port);
550 sv->waitPort = hit.host.port;
551 //hit.lastSendSeq = sv->serventHit.lastSendSeq;
552 sv->serventHit = hit;
557 // ------------------------------------------
558 void PCPStream::readChanAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
561 ChanHitList *chl=NULL;
564 ch = chanMgr->findChannelByID(bcs.chanID);
565 chl = chanMgr->findHitListByID(bcs.chanID);
570 newInfo = chl->info;*/
573 ChanHitList *chl=NULL;
574 ChanInfo newInfo, chaInfo;
582 for(int i=0; i<numc; i++)
586 ID4 id = atom.read(c,d);
588 if ((id == PCP_CHAN_PKT) && (ch))
590 readPktAtoms(ch,atom,c,bcs);
591 }else if (id == PCP_CHAN_INFO)
593 newInfo.readInfoAtoms(atom,c);
595 }else if (id == PCP_CHAN_TRACK)
597 newInfo.readTrackAtoms(atom,c);
599 }else if (id == PCP_CHAN_BCID)
601 atom.readBytes(newInfo.bcID.id,16);
603 }else if (id == PCP_CHAN_KEY) // depreciated
605 atom.readBytes(newInfo.bcID.id,16);
606 newInfo.bcID.id[0] = 0; // clear flags
608 }else if (id == PCP_CHAN_ID)
610 atom.readBytes(newInfo.id.id,16);
612 ch = chanMgr->findChannelByID(newInfo.id);
613 chl = chanMgr->findHitListByID(newInfo.id);
617 LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
622 chl = chanMgr->findHitList(newInfo);
625 chl = chanMgr->addHitList(newInfo);
629 chl->info.update(newInfo);
631 if (!servMgr->chanLog.isEmpty())
633 //if (chl->numListeners())
638 file.openWriteAppend(servMgr->chanLog.cstr());
640 XML::Node *rn = new XML::Node("update time=\"%d\"",sys->getTime());
641 XML::Node *n = chl->info.createChannelXML();
642 n->add(chl->createXML(false));
643 n->add(chl->info.createTrackXML());
649 }catch(StreamException &e)
651 LOG_ERROR("Unable to update channel log: %s",e.msg);
658 if (ch && !ch->isBroadcasting())
659 ch->updateInfo(newInfo);
663 // ------------------------------------------
664 int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
672 char ver_ex_prefix[2];
673 int ver_ex_number = 0;
678 bcs.initPacketSettings();
680 MemoryStream pmem(pack.data,sizeof(pack.data));
681 AtomStream patom(pmem);
683 patom.writeParent(PCP_BCST,numc);
685 for(int i=0; i<numc; i++)
688 ID4 id = atom.read(c,d);
690 if (id == PCP_BCST_TTL)
692 bcs.ttl = atom.readChar()-1;
693 patom.writeChar(id,bcs.ttl);
694 }else if (id == PCP_BCST_HOPS)
696 bcs.numHops = atom.readChar()+1;
697 patom.writeChar(id,bcs.numHops);
699 }else if (id == PCP_BCST_FROM)
701 atom.readBytes(fromID.id,16);
702 patom.writeBytes(id,fromID.id,16);
704 routeList.add(fromID);
705 }else if (id == PCP_BCST_GROUP)
707 bcs.group = atom.readChar();
708 patom.writeChar(id,bcs.group);
709 }else if (id == PCP_BCST_DEST)
711 atom.readBytes(destID.id,16);
712 patom.writeBytes(id,destID.id,16);
713 bcs.forMe = destID.isSame(servMgr->sessionID);
718 destID.toStr(idstr1);
719 servMgr->sessionID.toStr(idstr2);
721 }else if (id == PCP_BCST_CHANID)
723 atom.readBytes(bcs.chanID.id,16);
724 patom.writeBytes(id,bcs.chanID.id,16);
725 }else if (id == PCP_BCST_VERSION)
727 ver = atom.readInt();
728 patom.writeInt(id,ver);
729 }else if (id == PCP_BCST_VERSION_VP)
731 ver_vp = atom.readInt();
732 patom.writeInt(id,ver_vp);
733 }else if (id == PCP_BCST_VERSION_EX_PREFIX)
735 atom.readBytes(ver_ex_prefix,2);
736 patom.writeBytes(id,ver_ex_prefix,2);
737 }else if (id == PCP_BCST_VERSION_EX_NUMBER)
739 ver_ex_number = atom.readShort();
740 patom.writeShort(id,ver_ex_number);
741 }else if (id == PCP_HOST)
744 readHostAtoms(atom,c,bcs,hit,false);
745 Servent *sv = servMgr->findServentByServentID(bcs.servent_id);
746 if (hit.uphost.ip == 0){
747 // LOG_DEBUG("bcs servent_id = %d", bcs.servent_id);
748 if (bcs.numHops == 1){
749 hit.uphost.ip = servMgr->serverHost.ip;
750 hit.uphost.port = servMgr->serverHost.port;
753 //Servent *sv = servMgr->findServentByServentID(bcs.servent_id);
755 hit.uphost.ip = sv->getHost().ip;
756 hit.uphost.port = sv->waitPort;
757 hit.uphostHops = bcs.numHops - 1;
762 ((hit.numHops == 1 && (hit.rhost[0].ip == sv->getHost().ip
763 && hit.uphost.ip == servMgr->serverHost.ip && hit.uphost.port == servMgr->serverHost.port)
764 || (hit.rhost[1].localIP() && hit.rhost[1].ip == sv->getHost().ip))
765 || (hit.numHops != 1 && chanMgr->findParentHit(hit))))
767 int oldPos = pmem.pos;
768 hit.writeAtoms(patom, hit.chanID);
770 r = readAtom(patom,bcs);
772 char tmp[80], tmp2[80], tmp3[80];
776 hit.uphost.toStr(tmp);
777 hit.host.toStr(tmp2);
779 sv->getHost().toStr(tmp3);
780 LOG_DEBUG("### Invalid bcst: hops=%d, l/r = %d/%d, ver=%d(VP%04d), ttl=%d",
781 bcs.numHops,hit.numListeners, hit.numRelays, ver,ver_vp,bcs.ttl);
782 LOG_DEBUG("### %s <- %s <- sv(%s)",
787 // copy and process atoms
788 int oldPos = pmem.pos;
789 patom.writeAtoms(id,atom.io,c,d);
791 r = readAtom(patom,bcs);
798 fromID.toStr(fromStr);
802 destID.toStr(destStr);
804 bcs.chanID.toStr(tmp);
807 if (servMgr->lastPCPFromID.isSame(fromID)
808 && time(NULL) - servMgr->lastPCPBcstTime < 3)
810 memcpy(servMgr->lastPCPFromID.id, fromID.id, 16);
811 servMgr->lastPCPBcstTime = time(NULL);
812 LOG_DEBUG("PCP bcst reject: group=%d, hops=%d, ver=%d(%c%c%04d), from=%s, dest=%s ttl=%d",
813 bcs.group,bcs.numHops,ver,ver_ex_prefix[0],ver_ex_prefix[1],ver_ex_number,fromStr,destStr,bcs.ttl);
817 memcpy(servMgr->lastPCPFromID.id, fromID.id, 16);
818 servMgr->lastPCPBcstTime = time(NULL);
823 LOG_DEBUG("PCP bcst: group=%d, hops=%d, ver=%d(%c%c%04d), from=%s, dest=%s ttl=%d",
824 bcs.group,bcs.numHops,ver,ver_ex_prefix[0],ver_ex_prefix[1],ver_ex_number,fromStr,destStr,bcs.ttl);
826 LOG_DEBUG("PCP bcst: group=%d, hops=%d, ver=%d(VP%04d), from=%s, dest=%s ttl=%d",bcs.group,bcs.numHops,ver,ver_vp,fromStr,destStr,bcs.ttl);
828 LOG_DEBUG("PCP bcst: group=%d, hops=%d, ver=%d, from=%s, dest=%s ttl=%d",bcs.group,bcs.numHops,ver,fromStr,destStr,bcs.ttl);
832 if (fromID.isSame(servMgr->sessionID))
834 LOG_ERROR("BCST loopback");
835 return PCP_ERROR_BCST+PCP_ERROR_LOOPBACK;
838 // broadcast back out if ttl > 0
839 if ((bcs.ttl>0) && (!bcs.forMe))
842 pack.type = ChanPacket::T_PCP;
844 if (bcs.group & (/*PCP_BCST_GROUP_ROOT|*/PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
846 pack.priority = 11 - bcs.numHops;
847 chanMgr->broadcastPacketUp(pack,bcs.chanID,remoteID,destID);
850 if (bcs.group & (/*PCP_BCST_GROUP_ROOT|*/PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
852 servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_COUT);
855 if (bcs.group & (PCP_BCST_GROUP_RELAYS|PCP_BCST_GROUP_TRACKERS))
857 servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_CIN);
860 if (bcs.group & (PCP_BCST_GROUP_RELAYS))
862 servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_RELAY);
866 // LOG_DEBUG("ttl=%d",ttl);
869 // LOG_DEBUG("ttl=%d",ttl);
875 // ------------------------------------------
876 int PCPStream::procAtom(AtomStream &atom,ID4 id,int numc, int dlen,BroadcastState &bcs)
884 readChanAtoms(atom,numc,bcs);
885 }else if (id == PCP_ROOT)
888 throw StreamException("Unauthorized root message");
890 readRootAtoms(atom,numc,bcs);
892 }else if (id == PCP_HOST)
894 readHostAtoms(atom,numc,bcs,hit);
895 Channel *ch = chanMgr->findChannelByID(hit.chanID);
896 if (ch && (ch->isBroadcasting() || servMgr->vpDebug)){
897 if (servMgr->autoPort0Kick && (hit.numHops == 1) && (hit.firewalled || (!hit.relay && !hit.numRelays))){
899 hit.host.IPtoStr(tmp);
900 LOG_DEBUG("host that can't relay is disconnect: %s", tmp);
901 rBan = PCP_ERROR_BANNED;
903 if (servMgr->allowOnlyVP && (hit.numHops == 1) && !hit.version_vp){
905 hit.host.IPtoStr(tmp);
906 LOG_DEBUG("host that is not VP is disconnect: %s", tmp);
907 rBan = PCP_ERROR_BANNED;
911 }else if ((id == PCP_MESG_ASCII) || (id == PCP_MESG)) // PCP_MESG_ASCII to be depreciated
914 atom.readString(msg.data,sizeof(msg.data),dlen);
915 LOG_DEBUG("PCP got text: %s",msg.cstr());
916 }else if (id == PCP_BCST)
918 r = readBroadcastAtoms(atom,numc,bcs);
919 }else if (id == PCP_HELO)
921 atom.skip(numc,dlen);
922 atom.writeParent(PCP_OLEH,1);
923 atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
924 }else if (id == PCP_PUSH)
927 readPushAtoms(atom,numc,bcs);
928 }else if (id == PCP_OK)
932 }else if (id == PCP_QUIT)
938 }else if (id == PCP_ATOM)
940 for(int i=0; i<numc; i++)
943 ID4 aid = atom.read(nc,nd);
944 int ar = procAtom(atom,aid,nc,nd,bcs);
951 LOG_CHANNEL("PCP skip: %s",id.getString().str());
952 atom.skip(numc,dlen);
962 // ------------------------------------------
963 int PCPStream::readAtom(AtomStream &atom,BroadcastState &bcs)
966 ID4 id = atom.read(numc,dlen);
968 return procAtom(atom,id,numc,dlen,bcs);