OSDN Git Service

VP24マージ
[peercast-im/PeerCastIM.git] / c: / Git / PeerCast.root / PeerCast / core / common / pcp.cpp
1 // ------------------------------------------------
2 // File : pcp.cpp
3 // Date: 1-mar-2004
4 // Author: giles
5 //
6 // (c) 2002-4 peercast.org
7 // ------------------------------------------------
8 // This program is free software; you can redistribute it and/or modify
9 // it under the terms of the GNU General Public License as published by
10 // the Free Software Foundation; either version 2 of the License, or
11 // (at your option) any later version.
12
13 // This program is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 // GNU General Public License for more details.
17 // ------------------------------------------------
18
19 #include "atom.h"
20 #include "pcp.h"
21 #include "peercast.h"
22 #include "version2.h"
23 #ifdef _DEBUG
24 #include "chkMemoryLeak.h"
25 #define DEBUG_NEW new(__FILE__, __LINE__)
26 #define new DEBUG_NEW
27 #endif
28
29 // ------------------------------------------
30 void PCPStream::init(GnuID &rid)
31 {
32         remoteID = rid;
33         routeList.clear();
34
35         lastPacketTime = 0;
36         nextRootPacket = 0;      // 0 seconds (never)
37
38         inData.init();
39         inData.accept = ChanPacket::T_PCP;
40
41         outData.init();
42         outData.accept = ChanPacket::T_PCP;
43 }
44 // ------------------------------------------
45 void PCPStream::readVersion(Stream &in)
46 {
47         int len = in.readInt();
48
49         if (len != 4)
50                 throw StreamException("Invalid PCP");
51
52         int ver = in.readInt();
53
54         LOG_DEBUG("PCP ver: %d",ver);
55 }
56 // ------------------------------------------
57 void PCPStream::readHeader(Stream &in,Channel *)
58 {
59 //      AtomStream atom(in);
60
61 //      if (in.readInt() != PCP_CONNECT)
62 //              throw StreamException("Not PCP");
63
64 //      readVersion(in);
65 }
66 // ------------------------------------------
67 bool PCPStream::sendPacket(ChanPacket &pack,GnuID &destID)
68 {
69         if (destID.isSet())
70                 if (!destID.isSame(remoteID))
71                         if (!routeList.contains(destID))
72                                 return false;
73
74         return outData.writePacket(pack);
75 }
76 // ------------------------------------------
77 void PCPStream::flush(Stream &in)
78 {
79         ChanPacket pack;
80         // send outward packets
81         while (outData.numPending())
82         {
83                 outData.readPacket(pack);
84                 pack.writeRaw(in);
85         }
86 }
87 // ------------------------------------------
88 int PCPStream::readPacket(Stream &in,Channel *)
89 {
90         BroadcastState bcs;
91         return readPacket(in,bcs);
92 }
93 // ------------------------------------------
94 int PCPStream::readPacket(Stream &in,BroadcastState &bcs)
95 {
96         int error = PCP_ERROR_GENERAL;
97         try
98         {
99                 AtomStream atom(in);
100
101                 ChanPacket pack;
102                 MemoryStream mem(pack.data,sizeof(pack.data));
103                 AtomStream patom(mem);
104
105
106                 // send outward packets
107                 error = PCP_ERROR_WRITE;
108                 if (outData.numPending())
109                 {
110                         outData.readPacket(pack);
111                         pack.writeRaw(in);
112                 }
113                 error = PCP_ERROR_GENERAL;
114
115                 if (outData.willSkip())
116                 {
117                         error = PCP_ERROR_WRITE+PCP_ERROR_SKIP;
118                         throw StreamException("Send too slow");
119                 }
120
121
122                 error = PCP_ERROR_READ;
123                 // poll for new downward packet
124                 if (in.readReady())
125                 {
126                         int numc,numd;
127                         ID4 id;
128
129                         id = atom.read(numc,numd);
130
131                         mem.rewind();
132                         pack.len = patom.writeAtoms(id, in, numc, numd);
133                         pack.type = ChanPacket::T_PCP;
134
135                         //inData.writePacket(pack);
136                 //}
137                 error = PCP_ERROR_GENERAL;
138
139                 // process downward packets
140                 //if (inData.numPending())
141                 //{
142                         //inData.readPacket(pack);
143
144                         mem.rewind();
145
146                         //int numc,numd;
147                         id = patom.read(numc,numd);
148
149                         error = PCPStream::procAtom(patom,id,numc,numd,bcs);
150                         
151                         if (error)
152                         {
153                                 throw StreamException("PCP exception");
154                         }
155                 }
156
157                 error = 0;
158
159         }catch(StreamException &e)
160         {
161                 LOG_ERROR("PCP readPacket: %s (%d)",e.msg,error);
162         }
163
164         return error;
165 }
166
167 // ------------------------------------------
168 void PCPStream::readEnd(Stream &,Channel *)
169 {
170 }
171
172
173 // ------------------------------------------
174 void PCPStream::readPushAtoms(AtomStream &atom, int numc,BroadcastState &bcs)
175 {
176         Host host;
177         GnuID   chanID;
178
179         chanID.clear();
180
181         for(int i=0; i<numc; i++)
182         {
183                 int c,d;
184                 ID4 id = atom.read(c,d);
185
186                 if (id == PCP_PUSH_IP)
187                         host.ip = atom.readInt();
188                 else if (id == PCP_PUSH_PORT)
189                         host.port = atom.readShort();
190                 else if (id == PCP_PUSH_CHANID)
191                         atom.readBytes(chanID.id,16);
192                 else
193                 {
194                         LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
195                         atom.skip(c,d);
196                 }
197         }
198
199
200         if (bcs.forMe)
201         {
202                 char ipstr[64];
203                 host.toStr(ipstr);
204
205                 Servent *s = NULL;
206
207                 if (chanID.isSet())
208                 {
209                         Channel *ch = chanMgr->findChannelByID(chanID);
210                         if (ch)
211                                 if (ch->isBroadcasting() || !ch->isFull() && !servMgr->relaysFull() && ch->info.id.isSame(chanID))
212                                         s = servMgr->allocServent();
213                 }else{
214                         s = servMgr->allocServent();
215                 }
216
217                 if (s)
218                 {
219                         LOG_DEBUG("GIVing to %s",ipstr);
220                         s->initGIV(host,chanID);
221                 }
222         }
223
224 }
225 // ------------------------------------------
226 void PCPStream::readRootAtoms(AtomStream &atom, int numc,BroadcastState &bcs)
227 {
228         String url;
229
230         for(int i=0; i<numc; i++)
231         {
232                 int c,d;
233                 ID4 id = atom.read(c,d);
234
235                 if (id == PCP_ROOT_UPDINT)
236                 {
237                         int si = atom.readInt();
238
239                         chanMgr->setUpdateInterval(si);
240                         LOG_DEBUG("PCP got new host update interval: %ds",si);
241                 }else if (id == PCP_ROOT_URL)
242                 {
243                         url = "http://www.peercast.org/";
244                         String loc;
245                         atom.readString(loc.data,sizeof(loc.data),d);
246                         url.append(loc);
247
248                 }else if (id == PCP_ROOT_CHECKVER)
249                 {
250                         unsigned int newVer = atom.readInt();
251                         if (newVer > PCP_CLIENT_VERSION)
252                         {
253                                 strcpy(servMgr->downloadURL,url.cstr());
254                                 peercastApp->notifyMessage(ServMgr::NT_UPGRADE,"There is a new version available, please click here to upgrade your client.");
255                         }
256                         LOG_DEBUG("PCP got version check: %d / %d",newVer,PCP_CLIENT_VERSION);
257
258                 }else if (id == PCP_ROOT_NEXT)
259                 {
260                         unsigned int time = atom.readInt();
261
262                         if (time)
263                         {
264                                 unsigned int ctime = sys->getTime();
265                                 nextRootPacket = ctime+time;
266                                 LOG_DEBUG("PCP expecting next root packet in %ds",time);
267                         }else
268                         {
269                                 nextRootPacket = 0;
270                         }
271
272                 }else if (id == PCP_ROOT_UPDATE)
273                 {
274                         atom.skip(c,d);
275
276                         chanMgr->broadcastTrackerUpdate(remoteID,true);
277
278                 }else if ((id == PCP_MESG_ASCII) || (id == PCP_MESG))                   // PCP_MESG_ASCII to be depreciated 
279                 {
280                         String newMsg;
281
282                         atom.readString(newMsg.data,sizeof(newMsg.data),d);
283                         if (!newMsg.isSame(servMgr->rootMsg.cstr()))
284                         {
285                                 servMgr->rootMsg = newMsg;
286                                 LOG_DEBUG("PCP got new root mesg: %s",servMgr->rootMsg.cstr());
287                                 peercastApp->notifyMessage(ServMgr::NT_PEERCAST,servMgr->rootMsg.cstr());
288                         }
289                 }else
290                 {
291                         LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
292                         atom.skip(c,d);
293                 }
294         }
295 }
296
297 // ------------------------------------------
298 void PCPStream::readPktAtoms(Channel *ch,AtomStream &atom,int numc,BroadcastState &bcs)
299 {
300         ChanPacket pack;
301         ID4 type;
302
303
304         for(int i=0; i<numc; i++)
305         {
306                 int c,d;
307                 ID4 id = atom.read(c,d);
308
309                 if (id == PCP_CHAN_PKT_TYPE)
310                 {
311                         type = atom.readID4();
312
313                         if (type == PCP_CHAN_PKT_HEAD)
314                                 pack.type = ChanPacket::T_HEAD;
315                         else if (type == PCP_CHAN_PKT_DATA)
316                                 pack.type = ChanPacket::T_DATA;
317                         else
318                                 pack.type = ChanPacket::T_UNKNOWN;
319
320                 }else if (id == PCP_CHAN_PKT_POS)
321                 {
322                         pack.pos = atom.readInt();
323
324
325                 }else if (id == PCP_CHAN_PKT_DATA)
326                 {
327                         pack.len = d;
328                         atom.readBytes(pack.data,pack.len);
329                 }
330                 else
331                 {
332                         LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
333                         atom.skip(c,d);
334                 }
335         }
336
337         if (ch)
338         {
339
340                 int diff = pack.pos - ch->streamPos;
341                 if (diff)
342                 {
343                         LOG_DEBUG("PCP skipping %s%8d (%10d -> %10d) count=%2d",(diff>0)?"+":"",diff,ch->streamPos,pack.pos, ch->skipCount);
344                         if (ch->lastSkipTime + 120 < sys->getTime()){
345                                 ch->skipCount = 0;
346                         }
347                         ch->lastSkipTime = sys->getTime();
348                         ch->skipCount++; //JP-EX
349                         pack.skip = true;
350                 }
351
352                 if (servMgr->autoBumpSkipCount) //JP-EX
353                 {
354                         if (ch->skipCount > servMgr->autoBumpSkipCount)
355                         {
356                                 LOG_DEBUG("Auto bump");
357                                 ch->bump = true;
358                         }
359                 }
360
361                 if (pack.type == ChanPacket::T_HEAD)
362                 {
363                         LOG_DEBUG("New head packet at %d",pack.pos);
364                         bool renewhead;
365                         if (servMgr->keepDownstreams)
366                                 renewhead = (memcmp(ch->headPack.data, pack.data, pack.len) != 0);
367                         else
368                                 renewhead = true;
369
370                         /*
371                         // check for stream restart
372                         if (pack.pos == 0)              
373                         {
374                                 LOG_CHANNEL("PCP resetting stream");
375                                 ch->streamIndex++;
376                                 ch->rawData.init();
377                         }
378                         */
379                         if (renewhead || ch->lastStopTime + 30 < sys->getTime()) {
380                                 // check for stream restart
381                                 if (pack.pos == 0)
382                                 {
383                                         LOG_CHANNEL("PCP resetting stream");
384                                         ch->streamIndex++;
385                                         ch->rawData.init();
386                                 }
387
388                                 ch->headPack = pack;
389
390                                 ch->rawData.writePacket(pack,true);
391                                 ch->streamPos = pack.pos+pack.len;
392                         }
393
394                 }else if (pack.type == ChanPacket::T_DATA)
395                 {
396                         ch->rawData.writePacket(pack,true);
397                         ch->streamPos = pack.pos+pack.len;
398                 }
399
400         }
401
402         // update this parent packet stream position
403         if ((pack.pos) && (!bcs.streamPos || (pack.pos < bcs.streamPos)))
404                 bcs.streamPos = pack.pos;
405
406 }
407 // -----------------------------------
408 void PCPStream::readHostAtoms(AtomStream &atom, int numc, BroadcastState &bcs, ChanHit &hit, bool flg)
409 {
410 //      ChanHit hit;
411         hit.init();
412         GnuID chanID = bcs.chanID;      //use default
413
414         bool busy=false;
415
416         unsigned int ipNum=0;
417
418         for(int i=0; i<numc; i++)
419         {
420                 int c,d;
421                 ID4 id = atom.read(c,d);
422
423                 if (id == PCP_HOST_IP)
424                 {
425                         unsigned int ip = atom.readInt();
426                         hit.rhost[ipNum].ip = ip;
427                 }else if (id == PCP_HOST_PORT)
428                 {
429                         int port = atom.readShort();
430                         hit.rhost[ipNum++].port = port;
431
432                         if (ipNum > 1)
433                                 ipNum = 1;
434                 }
435                 else if (id == PCP_HOST_NUML)
436                         hit.numListeners = atom.readInt();
437                 else if (id == PCP_HOST_NUMR)
438                         hit.numRelays = atom.readInt();
439                 else if (id == PCP_HOST_UPTIME)
440                         hit.upTime = atom.readInt();
441                 else if (id == PCP_HOST_OLDPOS)
442                         hit.oldestPos = atom.readInt();
443                 else if (id == PCP_HOST_NEWPOS)
444                         hit.newestPos = atom.readInt();
445                 else if (id == PCP_HOST_VERSION)
446                         hit.version = atom.readInt();
447                 else if (id == PCP_HOST_VERSION_VP)
448                         hit.version_vp = atom.readInt();
449                 else if (id == PCP_HOST_VERSION_EX_PREFIX)
450                         atom.readBytes(hit.version_ex_prefix,2);
451                 else if (id == PCP_HOST_VERSION_EX_NUMBER){
452                         hit.version_ex_number = atom.readShort();
453                 }
454                 else if (id == PCP_HOST_FLAGS1)
455                 {
456                         int fl1 = atom.readChar();
457
458                         hit.recv = (fl1 & PCP_HOST_FLAGS1_RECV) !=0;
459                         hit.relay = (fl1 & PCP_HOST_FLAGS1_RELAY) !=0;
460                         hit.direct = (fl1 & PCP_HOST_FLAGS1_DIRECT) !=0;
461                         hit.cin = (fl1 & PCP_HOST_FLAGS1_CIN) !=0;
462                         hit.tracker = (fl1 & PCP_HOST_FLAGS1_TRACKER) !=0;
463                         hit.firewalled = (fl1 & PCP_HOST_FLAGS1_PUSH) !=0;
464
465
466                 }else if (id == PCP_HOST_ID)
467                         atom.readBytes(hit.sessionID.id,16);
468                 else if (id == PCP_HOST_CHANID)
469                         atom.readBytes(chanID.id,16);
470                 else if (id == PCP_HOST_UPHOST_IP)
471                         hit.uphost.ip = atom.readInt();
472                 else if (id == PCP_HOST_UPHOST_PORT)
473                         hit.uphost.port = atom.readInt();
474                 else if (id == PCP_HOST_UPHOST_HOPS)
475                         hit.uphostHops = atom.readInt();
476                 else
477                 {
478                         LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
479                         atom.skip(c,d);
480                 }
481         }
482
483         hit.host = hit.rhost[0];
484         hit.chanID = chanID;
485
486         hit.numHops = bcs.numHops;
487
488         hit.servent_id = bcs.servent_id;
489
490         if (flg){
491 //              LOG_DEBUG("readHostAtoms HITLISTLOCK ON-------------");
492                 chanMgr->hitlistlock.on();
493                 if (hit.recv)
494                         chanMgr->addHit(hit);
495                 else
496                         chanMgr->delHit(hit);
497 //              LOG_DEBUG("readHostAtoms HITLISTLOCK OFF-------------");
498                 chanMgr->hitlistlock.off();
499         }
500
501         if (hit.numHops == 1){
502                 Servent *sv = servMgr->findServentByServentID(hit.servent_id);
503                 if (sv){
504 //                      LOG_DEBUG("set servent's waitPort = %d", hit.host.port);
505                         sv->waitPort = hit.host.port;
506                 }
507         }
508 }
509
510 // ------------------------------------------
511 void PCPStream::readChanAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
512 {
513 /*      Channel *ch=NULL;
514         ChanHitList *chl=NULL;
515         ChanInfo newInfo;
516
517         ch = chanMgr->findChannelByID(bcs.chanID);
518         chl = chanMgr->findHitListByID(bcs.chanID);
519
520         if (ch)
521                 newInfo = ch->info;
522         else if (chl)
523                 newInfo = chl->info;*/
524
525         Channel *ch=NULL;
526         ChanHitList *chl=NULL;
527         ChanInfo newInfo, chaInfo;
528
529         ch = this->parent;
530         if (ch){
531                 newInfo = ch->info;
532                 chaInfo = ch->info;
533         }
534
535         for(int i=0; i<numc; i++)
536         {
537
538                 int c,d;
539                 ID4 id = atom.read(c,d);
540
541                 if ((id == PCP_CHAN_PKT) && (ch))
542                 {
543                         readPktAtoms(ch,atom,c,bcs);
544                 }else if (id == PCP_CHAN_INFO)
545                 {
546                         newInfo.readInfoAtoms(atom,c);
547
548                 }else if (id == PCP_CHAN_TRACK)
549                 {
550                         newInfo.readTrackAtoms(atom,c);
551
552                 }else if (id == PCP_CHAN_BCID)
553                 {
554                         atom.readBytes(newInfo.bcID.id,16);
555
556                 }else if (id == PCP_CHAN_KEY)                   // depreciated
557                 {
558                         atom.readBytes(newInfo.bcID.id,16);
559                         newInfo.bcID.id[0] = 0;                         // clear flags
560
561                 }else if (id == PCP_CHAN_ID)
562                 {
563                         atom.readBytes(newInfo.id.id,16);
564
565                         ch = chanMgr->findChannelByID(newInfo.id);
566                         chl = chanMgr->findHitListByID(newInfo.id);
567
568                 }else
569                 {
570                         LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
571                         atom.skip(c,d);
572                 }
573         }
574
575         chl = chanMgr->findHitList(newInfo);
576
577         if (!chl)
578                 chl = chanMgr->addHitList(newInfo);
579
580         if (chl)
581         {
582                 chl->info.update(newInfo);
583         
584                 if (!servMgr->chanLog.isEmpty())
585                 {
586                         //if (chl->numListeners())
587                         {
588                                 try
589                                 {
590                                         FileStream file;
591                                         file.openWriteAppend(servMgr->chanLog.cstr());
592
593                                         XML::Node *rn = new XML::Node("update time=\"%d\"",sys->getTime());
594                                         XML::Node *n = chl->info.createChannelXML();
595                                         n->add(chl->createXML(false));
596                                         n->add(chl->info.createTrackXML());
597                                         rn->add(n);     
598         
599                                         rn->write(file,0);
600                                         delete rn;
601                                         file.close();
602                                 }catch(StreamException &e)
603                                 {
604                                         LOG_ERROR("Unable to update channel log: %s",e.msg);
605                                 }
606                         }
607                 }
608
609         }
610
611         if (ch && !ch->isBroadcasting())
612                 ch->updateInfo(newInfo);
613
614
615 }
616 // ------------------------------------------
617 int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
618 {
619         ChanPacket pack;
620         int ttl=0;              
621         int ver=0;
622         int ver_vp=0;
623         GnuID fromID,destID;
624         int r=0;
625         char ver_ex_prefix[2];
626         int ver_ex_number = 0;
627
628         fromID.clear();
629         destID.clear();
630
631         bcs.initPacketSettings();
632
633         MemoryStream pmem(pack.data,sizeof(pack.data));
634         AtomStream patom(pmem);
635
636         patom.writeParent(PCP_BCST,numc);
637
638         for(int i=0; i<numc; i++)
639         {
640                 int c,d;
641                 ID4 id = atom.read(c,d);
642                 
643                 if (id == PCP_BCST_TTL)
644                 {
645                         ttl = atom.readChar()-1;
646                         patom.writeChar(id,ttl);
647
648                 }else if (id == PCP_BCST_HOPS)
649                 {
650                         bcs.numHops = atom.readChar()+1;
651                         patom.writeChar(id,bcs.numHops);
652
653                 }else if (id == PCP_BCST_FROM)
654                 {
655                         atom.readBytes(fromID.id,16);
656                         patom.writeBytes(id,fromID.id,16);
657
658                         routeList.add(fromID);
659                 }else if (id == PCP_BCST_GROUP)
660                 {
661                         bcs.group = atom.readChar();
662                         patom.writeChar(id,bcs.group);
663                 }else if (id == PCP_BCST_DEST)
664                 {
665                         atom.readBytes(destID.id,16);
666                         patom.writeBytes(id,destID.id,16);
667                         bcs.forMe = destID.isSame(servMgr->sessionID);
668
669                         char idstr1[64];
670                         char idstr2[64];
671
672                         destID.toStr(idstr1);
673                         servMgr->sessionID.toStr(idstr2);
674
675                 }else if (id == PCP_BCST_CHANID)
676                 {
677                         atom.readBytes(bcs.chanID.id,16);
678                         patom.writeBytes(id,bcs.chanID.id,16);
679                 }else if (id == PCP_BCST_VERSION)
680                 {
681                         ver = atom.readInt();
682                         patom.writeInt(id,ver);
683                 }else if (id == PCP_BCST_VERSION_VP)
684                 {
685                         ver_vp = atom.readInt();
686                         patom.writeInt(id,ver_vp);
687                 }else if (id == PCP_BCST_VERSION_EX_PREFIX)
688                 {
689                         atom.readBytes(ver_ex_prefix,2);
690                         patom.writeBytes(id,ver_ex_prefix,2);
691                 }else if (id == PCP_BCST_VERSION_EX_NUMBER)
692                 {
693                         ver_ex_number = atom.readShort();
694                         patom.writeShort(id,ver_ex_number);
695                 }else if (id == PCP_HOST)
696                 {
697                         ChanHit hit;
698                         readHostAtoms(atom,c,bcs,hit,false);
699                         if (hit.uphost.ip == 0){
700 //                              LOG_DEBUG("bcs servent_id = %d", bcs.servent_id);
701                                 if (bcs.numHops == 1){
702                                         hit.uphost.ip = servMgr->serverHost.ip;
703                                         hit.uphost.port = servMgr->serverHost.port;
704                                         hit.uphostHops = 1;
705                                 } else {
706                                         Servent *sv = servMgr->findServentByServentID(bcs.servent_id);
707                                         if (sv){
708                                                 hit.uphost.ip = sv->getHost().ip;
709                                                 hit.uphost.port = sv->waitPort;
710                                                 hit.uphostHops = bcs.numHops - 1;
711                                         }
712                                 }
713                         }
714                         int oldPos = pmem.pos;
715                         hit.writeAtoms(patom, hit.chanID);
716                         pmem.pos = oldPos;
717                         r = readAtom(patom,bcs);
718                 } else {
719                         // copy and process atoms
720                         int oldPos = pmem.pos;
721                         patom.writeAtoms(id,atom.io,c,d);
722                         pmem.pos = oldPos;
723                         r = readAtom(patom,bcs);
724                 }
725         }
726
727         char fromStr[64];
728         fromStr[0] = 0;
729         if (fromID.isSet())
730                 fromID.toStr(fromStr);
731         char destStr[64];
732         destStr[0] = 0;
733         if (destID.isSet())
734                 destID.toStr(destStr);
735         char tmp[64];
736         bcs.chanID.toStr(tmp);
737
738 //      LOG_DEBUG(tmp);
739
740         if (ver_ex_number){
741                 LOG_DEBUG("PCP bcst: group=%d, hops=%d, ver=%d(%c%c%04d), from=%s, dest=%s ttl=%d",
742                         bcs.group,bcs.numHops,ver,ver_ex_prefix[0],ver_ex_prefix[1],ver_ex_number,fromStr,destStr,ttl);
743         } else if (ver_vp){
744                 LOG_DEBUG("PCP bcst: group=%d, hops=%d, ver=%d(VP%04d), from=%s, dest=%s ttl=%d",bcs.group,bcs.numHops,ver,ver_vp,fromStr,destStr,ttl);
745         } else {
746                 LOG_DEBUG("PCP bcst: group=%d, hops=%d, ver=%d, from=%s, dest=%s ttl=%d",bcs.group,bcs.numHops,ver,fromStr,destStr,ttl);
747         }
748
749         if (fromID.isSet())
750                 if (fromID.isSame(servMgr->sessionID))
751                 {
752                         LOG_ERROR("BCST loopback"); 
753                         return PCP_ERROR_BCST+PCP_ERROR_LOOPBACK;
754                 }
755
756         // broadcast back out if ttl > 0 
757         if ((ttl>0) && (!bcs.forMe))
758         {
759                 pack.len = pmem.pos;
760                 pack.type = ChanPacket::T_PCP;
761
762                 if (bcs.group & (/*PCP_BCST_GROUP_ROOT|*/PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
763                 {
764                         chanMgr->broadcastPacketUp(pack,bcs.chanID,remoteID,destID);
765                 }
766
767                 if (bcs.group & (/*PCP_BCST_GROUP_ROOT|*/PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
768                 {
769                         servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_COUT);
770                 }
771
772                 if (bcs.group & (PCP_BCST_GROUP_RELAYS|PCP_BCST_GROUP_TRACKERS))
773                 {
774                         servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_CIN);
775                 }
776
777                 if (bcs.group & (PCP_BCST_GROUP_RELAYS))
778                 {
779                         servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_RELAY);
780                 }
781
782
783 //              LOG_DEBUG("ttl=%d",ttl);
784
785         } else {
786 //              LOG_DEBUG("ttl=%d",ttl);
787         }
788         return r;
789 }
790
791
792 // ------------------------------------------
793 int PCPStream::procAtom(AtomStream &atom,ID4 id,int numc, int dlen,BroadcastState &bcs)
794 {
795         int r=0;
796         ChanHit hit;
797         int rBan = 0;
798
799         if (id == PCP_CHAN)
800         {
801                 readChanAtoms(atom,numc,bcs);
802         }else if (id == PCP_ROOT)
803         {
804                 if (servMgr->isRoot)
805                         throw StreamException("Unauthorized root message");                             
806                 else
807                         readRootAtoms(atom,numc,bcs);
808
809         }else if (id == PCP_HOST)
810         {
811                 readHostAtoms(atom,numc,bcs,hit);
812                 Channel *ch = chanMgr->findChannelByID(hit.chanID);
813                 if (ch && (ch->isBroadcasting() || servMgr->vpDebug)){
814                         if (servMgr->autoPort0Kick && (hit.numHops == 1) && (hit.firewalled || (!hit.relay && !hit.numRelays))){
815                                 char tmp[32];
816                                 hit.host.IPtoStr(tmp);
817                                 LOG_DEBUG("host that can't relay is disconnect: %s", tmp);
818                                 rBan = PCP_ERROR_BANNED;
819                         }
820                         if (servMgr->allowOnlyVP && (hit.numHops == 1) && !hit.version_vp){
821                                 char tmp[32];
822                                 hit.host.IPtoStr(tmp);
823                                 LOG_DEBUG("host that is not VP is disconnect: %s", tmp);
824                                 rBan = PCP_ERROR_BANNED;
825                         }
826                 }
827
828         }else if ((id == PCP_MESG_ASCII) || (id == PCP_MESG))           // PCP_MESG_ASCII to be depreciated
829         {
830                 String msg;
831                 atom.readString(msg.data,sizeof(msg.data),dlen);
832                 LOG_DEBUG("PCP got text: %s",msg.cstr());
833         }else if (id == PCP_BCST)
834         {
835                 r = readBroadcastAtoms(atom,numc,bcs);
836         }else if (id == PCP_HELO)
837         {
838                 atom.skip(numc,dlen);
839                 atom.writeParent(PCP_OLEH,1);
840                         atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
841         }else if (id == PCP_PUSH)
842         {
843
844                 readPushAtoms(atom,numc,bcs);
845         }else if (id == PCP_OK)
846         {
847                 atom.readInt();
848
849         }else if (id == PCP_QUIT)
850         {
851                 r = atom.readInt();
852                 if (!r)
853                         r = PCP_ERROR_QUIT;
854
855         }else if (id == PCP_ATOM)
856         {
857                 for(int i=0; i<numc; i++)
858                 {
859                         int nc,nd;
860                         ID4 aid = atom.read(nc,nd);
861                         int ar = procAtom(atom,aid,nc,nd,bcs);
862                         if (ar)
863                                 r = ar;
864                 }
865
866         }else
867         {
868                 LOG_CHANNEL("PCP skip: %s",id.getString().str());
869                 atom.skip(numc,dlen);
870         }
871
872         if (!r)
873                 r = rBan;
874
875         return r;
876
877 }
878
879 // ------------------------------------------
880 int PCPStream::readAtom(AtomStream &atom,BroadcastState &bcs)
881 {
882         int numc,dlen;
883         ID4 id = atom.read(numc,dlen);
884
885         return  procAtom(atom,id,numc,dlen,bcs);
886 }
887
888