LOG_DEBUG("Channel sleeping for %d seconds",diff);
for(unsigned int i=0; i<diff; i++)
{
+ if (ch->info.lastPlayEnd == 0) // reconnected
+ break;
if (!thread->active || peercastInst->isQuitting){
thread->active = false;
break;
}
}
- // find tracker
- unsigned int ctime = sys->getTime();
- if (!ch->sourceHost.host.ip && tracker_check && ch->trackerHit.host.ip){
- if (ch->trackerHit.lastContact + 30 < ctime){
- ch->sourceHost = ch->trackerHit;
- ch->trackerHit.lastContact = ctime;
- LOG_DEBUG("use saved tracker");
- }
- }
-
// else find global tracker
if (!ch->sourceHost.host.ip)
{
LOG_DEBUG("use global tracker");
}
}
+
+ // find tracker
+ unsigned int ctime = sys->getTime();
+ if (!ch->sourceHost.host.ip && tracker_check && ch->trackerHit.host.ip){
+ if (ch->trackerHit.lastContact + 30 < ctime){
+ ch->sourceHost = ch->trackerHit;
+ ch->trackerHit.lastContact = ctime;
+ LOG_DEBUG("use saved tracker");
+ }
+ }
}
chanMgr->hitlistlock.off();
break;
//if (rawData.writePos > 0)
- if (rawData.lastWriteTime > 0)
+ if (rawData.lastWriteTime > 0 || rawData.lastSkipTime > 0)
{
if (isBroadcasting())
{
lock.on();
+ unsigned int bound = packets[0].len * ChanPacketBuffer::MAX_PACKETS * 2; // max packets to wait
unsigned int fpos = getStreamPos(firstPos);
- if (spos < fpos)
+ unsigned int lpos = getStreamPos(lastPos);
+ if (spos < fpos && (fpos < lpos || spos > lpos + bound))
spos = fpos;
{
//ChanPacket &p = packets[i%MAX_PACKETS];
ChanPacketv &p = packets[i%MAX_PACKETS];
- if (p.pos >= spos)
+ if (p.pos >= spos && p.pos - spos <= bound)
{
pack.init(p);
lock.off();
if (packets[lastPos%MAX_PACKETS].type == ChanPacket::T_HEAD) lpos = 0;
if (lpos && (diff == 0 || diff > 0xfff00000)) {
LOG_DEBUG("* latest pos=%d, pack pos=%d", getLatestPos(), pack.pos);
+ lastSkipTime = sys->getTime();
return false;
}
}
} else if (!(c->thread.active)){
c->thread.active = true;
c->thread.finish = false;
+ c->info.lastPlayStart = 0; // reconnect
+ c->info.lastPlayEnd = 0;
if (c->finthread){
c->finthread->finish = true;
c->finthread = NULL;
}
c = c->next;
}
- int diff = servMgr->maxRelays - servMgr->numStreams(Servent::T_RELAY,false);
+ unsigned int numRelay = servMgr->numStreams(Servent::T_RELAY,false);
+ int diff = servMgr->maxRelays - numRelay;
if (ch->localRelays()){
if (noRelay > diff){
noRelay = diff;
needRate = 0;
}
- // for PCRaw (relay) start.
- bool force_off = false;
-
- if(isIndexTxt(ch))
- force_off = true;
- // for PCRaw (relay) end.
-
// ratefull = servMgr->bitrateFull(needRate+bitrate);
ratefull = (servMgr->maxBitrateOut < allRate + needRate + ch->info.bitrate);
- //relay = (!relayfull) && (!chfull) && (!ratefull) && ((servMgr->numStreams(Servent::T_RELAY,false) + noRelay) < servMgr->maxRelays);
- relay = (!relayfull || force_off) && (!chfull) && (!ratefull)
- && (((servMgr->numStreams(Servent::T_RELAY,false) + noRelay) < servMgr->maxRelays) || force_off); // for PCRaw (relay) (force_off)
+ if (!isIndexTxt(ch))
+ relay = (!relayfull) && (!chfull) && (!ratefull) && (numRelay + noRelay < servMgr->maxRelays);
+ else
+ relay = (!chfull) && (!ratefull); // for PCRaw (relay)
/* if (relayfull){
LOG_DEBUG("Reject by relay full");
else
chl->hit = next;
- char ip0str[64],ip1str[64];
- chh->rhost[0].toStr(ip0str);
- chh->rhost[1].toStr(ip1str);
- LOG_DEBUG("Delete hit (servent_id=%d): F%dT%dR%d %s/%s",
- chh->servent_id,chh->firewalled,chh->tracker,chh->relay,ip0str,ip1str);
-
delete chh;
chh = next;
}
allow = a;
thread.data = this;
thread.func = incomingProc;
+ thread.finish = false;
setStatus(S_PROTOCOL);
bool chanFound=false;
bool chanReady=false;
+ ChanHit *sourceHit = NULL;
+
Channel *ch = chanMgr->findChannelByID(chanInfo.id);
if (ch)
{
sendHeader = true;
-// if (reqPos)
-// {
-// streamPos = ch->rawData.findOldestPos(reqPos);
-// }else
-// {
- streamPos = ch->rawData.getLatestPos();
-// }
+ if (reqPos || !isIndexTxt(&chanInfo))
+ {
+ streamPos = ch->rawData.findOldestPos(reqPos);
+ //streamPos = ch->rawData.getLatestPos();
+ }else
+ {
+ streamPos = ch->rawData.getLatestPos();
+ }
chanID = chanInfo.id;
canStreamLock.on();
chanReady = canStream(ch);
+ if (0 && !chanReady)
+ {
+ if (servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
+ {
+ sourceHit = &ch->sourceHost; // send source host info
+
+ if (ch->info.getUptime() > 60) // if stable
+ {
+ // connect "this" host later
+ ChanHit nh;
+ nh.init();
+ nh.chanID = chanID;
+ nh.rhost[0] = getHost();
+ chanMgr->addHit(nh);
+ }
+
+ char tmp[50];
+ getHost().toStr(tmp);
+ LOG_DEBUG("Bump channel, hand over sourceHost to %s", tmp);
+ ch->bump = true;
+ }
+ else if (servMgr->kickUnrelayableHost(chanID, this) != 0)
+ {
+ chanReady = canStream(ch);
+ if (!chanReady)
+ LOG_DEBUG("Kicked unrelayable host, but still cannot stream");
+ }
+ }
if (!chanReady) type = T_INCOMING;
thread.active = chanReady;
setStatus(S_CONNECTED);
// search for up to 8 other hits
int cnt=0;
- for(int i=0; i<8; i++)
+ int i;
+ for(i=0; i<8; i++)
{
best.init();
if (!best.host.ip)
break;
- best.writeAtoms(atom2,chanInfo.id);
+ best.writeAtoms(atom2,chanInfo.id);
cnt++;
}
+ if (sourceHit) {
+ char tmp[50];
+ sourceHit->writeAtoms(atom2, chanInfo.id);
+ chs.best[i].host.toStr(tmp);
+ LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
+ best.host.ip = sourceHit->host.ip;
+ }
+
if (!best.host.ip){
char tmp[50];
// chanMgr->hitlistlock.on();
if (sendHeader)
{
atom.writeParent(PCP_CHAN_PKT,3);
- atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
- atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
- atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
+ atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
+ atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
+ atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
- streamPos = ch->headPack.pos+ch->headPack.len;
+ if (streamPos == 0)
+ streamPos = ch->headPack.pos+ch->headPack.len;
LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
}
}
streamPos = rawPack.pos+rawPack.len;
}
} else {
- pcpStream->flush(*sock);
throw StreamException("Channel not found");
}
try
{
+ pcpStream->flush(*sock);
atom.writeInt(PCP_QUIT,error);
}catch(StreamException &) {}
if (!isPrivate())
{
+ if (!ch->isPlaying() || ch->isFull() || ((type == T_DIRECT) && servMgr->directFull()))
+ return false;
+
+ if (!isIndexTxt(ch) && (type == T_RELAY) && (servMgr->relaysFull()))
+ return false;
+
Channel *c = chanMgr->channel;
int noRelay = 0;
unsigned int needRate = 0;
unsigned int allRate = 0;
while(c){
if (c->isPlaying()){
- allRate += c->info.bitrate * c->localRelays();
- if ((c != ch) && (c->localRelays() == 0)){
+ int nlr = c->localRelays();
+ allRate += c->info.bitrate * nlr;
+ if ((c != ch) && (nlr == 0)){
if(!isIndexTxt(c)) // for PCRaw (relay)
noRelay++;
needRate+=c->info.bitrate;
}
c = c->next;
}
- int diff = servMgr->maxRelays - servMgr->numStreams(Servent::T_RELAY,false);
+ unsigned int numRelay = servMgr->numStreams(Servent::T_RELAY,false);
+ int diff = servMgr->maxRelays - numRelay;
if (ch->localRelays()){
if (noRelay > diff){
noRelay = diff;
needRate = 0;
}
- // for PCRaw (relay) start.
- bool force_off = true;
-
- if(isIndexTxt(ch))
- force_off = false;
- // for PCRaw (relay) end.
-
LOG_DEBUG("Relay check: Max=%d Now=%d Need=%d ch=%d",
servMgr->maxBitrateOut, allRate, needRate, ch->info.bitrate);
- if ( !ch->isPlaying()
- || ch->isFull()
-// || servMgr->bitrateFull(needRate+ch->getBitrate())
- || (allRate + needRate + ch->info.bitrate > servMgr->maxBitrateOut)
- || ((type == T_RELAY) && servMgr->relaysFull() && force_off) // for PCRaw (relay) (force_off)
- || ((type == T_RELAY) && (((servMgr->numStreams(Servent::T_RELAY,false) + noRelay) >= servMgr->maxRelays)) && force_off) // for PCRaw (relay) (force_off)
- || ((type == T_DIRECT) && servMgr->directFull())
- ){
+ // if ( !ch->isPlaying()
+ // || ch->isFull()
+ // || (allRate + needRate + ch->info.bitrate > servMgr->maxBitrateOut)
+ // || ((type == T_RELAY) && servMgr->relaysFull() && force_off) // for PCRaw (relay) (force_off)
+ // || ((type == T_RELAY) && (((servMgr->numStreams(Servent::T_RELAY,false) + noRelay) >= servMgr->maxRelays)) && force_off) // for PCRaw (relay) (force_off)
+ // || ((type == T_DIRECT) && servMgr->directFull())
+ // ){
+
+ if (allRate + needRate + ch->info.bitrate > servMgr->maxBitrateOut)
+ {
+ LOG_DEBUG("Relay check: NG");
+ return false;
+ }
+
+ if (!isIndexTxt(ch) && (type == T_RELAY) && (numRelay + noRelay >= servMgr->maxRelays))
+ {
LOG_DEBUG("Relay check: NG");
return false;
}
// check password before anything else, if needed
- if (servMgr->password != loginPassword)
+ if (!loginPassword.isSame(servMgr->password))
{
if (!sock->host.isLocalhost() || !loginPassword.isEmpty())
throw HTTPException(HTTP_SC_UNAUTHORIZED,401);
}
}
+// --------------------------------------------------
+static ChanHit *findServentHit(Servent *s)
+{
+ ChanHitList *chl = chanMgr->findHitListByID(s->chanID);
+ Host h = s->getHost();
+
+ if (chl)
+ {
+ ChanHit *hit = chl->hit;
+ while (hit)
+ {
+ if ((hit->numHops == 1) && hit->host.isValid() && (h.ip == hit->host.ip))
+ return hit;
+ hit = hit->next;
+ }
+ }
+ return NULL;
+}
+// --------------------------------------------------
+int ServMgr::kickUnrelayableHost(GnuID &chid, Servent *ns)
+{
+ Servent *ks = NULL;
+ Servent *s = servMgr->servents;
+
+ while (s)
+ {
+ if (s->type == Servent::T_RELAY && s->chanID.isSame(chid) && !s->isPrivate())
+ {
+ Host h = s->getHost();
+
+ chanMgr->hitlistlock.on();
+ ChanHit *hit = findServentHit(s);
+ if (hit && !hit->relay && hit->numRelays == 0)
+ {
+ char hostName[256];
+ h.toStr(hostName);
+ //s->thread.active = false;
+ LOG_DEBUG("unrelayable Servent : %s",hostName);
+ if (!ks || s->lastConnect < ks->lastConnect) // elder servent
+ ks = s;
+ }
+ chanMgr->hitlistlock.off();
+ }
+ s = s->next;
+ }
+
+ if (ks)
+ {
+ if (ns)
+ {
+ Host h = ns->getHost();
+ ChanHit nh;
+ nh.init();
+ nh.chanID = chid;
+ nh.rhost[0] = h;
+
+ ChanPacket pack;
+ MemoryStream mem(pack.data,sizeof(pack.data));
+ AtomStream atom(mem);
+ nh.writeAtoms(atom, chid);
+ pack.len = mem.pos;
+ pack.type = ChanPacket::T_PCP;
+ GnuID noID;
+ noID.clear();
+
+ ks->sendPacket(pack, chid, noID, noID, Servent::T_RELAY);
+ }
+
+ ks->setStatus(Servent::S_CLOSING);
+ ks->thread.active = false;
+
+ char hostName[256];
+ ks->getHost().toStr(hostName);
+
+ LOG_DEBUG("Stop unrelayable Servent : %s",hostName);
+
+ return 1;
+ }
+
+ return 0;
+}
unsigned int kickPushTime;
bool isCheckPushStream(); //JP-EX
void banFirewalledHost(); //JP-EX
+ int kickUnrelayableHost(GnuID &, Servent * = NULL);
bool getModulePath; //JP-EX
bool clearPLS; //JP-EX
*this = tmp;
}
- bool operator == (const char *s) const {return isSame(s);}
- bool operator != (const char *s) const {return !isSame(s);}
+ bool operator == (const char *s) const {return isSame(s);}
+ bool operator != (const char *s) const {return !isSame(s);}
operator const char *() const {return data;}
#endif
// ------------------------------------------------
static const int PCP_CLIENT_VERSION = 1218;
-static const int PCP_CLIENT_VERSION_VP = 24;
+static const int PCP_CLIENT_VERSION_VP = 25;
static const int PCP_ROOT_VERSION = 1218;
static const int PCP_CLIENT_MINVERSION = 1200;
static const char *PCX_AGENT = "PeerCast/0.1218";
static const char *PCX_AGENTJP = "PeerCast/0.1218-J";
-static const char *PCX_AGENTVP = "PeerCast/0.1218(VP0024)";
-static const char *PCX_VERSTRING = "v0.1218(VP0024)";
+static const char *PCX_AGENTVP = "PeerCast/0.1218(VP0025-1)";
+static const char *PCX_VERSTRING = "v0.1218(VP0025-1)";
#if 1 /* for VP extend version */
#define VERSION_EX 1
static const char *PCP_CLIENT_VERSION_EX_PREFIX = "IM"; // 2bytes only
-static const int PCP_CLIENT_VERSION_EX_NUMBER = 7651;
-static const char *PCX_AGENTEX = "PeerCast/0.1218(IM7651)";
-static const char *PCX_VERSTRING_EX = "v0.1218(IM7651)";
+static const int PCP_CLIENT_VERSION_EX_NUMBER = 7650;
+static const char *PCX_AGENTEX = "PeerCast/0.1218(IM-VP25-1)";
+static const char *PCX_VERSTRING_EX = "v0.1218(IM765-VP25-1)";
#endif
// ------------------------------------------------
try
{
char c[1024];
- while (readUpto(&c,1024) > 0)
- if (sys->getTime() - stime > 5) break;
- //readUpto(&c,1);
+ while (read(&c, sizeof(c)) > 0)
+ if (sys->getTime() - stime > 5)
+ break;
}catch(StreamException &) {}
if (closesocket (sockNum))
LOG_DEBUG("Channel sleeping for %d seconds",diff);
for(unsigned int i=0; i<diff; i++)
{
+ if (ch->info.lastPlayEnd == 0) // reconnected
+ break;
if (!thread->active || peercastInst->isQuitting){
thread->active = false;
break;
}
}
- // find tracker
- unsigned int ctime = sys->getTime();
- if (!ch->sourceHost.host.ip && tracker_check && ch->trackerHit.host.ip){
- if (ch->trackerHit.lastContact + 30 < ctime){
- ch->sourceHost = ch->trackerHit;
- ch->trackerHit.lastContact = ctime;
- LOG_DEBUG("use saved tracker");
- }
- }
-
// else find global tracker
if (!ch->sourceHost.host.ip)
{
LOG_DEBUG("use global tracker");
}
}
+
+ // find tracker
+ unsigned int ctime = sys->getTime();
+ if (!ch->sourceHost.host.ip && tracker_check && ch->trackerHit.host.ip){
+ if (ch->trackerHit.lastContact + 30 < ctime){
+ ch->sourceHost = ch->trackerHit;
+ ch->trackerHit.lastContact = ctime;
+ LOG_DEBUG("use saved tracker");
+ }
+ }
}
chanMgr->hitlistlock.off();
break;
//if (rawData.writePos > 0)
- if (rawData.lastWriteTime > 0)
+ if (rawData.lastWriteTime > 0 || rawData.lastSkipTime > 0)
{
if (isBroadcasting())
{
lock.on();
+ unsigned int bound = packets[0].len * ChanPacketBuffer::MAX_PACKETS * 2; // max packets to wait
unsigned int fpos = getStreamPos(firstPos);
- if (spos < fpos)
+ unsigned int lpos = getStreamPos(lastPos);
+ if (spos < fpos && (fpos < lpos || spos > lpos + bound))
spos = fpos;
{
//ChanPacket &p = packets[i%MAX_PACKETS];
ChanPacketv &p = packets[i%MAX_PACKETS];
- if (p.pos >= spos)
+ if (p.pos >= spos && p.pos - spos <= bound)
{
pack.init(p);
lock.off();
if (packets[lastPos%MAX_PACKETS].type == ChanPacket::T_HEAD) lpos = 0;
if (lpos && (diff == 0 || diff > 0xfff00000)) {
LOG_DEBUG("* latest pos=%d, pack pos=%d", getLatestPos(), pack.pos);
+ lastSkipTime = sys->getTime();
return false;
}
}
} else if (!(c->thread.active)){
c->thread.active = true;
c->thread.finish = false;
+ c->info.lastPlayStart = 0; // reconnect
+ c->info.lastPlayEnd = 0;
if (c->finthread){
c->finthread->finish = true;
c->finthread = NULL;
}
c = c->next;
}
- int diff = servMgr->maxRelays - servMgr->numStreams(Servent::T_RELAY,false);
+ unsigned int numRelay = servMgr->numStreams(Servent::T_RELAY,false);
+ int diff = servMgr->maxRelays - numRelay;
if (ch->localRelays()){
if (noRelay > diff){
noRelay = diff;
needRate = 0;
}
- // for PCRaw (relay) start.
- bool force_off = false;
-
- if(isIndexTxt(ch))
- force_off = true;
- // for PCRaw (relay) end.
-
// ratefull = servMgr->bitrateFull(needRate+bitrate);
ratefull = (servMgr->maxBitrateOut < allRate + needRate + ch->info.bitrate);
- //relay = (!relayfull) && (!chfull) && (!ratefull) && ((servMgr->numStreams(Servent::T_RELAY,false) + noRelay) < servMgr->maxRelays);
- relay = (!relayfull || force_off) && (!chfull) && (!ratefull)
- && (((servMgr->numStreams(Servent::T_RELAY,false) + noRelay) < servMgr->maxRelays) || force_off); // for PCRaw (relay) (force_off)
+ if (!isIndexTxt(ch))
+ relay = (!relayfull) && (!chfull) && (!ratefull) && (numRelay + noRelay < servMgr->maxRelays);
+ else
+ relay = (!chfull) && (!ratefull); // for PCRaw (relay)
/* if (relayfull){
LOG_DEBUG("Reject by relay full");
else
chl->hit = next;
- char ip0str[64],ip1str[64];
- chh->rhost[0].toStr(ip0str);
- chh->rhost[1].toStr(ip1str);
- LOG_DEBUG("Delete hit (servent_id=%d): F%dT%dR%d %s/%s",
- chh->servent_id,chh->firewalled,chh->tracker,chh->relay,ip0str,ip1str);
-
delete chh;
chh = next;
}
allow = a;
thread.data = this;
thread.func = incomingProc;
+ thread.finish = false;
setStatus(S_PROTOCOL);
bool chanFound=false;
bool chanReady=false;
+ ChanHit *sourceHit = NULL;
+
Channel *ch = chanMgr->findChannelByID(chanInfo.id);
if (ch)
{
sendHeader = true;
-// if (reqPos)
-// {
-// streamPos = ch->rawData.findOldestPos(reqPos);
-// }else
-// {
- streamPos = ch->rawData.getLatestPos();
-// }
+ if (reqPos || !isIndexTxt(&chanInfo))
+ {
+ streamPos = ch->rawData.findOldestPos(reqPos);
+ //streamPos = ch->rawData.getLatestPos();
+ }else
+ {
+ streamPos = ch->rawData.getLatestPos();
+ }
chanID = chanInfo.id;
canStreamLock.on();
chanReady = canStream(ch);
+ if (0 && !chanReady)
+ {
+ if (servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
+ {
+ sourceHit = &ch->sourceHost; // send source host info
+
+ if (ch->info.getUptime() > 60) // if stable
+ {
+ // connect "this" host later
+ ChanHit nh;
+ nh.init();
+ nh.chanID = chanID;
+ nh.rhost[0] = getHost();
+ chanMgr->addHit(nh);
+ }
+
+ char tmp[50];
+ getHost().toStr(tmp);
+ LOG_DEBUG("Bump channel, hand over sourceHost to %s", tmp);
+ ch->bump = true;
+ }
+ else if (servMgr->kickUnrelayableHost(chanID, this) != 0)
+ {
+ chanReady = canStream(ch);
+ if (!chanReady)
+ LOG_DEBUG("Kicked unrelayable host, but still cannot stream");
+ }
+ }
if (!chanReady) type = T_INCOMING;
thread.active = chanReady;
setStatus(S_CONNECTED);
// search for up to 8 other hits
int cnt=0;
- for(int i=0; i<8; i++)
+ int i;
+ for(i=0; i<8; i++)
{
best.init();
if (!best.host.ip)
break;
- best.writeAtoms(atom2,chanInfo.id);
+ best.writeAtoms(atom2,chanInfo.id);
cnt++;
}
+ if (sourceHit) {
+ char tmp[50];
+ sourceHit->writeAtoms(atom2, chanInfo.id);
+ chs.best[i].host.toStr(tmp);
+ LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
+ best.host.ip = sourceHit->host.ip;
+ }
+
if (!best.host.ip){
char tmp[50];
// chanMgr->hitlistlock.on();
if (sendHeader)
{
atom.writeParent(PCP_CHAN_PKT,3);
- atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
- atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
- atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
+ atom.writeID4(PCP_CHAN_PKT_TYPE,PCP_CHAN_PKT_HEAD);
+ atom.writeInt(PCP_CHAN_PKT_POS,ch->headPack.pos);
+ atom.writeBytes(PCP_CHAN_PKT_DATA,ch->headPack.data,ch->headPack.len);
- streamPos = ch->headPack.pos+ch->headPack.len;
+ if (streamPos == 0)
+ streamPos = ch->headPack.pos+ch->headPack.len;
LOG_DEBUG("Sent %d bytes header",ch->headPack.len);
}
}
streamPos = rawPack.pos+rawPack.len;
}
} else {
- pcpStream->flush(*sock);
throw StreamException("Channel not found");
}
try
{
+ pcpStream->flush(*sock);
atom.writeInt(PCP_QUIT,error);
}catch(StreamException &) {}
if (!isPrivate())
{
+ if (!ch->isPlaying() || ch->isFull() || ((type == T_DIRECT) && servMgr->directFull()))
+ return false;
+
+ if (!isIndexTxt(ch) && (type == T_RELAY) && (servMgr->relaysFull()))
+ return false;
+
Channel *c = chanMgr->channel;
int noRelay = 0;
unsigned int needRate = 0;
unsigned int allRate = 0;
while(c){
if (c->isPlaying()){
- allRate += c->info.bitrate * c->localRelays();
- if ((c != ch) && (c->localRelays() == 0)){
+ int nlr = c->localRelays();
+ allRate += c->info.bitrate * nlr;
+ if ((c != ch) && (nlr == 0)){
if(!isIndexTxt(c)) // for PCRaw (relay)
noRelay++;
needRate+=c->info.bitrate;
}
c = c->next;
}
- int diff = servMgr->maxRelays - servMgr->numStreams(Servent::T_RELAY,false);
+ unsigned int numRelay = servMgr->numStreams(Servent::T_RELAY,false);
+ int diff = servMgr->maxRelays - numRelay;
if (ch->localRelays()){
if (noRelay > diff){
noRelay = diff;
needRate = 0;
}
- // for PCRaw (relay) start.
- bool force_off = true;
-
- if(isIndexTxt(ch))
- force_off = false;
- // for PCRaw (relay) end.
-
LOG_DEBUG("Relay check: Max=%d Now=%d Need=%d ch=%d",
servMgr->maxBitrateOut, allRate, needRate, ch->info.bitrate);
- if ( !ch->isPlaying()
- || ch->isFull()
-// || servMgr->bitrateFull(needRate+ch->getBitrate())
- || (allRate + needRate + ch->info.bitrate > servMgr->maxBitrateOut)
- || ((type == T_RELAY) && servMgr->relaysFull() && force_off) // for PCRaw (relay) (force_off)
- || ((type == T_RELAY) && (((servMgr->numStreams(Servent::T_RELAY,false) + noRelay) >= servMgr->maxRelays)) && force_off) // for PCRaw (relay) (force_off)
- || ((type == T_DIRECT) && servMgr->directFull())
- ){
+ // if ( !ch->isPlaying()
+ // || ch->isFull()
+ // || (allRate + needRate + ch->info.bitrate > servMgr->maxBitrateOut)
+ // || ((type == T_RELAY) && servMgr->relaysFull() && force_off) // for PCRaw (relay) (force_off)
+ // || ((type == T_RELAY) && (((servMgr->numStreams(Servent::T_RELAY,false) + noRelay) >= servMgr->maxRelays)) && force_off) // for PCRaw (relay) (force_off)
+ // || ((type == T_DIRECT) && servMgr->directFull())
+ // ){
+
+ if (allRate + needRate + ch->info.bitrate > servMgr->maxBitrateOut)
+ {
+ LOG_DEBUG("Relay check: NG");
+ return false;
+ }
+
+ if (!isIndexTxt(ch) && (type == T_RELAY) && (numRelay + noRelay >= servMgr->maxRelays))
+ {
LOG_DEBUG("Relay check: NG");
return false;
}
// check password before anything else, if needed
- if (servMgr->password != loginPassword)
+ if (!loginPassword.isSame(servMgr->password))
{
if (!sock->host.isLocalhost() || !loginPassword.isEmpty())
throw HTTPException(HTTP_SC_UNAUTHORIZED,401);
}
}
+// --------------------------------------------------
+static ChanHit *findServentHit(Servent *s)
+{
+ ChanHitList *chl = chanMgr->findHitListByID(s->chanID);
+ Host h = s->getHost();
+
+ if (chl)
+ {
+ ChanHit *hit = chl->hit;
+ while (hit)
+ {
+ if ((hit->numHops == 1) && hit->host.isValid() && (h.ip == hit->host.ip))
+ return hit;
+ hit = hit->next;
+ }
+ }
+ return NULL;
+}
+// --------------------------------------------------
+int ServMgr::kickUnrelayableHost(GnuID &chid, Servent *ns)
+{
+ Servent *ks = NULL;
+ Servent *s = servMgr->servents;
+
+ while (s)
+ {
+ if (s->type == Servent::T_RELAY && s->chanID.isSame(chid) && !s->isPrivate())
+ {
+ Host h = s->getHost();
+
+ chanMgr->hitlistlock.on();
+ ChanHit *hit = findServentHit(s);
+ if (hit && !hit->relay && hit->numRelays == 0)
+ {
+ char hostName[256];
+ h.toStr(hostName);
+ //s->thread.active = false;
+ LOG_DEBUG("unrelayable Servent : %s",hostName);
+ if (!ks || s->lastConnect < ks->lastConnect) // elder servent
+ ks = s;
+ }
+ chanMgr->hitlistlock.off();
+ }
+ s = s->next;
+ }
+
+ if (ks)
+ {
+ if (ns)
+ {
+ Host h = ns->getHost();
+ ChanHit nh;
+ nh.init();
+ nh.chanID = chid;
+ nh.rhost[0] = h;
+
+ ChanPacket pack;
+ MemoryStream mem(pack.data,sizeof(pack.data));
+ AtomStream atom(mem);
+ nh.writeAtoms(atom, chid);
+ pack.len = mem.pos;
+ pack.type = ChanPacket::T_PCP;
+ GnuID noID;
+ noID.clear();
+
+ ks->sendPacket(pack, chid, noID, noID, Servent::T_RELAY);
+ }
+
+ ks->setStatus(Servent::S_CLOSING);
+ ks->thread.active = false;
+
+ char hostName[256];
+ ks->getHost().toStr(hostName);
+
+ LOG_DEBUG("Stop unrelayable Servent : %s",hostName);
+
+ return 1;
+ }
+
+ return 0;
+}
unsigned int kickPushTime;
bool isCheckPushStream(); //JP-EX
void banFirewalledHost(); //JP-EX
+ int kickUnrelayableHost(GnuID &, Servent * = NULL);
bool getModulePath; //JP-EX
bool clearPLS; //JP-EX
*this = tmp;
}
- bool operator == (const char *s) const {return isSame(s);}
- bool operator != (const char *s) const {return !isSame(s);}
+ bool operator == (const char *s) const {return isSame(s);}
+ bool operator != (const char *s) const {return !isSame(s);}
operator const char *() const {return data;}
#endif
// ------------------------------------------------
static const int PCP_CLIENT_VERSION = 1218;
-static const int PCP_CLIENT_VERSION_VP = 24;
+static const int PCP_CLIENT_VERSION_VP = 25;
static const int PCP_ROOT_VERSION = 1218;
static const int PCP_CLIENT_MINVERSION = 1200;
static const char *PCX_AGENT = "PeerCast/0.1218";
static const char *PCX_AGENTJP = "PeerCast/0.1218-J";
-static const char *PCX_AGENTVP = "PeerCast/0.1218(VP0024)";
-static const char *PCX_VERSTRING = "v0.1218(VP0024)";
+static const char *PCX_AGENTVP = "PeerCast/0.1218(VP0025-1)";
+static const char *PCX_VERSTRING = "v0.1218(VP0025-1)";
#if 1 /* for VP extend version */
#define VERSION_EX 1
static const char *PCP_CLIENT_VERSION_EX_PREFIX = "IM"; // 2bytes only
-static const int PCP_CLIENT_VERSION_EX_NUMBER = 7651;
-static const char *PCX_AGENTEX = "PeerCast/0.1218(IM7651)";
-static const char *PCX_VERSTRING_EX = "v0.1218(IM7651)";
+static const int PCP_CLIENT_VERSION_EX_NUMBER = 7650;
+static const char *PCX_AGENTEX = "PeerCast/0.1218(IM-VP25-1)";
+static const char *PCX_VERSTRING_EX = "v0.1218(IM765-VP25-1)";
#endif
// ------------------------------------------------
try
{
char c[1024];
- while (readUpto(&c,1024) > 0)
- if (sys->getTime() - stime > 5) break;
- //readUpto(&c,1);
+ while (read(&c, sizeof(c)) > 0)
+ if (sys->getTime() - stime > 5)
+ break;
}catch(StreamException &) {}
if (closesocket (sockNum))