OSDN Git Service

CoreのスレッドにSEHを設置
[peercast-im/PeerCastIM.git] / PeerCast.root / PeerCast / core / common / servent.cpp
1 // ------------------------------------------------
2 // File : servent.cpp
3 // Date: 4-apr-2002
4 // Author: giles
5 // Desc: 
6 //              Servents are the actual connections between clients. They do the handshaking,
7 //              transfering of data and processing of GnuPackets. Each servent has one socket allocated
8 //              to it on connect, it uses this to transfer all of its data.
9 //
10 // (c) 2002 peercast.org
11 // ------------------------------------------------
12 // This program is free software; you can redistribute it and/or modify
13 // it under the terms of the GNU General Public License as published by
14 // the Free Software Foundation; either version 2 of the License, or
15 // (at your option) any later version.
16
17 // This program is distributed in the hope that it will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 // GNU General Public License for more details.
21 // ------------------------------------------------
22 // todo: make lan->yp not check firewall
23
24 #include <stdlib.h>
25 #include "servent.h"
26 #include "sys.h"
27 #include "gnutella.h"
28 #include "xml.h"
29 #include "html.h"
30 #include "http.h"
31 #include "stats.h"
32 #include "servmgr.h"
33 #include "peercast.h"
34 #include "atom.h"
35 #include "pcp.h"
36 #include "version2.h"
37 #ifdef _DEBUG
38 #include "chkMemoryLeak.h"
39 #define DEBUG_NEW new(__FILE__, __LINE__)
40 #define new DEBUG_NEW
41 #endif
42
43 #include "win32/seh.h"
44
45
46 const int DIRECT_WRITE_TIMEOUT = 60;
47
48 // -----------------------------------
49 char *Servent::statusMsgs[]=
50 {
51         "NONE",
52                 "CONNECTING",
53         "PROTOCOL",
54         "HANDSHAKE",
55         "CONNECTED",
56         "CLOSING",
57                 "LISTENING",
58                 "TIMEOUT",
59                 "REFUSED",
60                 "VERIFIED",
61                 "ERROR",
62                 "WAIT",
63                 "FREE"
64 };
65
66 // -----------------------------------
67 char *Servent::typeMsgs[]=
68 {
69                 "NONE",
70         "INCOMING",
71         "SERVER",
72                 "RELAY",
73                 "DIRECT",
74                 "COUT",
75                 "CIN",
76                 "PGNU"
77 };
78 // -----------------------------------
79 bool    Servent::isPrivate() 
80 {
81         Host h = getHost();
82         return servMgr->isFiltered(ServFilter::F_PRIVATE,h) || h.isLocalhost();
83 }
84 // -----------------------------------
85 bool    Servent::isAllowed(int a) 
86 {
87         Host h = getHost();
88
89         if (servMgr->isFiltered(ServFilter::F_BAN,h))
90                 return false;
91
92         return (allow&a)!=0;
93 }
94
95 // -----------------------------------
96 bool    Servent::isFiltered(int f) 
97 {
98         Host h = getHost();
99         return servMgr->isFiltered(f,h);
100 }
101
102 int servent_count = 1;
103 // -----------------------------------
104 Servent::Servent(int index)
105 :outPacketsPri(MAX_OUTPACKETS)
106 ,outPacketsNorm(MAX_OUTPACKETS)
107 ,seenIDs(MAX_HASH)
108 ,serventIndex(index)
109 ,sock(NULL)
110 ,next(NULL)
111 {
112         reset();
113         servent_id = servent_count++;
114         lastSkipTime = 0;
115         lastSkipCount = 0;
116         waitPort = 0;
117 }
118
119 // -----------------------------------
120 Servent::~Servent()
121 {       
122         
123 }
124 // -----------------------------------
125 void    Servent::kill() 
126 {
127         thread.active = false;
128                 
129         setStatus(S_CLOSING);
130
131         if (pcpStream)
132         {
133                 PCPStream *pcp = pcpStream;
134                 pcpStream = NULL;
135                 pcp->kill();
136                 delete pcp;
137         }
138
139         chanMgr->hitlistlock.on();
140         ChanHitList *chl = chanMgr->findHitListByID(chanID);
141         if (chl) {
142                 ChanHit *chh = chl->hit;
143                 ChanHit *prev = NULL;
144                 while (chh) {
145                         if (chh->servent_id == this->servent_id){
146                                 if ((servMgr->kickKeepTime != 0) && (chh->firewalled == 1)){
147                                         chh->numHops = 0;
148                                         chh->numListeners = 0;
149                                         chh->numRelays = 0;
150                                         prev = chh;
151                                         chh = chh->next;
152                                 } else {
153                                         ChanHit *next = chh->next;
154                                         if (prev)
155                                                 prev->next = next;
156                                         else
157                                                 chl->hit = next;
158
159                                         delete chh;
160                                         chh = next;
161                                 }
162                         } else {
163                                 prev = chh;
164                                 chh = chh->next;
165                         }
166                 }
167         }
168         chanMgr->hitlistlock.off();
169
170         if (sock)
171         {
172                 sock->close();
173                 delete sock;
174                 sock = NULL;
175         }
176
177         if (pushSock)
178         {
179                 pushSock->close();
180                 delete pushSock;
181                 pushSock = NULL;
182         }
183
184 //      thread.unlock();
185
186         if (type != T_SERVER)
187         {
188                 reset();
189                 setStatus(S_FREE);
190         }
191
192 }
193 // -----------------------------------
194 void    Servent::abort() 
195 {
196         thread.active = false;
197         if (sock)
198         {
199                 sock->close();
200         }
201
202 }
203
204 // -----------------------------------
205 void Servent::reset()
206 {
207
208         remoteID.clear();
209
210         servPort = 0;
211
212         pcpStream = NULL;
213
214         flowControl = false;
215         networkID.clear();
216
217         chanID.clear();
218
219         outputProtocol = ChanInfo::SP_UNKNOWN;
220
221         agent.clear();
222         sock = NULL;
223         allow = ALLOW_ALL;
224         syncPos = 0;
225         addMetadata = false;
226         nsSwitchNum = 0;
227         pack.func = 255;
228         lastConnect = lastPing = lastPacket = 0;
229         loginPassword.clear();
230         loginMount.clear();
231         bytesPerSecond = 0;
232         priorityConnect = false;
233         pushSock = NULL;
234         sendHeader = true;
235
236         outPacketsNorm.reset();
237         outPacketsPri.reset();
238
239         seenIDs.clear();
240
241         status = S_NONE;
242         type = T_NONE;
243
244         channel_id = 0;
245
246         serventHit.init();
247 }
248 // -----------------------------------
249 bool Servent::sendPacket(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did,Servent::TYPE t)
250 {
251
252         if  (      (type == t) 
253                         && (isConnected())
254                         && (!cid.isSet() || chanID.isSame(cid))
255                         && (!sid.isSet() || !sid.isSame(remoteID))
256                         && (pcpStream != NULL)
257                 )
258         {
259                 return pcpStream->sendPacket(pack,did);
260         }
261         return false;
262 }
263
264
265 // -----------------------------------
266 bool Servent::acceptGIV(ClientSocket *givSock)
267 {
268         if (!pushSock)
269         {
270                 pushSock = givSock;
271                 return true;
272         }else
273                 return false;
274 }
275
276 // -----------------------------------
277 Host Servent::getHost()
278 {
279         Host h(0,0);
280
281         if (sock)
282                 h = sock->host;
283
284         return h;
285 }
286
287 // -----------------------------------
288 bool Servent::outputPacket(GnuPacket &p, bool pri)
289 {
290         lock.on();
291
292         bool r=false;
293         if (pri)
294                 r = outPacketsPri.write(p);
295         else
296         {
297                 if (servMgr->useFlowControl)
298                 {
299                         int per = outPacketsNorm.percentFull();
300                         if (per > 50)
301                                 flowControl = true;
302                         else if (per < 25)
303                                 flowControl = false;
304                 }
305
306
307                 bool send=true;
308                 if (flowControl)
309                 {
310                         // if in flowcontrol, only allow packets with less of a hop count than already in queue
311                         if (p.hops >= outPacketsNorm.findMinHop())
312                                 send = false;
313                 }
314
315                 if (send)
316                         r = outPacketsNorm.write(p);
317         }
318
319         lock.off();
320         return r;
321 }
322
323 // -----------------------------------
324 bool Servent::initServer(Host &h)
325 {
326         try
327         {
328                 checkFree();
329
330                 status = S_WAIT;
331
332                 createSocket();
333
334                 sock->bind(h);
335
336                 thread.data = this;
337
338                 thread.func = serverProc;
339
340                 type = T_SERVER;
341
342                 if (!sys->startThread(&thread))
343                         throw StreamException("Can`t start thread");
344
345         }catch(StreamException &e)
346         {
347                 LOG_ERROR("Bad server: %s",e.msg);
348                 kill();
349                 return false;
350         }
351
352         return true;
353 }
354 // -----------------------------------
355 void Servent::checkFree()
356 {
357         if (sock)
358                 throw StreamException("Socket already set");
359         if (thread.active)
360                 throw StreamException("Thread already active");
361 }
362 // -----------------------------------
363 void Servent::initIncoming(ClientSocket *s, unsigned int a)
364 {
365
366         try{
367
368                 checkFree();
369
370                 type = T_INCOMING;
371                 sock = s;
372                 allow = a;
373                 thread.data = this;
374                 thread.func = incomingProc;
375                 thread.finish = false;
376
377                 setStatus(S_PROTOCOL);
378
379                 char ipStr[64];
380                 sock->host.toStr(ipStr);
381                 LOG_DEBUG("Incoming from %s",ipStr);
382
383
384                 if (!sys->startThread(&thread))
385                         throw StreamException("Can`t start thread");
386         }catch(StreamException &e)
387         {
388                 //LOG_ERROR("!!FATAL!! Incoming error: %s",e.msg);
389                 //servMgr->shutdownTimer = 1;   
390                 kill();
391
392                 LOG_ERROR("INCOMING FAILED: %s",e.msg);
393
394         }
395 }
396
397 // -----------------------------------
398 void Servent::initOutgoing(TYPE ty)
399 {
400         try 
401         {
402                 checkFree();
403
404
405                 type = ty;
406
407                 thread.data = this;
408                 thread.func = outgoingProc;
409
410                 if (!sys->startThread(&thread))
411                         throw StreamException("Can`t start thread");
412
413         }catch(StreamException &e)
414         {
415                 LOG_ERROR("Unable to start outgoing: %s",e.msg);
416                 kill();
417         }
418 }
419
420 // -----------------------------------
421 void Servent::initPCP(Host &rh)
422 {
423         char ipStr[64];
424         rh.toStr(ipStr);
425         try 
426         {
427                 checkFree();
428
429             createSocket();
430
431                 type = T_COUT;
432
433                 sock->open(rh);
434
435                 if (!isAllowed(ALLOW_NETWORK))
436                         throw StreamException("Servent not allowed");
437
438                 thread.data = this;
439                 thread.func = outgoingProc;
440
441                 LOG_DEBUG("Outgoing to %s",ipStr);
442
443                 if (!sys->startThread(&thread))
444                         throw StreamException("Can`t start thread");
445
446         }catch(StreamException &e)
447         {
448                 LOG_ERROR("Unable to open connection to %s - %s",ipStr,e.msg);
449                 kill();
450         }
451 }
452
453 #if 0
454 // -----------------------------------
455 void    Servent::initChannelFetch(Host &host)
456 {
457         type = T_STREAM;
458
459         char ipStr[64];
460         host.toStr(ipStr);
461
462         checkFree();
463          
464         createSocket();
465                 
466         sock->open(host);
467
468                 
469         if (!isAllowed(ALLOW_DATA))     
470                 throw StreamException("Servent not allowed");
471                 
472         sock->connect();
473 }
474 #endif
475
476 // -----------------------------------
477 void Servent::initGIV(Host &h, GnuID &id)
478 {
479         char ipStr[64];
480         h.toStr(ipStr);
481         try 
482         {
483                 checkFree();
484
485                 givID = id;
486
487             createSocket();
488
489                 sock->open(h);
490
491                 if (!isAllowed(ALLOW_NETWORK))
492                         throw StreamException("Servent not allowed");
493                 
494                 sock->connect();
495
496                 thread.data = this;
497                 thread.func = givProc;
498
499                 type = T_RELAY;
500
501                 if (!sys->startThread(&thread))
502                         throw StreamException("Can`t start thread");
503
504         }catch(StreamException &e)
505         {
506                 LOG_ERROR("GIV error to %s: %s",ipStr,e.msg);
507                 kill();
508         }
509 }
510 // -----------------------------------
511 void Servent::createSocket()
512 {
513         if (sock)
514                 LOG_ERROR("Servent::createSocket attempt made while active");
515
516         sock = sys->createSocket();
517 }
518 // -----------------------------------
519 void Servent::setStatus(STATUS s)
520 {
521         if (s != status)
522         {
523                 status = s;
524
525                 if ((s == S_HANDSHAKE) || (s == S_CONNECTED) || (s == S_LISTENING))
526                         lastConnect = sys->getTime();
527         }
528
529 }
530
531 // -----------------------------------
532 void Servent::handshakeOut()
533 {
534     sock->writeLine(GNU_PEERCONN);
535
536         char str[64];
537     
538         sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_AGENT);
539     sock->writeLineF("%s %d",PCX_HS_PCP,1);
540
541         if (priorityConnect)
542             sock->writeLineF("%s %d",PCX_HS_PRIORITY,1);
543         
544         if (networkID.isSet())
545         {
546                 networkID.toStr(str);
547                 sock->writeLineF("%s %s",PCX_HS_NETWORKID,str);
548         }
549
550         servMgr->sessionID.toStr(str);
551         sock->writeLineF("%s %s",PCX_HS_ID,str);
552
553         
554     sock->writeLineF("%s %s",PCX_HS_OS,peercastApp->getClientTypeOS());
555         
556         sock->writeLine("");
557
558         HTTP http(*sock);
559
560         int r = http.readResponse();
561
562         if (r != 200)
563         {
564                 LOG_ERROR("Expected 200, got %d",r);
565                 throw StreamException("Unexpected HTTP response");
566         }
567
568
569         bool versionValid = false;
570
571         GnuID clientID;
572         clientID.clear();
573
574     while (http.nextHeader())
575     {
576                 LOG_DEBUG(http.cmdLine);
577
578                 char *arg = http.getArgStr();
579                 if (!arg)
580                         continue;
581
582                 if (http.isHeader(HTTP_HS_AGENT))
583                 {
584                         agent.set(arg);
585
586                         if (strnicmp(arg,"PeerCast/",9)==0)
587                                 versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);
588                 }else if (http.isHeader(PCX_HS_NETWORKID))
589                         clientID.fromStr(arg);
590     }
591
592         if (!clientID.isSame(networkID))
593                 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
594
595         if (!versionValid)
596                 throw HTTPException(HTTP_SC_UNAUTHORIZED,401);
597
598
599     sock->writeLine(GNU_OK);
600     sock->writeLine("");
601
602 }
603
604
605 // -----------------------------------
606 void Servent::processOutChannel()
607 {
608 }
609
610
611 // -----------------------------------
612 void Servent::handshakeIn()
613 {
614
615         int osType=0;
616
617         HTTP http(*sock);
618
619
620         bool versionValid = false;
621         bool diffRootVer = false;
622
623         GnuID clientID;
624         clientID.clear();
625
626     while (http.nextHeader())
627     {
628                 LOG_DEBUG("%s",http.cmdLine);
629
630                 char *arg = http.getArgStr();
631                 if (!arg)
632                         continue;
633
634                 if (http.isHeader(HTTP_HS_AGENT))
635                 {
636                         agent.set(arg);
637
638                         if (strnicmp(arg,"PeerCast/",9)==0)
639                         {
640                                 versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);
641                                 diffRootVer = stricmp(arg+9,MIN_ROOTVER)<0;
642                         }
643                 }else if (http.isHeader(PCX_HS_NETWORKID))
644                 {
645                         clientID.fromStr(arg);
646
647                 }else if (http.isHeader(PCX_HS_PRIORITY))
648                 {
649                         priorityConnect = atoi(arg)!=0;
650
651                 }else if (http.isHeader(PCX_HS_ID))
652                 {
653                         GnuID id;
654                         id.fromStr(arg);
655                         if (id.isSame(servMgr->sessionID))
656                                 throw StreamException("Servent loopback");
657
658                 }else if (http.isHeader(PCX_HS_OS))
659                 {
660                         if (stricmp(arg,PCX_OS_LINUX)==0)
661                                 osType = 1;
662                         else if (stricmp(arg,PCX_OS_WIN32)==0)
663                                 osType = 2;
664                         else if (stricmp(arg,PCX_OS_MACOSX)==0)
665                                 osType = 3;
666                         else if (stricmp(arg,PCX_OS_WINAMP2)==0)
667                                 osType = 4;
668                 }
669
670     }
671
672         if (!clientID.isSame(networkID))
673                 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
674
675         // if this is a priority connection and all incoming connections 
676         // are full then kill an old connection to make room. Otherwise reject connection.
677         //if (!priorityConnect)
678         {
679                 if (!isPrivate())
680                         if (servMgr->pubInFull())
681                                 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
682         }
683
684         if (!versionValid)
685                 throw HTTPException(HTTP_SC_FORBIDDEN,403);
686
687     sock->writeLine(GNU_OK);
688
689     sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_OLDAGENT);
690
691         if (networkID.isSet())
692         {
693                 char idStr[64];
694                 networkID.toStr(idStr);
695                 sock->writeLineF("%s %s",PCX_HS_NETWORKID,idStr);
696         }
697
698         if (servMgr->isRoot)
699         {
700                 sock->writeLineF("%s %d",PCX_HS_FLOWCTL,servMgr->useFlowControl?1:0);
701                 sock->writeLineF("%s %d",PCX_HS_MINBCTTL,chanMgr->minBroadcastTTL);
702                 sock->writeLineF("%s %d",PCX_HS_MAXBCTTL,chanMgr->maxBroadcastTTL);
703                 sock->writeLineF("%s %d",PCX_HS_RELAYBC,servMgr->relayBroadcast);
704                 //sock->writeLine("%s %d",PCX_HS_FULLHIT,2);
705
706
707                 if (diffRootVer)
708                 {
709                         sock->writeString(PCX_HS_DL);
710                         sock->writeLine(PCX_DL_URL);
711                 }
712
713                 sock->writeLineF("%s %s",PCX_HS_MSG,servMgr->rootMsg.cstr());
714         }
715
716
717         char hostIP[64];
718         Host h = sock->host;
719         h.IPtoStr(hostIP);
720     sock->writeLineF("%s %s",PCX_HS_REMOTEIP,hostIP);
721
722
723     sock->writeLine("");
724
725
726         while (http.nextHeader());
727 }
728
729 // -----------------------------------
730 bool    Servent::pingHost(Host &rhost,GnuID &rsid)
731 {
732         char ipstr[64];
733         rhost.toStr(ipstr);
734         LOG_DEBUG("Ping host %s: trying..",ipstr);
735         ClientSocket *s=NULL;
736         bool hostOK=false;
737         try
738         {
739                 s = sys->createSocket();
740                 if (!s)
741                         return false;
742                 else
743                 {
744
745                         s->setReadTimeout(15000);
746                         s->setWriteTimeout(15000);
747                         s->open(rhost);
748                         s->connect();
749
750                         AtomStream atom(*s);
751
752                         atom.writeInt(PCP_CONNECT,1);
753                         atom.writeParent(PCP_HELO,1);
754                                 atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
755
756                         GnuID sid;
757                         sid.clear();
758
759                         int numc,numd;
760                         ID4 id = atom.read(numc,numd);
761                         if (id == PCP_OLEH)
762                         {
763                                 for(int i=0; i<numc; i++)
764                                 {
765                                         int c,d;
766                                         ID4 pid = atom.read(c,d);
767                                         if (pid == PCP_SESSIONID)
768                                                 atom.readBytes(sid.id,16,d);
769                                         else
770                                                 atom.skip(c,d);
771                                 }
772                         }else
773                         {
774                                 LOG_DEBUG("Ping response: %s",id.getString().str());
775                                 throw StreamException("Bad ping response");
776                         }
777
778                         if (!sid.isSame(rsid))
779                                 throw StreamException("SIDs don`t match");
780
781                         hostOK = true;
782                         LOG_DEBUG("Ping host %s: OK",ipstr);
783                         atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT);
784
785
786                 }
787         }catch(StreamException &e)
788         {
789                 LOG_DEBUG("Ping host %s: %s",ipstr,e.msg);
790         }
791         if (s)
792         {
793                 s->close();
794                 delete s;
795         }
796
797         if (!hostOK)
798                 rhost.port = 0;
799
800         return true;
801 }
802
803 WLock canStreamLock;
804
805 // -----------------------------------
806 bool Servent::handshakeStream(ChanInfo &chanInfo)
807 {
808
809         HTTP http(*sock);
810
811
812         bool gotPCP=false;
813         unsigned int reqPos=0;
814         unsigned short listenPort = 0;
815
816         nsSwitchNum=0;
817
818         while (http.nextHeader())
819         {
820                 char *arg = http.getArgStr();
821                 if (!arg)
822                         continue;
823
824                 if (http.isHeader(PCX_HS_PCP))
825                         gotPCP = atoi(arg)!=0;
826                 else if (http.isHeader(PCX_HS_POS))
827                         reqPos = atoi(arg);
828                 else if (http.isHeader(PCX_HS_PORT))
829                         listenPort = (unsigned short)atoi(arg);
830                 else if (http.isHeader("icy-metadata"))
831                         addMetadata = atoi(arg) > 0;
832                 else if (http.isHeader(HTTP_HS_AGENT))
833                         agent = arg;
834                 else if (http.isHeader("Pragma"))
835                 {
836                         char *ssc = stristr(arg,"stream-switch-count=");
837                         char *so = stristr(arg,"stream-offset");
838
839                         if (ssc || so)
840                         {
841                                 nsSwitchNum=1;
842                                 //nsSwitchNum = atoi(ssc+20);
843                         }
844                 }
845
846                 LOG_DEBUG("Stream: %s",http.cmdLine);
847         }
848
849
850         if ((!gotPCP) && (outputProtocol == ChanInfo::SP_PCP))
851                 outputProtocol = ChanInfo::SP_PEERCAST;
852
853         if (outputProtocol == ChanInfo::SP_HTTP)
854         {
855                 if  ( (chanInfo.srcProtocol == ChanInfo::SP_MMS)
856                           || (chanInfo.contentType == ChanInfo::T_WMA)
857                           || (chanInfo.contentType == ChanInfo::T_WMV)
858                           || (chanInfo.contentType == ChanInfo::T_ASX)
859                         )
860                 outputProtocol = ChanInfo::SP_MMS;
861         }
862
863
864         bool chanFound=false;
865         bool chanReady=false;
866
867         ChanHit *sourceHit = NULL;
868
869         Channel *ch = chanMgr->findChannelByID(chanInfo.id);
870         if (ch)
871         {
872                 sendHeader = true;
873                 if (reqPos || !isIndexTxt(&chanInfo))
874                 {
875                         streamPos = ch->rawData.findOldestPos(reqPos);
876                         //streamPos = ch->rawData.getLatestPos();
877                 }else
878                 {
879                         streamPos = ch->rawData.getLatestPos();
880                 }
881
882                 chanID = chanInfo.id;
883                 serventHit.host.ip = getHost().ip;
884                 serventHit.host.port = listenPort;
885                 if (serventHit.host.globalIP())
886                         serventHit.rhost[0] = serventHit.host;
887                 else
888                         serventHit.rhost[1] = serventHit.host;
889                 serventHit.chanID = chanID;
890
891                 canStreamLock.on();
892                 chanReady = canStream(ch);
893                 if (0 && !chanReady && ch->isPlaying())
894                 {
895                         if (ch->info.getUptime() > 60
896                                 && servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
897                         {
898                                 sourceHit = &ch->sourceHost;  // send source host info
899
900                                 if (listenPort)
901                                 {
902                                         // connect "this" host later
903                                         chanMgr->addHit(serventHit);
904                                 }
905
906                                 char tmp[50];
907                                 getHost().toStr(tmp);
908                                 LOG_DEBUG("Bump channel, hand over sourceHost to %s", tmp);
909                                 ch->bump = true;
910                         }
911                         else if (servMgr->kickUnrelayableHost(chanID, serventHit) != 0)
912                         {
913                                 chanReady = canStream(ch);
914                                 if (!chanReady)
915                                         LOG_DEBUG("*** Kicked unrelayable host, but still cannot stream");
916                         }
917                 }
918                 if (!chanReady) type = T_INCOMING;
919                 thread.active = chanReady;
920                 setStatus(S_CONNECTED);
921                 canStreamLock.off();
922                 channel_id = ch->channel_id;
923
924                 //JP-Patch add-s
925                 if (servMgr->isCheckPushStream())
926                 {
927                         if (chanReady == true)
928                         {
929                                 Host h = getHost();
930
931                                 if (!h.isLocalhost()) 
932                                 {
933                                         do 
934                                         {
935                                                 if (strstr(agent.cstr(),"PeerCast/0.119") != NULL) 
936                                                 {                                               
937                                                         char strip[256];
938                                                         h.toStr(strip);
939                                                         LOG_ERROR("Block v0.119 Servent : %s (%s)",strip,agent.cstr());
940                                                         chanReady = false;
941                                                         break;
942                                                 }
943
944 /*                                              ChanHitList *hits[ChanMgr::MAX_HITLISTS];
945                                                 int numHits=0;
946                                                 for(int i=0; i<ChanMgr::MAX_HITLISTS; i++)
947                                                 {
948                                                         ChanHitList *chl = &chanMgr->hitlists[i];
949                                                         if (chl->isUsed())
950                                                                 hits[numHits++] = chl;
951                                                 }
952                                                 bool isfw = false;
953                                                 int numRelay = 0;
954                                                 for(int i=0; i<numHits; i++)
955                                                 {
956                                                         ChanHitList *chl = hits[i];
957                                                         if (chl->isUsed())
958                                                         {
959                                                                 for (int j=0; j<ChanHitList::MAX_HITS; j++)
960                                                                 {
961                                                                         ChanHit *hit = &chl->hits[j];
962                                                                         if (hit->host.isValid() && (h.ip == hit->host.ip)) 
963                                                                         {
964                                                                                 if (hit->firewalled)
965                                                                                         isfw = true;
966                                                                                 numRelay = hit->numRelays;
967                                                                         }
968                                                                 }
969                                                         }
970                                                 }
971                                                 if ((isfw == true) && (numRelay == 0))
972                                                 {
973                                                         char strip[256];
974                                                         h.toStr(strip);
975                                                         LOG_ERROR("Block firewalled Servent : %s",strip);
976                                                         chanReady = false;
977                                                 }*/
978
979                                                 ChanHitList *chl = chanMgr->findHitList(chanInfo);
980                                                 ChanHit *hit = chl->hit;
981                                                 while(hit){
982                                                         if (hit->host.isValid() && (h.ip == hit->host.ip))
983                                                         {
984                                                                 if ((hit->firewalled) && (hit->numRelays == 0)){
985                                                                         char strip[256];
986                                                                         h.toStr(strip);
987                                                                         LOG_ERROR("Block firewalled Servent : %s",strip);
988                                                                         chanReady = false;
989                                                                 }
990                                                         }
991                                                         hit = hit->next;
992                                                 }
993                                         }while (0);
994                                 }               
995                         }
996                 }
997                 //JP-Patch add-e
998         }
999
1000 //      LockBlock lockblock(chanMgr->hitlistlock);
1001
1002 //      lockblock.lockon();
1003         ChanHitList *chl = chanMgr->findHitList(chanInfo);
1004
1005         if (chl)
1006         {
1007                 chanFound = true;
1008         }
1009
1010
1011         bool result = false;
1012
1013         char idStr[64];
1014         chanInfo.id.toStr(idStr);
1015
1016         char sidStr[64];
1017         servMgr->sessionID.toStr(sidStr);
1018
1019         Host rhost = sock->host;
1020
1021
1022
1023
1024         AtomStream atom(*sock);
1025
1026
1027
1028         if (!chanFound)
1029         {
1030                 sock->writeLine(HTTP_SC_NOTFOUND);
1031             sock->writeLine("");
1032                 LOG_DEBUG("Sending channel not found");
1033                 return false;
1034         }
1035
1036
1037         if (!chanReady)
1038         {
1039                 if (outputProtocol == ChanInfo::SP_PCP)
1040                 {
1041
1042                         char tbuf[8196];
1043                         MemoryStream mem(tbuf, sizeof(tbuf));
1044                         mem.writeLine(HTTP_SC_UNAVAILABLE);
1045                         mem.writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
1046                         mem.writeLine("");
1047                         sock->write(tbuf, mem.getPosition());
1048
1049                         handshakeIncomingPCP(atom,rhost,remoteID,agent);
1050
1051                         char ripStr[64];
1052                         rhost.toStr(ripStr);
1053
1054                         LOG_DEBUG("Sending channel unavailable");
1055
1056                         ChanHitSearch chs;
1057
1058                         mem.rewind();
1059                         AtomStream atom2(mem);
1060
1061                         int error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
1062
1063                         if (sourceHit) {
1064                                 sourceHit->writeAtoms(atom2,chanInfo.id);       
1065                                 char tmp[50];
1066                                 sourceHit->host.toStr(tmp);
1067                                 LOG_DEBUG("relay info(sourceHit): %s", tmp);
1068                         }
1069
1070                         chanMgr->hitlistlock.on();
1071
1072                         chl = chanMgr->findHitList(chanInfo);
1073
1074                         if (chl && !sourceHit)
1075                         {
1076                                 ChanHit best;
1077                                 
1078                                 // search for up to 8 other hits
1079                                 int cnt=0;
1080                                 int i;
1081                                 for(i=0; i<8; i++)
1082                                 {
1083                                         best.init();
1084
1085
1086                                         // find best hit this network if local IP
1087                                         if (!rhost.globalIP())
1088                                         {
1089                                                 chs.init();
1090                                                 chs.matchHost = servMgr->serverHost;
1091                                                 chs.waitDelay = 2;
1092                                                 chs.excludeID = remoteID;
1093                                                 if (chl->pickHits(chs)){
1094                                                         best = chs.best[0];
1095                                                         LOG_DEBUG("find best hit this network if local IP");
1096                                                 }
1097                                         }
1098
1099                                         // find best hit on same network
1100                                         if (!best.host.ip)
1101                                         {
1102                                                 chs.init();
1103                                                 chs.matchHost = rhost;
1104                                                 chs.waitDelay = 2;
1105                                                 chs.excludeID = remoteID;
1106                                                 if (chl->pickHits(chs)){
1107                                                         best = chs.best[0];
1108                                                         LOG_DEBUG("find best hit on same network");
1109                                                 }
1110
1111                                         }
1112
1113                                         // find best hit on other networks
1114 /*                                      if (!best.host.ip)
1115                                         {
1116                                                 chs.init();
1117                                                 chs.waitDelay = 2;
1118                                                 chs.excludeID = remoteID;
1119                                                 if (chl->pickHits(chs)){
1120                                                         best = chs.best[0];
1121                                                         LOG_DEBUG("find best hit on other networks");
1122                                                 }
1123
1124                                         }*/
1125                                         
1126                                         if (!best.host.ip)
1127                                                 break;
1128
1129                                         best.writeAtoms(atom2,chanInfo.id);                        
1130                                         cnt++;
1131                                 }
1132
1133                                 if (!best.host.ip){
1134                                         char tmp[50];
1135 //                                      chanMgr->hitlistlock.on();
1136                                         int rhcnt = chs.getRelayHost(servMgr->serverHost, rhost, remoteID, chl);
1137 //                                      chanMgr->hitlistlock.off();
1138                                         for (int i = 0; i < rhcnt; i++){
1139                                                 chs.best[i].writeAtoms(atom2, chanInfo.id);
1140                                                 chs.best[i].host.toStr(tmp);
1141                                                 LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
1142                                                 best.host.ip = chs.best[i].host.ip;
1143                                         }
1144                                         cnt += rhcnt;
1145                                 }
1146
1147                                 if (cnt)
1148                                 {
1149                                         LOG_DEBUG("Sent %d channel hit(s) to %s",cnt,ripStr);
1150
1151                                 }
1152                                 else if (rhost.port)
1153                                 {
1154                                         // find firewalled host
1155                                         chs.init();
1156                                         chs.waitDelay = 30;
1157                                         chs.useFirewalled = true;
1158                                         chs.excludeID = remoteID;
1159                                         if (chl->pickHits(chs))
1160                                         {
1161                                                 best = chs.best[0];
1162                                                 int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_RELAY);
1163                                                 LOG_DEBUG("Broadcasted channel push request to %d clients for %s",cnt,ripStr);
1164                                         }
1165                                 } 
1166
1167                                 // if all else fails, use tracker
1168                                 if (!best.host.ip)
1169                                 {
1170                                         // find best tracker on this network if local IP
1171                                         if (!rhost.globalIP())
1172                                         {
1173                                                 chs.init();
1174                                                 chs.matchHost = servMgr->serverHost;
1175                                                 chs.trackersOnly = true;
1176                                                 chs.excludeID = remoteID;
1177                                                 if (chl->pickHits(chs))
1178                                                         best = chs.best[0];
1179
1180                                         }
1181
1182                                         // find local tracker
1183                                         if (!best.host.ip)
1184                                         {
1185                                                 chs.init();
1186                                                 chs.matchHost = rhost;
1187                                                 chs.trackersOnly = true;
1188                                                 chs.excludeID = remoteID;
1189                                                 if (chl->pickHits(chs))
1190                                                         best = chs.best[0];
1191                                         }
1192
1193                                         // find global tracker
1194                                         if (!best.host.ip)
1195                                         {
1196                                                 chs.init();
1197                                                 chs.trackersOnly = true;
1198                                                 chs.excludeID = remoteID;
1199                                                 if (chl->pickHits(chs))
1200                                                         best = chs.best[0];
1201                                         }
1202
1203                                         if (best.host.ip)
1204                                         {
1205                                                 best.writeAtoms(atom2,chanInfo.id);                             
1206                                                 LOG_DEBUG("Sent 1 tracker hit to %s",ripStr);
1207                                         }else if (rhost.port)
1208                                         {
1209                                                 // find firewalled tracker
1210                                                 chs.init();
1211                                                 chs.useFirewalled = true;
1212                                                 chs.trackersOnly = true;
1213                                                 chs.excludeID = remoteID;
1214                                                 chs.waitDelay = 30;
1215                                                 if (chl->pickHits(chs))
1216                                                 {
1217                                                         best = chs.best[0];
1218                                                         int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_CIN);
1219                                                         LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,ripStr);
1220                                                 }
1221                                         }
1222
1223                                 }
1224
1225
1226                         }
1227
1228                         chanMgr->hitlistlock.off();
1229
1230                         // return not available yet code
1231                         atom2.writeInt(PCP_QUIT,error);
1232                         sock->write(tbuf, mem.getPosition());
1233                         result = false;
1234
1235                         /*
1236                         char c[512];
1237                         // wait disconnect from other host
1238                         try{
1239                                 while(sock->read(c, sizeof(c))){
1240                                         sys->sleep(10);
1241                                 }
1242                         }catch(StreamException &e){
1243                                 LOG_DEBUG("RelayInfoWait: %s",e.msg);
1244                         }
1245                         */
1246                 }else
1247                 {
1248                         LOG_DEBUG("Sending channel unavailable");
1249                         sock->writeLine(HTTP_SC_UNAVAILABLE);
1250                         sock->writeLine("");
1251                         result = false;
1252                 }
1253
1254         } else {
1255
1256                 if (chanInfo.contentType != ChanInfo::T_MP3)
1257                         addMetadata=false;
1258
1259                 if (addMetadata && (outputProtocol == ChanInfo::SP_HTTP))               // winamp mp3 metadata check
1260                 {
1261
1262                         sock->writeLine(ICY_OK);
1263
1264                         sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT);
1265                         sock->writeLineF("icy-name:%s",chanInfo.name.cstr());
1266                         sock->writeLineF("icy-br:%d",chanInfo.bitrate);
1267                         sock->writeLineF("icy-genre:%s",chanInfo.genre.cstr());
1268                         sock->writeLineF("icy-url:%s",chanInfo.url.cstr());
1269                         sock->writeLineF("icy-metaint:%d",chanMgr->icyMetaInterval);
1270                         sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr);
1271
1272                         sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3);
1273
1274                 }else
1275                 {
1276
1277                         sock->writeLine(HTTP_SC_OK);
1278
1279                         if ((chanInfo.contentType != ChanInfo::T_ASX) && (chanInfo.contentType != ChanInfo::T_WMV) && (chanInfo.contentType != ChanInfo::T_WMA))
1280                         {
1281                                 sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT);
1282
1283                                 sock->writeLine("Accept-Ranges: none");
1284
1285                                 sock->writeLineF("x-audiocast-name: %s",chanInfo.name.cstr());
1286                                 sock->writeLineF("x-audiocast-bitrate: %d",chanInfo.bitrate);
1287                                 sock->writeLineF("x-audiocast-genre: %s",chanInfo.genre.cstr());
1288                                 sock->writeLineF("x-audiocast-description: %s",chanInfo.desc.cstr());
1289                                 sock->writeLineF("x-audiocast-url: %s",chanInfo.url.cstr());
1290                                 sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr);
1291                         }
1292
1293
1294                         if (outputProtocol == ChanInfo::SP_HTTP)
1295                         {
1296                                 switch (chanInfo.contentType)
1297                                 {
1298                                         case ChanInfo::T_OGG:
1299                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XOGG);
1300                                                 break;
1301                                         case ChanInfo::T_MP3:
1302                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3);
1303                                                 break;
1304                                         case ChanInfo::T_MOV:
1305                                                 sock->writeLine("Connection: close");
1306                                                 sock->writeLine("Content-Length: 10000000");
1307                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MOV);
1308                                                 break;
1309                                         case ChanInfo::T_MPG:
1310                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MPG);
1311                                                 break;
1312                                         case ChanInfo::T_NSV:
1313                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_NSV);
1314                                                 break;
1315                                         case ChanInfo::T_ASX:
1316                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_ASX);
1317                                                 break;
1318                                         case ChanInfo::T_WMA:
1319                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_WMA);
1320                                                 break;
1321                                         case ChanInfo::T_WMV:
1322                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_WMV);
1323                                                 break;
1324                                 }
1325                         } else if (outputProtocol == ChanInfo::SP_MMS)
1326                         {
1327                                 sock->writeLine("Server: Rex/9.0.0.2980");
1328                                 sock->writeLine("Cache-Control: no-cache");
1329                                 sock->writeLine("Pragma: no-cache");
1330                                 sock->writeLine("Pragma: client-id=3587303426");
1331                                 sock->writeLine("Pragma: features=\"broadcast,playlist\"");
1332
1333                                 if (nsSwitchNum)
1334                                 {
1335                                         sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MMS);
1336                                 }else
1337                                 {
1338                                         sock->writeLine("Content-Type: application/vnd.ms.wms-hdr.asfv1");
1339                                         if (ch)
1340                                                 sock->writeLineF("Content-Length: %d",ch->headPack.len);
1341                                         sock->writeLine("Connection: Keep-Alive");
1342                                 }
1343                         
1344                         } else if (outputProtocol == ChanInfo::SP_PCP)
1345                         {
1346                                 sock->writeLineF("%s %d",PCX_HS_POS,streamPos);
1347                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
1348
1349                         }else if (outputProtocol == ChanInfo::SP_PEERCAST)
1350                         {
1351                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPEERCAST);
1352                         }
1353                 }
1354                 sock->writeLine("");
1355                 result = true;
1356
1357                 if (gotPCP)
1358                 {
1359                         handshakeIncomingPCP(atom,rhost,remoteID,agent);
1360                         atom.writeInt(PCP_OK,0);
1361                         if (rhost.globalIP())
1362                                 serventHit.rhost[0] = rhost;
1363                         else
1364                                 serventHit.rhost[1] = rhost;
1365                         serventHit.sessionID = remoteID;
1366                         serventHit.numHops = 1;
1367                         chanMgr->addHit(serventHit);
1368                 }
1369
1370         }
1371
1372
1373
1374         return result;
1375 }
1376
1377 // -----------------------------------
1378 void Servent::handshakeGiv(GnuID &id)
1379 {
1380         if (id.isSet())
1381         {
1382                 char idstr[64];
1383                 id.toStr(idstr);
1384                 sock->writeLineF("GIV /%s",idstr);
1385         }else
1386                 sock->writeLine("GIV");
1387
1388         sock->writeLine("");
1389 }
1390
1391
1392 // -----------------------------------
1393 void Servent::processGnutella()
1394 {
1395         type = T_PGNU;
1396
1397         //if (servMgr->isRoot && !servMgr->needConnections())
1398         if (servMgr->isRoot)
1399         {
1400                 processRoot();
1401                 return;
1402         }
1403
1404
1405
1406         gnuStream.init(sock);
1407         setStatus(S_CONNECTED);
1408
1409         if (!servMgr->isRoot)
1410         {
1411                 chanMgr->broadcastRelays(this, 1, 1);
1412                 GnuPacket *p;
1413
1414                 if ((p=outPacketsNorm.curr()))  
1415                         gnuStream.sendPacket(*p);
1416                 return;
1417         }
1418
1419         gnuStream.ping(2);
1420
1421 //      if (type != T_LOOKUP)
1422 //              chanMgr->broadcastRelays(this,chanMgr->minBroadcastTTL,2);
1423
1424         lastPacket = lastPing = sys->getTime();
1425         bool doneBigPing=false;
1426
1427         const unsigned int      abortTimeoutSecs = 60;          // abort connection after 60 secs of no activitiy
1428         const unsigned int      packetTimeoutSecs = 30;         // ping connection after 30 secs of no activity
1429
1430         unsigned int currBytes=0;
1431         unsigned int lastWait=0;
1432
1433         unsigned int lastTotalIn=0,lastTotalOut=0;
1434
1435         while (thread.active && sock->active())
1436         {
1437
1438                 if (sock->readReady())
1439                 {
1440                         lastPacket = sys->getTime();
1441
1442                         if (gnuStream.readPacket(pack))
1443                         {
1444                                 char ipstr[64];
1445                                 sock->host.toStr(ipstr);
1446
1447                                 GnuID routeID;
1448                                 GnuStream::R_TYPE ret = GnuStream::R_PROCESS;
1449
1450                                 if (pack.func != GNU_FUNC_PONG)
1451                                         if (servMgr->seenPacket(pack))
1452                                                 ret = GnuStream::R_DUPLICATE;
1453
1454                                 seenIDs.add(pack.id);
1455
1456
1457                                 if (ret == GnuStream::R_PROCESS)
1458                                 {
1459                                         GnuID routeID;
1460                                         ret = gnuStream.processPacket(pack,this,routeID);
1461
1462                                         if (flowControl && (ret == GnuStream::R_BROADCAST))
1463                                                 ret = GnuStream::R_DROP;
1464
1465                                 }
1466
1467                                 switch(ret)
1468                                 {
1469                                         case GnuStream::R_BROADCAST:
1470                                                 if (servMgr->broadcast(pack,this))
1471                                                         stats.add(Stats::NUMBROADCASTED);
1472                                                 else
1473                                                         stats.add(Stats::NUMDROPPED);
1474                                                 break;
1475                                         case GnuStream::R_ROUTE:
1476                                                 if (servMgr->route(pack,routeID,NULL))
1477                                                         stats.add(Stats::NUMROUTED);
1478                                                 else
1479                                                         stats.add(Stats::NUMDROPPED);
1480                                                 break;
1481                                         case GnuStream::R_ACCEPTED:
1482                                                 stats.add(Stats::NUMACCEPTED);
1483                                                 break;
1484                                         case GnuStream::R_DUPLICATE:
1485                                                 stats.add(Stats::NUMDUP);
1486                                                 break;
1487                                         case GnuStream::R_DEAD:
1488                                                 stats.add(Stats::NUMDEAD);
1489                                                 break;
1490                                         case GnuStream::R_DISCARD:
1491                                                 stats.add(Stats::NUMDISCARDED);
1492                                                 break;
1493                                         case GnuStream::R_BADVERSION:
1494                                                 stats.add(Stats::NUMOLD);
1495                                                 break;
1496                                         case GnuStream::R_DROP:
1497                                                 stats.add(Stats::NUMDROPPED);
1498                                                 break;
1499                                 }
1500
1501
1502                                 LOG_NETWORK("packet in: %s-%s, %d bytes, %d hops, %d ttl, from %s",GNU_FUNC_STR(pack.func),GnuStream::getRouteStr(ret),pack.len,pack.hops,pack.ttl,ipstr);
1503
1504
1505
1506                         }else{
1507                                 LOG_ERROR("Bad packet");
1508                         }
1509                 }
1510
1511
1512                 GnuPacket *p;
1513
1514                 if ((p=outPacketsPri.curr()))                           // priority packet
1515                 {
1516                         gnuStream.sendPacket(*p);
1517                         seenIDs.add(p->id);
1518                         outPacketsPri.next();
1519                 } else if ((p=outPacketsNorm.curr()))           // or.. normal packet
1520                 {
1521                         gnuStream.sendPacket(*p);
1522                         seenIDs.add(p->id);
1523                         outPacketsNorm.next();
1524                 }
1525
1526                 int lpt =  sys->getTime()-lastPacket;
1527
1528                 if (!doneBigPing)
1529                 {
1530                         if ((sys->getTime()-lastPing) > 15)
1531                         {
1532                                 gnuStream.ping(7);
1533                                 lastPing = sys->getTime();
1534                                 doneBigPing = true;
1535                         }
1536                 }else{
1537                         if (lpt > packetTimeoutSecs)
1538                         {
1539                                 
1540                                 if ((sys->getTime()-lastPing) > packetTimeoutSecs)
1541                                 {
1542                                         gnuStream.ping(1);
1543                                         lastPing = sys->getTime();
1544                                 }
1545
1546                         }
1547                 }
1548                 if (lpt > abortTimeoutSecs)
1549                         throw TimeoutException();
1550
1551
1552                 unsigned int totIn = sock->totalBytesIn-lastTotalIn;
1553                 unsigned int totOut = sock->totalBytesOut-lastTotalOut;
1554
1555                 unsigned int bytes = totIn+totOut;
1556
1557                 lastTotalIn = sock->totalBytesIn;
1558                 lastTotalOut = sock->totalBytesOut;
1559
1560                 const int serventBandwidth = 1000;
1561
1562                 int delay = sys->idleSleepTime;
1563                 if ((bytes) && (serventBandwidth >= 8))
1564                         delay = (bytes*1000)/(serventBandwidth/8);      // set delay relative packetsize
1565
1566                 if (delay < (int)sys->idleSleepTime)
1567                         delay = sys->idleSleepTime;
1568                 //LOG("delay %d, in %d, out %d",delay,totIn,totOut);
1569
1570                 sys->sleep(delay);
1571         }
1572
1573 }
1574
1575
1576 // -----------------------------------
1577 void Servent::processRoot()
1578 {
1579         try 
1580         {
1581         
1582                 gnuStream.init(sock);
1583                 setStatus(S_CONNECTED);
1584
1585                 gnuStream.ping(2);
1586
1587                 unsigned int lastConnect = sys->getTime();
1588
1589                 while (thread.active && sock->active())
1590                 {
1591                         if (gnuStream.readPacket(pack))
1592                         {
1593                                 char ipstr[64];
1594                                 sock->host.toStr(ipstr);
1595                                 
1596                                 LOG_NETWORK("packet in: %d from %s",pack.func,ipstr);
1597
1598
1599                                 if (pack.func == GNU_FUNC_PING)         // if ping then pong back some hosts and close
1600                                 {
1601                                         
1602                                         Host hl[32];
1603                                         int cnt = servMgr->getNewestServents(hl,32,sock->host); 
1604                                         if (cnt)
1605                                         {
1606                                                 int start = sys->rnd() % cnt;
1607                                                 int max = cnt>8?8:cnt;
1608
1609                                                 for(int i=0; i<max; i++)
1610                                                 {
1611                                                         GnuPacket pong;
1612                                                         pack.hops = 1;
1613                                                         pong.initPong(hl[start],false,pack);
1614                                                         gnuStream.sendPacket(pong);
1615
1616                                                         char ipstr[64];
1617                                                         hl[start].toStr(ipstr);
1618
1619                                                         //LOG_NETWORK("Pong %d: %s",start+1,ipstr);
1620                                                         start = (start+1) % cnt;
1621                                                 }
1622                                                 char str[64];
1623                                                 sock->host.toStr(str);
1624                                                 LOG_NETWORK("Sent %d pong(s) to %s",max,str);
1625                                         }else
1626                                         {
1627                                                 LOG_NETWORK("No Pongs to send");
1628                                                 //return;
1629                                         }
1630                                 }else if (pack.func == GNU_FUNC_PONG)           // pong?
1631                                 {
1632                                         MemoryStream pong(pack.data,pack.len);
1633
1634                                         int ip,port;
1635                                         port = pong.readShort();
1636                                         ip = pong.readLong();
1637                                         ip = SWAP4(ip);
1638
1639
1640                                         Host h(ip,port);
1641                                         if ((ip) && (port) && (h.globalIP()))
1642                                         {
1643
1644                                                 LOG_NETWORK("added pong: %d.%d.%d.%d:%d",ip>>24&0xff,ip>>16&0xff,ip>>8&0xff,ip&0xff,port);
1645                                                 servMgr->addHost(h,ServHost::T_SERVENT,sys->getTime());
1646                                         }
1647                                         //return;
1648                                 } else if (pack.func == GNU_FUNC_HIT)
1649                                 {
1650                                         MemoryStream data(pack.data,pack.len);
1651                                         ChanHit hit;
1652                                         gnuStream.readHit(data,hit,pack.hops,pack.id);
1653                                 }
1654
1655                                 //if (gnuStream.packetsIn > 5)  // die if we get too many packets
1656                                 //      return;
1657                         }
1658
1659                         if((sys->getTime()-lastConnect > 60))
1660                                 break;
1661                 }
1662
1663
1664         }catch(StreamException &e)
1665         {
1666                 LOG_ERROR("Relay: %s",e.msg);
1667         }
1668
1669         
1670 }       
1671
1672 // -----------------------------------
1673 int Servent::givProcMain(ThreadInfo *thread)
1674 {
1675 //      thread->lock();
1676         Servent *sv = (Servent*)thread->data;
1677         try 
1678         {
1679                 sv->handshakeGiv(sv->givID);
1680                 sv->handshakeIncoming();
1681
1682         }catch(StreamException &e)
1683         {
1684                 LOG_ERROR("GIV: %s",e.msg);
1685         }
1686
1687         sv->kill();
1688         sys->endThread(thread);
1689         return 0;
1690 }
1691
1692 // -----------------------------------
1693 int Servent::givProc(ThreadInfo *thread)
1694 {
1695         SEH_THREAD(givProcMain, Servent::givProc);
1696 }
1697
1698 // -----------------------------------
1699 void Servent::handshakeOutgoingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent, bool isTrusted)
1700 {
1701
1702         bool nonFW = (servMgr->getFirewall() != ServMgr::FW_ON);
1703         bool testFW = (servMgr->getFirewall() == ServMgr::FW_UNKNOWN);
1704
1705         bool sendBCID = isTrusted && chanMgr->isBroadcasting();
1706
1707         char tbuf[1024];
1708         MemoryStream mem(tbuf, sizeof(tbuf));
1709         AtomStream atom2(mem);
1710         atom2.writeParent(PCP_HELO,3 + (testFW?1:0) + (nonFW?1:0) + (sendBCID?1:0));
1711                 atom2.writeString(PCP_HELO_AGENT,PCX_AGENT);
1712                 atom2.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
1713                 atom2.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
1714                 if (nonFW)
1715                         atom2.writeShort(PCP_HELO_PORT,servMgr->serverHost.port);
1716                 if (testFW)
1717                         atom2.writeShort(PCP_HELO_PING,servMgr->serverHost.port);
1718                 if (sendBCID)
1719                         atom2.writeBytes(PCP_HELO_BCID,chanMgr->broadcastID.id,16);
1720         atom.io.write(tbuf, mem.getPosition());
1721
1722
1723         LOG_DEBUG("PCP outgoing waiting for OLEH..");
1724
1725         int numc,numd;
1726         ID4 id = atom.read(numc,numd);
1727         if (id != PCP_OLEH)
1728         {
1729                 LOG_DEBUG("PCP outgoing reply: %s",id.getString().str());
1730                 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
1731                 throw StreamException("Got unexpected PCP response");
1732         }
1733
1734
1735
1736         char arg[64];
1737
1738         GnuID clientID;
1739         clientID.clear();
1740         rid.clear();
1741         int version=0;
1742         int disable=0;
1743
1744         Host thisHost;
1745
1746         // read OLEH response
1747         for(int i=0; i<numc; i++)
1748         {
1749                 int c,dlen;
1750                 ID4 id = atom.read(c,dlen);
1751
1752                 if (id == PCP_HELO_AGENT)
1753                 {
1754                         atom.readString(arg,sizeof(arg),dlen);
1755                         agent.set(arg);
1756
1757                 }else if (id == PCP_HELO_REMOTEIP)
1758                 {
1759                         thisHost.ip = atom.readInt();
1760
1761                 }else if (id == PCP_HELO_PORT)
1762                 {
1763                         thisHost.port = atom.readShort();
1764
1765                 }else if (id == PCP_HELO_VERSION)
1766                 {
1767                         version = atom.readInt();
1768
1769                 }else if (id == PCP_HELO_DISABLE)
1770                 {
1771                         disable = atom.readInt();
1772
1773                 }else if (id == PCP_HELO_SESSIONID)
1774                 {
1775                         atom.readBytes(rid.id,16);
1776                         if (rid.isSame(servMgr->sessionID))
1777                                 throw StreamException("Servent loopback");
1778
1779                 }else
1780                 {
1781                         LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
1782                         atom.skip(c,dlen);
1783                 }
1784
1785     }
1786
1787
1788         // update server ip/firewall status
1789         if (isTrusted)
1790         {
1791                 if (thisHost.isValid())
1792                 {
1793                         if ((servMgr->serverHost.ip != thisHost.ip) && (servMgr->forceIP.isEmpty()))
1794                         {
1795                                 char ipstr[64];
1796                                 thisHost.toStr(ipstr);
1797                                 LOG_DEBUG("Got new ip: %s",ipstr);
1798                                 servMgr->serverHost.ip = thisHost.ip;
1799                         }
1800
1801                         if (servMgr->getFirewall() == ServMgr::FW_UNKNOWN)
1802                         {
1803                                 if (thisHost.port && thisHost.globalIP())
1804                                         servMgr->setFirewall(ServMgr::FW_OFF);
1805                                 else
1806                                         servMgr->setFirewall(ServMgr::FW_ON);
1807                         }
1808                 }
1809
1810                 if (disable == 1)
1811                 {
1812                         LOG_ERROR("client disabled: %d",disable);
1813                         servMgr->isDisabled = true;             
1814                 }else
1815                 {
1816                         servMgr->isDisabled = false;            
1817                 }
1818         }
1819
1820
1821
1822         if (!rid.isSet())
1823         {
1824                 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
1825                 throw StreamException("Remote host not identified");
1826         }
1827
1828         LOG_DEBUG("PCP Outgoing handshake complete.");
1829
1830 }
1831
1832 // -----------------------------------
1833 void Servent::handshakeIncomingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent)
1834 {
1835         int numc,numd;
1836         ID4 id = atom.read(numc,numd);
1837
1838
1839         if (id != PCP_HELO)
1840         {
1841                 LOG_DEBUG("PCP incoming reply: %s",id.getString().str());
1842                 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
1843                 throw StreamException("Got unexpected PCP response");
1844         }
1845
1846         char arg[64];
1847
1848         ID4 osType;
1849
1850         int version=0;
1851
1852         int pingPort=0;
1853
1854         GnuID bcID;
1855         GnuID clientID;
1856
1857         bcID.clear();
1858         clientID.clear();
1859
1860         rhost.port = 0;
1861
1862         for(int i=0; i<numc; i++)
1863         {
1864
1865                 int c,dlen;
1866                 ID4 id = atom.read(c,dlen);
1867
1868                 if (id == PCP_HELO_AGENT)
1869                 {
1870                         atom.readString(arg,sizeof(arg),dlen);
1871                         agent.set(arg);
1872
1873                 }else if (id == PCP_HELO_VERSION)
1874                 {
1875                         version = atom.readInt();
1876
1877                 }else if (id == PCP_HELO_SESSIONID)
1878                 {
1879                         atom.readBytes(rid.id,16);
1880                         if (rid.isSame(servMgr->sessionID))
1881                                 throw StreamException("Servent loopback");
1882
1883                 }else if (id == PCP_HELO_BCID)
1884                 {
1885                         atom.readBytes(bcID.id,16);
1886
1887                 }else if (id == PCP_HELO_OSTYPE)
1888                 {
1889                         osType = atom.readInt();
1890                 }else if (id == PCP_HELO_PORT)
1891                 {
1892                         rhost.port = atom.readShort();
1893                 }else if (id == PCP_HELO_PING)
1894                 {
1895                         pingPort = atom.readShort();
1896                 }else
1897                 {
1898                         LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
1899                         atom.skip(c,dlen);
1900                 }
1901
1902     }
1903
1904         if (version)
1905                 LOG_DEBUG("Incoming PCP is %s : v%d", agent.cstr(),version);
1906
1907
1908         if (!rhost.globalIP() && servMgr->serverHost.globalIP())
1909                 rhost.ip = servMgr->serverHost.ip;
1910
1911         if (pingPort)
1912         {
1913                 char ripStr[64];
1914                 rhost.toStr(ripStr);
1915                 LOG_DEBUG("Incoming firewalled test request: %s ", ripStr);
1916                 rhost.port = pingPort;
1917                 if (!rhost.globalIP() || !pingHost(rhost,rid))
1918                         rhost.port = 0;
1919         }
1920
1921         if (servMgr->isRoot)
1922         {
1923                 if (bcID.isSet())
1924                 {
1925                         if (bcID.getFlags() & 1)        // private
1926                         {
1927                                 BCID *bcid = servMgr->findValidBCID(bcID);
1928                                 if (!bcid || (bcid && !bcid->valid))
1929                                 {
1930                                         atom.writeParent(PCP_OLEH,1);
1931                                         atom.writeInt(PCP_HELO_DISABLE,1);
1932                                         throw StreamException("Client is banned");
1933                                 }
1934                         }
1935                 }
1936         }
1937
1938
1939         char tbuf[1024];
1940         MemoryStream mem(tbuf, sizeof(tbuf));
1941         AtomStream atom2(mem);
1942         atom2.writeParent(PCP_OLEH,5);
1943                 atom2.writeString(PCP_HELO_AGENT,PCX_AGENT);
1944                 atom2.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
1945                 atom2.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
1946                 atom2.writeInt(PCP_HELO_REMOTEIP,rhost.ip);
1947                 atom2.writeShort(PCP_HELO_PORT,rhost.port);
1948
1949         if (version)
1950         {
1951                 if (version < PCP_CLIENT_MINVERSION)
1952                 {
1953                         atom2.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADAGENT);
1954                         atom.io.write(tbuf, mem.getPosition());
1955                         throw StreamException("Agent is not valid");
1956                 }
1957         }
1958
1959         if (!rid.isSet())
1960         {
1961                 atom2.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
1962                 atom.io.write(tbuf, mem.getPosition());
1963                 throw StreamException("Remote host not identified");
1964         }
1965
1966
1967
1968         if (servMgr->isRoot)
1969         {
1970                 servMgr->writeRootAtoms(atom2,false);
1971         }
1972
1973         atom.io.write(tbuf, mem.getPosition());
1974
1975         LOG_DEBUG("PCP Incoming handshake complete.");
1976
1977 }
1978
1979 // -----------------------------------
1980 void Servent::processIncomingPCP(bool suggestOthers)
1981 {
1982         PCPStream::readVersion(*sock);
1983
1984
1985         AtomStream atom(*sock);
1986         Host rhost = sock->host;
1987
1988         handshakeIncomingPCP(atom,rhost,remoteID,agent);
1989
1990
1991         bool alreadyConnected = (servMgr->findConnection(Servent::T_COUT,remoteID)!=NULL)
1992                                                         || (servMgr->findConnection(Servent::T_CIN,remoteID)!=NULL);
1993         bool unavailable = servMgr->controlInFull();
1994         bool offair = !servMgr->isRoot && !chanMgr->isBroadcasting();
1995
1996         char rstr[64];
1997         rhost.toStr(rstr);
1998
1999         if (unavailable || alreadyConnected || offair)
2000         {
2001                 int error;
2002
2003                 if (alreadyConnected)
2004                         error = PCP_ERROR_QUIT+PCP_ERROR_ALREADYCONNECTED;
2005                 else if (unavailable)
2006                         error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
2007                 else if (offair)
2008                         error = PCP_ERROR_QUIT+PCP_ERROR_OFFAIR;
2009                 else 
2010                         error = PCP_ERROR_QUIT;
2011
2012
2013                 if (suggestOthers)
2014                 {
2015
2016                         ChanHit best;
2017                         ChanHitSearch chs;
2018
2019                         int cnt=0;
2020                         for(int i=0; i<8; i++)
2021                         {
2022                                 best.init();
2023
2024                                 // find best hit on this network                        
2025                                 if (!rhost.globalIP())
2026                                 {
2027                                         chs.init();
2028                                         chs.matchHost = servMgr->serverHost;
2029                                         chs.waitDelay = 2;
2030                                         chs.excludeID = remoteID;
2031                                         chs.trackersOnly = true;
2032                                         chs.useBusyControls = false;
2033                                         if (chanMgr->pickHits(chs))
2034                                                 best = chs.best[0];
2035                                 }
2036
2037                                 // find best hit on same network                        
2038                                 if (!best.host.ip)
2039                                 {
2040                                         chs.init();
2041                                         chs.matchHost = rhost;
2042                                         chs.waitDelay = 2;
2043                                         chs.excludeID = remoteID;
2044                                         chs.trackersOnly = true;
2045                                         chs.useBusyControls = false;
2046                                         if (chanMgr->pickHits(chs))
2047                                                 best = chs.best[0];
2048                                 }
2049
2050                                 // else find best hit on other networks
2051                                 if (!best.host.ip)
2052                                 {
2053                                         chs.init();
2054                                         chs.waitDelay = 2;
2055                                         chs.excludeID = remoteID;
2056                                         chs.trackersOnly = true;
2057                                         chs.useBusyControls = false;
2058                                         if (chanMgr->pickHits(chs))
2059                                                 best = chs.best[0];
2060                                 }
2061
2062                                 if (!best.host.ip)
2063                                         break;
2064                                 
2065                                 GnuID noID;
2066                                 noID.clear();
2067                                 best.writeAtoms(atom,noID);
2068                                 cnt++;
2069                         }
2070                         if (cnt)
2071                         {
2072                                 LOG_DEBUG("Sent %d tracker(s) to %s",cnt,rstr);
2073                         }
2074                         else if (rhost.port)
2075                         {
2076                                 // send push request to best firewalled tracker on other network
2077                                 chs.init();
2078                                 chs.waitDelay = 30;
2079                                 chs.excludeID = remoteID;
2080                                 chs.trackersOnly = true;
2081                                 chs.useFirewalled = true;
2082                                 chs.useBusyControls = false;
2083                                 if (chanMgr->pickHits(chs))
2084                                 {
2085                                         best = chs.best[0];
2086                                         GnuID noID;
2087                                         noID.clear();
2088                                         int cnt = servMgr->broadcastPushRequest(best,rhost,noID,Servent::T_CIN);
2089                                         LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,rstr);
2090                                 }
2091                         }else
2092                         {
2093                                 LOG_DEBUG("No available trackers");
2094                         }
2095                 }
2096
2097
2098                 LOG_ERROR("Sending QUIT to incoming: %d",error);
2099
2100                 atom.writeInt(PCP_QUIT,error);
2101                 return;         
2102         }
2103         
2104
2105         type = T_CIN;
2106         setStatus(S_CONNECTED);
2107
2108         atom.writeInt(PCP_OK,0);
2109
2110         // ask for update
2111         atom.writeParent(PCP_ROOT,1);
2112                 atom.writeParent(PCP_ROOT_UPDATE,0);
2113
2114         pcpStream = new PCPStream(remoteID);
2115
2116         int error = 0;
2117         BroadcastState bcs;
2118         while (!error && thread.active && !sock->eof())
2119         {
2120                 error = pcpStream->readPacket(*sock,bcs);
2121                 sys->sleepIdle();
2122
2123                 if (!servMgr->isRoot && !chanMgr->isBroadcasting())
2124                         error = PCP_ERROR_OFFAIR;
2125                 if (peercastInst->isQuitting)
2126                         error = PCP_ERROR_SHUTDOWN;
2127         }
2128
2129         pcpStream->flush(*sock);
2130
2131         error += PCP_ERROR_QUIT;
2132         atom.writeInt(PCP_QUIT,error);
2133
2134         LOG_DEBUG("PCP Incoming to %s closed: %d",rstr,error);
2135
2136 }
2137
2138 // -----------------------------------
2139 int Servent::outgoingProcMain(ThreadInfo *thread)
2140 {
2141 //      thread->lock();
2142         LOG_DEBUG("COUT started");
2143
2144         Servent *sv = (Servent*)thread->data;
2145                 
2146         GnuID noID;
2147         noID.clear();
2148         sv->pcpStream = new PCPStream(noID);
2149
2150         while (sv->thread.active)
2151         {
2152                 sv->setStatus(S_WAIT);
2153
2154                 if (chanMgr->isBroadcasting() && servMgr->autoServe)
2155                 {
2156                         ChanHit bestHit;
2157                         ChanHitSearch chs;
2158                         char ipStr[64];
2159
2160                         do
2161                         {
2162                                 bestHit.init();
2163
2164                                 if (servMgr->rootHost.isEmpty())
2165                                         break;
2166
2167                                 if (sv->pushSock)
2168                                 {
2169                                         sv->sock = sv->pushSock;
2170                                         sv->pushSock = NULL;
2171                                         bestHit.host = sv->sock->host;
2172                                         break;
2173                                 }
2174
2175                                 GnuID noID;
2176                                 noID.clear();
2177                                 ChanHitList *chl = chanMgr->findHitListByID(noID);
2178                                 if (chl)
2179                                 {
2180                                         // find local tracker
2181                                         chs.init();
2182                                         chs.matchHost = servMgr->serverHost;
2183                                         chs.waitDelay = MIN_TRACKER_RETRY;
2184                                         chs.excludeID = servMgr->sessionID;
2185                                         chs.trackersOnly = true;
2186                                         if (!chl->pickHits(chs))
2187                                         {
2188                                                 // else find global tracker
2189                                                 chs.init();
2190                                                 chs.waitDelay = MIN_TRACKER_RETRY;
2191                                                 chs.excludeID = servMgr->sessionID;
2192                                                 chs.trackersOnly = true;
2193                                                 chl->pickHits(chs);
2194                                         }
2195
2196                                         if (chs.numResults)
2197                                         {
2198                                                 bestHit = chs.best[0];
2199                                         }
2200                                 }
2201
2202
2203                                 unsigned int ctime = sys->getTime();
2204
2205                                 if ((!bestHit.host.ip) && ((ctime-chanMgr->lastYPConnect) > MIN_YP_RETRY))
2206                                 {
2207                                         bestHit.host.fromStrName(servMgr->rootHost.cstr(),DEFAULT_PORT);
2208                                         bestHit.yp = true;
2209                                         chanMgr->lastYPConnect = ctime;
2210                                 }
2211                                 sys->sleepIdle();
2212
2213                         }while (!bestHit.host.ip && (sv->thread.active));
2214
2215
2216                         if (!bestHit.host.ip)           // give up
2217                         {
2218                                 LOG_ERROR("COUT giving up");
2219                                 break;
2220                         }
2221
2222
2223                         bestHit.host.toStr(ipStr);
2224
2225                         int error=0;
2226                         try 
2227                         {
2228
2229                                 LOG_DEBUG("COUT to %s: Connecting..",ipStr);
2230
2231                                 if (!sv->sock)
2232                                 {
2233                                         sv->setStatus(S_CONNECTING);
2234                                         sv->sock = sys->createSocket();
2235                                         if (!sv->sock)
2236                                                 throw StreamException("Unable to create socket");
2237                                         sv->sock->open(bestHit.host);
2238                                         sv->sock->connect();
2239
2240                                 }
2241
2242                                 sv->sock->setReadTimeout(30000);
2243                                 AtomStream atom(*sv->sock);
2244
2245                                 sv->setStatus(S_HANDSHAKE);
2246
2247                                 Host rhost = sv->sock->host;
2248                                 atom.writeInt(PCP_CONNECT,1);
2249                                 handshakeOutgoingPCP(atom,rhost,sv->remoteID,sv->agent,bestHit.yp);
2250
2251                                 sv->setStatus(S_CONNECTED);
2252
2253                                 LOG_DEBUG("COUT to %s: OK",ipStr);
2254
2255                                 sv->pcpStream->init(sv->remoteID);
2256
2257                                 BroadcastState bcs;
2258                                 bcs.servent_id = sv->servent_id;
2259                                 error = 0;
2260                                 while (!error && sv->thread.active && !sv->sock->eof() && servMgr->autoServe)
2261                                 {
2262                                         error = sv->pcpStream->readPacket(*sv->sock,bcs);
2263
2264                                         sys->sleepIdle();
2265
2266                                         if (!chanMgr->isBroadcasting())
2267                                                 error = PCP_ERROR_OFFAIR;
2268                                         if (peercastInst->isQuitting)
2269                                                 error = PCP_ERROR_SHUTDOWN;
2270
2271                                         if (sv->pcpStream->nextRootPacket)
2272                                                 if (sys->getTime() > (sv->pcpStream->nextRootPacket+30))
2273                                                         error = PCP_ERROR_NOROOT;
2274                                 }
2275                                 sv->setStatus(S_CLOSING);
2276
2277                                 sv->pcpStream->flush(*sv->sock);
2278
2279                                 error += PCP_ERROR_QUIT;
2280                                 atom.writeInt(PCP_QUIT,error);
2281
2282                                 LOG_ERROR("COUT to %s closed: %d",ipStr,error);
2283
2284                         }catch(TimeoutException &e)
2285                         {
2286                                 LOG_ERROR("COUT to %s: timeout (%s)",ipStr,e.msg);
2287                                 sv->setStatus(S_TIMEOUT);
2288                         }catch(StreamException &e)
2289                         {
2290                                 LOG_ERROR("COUT to %s: %s",ipStr,e.msg);
2291                                 sv->setStatus(S_ERROR);
2292                         }
2293
2294                         try
2295                         {
2296                                 if (sv->sock)
2297                                 {
2298                                         sv->sock->close();
2299                                         delete sv->sock;
2300                                         sv->sock = NULL;
2301                                 }
2302
2303                         }catch(StreamException &) {}
2304
2305                         // don`t discard this hit if we caused the disconnect (stopped broadcasting)
2306                         if (error != (PCP_ERROR_QUIT+PCP_ERROR_OFFAIR))
2307                                 chanMgr->deadHit(bestHit);
2308
2309                 }
2310
2311                 sys->sleepIdle();
2312         }
2313
2314         sv->kill();
2315         sys->endThread(thread);
2316         LOG_DEBUG("COUT ended");
2317         return 0;
2318 }
2319 // -----------------------------------
2320 int Servent::outgoingProc(ThreadInfo *thread)
2321 {
2322         SEH_THREAD(outgoingProcMain, Servent::outgoingProc);
2323 }
2324 // -----------------------------------
2325 int Servent::incomingProcMain(ThreadInfo *thread)
2326 {
2327 //      thread->lock();
2328
2329         Servent *sv = (Servent*)thread->data;
2330         
2331         char ipStr[64];
2332         sv->sock->host.toStr(ipStr);
2333
2334         try 
2335         {
2336                 sv->handshakeIncoming();
2337         }catch(HTTPException &e)
2338         {
2339                 try
2340                 {
2341                         sv->sock->writeLine(e.msg);
2342                         if (e.code == 401)
2343                                 sv->sock->writeLine("WWW-Authenticate: Basic realm=\"PeerCast\"");
2344                         sv->sock->writeLine("");
2345                 }catch(StreamException &){}
2346                 LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);
2347         }catch(StreamException &e)
2348         {
2349                 LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);
2350         }
2351
2352
2353         sv->kill();
2354         sys->endThread(thread);
2355         return 0;
2356 }
2357 // -----------------------------------
2358 int Servent::incomingProc(ThreadInfo *thread)
2359 {
2360         SEH_THREAD(incomingProcMain, Servent::incomingProc);
2361 }
2362 // -----------------------------------
2363 void Servent::processServent()
2364 {
2365         setStatus(S_HANDSHAKE);
2366
2367         handshakeIn();
2368
2369         if (!sock)
2370                 throw StreamException("Servent has no socket");
2371
2372         processGnutella();
2373 }
2374
2375 // -----------------------------------
2376 void Servent::processStream(bool doneHandshake,ChanInfo &chanInfo)
2377 {       
2378         if (!doneHandshake)
2379         {
2380                 setStatus(S_HANDSHAKE);
2381
2382                 if (!handshakeStream(chanInfo))
2383                         return;
2384         }
2385
2386         if (chanInfo.id.isSet())
2387         {
2388
2389                 chanID = chanInfo.id;
2390
2391                 LOG_CHANNEL("Sending channel: %s ",ChanInfo::getProtocolStr(outputProtocol));
2392
2393                 if (!waitForChannelHeader(chanInfo))
2394                         throw StreamException("Channel not ready");
2395
2396                 servMgr->totalStreams++;
2397
2398                 Host host = sock->host;
2399                 host.port = 0;  // force to 0 so we ignore the incoming port
2400
2401                 Channel *ch = chanMgr->findChannelByID(chanID);
2402                 if (!ch)
2403                         throw StreamException("Channel not found");
2404
2405                 if (outputProtocol == ChanInfo::SP_HTTP)
2406                 {
2407                         if ((addMetadata) && (chanMgr->icyMetaInterval))
2408                                 sendRawMetaChannel(chanMgr->icyMetaInterval);
2409                         else 
2410                                 sendRawChannel(true,true);
2411
2412                 }else if (outputProtocol == ChanInfo::SP_MMS)
2413                 {
2414                         if (nsSwitchNum)
2415                         {
2416                                 sendRawChannel(true,true);
2417                         }else
2418                         {
2419                                 sendRawChannel(true,false);
2420                         }
2421
2422                 }else if (outputProtocol  == ChanInfo::SP_PCP)
2423                 {
2424                         sendPCPChannel();
2425
2426                 } else if (outputProtocol  == ChanInfo::SP_PEERCAST)
2427                 {
2428                         sendPeercastChannel();
2429                 }
2430         }
2431
2432         setStatus(S_CLOSING);
2433 }
2434
2435 // -----------------------------------------
2436 #if 0
2437 // debug
2438                 FileStream file;
2439                 file.openReadOnly("c://test.mp3");
2440
2441                 LOG_DEBUG("raw file read");
2442                 char buf[4000];
2443                 int cnt=0;
2444                 while (!file.eof())
2445                 {
2446                         LOG_DEBUG("send %d",cnt++);
2447                         file.read(buf,sizeof(buf));
2448                         sock->write(buf,sizeof(buf));
2449
2450                 }
2451                 file.close();
2452                 LOG_DEBUG("raw file sent");
2453
2454         return;
2455 // debug
2456 #endif
2457 // -----------------------------------
2458 bool Servent::waitForChannelHeader(ChanInfo &info)
2459 {
2460         for(int i=0; i<30*10; i++)
2461         {
2462                 Channel *ch = chanMgr->findChannelByID(info.id);
2463                 if (!ch)
2464                         return false;
2465
2466                 if (ch->isPlaying() && (ch->rawData.writePos>0))
2467                         return true;
2468
2469                 if (!thread.active || !sock->active())
2470                         break;
2471                 sys->sleep(100);
2472         }
2473         return false;
2474 }
2475 // -----------------------------------
2476 void Servent::sendRawChannel(bool sendHead, bool sendData)
2477 {
2478         try
2479         {
2480
2481                 sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
2482
2483                 Channel *ch = chanMgr->findChannelByID(chanID);
2484                 if (!ch)
2485                         throw StreamException("Channel not found");
2486
2487                 setStatus(S_CONNECTED);
2488
2489                 LOG_DEBUG("Starting Raw stream of %s at %d",ch->info.name.cstr(),streamPos);
2490
2491                 if (sendHead)
2492                 {
2493                         ch->headPack.writeRaw(*sock);
2494                         streamPos = ch->headPack.pos + ch->headPack.len;
2495                         LOG_DEBUG("Sent %d bytes header ",ch->headPack.len);
2496                 }
2497
2498                 if (sendData)
2499                 {
2500
2501                         unsigned int streamIndex = ch->streamIndex;
2502                         unsigned int connectTime = sys->getTime();
2503                         unsigned int lastWriteTime = connectTime;
2504
2505                         while ((thread.active) && sock->active())
2506                         {
2507                                 ch = chanMgr->findChannelByID(chanID);
2508
2509                                 if (ch)
2510                                 {
2511
2512                                         if (streamIndex != ch->streamIndex)
2513                                         {
2514                                                 streamIndex = ch->streamIndex;
2515                                                 streamPos = ch->headPack.pos;
2516                                                 LOG_DEBUG("sendRaw got new stream index");
2517                                         }
2518
2519                                         ChanPacket rawPack;
2520                                         if (ch->rawData.findPacket(streamPos,rawPack))
2521                                         {
2522                                                 if (syncPos != rawPack.sync)
2523                                                         LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
2524                                                 syncPos = rawPack.sync+1;
2525
2526                                                 if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2527                                                 {
2528                                                         rawPack.writeRaw(*sock);
2529                                                         lastWriteTime = sys->getTime();
2530                                                 }
2531
2532                                                 if (rawPack.pos < streamPos)
2533                                                         LOG_DEBUG("raw: skip back %d",rawPack.pos-streamPos);
2534                                                 streamPos = rawPack.pos+rawPack.len;
2535                                         } else if (sock->readReady()) {
2536                                                 char c;
2537                                                 int error = sock->readUpto(&c, 1);
2538                                                 if (error == 0) sock->close();
2539                                         }
2540                                 }
2541
2542                                 if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
2543                                         throw TimeoutException();
2544
2545                                 sys->sleepIdle();
2546                         }
2547                 }
2548         }catch(StreamException &e)
2549         {
2550                 LOG_ERROR("Stream channel: %s",e.msg);
2551         }
2552 }
2553
2554 #if 0
2555 // -----------------------------------
2556 void Servent::sendRawMultiChannel(bool sendHead, bool sendData)
2557 {
2558         try
2559         {
2560                 unsigned int chanStreamIndex[ChanMgr::MAX_CHANNELS];
2561                 unsigned int chanStreamPos[ChanMgr::MAX_CHANNELS];
2562                 GnuID chanIDs[ChanMgr::MAX_CHANNELS];
2563                 int numChanIDs=0;
2564                 for(int i=0; i<ChanMgr::MAX_CHANNELS; i++)
2565                 {
2566                         Channel *ch = &chanMgr->channels[i];
2567                         if (ch->isPlaying())
2568                                 chanIDs[numChanIDs++]=ch->info.id;
2569                 }
2570
2571
2572
2573                 setStatus(S_CONNECTED);
2574
2575
2576                 if (sendHead)
2577                 {
2578                         for(int i=0; i<numChanIDs; i++)
2579                         {
2580                                 Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
2581                                 if (ch)
2582                                 {
2583                                         LOG_DEBUG("Starting RawMulti stream: %s",ch->info.name.cstr());
2584                                         ch->headPack.writeRaw(*sock);
2585                                         chanStreamPos[i] = ch->headPack.pos + ch->headPack.len;
2586                                         chanStreamIndex[i] = ch->streamIndex;
2587                                         LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
2588
2589                                 }
2590                         }
2591                 }
2592
2593                 if (sendData)
2594                 {
2595
2596                         unsigned int connectTime=sys->getTime();
2597
2598                         while ((thread.active) && sock->active())
2599                         {
2600
2601                                 for(int i=1; i<numChanIDs; i++)
2602                                 {
2603                                         Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
2604                                         if (ch)
2605                                         {
2606                                                 if (chanStreamIndex[i] != ch->streamIndex)
2607                                                 {
2608                                                         chanStreamIndex[i] = ch->streamIndex;
2609                                                         chanStreamPos[i] = ch->headPack.pos;
2610                                                         LOG_DEBUG("sendRawMulti got new stream index for chan %d",i);
2611                                                 }
2612
2613                                                 ChanPacket rawPack;
2614                                                 if (ch->rawData.findPacket(chanStreamPos[i],rawPack))
2615                                                 {
2616                                                         if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2617                                                                 rawPack.writeRaw(*sock);
2618
2619
2620                                                         if (rawPack.pos < chanStreamPos[i])
2621                                                                 LOG_DEBUG("raw: skip back %d",rawPack.pos-chanStreamPos[i]);
2622                                                         chanStreamPos[i] = rawPack.pos+rawPack.len;
2623
2624
2625                                                         //LOG("raw at %d: %d %d",streamPos,ch->rawData.getStreamPos(ch->rawData.firstPos),ch->rawData.getStreamPos(ch->rawData.lastPos));
2626                                                 }                                               
2627                                         }
2628                                         break;
2629                                 }
2630                                 
2631
2632                                 sys->sleepIdle();
2633                         }
2634                 }
2635         }catch(StreamException &e)
2636         {
2637                 LOG_ERROR("Stream channel: %s",e.msg);
2638         }
2639 }
2640 #endif
2641
2642 // -----------------------------------
2643 void Servent::sendRawMetaChannel(int interval)
2644 {
2645
2646         try
2647         {
2648                 Channel *ch = chanMgr->findChannelByID(chanID);
2649                 if (!ch)
2650                         throw StreamException("Channel not found");
2651
2652                 sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
2653
2654                 setStatus(S_CONNECTED);
2655
2656                 LOG_DEBUG("Starting Raw Meta stream of %s (metaint: %d) at %d",ch->info.name.cstr(),interval,streamPos);
2657
2658
2659                 String lastTitle,lastURL;
2660
2661                 int             lastMsgTime=sys->getTime();
2662                 bool    showMsg=true;
2663
2664                 char buf[16384];
2665                 int bufPos=0;
2666
2667                 if ((interval > sizeof(buf)) || (interval < 1))
2668                         throw StreamException("Bad ICY Meta Interval value");
2669
2670                 unsigned int connectTime = sys->getTime();
2671                 unsigned int lastWriteTime = connectTime;
2672
2673                 streamPos = 0;          // raw meta channel has no header (its MP3)
2674
2675                 while ((thread.active) && sock->active())
2676                 {
2677                         ch = chanMgr->findChannelByID(chanID);
2678
2679                         if (ch)
2680                         {
2681
2682                                 ChanPacket rawPack;
2683                                 if (ch->rawData.findPacket(streamPos,rawPack))
2684                                 {
2685
2686                                         if (syncPos != rawPack.sync)
2687                                                 LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
2688                                         syncPos = rawPack.sync+1;
2689
2690                                         MemoryStream mem(rawPack.data,rawPack.len);
2691
2692                                         if (rawPack.type == ChanPacket::T_DATA)
2693                                         {
2694
2695                                                 int len = rawPack.len;
2696                                                 char *p = rawPack.data;
2697                                                 while (len)
2698                                                 {
2699                                                         int rl = len;
2700                                                         if ((bufPos+rl) > interval)
2701                                                                 rl = interval-bufPos;
2702                                                         memcpy(&buf[bufPos],p,rl);
2703                                                         bufPos+=rl;
2704                                                         p+=rl;
2705                                                         len-=rl;
2706
2707                                                         if (bufPos >= interval)
2708                                                         {
2709                                                                 bufPos = 0;     
2710                                                                 sock->write(buf,interval);
2711                                                                 lastWriteTime = sys->getTime();
2712
2713                                                                 if (chanMgr->broadcastMsgInterval)
2714                                                                         if ((sys->getTime()-lastMsgTime) >= chanMgr->broadcastMsgInterval)
2715                                                                         {
2716                                                                                 showMsg ^= true;
2717                                                                                 lastMsgTime = sys->getTime();
2718                                                                         }
2719
2720                                                                 String *metaTitle = &ch->info.track.title;
2721                                                                 if (!ch->info.comment.isEmpty() && (showMsg))
2722                                                                         metaTitle = &ch->info.comment;
2723
2724
2725                                                                 if (!metaTitle->isSame(lastTitle) || !ch->info.url.isSame(lastURL))
2726                                                                 {
2727
2728                                                                         char tmp[1024];
2729                                                                         String title,url;
2730
2731                                                                         title = *metaTitle;
2732                                                                         url = ch->info.url;
2733
2734                                                                         title.convertTo(String::T_META);
2735                                                                         url.convertTo(String::T_META);
2736
2737                                                                         sprintf(tmp,"StreamTitle='%s';StreamUrl='%s';\0",title.cstr(),url.cstr());
2738                                                                         int len = ((strlen(tmp) + 15+1) / 16);
2739                                                                         sock->writeChar(len);
2740                                                                         sock->write(tmp,len*16);
2741
2742                                                                         lastTitle = *metaTitle;
2743                                                                         lastURL = ch->info.url;
2744
2745                                                                         LOG_DEBUG("StreamTitle: %s, StreamURL: %s",lastTitle.cstr(),lastURL.cstr());
2746
2747                                                                 }else
2748                                                                 {
2749                                                                         sock->writeChar(0);                                     
2750                                                                 }
2751
2752                                                         }
2753                                                 }
2754                                         }
2755                                         streamPos = rawPack.pos + rawPack.len;
2756                                 }
2757                         }
2758                         if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
2759                                 throw TimeoutException();
2760
2761                         sys->sleepIdle();
2762
2763                 }
2764         }catch(StreamException &e)
2765         {
2766                 LOG_ERROR("Stream channel: %s",e.msg);
2767         }
2768 }
2769 // -----------------------------------
2770 void Servent::sendPeercastChannel()
2771 {
2772         try
2773         {
2774                 setStatus(S_CONNECTED);
2775
2776                 Channel *ch = chanMgr->findChannelByID(chanID);
2777                 if (!ch)
2778                         throw StreamException("Channel not found");
2779
2780                 LOG_DEBUG("Starting PeerCast stream: %s",ch->info.name.cstr());
2781
2782                 sock->writeTag("PCST");
2783
2784                 ChanPacket pack;
2785
2786                 ch->headPack.writePeercast(*sock);
2787
2788                 pack.init(ChanPacket::T_META,ch->insertMeta.data,ch->insertMeta.len,ch->streamPos);
2789                 pack.writePeercast(*sock);
2790         
2791                 streamPos = 0;
2792                 unsigned int syncPos=0;
2793                 while ((thread.active) && sock->active())
2794                 {
2795                         ch = chanMgr->findChannelByID(chanID);
2796                         if (ch)
2797                         {
2798
2799                                 ChanPacket rawPack;
2800                                 if (ch->rawData.findPacket(streamPos,rawPack))
2801                                 {
2802                                         if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2803                                         {
2804                                                 sock->writeTag("SYNC");
2805                                                 sock->writeShort(4);
2806                                                 sock->writeShort(0);
2807                                                 sock->write(&syncPos,4);
2808                                                 syncPos++;
2809
2810                                                 rawPack.writePeercast(*sock);
2811                                         }
2812                                         streamPos = rawPack.pos + rawPack.len;
2813                                 }
2814                         }
2815                         sys->sleepIdle();
2816                 }
2817
2818         }catch(StreamException &e)
2819         {
2820                 LOG_ERROR("Stream channel: %s",e.msg);
2821         }
2822 }
2823
2824 //WLock canStreamLock;
2825
2826 // -----------------------------------
2827 void Servent::sendPCPChannel()
2828 {
2829         bool skipCheck = false;
2830         unsigned int ptime = 0;
2831         int npacket = 0, upsize = 0;
2832
2833         Channel *ch = chanMgr->findChannelByID(chanID);
2834         if (!ch)
2835                 throw StreamException("Channel not found");
2836
2837         AtomStream atom(*sock);
2838
2839         pcpStream = new PCPStream(remoteID);
2840         int error=0;
2841
2842         try
2843         {
2844
2845                 LOG_DEBUG("Starting PCP stream of channel at %d",streamPos);
2846
2847
2848 //              setStatus(S_CONNECTED);
2849
2850                 //canStreamLock.on();
2851                 //thread.active = canStream(ch);
2852                 //setStatus(S_CONNECTED);
2853                 //canStreamLock.off();
2854
2855                 lastSkipTime = 0;
2856                 lastSkipCount = 0;
2857                 waitPort = 0;
2858
2859                 if (thread.active){
2860                         atom.writeParent(PCP_CHAN,3 + ((sendHeader)?1:0));
2861                                 atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
2862                                 ch->info.writeInfoAtoms(atom);
2863                                 ch->info.writeTrackAtoms(atom);
2864                                 if (sendHeader)
2865                                 {
2866                                         atom.writeParent(PCP_CHAN_PKT,3);
2867                                         atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
2868                                         atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
2869                                         atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
2870
2871                                         if (streamPos == 0)
2872                                                 streamPos = ch->headPack.pos+ch->headPack.len;
2873                                         LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
2874                                 }
2875                 }
2876
2877                 unsigned int streamIndex = ch->streamIndex;
2878
2879                 ChanPacket rawPack;
2880                 char pbuf[ChanPacket::MAX_DATALEN*3];
2881                 MemoryStream mems(pbuf,sizeof(pbuf));
2882                 AtomStream atom2(mems);
2883
2884                 while (thread.active)
2885                 {
2886
2887                         Channel *ch = chanMgr->findChannelByID(chanID);
2888
2889                         if (ch)
2890                         {
2891
2892                                 if (streamIndex != ch->streamIndex)
2893                                 {
2894                                         streamIndex = ch->streamIndex;
2895                                         streamPos = ch->headPack.pos;
2896                                         LOG_DEBUG("sendPCPStream got new stream index");                                                
2897                                 }
2898
2899                                 mems.rewind();
2900
2901                                 if (ch->rawData.findPacket(streamPos,rawPack))
2902                                 {
2903                                         if ((streamPos < rawPack.pos) && !rawPack.skip){
2904                                                 if (skipCheck){
2905                                                         char tmp[32];
2906                                                         getHost().IPtoStr(tmp);
2907                                                         LOG_NETWORK("##### send skipping ##### %d (%d, %d) -> %s", (rawPack.pos - streamPos), streamPos, rawPack.pos, tmp);
2908
2909                                                         if (sys->getTime() == lastSkipTime) {
2910                                                                 LOG_DEBUG("##### skip all buffer");
2911                                                                 streamPos = ch->rawData.getLatestPos();
2912                                                                 continue;
2913                                                         }
2914
2915                                                         lastSkipTime = sys->getTime();
2916                                                         lastSkipCount++;
2917                                                 } else {
2918                                                         skipCheck = true;
2919                                                 }
2920                                         }
2921
2922                                         if (rawPack.type == ChanPacket::T_HEAD)
2923                                         {
2924                                                 atom2.writeParent(PCP_CHAN,2);
2925                                                         atom2.writeBytes(PCP_CHAN_ID,chanID.id,16);
2926                                                         atom2.writeParent(PCP_CHAN_PKT,3);
2927                                                                 atom2.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
2928                                                                 atom2.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
2929                                                                 atom2.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
2930
2931                                                 sock->write(pbuf, mems.getPosition());
2932                                         }else if (rawPack.type == ChanPacket::T_DATA)
2933                                         {
2934                                                 atom2.writeParent(PCP_CHAN,2);
2935                                                         atom2.writeBytes(PCP_CHAN_ID,chanID.id,16);
2936                                                         atom2.writeParent(PCP_CHAN_PKT,3);
2937                                                                 atom2.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_DATA);
2938                                                                 atom2.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
2939                                                                 atom2.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
2940
2941 #ifdef WIN32
2942                                                 sock->bufferingWrite(pbuf, mems.getPosition());
2943                                                 lastSkipTime = sock->bufList.lastSkipTime;
2944                                                 lastSkipCount = sock->bufList.skipCount;
2945 #else
2946                                                 sock->write(pbuf, mems.getPosition());
2947 #endif
2948                                         }
2949
2950                                         if (rawPack.pos < streamPos)
2951                                                 LOG_DEBUG("pcp: skip back %d",rawPack.pos-streamPos);
2952
2953                                         //LOG_DEBUG("Sending %d-%d (%d,%d,%d)",rawPack.pos,rawPack.pos+rawPack.len,ch->streamPos,ch->rawData.getLatestPos(),ch->rawData.getOldestPos());
2954
2955                                         streamPos = rawPack.pos+rawPack.len;
2956                                 }
2957                         } else {
2958                                 throw StreamException("Channel not found");
2959                         }
2960
2961 #ifdef WIN32
2962                         sock->bufferingWrite(NULL, 0);
2963                         lastSkipTime = sock->bufList.lastSkipTime;
2964                         lastSkipCount = sock->bufList.skipCount;
2965 #endif
2966                         BroadcastState bcs;
2967                         bcs.servent_id = servent_id;
2968 //                      error = pcpStream->readPacket(*sock,bcs);
2969
2970                         unsigned int t = sys->getTime();
2971                         if (t != ptime) {
2972                                 ptime = t;
2973                                 npacket = MAX_PROC_PACKETS;
2974                                 upsize = MAX_OUTWARD_SIZE;
2975                         }
2976
2977                         int len = pcpStream->flushUb(*sock, upsize);
2978                         upsize -= len;
2979
2980                         while (npacket > 0 && sock->readReady()) {
2981                                 npacket--;
2982                                 error = pcpStream->readPacket(*sock,bcs);
2983                                 if (error)
2984                                         throw StreamException("PCP exception");
2985                         }
2986
2987                         sys->sleepIdle();
2988
2989                 }
2990
2991                 LOG_DEBUG("PCP channel stream closed normally.");
2992
2993         }catch(StreamException &e)
2994         {
2995                 LOG_ERROR("Stream channel: %s",e.msg);
2996         }
2997
2998         try
2999         {
3000                 pcpStream->flush(*sock);
3001                 atom.writeInt(PCP_QUIT,error);
3002         }catch(StreamException &) {}
3003
3004 }
3005
3006 // -----------------------------------
3007 int Servent::serverProcMain(ThreadInfo *thread)
3008 {
3009 //      thread->lock();
3010
3011
3012         Servent *sv = (Servent*)thread->data;
3013
3014         try 
3015         {
3016                 if (!sv->sock)
3017                         throw StreamException("Server has no socket");
3018
3019                 sv->setStatus(S_LISTENING);
3020
3021
3022                 char servIP[64];
3023                 sv->sock->host.toStr(servIP);
3024
3025                 if (servMgr->isRoot)
3026                         LOG_DEBUG("Root Server started: %s",servIP);
3027                 else
3028                         LOG_DEBUG("Server started: %s",servIP);
3029                 
3030
3031                 while ((thread->active) && (sv->sock->active()))
3032                 {
3033                         if (servMgr->numActiveOnPort(sv->sock->host.port) < servMgr->maxServIn)
3034                         {
3035                                 ClientSocket *cs = sv->sock->accept();
3036
3037                                 // \95s\90³\82È\83\\81[\83X\83A\83h\83\8c\83X(IPv4\83}\83\8b\83`\83L\83\83\83X\83g)\82ð\8f\9c\8aO
3038                                 if (cs && (((cs->host.ip >> 24) & 0xF0) == 0xE0))
3039                                 {
3040                                         char ip[64];
3041                                         cs->host.toStr(ip);
3042                                         cs->close();
3043                                         LOG_ERROR("reject incoming multicast address: %s", ip);
3044                                         peercastApp->notifyMessage(ServMgr::NT_PEERCAST, "reject multicast address");
3045                                 } else
3046                                 if (cs)
3047                                 {       
3048                                         LOG_DEBUG("accepted incoming");
3049                                         Servent *ns = servMgr->allocServent();
3050                                         if (ns)
3051                                         {
3052                                                 servMgr->lastIncoming = sys->getTime();
3053                                                 ns->servPort = sv->sock->host.port;
3054                                                 ns->networkID = servMgr->networkID;
3055                                                 ns->initIncoming(cs,sv->allow);
3056                                         }else
3057                                                 LOG_ERROR("Out of servents");
3058                                 }
3059                         }
3060                         sys->sleep(100);
3061                 }
3062         }catch(StreamException &e)
3063         {
3064                 LOG_ERROR("Server Error: %s:%d",e.msg,e.err);
3065         }
3066
3067         
3068         LOG_DEBUG("Server stopped");
3069
3070         sv->kill();
3071         sys->endThread(thread);
3072         return 0;
3073 }
3074
3075 // -----------------------------------
3076 int Servent::serverProc(ThreadInfo *thread)
3077 {
3078         SEH_THREAD(serverProcMain, Servent::serverProc);
3079 }
3080  
3081 // -----------------------------------
3082 bool    Servent::writeVariable(Stream &s, const String &var)
3083 {
3084         char buf[1024];
3085
3086         if (var == "type")
3087                 strcpy(buf,getTypeStr());
3088         else if (var == "status")
3089                 strcpy(buf,getStatusStr());
3090         else if (var == "address")
3091         {
3092                 if (servMgr->enableGetName) //JP-EX s
3093                 {
3094                         getHost().toStr(buf);
3095                         char h_ip[64];
3096                         Host h = getHost();
3097                         h.toStr(h_ip);
3098
3099 /*                      ChanHitList *hits[ChanMgr::MAX_HITLISTS];
3100                         int numHits=0;
3101                         for(int i=0; i<ChanMgr::MAX_HITLISTS; i++)
3102                         {
3103                                 ChanHitList *chl = &chanMgr->hitlists[i];
3104                                 if (chl->isUsed())
3105                                         hits[numHits++] = chl;
3106                         }
3107                         bool ishit,isfw;
3108                         ishit = isfw = false;
3109                         int numRelay = 0;
3110                         if (numHits) 
3111                         {
3112                                 for(int k=0; k<numHits; k++)
3113                                 {
3114                                         ChanHitList *chl = hits[k];
3115                                         if (chl->isUsed())
3116                                         {
3117                                                 for (int j=0; j<ChanHitList::MAX_HITS; j++)
3118                                                 {
3119                                                         ChanHit *hit = &chl->hits[j];
3120                                                         if (hit->host.isValid() && (h.ip == hit->host.ip))
3121                                                         {
3122                                                                 ishit = true;
3123                                                                 if (hit->firewalled)
3124                                                                         isfw = true;
3125                                                                 numRelay += hit->numRelays;
3126                                                         }
3127                                                 }
3128                                         }
3129                                 }
3130                         }
3131                         strcpy(buf,"");
3132                         if (ishit == true)
3133                         {
3134                                 if (isfw == true)
3135                                 {
3136                                         if (numRelay== 0)
3137                                                 strcat(buf,"<font color=red>");
3138                                         else 
3139                                                 strcat(buf,"<font color=orange>");
3140                                 }
3141                                 else
3142                                         strcat(buf,"<font color=green>");
3143                         }
3144                         strcat(buf,h_ip);
3145                         char h_name[128];
3146                         if (ClientSocket::getHostname(h_name,h.ip))
3147                         {
3148                                 strcat(buf,"[");
3149                                 strcat(buf,h_name);
3150                                 strcat(buf,"]");
3151                         }
3152                         if (ishit == true) 
3153                         {
3154                                 strcat(buf,"</font>");
3155                         }
3156                 } //JP-EX e*/
3157
3158
3159                         bool isfw = false;
3160                         bool isRelay = true;
3161                         int numRelay = 0;
3162                         ChanHitList *chl = chanMgr->findHitListByID(chanID);
3163                         if (chl){
3164                                 ChanHit *hit = chl->hit;
3165                                 while(hit){
3166                                         if (hit->host.isValid() && (h.ip == hit->host.ip)){
3167                                                 isfw = hit->firewalled;
3168                                                 isRelay = hit->relay;
3169                                                 numRelay = hit->numRelays;
3170                                                 break;
3171                                         }
3172                                         hit = hit->next;
3173                                 }
3174                         }
3175                         strcpy(buf, "");
3176                         if (isfw){
3177                                 if (numRelay == 0){
3178                                         strcat(buf,"<font color=red>");
3179                                 } else {
3180                                         strcat(buf,"<font color=orange>");
3181                                 }
3182                         } else {
3183                                 if (!isRelay){
3184                                         if (numRelay==0){
3185                                                 strcpy(buf,"<font color=purple>");
3186                                         } else {
3187                                                 strcpy(buf,"<font color=blue>");
3188                                         }
3189                                 } else {
3190                                         strcpy(buf,"<font color=green>");
3191                                 }
3192                         }
3193                         strcat(buf,h_ip);
3194                         char h_name[128];
3195                         if (ClientSocket::getHostname(h_name,sizeof(h_name),h.ip)) //JP-MOD(BOF\91Î\8dô)
3196                         {
3197                                 strcat(buf,"[");
3198                                 strcat(buf,h_name);
3199                                 strcat(buf,"]");
3200                         }
3201                         strcat(buf,"</font>");
3202                 }
3203                 else 
3204                         getHost().toStr(buf);
3205         }
3206         else if (var == "agent")
3207                 strcpy(buf,agent.cstr());
3208         else if (var == "bitrate")
3209         {
3210                 if (sock)
3211                 {
3212                         unsigned int tot = sock->bytesInPerSec+sock->bytesOutPerSec;
3213                         sprintf(buf,"%.1f",BYTES_TO_KBPS(tot));
3214                 }else
3215                         strcpy(buf,"0");
3216         }else if (var == "uptime")
3217         {
3218                 String uptime;
3219                 if (lastConnect)
3220                         uptime.setFromStopwatch(sys->getTime()-lastConnect);
3221                 else
3222                         uptime.set("-");
3223                 strcpy(buf,uptime.cstr());
3224         }else if (var.startsWith("gnet."))
3225         {
3226
3227                 float ctime = (float)(sys->getTime()-lastConnect);
3228                 if (var == "gnet.packetsIn")
3229                         sprintf(buf,"%d",gnuStream.packetsIn);
3230                 else if (var == "gnet.packetsInPerSec")
3231                         sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsIn)/ctime:0);
3232                 else if (var == "gnet.packetsOut")
3233                         sprintf(buf,"%d",gnuStream.packetsOut);
3234                 else if (var == "gnet.packetsOutPerSec")
3235                         sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsOut)/ctime:0);
3236                 else if (var == "gnet.normQueue")
3237                         sprintf(buf,"%d",outPacketsNorm.numPending());
3238                 else if (var == "gnet.priQueue")
3239                         sprintf(buf,"%d",outPacketsPri.numPending());
3240                 else if (var == "gnet.flowControl")
3241                         sprintf(buf,"%d",flowControl?1:0);
3242                 else if (var == "gnet.routeTime")
3243                 {
3244                         int nr = seenIDs.numUsed();
3245                         unsigned int tim = sys->getTime()-seenIDs.getOldest();
3246                 
3247                         String tstr;
3248                         tstr.setFromStopwatch(tim);
3249
3250                         if (nr)
3251                                 strcpy(buf,tstr.cstr());
3252                         else
3253                                 strcpy(buf,"-");
3254                 }
3255                 else
3256                         return false;
3257
3258         }else
3259                 return false;
3260
3261         s.writeString(buf);
3262
3263         return true;
3264 }