OSDN Git Service

VP25-3マージ
[peercast-im/PeerCastIM.git] / c: / Git / PeerCast.root / PeerCast / core / common / servent.cpp
index bb12277..ad18d0c 100644 (file)
@@ -154,12 +154,6 @@ void       Servent::kill()
                                        else
                                                chl->hit = next;
 
-       char ip0str[64],ip1str[64];
-       chh->rhost[0].toStr(ip0str);
-       chh->rhost[1].toStr(ip1str);
-       LOG_DEBUG("Delete hit (servent_id=%d): F%dT%dR%d %s/%s",
-               chh->servent_id,chh->firewalled,chh->tracker,chh->relay,ip0str,ip1str);
-
                                        delete chh;
                                        chh = next;
                                }
@@ -374,6 +368,7 @@ void Servent::initIncoming(ClientSocket *s, unsigned int a)
                allow = a;
                thread.data = this;
                thread.func = incomingProc;
+               thread.finish = false;
 
                setStatus(S_PROTOCOL);
 
@@ -862,21 +857,52 @@ bool Servent::handshakeStream(ChanInfo &chanInfo)
        bool chanFound=false;
        bool chanReady=false;
 
+       ChanHit *sourceHit = NULL;
+
        Channel *ch = chanMgr->findChannelByID(chanInfo.id);
        if (ch)
        {
                sendHeader = true;
-//             if (reqPos)
-//             {
-//                     streamPos = ch->rawData.findOldestPos(reqPos);
-//             }else
-//             {
-                       streamPos = ch->rawData.getLatestPos();
-//             }
+               if (reqPos || !isIndexTxt(&chanInfo))
+               {
+                       streamPos = ch->rawData.findOldestPos(reqPos);
+                       //streamPos = ch->rawData.getLatestPos();
+               }else
+               {
+                       streamPos = ch->rawData.getLatestPos();
+               }
 
                chanID = chanInfo.id;
                canStreamLock.on();
                chanReady = canStream(ch);
+               if (0 && !chanReady)
+               {
+                       if (servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
+                       {
+                               sourceHit = &ch->sourceHost;  // send source host info
+
+                               if (ch->info.getUptime() > 60)  // if stable
+                               {
+                                       // connect "this" host later
+                                       ChanHit nh;
+                                       nh.init();
+                                       nh.chanID = chanID;
+                                       nh.rhost[0] = getHost();
+                                       chanMgr->addHit(nh);
+                               }
+
+                               char tmp[50];
+                               getHost().toStr(tmp);
+                               LOG_DEBUG("Bump channel, hand over sourceHost to %s", tmp);
+                               ch->bump = true;
+                       }
+                       else if (servMgr->kickUnrelayableHost(chanID, this) != 0)
+                       {
+                               chanReady = canStream(ch);
+                               if (!chanReady)
+                                       LOG_DEBUG("Kicked unrelayable host, but still cannot stream");
+                       }
+               }
                if (!chanReady) type = T_INCOMING;
                thread.active = chanReady;
                setStatus(S_CONNECTED);
@@ -1032,7 +1058,8 @@ bool Servent::handshakeStream(ChanInfo &chanInfo)
                                
                                // search for up to 8 other hits
                                int cnt=0;
-                               for(int i=0; i<8; i++)
+                               int i;
+                               for(i=0; i<8; i++)
                                {
                                        best.init();
 
@@ -1080,10 +1107,18 @@ bool Servent::handshakeStream(ChanInfo &chanInfo)
                                        if (!best.host.ip)
                                                break;
 
-                                       best.writeAtoms(atom2,chanInfo.id);                             
+                                       best.writeAtoms(atom2,chanInfo.id);                        
                                        cnt++;
                                }
 
+                               if (sourceHit) {
+                                       char tmp[50];
+                                       sourceHit->writeAtoms(atom2, chanInfo.id);
+                                       chs.best[i].host.toStr(tmp);
+                                       LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
+                                       best.host.ip = sourceHit->host.ip;
+                               }
+
                                if (!best.host.ip){
                                        char tmp[50];
 //                                     chanMgr->hitlistlock.on();
@@ -2792,11 +2827,12 @@ void Servent::sendPCPChannel()
                                if (sendHeader)
                                {
                                        atom.writeParent(PCP_CHAN_PKT,3);
-                                               atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
-                                               atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
-                                               atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
+                                       atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
+                                       atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
+                                       atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
 
-                                       streamPos = ch->headPack.pos+ch->headPack.len;
+                                       if (streamPos == 0)
+                                               streamPos = ch->headPack.pos+ch->headPack.len;
                                        LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
                                }
                }
@@ -2882,7 +2918,6 @@ void Servent::sendPCPChannel()
                                        streamPos = rawPack.pos+rawPack.len;
                                }
                        } else {
-                               pcpStream->flush(*sock);
                                throw StreamException("Channel not found");
                        }
 
@@ -2913,6 +2948,7 @@ void Servent::sendPCPChannel()
 
        try
        {
+               pcpStream->flush(*sock);
                atom.writeInt(PCP_QUIT,error);
        }catch(StreamException &) {}