OSDN Git Service

FLV配信用パッチをマージ。(github:niwakazoider/peercast@21998fef7e24f437ef8d50e17562ba95eb5c1843)
[peercast-im/PeerCastIM.git] / core / common / servent.cpp
1 // ------------------------------------------------
2 // File : servent.cpp
3 // Date: 4-apr-2002
4 // Author: giles
5 // Desc: 
6 //              Servents are the actual connections between clients. They do the handshaking,
7 //              transferring of data and processing of GnuPackets. Each servent has one socket allocated
8 //              to it on connect and uses it to transfer all of its data.
9 //
10 // (c) 2002 peercast.org
11 // ------------------------------------------------
12 // This program is free software; you can redistribute it and/or modify
13 // it under the terms of the GNU General Public License as published by
14 // the Free Software Foundation; either version 2 of the License, or
15 // (at your option) any later version.
16
17 // This program is distributed in the hope that it will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 // GNU General Public License for more details.
21 // ------------------------------------------------
22 // todo: make lan->yp not check firewall
23
24 #include <stdlib.h>
25 #include "servent.h"
26 #include "sys.h"
27 #include "gnutella.h"
28 #include "xml.h"
29 #include "html.h"
30 #include "http.h"
31 #include "stats.h"
32 #include "servmgr.h"
33 #include "peercast.h"
34 #include "atom.h"
35 #include "pcp.h"
36 #include "version2.h"
37 #ifdef _DEBUG
38 #include "chkMemoryLeak.h"
39 #define DEBUG_NEW new(__FILE__, __LINE__)
40 #define new DEBUG_NEW
41 #endif
42
43 #include "win32/seh.h"
44
45
46 const int DIRECT_WRITE_TIMEOUT = 60;
47
48 // -----------------------------------
49 char *Servent::statusMsgs[]=
50 {
51         "NONE",
52                 "CONNECTING",
53         "PROTOCOL",
54         "HANDSHAKE",
55         "CONNECTED",
56         "CLOSING",
57                 "LISTENING",
58                 "TIMEOUT",
59                 "REFUSED",
60                 "VERIFIED",
61                 "ERROR",
62                 "WAIT",
63                 "FREE"
64 };
65
66 // -----------------------------------
67 char *Servent::typeMsgs[]=
68 {
69                 "NONE",
70         "INCOMING",
71         "SERVER",
72                 "RELAY",
73                 "DIRECT",
74                 "COUT",
75                 "CIN",
76                 "PGNU"
77 };
78 // -----------------------------------
79 bool    Servent::isPrivate() 
80 {
81         Host h = getHost();
82         return servMgr->isFiltered(ServFilter::F_PRIVATE,h) || h.isLocalhost();
83 }
84 // -----------------------------------
85 bool    Servent::isAllowed(int a) 
86 {
87         Host h = getHost();
88
89         if (servMgr->isFiltered(ServFilter::F_BAN,h))
90                 return false;
91
92         return (allow&a)!=0;
93 }
94
95 // -----------------------------------
96 bool    Servent::isFiltered(int f) 
97 {
98         Host h = getHost();
99         return servMgr->isFiltered(f,h);
100 }
101
102 int servent_count = 1;
103 // -----------------------------------
104 Servent::Servent(int index)
105 :outPacketsPri(MAX_OUTPACKETS)
106 ,outPacketsNorm(MAX_OUTPACKETS)
107 ,seenIDs(MAX_HASH)
108 ,serventIndex(index)
109 ,sock(NULL)
110 ,next(NULL)
111 {
112         reset();
113         servent_id = servent_count++;
114         lastSkipTime = 0;
115         lastSkipCount = 0;
116         waitPort = 0;
117 }
118
119 // -----------------------------------
120 Servent::~Servent()
121 {       
122         
123 }
124 // -----------------------------------
125 void    Servent::kill() 
126 {
127         thread.active = false;
128                 
129         setStatus(S_CLOSING);
130
131         if (pcpStream)
132         {
133                 PCPStream *pcp = pcpStream;
134                 pcpStream = NULL;
135                 pcp->kill();
136                 delete pcp;
137         }
138
139         chanMgr->hitlistlock.on();
140         ChanHitList *chl = chanMgr->findHitListByID(chanID);
141         if (chl) {
142                 ChanHit *chh = chl->hit;
143                 ChanHit *prev = NULL;
144                 while (chh) {
145                         if (chh->servent_id == this->servent_id){
146                                 if ((servMgr->kickKeepTime != 0) && (chh->firewalled == 1)){
147                                         chh->numHops = 0;
148                                         chh->numListeners = 0;
149                                         chh->numRelays = 0;
150                                         prev = chh;
151                                         chh = chh->next;
152                                 } else {
153                                         ChanHit *next = chh->next;
154                                         if (prev)
155                                                 prev->next = next;
156                                         else
157                                                 chl->hit = next;
158
159                                         delete chh;
160                                         chh = next;
161                                 }
162                         } else {
163                                 prev = chh;
164                                 chh = chh->next;
165                         }
166                 }
167         }
168         chanMgr->hitlistlock.off();
169
170         if (sock)
171         {
172                 sock->close();
173                 delete sock;
174                 sock = NULL;
175         }
176
177         if (pushSock)
178         {
179                 pushSock->close();
180                 delete pushSock;
181                 pushSock = NULL;
182         }
183
184 //      thread.unlock();
185
186         if (type != T_SERVER)
187         {
188                 reset();
189                 setStatus(S_FREE);
190         }
191
192 }
193 // -----------------------------------
194 void    Servent::abort() 
195 {
196         thread.active = false;
197         if (sock)
198         {
199                 sock->close();
200         }
201
202 }
203
204 // -----------------------------------
205 void Servent::reset()
206 {
207
208         remoteID.clear();
209
210         servPort = 0;
211
212         pcpStream = NULL;
213
214         flowControl = false;
215         networkID.clear();
216
217         chanID.clear();
218
219         outputProtocol = ChanInfo::SP_UNKNOWN;
220
221         agent.clear();
222         sock = NULL;
223         allow = ALLOW_ALL;
224         syncPos = 0;
225         addMetadata = false;
226         nsSwitchNum = 0;
227         pack.func = 255;
228         lastConnect = lastPing = lastPacket = 0;
229         loginPassword.clear();
230         loginMount.clear();
231         bytesPerSecond = 0;
232         priorityConnect = false;
233         pushSock = NULL;
234         sendHeader = true;
235
236         outPacketsNorm.reset();
237         outPacketsPri.reset();
238
239         seenIDs.clear();
240
241         status = S_NONE;
242         type = T_NONE;
243
244         channel_id = 0;
245
246         serventHit.init();
247 }
248 // -----------------------------------
249 bool Servent::sendPacket(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did,Servent::TYPE t)
250 {
251
252         if  (      (type == t) 
253                         && (isConnected())
254                         && (!cid.isSet() || chanID.isSame(cid))
255                         && (!sid.isSet() || !sid.isSame(remoteID))
256                         && (pcpStream != NULL)
257                 )
258         {
259                 return pcpStream->sendPacket(pack,did);
260         }
261         return false;
262 }
263
264
265 // -----------------------------------
266 bool Servent::acceptGIV(ClientSocket *givSock)
267 {
268         if (!pushSock)
269         {
270                 pushSock = givSock;
271                 return true;
272         }else
273                 return false;
274 }
275
276 // -----------------------------------
277 Host Servent::getHost()
278 {
279         Host h(0,0);
280
281         if (sock)
282                 h = sock->host;
283
284         return h;
285 }
286
287 // -----------------------------------
288 bool Servent::outputPacket(GnuPacket &p, bool pri)
289 {
290         lock.on();
291
292         bool r=false;
293         if (pri)
294                 r = outPacketsPri.write(p);
295         else
296         {
297                 if (servMgr->useFlowControl)
298                 {
299                         int per = outPacketsNorm.percentFull();
300                         if (per > 50)
301                                 flowControl = true;
302                         else if (per < 25)
303                                 flowControl = false;
304                 }
305
306
307                 bool send=true;
308                 if (flowControl)
309                 {
310                         // if in flowcontrol, only allow packets with less of a hop count than already in queue
311                         if (p.hops >= outPacketsNorm.findMinHop())
312                                 send = false;
313                 }
314
315                 if (send)
316                         r = outPacketsNorm.write(p);
317         }
318
319         lock.off();
320         return r;
321 }
322
323 // -----------------------------------
324 bool Servent::initServer(Host &h)
325 {
326         try
327         {
328                 checkFree();
329
330                 status = S_WAIT;
331
332                 createSocket();
333
334                 sock->bind(h);
335
336                 thread.data = this;
337
338                 thread.func = serverProc;
339
340                 type = T_SERVER;
341
342                 if (!sys->startThread(&thread))
343                         throw StreamException("Can`t start thread");
344
345         }catch(StreamException &e)
346         {
347                 LOG_ERROR("Bad server: %s",e.msg);
348                 kill();
349                 return false;
350         }
351
352         return true;
353 }
354 // -----------------------------------
355 void Servent::checkFree()
356 {
357         if (sock)
358                 throw StreamException("Socket already set");
359         if (thread.active)
360                 throw StreamException("Thread already active");
361 }
362 // -----------------------------------
363 void Servent::initIncoming(ClientSocket *s, unsigned int a)
364 {
365
366         try{
367
368                 checkFree();
369
370                 type = T_INCOMING;
371                 sock = s;
372                 allow = a;
373                 thread.data = this;
374                 thread.func = incomingProc;
375                 thread.finish = false;
376
377                 setStatus(S_PROTOCOL);
378
379                 char ipStr[64];
380                 sock->host.toStr(ipStr);
381                 LOG_DEBUG("Incoming from %s",ipStr);
382
383                 if (!sys->startThread(&thread))
384                         throw StreamException("Can`t start thread");
385         }catch(StreamException &e)
386         {
387                 //LOG_ERROR("!!FATAL!! Incoming error: %s",e.msg);
388                 //servMgr->shutdownTimer = 1;   
389                 kill();
390
391                 LOG_ERROR("INCOMING FAILED: %s",e.msg);
392
393         }
394 }
395
396 // -----------------------------------
397 void Servent::initOutgoing(TYPE ty)
398 {
399         try 
400         {
401                 checkFree();
402
403
404                 type = ty;
405
406                 thread.data = this;
407                 thread.func = outgoingProc;
408
409                 if (!sys->startThread(&thread))
410                         throw StreamException("Can`t start thread");
411
412         }catch(StreamException &e)
413         {
414                 LOG_ERROR("Unable to start outgoing: %s",e.msg);
415                 kill();
416         }
417 }
418
419 // -----------------------------------
420 void Servent::initPCP(Host &rh)
421 {
422         char ipStr[64];
423         rh.toStr(ipStr);
424         try 
425         {
426                 checkFree();
427
428             createSocket();
429
430                 type = T_COUT;
431
432                 sock->open(rh);
433
434                 if (!isAllowed(ALLOW_NETWORK))
435                         throw StreamException("Servent not allowed");
436
437                 thread.data = this;
438                 thread.func = outgoingProc;
439
440                 LOG_DEBUG("Outgoing to %s",ipStr);
441
442                 if (!sys->startThread(&thread))
443                         throw StreamException("Can`t start thread");
444
445         }catch(StreamException &e)
446         {
447                 LOG_ERROR("Unable to open connection to %s - %s",ipStr,e.msg);
448                 kill();
449         }
450 }
451
#if 0
// -----------------------------------
// Disabled: everything up to the matching #endif is compiled out.
// Would open and connect a data socket to `host`, throwing StreamException
// when the servent is not free or the host lacks ALLOW_DATA.
void    Servent::initChannelFetch(Host &host)
{
    type = T_STREAM;

    char hostStr[64];
    host.toStr(hostStr);

    checkFree();

    createSocket();

    sock->open(host);

    if (!isAllowed(ALLOW_DATA))
        throw StreamException("Servent not allowed");

    sock->connect();
}
#endif
474
475 // -----------------------------------
476 void Servent::initGIV(Host &h, GnuID &id)
477 {
478         char ipStr[64];
479         h.toStr(ipStr);
480         try 
481         {
482                 checkFree();
483
484                 givID = id;
485
486             createSocket();
487
488                 sock->open(h);
489
490                 if (!isAllowed(ALLOW_NETWORK))
491                         throw StreamException("Servent not allowed");
492                 
493                 sock->connect();
494
495                 thread.data = this;
496                 thread.func = givProc;
497
498                 type = T_RELAY;
499
500                 if (!sys->startThread(&thread))
501                         throw StreamException("Can`t start thread");
502
503         }catch(StreamException &e)
504         {
505                 LOG_ERROR("GIV error to %s: %s",ipStr,e.msg);
506                 kill();
507         }
508 }
509 // -----------------------------------
510 void Servent::createSocket()
511 {
512         if (sock)
513                 LOG_ERROR("Servent::createSocket attempt made while active");
514
515         sock = sys->createSocket();
516 }
517 // -----------------------------------
518 void Servent::setStatus(STATUS s)
519 {
520         if (s != status)
521         {
522                 status = s;
523
524                 if ((s == S_HANDSHAKE) || (s == S_CONNECTED) || (s == S_LISTENING))
525                         lastConnect = sys->getTime();
526         }
527
528 }
529
530 // -----------------------------------
531 void Servent::handshakeOut()
532 {
533     sock->writeLine(GNU_PEERCONN);
534
535         char str[64];
536     
537         sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_AGENT);
538     sock->writeLineF("%s %d",PCX_HS_PCP,1);
539
540         if (priorityConnect)
541             sock->writeLineF("%s %d",PCX_HS_PRIORITY,1);
542         
543         if (networkID.isSet())
544         {
545                 networkID.toStr(str);
546                 sock->writeLineF("%s %s",PCX_HS_NETWORKID,str);
547         }
548
549         servMgr->sessionID.toStr(str);
550         sock->writeLineF("%s %s",PCX_HS_ID,str);
551
552         
553     sock->writeLineF("%s %s",PCX_HS_OS,peercastApp->getClientTypeOS());
554         
555         sock->writeLine("");
556
557         HTTP http(*sock);
558
559         int r = http.readResponse();
560
561         if (r != 200)
562         {
563                 LOG_ERROR("Expected 200, got %d",r);
564                 throw StreamException("Unexpected HTTP response");
565         }
566
567
568         bool versionValid = false;
569
570         GnuID clientID;
571         clientID.clear();
572
573     while (http.nextHeader())
574     {
575                 LOG_DEBUG(http.cmdLine);
576
577                 char *arg = http.getArgStr();
578                 if (!arg)
579                         continue;
580
581                 if (http.isHeader(HTTP_HS_AGENT))
582                 {
583                         agent.set(arg);
584
585                         if (strnicmp(arg,"PeerCast/",9)==0)
586                                 versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);
587                 }else if (http.isHeader(PCX_HS_NETWORKID))
588                         clientID.fromStr(arg);
589     }
590
591         if (!clientID.isSame(networkID))
592                 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
593
594         if (!versionValid)
595                 throw HTTPException(HTTP_SC_UNAUTHORIZED,401);
596
597
598     sock->writeLine(GNU_OK);
599     sock->writeLine("");
600
601 }
602
603
604 // -----------------------------------
605 void Servent::processOutChannel()
606 {
607 }
608
609
610 // -----------------------------------
611 void Servent::handshakeIn()
612 {
613
614         int osType=0;
615
616         HTTP http(*sock);
617
618
619         bool versionValid = false;
620         bool diffRootVer = false;
621
622         GnuID clientID;
623         clientID.clear();
624
625     while (http.nextHeader())
626     {
627                 LOG_DEBUG("%s",http.cmdLine);
628
629                 char *arg = http.getArgStr();
630                 if (!arg)
631                         continue;
632
633                 if (http.isHeader(HTTP_HS_AGENT))
634                 {
635                         agent.set(arg);
636
637                         if (strnicmp(arg,"PeerCast/",9)==0)
638                         {
639                                 versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);
640                                 diffRootVer = stricmp(arg+9,MIN_ROOTVER)<0;
641                         }
642                 }else if (http.isHeader(PCX_HS_NETWORKID))
643                 {
644                         clientID.fromStr(arg);
645
646                 }else if (http.isHeader(PCX_HS_PRIORITY))
647                 {
648                         priorityConnect = atoi(arg)!=0;
649
650                 }else if (http.isHeader(PCX_HS_ID))
651                 {
652                         GnuID id;
653                         id.fromStr(arg);
654                         if (id.isSame(servMgr->sessionID))
655                                 throw StreamException("Servent loopback");
656
657                 }else if (http.isHeader(PCX_HS_OS))
658                 {
659                         if (stricmp(arg,PCX_OS_LINUX)==0)
660                                 osType = 1;
661                         else if (stricmp(arg,PCX_OS_WIN32)==0)
662                                 osType = 2;
663                         else if (stricmp(arg,PCX_OS_MACOSX)==0)
664                                 osType = 3;
665                         else if (stricmp(arg,PCX_OS_WINAMP2)==0)
666                                 osType = 4;
667                 }
668
669     }
670
671         if (!clientID.isSame(networkID))
672                 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
673
674         // if this is a priority connection and all incoming connections 
675         // are full then kill an old connection to make room. Otherwise reject connection.
676         //if (!priorityConnect)
677         {
678                 if (!isPrivate())
679                         if (servMgr->pubInFull())
680                                 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
681         }
682
683         if (!versionValid)
684                 throw HTTPException(HTTP_SC_FORBIDDEN,403);
685
686     sock->writeLine(GNU_OK);
687
688     sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_OLDAGENT);
689
690         if (networkID.isSet())
691         {
692                 char idStr[64];
693                 networkID.toStr(idStr);
694                 sock->writeLineF("%s %s",PCX_HS_NETWORKID,idStr);
695         }
696
697         if (servMgr->isRoot)
698         {
699                 sock->writeLineF("%s %d",PCX_HS_FLOWCTL,servMgr->useFlowControl?1:0);
700                 sock->writeLineF("%s %d",PCX_HS_MINBCTTL,chanMgr->minBroadcastTTL);
701                 sock->writeLineF("%s %d",PCX_HS_MAXBCTTL,chanMgr->maxBroadcastTTL);
702                 sock->writeLineF("%s %d",PCX_HS_RELAYBC,servMgr->relayBroadcast);
703                 //sock->writeLine("%s %d",PCX_HS_FULLHIT,2);
704
705
706                 if (diffRootVer)
707                 {
708                         sock->writeString(PCX_HS_DL);
709                         sock->writeLine(PCX_DL_URL);
710                 }
711
712                 sock->writeLineF("%s %s",PCX_HS_MSG,servMgr->rootMsg.cstr());
713         }
714
715
716         char hostIP[64];
717         Host h = sock->host;
718         h.IPtoStr(hostIP);
719     sock->writeLineF("%s %s",PCX_HS_REMOTEIP,hostIP);
720
721
722     sock->writeLine("");
723
724
725         while (http.nextHeader());
726 }
727
728 // -----------------------------------
729 bool    Servent::pingHost(Host &rhost,GnuID &rsid)
730 {
731         char ipstr[64];
732         rhost.toStr(ipstr);
733         LOG_DEBUG("Ping host %s: trying..",ipstr);
734         ClientSocket *s=NULL;
735         bool hostOK=false;
736         try
737         {
738                 s = sys->createSocket();
739                 if (!s)
740                         return false;
741                 else
742                 {
743
744                         s->setReadTimeout(15000);
745                         s->setWriteTimeout(15000);
746                         s->open(rhost);
747                         s->connect();
748
749                         AtomStream atom(*s);
750
751                         atom.writeInt(PCP_CONNECT,1);
752                         atom.writeParent(PCP_HELO,1);
753                                 atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
754
755                         GnuID sid;
756                         sid.clear();
757
758                         int numc,numd;
759                         ID4 id = atom.read(numc,numd);
760                         if (id == PCP_OLEH)
761                         {
762                                 for(int i=0; i<numc; i++)
763                                 {
764                                         int c,d;
765                                         ID4 pid = atom.read(c,d);
766                                         if (pid == PCP_SESSIONID)
767                                                 atom.readBytes(sid.id,16,d);
768                                         else
769                                                 atom.skip(c,d);
770                                 }
771                         }else
772                         {
773                                 LOG_DEBUG("Ping response: %s",id.getString().str());
774                                 throw StreamException("Bad ping response");
775                         }
776
777                         if (!sid.isSame(rsid))
778                                 throw StreamException("SIDs don`t match");
779
780                         hostOK = true;
781                         LOG_DEBUG("Ping host %s: OK",ipstr);
782                         atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT);
783
784
785                 }
786         }catch(StreamException &e)
787         {
788                 LOG_DEBUG("Ping host %s: %s",ipstr,e.msg);
789         }
790         if (s)
791         {
792                 s->close();
793                 delete s;
794         }
795
796         if (!hostOK)
797                 rhost.port = 0;
798
799         return true;
800 }
801
802 WLock canStreamLock;
803
804 // -----------------------------------
805 bool Servent::handshakeStream(ChanInfo &chanInfo)
806 {
807
808         HTTP http(*sock);
809
810
811         bool gotPCP=false;
812         unsigned int reqPos=0;
813         unsigned short listenPort = 0;
814
815         nsSwitchNum=0;
816
817         while (http.nextHeader())
818         {
819                 char *arg = http.getArgStr();
820                 if (!arg)
821                         continue;
822
823                 if (http.isHeader(PCX_HS_PCP))
824                         gotPCP = atoi(arg)!=0;
825                 else if (http.isHeader(PCX_HS_POS))
826                         reqPos = atoi(arg);
827                 else if (http.isHeader(PCX_HS_PORT))
828                         listenPort = (unsigned short)atoi(arg);
829                 else if (http.isHeader("icy-metadata"))
830                         addMetadata = atoi(arg) > 0;
831                 else if (http.isHeader(HTTP_HS_AGENT))
832                         agent = arg;
833                 else if (http.isHeader("Pragma"))
834                 {
835                         char *ssc = stristr(arg,"stream-switch-count=");
836                         char *so = stristr(arg,"stream-offset");
837
838                         if (ssc || so)
839                         {
840                                 nsSwitchNum=1;
841                                 //nsSwitchNum = atoi(ssc+20);
842                         }
843                 }
844
845                 LOG_DEBUG("Stream: %s",http.cmdLine);
846         }
847
848
849         if ((!gotPCP) && (outputProtocol == ChanInfo::SP_PCP))
850                 outputProtocol = ChanInfo::SP_PEERCAST;
851
852         if (outputProtocol == ChanInfo::SP_HTTP)
853         {
854                 if  ( (chanInfo.srcProtocol == ChanInfo::SP_MMS)
855                           || (chanInfo.contentType == ChanInfo::T_WMA)
856                           || (chanInfo.contentType == ChanInfo::T_WMV)
857                           || (chanInfo.contentType == ChanInfo::T_ASX)
858                         )
859                 outputProtocol = ChanInfo::SP_MMS;
860         }
861
862
863         bool chanFound=false;
864         bool chanReady=false;
865
866         ChanHit *sourceHit = NULL;
867
868         Channel *ch = chanMgr->findChannelByID(chanInfo.id);
869         if (ch)
870         {
871                 sendHeader = true;
872                 if (reqPos || !isIndexTxt(&chanInfo))
873                 {
874                         streamPos = ch->rawData.findOldestPos(reqPos);
875                         //streamPos = ch->rawData.getLatestPos();
876                 }else
877                 {
878                         streamPos = ch->rawData.getLatestPos();
879                 }
880
881                 chanID = chanInfo.id;
882                 serventHit.host.ip = getHost().ip;
883                 serventHit.host.port = listenPort;
884                 if (serventHit.host.globalIP())
885                         serventHit.rhost[0] = serventHit.host;
886                 else
887                         serventHit.rhost[1] = serventHit.host;
888                 serventHit.chanID = chanID;
889
890                 canStreamLock.on();
891                 chanReady = canStream(ch);
892                 if (0 && !chanReady && ch->isPlaying())
893                 {
894                         if (ch->info.getUptime() > 60
895                                 && servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
896                         {
897                                 sourceHit = &ch->sourceHost;  // send source host info
898
899                                 if (listenPort)
900                                 {
901                                         // connect "this" host later
902                                         chanMgr->addHit(serventHit);
903                                 }
904
905                                 char tmp[50];
906                                 getHost().toStr(tmp);
907                                 LOG_DEBUG("Bump channel, hand over sourceHost to %s", tmp);
908                                 ch->bump = true;
909                         }
910                         else if (servMgr->kickUnrelayableHost(chanID, serventHit) != 0)
911                         {
912                                 chanReady = canStream(ch);
913                                 if (!chanReady)
914                                         LOG_DEBUG("*** Kicked unrelayable host, but still cannot stream");
915                         }
916                 }
917                 if (!chanReady) type = T_INCOMING;
918                 thread.active = chanReady;
919                 setStatus(S_CONNECTED);
920                 canStreamLock.off();
921                 channel_id = ch->channel_id;
922
923                 //JP-Patch add-s
924                 if (servMgr->isCheckPushStream())
925                 {
926                         if (chanReady == true)
927                         {
928                                 Host h = getHost();
929
930                                 if (!h.isLocalhost()) 
931                                 {
932                                         do 
933                                         {
934                                                 if (strstr(agent.cstr(),"PeerCast/0.119") != NULL) 
935                                                 {                                               
936                                                         char strip[256];
937                                                         h.toStr(strip);
938                                                         LOG_ERROR("Block v0.119 Servent : %s (%s)",strip,agent.cstr());
939                                                         chanReady = false;
940                                                         break;
941                                                 }
942
943 /*                                              ChanHitList *hits[ChanMgr::MAX_HITLISTS];
944                                                 int numHits=0;
945                                                 for(int i=0; i<ChanMgr::MAX_HITLISTS; i++)
946                                                 {
947                                                         ChanHitList *chl = &chanMgr->hitlists[i];
948                                                         if (chl->isUsed())
949                                                                 hits[numHits++] = chl;
950                                                 }
951                                                 bool isfw = false;
952                                                 int numRelay = 0;
953                                                 for(int i=0; i<numHits; i++)
954                                                 {
955                                                         ChanHitList *chl = hits[i];
956                                                         if (chl->isUsed())
957                                                         {
958                                                                 for (int j=0; j<ChanHitList::MAX_HITS; j++)
959                                                                 {
960                                                                         ChanHit *hit = &chl->hits[j];
961                                                                         if (hit->host.isValid() && (h.ip == hit->host.ip)) 
962                                                                         {
963                                                                                 if (hit->firewalled)
964                                                                                         isfw = true;
965                                                                                 numRelay = hit->numRelays;
966                                                                         }
967                                                                 }
968                                                         }
969                                                 }
970                                                 if ((isfw == true) && (numRelay == 0))
971                                                 {
972                                                         char strip[256];
973                                                         h.toStr(strip);
974                                                         LOG_ERROR("Block firewalled Servent : %s",strip);
975                                                         chanReady = false;
976                                                 }*/
977
978                                                 ChanHitList *chl = chanMgr->findHitList(chanInfo);
979                                                 ChanHit *hit = (chl ? chl->hit : NULL);
980                                                 while(hit){
981                                                         if (hit->host.isValid() && (h.ip == hit->host.ip))
982                                                         {
983                                                                 if ((hit->firewalled) && (hit->numRelays == 0)){
984                                                                         char strip[256];
985                                                                         h.toStr(strip);
986                                                                         LOG_ERROR("Block firewalled Servent : %s",strip);
987                                                                         chanReady = false;
988                                                                 }
989                                                         }
990                                                         hit = hit->next;
991                                                 }
992                                         }while (0);
993                                 }               
994                         }
995                 }
996                 //JP-Patch add-e
997         }
998
999 //      LockBlock lockblock(chanMgr->hitlistlock);
1000
1001 //      lockblock.lockon();
1002         ChanHitList *chl = chanMgr->findHitList(chanInfo);
1003
1004         if (chl)
1005         {
1006                 chanFound = true;
1007         }
1008
1009
1010         bool result = false;
1011
1012         char idStr[64];
1013         chanInfo.id.toStr(idStr);
1014
1015         char sidStr[64];
1016         servMgr->sessionID.toStr(sidStr);
1017
1018         Host rhost = sock->host;
1019
1020
1021
1022
1023         AtomStream atom(*sock);
1024
1025
1026
1027         if (!chanFound)
1028         {
1029                 sock->writeLine(HTTP_SC_NOTFOUND);
1030             sock->writeLine("");
1031                 LOG_DEBUG("Sending channel not found");
1032                 return false;
1033         }
1034
1035
1036         if (!chanReady)
1037         {
1038                 if (outputProtocol == ChanInfo::SP_PCP)
1039                 {
1040
1041                         char tbuf[8196];
1042                         MemoryStream mem(tbuf, sizeof(tbuf));
1043                         mem.writeLine(HTTP_SC_UNAVAILABLE);
1044                         mem.writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
1045                         mem.writeLine("");
1046                         sock->write(tbuf, mem.getPosition());
1047
1048                         handshakeIncomingPCP(atom,rhost,remoteID,agent);
1049
1050                         char ripStr[64];
1051                         rhost.toStr(ripStr);
1052
1053                         LOG_DEBUG("Sending channel unavailable");
1054
1055                         ChanHitSearch chs;
1056
1057                         mem.rewind();
1058                         AtomStream atom2(mem);
1059
1060                         int error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
1061
1062                         if (sourceHit) {
1063                                 sourceHit->writeAtoms(atom2,chanInfo.id);       
1064                                 char tmp[50];
1065                                 sourceHit->host.toStr(tmp);
1066                                 LOG_DEBUG("relay info(sourceHit): %s", tmp);
1067                         }
1068
1069                         chanMgr->hitlistlock.on();
1070
1071                         chl = chanMgr->findHitList(chanInfo);
1072
1073                         if (chl && !sourceHit)
1074                         {
1075                                 ChanHit best;
1076                                 
1077                                 // search for up to 8 other hits
1078                                 int cnt=0;
1079                                 int i;
1080                                 for(i=0; i<8; i++)
1081                                 {
1082                                         best.init();
1083
1084
1085                                         // find best hit this network if local IP
1086                                         if (!rhost.globalIP())
1087                                         {
1088                                                 chs.init();
1089                                                 chs.matchHost = servMgr->serverHost;
1090                                                 chs.waitDelay = 2;
1091                                                 chs.excludeID = remoteID;
1092                                                 if (chl->pickHits(chs)){
1093                                                         best = chs.best[0];
1094                                                         LOG_DEBUG("find best hit this network if local IP");
1095                                                 }
1096                                         }
1097
1098                                         // find best hit on same network
1099                                         if (!best.host.ip)
1100                                         {
1101                                                 chs.init();
1102                                                 chs.matchHost = rhost;
1103                                                 chs.waitDelay = 2;
1104                                                 chs.excludeID = remoteID;
1105                                                 if (chl->pickHits(chs)){
1106                                                         best = chs.best[0];
1107                                                         LOG_DEBUG("find best hit on same network");
1108                                                 }
1109
1110                                         }
1111
1112                                         // find best hit on other networks
1113 /*                                      if (!best.host.ip)
1114                                         {
1115                                                 chs.init();
1116                                                 chs.waitDelay = 2;
1117                                                 chs.excludeID = remoteID;
1118                                                 if (chl->pickHits(chs)){
1119                                                         best = chs.best[0];
1120                                                         LOG_DEBUG("find best hit on other networks");
1121                                                 }
1122
1123                                         }*/
1124                                         
1125                                         if (!best.host.ip)
1126                                                 break;
1127
1128                                         best.writeAtoms(atom2,chanInfo.id);                        
1129                                         cnt++;
1130                                 }
1131
1132                                 if (!best.host.ip){
1133                                         char tmp[50];
1134 //                                      chanMgr->hitlistlock.on();
1135                                         int rhcnt = chs.getRelayHost(servMgr->serverHost, rhost, remoteID, chl);
1136 //                                      chanMgr->hitlistlock.off();
1137                                         for (int i = 0; i < rhcnt; i++){
1138                                                 chs.best[i].writeAtoms(atom2, chanInfo.id);
1139                                                 chs.best[i].host.toStr(tmp);
1140                                                 LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
1141                                                 best.host.ip = chs.best[i].host.ip;
1142                                         }
1143                                         cnt += rhcnt;
1144                                 }
1145
1146                                 if (cnt)
1147                                 {
1148                                         LOG_DEBUG("Sent %d channel hit(s) to %s",cnt,ripStr);
1149
1150                                 }
1151                                 else if (rhost.port)
1152                                 {
1153                                         // find firewalled host
1154                                         chs.init();
1155                                         chs.waitDelay = 30;
1156                                         chs.useFirewalled = true;
1157                                         chs.excludeID = remoteID;
1158                                         if (chl->pickHits(chs))
1159                                         {
1160                                                 best = chs.best[0];
1161                                                 int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_RELAY);
1162                                                 LOG_DEBUG("Broadcasted channel push request to %d clients for %s",cnt,ripStr);
1163                                         }
1164                                 } 
1165
1166                                 // if all else fails, use tracker
1167                                 if (!best.host.ip)
1168                                 {
1169                                         // find best tracker on this network if local IP
1170                                         if (!rhost.globalIP())
1171                                         {
1172                                                 chs.init();
1173                                                 chs.matchHost = servMgr->serverHost;
1174                                                 chs.trackersOnly = true;
1175                                                 chs.excludeID = remoteID;
1176                                                 if (chl->pickHits(chs))
1177                                                         best = chs.best[0];
1178
1179                                         }
1180
1181                                         // find local tracker
1182                                         if (!best.host.ip)
1183                                         {
1184                                                 chs.init();
1185                                                 chs.matchHost = rhost;
1186                                                 chs.trackersOnly = true;
1187                                                 chs.excludeID = remoteID;
1188                                                 if (chl->pickHits(chs))
1189                                                         best = chs.best[0];
1190                                         }
1191
1192                                         // find global tracker
1193                                         if (!best.host.ip)
1194                                         {
1195                                                 chs.init();
1196                                                 chs.trackersOnly = true;
1197                                                 chs.excludeID = remoteID;
1198                                                 if (chl->pickHits(chs))
1199                                                         best = chs.best[0];
1200                                         }
1201
1202                                         if (best.host.ip)
1203                                         {
1204                                                 best.writeAtoms(atom2,chanInfo.id);                             
1205                                                 LOG_DEBUG("Sent 1 tracker hit to %s",ripStr);
1206                                         }else if (rhost.port)
1207                                         {
1208                                                 // find firewalled tracker
1209                                                 chs.init();
1210                                                 chs.useFirewalled = true;
1211                                                 chs.trackersOnly = true;
1212                                                 chs.excludeID = remoteID;
1213                                                 chs.waitDelay = 30;
1214                                                 if (chl->pickHits(chs))
1215                                                 {
1216                                                         best = chs.best[0];
1217                                                         int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_CIN);
1218                                                         LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,ripStr);
1219                                                 }
1220                                         }
1221
1222                                 }
1223
1224
1225                         }
1226
1227                         chanMgr->hitlistlock.off();
1228
1229                         // return not available yet code
1230                         atom2.writeInt(PCP_QUIT,error);
1231                         sock->write(tbuf, mem.getPosition());
1232                         result = false;
1233
1234                         /*
1235                         char c[512];
1236                         // wait disconnect from other host
1237                         try{
1238                                 while(sock->read(c, sizeof(c))){
1239                                         sys->sleep(10);
1240                                 }
1241                         }catch(StreamException &e){
1242                                 LOG_DEBUG("RelayInfoWait: %s",e.msg);
1243                         }
1244                         */
1245                 }else
1246                 {
1247                         LOG_DEBUG("Sending channel unavailable");
1248                         sock->writeLine(HTTP_SC_UNAVAILABLE);
1249                         sock->writeLine("");
1250                         result = false;
1251                 }
1252
1253         } else {
1254
1255                 if (chanInfo.contentType != ChanInfo::T_MP3)
1256                         addMetadata=false;
1257
1258                 if (addMetadata && (outputProtocol == ChanInfo::SP_HTTP))               // winamp mp3 metadata check
1259                 {
1260
1261                         sock->writeLine(ICY_OK);
1262
1263                         sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT);
1264                         sock->writeLineF("icy-name:%s",chanInfo.name.cstr());
1265                         sock->writeLineF("icy-br:%d",chanInfo.bitrate);
1266                         sock->writeLineF("icy-genre:%s",chanInfo.genre.cstr());
1267                         sock->writeLineF("icy-url:%s",chanInfo.url.cstr());
1268                         sock->writeLineF("icy-metaint:%d",chanMgr->icyMetaInterval);
1269                         sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr);
1270
1271                         sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3);
1272
1273                 }else
1274                 {
1275
1276                         sock->writeLine(HTTP_SC_OK);
1277
1278                         if ((chanInfo.contentType != ChanInfo::T_ASX) && (chanInfo.contentType != ChanInfo::T_WMV) && (chanInfo.contentType != ChanInfo::T_WMA))
1279                         {
1280                                 sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT);
1281
1282                                 sock->writeLine("Accept-Ranges: none");
1283
1284                                 sock->writeLineF("x-audiocast-name: %s",chanInfo.name.cstr());
1285                                 sock->writeLineF("x-audiocast-bitrate: %d",chanInfo.bitrate);
1286                                 sock->writeLineF("x-audiocast-genre: %s",chanInfo.genre.cstr());
1287                                 sock->writeLineF("x-audiocast-description: %s",chanInfo.desc.cstr());
1288                                 sock->writeLineF("x-audiocast-url: %s",chanInfo.url.cstr());
1289                                 sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr);
1290                         }
1291
1292
1293                         if (outputProtocol == ChanInfo::SP_HTTP)
1294                         {
1295                                 switch (chanInfo.contentType)
1296                                 {
1297                                         case ChanInfo::T_MOV:
1298                                                 sock->writeLine("Connection: close");
1299                                                 sock->writeLine("Content-Length: 10000000");
1300                                                 break;
1301                                 }
1302                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,chanInfo.getMIMEType());
1303                         } else if (outputProtocol == ChanInfo::SP_MMS)
1304                         {
1305                                 sock->writeLine("Server: Rex/9.0.0.2980");
1306                                 sock->writeLine("Cache-Control: no-cache");
1307                                 sock->writeLine("Pragma: no-cache");
1308                                 sock->writeLine("Pragma: client-id=3587303426");
1309                                 sock->writeLine("Pragma: features=\"broadcast,playlist\"");
1310
1311                                 if (nsSwitchNum)
1312                                 {
1313                                         sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MMS);
1314                                 }else
1315                                 {
1316                                         if (agent.contains("Android"))
1317                                         {
1318                                                 LOG_DEBUG("INFO: Android client detected.");
1319                                                 sock->writeLineF("%s %s", HTTP_HS_CONTENT, MIME_WMV);
1320                                         } else
1321                                         {
1322                                                 sock->writeLine("Content-Type: application/vnd.ms.wms-hdr.asfv1");
1323                                                 if (ch)
1324                                                         sock->writeLineF("Content-Length: %d",ch->headPack.len);
1325                                                 sock->writeLine("Connection: Keep-Alive");
1326                                         }
1327                                 }
1328                         
1329                         } else if (outputProtocol == ChanInfo::SP_PCP)
1330                         {
1331                                 sock->writeLineF("%s %d",PCX_HS_POS,streamPos);
1332                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
1333
1334                         }else if (outputProtocol == ChanInfo::SP_PEERCAST)
1335                         {
1336                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPEERCAST);
1337                         }
1338                 }
1339                 sock->writeLine("");
1340                 result = true;
1341
1342                 if (gotPCP)
1343                 {
1344                         handshakeIncomingPCP(atom,rhost,remoteID,agent);
1345                         atom.writeInt(PCP_OK,0);
1346                         if (rhost.globalIP())
1347                                 serventHit.rhost[0] = rhost;
1348                         else
1349                                 serventHit.rhost[1] = rhost;
1350                         serventHit.sessionID = remoteID;
1351                         serventHit.numHops = 1;
1352                         chanMgr->addHit(serventHit);
1353                 }
1354
1355         }
1356
1357
1358
1359         return result;
1360 }
1361
1362 // -----------------------------------
1363 void Servent::handshakeGiv(GnuID &id)
1364 {
1365         if (id.isSet())
1366         {
1367                 char idstr[64];
1368                 id.toStr(idstr);
1369                 sock->writeLineF("GIV /%s",idstr);
1370         }else
1371                 sock->writeLine("GIV");
1372
1373         sock->writeLine("");
1374 }
1375
1376
1377 // -----------------------------------
1378 void Servent::processGnutella()
1379 {
1380         type = T_PGNU;
1381
1382         //if (servMgr->isRoot && !servMgr->needConnections())
1383         if (servMgr->isRoot)
1384         {
1385                 processRoot();
1386                 return;
1387         }
1388
1389
1390
1391         gnuStream.init(sock);
1392         setStatus(S_CONNECTED);
1393
1394         if (!servMgr->isRoot)
1395         {
1396                 chanMgr->broadcastRelays(this, 1, 1);
1397                 GnuPacket *p;
1398
1399                 if ((p=outPacketsNorm.curr()))  
1400                         gnuStream.sendPacket(*p);
1401                 return;
1402         }
1403
1404         gnuStream.ping(2);
1405
1406 //      if (type != T_LOOKUP)
1407 //              chanMgr->broadcastRelays(this,chanMgr->minBroadcastTTL,2);
1408
1409         lastPacket = lastPing = sys->getTime();
1410         bool doneBigPing=false;
1411
1412         const unsigned int      abortTimeoutSecs = 60;          // abort connection after 60 secs of no activitiy
1413         const unsigned int      packetTimeoutSecs = 30;         // ping connection after 30 secs of no activity
1414
1415         unsigned int currBytes=0;
1416         unsigned int lastWait=0;
1417
1418         unsigned int lastTotalIn=0,lastTotalOut=0;
1419
1420         while (thread.active && sock->active())
1421         {
1422
1423                 if (sock->readReady())
1424                 {
1425                         lastPacket = sys->getTime();
1426
1427                         if (gnuStream.readPacket(pack))
1428                         {
1429                                 char ipstr[64];
1430                                 sock->host.toStr(ipstr);
1431
1432                                 GnuID routeID;
1433                                 GnuStream::R_TYPE ret = GnuStream::R_PROCESS;
1434
1435                                 if (pack.func != GNU_FUNC_PONG)
1436                                         if (servMgr->seenPacket(pack))
1437                                                 ret = GnuStream::R_DUPLICATE;
1438
1439                                 seenIDs.add(pack.id);
1440
1441
1442                                 if (ret == GnuStream::R_PROCESS)
1443                                 {
1444                                         GnuID routeID;
1445                                         ret = gnuStream.processPacket(pack,this,routeID);
1446
1447                                         if (flowControl && (ret == GnuStream::R_BROADCAST))
1448                                                 ret = GnuStream::R_DROP;
1449
1450                                 }
1451
1452                                 switch(ret)
1453                                 {
1454                                         case GnuStream::R_BROADCAST:
1455                                                 if (servMgr->broadcast(pack,this))
1456                                                         stats.add(Stats::NUMBROADCASTED);
1457                                                 else
1458                                                         stats.add(Stats::NUMDROPPED);
1459                                                 break;
1460                                         case GnuStream::R_ROUTE:
1461                                                 if (servMgr->route(pack,routeID,NULL))
1462                                                         stats.add(Stats::NUMROUTED);
1463                                                 else
1464                                                         stats.add(Stats::NUMDROPPED);
1465                                                 break;
1466                                         case GnuStream::R_ACCEPTED:
1467                                                 stats.add(Stats::NUMACCEPTED);
1468                                                 break;
1469                                         case GnuStream::R_DUPLICATE:
1470                                                 stats.add(Stats::NUMDUP);
1471                                                 break;
1472                                         case GnuStream::R_DEAD:
1473                                                 stats.add(Stats::NUMDEAD);
1474                                                 break;
1475                                         case GnuStream::R_DISCARD:
1476                                                 stats.add(Stats::NUMDISCARDED);
1477                                                 break;
1478                                         case GnuStream::R_BADVERSION:
1479                                                 stats.add(Stats::NUMOLD);
1480                                                 break;
1481                                         case GnuStream::R_DROP:
1482                                                 stats.add(Stats::NUMDROPPED);
1483                                                 break;
1484                                 }
1485
1486
1487                                 LOG_NETWORK("packet in: %s-%s, %d bytes, %d hops, %d ttl, from %s",GNU_FUNC_STR(pack.func),GnuStream::getRouteStr(ret),pack.len,pack.hops,pack.ttl,ipstr);
1488
1489
1490
1491                         }else{
1492                                 LOG_ERROR("Bad packet");
1493                         }
1494                 }
1495
1496
1497                 GnuPacket *p;
1498
1499                 if ((p=outPacketsPri.curr()))                           // priority packet
1500                 {
1501                         gnuStream.sendPacket(*p);
1502                         seenIDs.add(p->id);
1503                         outPacketsPri.next();
1504                 } else if ((p=outPacketsNorm.curr()))           // or.. normal packet
1505                 {
1506                         gnuStream.sendPacket(*p);
1507                         seenIDs.add(p->id);
1508                         outPacketsNorm.next();
1509                 }
1510
1511                 int lpt =  sys->getTime()-lastPacket;
1512
1513                 if (!doneBigPing)
1514                 {
1515                         if ((sys->getTime()-lastPing) > 15)
1516                         {
1517                                 gnuStream.ping(7);
1518                                 lastPing = sys->getTime();
1519                                 doneBigPing = true;
1520                         }
1521                 }else{
1522                         if (lpt > packetTimeoutSecs)
1523                         {
1524                                 
1525                                 if ((sys->getTime()-lastPing) > packetTimeoutSecs)
1526                                 {
1527                                         gnuStream.ping(1);
1528                                         lastPing = sys->getTime();
1529                                 }
1530
1531                         }
1532                 }
1533                 if (lpt > abortTimeoutSecs)
1534                         throw TimeoutException();
1535
1536
1537                 unsigned int totIn = sock->totalBytesIn-lastTotalIn;
1538                 unsigned int totOut = sock->totalBytesOut-lastTotalOut;
1539
1540                 unsigned int bytes = totIn+totOut;
1541
1542                 lastTotalIn = sock->totalBytesIn;
1543                 lastTotalOut = sock->totalBytesOut;
1544
1545                 const int serventBandwidth = 1000;
1546
1547                 int delay = sys->idleSleepTime;
1548                 if ((bytes) && (serventBandwidth >= 8))
1549                         delay = (bytes*1000)/(serventBandwidth/8);      // set delay relative packetsize
1550
1551                 if (delay < (int)sys->idleSleepTime)
1552                         delay = sys->idleSleepTime;
1553                 //LOG("delay %d, in %d, out %d",delay,totIn,totOut);
1554
1555                 sys->sleep(delay);
1556         }
1557
1558 }
1559
1560
1561 // -----------------------------------
1562 void Servent::processRoot()
1563 {
1564         try 
1565         {
1566         
1567                 gnuStream.init(sock);
1568                 setStatus(S_CONNECTED);
1569
1570                 gnuStream.ping(2);
1571
1572                 unsigned int lastConnect = sys->getTime();
1573
1574                 while (thread.active && sock->active())
1575                 {
1576                         if (gnuStream.readPacket(pack))
1577                         {
1578                                 char ipstr[64];
1579                                 sock->host.toStr(ipstr);
1580                                 
1581                                 LOG_NETWORK("packet in: %d from %s",pack.func,ipstr);
1582
1583
1584                                 if (pack.func == GNU_FUNC_PING)         // if ping then pong back some hosts and close
1585                                 {
1586                                         
1587                                         Host hl[32];
1588                                         int cnt = servMgr->getNewestServents(hl,32,sock->host); 
1589                                         if (cnt)
1590                                         {
1591                                                 int start = sys->rnd() % cnt;
1592                                                 int max = cnt>8?8:cnt;
1593
1594                                                 for(int i=0; i<max; i++)
1595                                                 {
1596                                                         GnuPacket pong;
1597                                                         pack.hops = 1;
1598                                                         pong.initPong(hl[start],false,pack);
1599                                                         gnuStream.sendPacket(pong);
1600
1601                                                         char ipstr[64];
1602                                                         hl[start].toStr(ipstr);
1603
1604                                                         //LOG_NETWORK("Pong %d: %s",start+1,ipstr);
1605                                                         start = (start+1) % cnt;
1606                                                 }
1607                                                 char str[64];
1608                                                 sock->host.toStr(str);
1609                                                 LOG_NETWORK("Sent %d pong(s) to %s",max,str);
1610                                         }else
1611                                         {
1612                                                 LOG_NETWORK("No Pongs to send");
1613                                                 //return;
1614                                         }
1615                                 }else if (pack.func == GNU_FUNC_PONG)           // pong?
1616                                 {
1617                                         MemoryStream pong(pack.data,pack.len);
1618
1619                                         int ip,port;
1620                                         port = pong.readShort();
1621                                         ip = pong.readLong();
1622                                         ip = SWAP4(ip);
1623
1624
1625                                         Host h(ip,port);
1626                                         if ((ip) && (port) && (h.globalIP()))
1627                                         {
1628
1629                                                 LOG_NETWORK("added pong: %d.%d.%d.%d:%d",ip>>24&0xff,ip>>16&0xff,ip>>8&0xff,ip&0xff,port);
1630                                                 servMgr->addHost(h,ServHost::T_SERVENT,sys->getTime());
1631                                         }
1632                                         //return;
1633                                 } else if (pack.func == GNU_FUNC_HIT)
1634                                 {
1635                                         MemoryStream data(pack.data,pack.len);
1636                                         ChanHit hit;
1637                                         gnuStream.readHit(data,hit,pack.hops,pack.id);
1638                                 }
1639
1640                                 //if (gnuStream.packetsIn > 5)  // die if we get too many packets
1641                                 //      return;
1642                         }
1643
1644                         if((sys->getTime()-lastConnect > 60))
1645                                 break;
1646                 }
1647
1648
1649         }catch(StreamException &e)
1650         {
1651                 LOG_ERROR("Relay: %s",e.msg);
1652         }
1653
1654         
1655 }       
1656
1657 // -----------------------------------
// Body of the GIV servent thread; thread->data carries the Servent to run.
// Runs handshakeGiv() with the servent's givID, then handshakeIncoming().
// A StreamException thrown by either call is logged and ends the session.
// Afterwards the servent is killed and the thread ended; always returns 0.
1658 int Servent::givProcMain(ThreadInfo *thread)
1659 {
1660 //      thread->lock();
1661         Servent *sv = (Servent*)thread->data;
1662         try 
1663         {
1664                 sv->handshakeGiv(sv->givID);
1665                 sv->handshakeIncoming();
1666
1667         }catch(StreamException &e)
1668         {
1669                 LOG_ERROR("GIV: %s",e.msg);
1670         }
1671         // cleanup runs on the normal path and after a caught StreamException
1672         sv->kill();
1673         sys->endThread(thread);
1674         return 0;
1675 }
1676
1677 // -----------------------------------
// Thread entry point for GIV servents: hands off to givProcMain via the
// SEH_THREAD macro. The macro is expected to supply the return statement.
// NOTE(review): SEH_THREAD presumably comes from win32/seh.h and wraps the
// call in structured-exception handling - confirm against that header.
1678 int Servent::givProc(ThreadInfo *thread)
1679 {
1680         SEH_THREAD(givProcMain, Servent::givProc);
1681 }
1682
1683 // -----------------------------------
// Performs the outgoing (connecting) side of the PCP HELO/OLEH handshake.
//
// Sends a PCP_HELO parent atom (agent, version, session ID, plus optional
// port, ping-port and broadcast-ID children), then reads the reply, which
// must be PCP_OLEH, and parses its children.
//
//   atom      - atom stream connected to the remote peer.
//   rhost     - not read or written by this function.
//   rid       - out: cleared, then filled from PCP_HELO_SESSIONID in the reply.
//   agent     - out: set from PCP_HELO_AGENT in the reply, if present.
//   isTrusted - when true, our broadcast ID is sent if chanMgr reports that we
//               are broadcasting, and the IP / port / disable values returned
//               by the peer are applied to servMgr.
//
// Throws StreamException if the reply is not PCP_OLEH, if the peer's session
// ID equals our own (loopback), or if the peer supplied no session ID. In the
// first and last case a PCP_QUIT atom is written before throwing.
1684 void Servent::handshakeOutgoingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent, bool isTrusted)
1685 {
1686         // our port is advertised unless the firewall state is FW_ON; a ping test is requested while it is FW_UNKNOWN
1687         bool nonFW = (servMgr->getFirewall() != ServMgr::FW_ON);
1688         bool testFW = (servMgr->getFirewall() == ServMgr::FW_UNKNOWN);
1689
1690         bool sendBCID = isTrusted && chanMgr->isBroadcasting();
1691         // assemble the whole HELO in a local buffer so it is sent with a single write
1692         char tbuf[1024];
1693         MemoryStream mem(tbuf, sizeof(tbuf));
1694         AtomStream atom2(mem);
1695         atom2.writeParent(PCP_HELO,3 + (testFW?1:0) + (nonFW?1:0) + (sendBCID?1:0));    // child count must match the writes below
1696                 atom2.writeString(PCP_HELO_AGENT,PCX_AGENT);
1697                 atom2.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
1698                 atom2.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
1699                 if (nonFW)
1700                         atom2.writeShort(PCP_HELO_PORT,servMgr->serverHost.port);
1701                 if (testFW)
1702                         atom2.writeShort(PCP_HELO_PING,servMgr->serverHost.port);
1703                 if (sendBCID)
1704                         atom2.writeBytes(PCP_HELO_BCID,chanMgr->broadcastID.id,16);
1705         atom.io.write(tbuf, mem.getPosition());
1706
1707
1708         LOG_DEBUG("PCP outgoing waiting for OLEH..");
1709
1710         int numc,numd;
1711         ID4 id = atom.read(numc,numd);
1712         if (id != PCP_OLEH)
1713         {
1714                 LOG_DEBUG("PCP outgoing reply: %s",id.getString().str());
1715                 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
1716                 throw StreamException("Got unexpected PCP response");
1717         }
1718
1719
1720
1721         char arg[64];
1722
1723         GnuID clientID;         // cleared below but not otherwise used in this function
1724         clientID.clear();
1725         rid.clear();
1726         int version=0;          // parsed from the reply but not read afterwards
1727         int disable=0;
1728
1729         Host thisHost;          // our own address/port as reported back by the peer
1730
1731         // read OLEH response
1732         for(int i=0; i<numc; i++)
1733         {
1734                 int c,dlen;
1735                 ID4 id = atom.read(c,dlen);     // shadows the outer 'id'
1736
1737                 if (id == PCP_HELO_AGENT)
1738                 {
1739                         atom.readString(arg,sizeof(arg),dlen);
1740                         agent.set(arg);
1741
1742                 }else if (id == PCP_HELO_REMOTEIP)
1743                 {
1744                         thisHost.ip = atom.readInt();
1745
1746                 }else if (id == PCP_HELO_PORT)
1747                 {
1748                         thisHost.port = atom.readShort();
1749
1750                 }else if (id == PCP_HELO_VERSION)
1751                 {
1752                         version = atom.readInt();
1753
1754                 }else if (id == PCP_HELO_DISABLE)
1755                 {
1756                         disable = atom.readInt();
1757
1758                 }else if (id == PCP_HELO_SESSIONID)
1759                 {
1760                         atom.readBytes(rid.id,16);
1761                         if (rid.isSame(servMgr->sessionID))
1762                                 throw StreamException("Servent loopback");
1763
1764                 }else
1765                 {
1766                         LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
1767                         atom.skip(c,dlen);
1768                 }
1769
1770     }
1771
1772
1773         // update server ip/firewall status
1774         if (isTrusted)
1775         {
1776                 if (thisHost.isValid())
1777                 {
1778                         if ((servMgr->serverHost.ip != thisHost.ip) && (servMgr->forceIP.isEmpty()))    // a configured forceIP is never overridden
1779                         {
1780                                 char ipstr[64];
1781                                 thisHost.toStr(ipstr);
1782                                 LOG_DEBUG("Got new ip: %s",ipstr);
1783                                 servMgr->serverHost.ip = thisHost.ip;
1784                         }
1785
1786                         if (servMgr->getFirewall() == ServMgr::FW_UNKNOWN)
1787                         {
1788                                 if (thisHost.port && thisHost.globalIP())       // peer returned a non-zero port for a global IP
1789                                         servMgr->setFirewall(ServMgr::FW_OFF);
1790                                 else
1791                                         servMgr->setFirewall(ServMgr::FW_ON);
1792                         }
1793                 }
1794
1795                 if (disable == 1)
1796                 {
1797                         LOG_ERROR("client disabled: %d",disable);
1798                         servMgr->isDisabled = true;             
1799                 }else
1800                 {
1801                         servMgr->isDisabled = false;            
1802                 }
1803         }
1804
1805
1806         // a reply without a session ID leaves rid cleared; refuse to continue
1807         if (!rid.isSet())
1808         {
1809                 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
1810                 throw StreamException("Remote host not identified");
1811         }
1812
1813         LOG_DEBUG("PCP Outgoing handshake complete.");
1814
1815 }
1816
1817 // -----------------------------------
// Performs the accepting side of the PCP HELO/OLEH handshake.
//
// Reads a PCP_HELO parent atom and its children from the peer, optionally
// runs a firewall ping test, and answers with a PCP_OLEH parent atom
// (agent, session ID, version, remote IP, port).
//
//   atom  - atom stream connected to the remote peer.
//   rhost - in/out: the peer's host. Its port is reset to 0 and then taken
//           from PCP_HELO_PORT or from a successful ping test; its ip may be
//           replaced by our own server IP (see below). The resulting ip and
//           port are echoed to the peer in the OLEH.
//   rid   - out: filled from PCP_HELO_SESSIONID. It is not cleared here, so
//           the "not identified" check depends on the value the caller passes.
//   agent - out: set from PCP_HELO_AGENT, if present.
//
// Throws StreamException when the first atom is not PCP_HELO, on loopback
// (the peer's session ID equals ours), when we are root and the peer's private
// broadcast ID is unknown or invalid, when the peer's version is non-zero and
// below PCP_CLIENT_MINVERSION, or when no session ID was received.
1818 void Servent::handshakeIncomingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent)
1819 {
1820         int numc,numd;
1821         ID4 id = atom.read(numc,numd);
1822
1823
1824         if (id != PCP_HELO)
1825         {
1826                 LOG_DEBUG("PCP incoming reply: %s",id.getString().str());
1827                 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
1828                 throw StreamException("Got unexpected PCP response");
1829         }
1830
1831         char arg[64];
1832
1833         ID4 osType;             // assigned from PCP_HELO_OSTYPE but not read afterwards
1834
1835         int version=0;
1836
1837         int pingPort=0;
1838
1839         GnuID bcID;
1840         GnuID clientID;         // cleared below but not otherwise used in this function
1841
1842         bcID.clear();
1843         clientID.clear();
1844
1845         rhost.port = 0;
1846         // parse the HELO children; unrecognised atoms are skipped
1847         for(int i=0; i<numc; i++)
1848         {
1849
1850                 int c,dlen;
1851                 ID4 id = atom.read(c,dlen);     // shadows the outer 'id'
1852
1853                 if (id == PCP_HELO_AGENT)
1854                 {
1855                         atom.readString(arg,sizeof(arg),dlen);
1856                         agent.set(arg);
1857
1858                 }else if (id == PCP_HELO_VERSION)
1859                 {
1860                         version = atom.readInt();
1861
1862                 }else if (id == PCP_HELO_SESSIONID)
1863                 {
1864                         atom.readBytes(rid.id,16);
1865                         if (rid.isSame(servMgr->sessionID))
1866                                 throw StreamException("Servent loopback");
1867
1868                 }else if (id == PCP_HELO_BCID)
1869                 {
1870                         atom.readBytes(bcID.id,16);
1871
1872                 }else if (id == PCP_HELO_OSTYPE)
1873                 {
1874                         osType = atom.readInt();
1875                 }else if (id == PCP_HELO_PORT)
1876                 {
1877                         rhost.port = atom.readShort();
1878                 }else if (id == PCP_HELO_PING)
1879                 {
1880                         pingPort = atom.readShort();
1881                 }else
1882                 {
1883                         LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
1884                         atom.skip(c,dlen);
1885                 }
1886
1887     }
1888
1889         if (version)
1890                 LOG_DEBUG("Incoming PCP is %s : v%d", agent.cstr(),version);
1891
1892         // the peer's address is not global but ours is: use our own server IP for it
1893         if (!rhost.globalIP() && servMgr->serverHost.globalIP())
1894                 rhost.ip = servMgr->serverHost.ip;
1895
1896         if (pingPort)
1897         {
1898                 char ripStr[64];
1899                 rhost.toStr(ripStr);
1900                 LOG_DEBUG("Incoming firewalled test request: %s ", ripStr);
1901                 rhost.port = pingPort;
1902                 if (!rhost.globalIP() || !pingHost(rhost,rid))  // port goes back to 0 when the host is not global or the ping fails
1903                         rhost.port = 0;
1904         }
1905
1906         if (servMgr->isRoot)
1907         {
1908                 if (bcID.isSet())
1909                 {
1910                         if (bcID.getFlags() & 1)        // private
1911                         {
1912                                 BCID *bcid = servMgr->findValidBCID(bcID);
1913                                 if (!bcid || (bcid && !bcid->valid))    // not found, or found but flagged invalid
1914                                 {
1915                                         atom.writeParent(PCP_OLEH,1);
1916                                         atom.writeInt(PCP_HELO_DISABLE,1);
1917                                         throw StreamException("Client is banned");
1918                                 }
1919                         }
1920                 }
1921         }
1922
1923         // assemble the OLEH reply in a local buffer; it is flushed on every exit path below
1924         char tbuf[1024];
1925         MemoryStream mem(tbuf, sizeof(tbuf));
1926         AtomStream atom2(mem);
1927         atom2.writeParent(PCP_OLEH,5);  // exactly the five children written next
1928                 atom2.writeString(PCP_HELO_AGENT,PCX_AGENT);
1929                 atom2.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
1930                 atom2.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
1931                 atom2.writeInt(PCP_HELO_REMOTEIP,rhost.ip);
1932                 atom2.writeShort(PCP_HELO_PORT,rhost.port);
1933
1934         if (version)
1935         {
1936                 if (version < PCP_CLIENT_MINVERSION)
1937                 {
1938                         atom2.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADAGENT);     // QUIT follows the OLEH in the same write
1939                         atom.io.write(tbuf, mem.getPosition());
1940                         throw StreamException("Agent is not valid");
1941                 }
1942         }
1943
1944         if (!rid.isSet())
1945         {
1946                 atom2.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
1947                 atom.io.write(tbuf, mem.getPosition());
1948                 throw StreamException("Remote host not identified");
1949         }
1950
1951
1952         // when acting as root, root atoms are appended after the OLEH
1953         if (servMgr->isRoot)
1954         {
1955                 servMgr->writeRootAtoms(atom2,false);
1956         }
1957
1958         atom.io.write(tbuf, mem.getPosition());
1959
1960         LOG_DEBUG("PCP Incoming handshake complete.");
1961
1962 }
1963
1964 // -----------------------------------
// Services an incoming PCP control connection on this servent's socket.
//
// Calls PCPStream::readVersion() on the socket, runs handshakeIncomingPCP(),
// then either rejects the peer with a PCP_QUIT atom or enters the packet loop.
//
// The peer is rejected when a T_COUT or T_CIN connection with the same remote
// ID already exists, when servMgr->controlInFull() is true, or when we are
// neither root nor broadcasting. If suggestOthers is set, up to 8 tracker
// hits are written to the peer before the QUIT; if none were found and the
// peer has a non-zero port, a push request to a firewalled tracker is
// broadcast instead.
//
// Otherwise the servent becomes T_CIN / S_CONNECTED, PCP_OK and a root-update
// request are sent, and packets are read until readPacket() returns an error,
// the thread is deactivated, the socket reports EOF, we go off air, or the
// application is quitting. A final PCP_QUIT carrying the error is then sent.
//
// No exceptions are caught in this function.
1965 void Servent::processIncomingPCP(bool suggestOthers)
1966 {
1967         PCPStream::readVersion(*sock);
1968
1969
1970         AtomStream atom(*sock);
1971         Host rhost = sock->host;
1972
1973         handshakeIncomingPCP(atom,rhost,remoteID,agent);
1974
1975
1976         bool alreadyConnected = (servMgr->findConnection(Servent::T_COUT,remoteID)!=NULL)
1977                                                         || (servMgr->findConnection(Servent::T_CIN,remoteID)!=NULL);
1978         bool unavailable = servMgr->controlInFull();
1979         bool offair = !servMgr->isRoot && !chanMgr->isBroadcasting();
1980
1981         char rstr[64];
1982         rhost.toStr(rstr);
1983
1984         if (unavailable || alreadyConnected || offair)
1985         {
1986                 int error;
1987                 // precedence when several conditions hold: already connected, then unavailable, then off air
1988                 if (alreadyConnected)
1989                         error = PCP_ERROR_QUIT+PCP_ERROR_ALREADYCONNECTED;
1990                 else if (unavailable)
1991                         error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
1992                 else if (offair)
1993                         error = PCP_ERROR_QUIT+PCP_ERROR_OFFAIR;
1994                 else 
1995                         error = PCP_ERROR_QUIT;
1996
1997
1998                 if (suggestOthers)
1999                 {
2000
2001                         ChanHit best;
2002                         ChanHitSearch chs;
2003
2004                         int cnt=0;
2005                         for(int i=0; i<8; i++)  // offer at most 8 trackers
2006                         {
2007                                 best.init();
2008
2009                                 // find best hit on this network                        
2010                                 if (!rhost.globalIP())
2011                                 {
2012                                         chs.init();
2013                                         chs.matchHost = servMgr->serverHost;
2014                                         chs.waitDelay = 2;
2015                                         chs.excludeID = remoteID;
2016                                         chs.trackersOnly = true;
2017                                         chs.useBusyControls = false;
2018                                         if (chanMgr->pickHits(chs))
2019                                                 best = chs.best[0];
2020                                 }
2021
2022                                 // find best hit on same network                        
2023                                 if (!best.host.ip)
2024                                 {
2025                                         chs.init();
2026                                         chs.matchHost = rhost;
2027                                         chs.waitDelay = 2;
2028                                         chs.excludeID = remoteID;
2029                                         chs.trackersOnly = true;
2030                                         chs.useBusyControls = false;
2031                                         if (chanMgr->pickHits(chs))
2032                                                 best = chs.best[0];
2033                                 }
2034
2035                                 // else find best hit on other networks
2036                                 if (!best.host.ip)
2037                                 {
2038                                         chs.init();
2039                                         chs.waitDelay = 2;
2040                                         chs.excludeID = remoteID;
2041                                         chs.trackersOnly = true;
2042                                         chs.useBusyControls = false;
2043                                         if (chanMgr->pickHits(chs))
2044                                                 best = chs.best[0];
2045                                 }
2046                                 // nothing (more) to offer: stop early
2047                                 if (!best.host.ip)
2048                                         break;
2049                                 
2050                                 GnuID noID;
2051                                 noID.clear();
2052                                 best.writeAtoms(atom,noID);
2053                                 cnt++;
2054                         }
2055                         if (cnt)
2056                         {
2057                                 LOG_DEBUG("Sent %d tracker(s) to %s",cnt,rstr);
2058                         }
2059                         else if (rhost.port)
2060                         {
2061                                 // send push request to best firewalled tracker on other network
2062                                 chs.init();
2063                                 chs.waitDelay = 30;
2064                                 chs.excludeID = remoteID;
2065                                 chs.trackersOnly = true;
2066                                 chs.useFirewalled = true;
2067                                 chs.useBusyControls = false;
2068                                 if (chanMgr->pickHits(chs))
2069                                 {
2070                                         best = chs.best[0];
2071                                         GnuID noID;
2072                                         noID.clear();
2073                                         int cnt = servMgr->broadcastPushRequest(best,rhost,noID,Servent::T_CIN);        // shadows the outer 'cnt'
2074                                         LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,rstr);
2075                                 }
2076                         }else
2077                         {
2078                                 LOG_DEBUG("No available trackers");
2079                         }
2080                 }
2081
2082
2083                 LOG_ERROR("Sending QUIT to incoming: %d",error);
2084
2085                 atom.writeInt(PCP_QUIT,error);
2086                 return;         
2087         }
2088         
2089         // peer accepted: switch this servent to an established incoming control connection
2090         type = T_CIN;
2091         setStatus(S_CONNECTED);
2092
2093         atom.writeInt(PCP_OK,0);
2094
2095         // ask for update
2096         atom.writeParent(PCP_ROOT,1);
2097                 atom.writeParent(PCP_ROOT_UPDATE,0);
2098
2099         pcpStream = new PCPStream(remoteID);    // not deleted in this function
2100
2101         int error = 0;
2102         BroadcastState bcs;
2103         while (!error && thread.active && !sock->eof())
2104         {
2105                 error = pcpStream->readPacket(*sock,bcs);
2106                 sys->sleepIdle();
2107                 // these checks overwrite whatever readPacket() returned
2108                 if (!servMgr->isRoot && !chanMgr->isBroadcasting())
2109                         error = PCP_ERROR_OFFAIR;
2110                 if (peercastInst->isQuitting)
2111                         error = PCP_ERROR_SHUTDOWN;
2112         }
2113
2114         pcpStream->flush(*sock);
2115
2116         error += PCP_ERROR_QUIT;
2117         atom.writeInt(PCP_QUIT,error);
2118
2119         LOG_DEBUG("PCP Incoming to %s closed: %d",rstr,error);
2120
2121 }
2122
2123 // -----------------------------------
// Body of the COUT (outgoing control connection) thread; thread->data is the
// Servent to run.
//
// While the servent's thread is active, and only when we are broadcasting
// with servMgr->autoServe set, each pass of the outer loop:
//   1. picks a host: a pending pushSock if one is set; otherwise a tracker
//      from the hit list stored under a cleared GnuID (first one matching our
//      own server host, then any); otherwise the configured root host, tried
//      only when more than MIN_YP_RETRY has passed since lastYPConnect;
//   2. connects (unless the socket came from pushSock), sends PCP_CONNECT and
//      runs handshakeOutgoingPCP() with isTrusted = bestHit.yp;
//   3. reads PCP packets until an error, thread shutdown, EOF, autoServe being
//      cleared, going off air, application quit, or a late root packet;
//   4. closes and deletes the socket and reports the hit dead, unless the
//      session ended because we stopped broadcasting.
//
// If no host could be chosen the thread gives up. On exit the servent is
// killed and the thread ended; always returns 0.
2124 int Servent::outgoingProcMain(ThreadInfo *thread)
2125 {
2126 //      thread->lock();
2127         LOG_DEBUG("COUT started");
2128
2129         Servent *sv = (Servent*)thread->data;
2130                 
2131         GnuID noID;
2132         noID.clear();
2133         sv->pcpStream = new PCPStream(noID);    // re-initialised with the remote ID after each successful handshake
2134
2135         while (sv->thread.active)
2136         {
2137                 sv->setStatus(S_WAIT);
2138
2139                 if (chanMgr->isBroadcasting() && servMgr->autoServe)
2140                 {
2141                         ChanHit bestHit;
2142                         ChanHitSearch chs;
2143                         char ipStr[64];
2144                         // host selection: repeats until bestHit has an ip or the thread is deactivated
2145                         do
2146                         {
2147                                 bestHit.init();
2148
2149                                 if (servMgr->rootHost.isEmpty())
2150                                         break;
2151
2152                                 if (sv->pushSock)
2153                                 {
2154                                         sv->sock = sv->pushSock;        // adopt the already-open pushed socket
2155                                         sv->pushSock = NULL;
2156                                         bestHit.host = sv->sock->host;
2157                                         break;
2158                                 }
2159
2160                                 GnuID noID;     // shadows the outer 'noID'
2161                                 noID.clear();
2162                                 ChanHitList *chl = chanMgr->findHitListByID(noID);
2163                                 if (chl)
2164                                 {
2165                                         // find local tracker
2166                                         chs.init();
2167                                         chs.matchHost = servMgr->serverHost;
2168                                         chs.waitDelay = MIN_TRACKER_RETRY;
2169                                         chs.excludeID = servMgr->sessionID;
2170                                         chs.trackersOnly = true;
2171                                         if (!chl->pickHits(chs))
2172                                         {
2173                                                 // else find global tracker
2174                                                 chs.init();
2175                                                 chs.waitDelay = MIN_TRACKER_RETRY;
2176                                                 chs.excludeID = servMgr->sessionID;
2177                                                 chs.trackersOnly = true;
2178                                                 chl->pickHits(chs);
2179                                         }
2180
2181                                         if (chs.numResults)
2182                                         {
2183                                                 bestHit = chs.best[0];
2184                                         }
2185                                 }
2186
2187
2188                                 unsigned int ctime = sys->getTime();
2189                                 // no tracker found: fall back to the root host, rate-limited by MIN_YP_RETRY
2190                                 if ((!bestHit.host.ip) && ((ctime-chanMgr->lastYPConnect) > MIN_YP_RETRY))
2191                                 {
2192                                         bestHit.host.fromStrName(servMgr->rootHost.cstr(),DEFAULT_PORT);
2193                                         bestHit.yp = true;
2194                                         chanMgr->lastYPConnect = ctime;
2195                                 }
2196                                 sys->sleepIdle();
2197
2198                         }while (!bestHit.host.ip && (sv->thread.active));
2199
2200
2201                         if (!bestHit.host.ip)           // give up
2202                         {
2203                                 LOG_ERROR("COUT giving up");
2204                                 break;
2205                         }
2206
2207
2208                         bestHit.host.toStr(ipStr);
2209
2210                         int error=0;
2211                         try 
2212                         {
2213
2214                                 LOG_DEBUG("COUT to %s: Connecting..",ipStr);
2215
2216                                 if (!sv->sock)  // a socket adopted from pushSock is reused as is
2217                                 {
2218                                         sv->setStatus(S_CONNECTING);
2219                                         sv->sock = sys->createSocket();
2220                                         if (!sv->sock)
2221                                                 throw StreamException("Unable to create socket");
2222                                         sv->sock->open(bestHit.host);
2223                                         sv->sock->connect();
2224
2225                                 }
2226
2227                                 sv->sock->setReadTimeout(30000);
2228                                 AtomStream atom(*sv->sock);
2229
2230                                 sv->setStatus(S_HANDSHAKE);
2231
2232                                 Host rhost = sv->sock->host;
2233                                 atom.writeInt(PCP_CONNECT,1);
2234                                 handshakeOutgoingPCP(atom,rhost,sv->remoteID,sv->agent,bestHit.yp);     // only the root-host (yp) connection is treated as trusted
2235
2236                                 sv->setStatus(S_CONNECTED);
2237
2238                                 LOG_DEBUG("COUT to %s: OK",ipStr);
2239
2240                                 sv->pcpStream->init(sv->remoteID);
2241
2242                                 BroadcastState bcs;
2243                                 bcs.servent_id = sv->servent_id;
2244                                 error = 0;
2245                                 while (!error && sv->thread.active && !sv->sock->eof() && servMgr->autoServe)
2246                                 {
2247                                         error = sv->pcpStream->readPacket(*sv->sock,bcs);
2248
2249                                         sys->sleepIdle();
2250                                         // the checks below overwrite whatever readPacket() returned
2251                                         if (!chanMgr->isBroadcasting())
2252                                                 error = PCP_ERROR_OFFAIR;
2253                                         if (peercastInst->isQuitting)
2254                                                 error = PCP_ERROR_SHUTDOWN;
2255                                         // nextRootPacket is set and more than 30 time units overdue
2256                                         if (sv->pcpStream->nextRootPacket)
2257                                                 if (sys->getTime() > (sv->pcpStream->nextRootPacket+30))
2258                                                         error = PCP_ERROR_NOROOT;
2259                                 }
2260                                 sv->setStatus(S_CLOSING);
2261
2262                                 sv->pcpStream->flush(*sv->sock);
2263
2264                                 error += PCP_ERROR_QUIT;
2265                                 atom.writeInt(PCP_QUIT,error);
2266
2267                                 LOG_ERROR("COUT to %s closed: %d",ipStr,error);
2268
2269                         }catch(TimeoutException &e)     // NOTE(review): presumably derives from StreamException, hence caught first - confirm
2270                         {
2271                                 LOG_ERROR("COUT to %s: timeout (%s)",ipStr,e.msg);
2272                                 sv->setStatus(S_TIMEOUT);
2273                         }catch(StreamException &e)
2274                         {
2275                                 LOG_ERROR("COUT to %s: %s",ipStr,e.msg);
2276                                 sv->setStatus(S_ERROR);
2277                         }
2278                         // always release the socket; a StreamException raised while closing is ignored
2279                         try
2280                         {
2281                                 if (sv->sock)
2282                                 {
2283                                         sv->sock->close();
2284                                         delete sv->sock;
2285                                         sv->sock = NULL;
2286                                 }
2287
2288                         }catch(StreamException &) {}
2289
2290                         // don't discard this hit if we caused the disconnect (stopped broadcasting)
2291                         if (error != (PCP_ERROR_QUIT+PCP_ERROR_OFFAIR))
2292                                 chanMgr->deadHit(bestHit);
2293
2294                 }
2295
2296                 sys->sleepIdle();
2297         }
2298
2299         sv->kill();
2300         sys->endThread(thread);
2301         LOG_DEBUG("COUT ended");
2302         return 0;
2303 }
2304 // -----------------------------------
// Thread entry point for COUT servents: hands off to outgoingProcMain via
// the SEH_THREAD macro. The macro is expected to supply the return statement.
// NOTE(review): SEH_THREAD presumably comes from win32/seh.h and wraps the
// call in structured-exception handling - confirm against that header.
2305 int Servent::outgoingProc(ThreadInfo *thread)
2306 {
2307         SEH_THREAD(outgoingProcMain, Servent::outgoingProc);
2308 }
2309 // -----------------------------------
// Body of the thread serving an accepted incoming connection; thread->data
// is the Servent to run.
// Runs handshakeIncoming(). On HTTPException the exception message is written
// back to the peer as a response line (plus a WWW-Authenticate header when the
// code is 401) followed by an empty line; other StreamExceptions are only
// logged. Afterwards the servent is killed and the thread ended; returns 0.
2310 int Servent::incomingProcMain(ThreadInfo *thread)
2311 {
2312 //      thread->lock();
2313
2314         Servent *sv = (Servent*)thread->data;
2315         
2316         char ipStr[64];
2317         sv->sock->host.toStr(ipStr);    // captured up front for the log messages below
2318
2319         try 
2320         {
2321                 sv->handshakeIncoming();
2322         }catch(HTTPException &e)        // NOTE(review): presumably derives from StreamException, hence caught first - confirm
2323         {
2324                 try
2325                 {
2326                         sv->sock->writeLine(e.msg);
2327                         if (e.code == 401)
2328                                 sv->sock->writeLine("WWW-Authenticate: Basic realm=\"PeerCast\"");
2329                         sv->sock->writeLine("");
2330                 }catch(StreamException &){}     // best effort: a failure while replying is ignored
2331                 LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);
2332         }catch(StreamException &e)
2333         {
2334                 LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);
2335         }
2336
2337
2338         sv->kill();
2339         sys->endThread(thread);
2340         return 0;
2341 }
2342 // -----------------------------------
// Thread entry point for incoming servents: hands off to incomingProcMain via
// the SEH_THREAD macro. The macro is expected to supply the return statement.
// NOTE(review): SEH_THREAD presumably comes from win32/seh.h and wraps the
// call in structured-exception handling - confirm against that header.
2343 int Servent::incomingProc(ThreadInfo *thread)
2344 {
2345         SEH_THREAD(incomingProcMain, Servent::incomingProc);
2346 }
2347 // -----------------------------------
// Runs a servent session: sets status S_HANDSHAKE, performs handshakeIn(),
// then hands over to processGnutella().
// Throws StreamException if no socket is attached after the handshake.
2348 void Servent::processServent()
2349 {
2350         setStatus(S_HANDSHAKE);
2351
2352         handshakeIn();
2353
2354         if (!sock)
2355                 throw StreamException("Servent has no socket");
2356
2357         processGnutella();
2358 }
2359
2360 // -----------------------------------
// Streams a channel to the connected peer using the servent's outputProtocol.
//
//   doneHandshake - when false, handshakeStream(chanInfo) is run first and the
//                   function returns silently if it reports failure.
//   chanInfo      - identifies the channel; nothing is sent unless its id is set.
//
// Waits for the channel header, bumps servMgr->totalStreams, then dispatches
// to the sender matching outputProtocol (HTTP, MMS, PCP or PEERCAST). Any
// other protocol value sends nothing. Status is set to S_CLOSING on normal
// return.
// Throws StreamException if the channel does not become ready or cannot be
// found.
2361 void Servent::processStream(bool doneHandshake,ChanInfo &chanInfo)
2362 {       
2363         if (!doneHandshake)
2364         {
2365                 setStatus(S_HANDSHAKE);
2366
2367                 if (!handshakeStream(chanInfo))
2368                         return;
2369         }
2370
2371         if (chanInfo.id.isSet())
2372         {
2373
2374                 chanID = chanInfo.id;
2375
2376                 LOG_CHANNEL("Sending channel: %s ",ChanInfo::getProtocolStr(outputProtocol));
2377
2378                 if (!waitForChannelHeader(chanInfo))
2379                         throw StreamException("Channel not ready");
2380
2381                 servMgr->totalStreams++;
2382
2383                 Host host = sock->host;         // local copy; not used further in this function
2384                 host.port = 0;  // force to 0 so we ignore the incoming port
2385
2386                 Channel *ch = chanMgr->findChannelByID(chanID); // used only as an existence check here
2387                 if (!ch)
2388                         throw StreamException("Channel not found");
2389
2390                 if (outputProtocol == ChanInfo::SP_HTTP)
2391                 {
2392                         if ((addMetadata) && (chanMgr->icyMetaInterval))        // metadata requested and an interval is configured
2393                                 sendRawMetaChannel(chanMgr->icyMetaInterval);
2394                         else 
2395                                 sendRawChannel(true,true);
2396
2397                 }else if (outputProtocol == ChanInfo::SP_MMS)
2398                 {
2399                         if (nsSwitchNum)
2400                         {
2401                                 sendRawChannel(true,true);
2402                         }else
2403                         {
2404                                 sendRawChannel(true,false);     // header only (sendData = false)
2405                         }
2406
2407                 }else if (outputProtocol  == ChanInfo::SP_PCP)
2408                 {
2409                         sendPCPChannel();
2410
2411                 } else if (outputProtocol  == ChanInfo::SP_PEERCAST)
2412                 {
2413                         sendPeercastChannel();
2414                 }
2415         }
2416
2417         setStatus(S_CLOSING);
2418 }
2419
2420 // -----------------------------------------
2421 #if 0
2422 // debug
2423                 FileStream file;
2424                 file.openReadOnly("c://test.mp3");
2425
2426                 LOG_DEBUG("raw file read");
2427                 char buf[4000];
2428                 int cnt=0;
2429                 while (!file.eof())
2430                 {
2431                         LOG_DEBUG("send %d",cnt++);
2432                         file.read(buf,sizeof(buf));
2433                         sock->write(buf,sizeof(buf));
2434
2435                 }
2436                 file.close();
2437                 LOG_DEBUG("raw file sent");
2438
2439         return;
2440 // debug
2441 #endif
2442 // -----------------------------------
2443 bool Servent::waitForChannelHeader(ChanInfo &info)
2444 {
2445         for(int i=0; i<30*10; i++)
2446         {
2447                 Channel *ch = chanMgr->findChannelByID(info.id);
2448                 if (!ch)
2449                         return false;
2450
2451                 if (ch->isPlaying() && (ch->rawData.writePos>0))
2452                         return true;
2453
2454                 if (!thread.active || !sock->active())
2455                         break;
2456                 sys->sleep(100);
2457         }
2458         return false;
2459 }
2460 // -----------------------------------
2461 void Servent::sendRawChannel(bool sendHead, bool sendData)
2462 {
2463         try
2464         {
2465
2466                 sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
2467
2468                 Channel *ch = chanMgr->findChannelByID(chanID);
2469                 if (!ch)
2470                         throw StreamException("Channel not found");
2471
2472                 setStatus(S_CONNECTED);
2473
2474                 LOG_DEBUG("Starting Raw stream of %s at %d",ch->info.name.cstr(),streamPos);
2475
2476                 if (sendHead)
2477                 {
2478                         ch->headPack.writeRaw(*sock);
2479                         streamPos = ch->headPack.pos + ch->headPack.len;
2480                         LOG_DEBUG("Sent %d bytes header ",ch->headPack.len);
2481                 }
2482
2483                 if (sendData)
2484                 {
2485
2486                         unsigned int streamIndex = ch->streamIndex;
2487                         unsigned int connectTime = sys->getTime();
2488                         unsigned int lastWriteTime = connectTime;
2489
2490                         while ((thread.active) && sock->active())
2491                         {
2492                                 ch = chanMgr->findChannelByID(chanID);
2493
2494                                 if (ch)
2495                                 {
2496
2497                                         if (streamIndex != ch->streamIndex)
2498                                         {
2499                                                 streamIndex = ch->streamIndex;
2500                                                 streamPos = ch->headPack.pos;
2501                                                 LOG_DEBUG("sendRaw got new stream index");
2502                                         }
2503
2504                                         ChanPacket rawPack;
2505                                         if (ch->rawData.findPacket(streamPos,rawPack))
2506                                         {
2507                                                 if (syncPos != rawPack.sync)
2508                                                         LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
2509                                                 syncPos = rawPack.sync+1;
2510
2511                                                 if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2512                                                 {
2513                                                         rawPack.writeRaw(*sock);
2514                                                         lastWriteTime = sys->getTime();
2515                                                 }
2516
2517                                                 if (rawPack.pos < streamPos)
2518                                                         LOG_DEBUG("raw: skip back %d",rawPack.pos-streamPos);
2519                                                 streamPos = rawPack.pos+rawPack.len;
2520                                         } else if (sock->readReady()) {
2521                                                 char c;
2522                                                 int error = sock->readUpto(&c, 1);
2523                                                 if (error == 0) sock->close();
2524                                         }
2525                                 }
2526
2527                                 if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
2528                                         throw TimeoutException();
2529
2530                                 sys->sleepIdle();
2531                         }
2532                 }
2533         }catch(StreamException &e)
2534         {
2535                 LOG_ERROR("Stream channel: %s",e.msg);
2536         }
2537 }
2538
#if 0
// -----------------------------------
// NOTE: this function is compiled out (#if 0) and is kept for reference only.
//
// Sends the raw head packet and/or the raw data of every channel that was
// playing when the function was entered, all over this servent's socket.
//
// sendHead - write each channel's head packet first.
// sendData - afterwards keep writing T_DATA / T_HEAD packets of all collected
//            channels while the thread and the socket are active.
//
// StreamExceptions raised inside are caught here and logged.
void Servent::sendRawMultiChannel(bool sendHead, bool sendData)
{
        try
        {
                unsigned int chanStreamIndex[ChanMgr::MAX_CHANNELS];
                unsigned int chanStreamPos[ChanMgr::MAX_CHANNELS];
                GnuID chanIDs[ChanMgr::MAX_CHANNELS];
                int numChanIDs=0;
                for(int i=0; i<ChanMgr::MAX_CHANNELS; i++)
                {
                        Channel *ch = &chanMgr->channels[i];
                        if (ch->isPlaying())
                        {
                                // Record the start position and stream index right away so
                                // the per-channel state is never read uninitialized, even
                                // when sendHead is false or a later lookup fails.
                                chanStreamIndex[numChanIDs] = ch->streamIndex;
                                chanStreamPos[numChanIDs] = ch->headPack.pos + ch->headPack.len;
                                chanIDs[numChanIDs] = ch->info.id;
                                numChanIDs++;
                        }
                }

                setStatus(S_CONNECTED);

                if (sendHead)
                {
                        for(int i=0; i<numChanIDs; i++)
                        {
                                Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
                                if (ch)
                                {
                                        LOG_DEBUG("Starting RawMulti stream: %s",ch->info.name.cstr());
                                        ch->headPack.writeRaw(*sock);
                                        chanStreamPos[i] = ch->headPack.pos + ch->headPack.len;
                                        chanStreamIndex[i] = ch->streamIndex;
                                        LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
                                }
                        }
                }

                if (sendData)
                {
                        while ((thread.active) && sock->active())
                        {
                                // Visit every collected channel on each pass. The previous
                                // loop started at index 1 and broke out unconditionally, so
                                // at most one channel was ever streamed.
                                for(int i=0; i<numChanIDs; i++)
                                {
                                        Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
                                        if (!ch)
                                                continue;

                                        // A changed stream index restarts this channel from its head packet position.
                                        if (chanStreamIndex[i] != ch->streamIndex)
                                        {
                                                chanStreamIndex[i] = ch->streamIndex;
                                                chanStreamPos[i] = ch->headPack.pos;
                                                LOG_DEBUG("sendRawMulti got new stream index for chan %d",i);
                                        }

                                        ChanPacket rawPack;
                                        if (ch->rawData.findPacket(chanStreamPos[i],rawPack))
                                        {
                                                if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
                                                        rawPack.writeRaw(*sock);

                                                if (rawPack.pos < chanStreamPos[i])
                                                        LOG_DEBUG("raw: skip back %d",rawPack.pos-chanStreamPos[i]);
                                                chanStreamPos[i] = rawPack.pos+rawPack.len;
                                        }
                                }

                                sys->sleepIdle();
                        }
                }
        }catch(StreamException &e)
        {
                LOG_ERROR("Stream channel: %s",e.msg);
        }
}
#endif
2626
2627 // -----------------------------------
2628 void Servent::sendRawMetaChannel(int interval)
2629 {
2630
2631         try
2632         {
2633                 Channel *ch = chanMgr->findChannelByID(chanID);
2634                 if (!ch)
2635                         throw StreamException("Channel not found");
2636
2637                 sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
2638
2639                 setStatus(S_CONNECTED);
2640
2641                 LOG_DEBUG("Starting Raw Meta stream of %s (metaint: %d) at %d",ch->info.name.cstr(),interval,streamPos);
2642
2643
2644                 String lastTitle,lastURL;
2645
2646                 int             lastMsgTime=sys->getTime();
2647                 bool    showMsg=true;
2648
2649                 char buf[16384];
2650                 int bufPos=0;
2651
2652                 if ((interval > sizeof(buf)) || (interval < 1))
2653                         throw StreamException("Bad ICY Meta Interval value");
2654
2655                 unsigned int connectTime = sys->getTime();
2656                 unsigned int lastWriteTime = connectTime;
2657
2658                 streamPos = 0;          // raw meta channel has no header (its MP3)
2659
2660                 while ((thread.active) && sock->active())
2661                 {
2662                         ch = chanMgr->findChannelByID(chanID);
2663
2664                         if (ch)
2665                         {
2666
2667                                 ChanPacket rawPack;
2668                                 if (ch->rawData.findPacket(streamPos,rawPack))
2669                                 {
2670
2671                                         if (syncPos != rawPack.sync)
2672                                                 LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
2673                                         syncPos = rawPack.sync+1;
2674
2675                                         MemoryStream mem(rawPack.data,rawPack.len);
2676
2677                                         if (rawPack.type == ChanPacket::T_DATA)
2678                                         {
2679
2680                                                 int len = rawPack.len;
2681                                                 char *p = rawPack.data;
2682                                                 while (len)
2683                                                 {
2684                                                         int rl = len;
2685                                                         if ((bufPos+rl) > interval)
2686                                                                 rl = interval-bufPos;
2687                                                         memcpy(&buf[bufPos],p,rl);
2688                                                         bufPos+=rl;
2689                                                         p+=rl;
2690                                                         len-=rl;
2691
2692                                                         if (bufPos >= interval)
2693                                                         {
2694                                                                 bufPos = 0;     
2695                                                                 sock->write(buf,interval);
2696                                                                 lastWriteTime = sys->getTime();
2697
2698                                                                 if (chanMgr->broadcastMsgInterval)
2699                                                                         if ((sys->getTime()-lastMsgTime) >= chanMgr->broadcastMsgInterval)
2700                                                                         {
2701                                                                                 showMsg ^= true;
2702                                                                                 lastMsgTime = sys->getTime();
2703                                                                         }
2704
2705                                                                 String *metaTitle = &ch->info.track.title;
2706                                                                 if (!ch->info.comment.isEmpty() && (showMsg))
2707                                                                         metaTitle = &ch->info.comment;
2708
2709
2710                                                                 if (!metaTitle->isSame(lastTitle) || !ch->info.url.isSame(lastURL))
2711                                                                 {
2712
2713                                                                         char tmp[1024];
2714                                                                         String title,url;
2715
2716                                                                         title = *metaTitle;
2717                                                                         url = ch->info.url;
2718
2719                                                                         title.convertTo(String::T_META);
2720                                                                         url.convertTo(String::T_META);
2721
2722                                                                         sprintf(tmp,"StreamTitle='%s';StreamUrl='%s';\0",title.cstr(),url.cstr());
2723                                                                         int len = ((strlen(tmp) + 15+1) / 16);
2724                                                                         sock->writeChar(len);
2725                                                                         sock->write(tmp,len*16);
2726
2727                                                                         lastTitle = *metaTitle;
2728                                                                         lastURL = ch->info.url;
2729
2730                                                                         LOG_DEBUG("StreamTitle: %s, StreamURL: %s",lastTitle.cstr(),lastURL.cstr());
2731
2732                                                                 }else
2733                                                                 {
2734                                                                         sock->writeChar(0);                                     
2735                                                                 }
2736
2737                                                         }
2738                                                 }
2739                                         }
2740                                         streamPos = rawPack.pos + rawPack.len;
2741                                 }
2742                         }
2743                         if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
2744                                 throw TimeoutException();
2745
2746                         sys->sleepIdle();
2747
2748                 }
2749         }catch(StreamException &e)
2750         {
2751                 LOG_ERROR("Stream channel: %s",e.msg);
2752         }
2753 }
2754 // -----------------------------------
2755 void Servent::sendPeercastChannel()
2756 {
2757         try
2758         {
2759                 setStatus(S_CONNECTED);
2760
2761                 Channel *ch = chanMgr->findChannelByID(chanID);
2762                 if (!ch)
2763                         throw StreamException("Channel not found");
2764
2765                 LOG_DEBUG("Starting PeerCast stream: %s",ch->info.name.cstr());
2766
2767                 sock->writeTag("PCST");
2768
2769                 ChanPacket pack;
2770
2771                 ch->headPack.writePeercast(*sock);
2772
2773                 pack.init(ChanPacket::T_META,ch->insertMeta.data,ch->insertMeta.len,ch->streamPos);
2774                 pack.writePeercast(*sock);
2775         
2776                 streamPos = 0;
2777                 unsigned int syncPos=0;
2778                 while ((thread.active) && sock->active())
2779                 {
2780                         ch = chanMgr->findChannelByID(chanID);
2781                         if (ch)
2782                         {
2783
2784                                 ChanPacket rawPack;
2785                                 if (ch->rawData.findPacket(streamPos,rawPack))
2786                                 {
2787                                         if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2788                                         {
2789                                                 sock->writeTag("SYNC");
2790                                                 sock->writeShort(4);
2791                                                 sock->writeShort(0);
2792                                                 sock->write(&syncPos,4);
2793                                                 syncPos++;
2794
2795                                                 rawPack.writePeercast(*sock);
2796                                         }
2797                                         streamPos = rawPack.pos + rawPack.len;
2798                                 }
2799                         }
2800                         sys->sleepIdle();
2801                 }
2802
2803         }catch(StreamException &e)
2804         {
2805                 LOG_ERROR("Stream channel: %s",e.msg);
2806         }
2807 }
2808
2809 //WLock canStreamLock;
2810
2811 // -----------------------------------
2812 void Servent::sendPCPChannel()
2813 {
2814         bool skipCheck = false;
2815         unsigned int ptime = 0;
2816         int npacket = 0, upsize = 0;
2817
2818         Channel *ch = chanMgr->findChannelByID(chanID);
2819         if (!ch)
2820                 throw StreamException("Channel not found");
2821
2822         AtomStream atom(*sock);
2823
2824         pcpStream = new PCPStream(remoteID);
2825         int error=0;
2826
2827         try
2828         {
2829
2830                 LOG_DEBUG("Starting PCP stream of channel at %d",streamPos);
2831
2832
2833 //              setStatus(S_CONNECTED);
2834
2835                 //canStreamLock.on();
2836                 //thread.active = canStream(ch);
2837                 //setStatus(S_CONNECTED);
2838                 //canStreamLock.off();
2839
2840                 lastSkipTime = 0;
2841                 lastSkipCount = 0;
2842                 waitPort = 0;
2843
2844                 if (thread.active){
2845                         atom.writeParent(PCP_CHAN,3 + ((sendHeader)?1:0));
2846                                 atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
2847                                 ch->info.writeInfoAtoms(atom);
2848                                 ch->info.writeTrackAtoms(atom);
2849                                 if (sendHeader)
2850                                 {
2851                                         atom.writeParent(PCP_CHAN_PKT,3);
2852                                         atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
2853                                         atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
2854                                         atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
2855
2856                                         if (streamPos == 0)
2857                                                 streamPos = ch->headPack.pos+ch->headPack.len;
2858                                         LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
2859                                 }
2860                 }
2861
2862                 unsigned int streamIndex = ch->streamIndex;
2863
2864                 ChanPacket rawPack;
2865                 char pbuf[ChanPacket::MAX_DATALEN*3];
2866                 MemoryStream mems(pbuf,sizeof(pbuf));
2867                 AtomStream atom2(mems);
2868
2869                 while (thread.active)
2870                 {
2871
2872                         Channel *ch = chanMgr->findChannelByID(chanID);
2873
2874                         if (ch)
2875                         {
2876
2877                                 if (streamIndex != ch->streamIndex)
2878                                 {
2879                                         streamIndex = ch->streamIndex;
2880                                         streamPos = ch->headPack.pos;
2881                                         LOG_DEBUG("sendPCPStream got new stream index");                                                
2882                                 }
2883
2884                                 mems.rewind();
2885
2886                                 if (ch->rawData.findPacket(streamPos,rawPack))
2887                                 {
2888                                         if ((streamPos < rawPack.pos) && !rawPack.skip){
2889                                                 if (skipCheck){
2890                                                         char tmp[32];
2891                                                         getHost().IPtoStr(tmp);
2892                                                         LOG_NETWORK("##### send skipping ##### %d (%d, %d) -> %s", (rawPack.pos - streamPos), streamPos, rawPack.pos, tmp);
2893
2894                                                         if (sys->getTime() == lastSkipTime) {
2895                                                                 LOG_DEBUG("##### skip all buffer");
2896                                                                 streamPos = ch->rawData.getLatestPos();
2897                                                                 continue;
2898                                                         }
2899
2900                                                         lastSkipTime = sys->getTime();
2901                                                         lastSkipCount++;
2902                                                 } else {
2903                                                         skipCheck = true;
2904                                                 }
2905                                         }
2906
2907                                         if (rawPack.type == ChanPacket::T_HEAD)
2908                                         {
2909                                                 atom2.writeParent(PCP_CHAN,2);
2910                                                         atom2.writeBytes(PCP_CHAN_ID,chanID.id,16);
2911                                                         atom2.writeParent(PCP_CHAN_PKT,3);
2912                                                                 atom2.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
2913                                                                 atom2.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
2914                                                                 atom2.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
2915
2916                                                 sock->write(pbuf, mems.getPosition());
2917                                         }else if (rawPack.type == ChanPacket::T_DATA)
2918                                         {
2919                                                 atom2.writeParent(PCP_CHAN,2);
2920                                                         atom2.writeBytes(PCP_CHAN_ID,chanID.id,16);
2921                                                         atom2.writeParent(PCP_CHAN_PKT,3);
2922                                                                 atom2.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_DATA);
2923                                                                 atom2.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
2924                                                                 atom2.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
2925
2926 #ifdef WIN32
2927                                                 sock->bufferingWrite(pbuf, mems.getPosition());
2928                                                 lastSkipTime = sock->bufList.lastSkipTime;
2929                                                 lastSkipCount = sock->bufList.skipCount;
2930 #else
2931                                                 sock->write(pbuf, mems.getPosition());
2932 #endif
2933                                         }
2934
2935                                         if (rawPack.pos < streamPos)
2936                                                 LOG_DEBUG("pcp: skip back %d",rawPack.pos-streamPos);
2937
2938                                         //LOG_DEBUG("Sending %d-%d (%d,%d,%d)",rawPack.pos,rawPack.pos+rawPack.len,ch->streamPos,ch->rawData.getLatestPos(),ch->rawData.getOldestPos());
2939
2940                                         streamPos = rawPack.pos+rawPack.len;
2941                                 }
2942                         } else {
2943                                 throw StreamException("Channel not found");
2944                         }
2945
2946 #ifdef WIN32
2947                         sock->bufferingWrite(NULL, 0);
2948                         lastSkipTime = sock->bufList.lastSkipTime;
2949                         lastSkipCount = sock->bufList.skipCount;
2950 #endif
2951                         BroadcastState bcs;
2952                         bcs.servent_id = servent_id;
2953 //                      error = pcpStream->readPacket(*sock,bcs);
2954
2955                         unsigned int t = sys->getTime();
2956                         if (t != ptime) {
2957                                 ptime = t;
2958                                 npacket = MAX_PROC_PACKETS;
2959                                 upsize = MAX_OUTWARD_SIZE;
2960                         }
2961
2962                         int len = pcpStream->flushUb(*sock, upsize);
2963                         upsize -= len;
2964
2965                         while (npacket > 0 && sock->readReady()) {
2966                                 npacket--;
2967                                 error = pcpStream->readPacket(*sock,bcs);
2968                                 if (error)
2969                                         throw StreamException("PCP exception");
2970                         }
2971
2972                         sys->sleepIdle();
2973
2974                 }
2975
2976                 LOG_DEBUG("PCP channel stream closed normally.");
2977
2978         }catch(StreamException &e)
2979         {
2980                 LOG_ERROR("Stream channel: %s",e.msg);
2981         }
2982
2983         try
2984         {
2985                 pcpStream->flush(*sock);
2986                 atom.writeInt(PCP_QUIT,error);
2987         }catch(StreamException &) {}
2988
2989 }
2990
2991 // -----------------------------------
2992 int Servent::serverProcMain(ThreadInfo *thread)
2993 {
2994 //      thread->lock();
2995
2996
2997         Servent *sv = (Servent*)thread->data;
2998
2999         try 
3000         {
3001                 if (!sv->sock)
3002                         throw StreamException("Server has no socket");
3003
3004                 sv->setStatus(S_LISTENING);
3005
3006
3007                 char servIP[64];
3008                 sv->sock->host.toStr(servIP);
3009
3010                 if (servMgr->isRoot)
3011                         LOG_DEBUG("Root Server started: %s",servIP);
3012                 else
3013                         LOG_DEBUG("Server started: %s",servIP);
3014                 
3015
3016                 while ((thread->active) && (sv->sock->active()))
3017                 {
3018                         if (servMgr->numActiveOnPort(sv->sock->host.port) < servMgr->maxServIn)
3019                         {
3020                                 ClientSocket *cs = sv->sock->accept();
3021
3022                                 // Exclude invalid source addresses (IPv4 multicast)
3023                                 if (cs && (((cs->host.ip >> 24) & 0xF0) == 0xE0))
3024                                 {
3025                                         char ip[64];
3026                                         cs->host.toStr(ip);
3027                                         cs->close();
3028                                         LOG_ERROR("reject incoming multicast address: %s", ip);
3029                                         peercastApp->notifyMessage(ServMgr::NT_PEERCAST, "reject multicast address");
3030                                 } else
3031                                 if (cs)
3032                                 {
3033                                         // countermeasure against DoS Atk
3034                                         if (cs->host.ip != (0x7F000001)) // bypass loopback
3035                                         {
3036                                                 // check blacklist
3037                                                 addrCont clientAddr(cs->host.ip);
3038                                                 servMgr->IP_blacklist->lock();
3039                                                 if (servMgr->IP_blacklist->find(clientAddr))
3040                                                 {
3041                                                         // blacklisted
3042                                                         servMgr->IP_blacklist->unlock();
3043
3044                                                         LOG_DEBUG("REFUSED: %d.%d.%d.%d", (cs->host.ip >> 24), (cs->host.ip >> 16) & 0xFF, (cs->host.ip >> 8) & 0xFF, cs->host.ip & 0xFF);
3045                                                         cs->close();
3046                                                         sys->sleep(100);
3047
3048                                                         continue;
3049                                                 }
3050
3051                                                 servMgr->IP_blacklist->unlock();
3052                                                 LOG_DEBUG("ACCEPT: %d.%d.%d.%d", (cs->host.ip >> 24), (cs->host.ip >> 16) & 0xFF, (cs->host.ip >> 8) & 0xFF, cs->host.ip & 0xFF);
3053
3054
3055                                                 // check graylist
3056                                                 servMgr->IP_graylist->lock();
3057                                                 size_t idx;
3058                                                 if (servMgr->IP_graylist->find(clientAddr, &idx))
3059                                                 {
3060                                                         // update
3061                                                         ++(servMgr->IP_graylist->at(idx));
3062                                                         LOG_DEBUG("UPDATE: %d.%d.%d.%d", (cs->host.ip >> 24), (cs->host.ip >> 16) & 0xFF, (cs->host.ip >> 8) & 0xFF, cs->host.ip & 0xFF);
3063                                                 } else
3064                                                 {
3065                                                         // graylisted
3066                                                         servMgr->IP_graylist->push_back(clientAddr);
3067                                                         LOG_DEBUG("GRAYED: %d.%d.%d.%d", (cs->host.ip >> 24), (cs->host.ip >> 16) & 0xFF, (cs->host.ip >> 8) & 0xFF, cs->host.ip & 0xFF);
3068                                                 }
3069                                                 servMgr->IP_graylist->unlock();
3070                                         }
3071
3072                                         LOG_DEBUG("accepted incoming");
3073                                         Servent *ns = servMgr->allocServent();
3074                                         if (ns)
3075                                         {
3076                                                 servMgr->lastIncoming = sys->getTime();
3077                                                 ns->servPort = sv->sock->host.port;
3078                                                 ns->networkID = servMgr->networkID;
3079                                                 ns->initIncoming(cs,sv->allow);
3080                                         }else
3081                                                 LOG_ERROR("Out of servents");
3082                                 }
3083                         }
3084                         sys->sleep(10);
3085                 }
3086         }catch(StreamException &e)
3087         {
3088                 LOG_ERROR("Server Error: %s:%d",e.msg,e.err);
3089         }
3090
3091         
3092         LOG_DEBUG("Server stopped");
3093
3094         sv->kill();
3095         sys->endThread(thread);
3096         return 0;
3097 }
3098
3099 // -----------------------------------
// Thread entry point for the server thread: its whole body is the SEH_THREAD
// macro, invoked with serverProcMain (the function that does the work) and
// this function's own name.
// NOTE(review): SEH_THREAD presumably comes from win32/seh.h and is expected
// to supply the return statement and any structured-exception handling around
// the serverProcMain call -- confirm against the macro definition.
3100 int Servent::serverProc(ThreadInfo *thread)
3101 {
3102         SEH_THREAD(serverProcMain, Servent::serverProc);
3103 }
3104  
3105 // -----------------------------------
3106 bool    Servent::writeVariable(Stream &s, const String &var)
3107 {
3108         char buf[1024];
3109
3110         if (var == "type")
3111                 strcpy(buf,getTypeStr());
3112         else if (var == "status")
3113                 strcpy(buf,getStatusStr());
3114         else if (var == "address")
3115         {
3116                 if (servMgr->enableGetName) //JP-EX s
3117                 {
3118                         getHost().toStr(buf);
3119                         char h_ip[64];
3120                         Host h = getHost();
3121                         h.toStr(h_ip);
3122
3123 /*                      ChanHitList *hits[ChanMgr::MAX_HITLISTS];
3124                         int numHits=0;
3125                         for(int i=0; i<ChanMgr::MAX_HITLISTS; i++)
3126                         {
3127                                 ChanHitList *chl = &chanMgr->hitlists[i];
3128                                 if (chl->isUsed())
3129                                         hits[numHits++] = chl;
3130                         }
3131                         bool ishit,isfw;
3132                         ishit = isfw = false;
3133                         int numRelay = 0;
3134                         if (numHits) 
3135                         {
3136                                 for(int k=0; k<numHits; k++)
3137                                 {
3138                                         ChanHitList *chl = hits[k];
3139                                         if (chl->isUsed())
3140                                         {
3141                                                 for (int j=0; j<ChanHitList::MAX_HITS; j++)
3142                                                 {
3143                                                         ChanHit *hit = &chl->hits[j];
3144                                                         if (hit->host.isValid() && (h.ip == hit->host.ip))
3145                                                         {
3146                                                                 ishit = true;
3147                                                                 if (hit->firewalled)
3148                                                                         isfw = true;
3149                                                                 numRelay += hit->numRelays;
3150                                                         }
3151                                                 }
3152                                         }
3153                                 }
3154                         }
3155                         strcpy(buf,"");
3156                         if (ishit == true)
3157                         {
3158                                 if (isfw == true)
3159                                 {
3160                                         if (numRelay== 0)
3161                                                 strcat(buf,"<font color=red>");
3162                                         else 
3163                                                 strcat(buf,"<font color=orange>");
3164                                 }
3165                                 else
3166                                         strcat(buf,"<font color=green>");
3167                         }
3168                         strcat(buf,h_ip);
3169                         char h_name[128];
3170                         if (ClientSocket::getHostname(h_name,h.ip))
3171                         {
3172                                 strcat(buf,"[");
3173                                 strcat(buf,h_name);
3174                                 strcat(buf,"]");
3175                         }
3176                         if (ishit == true) 
3177                         {
3178                                 strcat(buf,"</font>");
3179                         }
3180                 } //JP-EX e*/
3181
3182
3183                         bool isfw = false;
3184                         bool isRelay = true;
3185                         int numRelay = 0;
3186                         ChanHitList *chl = chanMgr->findHitListByID(chanID);
3187                         if (chl){
3188                                 ChanHit *hit = chl->hit;
3189                                 while(hit){
3190                                         if (hit->host.isValid() && (h.ip == hit->host.ip)){
3191                                                 isfw = hit->firewalled;
3192                                                 isRelay = hit->relay;
3193                                                 numRelay = hit->numRelays;
3194                                                 break;
3195                                         }
3196                                         hit = hit->next;
3197                                 }
3198                         }
3199                         strcpy(buf, "");
3200                         if (isfw){
3201                                 if (numRelay == 0){
3202                                         strcat(buf,"<font color=red>");
3203                                 } else {
3204                                         strcat(buf,"<font color=orange>");
3205                                 }
3206                         } else {
3207                                 if (!isRelay){
3208                                         if (numRelay==0){
3209                                                 strcpy(buf,"<font color=purple>");
3210                                         } else {
3211                                                 strcpy(buf,"<font color=blue>");
3212                                         }
3213                                 } else {
3214                                         strcpy(buf,"<font color=green>");
3215                                 }
3216                         }
3217                         strcat(buf,h_ip);
3218                         char h_name[128];
3219                         if (ClientSocket::getHostname(h_name,sizeof(h_name),h.ip)) //JP-MOD(BOF\91Î\8dô)
3220                         {
3221                                 strcat(buf,"[");
3222                                 strcat(buf,h_name);
3223                                 strcat(buf,"]");
3224                         }
3225                         strcat(buf,"</font>");
3226                 }
3227                 else 
3228                         getHost().toStr(buf);
3229         }
3230         else if (var == "agent")
3231                 strcpy(buf,agent.cstr());
3232         else if (var == "bitrate")
3233         {
3234                 if (sock)
3235                 {
3236                         unsigned int tot = sock->bytesInPerSec+sock->bytesOutPerSec;
3237                         sprintf(buf,"%.1f",BYTES_TO_KBPS(tot));
3238                 }else
3239                         strcpy(buf,"0");
3240         }else if (var == "uptime")
3241         {
3242                 String uptime;
3243                 if (lastConnect)
3244                         uptime.setFromStopwatch(sys->getTime()-lastConnect);
3245                 else
3246                         uptime.set("-");
3247                 strcpy(buf,uptime.cstr());
3248         }else if (var.startsWith("gnet."))
3249         {
3250
3251                 float ctime = (float)(sys->getTime()-lastConnect);
3252                 if (var == "gnet.packetsIn")
3253                         sprintf(buf,"%d",gnuStream.packetsIn);
3254                 else if (var == "gnet.packetsInPerSec")
3255                         sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsIn)/ctime:0);
3256                 else if (var == "gnet.packetsOut")
3257                         sprintf(buf,"%d",gnuStream.packetsOut);
3258                 else if (var == "gnet.packetsOutPerSec")
3259                         sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsOut)/ctime:0);
3260                 else if (var == "gnet.normQueue")
3261                         sprintf(buf,"%d",outPacketsNorm.numPending());
3262                 else if (var == "gnet.priQueue")
3263                         sprintf(buf,"%d",outPacketsPri.numPending());
3264                 else if (var == "gnet.flowControl")
3265                         sprintf(buf,"%d",flowControl?1:0);
3266                 else if (var == "gnet.routeTime")
3267                 {
3268                         int nr = seenIDs.numUsed();
3269                         unsigned int tim = sys->getTime()-seenIDs.getOldest();
3270                 
3271                         String tstr;
3272                         tstr.setFromStopwatch(tim);
3273
3274                         if (nr)
3275                                 strcpy(buf,tstr.cstr());
3276                         else
3277                                 strcpy(buf,"-");
3278                 }
3279                 else
3280                         return false;
3281
3282         }else
3283                 return false;
3284
3285         s.writeString(buf);
3286
3287         return true;
3288 }