Implement zero-copy reads (read_buf) for FUSE 2.9 and above

FUSE 2.9 added support for zero copy reads/writes through two new
callbacks, read_buf and  write_buf. We now imeplement read_buf,
which iterates over the bands using the generic code, but instead
of opening and reading the content of the band into memory, we just
open the file and pass a structure back to FUSE with the file
descriptor, size, and offset, and FUSE will then use this on the
kernel side if possible (using splice on GNU/Linux).

For padding with zeroes we open /dev/zero and treat it as any
other file.

Since FUSE does not provide a callback when it's done with the
buffers there's no way for us to know when to close the bands
we opened. To work around this we keep the files open until
either the main image (dmg) file has been closed, or we run
out of file descriptors, at which point we gc the open files
and continue.
This commit is contained in:
Tor Arne Vestbø
2012-10-14 21:31:46 +02:00
parent 77945eadc7
commit 4abe68dccd

View File

@@ -6,6 +6,7 @@
#include <stdlib.h>
#include <syslog.h>
#include <string.h>
#include <sys/resource.h>
#include <algorithm>
#include <fstream>
@@ -19,6 +20,8 @@
#include <fuse.h>
#define FUSE_SUPPORTS_ZERO_COPY FUSE_VERSION >= 29
using namespace std;
static const char image_path[] = "/sparsebundle.dmg";
@@ -28,6 +31,9 @@ struct sparsebundle_data {
off_t band_size;
off_t size;
off_t times_opened;
#if FUSE_SUPPORTS_ZERO_COPY
map<string, int> open_files;
#endif
};
#define SB_DATA_CAST(ptr) ((struct sparsebundle_data *) ptr)
@@ -194,6 +200,118 @@ static int sparsebundle_read(const char *path, char *buffer, size_t length, off_
return sparsebundle_iterate_bands(path, length, offset, &read_ops);
}
#if FUSE_SUPPORTS_ZERO_COPY
int sparsebundle_read_buf_prepare_file(const char *path)
{
int fd = -1;
map<string, int>::const_iterator iter = SB_DATA->open_files.find(path);
if (iter != SB_DATA->open_files.end()) {
fd = iter->second;
} else {
syslog(LOG_DEBUG, "file %s not opened yet, opening", path);
fd = open(path, O_RDONLY);
SB_DATA->open_files[path] = fd;
}
return fd;
}
static int sparsebundle_read_buf_process_band(const char *band_path, size_t length, off_t offset, void *read_data)
{
ssize_t read = 0;
vector<fuse_buf> *buffers = static_cast<vector<fuse_buf>*>(read_data);
syslog(LOG_DEBUG, "preparing %zu bytes at offset %llu", length, offset);
int band_file_fd = sparsebundle_read_buf_prepare_file(band_path);
if (band_file_fd != -1) {
struct stat band_stat;
stat(band_path, &band_stat);
read += max(off_t(0), min(static_cast<off_t>(length), band_stat.st_size - offset));
} else if (errno != ENOENT) {
syslog(LOG_ERR, "failed to open band %s: %s", band_path, strerror(errno));
return -errno;
}
if (read > 0) {
fuse_buf buffer = { read, fuse_buf_flags(FUSE_BUF_IS_FD | FUSE_BUF_FD_SEEK), 0, band_file_fd, offset };
buffers->push_back(buffer);
}
return read;
}
static const char zero_device[] = "/dev/zero";
static int sparsebundle_read_buf_pad_with_zeroes(size_t length, void *read_data)
{
vector<fuse_buf> *buffers = static_cast<vector<fuse_buf>*>(read_data);
int zero_device_fd = sparsebundle_read_buf_prepare_file(zero_device);
fuse_buf buffer = { length, fuse_buf_flags(FUSE_BUF_IS_FD), 0, zero_device_fd, 0 };
buffers->push_back(buffer);
return length;
}
static void sparsebundle_read_buf_close_files()
{
syslog(LOG_DEBUG, "closing %u open file descriptor(s)", SB_DATA->open_files.size());
map<string, int>::iterator iter;
for(iter = SB_DATA->open_files.begin(); iter != SB_DATA->open_files.end(); ++iter)
close(iter->second);
SB_DATA->open_files.clear();
}
static int sparsebundle_read_buf(const char *path, struct fuse_bufvec **bufp,
size_t length, off_t offset, struct fuse_file_info *fi)
{
int ret = 0;
vector<fuse_buf> buffers;
sparsebundle_read_operations read_ops = {
&sparsebundle_read_buf_process_band,
sparsebundle_read_buf_pad_with_zeroes,
&buffers
};
syslog(LOG_DEBUG, "asked to read %zu bytes at offset %llu using zero-copy read",
length, offset);
static struct rlimit fd_limit = { -1, -1 };
if (fd_limit.rlim_cur < 0)
getrlimit(RLIMIT_NOFILE, &fd_limit);
if (SB_DATA->open_files.size() + 1 >= fd_limit.rlim_cur) {
syslog(LOG_DEBUG, "hit max number of file descriptors");
sparsebundle_read_buf_close_files();
}
ret = sparsebundle_iterate_bands(path, length, offset, &read_ops);
if (ret < 0)
return ret;
size_t bufvec_size = sizeof(struct fuse_bufvec) + (sizeof(struct fuse_buf) * (buffers.size() - 1));
struct fuse_bufvec *buffer_vector = static_cast<fuse_bufvec*>(malloc(bufvec_size));
if (buffer_vector == 0)
return -ENOMEM;
buffer_vector->count = buffers.size();
buffer_vector->idx = 0;
buffer_vector->off = 0;
copy(buffers.begin(), buffers.end(), buffer_vector->buf);
syslog(LOG_DEBUG, "returning %d buffers to fuse", buffer_vector->count);
*bufp = buffer_vector;
return ret;
}
#endif
static int sparsebundle_release(const char *path, struct fuse_file_info *fi)
{
SB_DATA->times_opened--;
@@ -201,7 +319,12 @@ static int sparsebundle_release(const char *path, struct fuse_file_info *fi)
SB_DATA->path, SB_DATA->times_opened);
if (SB_DATA->times_opened == 0) {
syslog(LOG_DEBUG, "no more references to sparsebundle, cleaning up");
syslog(LOG_DEBUG, "no more references, cleaning up");
#if FUSE_SUPPORTS_ZERO_COPY
if (!SB_DATA->open_files.empty())
sparsebundle_read_buf_close_files();
#endif
}
return 0;
@@ -310,6 +433,9 @@ int main(int argc, char **argv)
sparsebundle_filesystem_operations.read = sparsebundle_read;
sparsebundle_filesystem_operations.readdir = sparsebundle_readdir;
sparsebundle_filesystem_operations.release = sparsebundle_release;
#if FUSE_SUPPORTS_ZERO_COPY
sparsebundle_filesystem_operations.read_buf = sparsebundle_read_buf;
#endif
return fuse_main(args.argc, args.argv, &sparsebundle_filesystem_operations, &data);
}