Skip to content

Commit 9ebd4ef

Browse files
committed
abstract into a dmlc stream.
1 parent fa5d460 commit 9ebd4ef

File tree

4 files changed

+65
-18
lines changed

4 files changed

+65
-18
lines changed

doc/tutorials/external_memory.rst

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,18 @@ The feature is still experimental and not yet ready for production use. In this
88
we will introduce both methods. Please note that training on data from external memory is
99
not supported by ``exact`` tree method.
1010

11+
.. warning::
12+
13+
The implementation of external memory uses ``mmap`` and is not tested against errors
14+
like disconnected network devices (``SIGBUS``).
15+
16+
.. note::
17+
18+
When external memory is used, the CPU training performance is IO bound, meaning the
19+
training speed is almost exclusively determined by the disk IO speed. For GPU, please read
20+
on and see the gradient-based sampling with external memory. During benchmark, we used
21+
an NVMe drive connected to a PCIe slot; the performance is "usable" with ``hist`` on CPU.
22+
1123
*************
1224
Data Iterator
1325
*************

rabit/include/rabit/internal/io.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ struct MemoryFixSizeBuffer : public SeekStream {
6060
return curr_ptr_ == buffer_size_;
6161
}
6262

63-
private:
63+
protected:
6464
/*! \brief in memory buffer */
6565
char *p_buffer_;
6666
/*! \brief current pointer */

src/common/io.h

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,15 @@
99
#define XGBOOST_COMMON_IO_H_
1010

1111
#include <dmlc/io.h>
12+
#include <fcntl.h> // for open, O_RDONLY
1213
#include <rabit/rabit.h>
13-
#include <string>
14+
#include <sys/mman.h> // for mmap, munmap
15+
#include <unistd.h> // for close
16+
#include <xgboost/string_view.h>
17+
1418
#include <cstring>
1519
#include <fstream>
20+
#include <string>
1621

1722
#include "common.h"
1823

@@ -127,6 +132,48 @@ inline std::string ReadAll(std::string const &path) {
127132
return content;
128133
}
129134

135+
/**
136+
* \brief Private mmap file, copy-on-write
137+
*/
138+
class PrivateMmapStream : public MemoryFixSizeBuffer {
139+
std::int32_t fd_;
140+
std::string path_;
141+
142+
void* Open(StringView path, bool read_only, std::size_t offset, std::size_t length) {
143+
fd_ = open(path.c_str(), O_RDONLY);
144+
CHECK_GE(fd_, 0) << "Failed to open:" << path << ". " << strerror(errno);
145+
146+
char* ptr{nullptr};
147+
int prot{PROT_READ};
148+
if (!read_only) {
149+
prot |= PROT_WRITE;
150+
}
151+
#if defined(__linux__)
152+
ptr = reinterpret_cast<char*>(mmap64(nullptr, length, prot, MAP_PRIVATE, fd_, offset));
153+
#elif defined(__APPLE__)
154+
CHECK_LE(offset, std::numeric_limits<off_t>::max())
155+
<< "File size has exceeded the limit on macos.";
156+
ptr = reinterpret_cast<char*>(mmap(nullptr, length, prot, MAP_PRIVATE, fd_, offset));
157+
#else
158+
// fixme: not yet implemented
159+
ptr = reinterpret_cast<char*>(mmap(nullptr, length, prot, MAP_PRIVATE, fd_, offset));
160+
#endif // defined(__linux__)
161+
CHECK_NE(ptr, MAP_FAILED) << "Failed to map: " << path << ". " << strerror(errno);
162+
return ptr;
163+
}
164+
165+
public:
166+
explicit PrivateMmapStream(std::string path, bool read_only, std::size_t offset,
167+
std::size_t length)
168+
: MemoryFixSizeBuffer{Open(StringView{path}, read_only, offset, length), length},
169+
path_{path} {}
170+
171+
~PrivateMmapStream() override {
172+
CHECK_NE(munmap(p_buffer_, buffer_size_), -1)
173+
<< "Faled to munmap." << path_ << ". " << strerror(errno);
174+
CHECK_NE(close(fd_), -1) << "Faled to close: " << path_ << ". " << strerror(errno);
175+
}
176+
};
130177
} // namespace common
131178
} // namespace xgboost
132179
#endif // XGBOOST_COMMON_IO_H_

src/data/sparse_page_source.h

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,9 @@ class SparsePageSourceImpl : public BatchIteratorImpl<S> {
123123

124124
size_t n_prefetch_batches = std::min(kPreFetch, n_batches_);
125125
CHECK_GT(n_prefetch_batches, 0) << "total batches:" << n_batches_;
126-
size_t fetch_it = count_;
126+
std::size_t fetch_it = count_;
127127

128-
for (size_t i = 0; i < n_prefetch_batches; ++i, ++fetch_it) {
128+
for (std::size_t i = 0; i < n_prefetch_batches; ++i, ++fetch_it) {
129129
fetch_it %= n_batches_; // ring
130130
if (ring_->at(fetch_it).valid()) {
131131
continue;
@@ -143,21 +143,9 @@ class SparsePageSourceImpl : public BatchIteratorImpl<S> {
143143
std::uint64_t offset = self->cache_info_->offset.at(fetch_it);
144144
std::uint64_t length = self->cache_info_->bytes.at(fetch_it);
145145

146-
// mmap
147-
auto fd = open(n.c_str(), O_RDONLY);
148-
CHECK_GE(fd, 0) << "Failed to open:" << n << ". " << strerror(errno);
149-
auto ptr = mmap64(nullptr, length, PROT_READ, MAP_PRIVATE, fd, offset);
150-
CHECK_NE(ptr, MAP_FAILED) << "Failed to map: " << n << ". " << strerror(errno);
151-
152-
// read page
153-
auto fi = common::MemoryFixSizeBuffer(ptr, length);
154-
CHECK(fmt->Read(page.get(), &fi));
146+
auto fi = std::make_unique<common::PrivateMmapStream>(n, true, offset, length);
147+
CHECK(fmt->Read(page.get(), fi.get()));
155148
LOG(INFO) << "Read a page in " << timer.ElapsedSeconds() << " seconds.";
156-
157-
// cleanup
158-
CHECK_NE(munmap(ptr, length), -1) << "Faled to munmap: " << n << ". " << strerror(errno);
159-
CHECK_NE(close(fd), -1) << "Faled to close: " << n << ". " << strerror(errno);
160-
161149
return page;
162150
});
163151
}

0 commit comments

Comments
 (0)