OSDN Git Service

VP25-3マージ
[peercast-im/PeerCastIM.git] / c: / Git / PeerCast.root / PeerCast / core / common / servent.cpp
1 // ------------------------------------------------
2 // File : servent.cpp
3 // Date: 4-apr-2002
4 // Author: giles
5 // Desc: 
6 //              Servents are the actual connections between clients. They do the handshaking,
7 //              transfering of data and processing of GnuPackets. Each servent has one socket allocated
8 //              to it on connect, it uses this to transfer all of its data.
9 //
10 // (c) 2002 peercast.org
11 // ------------------------------------------------
12 // This program is free software; you can redistribute it and/or modify
13 // it under the terms of the GNU General Public License as published by
14 // the Free Software Foundation; either version 2 of the License, or
15 // (at your option) any later version.
16
17 // This program is distributed in the hope that it will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 // GNU General Public License for more details.
21 // ------------------------------------------------
22 // todo: make lan->yp not check firewall
23
24 #include <stdlib.h>
25 #include "servent.h"
26 #include "sys.h"
27 #include "gnutella.h"
28 #include "xml.h"
29 #include "html.h"
30 #include "http.h"
31 #include "stats.h"
32 #include "servmgr.h"
33 #include "peercast.h"
34 #include "atom.h"
35 #include "pcp.h"
36 #include "version2.h"
37 #ifdef _DEBUG
38 #include "chkMemoryLeak.h"
39 #define DEBUG_NEW new(__FILE__, __LINE__)
40 #define new DEBUG_NEW
41 #endif
42
43
44 const int DIRECT_WRITE_TIMEOUT = 60;
45
46 // -----------------------------------
47 char *Servent::statusMsgs[]=
48 {
49         "NONE",
50                 "CONNECTING",
51         "PROTOCOL",
52         "HANDSHAKE",
53         "CONNECTED",
54         "CLOSING",
55                 "LISTENING",
56                 "TIMEOUT",
57                 "REFUSED",
58                 "VERIFIED",
59                 "ERROR",
60                 "WAIT",
61                 "FREE"
62 };
63
64 // -----------------------------------
65 char *Servent::typeMsgs[]=
66 {
67                 "NONE",
68         "INCOMING",
69         "SERVER",
70                 "RELAY",
71                 "DIRECT",
72                 "COUT",
73                 "CIN",
74                 "PGNU"
75 };
76 // -----------------------------------
77 bool    Servent::isPrivate() 
78 {
79         Host h = getHost();
80         return servMgr->isFiltered(ServFilter::F_PRIVATE,h) || h.isLocalhost();
81 }
82 // -----------------------------------
83 bool    Servent::isAllowed(int a) 
84 {
85         Host h = getHost();
86
87         if (servMgr->isFiltered(ServFilter::F_BAN,h))
88                 return false;
89
90         return (allow&a)!=0;
91 }
92
93 // -----------------------------------
94 bool    Servent::isFiltered(int f) 
95 {
96         Host h = getHost();
97         return servMgr->isFiltered(f,h);
98 }
99
100 int servent_count = 1;
101 // -----------------------------------
102 Servent::Servent(int index)
103 :outPacketsPri(MAX_OUTPACKETS)
104 ,outPacketsNorm(MAX_OUTPACKETS)
105 ,seenIDs(MAX_HASH)
106 ,serventIndex(index)
107 ,sock(NULL)
108 ,next(NULL)
109 {
110         reset();
111         servent_id = servent_count++;
112         lastSkipTime = 0;
113         lastSkipCount = 0;
114         waitPort = 0;
115 }
116
117 // -----------------------------------
118 Servent::~Servent()
119 {       
120         
121 }
122 // -----------------------------------
123 void    Servent::kill() 
124 {
125         thread.active = false;
126                 
127         setStatus(S_CLOSING);
128
129         if (pcpStream)
130         {
131                 PCPStream *pcp = pcpStream;
132                 pcpStream = NULL;
133                 pcp->kill();
134                 delete pcp;
135         }
136
137         chanMgr->hitlistlock.on();
138         ChanHitList *chl = chanMgr->findHitListByID(chanID);
139         if (chl) {
140                 ChanHit *chh = chl->hit;
141                 ChanHit *prev = NULL;
142                 while (chh) {
143                         if (chh->servent_id == this->servent_id){
144                                 if ((servMgr->kickKeepTime != 0) && (chh->firewalled == 1)){
145                                         chh->numHops = 0;
146                                         chh->numListeners = 0;
147                                         chh->numRelays = 0;
148                                         prev = chh;
149                                         chh = chh->next;
150                                 } else {
151                                         ChanHit *next = chh->next;
152                                         if (prev)
153                                                 prev->next = next;
154                                         else
155                                                 chl->hit = next;
156
157                                         delete chh;
158                                         chh = next;
159                                 }
160                         } else {
161                                 prev = chh;
162                                 chh = chh->next;
163                         }
164                 }
165         }
166         chanMgr->hitlistlock.off();
167
168         if (sock)
169         {
170                 sock->close();
171                 delete sock;
172                 sock = NULL;
173         }
174
175         if (pushSock)
176         {
177                 pushSock->close();
178                 delete pushSock;
179                 pushSock = NULL;
180         }
181
182 //      thread.unlock();
183
184         if (type != T_SERVER)
185         {
186                 reset();
187                 setStatus(S_FREE);
188         }
189
190 }
191 // -----------------------------------
192 void    Servent::abort() 
193 {
194         thread.active = false;
195         if (sock)
196         {
197                 sock->close();
198         }
199
200 }
201
202 // -----------------------------------
203 void Servent::reset()
204 {
205
206         remoteID.clear();
207
208         servPort = 0;
209
210         pcpStream = NULL;
211
212         flowControl = false;
213         networkID.clear();
214
215         chanID.clear();
216
217         outputProtocol = ChanInfo::SP_UNKNOWN;
218
219         agent.clear();
220         sock = NULL;
221         allow = ALLOW_ALL;
222         syncPos = 0;
223         addMetadata = false;
224         nsSwitchNum = 0;
225         pack.func = 255;
226         lastConnect = lastPing = lastPacket = 0;
227         loginPassword.clear();
228         loginMount.clear();
229         bytesPerSecond = 0;
230         priorityConnect = false;
231         pushSock = NULL;
232         sendHeader = true;
233
234         outPacketsNorm.reset();
235         outPacketsPri.reset();
236
237         seenIDs.clear();
238
239         status = S_NONE;
240         type = T_NONE;
241
242         channel_id = 0;
243 }
244 // -----------------------------------
245 bool Servent::sendPacket(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did,Servent::TYPE t)
246 {
247
248         if  (      (type == t) 
249                         && (isConnected())
250                         && (!cid.isSet() || chanID.isSame(cid))
251                         && (!sid.isSet() || !sid.isSame(remoteID))
252                         && (pcpStream != NULL)
253                 )
254         {
255                 return pcpStream->sendPacket(pack,did);
256         }
257         return false;
258 }
259
260
261 // -----------------------------------
262 bool Servent::acceptGIV(ClientSocket *givSock)
263 {
264         if (!pushSock)
265         {
266                 pushSock = givSock;
267                 return true;
268         }else
269                 return false;
270 }
271
272 // -----------------------------------
273 Host Servent::getHost()
274 {
275         Host h(0,0);
276
277         if (sock)
278                 h = sock->host;
279
280         return h;
281 }
282
283 // -----------------------------------
284 bool Servent::outputPacket(GnuPacket &p, bool pri)
285 {
286         lock.on();
287
288         bool r=false;
289         if (pri)
290                 r = outPacketsPri.write(p);
291         else
292         {
293                 if (servMgr->useFlowControl)
294                 {
295                         int per = outPacketsNorm.percentFull();
296                         if (per > 50)
297                                 flowControl = true;
298                         else if (per < 25)
299                                 flowControl = false;
300                 }
301
302
303                 bool send=true;
304                 if (flowControl)
305                 {
306                         // if in flowcontrol, only allow packets with less of a hop count than already in queue
307                         if (p.hops >= outPacketsNorm.findMinHop())
308                                 send = false;
309                 }
310
311                 if (send)
312                         r = outPacketsNorm.write(p);
313         }
314
315         lock.off();
316         return r;
317 }
318
319 // -----------------------------------
320 bool Servent::initServer(Host &h)
321 {
322         try
323         {
324                 checkFree();
325
326                 status = S_WAIT;
327
328                 createSocket();
329
330                 sock->bind(h);
331
332                 thread.data = this;
333
334                 thread.func = serverProc;
335
336                 type = T_SERVER;
337
338                 if (!sys->startThread(&thread))
339                         throw StreamException("Can`t start thread");
340
341         }catch(StreamException &e)
342         {
343                 LOG_ERROR("Bad server: %s",e.msg);
344                 kill();
345                 return false;
346         }
347
348         return true;
349 }
350 // -----------------------------------
351 void Servent::checkFree()
352 {
353         if (sock)
354                 throw StreamException("Socket already set");
355         if (thread.active)
356                 throw StreamException("Thread already active");
357 }
358 // -----------------------------------
359 void Servent::initIncoming(ClientSocket *s, unsigned int a)
360 {
361
362         try{
363
364                 checkFree();
365
366                 type = T_INCOMING;
367                 sock = s;
368                 allow = a;
369                 thread.data = this;
370                 thread.func = incomingProc;
371                 thread.finish = false;
372
373                 setStatus(S_PROTOCOL);
374
375                 char ipStr[64];
376                 sock->host.toStr(ipStr);
377                 LOG_DEBUG("Incoming from %s",ipStr);
378
379
380                 if (!sys->startThread(&thread))
381                         throw StreamException("Can`t start thread");
382         }catch(StreamException &e)
383         {
384                 //LOG_ERROR("!!FATAL!! Incoming error: %s",e.msg);
385                 //servMgr->shutdownTimer = 1;   
386                 kill();
387
388                 LOG_ERROR("INCOMING FAILED: %s",e.msg);
389
390         }
391 }
392
393 // -----------------------------------
394 void Servent::initOutgoing(TYPE ty)
395 {
396         try 
397         {
398                 checkFree();
399
400
401                 type = ty;
402
403                 thread.data = this;
404                 thread.func = outgoingProc;
405
406                 if (!sys->startThread(&thread))
407                         throw StreamException("Can`t start thread");
408
409         }catch(StreamException &e)
410         {
411                 LOG_ERROR("Unable to start outgoing: %s",e.msg);
412                 kill();
413         }
414 }
415
416 // -----------------------------------
417 void Servent::initPCP(Host &rh)
418 {
419         char ipStr[64];
420         rh.toStr(ipStr);
421         try 
422         {
423                 checkFree();
424
425             createSocket();
426
427                 type = T_COUT;
428
429                 sock->open(rh);
430
431                 if (!isAllowed(ALLOW_NETWORK))
432                         throw StreamException("Servent not allowed");
433
434                 thread.data = this;
435                 thread.func = outgoingProc;
436
437                 LOG_DEBUG("Outgoing to %s",ipStr);
438
439                 if (!sys->startThread(&thread))
440                         throw StreamException("Can`t start thread");
441
442         }catch(StreamException &e)
443         {
444                 LOG_ERROR("Unable to open connection to %s - %s",ipStr,e.msg);
445                 kill();
446         }
447 }
448
449 #if 0
450 // -----------------------------------
451 void    Servent::initChannelFetch(Host &host)
452 {
453         type = T_STREAM;
454
455         char ipStr[64];
456         host.toStr(ipStr);
457
458         checkFree();
459          
460         createSocket();
461                 
462         sock->open(host);
463
464                 
465         if (!isAllowed(ALLOW_DATA))     
466                 throw StreamException("Servent not allowed");
467                 
468         sock->connect();
469 }
470 #endif
471
472 // -----------------------------------
473 void Servent::initGIV(Host &h, GnuID &id)
474 {
475         char ipStr[64];
476         h.toStr(ipStr);
477         try 
478         {
479                 checkFree();
480
481                 givID = id;
482
483             createSocket();
484
485                 sock->open(h);
486
487                 if (!isAllowed(ALLOW_NETWORK))
488                         throw StreamException("Servent not allowed");
489                 
490                 sock->connect();
491
492                 thread.data = this;
493                 thread.func = givProc;
494
495                 type = T_RELAY;
496
497                 if (!sys->startThread(&thread))
498                         throw StreamException("Can`t start thread");
499
500         }catch(StreamException &e)
501         {
502                 LOG_ERROR("GIV error to %s: %s",ipStr,e.msg);
503                 kill();
504         }
505 }
506 // -----------------------------------
507 void Servent::createSocket()
508 {
509         if (sock)
510                 LOG_ERROR("Servent::createSocket attempt made while active");
511
512         sock = sys->createSocket();
513 }
514 // -----------------------------------
515 void Servent::setStatus(STATUS s)
516 {
517         if (s != status)
518         {
519                 status = s;
520
521                 if ((s == S_HANDSHAKE) || (s == S_CONNECTED) || (s == S_LISTENING))
522                         lastConnect = sys->getTime();
523         }
524
525 }
526
527 // -----------------------------------
528 void Servent::handshakeOut()
529 {
530     sock->writeLine(GNU_PEERCONN);
531
532         char str[64];
533     
534         sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_AGENT);
535     sock->writeLineF("%s %d",PCX_HS_PCP,1);
536
537         if (priorityConnect)
538             sock->writeLineF("%s %d",PCX_HS_PRIORITY,1);
539         
540         if (networkID.isSet())
541         {
542                 networkID.toStr(str);
543                 sock->writeLineF("%s %s",PCX_HS_NETWORKID,str);
544         }
545
546         servMgr->sessionID.toStr(str);
547         sock->writeLineF("%s %s",PCX_HS_ID,str);
548
549         
550     sock->writeLineF("%s %s",PCX_HS_OS,peercastApp->getClientTypeOS());
551         
552         sock->writeLine("");
553
554         HTTP http(*sock);
555
556         int r = http.readResponse();
557
558         if (r != 200)
559         {
560                 LOG_ERROR("Expected 200, got %d",r);
561                 throw StreamException("Unexpected HTTP response");
562         }
563
564
565         bool versionValid = false;
566
567         GnuID clientID;
568         clientID.clear();
569
570     while (http.nextHeader())
571     {
572                 LOG_DEBUG(http.cmdLine);
573
574                 char *arg = http.getArgStr();
575                 if (!arg)
576                         continue;
577
578                 if (http.isHeader(HTTP_HS_AGENT))
579                 {
580                         agent.set(arg);
581
582                         if (strnicmp(arg,"PeerCast/",9)==0)
583                                 versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);
584                 }else if (http.isHeader(PCX_HS_NETWORKID))
585                         clientID.fromStr(arg);
586     }
587
588         if (!clientID.isSame(networkID))
589                 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
590
591         if (!versionValid)
592                 throw HTTPException(HTTP_SC_UNAUTHORIZED,401);
593
594
595     sock->writeLine(GNU_OK);
596     sock->writeLine("");
597
598 }
599
600
601 // -----------------------------------
602 void Servent::processOutChannel()
603 {
604 }
605
606
607 // -----------------------------------
608 void Servent::handshakeIn()
609 {
610
611         int osType=0;
612
613         HTTP http(*sock);
614
615
616         bool versionValid = false;
617         bool diffRootVer = false;
618
619         GnuID clientID;
620         clientID.clear();
621
622     while (http.nextHeader())
623     {
624                 LOG_DEBUG("%s",http.cmdLine);
625
626                 char *arg = http.getArgStr();
627                 if (!arg)
628                         continue;
629
630                 if (http.isHeader(HTTP_HS_AGENT))
631                 {
632                         agent.set(arg);
633
634                         if (strnicmp(arg,"PeerCast/",9)==0)
635                         {
636                                 versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);
637                                 diffRootVer = stricmp(arg+9,MIN_ROOTVER)<0;
638                         }
639                 }else if (http.isHeader(PCX_HS_NETWORKID))
640                 {
641                         clientID.fromStr(arg);
642
643                 }else if (http.isHeader(PCX_HS_PRIORITY))
644                 {
645                         priorityConnect = atoi(arg)!=0;
646
647                 }else if (http.isHeader(PCX_HS_ID))
648                 {
649                         GnuID id;
650                         id.fromStr(arg);
651                         if (id.isSame(servMgr->sessionID))
652                                 throw StreamException("Servent loopback");
653
654                 }else if (http.isHeader(PCX_HS_OS))
655                 {
656                         if (stricmp(arg,PCX_OS_LINUX)==0)
657                                 osType = 1;
658                         else if (stricmp(arg,PCX_OS_WIN32)==0)
659                                 osType = 2;
660                         else if (stricmp(arg,PCX_OS_MACOSX)==0)
661                                 osType = 3;
662                         else if (stricmp(arg,PCX_OS_WINAMP2)==0)
663                                 osType = 4;
664                 }
665
666     }
667
668         if (!clientID.isSame(networkID))
669                 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
670
671         // if this is a priority connection and all incoming connections 
672         // are full then kill an old connection to make room. Otherwise reject connection.
673         //if (!priorityConnect)
674         {
675                 if (!isPrivate())
676                         if (servMgr->pubInFull())
677                                 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
678         }
679
680         if (!versionValid)
681                 throw HTTPException(HTTP_SC_FORBIDDEN,403);
682
683     sock->writeLine(GNU_OK);
684
685     sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_OLDAGENT);
686
687         if (networkID.isSet())
688         {
689                 char idStr[64];
690                 networkID.toStr(idStr);
691                 sock->writeLineF("%s %s",PCX_HS_NETWORKID,idStr);
692         }
693
694         if (servMgr->isRoot)
695         {
696                 sock->writeLineF("%s %d",PCX_HS_FLOWCTL,servMgr->useFlowControl?1:0);
697                 sock->writeLineF("%s %d",PCX_HS_MINBCTTL,chanMgr->minBroadcastTTL);
698                 sock->writeLineF("%s %d",PCX_HS_MAXBCTTL,chanMgr->maxBroadcastTTL);
699                 sock->writeLineF("%s %d",PCX_HS_RELAYBC,servMgr->relayBroadcast);
700                 //sock->writeLine("%s %d",PCX_HS_FULLHIT,2);
701
702
703                 if (diffRootVer)
704                 {
705                         sock->writeString(PCX_HS_DL);
706                         sock->writeLine(PCX_DL_URL);
707                 }
708
709                 sock->writeLineF("%s %s",PCX_HS_MSG,servMgr->rootMsg.cstr());
710         }
711
712
713         char hostIP[64];
714         Host h = sock->host;
715         h.IPtoStr(hostIP);
716     sock->writeLineF("%s %s",PCX_HS_REMOTEIP,hostIP);
717
718
719     sock->writeLine("");
720
721
722         while (http.nextHeader());
723 }
724
725 // -----------------------------------
726 bool    Servent::pingHost(Host &rhost,GnuID &rsid)
727 {
728         char ipstr[64];
729         rhost.toStr(ipstr);
730         LOG_DEBUG("Ping host %s: trying..",ipstr);
731         ClientSocket *s=NULL;
732         bool hostOK=false;
733         try
734         {
735                 s = sys->createSocket();
736                 if (!s)
737                         return false;
738                 else
739                 {
740
741                         s->setReadTimeout(15000);
742                         s->setWriteTimeout(15000);
743                         s->open(rhost);
744                         s->connect();
745
746                         AtomStream atom(*s);
747
748                         atom.writeInt(PCP_CONNECT,1);
749                         atom.writeParent(PCP_HELO,1);
750                                 atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
751
752                         GnuID sid;
753                         sid.clear();
754
755                         int numc,numd;
756                         ID4 id = atom.read(numc,numd);
757                         if (id == PCP_OLEH)
758                         {
759                                 for(int i=0; i<numc; i++)
760                                 {
761                                         int c,d;
762                                         ID4 pid = atom.read(c,d);
763                                         if (pid == PCP_SESSIONID)
764                                                 atom.readBytes(sid.id,16,d);
765                                         else
766                                                 atom.skip(c,d);
767                                 }
768                         }else
769                         {
770                                 LOG_DEBUG("Ping response: %s",id.getString().str());
771                                 throw StreamException("Bad ping response");
772                         }
773
774                         if (!sid.isSame(rsid))
775                                 throw StreamException("SIDs don`t match");
776
777                         hostOK = true;
778                         LOG_DEBUG("Ping host %s: OK",ipstr);
779                         atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT);
780
781
782                 }
783         }catch(StreamException &e)
784         {
785                 LOG_DEBUG("Ping host %s: %s",ipstr,e.msg);
786         }
787         if (s)
788         {
789                 s->close();
790                 delete s;
791         }
792
793         if (!hostOK)
794                 rhost.port = 0;
795
796         return true;
797 }
798
799 WLock canStreamLock;
800
801 // -----------------------------------
802 bool Servent::handshakeStream(ChanInfo &chanInfo)
803 {
804
805         HTTP http(*sock);
806
807
808         bool gotPCP=false;
809         unsigned int reqPos=0;
810
811         nsSwitchNum=0;
812
813         while (http.nextHeader())
814         {
815                 char *arg = http.getArgStr();
816                 if (!arg)
817                         continue;
818
819                 if (http.isHeader(PCX_HS_PCP))
820                         gotPCP = atoi(arg)!=0;
821                 else if (http.isHeader(PCX_HS_POS))
822                         reqPos = atoi(arg);
823                 else if (http.isHeader("icy-metadata"))
824                         addMetadata = atoi(arg) > 0;
825                 else if (http.isHeader(HTTP_HS_AGENT))
826                         agent = arg;
827                 else if (http.isHeader("Pragma"))
828                 {
829                         char *ssc = stristr(arg,"stream-switch-count=");
830                         char *so = stristr(arg,"stream-offset");
831
832                         if (ssc || so)
833                         {
834                                 nsSwitchNum=1;
835                                 //nsSwitchNum = atoi(ssc+20);
836                         }
837                 }
838
839                 LOG_DEBUG("Stream: %s",http.cmdLine);
840         }
841
842
843         if ((!gotPCP) && (outputProtocol == ChanInfo::SP_PCP))
844                 outputProtocol = ChanInfo::SP_PEERCAST;
845
846         if (outputProtocol == ChanInfo::SP_HTTP)
847         {
848                 if  ( (chanInfo.srcProtocol == ChanInfo::SP_MMS)
849                           || (chanInfo.contentType == ChanInfo::T_WMA)
850                           || (chanInfo.contentType == ChanInfo::T_WMV)
851                           || (chanInfo.contentType == ChanInfo::T_ASX)
852                         )
853                 outputProtocol = ChanInfo::SP_MMS;
854         }
855
856
857         bool chanFound=false;
858         bool chanReady=false;
859
860         ChanHit *sourceHit = NULL;
861
862         Channel *ch = chanMgr->findChannelByID(chanInfo.id);
863         if (ch)
864         {
865                 sendHeader = true;
866                 if (reqPos || !isIndexTxt(&chanInfo))
867                 {
868                         streamPos = ch->rawData.findOldestPos(reqPos);
869                         //streamPos = ch->rawData.getLatestPos();
870                 }else
871                 {
872                         streamPos = ch->rawData.getLatestPos();
873                 }
874
875                 chanID = chanInfo.id;
876                 canStreamLock.on();
877                 chanReady = canStream(ch);
878                 if (0 && !chanReady)
879                 {
880                         if (servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
881                         {
882                                 sourceHit = &ch->sourceHost;  // send source host info
883
884                                 if (ch->info.getUptime() > 60)  // if stable
885                                 {
886                                         // connect "this" host later
887                                         ChanHit nh;
888                                         nh.init();
889                                         nh.chanID = chanID;
890                                         nh.rhost[0] = getHost();
891                                         chanMgr->addHit(nh);
892                                 }
893
894                                 char tmp[50];
895                                 getHost().toStr(tmp);
896                                 LOG_DEBUG("Bump channel, hand over sourceHost to %s", tmp);
897                                 ch->bump = true;
898                         }
899                         else if (servMgr->kickUnrelayableHost(chanID, this) != 0)
900                         {
901                                 chanReady = canStream(ch);
902                                 if (!chanReady)
903                                         LOG_DEBUG("Kicked unrelayable host, but still cannot stream");
904                         }
905                 }
906                 if (!chanReady) type = T_INCOMING;
907                 thread.active = chanReady;
908                 setStatus(S_CONNECTED);
909                 canStreamLock.off();
910                 channel_id = ch->channel_id;
911
912                 //JP-Patch add-s
913                 if (servMgr->isCheckPushStream())
914                 {
915                         if (chanReady == true)
916                         {
917                                 Host h = getHost();
918
919                                 if (!h.isLocalhost()) 
920                                 {
921                                         do 
922                                         {
923                                                 if (strstr(agent.cstr(),"PeerCast/0.119") != NULL) 
924                                                 {                                               
925                                                         char strip[256];
926                                                         h.toStr(strip);
927                                                         LOG_ERROR("Block v0.119 Servent : %s (%s)",strip,agent.cstr());
928                                                         chanReady = false;
929                                                         break;
930                                                 }
931
932 /*                                              ChanHitList *hits[ChanMgr::MAX_HITLISTS];
933                                                 int numHits=0;
934                                                 for(int i=0; i<ChanMgr::MAX_HITLISTS; i++)
935                                                 {
936                                                         ChanHitList *chl = &chanMgr->hitlists[i];
937                                                         if (chl->isUsed())
938                                                                 hits[numHits++] = chl;
939                                                 }
940                                                 bool isfw = false;
941                                                 int numRelay = 0;
942                                                 for(int i=0; i<numHits; i++)
943                                                 {
944                                                         ChanHitList *chl = hits[i];
945                                                         if (chl->isUsed())
946                                                         {
947                                                                 for (int j=0; j<ChanHitList::MAX_HITS; j++)
948                                                                 {
949                                                                         ChanHit *hit = &chl->hits[j];
950                                                                         if (hit->host.isValid() && (h.ip == hit->host.ip)) 
951                                                                         {
952                                                                                 if (hit->firewalled)
953                                                                                         isfw = true;
954                                                                                 numRelay = hit->numRelays;
955                                                                         }
956                                                                 }
957                                                         }
958                                                 }
959                                                 if ((isfw == true) && (numRelay == 0))
960                                                 {
961                                                         char strip[256];
962                                                         h.toStr(strip);
963                                                         LOG_ERROR("Block firewalled Servent : %s",strip);
964                                                         chanReady = false;
965                                                 }*/
966
967                                                 ChanHitList *chl = chanMgr->findHitList(chanInfo);
968                                                 ChanHit *hit = chl->hit;
969                                                 while(hit){
970                                                         if (hit->host.isValid() && (h.ip == hit->host.ip))
971                                                         {
972                                                                 if ((hit->firewalled) && (hit->numRelays == 0)){
973                                                                         char strip[256];
974                                                                         h.toStr(strip);
975                                                                         LOG_ERROR("Block firewalled Servent : %s",strip);
976                                                                         chanReady = false;
977                                                                 }
978                                                         }
979                                                         hit = hit->next;
980                                                 }
981                                         }while (0);
982                                 }               
983                         }
984                 }
985                 //JP-Patch add-e
986         }
987
988 //      LockBlock lockblock(chanMgr->hitlistlock);
989
990 //      lockblock.lockon();
991         ChanHitList *chl = chanMgr->findHitList(chanInfo);
992
993         if (chl)
994         {
995                 chanFound = true;
996         }
997
998
999         bool result = false;
1000
1001         char idStr[64];
1002         chanInfo.id.toStr(idStr);
1003
1004         char sidStr[64];
1005         servMgr->sessionID.toStr(sidStr);
1006
1007         Host rhost = sock->host;
1008
1009
1010
1011
1012         AtomStream atom(*sock);
1013
1014
1015
1016         if (!chanFound)
1017         {
1018                 sock->writeLine(HTTP_SC_NOTFOUND);
1019             sock->writeLine("");
1020                 LOG_DEBUG("Sending channel not found");
1021                 return false;
1022         }
1023
1024
1025         if (!chanReady)
1026         {
1027                 if (outputProtocol == ChanInfo::SP_PCP)
1028                 {
1029
1030                         char tbuf[8196];
1031                         MemoryStream mem(tbuf, sizeof(tbuf));
1032                         mem.writeLine(HTTP_SC_UNAVAILABLE);
1033                         mem.writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
1034                         mem.writeLine("");
1035                         sock->write(tbuf, mem.getPosition());
1036
1037                         handshakeIncomingPCP(atom,rhost,remoteID,agent);
1038
1039                         char ripStr[64];
1040                         rhost.toStr(ripStr);
1041
1042                         LOG_DEBUG("Sending channel unavailable");
1043
1044                         ChanHitSearch chs;
1045
1046                         mem.rewind();
1047                         AtomStream atom2(mem);
1048
1049                         int error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
1050
1051                         chanMgr->hitlistlock.on();
1052
1053                         chl = chanMgr->findHitList(chanInfo);
1054
1055                         if (chl)
1056                         {
1057                                 ChanHit best;
1058                                 
1059                                 // search for up to 8 other hits
1060                                 int cnt=0;
1061                                 int i;
1062                                 for(i=0; i<8; i++)
1063                                 {
1064                                         best.init();
1065
1066
1067                                         // find best hit this network if local IP
1068                                         if (!rhost.globalIP())
1069                                         {
1070                                                 chs.init();
1071                                                 chs.matchHost = servMgr->serverHost;
1072                                                 chs.waitDelay = 2;
1073                                                 chs.excludeID = remoteID;
1074                                                 if (chl->pickHits(chs)){
1075                                                         best = chs.best[0];
1076                                                         LOG_DEBUG("find best hit this network if local IP");
1077                                                 }
1078                                         }
1079
1080                                         // find best hit on same network
1081                                         if (!best.host.ip)
1082                                         {
1083                                                 chs.init();
1084                                                 chs.matchHost = rhost;
1085                                                 chs.waitDelay = 2;
1086                                                 chs.excludeID = remoteID;
1087                                                 if (chl->pickHits(chs)){
1088                                                         best = chs.best[0];
1089                                                         LOG_DEBUG("find best hit on same network");
1090                                                 }
1091
1092                                         }
1093
1094                                         // find best hit on other networks
1095 /*                                      if (!best.host.ip)
1096                                         {
1097                                                 chs.init();
1098                                                 chs.waitDelay = 2;
1099                                                 chs.excludeID = remoteID;
1100                                                 if (chl->pickHits(chs)){
1101                                                         best = chs.best[0];
1102                                                         LOG_DEBUG("find best hit on other networks");
1103                                                 }
1104
1105                                         }*/
1106                                         
1107                                         if (!best.host.ip)
1108                                                 break;
1109
1110                                         best.writeAtoms(atom2,chanInfo.id);                        
1111                                         cnt++;
1112                                 }
1113
1114                                 if (sourceHit) {
1115                                         char tmp[50];
1116                                         sourceHit->writeAtoms(atom2, chanInfo.id);
1117                                         chs.best[i].host.toStr(tmp);
1118                                         LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
1119                                         best.host.ip = sourceHit->host.ip;
1120                                 }
1121
1122                                 if (!best.host.ip){
1123                                         char tmp[50];
1124 //                                      chanMgr->hitlistlock.on();
1125                                         int cnt = chs.getRelayHost(servMgr->serverHost, rhost, remoteID, chl);
1126 //                                      chanMgr->hitlistlock.off();
1127                                         for (int i = 0; i < cnt; i++){
1128                                                 chs.best[i].writeAtoms(atom2, chanInfo.id);
1129                                                 chs.best[i].host.toStr(tmp);
1130                                                 LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
1131                                                 best.host.ip = chs.best[i].host.ip;
1132                                         }
1133                                 }
1134
1135                                 if (cnt)
1136                                 {
1137                                         LOG_DEBUG("Sent %d channel hit(s) to %s",cnt,ripStr);
1138
1139                                 }
1140                                 else if (rhost.port)
1141                                 {
1142                                         // find firewalled host
1143                                         chs.init();
1144                                         chs.waitDelay = 30;
1145                                         chs.useFirewalled = true;
1146                                         chs.excludeID = remoteID;
1147                                         if (chl->pickHits(chs))
1148                                         {
1149                                                 best = chs.best[0];
1150                                                 int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_RELAY);
1151                                                 LOG_DEBUG("Broadcasted channel push request to %d clients for %s",cnt,ripStr);
1152                                         }
1153                                 } 
1154
1155                                 // if all else fails, use tracker
1156                                 if (!best.host.ip)
1157                                 {
1158                                         // find best tracker on this network if local IP
1159                                         if (!rhost.globalIP())
1160                                         {
1161                                                 chs.init();
1162                                                 chs.matchHost = servMgr->serverHost;
1163                                                 chs.trackersOnly = true;
1164                                                 chs.excludeID = remoteID;
1165                                                 if (chl->pickHits(chs))
1166                                                         best = chs.best[0];
1167
1168                                         }
1169
1170                                         // find local tracker
1171                                         if (!best.host.ip)
1172                                         {
1173                                                 chs.init();
1174                                                 chs.matchHost = rhost;
1175                                                 chs.trackersOnly = true;
1176                                                 chs.excludeID = remoteID;
1177                                                 if (chl->pickHits(chs))
1178                                                         best = chs.best[0];
1179                                         }
1180
1181                                         // find global tracker
1182                                         if (!best.host.ip)
1183                                         {
1184                                                 chs.init();
1185                                                 chs.trackersOnly = true;
1186                                                 chs.excludeID = remoteID;
1187                                                 if (chl->pickHits(chs))
1188                                                         best = chs.best[0];
1189                                         }
1190
1191                                         if (best.host.ip)
1192                                         {
1193                                                 best.writeAtoms(atom2,chanInfo.id);                             
1194                                                 LOG_DEBUG("Sent 1 tracker hit to %s",ripStr);
1195                                         }else if (rhost.port)
1196                                         {
1197                                                 // find firewalled tracker
1198                                                 chs.init();
1199                                                 chs.useFirewalled = true;
1200                                                 chs.trackersOnly = true;
1201                                                 chs.excludeID = remoteID;
1202                                                 chs.waitDelay = 30;
1203                                                 if (chl->pickHits(chs))
1204                                                 {
1205                                                         best = chs.best[0];
1206                                                         int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_CIN);
1207                                                         LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,ripStr);
1208                                                 }
1209                                         }
1210
1211                                 }
1212
1213
1214                         }
1215
1216                         chanMgr->hitlistlock.off();
1217
1218                         // return not available yet code
1219                         atom2.writeInt(PCP_QUIT,error);
1220                         sock->write(tbuf, mem.getPosition());
1221                         result = false;
1222
1223                         /*
1224                         char c[512];
1225                         // wait disconnect from other host
1226                         try{
1227                                 while(sock->read(c, sizeof(c))){
1228                                         sys->sleep(10);
1229                                 }
1230                         }catch(StreamException &e){
1231                                 LOG_DEBUG("RelayInfoWait: %s",e.msg);
1232                         }
1233                         */
1234                 }else
1235                 {
1236                         LOG_DEBUG("Sending channel unavailable");
1237                         sock->writeLine(HTTP_SC_UNAVAILABLE);
1238                         sock->writeLine("");
1239                         result = false;
1240                 }
1241
1242         } else {
1243
1244                 if (chanInfo.contentType != ChanInfo::T_MP3)
1245                         addMetadata=false;
1246
1247                 if (addMetadata && (outputProtocol == ChanInfo::SP_HTTP))               // winamp mp3 metadata check
1248                 {
1249
1250                         sock->writeLine(ICY_OK);
1251
1252                         sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT);
1253                         sock->writeLineF("icy-name:%s",chanInfo.name.cstr());
1254                         sock->writeLineF("icy-br:%d",chanInfo.bitrate);
1255                         sock->writeLineF("icy-genre:%s",chanInfo.genre.cstr());
1256                         sock->writeLineF("icy-url:%s",chanInfo.url.cstr());
1257                         sock->writeLineF("icy-metaint:%d",chanMgr->icyMetaInterval);
1258                         sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr);
1259
1260                         sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3);
1261
1262                 }else
1263                 {
1264
1265                         sock->writeLine(HTTP_SC_OK);
1266
1267                         if ((chanInfo.contentType != ChanInfo::T_ASX) && (chanInfo.contentType != ChanInfo::T_WMV) && (chanInfo.contentType != ChanInfo::T_WMA))
1268                         {
1269                                 sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT);
1270
1271                                 sock->writeLine("Accept-Ranges: none");
1272
1273                                 sock->writeLineF("x-audiocast-name: %s",chanInfo.name.cstr());
1274                                 sock->writeLineF("x-audiocast-bitrate: %d",chanInfo.bitrate);
1275                                 sock->writeLineF("x-audiocast-genre: %s",chanInfo.genre.cstr());
1276                                 sock->writeLineF("x-audiocast-description: %s",chanInfo.desc.cstr());
1277                                 sock->writeLineF("x-audiocast-url: %s",chanInfo.url.cstr());
1278                                 sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr);
1279                         }
1280
1281
1282                         if (outputProtocol == ChanInfo::SP_HTTP)
1283                         {
1284                                 switch (chanInfo.contentType)
1285                                 {
1286                                         case ChanInfo::T_OGG:
1287                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XOGG);
1288                                                 break;
1289                                         case ChanInfo::T_MP3:
1290                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3);
1291                                                 break;
1292                                         case ChanInfo::T_MOV:
1293                                                 sock->writeLine("Connection: close");
1294                                                 sock->writeLine("Content-Length: 10000000");
1295                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MOV);
1296                                                 break;
1297                                         case ChanInfo::T_MPG:
1298                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MPG);
1299                                                 break;
1300                                         case ChanInfo::T_NSV:
1301                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_NSV);
1302                                                 break;
1303                                         case ChanInfo::T_ASX:
1304                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_ASX);
1305                                                 break;
1306                                         case ChanInfo::T_WMA:
1307                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_WMA);
1308                                                 break;
1309                                         case ChanInfo::T_WMV:
1310                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_WMV);
1311                                                 break;
1312                                 }
1313                         } else if (outputProtocol == ChanInfo::SP_MMS)
1314                         {
1315                                 sock->writeLine("Server: Rex/9.0.0.2980");
1316                                 sock->writeLine("Cache-Control: no-cache");
1317                                 sock->writeLine("Pragma: no-cache");
1318                                 sock->writeLine("Pragma: client-id=3587303426");
1319                                 sock->writeLine("Pragma: features=\"broadcast,playlist\"");
1320
1321                                 if (nsSwitchNum)
1322                                 {
1323                                         sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MMS);
1324                                 }else
1325                                 {
1326                                         sock->writeLine("Content-Type: application/vnd.ms.wms-hdr.asfv1");
1327                                         if (ch)
1328                                                 sock->writeLineF("Content-Length: %d",ch->headPack.len);
1329                                         sock->writeLine("Connection: Keep-Alive");
1330                                 }
1331                         
1332                         } else if (outputProtocol == ChanInfo::SP_PCP)
1333                         {
1334                                 sock->writeLineF("%s %d",PCX_HS_POS,streamPos);
1335                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
1336
1337                         }else if (outputProtocol == ChanInfo::SP_PEERCAST)
1338                         {
1339                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPEERCAST);
1340                         }
1341                 }
1342                 sock->writeLine("");
1343                 result = true;
1344
1345                 if (gotPCP)
1346                 {
1347                         handshakeIncomingPCP(atom,rhost,remoteID,agent);
1348                         atom.writeInt(PCP_OK,0);
1349                 }
1350
1351         }
1352
1353
1354
1355         return result;
1356 }
1357
1358 // -----------------------------------
1359 void Servent::handshakeGiv(GnuID &id)
1360 {
1361         if (id.isSet())
1362         {
1363                 char idstr[64];
1364                 id.toStr(idstr);
1365                 sock->writeLineF("GIV /%s",idstr);
1366         }else
1367                 sock->writeLine("GIV");
1368
1369         sock->writeLine("");
1370 }
1371
1372
1373 // -----------------------------------
1374 void Servent::processGnutella()
1375 {
1376         type = T_PGNU;
1377
1378         //if (servMgr->isRoot && !servMgr->needConnections())
1379         if (servMgr->isRoot)
1380         {
1381                 processRoot();
1382                 return;
1383         }
1384
1385
1386
1387         gnuStream.init(sock);
1388         setStatus(S_CONNECTED);
1389
1390         if (!servMgr->isRoot)
1391         {
1392                 chanMgr->broadcastRelays(this, 1, 1);
1393                 GnuPacket *p;
1394
1395                 if ((p=outPacketsNorm.curr()))  
1396                         gnuStream.sendPacket(*p);
1397                 return;
1398         }
1399
1400         gnuStream.ping(2);
1401
1402 //      if (type != T_LOOKUP)
1403 //              chanMgr->broadcastRelays(this,chanMgr->minBroadcastTTL,2);
1404
1405         lastPacket = lastPing = sys->getTime();
1406         bool doneBigPing=false;
1407
1408         const unsigned int      abortTimeoutSecs = 60;          // abort connection after 60 secs of no activitiy
1409         const unsigned int      packetTimeoutSecs = 30;         // ping connection after 30 secs of no activity
1410
1411         unsigned int currBytes=0;
1412         unsigned int lastWait=0;
1413
1414         unsigned int lastTotalIn=0,lastTotalOut=0;
1415
1416         while (thread.active && sock->active())
1417         {
1418
1419                 if (sock->readReady())
1420                 {
1421                         lastPacket = sys->getTime();
1422
1423                         if (gnuStream.readPacket(pack))
1424                         {
1425                                 char ipstr[64];
1426                                 sock->host.toStr(ipstr);
1427
1428                                 GnuID routeID;
1429                                 GnuStream::R_TYPE ret = GnuStream::R_PROCESS;
1430
1431                                 if (pack.func != GNU_FUNC_PONG)
1432                                         if (servMgr->seenPacket(pack))
1433                                                 ret = GnuStream::R_DUPLICATE;
1434
1435                                 seenIDs.add(pack.id);
1436
1437
1438                                 if (ret == GnuStream::R_PROCESS)
1439                                 {
1440                                         GnuID routeID;
1441                                         ret = gnuStream.processPacket(pack,this,routeID);
1442
1443                                         if (flowControl && (ret == GnuStream::R_BROADCAST))
1444                                                 ret = GnuStream::R_DROP;
1445
1446                                 }
1447
1448                                 switch(ret)
1449                                 {
1450                                         case GnuStream::R_BROADCAST:
1451                                                 if (servMgr->broadcast(pack,this))
1452                                                         stats.add(Stats::NUMBROADCASTED);
1453                                                 else
1454                                                         stats.add(Stats::NUMDROPPED);
1455                                                 break;
1456                                         case GnuStream::R_ROUTE:
1457                                                 if (servMgr->route(pack,routeID,NULL))
1458                                                         stats.add(Stats::NUMROUTED);
1459                                                 else
1460                                                         stats.add(Stats::NUMDROPPED);
1461                                                 break;
1462                                         case GnuStream::R_ACCEPTED:
1463                                                 stats.add(Stats::NUMACCEPTED);
1464                                                 break;
1465                                         case GnuStream::R_DUPLICATE:
1466                                                 stats.add(Stats::NUMDUP);
1467                                                 break;
1468                                         case GnuStream::R_DEAD:
1469                                                 stats.add(Stats::NUMDEAD);
1470                                                 break;
1471                                         case GnuStream::R_DISCARD:
1472                                                 stats.add(Stats::NUMDISCARDED);
1473                                                 break;
1474                                         case GnuStream::R_BADVERSION:
1475                                                 stats.add(Stats::NUMOLD);
1476                                                 break;
1477                                         case GnuStream::R_DROP:
1478                                                 stats.add(Stats::NUMDROPPED);
1479                                                 break;
1480                                 }
1481
1482
1483                                 LOG_NETWORK("packet in: %s-%s, %d bytes, %d hops, %d ttl, from %s",GNU_FUNC_STR(pack.func),GnuStream::getRouteStr(ret),pack.len,pack.hops,pack.ttl,ipstr);
1484
1485
1486
1487                         }else{
1488                                 LOG_ERROR("Bad packet");
1489                         }
1490                 }
1491
1492
1493                 GnuPacket *p;
1494
1495                 if ((p=outPacketsPri.curr()))                           // priority packet
1496                 {
1497                         gnuStream.sendPacket(*p);
1498                         seenIDs.add(p->id);
1499                         outPacketsPri.next();
1500                 } else if ((p=outPacketsNorm.curr()))           // or.. normal packet
1501                 {
1502                         gnuStream.sendPacket(*p);
1503                         seenIDs.add(p->id);
1504                         outPacketsNorm.next();
1505                 }
1506
1507                 int lpt =  sys->getTime()-lastPacket;
1508
1509                 if (!doneBigPing)
1510                 {
1511                         if ((sys->getTime()-lastPing) > 15)
1512                         {
1513                                 gnuStream.ping(7);
1514                                 lastPing = sys->getTime();
1515                                 doneBigPing = true;
1516                         }
1517                 }else{
1518                         if (lpt > packetTimeoutSecs)
1519                         {
1520                                 
1521                                 if ((sys->getTime()-lastPing) > packetTimeoutSecs)
1522                                 {
1523                                         gnuStream.ping(1);
1524                                         lastPing = sys->getTime();
1525                                 }
1526
1527                         }
1528                 }
1529                 if (lpt > abortTimeoutSecs)
1530                         throw TimeoutException();
1531
1532
1533                 unsigned int totIn = sock->totalBytesIn-lastTotalIn;
1534                 unsigned int totOut = sock->totalBytesOut-lastTotalOut;
1535
1536                 unsigned int bytes = totIn+totOut;
1537
1538                 lastTotalIn = sock->totalBytesIn;
1539                 lastTotalOut = sock->totalBytesOut;
1540
1541                 const int serventBandwidth = 1000;
1542
1543                 int delay = sys->idleSleepTime;
1544                 if ((bytes) && (serventBandwidth >= 8))
1545                         delay = (bytes*1000)/(serventBandwidth/8);      // set delay relative packetsize
1546
1547                 if (delay < (int)sys->idleSleepTime)
1548                         delay = sys->idleSleepTime;
1549                 //LOG("delay %d, in %d, out %d",delay,totIn,totOut);
1550
1551                 sys->sleep(delay);
1552         }
1553
1554 }
1555
1556
1557 // -----------------------------------
1558 void Servent::processRoot()
1559 {
1560         try 
1561         {
1562         
1563                 gnuStream.init(sock);
1564                 setStatus(S_CONNECTED);
1565
1566                 gnuStream.ping(2);
1567
1568                 unsigned int lastConnect = sys->getTime();
1569
1570                 while (thread.active && sock->active())
1571                 {
1572                         if (gnuStream.readPacket(pack))
1573                         {
1574                                 char ipstr[64];
1575                                 sock->host.toStr(ipstr);
1576                                 
1577                                 LOG_NETWORK("packet in: %d from %s",pack.func,ipstr);
1578
1579
1580                                 if (pack.func == GNU_FUNC_PING)         // if ping then pong back some hosts and close
1581                                 {
1582                                         
1583                                         Host hl[32];
1584                                         int cnt = servMgr->getNewestServents(hl,32,sock->host); 
1585                                         if (cnt)
1586                                         {
1587                                                 int start = sys->rnd() % cnt;
1588                                                 int max = cnt>8?8:cnt;
1589
1590                                                 for(int i=0; i<max; i++)
1591                                                 {
1592                                                         GnuPacket pong;
1593                                                         pack.hops = 1;
1594                                                         pong.initPong(hl[start],false,pack);
1595                                                         gnuStream.sendPacket(pong);
1596
1597                                                         char ipstr[64];
1598                                                         hl[start].toStr(ipstr);
1599
1600                                                         //LOG_NETWORK("Pong %d: %s",start+1,ipstr);
1601                                                         start = (start+1) % cnt;
1602                                                 }
1603                                                 char str[64];
1604                                                 sock->host.toStr(str);
1605                                                 LOG_NETWORK("Sent %d pong(s) to %s",max,str);
1606                                         }else
1607                                         {
1608                                                 LOG_NETWORK("No Pongs to send");
1609                                                 //return;
1610                                         }
1611                                 }else if (pack.func == GNU_FUNC_PONG)           // pong?
1612                                 {
1613                                         MemoryStream pong(pack.data,pack.len);
1614
1615                                         int ip,port;
1616                                         port = pong.readShort();
1617                                         ip = pong.readLong();
1618                                         ip = SWAP4(ip);
1619
1620
1621                                         Host h(ip,port);
1622                                         if ((ip) && (port) && (h.globalIP()))
1623                                         {
1624
1625                                                 LOG_NETWORK("added pong: %d.%d.%d.%d:%d",ip>>24&0xff,ip>>16&0xff,ip>>8&0xff,ip&0xff,port);
1626                                                 servMgr->addHost(h,ServHost::T_SERVENT,sys->getTime());
1627                                         }
1628                                         //return;
1629                                 } else if (pack.func == GNU_FUNC_HIT)
1630                                 {
1631                                         MemoryStream data(pack.data,pack.len);
1632                                         ChanHit hit;
1633                                         gnuStream.readHit(data,hit,pack.hops,pack.id);
1634                                 }
1635
1636                                 //if (gnuStream.packetsIn > 5)  // die if we get too many packets
1637                                 //      return;
1638                         }
1639
1640                         if((sys->getTime()-lastConnect > 60))
1641                                 break;
1642                 }
1643
1644
1645         }catch(StreamException &e)
1646         {
1647                 LOG_ERROR("Relay: %s",e.msg);
1648         }
1649
1650         
1651 }       
1652
1653 // -----------------------------------
1654 int Servent::givProc(ThreadInfo *thread)
1655 {
1656 //      thread->lock();
1657         Servent *sv = (Servent*)thread->data;
1658         try 
1659         {
1660                 sv->handshakeGiv(sv->givID);
1661                 sv->handshakeIncoming();
1662
1663         }catch(StreamException &e)
1664         {
1665                 LOG_ERROR("GIV: %s",e.msg);
1666         }
1667
1668         sv->kill();
1669         sys->endThread(thread);
1670         return 0;
1671 }
1672
1673 // -----------------------------------
1674 void Servent::handshakeOutgoingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent, bool isTrusted)
1675 {
1676
1677         bool nonFW = (servMgr->getFirewall() != ServMgr::FW_ON);
1678         bool testFW = (servMgr->getFirewall() == ServMgr::FW_UNKNOWN);
1679
1680         bool sendBCID = isTrusted && chanMgr->isBroadcasting();
1681
1682         char tbuf[1024];
1683         MemoryStream mem(tbuf, sizeof(tbuf));
1684         AtomStream atom2(mem);
1685         atom2.writeParent(PCP_HELO,3 + (testFW?1:0) + (nonFW?1:0) + (sendBCID?1:0));
1686                 atom2.writeString(PCP_HELO_AGENT,PCX_AGENT);
1687                 atom2.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
1688                 atom2.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
1689                 if (nonFW)
1690                         atom2.writeShort(PCP_HELO_PORT,servMgr->serverHost.port);
1691                 if (testFW)
1692                         atom2.writeShort(PCP_HELO_PING,servMgr->serverHost.port);
1693                 if (sendBCID)
1694                         atom2.writeBytes(PCP_HELO_BCID,chanMgr->broadcastID.id,16);
1695         atom.io.write(tbuf, mem.getPosition());
1696
1697
1698         LOG_DEBUG("PCP outgoing waiting for OLEH..");
1699
1700         int numc,numd;
1701         ID4 id = atom.read(numc,numd);
1702         if (id != PCP_OLEH)
1703         {
1704                 LOG_DEBUG("PCP outgoing reply: %s",id.getString().str());
1705                 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
1706                 throw StreamException("Got unexpected PCP response");
1707         }
1708
1709
1710
1711         char arg[64];
1712
1713         GnuID clientID;
1714         clientID.clear();
1715         rid.clear();
1716         int version=0;
1717         int disable=0;
1718
1719         Host thisHost;
1720
1721         // read OLEH response
1722         for(int i=0; i<numc; i++)
1723         {
1724                 int c,dlen;
1725                 ID4 id = atom.read(c,dlen);
1726
1727                 if (id == PCP_HELO_AGENT)
1728                 {
1729                         atom.readString(arg,sizeof(arg),dlen);
1730                         agent.set(arg);
1731
1732                 }else if (id == PCP_HELO_REMOTEIP)
1733                 {
1734                         thisHost.ip = atom.readInt();
1735
1736                 }else if (id == PCP_HELO_PORT)
1737                 {
1738                         thisHost.port = atom.readShort();
1739
1740                 }else if (id == PCP_HELO_VERSION)
1741                 {
1742                         version = atom.readInt();
1743
1744                 }else if (id == PCP_HELO_DISABLE)
1745                 {
1746                         disable = atom.readInt();
1747
1748                 }else if (id == PCP_HELO_SESSIONID)
1749                 {
1750                         atom.readBytes(rid.id,16);
1751                         if (rid.isSame(servMgr->sessionID))
1752                                 throw StreamException("Servent loopback");
1753
1754                 }else
1755                 {
1756                         LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
1757                         atom.skip(c,dlen);
1758                 }
1759
1760     }
1761
1762
1763         // update server ip/firewall status
1764         if (isTrusted)
1765         {
1766                 if (thisHost.isValid())
1767                 {
1768                         if ((servMgr->serverHost.ip != thisHost.ip) && (servMgr->forceIP.isEmpty()))
1769                         {
1770                                 char ipstr[64];
1771                                 thisHost.toStr(ipstr);
1772                                 LOG_DEBUG("Got new ip: %s",ipstr);
1773                                 servMgr->serverHost.ip = thisHost.ip;
1774                         }
1775
1776                         if (servMgr->getFirewall() == ServMgr::FW_UNKNOWN)
1777                         {
1778                                 if (thisHost.port && thisHost.globalIP())
1779                                         servMgr->setFirewall(ServMgr::FW_OFF);
1780                                 else
1781                                         servMgr->setFirewall(ServMgr::FW_ON);
1782                         }
1783                 }
1784
1785                 if (disable == 1)
1786                 {
1787                         LOG_ERROR("client disabled: %d",disable);
1788                         servMgr->isDisabled = true;             
1789                 }else
1790                 {
1791                         servMgr->isDisabled = false;            
1792                 }
1793         }
1794
1795
1796
1797         if (!rid.isSet())
1798         {
1799                 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
1800                 throw StreamException("Remote host not identified");
1801         }
1802
1803         LOG_DEBUG("PCP Outgoing handshake complete.");
1804
1805 }
1806
1807 // -----------------------------------
1808 void Servent::handshakeIncomingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent)
1809 {
1810         int numc,numd;
1811         ID4 id = atom.read(numc,numd);
1812
1813
1814         if (id != PCP_HELO)
1815         {
1816                 LOG_DEBUG("PCP incoming reply: %s",id.getString().str());
1817                 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
1818                 throw StreamException("Got unexpected PCP response");
1819         }
1820
1821         char arg[64];
1822
1823         ID4 osType;
1824
1825         int version=0;
1826
1827         int pingPort=0;
1828
1829         GnuID bcID;
1830         GnuID clientID;
1831
1832         bcID.clear();
1833         clientID.clear();
1834
1835         rhost.port = 0;
1836
1837         for(int i=0; i<numc; i++)
1838         {
1839
1840                 int c,dlen;
1841                 ID4 id = atom.read(c,dlen);
1842
1843                 if (id == PCP_HELO_AGENT)
1844                 {
1845                         atom.readString(arg,sizeof(arg),dlen);
1846                         agent.set(arg);
1847
1848                 }else if (id == PCP_HELO_VERSION)
1849                 {
1850                         version = atom.readInt();
1851
1852                 }else if (id == PCP_HELO_SESSIONID)
1853                 {
1854                         atom.readBytes(rid.id,16);
1855                         if (rid.isSame(servMgr->sessionID))
1856                                 throw StreamException("Servent loopback");
1857
1858                 }else if (id == PCP_HELO_BCID)
1859                 {
1860                         atom.readBytes(bcID.id,16);
1861
1862                 }else if (id == PCP_HELO_OSTYPE)
1863                 {
1864                         osType = atom.readInt();
1865                 }else if (id == PCP_HELO_PORT)
1866                 {
1867                         rhost.port = atom.readShort();
1868                 }else if (id == PCP_HELO_PING)
1869                 {
1870                         pingPort = atom.readShort();
1871                 }else
1872                 {
1873                         LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
1874                         atom.skip(c,dlen);
1875                 }
1876
1877     }
1878
1879         if (version)
1880                 LOG_DEBUG("Incoming PCP is %s : v%d", agent.cstr(),version);
1881
1882
1883         if (!rhost.globalIP() && servMgr->serverHost.globalIP())
1884                 rhost.ip = servMgr->serverHost.ip;
1885
1886         if (pingPort)
1887         {
1888                 char ripStr[64];
1889                 rhost.toStr(ripStr);
1890                 LOG_DEBUG("Incoming firewalled test request: %s ", ripStr);
1891                 rhost.port = pingPort;
1892                 if (!rhost.globalIP() || !pingHost(rhost,rid))
1893                         rhost.port = 0;
1894         }
1895
1896         if (servMgr->isRoot)
1897         {
1898                 if (bcID.isSet())
1899                 {
1900                         if (bcID.getFlags() & 1)        // private
1901                         {
1902                                 BCID *bcid = servMgr->findValidBCID(bcID);
1903                                 if (!bcid || (bcid && !bcid->valid))
1904                                 {
1905                                         atom.writeParent(PCP_OLEH,1);
1906                                         atom.writeInt(PCP_HELO_DISABLE,1);
1907                                         throw StreamException("Client is banned");
1908                                 }
1909                         }
1910                 }
1911         }
1912
1913
1914         char tbuf[1024];
1915         MemoryStream mem(tbuf, sizeof(tbuf));
1916         AtomStream atom2(mem);
1917         atom2.writeParent(PCP_OLEH,5);
1918                 atom2.writeString(PCP_HELO_AGENT,PCX_AGENT);
1919                 atom2.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
1920                 atom2.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
1921                 atom2.writeInt(PCP_HELO_REMOTEIP,rhost.ip);
1922                 atom2.writeShort(PCP_HELO_PORT,rhost.port);
1923
1924         if (version)
1925         {
1926                 if (version < PCP_CLIENT_MINVERSION)
1927                 {
1928                         atom2.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADAGENT);
1929                         atom.io.write(tbuf, mem.getPosition());
1930                         throw StreamException("Agent is not valid");
1931                 }
1932         }
1933
1934         if (!rid.isSet())
1935         {
1936                 atom2.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
1937                 atom.io.write(tbuf, mem.getPosition());
1938                 throw StreamException("Remote host not identified");
1939         }
1940
1941
1942
1943         if (servMgr->isRoot)
1944         {
1945                 servMgr->writeRootAtoms(atom2,false);
1946         }
1947
1948         atom.io.write(tbuf, mem.getPosition());
1949
1950         LOG_DEBUG("PCP Incoming handshake complete.");
1951
1952 }
1953
1954 // -----------------------------------
1955 void Servent::processIncomingPCP(bool suggestOthers)
1956 {
1957         PCPStream::readVersion(*sock);
1958
1959
1960         AtomStream atom(*sock);
1961         Host rhost = sock->host;
1962
1963         handshakeIncomingPCP(atom,rhost,remoteID,agent);
1964
1965
1966         bool alreadyConnected = (servMgr->findConnection(Servent::T_COUT,remoteID)!=NULL)
1967                                                         || (servMgr->findConnection(Servent::T_CIN,remoteID)!=NULL);
1968         bool unavailable = servMgr->controlInFull();
1969         bool offair = !servMgr->isRoot && !chanMgr->isBroadcasting();
1970
1971         char rstr[64];
1972         rhost.toStr(rstr);
1973
1974         if (unavailable || alreadyConnected || offair)
1975         {
1976                 int error;
1977
1978                 if (alreadyConnected)
1979                         error = PCP_ERROR_QUIT+PCP_ERROR_ALREADYCONNECTED;
1980                 else if (unavailable)
1981                         error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
1982                 else if (offair)
1983                         error = PCP_ERROR_QUIT+PCP_ERROR_OFFAIR;
1984                 else 
1985                         error = PCP_ERROR_QUIT;
1986
1987
1988                 if (suggestOthers)
1989                 {
1990
1991                         ChanHit best;
1992                         ChanHitSearch chs;
1993
1994                         int cnt=0;
1995                         for(int i=0; i<8; i++)
1996                         {
1997                                 best.init();
1998
1999                                 // find best hit on this network                        
2000                                 if (!rhost.globalIP())
2001                                 {
2002                                         chs.init();
2003                                         chs.matchHost = servMgr->serverHost;
2004                                         chs.waitDelay = 2;
2005                                         chs.excludeID = remoteID;
2006                                         chs.trackersOnly = true;
2007                                         chs.useBusyControls = false;
2008                                         if (chanMgr->pickHits(chs))
2009                                                 best = chs.best[0];
2010                                 }
2011
2012                                 // find best hit on same network                        
2013                                 if (!best.host.ip)
2014                                 {
2015                                         chs.init();
2016                                         chs.matchHost = rhost;
2017                                         chs.waitDelay = 2;
2018                                         chs.excludeID = remoteID;
2019                                         chs.trackersOnly = true;
2020                                         chs.useBusyControls = false;
2021                                         if (chanMgr->pickHits(chs))
2022                                                 best = chs.best[0];
2023                                 }
2024
2025                                 // else find best hit on other networks
2026                                 if (!best.host.ip)
2027                                 {
2028                                         chs.init();
2029                                         chs.waitDelay = 2;
2030                                         chs.excludeID = remoteID;
2031                                         chs.trackersOnly = true;
2032                                         chs.useBusyControls = false;
2033                                         if (chanMgr->pickHits(chs))
2034                                                 best = chs.best[0];
2035                                 }
2036
2037                                 if (!best.host.ip)
2038                                         break;
2039                                 
2040                                 GnuID noID;
2041                                 noID.clear();
2042                                 best.writeAtoms(atom,noID);
2043                                 cnt++;
2044                         }
2045                         if (cnt)
2046                         {
2047                                 LOG_DEBUG("Sent %d tracker(s) to %s",cnt,rstr);
2048                         }
2049                         else if (rhost.port)
2050                         {
2051                                 // send push request to best firewalled tracker on other network
2052                                 chs.init();
2053                                 chs.waitDelay = 30;
2054                                 chs.excludeID = remoteID;
2055                                 chs.trackersOnly = true;
2056                                 chs.useFirewalled = true;
2057                                 chs.useBusyControls = false;
2058                                 if (chanMgr->pickHits(chs))
2059                                 {
2060                                         best = chs.best[0];
2061                                         GnuID noID;
2062                                         noID.clear();
2063                                         int cnt = servMgr->broadcastPushRequest(best,rhost,noID,Servent::T_CIN);
2064                                         LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,rstr);
2065                                 }
2066                         }else
2067                         {
2068                                 LOG_DEBUG("No available trackers");
2069                         }
2070                 }
2071
2072
2073                 LOG_ERROR("Sending QUIT to incoming: %d",error);
2074
2075                 atom.writeInt(PCP_QUIT,error);
2076                 return;         
2077         }
2078         
2079
2080         type = T_CIN;
2081         setStatus(S_CONNECTED);
2082
2083         atom.writeInt(PCP_OK,0);
2084
2085         // ask for update
2086         atom.writeParent(PCP_ROOT,1);
2087                 atom.writeParent(PCP_ROOT_UPDATE,0);
2088
2089         pcpStream = new PCPStream(remoteID);
2090
2091         int error = 0;
2092         BroadcastState bcs;
2093         while (!error && thread.active && !sock->eof())
2094         {
2095                 error = pcpStream->readPacket(*sock,bcs);
2096                 sys->sleepIdle();
2097
2098                 if (!servMgr->isRoot && !chanMgr->isBroadcasting())
2099                         error = PCP_ERROR_OFFAIR;
2100                 if (peercastInst->isQuitting)
2101                         error = PCP_ERROR_SHUTDOWN;
2102         }
2103
2104         pcpStream->flush(*sock);
2105
2106         error += PCP_ERROR_QUIT;
2107         atom.writeInt(PCP_QUIT,error);
2108
2109         LOG_DEBUG("PCP Incoming to %s closed: %d",rstr,error);
2110
2111 }
2112
2113 // -----------------------------------
2114 int Servent::outgoingProc(ThreadInfo *thread)
2115 {
2116 //      thread->lock();
2117         LOG_DEBUG("COUT started");
2118
2119         Servent *sv = (Servent*)thread->data;
2120                 
2121         GnuID noID;
2122         noID.clear();
2123         sv->pcpStream = new PCPStream(noID);
2124
2125         while (sv->thread.active)
2126         {
2127                 sv->setStatus(S_WAIT);
2128
2129                 if (chanMgr->isBroadcasting() && servMgr->autoServe)
2130                 {
2131                         ChanHit bestHit;
2132                         ChanHitSearch chs;
2133                         char ipStr[64];
2134
2135                         do
2136                         {
2137                                 bestHit.init();
2138
2139                                 if (servMgr->rootHost.isEmpty())
2140                                         break;
2141
2142                                 if (sv->pushSock)
2143                                 {
2144                                         sv->sock = sv->pushSock;
2145                                         sv->pushSock = NULL;
2146                                         bestHit.host = sv->sock->host;
2147                                         break;
2148                                 }
2149
2150                                 GnuID noID;
2151                                 noID.clear();
2152                                 ChanHitList *chl = chanMgr->findHitListByID(noID);
2153                                 if (chl)
2154                                 {
2155                                         // find local tracker
2156                                         chs.init();
2157                                         chs.matchHost = servMgr->serverHost;
2158                                         chs.waitDelay = MIN_TRACKER_RETRY;
2159                                         chs.excludeID = servMgr->sessionID;
2160                                         chs.trackersOnly = true;
2161                                         if (!chl->pickHits(chs))
2162                                         {
2163                                                 // else find global tracker
2164                                                 chs.init();
2165                                                 chs.waitDelay = MIN_TRACKER_RETRY;
2166                                                 chs.excludeID = servMgr->sessionID;
2167                                                 chs.trackersOnly = true;
2168                                                 chl->pickHits(chs);
2169                                         }
2170
2171                                         if (chs.numResults)
2172                                         {
2173                                                 bestHit = chs.best[0];
2174                                         }
2175                                 }
2176
2177
2178                                 unsigned int ctime = sys->getTime();
2179
2180                                 if ((!bestHit.host.ip) && ((ctime-chanMgr->lastYPConnect) > MIN_YP_RETRY))
2181                                 {
2182                                         bestHit.host.fromStrName(servMgr->rootHost.cstr(),DEFAULT_PORT);
2183                                         bestHit.yp = true;
2184                                         chanMgr->lastYPConnect = ctime;
2185                                 }
2186                                 sys->sleepIdle();
2187
2188                         }while (!bestHit.host.ip && (sv->thread.active));
2189
2190
2191                         if (!bestHit.host.ip)           // give up
2192                         {
2193                                 LOG_ERROR("COUT giving up");
2194                                 break;
2195                         }
2196
2197
2198                         bestHit.host.toStr(ipStr);
2199
2200                         int error=0;
2201                         try 
2202                         {
2203
2204                                 LOG_DEBUG("COUT to %s: Connecting..",ipStr);
2205
2206                                 if (!sv->sock)
2207                                 {
2208                                         sv->setStatus(S_CONNECTING);
2209                                         sv->sock = sys->createSocket();
2210                                         if (!sv->sock)
2211                                                 throw StreamException("Unable to create socket");
2212                                         sv->sock->open(bestHit.host);
2213                                         sv->sock->connect();
2214
2215                                 }
2216
2217                                 sv->sock->setReadTimeout(30000);
2218                                 AtomStream atom(*sv->sock);
2219
2220                                 sv->setStatus(S_HANDSHAKE);
2221
2222                                 Host rhost = sv->sock->host;
2223                                 atom.writeInt(PCP_CONNECT,1);
2224                                 handshakeOutgoingPCP(atom,rhost,sv->remoteID,sv->agent,bestHit.yp);
2225
2226                                 sv->setStatus(S_CONNECTED);
2227
2228                                 LOG_DEBUG("COUT to %s: OK",ipStr);
2229
2230                                 sv->pcpStream->init(sv->remoteID);
2231
2232                                 BroadcastState bcs;
2233                                 bcs.servent_id = sv->servent_id;
2234                                 error = 0;
2235                                 while (!error && sv->thread.active && !sv->sock->eof() && servMgr->autoServe)
2236                                 {
2237                                         error = sv->pcpStream->readPacket(*sv->sock,bcs);
2238
2239                                         sys->sleepIdle();
2240
2241                                         if (!chanMgr->isBroadcasting())
2242                                                 error = PCP_ERROR_OFFAIR;
2243                                         if (peercastInst->isQuitting)
2244                                                 error = PCP_ERROR_SHUTDOWN;
2245
2246                                         if (sv->pcpStream->nextRootPacket)
2247                                                 if (sys->getTime() > (sv->pcpStream->nextRootPacket+30))
2248                                                         error = PCP_ERROR_NOROOT;
2249                                 }
2250                                 sv->setStatus(S_CLOSING);
2251
2252                                 sv->pcpStream->flush(*sv->sock);
2253
2254                                 error += PCP_ERROR_QUIT;
2255                                 atom.writeInt(PCP_QUIT,error);
2256
2257                                 LOG_ERROR("COUT to %s closed: %d",ipStr,error);
2258
2259                         }catch(TimeoutException &e)
2260                         {
2261                                 LOG_ERROR("COUT to %s: timeout (%s)",ipStr,e.msg);
2262                                 sv->setStatus(S_TIMEOUT);
2263                         }catch(StreamException &e)
2264                         {
2265                                 LOG_ERROR("COUT to %s: %s",ipStr,e.msg);
2266                                 sv->setStatus(S_ERROR);
2267                         }
2268
2269                         try
2270                         {
2271                                 if (sv->sock)
2272                                 {
2273                                         sv->sock->close();
2274                                         delete sv->sock;
2275                                         sv->sock = NULL;
2276                                 }
2277
2278                         }catch(StreamException &) {}
2279
2280                         // don`t discard this hit if we caused the disconnect (stopped broadcasting)
2281                         if (error != (PCP_ERROR_QUIT+PCP_ERROR_OFFAIR))
2282                                 chanMgr->deadHit(bestHit);
2283
2284                 }
2285
2286                 sys->sleepIdle();
2287         }
2288
2289         sv->kill();
2290         sys->endThread(thread);
2291         LOG_DEBUG("COUT ended");
2292         return 0;
2293 }
2294 // -----------------------------------
2295 int Servent::incomingProc(ThreadInfo *thread)
2296 {
2297 //      thread->lock();
2298
2299         Servent *sv = (Servent*)thread->data;
2300         
2301         char ipStr[64];
2302         sv->sock->host.toStr(ipStr);
2303
2304         try 
2305         {
2306                 sv->handshakeIncoming();
2307         }catch(HTTPException &e)
2308         {
2309                 try
2310                 {
2311                         sv->sock->writeLine(e.msg);
2312                         if (e.code == 401)
2313                                 sv->sock->writeLine("WWW-Authenticate: Basic realm=\"PeerCast\"");
2314                         sv->sock->writeLine("");
2315                 }catch(StreamException &){}
2316                 LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);
2317         }catch(StreamException &e)
2318         {
2319                 LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);
2320         }
2321
2322
2323         sv->kill();
2324         sys->endThread(thread);
2325         return 0;
2326 }
2327 // -----------------------------------
2328 void Servent::processServent()
2329 {
2330         setStatus(S_HANDSHAKE);
2331
2332         handshakeIn();
2333
2334         if (!sock)
2335                 throw StreamException("Servent has no socket");
2336
2337         processGnutella();
2338 }
2339
2340 // -----------------------------------
2341 void Servent::processStream(bool doneHandshake,ChanInfo &chanInfo)
2342 {       
2343         if (!doneHandshake)
2344         {
2345                 setStatus(S_HANDSHAKE);
2346
2347                 if (!handshakeStream(chanInfo))
2348                         return;
2349         }
2350
2351         if (chanInfo.id.isSet())
2352         {
2353
2354                 chanID = chanInfo.id;
2355
2356                 LOG_CHANNEL("Sending channel: %s ",ChanInfo::getProtocolStr(outputProtocol));
2357
2358                 if (!waitForChannelHeader(chanInfo))
2359                         throw StreamException("Channel not ready");
2360
2361                 servMgr->totalStreams++;
2362
2363                 Host host = sock->host;
2364                 host.port = 0;  // force to 0 so we ignore the incoming port
2365
2366                 Channel *ch = chanMgr->findChannelByID(chanID);
2367                 if (!ch)
2368                         throw StreamException("Channel not found");
2369
2370                 if (outputProtocol == ChanInfo::SP_HTTP)
2371                 {
2372                         if ((addMetadata) && (chanMgr->icyMetaInterval))
2373                                 sendRawMetaChannel(chanMgr->icyMetaInterval);
2374                         else 
2375                                 sendRawChannel(true,true);
2376
2377                 }else if (outputProtocol == ChanInfo::SP_MMS)
2378                 {
2379                         if (nsSwitchNum)
2380                         {
2381                                 sendRawChannel(true,true);
2382                         }else
2383                         {
2384                                 sendRawChannel(true,false);
2385                         }
2386
2387                 }else if (outputProtocol  == ChanInfo::SP_PCP)
2388                 {
2389                         sendPCPChannel();
2390
2391                 } else if (outputProtocol  == ChanInfo::SP_PEERCAST)
2392                 {
2393                         sendPeercastChannel();
2394                 }
2395         }
2396
2397         setStatus(S_CLOSING);
2398 }
2399
2400 // -----------------------------------------
2401 #if 0
2402 // debug
2403                 FileStream file;
2404                 file.openReadOnly("c://test.mp3");
2405
2406                 LOG_DEBUG("raw file read");
2407                 char buf[4000];
2408                 int cnt=0;
2409                 while (!file.eof())
2410                 {
2411                         LOG_DEBUG("send %d",cnt++);
2412                         file.read(buf,sizeof(buf));
2413                         sock->write(buf,sizeof(buf));
2414
2415                 }
2416                 file.close();
2417                 LOG_DEBUG("raw file sent");
2418
2419         return;
2420 // debug
2421 #endif
2422 // -----------------------------------
2423 bool Servent::waitForChannelHeader(ChanInfo &info)
2424 {
2425         for(int i=0; i<30*10; i++)
2426         {
2427                 Channel *ch = chanMgr->findChannelByID(info.id);
2428                 if (!ch)
2429                         return false;
2430
2431                 if (ch->isPlaying() && (ch->rawData.writePos>0))
2432                         return true;
2433
2434                 if (!thread.active || !sock->active())
2435                         break;
2436                 sys->sleep(100);
2437         }
2438         return false;
2439 }
2440 // -----------------------------------
2441 void Servent::sendRawChannel(bool sendHead, bool sendData)
2442 {
2443         try
2444         {
2445
2446                 sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
2447
2448                 Channel *ch = chanMgr->findChannelByID(chanID);
2449                 if (!ch)
2450                         throw StreamException("Channel not found");
2451
2452                 setStatus(S_CONNECTED);
2453
2454                 LOG_DEBUG("Starting Raw stream of %s at %d",ch->info.name.cstr(),streamPos);
2455
2456                 if (sendHead)
2457                 {
2458                         ch->headPack.writeRaw(*sock);
2459                         streamPos = ch->headPack.pos + ch->headPack.len;
2460                         LOG_DEBUG("Sent %d bytes header ",ch->headPack.len);
2461                 }
2462
2463                 if (sendData)
2464                 {
2465
2466                         unsigned int streamIndex = ch->streamIndex;
2467                         unsigned int connectTime = sys->getTime();
2468                         unsigned int lastWriteTime = connectTime;
2469
2470                         while ((thread.active) && sock->active())
2471                         {
2472                                 ch = chanMgr->findChannelByID(chanID);
2473
2474                                 if (ch)
2475                                 {
2476
2477                                         if (streamIndex != ch->streamIndex)
2478                                         {
2479                                                 streamIndex = ch->streamIndex;
2480                                                 streamPos = ch->headPack.pos;
2481                                                 LOG_DEBUG("sendRaw got new stream index");
2482                                         }
2483
2484                                         ChanPacket rawPack;
2485                                         if (ch->rawData.findPacket(streamPos,rawPack))
2486                                         {
2487                                                 if (syncPos != rawPack.sync)
2488                                                         LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
2489                                                 syncPos = rawPack.sync+1;
2490
2491                                                 if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2492                                                 {
2493                                                         rawPack.writeRaw(*sock);
2494                                                         lastWriteTime = sys->getTime();
2495                                                 }
2496
2497                                                 if (rawPack.pos < streamPos)
2498                                                         LOG_DEBUG("raw: skip back %d",rawPack.pos-streamPos);
2499                                                 streamPos = rawPack.pos+rawPack.len;
2500                                         } else if (sock->readReady()) {
2501                                                 char c;
2502                                                 int error = sock->readUpto(&c, 1);
2503                                                 if (error == 0) sock->close();
2504                                         }
2505                                 }
2506
2507                                 if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
2508                                         throw TimeoutException();
2509
2510                                 sys->sleepIdle();
2511                         }
2512                 }
2513         }catch(StreamException &e)
2514         {
2515                 LOG_ERROR("Stream channel: %s",e.msg);
2516         }
2517 }
2518
2519 #if 0
2520 // -----------------------------------
2521 void Servent::sendRawMultiChannel(bool sendHead, bool sendData)
2522 {
2523         try
2524         {
2525                 unsigned int chanStreamIndex[ChanMgr::MAX_CHANNELS];
2526                 unsigned int chanStreamPos[ChanMgr::MAX_CHANNELS];
2527                 GnuID chanIDs[ChanMgr::MAX_CHANNELS];
2528                 int numChanIDs=0;
2529                 for(int i=0; i<ChanMgr::MAX_CHANNELS; i++)
2530                 {
2531                         Channel *ch = &chanMgr->channels[i];
2532                         if (ch->isPlaying())
2533                                 chanIDs[numChanIDs++]=ch->info.id;
2534                 }
2535
2536
2537
2538                 setStatus(S_CONNECTED);
2539
2540
2541                 if (sendHead)
2542                 {
2543                         for(int i=0; i<numChanIDs; i++)
2544                         {
2545                                 Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
2546                                 if (ch)
2547                                 {
2548                                         LOG_DEBUG("Starting RawMulti stream: %s",ch->info.name.cstr());
2549                                         ch->headPack.writeRaw(*sock);
2550                                         chanStreamPos[i] = ch->headPack.pos + ch->headPack.len;
2551                                         chanStreamIndex[i] = ch->streamIndex;
2552                                         LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
2553
2554                                 }
2555                         }
2556                 }
2557
2558                 if (sendData)
2559                 {
2560
2561                         unsigned int connectTime=sys->getTime();
2562
2563                         while ((thread.active) && sock->active())
2564                         {
2565
2566                                 for(int i=1; i<numChanIDs; i++)
2567                                 {
2568                                         Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
2569                                         if (ch)
2570                                         {
2571                                                 if (chanStreamIndex[i] != ch->streamIndex)
2572                                                 {
2573                                                         chanStreamIndex[i] = ch->streamIndex;
2574                                                         chanStreamPos[i] = ch->headPack.pos;
2575                                                         LOG_DEBUG("sendRawMulti got new stream index for chan %d",i);
2576                                                 }
2577
2578                                                 ChanPacket rawPack;
2579                                                 if (ch->rawData.findPacket(chanStreamPos[i],rawPack))
2580                                                 {
2581                                                         if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2582                                                                 rawPack.writeRaw(*sock);
2583
2584
2585                                                         if (rawPack.pos < chanStreamPos[i])
2586                                                                 LOG_DEBUG("raw: skip back %d",rawPack.pos-chanStreamPos[i]);
2587                                                         chanStreamPos[i] = rawPack.pos+rawPack.len;
2588
2589
2590                                                         //LOG("raw at %d: %d %d",streamPos,ch->rawData.getStreamPos(ch->rawData.firstPos),ch->rawData.getStreamPos(ch->rawData.lastPos));
2591                                                 }                                               
2592                                         }
2593                                         break;
2594                                 }
2595                                 
2596
2597                                 sys->sleepIdle();
2598                         }
2599                 }
2600         }catch(StreamException &e)
2601         {
2602                 LOG_ERROR("Stream channel: %s",e.msg);
2603         }
2604 }
2605 #endif
2606
2607 // -----------------------------------
2608 void Servent::sendRawMetaChannel(int interval)
2609 {
2610
2611         try
2612         {
2613                 Channel *ch = chanMgr->findChannelByID(chanID);
2614                 if (!ch)
2615                         throw StreamException("Channel not found");
2616
2617                 sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
2618
2619                 setStatus(S_CONNECTED);
2620
2621                 LOG_DEBUG("Starting Raw Meta stream of %s (metaint: %d) at %d",ch->info.name.cstr(),interval,streamPos);
2622
2623
2624                 String lastTitle,lastURL;
2625
2626                 int             lastMsgTime=sys->getTime();
2627                 bool    showMsg=true;
2628
2629                 char buf[16384];
2630                 int bufPos=0;
2631
2632                 if ((interval > sizeof(buf)) || (interval < 1))
2633                         throw StreamException("Bad ICY Meta Interval value");
2634
2635                 unsigned int connectTime = sys->getTime();
2636                 unsigned int lastWriteTime = connectTime;
2637
2638                 streamPos = 0;          // raw meta channel has no header (its MP3)
2639
2640                 while ((thread.active) && sock->active())
2641                 {
2642                         ch = chanMgr->findChannelByID(chanID);
2643
2644                         if (ch)
2645                         {
2646
2647                                 ChanPacket rawPack;
2648                                 if (ch->rawData.findPacket(streamPos,rawPack))
2649                                 {
2650
2651                                         if (syncPos != rawPack.sync)
2652                                                 LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
2653                                         syncPos = rawPack.sync+1;
2654
2655                                         MemoryStream mem(rawPack.data,rawPack.len);
2656
2657                                         if (rawPack.type == ChanPacket::T_DATA)
2658                                         {
2659
2660                                                 int len = rawPack.len;
2661                                                 char *p = rawPack.data;
2662                                                 while (len)
2663                                                 {
2664                                                         int rl = len;
2665                                                         if ((bufPos+rl) > interval)
2666                                                                 rl = interval-bufPos;
2667                                                         memcpy(&buf[bufPos],p,rl);
2668                                                         bufPos+=rl;
2669                                                         p+=rl;
2670                                                         len-=rl;
2671
2672                                                         if (bufPos >= interval)
2673                                                         {
2674                                                                 bufPos = 0;     
2675                                                                 sock->write(buf,interval);
2676                                                                 lastWriteTime = sys->getTime();
2677
2678                                                                 if (chanMgr->broadcastMsgInterval)
2679                                                                         if ((sys->getTime()-lastMsgTime) >= chanMgr->broadcastMsgInterval)
2680                                                                         {
2681                                                                                 showMsg ^= true;
2682                                                                                 lastMsgTime = sys->getTime();
2683                                                                         }
2684
2685                                                                 String *metaTitle = &ch->info.track.title;
2686                                                                 if (!ch->info.comment.isEmpty() && (showMsg))
2687                                                                         metaTitle = &ch->info.comment;
2688
2689
2690                                                                 if (!metaTitle->isSame(lastTitle) || !ch->info.url.isSame(lastURL))
2691                                                                 {
2692
2693                                                                         char tmp[1024];
2694                                                                         String title,url;
2695
2696                                                                         title = *metaTitle;
2697                                                                         url = ch->info.url;
2698
2699                                                                         title.convertTo(String::T_META);
2700                                                                         url.convertTo(String::T_META);
2701
2702                                                                         sprintf(tmp,"StreamTitle='%s';StreamUrl='%s';\0",title.cstr(),url.cstr());
2703                                                                         int len = ((strlen(tmp) + 15+1) / 16);
2704                                                                         sock->writeChar(len);
2705                                                                         sock->write(tmp,len*16);
2706
2707                                                                         lastTitle = *metaTitle;
2708                                                                         lastURL = ch->info.url;
2709
2710                                                                         LOG_DEBUG("StreamTitle: %s, StreamURL: %s",lastTitle.cstr(),lastURL.cstr());
2711
2712                                                                 }else
2713                                                                 {
2714                                                                         sock->writeChar(0);                                     
2715                                                                 }
2716
2717                                                         }
2718                                                 }
2719                                         }
2720                                         streamPos = rawPack.pos + rawPack.len;
2721                                 }
2722                         }
2723                         if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
2724                                 throw TimeoutException();
2725
2726                         sys->sleepIdle();
2727
2728                 }
2729         }catch(StreamException &e)
2730         {
2731                 LOG_ERROR("Stream channel: %s",e.msg);
2732         }
2733 }
2734 // -----------------------------------
2735 void Servent::sendPeercastChannel()
2736 {
2737         try
2738         {
2739                 setStatus(S_CONNECTED);
2740
2741                 Channel *ch = chanMgr->findChannelByID(chanID);
2742                 if (!ch)
2743                         throw StreamException("Channel not found");
2744
2745                 LOG_DEBUG("Starting PeerCast stream: %s",ch->info.name.cstr());
2746
2747                 sock->writeTag("PCST");
2748
2749                 ChanPacket pack;
2750
2751                 ch->headPack.writePeercast(*sock);
2752
2753                 pack.init(ChanPacket::T_META,ch->insertMeta.data,ch->insertMeta.len,ch->streamPos);
2754                 pack.writePeercast(*sock);
2755         
2756                 streamPos = 0;
2757                 unsigned int syncPos=0;
2758                 while ((thread.active) && sock->active())
2759                 {
2760                         ch = chanMgr->findChannelByID(chanID);
2761                         if (ch)
2762                         {
2763
2764                                 ChanPacket rawPack;
2765                                 if (ch->rawData.findPacket(streamPos,rawPack))
2766                                 {
2767                                         if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2768                                         {
2769                                                 sock->writeTag("SYNC");
2770                                                 sock->writeShort(4);
2771                                                 sock->writeShort(0);
2772                                                 sock->write(&syncPos,4);
2773                                                 syncPos++;
2774
2775                                                 rawPack.writePeercast(*sock);
2776                                         }
2777                                         streamPos = rawPack.pos + rawPack.len;
2778                                 }
2779                         }
2780                         sys->sleepIdle();
2781                 }
2782
2783         }catch(StreamException &e)
2784         {
2785                 LOG_ERROR("Stream channel: %s",e.msg);
2786         }
2787 }
2788
2789 //WLock canStreamLock;
2790
2791 // -----------------------------------
2792 void Servent::sendPCPChannel()
2793 {
2794         bool skipCheck = false;
2795
2796         Channel *ch = chanMgr->findChannelByID(chanID);
2797         if (!ch)
2798                 throw StreamException("Channel not found");
2799
2800         AtomStream atom(*sock);
2801
2802         pcpStream = new PCPStream(remoteID);
2803         int error=0;
2804
2805         try
2806         {
2807
2808                 LOG_DEBUG("Starting PCP stream of channel at %d",streamPos);
2809
2810
2811 //              setStatus(S_CONNECTED);
2812
2813                 //canStreamLock.on();
2814                 //thread.active = canStream(ch);
2815                 //setStatus(S_CONNECTED);
2816                 //canStreamLock.off();
2817
2818                 lastSkipTime = 0;
2819                 lastSkipCount = 0;
2820                 waitPort = 0;
2821
2822                 if (thread.active){
2823                         atom.writeParent(PCP_CHAN,3 + ((sendHeader)?1:0));
2824                                 atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
2825                                 ch->info.writeInfoAtoms(atom);
2826                                 ch->info.writeTrackAtoms(atom);
2827                                 if (sendHeader)
2828                                 {
2829                                         atom.writeParent(PCP_CHAN_PKT,3);
2830                                         atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
2831                                         atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
2832                                         atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
2833
2834                                         if (streamPos == 0)
2835                                                 streamPos = ch->headPack.pos+ch->headPack.len;
2836                                         LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
2837                                 }
2838                 }
2839
2840                 unsigned int streamIndex = ch->streamIndex;
2841
2842                 ChanPacket rawPack;
2843                 char pbuf[ChanPacket::MAX_DATALEN*3];
2844                 MemoryStream mems(pbuf,sizeof(pbuf));
2845                 AtomStream atom2(mems);
2846
2847                 while (thread.active)
2848                 {
2849
2850                         Channel *ch = chanMgr->findChannelByID(chanID);
2851
2852                         if (ch)
2853                         {
2854
2855                                 if (streamIndex != ch->streamIndex)
2856                                 {
2857                                         streamIndex = ch->streamIndex;
2858                                         streamPos = ch->headPack.pos;
2859                                         LOG_DEBUG("sendPCPStream got new stream index");                                                
2860                                 }
2861
2862                                 mems.rewind();
2863
2864                                 if (ch->rawData.findPacket(streamPos,rawPack))
2865                                 {
2866                                         if ((streamPos < rawPack.pos) && !rawPack.skip){
2867                                                 if (skipCheck){
2868                                                         char tmp[32];
2869                                                         getHost().IPtoStr(tmp);
2870                                                         LOG_NETWORK("##### send skipping ##### %d (%d, %d) -> %s", (rawPack.pos - streamPos), streamPos, rawPack.pos, tmp);
2871
2872                                                         if (sys->getTime() == lastSkipTime) {
2873                                                                 LOG_DEBUG("##### skip all buffer");
2874                                                                 streamPos = ch->rawData.getLatestPos();
2875                                                                 continue;
2876                                                         }
2877
2878                                                         lastSkipTime = sys->getTime();
2879                                                         lastSkipCount++;
2880                                                 } else {
2881                                                         skipCheck = true;
2882                                                 }
2883                                         }
2884
2885                                         if (rawPack.type == ChanPacket::T_HEAD)
2886                                         {
2887                                                 atom2.writeParent(PCP_CHAN,2);
2888                                                         atom2.writeBytes(PCP_CHAN_ID,chanID.id,16);
2889                                                         atom2.writeParent(PCP_CHAN_PKT,3);
2890                                                                 atom2.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
2891                                                                 atom2.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
2892                                                                 atom2.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
2893
2894                                                 sock->write(pbuf, mems.getPosition());
2895                                         }else if (rawPack.type == ChanPacket::T_DATA)
2896                                         {
2897                                                 atom2.writeParent(PCP_CHAN,2);
2898                                                         atom2.writeBytes(PCP_CHAN_ID,chanID.id,16);
2899                                                         atom2.writeParent(PCP_CHAN_PKT,3);
2900                                                                 atom2.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_DATA);
2901                                                                 atom2.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
2902                                                                 atom2.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
2903
2904 #ifdef WIN32
2905                                                 sock->bufferingWrite(pbuf, mems.getPosition());
2906                                                 lastSkipTime = sock->bufList.lastSkipTime;
2907                                                 lastSkipCount = sock->bufList.skipCount;
2908 #else
2909                                                 sock->write(pbuf, mems.getPosition());
2910 #endif
2911                                         }
2912
2913                                         if (rawPack.pos < streamPos)
2914                                                 LOG_DEBUG("pcp: skip back %d",rawPack.pos-streamPos);
2915
2916                                         //LOG_DEBUG("Sending %d-%d (%d,%d,%d)",rawPack.pos,rawPack.pos+rawPack.len,ch->streamPos,ch->rawData.getLatestPos(),ch->rawData.getOldestPos());
2917
2918                                         streamPos = rawPack.pos+rawPack.len;
2919                                 }
2920                         } else {
2921                                 throw StreamException("Channel not found");
2922                         }
2923
2924 #ifdef WIN32
2925                         sock->bufferingWrite(NULL, 0);
2926                         lastSkipTime = sock->bufList.lastSkipTime;
2927                         lastSkipCount = sock->bufList.skipCount;
2928 #endif
2929                         BroadcastState bcs;
2930                         bcs.servent_id = servent_id;
2931 //                      error = pcpStream->readPacket(*sock,bcs);
2932                         do {
2933                                 error = pcpStream->readPacket(*sock,bcs);
2934                                 if (error)
2935                                         throw StreamException("PCP exception");
2936                         } while (sock->readReady() || pcpStream->outData.numPending());
2937
2938                         sys->sleepIdle();
2939
2940                 }
2941
2942                 LOG_DEBUG("PCP channel stream closed normally.");
2943
2944         }catch(StreamException &e)
2945         {
2946                 LOG_ERROR("Stream channel: %s",e.msg);
2947         }
2948
2949         try
2950         {
2951                 pcpStream->flush(*sock);
2952                 atom.writeInt(PCP_QUIT,error);
2953         }catch(StreamException &) {}
2954
2955 }
2956
2957 // -----------------------------------
2958 int Servent::serverProc(ThreadInfo *thread)
2959 {
2960 //      thread->lock();
2961
2962
2963         Servent *sv = (Servent*)thread->data;
2964
2965         try 
2966         {
2967                 if (!sv->sock)
2968                         throw StreamException("Server has no socket");
2969
2970                 sv->setStatus(S_LISTENING);
2971
2972
2973                 char servIP[64];
2974                 sv->sock->host.toStr(servIP);
2975
2976                 if (servMgr->isRoot)
2977                         LOG_DEBUG("Root Server started: %s",servIP);
2978                 else
2979                         LOG_DEBUG("Server started: %s",servIP);
2980                 
2981
2982                 while ((thread->active) && (sv->sock->active()))
2983                 {
2984                         if (servMgr->numActiveOnPort(sv->sock->host.port) < servMgr->maxServIn)
2985                         {
2986                                 ClientSocket *cs = sv->sock->accept();
2987                                 if (cs)
2988                                 {       
2989                                         LOG_DEBUG("accepted incoming");
2990                                         Servent *ns = servMgr->allocServent();
2991                                         if (ns)
2992                                         {
2993                                                 servMgr->lastIncoming = sys->getTime();
2994                                                 ns->servPort = sv->sock->host.port;
2995                                                 ns->networkID = servMgr->networkID;
2996                                                 ns->initIncoming(cs,sv->allow);
2997                                         }else
2998                                                 LOG_ERROR("Out of servents");
2999                                 }
3000                         }
3001                         sys->sleep(100);
3002                 }
3003         }catch(StreamException &e)
3004         {
3005                 LOG_ERROR("Server Error: %s:%d",e.msg,e.err);
3006         }
3007
3008         
3009         LOG_DEBUG("Server stopped");
3010
3011         sv->kill();
3012         sys->endThread(thread);
3013         return 0;
3014 }
3015  
3016 // -----------------------------------
3017 bool    Servent::writeVariable(Stream &s, const String &var)
3018 {
3019         char buf[1024];
3020
3021         if (var == "type")
3022                 strcpy(buf,getTypeStr());
3023         else if (var == "status")
3024                 strcpy(buf,getStatusStr());
3025         else if (var == "address")
3026         {
3027                 if (servMgr->enableGetName) //JP-EX s
3028                 {
3029                         getHost().toStr(buf);
3030                         char h_ip[64];
3031                         Host h = getHost();
3032                         h.toStr(h_ip);
3033
3034 /*                      ChanHitList *hits[ChanMgr::MAX_HITLISTS];
3035                         int numHits=0;
3036                         for(int i=0; i<ChanMgr::MAX_HITLISTS; i++)
3037                         {
3038                                 ChanHitList *chl = &chanMgr->hitlists[i];
3039                                 if (chl->isUsed())
3040                                         hits[numHits++] = chl;
3041                         }
3042                         bool ishit,isfw;
3043                         ishit = isfw = false;
3044                         int numRelay = 0;
3045                         if (numHits) 
3046                         {
3047                                 for(int k=0; k<numHits; k++)
3048                                 {
3049                                         ChanHitList *chl = hits[k];
3050                                         if (chl->isUsed())
3051                                         {
3052                                                 for (int j=0; j<ChanHitList::MAX_HITS; j++)
3053                                                 {
3054                                                         ChanHit *hit = &chl->hits[j];
3055                                                         if (hit->host.isValid() && (h.ip == hit->host.ip))
3056                                                         {
3057                                                                 ishit = true;
3058                                                                 if (hit->firewalled)
3059                                                                         isfw = true;
3060                                                                 numRelay += hit->numRelays;
3061                                                         }
3062                                                 }
3063                                         }
3064                                 }
3065                         }
3066                         strcpy(buf,"");
3067                         if (ishit == true)
3068                         {
3069                                 if (isfw == true)
3070                                 {
3071                                         if (numRelay== 0)
3072                                                 strcat(buf,"<font color=red>");
3073                                         else 
3074                                                 strcat(buf,"<font color=orange>");
3075                                 }
3076                                 else
3077                                         strcat(buf,"<font color=green>");
3078                         }
3079                         strcat(buf,h_ip);
3080                         char h_name[128];
3081                         if (ClientSocket::getHostname(h_name,h.ip))
3082                         {
3083                                 strcat(buf,"[");
3084                                 strcat(buf,h_name);
3085                                 strcat(buf,"]");
3086                         }
3087                         if (ishit == true) 
3088                         {
3089                                 strcat(buf,"</font>");
3090                         }
3091                 } //JP-EX e*/
3092
3093
3094                         bool isfw = false;
3095                         bool isRelay = true;
3096                         int numRelay = 0;
3097                         ChanHitList *chl = chanMgr->findHitListByID(chanID);
3098                         if (chl){
3099                                 ChanHit *hit = chl->hit;
3100                                 while(hit){
3101                                         if (hit->host.isValid() && (h.ip == hit->host.ip)){
3102                                                 isfw = hit->firewalled;
3103                                                 isRelay = hit->relay;
3104                                                 numRelay = hit->numRelays;
3105                                                 break;
3106                                         }
3107                                         hit = hit->next;
3108                                 }
3109                         }
3110                         strcpy(buf, "");
3111                         if (isfw){
3112                                 if (numRelay == 0){
3113                                         strcat(buf,"<font color=red>");
3114                                 } else {
3115                                         strcat(buf,"<font color=orange>");
3116                                 }
3117                         } else {
3118                                 if (!isRelay){
3119                                         if (numRelay==0){
3120                                                 strcpy(buf,"<font color=purple>");
3121                                         } else {
3122                                                 strcpy(buf,"<font color=blue>");
3123                                         }
3124                                 } else {
3125                                         strcpy(buf,"<font color=green>");
3126                                 }
3127                         }
3128                         strcat(buf,h_ip);
3129                         char h_name[128];
3130                         if (ClientSocket::getHostname(h_name,h.ip))
3131                         {
3132                                 strcat(buf,"[");
3133                                 strcat(buf,h_name);
3134                                 strcat(buf,"]");
3135                         }
3136                         strcat(buf,"</font>");
3137                 }
3138                 else 
3139                         getHost().toStr(buf);
3140         }
3141         else if (var == "agent")
3142                 strcpy(buf,agent.cstr());
3143         else if (var == "bitrate")
3144         {
3145                 if (sock)
3146                 {
3147                         unsigned int tot = sock->bytesInPerSec+sock->bytesOutPerSec;
3148                         sprintf(buf,"%.1f",BYTES_TO_KBPS(tot));
3149                 }else
3150                         strcpy(buf,"0");
3151         }else if (var == "uptime")
3152         {
3153                 String uptime;
3154                 if (lastConnect)
3155                         uptime.setFromStopwatch(sys->getTime()-lastConnect);
3156                 else
3157                         uptime.set("-");
3158                 strcpy(buf,uptime.cstr());
3159         }else if (var.startsWith("gnet."))
3160         {
3161
3162                 float ctime = (float)(sys->getTime()-lastConnect);
3163                 if (var == "gnet.packetsIn")
3164                         sprintf(buf,"%d",gnuStream.packetsIn);
3165                 else if (var == "gnet.packetsInPerSec")
3166                         sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsIn)/ctime:0);
3167                 else if (var == "gnet.packetsOut")
3168                         sprintf(buf,"%d",gnuStream.packetsOut);
3169                 else if (var == "gnet.packetsOutPerSec")
3170                         sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsOut)/ctime:0);
3171                 else if (var == "gnet.normQueue")
3172                         sprintf(buf,"%d",outPacketsNorm.numPending());
3173                 else if (var == "gnet.priQueue")
3174                         sprintf(buf,"%d",outPacketsPri.numPending());
3175                 else if (var == "gnet.flowControl")
3176                         sprintf(buf,"%d",flowControl?1:0);
3177                 else if (var == "gnet.routeTime")
3178                 {
3179                         int nr = seenIDs.numUsed();
3180                         unsigned int tim = sys->getTime()-seenIDs.getOldest();
3181                 
3182                         String tstr;
3183                         tstr.setFromStopwatch(tim);
3184
3185                         if (nr)
3186                                 strcpy(buf,tstr.cstr());
3187                         else
3188                                 strcpy(buf,"-");
3189                 }
3190                 else
3191                         return false;
3192
3193         }else
3194                 return false;
3195
3196         s.writeString(buf);
3197
3198         return true;
3199 }