Strictly speaking, WvPam is not a stream. So move wvpam.cc to utils.
[wvapps.git] / funfs / cachefsrcache.cc
blobe63c65784fb173a5f46b0755e8ffdd65654fcf29
1 #include "cachefsasync.h"
3 #include <strutils.h>
4 #include <fileutils.h>
6 #include <fcntl.h>
7 #include <sys/mman.h>
8 #include <errno.h>
9 #include <unistd.h>
// Per-block status bits kept in the metadata map (one byte per block).
// UPTODATE: the block has been read into the data map.
#define UPTODATE (1 << 0)
// DIRTY: not referenced anywhere in this file.
#define DIRTY (1 << 1)
// WAITING: a read for the block has been scheduled.
#define WAITING (1 << 2)
// ERROR: the read for the block reported a negative status.
#define ERROR (1 << 3)

// Extra blocks scheduled beyond the range a read actually asked for.
#define READAHEAD_BLOCKS 2

// Number of blocks needed to hold x bytes, rounding a partial block up.
// The argument is parenthesized so that expressions such as
// NUMBLOCKSRUP(a + b) evaluate as intended ('>>' and '&' bind looser
// than '+').  The argument is evaluated twice, so it must be free of
// side effects.
// NOTE(review): assumes ~FUSE_BLOCK_MASK selects the offset-within-block
// bits; FUSE_BLOCK_MASK is defined outside this file -- confirm.
#define NUMBLOCKSRUP(x) \
    (((x) >> FUSE_BLOCK_SHIFT) + !!((x) & ~FUSE_BLOCK_MASK))
22 RNode::RNode(ReadCache *_rcache, int _fd, WvStringParm _path, size_t _fsize)
23 : fd(_fd), fsize(_fsize), path(_path), rcache(_rcache), waiters(10)
25 m_data = m_header = NULL;
27 do_map();
31 RNode::~RNode()
33 do_unmap();
36 void RNode::do_map()
38 WvString datapath("%s/%s", rcache->cfg["Data Root"].get(), path);
39 WvString metapath("%s/%s.read", rcache->cfg["Meta Root"].get(), path);
41 mkdirp(getdirname(datapath));
42 mkdirp(getdirname(metapath));
44 int fd = ::open(datapath.cstr(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR);
46 if (fd == -1)
48 fprintf(stderr, "Error opening cache file '%s' for write: %s\n",
49 datapath.cstr(), strerror(errno));
50 return;
53 if (ftruncate(fd, fsize))
55 ::close(fd);
56 fprintf(stderr, "Error truncating file '%s': %s\n",
57 datapath.cstr(), strerror(errno));
58 return;
61 m_data = (char *)mmap(0, fsize,
62 PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0);
64 if (m_data == MAP_FAILED)
66 fprintf(stderr, "Error mmaping metadata file '%s': %s\n",
67 datapath.cstr(), strerror(errno));
68 m_data = NULL;
69 return;
72 ::close(fd);
74 int mfd = ::open(metapath.cstr(), O_RDWR|O_CREAT, S_IRUSR|S_IWUSR);
76 if (mfd == -1)
78 fprintf(stderr, "Error opening cache file '%s' for write: %s\n",
79 metapath.cstr(), strerror(errno));
80 return;
83 size_t headersize = NUMBLOCKSRUP(fsize);
85 if (ftruncate(mfd, headersize))
87 ::close(mfd);
88 fprintf(stderr, "Error truncating file '%s': %s\n",
89 metapath.cstr(), strerror(errno));
90 return;
93 m_header = (char *)mmap(0, headersize,
94 PROT_WRITE | PROT_READ, MAP_SHARED, mfd, 0);
96 if (m_header == MAP_FAILED)
98 fprintf(stderr, "Error mmaping metadata file '%s': %s\n",
99 metapath.cstr(), strerror(errno));
100 m_header = NULL;
101 return;
104 ::close(mfd);
108 void RNode::invalidate()
110 if (m_header)
111 memset(m_header, 0, NUMBLOCKSRUP(fsize));
115 void RNode::do_unmap()
117 if (m_data)
119 munmap(m_data, fsize);
120 m_data = NULL;
123 if (m_header)
125 munmap(m_header, NUMBLOCKSRUP(fsize));
126 m_header = NULL;
131 void RNode::maybe_schedule_readblock(CacheFS *cachefs, size_t index)
133 assert(m_data);
135 // Don't try to read past the end of the file
136 // NOTE: index is zero indexed
137 if (index >= NUMBLOCKSRUP(fsize))
138 return;
140 if (m_header[index] & UPTODATE || m_header[index] & ERROR
141 || m_header[index] & WAITING)
142 return;
144 m_header[index] |= WAITING;
146 new AsyncReadblock(cachefs, fd, m_data + index * FUSE_BLOCK_SIZE,
147 index);
151 void RNode::update_page_status(size_t index, int status)
153 if (status == FUSE_BLOCK_SIZE ||
154 (status > 0 && index == fsize >> FUSE_BLOCK_SHIFT))
155 m_header[index] |= UPTODATE;
156 else if (status < 0)
157 m_header[index] |= ERROR;
159 assert(status || "status = 0 in update_page_status");
161 while (WaiterHelper *w = waiters[index])
163 w->waiter->alarm(0);
164 waiters.remove(w);
169 int RNode::wait_on_block(size_t index, WvStream *waiter)
171 if (index >= NUMBLOCKSRUP(fsize))
172 assert(! "guard excess blocks in wait_on_block");
174 if (m_header[index] & UPTODATE)
175 return 0;
177 if (m_header[index] & ERROR)
178 return -1;
180 assert(m_header[index] & WAITING);
182 waiters.add(new WaiterHelper(index, waiter), true);
184 do {
185 waiter->continue_select(-1);
186 } while (!waiter->alarm_was_ticking);
188 assert(m_header[index] & (UPTODATE | ERROR));
190 return m_header[index] & ERROR ? -1 : 0;
194 int RNode::do_read(char *buf, size_t size, size_t offset)
196 if (offset + size > fsize)
197 size = fsize - offset;
199 memcpy(buf, m_data + offset, size);
200 return size;
204 ReadCache::ReadCache(UniConf _cfg)
205 : ICache(_cfg), fdhash(100), diskcache(WvString("%s/diskcache.db",
206 cfg["DB Location"].get()), cfg["Cache Size"].getint())
208 diskcache.set_remove_callback(DiskCacheCallback
209 (this, &ReadCache::diskcache_callback));
213 void ReadCache::diskcache_callback(WvStringParm path)
215 wvcon->print("Purging cache for %s\n", path);
216 WvString datapath("%s/%s", cfg["Data Root"].get(), path);
217 WvString metapath("%s/%s.read", cfg["Meta Root"].get(), path);
219 unlink(datapath.cstr());
220 unlink(metapath.cstr());
224 void ReadCache::rcache_do_open(WvStringParm path, int fd)
226 size_t size = 0;
227 INode *inode = maybe_get_inode(path, 0);
228 if (inode)
229 size = inode->get_size();
230 else
232 fprintf(stderr, "Error getting stats in "
233 "ReadCache::do_open on file '%s'\n", path.cstr());
234 return;
237 diskcache.add(path, size);
238 fdhash.add(new RNode(this, fd, path, size), true);
242 void ReadCache::rcache_do_invalidate(int fd)
244 RNode *r = fdhash[fd];
245 if (!r)
247 fprintf(stderr, "Error invalidating read cache fd = %d!\n", fd);
248 return;
251 r->invalidate();
255 void ReadCache::rcache_do_invalidate(WvStringParm path)
258 RNode *r = fdhash[fd];
259 if (!r)
261 fprintf(stderr, "Error invalidating read cache fd = %d!\n", fd);
262 return;
265 r->invalidate();
270 void ReadCache::rcache_do_close(int fd)
272 fdhash.remove(fdhash[fd]);
276 int ReadCache::rcache_do_read(WvStream *waiter, int fd, char *buf, size_t size,
277 size_t offset)
279 RNode *r = fdhash[fd];
280 assert(r);
282 if (!r->isok())
283 return 0;
285 size_t rblocks = NUMBLOCKSRUP(size);
286 size_t startblock = offset >> FUSE_BLOCK_SHIFT;
288 for (size_t index = startblock; index < startblock + rblocks +
289 READAHEAD_BLOCKS; index++)
291 // FIXME: ugly
292 r->maybe_schedule_readblock((CacheFS *)this, index);
295 for (size_t index = startblock; index < startblock + rblocks; index++)
297 if (r->wait_on_block(index, waiter))
298 return 0;
301 return r->do_read(buf, size, offset);
305 void ReadCache::update_page_status(int fd, size_t index, int status)
307 RNode *r = fdhash[fd];
308 assert(r);
310 r->update_page_status(index, status);