OSDN Git Service

バグ修正コード(VPdiff_27_20080120)マージ IM0027-3
authoreru <eru01@users.sourceforge.jp>
Tue, 22 Jan 2008 11:18:29 +0000 (11:18 +0000)
committereru <eru01@users.sourceforge.jp>
Tue, 22 Jan 2008 11:18:29 +0000 (11:18 +0000)
バグ修正コード(VPdiff_27_20080120)をマージ

14 files changed:
PeerCast.root/PeerCast/core/common/channel.cpp
PeerCast.root/PeerCast/core/common/channel.h
PeerCast.root/PeerCast/core/common/cstream.h
PeerCast.root/PeerCast/core/common/pcp.cpp
PeerCast.root/PeerCast/core/common/servent.cpp
PeerCast.root/PeerCast/core/common/servmgr.cpp
PeerCast.root/PeerCast/core/common/version2.h
c:/Git/PeerCast.root/PeerCast/core/common/channel.cpp
c:/Git/PeerCast.root/PeerCast/core/common/channel.h
c:/Git/PeerCast.root/PeerCast/core/common/cstream.h
c:/Git/PeerCast.root/PeerCast/core/common/pcp.cpp
c:/Git/PeerCast.root/PeerCast/core/common/servent.cpp
c:/Git/PeerCast.root/PeerCast/core/common/servmgr.cpp
c:/Git/PeerCast.root/PeerCast/core/common/version2.h

index 9c3740e..c1b3e2b 100644 (file)
@@ -608,6 +608,10 @@ void PeercastSource::stream(Channel *ch)
        bool next_yp = false;
        bool tracker_check = (ch->trackerHit.host.ip != 0);
        int connFailCnt = 0;
+       int keepDownstreamTime = 7;
+
+       if (isIndexTxt(&ch->info))
+               keepDownstreamTime = 30;
 
        ch->lastStopTime = 0;
        ch->bumped = false;
@@ -720,7 +724,9 @@ void PeercastSource::stream(Channel *ch)
 
                        chanMgr->hitlistlock.off();
 
-                       if (servMgr->keepDownstreams && ch->lastStopTime && ch->lastStopTime < sys->getTime() - 7) {
+                       if (servMgr->keepDownstreams && ch->lastStopTime
+                               && ch->lastStopTime < sys->getTime() - keepDownstreamTime)
+                       {
                                ch->lastStopTime = 0;
                                LOG_DEBUG("------------ disconnect all downstreams");
                                ChanPacket pack;
@@ -902,7 +908,7 @@ yp0:
                                ch->trackerHit.lastContact = ctime - 30 + (rand() % 30);
 
                        // broadcast source host
-                       if (!error && ch->sourceHost.host.ip) { // if closed normally
+                       if (!got503 && !error && ch->sourceHost.host.ip) { // if closed normally
                                ChanPacket pack;
                                MemoryStream mem(pack.data,sizeof(pack.data));
                                AtomStream atom(mem);
@@ -916,7 +922,7 @@ yp0:
                        }
 
                        // broadcast quit to any connected downstream servents
-                       if (!servMgr->keepDownstreams || (ch->sourceHost.tracker && !got503) || !error) {
+                       if (!servMgr->keepDownstreams || !got503 && (ch->sourceHost.tracker || !error)) {
                                ChanPacket pack;
                                MemoryStream mem(pack.data,sizeof(pack.data));
                                AtomStream atom(mem);
@@ -1385,6 +1391,17 @@ bool ChannelStream::getStatus(Channel *ch,ChanPacket &pack)
 {
        unsigned int ctime = sys->getTime();
 
+       if ((ch->isPlaying() == isPlaying)){
+               if ((ctime-lastUpdate) < 10){
+                       return false;
+               }
+
+               if ((ctime-lastCheckTime) < 5){
+                       return false;
+               }
+               lastCheckTime = ctime;
+       }
+
        ChanHitList *chl = chanMgr->findHitListByID(ch->info.id);
 
        if (!chl)
@@ -1419,17 +1436,6 @@ bool ChannelStream::getStatus(Channel *ch,ChanPacket &pack)
        int newLocalListeners = ch->localListeners();
        int newLocalRelays = ch->localRelays();
 
-       if ((ch->isPlaying() == isPlaying)){
-               if ((ctime-lastUpdate) < 10){
-                       return false;
-               }
-
-               if ((ctime-lastCheckTime) < 10){
-                       return false;
-               }
-               lastCheckTime = ctime;
-       }
-
        unsigned int oldp = ch->rawData.getOldestPos();
        unsigned int newp = ch->rawData.getLatestPos();
 
@@ -1611,9 +1617,11 @@ int Channel::readStream(Stream &in,ChannelStream *source)
                                                setStatus(Channel::S_RECEIVING);
                                                bumped = false;
                                        }
-                                       source->updateStatus(this);
+                                       //source->updateStatus(this);
                                }
                        }
+                       if (rawData.lastWriteTime > 0 || rawData.lastSkipTime > 0)
+                               source->updateStatus(this);
 
                        unsigned int t = sys->getTime();
                        if (t != ptime) {
@@ -1886,9 +1894,11 @@ bool ChanPacketBuffer::findPacket(unsigned int spos, ChanPacket &pack)
        lock.on();
 
        unsigned int bound = packets[0].len * ChanPacketBuffer::MAX_PACKETS * 2; // max packets to wait
-       unsigned int fpos = getStreamPos(firstPos);
-       unsigned int lpos = getStreamPos(lastPos);
-       if (spos < fpos && (fpos < lpos || spos > lpos + bound))
+       unsigned int fpos = getFirstDataPos();
+       unsigned int lpos = getLatestPos();
+       if ((spos < fpos && fpos <= lpos && spos != getStreamPosEnd(lastPos)) // --s-----f---l--
+               || (spos < fpos && lpos < fpos && spos > lpos + bound)            // -l-------s--f--
+               || (spos > lpos && lpos >= fpos && spos - lpos > bound))          // --f---l------s-
                spos = fpos;
 
 
@@ -1917,6 +1927,18 @@ unsigned int     ChanPacketBuffer::getLatestPos()
                
 }
 // -----------------------------------
+unsigned int   ChanPacketBuffer::getFirstDataPos()
+{
+       if (!writePos)
+               return 0;
+       for(unsigned int i=firstPos; i<=lastPos; i++)
+       {
+               if (packets[i%MAX_PACKETS].type == ChanPacket::T_DATA)
+                       return packets[i%MAX_PACKETS].pos;
+       }
+       return 0;
+}
+// -----------------------------------
 unsigned int   ChanPacketBuffer::getOldestPos()
 {
        if (!writePos)
@@ -3897,6 +3919,15 @@ int ChanHitList::pickSourceHits(ChanHitSearch &chs)
        return 0;
 }
 
+// -----------------------------------
+unsigned int ChanHitList::getSeq()
+{
+       unsigned int seq;
+       seqLock.on();
+       seq = riSequence = (riSequence + 1) & 0xffffff;
+       seqLock.off();
+       return seq;
+}
 
 // -----------------------------------
 const char *ChanInfo::getTypeStr(TYPE t)
@@ -4703,40 +4734,18 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList
        int index = 0;
        int prob;
        int rnd;
-       static int base = 0x400;
+       int base = 0x400;
        ChanHit tmpHit[MAX_RESULTS];
-       static WLock seqLock;
-       static unsigned int riSequence = 0;
 
        //srand(seed);
        //seed += 11;
 
-       unsigned int seq;
-       seqLock.on();
-       seq = riSequence++;
-       riSequence &= 0xffffff;
-       seqLock.off();
-
-       Servent *s = servMgr->servents;
-       while (s) {
-               if (s->serventHit.rhost[0].port && s->type == Servent::T_RELAY
-                       && s->chanID.isSame(chl->info.id)) {
-                               int i = index % MAX_RESULTS;
-                               if (index < MAX_RESULTS
-                                       || tmpHit[i].lastSendSeq > s->serventHit.lastSendSeq) {
-                                               s->serventHit.lastSendSeq = seq;
-                                               tmpHit[i] = s->serventHit;
-                                               tmpHit[i].host = s->serventHit.rhost[0];
-                                               index++;
-                               }
-               }
-               s = s->next;
-       }
+       unsigned int seq = chl->getSeq();
 
        ChanHit *hit = chl->hit;
 
        while(hit){
-               if (hit->host.ip && !hit->dead){
+               if (hit->rhost[0].ip && !hit->dead) {
                        if (
                                (!exID.isSame(hit->sessionID))
 //                     &&      (hit->relay)
@@ -4764,7 +4773,6 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList
                                //rnd = (float)rand() / (float)RAND_MAX;
                                rnd = rand() % base;
                                if (hit->numHops == 1){
-#if 0
                                        if (tmpHit[index % MAX_RESULTS].numHops == 1){
                                                if (rnd < prob){
                                                        tmpHit[index % MAX_RESULTS] = *hit;
@@ -4776,9 +4784,8 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList
                                                tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
                                                index++;
                                        }
-#endif
                                } else {
-                                       if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob) || rnd == 0){
+                                       if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob)){
                                                tmpHit[index % MAX_RESULTS] = *hit;
                                                tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
                                                index++;
@@ -4822,15 +4829,9 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList
                best[use[i]] = tmpHit[i];
        }*/
 
-       int use[MAX_RESULTS];
-       int i;
-       for (i = 0; i < cnt; i++) {
-               use[i] = (i + seq) % cnt;
-       }
-
-       for (i = 0; i < cnt; i++){
+       for (int i = 0; i < cnt; i++){
 //             LOG_DEBUG("%d", use[i]);
-               best[use[i]] = tmpHit[i];
+               best[(i + seq) % cnt] = tmpHit[i];
        }
 //     for (i = 0; i < cnt; i++){
 //             char tmp[50];
index d6f5563..5a7795f 100644 (file)
@@ -249,13 +249,16 @@ public:
        int             getTotalRelays();
        int             getTotalFirewalled();
 
+       unsigned int getSeq();
+
        bool    used;
        ChanInfo        info;
        ChanHit *hit;
        unsigned int lastHitTime;
        ChanHitList *next;
 
-
+       WLock seqLock;
+       unsigned int riSequence;
 };
 // ----------------------------------
 class ChanHitSearch
index d59ad1d..33a74d1 100644 (file)
@@ -187,6 +187,7 @@ public:
 
        int             numPending() {return writePos-readPos;}
 
+       unsigned int getFirstDataPos();
        unsigned int    getLatestPos();
        unsigned int    getOldestPos();
        unsigned int    findOldestPos(unsigned int);
index 11289f6..48117ba 100644 (file)
@@ -547,7 +547,7 @@ void PCPStream::readHostAtoms(AtomStream &atom, int numc, BroadcastState &bcs, C
                if (sv && sv->getHost().ip == hit.host.ip){
 //                     LOG_DEBUG("set servent's waitPort = %d", hit.host.port);
                        sv->waitPort = hit.host.port;
-                       hit.lastSendSeq = sv->serventHit.lastSendSeq;
+                       //hit.lastSendSeq = sv->serventHit.lastSendSeq;
                        sv->serventHit = hit;
                }
        }
@@ -761,7 +761,7 @@ int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
                                ((hit.numHops == 1 && (hit.rhost[0].ip == sv->getHost().ip
                                && hit.uphost.ip == servMgr->serverHost.ip && hit.uphost.port == servMgr->serverHost.port)
                                || (hit.rhost[1].localIP() && hit.rhost[1].ip == sv->getHost().ip))
-                               || chanMgr->findParentHit(hit)))
+                               || (hit.numHops != 1 && chanMgr->findParentHit(hit))))
                        {
                                int oldPos = pmem.pos;
                                hit.writeAtoms(patom, hit.chanID);
index d8cdddd..573b8be 100644 (file)
@@ -878,20 +878,24 @@ bool Servent::handshakeStream(ChanInfo &chanInfo)
                }
 
                chanID = chanInfo.id;
-               serventHit.rhost[0].ip = getHost().ip;
-               serventHit.rhost[0].port = listenPort;
-               serventHit.host = serventHit.rhost[0];
+               serventHit.host.ip = getHost().ip;
+               serventHit.host.port = listenPort;
+               if (serventHit.host.globalIP())
+                       serventHit.rhost[0] = serventHit.host;
+               else
+                       serventHit.rhost[1] = serventHit.host;
                serventHit.chanID = chanID;
 
                canStreamLock.on();
                chanReady = canStream(ch);
-               if (/*0 && */!chanReady)
+               if (0 && !chanReady && ch->isPlaying())
                {
-                       if (servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
+                       if (ch->info.getUptime() > 60
+                               && servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
                        {
                                sourceHit = &ch->sourceHost;  // send source host info
 
-                               if (listenPort && ch->info.getUptime() > 60)  // if stable
+                               if (listenPort)
                                {
                                        // connect "this" host later
                                        chanMgr->addHit(serventHit);
@@ -1054,11 +1058,18 @@ bool Servent::handshakeStream(ChanInfo &chanInfo)
 
                        int error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
 
+                       if (sourceHit) {
+                               sourceHit->writeAtoms(atom2,chanInfo.id);       
+                               char tmp[50];
+                               sourceHit->host.toStr(tmp);
+                               LOG_DEBUG("relay info(sourceHit): %s", tmp);
+                       }
+
                        chanMgr->hitlistlock.on();
 
                        chl = chanMgr->findHitList(chanInfo);
 
-                       if (chl)
+                       if (chl && !sourceHit)
                        {
                                ChanHit best;
                                
@@ -1117,25 +1128,18 @@ bool Servent::handshakeStream(ChanInfo &chanInfo)
                                        cnt++;
                                }
 
-                               if (sourceHit) {
-                                       char tmp[50];
-                                       sourceHit->writeAtoms(atom2, chanInfo.id);
-                                       sourceHit->host.toStr(tmp);
-                                       LOG_DEBUG("relay info(sourceHit): %s", tmp);
-                                       best.host.ip = sourceHit->host.ip;
-                               }
-
                                if (!best.host.ip){
                                        char tmp[50];
 //                                     chanMgr->hitlistlock.on();
-                                       int cnt = chs.getRelayHost(servMgr->serverHost, rhost, remoteID, chl);
+                                       int rhcnt = chs.getRelayHost(servMgr->serverHost, rhost, remoteID, chl);
 //                                     chanMgr->hitlistlock.off();
-                                       for (int i = 0; i < cnt; i++){
+                                       for (int i = 0; i < rhcnt; i++){
                                                chs.best[i].writeAtoms(atom2, chanInfo.id);
                                                chs.best[i].host.toStr(tmp);
                                                LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
                                                best.host.ip = chs.best[i].host.ip;
                                        }
+                                       cnt += rhcnt;
                                }
 
                                if (cnt)
@@ -1352,6 +1356,13 @@ bool Servent::handshakeStream(ChanInfo &chanInfo)
                {
                        handshakeIncomingPCP(atom,rhost,remoteID,agent);
                        atom.writeInt(PCP_OK,0);
+                       if (rhost.globalIP())
+                               serventHit.rhost[0] = rhost;
+                       else
+                               serventHit.rhost[1] = rhost;
+                       serventHit.sessionID = remoteID;
+                       serventHit.numHops = 1;
+                       chanMgr->addHit(serventHit);
                }
 
        }
index 9470076..f9dc302 100644 (file)
@@ -2762,7 +2762,7 @@ int ServMgr::kickUnrelayableHost(GnuID &chid, ChanHit &sendhit)
                        Host h = s->getHost();
 
                        ChanHit hit = s->serventHit;
-                       if (!hit.relay && hit.numRelays == 0)
+                       if (!hit.relay && hit.numRelays == 0 || hit.firewalled)
                        {
                                char hostName[256];
                                h.toStr(hostName);
index da49db2..2ae3f0f 100644 (file)
@@ -45,8 +45,8 @@ extern int version_ex; // PP
 //#define VERSION_EX 1
 static const char *PCP_CLIENT_VERSION_EX_PREFIX = "IM"; // 2bytes only
 static const int  PCP_CLIENT_VERSION_EX_NUMBER = 27;
-static const char *PCX_AGENTEX = "PeerCast/0.1218(IM0027-2)";
-static const char *PCX_VERSTRING_EX = "v0.1218(IM0027-2)";
+static const char *PCX_AGENTEX = "PeerCast/0.1218(IM0027-3)";
+static const char *PCX_VERSTRING_EX = "v0.1218(IM0027-3)";
 #endif
 
 // ------------------------------------------------
index 9c3740e..c1b3e2b 100644 (file)
@@ -608,6 +608,10 @@ void PeercastSource::stream(Channel *ch)
        bool next_yp = false;
        bool tracker_check = (ch->trackerHit.host.ip != 0);
        int connFailCnt = 0;
+       int keepDownstreamTime = 7;
+
+       if (isIndexTxt(&ch->info))
+               keepDownstreamTime = 30;
 
        ch->lastStopTime = 0;
        ch->bumped = false;
@@ -720,7 +724,9 @@ void PeercastSource::stream(Channel *ch)
 
                        chanMgr->hitlistlock.off();
 
-                       if (servMgr->keepDownstreams && ch->lastStopTime && ch->lastStopTime < sys->getTime() - 7) {
+                       if (servMgr->keepDownstreams && ch->lastStopTime
+                               && ch->lastStopTime < sys->getTime() - keepDownstreamTime)
+                       {
                                ch->lastStopTime = 0;
                                LOG_DEBUG("------------ disconnect all downstreams");
                                ChanPacket pack;
@@ -902,7 +908,7 @@ yp0:
                                ch->trackerHit.lastContact = ctime - 30 + (rand() % 30);
 
                        // broadcast source host
-                       if (!error && ch->sourceHost.host.ip) { // if closed normally
+                       if (!got503 && !error && ch->sourceHost.host.ip) { // if closed normally
                                ChanPacket pack;
                                MemoryStream mem(pack.data,sizeof(pack.data));
                                AtomStream atom(mem);
@@ -916,7 +922,7 @@ yp0:
                        }
 
                        // broadcast quit to any connected downstream servents
-                       if (!servMgr->keepDownstreams || (ch->sourceHost.tracker && !got503) || !error) {
+                       if (!servMgr->keepDownstreams || !got503 && (ch->sourceHost.tracker || !error)) {
                                ChanPacket pack;
                                MemoryStream mem(pack.data,sizeof(pack.data));
                                AtomStream atom(mem);
@@ -1385,6 +1391,17 @@ bool ChannelStream::getStatus(Channel *ch,ChanPacket &pack)
 {
        unsigned int ctime = sys->getTime();
 
+       if ((ch->isPlaying() == isPlaying)){
+               if ((ctime-lastUpdate) < 10){
+                       return false;
+               }
+
+               if ((ctime-lastCheckTime) < 5){
+                       return false;
+               }
+               lastCheckTime = ctime;
+       }
+
        ChanHitList *chl = chanMgr->findHitListByID(ch->info.id);
 
        if (!chl)
@@ -1419,17 +1436,6 @@ bool ChannelStream::getStatus(Channel *ch,ChanPacket &pack)
        int newLocalListeners = ch->localListeners();
        int newLocalRelays = ch->localRelays();
 
-       if ((ch->isPlaying() == isPlaying)){
-               if ((ctime-lastUpdate) < 10){
-                       return false;
-               }
-
-               if ((ctime-lastCheckTime) < 10){
-                       return false;
-               }
-               lastCheckTime = ctime;
-       }
-
        unsigned int oldp = ch->rawData.getOldestPos();
        unsigned int newp = ch->rawData.getLatestPos();
 
@@ -1611,9 +1617,11 @@ int Channel::readStream(Stream &in,ChannelStream *source)
                                                setStatus(Channel::S_RECEIVING);
                                                bumped = false;
                                        }
-                                       source->updateStatus(this);
+                                       //source->updateStatus(this);
                                }
                        }
+                       if (rawData.lastWriteTime > 0 || rawData.lastSkipTime > 0)
+                               source->updateStatus(this);
 
                        unsigned int t = sys->getTime();
                        if (t != ptime) {
@@ -1886,9 +1894,11 @@ bool ChanPacketBuffer::findPacket(unsigned int spos, ChanPacket &pack)
        lock.on();
 
        unsigned int bound = packets[0].len * ChanPacketBuffer::MAX_PACKETS * 2; // max packets to wait
-       unsigned int fpos = getStreamPos(firstPos);
-       unsigned int lpos = getStreamPos(lastPos);
-       if (spos < fpos && (fpos < lpos || spos > lpos + bound))
+       unsigned int fpos = getFirstDataPos();
+       unsigned int lpos = getLatestPos();
+       if ((spos < fpos && fpos <= lpos && spos != getStreamPosEnd(lastPos)) // --s-----f---l--
+               || (spos < fpos && lpos < fpos && spos > lpos + bound)            // -l-------s--f--
+               || (spos > lpos && lpos >= fpos && spos - lpos > bound))          // --f---l------s-
                spos = fpos;
 
 
@@ -1917,6 +1927,18 @@ unsigned int     ChanPacketBuffer::getLatestPos()
                
 }
 // -----------------------------------
+unsigned int   ChanPacketBuffer::getFirstDataPos()
+{
+       if (!writePos)
+               return 0;
+       for(unsigned int i=firstPos; i<=lastPos; i++)
+       {
+               if (packets[i%MAX_PACKETS].type == ChanPacket::T_DATA)
+                       return packets[i%MAX_PACKETS].pos;
+       }
+       return 0;
+}
+// -----------------------------------
 unsigned int   ChanPacketBuffer::getOldestPos()
 {
        if (!writePos)
@@ -3897,6 +3919,15 @@ int ChanHitList::pickSourceHits(ChanHitSearch &chs)
        return 0;
 }
 
+// -----------------------------------
+unsigned int ChanHitList::getSeq()
+{
+       unsigned int seq;
+       seqLock.on();
+       seq = riSequence = (riSequence + 1) & 0xffffff;
+       seqLock.off();
+       return seq;
+}
 
 // -----------------------------------
 const char *ChanInfo::getTypeStr(TYPE t)
@@ -4703,40 +4734,18 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList
        int index = 0;
        int prob;
        int rnd;
-       static int base = 0x400;
+       int base = 0x400;
        ChanHit tmpHit[MAX_RESULTS];
-       static WLock seqLock;
-       static unsigned int riSequence = 0;
 
        //srand(seed);
        //seed += 11;
 
-       unsigned int seq;
-       seqLock.on();
-       seq = riSequence++;
-       riSequence &= 0xffffff;
-       seqLock.off();
-
-       Servent *s = servMgr->servents;
-       while (s) {
-               if (s->serventHit.rhost[0].port && s->type == Servent::T_RELAY
-                       && s->chanID.isSame(chl->info.id)) {
-                               int i = index % MAX_RESULTS;
-                               if (index < MAX_RESULTS
-                                       || tmpHit[i].lastSendSeq > s->serventHit.lastSendSeq) {
-                                               s->serventHit.lastSendSeq = seq;
-                                               tmpHit[i] = s->serventHit;
-                                               tmpHit[i].host = s->serventHit.rhost[0];
-                                               index++;
-                               }
-               }
-               s = s->next;
-       }
+       unsigned int seq = chl->getSeq();
 
        ChanHit *hit = chl->hit;
 
        while(hit){
-               if (hit->host.ip && !hit->dead){
+               if (hit->rhost[0].ip && !hit->dead) {
                        if (
                                (!exID.isSame(hit->sessionID))
 //                     &&      (hit->relay)
@@ -4764,7 +4773,6 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList
                                //rnd = (float)rand() / (float)RAND_MAX;
                                rnd = rand() % base;
                                if (hit->numHops == 1){
-#if 0
                                        if (tmpHit[index % MAX_RESULTS].numHops == 1){
                                                if (rnd < prob){
                                                        tmpHit[index % MAX_RESULTS] = *hit;
@@ -4776,9 +4784,8 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList
                                                tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
                                                index++;
                                        }
-#endif
                                } else {
-                                       if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob) || rnd == 0){
+                                       if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob)){
                                                tmpHit[index % MAX_RESULTS] = *hit;
                                                tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
                                                index++;
@@ -4822,15 +4829,9 @@ int ChanHitSearch::getRelayHost(Host host1, Host host2, GnuID exID, ChanHitList
                best[use[i]] = tmpHit[i];
        }*/
 
-       int use[MAX_RESULTS];
-       int i;
-       for (i = 0; i < cnt; i++) {
-               use[i] = (i + seq) % cnt;
-       }
-
-       for (i = 0; i < cnt; i++){
+       for (int i = 0; i < cnt; i++){
 //             LOG_DEBUG("%d", use[i]);
-               best[use[i]] = tmpHit[i];
+               best[(i + seq) % cnt] = tmpHit[i];
        }
 //     for (i = 0; i < cnt; i++){
 //             char tmp[50];
index d6f5563..5a7795f 100644 (file)
@@ -249,13 +249,16 @@ public:
        int             getTotalRelays();
        int             getTotalFirewalled();
 
+       unsigned int getSeq();
+
        bool    used;
        ChanInfo        info;
        ChanHit *hit;
        unsigned int lastHitTime;
        ChanHitList *next;
 
-
+       WLock seqLock;
+       unsigned int riSequence;
 };
 // ----------------------------------
 class ChanHitSearch
index d59ad1d..33a74d1 100644 (file)
@@ -187,6 +187,7 @@ public:
 
        int             numPending() {return writePos-readPos;}
 
+       unsigned int getFirstDataPos();
        unsigned int    getLatestPos();
        unsigned int    getOldestPos();
        unsigned int    findOldestPos(unsigned int);
index 11289f6..48117ba 100644 (file)
@@ -547,7 +547,7 @@ void PCPStream::readHostAtoms(AtomStream &atom, int numc, BroadcastState &bcs, C
                if (sv && sv->getHost().ip == hit.host.ip){
 //                     LOG_DEBUG("set servent's waitPort = %d", hit.host.port);
                        sv->waitPort = hit.host.port;
-                       hit.lastSendSeq = sv->serventHit.lastSendSeq;
+                       //hit.lastSendSeq = sv->serventHit.lastSendSeq;
                        sv->serventHit = hit;
                }
        }
@@ -761,7 +761,7 @@ int PCPStream::readBroadcastAtoms(AtomStream &atom,int numc,BroadcastState &bcs)
                                ((hit.numHops == 1 && (hit.rhost[0].ip == sv->getHost().ip
                                && hit.uphost.ip == servMgr->serverHost.ip && hit.uphost.port == servMgr->serverHost.port)
                                || (hit.rhost[1].localIP() && hit.rhost[1].ip == sv->getHost().ip))
-                               || chanMgr->findParentHit(hit)))
+                               || (hit.numHops != 1 && chanMgr->findParentHit(hit))))
                        {
                                int oldPos = pmem.pos;
                                hit.writeAtoms(patom, hit.chanID);
index d8cdddd..573b8be 100644 (file)
@@ -878,20 +878,24 @@ bool Servent::handshakeStream(ChanInfo &chanInfo)
                }
 
                chanID = chanInfo.id;
-               serventHit.rhost[0].ip = getHost().ip;
-               serventHit.rhost[0].port = listenPort;
-               serventHit.host = serventHit.rhost[0];
+               serventHit.host.ip = getHost().ip;
+               serventHit.host.port = listenPort;
+               if (serventHit.host.globalIP())
+                       serventHit.rhost[0] = serventHit.host;
+               else
+                       serventHit.rhost[1] = serventHit.host;
                serventHit.chanID = chanID;
 
                canStreamLock.on();
                chanReady = canStream(ch);
-               if (/*0 && */!chanReady)
+               if (0 && !chanReady && ch->isPlaying())
                {
-                       if (servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
+                       if (ch->info.getUptime() > 60
+                               && servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
                        {
                                sourceHit = &ch->sourceHost;  // send source host info
 
-                               if (listenPort && ch->info.getUptime() > 60)  // if stable
+                               if (listenPort)
                                {
                                        // connect "this" host later
                                        chanMgr->addHit(serventHit);
@@ -1054,11 +1058,18 @@ bool Servent::handshakeStream(ChanInfo &chanInfo)
 
                        int error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
 
+                       if (sourceHit) {
+                               sourceHit->writeAtoms(atom2,chanInfo.id);       
+                               char tmp[50];
+                               sourceHit->host.toStr(tmp);
+                               LOG_DEBUG("relay info(sourceHit): %s", tmp);
+                       }
+
                        chanMgr->hitlistlock.on();
 
                        chl = chanMgr->findHitList(chanInfo);
 
-                       if (chl)
+                       if (chl && !sourceHit)
                        {
                                ChanHit best;
                                
@@ -1117,25 +1128,18 @@ bool Servent::handshakeStream(ChanInfo &chanInfo)
                                        cnt++;
                                }
 
-                               if (sourceHit) {
-                                       char tmp[50];
-                                       sourceHit->writeAtoms(atom2, chanInfo.id);
-                                       sourceHit->host.toStr(tmp);
-                                       LOG_DEBUG("relay info(sourceHit): %s", tmp);
-                                       best.host.ip = sourceHit->host.ip;
-                               }
-
                                if (!best.host.ip){
                                        char tmp[50];
 //                                     chanMgr->hitlistlock.on();
-                                       int cnt = chs.getRelayHost(servMgr->serverHost, rhost, remoteID, chl);
+                                       int rhcnt = chs.getRelayHost(servMgr->serverHost, rhost, remoteID, chl);
 //                                     chanMgr->hitlistlock.off();
-                                       for (int i = 0; i < cnt; i++){
+                                       for (int i = 0; i < rhcnt; i++){
                                                chs.best[i].writeAtoms(atom2, chanInfo.id);
                                                chs.best[i].host.toStr(tmp);
                                                LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
                                                best.host.ip = chs.best[i].host.ip;
                                        }
+                                       cnt += rhcnt;
                                }
 
                                if (cnt)
@@ -1352,6 +1356,13 @@ bool Servent::handshakeStream(ChanInfo &chanInfo)
                {
                        handshakeIncomingPCP(atom,rhost,remoteID,agent);
                        atom.writeInt(PCP_OK,0);
+                       if (rhost.globalIP())
+                               serventHit.rhost[0] = rhost;
+                       else
+                               serventHit.rhost[1] = rhost;
+                       serventHit.sessionID = remoteID;
+                       serventHit.numHops = 1;
+                       chanMgr->addHit(serventHit);
                }
 
        }
index 9470076..f9dc302 100644 (file)
@@ -2762,7 +2762,7 @@ int ServMgr::kickUnrelayableHost(GnuID &chid, ChanHit &sendhit)
                        Host h = s->getHost();
 
                        ChanHit hit = s->serventHit;
-                       if (!hit.relay && hit.numRelays == 0)
+                       if (!hit.relay && hit.numRelays == 0 || hit.firewalled)
                        {
                                char hostName[256];
                                h.toStr(hostName);
index da49db2..2ae3f0f 100644 (file)
@@ -45,8 +45,8 @@ extern int version_ex; // PP
 //#define VERSION_EX 1
 static const char *PCP_CLIENT_VERSION_EX_PREFIX = "IM"; // 2bytes only
 static const int  PCP_CLIENT_VERSION_EX_NUMBER = 27;
-static const char *PCX_AGENTEX = "PeerCast/0.1218(IM0027-2)";
-static const char *PCX_VERSTRING_EX = "v0.1218(IM0027-2)";
+static const char *PCX_AGENTEX = "PeerCast/0.1218(IM0027-3)";
+static const char *PCX_VERSTRING_EX = "v0.1218(IM0027-3)";
 #endif
 
 // ------------------------------------------------