sock->writeLineF("GET /channel/%s HTTP/1.0",idStr);
sock->writeLineF("%s %d",PCX_HS_POS,streamPos);
sock->writeLineF("%s %d",PCX_HS_PCP,1);
+ sock->writeLineF("%s %d",PCX_HS_PORT,servMgr->serverHost.port);
sock->writeLine("");
// -----------------------------------
bool Channel::checkBump()
{
+ unsigned int maxIdleTime = 30;
+ if (isIndexTxt(this)) maxIdleTime = 60;
+
if (!isBroadcasting() && (!sourceHost.tracker))
- if (rawData.lastWriteTime && ((sys->getTime() - rawData.lastWriteTime) > 30))
+ if (rawData.lastWriteTime && ((sys->getTime() - rawData.lastWriteTime) > maxIdleTime))
{
LOG_ERROR("Channel Auto bumped");
bump = true;
unsigned int receiveStartTime = 0;
+ unsigned int ptime = 0;
+ unsigned int upsize = 0;
+
try
{
while (thread.active && !peercastInst->isQuitting)
}
}
- source->flush(in);
+ unsigned int t = sys->getTime();
+ if (t != ptime) {
+ ptime = t;
+ upsize = Servent::MAX_OUTWARD_SIZE;
+ }
+
+ unsigned int len = source->flushUb(in, upsize);
+ upsize -= len;
sys->sleepIdle();
}
{
type = p.type;
len = p.len;
+ if (len > MAX_DATALEN)
+ throw StreamException("Packet data too large");
pos = p.pos;
sync = p.sync;
skip = p.skip;
+ priority = p.priority;
memcpy(data, p.data, len);
}
// -----------------------------------
memcpy(data,p,len);
pos = _pos;
skip = false;
+ priority = 0;
}
// -----------------------------------
void ChanPacket::writeRaw(Stream &out)
pack.init(packets[readPos%MAX_PACKETS]);
readPos++;
lock.off();
+}
- sys->sleepIdle();
+// -----------------------------------
+void ChanPacketBuffer::readPacketPri(ChanPacket &pack)
+{
+ unsigned int tim = sys->getTime();
+
+ if (readPos < firstPos)
+ throw StreamException("Read too far behind");
+
+ while (readPos >= writePos)
+ {
+ sys->sleepIdle();
+ if ((sys->getTime() - tim) > 30)
+ throw TimeoutException();
+ }
+ lock.on();
+ ChanPacketv *best = &packets[readPos % MAX_PACKETS];
+ for (unsigned int i = readPos + 1; i < writePos; i++) {
+ if (packets[i % MAX_PACKETS].priority > best->priority)
+ best = &packets[i % MAX_PACKETS];
+ }
+ pack.init(*best);
+ best->init(packets[readPos % MAX_PACKETS]);
+ readPos++;
+ lock.off();
+ }
-}
// -----------------------------------
bool ChanPacketBuffer::willSkip()
{
}
// -----------------------------------
+bool ChanMgr::findParentHit(ChanHit &p)
+{
+ ChanHitList *hl=NULL;
+
+ chanMgr->hitlistlock.on();
+
+ hl = findHitListByID(p.chanID);
+
+ if (hl)
+ {
+ ChanHit *ch = hl->hit;
+ while (ch)
+ {
+ if (!ch->dead && (ch->rhost[0].ip == p.uphost.ip)
+ && (ch->rhost[0].port == p.uphost.port))
+ {
+ chanMgr->hitlistlock.off();
+ return 1;
+ }
+ ch = ch->next;
+ }
+ }
+
+ chanMgr->hitlistlock.off();
+
+ return 0;
+}
+
+// -----------------------------------
class ChanFindInfo : public ThreadInfo
{
public:
version_ex_number = 0;
status = 0;
+ servent_id = 0;
sessionID.clear();
chanID.clear();
riSequence &= 0xffffff;
seqLock.off();
+ Servent *s = servMgr->servents;
+ while (s) {
+ if (s->serventHit.rhost[0].port && s->type == Servent::T_RELAY
+ && s->chanID.isSame(chl->info.id)) {
+ int i = index % MAX_RESULTS;
+ if (index < MAX_RESULTS
+ || tmpHit[i].lastSendSeq > s->serventHit.lastSendSeq) {
+ s->serventHit.lastSendSeq = seq;
+ tmpHit[i] = s->serventHit;
+ tmpHit[i].host = s->serventHit.rhost[0];
+ index++;
+ }
+ }
+ s = s->next;
+ }
+
ChanHit *hit = chl->hit;
while(hit){
//rnd = (float)rand() / (float)RAND_MAX;
rnd = rand() % base;
if (hit->numHops == 1){
+#if 0
if (tmpHit[index % MAX_RESULTS].numHops == 1){
if (rnd < prob){
tmpHit[index % MAX_RESULTS] = *hit;
tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
index++;
}
+#endif
} else {
- if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob)){
+ if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob) || rnd == 0){
tmpHit[index % MAX_RESULTS] = *hit;
tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
index++;
char version_ex_prefix[2];
unsigned int version_ex_number;
+
+ unsigned int lastSendSeq;
};
// ----------------------------------
class ChanHitList
int pickHits(ChanHitSearch &);
-
+ bool findParentHit(ChanHit &p);
Channel *channel;
ChanHitList *hitlist;
pos = 0;
sync = 0;
skip = false;
+ priority = 0;
}
void init(ChanPacketv &p);
void init(TYPE t, const void *, unsigned int , unsigned int );
char data[MAX_DATALEN];
bool skip;
+ int priority;
};
// ----------------------------------
class ChanPacketv
skip = false;
data = NULL;
datasize = 0;
+ priority = 0;
}
void init(ChanPacket &p)
{
pos = p.pos;
sync = p.sync;
skip = p.skip;
+ priority = p.priority;
if (!data) {
datasize = (len & ~(BSIZE - 1)) + BSIZE;
data = new char[datasize];
unsigned int datasize;
bool skip;
+ int priority;
};
// ----------------------------------
class ChanPacketBuffer
bool writePacket(ChanPacket &,bool = false);
void readPacket(ChanPacket &);
+ void readPacketPri(ChanPacket &);
bool willSkip();
virtual void kill() {}
virtual bool sendPacket(ChanPacket &,GnuID &) {return false;}
virtual void flush(Stream &) {}
+ virtual unsigned int flushUb(Stream &, unsigned int) { return 0; }
virtual void readHeader(Stream &,Channel *)=0;
virtual int readPacket(Stream &,Channel *)=0;
virtual void readEnd(Stream &,Channel *)=0;
pack.writeRaw(in);
}
}
+
+// ------------------------------------------
+unsigned int PCPStream::flushUb(Stream &in, unsigned int size)
+{
+ ChanPacket pack;
+ unsigned int len = 0, skip = 0;
+
+ while (outData.numPending())
+ {
+ outData.readPacketPri(pack);
+
+ if (size >= len + pack.len) {
+ len += pack.len;
+ pack.writeRaw(in);
+ } else {
+ skip++;
+ }
+ }
+ if (skip > 0)
+ LOG_DEBUG("PCPStream::flushUb: skip %d packets", skip);
+
+ return len;
+}
+
// ------------------------------------------
int PCPStream::readPacket(Stream &in,Channel *)
{
ipNum = 1;
}
else if (id == PCP_HOST_NUML)
+ {
hit.numListeners = atom.readInt();
+ if (hit.numListeners > 10)
+ hit.numListeners = 10;
+ }
else if (id == PCP_HOST_NUMR)
+ {
hit.numRelays = atom.readInt();
+ if (hit.numRelays > 100)
+ hit.numRelays = 100;
+ }
else if (id == PCP_HOST_UPTIME)
hit.upTime = atom.readInt();
else if (id == PCP_HOST_OLDPOS)
if (hit.numHops == 1){
Servent *sv = servMgr->findServentByServentID(hit.servent_id);
- if (sv){
+ if (sv && sv->getHost().ip == hit.host.ip){
// LOG_DEBUG("set servent's waitPort = %d", hit.host.port);
sv->waitPort = hit.host.port;
+ hit.lastSendSeq = sv->serventHit.lastSendSeq;
+ sv->serventHit = hit;
}
}
}
{
ChanHit hit;
readHostAtoms(atom,c,bcs,hit,false);
+ Servent *sv = servMgr->findServentByServentID(bcs.servent_id);
if (hit.uphost.ip == 0){
// LOG_DEBUG("bcs servent_id = %d", bcs.servent_id);
if (bcs.numHops == 1){
hit.uphost.port = servMgr->serverHost.port;
hit.uphostHops = 1;
} else {
- Servent *sv = servMgr->findServentByServentID(bcs.servent_id);
+ //Servent *sv = servMgr->findServentByServentID(bcs.servent_id);
if (sv){
hit.uphost.ip = sv->getHost().ip;
hit.uphost.port = sv->waitPort;
}
}
}
- int oldPos = pmem.pos;
- hit.writeAtoms(patom, hit.chanID);
- pmem.pos = oldPos;
- r = readAtom(patom,bcs);
+ if (sv &&
+ ((hit.numHops == 1 && (hit.rhost[0].ip == sv->getHost().ip
+ && hit.uphost.ip == servMgr->serverHost.ip && hit.uphost.port == servMgr->serverHost.port)
+ || (hit.rhost[1].localIP() && hit.rhost[1].ip == sv->getHost().ip))
+ || chanMgr->findParentHit(hit)))
+ {
+ int oldPos = pmem.pos;
+ hit.writeAtoms(patom, hit.chanID);
+ pmem.pos = oldPos;
+ r = readAtom(patom,bcs);
+ } else {
+ LOG_DEBUG("### Invalid bcst: hops=%d, ver=%d(VP%04d), ttl=%d",
+ bcs.numHops,ver,ver_vp,ttl);
+ ttl = 0;
+ }
} else {
// copy and process atoms
int oldPos = pmem.pos;
if (bcs.group & (/*PCP_BCST_GROUP_ROOT|*/PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
{
+ pack.priority = 11 - bcs.numHops;
chanMgr->broadcastPacketUp(pack,bcs.chanID,remoteID,destID);
}
virtual bool sendPacket(ChanPacket &,GnuID &);
virtual void flush(Stream &);
+ virtual unsigned int flushUb(Stream &, unsigned int);
virtual void readHeader(Stream &,Channel *);
virtual int readPacket(Stream &,Channel *);
virtual void readEnd(Stream &,Channel *);
type = T_NONE;
channel_id = 0;
+
+ serventHit.init();
}
// -----------------------------------
bool Servent::sendPacket(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did,Servent::TYPE t)
bool gotPCP=false;
unsigned int reqPos=0;
+ unsigned short listenPort = 0;
nsSwitchNum=0;
gotPCP = atoi(arg)!=0;
else if (http.isHeader(PCX_HS_POS))
reqPos = atoi(arg);
+ else if (http.isHeader(PCX_HS_PORT))
+ listenPort = (unsigned short)atoi(arg);
else if (http.isHeader("icy-metadata"))
addMetadata = atoi(arg) > 0;
else if (http.isHeader(HTTP_HS_AGENT))
}
chanID = chanInfo.id;
+ serventHit.rhost[0].ip = getHost().ip;
+ serventHit.rhost[0].port = listenPort;
+ serventHit.host = serventHit.rhost[0];
+ serventHit.chanID = chanID;
+
canStreamLock.on();
chanReady = canStream(ch);
- if (0 && !chanReady)
+ if (/*0 && */!chanReady)
{
if (servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
{
sourceHit = &ch->sourceHost; // send source host info
- if (ch->info.getUptime() > 60) // if stable
+ if (listenPort && ch->info.getUptime() > 60) // if stable
{
// connect "this" host later
- ChanHit nh;
- nh.init();
- nh.chanID = chanID;
- nh.rhost[0] = getHost();
- chanMgr->addHit(nh);
+ chanMgr->addHit(serventHit);
}
char tmp[50];
LOG_DEBUG("Bump channel, hand over sourceHost to %s", tmp);
ch->bump = true;
}
- else if (servMgr->kickUnrelayableHost(chanID, this) != 0)
+ else if (servMgr->kickUnrelayableHost(chanID, serventHit) != 0)
{
chanReady = canStream(ch);
if (!chanReady)
- LOG_DEBUG("Kicked unrelayable host, but still cannot stream");
+ LOG_DEBUG("*** Kicked unrelayable host, but still cannot stream");
}
}
if (!chanReady) type = T_INCOMING;
if (sourceHit) {
char tmp[50];
sourceHit->writeAtoms(atom2, chanInfo.id);
- chs.best[i].host.toStr(tmp);
- LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
+ sourceHit->host.toStr(tmp);
+ LOG_DEBUG("relay info(sourceHit): %s", tmp);
best.host.ip = sourceHit->host.ip;
}
void Servent::sendPCPChannel()
{
bool skipCheck = false;
+ unsigned int ptime = 0;
+ int npacket = 0, upsize = 0;
Channel *ch = chanMgr->findChannelByID(chanID);
if (!ch)
BroadcastState bcs;
bcs.servent_id = servent_id;
// error = pcpStream->readPacket(*sock,bcs);
- do {
+
+ unsigned int t = sys->getTime();
+ if (t != ptime) {
+ ptime = t;
+ npacket = MAX_PROC_PACKETS;
+ upsize = MAX_OUTWARD_SIZE;
+ }
+
+ int len = pcpStream->flushUb(*sock, upsize);
+ upsize -= len;
+
+ while (npacket > 0 && sock->readReady()) {
+ npacket--;
error = pcpStream->readPacket(*sock,bcs);
if (error)
throw StreamException("PCP exception");
- } while (sock->readReady() || pcpStream->outData.numPending());
+ }
sys->sleepIdle();
MAX_OUTPACKETS = 32 // max. output packets per queue (normal/priority)
};
+ enum
+ {
+ MAX_PROC_PACKETS = 300,
+ MAX_OUTWARD_SIZE = 1024 * 10
+ };
+
enum TYPE
{
T_NONE, // Not allocated
unsigned int lastSkipCount;
unsigned int waitPort;
+ ChanHit serventHit;
+
int channel_id;
};
}
// --------------------------------------------------
+#if 0
static ChanHit *findServentHit(Servent *s)
{
ChanHitList *chl = chanMgr->findHitListByID(s->chanID);
}
return NULL;
}
+#endif
// --------------------------------------------------
-int ServMgr::kickUnrelayableHost(GnuID &chid, Servent *ns)
+int ServMgr::kickUnrelayableHost(GnuID &chid, ChanHit &sendhit)
{
Servent *ks = NULL;
Servent *s = servMgr->servents;
{
Host h = s->getHost();
- chanMgr->hitlistlock.on();
- ChanHit *hit = findServentHit(s);
- if (hit && !hit->relay && hit->numRelays == 0)
+ ChanHit hit = s->serventHit;
+ if (!hit.relay && hit.numRelays == 0)
{
char hostName[256];
h.toStr(hostName);
if (!ks || s->lastConnect < ks->lastConnect) // elder servent
ks = s;
}
- chanMgr->hitlistlock.off();
}
s = s->next;
}
if (ks)
{
- if (ns)
+ if (sendhit.rhost[0].port)
{
- Host h = ns->getHost();
- ChanHit nh;
- nh.init();
- nh.chanID = chid;
- nh.rhost[0] = h;
-
ChanPacket pack;
MemoryStream mem(pack.data,sizeof(pack.data));
AtomStream atom(mem);
- nh.writeAtoms(atom, chid);
+ sendhit.writeAtoms(atom, chid);
pack.len = mem.pos;
pack.type = ChanPacket::T_PCP;
GnuID noID;
unsigned int kickPushTime;
bool isCheckPushStream(); //JP-EX
void banFirewalledHost(); //JP-EX
- int kickUnrelayableHost(GnuID &, Servent * = NULL);
+ int kickUnrelayableHost(GnuID &, ChanHit &);
bool getModulePath; //JP-EX
bool clearPLS; //JP-EX
#endif
// ------------------------------------------------
static const int PCP_CLIENT_VERSION = 1218;
-static const int PCP_CLIENT_VERSION_VP = 25;
+static const int PCP_CLIENT_VERSION_VP = 26;
static const int PCP_ROOT_VERSION = 1218;
static const int PCP_CLIENT_MINVERSION = 1200;
static const char *PCX_AGENT = "PeerCast/0.1218";
static const char *PCX_AGENTJP = "PeerCast/0.1218-J";
-static const char *PCX_AGENTVP = "PeerCast/0.1218(VP0025-1)";
-static const char *PCX_VERSTRING = "v0.1218(VP0025-1)";
+static const char *PCX_AGENTVP = "PeerCast/0.1218(VP0026)";
+static const char *PCX_VERSTRING = "v0.1218(VP0026)";
#if 1 /* for VP extend version */
#define VERSION_EX 1
static const char *PCP_CLIENT_VERSION_EX_PREFIX = "IM"; // 2bytes only
-static const int PCP_CLIENT_VERSION_EX_NUMBER = 7650;
-static const char *PCX_AGENTEX = "PeerCast/0.1218(IM-VP25-1)";
-static const char *PCX_VERSTRING_EX = "v0.1218(IM765-VP25-1)";
+static const int PCP_CLIENT_VERSION_EX_NUMBER = 26;
+static const char *PCX_AGENTEX = "PeerCast/0.1218(IM0026-patch071223)";
+static const char *PCX_VERSTRING_EX = "v0.1218(IM0026)";
#endif
// ------------------------------------------------
sock->writeLineF("GET /channel/%s HTTP/1.0",idStr);
sock->writeLineF("%s %d",PCX_HS_POS,streamPos);
sock->writeLineF("%s %d",PCX_HS_PCP,1);
+ sock->writeLineF("%s %d",PCX_HS_PORT,servMgr->serverHost.port);
sock->writeLine("");
// -----------------------------------
bool Channel::checkBump()
{
+ unsigned int maxIdleTime = 30;
+ if (isIndexTxt(this)) maxIdleTime = 60;
+
if (!isBroadcasting() && (!sourceHost.tracker))
- if (rawData.lastWriteTime && ((sys->getTime() - rawData.lastWriteTime) > 30))
+ if (rawData.lastWriteTime && ((sys->getTime() - rawData.lastWriteTime) > maxIdleTime))
{
LOG_ERROR("Channel Auto bumped");
bump = true;
unsigned int receiveStartTime = 0;
+ unsigned int ptime = 0;
+ unsigned int upsize = 0;
+
try
{
while (thread.active && !peercastInst->isQuitting)
}
}
- source->flush(in);
+ unsigned int t = sys->getTime();
+ if (t != ptime) {
+ ptime = t;
+ upsize = Servent::MAX_OUTWARD_SIZE;
+ }
+
+ unsigned int len = source->flushUb(in, upsize);
+ upsize -= len;
sys->sleepIdle();
}
{
type = p.type;
len = p.len;
+ if (len > MAX_DATALEN)
+ throw StreamException("Packet data too large");
pos = p.pos;
sync = p.sync;
skip = p.skip;
+ priority = p.priority;
memcpy(data, p.data, len);
}
// -----------------------------------
memcpy(data,p,len);
pos = _pos;
skip = false;
+ priority = 0;
}
// -----------------------------------
void ChanPacket::writeRaw(Stream &out)
pack.init(packets[readPos%MAX_PACKETS]);
readPos++;
lock.off();
+}
- sys->sleepIdle();
+// -----------------------------------
+void ChanPacketBuffer::readPacketPri(ChanPacket &pack)
+{
+ unsigned int tim = sys->getTime();
+
+ if (readPos < firstPos)
+ throw StreamException("Read too far behind");
+
+ while (readPos >= writePos)
+ {
+ sys->sleepIdle();
+ if ((sys->getTime() - tim) > 30)
+ throw TimeoutException();
+ }
+ lock.on();
+ ChanPacketv *best = &packets[readPos % MAX_PACKETS];
+ for (unsigned int i = readPos + 1; i < writePos; i++) {
+ if (packets[i % MAX_PACKETS].priority > best->priority)
+ best = &packets[i % MAX_PACKETS];
+ }
+ pack.init(*best);
+ best->init(packets[readPos % MAX_PACKETS]);
+ readPos++;
+ lock.off();
+ }
-}
// -----------------------------------
bool ChanPacketBuffer::willSkip()
{
}
// -----------------------------------
+bool ChanMgr::findParentHit(ChanHit &p)
+{
+ ChanHitList *hl=NULL;
+
+ chanMgr->hitlistlock.on();
+
+ hl = findHitListByID(p.chanID);
+
+ if (hl)
+ {
+ ChanHit *ch = hl->hit;
+ while (ch)
+ {
+ if (!ch->dead && (ch->rhost[0].ip == p.uphost.ip)
+ && (ch->rhost[0].port == p.uphost.port))
+ {
+ chanMgr->hitlistlock.off();
+ return 1;
+ }
+ ch = ch->next;
+ }
+ }
+
+ chanMgr->hitlistlock.off();
+
+ return 0;
+}
+
+// -----------------------------------
class ChanFindInfo : public ThreadInfo
{
public:
version_ex_number = 0;
status = 0;
+ servent_id = 0;
sessionID.clear();
chanID.clear();
riSequence &= 0xffffff;
seqLock.off();
+ Servent *s = servMgr->servents;
+ while (s) {
+ if (s->serventHit.rhost[0].port && s->type == Servent::T_RELAY
+ && s->chanID.isSame(chl->info.id)) {
+ int i = index % MAX_RESULTS;
+ if (index < MAX_RESULTS
+ || tmpHit[i].lastSendSeq > s->serventHit.lastSendSeq) {
+ s->serventHit.lastSendSeq = seq;
+ tmpHit[i] = s->serventHit;
+ tmpHit[i].host = s->serventHit.rhost[0];
+ index++;
+ }
+ }
+ s = s->next;
+ }
+
ChanHit *hit = chl->hit;
while(hit){
//rnd = (float)rand() / (float)RAND_MAX;
rnd = rand() % base;
if (hit->numHops == 1){
+#if 0
if (tmpHit[index % MAX_RESULTS].numHops == 1){
if (rnd < prob){
tmpHit[index % MAX_RESULTS] = *hit;
tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
index++;
}
+#endif
} else {
- if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob)){
+ if ((tmpHit[index % MAX_RESULTS].numHops != 1) && (rnd < prob) || rnd == 0){
tmpHit[index % MAX_RESULTS] = *hit;
tmpHit[index % MAX_RESULTS].host = hit->rhost[0];
index++;
char version_ex_prefix[2];
unsigned int version_ex_number;
+
+ unsigned int lastSendSeq;
};
// ----------------------------------
class ChanHitList
int pickHits(ChanHitSearch &);
-
+ bool findParentHit(ChanHit &p);
Channel *channel;
ChanHitList *hitlist;
pos = 0;
sync = 0;
skip = false;
+ priority = 0;
}
void init(ChanPacketv &p);
void init(TYPE t, const void *, unsigned int , unsigned int );
char data[MAX_DATALEN];
bool skip;
+ int priority;
};
// ----------------------------------
class ChanPacketv
skip = false;
data = NULL;
datasize = 0;
+ priority = 0;
}
void init(ChanPacket &p)
{
pos = p.pos;
sync = p.sync;
skip = p.skip;
+ priority = p.priority;
if (!data) {
datasize = (len & ~(BSIZE - 1)) + BSIZE;
data = new char[datasize];
unsigned int datasize;
bool skip;
+ int priority;
};
// ----------------------------------
class ChanPacketBuffer
bool writePacket(ChanPacket &,bool = false);
void readPacket(ChanPacket &);
+ void readPacketPri(ChanPacket &);
bool willSkip();
virtual void kill() {}
virtual bool sendPacket(ChanPacket &,GnuID &) {return false;}
virtual void flush(Stream &) {}
+ virtual unsigned int flushUb(Stream &, unsigned int) { return 0; }
virtual void readHeader(Stream &,Channel *)=0;
virtual int readPacket(Stream &,Channel *)=0;
virtual void readEnd(Stream &,Channel *)=0;
pack.writeRaw(in);
}
}
+
+// ------------------------------------------
+unsigned int PCPStream::flushUb(Stream &in, unsigned int size)
+{
+ ChanPacket pack;
+ unsigned int len = 0, skip = 0;
+
+ while (outData.numPending())
+ {
+ outData.readPacketPri(pack);
+
+ if (size >= len + pack.len) {
+ len += pack.len;
+ pack.writeRaw(in);
+ } else {
+ skip++;
+ }
+ }
+ if (skip > 0)
+ LOG_DEBUG("PCPStream::flushUb: skip %d packets", skip);
+
+ return len;
+}
+
// ------------------------------------------
int PCPStream::readPacket(Stream &in,Channel *)
{
ipNum = 1;
}
else if (id == PCP_HOST_NUML)
+ {
hit.numListeners = atom.readInt();
+ if (hit.numListeners > 10)
+ hit.numListeners = 10;
+ }
else if (id == PCP_HOST_NUMR)
+ {
hit.numRelays = atom.readInt();
+ if (hit.numRelays > 100)
+ hit.numRelays = 100;
+ }
else if (id == PCP_HOST_UPTIME)
hit.upTime = atom.readInt();
else if (id == PCP_HOST_OLDPOS)
if (hit.numHops == 1){
Servent *sv = servMgr->findServentByServentID(hit.servent_id);
- if (sv){
+ if (sv && sv->getHost().ip == hit.host.ip){
// LOG_DEBUG("set servent's waitPort = %d", hit.host.port);
sv->waitPort = hit.host.port;
+ hit.lastSendSeq = sv->serventHit.lastSendSeq;
+ sv->serventHit = hit;
}
}
}
{
ChanHit hit;
readHostAtoms(atom,c,bcs,hit,false);
+ Servent *sv = servMgr->findServentByServentID(bcs.servent_id);
if (hit.uphost.ip == 0){
// LOG_DEBUG("bcs servent_id = %d", bcs.servent_id);
if (bcs.numHops == 1){
hit.uphost.port = servMgr->serverHost.port;
hit.uphostHops = 1;
} else {
- Servent *sv = servMgr->findServentByServentID(bcs.servent_id);
+ //Servent *sv = servMgr->findServentByServentID(bcs.servent_id);
if (sv){
hit.uphost.ip = sv->getHost().ip;
hit.uphost.port = sv->waitPort;
}
}
}
- int oldPos = pmem.pos;
- hit.writeAtoms(patom, hit.chanID);
- pmem.pos = oldPos;
- r = readAtom(patom,bcs);
+ if (sv &&
+ ((hit.numHops == 1 && (hit.rhost[0].ip == sv->getHost().ip
+ && hit.uphost.ip == servMgr->serverHost.ip && hit.uphost.port == servMgr->serverHost.port)
+ || (hit.rhost[1].localIP() && hit.rhost[1].ip == sv->getHost().ip))
+ || chanMgr->findParentHit(hit)))
+ {
+ int oldPos = pmem.pos;
+ hit.writeAtoms(patom, hit.chanID);
+ pmem.pos = oldPos;
+ r = readAtom(patom,bcs);
+ } else {
+ LOG_DEBUG("### Invalid bcst: hops=%d, ver=%d(VP%04d), ttl=%d",
+ bcs.numHops,ver,ver_vp,ttl);
+ ttl = 0;
+ }
} else {
// copy and process atoms
int oldPos = pmem.pos;
if (bcs.group & (/*PCP_BCST_GROUP_ROOT|*/PCP_BCST_GROUP_TRACKERS|PCP_BCST_GROUP_RELAYS))
{
+ pack.priority = 11 - bcs.numHops;
chanMgr->broadcastPacketUp(pack,bcs.chanID,remoteID,destID);
}
virtual bool sendPacket(ChanPacket &,GnuID &);
virtual void flush(Stream &);
+ virtual unsigned int flushUb(Stream &, unsigned int);
virtual void readHeader(Stream &,Channel *);
virtual int readPacket(Stream &,Channel *);
virtual void readEnd(Stream &,Channel *);
type = T_NONE;
channel_id = 0;
+
+ serventHit.init();
}
// -----------------------------------
bool Servent::sendPacket(ChanPacket &pack,GnuID &cid,GnuID &sid,GnuID &did,Servent::TYPE t)
bool gotPCP=false;
unsigned int reqPos=0;
+ unsigned short listenPort = 0;
nsSwitchNum=0;
gotPCP = atoi(arg)!=0;
else if (http.isHeader(PCX_HS_POS))
reqPos = atoi(arg);
+ else if (http.isHeader(PCX_HS_PORT))
+ listenPort = (unsigned short)atoi(arg);
else if (http.isHeader("icy-metadata"))
addMetadata = atoi(arg) > 0;
else if (http.isHeader(HTTP_HS_AGENT))
}
chanID = chanInfo.id;
+ serventHit.rhost[0].ip = getHost().ip;
+ serventHit.rhost[0].port = listenPort;
+ serventHit.host = serventHit.rhost[0];
+ serventHit.chanID = chanID;
+
canStreamLock.on();
chanReady = canStream(ch);
- if (0 && !chanReady)
+ if (/*0 && */!chanReady)
{
if (servMgr->numStreams(chanID, Servent::T_RELAY, false) == 0)
{
sourceHit = &ch->sourceHost; // send source host info
- if (ch->info.getUptime() > 60) // if stable
+ if (listenPort && ch->info.getUptime() > 60) // if stable
{
// connect "this" host later
- ChanHit nh;
- nh.init();
- nh.chanID = chanID;
- nh.rhost[0] = getHost();
- chanMgr->addHit(nh);
+ chanMgr->addHit(serventHit);
}
char tmp[50];
LOG_DEBUG("Bump channel, hand over sourceHost to %s", tmp);
ch->bump = true;
}
- else if (servMgr->kickUnrelayableHost(chanID, this) != 0)
+ else if (servMgr->kickUnrelayableHost(chanID, serventHit) != 0)
{
chanReady = canStream(ch);
if (!chanReady)
- LOG_DEBUG("Kicked unrelayable host, but still cannot stream");
+ LOG_DEBUG("*** Kicked unrelayable host, but still cannot stream");
}
}
if (!chanReady) type = T_INCOMING;
if (sourceHit) {
char tmp[50];
sourceHit->writeAtoms(atom2, chanInfo.id);
- chs.best[i].host.toStr(tmp);
- LOG_DEBUG("relay info: %s hops = %d", tmp, chs.best[i].numHops);
+ sourceHit->host.toStr(tmp);
+ LOG_DEBUG("relay info(sourceHit): %s", tmp);
best.host.ip = sourceHit->host.ip;
}
void Servent::sendPCPChannel()
{
bool skipCheck = false;
+ unsigned int ptime = 0;
+ int npacket = 0, upsize = 0;
Channel *ch = chanMgr->findChannelByID(chanID);
if (!ch)
BroadcastState bcs;
bcs.servent_id = servent_id;
// error = pcpStream->readPacket(*sock,bcs);
- do {
+
+ unsigned int t = sys->getTime();
+ if (t != ptime) {
+ ptime = t;
+ npacket = MAX_PROC_PACKETS;
+ upsize = MAX_OUTWARD_SIZE;
+ }
+
+ int len = pcpStream->flushUb(*sock, upsize);
+ upsize -= len;
+
+ while (npacket > 0 && sock->readReady()) {
+ npacket--;
error = pcpStream->readPacket(*sock,bcs);
if (error)
throw StreamException("PCP exception");
- } while (sock->readReady() || pcpStream->outData.numPending());
+ }
sys->sleepIdle();
MAX_OUTPACKETS = 32 // max. output packets per queue (normal/priority)
};
+ enum
+ {
+ MAX_PROC_PACKETS = 300,
+ MAX_OUTWARD_SIZE = 1024 * 10
+ };
+
enum TYPE
{
T_NONE, // Not allocated
unsigned int lastSkipCount;
unsigned int waitPort;
+ ChanHit serventHit;
+
int channel_id;
};
}
// --------------------------------------------------
+#if 0
static ChanHit *findServentHit(Servent *s)
{
ChanHitList *chl = chanMgr->findHitListByID(s->chanID);
}
return NULL;
}
+#endif
// --------------------------------------------------
-int ServMgr::kickUnrelayableHost(GnuID &chid, Servent *ns)
+int ServMgr::kickUnrelayableHost(GnuID &chid, ChanHit &sendhit)
{
Servent *ks = NULL;
Servent *s = servMgr->servents;
{
Host h = s->getHost();
- chanMgr->hitlistlock.on();
- ChanHit *hit = findServentHit(s);
- if (hit && !hit->relay && hit->numRelays == 0)
+ ChanHit hit = s->serventHit;
+ if (!hit.relay && hit.numRelays == 0)
{
char hostName[256];
h.toStr(hostName);
if (!ks || s->lastConnect < ks->lastConnect) // elder servent
ks = s;
}
- chanMgr->hitlistlock.off();
}
s = s->next;
}
if (ks)
{
- if (ns)
+ if (sendhit.rhost[0].port)
{
- Host h = ns->getHost();
- ChanHit nh;
- nh.init();
- nh.chanID = chid;
- nh.rhost[0] = h;
-
ChanPacket pack;
MemoryStream mem(pack.data,sizeof(pack.data));
AtomStream atom(mem);
- nh.writeAtoms(atom, chid);
+ sendhit.writeAtoms(atom, chid);
pack.len = mem.pos;
pack.type = ChanPacket::T_PCP;
GnuID noID;
unsigned int kickPushTime;
bool isCheckPushStream(); //JP-EX
void banFirewalledHost(); //JP-EX
- int kickUnrelayableHost(GnuID &, Servent * = NULL);
+ int kickUnrelayableHost(GnuID &, ChanHit &);
bool getModulePath; //JP-EX
bool clearPLS; //JP-EX
#endif
// ------------------------------------------------
static const int PCP_CLIENT_VERSION = 1218;
-static const int PCP_CLIENT_VERSION_VP = 25;
+static const int PCP_CLIENT_VERSION_VP = 26;
static const int PCP_ROOT_VERSION = 1218;
static const int PCP_CLIENT_MINVERSION = 1200;
static const char *PCX_AGENT = "PeerCast/0.1218";
static const char *PCX_AGENTJP = "PeerCast/0.1218-J";
-static const char *PCX_AGENTVP = "PeerCast/0.1218(VP0025-1)";
-static const char *PCX_VERSTRING = "v0.1218(VP0025-1)";
+static const char *PCX_AGENTVP = "PeerCast/0.1218(VP0026)";
+static const char *PCX_VERSTRING = "v0.1218(VP0026)";
#if 1 /* for VP extend version */
#define VERSION_EX 1
static const char *PCP_CLIENT_VERSION_EX_PREFIX = "IM"; // 2bytes only
-static const int PCP_CLIENT_VERSION_EX_NUMBER = 7650;
-static const char *PCX_AGENTEX = "PeerCast/0.1218(IM-VP25-1)";
-static const char *PCX_VERSTRING_EX = "v0.1218(IM765-VP25-1)";
+static const int PCP_CLIENT_VERSION_EX_NUMBER = 26;
+static const char *PCX_AGENTEX = "PeerCast/0.1218(IM0026-patch071223)";
+static const char *PCX_VERSTRING_EX = "v0.1218(IM0026)";
#endif
// ------------------------------------------------