OSDN Git Service

VP27マージ
[peercast-im/PeerCastIM.git] / PeerCast.root / PeerCast / core / common / pcp.cpp
1 // ------------------------------------------------
2 // File : pcp.cpp
3 // Date: 1-mar-2004
4 // Author: giles
5 //
6 // (c) 2002-4 peercast.org
7 // ------------------------------------------------
8 // This program is free software; you can redistribute it and/or modify
9 // it under the terms of the GNU General Public License as published by
10 // the Free Software Foundation; either version 2 of the License, or
11 // (at your option) any later version.
12
13 // This program is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 // GNU General Public License for more details.
17 // ------------------------------------------------
18
19 #include "atom.h"
20 #include "pcp.h"
21 #include "peercast.h"
22 #include "version2.h"
23 #ifdef _DEBUG
24 #include "chkMemoryLeak.h"
25 #define DEBUG_NEW new(__FILE__, __LINE__)
26 #define new DEBUG_NEW
27 #endif
28
29 // ------------------------------------------
30 void PCPStream::init(GnuID &rid)
31 {
32         remoteID = rid;
33         routeList.clear();
34
35         lastPacketTime = 0;
36         nextRootPacket = 0;      // 0 seconds (never)
37
38         inData.init();
39         inData.accept = ChanPacket::T_PCP;
40
41         outData.init();
42         outData.accept = ChanPacket::T_PCP;
43 }
44 // ------------------------------------------
45 void PCPStream::readVersion(Stream &in)
46 {
47         int len = in.readInt();
48
49         if (len != 4)
50                 throw StreamException("Invalid PCP");
51
52         int ver = in.readInt();
53
54         LOG_DEBUG("PCP ver: %d",ver);
55 }
56 // ------------------------------------------
57 void PCPStream::readHeader(Stream &in,Channel *)
58 {
59 //      AtomStream atom(in);
60
61 //      if (in.readInt() != PCP_CONNECT)
62 //              throw StreamException("Not PCP");
63
64 //      readVersion(in);
65 }
66 // ------------------------------------------
67 bool PCPStream::sendPacket(ChanPacket &pack,GnuID &destID)
68 {
69         if (destID.isSet())
70                 if (!destID.isSame(remoteID))
71                         if (!routeList.contains(destID))
72                                 return false;
73
74         return outData.writePacket(pack);
75 }
76 // ------------------------------------------
77 void PCPStream::flush(Stream &in)
78 {
79         ChanPacket pack;
80         // send outward packets
81         while (outData.numPending())
82         {
83                 outData.readPacket(pack);
84                 pack.writeRaw(in);
85         }
86 }
87
88 // ------------------------------------------
89 unsigned int PCPStream::flushUb(Stream &in, unsigned int size)
90 {
91         ChanPacket pack;
92         unsigned int len = 0, skip = 0;
93
94         while (outData.numPending())
95         {
96                 outData.readPacketPri(pack);
97
98                 if (size >= len + pack.len) {
99                         len += pack.len;
100                         pack.writeRaw(in);
101                 } else {
102                         skip++;
103                 }
104         }
105         if (skip > 0)
106                 LOG_DEBUG("PCPStream::flushUb: skip %d packets", skip);
107
108         return len;
109 }
110
111 // ------------------------------------------
112 int PCPStream::readPacket(Stream &in,Channel *)
113 {
114         BroadcastState bcs;
115         bcs.ttl = 1;
116         return readPacket(in,bcs);
117 }
118 // ------------------------------------------
119 int PCPStream::readPacket(Stream &in,BroadcastState &bcs)
120 {
121         int error = PCP_ERROR_GENERAL;
122         try
123         {
124                 AtomStream atom(in);
125
126                 ChanPacket pack;
127                 MemoryStream mem(pack.data,sizeof(pack.data));
128                 AtomStream patom(mem);
129
130
131                 // send outward packets
132                 error = PCP_ERROR_WRITE;
133                 if (outData.numPending())
134                 {
135                         outData.readPacket(pack);
136                         pack.writeRaw(in);
137                 }
138                 error = PCP_ERROR_GENERAL;
139
140                 if (outData.willSkip())
141                 {
142                         error = PCP_ERROR_WRITE+PCP_ERROR_SKIP;
143                         throw StreamException("Send too slow");
144                 }
145
146
147                 error = PCP_ERROR_READ;
148                 // poll for new downward packet
149                 if (in.readReady())
150                 {
151                         int numc,numd;
152                         ID4 id;
153
154                         id = atom.read(numc,numd);
155
156                         mem.rewind();
157                         pack.len = patom.writeAtoms(id, in, numc, numd);
158                         pack.type = ChanPacket::T_PCP;
159
160                         //inData.writePacket(pack);
161                 //}
162                 error = PCP_ERROR_GENERAL;
163
164                 // process downward packets
165                 //if (inData.numPending())
166                 //{
167                         //inData.readPacket(pack);
168
169                         mem.rewind();
170
171                         //int numc,numd;
172                         id = patom.read(numc,numd);
173
174                         error = PCPStream::procAtom(patom,id,numc,numd,bcs);
175                         
176                         if (error)
177                         {
178                                 throw StreamException("PCP exception");
179                         }
180                 }
181
182                 error = 0;
183
184         }catch(StreamException &e)
185         {
186                 LOG_ERROR("PCP readPacket: %s (%d)",e.msg,error);
187         }
188
189         return error;
190 }
191
192 // ------------------------------------------
193 void PCPStream::readEnd(Stream &,Channel *)
194 {
195 }
196
197
198 // ------------------------------------------
199 void PCPStream::readPushAtoms(AtomStream &atom, int numc,BroadcastState &bcs)
200 {
201         Host host;
202         GnuID   chanID;
203
204         chanID.clear();
205
206         for(int i=0; i<numc; i++)
207         {
208                 int c,d;
209                 ID4 id = atom.read(c,d);
210
211                 if (id == PCP_PUSH_IP)
212                         host.ip = atom.readInt();
213                 else if (id == PCP_PUSH_PORT)
214                         host.port = atom.readShort();
215                 else if (id == PCP_PUSH_CHANID)
216                         atom.readBytes(chanID.id,16);
217                 else
218                 {
219                         LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
220                         atom.skip(c,d);
221                 }
222         }
223
224
225         if (bcs.forMe)
226         {
227                 char ipstr[64];
228                 host.toStr(ipstr);
229
230                 Servent *s = NULL;
231
232                 if (chanID.isSet())
233                 {
234                         Channel *ch = chanMgr->findChannelByID(chanID);
235                         if (ch)
236                                 if (ch->isBroadcasting() || !ch->isFull() && !servMgr->relaysFull() && ch->info.id.isSame(chanID))
237                                         s = servMgr->allocServent();
238                 }else{
239                         s = servMgr->allocServent();
240                 }
241
242                 if (s)
243                 {
244                         LOG_DEBUG("GIVing to %s",ipstr);
245                         s->initGIV(host,chanID);
246                 }
247         }
248
249 }
250 // ------------------------------------------
251 void PCPStream::readRootAtoms(AtomStream &atom, int numc,BroadcastState &bcs)
252 {
253         String url;
254
255         for(int i=0; i<numc; i++)
256         {
257                 int c,d;
258                 ID4 id = atom.read(c,d);
259
260                 if (id == PCP_ROOT_UPDINT)
261                 {
262                         int si = atom.readInt();
263
264                         chanMgr->setUpdateInterval(si);
265                         LOG_DEBUG("PCP got new host update interval: %ds",si);
266                 }else if (id == PCP_ROOT_URL)
267                 {
268                         url = "http://www.peercast.org/";
269                         String loc;
270                         atom.readString(loc.data,sizeof(loc.data),d);
271                         url.append(loc);
272
273                 }else if (id == PCP_ROOT_CHECKVER)
274                 {
275                         unsigned int newVer = atom.readInt();
276                         if (newVer > PCP_CLIENT_VERSION)
277                         {
278                                 strcpy(servMgr->downloadURL,url.cstr());
279                                 peercastApp->notifyMessage(ServMgr::NT_UPGRADE,"There is a new version available, please click here to upgrade your client.");
280                         }
281                         LOG_DEBUG("PCP got version check: %d / %d",newVer,PCP_CLIENT_VERSION);
282
283                 }else if (id == PCP_ROOT_NEXT)
284                 {
285                         unsigned int time = atom.readInt();
286
287                         if (time)
288                         {
289                                 unsigned int ctime = sys->getTime();
290                                 nextRootPacket = ctime+time;
291                                 LOG_DEBUG("PCP expecting next root packet in %ds",time);
292                         }else
293                         {
294                                 nextRootPacket = 0;
295                         }
296
297                 }else if (id == PCP_ROOT_UPDATE)
298                 {
299                         atom.skip(c,d);
300
301                         chanMgr->broadcastTrackerUpdate(remoteID,true);
302
303                 }else if ((id == PCP_MESG_ASCII) || (id == PCP_MESG))                   // PCP_MESG_ASCII to be depreciated 
304                 {
305                         String newMsg;
306
307                         atom.readString(newMsg.data,sizeof(newMsg.data),d);
308                         if (!newMsg.isSame(servMgr->rootMsg.cstr()))
309                         {
310                                 servMgr->rootMsg = newMsg;
311                                 LOG_DEBUG("PCP got new root mesg: %s",servMgr->rootMsg.cstr());
312                                 peercastApp->notifyMessage(ServMgr::NT_PEERCAST,servMgr->rootMsg.cstr());
313                         }
314                 }else
315                 {
316                         LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
317                         atom.skip(c,d);
318                 }
319         }
320 }
321
322 // ------------------------------------------
323 void PCPStream::readPktAtoms(Channel *ch,AtomStream &atom,int numc,BroadcastState &bcs)
324 {
325         ChanPacket pack;
326         ID4 type;
327
328
329         for(int i=0; i<numc; i++)
330         {
331                 int c,d;
332                 ID4 id = atom.read(c,d);
333
334                 if (id == PCP_CHAN_PKT_TYPE)
335                 {
336                         type = atom.readID4();
337
338                         if (type == PCP_CHAN_PKT_HEAD)
339                                 pack.type = ChanPacket::T_HEAD;
340                         else if (type == PCP_CHAN_PKT_DATA)
341                                 pack.type = ChanPacket::T_DATA;
342                         else
343                                 pack.type = ChanPacket::T_UNKNOWN;
344
345                 }else if (id == PCP_CHAN_PKT_POS)
346                 {
347                         pack.pos = atom.readInt();
348
349
350                 }else if (id == PCP_CHAN_PKT_DATA)
351                 {
352                         pack.len = d;
353                         atom.readBytes(pack.data,pack.len);
354                 }
355                 else
356                 {
357                         LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
358                         atom.skip(c,d);
359                 }
360         }
361
362         if (ch)
363         {
364
365                 int diff = pack.pos - ch->streamPos;
366                 if (diff)
367                 {
368                         LOG_DEBUG("PCP skipping %s%8d (%10d -> %10d) count=%2d",(diff>0)?"+":"",diff,ch->streamPos,pack.pos, ch->skipCount);
369                         if (ch->lastSkipTime + 120 < sys->getTime()){
370                                 ch->skipCount = 0;
371                         }
372                         ch->lastSkipTime = sys->getTime();
373                         ch->skipCount++; //JP-EX
374                         pack.skip = true;
375                 }
376
377                 if (servMgr->autoBumpSkipCount) //JP-EX
378                 {
379                         if ((ch->skipCount > servMgr->autoBumpSkipCount) && !(servMgr->disableAutoBumpIfDirect && ch->sourceHost.tracker)) //JP-MOD
380                         {
381                                 LOG_DEBUG("Auto bump");
382                                 ch->bump = true;
383                         }
384                 }
385
386                 if (pack.type == ChanPacket::T_HEAD)
387                 {
388                         LOG_DEBUG("New head packet at %d",pack.pos);
389                         bool renewhead;
390                         if (servMgr->keepDownstreams)
391                                 renewhead = (memcmp(ch->headPack.data, pack.data, pack.len) != 0);
392                         else
393                                 renewhead = true;
394
395                         /*
396                         // check for stream restart
397                         if (pack.pos == 0)              
398                         {
399                                 LOG_CHANNEL("PCP resetting stream");
400                                 ch->streamIndex++;
401                                 ch->rawData.init();
402                         }
403                         */
404                         if (renewhead || ch->lastStopTime + 30 < sys->getTime()) {
405                                 // check for stream restart
406                                 if (pack.pos == 0)
407                                 {
408                                         LOG_CHANNEL("PCP resetting stream");
409                                         ch->streamIndex++;
410                                         ch->rawData.init();
411                                 }
412
413                                 ch->headPack = pack;
414
415                                 ch->rawData.writePacket(pack,true);
416                                 ch->streamPos = pack.pos+pack.len;
417                         }
418
419                 }else if (pack.type == ChanPacket::T_DATA)
420                 {
421                         ch->rawData.writePacket(pack,true);
422                         ch->streamPos = pack.pos+pack.len;
423                 }
424
425         }
426
427         // update this parent packet stream position
428         if ((pack.pos) && (!bcs.streamPos || (pack.pos < bcs.streamPos)))
429                 bcs.streamPos = pack.pos;
430
431 }
432 // -----------------------------------
433 void PCPStream::readHostAtoms(AtomStream &atom, int numc, BroadcastState &bcs, ChanHit &hit, bool flg)
434 {
435 //      ChanHit hit;
436         hit.init();
437         GnuID chanID = bcs.chanID;      //use default
438
439         bool busy=false;
440
441         unsigned int ipNum=0;
442
443         for(int i=0; i<numc; i++)
444         {
445                 int c,d;
446                 ID4 id = atom.read(c,d);
447
448                 if (id == PCP_HOST_IP)
449                 {
450                         unsigned int ip = atom.readInt();
451                         hit.rhost[ipNum].ip = ip;
452                 }else if (id == PCP_HOST_PORT)
453                 {
454                         int port = atom.readShort();
455                         hit.rhost[ipNum++].port = port;
456
457                         if (ipNum > 1)
458                                 ipNum = 1;
459                 }
460                 else if (id == PCP_HOST_NUML)
461                 {
462                         hit.numListeners = atom.readInt();
463                         if (hit.numListeners > 10)
464                                 hit.numListeners = 10;
465                 }
466                 else if (id == PCP_HOST_NUMR)
467                 {
468                         hit.numRelays = atom.readInt();
469                         if (hit.numRelays > 100)
470                                 hit.numRelays = 100;
471                 }
472                 else if (id == PCP_HOST_UPTIME)
473                         hit.upTime = atom.readInt();
474                 else if (id == PCP_HOST_OLDPOS)
475                         hit.oldestPos = atom.readInt();
476                 else if (id == PCP_HOST_NEWPOS)
477                         hit.newestPos = atom.readInt();
478                 else if (id == PCP_HOST_VERSION)
479                         hit.version = atom.readInt();
480                 else if (id == PCP_HOST_VERSION_VP)
481                         hit.version_vp = atom.readInt();
482                 else if (id == PCP_HOST_VERSION_EX_PREFIX)
483                         atom.readBytes(hit.version_ex_prefix,2);
484                 else if (id == PCP_HOST_VERSION_EX_NUMBER){
485                         hit.version_ex_number = atom.readShort();
486                 }
487                 else if (id == PCP_HOST_FLAGS1)
488                 {
489                         int fl1 = atom.readChar();
490
491                         hit.recv = (fl1 & PCP_HOST_FLAGS1_RECV) !=0;
492                         hit.relay = (fl1 & PCP_HOST_FLAGS1_RELAY) !=0;
493                         hit.direct = (fl1 & PCP_HOST_FLAGS1_DIRECT) !=0;
494                         hit.cin = (fl1 & PCP_HOST_FLAGS1_CIN) !=0;
495                         hit.tracker = (fl1 & PCP_HOST_FLAGS1_TRACKER) !=0;
496                         hit.firewalled = (fl1 & PCP_HOST_FLAGS1_PUSH) !=0;
497
498
499                 }else if (id == PCP_HOST_ID)
500                         atom.readBytes(hit.sessionID.id,16);
501                 else if (id == PCP_HOST_CHANID)
502                         atom.readBytes(chanID.id,16);
503                 else if (id == PCP_HOST_UPHOST_IP)
504                         hit.uphost.ip = atom.readInt();
505                 else if (id == PCP_HOST_UPHOST_PORT)
506                         hit.uphost.port = atom.readInt();
507                 else if (id == PCP_HOST_UPHOST_HOPS)
508                         hit.uphostHops = atom.readInt();
509                 else if (id == PCP_HOST_CLAP_PP){ //JP-MOD
510                         hit.clap_pp = atom.readInt();
511                         if (hit.clap_pp & 1){
512                                 Channel *c = chanMgr->findChannelByID(chanID);
513                                 if(c && c->isBroadcasting()){
514                                         String sjis;
515                                         sjis = c->info.name;
516                                         sjis.convertTo(String::T_SJIS);
517                                         peercastApp->notifyMessage(ServMgr::NT_APPLAUSE, sjis);
518                                 }
519                         }
520                 }else
521                 {
522                         LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
523                         atom.skip(c,d);
524                 }
525         }
526
527         hit.host = hit.rhost[0];
528         hit.chanID = chanID;
529
530         hit.numHops = bcs.numHops;
531
532         hit.servent_id = bcs.servent_id;
533
534         if (flg && (bcs.ttl != 0)){
535 //              LOG_DEBUG("readHostAtoms HITLISTLOCK ON-------------");
536                 chanMgr->hitlistlock.on();
537                 if (hit.recv)
538                         chanMgr->addHit(hit);
539                 else
540                         chanMgr->delHit(hit);
541 //              LOG_DEBUG("readHostAtoms HITLISTLOCK OFF-------------");
542                 chanMgr->hitlistlock.off();
543         }
544
545         if (hit.numHops == 1){
546                 Servent *sv = servMgr->findServentByServentID(hit.servent_id);
547                 if (sv && sv->getHost().ip == hit.host.ip){
548 //                      LOG_DEBUG("set servent's waitPort = %d", hit.host.port);
549                         sv->waitPort = hit.host.port;
550                         hit.lastSendSeq = sv->serventHit.lastSendSeq;
551                         sv->serventHit = hit;
552                 }
553         }
554 }
555
556 // ------------------------------------------
557 void PCPStream::readChanAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
558 {
559 /*      Channel *ch=NULL;
560         ChanHitList *chl=NULL;
561         ChanInfo newInfo;
562
563         ch = chanMgr->findChannelByID(bcs.chanID);
564         chl = chanMgr->findHitListByID(bcs.chanID);
565
566         if (ch)
567                 newInfo = ch->info;
568         else if (chl)
569                 newInfo = chl->info;*/
570
571         Channel *ch=NULL;
572         ChanHitList *chl=NULL;
573         ChanInfo newInfo, chaInfo;
574
575         ch = this->parent;
576         if (ch){
577                 newInfo = ch->info;
578                 chaInfo = ch->info;
579         }
580
581         for(int i=0; i<numc; i++)
582         {
583
584                 int c,d;
585                 ID4 id = atom.read(c,d);
586
587                 if ((id == PCP_CHAN_PKT) && (ch))
588                 {
589                         readPktAtoms(ch,atom,c,bcs);
590                 }else if (id == PCP_CHAN_INFO)
591                 {
592                         newInfo.readInfoAtoms(atom,c);
593
594                 }else if (id == PCP_CHAN_TRACK)
595                 {
596                         newInfo.readTrackAtoms(atom,c);
597
598                 }else if (id == PCP_CHAN_BCID)
599                 {
600                         atom.readBytes(newInfo.bcID.id,16);
601
602                 }else if (id == PCP_CHAN_KEY)                   // depreciated
603                 {
604                         atom.readBytes(newInfo.bcID.id,16);
605                         newInfo.bcID.id[0] = 0;                         // clear flags
606
607                 }else if (id == PCP_CHAN_ID)
608                 {
609                         atom.readBytes(newInfo.id.id,16);
610
611                         ch = chanMgr->findChannelByID(newInfo.id);
612                         chl = chanMgr->findHitListByID(newInfo.id);
613
614                 }else
615                 {
616                         LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
617                         atom.skip(c,d);
618                 }
619         }
620
621         chl = chanMgr->findHitList(newInfo);
622
623         if (!chl)
624                 chl = chanMgr->addHitList(newInfo);
625
626         if (chl)
627         {
628                 chl->info.update(newInfo);
629         
630                 if (!servMgr->chanLog.isEmpty())
631                 {
632                         //if (chl->numListeners())
633                         {
634                                 try
635                                 {
636                                         FileStream file;
637                                         file.openWriteAppend(servMgr->chanLog.cstr());
638
639                                         XML::Node *rn = new XML::Node("update time=\"%d\"",sys->getTime());
640                                         XML::Node *n = chl->info.createChannelXML();
641                                         n->add(chl->createXML(false));
642                                         n->add(chl->info.createTrackXML());
643                                         rn->add(n);     
644         
645                                         rn->write(file,0);
646                                         delete rn;
647                                         file.close();
648                                 }catch(StreamException &e)
649                                 {
650                                         LOG_ERROR("Unable to update channel log: %s",e.msg);
651                                 }
652                         }
653                 }
654
655         }
656
657         if (ch && !ch->isBroadcasting())
658                 ch->updateInfo(newInfo);
659
660
661 }
662 // ------------------------------------------
663 int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
664 {
665         ChanPacket pack;
666         //int ttl=0;            
667         int ver=0;
668         int ver_vp=0;
669         GnuID fromID,destID;
670         int r=0;
671         char ver_ex_prefix[2];
672         int ver_ex_number = 0;
673
674         fromID.clear();
675         destID.clear();
676
677         bcs.initPacketSettings();
678
679         MemoryStream pmem(pack.data,sizeof(pack.data));
680         AtomStream patom(pmem);
681
682         patom.writeParent(PCP_BCST,numc);
683
684         for(int i=0; i<numc; i++)
685         {
686                 int c,d;
687                 ID4 id = atom.read(c,d);
688                 
689                 if (id == PCP_BCST_TTL)
690                 {
691                         bcs.ttl = atom.readChar()-1;
692                         patom.writeChar(id,bcs.ttl);
693                 }else if (id == PCP_BCST_HOPS)
694                 {
695                         bcs.numHops = atom.readChar()+1;
696                         patom.writeChar(id,bcs.numHops);
697
698                 }else if (id == PCP_BCST_FROM)
699                 {
700                         atom.readBytes(fromID.id,16);
701                         patom.writeBytes(id,fromID.id,16);
702
703                         routeList.add(fromID);
704                 }else if (id == PCP_BCST_GROUP)
705                 {
706                         bcs.group = atom.readChar();
707                         patom.writeChar(id,bcs.group);
708                 }else if (id == PCP_BCST_DEST)
709                 {
710                         atom.readBytes(destID.id,16);
711                         patom.writeBytes(id,destID.id,16);
712                         bcs.forMe = destID.isSame(servMgr->sessionID);
713
714                         char idstr1[64];
715                         char idstr2[64];
716
717                         destID.toStr(idstr1);
718                         servMgr->sessionID.toStr(idstr2);
719
720                 }else if (id == PCP_BCST_CHANID)
721                 {
722                         atom.readBytes(bcs.chanID.id,16);
723                         patom.writeBytes(id,bcs.chanID.id,16);
724                 }else if (id == PCP_BCST_VERSION)
725                 {
726                         ver = atom.readInt();
727                         patom.writeInt(id,ver);
728                 }else if (id == PCP_BCST_VERSION_VP)
729                 {
730                         ver_vp = atom.readInt();
731                         patom.writeInt(id,ver_vp);
732                 }else if (id == PCP_BCST_VERSION_EX_PREFIX)
733                 {
734                         atom.readBytes(ver_ex_prefix,2);
735                         patom.writeBytes(id,ver_ex_prefix,2);
736                 }else if (id == PCP_BCST_VERSION_EX_NUMBER)
737                 {
738                         ver_ex_number = atom.readShort();
739                         patom.writeShort(id,ver_ex_number);
740                 }else if (id == PCP_HOST)
741                 {
742                         ChanHit hit;
743                         readHostAtoms(atom,c,bcs,hit,false);
744                         Servent *sv = servMgr->findServentByServentID(bcs.servent_id);
745                         if (hit.uphost.ip == 0){
746 //                              LOG_DEBUG("bcs servent_id = %d", bcs.servent_id);
747                                 if (bcs.numHops == 1){
748                                         hit.uphost.ip = servMgr->serverHost.ip;
749                                         hit.uphost.port = servMgr->serverHost.port;
750                                         hit.uphostHops = 1;
751                                 } else {
752                                         //Servent *sv = servMgr->findServentByServentID(bcs.servent_id);
753                                         if (sv){
754                                                 hit.uphost.ip = sv->getHost().ip;
755                                                 hit.uphost.port = sv->waitPort;
756                                                 hit.uphostHops = bcs.numHops - 1;
757                                         }
758                                 }
759                         }
760                         if (sv &&
761                                 ((hit.numHops == 1 && (hit.rhost[0].ip == sv->getHost().ip
762                                 && hit.uphost.ip == servMgr->serverHost.ip && hit.uphost.port == servMgr->serverHost.port)
763                                 || (hit.rhost[1].localIP() && hit.rhost[1].ip == sv->getHost().ip))
764                                 || chanMgr->findParentHit(hit)))
765                         {
766                                 int oldPos = pmem.pos;
767                                 hit.writeAtoms(patom, hit.chanID);
768                                 pmem.pos = oldPos;
769                                 r = readAtom(patom,bcs);
770                         } else {
771                                 char tmp[80], tmp2[80], tmp3[80];
772                                 hit.uphost.toStr(tmp);
773                                 hit.host.toStr(tmp2);
774                                 sv->getHost().toStr(tmp3);
775                                 LOG_DEBUG("### Invalid bcst: hops=%d, l/r = %d/%d, ver=%d(VP%04d), ttl=%d",
776                                         bcs.numHops,hit.numListeners, hit.numRelays, ver,ver_vp,bcs.ttl);
777                                 LOG_DEBUG("### %s <- %s <- sv(%s)",
778                                         tmp2, tmp, tmp3);
779                                 bcs.ttl = 0;
780                         }
781                 } else {
782                         // copy and process atoms
783                         int oldPos = pmem.pos;
784                         patom.writeAtoms(id,atom.io,c,d);
785                         pmem.pos = oldPos;
786                         r = readAtom(patom,bcs);
787                 }
788         }
789
790         char fromStr[64];
791         fromStr[0] = 0;
792         if (fromID.isSet())
793                 fromID.toStr(fromStr);
794         char destStr[64];
795         destStr[0] = 0;
796         if (destID.isSet())
797                 destID.toStr(destStr);
798         char tmp[64];
799         bcs.chanID.toStr(tmp);
800
801 //      LOG_DEBUG(tmp);
802
803         if (ver_ex_number){
804                 LOG_DEBUG("PCP bcst: group=%d, hops=%d, ver=%d(%c%c%04d), from=%s, dest=%s ttl=%d",
805                         bcs.group,bcs.numHops,ver,ver_ex_prefix[0],ver_ex_prefix[1],ver_ex_number,fromStr,destStr,bcs.ttl);
806         } else if (ver_vp){
807                 LOG_DEBUG("PCP bcst: group=%d, hops=%d, ver=%d(VP%04d), from=%s, dest=%s ttl=%d",bcs.group,bcs.numHops,ver,ver_vp,fromStr,destStr,bcs.ttl);
808         } else {
809                 LOG_DEBUG("PCP bcst: group=%d, hops=%d, ver=%d, from=%s, dest=%s ttl=%d",bcs.group,bcs.numHops,ver,fromStr,destStr,bcs.ttl);
810         }
811
812         if (fromID.isSet())
813                 if (fromID.isSame(servMgr->sessionID))
814                 {
815                         LOG_ERROR("BCST loopback"); 
816                         return PCP_ERROR_BCST+PCP_ERROR_LOOPBACK;
817                 }
818
819                 // broadcast back out if ttl > 0 
820                 if ((bcs.ttl>0) && (!bcs.forMe))
821                 {
822                         pack.len = pmem.pos;
823                         pack.type = ChanPacket::T_PCP;
824
825                         if (bcs.group & (/*PCP_BCST_GROUP_ROOT|*/PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
826                         {
827                                 pack.priority = 11 - bcs.numHops;
828                                 chanMgr->broadcastPacketUp(pack,bcs.chanID,remoteID,destID);
829                         }
830
831                         if (bcs.group & (/*PCP_BCST_GROUP_ROOT|*/PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
832                         {
833                                 servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_COUT);
834                         }
835
836                         if (bcs.group & (PCP_BCST_GROUP_RELAYS|PCP_BCST_GROUP_TRACKERS))
837                         {
838                                 servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_CIN);
839                         }
840
841                         if (bcs.group & (PCP_BCST_GROUP_RELAYS))
842                         {
843                                 servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_RELAY);
844                         }
845
846
847                         //              LOG_DEBUG("ttl=%d",ttl);
848
849                 } else {
850                         //              LOG_DEBUG("ttl=%d",ttl);
851                 }
852         return r;
853 }
854
855
856 // ------------------------------------------
857 int PCPStream::procAtom(AtomStream &atom,ID4 id,int numc, int dlen,BroadcastState &bcs)
858 {
859         int r=0;
860         ChanHit hit;
861         int rBan = 0;
862
863         if (id == PCP_CHAN)
864         {
865                 readChanAtoms(atom,numc,bcs);
866         }else if (id == PCP_ROOT)
867         {
868                 if (servMgr->isRoot)
869                         throw StreamException("Unauthorized root message");                             
870                 else
871                         readRootAtoms(atom,numc,bcs);
872
873         }else if (id == PCP_HOST)
874         {
875                 readHostAtoms(atom,numc,bcs,hit);
876                 Channel *ch = chanMgr->findChannelByID(hit.chanID);
877                 if (ch && (ch->isBroadcasting() || servMgr->vpDebug)){
878                         if (servMgr->autoPort0Kick && (hit.numHops == 1) && (hit.firewalled || (!hit.relay && !hit.numRelays))){
879                                 char tmp[32];
880                                 hit.host.IPtoStr(tmp);
881                                 LOG_DEBUG("host that can't relay is disconnect: %s", tmp);
882                                 rBan = PCP_ERROR_BANNED;
883                         }
884                         if (servMgr->allowOnlyVP && (hit.numHops == 1) && !hit.version_vp){
885                                 char tmp[32];
886                                 hit.host.IPtoStr(tmp);
887                                 LOG_DEBUG("host that is not VP is disconnect: %s", tmp);
888                                 rBan = PCP_ERROR_BANNED;
889                         }
890                 }
891
892         }else if ((id == PCP_MESG_ASCII) || (id == PCP_MESG))           // PCP_MESG_ASCII to be depreciated
893         {
894                 String msg;
895                 atom.readString(msg.data,sizeof(msg.data),dlen);
896                 LOG_DEBUG("PCP got text: %s",msg.cstr());
897         }else if (id == PCP_BCST)
898         {
899                 r = readBroadcastAtoms(atom,numc,bcs);
900         }else if (id == PCP_HELO)
901         {
902                 atom.skip(numc,dlen);
903                 atom.writeParent(PCP_OLEH,1);
904                         atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
905         }else if (id == PCP_PUSH)
906         {
907
908                 readPushAtoms(atom,numc,bcs);
909         }else if (id == PCP_OK)
910         {
911                 atom.readInt();
912
913         }else if (id == PCP_QUIT)
914         {
915                 r = atom.readInt();
916                 if (!r)
917                         r = PCP_ERROR_QUIT;
918
919         }else if (id == PCP_ATOM)
920         {
921                 for(int i=0; i<numc; i++)
922                 {
923                         int nc,nd;
924                         ID4 aid = atom.read(nc,nd);
925                         int ar = procAtom(atom,aid,nc,nd,bcs);
926                         if (ar)
927                                 r = ar;
928                 }
929
930         }else
931         {
932                 LOG_CHANNEL("PCP skip: %s",id.getString().str());
933                 atom.skip(numc,dlen);
934         }
935
936         if (!r)
937                 r = rBan;
938
939         return r;
940
941 }
942
943 // ------------------------------------------
944 int PCPStream::readAtom(AtomStream &atom,BroadcastState &bcs)
945 {
946         int numc,dlen;
947         ID4 id = atom.read(numc,dlen);
948
949         return  procAtom(atom,id,numc,dlen,bcs);
950 }
951
952