else
chl->hit = next;
- char ip0str[64],ip1str[64];
- chh->rhost[0].toStr(ip0str);
- chh->rhost[1].toStr(ip1str);
- LOG_DEBUG("Delete hit (servent_id=%d): F%dT%dR%d %s/%s",
- chh->servent_id,chh->firewalled,chh->tracker,chh->relay,ip0str,ip1str);
-
delete chh;
chh = next;
}
allow = a;
thread.data = this;
thread.func = incomingProc;
+ thread.finish = false;
setStatus(S_PROTOCOL);
bool chanFound=false;
bool chanReady=false;
+ ChanHit *sourceHit = NULL;
+
Channel *ch = chanMgr->findChannelByID(chanInfo.id);
if (ch)
{
sendHeader = true;
-// if (reqPos)
-// {
-// streamPos = ch->rawData.findOldestPos(reqPos);
-// }else
-// {
- streamPos = ch->rawData.getLatestPos();
-// }
+ if (reqPos || !isIndexTxt(&chanInfo))
+ {
+ streamPos = ch->rawData.findOldestPos(reqPos);
+ //streamPos = ch->rawData.getLatestPos();
+ }else
+ {
+ streamPos = ch->rawData.getLatestPos();
+ }
chanID = chanInfo.id;
canStreamLock.on();
chanReady = canStream(ch);
+ if (0 && !chanReady)
+ {
+ if (servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
+ {
+ sourceHit = &ch->sourceHost; // send source host info
+
+ if (ch->info.getUptime() > 60) // if stable
+ {
+ // connect "this" host later
+ ChanHit nh;
+ nh.init();
+ nh.chanID = chanID;
+ nh.rhost[0] = getHost();
+ chanMgr->addHit(nh);
+ }
+
+ char tmp[50];
+ getHost().toStr(tmp);
+ LOG_DEBUG("Bump channel, hand over sourceHost to %s", tmp);
+ ch->bump = true;
+ }
+ else if (servMgr->kickUnrelayableHost(chanID, this) != 0)
+ {
+ chanReady = canStream(ch);
+ if (!chanReady)
+ LOG_DEBUG("Kicked unrelayable host, but still cannot stream");
+ }
+ }
if (!chanReady) type = T_INCOMING;
thread.active = chanReady;
setStatus(S_CONNECTED);
// search for up to 8 other hits
int cnt=0;
- for(int i=0; i<8; i++)
+ int i;
+ for(i=0; i<8; i++)
{
best.init();
if (!best.host.ip)
break;
- best.writeAtoms(atom2,chanInfo.id);
+ best.writeAtoms(atom2,chanInfo.id);
cnt++;
}
+ if (sourceHit) {
+ char tmp[50];
+ sourceHit->writeAtoms(atom2, chanInfo.id);
+ chs.best[i].host.toStr(tmp);
+ LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
+ best.host.ip = sourceHit->host.ip;
+ }
+
if (!best.host.ip){
char tmp[50];
// chanMgr->hitlistlock.on();
if (sendHeader)
{
atom.writeParent(PCP_CHAN_PKT,3);
- atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
- atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
- atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
+ atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
+ atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
+ atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
- streamPos = ch->headPack.pos+ch->headPack.len;
+ if (streamPos == 0)
+ streamPos = ch->headPack.pos+ch->headPack.len;
LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
}
}
streamPos = rawPack.pos+rawPack.len;
}
} else {
- pcpStream->flush(*sock);
throw StreamException("Channel not found");
}
try
{
+ pcpStream->flush(*sock);
atom.writeInt(PCP_QUIT,error);
}catch(StreamException &) {}