OSDN Git Service

IM0034
[peercast-im/PeerCastIM.git] / c: / Git / PeerCast.root / PeerCast / core / common / pcp.cpp
1 // ------------------------------------------------
2 // File : pcp.cpp
3 // Date: 1-mar-2004
4 // Author: giles
5 //
6 // (c) 2002-4 peercast.org
7 // ------------------------------------------------
8 // This program is free software; you can redistribute it and/or modify
9 // it under the terms of the GNU General Public License as published by
10 // the Free Software Foundation; either version 2 of the License, or
11 // (at your option) any later version.
12
13 // This program is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16 // GNU General Public License for more details.
17 // ------------------------------------------------
18
19 #include "atom.h"
20 #include "pcp.h"
21 #include "peercast.h"
22 #include "version2.h"
23 #ifdef _DEBUG
24 #include "chkMemoryLeak.h"
25 #define DEBUG_NEW new(__FILE__, __LINE__)
26 #define new DEBUG_NEW
27 #endif
28 #include <ctime>
29
30 // ------------------------------------------
31 void PCPStream::init(GnuID &rid)
32 {
33         remoteID = rid;
34         routeList.clear();
35
36         lastPacketTime = 0;
37         nextRootPacket = 0;      // 0 seconds (never)
38
39         inData.init();
40         inData.accept = ChanPacket::T_PCP;
41
42         outData.init();
43         outData.accept = ChanPacket::T_PCP;
44 }
45 // ------------------------------------------
46 void PCPStream::readVersion(Stream &in)
47 {
48         int len = in.readInt();
49
50         if (len != 4)
51                 throw StreamException("Invalid PCP");
52
53         int ver = in.readInt();
54
55         LOG_DEBUG("PCP ver: %d",ver);
56 }
57 // ------------------------------------------
58 void PCPStream::readHeader(Stream &in,Channel *)
59 {
60 //      AtomStream atom(in);
61
62 //      if (in.readInt() != PCP_CONNECT)
63 //              throw StreamException("Not PCP");
64
65 //      readVersion(in);
66 }
67 // ------------------------------------------
68 bool PCPStream::sendPacket(ChanPacket &pack,GnuID &destID)
69 {
70         if (destID.isSet())
71                 if (!destID.isSame(remoteID))
72                         if (!routeList.contains(destID))
73                                 return false;
74
75         return outData.writePacket(pack);
76 }
77 // ------------------------------------------
78 void PCPStream::flush(Stream &in)
79 {
80         ChanPacket pack;
81         // send outward packets
82         while (outData.numPending())
83         {
84                 outData.readPacket(pack);
85                 pack.writeRaw(in);
86         }
87 }
88
89 // ------------------------------------------
90 unsigned int PCPStream::flushUb(Stream &in, unsigned int size)
91 {
92         ChanPacket pack;
93         unsigned int len = 0, skip = 0;
94
95         while (outData.numPending())
96         {
97                 outData.readPacketPri(pack);
98
99                 if (size >= len + pack.len) {
100                         len += pack.len;
101                         pack.writeRaw(in);
102                 } else {
103                         skip++;
104                 }
105         }
106         if (skip > 0)
107                 LOG_DEBUG("PCPStream::flushUb: skip %d packets", skip);
108
109         return len;
110 }
111
112 // ------------------------------------------
113 int PCPStream::readPacket(Stream &in,Channel *)
114 {
115         BroadcastState bcs;
116         bcs.ttl = 1;
117         return readPacket(in,bcs);
118 }
119 // ------------------------------------------
120 int PCPStream::readPacket(Stream &in,BroadcastState &bcs)
121 {
122         int error = PCP_ERROR_GENERAL;
123         try
124         {
125                 AtomStream atom(in);
126
127                 ChanPacket pack;
128                 MemoryStream mem(pack.data,sizeof(pack.data));
129                 AtomStream patom(mem);
130
131
132                 // send outward packets
133                 error = PCP_ERROR_WRITE;
134                 if (outData.numPending())
135                 {
136                         outData.readPacket(pack);
137                         pack.writeRaw(in);
138                 }
139                 error = PCP_ERROR_GENERAL;
140
141                 if (outData.willSkip())
142                 {
143                         error = PCP_ERROR_WRITE+PCP_ERROR_SKIP;
144                         throw StreamException("Send too slow");
145                 }
146
147
148                 error = PCP_ERROR_READ;
149                 // poll for new downward packet
150                 if (in.readReady())
151                 {
152                         int numc,numd;
153                         ID4 id;
154
155                         id = atom.read(numc,numd);
156
157                         mem.rewind();
158                         pack.len = patom.writeAtoms(id, in, numc, numd);
159                         pack.type = ChanPacket::T_PCP;
160
161                         //inData.writePacket(pack);
162                 //}
163                 error = PCP_ERROR_GENERAL;
164
165                 // process downward packets
166                 //if (inData.numPending())
167                 //{
168                         //inData.readPacket(pack);
169
170                         mem.rewind();
171
172                         //int numc,numd;
173                         id = patom.read(numc,numd);
174
175                         error = PCPStream::procAtom(patom,id,numc,numd,bcs);
176                         
177                         if (error)
178                         {
179                                 throw StreamException("PCP exception");
180                         }
181                 }
182
183                 error = 0;
184
185         }catch(StreamException &e)
186         {
187                 LOG_ERROR("PCP readPacket: %s (%d)",e.msg,error);
188         }
189
190         return error;
191 }
192
193 // ------------------------------------------
194 void PCPStream::readEnd(Stream &,Channel *)
195 {
196 }
197
198
199 // ------------------------------------------
200 void PCPStream::readPushAtoms(AtomStream &atom, int numc,BroadcastState &bcs)
201 {
202         Host host;
203         GnuID   chanID;
204
205         chanID.clear();
206
207         for(int i=0; i<numc; i++)
208         {
209                 int c,d;
210                 ID4 id = atom.read(c,d);
211
212                 if (id == PCP_PUSH_IP)
213                         host.ip = atom.readInt();
214                 else if (id == PCP_PUSH_PORT)
215                         host.port = atom.readShort();
216                 else if (id == PCP_PUSH_CHANID)
217                         atom.readBytes(chanID.id,16);
218                 else
219                 {
220                         LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
221                         atom.skip(c,d);
222                 }
223         }
224
225
226         if (bcs.forMe)
227         {
228                 char ipstr[64];
229                 host.toStr(ipstr);
230
231                 Servent *s = NULL;
232
233                 if (chanID.isSet())
234                 {
235                         Channel *ch = chanMgr->findChannelByID(chanID);
236                         if (ch)
237                                 if (ch->isBroadcasting() || !ch->isFull() && !servMgr->relaysFull() && ch->info.id.isSame(chanID))
238                                         s = servMgr->allocServent();
239                 }else{
240                         s = servMgr->allocServent();
241                 }
242
243                 if (s)
244                 {
245                         LOG_DEBUG("GIVing to %s",ipstr);
246                         s->initGIV(host,chanID);
247                 }
248         }
249
250 }
251 // ------------------------------------------
252 void PCPStream::readRootAtoms(AtomStream &atom, int numc,BroadcastState &bcs)
253 {
254         String url;
255
256         for(int i=0; i<numc; i++)
257         {
258                 int c,d;
259                 ID4 id = atom.read(c,d);
260
261                 if (id == PCP_ROOT_UPDINT)
262                 {
263                         int si = atom.readInt();
264
265                         chanMgr->setUpdateInterval(si);
266                         LOG_DEBUG("PCP got new host update interval: %ds",si);
267                 }else if (id == PCP_ROOT_URL)
268                 {
269                         url = "http://www.peercast.org/";
270                         String loc;
271                         atom.readString(loc.data,sizeof(loc.data),d);
272                         url.append(loc);
273
274                 }else if (id == PCP_ROOT_CHECKVER)
275                 {
276                         unsigned int newVer = atom.readInt();
277                         if (newVer > PCP_CLIENT_VERSION)
278                         {
279                                 strcpy(servMgr->downloadURL,url.cstr());
280                                 peercastApp->notifyMessage(ServMgr::NT_UPGRADE,"There is a new version available, please click here to upgrade your client.");
281                         }
282                         LOG_DEBUG("PCP got version check: %d / %d",newVer,PCP_CLIENT_VERSION);
283
284                 }else if (id == PCP_ROOT_NEXT)
285                 {
286                         unsigned int time = atom.readInt();
287
288                         if (time)
289                         {
290                                 unsigned int ctime = sys->getTime();
291                                 nextRootPacket = ctime+time;
292                                 LOG_DEBUG("PCP expecting next root packet in %ds",time);
293                         }else
294                         {
295                                 nextRootPacket = 0;
296                         }
297
298                 }else if (id == PCP_ROOT_UPDATE)
299                 {
300                         atom.skip(c,d);
301
302                         chanMgr->broadcastTrackerUpdate(remoteID,true);
303
304                 }else if ((id == PCP_MESG_ASCII) || (id == PCP_MESG))                   // PCP_MESG_ASCII to be depreciated 
305                 {
306                         String newMsg;
307
308                         atom.readString(newMsg.data,sizeof(newMsg.data),d);
309                         if (!newMsg.isSame(servMgr->rootMsg.cstr()))
310                         {
311                                 servMgr->rootMsg = newMsg;
312                                 LOG_DEBUG("PCP got new root mesg: %s",servMgr->rootMsg.cstr());
313                                 peercastApp->notifyMessage(ServMgr::NT_PEERCAST,servMgr->rootMsg.cstr());
314                         }
315                 }else
316                 {
317                         LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
318                         atom.skip(c,d);
319                 }
320         }
321 }
322
323 // ------------------------------------------
324 void PCPStream::readPktAtoms(Channel *ch,AtomStream &atom,int numc,BroadcastState &bcs)
325 {
326         ChanPacket pack;
327         ID4 type;
328
329
330         for(int i=0; i<numc; i++)
331         {
332                 int c,d;
333                 ID4 id = atom.read(c,d);
334
335                 if (id == PCP_CHAN_PKT_TYPE)
336                 {
337                         type = atom.readID4();
338
339                         if (type == PCP_CHAN_PKT_HEAD)
340                                 pack.type = ChanPacket::T_HEAD;
341                         else if (type == PCP_CHAN_PKT_DATA)
342                                 pack.type = ChanPacket::T_DATA;
343                         else
344                                 pack.type = ChanPacket::T_UNKNOWN;
345
346                 }else if (id == PCP_CHAN_PKT_POS)
347                 {
348                         pack.pos = atom.readInt();
349
350
351                 }else if (id == PCP_CHAN_PKT_DATA)
352                 {
353                         pack.len = d;
354                         atom.readBytes(pack.data,pack.len);
355                 }
356                 else
357                 {
358                         LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
359                         atom.skip(c,d);
360                 }
361         }
362
363         if (ch)
364         {
365
366                 int diff = pack.pos - ch->streamPos;
367                 if (diff)
368                 {
369                         LOG_DEBUG("PCP skipping %s%8d (%10d -> %10d) count=%2d",(diff>0)?"+":"",diff,ch->streamPos,pack.pos, ch->skipCount);
370                         if (ch->lastSkipTime + 120 < sys->getTime()){
371                                 ch->skipCount = 0;
372                         }
373                         ch->lastSkipTime = sys->getTime();
374                         ch->skipCount++; //JP-EX
375                         pack.skip = true;
376                 }
377
378                 if (servMgr->autoBumpSkipCount) //JP-EX
379                 {
380                         if ((ch->skipCount > servMgr->autoBumpSkipCount) && !(servMgr->disableAutoBumpIfDirect && ch->sourceHost.tracker)) //JP-MOD
381                         {
382                                 LOG_DEBUG("Auto bump");
383                                 ch->bump = true;
384                         }
385                 }
386
387                 if (pack.type == ChanPacket::T_HEAD)
388                 {
389                         LOG_DEBUG("New head packet at %d",pack.pos);
390                         bool renewhead;
391                         if (servMgr->keepDownstreams)
392                                 renewhead = (memcmp(ch->headPack.data, pack.data, pack.len) != 0);
393                         else
394                                 renewhead = true;
395
396                         /*
397                         // check for stream restart
398                         if (pack.pos == 0)              
399                         {
400                                 LOG_CHANNEL("PCP resetting stream");
401                                 ch->streamIndex++;
402                                 ch->rawData.init();
403                         }
404                         */
405                         if (renewhead || ch->lastStopTime + 30 < sys->getTime()) {
406                                 // check for stream restart
407                                 if (pack.pos == 0)
408                                 {
409                                         LOG_CHANNEL("PCP resetting stream");
410                                         ch->streamIndex++;
411                                         ch->rawData.init();
412                                 }
413
414                                 ch->headPack = pack;
415
416                                 ch->rawData.writePacket(pack,true);
417                                 ch->streamPos = pack.pos+pack.len;
418                         }
419
420                 }else if (pack.type == ChanPacket::T_DATA)
421                 {
422                         ch->rawData.writePacket(pack,true);
423                         ch->streamPos = pack.pos+pack.len;
424                 }
425
426         }
427
428         // update this parent packet stream position
429         if ((pack.pos) && (!bcs.streamPos || (pack.pos < bcs.streamPos)))
430                 bcs.streamPos = pack.pos;
431
432 }
433 // -----------------------------------
434 void PCPStream::readHostAtoms(AtomStream &atom, int numc, BroadcastState &bcs, ChanHit &hit, bool flg)
435 {
436 //      ChanHit hit;
437         hit.init();
438         GnuID chanID = bcs.chanID;      //use default
439
440         bool busy=false;
441
442         unsigned int ipNum=0;
443
444         for(int i=0; i<numc; i++)
445         {
446                 int c,d;
447                 ID4 id = atom.read(c,d);
448
449                 if (id == PCP_HOST_IP)
450                 {
451                         unsigned int ip = atom.readInt();
452                         hit.rhost[ipNum].ip = ip;
453                 }else if (id == PCP_HOST_PORT)
454                 {
455                         int port = atom.readShort();
456                         hit.rhost[ipNum++].port = port;
457
458                         if (ipNum > 1)
459                                 ipNum = 1;
460                 }
461                 else if (id == PCP_HOST_NUML)
462                 {
463                         hit.numListeners = atom.readInt();
464                         if (hit.numListeners > 10)
465                                 hit.numListeners = 10;
466                 }
467                 else if (id == PCP_HOST_NUMR)
468                 {
469                         hit.numRelays = atom.readInt();
470                         if (hit.numRelays > 100)
471                                 hit.numRelays = 100;
472                 }
473                 else if (id == PCP_HOST_UPTIME)
474                         hit.upTime = atom.readInt();
475                 else if (id == PCP_HOST_OLDPOS)
476                         hit.oldestPos = atom.readInt();
477                 else if (id == PCP_HOST_NEWPOS)
478                         hit.newestPos = atom.readInt();
479                 else if (id == PCP_HOST_VERSION)
480                         hit.version = atom.readInt();
481                 else if (id == PCP_HOST_VERSION_VP)
482                         hit.version_vp = atom.readInt();
483                 else if (id == PCP_HOST_VERSION_EX_PREFIX)
484                         atom.readBytes(hit.version_ex_prefix,2);
485                 else if (id == PCP_HOST_VERSION_EX_NUMBER){
486                         hit.version_ex_number = atom.readShort();
487                 }
488                 else if (id == PCP_HOST_FLAGS1)
489                 {
490                         int fl1 = atom.readChar();
491
492                         hit.recv = (fl1 & PCP_HOST_FLAGS1_RECV) !=0;
493                         hit.relay = (fl1 & PCP_HOST_FLAGS1_RELAY) !=0;
494                         hit.direct = (fl1 & PCP_HOST_FLAGS1_DIRECT) !=0;
495                         hit.cin = (fl1 & PCP_HOST_FLAGS1_CIN) !=0;
496                         hit.tracker = (fl1 & PCP_HOST_FLAGS1_TRACKER) !=0;
497                         hit.firewalled = (fl1 & PCP_HOST_FLAGS1_PUSH) !=0;
498
499
500                 }else if (id == PCP_HOST_ID)
501                         atom.readBytes(hit.sessionID.id,16);
502                 else if (id == PCP_HOST_CHANID)
503                         atom.readBytes(chanID.id,16);
504                 else if (id == PCP_HOST_UPHOST_IP)
505                         hit.uphost.ip = atom.readInt();
506                 else if (id == PCP_HOST_UPHOST_PORT)
507                         hit.uphost.port = atom.readInt();
508                 else if (id == PCP_HOST_UPHOST_HOPS)
509                         hit.uphostHops = atom.readInt();
510                 else if (id == PCP_HOST_CLAP_PP){ //JP-MOD
511                         hit.clap_pp = atom.readInt();
512                         if (hit.clap_pp & 1){
513                                 Channel *c = chanMgr->findChannelByID(chanID);
514                                 if(c && c->isBroadcasting()){
515                                         String sjis;
516                                         sjis = c->info.name;
517                                         sjis.convertTo(String::T_SJIS);
518                                         peercastApp->notifyMessage(ServMgr::NT_APPLAUSE, sjis);
519                                 }
520                         }
521                 }else
522                 {
523                         LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
524                         atom.skip(c,d);
525                 }
526         }
527
528         hit.host = hit.rhost[0];
529         hit.chanID = chanID;
530
531         hit.numHops = bcs.numHops;
532
533         hit.servent_id = bcs.servent_id;
534
535         if (flg && (bcs.ttl != 0)){
536 //              LOG_DEBUG("readHostAtoms HITLISTLOCK ON-------------");
537                 chanMgr->hitlistlock.on();
538                 if (hit.recv)
539                         chanMgr->addHit(hit);
540                 else
541                         chanMgr->delHit(hit);
542 //              LOG_DEBUG("readHostAtoms HITLISTLOCK OFF-------------");
543                 chanMgr->hitlistlock.off();
544         }
545
546         if (hit.numHops == 1){
547                 Servent *sv = servMgr->findServentByServentID(hit.servent_id);
548                 if (sv && sv->getHost().ip == hit.host.ip){
549 //                      LOG_DEBUG("set servent's waitPort = %d", hit.host.port);
550                         sv->waitPort = hit.host.port;
551                         //hit.lastSendSeq = sv->serventHit.lastSendSeq;
552                         sv->serventHit = hit;
553                 }
554         }
555 }
556
557 // ------------------------------------------
558 void PCPStream::readChanAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
559 {
560 /*      Channel *ch=NULL;
561         ChanHitList *chl=NULL;
562         ChanInfo newInfo;
563
564         ch = chanMgr->findChannelByID(bcs.chanID);
565         chl = chanMgr->findHitListByID(bcs.chanID);
566
567         if (ch)
568                 newInfo = ch->info;
569         else if (chl)
570                 newInfo = chl->info;*/
571
572         Channel *ch=NULL;
573         ChanHitList *chl=NULL;
574         ChanInfo newInfo, chaInfo;
575
576         ch = this->parent;
577         if (ch){
578                 newInfo = ch->info;
579                 chaInfo = ch->info;
580         }
581
582         for(int i=0; i<numc; i++)
583         {
584
585                 int c,d;
586                 ID4 id = atom.read(c,d);
587
588                 if ((id == PCP_CHAN_PKT) && (ch))
589                 {
590                         readPktAtoms(ch,atom,c,bcs);
591                 }else if (id == PCP_CHAN_INFO)
592                 {
593                         newInfo.readInfoAtoms(atom,c);
594
595                 }else if (id == PCP_CHAN_TRACK)
596                 {
597                         newInfo.readTrackAtoms(atom,c);
598
599                 }else if (id == PCP_CHAN_BCID)
600                 {
601                         atom.readBytes(newInfo.bcID.id,16);
602
603                 }else if (id == PCP_CHAN_KEY)                   // depreciated
604                 {
605                         atom.readBytes(newInfo.bcID.id,16);
606                         newInfo.bcID.id[0] = 0;                         // clear flags
607
608                 }else if (id == PCP_CHAN_ID)
609                 {
610                         atom.readBytes(newInfo.id.id,16);
611
612                         ch = chanMgr->findChannelByID(newInfo.id);
613                         chl = chanMgr->findHitListByID(newInfo.id);
614
615                 }else
616                 {
617                         LOG_DEBUG("PCP skip: %s,%d,%d",id.getString().str(),c,d);
618                         atom.skip(c,d);
619                 }
620         }
621
622         chl = chanMgr->findHitList(newInfo);
623
624         if (!chl)
625                 chl = chanMgr->addHitList(newInfo);
626
627         if (chl)
628         {
629                 chl->info.update(newInfo);
630         
631                 if (!servMgr->chanLog.isEmpty())
632                 {
633                         //if (chl->numListeners())
634                         {
635                                 try
636                                 {
637                                         FileStream file;
638                                         file.openWriteAppend(servMgr->chanLog.cstr());
639
640                                         XML::Node *rn = new XML::Node("update time=\"%d\"",sys->getTime());
641                                         XML::Node *n = chl->info.createChannelXML();
642                                         n->add(chl->createXML(false));
643                                         n->add(chl->info.createTrackXML());
644                                         rn->add(n);     
645         
646                                         rn->write(file,0);
647                                         delete rn;
648                                         file.close();
649                                 }catch(StreamException &e)
650                                 {
651                                         LOG_ERROR("Unable to update channel log: %s",e.msg);
652                                 }
653                         }
654                 }
655
656         }
657
658         if (ch && !ch->isBroadcasting())
659                 ch->updateInfo(newInfo);
660
661
662 }
663 // ------------------------------------------
664 int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
665 {
666         ChanPacket pack;
667         //int ttl=0;            
668         int ver=0;
669         int ver_vp=0;
670         GnuID fromID,destID;
671         int r=0;
672         char ver_ex_prefix[2];
673         int ver_ex_number = 0;
674
675         fromID.clear();
676         destID.clear();
677
678         bcs.initPacketSettings();
679
680         MemoryStream pmem(pack.data,sizeof(pack.data));
681         AtomStream patom(pmem);
682
683         patom.writeParent(PCP_BCST,numc);
684
685         for(int i=0; i<numc; i++)
686         {
687                 int c,d;
688                 ID4 id = atom.read(c,d);
689                 
690                 if (id == PCP_BCST_TTL)
691                 {
692                         bcs.ttl = atom.readChar()-1;
693                         patom.writeChar(id,bcs.ttl);
694                 }else if (id == PCP_BCST_HOPS)
695                 {
696                         bcs.numHops = atom.readChar()+1;
697                         patom.writeChar(id,bcs.numHops);
698
699                 }else if (id == PCP_BCST_FROM)
700                 {
701                         atom.readBytes(fromID.id,16);
702                         patom.writeBytes(id,fromID.id,16);
703
704                         routeList.add(fromID);
705                 }else if (id == PCP_BCST_GROUP)
706                 {
707                         bcs.group = atom.readChar();
708                         patom.writeChar(id,bcs.group);
709                 }else if (id == PCP_BCST_DEST)
710                 {
711                         atom.readBytes(destID.id,16);
712                         patom.writeBytes(id,destID.id,16);
713                         bcs.forMe = destID.isSame(servMgr->sessionID);
714
715                         char idstr1[64];
716                         char idstr2[64];
717
718                         destID.toStr(idstr1);
719                         servMgr->sessionID.toStr(idstr2);
720
721                 }else if (id == PCP_BCST_CHANID)
722                 {
723                         atom.readBytes(bcs.chanID.id,16);
724                         patom.writeBytes(id,bcs.chanID.id,16);
725                 }else if (id == PCP_BCST_VERSION)
726                 {
727                         ver = atom.readInt();
728                         patom.writeInt(id,ver);
729                 }else if (id == PCP_BCST_VERSION_VP)
730                 {
731                         ver_vp = atom.readInt();
732                         patom.writeInt(id,ver_vp);
733                 }else if (id == PCP_BCST_VERSION_EX_PREFIX)
734                 {
735                         atom.readBytes(ver_ex_prefix,2);
736                         patom.writeBytes(id,ver_ex_prefix,2);
737                 }else if (id == PCP_BCST_VERSION_EX_NUMBER)
738                 {
739                         ver_ex_number = atom.readShort();
740                         patom.writeShort(id,ver_ex_number);
741                 }else if (id == PCP_HOST)
742                 {
743                         ChanHit hit;
744                         readHostAtoms(atom,c,bcs,hit,false);
745                         Servent *sv = servMgr->findServentByServentID(bcs.servent_id);
746                         if (hit.uphost.ip == 0){
747 //                              LOG_DEBUG("bcs servent_id = %d", bcs.servent_id);
748                                 if (bcs.numHops == 1){
749                                         hit.uphost.ip = servMgr->serverHost.ip;
750                                         hit.uphost.port = servMgr->serverHost.port;
751                                         hit.uphostHops = 1;
752                                 } else {
753                                         //Servent *sv = servMgr->findServentByServentID(bcs.servent_id);
754                                         if (sv){
755                                                 hit.uphost.ip = sv->getHost().ip;
756                                                 hit.uphost.port = sv->waitPort;
757                                                 hit.uphostHops = bcs.numHops - 1;
758                                         }
759                                 }
760                         }
761                         if (sv &&
762                                 ((hit.numHops == 1 && (hit.rhost[0].ip == sv->getHost().ip
763                                 && hit.uphost.ip == servMgr->serverHost.ip && hit.uphost.port == servMgr->serverHost.port)
764                                 || (hit.rhost[1].localIP() && hit.rhost[1].ip == sv->getHost().ip))
765                                 || (hit.numHops != 1 && chanMgr->findParentHit(hit))))
766                         {
767                                 int oldPos = pmem.pos;
768                                 hit.writeAtoms(patom, hit.chanID);
769                                 pmem.pos = oldPos;
770                                 r = readAtom(patom,bcs);
771                         } else {
772                                 char tmp[80], tmp2[80], tmp3[80];
773                                 memset(tmp, 0, 80);
774                                 memset(tmp2, 0, 80);
775                                 memset(tmp3, 0, 80);
776                                 hit.uphost.toStr(tmp);
777                                 hit.host.toStr(tmp2);
778                                 if (sv)
779                                         sv->getHost().toStr(tmp3);
780                                 LOG_DEBUG("### Invalid bcst: hops=%d, l/r = %d/%d, ver=%d(VP%04d), ttl=%d",
781                                         bcs.numHops,hit.numListeners, hit.numRelays, ver,ver_vp,bcs.ttl);
782                                 LOG_DEBUG("### %s <- %s <- sv(%s)",
783                                         tmp2, tmp, tmp3);
784                                 bcs.ttl = 0;
785                         }
786                 } else {
787                         // copy and process atoms
788                         int oldPos = pmem.pos;
789                         patom.writeAtoms(id,atom.io,c,d);
790                         pmem.pos = oldPos;
791                         r = readAtom(patom,bcs);
792                 }
793         }
794
795         char fromStr[64];
796         fromStr[0] = 0;
797         if (fromID.isSet())
798                 fromID.toStr(fromStr);
799         char destStr[64];
800         destStr[0] = 0;
801         if (destID.isSet())
802                 destID.toStr(destStr);
803         char tmp[64];
804         bcs.chanID.toStr(tmp);
805
806         // Broadcast flood
807         if (servMgr->lastPCPFromID.isSame(fromID)
808                 && time(NULL) - servMgr->lastPCPBcstTime < 3)
809         {
810                 memcpy(servMgr->lastPCPFromID.id, fromID.id, 16);
811                 servMgr->lastPCPBcstTime = time(NULL);
812                 LOG_DEBUG("PCP bcst reject: group=%d, hops=%d, ver=%d(%c%c%04d), from=%s, dest=%s ttl=%d",
813                         bcs.group,bcs.numHops,ver,ver_ex_prefix[0],ver_ex_prefix[1],ver_ex_number,fromStr,destStr,bcs.ttl);
814
815                 return r;
816         }
817         memcpy(servMgr->lastPCPFromID.id, fromID.id, 16);
818         servMgr->lastPCPBcstTime = time(NULL);
819
820         servMgr->lastPCPFromID.toStr(destStr);
821
822 //      LOG_DEBUG(tmp);
823
824         if (ver_ex_number){
825                 LOG_DEBUG("PCP bcst: group=%d, hops=%d, ver=%d(%c%c%04d), from=%s, dest=%s ttl=%d",
826                         bcs.group,bcs.numHops,ver,ver_ex_prefix[0],ver_ex_prefix[1],ver_ex_number,fromStr,destStr,bcs.ttl);
827         } else if (ver_vp){
828                 LOG_DEBUG("PCP bcst: group=%d, hops=%d, ver=%d(VP%04d), from=%s, dest=%s ttl=%d",bcs.group,bcs.numHops,ver,ver_vp,fromStr,destStr,bcs.ttl);
829         } else {
830                 LOG_DEBUG("PCP bcst: group=%d, hops=%d, ver=%d, from=%s, dest=%s ttl=%d",bcs.group,bcs.numHops,ver,fromStr,destStr,bcs.ttl);
831         }
832
833         if (fromID.isSet())
834                 if (fromID.isSame(servMgr->sessionID))
835                 {
836                         LOG_ERROR("BCST loopback"); 
837                         return PCP_ERROR_BCST+PCP_ERROR_LOOPBACK;
838                 }
839
840                 // broadcast back out if ttl > 0 
841                 if ((bcs.ttl>0) && (!bcs.forMe))
842                 {
843                         pack.len = pmem.pos;
844                         pack.type = ChanPacket::T_PCP;
845
846                         if (bcs.group & (/*PCP_BCST_GROUP_ROOT|*/PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
847                         {
848                                 pack.priority = 11 - bcs.numHops;
849                                 chanMgr->broadcastPacketUp(pack,bcs.chanID,remoteID,destID);
850                         }
851
852                         if (bcs.group & (/*PCP_BCST_GROUP_ROOT|*/PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
853                         {
854                                 servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_COUT);
855                         }
856
857                         if (bcs.group & (PCP_BCST_GROUP_RELAYS|PCP_BCST_GROUP_TRACKERS))
858                         {
859                                 servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_CIN);
860                         }
861
862                         if (bcs.group & (PCP_BCST_GROUP_RELAYS))
863                         {
864                                 servMgr->broadcastPacket(pack,bcs.chanID,remoteID,destID,Servent::T_RELAY);
865                         }
866
867
868                         //              LOG_DEBUG("ttl=%d",ttl);
869
870                 } else {
871                         //              LOG_DEBUG("ttl=%d",ttl);
872                 }
873         return r;
874 }
875
876
877 // ------------------------------------------
878 int PCPStream::procAtom(AtomStream &atom,ID4 id,int numc, int dlen,BroadcastState &bcs)
879 {
880         int r=0;
881         ChanHit hit;
882         int rBan = 0;
883
884         if (id == PCP_CHAN)
885         {
886                 readChanAtoms(atom,numc,bcs);
887         }else if (id == PCP_ROOT)
888         {
889                 if (servMgr->isRoot)
890                         throw StreamException("Unauthorized root message");                             
891                 else
892                         readRootAtoms(atom,numc,bcs);
893
894         }else if (id == PCP_HOST)
895         {
896                 readHostAtoms(atom,numc,bcs,hit);
897                 Channel *ch = chanMgr->findChannelByID(hit.chanID);
898                 if (ch && (ch->isBroadcasting() || servMgr->vpDebug)){
899                         if (servMgr->autoPort0Kick && (hit.numHops == 1) && (hit.firewalled || (!hit.relay && !hit.numRelays))){
900                                 char tmp[32];
901                                 hit.host.IPtoStr(tmp);
902                                 LOG_DEBUG("host that can't relay is disconnect: %s", tmp);
903                                 rBan = PCP_ERROR_BANNED;
904                         }
905                         if (servMgr->allowOnlyVP && (hit.numHops == 1) && !hit.version_vp){
906                                 char tmp[32];
907                                 hit.host.IPtoStr(tmp);
908                                 LOG_DEBUG("host that is not VP is disconnect: %s", tmp);
909                                 rBan = PCP_ERROR_BANNED;
910                         }
911                 }
912
913         }else if ((id == PCP_MESG_ASCII) || (id == PCP_MESG))           // PCP_MESG_ASCII to be depreciated
914         {
915                 String msg;
916                 atom.readString(msg.data,sizeof(msg.data),dlen);
917                 LOG_DEBUG("PCP got text: %s",msg.cstr());
918         }else if (id == PCP_BCST)
919         {
920                 r = readBroadcastAtoms(atom,numc,bcs);
921         }else if (id == PCP_HELO)
922         {
923                 atom.skip(numc,dlen);
924                 atom.writeParent(PCP_OLEH,1);
925                         atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
926         }else if (id == PCP_PUSH)
927         {
928
929                 readPushAtoms(atom,numc,bcs);
930         }else if (id == PCP_OK)
931         {
932                 atom.readInt();
933
934         }else if (id == PCP_QUIT)
935         {
936                 r = atom.readInt();
937                 if (!r)
938                         r = PCP_ERROR_QUIT;
939
940         }else if (id == PCP_ATOM)
941         {
942                 for(int i=0; i<numc; i++)
943                 {
944                         int nc,nd;
945                         ID4 aid = atom.read(nc,nd);
946                         int ar = procAtom(atom,aid,nc,nd,bcs);
947                         if (ar)
948                                 r = ar;
949                 }
950
951         }else
952         {
953                 LOG_CHANNEL("PCP skip: %s",id.getString().str());
954                 atom.skip(numc,dlen);
955         }
956
957         if (!r)
958                 r = rBan;
959
960         return r;
961
962 }
963
964 // ------------------------------------------
965 int PCPStream::readAtom(AtomStream &atom,BroadcastState &bcs)
966 {
967         int numc,dlen;
968         ID4 id = atom.read(numc,dlen);
969
970         return  procAtom(atom,id,numc,dlen,bcs);
971 }
972
973