RecordFileReader::RecordFileReader(const std::string& filename, FILE* fp)
: filename_(filename), record_fp_(fp), event_id_pos_in_sample_records_(0),
- event_id_reverse_pos_in_non_sample_records_(0) {
+ event_id_reverse_pos_in_non_sample_records_(0), read_record_size_(0) {
}
RecordFileReader::~RecordFileReader() {
}
event_ids_for_file_attrs_.push_back(ids);
for (auto id : ids) {
- event_id_to_attr_map_[id] = &file_attrs_[i].attr;
+ event_id_to_attr_map_[id] = i;
}
}
return true;
bool RecordFileReader::ReadDataSection(
const std::function<bool(std::unique_ptr<Record>)>& callback, bool sorted) {
- if (fseek(record_fp_, header_.data.offset, SEEK_SET) != 0) {
- PLOG(ERROR) << "fseek() failed";
- return false;
+ std::unique_ptr<Record> record;
+ while (ReadRecord(record, sorted)) {
+ if (record == nullptr) {
+ return true;
+ }
+ if (!callback(std::move(record))) {
+ return false;
+ }
}
- bool has_timestamp = true;
- for (const auto& attr : file_attrs_) {
- if (!IsTimestampSupported(attr.attr)) {
- has_timestamp = false;
- break;
+ return false;
+}
+
+bool RecordFileReader::ReadRecord(std::unique_ptr<Record>& record,
+ bool sorted) {
+ if (read_record_size_ == 0) {
+ if (fseek(record_fp_, header_.data.offset, SEEK_SET) != 0) {
+ PLOG(ERROR) << "fseek() failed";
+ return false;
}
+ bool has_timestamp = true;
+ for (const auto& attr : file_attrs_) {
+ if (!IsTimestampSupported(attr.attr)) {
+ has_timestamp = false;
+ break;
+ }
+ }
+ record_cache_.reset(new RecordCache(has_timestamp));
}
- RecordCache cache(has_timestamp);
- for (size_t nbytes_read = 0; nbytes_read < header_.data.size;) {
- std::unique_ptr<Record> record = ReadRecord(&nbytes_read);
+ record = nullptr;
+ while (read_record_size_ < header_.data.size && record == nullptr) {
+ record = ReadRecord(&read_record_size_);
if (record == nullptr) {
return false;
}
ProcessEventIdRecord(*static_cast<EventIdRecord*>(record.get()));
}
if (sorted) {
- cache.Push(std::move(record));
- record = cache.Pop();
- if (record != nullptr) {
- if (!callback(std::move(record))) {
- return false;
- }
- }
- } else {
- if (!callback(std::move(record))) {
- return false;
- }
+ record_cache_->Push(std::move(record));
+ record = record_cache_->Pop();
}
}
- std::vector<std::unique_ptr<Record>> records = cache.PopAll();
- for (auto& record : records) {
- if (!callback(std::move(record))) {
- return false;
- }
+ if (record == nullptr) {
+ record = record_cache_->ForcedPop();
}
return true;
}
-std::unique_ptr<Record> RecordFileReader::ReadRecord(size_t* nbytes_read) {
+std::unique_ptr<Record> RecordFileReader::ReadRecord(uint64_t* nbytes_read) {
char header_buf[Record::header_size()];
if (!Read(header_buf, Record::header_size())) {
return nullptr;
if (has_event_id) {
auto it = event_id_to_attr_map_.find(event_id);
if (it != event_id_to_attr_map_.end()) {
- attr = it->second;
+ attr = &file_attrs_[it->second].attr;
}
}
}
void RecordFileReader::ProcessEventIdRecord(const EventIdRecord& r) {
for (size_t i = 0; i < r.count; ++i) {
event_ids_for_file_attrs_[r.data[i].attr_id].push_back(r.data[i].event_id);
- event_id_to_attr_map_[r.data[i].event_id] =
- &file_attrs_[r.data[i].attr_id].attr;
+ event_id_to_attr_map_[r.data[i].event_id] = r.data[i].attr_id;
+ }
+}
+
+size_t RecordFileReader::GetAttrIndexOfRecord(const SampleRecord& record) {
+ auto it = event_id_to_attr_map_.find(record.id_data.id);
+ if (it != event_id_to_attr_map_.end()) {
+ return it->second;
}
+ return 0;
}
bool RecordFileReader::ReadFeatureSection(int feature, std::vector<char>* data) {