OSDN Git Service

Change a directory tree
[peercast-im/PeerCastIM.git] / core / common / servent.cpp
1 // ------------------------------------------------
2 // File : servent.cpp
3 // Date: 4-apr-2002
4 // Author: giles
5 // Desc: 
6 //              Servents are the actual connections between clients. They do the handshaking,
7 //              transfering of data and processing of GnuPackets. Each servent has one socket allocated
8 //              to it on connect, it uses this to transfer all of its data.
9 //
10 // (c) 2002 peercast.org
11 // ------------------------------------------------
12 // This program is free software; you can redistribute it and/or modify
13 // it under the terms of the GNU General Public License as published by
14 // the Free Software Foundation; either version 2 of the License, or
15 // (at your option) any later version.
16
17 // This program is distributed in the hope that it will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20 // GNU General Public License for more details.
21 // ------------------------------------------------
22 // todo: make lan->yp not check firewall
23
24 #include <stdlib.h>
25 #include "servent.h"
26 #include "sys.h"
27 #include "gnutella.h"
28 #include "xml.h"
29 #include "html.h"
30 #include "http.h"
31 #include "stats.h"
32 #include "servmgr.h"
33 #include "peercast.h"
34 #include "atom.h"
35 #include "pcp.h"
36 #include "version2.h"
37 #ifdef _DEBUG
38 #include "chkMemoryLeak.h"
39 #define DEBUG_NEW new(__FILE__, __LINE__)
40 #define new DEBUG_NEW
41 #endif
42
43 #include "win32/seh.h"
44
45
// Timeout value for direct stream writes.
// NOTE(review): units are not visible in this part of the file (presumably seconds) - confirm at the use site.
const int DIRECT_WRITE_TIMEOUT = 60;
47
48 // -----------------------------------
// Human-readable names for servent status values.
// NOTE(review): presumably indexed by the STATUS enum declared in servent.h,
// in which case the order here must match that enum - confirm.
char *Servent::statusMsgs[]=
{
        "NONE",
                "CONNECTING",
        "PROTOCOL",
        "HANDSHAKE",
        "CONNECTED",
        "CLOSING",
                "LISTENING",
                "TIMEOUT",
                "REFUSED",
                "VERIFIED",
                "ERROR",
                "WAIT",
                "FREE"
};
65
66 // -----------------------------------
// Human-readable names for servent connection types.
// NOTE(review): presumably indexed by the TYPE enum declared in servent.h,
// in which case the order here must match that enum - confirm.
char *Servent::typeMsgs[]=
{
                "NONE",
        "INCOMING",
        "SERVER",
                "RELAY",
                "DIRECT",
                "COUT",
                "CIN",
                "PGNU"
};
78 // -----------------------------------
79 bool    Servent::isPrivate() 
80 {
81         Host h = getHost();
82         return servMgr->isFiltered(ServFilter::F_PRIVATE,h) || h.isLocalhost();
83 }
84 // -----------------------------------
85 bool    Servent::isAllowed(int a) 
86 {
87         Host h = getHost();
88
89         if (servMgr->isFiltered(ServFilter::F_BAN,h))
90                 return false;
91
92         return (allow&a)!=0;
93 }
94
95 // -----------------------------------
96 bool    Servent::isFiltered(int f) 
97 {
98         Host h = getHost();
99         return servMgr->isFiltered(f,h);
100 }
101
// Next value to be assigned to Servent::servent_id; the Servent constructor
// reads it and then increments it. No locking is applied to it in this file section.
int servent_count = 1;
103 // -----------------------------------
104 Servent::Servent(int index)
105 :outPacketsPri(MAX_OUTPACKETS)
106 ,outPacketsNorm(MAX_OUTPACKETS)
107 ,seenIDs(MAX_HASH)
108 ,serventIndex(index)
109 ,sock(NULL)
110 ,next(NULL)
111 {
112         reset();
113         servent_id = servent_count++;
114         lastSkipTime = 0;
115         lastSkipCount = 0;
116         waitPort = 0;
117 }
118
119 // -----------------------------------
120 Servent::~Servent()
121 {       
122         
123 }
124 // -----------------------------------
125 void    Servent::kill() 
126 {
127         thread.active = false;
128                 
129         setStatus(S_CLOSING);
130
131         if (pcpStream)
132         {
133                 PCPStream *pcp = pcpStream;
134                 pcpStream = NULL;
135                 pcp->kill();
136                 delete pcp;
137         }
138
139         chanMgr->hitlistlock.on();
140         ChanHitList *chl = chanMgr->findHitListByID(chanID);
141         if (chl) {
142                 ChanHit *chh = chl->hit;
143                 ChanHit *prev = NULL;
144                 while (chh) {
145                         if (chh->servent_id == this->servent_id){
146                                 if ((servMgr->kickKeepTime != 0) && (chh->firewalled == 1)){
147                                         chh->numHops = 0;
148                                         chh->numListeners = 0;
149                                         chh->numRelays = 0;
150                                         prev = chh;
151                                         chh = chh->next;
152                                 } else {
153                                         ChanHit *next = chh->next;
154                                         if (prev)
155                                                 prev->next = next;
156                                         else
157                                                 chl->hit = next;
158
159                                         delete chh;
160                                         chh = next;
161                                 }
162                         } else {
163                                 prev = chh;
164                                 chh = chh->next;
165                         }
166                 }
167         }
168         chanMgr->hitlistlock.off();
169
170         if (sock)
171         {
172                 sock->close();
173                 delete sock;
174                 sock = NULL;
175         }
176
177         if (pushSock)
178         {
179                 pushSock->close();
180                 delete pushSock;
181                 pushSock = NULL;
182         }
183
184 //      thread.unlock();
185
186         if (type != T_SERVER)
187         {
188                 reset();
189                 setStatus(S_FREE);
190         }
191
192 }
193 // -----------------------------------
194 void    Servent::abort() 
195 {
196         thread.active = false;
197         if (sock)
198         {
199                 sock->close();
200         }
201
202 }
203
204 // -----------------------------------
205 void Servent::reset()
206 {
207
208         remoteID.clear();
209
210         servPort = 0;
211
212         pcpStream = NULL;
213
214         flowControl = false;
215         networkID.clear();
216
217         chanID.clear();
218
219         outputProtocol = ChanInfo::SP_UNKNOWN;
220
221         agent.clear();
222         sock = NULL;
223         allow = ALLOW_ALL;
224         syncPos = 0;
225         addMetadata = false;
226         nsSwitchNum = 0;
227         pack.func = 255;
228         lastConnect = lastPing = lastPacket = 0;
229         loginPassword.clear();
230         loginMount.clear();
231         bytesPerSecond = 0;
232         priorityConnect = false;
233         pushSock = NULL;
234         sendHeader = true;
235
236         outPacketsNorm.reset();
237         outPacketsPri.reset();
238
239         seenIDs.clear();
240
241         status = S_NONE;
242         type = T_NONE;
243
244         channel_id = 0;
245
246         serventHit.init();
247 }
248 // -----------------------------------
249 bool Servent::sendPacket(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did,Servent::TYPE t)
250 {
251
252         if  (      (type == t) 
253                         && (isConnected())
254                         && (!cid.isSet() || chanID.isSame(cid))
255                         && (!sid.isSet() || !sid.isSame(remoteID))
256                         && (pcpStream != NULL)
257                 )
258         {
259                 return pcpStream->sendPacket(pack,did);
260         }
261         return false;
262 }
263
264
265 // -----------------------------------
266 bool Servent::acceptGIV(ClientSocket *givSock)
267 {
268         if (!pushSock)
269         {
270                 pushSock = givSock;
271                 return true;
272         }else
273                 return false;
274 }
275
276 // -----------------------------------
277 Host Servent::getHost()
278 {
279         Host h(0,0);
280
281         if (sock)
282                 h = sock->host;
283
284         return h;
285 }
286
287 // -----------------------------------
288 bool Servent::outputPacket(GnuPacket &p, bool pri)
289 {
290         lock.on();
291
292         bool r=false;
293         if (pri)
294                 r = outPacketsPri.write(p);
295         else
296         {
297                 if (servMgr->useFlowControl)
298                 {
299                         int per = outPacketsNorm.percentFull();
300                         if (per > 50)
301                                 flowControl = true;
302                         else if (per < 25)
303                                 flowControl = false;
304                 }
305
306
307                 bool send=true;
308                 if (flowControl)
309                 {
310                         // if in flowcontrol, only allow packets with less of a hop count than already in queue
311                         if (p.hops >= outPacketsNorm.findMinHop())
312                                 send = false;
313                 }
314
315                 if (send)
316                         r = outPacketsNorm.write(p);
317         }
318
319         lock.off();
320         return r;
321 }
322
323 // -----------------------------------
324 bool Servent::initServer(Host &h)
325 {
326         try
327         {
328                 checkFree();
329
330                 status = S_WAIT;
331
332                 createSocket();
333
334                 sock->bind(h);
335
336                 thread.data = this;
337
338                 thread.func = serverProc;
339
340                 type = T_SERVER;
341
342                 if (!sys->startThread(&thread))
343                         throw StreamException("Can`t start thread");
344
345         }catch(StreamException &e)
346         {
347                 LOG_ERROR("Bad server: %s",e.msg);
348                 kill();
349                 return false;
350         }
351
352         return true;
353 }
354 // -----------------------------------
355 void Servent::checkFree()
356 {
357         if (sock)
358                 throw StreamException("Socket already set");
359         if (thread.active)
360                 throw StreamException("Thread already active");
361 }
362 // -----------------------------------
363 void Servent::initIncoming(ClientSocket *s, unsigned int a)
364 {
365
366         try{
367
368                 checkFree();
369
370                 type = T_INCOMING;
371                 sock = s;
372                 allow = a;
373                 thread.data = this;
374                 thread.func = incomingProc;
375                 thread.finish = false;
376
377                 setStatus(S_PROTOCOL);
378
379                 char ipStr[64];
380                 sock->host.toStr(ipStr);
381                 LOG_DEBUG("Incoming from %s",ipStr);
382
383
384                 if (!sys->startThread(&thread))
385                         throw StreamException("Can`t start thread");
386         }catch(StreamException &e)
387         {
388                 //LOG_ERROR("!!FATAL!! Incoming error: %s",e.msg);
389                 //servMgr->shutdownTimer = 1;   
390                 kill();
391
392                 LOG_ERROR("INCOMING FAILED: %s",e.msg);
393
394         }
395 }
396
397 // -----------------------------------
398 void Servent::initOutgoing(TYPE ty)
399 {
400         try 
401         {
402                 checkFree();
403
404
405                 type = ty;
406
407                 thread.data = this;
408                 thread.func = outgoingProc;
409
410                 if (!sys->startThread(&thread))
411                         throw StreamException("Can`t start thread");
412
413         }catch(StreamException &e)
414         {
415                 LOG_ERROR("Unable to start outgoing: %s",e.msg);
416                 kill();
417         }
418 }
419
420 // -----------------------------------
421 void Servent::initPCP(Host &rh)
422 {
423         char ipStr[64];
424         rh.toStr(ipStr);
425         try 
426         {
427                 checkFree();
428
429             createSocket();
430
431                 type = T_COUT;
432
433                 sock->open(rh);
434
435                 if (!isAllowed(ALLOW_NETWORK))
436                         throw StreamException("Servent not allowed");
437
438                 thread.data = this;
439                 thread.func = outgoingProc;
440
441                 LOG_DEBUG("Outgoing to %s",ipStr);
442
443                 if (!sys->startThread(&thread))
444                         throw StreamException("Can`t start thread");
445
446         }catch(StreamException &e)
447         {
448                 LOG_ERROR("Unable to open connection to %s - %s",ipStr,e.msg);
449                 kill();
450         }
451 }
452
#if 0
// -----------------------------------
// Disabled: this function is excluded from the build by the surrounding #if 0.
// It would open and connect a socket to `host` after checking ALLOW_DATA.
void	Servent::initChannelFetch(Host &host)
{
	type = T_STREAM;

	char hostStr[64];
	host.toStr(hostStr);

	checkFree();

	createSocket();

	sock->open(host);

	const bool permitted = isAllowed(ALLOW_DATA);
	if (!permitted)
		throw StreamException("Servent not allowed");

	sock->connect();
}
#endif
475
476 // -----------------------------------
477 void Servent::initGIV(Host &h, GnuID &id)
478 {
479         char ipStr[64];
480         h.toStr(ipStr);
481         try 
482         {
483                 checkFree();
484
485                 givID = id;
486
487             createSocket();
488
489                 sock->open(h);
490
491                 if (!isAllowed(ALLOW_NETWORK))
492                         throw StreamException("Servent not allowed");
493                 
494                 sock->connect();
495
496                 thread.data = this;
497                 thread.func = givProc;
498
499                 type = T_RELAY;
500
501                 if (!sys->startThread(&thread))
502                         throw StreamException("Can`t start thread");
503
504         }catch(StreamException &e)
505         {
506                 LOG_ERROR("GIV error to %s: %s",ipStr,e.msg);
507                 kill();
508         }
509 }
510 // -----------------------------------
511 void Servent::createSocket()
512 {
513         if (sock)
514                 LOG_ERROR("Servent::createSocket attempt made while active");
515
516         sock = sys->createSocket();
517 }
518 // -----------------------------------
519 void Servent::setStatus(STATUS s)
520 {
521         if (s != status)
522         {
523                 status = s;
524
525                 if ((s == S_HANDSHAKE) || (s == S_CONNECTED) || (s == S_LISTENING))
526                         lastConnect = sys->getTime();
527         }
528
529 }
530
531 // -----------------------------------
532 void Servent::handshakeOut()
533 {
534     sock->writeLine(GNU_PEERCONN);
535
536         char str[64];
537     
538         sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_AGENT);
539     sock->writeLineF("%s %d",PCX_HS_PCP,1);
540
541         if (priorityConnect)
542             sock->writeLineF("%s %d",PCX_HS_PRIORITY,1);
543         
544         if (networkID.isSet())
545         {
546                 networkID.toStr(str);
547                 sock->writeLineF("%s %s",PCX_HS_NETWORKID,str);
548         }
549
550         servMgr->sessionID.toStr(str);
551         sock->writeLineF("%s %s",PCX_HS_ID,str);
552
553         
554     sock->writeLineF("%s %s",PCX_HS_OS,peercastApp->getClientTypeOS());
555         
556         sock->writeLine("");
557
558         HTTP http(*sock);
559
560         int r = http.readResponse();
561
562         if (r != 200)
563         {
564                 LOG_ERROR("Expected 200, got %d",r);
565                 throw StreamException("Unexpected HTTP response");
566         }
567
568
569         bool versionValid = false;
570
571         GnuID clientID;
572         clientID.clear();
573
574     while (http.nextHeader())
575     {
576                 LOG_DEBUG(http.cmdLine);
577
578                 char *arg = http.getArgStr();
579                 if (!arg)
580                         continue;
581
582                 if (http.isHeader(HTTP_HS_AGENT))
583                 {
584                         agent.set(arg);
585
586                         if (strnicmp(arg,"PeerCast/",9)==0)
587                                 versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);
588                 }else if (http.isHeader(PCX_HS_NETWORKID))
589                         clientID.fromStr(arg);
590     }
591
592         if (!clientID.isSame(networkID))
593                 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
594
595         if (!versionValid)
596                 throw HTTPException(HTTP_SC_UNAUTHORIZED,401);
597
598
599     sock->writeLine(GNU_OK);
600     sock->writeLine("");
601
602 }
603
604
605 // -----------------------------------
606 void Servent::processOutChannel()
607 {
608 }
609
610
611 // -----------------------------------
612 void Servent::handshakeIn()
613 {
614
615         int osType=0;
616
617         HTTP http(*sock);
618
619
620         bool versionValid = false;
621         bool diffRootVer = false;
622
623         GnuID clientID;
624         clientID.clear();
625
626     while (http.nextHeader())
627     {
628                 LOG_DEBUG("%s",http.cmdLine);
629
630                 char *arg = http.getArgStr();
631                 if (!arg)
632                         continue;
633
634                 if (http.isHeader(HTTP_HS_AGENT))
635                 {
636                         agent.set(arg);
637
638                         if (strnicmp(arg,"PeerCast/",9)==0)
639                         {
640                                 versionValid = (stricmp(arg+9,MIN_CONNECTVER)>=0);
641                                 diffRootVer = stricmp(arg+9,MIN_ROOTVER)<0;
642                         }
643                 }else if (http.isHeader(PCX_HS_NETWORKID))
644                 {
645                         clientID.fromStr(arg);
646
647                 }else if (http.isHeader(PCX_HS_PRIORITY))
648                 {
649                         priorityConnect = atoi(arg)!=0;
650
651                 }else if (http.isHeader(PCX_HS_ID))
652                 {
653                         GnuID id;
654                         id.fromStr(arg);
655                         if (id.isSame(servMgr->sessionID))
656                                 throw StreamException("Servent loopback");
657
658                 }else if (http.isHeader(PCX_HS_OS))
659                 {
660                         if (stricmp(arg,PCX_OS_LINUX)==0)
661                                 osType = 1;
662                         else if (stricmp(arg,PCX_OS_WIN32)==0)
663                                 osType = 2;
664                         else if (stricmp(arg,PCX_OS_MACOSX)==0)
665                                 osType = 3;
666                         else if (stricmp(arg,PCX_OS_WINAMP2)==0)
667                                 osType = 4;
668                 }
669
670     }
671
672         if (!clientID.isSame(networkID))
673                 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
674
675         // if this is a priority connection and all incoming connections 
676         // are full then kill an old connection to make room. Otherwise reject connection.
677         //if (!priorityConnect)
678         {
679                 if (!isPrivate())
680                         if (servMgr->pubInFull())
681                                 throw HTTPException(HTTP_SC_UNAVAILABLE,503);
682         }
683
684         if (!versionValid)
685                 throw HTTPException(HTTP_SC_FORBIDDEN,403);
686
687     sock->writeLine(GNU_OK);
688
689     sock->writeLineF("%s %s",HTTP_HS_AGENT,PCX_OLDAGENT);
690
691         if (networkID.isSet())
692         {
693                 char idStr[64];
694                 networkID.toStr(idStr);
695                 sock->writeLineF("%s %s",PCX_HS_NETWORKID,idStr);
696         }
697
698         if (servMgr->isRoot)
699         {
700                 sock->writeLineF("%s %d",PCX_HS_FLOWCTL,servMgr->useFlowControl?1:0);
701                 sock->writeLineF("%s %d",PCX_HS_MINBCTTL,chanMgr->minBroadcastTTL);
702                 sock->writeLineF("%s %d",PCX_HS_MAXBCTTL,chanMgr->maxBroadcastTTL);
703                 sock->writeLineF("%s %d",PCX_HS_RELAYBC,servMgr->relayBroadcast);
704                 //sock->writeLine("%s %d",PCX_HS_FULLHIT,2);
705
706
707                 if (diffRootVer)
708                 {
709                         sock->writeString(PCX_HS_DL);
710                         sock->writeLine(PCX_DL_URL);
711                 }
712
713                 sock->writeLineF("%s %s",PCX_HS_MSG,servMgr->rootMsg.cstr());
714         }
715
716
717         char hostIP[64];
718         Host h = sock->host;
719         h.IPtoStr(hostIP);
720     sock->writeLineF("%s %s",PCX_HS_REMOTEIP,hostIP);
721
722
723     sock->writeLine("");
724
725
726         while (http.nextHeader());
727 }
728
729 // -----------------------------------
730 bool    Servent::pingHost(Host &rhost,GnuID &rsid)
731 {
732         char ipstr[64];
733         rhost.toStr(ipstr);
734         LOG_DEBUG("Ping host %s: trying..",ipstr);
735         ClientSocket *s=NULL;
736         bool hostOK=false;
737         try
738         {
739                 s = sys->createSocket();
740                 if (!s)
741                         return false;
742                 else
743                 {
744
745                         s->setReadTimeout(15000);
746                         s->setWriteTimeout(15000);
747                         s->open(rhost);
748                         s->connect();
749
750                         AtomStream atom(*s);
751
752                         atom.writeInt(PCP_CONNECT,1);
753                         atom.writeParent(PCP_HELO,1);
754                                 atom.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
755
756                         GnuID sid;
757                         sid.clear();
758
759                         int numc,numd;
760                         ID4 id = atom.read(numc,numd);
761                         if (id == PCP_OLEH)
762                         {
763                                 for(int i=0; i<numc; i++)
764                                 {
765                                         int c,d;
766                                         ID4 pid = atom.read(c,d);
767                                         if (pid == PCP_SESSIONID)
768                                                 atom.readBytes(sid.id,16,d);
769                                         else
770                                                 atom.skip(c,d);
771                                 }
772                         }else
773                         {
774                                 LOG_DEBUG("Ping response: %s",id.getString().str());
775                                 throw StreamException("Bad ping response");
776                         }
777
778                         if (!sid.isSame(rsid))
779                                 throw StreamException("SIDs don`t match");
780
781                         hostOK = true;
782                         LOG_DEBUG("Ping host %s: OK",ipstr);
783                         atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT);
784
785
786                 }
787         }catch(StreamException &e)
788         {
789                 LOG_DEBUG("Ping host %s: %s",ipstr,e.msg);
790         }
791         if (s)
792         {
793                 s->close();
794                 delete s;
795         }
796
797         if (!hostOK)
798                 rhost.port = 0;
799
800         return true;
801 }
802
// Lock held by Servent::handshakeStream() around its canStream() check and
// the status/type updates that follow it.
WLock canStreamLock;
804
805 // -----------------------------------
806 bool Servent::handshakeStream(ChanInfo &chanInfo)
807 {
808
809         HTTP http(*sock);
810
811
812         bool gotPCP=false;
813         unsigned int reqPos=0;
814         unsigned short listenPort = 0;
815
816         nsSwitchNum=0;
817
818         while (http.nextHeader())
819         {
820                 char *arg = http.getArgStr();
821                 if (!arg)
822                         continue;
823
824                 if (http.isHeader(PCX_HS_PCP))
825                         gotPCP = atoi(arg)!=0;
826                 else if (http.isHeader(PCX_HS_POS))
827                         reqPos = atoi(arg);
828                 else if (http.isHeader(PCX_HS_PORT))
829                         listenPort = (unsigned short)atoi(arg);
830                 else if (http.isHeader("icy-metadata"))
831                         addMetadata = atoi(arg) > 0;
832                 else if (http.isHeader(HTTP_HS_AGENT))
833                         agent = arg;
834                 else if (http.isHeader("Pragma"))
835                 {
836                         char *ssc = stristr(arg,"stream-switch-count=");
837                         char *so = stristr(arg,"stream-offset");
838
839                         if (ssc || so)
840                         {
841                                 nsSwitchNum=1;
842                                 //nsSwitchNum = atoi(ssc+20);
843                         }
844                 }
845
846                 LOG_DEBUG("Stream: %s",http.cmdLine);
847         }
848
849
850         if ((!gotPCP) && (outputProtocol == ChanInfo::SP_PCP))
851                 outputProtocol = ChanInfo::SP_PEERCAST;
852
853         if (outputProtocol == ChanInfo::SP_HTTP)
854         {
855                 if  ( (chanInfo.srcProtocol == ChanInfo::SP_MMS)
856                           || (chanInfo.contentType == ChanInfo::T_WMA)
857                           || (chanInfo.contentType == ChanInfo::T_WMV)
858                           || (chanInfo.contentType == ChanInfo::T_ASX)
859                         )
860                 outputProtocol = ChanInfo::SP_MMS;
861         }
862
863
864         bool chanFound=false;
865         bool chanReady=false;
866
867         ChanHit *sourceHit = NULL;
868
869         Channel *ch = chanMgr->findChannelByID(chanInfo.id);
870         if (ch)
871         {
872                 sendHeader = true;
873                 if (reqPos || !isIndexTxt(&chanInfo))
874                 {
875                         streamPos = ch->rawData.findOldestPos(reqPos);
876                         //streamPos = ch->rawData.getLatestPos();
877                 }else
878                 {
879                         streamPos = ch->rawData.getLatestPos();
880                 }
881
882                 chanID = chanInfo.id;
883                 serventHit.host.ip = getHost().ip;
884                 serventHit.host.port = listenPort;
885                 if (serventHit.host.globalIP())
886                         serventHit.rhost[0] = serventHit.host;
887                 else
888                         serventHit.rhost[1] = serventHit.host;
889                 serventHit.chanID = chanID;
890
891                 canStreamLock.on();
892                 chanReady = canStream(ch);
893                 if (0 && !chanReady && ch->isPlaying())
894                 {
895                         if (ch->info.getUptime() > 60
896                                 && servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
897                         {
898                                 sourceHit = &ch->sourceHost;  // send source host info
899
900                                 if (listenPort)
901                                 {
902                                         // connect "this" host later
903                                         chanMgr->addHit(serventHit);
904                                 }
905
906                                 char tmp[50];
907                                 getHost().toStr(tmp);
908                                 LOG_DEBUG("Bump channel, hand over sourceHost to %s", tmp);
909                                 ch->bump = true;
910                         }
911                         else if (servMgr->kickUnrelayableHost(chanID, serventHit) != 0)
912                         {
913                                 chanReady = canStream(ch);
914                                 if (!chanReady)
915                                         LOG_DEBUG("*** Kicked unrelayable host, but still cannot stream");
916                         }
917                 }
918                 if (!chanReady) type = T_INCOMING;
919                 thread.active = chanReady;
920                 setStatus(S_CONNECTED);
921                 canStreamLock.off();
922                 channel_id = ch->channel_id;
923
924                 //JP-Patch add-s
925                 if (servMgr->isCheckPushStream())
926                 {
927                         if (chanReady == true)
928                         {
929                                 Host h = getHost();
930
931                                 if (!h.isLocalhost()) 
932                                 {
933                                         do 
934                                         {
935                                                 if (strstr(agent.cstr(),"PeerCast/0.119") != NULL) 
936                                                 {                                               
937                                                         char strip[256];
938                                                         h.toStr(strip);
939                                                         LOG_ERROR("Block v0.119 Servent : %s (%s)",strip,agent.cstr());
940                                                         chanReady = false;
941                                                         break;
942                                                 }
943
944 /*                                              ChanHitList *hits[ChanMgr::MAX_HITLISTS];
945                                                 int numHits=0;
946                                                 for(int i=0; i<ChanMgr::MAX_HITLISTS; i++)
947                                                 {
948                                                         ChanHitList *chl = &chanMgr->hitlists[i];
949                                                         if (chl->isUsed())
950                                                                 hits[numHits++] = chl;
951                                                 }
952                                                 bool isfw = false;
953                                                 int numRelay = 0;
954                                                 for(int i=0; i<numHits; i++)
955                                                 {
956                                                         ChanHitList *chl = hits[i];
957                                                         if (chl->isUsed())
958                                                         {
959                                                                 for (int j=0; j<ChanHitList::MAX_HITS; j++)
960                                                                 {
961                                                                         ChanHit *hit = &chl->hits[j];
962                                                                         if (hit->host.isValid() && (h.ip == hit->host.ip)) 
963                                                                         {
964                                                                                 if (hit->firewalled)
965                                                                                         isfw = true;
966                                                                                 numRelay = hit->numRelays;
967                                                                         }
968                                                                 }
969                                                         }
970                                                 }
971                                                 if ((isfw == true) && (numRelay == 0))
972                                                 {
973                                                         char strip[256];
974                                                         h.toStr(strip);
975                                                         LOG_ERROR("Block firewalled Servent : %s",strip);
976                                                         chanReady = false;
977                                                 }*/
978
979                                                 ChanHitList *chl = chanMgr->findHitList(chanInfo);
980                                                 ChanHit *hit = (chl ? chl->hit : NULL);
981                                                 while(hit){
982                                                         if (hit->host.isValid() && (h.ip == hit->host.ip))
983                                                         {
984                                                                 if ((hit->firewalled) && (hit->numRelays == 0)){
985                                                                         char strip[256];
986                                                                         h.toStr(strip);
987                                                                         LOG_ERROR("Block firewalled Servent : %s",strip);
988                                                                         chanReady = false;
989                                                                 }
990                                                         }
991                                                         hit = hit->next;
992                                                 }
993                                         }while (0);
994                                 }               
995                         }
996                 }
997                 //JP-Patch add-e
998         }
999
1000 //      LockBlock lockblock(chanMgr->hitlistlock);
1001
1002 //      lockblock.lockon();
1003         ChanHitList *chl = chanMgr->findHitList(chanInfo);
1004
1005         if (chl)
1006         {
1007                 chanFound = true;
1008         }
1009
1010
1011         bool result = false;
1012
1013         char idStr[64];
1014         chanInfo.id.toStr(idStr);
1015
1016         char sidStr[64];
1017         servMgr->sessionID.toStr(sidStr);
1018
1019         Host rhost = sock->host;
1020
1021
1022
1023
1024         AtomStream atom(*sock);
1025
1026
1027
1028         if (!chanFound)
1029         {
1030                 sock->writeLine(HTTP_SC_NOTFOUND);
1031             sock->writeLine("");
1032                 LOG_DEBUG("Sending channel not found");
1033                 return false;
1034         }
1035
1036
1037         if (!chanReady)
1038         {
1039                 if (outputProtocol == ChanInfo::SP_PCP)
1040                 {
1041
1042                         char tbuf[8196];
1043                         MemoryStream mem(tbuf, sizeof(tbuf));
1044                         mem.writeLine(HTTP_SC_UNAVAILABLE);
1045                         mem.writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
1046                         mem.writeLine("");
1047                         sock->write(tbuf, mem.getPosition());
1048
1049                         handshakeIncomingPCP(atom,rhost,remoteID,agent);
1050
1051                         char ripStr[64];
1052                         rhost.toStr(ripStr);
1053
1054                         LOG_DEBUG("Sending channel unavailable");
1055
1056                         ChanHitSearch chs;
1057
1058                         mem.rewind();
1059                         AtomStream atom2(mem);
1060
1061                         int error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
1062
1063                         if (sourceHit) {
1064                                 sourceHit->writeAtoms(atom2,chanInfo.id);       
1065                                 char tmp[50];
1066                                 sourceHit->host.toStr(tmp);
1067                                 LOG_DEBUG("relay info(sourceHit): %s", tmp);
1068                         }
1069
1070                         chanMgr->hitlistlock.on();
1071
1072                         chl = chanMgr->findHitList(chanInfo);
1073
1074                         if (chl && !sourceHit)
1075                         {
1076                                 ChanHit best;
1077                                 
1078                                 // search for up to 8 other hits
1079                                 int cnt=0;
1080                                 int i;
1081                                 for(i=0; i<8; i++)
1082                                 {
1083                                         best.init();
1084
1085
1086                                         // find best hit this network if local IP
1087                                         if (!rhost.globalIP())
1088                                         {
1089                                                 chs.init();
1090                                                 chs.matchHost = servMgr->serverHost;
1091                                                 chs.waitDelay = 2;
1092                                                 chs.excludeID = remoteID;
1093                                                 if (chl->pickHits(chs)){
1094                                                         best = chs.best[0];
1095                                                         LOG_DEBUG("find best hit this network if local IP");
1096                                                 }
1097                                         }
1098
1099                                         // find best hit on same network
1100                                         if (!best.host.ip)
1101                                         {
1102                                                 chs.init();
1103                                                 chs.matchHost = rhost;
1104                                                 chs.waitDelay = 2;
1105                                                 chs.excludeID = remoteID;
1106                                                 if (chl->pickHits(chs)){
1107                                                         best = chs.best[0];
1108                                                         LOG_DEBUG("find best hit on same network");
1109                                                 }
1110
1111                                         }
1112
1113                                         // find best hit on other networks
1114 /*                                      if (!best.host.ip)
1115                                         {
1116                                                 chs.init();
1117                                                 chs.waitDelay = 2;
1118                                                 chs.excludeID = remoteID;
1119                                                 if (chl->pickHits(chs)){
1120                                                         best = chs.best[0];
1121                                                         LOG_DEBUG("find best hit on other networks");
1122                                                 }
1123
1124                                         }*/
1125                                         
1126                                         if (!best.host.ip)
1127                                                 break;
1128
1129                                         best.writeAtoms(atom2,chanInfo.id);                        
1130                                         cnt++;
1131                                 }
1132
1133                                 if (!best.host.ip){
1134                                         char tmp[50];
1135 //                                      chanMgr->hitlistlock.on();
1136                                         int rhcnt = chs.getRelayHost(servMgr->serverHost, rhost, remoteID, chl);
1137 //                                      chanMgr->hitlistlock.off();
1138                                         for (int i = 0; i < rhcnt; i++){
1139                                                 chs.best[i].writeAtoms(atom2, chanInfo.id);
1140                                                 chs.best[i].host.toStr(tmp);
1141                                                 LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
1142                                                 best.host.ip = chs.best[i].host.ip;
1143                                         }
1144                                         cnt += rhcnt;
1145                                 }
1146
1147                                 if (cnt)
1148                                 {
1149                                         LOG_DEBUG("Sent %d channel hit(s) to %s",cnt,ripStr);
1150
1151                                 }
1152                                 else if (rhost.port)
1153                                 {
1154                                         // find firewalled host
1155                                         chs.init();
1156                                         chs.waitDelay = 30;
1157                                         chs.useFirewalled = true;
1158                                         chs.excludeID = remoteID;
1159                                         if (chl->pickHits(chs))
1160                                         {
1161                                                 best = chs.best[0];
1162                                                 int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_RELAY);
1163                                                 LOG_DEBUG("Broadcasted channel push request to %d clients for %s",cnt,ripStr);
1164                                         }
1165                                 } 
1166
1167                                 // if all else fails, use tracker
1168                                 if (!best.host.ip)
1169                                 {
1170                                         // find best tracker on this network if local IP
1171                                         if (!rhost.globalIP())
1172                                         {
1173                                                 chs.init();
1174                                                 chs.matchHost = servMgr->serverHost;
1175                                                 chs.trackersOnly = true;
1176                                                 chs.excludeID = remoteID;
1177                                                 if (chl->pickHits(chs))
1178                                                         best = chs.best[0];
1179
1180                                         }
1181
1182                                         // find local tracker
1183                                         if (!best.host.ip)
1184                                         {
1185                                                 chs.init();
1186                                                 chs.matchHost = rhost;
1187                                                 chs.trackersOnly = true;
1188                                                 chs.excludeID = remoteID;
1189                                                 if (chl->pickHits(chs))
1190                                                         best = chs.best[0];
1191                                         }
1192
1193                                         // find global tracker
1194                                         if (!best.host.ip)
1195                                         {
1196                                                 chs.init();
1197                                                 chs.trackersOnly = true;
1198                                                 chs.excludeID = remoteID;
1199                                                 if (chl->pickHits(chs))
1200                                                         best = chs.best[0];
1201                                         }
1202
1203                                         if (best.host.ip)
1204                                         {
1205                                                 best.writeAtoms(atom2,chanInfo.id);                             
1206                                                 LOG_DEBUG("Sent 1 tracker hit to %s",ripStr);
1207                                         }else if (rhost.port)
1208                                         {
1209                                                 // find firewalled tracker
1210                                                 chs.init();
1211                                                 chs.useFirewalled = true;
1212                                                 chs.trackersOnly = true;
1213                                                 chs.excludeID = remoteID;
1214                                                 chs.waitDelay = 30;
1215                                                 if (chl->pickHits(chs))
1216                                                 {
1217                                                         best = chs.best[0];
1218                                                         int cnt = servMgr->broadcastPushRequest(best,rhost,chl->info.id,Servent::T_CIN);
1219                                                         LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,ripStr);
1220                                                 }
1221                                         }
1222
1223                                 }
1224
1225
1226                         }
1227
1228                         chanMgr->hitlistlock.off();
1229
1230                         // return not available yet code
1231                         atom2.writeInt(PCP_QUIT,error);
1232                         sock->write(tbuf, mem.getPosition());
1233                         result = false;
1234
1235                         /*
1236                         char c[512];
1237                         // wait disconnect from other host
1238                         try{
1239                                 while(sock->read(c, sizeof(c))){
1240                                         sys->sleep(10);
1241                                 }
1242                         }catch(StreamException &e){
1243                                 LOG_DEBUG("RelayInfoWait: %s",e.msg);
1244                         }
1245                         */
1246                 }else
1247                 {
1248                         LOG_DEBUG("Sending channel unavailable");
1249                         sock->writeLine(HTTP_SC_UNAVAILABLE);
1250                         sock->writeLine("");
1251                         result = false;
1252                 }
1253
1254         } else {
1255
1256                 if (chanInfo.contentType != ChanInfo::T_MP3)
1257                         addMetadata=false;
1258
1259                 if (addMetadata && (outputProtocol == ChanInfo::SP_HTTP))               // winamp mp3 metadata check
1260                 {
1261
1262                         sock->writeLine(ICY_OK);
1263
1264                         sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT);
1265                         sock->writeLineF("icy-name:%s",chanInfo.name.cstr());
1266                         sock->writeLineF("icy-br:%d",chanInfo.bitrate);
1267                         sock->writeLineF("icy-genre:%s",chanInfo.genre.cstr());
1268                         sock->writeLineF("icy-url:%s",chanInfo.url.cstr());
1269                         sock->writeLineF("icy-metaint:%d",chanMgr->icyMetaInterval);
1270                         sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr);
1271
1272                         sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3);
1273
1274                 }else
1275                 {
1276
1277                         sock->writeLine(HTTP_SC_OK);
1278
1279                         if ((chanInfo.contentType != ChanInfo::T_ASX) && (chanInfo.contentType != ChanInfo::T_WMV) && (chanInfo.contentType != ChanInfo::T_WMA))
1280                         {
1281                                 sock->writeLineF("%s %s",HTTP_HS_SERVER,PCX_AGENT);
1282
1283                                 sock->writeLine("Accept-Ranges: none");
1284
1285                                 sock->writeLineF("x-audiocast-name: %s",chanInfo.name.cstr());
1286                                 sock->writeLineF("x-audiocast-bitrate: %d",chanInfo.bitrate);
1287                                 sock->writeLineF("x-audiocast-genre: %s",chanInfo.genre.cstr());
1288                                 sock->writeLineF("x-audiocast-description: %s",chanInfo.desc.cstr());
1289                                 sock->writeLineF("x-audiocast-url: %s",chanInfo.url.cstr());
1290                                 sock->writeLineF("%s %s",PCX_HS_CHANNELID,idStr);
1291                         }
1292
1293
1294                         if (outputProtocol == ChanInfo::SP_HTTP)
1295                         {
1296                                 switch (chanInfo.contentType)
1297                                 {
1298                                         case ChanInfo::T_OGG:
1299                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XOGG);
1300                                                 break;
1301                                         case ChanInfo::T_MP3:
1302                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MP3);
1303                                                 break;
1304                                         case ChanInfo::T_MOV:
1305                                                 sock->writeLine("Connection: close");
1306                                                 sock->writeLine("Content-Length: 10000000");
1307                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MOV);
1308                                                 break;
1309                                         case ChanInfo::T_MPG:
1310                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MPG);
1311                                                 break;
1312                                         case ChanInfo::T_NSV:
1313                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_NSV);
1314                                                 break;
1315                                         case ChanInfo::T_ASX:
1316                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_ASX);
1317                                                 break;
1318                                         case ChanInfo::T_WMA:
1319                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_WMA);
1320                                                 break;
1321                                         case ChanInfo::T_WMV:
1322                                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_WMV);
1323                                                 break;
1324                                 }
1325                         } else if (outputProtocol == ChanInfo::SP_MMS)
1326                         {
1327                                 sock->writeLine("Server: Rex/9.0.0.2980");
1328                                 sock->writeLine("Cache-Control: no-cache");
1329                                 sock->writeLine("Pragma: no-cache");
1330                                 sock->writeLine("Pragma: client-id=3587303426");
1331                                 sock->writeLine("Pragma: features=\"broadcast,playlist\"");
1332
1333                                 if (nsSwitchNum)
1334                                 {
1335                                         sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_MMS);
1336                                 }else
1337                                 {
1338                                         if (agent.contains("Android"))
1339                                         {
1340                                                 LOG_DEBUG("INFO: Android client detected.");
1341                                                 sock->writeLineF("%s %s", HTTP_HS_CONTENT, MIME_WMV);
1342                                         } else
1343                                         {
1344                                                 sock->writeLine("Content-Type: application/vnd.ms.wms-hdr.asfv1");
1345                                                 if (ch)
1346                                                         sock->writeLineF("Content-Length: %d",ch->headPack.len);
1347                                                 sock->writeLine("Connection: Keep-Alive");
1348                                         }
1349                                 }
1350                         
1351                         } else if (outputProtocol == ChanInfo::SP_PCP)
1352                         {
1353                                 sock->writeLineF("%s %d",PCX_HS_POS,streamPos);
1354                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPCP);
1355
1356                         }else if (outputProtocol == ChanInfo::SP_PEERCAST)
1357                         {
1358                                 sock->writeLineF("%s %s",HTTP_HS_CONTENT,MIME_XPEERCAST);
1359                         }
1360                 }
1361                 sock->writeLine("");
1362                 result = true;
1363
1364                 if (gotPCP)
1365                 {
1366                         handshakeIncomingPCP(atom,rhost,remoteID,agent);
1367                         atom.writeInt(PCP_OK,0);
1368                         if (rhost.globalIP())
1369                                 serventHit.rhost[0] = rhost;
1370                         else
1371                                 serventHit.rhost[1] = rhost;
1372                         serventHit.sessionID = remoteID;
1373                         serventHit.numHops = 1;
1374                         chanMgr->addHit(serventHit);
1375                 }
1376
1377         }
1378
1379
1380
1381         return result;
1382 }
1383
1384 // -----------------------------------
1385 void Servent::handshakeGiv(GnuID &id)
1386 {
1387         if (id.isSet())
1388         {
1389                 char idstr[64];
1390                 id.toStr(idstr);
1391                 sock->writeLineF("GIV /%s",idstr);
1392         }else
1393                 sock->writeLine("GIV");
1394
1395         sock->writeLine("");
1396 }
1397
1398
1399 // -----------------------------------
1400 void Servent::processGnutella()
1401 {
1402         type = T_PGNU;
1403
1404         //if (servMgr->isRoot && !servMgr->needConnections())
1405         if (servMgr->isRoot)
1406         {
1407                 processRoot();
1408                 return;
1409         }
1410
1411
1412
1413         gnuStream.init(sock);
1414         setStatus(S_CONNECTED);
1415
1416         if (!servMgr->isRoot)
1417         {
1418                 chanMgr->broadcastRelays(this, 1, 1);
1419                 GnuPacket *p;
1420
1421                 if ((p=outPacketsNorm.curr()))  
1422                         gnuStream.sendPacket(*p);
1423                 return;
1424         }
1425
1426         gnuStream.ping(2);
1427
1428 //      if (type != T_LOOKUP)
1429 //              chanMgr->broadcastRelays(this,chanMgr->minBroadcastTTL,2);
1430
1431         lastPacket = lastPing = sys->getTime();
1432         bool doneBigPing=false;
1433
1434         const unsigned int      abortTimeoutSecs = 60;          // abort connection after 60 secs of no activitiy
1435         const unsigned int      packetTimeoutSecs = 30;         // ping connection after 30 secs of no activity
1436
1437         unsigned int currBytes=0;
1438         unsigned int lastWait=0;
1439
1440         unsigned int lastTotalIn=0,lastTotalOut=0;
1441
1442         while (thread.active && sock->active())
1443         {
1444
1445                 if (sock->readReady())
1446                 {
1447                         lastPacket = sys->getTime();
1448
1449                         if (gnuStream.readPacket(pack))
1450                         {
1451                                 char ipstr[64];
1452                                 sock->host.toStr(ipstr);
1453
1454                                 GnuID routeID;
1455                                 GnuStream::R_TYPE ret = GnuStream::R_PROCESS;
1456
1457                                 if (pack.func != GNU_FUNC_PONG)
1458                                         if (servMgr->seenPacket(pack))
1459                                                 ret = GnuStream::R_DUPLICATE;
1460
1461                                 seenIDs.add(pack.id);
1462
1463
1464                                 if (ret == GnuStream::R_PROCESS)
1465                                 {
1466                                         GnuID routeID;
1467                                         ret = gnuStream.processPacket(pack,this,routeID);
1468
1469                                         if (flowControl && (ret == GnuStream::R_BROADCAST))
1470                                                 ret = GnuStream::R_DROP;
1471
1472                                 }
1473
1474                                 switch(ret)
1475                                 {
1476                                         case GnuStream::R_BROADCAST:
1477                                                 if (servMgr->broadcast(pack,this))
1478                                                         stats.add(Stats::NUMBROADCASTED);
1479                                                 else
1480                                                         stats.add(Stats::NUMDROPPED);
1481                                                 break;
1482                                         case GnuStream::R_ROUTE:
1483                                                 if (servMgr->route(pack,routeID,NULL))
1484                                                         stats.add(Stats::NUMROUTED);
1485                                                 else
1486                                                         stats.add(Stats::NUMDROPPED);
1487                                                 break;
1488                                         case GnuStream::R_ACCEPTED:
1489                                                 stats.add(Stats::NUMACCEPTED);
1490                                                 break;
1491                                         case GnuStream::R_DUPLICATE:
1492                                                 stats.add(Stats::NUMDUP);
1493                                                 break;
1494                                         case GnuStream::R_DEAD:
1495                                                 stats.add(Stats::NUMDEAD);
1496                                                 break;
1497                                         case GnuStream::R_DISCARD:
1498                                                 stats.add(Stats::NUMDISCARDED);
1499                                                 break;
1500                                         case GnuStream::R_BADVERSION:
1501                                                 stats.add(Stats::NUMOLD);
1502                                                 break;
1503                                         case GnuStream::R_DROP:
1504                                                 stats.add(Stats::NUMDROPPED);
1505                                                 break;
1506                                 }
1507
1508
1509                                 LOG_NETWORK("packet in: %s-%s, %d bytes, %d hops, %d ttl, from %s",GNU_FUNC_STR(pack.func),GnuStream::getRouteStr(ret),pack.len,pack.hops,pack.ttl,ipstr);
1510
1511
1512
1513                         }else{
1514                                 LOG_ERROR("Bad packet");
1515                         }
1516                 }
1517
1518
1519                 GnuPacket *p;
1520
1521                 if ((p=outPacketsPri.curr()))                           // priority packet
1522                 {
1523                         gnuStream.sendPacket(*p);
1524                         seenIDs.add(p->id);
1525                         outPacketsPri.next();
1526                 } else if ((p=outPacketsNorm.curr()))           // or.. normal packet
1527                 {
1528                         gnuStream.sendPacket(*p);
1529                         seenIDs.add(p->id);
1530                         outPacketsNorm.next();
1531                 }
1532
1533                 int lpt =  sys->getTime()-lastPacket;
1534
1535                 if (!doneBigPing)
1536                 {
1537                         if ((sys->getTime()-lastPing) > 15)
1538                         {
1539                                 gnuStream.ping(7);
1540                                 lastPing = sys->getTime();
1541                                 doneBigPing = true;
1542                         }
1543                 }else{
1544                         if (lpt > packetTimeoutSecs)
1545                         {
1546                                 
1547                                 if ((sys->getTime()-lastPing) > packetTimeoutSecs)
1548                                 {
1549                                         gnuStream.ping(1);
1550                                         lastPing = sys->getTime();
1551                                 }
1552
1553                         }
1554                 }
1555                 if (lpt > abortTimeoutSecs)
1556                         throw TimeoutException();
1557
1558
1559                 unsigned int totIn = sock->totalBytesIn-lastTotalIn;
1560                 unsigned int totOut = sock->totalBytesOut-lastTotalOut;
1561
1562                 unsigned int bytes = totIn+totOut;
1563
1564                 lastTotalIn = sock->totalBytesIn;
1565                 lastTotalOut = sock->totalBytesOut;
1566
1567                 const int serventBandwidth = 1000;
1568
1569                 int delay = sys->idleSleepTime;
1570                 if ((bytes) && (serventBandwidth >= 8))
1571                         delay = (bytes*1000)/(serventBandwidth/8);      // set delay relative packetsize
1572
1573                 if (delay < (int)sys->idleSleepTime)
1574                         delay = sys->idleSleepTime;
1575                 //LOG("delay %d, in %d, out %d",delay,totIn,totOut);
1576
1577                 sys->sleep(delay);
1578         }
1579
1580 }
1581
1582
1583 // -----------------------------------
1584 void Servent::processRoot()
1585 {
1586         try 
1587         {
1588         
1589                 gnuStream.init(sock);
1590                 setStatus(S_CONNECTED);
1591
1592                 gnuStream.ping(2);
1593
1594                 unsigned int lastConnect = sys->getTime();
1595
1596                 while (thread.active && sock->active())
1597                 {
1598                         if (gnuStream.readPacket(pack))
1599                         {
1600                                 char ipstr[64];
1601                                 sock->host.toStr(ipstr);
1602                                 
1603                                 LOG_NETWORK("packet in: %d from %s",pack.func,ipstr);
1604
1605
1606                                 if (pack.func == GNU_FUNC_PING)         // if ping then pong back some hosts and close
1607                                 {
1608                                         
1609                                         Host hl[32];
1610                                         int cnt = servMgr->getNewestServents(hl,32,sock->host); 
1611                                         if (cnt)
1612                                         {
1613                                                 int start = sys->rnd() % cnt;
1614                                                 int max = cnt>8?8:cnt;
1615
1616                                                 for(int i=0; i<max; i++)
1617                                                 {
1618                                                         GnuPacket pong;
1619                                                         pack.hops = 1;
1620                                                         pong.initPong(hl[start],false,pack);
1621                                                         gnuStream.sendPacket(pong);
1622
1623                                                         char ipstr[64];
1624                                                         hl[start].toStr(ipstr);
1625
1626                                                         //LOG_NETWORK("Pong %d: %s",start+1,ipstr);
1627                                                         start = (start+1) % cnt;
1628                                                 }
1629                                                 char str[64];
1630                                                 sock->host.toStr(str);
1631                                                 LOG_NETWORK("Sent %d pong(s) to %s",max,str);
1632                                         }else
1633                                         {
1634                                                 LOG_NETWORK("No Pongs to send");
1635                                                 //return;
1636                                         }
1637                                 }else if (pack.func == GNU_FUNC_PONG)           // pong?
1638                                 {
1639                                         MemoryStream pong(pack.data,pack.len);
1640
1641                                         int ip,port;
1642                                         port = pong.readShort();
1643                                         ip = pong.readLong();
1644                                         ip = SWAP4(ip);
1645
1646
1647                                         Host h(ip,port);
1648                                         if ((ip) && (port) && (h.globalIP()))
1649                                         {
1650
1651                                                 LOG_NETWORK("added pong: %d.%d.%d.%d:%d",ip>>24&0xff,ip>>16&0xff,ip>>8&0xff,ip&0xff,port);
1652                                                 servMgr->addHost(h,ServHost::T_SERVENT,sys->getTime());
1653                                         }
1654                                         //return;
1655                                 } else if (pack.func == GNU_FUNC_HIT)
1656                                 {
1657                                         MemoryStream data(pack.data,pack.len);
1658                                         ChanHit hit;
1659                                         gnuStream.readHit(data,hit,pack.hops,pack.id);
1660                                 }
1661
1662                                 //if (gnuStream.packetsIn > 5)  // die if we get too many packets
1663                                 //      return;
1664                         }
1665
1666                         if((sys->getTime()-lastConnect > 60))
1667                                 break;
1668                 }
1669
1670
1671         }catch(StreamException &e)
1672         {
1673                 LOG_ERROR("Relay: %s",e.msg);
1674         }
1675
1676         
1677 }       
1678
1679 // -----------------------------------
1680 int Servent::givProcMain(ThreadInfo *thread)
1681 {
1682 //      thread->lock();
1683         Servent *sv = (Servent*)thread->data;
1684         try 
1685         {
1686                 sv->handshakeGiv(sv->givID);
1687                 sv->handshakeIncoming();
1688
1689         }catch(StreamException &e)
1690         {
1691                 LOG_ERROR("GIV: %s",e.msg);
1692         }
1693
1694         sv->kill();
1695         sys->endThread(thread);
1696         return 0;
1697 }
1698
1699 // -----------------------------------
// Thread entry point for GIV connections.
// The whole body is the SEH_THREAD macro, given givProcMain and this
// function's own name. The macro is defined outside this chunk.
// NOTE(review): presumably it comes from win32/seh.h and runs givProcMain
// under structured-exception handling, supplying the return value - confirm.
1700 int Servent::givProc(ThreadInfo *thread)
1701 {
1702         SEH_THREAD(givProcMain, Servent::givProc);
1703 }
1704
1705 // -----------------------------------
1706 void Servent::handshakeOutgoingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent, bool isTrusted)
1707 {
1708
1709         bool nonFW = (servMgr->getFirewall() != ServMgr::FW_ON);
1710         bool testFW = (servMgr->getFirewall() == ServMgr::FW_UNKNOWN);
1711
1712         bool sendBCID = isTrusted && chanMgr->isBroadcasting();
1713
1714         char tbuf[1024];
1715         MemoryStream mem(tbuf, sizeof(tbuf));
1716         AtomStream atom2(mem);
1717         atom2.writeParent(PCP_HELO,3 + (testFW?1:0) + (nonFW?1:0) + (sendBCID?1:0));
1718                 atom2.writeString(PCP_HELO_AGENT,PCX_AGENT);
1719                 atom2.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
1720                 atom2.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
1721                 if (nonFW)
1722                         atom2.writeShort(PCP_HELO_PORT,servMgr->serverHost.port);
1723                 if (testFW)
1724                         atom2.writeShort(PCP_HELO_PING,servMgr->serverHost.port);
1725                 if (sendBCID)
1726                         atom2.writeBytes(PCP_HELO_BCID,chanMgr->broadcastID.id,16);
1727         atom.io.write(tbuf, mem.getPosition());
1728
1729
1730         LOG_DEBUG("PCP outgoing waiting for OLEH..");
1731
1732         int numc,numd;
1733         ID4 id = atom.read(numc,numd);
1734         if (id != PCP_OLEH)
1735         {
1736                 LOG_DEBUG("PCP outgoing reply: %s",id.getString().str());
1737                 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
1738                 throw StreamException("Got unexpected PCP response");
1739         }
1740
1741
1742
1743         char arg[64];
1744
1745         GnuID clientID;
1746         clientID.clear();
1747         rid.clear();
1748         int version=0;
1749         int disable=0;
1750
1751         Host thisHost;
1752
1753         // read OLEH response
1754         for(int i=0; i<numc; i++)
1755         {
1756                 int c,dlen;
1757                 ID4 id = atom.read(c,dlen);
1758
1759                 if (id == PCP_HELO_AGENT)
1760                 {
1761                         atom.readString(arg,sizeof(arg),dlen);
1762                         agent.set(arg);
1763
1764                 }else if (id == PCP_HELO_REMOTEIP)
1765                 {
1766                         thisHost.ip = atom.readInt();
1767
1768                 }else if (id == PCP_HELO_PORT)
1769                 {
1770                         thisHost.port = atom.readShort();
1771
1772                 }else if (id == PCP_HELO_VERSION)
1773                 {
1774                         version = atom.readInt();
1775
1776                 }else if (id == PCP_HELO_DISABLE)
1777                 {
1778                         disable = atom.readInt();
1779
1780                 }else if (id == PCP_HELO_SESSIONID)
1781                 {
1782                         atom.readBytes(rid.id,16);
1783                         if (rid.isSame(servMgr->sessionID))
1784                                 throw StreamException("Servent loopback");
1785
1786                 }else
1787                 {
1788                         LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
1789                         atom.skip(c,dlen);
1790                 }
1791
1792     }
1793
1794
1795         // update server ip/firewall status
1796         if (isTrusted)
1797         {
1798                 if (thisHost.isValid())
1799                 {
1800                         if ((servMgr->serverHost.ip != thisHost.ip) && (servMgr->forceIP.isEmpty()))
1801                         {
1802                                 char ipstr[64];
1803                                 thisHost.toStr(ipstr);
1804                                 LOG_DEBUG("Got new ip: %s",ipstr);
1805                                 servMgr->serverHost.ip = thisHost.ip;
1806                         }
1807
1808                         if (servMgr->getFirewall() == ServMgr::FW_UNKNOWN)
1809                         {
1810                                 if (thisHost.port && thisHost.globalIP())
1811                                         servMgr->setFirewall(ServMgr::FW_OFF);
1812                                 else
1813                                         servMgr->setFirewall(ServMgr::FW_ON);
1814                         }
1815                 }
1816
1817                 if (disable == 1)
1818                 {
1819                         LOG_ERROR("client disabled: %d",disable);
1820                         servMgr->isDisabled = true;             
1821                 }else
1822                 {
1823                         servMgr->isDisabled = false;            
1824                 }
1825         }
1826
1827
1828
1829         if (!rid.isSet())
1830         {
1831                 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
1832                 throw StreamException("Remote host not identified");
1833         }
1834
1835         LOG_DEBUG("PCP Outgoing handshake complete.");
1836
1837 }
1838
1839 // -----------------------------------
1840 void Servent::handshakeIncomingPCP(AtomStream &atom, Host &rhost, GnuID &rid, String &agent)
1841 {
1842         int numc,numd;
1843         ID4 id = atom.read(numc,numd);
1844
1845
1846         if (id != PCP_HELO)
1847         {
1848                 LOG_DEBUG("PCP incoming reply: %s",id.getString().str());
1849                 atom.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADRESPONSE);
1850                 throw StreamException("Got unexpected PCP response");
1851         }
1852
1853         char arg[64];
1854
1855         ID4 osType;
1856
1857         int version=0;
1858
1859         int pingPort=0;
1860
1861         GnuID bcID;
1862         GnuID clientID;
1863
1864         bcID.clear();
1865         clientID.clear();
1866
1867         rhost.port = 0;
1868
1869         for(int i=0; i<numc; i++)
1870         {
1871
1872                 int c,dlen;
1873                 ID4 id = atom.read(c,dlen);
1874
1875                 if (id == PCP_HELO_AGENT)
1876                 {
1877                         atom.readString(arg,sizeof(arg),dlen);
1878                         agent.set(arg);
1879
1880                 }else if (id == PCP_HELO_VERSION)
1881                 {
1882                         version = atom.readInt();
1883
1884                 }else if (id == PCP_HELO_SESSIONID)
1885                 {
1886                         atom.readBytes(rid.id,16);
1887                         if (rid.isSame(servMgr->sessionID))
1888                                 throw StreamException("Servent loopback");
1889
1890                 }else if (id == PCP_HELO_BCID)
1891                 {
1892                         atom.readBytes(bcID.id,16);
1893
1894                 }else if (id == PCP_HELO_OSTYPE)
1895                 {
1896                         osType = atom.readInt();
1897                 }else if (id == PCP_HELO_PORT)
1898                 {
1899                         rhost.port = atom.readShort();
1900                 }else if (id == PCP_HELO_PING)
1901                 {
1902                         pingPort = atom.readShort();
1903                 }else
1904                 {
1905                         LOG_DEBUG("PCP handshake skip: %s",id.getString().str());
1906                         atom.skip(c,dlen);
1907                 }
1908
1909     }
1910
1911         if (version)
1912                 LOG_DEBUG("Incoming PCP is %s : v%d", agent.cstr(),version);
1913
1914
1915         if (!rhost.globalIP() && servMgr->serverHost.globalIP())
1916                 rhost.ip = servMgr->serverHost.ip;
1917
1918         if (pingPort)
1919         {
1920                 char ripStr[64];
1921                 rhost.toStr(ripStr);
1922                 LOG_DEBUG("Incoming firewalled test request: %s ", ripStr);
1923                 rhost.port = pingPort;
1924                 if (!rhost.globalIP() || !pingHost(rhost,rid))
1925                         rhost.port = 0;
1926         }
1927
1928         if (servMgr->isRoot)
1929         {
1930                 if (bcID.isSet())
1931                 {
1932                         if (bcID.getFlags() & 1)        // private
1933                         {
1934                                 BCID *bcid = servMgr->findValidBCID(bcID);
1935                                 if (!bcid || (bcid && !bcid->valid))
1936                                 {
1937                                         atom.writeParent(PCP_OLEH,1);
1938                                         atom.writeInt(PCP_HELO_DISABLE,1);
1939                                         throw StreamException("Client is banned");
1940                                 }
1941                         }
1942                 }
1943         }
1944
1945
1946         char tbuf[1024];
1947         MemoryStream mem(tbuf, sizeof(tbuf));
1948         AtomStream atom2(mem);
1949         atom2.writeParent(PCP_OLEH,5);
1950                 atom2.writeString(PCP_HELO_AGENT,PCX_AGENT);
1951                 atom2.writeBytes(PCP_HELO_SESSIONID,servMgr->sessionID.id,16);
1952                 atom2.writeInt(PCP_HELO_VERSION,PCP_CLIENT_VERSION);
1953                 atom2.writeInt(PCP_HELO_REMOTEIP,rhost.ip);
1954                 atom2.writeShort(PCP_HELO_PORT,rhost.port);
1955
1956         if (version)
1957         {
1958                 if (version < PCP_CLIENT_MINVERSION)
1959                 {
1960                         atom2.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_BADAGENT);
1961                         atom.io.write(tbuf, mem.getPosition());
1962                         throw StreamException("Agent is not valid");
1963                 }
1964         }
1965
1966         if (!rid.isSet())
1967         {
1968                 atom2.writeInt(PCP_QUIT,PCP_ERROR_QUIT+PCP_ERROR_NOTIDENTIFIED);
1969                 atom.io.write(tbuf, mem.getPosition());
1970                 throw StreamException("Remote host not identified");
1971         }
1972
1973
1974
1975         if (servMgr->isRoot)
1976         {
1977                 servMgr->writeRootAtoms(atom2,false);
1978         }
1979
1980         atom.io.write(tbuf, mem.getPosition());
1981
1982         LOG_DEBUG("PCP Incoming handshake complete.");
1983
1984 }
1985
1986 // -----------------------------------
1987 void Servent::processIncomingPCP(bool suggestOthers)
1988 {
1989         PCPStream::readVersion(*sock);
1990
1991
1992         AtomStream atom(*sock);
1993         Host rhost = sock->host;
1994
1995         handshakeIncomingPCP(atom,rhost,remoteID,agent);
1996
1997
1998         bool alreadyConnected = (servMgr->findConnection(Servent::T_COUT,remoteID)!=NULL)
1999                                                         || (servMgr->findConnection(Servent::T_CIN,remoteID)!=NULL);
2000         bool unavailable = servMgr->controlInFull();
2001         bool offair = !servMgr->isRoot && !chanMgr->isBroadcasting();
2002
2003         char rstr[64];
2004         rhost.toStr(rstr);
2005
2006         if (unavailable || alreadyConnected || offair)
2007         {
2008                 int error;
2009
2010                 if (alreadyConnected)
2011                         error = PCP_ERROR_QUIT+PCP_ERROR_ALREADYCONNECTED;
2012                 else if (unavailable)
2013                         error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
2014                 else if (offair)
2015                         error = PCP_ERROR_QUIT+PCP_ERROR_OFFAIR;
2016                 else 
2017                         error = PCP_ERROR_QUIT;
2018
2019
2020                 if (suggestOthers)
2021                 {
2022
2023                         ChanHit best;
2024                         ChanHitSearch chs;
2025
2026                         int cnt=0;
2027                         for(int i=0; i<8; i++)
2028                         {
2029                                 best.init();
2030
2031                                 // find best hit on this network                        
2032                                 if (!rhost.globalIP())
2033                                 {
2034                                         chs.init();
2035                                         chs.matchHost = servMgr->serverHost;
2036                                         chs.waitDelay = 2;
2037                                         chs.excludeID = remoteID;
2038                                         chs.trackersOnly = true;
2039                                         chs.useBusyControls = false;
2040                                         if (chanMgr->pickHits(chs))
2041                                                 best = chs.best[0];
2042                                 }
2043
2044                                 // find best hit on same network                        
2045                                 if (!best.host.ip)
2046                                 {
2047                                         chs.init();
2048                                         chs.matchHost = rhost;
2049                                         chs.waitDelay = 2;
2050                                         chs.excludeID = remoteID;
2051                                         chs.trackersOnly = true;
2052                                         chs.useBusyControls = false;
2053                                         if (chanMgr->pickHits(chs))
2054                                                 best = chs.best[0];
2055                                 }
2056
2057                                 // else find best hit on other networks
2058                                 if (!best.host.ip)
2059                                 {
2060                                         chs.init();
2061                                         chs.waitDelay = 2;
2062                                         chs.excludeID = remoteID;
2063                                         chs.trackersOnly = true;
2064                                         chs.useBusyControls = false;
2065                                         if (chanMgr->pickHits(chs))
2066                                                 best = chs.best[0];
2067                                 }
2068
2069                                 if (!best.host.ip)
2070                                         break;
2071                                 
2072                                 GnuID noID;
2073                                 noID.clear();
2074                                 best.writeAtoms(atom,noID);
2075                                 cnt++;
2076                         }
2077                         if (cnt)
2078                         {
2079                                 LOG_DEBUG("Sent %d tracker(s) to %s",cnt,rstr);
2080                         }
2081                         else if (rhost.port)
2082                         {
2083                                 // send push request to best firewalled tracker on other network
2084                                 chs.init();
2085                                 chs.waitDelay = 30;
2086                                 chs.excludeID = remoteID;
2087                                 chs.trackersOnly = true;
2088                                 chs.useFirewalled = true;
2089                                 chs.useBusyControls = false;
2090                                 if (chanMgr->pickHits(chs))
2091                                 {
2092                                         best = chs.best[0];
2093                                         GnuID noID;
2094                                         noID.clear();
2095                                         int cnt = servMgr->broadcastPushRequest(best,rhost,noID,Servent::T_CIN);
2096                                         LOG_DEBUG("Broadcasted tracker push request to %d clients for %s",cnt,rstr);
2097                                 }
2098                         }else
2099                         {
2100                                 LOG_DEBUG("No available trackers");
2101                         }
2102                 }
2103
2104
2105                 LOG_ERROR("Sending QUIT to incoming: %d",error);
2106
2107                 atom.writeInt(PCP_QUIT,error);
2108                 return;         
2109         }
2110         
2111
2112         type = T_CIN;
2113         setStatus(S_CONNECTED);
2114
2115         atom.writeInt(PCP_OK,0);
2116
2117         // ask for update
2118         atom.writeParent(PCP_ROOT,1);
2119                 atom.writeParent(PCP_ROOT_UPDATE,0);
2120
2121         pcpStream = new PCPStream(remoteID);
2122
2123         int error = 0;
2124         BroadcastState bcs;
2125         while (!error && thread.active && !sock->eof())
2126         {
2127                 error = pcpStream->readPacket(*sock,bcs);
2128                 sys->sleepIdle();
2129
2130                 if (!servMgr->isRoot && !chanMgr->isBroadcasting())
2131                         error = PCP_ERROR_OFFAIR;
2132                 if (peercastInst->isQuitting)
2133                         error = PCP_ERROR_SHUTDOWN;
2134         }
2135
2136         pcpStream->flush(*sock);
2137
2138         error += PCP_ERROR_QUIT;
2139         atom.writeInt(PCP_QUIT,error);
2140
2141         LOG_DEBUG("PCP Incoming to %s closed: %d",rstr,error);
2142
2143 }
2144
2145 // -----------------------------------
2146 int Servent::outgoingProcMain(ThreadInfo *thread)
2147 {
2148 //      thread->lock();
2149         LOG_DEBUG("COUT started");
2150
2151         Servent *sv = (Servent*)thread->data;
2152                 
2153         GnuID noID;
2154         noID.clear();
2155         sv->pcpStream = new PCPStream(noID);
2156
2157         while (sv->thread.active)
2158         {
2159                 sv->setStatus(S_WAIT);
2160
2161                 if (chanMgr->isBroadcasting() && servMgr->autoServe)
2162                 {
2163                         ChanHit bestHit;
2164                         ChanHitSearch chs;
2165                         char ipStr[64];
2166
2167                         do
2168                         {
2169                                 bestHit.init();
2170
2171                                 if (servMgr->rootHost.isEmpty())
2172                                         break;
2173
2174                                 if (sv->pushSock)
2175                                 {
2176                                         sv->sock = sv->pushSock;
2177                                         sv->pushSock = NULL;
2178                                         bestHit.host = sv->sock->host;
2179                                         break;
2180                                 }
2181
2182                                 GnuID noID;
2183                                 noID.clear();
2184                                 ChanHitList *chl = chanMgr->findHitListByID(noID);
2185                                 if (chl)
2186                                 {
2187                                         // find local tracker
2188                                         chs.init();
2189                                         chs.matchHost = servMgr->serverHost;
2190                                         chs.waitDelay = MIN_TRACKER_RETRY;
2191                                         chs.excludeID = servMgr->sessionID;
2192                                         chs.trackersOnly = true;
2193                                         if (!chl->pickHits(chs))
2194                                         {
2195                                                 // else find global tracker
2196                                                 chs.init();
2197                                                 chs.waitDelay = MIN_TRACKER_RETRY;
2198                                                 chs.excludeID = servMgr->sessionID;
2199                                                 chs.trackersOnly = true;
2200                                                 chl->pickHits(chs);
2201                                         }
2202
2203                                         if (chs.numResults)
2204                                         {
2205                                                 bestHit = chs.best[0];
2206                                         }
2207                                 }
2208
2209
2210                                 unsigned int ctime = sys->getTime();
2211
2212                                 if ((!bestHit.host.ip) && ((ctime-chanMgr->lastYPConnect) > MIN_YP_RETRY))
2213                                 {
2214                                         bestHit.host.fromStrName(servMgr->rootHost.cstr(),DEFAULT_PORT);
2215                                         bestHit.yp = true;
2216                                         chanMgr->lastYPConnect = ctime;
2217                                 }
2218                                 sys->sleepIdle();
2219
2220                         }while (!bestHit.host.ip && (sv->thread.active));
2221
2222
2223                         if (!bestHit.host.ip)           // give up
2224                         {
2225                                 LOG_ERROR("COUT giving up");
2226                                 break;
2227                         }
2228
2229
2230                         bestHit.host.toStr(ipStr);
2231
2232                         int error=0;
2233                         try 
2234                         {
2235
2236                                 LOG_DEBUG("COUT to %s: Connecting..",ipStr);
2237
2238                                 if (!sv->sock)
2239                                 {
2240                                         sv->setStatus(S_CONNECTING);
2241                                         sv->sock = sys->createSocket();
2242                                         if (!sv->sock)
2243                                                 throw StreamException("Unable to create socket");
2244                                         sv->sock->open(bestHit.host);
2245                                         sv->sock->connect();
2246
2247                                 }
2248
2249                                 sv->sock->setReadTimeout(30000);
2250                                 AtomStream atom(*sv->sock);
2251
2252                                 sv->setStatus(S_HANDSHAKE);
2253
2254                                 Host rhost = sv->sock->host;
2255                                 atom.writeInt(PCP_CONNECT,1);
2256                                 handshakeOutgoingPCP(atom,rhost,sv->remoteID,sv->agent,bestHit.yp);
2257
2258                                 sv->setStatus(S_CONNECTED);
2259
2260                                 LOG_DEBUG("COUT to %s: OK",ipStr);
2261
2262                                 sv->pcpStream->init(sv->remoteID);
2263
2264                                 BroadcastState bcs;
2265                                 bcs.servent_id = sv->servent_id;
2266                                 error = 0;
2267                                 while (!error && sv->thread.active && !sv->sock->eof() && servMgr->autoServe)
2268                                 {
2269                                         error = sv->pcpStream->readPacket(*sv->sock,bcs);
2270
2271                                         sys->sleepIdle();
2272
2273                                         if (!chanMgr->isBroadcasting())
2274                                                 error = PCP_ERROR_OFFAIR;
2275                                         if (peercastInst->isQuitting)
2276                                                 error = PCP_ERROR_SHUTDOWN;
2277
2278                                         if (sv->pcpStream->nextRootPacket)
2279                                                 if (sys->getTime() > (sv->pcpStream->nextRootPacket+30))
2280                                                         error = PCP_ERROR_NOROOT;
2281                                 }
2282                                 sv->setStatus(S_CLOSING);
2283
2284                                 sv->pcpStream->flush(*sv->sock);
2285
2286                                 error += PCP_ERROR_QUIT;
2287                                 atom.writeInt(PCP_QUIT,error);
2288
2289                                 LOG_ERROR("COUT to %s closed: %d",ipStr,error);
2290
2291                         }catch(TimeoutException &e)
2292                         {
2293                                 LOG_ERROR("COUT to %s: timeout (%s)",ipStr,e.msg);
2294                                 sv->setStatus(S_TIMEOUT);
2295                         }catch(StreamException &e)
2296                         {
2297                                 LOG_ERROR("COUT to %s: %s",ipStr,e.msg);
2298                                 sv->setStatus(S_ERROR);
2299                         }
2300
2301                         try
2302                         {
2303                                 if (sv->sock)
2304                                 {
2305                                         sv->sock->close();
2306                                         delete sv->sock;
2307                                         sv->sock = NULL;
2308                                 }
2309
2310                         }catch(StreamException &) {}
2311
2312                         // don`t discard this hit if we caused the disconnect (stopped broadcasting)
2313                         if (error != (PCP_ERROR_QUIT+PCP_ERROR_OFFAIR))
2314                                 chanMgr->deadHit(bestHit);
2315
2316                 }
2317
2318                 sys->sleepIdle();
2319         }
2320
2321         sv->kill();
2322         sys->endThread(thread);
2323         LOG_DEBUG("COUT ended");
2324         return 0;
2325 }
2326 // -----------------------------------
// Thread entry point for the outgoing control (COUT) connection.
// The whole body is the SEH_THREAD macro, given outgoingProcMain and this
// function's own name. The macro is defined outside this chunk.
// NOTE(review): presumably it comes from win32/seh.h and runs outgoingProcMain
// under structured-exception handling, supplying the return value - confirm.
2327 int Servent::outgoingProc(ThreadInfo *thread)
2328 {
2329         SEH_THREAD(outgoingProcMain, Servent::outgoingProc);
2330 }
2331 // -----------------------------------
2332 int Servent::incomingProcMain(ThreadInfo *thread)
2333 {
2334 //      thread->lock();
2335
2336         Servent *sv = (Servent*)thread->data;
2337         
2338         char ipStr[64];
2339         sv->sock->host.toStr(ipStr);
2340
2341         try 
2342         {
2343                 sv->handshakeIncoming();
2344         }catch(HTTPException &e)
2345         {
2346                 try
2347                 {
2348                         sv->sock->writeLine(e.msg);
2349                         if (e.code == 401)
2350                                 sv->sock->writeLine("WWW-Authenticate: Basic realm=\"PeerCast\"");
2351                         sv->sock->writeLine("");
2352                 }catch(StreamException &){}
2353                 LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);
2354         }catch(StreamException &e)
2355         {
2356                 LOG_ERROR("Incoming from %s: %s",ipStr,e.msg);
2357         }
2358
2359
2360         sv->kill();
2361         sys->endThread(thread);
2362         return 0;
2363 }
2364 // -----------------------------------
// Thread entry point for accepted incoming connections.
// The whole body is the SEH_THREAD macro, given incomingProcMain and this
// function's own name. The macro is defined outside this chunk.
// NOTE(review): presumably it comes from win32/seh.h and runs incomingProcMain
// under structured-exception handling, supplying the return value - confirm.
2365 int Servent::incomingProc(ThreadInfo *thread)
2366 {
2367         SEH_THREAD(incomingProcMain, Servent::incomingProc);
2368 }
2369 // -----------------------------------
2370 void Servent::processServent()
2371 {
2372         setStatus(S_HANDSHAKE);
2373
2374         handshakeIn();
2375
2376         if (!sock)
2377                 throw StreamException("Servent has no socket");
2378
2379         processGnutella();
2380 }
2381
2382 // -----------------------------------
2383 void Servent::processStream(bool doneHandshake,ChanInfo &chanInfo)
2384 {       
2385         if (!doneHandshake)
2386         {
2387                 setStatus(S_HANDSHAKE);
2388
2389                 if (!handshakeStream(chanInfo))
2390                         return;
2391         }
2392
2393         if (chanInfo.id.isSet())
2394         {
2395
2396                 chanID = chanInfo.id;
2397
2398                 LOG_CHANNEL("Sending channel: %s ",ChanInfo::getProtocolStr(outputProtocol));
2399
2400                 if (!waitForChannelHeader(chanInfo))
2401                         throw StreamException("Channel not ready");
2402
2403                 servMgr->totalStreams++;
2404
2405                 Host host = sock->host;
2406                 host.port = 0;  // force to 0 so we ignore the incoming port
2407
2408                 Channel *ch = chanMgr->findChannelByID(chanID);
2409                 if (!ch)
2410                         throw StreamException("Channel not found");
2411
2412                 if (outputProtocol == ChanInfo::SP_HTTP)
2413                 {
2414                         if ((addMetadata) && (chanMgr->icyMetaInterval))
2415                                 sendRawMetaChannel(chanMgr->icyMetaInterval);
2416                         else 
2417                                 sendRawChannel(true,true);
2418
2419                 }else if (outputProtocol == ChanInfo::SP_MMS)
2420                 {
2421                         if (nsSwitchNum)
2422                         {
2423                                 sendRawChannel(true,true);
2424                         }else
2425                         {
2426                                 sendRawChannel(true,false);
2427                         }
2428
2429                 }else if (outputProtocol  == ChanInfo::SP_PCP)
2430                 {
2431                         sendPCPChannel();
2432
2433                 } else if (outputProtocol  == ChanInfo::SP_PEERCAST)
2434                 {
2435                         sendPeercastChannel();
2436                 }
2437         }
2438
2439         setStatus(S_CLOSING);
2440 }
2441
2442 // -----------------------------------------
2443 #if 0
2444 // debug
2445                 FileStream file;
2446                 file.openReadOnly("c://test.mp3");
2447
2448                 LOG_DEBUG("raw file read");
2449                 char buf[4000];
2450                 int cnt=0;
2451                 while (!file.eof())
2452                 {
2453                         LOG_DEBUG("send %d",cnt++);
2454                         file.read(buf,sizeof(buf));
2455                         sock->write(buf,sizeof(buf));
2456
2457                 }
2458                 file.close();
2459                 LOG_DEBUG("raw file sent");
2460
2461         return;
2462 // debug
2463 #endif
2464 // -----------------------------------
2465 bool Servent::waitForChannelHeader(ChanInfo &info)
2466 {
2467         for(int i=0; i<30*10; i++)
2468         {
2469                 Channel *ch = chanMgr->findChannelByID(info.id);
2470                 if (!ch)
2471                         return false;
2472
2473                 if (ch->isPlaying() && (ch->rawData.writePos>0))
2474                         return true;
2475
2476                 if (!thread.active || !sock->active())
2477                         break;
2478                 sys->sleep(100);
2479         }
2480         return false;
2481 }
2482 // -----------------------------------
2483 void Servent::sendRawChannel(bool sendHead, bool sendData)
2484 {
2485         try
2486         {
2487
2488                 sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
2489
2490                 Channel *ch = chanMgr->findChannelByID(chanID);
2491                 if (!ch)
2492                         throw StreamException("Channel not found");
2493
2494                 setStatus(S_CONNECTED);
2495
2496                 LOG_DEBUG("Starting Raw stream of %s at %d",ch->info.name.cstr(),streamPos);
2497
2498                 if (sendHead)
2499                 {
2500                         ch->headPack.writeRaw(*sock);
2501                         streamPos = ch->headPack.pos + ch->headPack.len;
2502                         LOG_DEBUG("Sent %d bytes header ",ch->headPack.len);
2503                 }
2504
2505                 if (sendData)
2506                 {
2507
2508                         unsigned int streamIndex = ch->streamIndex;
2509                         unsigned int connectTime = sys->getTime();
2510                         unsigned int lastWriteTime = connectTime;
2511
2512                         while ((thread.active) && sock->active())
2513                         {
2514                                 ch = chanMgr->findChannelByID(chanID);
2515
2516                                 if (ch)
2517                                 {
2518
2519                                         if (streamIndex != ch->streamIndex)
2520                                         {
2521                                                 streamIndex = ch->streamIndex;
2522                                                 streamPos = ch->headPack.pos;
2523                                                 LOG_DEBUG("sendRaw got new stream index");
2524                                         }
2525
2526                                         ChanPacket rawPack;
2527                                         if (ch->rawData.findPacket(streamPos,rawPack))
2528                                         {
2529                                                 if (syncPos != rawPack.sync)
2530                                                         LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
2531                                                 syncPos = rawPack.sync+1;
2532
2533                                                 if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2534                                                 {
2535                                                         rawPack.writeRaw(*sock);
2536                                                         lastWriteTime = sys->getTime();
2537                                                 }
2538
2539                                                 if (rawPack.pos < streamPos)
2540                                                         LOG_DEBUG("raw: skip back %d",rawPack.pos-streamPos);
2541                                                 streamPos = rawPack.pos+rawPack.len;
2542                                         } else if (sock->readReady()) {
2543                                                 char c;
2544                                                 int error = sock->readUpto(&c, 1);
2545                                                 if (error == 0) sock->close();
2546                                         }
2547                                 }
2548
2549                                 if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
2550                                         throw TimeoutException();
2551
2552                                 sys->sleepIdle();
2553                         }
2554                 }
2555         }catch(StreamException &e)
2556         {
2557                 LOG_ERROR("Stream channel: %s",e.msg);
2558         }
2559 }
2560
2561 #if 0
2562 // -----------------------------------
2563 void Servent::sendRawMultiChannel(bool sendHead, bool sendData)
2564 {
2565         try
2566         {
2567                 unsigned int chanStreamIndex[ChanMgr::MAX_CHANNELS];
2568                 unsigned int chanStreamPos[ChanMgr::MAX_CHANNELS];
2569                 GnuID chanIDs[ChanMgr::MAX_CHANNELS];
2570                 int numChanIDs=0;
2571                 for(int i=0; i<ChanMgr::MAX_CHANNELS; i++)
2572                 {
2573                         Channel *ch = &chanMgr->channels[i];
2574                         if (ch->isPlaying())
2575                                 chanIDs[numChanIDs++]=ch->info.id;
2576                 }
2577
2578
2579
2580                 setStatus(S_CONNECTED);
2581
2582
2583                 if (sendHead)
2584                 {
2585                         for(int i=0; i<numChanIDs; i++)
2586                         {
2587                                 Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
2588                                 if (ch)
2589                                 {
2590                                         LOG_DEBUG("Starting RawMulti stream: %s",ch->info.name.cstr());
2591                                         ch->headPack.writeRaw(*sock);
2592                                         chanStreamPos[i] = ch->headPack.pos + ch->headPack.len;
2593                                         chanStreamIndex[i] = ch->streamIndex;
2594                                         LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
2595
2596                                 }
2597                         }
2598                 }
2599
2600                 if (sendData)
2601                 {
2602
2603                         unsigned int connectTime=sys->getTime();
2604
2605                         while ((thread.active) && sock->active())
2606                         {
2607
2608                                 for(int i=1; i<numChanIDs; i++)
2609                                 {
2610                                         Channel *ch = chanMgr->findChannelByID(chanIDs[i]);
2611                                         if (ch)
2612                                         {
2613                                                 if (chanStreamIndex[i] != ch->streamIndex)
2614                                                 {
2615                                                         chanStreamIndex[i] = ch->streamIndex;
2616                                                         chanStreamPos[i] = ch->headPack.pos;
2617                                                         LOG_DEBUG("sendRawMulti got new stream index for chan %d",i);
2618                                                 }
2619
2620                                                 ChanPacket rawPack;
2621                                                 if (ch->rawData.findPacket(chanStreamPos[i],rawPack))
2622                                                 {
2623                                                         if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2624                                                                 rawPack.writeRaw(*sock);
2625
2626
2627                                                         if (rawPack.pos < chanStreamPos[i])
2628                                                                 LOG_DEBUG("raw: skip back %d",rawPack.pos-chanStreamPos[i]);
2629                                                         chanStreamPos[i] = rawPack.pos+rawPack.len;
2630
2631
2632                                                         //LOG("raw at %d: %d %d",streamPos,ch->rawData.getStreamPos(ch->rawData.firstPos),ch->rawData.getStreamPos(ch->rawData.lastPos));
2633                                                 }                                               
2634                                         }
2635                                         break;
2636                                 }
2637                                 
2638
2639                                 sys->sleepIdle();
2640                         }
2641                 }
2642         }catch(StreamException &e)
2643         {
2644                 LOG_ERROR("Stream channel: %s",e.msg);
2645         }
2646 }
2647 #endif
2648
2649 // -----------------------------------
2650 void Servent::sendRawMetaChannel(int interval)
2651 {
2652
2653         try
2654         {
2655                 Channel *ch = chanMgr->findChannelByID(chanID);
2656                 if (!ch)
2657                         throw StreamException("Channel not found");
2658
2659                 sock->setWriteTimeout(DIRECT_WRITE_TIMEOUT*1000);
2660
2661                 setStatus(S_CONNECTED);
2662
2663                 LOG_DEBUG("Starting Raw Meta stream of %s (metaint: %d) at %d",ch->info.name.cstr(),interval,streamPos);
2664
2665
2666                 String lastTitle,lastURL;
2667
2668                 int             lastMsgTime=sys->getTime();
2669                 bool    showMsg=true;
2670
2671                 char buf[16384];
2672                 int bufPos=0;
2673
2674                 if ((interval > sizeof(buf)) || (interval < 1))
2675                         throw StreamException("Bad ICY Meta Interval value");
2676
2677                 unsigned int connectTime = sys->getTime();
2678                 unsigned int lastWriteTime = connectTime;
2679
2680                 streamPos = 0;          // raw meta channel has no header (its MP3)
2681
2682                 while ((thread.active) && sock->active())
2683                 {
2684                         ch = chanMgr->findChannelByID(chanID);
2685
2686                         if (ch)
2687                         {
2688
2689                                 ChanPacket rawPack;
2690                                 if (ch->rawData.findPacket(streamPos,rawPack))
2691                                 {
2692
2693                                         if (syncPos != rawPack.sync)
2694                                                 LOG_ERROR("Send skip: %d",rawPack.sync-syncPos);
2695                                         syncPos = rawPack.sync+1;
2696
2697                                         MemoryStream mem(rawPack.data,rawPack.len);
2698
2699                                         if (rawPack.type == ChanPacket::T_DATA)
2700                                         {
2701
2702                                                 int len = rawPack.len;
2703                                                 char *p = rawPack.data;
2704                                                 while (len)
2705                                                 {
2706                                                         int rl = len;
2707                                                         if ((bufPos+rl) > interval)
2708                                                                 rl = interval-bufPos;
2709                                                         memcpy(&buf[bufPos],p,rl);
2710                                                         bufPos+=rl;
2711                                                         p+=rl;
2712                                                         len-=rl;
2713
2714                                                         if (bufPos >= interval)
2715                                                         {
2716                                                                 bufPos = 0;     
2717                                                                 sock->write(buf,interval);
2718                                                                 lastWriteTime = sys->getTime();
2719
2720                                                                 if (chanMgr->broadcastMsgInterval)
2721                                                                         if ((sys->getTime()-lastMsgTime) >= chanMgr->broadcastMsgInterval)
2722                                                                         {
2723                                                                                 showMsg ^= true;
2724                                                                                 lastMsgTime = sys->getTime();
2725                                                                         }
2726
2727                                                                 String *metaTitle = &ch->info.track.title;
2728                                                                 if (!ch->info.comment.isEmpty() && (showMsg))
2729                                                                         metaTitle = &ch->info.comment;
2730
2731
2732                                                                 if (!metaTitle->isSame(lastTitle) || !ch->info.url.isSame(lastURL))
2733                                                                 {
2734
2735                                                                         char tmp[1024];
2736                                                                         String title,url;
2737
2738                                                                         title = *metaTitle;
2739                                                                         url = ch->info.url;
2740
2741                                                                         title.convertTo(String::T_META);
2742                                                                         url.convertTo(String::T_META);
2743
2744                                                                         sprintf(tmp,"StreamTitle='%s';StreamUrl='%s';\0",title.cstr(),url.cstr());
2745                                                                         int len = ((strlen(tmp) + 15+1) / 16);
2746                                                                         sock->writeChar(len);
2747                                                                         sock->write(tmp,len*16);
2748
2749                                                                         lastTitle = *metaTitle;
2750                                                                         lastURL = ch->info.url;
2751
2752                                                                         LOG_DEBUG("StreamTitle: %s, StreamURL: %s",lastTitle.cstr(),lastURL.cstr());
2753
2754                                                                 }else
2755                                                                 {
2756                                                                         sock->writeChar(0);                                     
2757                                                                 }
2758
2759                                                         }
2760                                                 }
2761                                         }
2762                                         streamPos = rawPack.pos + rawPack.len;
2763                                 }
2764                         }
2765                         if ((sys->getTime()-lastWriteTime) > DIRECT_WRITE_TIMEOUT)
2766                                 throw TimeoutException();
2767
2768                         sys->sleepIdle();
2769
2770                 }
2771         }catch(StreamException &e)
2772         {
2773                 LOG_ERROR("Stream channel: %s",e.msg);
2774         }
2775 }
2776 // -----------------------------------
2777 void Servent::sendPeercastChannel()
2778 {
2779         try
2780         {
2781                 setStatus(S_CONNECTED);
2782
2783                 Channel *ch = chanMgr->findChannelByID(chanID);
2784                 if (!ch)
2785                         throw StreamException("Channel not found");
2786
2787                 LOG_DEBUG("Starting PeerCast stream: %s",ch->info.name.cstr());
2788
2789                 sock->writeTag("PCST");
2790
2791                 ChanPacket pack;
2792
2793                 ch->headPack.writePeercast(*sock);
2794
2795                 pack.init(ChanPacket::T_META,ch->insertMeta.data,ch->insertMeta.len,ch->streamPos);
2796                 pack.writePeercast(*sock);
2797         
2798                 streamPos = 0;
2799                 unsigned int syncPos=0;
2800                 while ((thread.active) && sock->active())
2801                 {
2802                         ch = chanMgr->findChannelByID(chanID);
2803                         if (ch)
2804                         {
2805
2806                                 ChanPacket rawPack;
2807                                 if (ch->rawData.findPacket(streamPos,rawPack))
2808                                 {
2809                                         if ((rawPack.type == ChanPacket::T_DATA) || (rawPack.type == ChanPacket::T_HEAD))
2810                                         {
2811                                                 sock->writeTag("SYNC");
2812                                                 sock->writeShort(4);
2813                                                 sock->writeShort(0);
2814                                                 sock->write(&syncPos,4);
2815                                                 syncPos++;
2816
2817                                                 rawPack.writePeercast(*sock);
2818                                         }
2819                                         streamPos = rawPack.pos + rawPack.len;
2820                                 }
2821                         }
2822                         sys->sleepIdle();
2823                 }
2824
2825         }catch(StreamException &e)
2826         {
2827                 LOG_ERROR("Stream channel: %s",e.msg);
2828         }
2829 }
2830
2831 //WLock canStreamLock;
2832
2833 // -----------------------------------
2834 void Servent::sendPCPChannel()
2835 {
2836         bool skipCheck = false;
2837         unsigned int ptime = 0;
2838         int npacket = 0, upsize = 0;
2839
2840         Channel *ch = chanMgr->findChannelByID(chanID);
2841         if (!ch)
2842                 throw StreamException("Channel not found");
2843
2844         AtomStream atom(*sock);
2845
2846         pcpStream = new PCPStream(remoteID);
2847         int error=0;
2848
2849         try
2850         {
2851
2852                 LOG_DEBUG("Starting PCP stream of channel at %d",streamPos);
2853
2854
2855 //              setStatus(S_CONNECTED);
2856
2857                 //canStreamLock.on();
2858                 //thread.active = canStream(ch);
2859                 //setStatus(S_CONNECTED);
2860                 //canStreamLock.off();
2861
2862                 lastSkipTime = 0;
2863                 lastSkipCount = 0;
2864                 waitPort = 0;
2865
2866                 if (thread.active){
2867                         atom.writeParent(PCP_CHAN,3 + ((sendHeader)?1:0));
2868                                 atom.writeBytes(PCP_CHAN_ID,chanID.id,16);
2869                                 ch->info.writeInfoAtoms(atom);
2870                                 ch->info.writeTrackAtoms(atom);
2871                                 if (sendHeader)
2872                                 {
2873                                         atom.writeParent(PCP_CHAN_PKT,3);
2874                                         atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
2875                                         atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
2876                                         atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
2877
2878                                         if (streamPos == 0)
2879                                                 streamPos = ch->headPack.pos+ch->headPack.len;
2880                                         LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
2881                                 }
2882                 }
2883
2884                 unsigned int streamIndex = ch->streamIndex;
2885
2886                 ChanPacket rawPack;
2887                 char pbuf[ChanPacket::MAX_DATALEN*3];
2888                 MemoryStream mems(pbuf,sizeof(pbuf));
2889                 AtomStream atom2(mems);
2890
2891                 while (thread.active)
2892                 {
2893
2894                         Channel *ch = chanMgr->findChannelByID(chanID);
2895
2896                         if (ch)
2897                         {
2898
2899                                 if (streamIndex != ch->streamIndex)
2900                                 {
2901                                         streamIndex = ch->streamIndex;
2902                                         streamPos = ch->headPack.pos;
2903                                         LOG_DEBUG("sendPCPStream got new stream index");                                                
2904                                 }
2905
2906                                 mems.rewind();
2907
2908                                 if (ch->rawData.findPacket(streamPos,rawPack))
2909                                 {
2910                                         if ((streamPos < rawPack.pos) && !rawPack.skip){
2911                                                 if (skipCheck){
2912                                                         char tmp[32];
2913                                                         getHost().IPtoStr(tmp);
2914                                                         LOG_NETWORK("##### send skipping ##### %d (%d, %d) -> %s", (rawPack.pos - streamPos), streamPos, rawPack.pos, tmp);
2915
2916                                                         if (sys->getTime() == lastSkipTime) {
2917                                                                 LOG_DEBUG("##### skip all buffer");
2918                                                                 streamPos = ch->rawData.getLatestPos();
2919                                                                 continue;
2920                                                         }
2921
2922                                                         lastSkipTime = sys->getTime();
2923                                                         lastSkipCount++;
2924                                                 } else {
2925                                                         skipCheck = true;
2926                                                 }
2927                                         }
2928
2929                                         if (rawPack.type == ChanPacket::T_HEAD)
2930                                         {
2931                                                 atom2.writeParent(PCP_CHAN,2);
2932                                                         atom2.writeBytes(PCP_CHAN_ID,chanID.id,16);
2933                                                         atom2.writeParent(PCP_CHAN_PKT,3);
2934                                                                 atom2.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
2935                                                                 atom2.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
2936                                                                 atom2.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
2937
2938                                                 sock->write(pbuf, mems.getPosition());
2939                                         }else if (rawPack.type == ChanPacket::T_DATA)
2940                                         {
2941                                                 atom2.writeParent(PCP_CHAN,2);
2942                                                         atom2.writeBytes(PCP_CHAN_ID,chanID.id,16);
2943                                                         atom2.writeParent(PCP_CHAN_PKT,3);
2944                                                                 atom2.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_DATA);
2945                                                                 atom2.writeInt(PCP_CHAN_PKT_POS,rawPack.pos);
2946                                                                 atom2.writeBytes(PCP_CHAN_PKT_DATA,rawPack.data,rawPack.len);
2947
2948 #ifdef WIN32
2949                                                 sock->bufferingWrite(pbuf, mems.getPosition());
2950                                                 lastSkipTime = sock->bufList.lastSkipTime;
2951                                                 lastSkipCount = sock->bufList.skipCount;
2952 #else
2953                                                 sock->write(pbuf, mems.getPosition());
2954 #endif
2955                                         }
2956
2957                                         if (rawPack.pos < streamPos)
2958                                                 LOG_DEBUG("pcp: skip back %d",rawPack.pos-streamPos);
2959
2960                                         //LOG_DEBUG("Sending %d-%d (%d,%d,%d)",rawPack.pos,rawPack.pos+rawPack.len,ch->streamPos,ch->rawData.getLatestPos(),ch->rawData.getOldestPos());
2961
2962                                         streamPos = rawPack.pos+rawPack.len;
2963                                 }
2964                         } else {
2965                                 throw StreamException("Channel not found");
2966                         }
2967
2968 #ifdef WIN32
2969                         sock->bufferingWrite(NULL, 0);
2970                         lastSkipTime = sock->bufList.lastSkipTime;
2971                         lastSkipCount = sock->bufList.skipCount;
2972 #endif
2973                         BroadcastState bcs;
2974                         bcs.servent_id = servent_id;
2975 //                      error = pcpStream->readPacket(*sock,bcs);
2976
2977                         unsigned int t = sys->getTime();
2978                         if (t != ptime) {
2979                                 ptime = t;
2980                                 npacket = MAX_PROC_PACKETS;
2981                                 upsize = MAX_OUTWARD_SIZE;
2982                         }
2983
2984                         int len = pcpStream->flushUb(*sock, upsize);
2985                         upsize -= len;
2986
2987                         while (npacket > 0 && sock->readReady()) {
2988                                 npacket--;
2989                                 error = pcpStream->readPacket(*sock,bcs);
2990                                 if (error)
2991                                         throw StreamException("PCP exception");
2992                         }
2993
2994                         sys->sleepIdle();
2995
2996                 }
2997
2998                 LOG_DEBUG("PCP channel stream closed normally.");
2999
3000         }catch(StreamException &e)
3001         {
3002                 LOG_ERROR("Stream channel: %s",e.msg);
3003         }
3004
3005         try
3006         {
3007                 pcpStream->flush(*sock);
3008                 atom.writeInt(PCP_QUIT,error);
3009         }catch(StreamException &) {}
3010
3011 }
3012
3013 // -----------------------------------
3014 int Servent::serverProcMain(ThreadInfo *thread)
3015 {
3016 //      thread->lock();
3017
3018
3019         Servent *sv = (Servent*)thread->data;
3020
3021         try 
3022         {
3023                 if (!sv->sock)
3024                         throw StreamException("Server has no socket");
3025
3026                 sv->setStatus(S_LISTENING);
3027
3028
3029                 char servIP[64];
3030                 sv->sock->host.toStr(servIP);
3031
3032                 if (servMgr->isRoot)
3033                         LOG_DEBUG("Root Server started: %s",servIP);
3034                 else
3035                         LOG_DEBUG("Server started: %s",servIP);
3036                 
3037
3038                 while ((thread->active) && (sv->sock->active()))
3039                 {
3040                         if (servMgr->numActiveOnPort(sv->sock->host.port) < servMgr->maxServIn)
3041                         {
3042                                 ClientSocket *cs = sv->sock->accept();
3043
3044                                 // \95s\90³\82È\83\\81[\83X\83A\83h\83\8c\83X(IPv4\83}\83\8b\83`\83L\83\83\83X\83g)\82ð\8f\9c\8aO
3045                                 if (cs && (((cs->host.ip >> 24) & 0xF0) == 0xE0))
3046                                 {
3047                                         char ip[64];
3048                                         cs->host.toStr(ip);
3049                                         cs->close();
3050                                         LOG_ERROR("reject incoming multicast address: %s", ip);
3051                                         peercastApp->notifyMessage(ServMgr::NT_PEERCAST, "reject multicast address");
3052                                 } else
3053                                 if (cs)
3054                                 {       
3055                                         LOG_DEBUG("accepted incoming");
3056                                         Servent *ns = servMgr->allocServent();
3057                                         if (ns)
3058                                         {
3059                                                 servMgr->lastIncoming = sys->getTime();
3060                                                 ns->servPort = sv->sock->host.port;
3061                                                 ns->networkID = servMgr->networkID;
3062                                                 ns->initIncoming(cs,sv->allow);
3063                                         }else
3064                                                 LOG_ERROR("Out of servents");
3065                                 }
3066                         }
3067                         sys->sleep(100);
3068                 }
3069         }catch(StreamException &e)
3070         {
3071                 LOG_ERROR("Server Error: %s:%d",e.msg,e.err);
3072         }
3073
3074         
3075         LOG_DEBUG("Server stopped");
3076
3077         sv->kill();
3078         sys->endThread(thread);
3079         return 0;
3080 }
3081
3082 // -----------------------------------
// Thread entry point taking the ThreadInfo of the server thread.
// The whole body is the SEH_THREAD macro, invoked with serverProcMain (the
// accept loop for a listening servent) and this function's own name.
// NOTE(review): SEH_THREAD is presumably defined in win32/seh.h and wraps the
// call to serverProcMain in a structured-exception guard; confirm there,
// including what value it returns.
3083 int Servent::serverProc(ThreadInfo *thread)
3084 {
3085         SEH_THREAD(serverProcMain, Servent::serverProc);
3086 }
3087  
3088 // -----------------------------------
3089 bool    Servent::writeVariable(Stream &s, const String &var)
3090 {
3091         char buf[1024];
3092
3093         if (var == "type")
3094                 strcpy(buf,getTypeStr());
3095         else if (var == "status")
3096                 strcpy(buf,getStatusStr());
3097         else if (var == "address")
3098         {
3099                 if (servMgr->enableGetName) //JP-EX s
3100                 {
3101                         getHost().toStr(buf);
3102                         char h_ip[64];
3103                         Host h = getHost();
3104                         h.toStr(h_ip);
3105
3106 /*                      ChanHitList *hits[ChanMgr::MAX_HITLISTS];
3107                         int numHits=0;
3108                         for(int i=0; i<ChanMgr::MAX_HITLISTS; i++)
3109                         {
3110                                 ChanHitList *chl = &chanMgr->hitlists[i];
3111                                 if (chl->isUsed())
3112                                         hits[numHits++] = chl;
3113                         }
3114                         bool ishit,isfw;
3115                         ishit = isfw = false;
3116                         int numRelay = 0;
3117                         if (numHits) 
3118                         {
3119                                 for(int k=0; k<numHits; k++)
3120                                 {
3121                                         ChanHitList *chl = hits[k];
3122                                         if (chl->isUsed())
3123                                         {
3124                                                 for (int j=0; j<ChanHitList::MAX_HITS; j++)
3125                                                 {
3126                                                         ChanHit *hit = &chl->hits[j];
3127                                                         if (hit->host.isValid() && (h.ip == hit->host.ip))
3128                                                         {
3129                                                                 ishit = true;
3130                                                                 if (hit->firewalled)
3131                                                                         isfw = true;
3132                                                                 numRelay += hit->numRelays;
3133                                                         }
3134                                                 }
3135                                         }
3136                                 }
3137                         }
3138                         strcpy(buf,"");
3139                         if (ishit == true)
3140                         {
3141                                 if (isfw == true)
3142                                 {
3143                                         if (numRelay== 0)
3144                                                 strcat(buf,"<font color=red>");
3145                                         else 
3146                                                 strcat(buf,"<font color=orange>");
3147                                 }
3148                                 else
3149                                         strcat(buf,"<font color=green>");
3150                         }
3151                         strcat(buf,h_ip);
3152                         char h_name[128];
3153                         if (ClientSocket::getHostname(h_name,h.ip))
3154                         {
3155                                 strcat(buf,"[");
3156                                 strcat(buf,h_name);
3157                                 strcat(buf,"]");
3158                         }
3159                         if (ishit == true) 
3160                         {
3161                                 strcat(buf,"</font>");
3162                         }
3163                 } //JP-EX e*/
3164
3165
3166                         bool isfw = false;
3167                         bool isRelay = true;
3168                         int numRelay = 0;
3169                         ChanHitList *chl = chanMgr->findHitListByID(chanID);
3170                         if (chl){
3171                                 ChanHit *hit = chl->hit;
3172                                 while(hit){
3173                                         if (hit->host.isValid() && (h.ip == hit->host.ip)){
3174                                                 isfw = hit->firewalled;
3175                                                 isRelay = hit->relay;
3176                                                 numRelay = hit->numRelays;
3177                                                 break;
3178                                         }
3179                                         hit = hit->next;
3180                                 }
3181                         }
3182                         strcpy(buf, "");
3183                         if (isfw){
3184                                 if (numRelay == 0){
3185                                         strcat(buf,"<font color=red>");
3186                                 } else {
3187                                         strcat(buf,"<font color=orange>");
3188                                 }
3189                         } else {
3190                                 if (!isRelay){
3191                                         if (numRelay==0){
3192                                                 strcpy(buf,"<font color=purple>");
3193                                         } else {
3194                                                 strcpy(buf,"<font color=blue>");
3195                                         }
3196                                 } else {
3197                                         strcpy(buf,"<font color=green>");
3198                                 }
3199                         }
3200                         strcat(buf,h_ip);
3201                         char h_name[128];
3202                         if (ClientSocket::getHostname(h_name,sizeof(h_name),h.ip)) //JP-MOD(BOF\91Î\8dô)
3203                         {
3204                                 strcat(buf,"[");
3205                                 strcat(buf,h_name);
3206                                 strcat(buf,"]");
3207                         }
3208                         strcat(buf,"</font>");
3209                 }
3210                 else 
3211                         getHost().toStr(buf);
3212         }
3213         else if (var == "agent")
3214                 strcpy(buf,agent.cstr());
3215         else if (var == "bitrate")
3216         {
3217                 if (sock)
3218                 {
3219                         unsigned int tot = sock->bytesInPerSec+sock->bytesOutPerSec;
3220                         sprintf(buf,"%.1f",BYTES_TO_KBPS(tot));
3221                 }else
3222                         strcpy(buf,"0");
3223         }else if (var == "uptime")
3224         {
3225                 String uptime;
3226                 if (lastConnect)
3227                         uptime.setFromStopwatch(sys->getTime()-lastConnect);
3228                 else
3229                         uptime.set("-");
3230                 strcpy(buf,uptime.cstr());
3231         }else if (var.startsWith("gnet."))
3232         {
3233
3234                 float ctime = (float)(sys->getTime()-lastConnect);
3235                 if (var == "gnet.packetsIn")
3236                         sprintf(buf,"%d",gnuStream.packetsIn);
3237                 else if (var == "gnet.packetsInPerSec")
3238                         sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsIn)/ctime:0);
3239                 else if (var == "gnet.packetsOut")
3240                         sprintf(buf,"%d",gnuStream.packetsOut);
3241                 else if (var == "gnet.packetsOutPerSec")
3242                         sprintf(buf,"%.1f",ctime>0?((float)gnuStream.packetsOut)/ctime:0);
3243                 else if (var == "gnet.normQueue")
3244                         sprintf(buf,"%d",outPacketsNorm.numPending());
3245                 else if (var == "gnet.priQueue")
3246                         sprintf(buf,"%d",outPacketsPri.numPending());
3247                 else if (var == "gnet.flowControl")
3248                         sprintf(buf,"%d",flowControl?1:0);
3249                 else if (var == "gnet.routeTime")
3250                 {
3251                         int nr = seenIDs.numUsed();
3252                         unsigned int tim = sys->getTime()-seenIDs.getOldest();
3253                 
3254                         String tstr;
3255                         tstr.setFromStopwatch(tim);
3256
3257                         if (nr)
3258                                 strcpy(buf,tstr.cstr());
3259                         else
3260                                 strcpy(buf,"-");
3261                 }
3262                 else
3263                         return false;
3264
3265         }else
3266                 return false;
3267
3268         s.writeString(buf);
3269
3270         return true;
3271 }