OSDN Git Service

VP27マージ
[peercast-im/PeerCastIM.git] / c: / Git / PeerCast.root / PeerCast / core / common / servent.cpp
1 // ------------------------------------------------
2 // File : servent.cpp
3 // Date: 4-apr-2002
4 // Author: giles
5 // Desc: 
6 //              Servents are the actual connections between clients. They do the handshaking,
7 //              transferring of data and processing of GnuPackets. Each servent has one socket allocated
8 //              to it on connect, it uses this to transfer all of its data.
9 //
10 // (c) 2002 peercast.org
11 // ------------------------------------------------
12 // This program is free software; you can redistribute it and/or modify
13 // it under the terms of the GNU General Public License as published by
14 // the Free Software Foundation; either version 2 of the License, or
15 // (at your option) any later version.
16
17 // This program is distributed in the hope that it will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 // GNU General Public License for more details.
21 // ------------------------------------------------
22 // todo: make lan->yp not check firewall
23
24 #include <stdlib.h>
25 #include "servent.h"
26 #include "sys.h"
27 #include "gnutella.h"
28 #include "xml.h"
29 #include "html.h"
30 #include "http.h"
31 #include "stats.h"
32 #include "servmgr.h"
33 #include "peercast.h"
34 #include "atom.h"
35 #include "pcp.h"
36 #include "version2.h"
37 #ifdef _DEBUG
38 #include "chkMemoryLeak.h"
39 #define DEBUG_NEW new(__FILE__, __LINE__)
40 #define new DEBUG_NEW
41 #endif
42
43
// Timeout used for direct writes.
// NOTE(review): presumably in seconds; it is not referenced in this part of the file - confirm at the use site.
const int DIRECT_WRITE_TIMEOUT = 60;
45
46 // -----------------------------------
47 char *Servent::statusMsgs[]=
48 {
49         "NONE",
50                 "CONNECTING",
51         "PROTOCOL",
52         "HANDSHAKE",
53         "CONNECTED",
54         "CLOSING",
55                 "LISTENING",
56                 "TIMEOUT",
57                 "REFUSED",
58                 "VERIFIED",
59                 "ERROR",
60                 "WAIT",
61                 "FREE"
62 };
63
64 // -----------------------------------
65 char *Servent::typeMsgs[]=
66 {
67                 "NONE",
68         "INCOMING",
69         "SERVER",
70                 "RELAY",
71                 "DIRECT",
72                 "COUT",
73                 "CIN",
74                 "PGNU"
75 };
76 // -----------------------------------
77 bool    Servent::isPrivate() 
78 {
79         Host h = getHost();
80         return servMgr->isFiltered(ServFilter::F_PRIVATE,h) || h.isLocalhost();
81 }
82 // -----------------------------------
83 bool    Servent::isAllowed(int a) 
84 {
85         Host h = getHost();
86
87         if (servMgr->isFiltered(ServFilter::F_BAN,h))
88                 return false;
89
90         return (allow&a)!=0;
91 }
92
93 // -----------------------------------
94 bool    Servent::isFiltered(int f) 
95 {
96         Host h = getHost();
97         return servMgr->isFiltered(f,h);
98 }
99
100 int servent_count = 1;
101 // -----------------------------------
102 Servent::Servent(int index)
103 :outPacketsPri(MAX_OUTPACKETS)
104 ,outPacketsNorm(MAX_OUTPACKETS)
105 ,seenIDs(MAX_HASH)
106 ,serventIndex(index)
107 ,sock(NULL)
108 ,next(NULL)
109 {
110         reset();
111         servent_id = servent_count++;
112         lastSkipTime = 0;
113         lastSkipCount = 0;
114         waitPort = 0;
115 }
116
117 // -----------------------------------
// Destructor. Does nothing itself; the sockets and the PCP stream are
// released by kill().
Servent::~Servent()
{       
        
}
122 // -----------------------------------
123 void    Servent::kill() 
124 {
125         thread.active = false;
126                 
127         setStatus(S_CLOSING);
128
129         if (pcpStream)
130         {
131                 PCPStream *pcp = pcpStream;
132                 pcpStream = NULL;
133                 pcp->kill();
134                 delete pcp;
135         }
136
137         chanMgr->hitlistlock.on();
138         ChanHitList *chl = chanMgr->findHitListByID(chanID);
139         if (chl) {
140                 ChanHit *chh = chl->hit;
141                 ChanHit *prev = NULL;
142                 while (chh) {
143                         if (chh->servent_id == this->servent_id){
144                                 if ((servMgr->kickKeepTime != 0) && (chh->firewalled == 1)){
145                                         chh->numHops = 0;
146                                         chh->numListeners = 0;
147                                         chh->numRelays = 0;
148                                         prev = chh;
149                                         chh = chh->next;
150                                 } else {
151                                         ChanHit *next = chh->next;
152                                         if (prev)
153                                                 prev->next = next;
154                                         else
155                                                 chl->hit = next;
156
157                                         delete chh;
158                                         chh = next;
159                                 }
160                         } else {
161                                 prev = chh;
162                                 chh = chh->next;
163                         }
164                 }
165         }
166         chanMgr->hitlistlock.off();
167
168         if (sock)
169         {
170                 sock->close();
171                 delete sock;
172                 sock = NULL;
173         }
174
175         if (pushSock)
176         {
177                 pushSock->close();
178                 delete pushSock;
179                 pushSock = NULL;
180         }
181
182 //      thread.unlock();
183
184         if (type != T_SERVER)
185         {
186                 reset();
187                 setStatus(S_FREE);
188         }
189
190 }
191 // -----------------------------------
192 void    Servent::abort() 
193 {
194         thread.active = false;
195         if (sock)
196         {
197                 sock->close();
198         }
199
200 }
201
202 // -----------------------------------
203 void Servent::reset()
204 {
205
206         remoteID.clear();
207
208         servPort = 0;
209
210         pcpStream = NULL;
211
212         flowControl = false;
213         networkID.clear();
214
215         chanID.clear();
216
217         outputProtocol = ChanInfo::SP_UNKNOWN;
218
219         agent.clear();
220         sock = NULL;
221         allow = ALLOW_ALL;
222         syncPos = 0;
223         addMetadata = false;
224         nsSwitchNum = 0;
225         pack.func = 255;
226         lastConnect = lastPing = lastPacket = 0;
227         loginPassword.clear();
228         loginMount.clear();
229         bytesPerSecond = 0;
230         priorityConnect = false;
231         pushSock = NULL;
232         sendHeader = true;
233
234         outPacketsNorm.reset();
235         outPacketsPri.reset();
236
237         seenIDs.clear();
238
239         status = S_NONE;
240         type = T_NONE;
241
242         channel_id = 0;
243
244         serventHit.init();
245 }
246 // -----------------------------------
247 bool Servent::sendPacket(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did,Servent::TYPE t)
248 {
249
250         if  (      (type == t) 
251                         && (isConnected())
252                         && (!cid.isSet() || chanID.isSame(cid))
253                         && (!sid.isSet() || !sid.isSame(remoteID))
254                         && (pcpStream != NULL)
255                 )
256         {
257                 return pcpStream->sendPacket(pack,did);
258         }
259         return false;
260 }
261
262
263 // -----------------------------------
264 bool Servent::acceptGIV(ClientSocket *givSock)
265 {
266         if (!pushSock)
267         {
268                 pushSock = givSock;
269                 return true;
270         }else
271                 return false;
272 }
273
274 // -----------------------------------
275 Host Servent::getHost()
276 {
277         Host h(0,0);
278
279         if (sock)
280                 h = sock->host;
281
282         return h;
283 }
284
285 // -----------------------------------
286 bool Servent::outputPacket(GnuPacket &p, bool pri)
287 {
288         lock.on();
289
290         bool r=false;
291         if (pri)
292                 r = outPacketsPri.write(p);
293         else
294         {
295                 if (servMgr->useFlowControl)
296                 {
297                         int per = outPacketsNorm.percentFull();
298                         if (per > 50)
299                                 flowControl = true;
300                         else if (per < 25)
301                                 flowControl = false;
302                 }
303
304
305                 bool send=true;
306                 if (flowControl)
307                 {
308                         // if in flowcontrol, only allow packets with less of a hop count than already in queue
309                         if (p.hops >= outPacketsNorm.findMinHop())
310                                 send = false;
311                 }
312
313                 if (send)
314                         r = outPacketsNorm.write(p);
315         }
316
317         lock.off();
318         return r;
319 }
320
321 // -----------------------------------
322 bool Servent::initServer(Host &h)
323 {
324         try
325         {
326                 checkFree();
327
328                 status = S_WAIT;
329
330                 createSocket();
331
332                 sock->bind(h);
333
334                 thread.data = this;
335
336                 thread.func = serverProc;
337
338                 type = T_SERVER;
339
340                 if (!sys->startThread(&thread))
341                         throw StreamException("Can`t start thread");
342
343         }catch(StreamException &e)
344         {
345                 LOG_ERROR("Bad server: %s",e.msg);
346                 kill();
347                 return false;
348         }
349
350         return true;
351 }
352 // -----------------------------------
353 void Servent::checkFree()
354 {
355         if (sock)
356                 throw StreamException("Socket already set");
357         if (thread.active)
358                 throw StreamException("Thread already active");
359 }
360 // -----------------------------------
361 void Servent::initIncoming(ClientSocket *s, unsigned int a)
362 {
363
364         try{
365
366                 checkFree();
367
368                 type = T_INCOMING;
369                 sock = s;
370                 allow = a;
371                 thread.data = this;
372                 thread.func = incomingProc;
373                 thread.finish = false;
374
375                 setStatus(S_PROTOCOL);
376
377                 char ipStr[64];
378                 sock->host.toStr(ipStr);
379                 LOG_DEBUG("Incoming from %s",ipStr);
380
381
382                 if (!sys->startThread(&thread))
383                         throw StreamException("Can`t start thread");
384         }catch(StreamException &e)
385         {
386                 //LOG_ERROR("!!FATAL!! Incoming error: %s",e.msg);
387                 //servMgr->shutdownTimer = 1;   
388                 kill();
389
390                 LOG_ERROR("INCOMING FAILED: %s",e.msg);
391
392         }
393 }
394
395 // -----------------------------------
396 void Servent::initOutgoing(TYPE ty)
397 {
398         try 
399         {
400                 checkFree();
401
402
403                 type = ty;
404
405                 thread.data = this;
406                 thread.func = outgoingProc;
407
408                 if (!sys->startThread(&thread))
409                         throw StreamException("Can`t start thread");
410
411         }catch(StreamException &e)
412         {
413                 LOG_ERROR("Unable to start outgoing: %s",e.msg);
414                 kill();
415         }
416 }
417
418 // -----------------------------------
419 void Servent::initPCP(Host &rh)
420 {
421         char ipStr[64];
422         rh.toStr(ipStr);
423         try 
424         {
425                 checkFree();
426
427             createSocket();
428
429                 type = T_COUT;
430
431                 sock->open(rh);
432
433                 if (!isAllowed(ALLOW_NETWORK))
434                         throw StreamException("Servent not allowed");
435
436                 thread.data = this;
437                 thread.func = outgoingProc;
438
439                 LOG_DEBUG("Outgoing to %s",ipStr);
440
441                 if (!sys->startThread(&thread))
442                         throw StreamException("Can`t start thread");
443
444         }catch(StreamException &e)
445         {
446                 LOG_ERROR("Unable to open connection to %s - %s",ipStr,e.msg);
447                 kill();
448         }
449 }
450
// Disabled code: everything between #if 0 and #endif is excluded from the
// build. It opens and connects a socket to `host` after checking ALLOW_DATA.
#if 0
// -----------------------------------
void    Servent::initChannelFetch(Host &host)
{
        type = T_STREAM;

        char ipStr[64];
        host.toStr(ipStr);

        checkFree();
         
        createSocket();
                
        sock->open(host);

                
        if (!isAllowed(ALLOW_DATA))     
                throw StreamException("Servent not allowed");
                
        sock->connect();
}
#endif
473
474 // -----------------------------------
475 void Servent::initGIV(Host &h, GnuID &id)
476 {
477         char ipStr[64];
478         h.toStr(ipStr);
479         try 
480         {
481                 checkFree();
482
483                 givID = id;
484
485             createSocket();
486
487                 sock->open(h);
488
489                 if (!isAllowed(ALLOW_NETWORK))
490                         throw StreamException("Servent not allowed");
491                 
492                 sock->connect();
493
494                 thread.data = this;
495                 thread.func = givProc;
496
497                 type = T_RELAY;
498
499                 if (!sys->startThread(&thread))
500                         throw StreamException("Can`t start thread");
501
502         }catch(StreamException &e)
503         {
504                 LOG_ERROR("GIV error to %s: %s",ipStr,e.msg);
505                 kill();
506         }
507 }
508 // -----------------------------------
509 void Servent::createSocket()
510 {
511         if (sock)
512                 LOG_ERROR("Servent::createSocket attempt made while active");
513
514         sock = sys->createSocket();
515 }
516 // -----------------------------------
517 void Servent::setStatus(STATUS s)
518 {
519         if (s != status)
520         {
521                 status = s;
522
523                 if ((s == S_HANDSHAKE) || (s == S_CONNECTED) || (s == S_LISTENING))
524                         lastConnect = sys->getTime();
525         }
526
527 }
528
529 // -----------------------------------
530 void Servent::handshakeOut()
531 {
532     sock->writeLine(GNU_PEERCONN);
533
534         char str[64];
535     
536         sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_AGENT);
537     sock->writeLineF("%s %d",PCX_HS_PCP,1);
538
539         if (priorityConnect)
540             sock->writeLineF("%s %d",PCX_HS_PRIORITY,1);
541         
542         if (networkID.isSet())
543         {
544                 networkID.toStr(str);
545                 sock->writeLineF("%s %s",PCX_HS_NETWORKID,str);
546         }
547
548         servMgr->sessionID.toStr(str);
549         sock->writeLineF("%s %s",PCX_HS_ID,str);
550
551         
552     sock->writeLineF("%s %s",PCX_HS_OS,peercastApp->getClientTypeOS());
553         
554         sock->writeLine("");
555
556         HTTP http(*sock);
557
558         int r = http.readResponse();
559
560         if (r != 200)
561         {
562                 LOG_ERROR("Expected 200, got %d",r);
563                 throw StreamException("Unexpected HTTP response");
564         }
565
566
567         bool versionValid = false;
568
569         GnuID clientID;
570         clientID.clear();
571
572     while (http.nextHeader())
573     {
574                 LOG_DEBUG(http.cmdLine);
575
576                 char *arg = http.getArgStr();
577                 if (!arg)
578                         continue;
579
580                 if (http.isHeader(HTTP_HS_AGENT))
581                 {
582                         agent.set(arg);
583
584                         if (strnicmp(arg,"PeerCast/",9)==0)
585                                 versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);
586                 }else if (http.isHeader(PCX_HS_NETWORKID))
587                         clientID.fromStr(arg);
588     }
589
590         if (!clientID.isSame(networkID))
591                 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
592
593         if (!versionValid)
594                 throw HTTPException(HTTP_SC_UNAUTHORIZED,401);
595
596
597     sock->writeLine(GNU_OK);
598     sock->writeLine("");
599
600 }
601
602
603 // -----------------------------------
// Currently a no-op: the body is empty.
void Servent::processOutChannel()
{
}
607
608
609 // -----------------------------------
610 void Servent::handshakeIn()
611 {
612
613         int osType=0;
614
615         HTTP http(*sock);
616
617
618         bool versionValid = false;
619         bool diffRootVer = false;
620
621         GnuID clientID;
622         clientID.clear();
623
624     while (http.nextHeader())
625     {
626                 LOG_DEBUG("%s",http.cmdLine);
627
628                 char *arg = http.getArgStr();
629                 if (!arg)
630                         continue;
631
632                 if (http.isHeader(HTTP_HS_AGENT))
633                 {
634                         agent.set(arg);
635
636                         if (strnicmp(arg,"PeerCast/",9)==0)
637                         {
638                                 versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);
639                                 diffRootVer = stricmp(arg+9,MIN_ROOTVER)<0;
640                         }
641                 }else if (http.isHeader(PCX_HS_NETWORKID))
642                 {
643                         clientID.fromStr(arg);
644
645                 }else if (http.isHeader(PCX_HS_PRIORITY))
646                 {
647                         priorityConnect = atoi(arg)!=0;
648
649                 }else if (http.isHeader(PCX_HS_ID))
650                 {
651                         GnuID id;
652                         id.fromStr(arg);
653                         if (id.isSame(servMgr->sessionID))
654                                 throw StreamException("Servent loopback");
655
656                 }else if (http.isHeader(PCX_HS_OS))
657                 {
658                         if (stricmp(arg,PCX_OS_LINUX)==0)
659                                 osType = 1;
660                         else if (stricmp(arg,PCX_OS_WIN32)==0)
661                                 osType = 2;
662                         else if (stricmp(arg,PCX_OS_MACOSX)==0)
663                                 osType = 3;
664                         else if (stricmp(arg,PCX_OS_WINAMP2)==0)
665                                 osType = 4;
666                 }
667
668     }
669
670         if (!clientID.isSame(networkID))
671                 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
672
673         // if this is a priority connection and all incoming connections 
674         // are full then kill an old connection to make room. Otherwise reject connection.
675         //if (!priorityConnect)
676         {
677                 if (!isPrivate())
678                         if (servMgr->pubInFull())
679                                 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
680         }
681
682         if (!versionValid)
683                 throw HTTPException(HTTP_SC_FORBIDDEN,403);
684
685     sock->writeLine(GNU_OK);
686
687     sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_OLDAGENT);
688
689         if (networkID.isSet())
690         {
691                 char idStr[64];
692                 networkID.toStr(idStr);
693                 sock->writeLineF("%s %s",PCX_HS_NETWORKID,idStr);
694         }
695
696         if (servMgr->isRoot)
697         {
698                 sock->writeLineF("%s %d",PCX_HS_FLOWCTL,servMgr->useFlowControl?1:0);
699                 sock->writeLineF("%s %d",PCX_HS_MINBCTTL,chanMgr->minBroadcastTTL);
700                 sock->writeLineF("%s %d",PCX_HS_MAXBCTTL,chanMgr->maxBroadcastTTL);
701                 sock->writeLineF("%s %d",PCX_HS_RELAYBC,servMgr->relayBroadcast);
702                 //sock->writeLine("%s %d",PCX_HS_FULLHIT,2);
703
704
705                 if (diffRootVer)
706                 {
707                         sock->writeString(PCX_HS_DL);
708                         sock->writeLine(PCX_DL_URL);
709                 }
710
711                 sock->writeLineF("%s %s",PCX_HS_MSG,servMgr->rootMsg.cstr());
712         }
713
714
715         char hostIP[64];
716         Host h = sock->host;
717         h.IPtoStr(hostIP);
718     sock->writeLineF("%s %s",PCX_HS_REMOTEIP,hostIP);
719
720
721     sock->writeLine("");
722
723
724         while (http.nextHeader());
725 }
726
727 // -----------------------------------
728 bool    Servent::pingHost(Host &rhost,GnuID &rsid)
729 {
730         char ipstr[64];
731         rhost.toStr(ipstr);
732         LOG_DEBUG("Ping host %s: trying..",ipstr);
733         ClientSocket *s=NULL;
734         bool hostOK=false;
735         try
736         {
737                 s = sys->createSocket();
738                 if (!s)
739                         return false;
740                 else
741                 {
742
743                         s->setReadTimeout(15000);
744                         s->setWriteTimeout(15000);
745                         s->open(rhost);
746                         s->connect();
747
748                         AtomStream atom(*s);
749
750                         atom.writeInt(PCP_CONNECT,1);
751                         atom.writeParent(PCP_HELO,1);
752                                 atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
753
754                         GnuID sid;
755                         sid.clear();
756
757                         int numc,numd;
758                         ID4 id = atom.read(numc,numd);
759                         if (id == PCP_OLEH)
760                         {
761                                 for(int i=0; i<numc; i++)
762                                 {
763                                         int c,d;
764                                         ID4 pid = atom.read(c,d);
765                                         if (pid == PCP_SESSIONID)
766                                                 atom.readBytes(sid.id,16,d);
767                                         else
768                                                 atom.skip(c,d);
769                                 }
770                         }else
771                         {
772                                 LOG_DEBUG("Ping response: %s",id.getString().str());
773                                 throw StreamException("Bad ping response");
774                         }
775
776                         if (!sid.isSame(rsid))
777                                 throw StreamException("SIDs don`t match");
778
779                         hostOK = true;
780                         LOG_DEBUG("Ping host %s: OK",ipstr);
781                         atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT);
782
783
784                 }
785         }catch(StreamException &e)
786         {
787                 LOG_DEBUG("Ping host %s: %s",ipstr,e.msg);
788         }
789         if (s)
790         {
791                 s->close();
792                 delete s;
793         }
794
795         if (!hostOK)
796                 rhost.port = 0;
797
798         return true;
799 }
800
// Lock held by Servent::handshakeStream() around the canStream() check and
// the type / thread.active / status update that follows it.
WLock canStreamLock;
802
803 // -----------------------------------
804 bool Servent::handshakeStream(ChanInfo &chanInfo)
805 {
806
807         HTTP http(*sock);
808
809
810         bool gotPCP=false;
811         unsigned int reqPos=0;
812         unsigned short listenPort = 0;
813
814         nsSwitchNum=0;
815
816         while (http.nextHeader())
817         {
818                 char *arg = http.getArgStr();
819                 if (!arg)
820                         continue;
821
822                 if (http.isHeader(PCX_HS_PCP))
823                         gotPCP = atoi(arg)!=0;
824                 else if (http.isHeader(PCX_HS_POS))
825                         reqPos = atoi(arg);
826                 else if (http.isHeader(PCX_HS_PORT))
827                         listenPort = (unsigned short)atoi(arg);
828                 else if (http.isHeader("icy-metadata"))
829                         addMetadata = atoi(arg) > 0;
830                 else if (http.isHeader(HTTP_HS_AGENT))
831                         agent = arg;
832                 else if (http.isHeader("Pragma"))
833                 {
834                         char *ssc = stristr(arg,"stream-switch-count=");
835                         char *so = stristr(arg,"stream-offset");
836
837                         if (ssc || so)
838                         {
839                                 nsSwitchNum=1;
840                                 //nsSwitchNum = atoi(ssc+20);
841                         }
842                 }
843
844                 LOG_DEBUG("Stream: %s",http.cmdLine);
845         }
846
847
848         if ((!gotPCP) && (outputProtocol == ChanInfo::SP_PCP))
849                 outputProtocol = ChanInfo::SP_PEERCAST;
850
851         if (outputProtocol == ChanInfo::SP_HTTP)
852         {
853                 if  ( (chanInfo.srcProtocol == ChanInfo::SP_MMS)
854                           || (chanInfo.contentType == ChanInfo::T_WMA)
855                           || (chanInfo.contentType == ChanInfo::T_WMV)
856                           || (chanInfo.contentType == ChanInfo::T_ASX)
857                         )
858                 outputProtocol = ChanInfo::SP_MMS;
859         }
860
861
862         bool chanFound=false;
863         bool chanReady=false;
864
865         ChanHit *sourceHit = NULL;
866
867         Channel *ch = chanMgr->findChannelByID(chanInfo.id);
868         if (ch)
869         {
870                 sendHeader = true;
871                 if (reqPos || !isIndexTxt(&chanInfo))
872                 {
873                         streamPos = ch->rawData.findOldestPos(reqPos);
874                         //streamPos = ch->rawData.getLatestPos();
875                 }else
876                 {
877                         streamPos = ch->rawData.getLatestPos();
878                 }
879
880                 chanID = chanInfo.id;
881                 serventHit.rhost[0].ip = getHost().ip;
882                 serventHit.rhost[0].port = listenPort;
883                 serventHit.host = serventHit.rhost[0];
884                 serventHit.chanID = chanID;
885
886                 canStreamLock.on();
887                 chanReady = canStream(ch);
888                 if (/*0 && */!chanReady)
889                 {
890                         if (servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
891                         {
892                                 sourceHit = &ch->sourceHost;  // send source host info
893
894                                 if (listenPort && ch->info.getUptime() > 60)  // if stable
895                                 {
896                                         // connect "this" host later
897                                         chanMgr->addHit(serventHit);
898                                 }
899
900                                 char tmp[50];
901                                 getHost().toStr(tmp);
902                                 LOG_DEBUG("Bump channel, hand over sourceHost to %s", tmp);
903                                 ch->bump = true;
904                         }
905                         else if (servMgr->kickUnrelayableHost(chanID, serventHit) != 0)
906                         {
907                                 chanReady = canStream(ch);
908                                 if (!chanReady)
909                                         LOG_DEBUG("*** Kicked unrelayable host, but still cannot stream");
910                         }
911                 }
912                 if (!chanReady) type = T_INCOMING;
913                 thread.active = chanReady;
914                 setStatus(S_CONNECTED);
915                 canStreamLock.off();
916                 channel_id = ch->channel_id;
917
918                 //JP-Patch add-s
919                 if (servMgr->isCheckPushStream())
920                 {
921                         if (chanReady == true)
922                         {
923                                 Host h = getHost();
924
925                                 if (!h.isLocalhost()) 
926                                 {
927                                         do 
928                                         {
929                                                 if (strstr(agent.cstr(),"PeerCast/0.119") != NULL) 
930                                                 {                                               
931                                                         char strip[256];
932                                                         h.toStr(strip);
933                                                         LOG_ERROR("Block v0.119 Servent : %s (%s)",strip,agent.cstr());
934                                                         chanReady = false;
935                                                         break;
936                                                 }
937
938 /*                                              ChanHitList *hits[ChanMgr::MAX_HITLISTS];
939                                                 int numHits=0;
940                                                 for(int i=0; i<ChanMgr::MAX_HITLISTS; i++)
941                                                 {
942                                                         ChanHitList *chl = &chanMgr->hitlists[i];
943                                                         if (chl->isUsed())
944                                                                 hits[numHits++] = chl;
945                                                 }
946                                                 bool isfw = false;
947                                                 int numRelay = 0;
948                                                 for(int i=0; i<numHits; i++)
949                                                 {
950                                                         ChanHitList *chl = hits[i];
951                                                         if (chl->isUsed())
952                                                         {
953                                                                 for (int j=0; j<ChanHitList::MAX_HITS; j++)
954                                                                 {
955                                                                         ChanHit *hit = &chl->hits[j];
956                                                                         if (hit->host.isValid() && (h.ip == hit->host.ip)) 
957                                                                         {
958                                                                                 if (hit->firewalled)
959                                                                                         isfw = true;
960                                                                                 numRelay = hit->numRelays;
961                                                                         }
962                                                                 }
963                                                         }
964                                                 }
965                                                 if ((isfw == true) && (numRelay == 0))
966                                                 {
967                                                         char strip[256];
968                                                         h.toStr(strip);
969                                                         LOG_ERROR("Block firewalled Servent : %s",strip);
970                                                         chanReady = false;
971                                                 }*/
972
973                                                 ChanHitList *chl = chanMgr->findHitList(chanInfo);
974                                                 ChanHit *hit = chl->hit;
975                                                 while(hit){
976                                                         if (hit->host.isValid() && (h.ip == hit->host.ip))
977                                                         {
978                                                                 if ((hit->firewalled) && (hit->numRelays == 0)){
979                                                                         char strip[256];
980                                                                         h.toStr(strip);
981                                                                         LOG_ERROR("Block firewalled Servent : %s",strip);
982                                                                         chanReady = false;
983                                                                 }
984                                                         }
985                                                         hit = hit->next;
986                                                 }
987                                         }while (0);
988                                 }               
989                         }
990                 }
991                 //JP-Patch add-e
992         }
993
994 //      LockBlock lockblock(chanMgr->hitlistlock);
995
996 //      lockblock.lockon();
997         ChanHitList *chl = chanMgr->findHitList(chanInfo);
998
999         if (chl)
1000         {
1001                 chanFound = true;
1002         }
1003
1004
1005         bool result = false;
1006
1007         char idStr[64];
1008         chanInfo.id.toStr(idStr);
1009
1010         char sidStr[64];
1011         servMgr->sessionID.toStr(sidStr);
1012
1013         Host rhost = sock->host;
1014
1015
1016
1017
1018         AtomStream atom(*sock);
1019
1020
1021
1022         if (!chanFound)
1023         {
1024                 sock->writeLine(HTTP_SC_NOTFOUND);
1025             sock->writeLine("");
1026                 LOG_DEBUG("Sending channel not found");
1027                 return false;
1028         }
1029
1030
1031         if (!chanReady)
1032         {
1033                 if (outputProtocol == ChanInfo::SP_PCP)
1034                 {
1035
1036                         char tbuf[8196];
1037                         MemoryStream mem(tbuf, sizeof(tbuf));
1038                         mem.writeLine(HTTP_SC_UNAVAILABLE);
1039                         mem.writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
1040                         mem.writeLine("");
1041                         sock->write(tbuf, mem.getPosition());
1042
1043                         handshakeIncomingPCP(atom,rhost,remoteID,agent);
1044
1045                         char ripStr[64];
1046                         rhost.toStr(ripStr);
1047
1048                         LOG_DEBUG("Sending channel unavailable");
1049
1050                         ChanHitSearch chs;
1051
1052                         mem.rewind();
1053                         AtomStream atom2(mem);
1054
1055                         int error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
1056
1057                         chanMgr->hitlistlock.on();
1058
1059                         chl = chanMgr->findHitList(chanInfo);
1060
1061                         if (chl)
1062                         {
1063                                 ChanHit best;
1064                                 
1065                                 // search for up to 8 other hits
1066                                 int cnt=0;
1067                                 int i;
1068                                 for(i=0; i<8; i++)
1069                                 {
1070                                         best.init();
1071
1072
1073                                         // find best hit this network if local IP
1074                                         if (!rhost.globalIP())
1075                                         {
1076                                                 chs.init();
1077                                                 chs.matchHost = servMgr->serverHost;
1078                                                 chs.waitDelay = 2;
1079                                                 chs.excludeID = remoteID;
1080                                                 if (chl->pickHits(chs)){
1081                                                         best = chs.best[0];
1082                                                         LOG_DEBUG("find best hit this network if local IP");
1083                                                 }
1084                                         }
1085
1086                                         // find best hit on same network
1087                                         if (!best.host.ip)
1088                                         {
1089                                                 chs.init();
1090                                                 chs.matchHost = rhost;
1091                                                 chs.waitDelay = 2;
1092                                                 chs.excludeID = remoteID;
1093                                                 if (chl->pickHits(chs)){
1094                                                         best = chs.best[0];
1095                                                         LOG_DEBUG("find best hit on same network");
1096                                                 }
1097
1098                                         }
1099
1100                                         // find best hit on other networks
1101 /*                                      if (!best.host.ip)
1102                                         {
1103                                                 chs.init();
1104                                                 chs.waitDelay = 2;
1105                                                 chs.excludeID = remoteID;
1106                                                 if (chl->pickHits(chs)){
1107                                                         best = chs.best[0];
1108                                                         LOG_DEBUG("find best hit on other networks");
1109                                                 }
1110
1111                                         }*/
1112                                         
1113                                         if (!best.host.ip)
1114                                                 break;
1115
1116                                         best.writeAtoms(atom2,chanInfo.id);                        
1117                                         cnt++;
1118                                 }
1119
1120                                 if (sourceHit) {
1121                                         char tmp[50];
1122                                         sourceHit->writeAtoms(atom2, chanInfo.id);
1123                                         sourceHit->host.toStr(tmp);
1124                                         LOG_DEBUG("relay info(sourceHit): %s", tmp);
1125                                         best.host.ip = sourceHit->host.ip;
1126                                 }
1127
1128                                 if (!best.host.ip){
1129                                         char tmp[50];
1130 //                                      chanMgr->hitlistlock.on();
1131                                         int cnt = chs.getRelayHost(servMgr->serverHost, rhost, remoteID, chl);
1132 //                                      chanMgr->hitlistlock.off();
1133                                         for (int i = 0; i < cnt; i++){
1134                                                 chs.best[i].writeAtoms(atom2, chanInfo.id);
1135                                                 chs.best[i].host.toStr(tmp);
1136                                                 LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
1137                                                 best.host.ip = chs.best[i].host.ip;
1138                                         }
1139                                 }
1140
1141                                 if (cnt)
1142                                 {
1143                                         LOG_DEBUG("Sent %d channel hit(s) to %s",cnt,ripStr);
1144
1145                                 }
1146                                 else if (rhost.port)
1147                                 {
1148                                         // find firewalled host
1149                                         chs.init();
1150                                         chs.waitDelay = 30;
1151                                         chs.useFirewalled = true;
1152                                         chs.excludeID = remoteID;
1153                                         if (chl->pickHits(chs))
1154                                         {
1155                                                 best = chs.best[0];
1156                                                 int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_RELAY);
1157                                                 LOG_DEBUG("Broadcasted channel push request to %d clients for %s",cnt,ripStr);
1158                                         }
1159                                 } 
1160
1161                                 // if all else fails, use tracker
1162                                 if (!best.host.ip)
1163                                 {
1164                                         // find best tracker on this network if local IP
1165                                         if (!rhost.globalIP())
1166                                         {
1167                                                 chs.init();
1168                                                 chs.matchHost = servMgr->serverHost;
1169                                                 chs.trackersOnly = true;
1170                                                 chs.excludeID = remoteID;
1171                                                 if (chl->pickHits(chs))
1172                                                         best = chs.best[0];
1173
1174                                         }
1175
1176                                         // find local tracker
1177                                         if (!best.host.ip)
1178                                         {
1179                                                 chs.init();
1180                                                 chs.matchHost = rhost;
1181                                                 chs.trackersOnly = true;
1182                                                 chs.excludeID = remoteID;
1183                                                 if (chl->pickHits(chs))
1184                                                         best = chs.best[0];
1185                                         }
1186
1187                                         // find global tracker
1188                                         if (!best.host.ip)
1189                                         {
1190                                                 chs.init();
1191                                                 chs.trackersOnly = true;
1192                                                 chs.excludeID = remoteID;
1193                                                 if (chl->pickHits(chs))
1194                                                         best = chs.best[0];
1195                                         }
1196
1197                                         if (best.host.ip)
1198                                         {
1199                                                 best.writeAtoms(atom2,chanInfo.id);                             
1200                                                 LOG_DEBUG("Sent 1 tracker hit to %s",ripStr);
1201                                         }else if (rhost.port)
1202                                         {
1203                                                 // find firewalled tracker
1204                                                 chs.init();
1205                                                 chs.useFirewalled = true;
1206                                                 chs.trackersOnly = true;
1207                                                 chs.excludeID = remoteID;
1208                                                 chs.waitDelay = 30;
1209                                                 if (chl->pickHits(chs))
1210                                                 {
1211                                                         best = chs.best[0];
1212                                                         int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_CIN);
1213                                                         LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,ripStr);
1214                                                 }
1215                                         }
1216
1217                                 }
1218
1219
1220                         }
1221
1222                         chanMgr->hitlistlock.off();
1223
1224                         // return not available yet code
1225                         atom2.writeInt(PCP_QUIT,error);
1226                         sock->write(tbuf, mem.getPosition());
1227                         result = false;
1228
1229                         /*
1230                         char c[512];
1231                         // wait disconnect from other host
1232                         try{
1233                                 while(sock->read(c, sizeof(c))){
1234                                         sys->sleep(10);
1235                                 }
1236                         }catch(StreamException &e){
1237                                 LOG_DEBUG("RelayInfoWait: %s",e.msg);
1238                         }
1239                         */
1240                 }else
1241                 {
1242                         LOG_DEBUG("Sending channel unavailable");
1243                         sock->writeLine(HTTP_SC_UNAVAILABLE);
1244                         sock->writeLine("");
1245                         result = false;
1246                 }
1247
1248         } else {
1249
1250                 if (chanInfo.contentType != ChanInfo::T_MP3)
1251                         addMetadata=false;
1252
1253                 if (addMetadata && (outputProtocol == ChanInfo::SP_HTTP))               // winamp mp3 metadata check
1254                 {
1255
1256                         sock->writeLine(ICY_OK);
1257
1258                         sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT);
1259                         sock->writeLineF("icy-name:%s",chanInfo.name.cstr());
1260                         sock->writeLineF("icy-br:%d",chanInfo.bitrate);
1261                         sock->writeLineF("icy-genre:%s",chanInfo.genre.cstr());
1262                         sock->writeLineF("icy-url:%s",chanInfo.url.cstr());
1263                         sock->writeLineF("icy-metaint:%d",chanMgr->icyMetaInterval);
1264                         sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr);
1265
1266                         sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3);
1267
1268                 }else
1269                 {
1270
1271                         sock->writeLine(HTTP_SC_OK);
1272
1273                         if ((chanInfo.contentType != ChanInfo::T_ASX) && (chanInfo.contentType != ChanInfo::T_WMV) && (chanInfo.contentType != ChanInfo::T_WMA))
1274                         {
1275                                 sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT);
1276
1277                                 sock->writeLine("Accept-Ranges: none");
1278
1279                                 sock->writeLineF("x-audiocast-name: %s",chanInfo.name.cstr());
1280                                 sock->writeLineF("x-audiocast-bitrate: %d",chanInfo.bitrate);
1281                                 sock->writeLineF("x-audiocast-genre: %s",chanInfo.genre.cstr());
1282                                 sock->writeLineF("x-audiocast-description: %s",chanInfo.desc.cstr());
1283                                 sock->writeLineF("x-audiocast-url: %s",chanInfo.url.cstr());
1284                                 sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr);
1285                         }
1286
1287
1288                         if (outputProtocol == ChanInfo::SP_HTTP)
1289                         {
1290                                 switch (chanInfo.contentType)
1291                                 {
1292                                         case ChanInfo::T_OGG:
1293                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XOGG);
1294                                                 break;
1295                                         case ChanInfo::T_MP3:
1296                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3);
1297                                                 break;
1298                                         case ChanInfo::T_MOV:
1299                                                 sock->writeLine("Connection: close");
1300                                                 sock->writeLine("Content-Length: 10000000");
1301                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MOV);
1302                                                 break;
1303                                         case ChanInfo::T_MPG:
1304                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MPG);
1305                                                 break;
1306                                         case ChanInfo::T_NSV:
1307                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_NSV);
1308                                                 break;
1309                                         case ChanInfo::T_ASX:
1310                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_ASX);
1311                                                 break;
1312                                         case ChanInfo::T_WMA:
1313                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_WMA);
1314                                                 break;
1315                                         case ChanInfo::T_WMV:
1316                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_WMV);
1317                                                 break;
1318                                 }
1319                         } else if (outputProtocol == ChanInfo::SP_MMS)
1320                         {
1321                                 sock->writeLine("Server: Rex/9.0.0.2980");
1322                                 sock->writeLine("Cache-Control: no-cache");
1323                                 sock->writeLine("Pragma: no-cache");
1324                                 sock->writeLine("Pragma: client-id=3587303426");
1325                                 sock->writeLine("Pragma: features=\"broadcast,playlist\"");
1326
1327                                 if (nsSwitchNum)
1328                                 {
1329                                         sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MMS);
1330                                 }else
1331                                 {
1332                                         sock->writeLine("Content-Type: application/vnd.ms.wms-hdr.asfv1");
1333                                         if (ch)
1334                                                 sock->writeLineF("Content-Length: %d",ch->headPack.len);
1335                                         sock->writeLine("Connection: Keep-Alive");
1336                                 }
1337                         
1338                         } else if (outputProtocol == ChanInfo::SP_PCP)
1339                         {
1340                                 sock->writeLineF("%s %d",PCX_HS_POS,streamPos);
1341                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
1342
1343                         }else if (outputProtocol == ChanInfo::SP_PEERCAST)
1344                         {
1345                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPEERCAST);
1346                         }
1347                 }
1348                 sock->writeLine("");
1349                 result = true;
1350
1351                 if (gotPCP)
1352                 {
1353                         handshakeIncomingPCP(atom,rhost,remoteID,agent);
1354                         atom.writeInt(PCP_OK,0);
1355                 }
1356
1357         }
1358
1359
1360
1361         return result;
1362 }
1363
1364 // -----------------------------------
1365 void Servent::handshakeGiv(GnuID &id)
1366 {
1367         if (id.isSet())
1368         {
1369                 char idstr[64];
1370                 id.toStr(idstr);
1371                 sock->writeLineF("GIV /%s",idstr);
1372         }else
1373                 sock->writeLine("GIV");
1374
1375         sock->writeLine("");
1376 }
1377
1378
1379 // -----------------------------------
1380 void Servent::processGnutella()
1381 {
1382         type = T_PGNU;
1383
1384         //if (servMgr->isRoot && !servMgr->needConnections())
1385         if (servMgr->isRoot)
1386         {
1387                 processRoot();
1388                 return;
1389         }
1390
1391
1392
1393         gnuStream.init(sock);
1394         setStatus(S_CONNECTED);
1395
1396         if (!servMgr->isRoot)
1397         {
1398                 chanMgr->broadcastRelays(this, 1, 1);
1399                 GnuPacket *p;
1400
1401                 if ((p=outPacketsNorm.curr()))  
1402                         gnuStream.sendPacket(*p);
1403                 return;
1404         }
1405
1406         gnuStream.ping(2);
1407
1408 //      if (type != T_LOOKUP)
1409 //              chanMgr->broadcastRelays(this,chanMgr->minBroadcastTTL,2);
1410
1411         lastPacket = lastPing = sys->getTime();
1412         bool doneBigPing=false;
1413
1414         const unsigned int      abortTimeoutSecs = 60;          // abort connection after 60 secs of no activitiy
1415         const unsigned int      packetTimeoutSecs = 30;         // ping connection after 30 secs of no activity
1416
1417         unsigned int currBytes=0;
1418         unsigned int lastWait=0;
1419
1420         unsigned int lastTotalIn=0,lastTotalOut=0;
1421
1422         while (thread.active && sock->active())
1423         {
1424
1425                 if (sock->readReady())
1426                 {
1427                         lastPacket = sys->getTime();
1428
1429                         if (gnuStream.readPacket(pack))
1430                         {
1431                                 char ipstr[64];
1432                                 sock->host.toStr(ipstr);
1433
1434                                 GnuID routeID;
1435                                 GnuStream::R_TYPE ret = GnuStream::R_PROCESS;
1436
1437                                 if (pack.func != GNU_FUNC_PONG)
1438                                         if (servMgr->seenPacket(pack))
1439                                                 ret = GnuStream::R_DUPLICATE;
1440
1441                                 seenIDs.add(pack.id);
1442
1443
1444                                 if (ret == GnuStream::R_PROCESS)
1445                                 {
1446                                         GnuID routeID;
1447                                         ret = gnuStream.processPacket(pack,this,routeID);
1448
1449                                         if (flowControl && (ret == GnuStream::R_BROADCAST))
1450                                                 ret = GnuStream::R_DROP;
1451
1452                                 }
1453
1454                                 switch(ret)
1455                                 {
1456                                         case GnuStream::R_BROADCAST:
1457                                                 if (servMgr->broadcast(pack,this))
1458                                                         stats.add(Stats::NUMBROADCASTED);
1459                                                 else
1460                                                         stats.add(Stats::NUMDROPPED);
1461                                                 break;
1462                                         case GnuStream::R_ROUTE:
1463                                                 if (servMgr->route(pack,routeID,NULL))
1464                                                         stats.add(Stats::NUMROUTED);
1465                                                 else
1466                                                         stats.add(Stats::NUMDROPPED);
1467                                                 break;
1468                                         case GnuStream::R_ACCEPTED:
1469                                                 stats.add(Stats::NUMACCEPTED);
1470                                                 break;
1471                                         case GnuStream::R_DUPLICATE:
1472                                                 stats.add(Stats::NUMDUP);
1473                                                 break;
1474                                         case GnuStream::R_DEAD:
1475                                                 stats.add(Stats::NUMDEAD);
1476                                                 break;
1477                                         case GnuStream::R_DISCARD:
1478                                                 stats.add(Stats::NUMDISCARDED);
1479                                                 break;
1480                                         case GnuStream::R_BADVERSION:
1481                                                 stats.add(Stats::NUMOLD);
1482                                                 break;
1483                                         case GnuStream::R_DROP:
1484                                                 stats.add(Stats::NUMDROPPED);
1485                                                 break;
1486                                 }
1487
1488
1489                                 LOG_NETWORK("packet in: %s-%s, %d bytes, %d hops, %d ttl, from %s",GNU_FUNC_STR(pack.func),GnuStream::getRouteStr(ret),pack.len,pack.hops,pack.ttl,ipstr);
1490
1491
1492
1493                         }else{
1494                                 LOG_ERROR("Bad packet");
1495                         }
1496                 }
1497
1498
1499                 GnuPacket *p;
1500
1501                 if ((p=outPacketsPri.curr()))                           // priority packet
1502                 {
1503                         gnuStream.sendPacket(*p);
1504                         seenIDs.add(p->id);
1505                         outPacketsPri.next();
1506                 } else if ((p=outPacketsNorm.curr()))           // or.. normal packet
1507                 {
1508                         gnuStream.sendPacket(*p);
1509                         seenIDs.add(p->id);
1510                         outPacketsNorm.next();
1511                 }
1512
1513                 int lpt =  sys->getTime()-lastPacket;
1514
1515                 if (!doneBigPing)
1516                 {
1517                         if ((sys->getTime()-lastPing) > 15)
1518                         {
1519                                 gnuStream.ping(7);
1520                                 lastPing = sys->getTime();
1521                                 doneBigPing = true;
1522                         }
1523                 }else{
1524                         if (lpt > packetTimeoutSecs)
1525                         {
1526                                 
1527                                 if ((sys->getTime()-lastPing) > packetTimeoutSecs)
1528                                 {
1529                                         gnuStream.ping(1);
1530                                         lastPing = sys->getTime();
1531                                 }
1532
1533                         }
1534                 }
1535                 if (lpt > abortTimeoutSecs)
1536                         throw TimeoutException();
1537
1538
1539                 unsigned int totIn = sock->totalBytesIn-lastTotalIn;
1540                 unsigned int totOut = sock->totalBytesOut-lastTotalOut;
1541
1542                 unsigned int bytes = totIn+totOut;
1543
1544                 lastTotalIn = sock->totalBytesIn;
1545                 lastTotalOut = sock->totalBytesOut;
1546
1547                 const int serventBandwidth = 1000;
1548
1549                 int delay = sys->idleSleepTime;
1550                 if ((bytes) && (serventBandwidth >= 8))
1551                         delay = (bytes*1000)/(serventBandwidth/8);      // set delay relative packetsize
1552
1553                 if (delay < (int)sys->idleSleepTime)
1554                         delay = sys->idleSleepTime;
1555                 //LOG("delay %d, in %d, out %d",delay,totIn,totOut);
1556
1557                 sys->sleep(delay);
1558         }
1559
1560 }
1561
1562
1563 // -----------------------------------
1564 void Servent::processRoot()
1565 {
1566         try 
1567         {
1568         
1569                 gnuStream.init(sock);
1570                 setStatus(S_CONNECTED);
1571
1572                 gnuStream.ping(2);
1573
1574                 unsigned int lastConnect = sys->getTime();
1575
1576                 while (thread.active && sock->active())
1577                 {
1578                         if (gnuStream.readPacket(pack))
1579                         {
1580                                 char ipstr[64];
1581                                 sock->host.toStr(ipstr);
1582                                 
1583                                 LOG_NETWORK("packet in: %d from %s",pack.func,ipstr);
1584
1585
1586                                 if (pack.func == GNU_FUNC_PING)         // if ping then pong back some hosts and close
1587                                 {
1588                                         
1589                                         Host hl[32];
1590                                         int cnt = servMgr->getNewestServents(hl,32,sock->host); 
1591                                         if (cnt)
1592                                         {
1593                                                 int start = sys->rnd() % cnt;
1594                                                 int max = cnt>8?8:cnt;
1595
1596                                                 for(int i=0; i<max; i++)
1597                                                 {
1598                                                         GnuPacket pong;
1599                                                         pack.hops = 1;
1600                                                         pong.initPong(hl[start],false,pack);
1601                                                         gnuStream.sendPacket(pong);
1602
1603                                                         char ipstr[64];
1604                                                         hl[start].toStr(ipstr);
1605
1606                                                         //LOG_NETWORK("Pong %d: %s",start+1,ipstr);
1607                                                         start = (start+1) % cnt;
1608                                                 }
1609                                                 char str[64];
1610                                                 sock->host.toStr(str);
1611                                                 LOG_NETWORK("Sent %d pong(s) to %s",max,str);
1612                                         }else
1613                                         {
1614                                                 LOG_NETWORK("No Pongs to send");
1615                                                 //return;
1616                                         }
1617                                 }else if (pack.func == GNU_FUNC_PONG)           // pong?
1618                                 {
1619                                         MemoryStream pong(pack.data,pack.len);
1620
1621                                         int ip,port;
1622                                         port = pong.readShort();
1623                                         ip = pong.readLong();
1624                                         ip = SWAP4(ip);
1625
1626
1627                                         Host h(ip,port);
1628                                         if ((ip) && (port) && (h.globalIP()))
1629                                         {
1630
1631                                                 LOG_NETWORK("added pong: %d.%d.%d.%d:%d",ip>>24&0xff,ip>>16&0xff,ip>>8&0xff,ip&0xff,port);
1632                                                 servMgr->addHost(h,ServHost::T_SERVENT,sys->getTime());
1633                                         }
1634                                         //return;
1635                                 } else if (pack.func == GNU_FUNC_HIT)
1636                                 {
1637                                         MemoryStream data(pack.data,pack.len);
1638                                         ChanHit hit;
1639                                         gnuStream.readHit(data,hit,pack.hops,pack.id);
1640                                 }
1641
1642                                 //if (gnuStream.packetsIn > 5)  // die if we get too many packets
1643                                 //      return;
1644                         }
1645
1646                         if((sys->getTime()-lastConnect > 60))
1647                                 break;
1648                 }
1649
1650
1651         }catch(StreamException &e)
1652         {
1653                 LOG_ERROR("Relay: %s",e.msg);
1654         }
1655
1656         
1657 }       
1658
1659 // -----------------------------------
1660 int Servent::givProc(ThreadInfo *thread)
1661 {
1662 //      thread->lock();
1663         Servent *sv = (Servent*)thread->data;
1664         try 
1665         {
1666                 sv->handshakeGiv(sv->givID);
1667                 sv->handshakeIncoming();
1668
1669         }catch(StreamException &e)
1670         {
1671                 LOG_ERROR("GIV: %s",e.msg);
1672         }
1673
1674         sv->kill();
1675         sys->endThread(thread);
1676         return 0;
1677 }
1678
1679 // -----------------------------------
1680 void Servent::handshakeOutgoingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent, bool isTrusted)
1681 {
1682
1683         bool nonFW = (servMgr->getFirewall() != ServMgr::FW_ON);
1684         bool testFW = (servMgr->getFirewall() == ServMgr::FW_UNKNOWN);
1685
1686         bool sendBCID = isTrusted && chanMgr->isBroadcasting();
1687
1688         char tbuf[1024];
1689         MemoryStream mem(tbuf, sizeof(tbuf));
1690         AtomStream atom2(mem);
1691         atom2.writeParent(PCP_HELO,3 + (testFW?1:0) + (nonFW?1:0) + (sendBCID?1:0));
1692                 atom2.writeString(PCP_HELO_AGENT,PCX_AGENT);
1693                 atom2.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
1694                 atom2.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
1695                 if (nonFW)
1696                         atom2.writeShort(PCP_HELO_PORT,servMgr->serverHost.port);
1697                 if (testFW)
1698                         atom2.writeShort(PCP_HELO_PING,servMgr->serverHost.port);
1699                 if (sendBCID)
1700                         atom2.writeBytes(PCP_HELO_BCID,chanMgr->broadcastID.id,16);
1701         atom.io.write(tbuf, mem.getPosition());
1702
1703
1704         LOG_DEBUG("PCP outgoing waiting for OLEH..");
1705
1706         int numc,numd;
1707         ID4 id = atom.read(numc,numd);
1708         if (id != PCP_OLEH)
1709         {
1710                 LOG_DEBUG("PCP outgoing reply: %s",id.getString().str());
1711                 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
1712                 throw StreamException("Got unexpected PCP response");
1713         }
1714
1715
1716
1717         char arg[64];
1718
1719         GnuID clientID;
1720         clientID.clear();
1721         rid.clear();
1722         int version=0;
1723         int disable=0;
1724
1725         Host thisHost;
1726
1727         // read OLEH response
1728         for(int i=0; i<numc; i++)
1729         {
1730                 int c,dlen;
1731                 ID4 id = atom.read(c,dlen);
1732
1733                 if (id == PCP_HELO_AGENT)
1734                 {
1735                         atom.readString(arg,sizeof(arg),dlen);
1736                         agent.set(arg);
1737
1738                 }else if (id == PCP_HELO_REMOTEIP)
1739                 {
1740                         thisHost.ip = atom.readInt();
1741
1742                 }else if (id == PCP_HELO_PORT)
1743                 {
1744                         thisHost.port = atom.readShort();
1745
1746                 }else if (id == PCP_HELO_VERSION)
1747                 {
1748                         version = atom.readInt();
1749
1750                 }else if (id == PCP_HELO_DISABLE)
1751                 {
1752                         disable = atom.readInt();
1753
1754                 }else if (id == PCP_HELO_SESSIONID)
1755                 {
1756                         atom.readBytes(rid.id,16);
1757                         if (rid.isSame(servMgr->sessionID))
1758                                 throw StreamException("Servent loopback");
1759
1760                 }else
1761                 {
1762                         LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
1763                         atom.skip(c,dlen);
1764                 }
1765
1766     }
1767
1768
1769         // update server ip/firewall status
1770         if (isTrusted)
1771         {
1772                 if (thisHost.isValid())
1773                 {
1774                         if ((servMgr->serverHost.ip != thisHost.ip) && (servMgr->forceIP.isEmpty()))
1775                         {
1776                                 char ipstr[64];
1777                                 thisHost.toStr(ipstr);
1778                                 LOG_DEBUG("Got new ip: %s",ipstr);
1779                                 servMgr->serverHost.ip = thisHost.ip;
1780                         }
1781
1782                         if (servMgr->getFirewall() == ServMgr::FW_UNKNOWN)
1783                         {
1784                                 if (thisHost.port && thisHost.globalIP())
1785                                         servMgr->setFirewall(ServMgr::FW_OFF);
1786                                 else
1787                                         servMgr->setFirewall(ServMgr::FW_ON);
1788                         }
1789                 }
1790
1791                 if (disable == 1)
1792                 {
1793                         LOG_ERROR("client disabled: %d",disable);
1794                         servMgr->isDisabled = true;             
1795                 }else
1796                 {
1797                         servMgr->isDisabled = false;            
1798                 }
1799         }
1800
1801
1802
1803         if (!rid.isSet())
1804         {
1805                 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
1806                 throw StreamException("Remote host not identified");
1807         }
1808
1809         LOG_DEBUG("PCP Outgoing handshake complete.");
1810
1811 }
1812
1813 // -----------------------------------
1814 void Servent::handshakeIncomingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent)
1815 {
1816         int numc,numd;
1817         ID4 id = atom.read(numc,numd);
1818
1819
1820         if (id != PCP_HELO)
1821         {
1822                 LOG_DEBUG("PCP incoming reply: %s",id.getString().str());
1823                 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
1824                 throw StreamException("Got unexpected PCP response");
1825         }
1826
1827         char arg[64];
1828
1829         ID4 osType;
1830
1831         int version=0;
1832
1833         int pingPort=0;
1834
1835         GnuID bcID;
1836         GnuID clientID;
1837
1838         bcID.clear();
1839         clientID.clear();
1840
1841         rhost.port = 0;
1842
1843         for(int i=0; i<numc; i++)
1844         {
1845
1846                 int c,dlen;
1847                 ID4 id = atom.read(c,dlen);
1848
1849                 if (id == PCP_HELO_AGENT)
1850                 {
1851                         atom.readString(arg,sizeof(arg),dlen);
1852                         agent.set(arg);
1853
1854                 }else if (id == PCP_HELO_VERSION)
1855                 {
1856                         version = atom.readInt();
1857
1858                 }else if (id == PCP_HELO_SESSIONID)
1859                 {
1860                         atom.readBytes(rid.id,16);
1861                         if (rid.isSame(servMgr->sessionID))
1862                                 throw StreamException("Servent loopback");
1863
1864                 }else if (id == PCP_HELO_BCID)
1865                 {
1866                         atom.readBytes(bcID.id,16);
1867
1868                 }else if (id == PCP_HELO_OSTYPE)
1869                 {
1870                         osType = atom.readInt();
1871                 }else if (id == PCP_HELO_PORT)
1872                 {
1873                         rhost.port = atom.readShort();
1874                 }else if (id == PCP_HELO_PING)
1875                 {
1876                         pingPort = atom.readShort();
1877                 }else
1878                 {
1879                         LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
1880                         atom.skip(c,dlen);
1881                 }
1882
1883     }
1884
1885         if (version)
1886                 LOG_DEBUG("Incoming PCP is %s : v%d", agent.cstr(),version);
1887
1888
1889         if (!rhost.globalIP() && servMgr->serverHost.globalIP())
1890                 rhost.ip = servMgr->serverHost.ip;
1891
1892         if (pingPort)
1893         {
1894                 char ripStr[64];
1895                 rhost.toStr(ripStr);
1896                 LOG_DEBUG("Incoming firewalled test request: %s ", ripStr);
1897                 rhost.port = pingPort;
1898                 if (!rhost.globalIP() || !pingHost(rhost,rid))
1899                         rhost.port = 0;
1900         }
1901
1902         if (servMgr->isRoot)
1903         {
1904                 if (bcID.isSet())
1905                 {
1906                         if (bcID.getFlags() & 1)        // private
1907                         {
1908                                 BCID *bcid = servMgr->findValidBCID(bcID);
1909                                 if (!bcid || (bcid && !bcid->valid))
1910                                 {
1911                                         atom.writeParent(PCP_OLEH,1);
1912                                         atom.writeInt(PCP_HELO_DISABLE,1);
1913                                         throw StreamException("Client is banned");
1914                                 }
1915                         }
1916                 }
1917         }
1918
1919
1920         char tbuf[1024];
1921         MemoryStream mem(tbuf, sizeof(tbuf));
1922         AtomStream atom2(mem);
1923         atom2.writeParent(PCP_OLEH,5);
1924                 atom2.writeString(PCP_HELO_AGENT,PCX_AGENT);
1925                 atom2.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
1926                 atom2.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
1927                 atom2.writeInt(PCP_HELO_REMOTEIP,rhost.ip);
1928                 atom2.writeShort(PCP_HELO_PORT,rhost.port);
1929
1930         if (version)
1931         {
1932                 if (version < PCP_CLIENT_MINVERSION)
1933                 {
1934                         atom2.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADAGENT);
1935                         atom.io.write(tbuf, mem.getPosition());
1936                         throw StreamException("Agent is not valid");
1937                 }
1938         }
1939
1940         if (!rid.isSet())
1941         {
1942                 atom2.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
1943                 atom.io.write(tbuf, mem.getPosition());
1944                 throw StreamException("Remote host not identified");
1945         }
1946
1947
1948
1949         if (servMgr->isRoot)
1950         {
1951                 servMgr->writeRootAtoms(atom2,false);
1952         }
1953
1954         atom.io.write(tbuf, mem.getPosition());
1955
1956         LOG_DEBUG("PCP Incoming handshake complete.");
1957
1958 }
1959
1960 // -----------------------------------
1961 void Servent::processIncomingPCP(bool suggestOthers)
1962 {
1963         PCPStream::readVersion(*sock);
1964
1965
1966         AtomStream atom(*sock);
1967         Host rhost = sock->host;
1968
1969         handshakeIncomingPCP(atom,rhost,remoteID,agent);
1970
1971
1972         bool alreadyConnected = (servMgr->findConnection(Servent::T_COUT,remoteID)!=NULL)
1973                                                         || (servMgr->findConnection(Servent::T_CIN,remoteID)!=NULL);
1974         bool unavailable = servMgr->controlInFull();
1975         bool offair = !servMgr->isRoot && !chanMgr->isBroadcasting();
1976
1977         char rstr[64];
1978         rhost.toStr(rstr);
1979
1980         if (unavailable || alreadyConnected || offair)
1981         {
1982                 int error;
1983
1984                 if (alreadyConnected)
1985                         error = PCP_ERROR_QUIT+PCP_ERROR_ALREADYCONNECTED;
1986                 else if (unavailable)
1987                         error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
1988                 else if (offair)
1989                         error = PCP_ERROR_QUIT+PCP_ERROR_OFFAIR;
1990                 else 
1991                         error = PCP_ERROR_QUIT;
1992
1993
1994                 if (suggestOthers)
1995                 {
1996
1997                         ChanHit best;
1998                         ChanHitSearch chs;
1999
2000                         int cnt=0;
2001                         for(int i=0; i<8; i++)
2002                         {
2003                                 best.init();
2004
2005                                 // find best hit on this network                        
2006                                 if (!rhost.globalIP())
2007                                 {
2008                                         chs.init();
2009                                         chs.matchHost = servMgr->serverHost;
2010                                         chs.waitDelay = 2;
2011                                         chs.excludeID = remoteID;
2012                                         chs.trackersOnly = true;
2013                                         chs.useBusyControls = false;
2014                                         if (chanMgr->pickHits(chs))
2015                                                 best = chs.best[0];
2016                                 }
2017
2018                                 // find best hit on same network                        
2019                                 if (!best.host.ip)
2020                                 {
2021                                         chs.init();
2022                                         chs.matchHost = rhost;
2023                                         chs.waitDelay = 2;
2024                                         chs.excludeID = remoteID;
2025                                         chs.trackersOnly = true;
2026                                         chs.useBusyControls = false;
2027                                         if (chanMgr->pickHits(chs))
2028                                                 best = chs.best[0];
2029                                 }
2030
2031                                 // else find best hit on other networks
2032                                 if (!best.host.ip)
2033                                 {
2034                                         chs.init();
2035                                         chs.waitDelay = 2;
2036                                         chs.excludeID = remoteID;
2037                                         chs.trackersOnly = true;
2038                                         chs.useBusyControls = false;
2039                                         if (chanMgr->pickHits(chs))
2040                                                 best = chs.best[0];
2041                                 }
2042
2043                                 if (!best.host.ip)
2044                                         break;
2045                                 
2046                                 GnuID noID;
2047                                 noID.clear();
2048                                 best.writeAtoms(atom,noID);
2049                                 cnt++;
2050                         }
2051                         if (cnt)
2052                         {
2053                                 LOG_DEBUG("Sent %d tracker(s) to %s",cnt,rstr);
2054                         }
2055                         else if (rhost.port)
2056                         {
2057                                 // send push request to best firewalled tracker on other network
2058                                 chs.init();
2059                                 chs.waitDelay = 30;
2060                                 chs.excludeID = remoteID;
2061                                 chs.trackersOnly = true;
2062                                 chs.useFirewalled = true;
2063                                 chs.useBusyControls = false;
2064                                 if (chanMgr->pickHits(chs))
2065                                 {
2066                                         best = chs.best[0];
2067                                         GnuID noID;
2068                                         noID.clear();
2069                                         int cnt = servMgr->broadcastPushRequest(best,rhost,noID,Servent::T_CIN);
2070                                         LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,rstr);
2071                                 }
2072                         }else
2073                         {
2074                                 LOG_DEBUG("No available trackers");
2075                         }
2076                 }
2077
2078
2079                 LOG_ERROR("Sending QUIT to incoming: %d",error);
2080
2081                 atom.writeInt(PCP_QUIT,error);
2082                 return;         
2083         }
2084         
2085
2086         type = T_CIN;
2087         setStatus(S_CONNECTED);
2088
2089         atom.writeInt(PCP_OK,0);
2090
2091         // ask for update
2092         atom.writeParent(PCP_ROOT,1);
2093                 atom.writeParent(PCP_ROOT_UPDATE,0);
2094
2095         pcpStream = new PCPStream(remoteID);
2096
2097         int error = 0;
2098         BroadcastState bcs;
2099         while (!error && thread.active && !sock->eof())
2100         {
2101                 error = pcpStream->readPacket(*sock,bcs);
2102                 sys->sleepIdle();
2103
2104                 if (!servMgr->isRoot && !chanMgr->isBroadcasting())
2105                         error = PCP_ERROR_OFFAIR;
2106                 if (peercastInst->isQuitting)
2107                         error = PCP_ERROR_SHUTDOWN;
2108         }
2109
2110         pcpStream->flush(*sock);
2111
2112         error += PCP_ERROR_QUIT;
2113         atom.writeInt(PCP_QUIT,error);
2114
2115         LOG_DEBUG("PCP Incoming to %s closed: %d",rstr,error);
2116
2117 }
2118
2119 // -----------------------------------
2120 int Servent::outgoingProc(ThreadInfo *thread)
2121 {
2122 //      thread->lock();
2123         LOG_DEBUG("COUT started");
2124
2125         Servent *sv = (Servent*)thread->data;
2126                 
2127         GnuID noID;
2128         noID.clear();
2129         sv->pcpStream = new PCPStream(noID);
2130
2131         while (sv->thread.active)
2132         {
2133                 sv->setStatus(S_WAIT);
2134
2135                 if (chanMgr->isBroadcasting() && servMgr->autoServe)
2136                 {
2137                         ChanHit bestHit;
2138                         ChanHitSearch chs;
2139                         char ipStr[64];
2140
2141                         do
2142                         {
2143                                 bestHit.init();
2144
2145                                 if (servMgr->rootHost.isEmpty())
2146                                         break;
2147
2148                                 if (sv->pushSock)
2149                                 {
2150                                         sv->sock = sv->pushSock;
2151                                         sv->pushSock = NULL;
2152                                         bestHit.host = sv->sock->host;
2153                                         break;
2154                                 }
2155
2156                                 GnuID noID;
2157                                 noID.clear();
2158                                 ChanHitList *chl = chanMgr->findHitListByID(noID);
2159                                 if (chl)
2160                                 {
2161                                         // find local tracker
2162                                         chs.init();
2163                                         chs.matchHost = servMgr->serverHost;
2164                                         chs.waitDelay = MIN_TRACKER_RETRY;
2165                                         chs.excludeID = servMgr->sessionID;
2166                                         chs.trackersOnly = true;
2167                                         if (!chl->pickHits(chs))
2168                                         {
2169                                                 // else find global tracker
2170                                                 chs.init();
2171                                                 chs.waitDelay = MIN_TRACKER_RETRY;
2172                                                 chs.excludeID = servMgr->sessionID;
2173                                                 chs.trackersOnly = true;
2174                                                 chl->pickHits(chs);
2175                                         }
2176
2177                                         if (chs.numResults)
2178                                         {
2179                                                 bestHit = chs.best[0];
2180                                         }
2181                                 }
2182
2183
2184                                 unsigned int ctime = sys->getTime();
2185
2186                                 if ((!bestHit.host.ip) && ((ctime-chanMgr->lastYPConnect) > MIN_YP_RETRY))
2187                                 {
2188                                         bestHit.host.fromStrName(servMgr->rootHost.cstr(),DEFAULT_PORT);
2189                                         bestHit.yp = true;
2190                                         chanMgr->lastYPConnect = ctime;
2191                                 }
2192                                 sys->sleepIdle();
2193
2194                         }while (!bestHit.host.ip && (sv->thread.active));
2195
2196
2197                         if (!bestHit.host.ip)           // give up
2198                         {
2199                                 LOG_ERROR("COUT giving up");
2200                                 break;
2201                         }
2202
2203
2204                         bestHit.host.toStr(ipStr);
2205
2206                         int error=0;
2207                         try 
2208                         {
2209
2210                                 LOG_DEBUG("COUT to %s: Connecting..",ipStr);
2211
2212                                 if (!sv->sock)
2213                                 {
2214                                         sv->setStatus(S_CONNECTING);
2215                                         sv->sock = sys->createSocket();
2216                                         if (!sv->sock)
2217                                                 throw StreamException("Unable to create socket");
2218                                         sv->sock->open(bestHit.host);
2219                                         sv->sock->connect();
2220
2221                                 }
2222
2223                                 sv->sock->setReadTimeout(30000);
2224                                 AtomStream atom(*sv->sock);
2225
2226                                 sv->setStatus(S_HANDSHAKE);
2227
2228                                 Host rhost = sv->sock->host;
2229                                 atom.writeInt(PCP_CONNECT,1);
2230                                 handshakeOutgoingPCP(atom,rhost,sv->remoteID,sv->agent,bestHit.yp);
2231
2232                                 sv->setStatus(S_CONNECTED);
2233
2234                                 LOG_DEBUG("COUT to %s: OK",ipStr);
2235
2236                                 sv->pcpStream->init(sv->remoteID);
2237
2238                                 BroadcastState bcs;
2239                                 bcs.servent_id = sv->servent_id;
2240                                 error = 0;
2241                                 while (!error && sv->thread.active && !sv->sock->eof() && servMgr->autoServe)
2242                                 {
2243                                         error = sv->pcpStream->readPacket(*sv->sock,bcs);
2244
2245                                         sys->sleepIdle();
2246
2247                                         if (!chanMgr->isBroadcasting())
2248                                                 error = PCP_ERROR_OFFAIR;
2249                                         if (peercastInst->isQuitting)
2250                                                 error = PCP_ERROR_SHUTDOWN;
2251
2252                                         if (sv->pcpStream->nextRootPacket)
2253                                                 if (sys->getTime() > (sv->pcpStream->nextRootPacket+30))
2254                                                         error = PCP_ERROR_NOROOT;
2255                                 }
2256                                 sv->setStatus(S_CLOSING);
2257
2258                                 sv->pcpStream->flush(*sv->sock);
2259
2260                                 error += PCP_ERROR_QUIT;
2261                                 atom.writeInt(PCP_QUIT,error);
2262
2263                                 LOG_ERROR("COUT to %s closed: %d",ipStr,error);
2264
2265                         }catch(TimeoutException &e)
2266                         {
2267                                 LOG_ERROR("COUT to %s: timeout (%s)",ipStr,e.msg);
2268                                 sv->setStatus(S_TIMEOUT);
2269                         }catch(StreamException &e)
2270                         {
2271                                 LOG_ERROR("COUT to %s: %s",ipStr,e.msg);
2272                                 sv->setStatus(S_ERROR);
2273                         }
2274
2275                         try
2276                         {
2277                                 if (sv->sock)
2278                                 {
2279                                         sv->sock->close();
2280                                         delete sv->sock;
2281                                         sv->sock = NULL;
2282                                 }
2283
2284                         }catch(StreamException &) {}
2285
2286                         // don`t discard this hit if we caused the disconnect (stopped broadcasting)
2287                         if (error != (PCP_ERROR_QUIT+PCP_ERROR_OFFAIR))
2288                                 chanMgr->deadHit(bestHit);
2289
2290                 }
2291
2292                 sys->sleepIdle();
2293         }
2294
2295         sv->kill();
2296         sys->endThread(thread);
2297         LOG_DEBUG("COUT ended");
2298         return 0;
2299 }
2300 // -----------------------------------
2301 int Servent::incomingProc(ThreadInfo *thread)
2302 {
2303 //      thread->lock();
2304
2305         Servent *sv = (Servent*)thread->data;
2306         
2307         char ipStr[64];
2308         sv->sock->host.toStr(ipStr);
2309
2310         try 
2311         {
2312                 sv->handshakeIncoming();
2313         }catch(HTTPException &e)
2314         {
2315                 try
2316                 {
2317                         sv->sock->writeLine(e.msg);
2318                         if (e.code == 401)
2319                                 sv->sock->writeLine("WWW-Authenticate: Basic realm=\"PeerCast\"");
2320                         sv->sock->writeLine("");
2321                 }catch(StreamException &){}
2322                 LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);
2323         }catch(StreamException &e)
2324         {
2325                 LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);
2326         }
2327
2328
2329         sv->kill();
2330         sys->endThread(thread);
2331         return 0;
2332 }
2333 // -----------------------------------
2334 void Servent::processServent()
2335 {
2336         setStatus(S_HANDSHAKE);
2337
2338         handshakeIn();
2339
2340         if (!sock)
2341                 throw StreamException("Servent has no socket");
2342
2343         processGnutella();
2344 }
2345
2346 // -----------------------------------
2347 void Servent::processStream(bool doneHandshake,ChanInfo &chanInfo)
2348 {       
2349         if (!doneHandshake)
2350         {
2351                 setStatus(S_HANDSHAKE);
2352
2353                 if (!handshakeStream(chanInfo))
2354                         return;
2355         }
2356
2357         if (chanInfo.id.isSet())
2358         {
2359
2360                 chanID = chanInfo.id;
2361
2362                 LOG_CHANNEL("Sending channel: %s ",ChanInfo::getProtocolStr(outputProtocol));
2363
2364                 if (!waitForChannelHeader(chanInfo))
2365                         throw StreamException("Channel not ready");
2366
2367                 servMgr->totalStreams++;
2368
2369                 Host host = sock->host;
2370                 host.port = 0;  // force to 0 so we ignore the incoming port
2371
2372                 Channel *ch = chanMgr->findChannelByID(chanID);
2373                 if (!ch)
2374                         throw StreamException("Channel not found");
2375
2376                 if (outputProtocol == ChanInfo::SP_HTTP)
2377                 {
2378                         if ((addMetadata) && (chanMgr->icyMetaInterval))
2379                                 sendRawMetaChannel(chanMgr->icyMetaInterval);
2380                         else 
2381                                 sendRawChannel(true,true);
2382
2383                 }else if (outputProtocol == ChanInfo::SP_MMS)
2384                 {
2385                         if (nsSwitchNum)
2386                         {
2387                                 sendRawChannel(true,true);
2388                         }else
2389                         {
2390                                 sendRawChannel(true,false);
2391                         }
2392
2393                 }else if (outputProtocol  == ChanInfo::SP_PCP)
2394                 {
2395                         sendPCPChannel();
2396
2397                 } else if (outputProtocol  == ChanInfo::SP_PEERCAST)
2398                 {
2399                         sendPeercastChannel();
2400                 }
2401         }
2402
2403         setStatus(S_CLOSING);
2404 }
2405
2406 // -----------------------------------------
2407 #if 0
2408 // debug
2409                 FileStream file;
2410                 file.openReadOnly("c://test.mp3");
2411
2412                 LOG_DEBUG("raw file read");
2413                 char buf[4000];
2414                 int cnt=0;
2415                 while (!file.eof())
2416                 {
2417                         LOG_DEBUG("send %d",cnt++);
2418                         file.read(buf,sizeof(buf));
2419                         sock->write(buf,sizeof(buf));
2420
2421                 }
2422                 file.close();
2423                 LOG_DEBUG("raw file sent");
2424
2425         return;
2426 // debug
2427 #endif
2428 // -----------------------------------
2429 bool Servent::waitForChannelHeader(ChanInfo &info)
2430 {
2431         for(int i=0; i<30*10; i++)
2432         {
2433                 Channel *ch = chanMgr->findChannelByID(info.id);
2434                 if (!ch)
2435                         return false;
2436
2437                 if (ch->isPlaying() && (ch->rawData.writePos>0))
2438                         return true;
2439
2440                 if (!thread.active || !sock->active())
2441                         break;
2442                 sys->sleep(100);
2443         }
2444         return false;
2445 }
2446 // -----------------------------------
2447 void Servent::sendRawChannel(bool sendHead, bool sendData)
2448 {
2449         try
2450         {
2451
2452                 sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
2453
2454                 Channel *ch = chanMgr->findChannelByID(chanID);
2455                 if (!ch)
2456                         throw StreamException("Channel not found");
2457
2458                 setStatus(S_CONNECTED);
2459
2460                 LOG_DEBUG("Starting Raw stream of %s at %d",ch->info.name.cstr(),streamPos);
2461
2462                 if (sendHead)
2463                 {
2464                         ch->headPack.writeRaw(*sock);
2465                         streamPos = ch->headPack.pos + ch->headPack.len;
2466                         LOG_DEBUG("Sent %d bytes header ",ch->headPack.len);
2467                 }
2468
2469                 if (sendData)
2470                 {
2471
2472                         unsigned int streamIndex = ch->streamIndex;
2473                         unsigned int connectTime = sys->getTime();
2474                         unsigned int lastWriteTime = connectTime;
2475
2476                         while ((thread.active) && sock->active())
2477                         {
2478                                 ch = chanMgr->findChannelByID(chanID);
2479
2480                                 if (ch)
2481                                 {
2482
2483                                         if (streamIndex != ch->streamIndex)
2484                                         {
2485                                                 streamIndex = ch->streamIndex;
2486                                                 streamPos = ch->headPack.pos;
2487                                                 LOG_DEBUG("sendRaw got new stream index");
2488                                         }
2489
2490                                         ChanPacket rawPack;
2491                                         if (ch->rawData.findPacket(streamPos,rawPack))
2492                                         {
2493                                                 if (syncPos != rawPack.sync)
2494                                                         LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
2495                                                 syncPos = rawPack.sync+1;
2496
2497                                                 if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2498                                                 {
2499                                                         rawPack.writeRaw(*sock);
2500                                                         lastWriteTime = sys->getTime();
2501                                                 }
2502
2503                                                 if (rawPack.pos < streamPos)
2504                                                         LOG_DEBUG("raw: skip back %d",rawPack.pos-streamPos);
2505                                                 streamPos = rawPack.pos+rawPack.len;
2506                                         } else if (sock->readReady()) {
2507                                                 char c;
2508                                                 int error = sock->readUpto(&c, 1);
2509                                                 if (error == 0) sock->close();
2510                                         }
2511                                 }
2512
2513                                 if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
2514                                         throw TimeoutException();
2515
2516                                 sys->sleepIdle();
2517                         }
2518                 }
2519         }catch(StreamException &e)
2520         {
2521                 LOG_ERROR("Stream channel: %s",e.msg);
2522         }
2523 }
2524
2525 #if 0
2526 // -----------------------------------
2527 void Servent::sendRawMultiChannel(bool sendHead, bool sendData)
2528 {
2529         try
2530         {
2531                 unsigned int chanStreamIndex[ChanMgr::MAX_CHANNELS];
2532                 unsigned int chanStreamPos[ChanMgr::MAX_CHANNELS];
2533                 GnuID chanIDs[ChanMgr::MAX_CHANNELS];
2534                 int numChanIDs=0;
2535                 for(int i=0; i<ChanMgr::MAX_CHANNELS; i++)
2536                 {
2537                         Channel *ch = &chanMgr->channels[i];
2538                         if (ch->isPlaying())
2539                                 chanIDs[numChanIDs++]=ch->info.id;
2540                 }
2541
2542
2543
2544                 setStatus(S_CONNECTED);
2545
2546
2547                 if (sendHead)
2548                 {
2549                         for(int i=0; i<numChanIDs; i++)
2550                         {
2551                                 Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
2552                                 if (ch)
2553                                 {
2554                                         LOG_DEBUG("Starting RawMulti stream: %s",ch->info.name.cstr());
2555                                         ch->headPack.writeRaw(*sock);
2556                                         chanStreamPos[i] = ch->headPack.pos + ch->headPack.len;
2557                                         chanStreamIndex[i] = ch->streamIndex;
2558                                         LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
2559
2560                                 }
2561                         }
2562                 }
2563
2564                 if (sendData)
2565                 {
2566
2567                         unsigned int connectTime=sys->getTime();
2568
2569                         while ((thread.active) && sock->active())
2570                         {
2571
2572                                 for(int i=1; i<numChanIDs; i++)
2573                                 {
2574                                         Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
2575                                         if (ch)
2576                                         {
2577                                                 if (chanStreamIndex[i] != ch->streamIndex)
2578                                                 {
2579                                                         chanStreamIndex[i] = ch->streamIndex;
2580                                                         chanStreamPos[i] = ch->headPack.pos;
2581                                                         LOG_DEBUG("sendRawMulti got new stream index for chan %d",i);
2582                                                 }
2583
2584                                                 ChanPacket rawPack;
2585                                                 if (ch->rawData.findPacket(chanStreamPos[i],rawPack))
2586                                                 {
2587                                                         if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2588                                                                 rawPack.writeRaw(*sock);
2589
2590
2591                                                         if (rawPack.pos < chanStreamPos[i])
2592                                                                 LOG_DEBUG("raw: skip back %d",rawPack.pos-chanStreamPos[i]);
2593                                                         chanStreamPos[i] = rawPack.pos+rawPack.len;
2594
2595
2596                                                         //LOG("raw at %d: %d %d",streamPos,ch->rawData.getStreamPos(ch->rawData.firstPos),ch->rawData.getStreamPos(ch->rawData.lastPos));
2597                                                 }                                               
2598                                         }
2599                                         break;
2600                                 }
2601                                 
2602
2603                                 sys->sleepIdle();
2604                         }
2605                 }
2606         }catch(StreamException &e)
2607         {
2608                 LOG_ERROR("Stream channel: %s",e.msg);
2609         }
2610 }
2611 #endif
2612
2613 // -----------------------------------
2614 void Servent::sendRawMetaChannel(int interval)
2615 {
2616
2617         try
2618         {
2619                 Channel *ch = chanMgr->findChannelByID(chanID);
2620                 if (!ch)
2621                         throw StreamException("Channel not found");
2622
2623                 sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
2624
2625                 setStatus(S_CONNECTED);
2626
2627                 LOG_DEBUG("Starting Raw Meta stream of %s (metaint: %d) at %d",ch->info.name.cstr(),interval,streamPos);
2628
2629
2630                 String lastTitle,lastURL;
2631
2632                 int             lastMsgTime=sys->getTime();
2633                 bool    showMsg=true;
2634
2635                 char buf[16384];
2636                 int bufPos=0;
2637
2638                 if ((interval > sizeof(buf)) || (interval < 1))
2639                         throw StreamException("Bad ICY Meta Interval value");
2640
2641                 unsigned int connectTime = sys->getTime();
2642                 unsigned int lastWriteTime = connectTime;
2643
2644                 streamPos = 0;          // raw meta channel has no header (its MP3)
2645
2646                 while ((thread.active) && sock->active())
2647                 {
2648                         ch = chanMgr->findChannelByID(chanID);
2649
2650                         if (ch)
2651                         {
2652
2653                                 ChanPacket rawPack;
2654                                 if (ch->rawData.findPacket(streamPos,rawPack))
2655                                 {
2656
2657                                         if (syncPos != rawPack.sync)
2658                                                 LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
2659                                         syncPos = rawPack.sync+1;
2660
2661                                         MemoryStream mem(rawPack.data,rawPack.len);
2662
2663                                         if (rawPack.type == ChanPacket::T_DATA)
2664                                         {
2665
2666                                                 int len = rawPack.len;
2667                                                 char *p = rawPack.data;
2668                                                 while (len)
2669                                                 {
2670                                                         int rl = len;
2671                                                         if ((bufPos+rl) > interval)
2672                                                                 rl = interval-bufPos;
2673                                                         memcpy(&buf[bufPos],p,rl);
2674                                                         bufPos+=rl;
2675                                                         p+=rl;
2676                                                         len-=rl;
2677
2678                                                         if (bufPos >= interval)
2679                                                         {
2680                                                                 bufPos = 0;     
2681                                                                 sock->write(buf,interval);
2682                                                                 lastWriteTime = sys->getTime();
2683
2684                                                                 if (chanMgr->broadcastMsgInterval)
2685                                                                         if ((sys->getTime()-lastMsgTime) >= chanMgr->broadcastMsgInterval)
2686                                                                         {
2687                                                                                 showMsg ^= true;
2688                                                                                 lastMsgTime = sys->getTime();
2689                                                                         }
2690
2691                                                                 String *metaTitle = &ch->info.track.title;
2692                                                                 if (!ch->info.comment.isEmpty() && (showMsg))
2693                                                                         metaTitle = &ch->info.comment;
2694
2695
2696                                                                 if (!metaTitle->isSame(lastTitle) || !ch->info.url.isSame(lastURL))
2697                                                                 {
2698
2699                                                                         char tmp[1024];
2700                                                                         String title,url;
2701
2702                                                                         title = *metaTitle;
2703                                                                         url = ch->info.url;
2704
2705                                                                         title.convertTo(String::T_META);
2706                                                                         url.convertTo(String::T_META);
2707
2708                                                                         sprintf(tmp,"StreamTitle='%s';StreamUrl='%s';\0",title.cstr(),url.cstr());
2709                                                                         int len = ((strlen(tmp) + 15+1) / 16);
2710                                                                         sock->writeChar(len);
2711                                                                         sock->write(tmp,len*16);
2712
2713                                                                         lastTitle = *metaTitle;
2714                                                                         lastURL = ch->info.url;
2715
2716                                                                         LOG_DEBUG("StreamTitle: %s, StreamURL: %s",lastTitle.cstr(),lastURL.cstr());
2717
2718                                                                 }else
2719                                                                 {
2720                                                                         sock->writeChar(0);                                     
2721                                                                 }
2722
2723                                                         }
2724                                                 }
2725                                         }
2726                                         streamPos = rawPack.pos + rawPack.len;
2727                                 }
2728                         }
2729                         if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
2730                                 throw TimeoutException();
2731
2732                         sys->sleepIdle();
2733
2734                 }
2735         }catch(StreamException &e)
2736         {
2737                 LOG_ERROR("Stream channel: %s",e.msg);
2738         }
2739 }
2740 // -----------------------------------
2741 void Servent::sendPeercastChannel()
2742 {
2743         try
2744         {
2745                 setStatus(S_CONNECTED);
2746
2747                 Channel *ch = chanMgr->findChannelByID(chanID);
2748                 if (!ch)
2749                         throw StreamException("Channel not found");
2750
2751                 LOG_DEBUG("Starting PeerCast stream: %s",ch->info.name.cstr());
2752
2753                 sock->writeTag("PCST");
2754
2755                 ChanPacket pack;
2756
2757                 ch->headPack.writePeercast(*sock);
2758
2759                 pack.init(ChanPacket::T_META,ch->insertMeta.data,ch->insertMeta.len,ch->streamPos);
2760                 pack.writePeercast(*sock);
2761         
2762                 streamPos = 0;
2763                 unsigned int syncPos=0;
2764                 while ((thread.active) && sock->active())
2765                 {
2766                         ch = chanMgr->findChannelByID(chanID);
2767                         if (ch)
2768                         {
2769
2770                                 ChanPacket rawPack;
2771                                 if (ch->rawData.findPacket(streamPos,rawPack))
2772                                 {
2773                                         if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2774                                         {
2775                                                 sock->writeTag("SYNC");
2776                                                 sock->writeShort(4);
2777                                                 sock->writeShort(0);
2778                                                 sock->write(&syncPos,4);
2779                                                 syncPos++;
2780
2781                                                 rawPack.writePeercast(*sock);
2782                                         }
2783                                         streamPos = rawPack.pos + rawPack.len;
2784                                 }
2785                         }
2786                         sys->sleepIdle();
2787                 }
2788
2789         }catch(StreamException &e)
2790         {
2791                 LOG_ERROR("Stream channel: %s",e.msg);
2792         }
2793 }
2794
2795 //WLock canStreamLock;
2796
2797 // -----------------------------------
2798 void Servent::sendPCPChannel()
2799 {
2800         bool skipCheck = false;
2801         unsigned int ptime = 0;
2802         int npacket = 0, upsize = 0;
2803
2804         Channel *ch = chanMgr->findChannelByID(chanID);
2805         if (!ch)
2806                 throw StreamException("Channel not found");
2807
2808         AtomStream atom(*sock);
2809
2810         pcpStream = new PCPStream(remoteID);
2811         int error=0;
2812
2813         try
2814         {
2815
2816                 LOG_DEBUG("Starting PCP stream of channel at %d",streamPos);
2817
2818
2819 //              setStatus(S_CONNECTED);
2820
2821                 //canStreamLock.on();
2822                 //thread.active = canStream(ch);
2823                 //setStatus(S_CONNECTED);
2824                 //canStreamLock.off();
2825
2826                 lastSkipTime = 0;
2827                 lastSkipCount = 0;
2828                 waitPort = 0;
2829
2830                 if (thread.active){
2831                         atom.writeParent(PCP_CHAN,3 + ((sendHeader)?1:0));
2832                                 atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
2833                                 ch->info.writeInfoAtoms(atom);
2834                                 ch->info.writeTrackAtoms(atom);
2835                                 if (sendHeader)
2836                                 {
2837                                         atom.writeParent(PCP_CHAN_PKT,3);
2838                                         atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
2839                                         atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
2840                                         atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
2841
2842                                         if (streamPos == 0)
2843                                                 streamPos = ch->headPack.pos+ch->headPack.len;
2844                                         LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
2845                                 }
2846                 }
2847
2848                 unsigned int streamIndex = ch->streamIndex;
2849
2850                 ChanPacket rawPack;
2851                 char pbuf[ChanPacket::MAX_DATALEN*3];
2852                 MemoryStream mems(pbuf,sizeof(pbuf));
2853                 AtomStream atom2(mems);
2854
2855                 while (thread.active)
2856                 {
2857
2858                         Channel *ch = chanMgr->findChannelByID(chanID);
2859
2860                         if (ch)
2861                         {
2862
2863                                 if (streamIndex != ch->streamIndex)
2864                                 {
2865                                         streamIndex = ch->streamIndex;
2866                                         streamPos = ch->headPack.pos;
2867                                         LOG_DEBUG("sendPCPStream got new stream index");                                                
2868                                 }
2869
2870                                 mems.rewind();
2871
2872                                 if (ch->rawData.findPacket(streamPos,rawPack))
2873                                 {
2874                                         if ((streamPos < rawPack.pos) && !rawPack.skip){
2875                                                 if (skipCheck){
2876                                                         char tmp[32];
2877                                                         getHost().IPtoStr(tmp);
2878                                                         LOG_NETWORK("##### send skipping ##### %d (%d, %d) -> %s", (rawPack.pos - streamPos), streamPos, rawPack.pos, tmp);
2879
2880                                                         if (sys->getTime() == lastSkipTime) {
2881                                                                 LOG_DEBUG("##### skip all buffer");
2882                                                                 streamPos = ch->rawData.getLatestPos();
2883                                                                 continue;
2884                                                         }
2885
2886                                                         lastSkipTime = sys->getTime();
2887                                                         lastSkipCount++;
2888                                                 } else {
2889                                                         skipCheck = true;
2890                                                 }
2891                                         }
2892
2893                                         if (rawPack.type == ChanPacket::T_HEAD)
2894                                         {
2895                                                 atom2.writeParent(PCP_CHAN,2);
2896                                                         atom2.writeBytes(PCP_CHAN_ID,chanID.id,16);
2897                                                         atom2.writeParent(PCP_CHAN_PKT,3);
2898                                                                 atom2.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
2899                                                                 atom2.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
2900                                                                 atom2.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
2901
2902                                                 sock->write(pbuf, mems.getPosition());
2903                                         }else if (rawPack.type == ChanPacket::T_DATA)
2904                                         {
2905                                                 atom2.writeParent(PCP_CHAN,2);
2906                                                         atom2.writeBytes(PCP_CHAN_ID,chanID.id,16);
2907                                                         atom2.writeParent(PCP_CHAN_PKT,3);
2908                                                                 atom2.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_DATA);
2909                                                                 atom2.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
2910                                                                 atom2.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
2911
2912 #ifdef WIN32
2913                                                 sock->bufferingWrite(pbuf, mems.getPosition());
2914                                                 lastSkipTime = sock->bufList.lastSkipTime;
2915                                                 lastSkipCount = sock->bufList.skipCount;
2916 #else
2917                                                 sock->write(pbuf, mems.getPosition());
2918 #endif
2919                                         }
2920
2921                                         if (rawPack.pos < streamPos)
2922                                                 LOG_DEBUG("pcp: skip back %d",rawPack.pos-streamPos);
2923
2924                                         //LOG_DEBUG("Sending %d-%d (%d,%d,%d)",rawPack.pos,rawPack.pos+rawPack.len,ch->streamPos,ch->rawData.getLatestPos(),ch->rawData.getOldestPos());
2925
2926                                         streamPos = rawPack.pos+rawPack.len;
2927                                 }
2928                         } else {
2929                                 throw StreamException("Channel not found");
2930                         }
2931
2932 #ifdef WIN32
2933                         sock->bufferingWrite(NULL, 0);
2934                         lastSkipTime = sock->bufList.lastSkipTime;
2935                         lastSkipCount = sock->bufList.skipCount;
2936 #endif
2937                         BroadcastState bcs;
2938                         bcs.servent_id = servent_id;
2939 //                      error = pcpStream->readPacket(*sock,bcs);
2940
2941                         unsigned int t = sys->getTime();
2942                         if (t != ptime) {
2943                                 ptime = t;
2944                                 npacket = MAX_PROC_PACKETS;
2945                                 upsize = MAX_OUTWARD_SIZE;
2946                         }
2947
2948                         int len = pcpStream->flushUb(*sock, upsize);
2949                         upsize -= len;
2950
2951                         while (npacket > 0 && sock->readReady()) {
2952                                 npacket--;
2953                                 error = pcpStream->readPacket(*sock,bcs);
2954                                 if (error)
2955                                         throw StreamException("PCP exception");
2956                         }
2957
2958                         sys->sleepIdle();
2959
2960                 }
2961
2962                 LOG_DEBUG("PCP channel stream closed normally.");
2963
2964         }catch(StreamException &e)
2965         {
2966                 LOG_ERROR("Stream channel: %s",e.msg);
2967         }
2968
2969         try
2970         {
2971                 pcpStream->flush(*sock);
2972                 atom.writeInt(PCP_QUIT,error);
2973         }catch(StreamException &) {}
2974
2975 }
2976
2977 // -----------------------------------
2978 int Servent::serverProc(ThreadInfo *thread)
2979 {
2980 //      thread->lock();
2981
2982
2983         Servent *sv = (Servent*)thread->data;
2984
2985         try 
2986         {
2987                 if (!sv->sock)
2988                         throw StreamException("Server has no socket");
2989
2990                 sv->setStatus(S_LISTENING);
2991
2992
2993                 char servIP[64];
2994                 sv->sock->host.toStr(servIP);
2995
2996                 if (servMgr->isRoot)
2997                         LOG_DEBUG("Root Server started: %s",servIP);
2998                 else
2999                         LOG_DEBUG("Server started: %s",servIP);
3000                 
3001
3002                 while ((thread->active) && (sv->sock->active()))
3003                 {
3004                         if (servMgr->numActiveOnPort(sv->sock->host.port) < servMgr->maxServIn)
3005                         {
3006                                 ClientSocket *cs = sv->sock->accept();
3007                                 if (cs)
3008                                 {       
3009                                         LOG_DEBUG("accepted incoming");
3010                                         Servent *ns = servMgr->allocServent();
3011                                         if (ns)
3012                                         {
3013                                                 servMgr->lastIncoming = sys->getTime();
3014                                                 ns->servPort = sv->sock->host.port;
3015                                                 ns->networkID = servMgr->networkID;
3016                                                 ns->initIncoming(cs,sv->allow);
3017                                         }else
3018                                                 LOG_ERROR("Out of servents");
3019                                 }
3020                         }
3021                         sys->sleep(100);
3022                 }
3023         }catch(StreamException &e)
3024         {
3025                 LOG_ERROR("Server Error: %s:%d",e.msg,e.err);
3026         }
3027
3028         
3029         LOG_DEBUG("Server stopped");
3030
3031         sv->kill();
3032         sys->endThread(thread);
3033         return 0;
3034 }
3035  
3036 // -----------------------------------
3037 bool    Servent::writeVariable(Stream &s, const String &var)
3038 {
3039         char buf[1024];
3040
3041         if (var == "type")
3042                 strcpy(buf,getTypeStr());
3043         else if (var == "status")
3044                 strcpy(buf,getStatusStr());
3045         else if (var == "address")
3046         {
3047                 if (servMgr->enableGetName) //JP-EX s
3048                 {
3049                         getHost().toStr(buf);
3050                         char h_ip[64];
3051                         Host h = getHost();
3052                         h.toStr(h_ip);
3053
3054 /*                      ChanHitList *hits[ChanMgr::MAX_HITLISTS];
3055                         int numHits=0;
3056                         for(int i=0; i<ChanMgr::MAX_HITLISTS; i++)
3057                         {
3058                                 ChanHitList *chl = &chanMgr->hitlists[i];
3059                                 if (chl->isUsed())
3060                                         hits[numHits++] = chl;
3061                         }
3062                         bool ishit,isfw;
3063                         ishit = isfw = false;
3064                         int numRelay = 0;
3065                         if (numHits) 
3066                         {
3067                                 for(int k=0; k<numHits; k++)
3068                                 {
3069                                         ChanHitList *chl = hits[k];
3070                                         if (chl->isUsed())
3071                                         {
3072                                                 for (int j=0; j<ChanHitList::MAX_HITS; j++)
3073                                                 {
3074                                                         ChanHit *hit = &chl->hits[j];
3075                                                         if (hit->host.isValid() && (h.ip == hit->host.ip))
3076                                                         {
3077                                                                 ishit = true;
3078                                                                 if (hit->firewalled)
3079                                                                         isfw = true;
3080                                                                 numRelay += hit->numRelays;
3081                                                         }
3082                                                 }
3083                                         }
3084                                 }
3085                         }
3086                         strcpy(buf,"");
3087                         if (ishit == true)
3088                         {
3089                                 if (isfw == true)
3090                                 {
3091                                         if (numRelay== 0)
3092                                                 strcat(buf,"<font color=red>");
3093                                         else 
3094                                                 strcat(buf,"<font color=orange>");
3095                                 }
3096                                 else
3097                                         strcat(buf,"<font color=green>");
3098                         }
3099                         strcat(buf,h_ip);
3100                         char h_name[128];
3101                         if (ClientSocket::getHostname(h_name,h.ip))
3102                         {
3103                                 strcat(buf,"[");
3104                                 strcat(buf,h_name);
3105                                 strcat(buf,"]");
3106                         }
3107                         if (ishit == true) 
3108                         {
3109                                 strcat(buf,"</font>");
3110                         }
3111                 } //JP-EX e*/
3112
3113
3114                         bool isfw = false;
3115                         bool isRelay = true;
3116                         int numRelay = 0;
3117                         ChanHitList *chl = chanMgr->findHitListByID(chanID);
3118                         if (chl){
3119                                 ChanHit *hit = chl->hit;
3120                                 while(hit){
3121                                         if (hit->host.isValid() && (h.ip == hit->host.ip)){
3122                                                 isfw = hit->firewalled;
3123                                                 isRelay = hit->relay;
3124                                                 numRelay = hit->numRelays;
3125                                                 break;
3126                                         }
3127                                         hit = hit->next;
3128                                 }
3129                         }
3130                         strcpy(buf, "");
3131                         if (isfw){
3132                                 if (numRelay == 0){
3133                                         strcat(buf,"<font color=red>");
3134                                 } else {
3135                                         strcat(buf,"<font color=orange>");
3136                                 }
3137                         } else {
3138                                 if (!isRelay){
3139                                         if (numRelay==0){
3140                                                 strcpy(buf,"<font color=purple>");
3141                                         } else {
3142                                                 strcpy(buf,"<font color=blue>");
3143                                         }
3144                                 } else {
3145                                         strcpy(buf,"<font color=green>");
3146                                 }
3147                         }
3148                         strcat(buf,h_ip);
3149                         char h_name[128];
3150                         if (ClientSocket::getHostname(h_name,sizeof(h_name),h.ip)) //JP-MOD(BOF\91Î\8dô)
3151                         {
3152                                 strcat(buf,"[");
3153                                 strcat(buf,h_name);
3154                                 strcat(buf,"]");
3155                         }
3156                         strcat(buf,"</font>");
3157                 }
3158                 else 
3159                         getHost().toStr(buf);
3160         }
3161         else if (var == "agent")
3162                 strcpy(buf,agent.cstr());
3163         else if (var == "bitrate")
3164         {
3165                 if (sock)
3166                 {
3167                         unsigned int tot = sock->bytesInPerSec+sock->bytesOutPerSec;
3168                         sprintf(buf,"%.1f",BYTES_TO_KBPS(tot));
3169                 }else
3170                         strcpy(buf,"0");
3171         }else if (var == "uptime")
3172         {
3173                 String uptime;
3174                 if (lastConnect)
3175                         uptime.setFromStopwatch(sys->getTime()-lastConnect);
3176                 else
3177                         uptime.set("-");
3178                 strcpy(buf,uptime.cstr());
3179         }else if (var.startsWith("gnet."))
3180         {
3181
3182                 float ctime = (float)(sys->getTime()-lastConnect);
3183                 if (var == "gnet.packetsIn")
3184                         sprintf(buf,"%d",gnuStream.packetsIn);
3185                 else if (var == "gnet.packetsInPerSec")
3186                         sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsIn)/ctime:0);
3187                 else if (var == "gnet.packetsOut")
3188                         sprintf(buf,"%d",gnuStream.packetsOut);
3189                 else if (var == "gnet.packetsOutPerSec")
3190                         sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsOut)/ctime:0);
3191                 else if (var == "gnet.normQueue")
3192                         sprintf(buf,"%d",outPacketsNorm.numPending());
3193                 else if (var == "gnet.priQueue")
3194                         sprintf(buf,"%d",outPacketsPri.numPending());
3195                 else if (var == "gnet.flowControl")
3196                         sprintf(buf,"%d",flowControl?1:0);
3197                 else if (var == "gnet.routeTime")
3198                 {
3199                         int nr = seenIDs.numUsed();
3200                         unsigned int tim = sys->getTime()-seenIDs.getOldest();
3201                 
3202                         String tstr;
3203                         tstr.setFromStopwatch(tim);
3204
3205                         if (nr)
3206                                 strcpy(buf,tstr.cstr());
3207                         else
3208                                 strcpy(buf,"-");
3209                 }
3210                 else
3211                         return false;
3212
3213         }else
3214                 return false;
3215
3216         s.writeString(buf);
3217
3218         return true;
3219 }