OSDN Git Service

GUI終了時のエラー落ちを修正(?)
[peercast-im/PeerCastIM.git] / c: / Git / PeerCast.root / PeerCast / core / common / pcp.cpp
1 // ------------------------------------------------
2 // File : pcp.cpp
3 // Date: 1-mar-2004
4 // Author: giles
5 //
6 // (c) 2002-4 peercast.org
7 // ------------------------------------------------
8 // This program is free software; you can redistribute it and/or modify
9 // it under the terms of the GNU General Public License as published by
10 // the Free Software Foundation; either version 2 of the License, or
11 // (at your option) any later version.
12
13 // This program is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 // GNU General Public License for more details.
17 // ------------------------------------------------
18
19 #include "atom.h"
20 #include "pcp.h"
21 #include "peercast.h"
22 #include "version2.h"
23 #ifdef _DEBUG
24 #include "chkMemoryLeak.h"
25 #define DEBUG_NEW new(__FILE__, __LINE__)
26 #define new DEBUG_NEW
27 #endif
28
29 // ------------------------------------------
30 void PCPStream::init(GnuID &rid)
31 {
32         remoteID = rid;
33         routeList.clear();
34
35         lastPacketTime = 0;
36         nextRootPacket = 0;      // 0 seconds (never)
37
38         inData.init();
39         inData.accept = ChanPacket::T_PCP;
40
41         outData.init();
42         outData.accept = ChanPacket::T_PCP;
43 }
44 // ------------------------------------------
45 void PCPStream::readVersion(Stream &in)
46 {
47         int len = in.readInt();
48
49         if (len != 4)
50                 throw StreamException("Invalid PCP");
51
52         int ver = in.readInt();
53
54         LOG_DEBUG("PCP ver: %d",ver);
55 }
56 // ------------------------------------------
57 void PCPStream::readHeader(Stream &in,Channel *)
58 {
59 //      AtomStream atom(in);
60
61 //      if (in.readInt() != PCP_CONNECT)
62 //              throw StreamException("Not PCP");
63
64 //      readVersion(in);
65 }
66 // ------------------------------------------
67 bool PCPStream::sendPacket(ChanPacket &pack,GnuID &destID)
68 {
69         if (destID.isSet())
70                 if (!destID.isSame(remoteID))
71                         if (!routeList.contains(destID))
72                                 return false;
73
74         return outData.writePacket(pack);
75 }
76 // ------------------------------------------
77 void PCPStream::flush(Stream &in)
78 {
79         ChanPacket pack;
80         // send outward packets
81         while (outData.numPending())
82         {
83                 outData.readPacket(pack);
84                 pack.writeRaw(in);
85         }
86 }
87 // ------------------------------------------
88 int PCPStream::readPacket(Stream &in,Channel *)
89 {
90         BroadcastState bcs;
91         return readPacket(in,bcs);
92 }
93 // ------------------------------------------
94 int PCPStream::readPacket(Stream &in,BroadcastState &bcs)
95 {
96         int error = PCP_ERROR_GENERAL;
97         try
98         {
99                 AtomStream atom(in);
100
101                 ChanPacket pack;
102                 MemoryStream mem(pack.data,sizeof(pack.data));
103                 AtomStream patom(mem);
104
105
106                 // send outward packets
107                 error = PCP_ERROR_WRITE;
108                 if (outData.numPending())
109                 {
110                         outData.readPacket(pack);
111                         pack.writeRaw(in);
112                 }
113                 error = PCP_ERROR_GENERAL;
114
115                 if (outData.willSkip())
116                 {
117                         error = PCP_ERROR_WRITE+PCP_ERROR_SKIP;
118                         throw StreamException("Send too slow");
119                 }
120
121
122                 error = PCP_ERROR_READ;
123                 // poll for new downward packet
124                 if (in.readReady())
125                 {
126                         int numc,numd;
127                         ID4 id;
128
129                         id = atom.read(numc,numd);
130
131                         mem.rewind();
132                         pack.len = patom.writeAtoms(id, in, numc, numd);
133                         pack.type = ChanPacket::T_PCP;
134
135                         //inData.writePacket(pack);
136                 //}
137                 error = PCP_ERROR_GENERAL;
138
139                 // process downward packets
140                 //if (inData.numPending())
141                 //{
142                         //inData.readPacket(pack);
143
144                         mem.rewind();
145
146                         //int numc,numd;
147                         id = patom.read(numc,numd);
148
149                         error = PCPStream::procAtom(patom,id,numc,numd,bcs);
150                         
151                         if (error)
152                                 throw StreamException("PCP exception");
153                 }
154
155                 error = 0;
156
157         }catch(StreamException &e)
158         {
159                 LOG_ERROR("PCP readPacket: %s (%d)",e.msg,error);
160         }
161
162         return error;
163 }
164
165 // ------------------------------------------
166 void PCPStream::readEnd(Stream &,Channel *)
167 {
168 }
169
170
171 // ------------------------------------------
172 void PCPStream::readPushAtoms(AtomStream &atom, int numc,BroadcastState &bcs)
173 {
174         Host host;
175         GnuID   chanID;
176
177         chanID.clear();
178
179         for(int i=0; i<numc; i++)
180         {
181                 int c,d;
182                 ID4 id = atom.read(c,d);
183
184                 if (id == PCP_PUSH_IP)
185                         host.ip = atom.readInt();
186                 else if (id == PCP_PUSH_PORT)
187                         host.port = atom.readShort();
188                 else if (id == PCP_PUSH_CHANID)
189                         atom.readBytes(chanID.id,16);
190                 else
191                 {
192                         LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
193                         atom.skip(c,d);
194                 }
195         }
196
197
198         if (bcs.forMe)
199         {
200                 char ipstr[64];
201                 host.toStr(ipstr);
202
203                 Servent *s = NULL;
204
205                 if (chanID.isSet())
206                 {
207                         Channel *ch = chanMgr->findChannelByID(chanID);
208                         if (ch)
209                                 if (ch->isBroadcasting() || !ch->isFull() && !servMgr->relaysFull() && ch->info.id.isSame(chanID))
210                                         s = servMgr->allocServent();
211                 }else{
212                         s = servMgr->allocServent();
213                 }
214
215                 if (s)
216                 {
217                         LOG_DEBUG("GIVing to %s",ipstr);
218                         s->initGIV(host,chanID);
219                 }
220         }
221
222 }
223 // ------------------------------------------
224 void PCPStream::readRootAtoms(AtomStream &atom, int numc,BroadcastState &bcs)
225 {
226         String url;
227
228         for(int i=0; i<numc; i++)
229         {
230                 int c,d;
231                 ID4 id = atom.read(c,d);
232
233                 if (id == PCP_ROOT_UPDINT)
234                 {
235                         int si = atom.readInt();
236
237                         chanMgr->setUpdateInterval(si);
238                         LOG_DEBUG("PCP got new host update interval: %ds",si);
239                 }else if (id == PCP_ROOT_URL)
240                 {
241                         url = "http://www.peercast.org/";
242                         String loc;
243                         atom.readString(loc.data,sizeof(loc.data),d);
244                         url.append(loc);
245
246                 }else if (id == PCP_ROOT_CHECKVER)
247                 {
248                         unsigned int newVer = atom.readInt();
249                         if (newVer > PCP_CLIENT_VERSION)
250                         {
251                                 strcpy(servMgr->downloadURL,url.cstr());
252                                 peercastApp->notifyMessage(ServMgr::NT_UPGRADE,"There is a new version available, please click here to upgrade your client.");
253                         }
254                         LOG_DEBUG("PCP got version check: %d / %d",newVer,PCP_CLIENT_VERSION);
255
256                 }else if (id == PCP_ROOT_NEXT)
257                 {
258                         unsigned int time = atom.readInt();
259
260                         if (time)
261                         {
262                                 unsigned int ctime = sys->getTime();
263                                 nextRootPacket = ctime+time;
264                                 LOG_DEBUG("PCP expecting next root packet in %ds",time);
265                         }else
266                         {
267                                 nextRootPacket = 0;
268                         }
269
270                 }else if (id == PCP_ROOT_UPDATE)
271                 {
272                         atom.skip(c,d);
273
274                         chanMgr->broadcastTrackerUpdate(remoteID,true);
275
276                 }else if ((id == PCP_MESG_ASCII) || (id == PCP_MESG))                   // PCP_MESG_ASCII to be depreciated 
277                 {
278                         String newMsg;
279
280                         atom.readString(newMsg.data,sizeof(newMsg.data),d);
281                         if (!newMsg.isSame(servMgr->rootMsg.cstr()))
282                         {
283                                 servMgr->rootMsg = newMsg;
284                                 LOG_DEBUG("PCP got new root mesg: %s",servMgr->rootMsg.cstr());
285                                 peercastApp->notifyMessage(ServMgr::NT_PEERCAST,servMgr->rootMsg.cstr());
286                         }
287                 }else
288                 {
289                         LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
290                         atom.skip(c,d);
291                 }
292         }
293 }
294
295 // ------------------------------------------
296 void PCPStream::readPktAtoms(Channel *ch,AtomStream &atom,int numc,BroadcastState &bcs)
297 {
298         ChanPacket pack;
299         ID4 type;
300
301
302         for(int i=0; i<numc; i++)
303         {
304                 int c,d;
305                 ID4 id = atom.read(c,d);
306
307                 if (id == PCP_CHAN_PKT_TYPE)
308                 {
309                         type = atom.readID4();
310
311                         if (type == PCP_CHAN_PKT_HEAD)
312                                 pack.type = ChanPacket::T_HEAD;
313                         else if (type == PCP_CHAN_PKT_DATA)
314                                 pack.type = ChanPacket::T_DATA;
315                         else
316                                 pack.type = ChanPacket::T_UNKNOWN;
317
318                 }else if (id == PCP_CHAN_PKT_POS)
319                 {
320                         pack.pos = atom.readInt();
321
322
323                 }else if (id == PCP_CHAN_PKT_DATA)
324                 {
325                         pack.len = d;
326                         atom.readBytes(pack.data,pack.len);
327                 }
328                 else
329                 {
330                         LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
331                         atom.skip(c,d);
332                 }
333         }
334
335         if (ch)
336         {
337
338                 int diff = pack.pos - ch->streamPos;
339                 if (diff)
340                 {
341                         LOG_DEBUG("PCP skipping %s%8d (%10d -> %10d) count=%2d",(diff>0)?"+":"",diff,ch->streamPos,pack.pos, ch->skipCount);
342                         if (ch->lastSkipTime + 120 < sys->getTime()){
343                                 ch->skipCount = 0;
344                         }
345                         ch->lastSkipTime = sys->getTime();
346                         ch->skipCount++; //JP-EX
347                         pack.skip = true;
348                 }
349
350                 if (servMgr->autoBumpSkipCount) //JP-EX
351                 {
352                         if (ch->skipCount > servMgr->autoBumpSkipCount)
353                         {
354                                 LOG_DEBUG("Auto bump");
355                                 ch->bump = true;
356                         }
357                 }
358
359                 if (pack.type == ChanPacket::T_HEAD)
360                 {
361                         LOG_DEBUG("New head packet at %d",pack.pos);
362                         bool renewhead;
363                         if (servMgr->keepDownstreams)
364                                 renewhead = (memcmp(ch->headPack.data, pack.data, pack.len) != 0);
365                         else
366                                 renewhead = true;
367
368                         /*
369                         // check for stream restart
370                         if (pack.pos == 0)              
371                         {
372                                 LOG_CHANNEL("PCP resetting stream");
373                                 ch->streamIndex++;
374                                 ch->rawData.init();
375                         }
376                         */
377                         if (renewhead || ch->lastStopTime + 30 < sys->getTime()) {
378                                 // check for stream restart
379                                 if (pack.pos == 0)
380                                 {
381                                         LOG_CHANNEL("PCP resetting stream");
382                                         ch->streamIndex++;
383                                         ch->rawData.init();
384                                 }
385
386                                 ch->headPack = pack;
387
388                                 ch->rawData.writePacket(pack,true);
389                                 ch->streamPos = pack.pos+pack.len;
390                         }
391
392                 }else if (pack.type == ChanPacket::T_DATA)
393                 {
394                         ch->rawData.writePacket(pack,true);
395                         ch->streamPos = pack.pos+pack.len;
396                 }
397
398         }
399
400         // update this parent packet stream position
401         if ((pack.pos) && (!bcs.streamPos || (pack.pos < bcs.streamPos)))
402                 bcs.streamPos = pack.pos;
403
404 }
405 // -----------------------------------
406 void PCPStream::readHostAtoms(AtomStream &atom, int numc, BroadcastState &bcs, ChanHit &hit, bool flg)
407 {
408 //      ChanHit hit;
409         hit.init();
410         GnuID chanID = bcs.chanID;      //use default
411
412         bool busy=false;
413
414         unsigned int ipNum=0;
415
416         for(int i=0; i<numc; i++)
417         {
418                 int c,d;
419                 ID4 id = atom.read(c,d);
420
421                 if (id == PCP_HOST_IP)
422                 {
423                         unsigned int ip = atom.readInt();
424                         hit.rhost[ipNum].ip = ip;
425                 }else if (id == PCP_HOST_PORT)
426                 {
427                         int port = atom.readShort();
428                         hit.rhost[ipNum++].port = port;
429
430                         if (ipNum > 1)
431                                 ipNum = 1;
432                 }
433                 else if (id == PCP_HOST_NUML)
434                         hit.numListeners = atom.readInt();
435                 else if (id == PCP_HOST_NUMR)
436                         hit.numRelays = atom.readInt();
437                 else if (id == PCP_HOST_UPTIME)
438                         hit.upTime = atom.readInt();
439                 else if (id == PCP_HOST_OLDPOS)
440                         hit.oldestPos = atom.readInt();
441                 else if (id == PCP_HOST_NEWPOS)
442                         hit.newestPos = atom.readInt();
443                 else if (id == PCP_HOST_VERSION)
444                         hit.version = atom.readInt();
445                 else if (id == PCP_HOST_VERSION_VP)
446                         hit.version_vp = atom.readInt();
447                 else if (id == PCP_HOST_VERSION_EX_PREFIX)
448                         atom.readBytes(hit.version_ex_prefix,2);
449                 else if (id == PCP_HOST_VERSION_EX_NUMBER){
450                         hit.version_ex_number = atom.readShort();
451                 }
452                 else if (id == PCP_HOST_FLAGS1)
453                 {
454                         int fl1 = atom.readChar();
455
456                         hit.recv = (fl1 & PCP_HOST_FLAGS1_RECV) !=0;
457                         hit.relay = (fl1 & PCP_HOST_FLAGS1_RELAY) !=0;
458                         hit.direct = (fl1 & PCP_HOST_FLAGS1_DIRECT) !=0;
459                         hit.cin = (fl1 & PCP_HOST_FLAGS1_CIN) !=0;
460                         hit.tracker = (fl1 & PCP_HOST_FLAGS1_TRACKER) !=0;
461                         hit.firewalled = (fl1 & PCP_HOST_FLAGS1_PUSH) !=0;
462
463
464                 }else if (id == PCP_HOST_ID)
465                         atom.readBytes(hit.sessionID.id,16);
466                 else if (id == PCP_HOST_CHANID)
467                         atom.readBytes(chanID.id,16);
468                 else if (id == PCP_HOST_UPHOST_IP)
469                         hit.uphost.ip = atom.readInt();
470                 else if (id == PCP_HOST_UPHOST_PORT)
471                         hit.uphost.port = atom.readInt();
472                 else if (id == PCP_HOST_UPHOST_HOPS)
473                         hit.uphostHops = atom.readInt();
474                 else
475                 {
476                         LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
477                         atom.skip(c,d);
478                 }
479         }
480
481         hit.host = hit.rhost[0];
482         hit.chanID = chanID;
483
484         hit.numHops = bcs.numHops;
485
486         hit.servent_id = bcs.servent_id;
487
488         if (flg){
489 //              LOG_DEBUG("readHostAtoms HITLISTLOCK ON-------------");
490                 chanMgr->hitlistlock.on();
491                 if (hit.recv)
492                         chanMgr->addHit(hit);
493                 else
494                         chanMgr->delHit(hit);
495 //              LOG_DEBUG("readHostAtoms HITLISTLOCK OFF-------------");
496                 chanMgr->hitlistlock.off();
497         }
498
499         if (hit.numHops == 1){
500                 Servent *sv = servMgr->findServentByServentID(hit.servent_id);
501                 if (sv){
502 //                      LOG_DEBUG("set servent's waitPort = %d", hit.host.port);
503                         sv->waitPort = hit.host.port;
504                 }
505         }
506 }
507
508 // ------------------------------------------
509 void PCPStream::readChanAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
510 {
511 /*      Channel *ch=NULL;
512         ChanHitList *chl=NULL;
513         ChanInfo newInfo;
514
515         ch = chanMgr->findChannelByID(bcs.chanID);
516         chl = chanMgr->findHitListByID(bcs.chanID);
517
518         if (ch)
519                 newInfo = ch->info;
520         else if (chl)
521                 newInfo = chl->info;*/
522
523         Channel *ch=NULL;
524         ChanHitList *chl=NULL;
525         ChanInfo newInfo, chaInfo;
526
527         ch = this->parent;
528         if (ch){
529                 newInfo = ch->info;
530                 chaInfo = ch->info;
531         }
532
533         for(int i=0; i<numc; i++)
534         {
535
536                 int c,d;
537                 ID4 id = atom.read(c,d);
538
539                 if ((id == PCP_CHAN_PKT) && (ch))
540                 {
541                         readPktAtoms(ch,atom,c,bcs);
542                 }else if (id == PCP_CHAN_INFO)
543                 {
544                         newInfo.readInfoAtoms(atom,c);
545
546                 }else if (id == PCP_CHAN_TRACK)
547                 {
548                         newInfo.readTrackAtoms(atom,c);
549
550                 }else if (id == PCP_CHAN_BCID)
551                 {
552                         atom.readBytes(newInfo.bcID.id,16);
553
554                 }else if (id == PCP_CHAN_KEY)                   // depreciated
555                 {
556                         atom.readBytes(newInfo.bcID.id,16);
557                         newInfo.bcID.id[0] = 0;                         // clear flags
558
559                 }else if (id == PCP_CHAN_ID)
560                 {
561                         atom.readBytes(newInfo.id.id,16);
562
563                         ch = chanMgr->findChannelByID(newInfo.id);
564                         chl = chanMgr->findHitListByID(newInfo.id);
565
566                 }else
567                 {
568                         LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
569                         atom.skip(c,d);
570                 }
571         }
572
573         chl = chanMgr->findHitList(newInfo);
574
575         if (!chl)
576                 chl = chanMgr->addHitList(newInfo);
577
578         if (chl)
579         {
580                 chl->info.update(newInfo);
581         
582                 if (!servMgr->chanLog.isEmpty())
583                 {
584                         //if (chl->numListeners())
585                         {
586                                 try
587                                 {
588                                         FileStream file;
589                                         file.openWriteAppend(servMgr->chanLog.cstr());
590
591                                         XML::Node *rn = new XML::Node("update time=\"%d\"",sys->getTime());
592                                         XML::Node *n = chl->info.createChannelXML();
593                                         n->add(chl->createXML(false));
594                                         n->add(chl->info.createTrackXML());
595                                         rn->add(n);     
596         
597                                         rn->write(file,0);
598                                         delete rn;
599                                         file.close();
600                                 }catch(StreamException &e)
601                                 {
602                                         LOG_ERROR("Unable to update channel log: %s",e.msg);
603                                 }
604                         }
605                 }
606
607         }
608
609         if (ch && !ch->isBroadcasting())
610                 ch->updateInfo(newInfo);
611
612
613 }
614 // ------------------------------------------
615 int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
616 {
617         ChanPacket pack;
618         int ttl=0;              
619         int ver=0;
620         int ver_vp=0;
621         GnuID fromID,destID;
622         int r=0;
623         char ver_ex_prefix[2];
624         int ver_ex_number = 0;
625
626         fromID.clear();
627         destID.clear();
628
629         bcs.initPacketSettings();
630
631         MemoryStream pmem(pack.data,sizeof(pack.data));
632         AtomStream patom(pmem);
633
634         patom.writeParent(PCP_BCST,numc);
635
636         for(int i=0; i<numc; i++)
637         {
638                 int c,d;
639                 ID4 id = atom.read(c,d);
640                 
641                 if (id == PCP_BCST_TTL)
642                 {
643                         ttl = atom.readChar()-1;
644                         patom.writeChar(id,ttl);
645
646                 }else if (id == PCP_BCST_HOPS)
647                 {
648                         bcs.numHops = atom.readChar()+1;
649                         patom.writeChar(id,bcs.numHops);
650
651                 }else if (id == PCP_BCST_FROM)
652                 {
653                         atom.readBytes(fromID.id,16);
654                         patom.writeBytes(id,fromID.id,16);
655
656                         routeList.add(fromID);
657                 }else if (id == PCP_BCST_GROUP)
658                 {
659                         bcs.group = atom.readChar();
660                         patom.writeChar(id,bcs.group);
661                 }else if (id == PCP_BCST_DEST)
662                 {
663                         atom.readBytes(destID.id,16);
664                         patom.writeBytes(id,destID.id,16);
665                         bcs.forMe = destID.isSame(servMgr->sessionID);
666
667                         char idstr1[64];
668                         char idstr2[64];
669
670                         destID.toStr(idstr1);
671                         servMgr->sessionID.toStr(idstr2);
672
673                 }else if (id == PCP_BCST_CHANID)
674                 {
675                         atom.readBytes(bcs.chanID.id,16);
676                         patom.writeBytes(id,bcs.chanID.id,16);
677                 }else if (id == PCP_BCST_VERSION)
678                 {
679                         ver = atom.readInt();
680                         patom.writeInt(id,ver);
681                 }else if (id == PCP_BCST_VERSION_VP)
682                 {
683                         ver_vp = atom.readInt();
684                         patom.writeInt(id,ver_vp);
685                 }else if (id == PCP_BCST_VERSION_EX_PREFIX)
686                 {
687                         atom.readBytes(ver_ex_prefix,2);
688                         patom.writeBytes(id,ver_ex_prefix,2);
689                 }else if (id == PCP_BCST_VERSION_EX_NUMBER)
690                 {
691                         ver_ex_number = atom.readShort();
692                         patom.writeShort(id,ver_ex_number);
693                 }else if (id == PCP_HOST)
694                 {
695                         ChanHit hit;
696                         readHostAtoms(atom,c,bcs,hit,false);
697                         if (hit.uphost.ip == 0){
698 //                              LOG_DEBUG("bcs servent_id = %d", bcs.servent_id);
699                                 if (bcs.numHops == 1){
700                                         hit.uphost.ip = servMgr->serverHost.ip;
701                                         hit.uphost.port = servMgr->serverHost.port;
702                                         hit.uphostHops = 1;
703                                 } else {
704                                         Servent *sv = servMgr->findServentByServentID(bcs.servent_id);
705                                         if (sv){
706                                                 hit.uphost.ip = sv->getHost().ip;
707                                                 hit.uphost.port = sv->waitPort;
708                                                 hit.uphostHops = bcs.numHops - 1;
709                                         }
710                                 }
711                         }
712                         int oldPos = pmem.pos;
713                         hit.writeAtoms(patom, hit.chanID);
714                         pmem.pos = oldPos;
715                         r = readAtom(patom,bcs);
716                 } else {
717                         // copy and process atoms
718                         int oldPos = pmem.pos;
719                         patom.writeAtoms(id,atom.io,c,d);
720                         pmem.pos = oldPos;
721                         r = readAtom(patom,bcs);
722                 }
723         }
724
725         char fromStr[64];
726         fromStr[0] = 0;
727         if (fromID.isSet())
728                 fromID.toStr(fromStr);
729         char destStr[64];
730         destStr[0] = 0;
731         if (destID.isSet())
732                 destID.toStr(destStr);
733         char tmp[64];
734         bcs.chanID.toStr(tmp);
735
736 //      LOG_DEBUG(tmp);
737
738         if (ver_ex_number){
739                 LOG_DEBUG("PCP bcst: group=%d, hops=%d, ver=%d(%c%c%04d), from=%s, dest=%s ttl=%d",
740                         bcs.group,bcs.numHops,ver,ver_ex_prefix[0],ver_ex_prefix[1],ver_ex_number,fromStr,destStr,ttl);
741         } else if (ver_vp){
742                 LOG_DEBUG("PCP bcst: group=%d, hops=%d, ver=%d(VP%04d), from=%s, dest=%s ttl=%d",bcs.group,bcs.numHops,ver,ver_vp,fromStr,destStr,ttl);
743         } else {
744                 LOG_DEBUG("PCP bcst: group=%d, hops=%d, ver=%d, from=%s, dest=%s ttl=%d",bcs.group,bcs.numHops,ver,fromStr,destStr,ttl);
745         }
746
747         if (fromID.isSet())
748                 if (fromID.isSame(servMgr->sessionID))
749                 {
750                         LOG_ERROR("BCST loopback"); 
751                         return PCP_ERROR_BCST+PCP_ERROR_LOOPBACK;
752                 }
753
754         // broadcast back out if ttl > 0 
755         if ((ttl>0) && (!bcs.forMe))
756         {
757                 pack.len = pmem.pos;
758                 pack.type = ChanPacket::T_PCP;
759
760                 if (bcs.group & (/*PCP_BCST_GROUP_ROOT|*/PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
761                 {
762                         chanMgr->broadcastPacketUp(pack,bcs.chanID,remoteID,destID);
763                 }
764
765                 if (bcs.group & (/*PCP_BCST_GROUP_ROOT|*/PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
766                 {
767                         servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_COUT);
768                 }
769
770                 if (bcs.group & (PCP_BCST_GROUP_RELAYS|PCP_BCST_GROUP_TRACKERS))
771                 {
772                         servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_CIN);
773                 }
774
775                 if (bcs.group & (PCP_BCST_GROUP_RELAYS))
776                 {
777                         servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_RELAY);
778                 }
779
780
781 //              LOG_DEBUG("ttl=%d",ttl);
782
783         } else {
784 //              LOG_DEBUG("ttl=%d",ttl);
785         }
786         return r;
787 }
788
789
790 // ------------------------------------------
791 int PCPStream::procAtom(AtomStream &atom,ID4 id,int numc, int dlen,BroadcastState &bcs)
792 {
793         int r=0;
794         ChanHit hit;
795         int rBan = 0;
796
797         if (id == PCP_CHAN)
798         {
799                 readChanAtoms(atom,numc,bcs);
800         }else if (id == PCP_ROOT)
801         {
802                 if (servMgr->isRoot)
803                         throw StreamException("Unauthorized root message");                             
804                 else
805                         readRootAtoms(atom,numc,bcs);
806
807         }else if (id == PCP_HOST)
808         {
809                 readHostAtoms(atom,numc,bcs,hit);
810                 Channel *ch = chanMgr->findChannelByID(hit.chanID);
811                 if (ch && (ch->isBroadcasting() || servMgr->vpDebug)){
812                         if (servMgr->autoPort0Kick && (hit.numHops == 1) && (hit.firewalled || (!hit.relay && !hit.numRelays))){
813                                 char tmp[32];
814                                 hit.host.IPtoStr(tmp);
815                                 LOG_DEBUG("host that can't relay is disconnect: %s", tmp);
816                                 rBan = PCP_ERROR_BANNED;
817                         }
818                         if (servMgr->allowOnlyVP && (hit.numHops == 1) && !hit.version_vp){
819                                 char tmp[32];
820                                 hit.host.IPtoStr(tmp);
821                                 LOG_DEBUG("host that is not VP is disconnect: %s", tmp);
822                                 rBan = PCP_ERROR_BANNED;
823                         }
824                 }
825
826         }else if ((id == PCP_MESG_ASCII) || (id == PCP_MESG))           // PCP_MESG_ASCII to be depreciated
827         {
828                 String msg;
829                 atom.readString(msg.data,sizeof(msg.data),dlen);
830                 LOG_DEBUG("PCP got text: %s",msg.cstr());
831         }else if (id == PCP_BCST)
832         {
833                 r = readBroadcastAtoms(atom,numc,bcs);
834         }else if (id == PCP_HELO)
835         {
836                 atom.skip(numc,dlen);
837                 atom.writeParent(PCP_OLEH,1);
838                         atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
839         }else if (id == PCP_PUSH)
840         {
841
842                 readPushAtoms(atom,numc,bcs);
843         }else if (id == PCP_OK)
844         {
845                 atom.readInt();
846
847         }else if (id == PCP_QUIT)
848         {
849                 r = atom.readInt();
850                 if (!r)
851                         r = PCP_ERROR_QUIT;
852
853         }else if (id == PCP_ATOM)
854         {
855                 for(int i=0; i<numc; i++)
856                 {
857                         int nc,nd;
858                         ID4 aid = atom.read(nc,nd);
859                         int ar = procAtom(atom,aid,nc,nd,bcs);
860                         if (ar)
861                                 r = ar;
862                 }
863
864         }else
865         {
866                 LOG_CHANNEL("PCP skip: %s",id.getString().str());
867                 atom.skip(numc,dlen);
868         }
869
870         if (!r)
871                 r = rBan;
872
873         return r;
874
875 }
876
877 // ------------------------------------------
878 int PCPStream::readAtom(AtomStream &atom,BroadcastState &bcs)
879 {
880         int numc,dlen;
881         ID4 id = atom.read(numc,dlen);
882
883         return  procAtom(atom,id,numc,dlen,bcs);
884 }
885
886