bool next_yp = false;
bool tracker_check = (ch->trackerHit.host.ip != 0);
int connFailCnt = 0;
+ int keepDownstreamTime = 7;
+
+ if (isIndexTxt(&ch->info))
+ keepDownstreamTime = 30;
ch->lastStopTime = 0;
ch->bumped = false;
chanMgr->hitlistlock.off();
- if (servMgr->keepDownstreams && ch->lastStopTime && ch->lastStopTime < sys->getTime() - 7) {
+ if (servMgr->keepDownstreams && ch->lastStopTime
+ && ch->lastStopTime < sys->getTime() - keepDownstreamTime)
+ {
ch->lastStopTime = 0;
LOG_DEBUG("------------ disconnect all downstreams");
ChanPacket pack;
ch->trackerHit.lastContact = ctime - 30 + (rand() % 30);
// broadcast source host
- if (!error && ch->sourceHost.host.ip) { // if closed normally
+ if (!got503 && !error && ch->sourceHost.host.ip) { // if closed normally
ChanPacket pack;
MemoryStream mem(pack.data,sizeof(pack.data));
AtomStream atom(mem);
}
// broadcast quit to any connected downstream servents
- if (!servMgr->keepDownstreams || (ch->sourceHost.tracker && !got503) || !error) {
+ if (!servMgr->keepDownstreams || !got503 && (ch->sourceHost.tracker || !error)) {
ChanPacket pack;
MemoryStream mem(pack.data,sizeof(pack.data));
AtomStream atom(mem);
{
unsigned int ctime = sys->getTime();
+ if ((ch->isPlaying() == isPlaying)){
+ if ((ctime-lastUpdate) < 10){
+ return false;
+ }
+
+ if ((ctime-lastCheckTime) < 5){
+ return false;
+ }
+ lastCheckTime = ctime;
+ }
+
ChanHitList *chl = chanMgr->findHitListByID(ch->info.id);
if (!chl)
int newLocalListeners = ch->localListeners();
int newLocalRelays = ch->localRelays();
- if ((ch->isPlaying() == isPlaying)){
- if ((ctime-lastUpdate) < 10){
- return false;
- }
-
- if ((ctime-lastCheckTime) < 10){
- return false;
- }
- lastCheckTime = ctime;
- }
-
unsigned int oldp = ch->rawData.getOldestPos();
unsigned int newp = ch->rawData.getLatestPos();
setStatus(Channel::S_RECEIVING);
bumped = false;
}
- source->updateStatus(this);
+ //source->updateStatus(this);
}
}
+ if (rawData.lastWriteTime > 0 || rawData.lastSkipTime > 0)
+ source->updateStatus(this);
unsigned int t = sys->getTime();
if (t != ptime) {
lock.on();
unsigned int bound = packets[0].len * ChanPacketBuffer::MAX_PACKETS * 2; // max packets to wait
- unsigned int fpos = getStreamPos(firstPos);
- unsigned int lpos = getStreamPos(lastPos);
- if (spos < fpos && (fpos < lpos || spos > lpos + bound))
+ unsigned int fpos = getFirstDataPos();
+ unsigned int lpos = getLatestPos();
+ if ((spos < fpos && fpos <= lpos && spos != getStreamPosEnd(lastPos)) // --s-----f---l--
+ || (spos < fpos && lpos < fpos && spos > lpos + bound) // -l-------s--f--
+ || (spos > lpos && lpos >= fpos && spos - lpos > bound)) // --f---l------s-
spos = fpos;
}
// -----------------------------------
+unsigned int ChanPacketBuffer::getFirstDataPos()
+{
+ if (!writePos)
+ return 0;
+ for(unsigned int i=firstPos; i<=lastPos; i++)
+ {
+ if (packets[i%MAX_PACKETS].type == ChanPacket::T_DATA)
+ return packets[i%MAX_PACKETS].pos;
+ }
+ return 0;
+}
+// -----------------------------------
unsigned int ChanPacketBuffer::getOldestPos()
{
if (!writePos)
return 0;
}
+// -----------------------------------
+unsigned int ChanHitList::getSeq()
+{
+ unsigned int seq;
+ seqLock.on();
+ seq = riSequence = (riSequence + 1) & 0xffffff;
+ seqLock.off();
+ return seq;
+}
// -----------------------------------
const char *ChanInfo::getTypeStr(TYPE t)
int index = 0;
int prob;
int rnd;
- static int base = 0x400;
+ int base = 0x400;
ChanHit tmpHit[MAX_RESULTS];
- static WLock seqLock;
- static unsigned int riSequence = 0;
//srand(seed);
//seed += 11;
- unsigned int seq;
- seqLock.on();
- seq = riSequence++;
- riSequence &= 0xffffff;
- seqLock.off();
-
- Servent *s = servMgr->servents;
- while (s) {
- if (s->serventHit.rhost[0].port && s->type == Servent::T_RELAY
- && s->chanID.isSame(chl->info.id)) {
- int i = index % MAX_RESULTS;
- if (index < MAX_RESULTS
- || tmpHit[i].lastSendSeq > s->serventHit.lastSendSeq) {
- s->serventHit.lastSendSeq = seq;
- tmpHit[i] = s->serventHit;
- tmpHit[i].host = s->serventHit.rhost[0];
- index++;
- }
- }
- s = s->next;
- }
+ unsigned int seq = chl->getSeq();
ChanHit *hit = chl->hit;
while(hit){
- if (hit->host.ip && !hit->dead){
+ if (hit->rhost[0].ip && !hit->dead) {
if (
(!exID.isSame(hit->sessionID))
// && (hit->relay)
//rnd = (float)rand() / (float)RAND_MAX;
rnd = rand() % base;
if (hit->numHops == 1){
-#if 0
if (tmpHit[index % MAX_RESULTS].numHops == 1){
if (rnd < prob){
tmpHit[index % MAX_RESULTS] = *hit;
tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
index++;
}
-#endif
} else {
- if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob) || rnd == 0){
+ if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob)){
tmpHit[index % MAX_RESULTS] = *hit;
tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
index++;
best[use[i]] = tmpHit[i];
}*/
- int use[MAX_RESULTS];
- int i;
- for (i = 0; i < cnt; i++) {
- use[i] = (i + seq) % cnt;
- }
-
- for (i = 0; i < cnt; i++){
+ for (int i = 0; i < cnt; i++){
// LOG_DEBUG("%d", use[i]);
- best[use[i]] = tmpHit[i];
+ best[(i + seq) % cnt] = tmpHit[i];
}
// for (i = 0; i < cnt; i++){
// char tmp[50];
int getTotalRelays();
int getTotalFirewalled();
+ unsigned int getSeq();
+
bool used;
ChanInfo info;
ChanHit *hit;
unsigned int lastHitTime;
ChanHitList *next;
-
+ WLock seqLock;
+ unsigned int riSequence;
};
// ----------------------------------
class ChanHitSearch
int numPending() {return writePos-readPos;}
+ unsigned int getFirstDataPos();
unsigned int getLatestPos();
unsigned int getOldestPos();
unsigned int findOldestPos(unsigned int);
if (sv && sv->getHost().ip == hit.host.ip){
// LOG_DEBUG("set servent's waitPort = %d", hit.host.port);
sv->waitPort = hit.host.port;
- hit.lastSendSeq = sv->serventHit.lastSendSeq;
+ //hit.lastSendSeq = sv->serventHit.lastSendSeq;
sv->serventHit = hit;
}
}
((hit.numHops == 1 && (hit.rhost[0].ip == sv->getHost().ip
&& hit.uphost.ip == servMgr->serverHost.ip && hit.uphost.port == servMgr->serverHost.port)
|| (hit.rhost[1].localIP() && hit.rhost[1].ip == sv->getHost().ip))
- || chanMgr->findParentHit(hit)))
+ || (hit.numHops != 1 && chanMgr->findParentHit(hit))))
{
int oldPos = pmem.pos;
hit.writeAtoms(patom, hit.chanID);
}
chanID = chanInfo.id;
- serventHit.rhost[0].ip = getHost().ip;
- serventHit.rhost[0].port = listenPort;
- serventHit.host = serventHit.rhost[0];
+ serventHit.host.ip = getHost().ip;
+ serventHit.host.port = listenPort;
+ if (serventHit.host.globalIP())
+ serventHit.rhost[0] = serventHit.host;
+ else
+ serventHit.rhost[1] = serventHit.host;
serventHit.chanID = chanID;
canStreamLock.on();
chanReady = canStream(ch);
- if (/*0 && */!chanReady)
+ if (0 && !chanReady && ch->isPlaying())
{
- if (servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
+ if (ch->info.getUptime() > 60
+ && servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
{
sourceHit = &ch->sourceHost; // send source host info
- if (listenPort && ch->info.getUptime() > 60) // if stable
+ if (listenPort)
{
// connect "this" host later
chanMgr->addHit(serventHit);
int error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
+ if (sourceHit) {
+ sourceHit->writeAtoms(atom2,chanInfo.id);
+ char tmp[50];
+ sourceHit->host.toStr(tmp);
+ LOG_DEBUG("relay info(sourceHit): %s", tmp);
+ }
+
chanMgr->hitlistlock.on();
chl = chanMgr->findHitList(chanInfo);
- if (chl)
+ if (chl && !sourceHit)
{
ChanHit best;
cnt++;
}
- if (sourceHit) {
- char tmp[50];
- sourceHit->writeAtoms(atom2, chanInfo.id);
- sourceHit->host.toStr(tmp);
- LOG_DEBUG("relay info(sourceHit): %s", tmp);
- best.host.ip = sourceHit->host.ip;
- }
-
if (!best.host.ip){
char tmp[50];
// chanMgr->hitlistlock.on();
- int cnt = chs.getRelayHost(servMgr->serverHost, rhost, remoteID, chl);
+ int rhcnt = chs.getRelayHost(servMgr->serverHost, rhost, remoteID, chl);
// chanMgr->hitlistlock.off();
- for (int i = 0; i < cnt; i++){
+ for (int i = 0; i < rhcnt; i++){
chs.best[i].writeAtoms(atom2, chanInfo.id);
chs.best[i].host.toStr(tmp);
LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
best.host.ip = chs.best[i].host.ip;
}
+ cnt += rhcnt;
}
if (cnt)
{
handshakeIncomingPCP(atom,rhost,remoteID,agent);
atom.writeInt(PCP_OK,0);
+ if (rhost.globalIP())
+ serventHit.rhost[0] = rhost;
+ else
+ serventHit.rhost[1] = rhost;
+ serventHit.sessionID = remoteID;
+ serventHit.numHops = 1;
+ chanMgr->addHit(serventHit);
}
}
Host h = s->getHost();
ChanHit hit = s->serventHit;
- if (!hit.relay && hit.numRelays == 0)
+ if (!hit.relay && hit.numRelays == 0 || hit.firewalled)
{
char hostName[256];
h.toStr(hostName);
//#define VERSION_EX 1
static const char *PCP_CLIENT_VERSION_EX_PREFIX = "IM"; // 2bytes only
static const int PCP_CLIENT_VERSION_EX_NUMBER = 27;
-static const char *PCX_AGENTEX = "PeerCast/0.1218(IM0027-2)";
-static const char *PCX_VERSTRING_EX = "v0.1218(IM0027-2)";
+static const char *PCX_AGENTEX = "PeerCast/0.1218(IM0027-3)";
+static const char *PCX_VERSTRING_EX = "v0.1218(IM0027-3)";
#endif
// ------------------------------------------------
bool next_yp = false;
bool tracker_check = (ch->trackerHit.host.ip != 0);
int connFailCnt = 0;
+ int keepDownstreamTime = 7;
+
+ if (isIndexTxt(&ch->info))
+ keepDownstreamTime = 30;
ch->lastStopTime = 0;
ch->bumped = false;
chanMgr->hitlistlock.off();
- if (servMgr->keepDownstreams && ch->lastStopTime && ch->lastStopTime < sys->getTime() - 7) {
+ if (servMgr->keepDownstreams && ch->lastStopTime
+ && ch->lastStopTime < sys->getTime() - keepDownstreamTime)
+ {
ch->lastStopTime = 0;
LOG_DEBUG("------------ disconnect all downstreams");
ChanPacket pack;
ch->trackerHit.lastContact = ctime - 30 + (rand() % 30);
// broadcast source host
- if (!error && ch->sourceHost.host.ip) { // if closed normally
+ if (!got503 && !error && ch->sourceHost.host.ip) { // if closed normally
ChanPacket pack;
MemoryStream mem(pack.data,sizeof(pack.data));
AtomStream atom(mem);
}
// broadcast quit to any connected downstream servents
- if (!servMgr->keepDownstreams || (ch->sourceHost.tracker && !got503) || !error) {
+ if (!servMgr->keepDownstreams || !got503 && (ch->sourceHost.tracker || !error)) {
ChanPacket pack;
MemoryStream mem(pack.data,sizeof(pack.data));
AtomStream atom(mem);
{
unsigned int ctime = sys->getTime();
+ if ((ch->isPlaying() == isPlaying)){
+ if ((ctime-lastUpdate) < 10){
+ return false;
+ }
+
+ if ((ctime-lastCheckTime) < 5){
+ return false;
+ }
+ lastCheckTime = ctime;
+ }
+
ChanHitList *chl = chanMgr->findHitListByID(ch->info.id);
if (!chl)
int newLocalListeners = ch->localListeners();
int newLocalRelays = ch->localRelays();
- if ((ch->isPlaying() == isPlaying)){
- if ((ctime-lastUpdate) < 10){
- return false;
- }
-
- if ((ctime-lastCheckTime) < 10){
- return false;
- }
- lastCheckTime = ctime;
- }
-
unsigned int oldp = ch->rawData.getOldestPos();
unsigned int newp = ch->rawData.getLatestPos();
setStatus(Channel::S_RECEIVING);
bumped = false;
}
- source->updateStatus(this);
+ //source->updateStatus(this);
}
}
+ if (rawData.lastWriteTime > 0 || rawData.lastSkipTime > 0)
+ source->updateStatus(this);
unsigned int t = sys->getTime();
if (t != ptime) {
lock.on();
unsigned int bound = packets[0].len * ChanPacketBuffer::MAX_PACKETS * 2; // max packets to wait
- unsigned int fpos = getStreamPos(firstPos);
- unsigned int lpos = getStreamPos(lastPos);
- if (spos < fpos && (fpos < lpos || spos > lpos + bound))
+ unsigned int fpos = getFirstDataPos();
+ unsigned int lpos = getLatestPos();
+ if ((spos < fpos && fpos <= lpos && spos != getStreamPosEnd(lastPos)) // --s-----f---l--
+ || (spos < fpos && lpos < fpos && spos > lpos + bound) // -l-------s--f--
+ || (spos > lpos && lpos >= fpos && spos - lpos > bound)) // --f---l------s-
spos = fpos;
}
// -----------------------------------
+unsigned int ChanPacketBuffer::getFirstDataPos()
+{
+ if (!writePos)
+ return 0;
+ for(unsigned int i=firstPos; i<=lastPos; i++)
+ {
+ if (packets[i%MAX_PACKETS].type == ChanPacket::T_DATA)
+ return packets[i%MAX_PACKETS].pos;
+ }
+ return 0;
+}
+// -----------------------------------
unsigned int ChanPacketBuffer::getOldestPos()
{
if (!writePos)
return 0;
}
+// -----------------------------------
+unsigned int ChanHitList::getSeq()
+{
+ unsigned int seq;
+ seqLock.on();
+ seq = riSequence = (riSequence + 1) & 0xffffff;
+ seqLock.off();
+ return seq;
+}
// -----------------------------------
const char *ChanInfo::getTypeStr(TYPE t)
int index = 0;
int prob;
int rnd;
- static int base = 0x400;
+ int base = 0x400;
ChanHit tmpHit[MAX_RESULTS];
- static WLock seqLock;
- static unsigned int riSequence = 0;
//srand(seed);
//seed += 11;
- unsigned int seq;
- seqLock.on();
- seq = riSequence++;
- riSequence &= 0xffffff;
- seqLock.off();
-
- Servent *s = servMgr->servents;
- while (s) {
- if (s->serventHit.rhost[0].port && s->type == Servent::T_RELAY
- && s->chanID.isSame(chl->info.id)) {
- int i = index % MAX_RESULTS;
- if (index < MAX_RESULTS
- || tmpHit[i].lastSendSeq > s->serventHit.lastSendSeq) {
- s->serventHit.lastSendSeq = seq;
- tmpHit[i] = s->serventHit;
- tmpHit[i].host = s->serventHit.rhost[0];
- index++;
- }
- }
- s = s->next;
- }
+ unsigned int seq = chl->getSeq();
ChanHit *hit = chl->hit;
while(hit){
- if (hit->host.ip && !hit->dead){
+ if (hit->rhost[0].ip && !hit->dead) {
if (
(!exID.isSame(hit->sessionID))
// && (hit->relay)
//rnd = (float)rand() / (float)RAND_MAX;
rnd = rand() % base;
if (hit->numHops == 1){
-#if 0
if (tmpHit[index % MAX_RESULTS].numHops == 1){
if (rnd < prob){
tmpHit[index % MAX_RESULTS] = *hit;
tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
index++;
}
-#endif
} else {
- if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob) || rnd == 0){
+ if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob)){
tmpHit[index % MAX_RESULTS] = *hit;
tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
index++;
best[use[i]] = tmpHit[i];
}*/
- int use[MAX_RESULTS];
- int i;
- for (i = 0; i < cnt; i++) {
- use[i] = (i + seq) % cnt;
- }
-
- for (i = 0; i < cnt; i++){
+ for (int i = 0; i < cnt; i++){
// LOG_DEBUG("%d", use[i]);
- best[use[i]] = tmpHit[i];
+ best[(i + seq) % cnt] = tmpHit[i];
}
// for (i = 0; i < cnt; i++){
// char tmp[50];
int getTotalRelays();
int getTotalFirewalled();
+ unsigned int getSeq();
+
bool used;
ChanInfo info;
ChanHit *hit;
unsigned int lastHitTime;
ChanHitList *next;
-
+ WLock seqLock;
+ unsigned int riSequence;
};
// ----------------------------------
class ChanHitSearch
int numPending() {return writePos-readPos;}
+ unsigned int getFirstDataPos();
unsigned int getLatestPos();
unsigned int getOldestPos();
unsigned int findOldestPos(unsigned int);
if (sv && sv->getHost().ip == hit.host.ip){
// LOG_DEBUG("set servent's waitPort = %d", hit.host.port);
sv->waitPort = hit.host.port;
- hit.lastSendSeq = sv->serventHit.lastSendSeq;
+ //hit.lastSendSeq = sv->serventHit.lastSendSeq;
sv->serventHit = hit;
}
}
((hit.numHops == 1 && (hit.rhost[0].ip == sv->getHost().ip
&& hit.uphost.ip == servMgr->serverHost.ip && hit.uphost.port == servMgr->serverHost.port)
|| (hit.rhost[1].localIP() && hit.rhost[1].ip == sv->getHost().ip))
- || chanMgr->findParentHit(hit)))
+ || (hit.numHops != 1 && chanMgr->findParentHit(hit))))
{
int oldPos = pmem.pos;
hit.writeAtoms(patom, hit.chanID);
}
chanID = chanInfo.id;
- serventHit.rhost[0].ip = getHost().ip;
- serventHit.rhost[0].port = listenPort;
- serventHit.host = serventHit.rhost[0];
+ serventHit.host.ip = getHost().ip;
+ serventHit.host.port = listenPort;
+ if (serventHit.host.globalIP())
+ serventHit.rhost[0] = serventHit.host;
+ else
+ serventHit.rhost[1] = serventHit.host;
serventHit.chanID = chanID;
canStreamLock.on();
chanReady = canStream(ch);
- if (/*0 && */!chanReady)
+ if (0 && !chanReady && ch->isPlaying())
{
- if (servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
+ if (ch->info.getUptime() > 60
+ && servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
{
sourceHit = &ch->sourceHost; // send source host info
- if (listenPort && ch->info.getUptime() > 60) // if stable
+ if (listenPort)
{
// connect "this" host later
chanMgr->addHit(serventHit);
int error = PCP_ERROR_QUIT+PCP_ERROR_UNAVAILABLE;
+ if (sourceHit) {
+ sourceHit->writeAtoms(atom2,chanInfo.id);
+ char tmp[50];
+ sourceHit->host.toStr(tmp);
+ LOG_DEBUG("relay info(sourceHit): %s", tmp);
+ }
+
chanMgr->hitlistlock.on();
chl = chanMgr->findHitList(chanInfo);
- if (chl)
+ if (chl && !sourceHit)
{
ChanHit best;
cnt++;
}
- if (sourceHit) {
- char tmp[50];
- sourceHit->writeAtoms(atom2, chanInfo.id);
- sourceHit->host.toStr(tmp);
- LOG_DEBUG("relay info(sourceHit): %s", tmp);
- best.host.ip = sourceHit->host.ip;
- }
-
if (!best.host.ip){
char tmp[50];
// chanMgr->hitlistlock.on();
- int cnt = chs.getRelayHost(servMgr->serverHost, rhost, remoteID, chl);
+ int rhcnt = chs.getRelayHost(servMgr->serverHost, rhost, remoteID, chl);
// chanMgr->hitlistlock.off();
- for (int i = 0; i < cnt; i++){
+ for (int i = 0; i < rhcnt; i++){
chs.best[i].writeAtoms(atom2, chanInfo.id);
chs.best[i].host.toStr(tmp);
LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
best.host.ip = chs.best[i].host.ip;
}
+ cnt += rhcnt;
}
if (cnt)
{
handshakeIncomingPCP(atom,rhost,remoteID,agent);
atom.writeInt(PCP_OK,0);
+ if (rhost.globalIP())
+ serventHit.rhost[0] = rhost;
+ else
+ serventHit.rhost[1] = rhost;
+ serventHit.sessionID = remoteID;
+ serventHit.numHops = 1;
+ chanMgr->addHit(serventHit);
}
}
Host h = s->getHost();
ChanHit hit = s->serventHit;
- if (!hit.relay && hit.numRelays == 0)
+ if (!hit.relay && hit.numRelays == 0 || hit.firewalled)
{
char hostName[256];
h.toStr(hostName);
//#define VERSION_EX 1
static const char *PCP_CLIENT_VERSION_EX_PREFIX = "IM"; // 2bytes only
static const int PCP_CLIENT_VERSION_EX_NUMBER = 27;
-static const char *PCX_AGENTEX = "PeerCast/0.1218(IM0027-2)";
-static const char *PCX_VERSTRING_EX = "v0.1218(IM0027-2)";
+static const char *PCX_AGENTEX = "PeerCast/0.1218(IM0027-3)";
+static const char *PCX_VERSTRING_EX = "v0.1218(IM0027-3)";
#endif
// ------------------------------------------------