OSDN Git Service

VP25-3マージ
authoreru <eru01@users.sourceforge.jp>
Wed, 19 Dec 2007 17:06:06 +0000 (17:06 +0000)
committereru <eru01@users.sourceforge.jp>
Wed, 19 Dec 2007 17:06:06 +0000 (17:06 +0000)
16 files changed:
PeerCast.root/PeerCast/core/common/channel.cpp
PeerCast.root/PeerCast/core/common/servent.cpp
PeerCast.root/PeerCast/core/common/servhs.cpp
PeerCast.root/PeerCast/core/common/servmgr.cpp
PeerCast.root/PeerCast/core/common/servmgr.h
PeerCast.root/PeerCast/core/common/sys.h
PeerCast.root/PeerCast/core/common/version2.h
PeerCast.root/PeerCast/core/win32/wsocket.cpp
c:/Git/PeerCast.root/PeerCast/core/common/channel.cpp
c:/Git/PeerCast.root/PeerCast/core/common/servent.cpp
c:/Git/PeerCast.root/PeerCast/core/common/servhs.cpp
c:/Git/PeerCast.root/PeerCast/core/common/servmgr.cpp
c:/Git/PeerCast.root/PeerCast/core/common/servmgr.h
c:/Git/PeerCast.root/PeerCast/core/common/sys.h
c:/Git/PeerCast.root/PeerCast/core/common/version2.h
c:/Git/PeerCast.root/PeerCast/core/win32/wsocket.cpp

index f80f0da..4ca13be 100644 (file)
@@ -442,6 +442,8 @@ THREAD_PROC Channel::stream(ThreadInfo *thread)
                        LOG_DEBUG("Channel sleeping for %d seconds",diff);
                        for(unsigned int i=0; i<diff; i++)
                        {
+                               if (ch->info.lastPlayEnd == 0) // reconnected
+                                       break;
                                if (!thread->active || peercastInst->isQuitting){
                                        thread->active = false;
                                        break;
@@ -675,16 +677,6 @@ void PeercastSource::stream(Channel *ch)
                                        }
                                }
 
-                               // find tracker
-                               unsigned int ctime = sys->getTime();
-                               if (!ch->sourceHost.host.ip && tracker_check && ch->trackerHit.host.ip){
-                                       if (ch->trackerHit.lastContact + 30 < ctime){
-                                               ch->sourceHost = ch->trackerHit;
-                                               ch->trackerHit.lastContact = ctime;
-                                               LOG_DEBUG("use saved tracker");
-                                       }
-                               }
-
                                // else find global tracker
                                if (!ch->sourceHost.host.ip)
                                {
@@ -699,6 +691,16 @@ void PeercastSource::stream(Channel *ch)
                                                LOG_DEBUG("use global tracker");
                                        }
                                }
+
+                               // find tracker
+                               unsigned int ctime = sys->getTime();
+                               if (!ch->sourceHost.host.ip && tracker_check && ch->trackerHit.host.ip){
+                                       if (ch->trackerHit.lastContact + 30 < ctime){
+                                               ch->sourceHost = ch->trackerHit;
+                                               ch->trackerHit.lastContact = ctime;
+                                               LOG_DEBUG("use saved tracker");
+                                       }
+                               }
                        }
 
                        chanMgr->hitlistlock.off();
@@ -1539,7 +1541,7 @@ int Channel::readStream(Stream &in,ChannelStream *source)
                                        break;
 
                                //if (rawData.writePos > 0)
-                               if (rawData.lastWriteTime > 0)
+                               if (rawData.lastWriteTime > 0 || rawData.lastSkipTime > 0)
                                {
                                        if (isBroadcasting())
                                        {                                       
@@ -1830,8 +1832,10 @@ bool ChanPacketBuffer::findPacket(unsigned int spos, ChanPacket &pack)
 
        lock.on();
 
+       unsigned int bound = packets[0].len * ChanPacketBuffer::MAX_PACKETS * 2; // max packets to wait
        unsigned int fpos = getStreamPos(firstPos);
-       if (spos < fpos)
+       unsigned int lpos = getStreamPos(lastPos);
+       if (spos < fpos && (fpos < lpos || spos > lpos + bound))
                spos = fpos;
 
 
@@ -1839,7 +1843,7 @@ bool ChanPacketBuffer::findPacket(unsigned int spos, ChanPacket &pack)
        {
                //ChanPacket &p = packets[i%MAX_PACKETS];
                ChanPacketv &p = packets[i%MAX_PACKETS];
-               if (p.pos >= spos)
+               if (p.pos >= spos && p.pos - spos <= bound)
                {
                        pack.init(p);
                        lock.off();
@@ -1904,6 +1908,7 @@ bool ChanPacketBuffer::writePacket(ChanPacket &pack, bool updateReadPos)
                        if (packets[lastPos%MAX_PACKETS].type == ChanPacket::T_HEAD) lpos = 0;
                        if (lpos && (diff == 0 || diff > 0xfff00000)) {
                                LOG_DEBUG("*   latest pos=%d, pack pos=%d", getLatestPos(), pack.pos);
+                               lastSkipTime = sys->getTime();
                                return false;
                        }
                }
@@ -2217,6 +2222,8 @@ Channel *ChanMgr::findAndRelay(ChanInfo &info)
        } else if (!(c->thread.active)){
                c->thread.active = true;
                c->thread.finish = false;
+               c->info.lastPlayStart = 0; // reconnect 
+               c->info.lastPlayEnd = 0;
                if (c->finthread){
                        c->finthread->finish = true;
                        c->finthread = NULL;
@@ -3101,7 +3108,8 @@ void ChanHit::initLocal(int numl,int numr,int,int uptm,bool connected,bool isFul
                }
                c = c->next;
        }
-       int diff = servMgr->maxRelays - servMgr->numStreams(Servent::T_RELAY,false);
+       unsigned int numRelay = servMgr->numStreams(Servent::T_RELAY,false);
+       int diff = servMgr->maxRelays - numRelay;
        if (ch->localRelays()){
                if (noRelay > diff){
                        noRelay = diff;
@@ -3111,19 +3119,13 @@ void ChanHit::initLocal(int numl,int numr,int,int uptm,bool connected,bool isFul
                needRate = 0;
        }
 
-       // for PCRaw (relay) start.
-       bool force_off = false;
-
-       if(isIndexTxt(ch))
-               force_off = true;
-       // for PCRaw (relay) end.
-
 //     ratefull = servMgr->bitrateFull(needRate+bitrate);
        ratefull = (servMgr->maxBitrateOut < allRate + needRate + ch->info.bitrate);
 
-       //relay =       (!relayfull) && (!chfull) && (!ratefull) && ((servMgr->numStreams(Servent::T_RELAY,false) + noRelay) < servMgr->maxRelays);
-       relay = (!relayfull || force_off) && (!chfull) && (!ratefull)
-               && (((servMgr->numStreams(Servent::T_RELAY,false) + noRelay) < servMgr->maxRelays) || force_off);       // for PCRaw (relay) (force_off)
+       if (!isIndexTxt(ch))
+               relay = (!relayfull) && (!chfull) && (!ratefull) && (numRelay + noRelay < servMgr->maxRelays);
+       else
+               relay = (!chfull) && (!ratefull); // for PCRaw (relay)
 
 /*     if (relayfull){
                LOG_DEBUG("Reject by relay full");
index bb12277..ad18d0c 100644 (file)
@@ -154,12 +154,6 @@ void       Servent::kill()
                                        else
                                                chl->hit = next;
 
-       char ip0str[64],ip1str[64];
-       chh->rhost[0].toStr(ip0str);
-       chh->rhost[1].toStr(ip1str);
-       LOG_DEBUG("Delete hit (servent_id=%d): F%dT%dR%d %s/%s",
-               chh->servent_id,chh->firewalled,chh->tracker,chh->relay,ip0str,ip1str);
-
                                        delete chh;
                                        chh = next;
                                }
@@ -374,6 +368,7 @@ void Servent::initIncoming(ClientSocket *s, unsigned int a)
                allow = a;
                thread.data = this;
                thread.func = incomingProc;
+               thread.finish = false;
 
                setStatus(S_PROTOCOL);
 
@@ -862,21 +857,52 @@ bool Servent::handshakeStream(ChanInfo &chanInfo)
        bool chanFound=false;
        bool chanReady=false;
 
+       ChanHit *sourceHit = NULL;
+
        Channel *ch = chanMgr->findChannelByID(chanInfo.id);
        if (ch)
        {
                sendHeader = true;
-//             if (reqPos)
-//             {
-//                     streamPos = ch->rawData.findOldestPos(reqPos);
-//             }else
-//             {
-                       streamPos = ch->rawData.getLatestPos();
-//             }
+               if (reqPos || !isIndexTxt(&chanInfo))
+               {
+                       streamPos = ch->rawData.findOldestPos(reqPos);
+                       //streamPos = ch->rawData.getLatestPos();
+               }else
+               {
+                       streamPos = ch->rawData.getLatestPos();
+               }
 
                chanID = chanInfo.id;
                canStreamLock.on();
                chanReady = canStream(ch);
+               if (0 && !chanReady)
+               {
+                       if (servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
+                       {
+                               sourceHit = &ch->sourceHost;  // send source host info
+
+                               if (ch->info.getUptime() > 60)  // if stable
+                               {
+                                       // connect "this" host later
+                                       ChanHit nh;
+                                       nh.init();
+                                       nh.chanID = chanID;
+                                       nh.rhost[0] = getHost();
+                                       chanMgr->addHit(nh);
+                               }
+
+                               char tmp[50];
+                               getHost().toStr(tmp);
+                               LOG_DEBUG("Bump channel, hand over sourceHost to %s", tmp);
+                               ch->bump = true;
+                       }
+                       else if (servMgr->kickUnrelayableHost(chanID, this) != 0)
+                       {
+                               chanReady = canStream(ch);
+                               if (!chanReady)
+                                       LOG_DEBUG("Kicked unrelayable host, but still cannot stream");
+                       }
+               }
                if (!chanReady) type = T_INCOMING;
                thread.active = chanReady;
                setStatus(S_CONNECTED);
@@ -1032,7 +1058,8 @@ bool Servent::handshakeStream(ChanInfo &chanInfo)
                                
                                // search for up to 8 other hits
                                int cnt=0;
-                               for(int i=0; i<8; i++)
+                               int i;
+                               for(i=0; i<8; i++)
                                {
                                        best.init();
 
@@ -1080,10 +1107,18 @@ bool Servent::handshakeStream(ChanInfo &chanInfo)
                                        if (!best.host.ip)
                                                break;
 
-                                       best.writeAtoms(atom2,chanInfo.id);                             
+                                       best.writeAtoms(atom2,chanInfo.id);                        
                                        cnt++;
                                }
 
+                               if (sourceHit) {
+                                       char tmp[50];
+                                       sourceHit->writeAtoms(atom2, chanInfo.id);
+                                       chs.best[i].host.toStr(tmp);
+                                       LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
+                                       best.host.ip = sourceHit->host.ip;
+                               }
+
                                if (!best.host.ip){
                                        char tmp[50];
 //                                     chanMgr->hitlistlock.on();
@@ -2792,11 +2827,12 @@ void Servent::sendPCPChannel()
                                if (sendHeader)
                                {
                                        atom.writeParent(PCP_CHAN_PKT,3);
-                                               atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
-                                               atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
-                                               atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
+                                       atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
+                                       atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
+                                       atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
 
-                                       streamPos = ch->headPack.pos+ch->headPack.len;
+                                       if (streamPos == 0)
+                                               streamPos = ch->headPack.pos+ch->headPack.len;
                                        LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
                                }
                }
@@ -2882,7 +2918,6 @@ void Servent::sendPCPChannel()
                                        streamPos = rawPack.pos+rawPack.len;
                                }
                        } else {
-                               pcpStream->flush(*sock);
                                throw StreamException("Channel not found");
                        }
 
@@ -2913,6 +2948,7 @@ void Servent::sendPCPChannel()
 
        try
        {
+               pcpStream->flush(*sock);
                atom.writeInt(PCP_QUIT,error);
        }catch(StreamException &) {}
 
index aa7cc55..be97958 100644 (file)
@@ -414,14 +414,21 @@ bool Servent::canStream(Channel *ch)
 
        if (!isPrivate())
        {
+               if  (!ch->isPlaying() || ch->isFull() || ((type == T_DIRECT) && servMgr->directFull()))
+                       return false;
+
+               if (!isIndexTxt(ch) && (type == T_RELAY) && (servMgr->relaysFull()))
+                       return false;
+
                Channel *c = chanMgr->channel;
                int noRelay = 0;
                unsigned int needRate = 0;
                unsigned int allRate = 0;
                while(c){
                        if (c->isPlaying()){
-                               allRate += c->info.bitrate * c->localRelays();
-                               if ((c != ch) && (c->localRelays() == 0)){
+                               int nlr = c->localRelays();
+                               allRate += c->info.bitrate * nlr;
+                               if ((c != ch) && (nlr == 0)){
                                        if(!isIndexTxt(c))      // for PCRaw (relay)
                                                noRelay++;
                                        needRate+=c->info.bitrate;
@@ -429,7 +436,8 @@ bool Servent::canStream(Channel *ch)
                        }
                        c = c->next;
                }
-               int diff = servMgr->maxRelays - servMgr->numStreams(Servent::T_RELAY,false);
+               unsigned int numRelay = servMgr->numStreams(Servent::T_RELAY,false);
+               int diff = servMgr->maxRelays - numRelay;
                if (ch->localRelays()){
                        if (noRelay > diff){
                                noRelay = diff;
@@ -439,23 +447,24 @@ bool Servent::canStream(Channel *ch)
                        needRate = 0;
                }
 
-               // for PCRaw (relay) start.
-               bool force_off = true;
-
-               if(isIndexTxt(ch))
-                       force_off = false;
-               // for PCRaw (relay) end.
-
                LOG_DEBUG("Relay check: Max=%d Now=%d Need=%d ch=%d",
                        servMgr->maxBitrateOut, allRate, needRate, ch->info.bitrate);
-               if  (   !ch->isPlaying()
-                               || ch->isFull()
-//                             || servMgr->bitrateFull(needRate+ch->getBitrate())
-                               || (allRate + needRate + ch->info.bitrate > servMgr->maxBitrateOut)
-                               || ((type == T_RELAY) && servMgr->relaysFull() && force_off)    // for PCRaw (relay) (force_off)
-                               || ((type == T_RELAY) && (((servMgr->numStreams(Servent::T_RELAY,false) + noRelay) >= servMgr->maxRelays)) && force_off)        // for PCRaw (relay) (force_off)
-                               || ((type == T_DIRECT) && servMgr->directFull())
-               ){
+               //              if  (   !ch->isPlaying()
+               //                              || ch->isFull()
+               //                              || (allRate + needRate + ch->info.bitrate > servMgr->maxBitrateOut)
+               //                              || ((type == T_RELAY) && servMgr->relaysFull() && force_off)    // for PCRaw (relay) (force_off)
+               //                              || ((type == T_RELAY) && (((servMgr->numStreams(Servent::T_RELAY,false) + noRelay) >= servMgr->maxRelays)) && force_off)        // for PCRaw (relay) (force_off)
+               //                              || ((type == T_DIRECT) && servMgr->directFull())
+               //              ){
+
+               if (allRate + needRate + ch->info.bitrate > servMgr->maxBitrateOut)
+               {
+                       LOG_DEBUG("Relay check: NG");
+                       return false;
+               }
+
+               if (!isIndexTxt(ch) && (type == T_RELAY) && (numRelay + noRelay >= servMgr->maxRelays))
+               {
                        LOG_DEBUG("Relay check: NG");
                        return false;
                }
@@ -1784,7 +1793,7 @@ void Servent::handshakeICY(Channel::SRC_TYPE type, bool isHTTP)
 
                
        // check password before anything else, if needed
-       if (servMgr->password != loginPassword)
+       if (!loginPassword.isSame(servMgr->password))
        {
                if (!sock->host.isLocalhost() || !loginPassword.isEmpty())
                        throw HTTPException(HTTP_SC_UNAUTHORIZED,401);
index 05ca133..d7bbbe7 100644 (file)
@@ -2608,3 +2608,84 @@ void ServMgr::banFirewalledHost()
        }
 }
 
+// --------------------------------------------------
+static ChanHit *findServentHit(Servent *s)
+{
+       ChanHitList *chl = chanMgr->findHitListByID(s->chanID);
+       Host h = s->getHost();
+
+       if (chl)
+       {
+               ChanHit *hit = chl->hit;
+               while (hit)
+               {
+                       if ((hit->numHops == 1) && hit->host.isValid() && (h.ip == hit->host.ip))
+                               return hit;
+                       hit = hit->next;
+               }
+       }
+       return NULL;
+}
+// --------------------------------------------------
+int ServMgr::kickUnrelayableHost(GnuID &chid, Servent *ns)
+{
+       Servent *ks = NULL;
+       Servent *s = servMgr->servents;
+
+       while (s)
+       {
+               if (s->type == Servent::T_RELAY && s->chanID.isSame(chid) && !s->isPrivate())
+               {
+                       Host h = s->getHost();
+
+                       chanMgr->hitlistlock.on();
+                       ChanHit *hit = findServentHit(s);
+                       if (hit && !hit->relay && hit->numRelays == 0)
+                       {
+                               char hostName[256];
+                               h.toStr(hostName);
+                               //s->thread.active = false;
+                               LOG_DEBUG("unrelayable Servent : %s",hostName);
+                               if (!ks || s->lastConnect < ks->lastConnect) // elder servent
+                                       ks = s;
+                       }
+                       chanMgr->hitlistlock.off();
+               }
+               s = s->next;
+       }
+
+       if (ks)
+       {
+               if (ns)
+               {
+                       Host h = ns->getHost();
+                       ChanHit nh;
+                       nh.init();
+                       nh.chanID = chid;
+                       nh.rhost[0] = h;
+
+                       ChanPacket pack;
+                       MemoryStream mem(pack.data,sizeof(pack.data));
+                       AtomStream atom(mem);
+                       nh.writeAtoms(atom, chid);
+                       pack.len = mem.pos;
+                       pack.type = ChanPacket::T_PCP;
+                       GnuID noID;
+                       noID.clear();
+
+                       ks->sendPacket(pack, chid, noID, noID, Servent::T_RELAY);
+               }
+
+               ks->setStatus(Servent::S_CLOSING);
+               ks->thread.active = false;
+
+               char hostName[256];
+               ks->getHost().toStr(hostName);
+
+               LOG_DEBUG("Stop unrelayable Servent : %s",hostName);
+
+               return 1;
+       }
+
+       return 0;
+}
index 492520e..fa8e721 100644 (file)
@@ -402,6 +402,7 @@ public:
        unsigned int kickPushTime;
        bool    isCheckPushStream(); //JP-EX
        void    banFirewalledHost(); //JP-EX
+       int kickUnrelayableHost(GnuID &, Servent * = NULL);
 
        bool    getModulePath; //JP-EX
        bool    clearPLS; //JP-EX
index 3cb5696..0819ee1 100644 (file)
@@ -157,8 +157,8 @@ public:
                *this = tmp;
        }
 
-       bool operator == (const char *s) const {return isSame(s);}
-       bool operator != (const char *s) const {return !isSame(s);}
+       bool operator == (const char *s) const {return isSame(s);}
+       bool operator != (const char *s) const {return !isSame(s);}
 
        operator const char *() const {return data;}
 
index 95c5150..fc9e7ec 100644 (file)
@@ -29,22 +29,22 @@ static bool PCP_FORCE_YP                            = false;
 #endif
 // ------------------------------------------------
 static const int PCP_CLIENT_VERSION            = 1218;
-static const int PCP_CLIENT_VERSION_VP = 24;
+static const int PCP_CLIENT_VERSION_VP = 25;
 static const int PCP_ROOT_VERSION              = 1218;
 
 static const int PCP_CLIENT_MINVERSION = 1200;
 
 static const char *PCX_AGENT           = "PeerCast/0.1218";    
 static const char *PCX_AGENTJP                 = "PeerCast/0.1218-J";
-static const char *PCX_AGENTVP         = "PeerCast/0.1218(VP0024)";
-static const char *PCX_VERSTRING       = "v0.1218(VP0024)";
+static const char *PCX_AGENTVP         = "PeerCast/0.1218(VP0025-1)";
+static const char *PCX_VERSTRING       = "v0.1218(VP0025-1)";
 
 #if 1 /* for VP extend version */
 #define VERSION_EX 1
 static const char *PCP_CLIENT_VERSION_EX_PREFIX = "IM"; // 2bytes only
-static const int  PCP_CLIENT_VERSION_EX_NUMBER = 7651;
-static const char *PCX_AGENTEX = "PeerCast/0.1218(IM7651)";
-static const char *PCX_VERSTRING_EX = "v0.1218(IM7651)";
+static const int  PCP_CLIENT_VERSION_EX_NUMBER = 7650;
+static const char *PCX_AGENTEX = "PeerCast/0.1218(IM-VP25-1)";
+static const char *PCX_VERSTRING_EX = "v0.1218(IM765-VP25-1)";
 #endif
 
 // ------------------------------------------------
index 372f1c5..3df0d3f 100644 (file)
@@ -642,9 +642,9 @@ void WSAClientSocket::close()
                try
                {
                        char c[1024];
-                       while (readUpto(&c,1024) > 0)
-                               if (sys->getTime() - stime > 5) break;
-                       //readUpto(&c,1);
+                       while (read(&c, sizeof(c)) > 0)
+                               if (sys->getTime() - stime > 5)
+                                       break;
                }catch(StreamException &) {}
 
                if (closesocket (sockNum))
index f80f0da..4ca13be 100644 (file)
@@ -442,6 +442,8 @@ THREAD_PROC Channel::stream(ThreadInfo *thread)
                        LOG_DEBUG("Channel sleeping for %d seconds",diff);
                        for(unsigned int i=0; i<diff; i++)
                        {
+                               if (ch->info.lastPlayEnd == 0) // reconnected
+                                       break;
                                if (!thread->active || peercastInst->isQuitting){
                                        thread->active = false;
                                        break;
@@ -675,16 +677,6 @@ void PeercastSource::stream(Channel *ch)
                                        }
                                }
 
-                               // find tracker
-                               unsigned int ctime = sys->getTime();
-                               if (!ch->sourceHost.host.ip && tracker_check && ch->trackerHit.host.ip){
-                                       if (ch->trackerHit.lastContact + 30 < ctime){
-                                               ch->sourceHost = ch->trackerHit;
-                                               ch->trackerHit.lastContact = ctime;
-                                               LOG_DEBUG("use saved tracker");
-                                       }
-                               }
-
                                // else find global tracker
                                if (!ch->sourceHost.host.ip)
                                {
@@ -699,6 +691,16 @@ void PeercastSource::stream(Channel *ch)
                                                LOG_DEBUG("use global tracker");
                                        }
                                }
+
+                               // find tracker
+                               unsigned int ctime = sys->getTime();
+                               if (!ch->sourceHost.host.ip && tracker_check && ch->trackerHit.host.ip){
+                                       if (ch->trackerHit.lastContact + 30 < ctime){
+                                               ch->sourceHost = ch->trackerHit;
+                                               ch->trackerHit.lastContact = ctime;
+                                               LOG_DEBUG("use saved tracker");
+                                       }
+                               }
                        }
 
                        chanMgr->hitlistlock.off();
@@ -1539,7 +1541,7 @@ int Channel::readStream(Stream &in,ChannelStream *source)
                                        break;
 
                                //if (rawData.writePos > 0)
-                               if (rawData.lastWriteTime > 0)
+                               if (rawData.lastWriteTime > 0 || rawData.lastSkipTime > 0)
                                {
                                        if (isBroadcasting())
                                        {                                       
@@ -1830,8 +1832,10 @@ bool ChanPacketBuffer::findPacket(unsigned int spos, ChanPacket &pack)
 
        lock.on();
 
+       unsigned int bound = packets[0].len * ChanPacketBuffer::MAX_PACKETS * 2; // max packets to wait
        unsigned int fpos = getStreamPos(firstPos);
-       if (spos < fpos)
+       unsigned int lpos = getStreamPos(lastPos);
+       if (spos < fpos && (fpos < lpos || spos > lpos + bound))
                spos = fpos;
 
 
@@ -1839,7 +1843,7 @@ bool ChanPacketBuffer::findPacket(unsigned int spos, ChanPacket &pack)
        {
                //ChanPacket &p = packets[i%MAX_PACKETS];
                ChanPacketv &p = packets[i%MAX_PACKETS];
-               if (p.pos >= spos)
+               if (p.pos >= spos && p.pos - spos <= bound)
                {
                        pack.init(p);
                        lock.off();
@@ -1904,6 +1908,7 @@ bool ChanPacketBuffer::writePacket(ChanPacket &pack, bool updateReadPos)
                        if (packets[lastPos%MAX_PACKETS].type == ChanPacket::T_HEAD) lpos = 0;
                        if (lpos && (diff == 0 || diff > 0xfff00000)) {
                                LOG_DEBUG("*   latest pos=%d, pack pos=%d", getLatestPos(), pack.pos);
+                               lastSkipTime = sys->getTime();
                                return false;
                        }
                }
@@ -2217,6 +2222,8 @@ Channel *ChanMgr::findAndRelay(ChanInfo &info)
        } else if (!(c->thread.active)){
                c->thread.active = true;
                c->thread.finish = false;
+               c->info.lastPlayStart = 0; // reconnect 
+               c->info.lastPlayEnd = 0;
                if (c->finthread){
                        c->finthread->finish = true;
                        c->finthread = NULL;
@@ -3101,7 +3108,8 @@ void ChanHit::initLocal(int numl,int numr,int,int uptm,bool connected,bool isFul
                }
                c = c->next;
        }
-       int diff = servMgr->maxRelays - servMgr->numStreams(Servent::T_RELAY,false);
+       unsigned int numRelay = servMgr->numStreams(Servent::T_RELAY,false);
+       int diff = servMgr->maxRelays - numRelay;
        if (ch->localRelays()){
                if (noRelay > diff){
                        noRelay = diff;
@@ -3111,19 +3119,13 @@ void ChanHit::initLocal(int numl,int numr,int,int uptm,bool connected,bool isFul
                needRate = 0;
        }
 
-       // for PCRaw (relay) start.
-       bool force_off = false;
-
-       if(isIndexTxt(ch))
-               force_off = true;
-       // for PCRaw (relay) end.
-
 //     ratefull = servMgr->bitrateFull(needRate+bitrate);
        ratefull = (servMgr->maxBitrateOut < allRate + needRate + ch->info.bitrate);
 
-       //relay =       (!relayfull) && (!chfull) && (!ratefull) && ((servMgr->numStreams(Servent::T_RELAY,false) + noRelay) < servMgr->maxRelays);
-       relay = (!relayfull || force_off) && (!chfull) && (!ratefull)
-               && (((servMgr->numStreams(Servent::T_RELAY,false) + noRelay) < servMgr->maxRelays) || force_off);       // for PCRaw (relay) (force_off)
+       if (!isIndexTxt(ch))
+               relay = (!relayfull) && (!chfull) && (!ratefull) && (numRelay + noRelay < servMgr->maxRelays);
+       else
+               relay = (!chfull) && (!ratefull); // for PCRaw (relay)
 
 /*     if (relayfull){
                LOG_DEBUG("Reject by relay full");
index bb12277..ad18d0c 100644 (file)
@@ -154,12 +154,6 @@ void       Servent::kill()
                                        else
                                                chl->hit = next;
 
-       char ip0str[64],ip1str[64];
-       chh->rhost[0].toStr(ip0str);
-       chh->rhost[1].toStr(ip1str);
-       LOG_DEBUG("Delete hit (servent_id=%d): F%dT%dR%d %s/%s",
-               chh->servent_id,chh->firewalled,chh->tracker,chh->relay,ip0str,ip1str);
-
                                        delete chh;
                                        chh = next;
                                }
@@ -374,6 +368,7 @@ void Servent::initIncoming(ClientSocket *s, unsigned int a)
                allow = a;
                thread.data = this;
                thread.func = incomingProc;
+               thread.finish = false;
 
                setStatus(S_PROTOCOL);
 
@@ -862,21 +857,52 @@ bool Servent::handshakeStream(ChanInfo &chanInfo)
        bool chanFound=false;
        bool chanReady=false;
 
+       ChanHit *sourceHit = NULL;
+
        Channel *ch = chanMgr->findChannelByID(chanInfo.id);
        if (ch)
        {
                sendHeader = true;
-//             if (reqPos)
-//             {
-//                     streamPos = ch->rawData.findOldestPos(reqPos);
-//             }else
-//             {
-                       streamPos = ch->rawData.getLatestPos();
-//             }
+               if (reqPos || !isIndexTxt(&chanInfo))
+               {
+                       streamPos = ch->rawData.findOldestPos(reqPos);
+                       //streamPos = ch->rawData.getLatestPos();
+               }else
+               {
+                       streamPos = ch->rawData.getLatestPos();
+               }
 
                chanID = chanInfo.id;
                canStreamLock.on();
                chanReady = canStream(ch);
+               if (0 && !chanReady)
+               {
+                       if (servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
+                       {
+                               sourceHit = &ch->sourceHost;  // send source host info
+
+                               if (ch->info.getUptime() > 60)  // if stable
+                               {
+                                       // connect "this" host later
+                                       ChanHit nh;
+                                       nh.init();
+                                       nh.chanID = chanID;
+                                       nh.rhost[0] = getHost();
+                                       chanMgr->addHit(nh);
+                               }
+
+                               char tmp[50];
+                               getHost().toStr(tmp);
+                               LOG_DEBUG("Bump channel, hand over sourceHost to %s", tmp);
+                               ch->bump = true;
+                       }
+                       else if (servMgr->kickUnrelayableHost(chanID, this) != 0)
+                       {
+                               chanReady = canStream(ch);
+                               if (!chanReady)
+                                       LOG_DEBUG("Kicked unrelayable host, but still cannot stream");
+                       }
+               }
                if (!chanReady) type = T_INCOMING;
                thread.active = chanReady;
                setStatus(S_CONNECTED);
@@ -1032,7 +1058,8 @@ bool Servent::handshakeStream(ChanInfo &chanInfo)
                                
                                // search for up to 8 other hits
                                int cnt=0;
-                               for(int i=0; i<8; i++)
+                               int i;
+                               for(i=0; i<8; i++)
                                {
                                        best.init();
 
@@ -1080,10 +1107,18 @@ bool Servent::handshakeStream(ChanInfo &chanInfo)
                                        if (!best.host.ip)
                                                break;
 
-                                       best.writeAtoms(atom2,chanInfo.id);                             
+                                       best.writeAtoms(atom2,chanInfo.id);                        
                                        cnt++;
                                }
 
+                               if (sourceHit) {
+                                       char tmp[50];
+                                       sourceHit->writeAtoms(atom2, chanInfo.id);
+                                       chs.best[i].host.toStr(tmp);
+                                       LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
+                                       best.host.ip = sourceHit->host.ip;
+                               }
+
                                if (!best.host.ip){
                                        char tmp[50];
 //                                     chanMgr->hitlistlock.on();
@@ -2792,11 +2827,12 @@ void Servent::sendPCPChannel()
                                if (sendHeader)
                                {
                                        atom.writeParent(PCP_CHAN_PKT,3);
-                                               atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
-                                               atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
-                                               atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
+                                       atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
+                                       atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
+                                       atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
 
-                                       streamPos = ch->headPack.pos+ch->headPack.len;
+                                       if (streamPos == 0)
+                                               streamPos = ch->headPack.pos+ch->headPack.len;
                                        LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
                                }
                }
@@ -2882,7 +2918,6 @@ void Servent::sendPCPChannel()
                                        streamPos = rawPack.pos+rawPack.len;
                                }
                        } else {
-                               pcpStream->flush(*sock);
                                throw StreamException("Channel not found");
                        }
 
@@ -2913,6 +2948,7 @@ void Servent::sendPCPChannel()
 
        try
        {
+               pcpStream->flush(*sock);
                atom.writeInt(PCP_QUIT,error);
        }catch(StreamException &) {}
 
index aa7cc55..be97958 100644 (file)
@@ -414,14 +414,21 @@ bool Servent::canStream(Channel *ch)
 
        if (!isPrivate())
        {
+               if  (!ch->isPlaying() || ch->isFull() || ((type == T_DIRECT) && servMgr->directFull()))
+                       return false;
+
+               if (!isIndexTxt(ch) && (type == T_RELAY) && (servMgr->relaysFull()))
+                       return false;
+
                Channel *c = chanMgr->channel;
                int noRelay = 0;
                unsigned int needRate = 0;
                unsigned int allRate = 0;
                while(c){
                        if (c->isPlaying()){
-                               allRate += c->info.bitrate * c->localRelays();
-                               if ((c != ch) && (c->localRelays() == 0)){
+                               int nlr = c->localRelays();
+                               allRate += c->info.bitrate * nlr;
+                               if ((c != ch) && (nlr == 0)){
                                        if(!isIndexTxt(c))      // for PCRaw (relay)
                                                noRelay++;
                                        needRate+=c->info.bitrate;
@@ -429,7 +436,8 @@ bool Servent::canStream(Channel *ch)
                        }
                        c = c->next;
                }
-               int diff = servMgr->maxRelays - servMgr->numStreams(Servent::T_RELAY,false);
+               unsigned int numRelay = servMgr->numStreams(Servent::T_RELAY,false);
+               int diff = servMgr->maxRelays - numRelay;
                if (ch->localRelays()){
                        if (noRelay > diff){
                                noRelay = diff;
@@ -439,23 +447,24 @@ bool Servent::canStream(Channel *ch)
                        needRate = 0;
                }
 
-               // for PCRaw (relay) start.
-               bool force_off = true;
-
-               if(isIndexTxt(ch))
-                       force_off = false;
-               // for PCRaw (relay) end.
-
                LOG_DEBUG("Relay check: Max=%d Now=%d Need=%d ch=%d",
                        servMgr->maxBitrateOut, allRate, needRate, ch->info.bitrate);
-               if  (   !ch->isPlaying()
-                               || ch->isFull()
-//                             || servMgr->bitrateFull(needRate+ch->getBitrate())
-                               || (allRate + needRate + ch->info.bitrate > servMgr->maxBitrateOut)
-                               || ((type == T_RELAY) && servMgr->relaysFull() && force_off)    // for PCRaw (relay) (force_off)
-                               || ((type == T_RELAY) && (((servMgr->numStreams(Servent::T_RELAY,false) + noRelay) >= servMgr->maxRelays)) && force_off)        // for PCRaw (relay) (force_off)
-                               || ((type == T_DIRECT) && servMgr->directFull())
-               ){
+               //              if  (   !ch->isPlaying()
+               //                              || ch->isFull()
+               //                              || (allRate + needRate + ch->info.bitrate > servMgr->maxBitrateOut)
+               //                              || ((type == T_RELAY) && servMgr->relaysFull() && force_off)    // for PCRaw (relay) (force_off)
+               //                              || ((type == T_RELAY) && (((servMgr->numStreams(Servent::T_RELAY,false) + noRelay) >= servMgr->maxRelays)) && force_off)        // for PCRaw (relay) (force_off)
+               //                              || ((type == T_DIRECT) && servMgr->directFull())
+               //              ){
+
+               if (allRate + needRate + ch->info.bitrate > servMgr->maxBitrateOut)
+               {
+                       LOG_DEBUG("Relay check: NG");
+                       return false;
+               }
+
+               if (!isIndexTxt(ch) && (type == T_RELAY) && (numRelay + noRelay >= servMgr->maxRelays))
+               {
                        LOG_DEBUG("Relay check: NG");
                        return false;
                }
@@ -1784,7 +1793,7 @@ void Servent::handshakeICY(Channel::SRC_TYPE type, bool isHTTP)
 
                
        // check password before anything else, if needed
-       if (servMgr->password != loginPassword)
+       if (!loginPassword.isSame(servMgr->password))
        {
                if (!sock->host.isLocalhost() || !loginPassword.isEmpty())
                        throw HTTPException(HTTP_SC_UNAUTHORIZED,401);
index 05ca133..d7bbbe7 100644 (file)
@@ -2608,3 +2608,84 @@ void ServMgr::banFirewalledHost()
        }
 }
 
+// --------------------------------------------------
+static ChanHit *findServentHit(Servent *s)
+{
+       ChanHitList *chl = chanMgr->findHitListByID(s->chanID);
+       Host h = s->getHost();
+
+       if (chl)
+       {
+               ChanHit *hit = chl->hit;
+               while (hit)
+               {
+                       if ((hit->numHops == 1) && hit->host.isValid() && (h.ip == hit->host.ip))
+                               return hit;
+                       hit = hit->next;
+               }
+       }
+       return NULL;
+}
+// --------------------------------------------------
+int ServMgr::kickUnrelayableHost(GnuID &chid, Servent *ns)
+{
+       Servent *ks = NULL;
+       Servent *s = servMgr->servents;
+
+       while (s)
+       {
+               if (s->type == Servent::T_RELAY && s->chanID.isSame(chid) && !s->isPrivate())
+               {
+                       Host h = s->getHost();
+
+                       chanMgr->hitlistlock.on();
+                       ChanHit *hit = findServentHit(s);
+                       if (hit && !hit->relay && hit->numRelays == 0)
+                       {
+                               char hostName[256];
+                               h.toStr(hostName);
+                               //s->thread.active = false;
+                               LOG_DEBUG("unrelayable Servent : %s",hostName);
+                               if (!ks || s->lastConnect < ks->lastConnect) // elder servent
+                                       ks = s;
+                       }
+                       chanMgr->hitlistlock.off();
+               }
+               s = s->next;
+       }
+
+       if (ks)
+       {
+               if (ns)
+               {
+                       Host h = ns->getHost();
+                       ChanHit nh;
+                       nh.init();
+                       nh.chanID = chid;
+                       nh.rhost[0] = h;
+
+                       ChanPacket pack;
+                       MemoryStream mem(pack.data,sizeof(pack.data));
+                       AtomStream atom(mem);
+                       nh.writeAtoms(atom, chid);
+                       pack.len = mem.pos;
+                       pack.type = ChanPacket::T_PCP;
+                       GnuID noID;
+                       noID.clear();
+
+                       ks->sendPacket(pack, chid, noID, noID, Servent::T_RELAY);
+               }
+
+               ks->setStatus(Servent::S_CLOSING);
+               ks->thread.active = false;
+
+               char hostName[256];
+               ks->getHost().toStr(hostName);
+
+               LOG_DEBUG("Stop unrelayable Servent : %s",hostName);
+
+               return 1;
+       }
+
+       return 0;
+}
index 492520e..fa8e721 100644 (file)
@@ -402,6 +402,7 @@ public:
        unsigned int kickPushTime;
        bool    isCheckPushStream(); //JP-EX
        void    banFirewalledHost(); //JP-EX
+       int kickUnrelayableHost(GnuID &, Servent * = NULL);
 
        bool    getModulePath; //JP-EX
        bool    clearPLS; //JP-EX
index 3cb5696..0819ee1 100644 (file)
@@ -157,8 +157,8 @@ public:
                *this = tmp;
        }
 
-       bool operator == (const char *s) const {return isSame(s);}
-       bool operator != (const char *s) const {return !isSame(s);}
+       bool operator == (const char *s) const {return isSame(s);}
+       bool operator != (const char *s) const {return !isSame(s);}
 
        operator const char *() const {return data;}
 
index 95c5150..fc9e7ec 100644 (file)
@@ -29,22 +29,22 @@ static bool PCP_FORCE_YP                            = false;
 #endif
 // ------------------------------------------------
 static const int PCP_CLIENT_VERSION            = 1218;
-static const int PCP_CLIENT_VERSION_VP = 24;
+static const int PCP_CLIENT_VERSION_VP = 25;
 static const int PCP_ROOT_VERSION              = 1218;
 
 static const int PCP_CLIENT_MINVERSION = 1200;
 
 static const char *PCX_AGENT           = "PeerCast/0.1218";    
 static const char *PCX_AGENTJP                 = "PeerCast/0.1218-J";
-static const char *PCX_AGENTVP         = "PeerCast/0.1218(VP0024)";
-static const char *PCX_VERSTRING       = "v0.1218(VP0024)";
+static const char *PCX_AGENTVP         = "PeerCast/0.1218(VP0025-1)";
+static const char *PCX_VERSTRING       = "v0.1218(VP0025-1)";
 
 #if 1 /* for VP extend version */
 #define VERSION_EX 1
 static const char *PCP_CLIENT_VERSION_EX_PREFIX = "IM"; // 2bytes only
-static const int  PCP_CLIENT_VERSION_EX_NUMBER = 7651;
-static const char *PCX_AGENTEX = "PeerCast/0.1218(IM7651)";
-static const char *PCX_VERSTRING_EX = "v0.1218(IM7651)";
+static const int  PCP_CLIENT_VERSION_EX_NUMBER = 7650;
+static const char *PCX_AGENTEX = "PeerCast/0.1218(IM-VP25-1)";
+static const char *PCX_VERSTRING_EX = "v0.1218(IM765-VP25-1)";
 #endif
 
 // ------------------------------------------------
index 372f1c5..3df0d3f 100644 (file)
@@ -642,9 +642,9 @@ void WSAClientSocket::close()
                try
                {
                        char c[1024];
-                       while (readUpto(&c,1024) > 0)
-                               if (sys->getTime() - stime > 5) break;
-                       //readUpto(&c,1);
+                       while (read(&c, sizeof(c)) > 0)
+                               if (sys->getTime() - stime > 5)
+                                       break;
                }catch(StreamException &) {}
 
                if (closesocket (sockNum))