1 #include "cachefsasync.h"
// Per-block status bits. The visible code keeps one status byte per block in
// RNode::m_header and ORs these flags into it.
// UPTODATE: set by RNode::update_page_status when a read is accepted as complete.
12 #define UPTODATE (1 << 0)
// DIRTY: not referenced anywhere in the visible part of this file.
13 #define DIRTY (1 << 1)
// WAITING: set by RNode::maybe_schedule_readblock when it creates an AsyncReadblock.
14 #define WAITING (1 << 2)
// ERROR: set by RNode::update_page_status; wait_on_block maps it to a -1 return.
15 #define ERROR (1 << 3)
// Number of extra blocks past the requested range that rcache_do_read hands to
// maybe_schedule_readblock (they are scheduled but not waited on there).
17 #define READAHEAD_BLOCKS 2
// Number of blocks needed to hold x bytes, rounded up: the whole-block count
// (x >> FUSE_BLOCK_SHIFT) plus one if any bit outside FUSE_BLOCK_MASK is set
// (presumably the in-block offset bits -- confirm against the header that
// defines FUSE_BLOCK_MASK).
// The argument is parenthesized so that expressions containing operators of
// lower precedence than >> or & (e.g. ?:, |) expand correctly. Note that x is
// still evaluated twice, so it must not have side effects.
#define NUMBLOCKSRUP(x) \
    (((x) >> FUSE_BLOCK_SHIFT) + !!((x) & ~FUSE_BLOCK_MASK))
// Builds the read-cache node for one open file.
//   _rcache : owning ReadCache; its cfg supplies "Data Root" and "Meta Root".
//   _fd     : descriptor stored in the member fd (later passed to AsyncReadblock).
//   _path   : path appended to both roots to name the cache files.
//   _fsize  : file size; used as the data file length and to size the header.
// After storing the arguments it creates, truncates and mmaps two files:
// "<Data Root>/<path>" (m_data) and "<Meta Root>/<path>.read" (m_header).
// NOTE(review): several original lines (the conditions guarding the error
// prints, and any close()/return on failure) are not visible in this extract.
22 RNode::RNode(ReadCache
*_rcache
, int _fd
, WvStringParm _path
, size_t _fsize
)
23 : fd(_fd
), fsize(_fsize
), path(_path
), rcache(_rcache
), waiters(10)
// Start with both mappings cleared.
25 m_data
= m_header
= NULL
;
38 WvString
datapath("%s/%s", rcache
->cfg
["Data Root"].get(), path
);
39 WvString
metapath("%s/%s.read", rcache
->cfg
["Meta Root"].get(), path
);
// Make sure the parent directories of both cache files exist.
41 mkdirp(getdirname(datapath
));
42 mkdirp(getdirname(metapath
));
// NOTE(review): this local is named `fd`, the same as the member initialised
// from _fd above, so it shadows the member for the rest of its scope. No
// close() of this descriptor is visible in this extract -- confirm.
44 int fd
= ::open(datapath
.cstr(), O_RDWR
|O_CREAT
, S_IRUSR
|S_IWUSR
);
48 fprintf(stderr
, "Error opening cache file '%s' for write: %s\n",
49 datapath
.cstr(), strerror(errno
));
// Size the data file to the full file size so the whole range can be mapped.
53 if (ftruncate(fd
, fsize
))
56 fprintf(stderr
, "Error truncating file '%s': %s\n",
57 datapath
.cstr(), strerror(errno
));
// Shared read/write mapping of the data file.
61 m_data
= (char *)mmap(0, fsize
,
62 PROT_WRITE
| PROT_READ
, MAP_SHARED
, fd
, 0);
64 if (m_data
== MAP_FAILED
)
// NOTE(review): the message says "metadata file" but this is the data file
// mapping and it prints datapath -- looks like a copy/paste slip.
66 fprintf(stderr
, "Error mmaping metadata file '%s': %s\n",
67 datapath
.cstr(), strerror(errno
));
// Same sequence for the per-block status ("header") file.
74 int mfd
= ::open(metapath
.cstr(), O_RDWR
|O_CREAT
, S_IRUSR
|S_IWUSR
);
78 fprintf(stderr
, "Error opening cache file '%s' for write: %s\n",
79 metapath
.cstr(), strerror(errno
));
// One status byte per block of the file, rounded up.
83 size_t headersize
= NUMBLOCKSRUP(fsize
);
85 if (ftruncate(mfd
, headersize
))
88 fprintf(stderr
, "Error truncating file '%s': %s\n",
89 metapath
.cstr(), strerror(errno
));
93 m_header
= (char *)mmap(0, headersize
,
94 PROT_WRITE
| PROT_READ
, MAP_SHARED
, mfd
, 0);
96 if (m_header
== MAP_FAILED
)
98 fprintf(stderr
, "Error mmaping metadata file '%s': %s\n",
99 metapath
.cstr(), strerror(errno
));
// Zeroes every per-block status byte in m_header (NUMBLOCKSRUP(fsize) bytes),
// which clears the UPTODATE, WAITING and ERROR flags of all blocks.
// NOTE(review): not every original line of this function is visible here.
108 void RNode::invalidate()
111 memset(m_header
, 0, NUMBLOCKSRUP(fsize
));
// Unmaps the data mapping (fsize bytes) and the header mapping
// (NUMBLOCKSRUP(fsize) bytes), i.e. the same lengths the constructor passed to
// mmap. NOTE(review): the lines between the two munmap calls are not visible
// in this extract, so any guards or error handling cannot be described here.
115 void RNode::do_unmap()
119 munmap(m_data
, fsize
);
125 munmap(m_header
, NUMBLOCKSRUP(fsize
));
// Marks block `index` as WAITING and creates an AsyncReadblock for it.
// Two conditions are tested first: `index` at or beyond the block count of
// the file, and a block that already has UPTODATE, ERROR or WAITING set.
// The statements executed under those conditions are not visible in this
// extract (presumably early returns -- confirm).
//   cachefs : passed through to AsyncReadblock.
//   index   : zero-based block number.
131 void RNode::maybe_schedule_readblock(CacheFS
*cachefs
, size_t index
)
135 // Don't try to read past the end of the file
136 // NOTE: index is zero indexed
137 if (index
>= NUMBLOCKSRUP(fsize
))
// Block already read, already failed, or already has a read in flight.
140 if (m_header
[index
] & UPTODATE
|| m_header
[index
] & ERROR
141 || m_header
[index
] & WAITING
)
144 m_header
[index
] |= WAITING
;
// The destination is this block's slot inside the data mapping. The remaining
// constructor arguments are cut off in this extract.
146 new AsyncReadblock(cachefs
, fd
, m_data
+ index
* FUSE_BLOCK_SIZE
,
// Records the outcome of a block read and then walks the waiters registered
// for that block.
//   index  : zero-based block number.
//   status : result of the read; accepted as success when it equals
//            FUSE_BLOCK_SIZE, or when it is positive and `index` is the block
//            containing offset fsize (fsize >> FUSE_BLOCK_SHIFT).
// NOTE(review): the line between the two flag assignments (presumably `else`)
// and the body of the while loop are not visible in this extract.
151 void RNode::update_page_status(size_t index
, int status
)
153 if (status
== FUSE_BLOCK_SIZE
||
154 (status
> 0 && index
== fsize
>> FUSE_BLOCK_SHIFT
))
155 m_header
[index
] |= UPTODATE
;
157 m_header
[index
] |= ERROR
;
// NOTE(review): a string literal is never null, so `status || "..."` is always
// true and this assert can never fire; `status && "..."` looks intended.
159 assert(status
|| "status = 0 in update_page_status");
// Loops for as long as the waiters table yields an entry for this index.
161 while (WaiterHelper
*w
= waiters
[index
])
// Waits until block `index` is UPTODATE or ERROR.
//   index  : zero-based block number; must be below NUMBLOCKSRUP(fsize).
//   waiter : stream registered in `waiters` and suspended via continue_select.
// Returns -1 if the block ends up with ERROR set, otherwise 0 (final
// statement). The UPTODATE and ERROR checks near the top have their bodies
// missing from this extract (presumably early returns -- confirm), as is the
// opening of the do/while loop.
169 int RNode::wait_on_block(size_t index
, WvStream
*waiter
)
171 if (index
>= NUMBLOCKSRUP(fsize
))
// `!"literal"` is always false, so reaching this line always trips the assert.
172 assert(! "guard excess blocks in wait_on_block");
174 if (m_header
[index
] & UPTODATE
)
177 if (m_header
[index
] & ERROR
)
// A block that is neither done nor failed must already have a read scheduled.
180 assert(m_header
[index
] & WAITING
);
182 waiters
.add(new WaiterHelper(index
, waiter
), true);
// Keep selecting until the waiter's alarm_was_ticking flag is set.
185 waiter
->continue_select(-1);
186 } while (!waiter
->alarm_was_ticking
);
188 assert(m_header
[index
] & (UPTODATE
| ERROR
));
190 return m_header
[index
] & ERROR
? -1 : 0;
// Copies up to `size` bytes starting at `offset` from the data mapping into
// `buf`, clamping `size` so the copy does not run past fsize.
// The return statement is not visible in this extract.
// NOTE(review): if offset > fsize, `fsize - offset` wraps around (size_t) and
// the memcpy length becomes huge; `offset + size` can also overflow. No guard
// against either is visible here -- confirm callers rule these out.
194 int RNode::do_read(char *buf
, size_t size
, size_t offset
)
196 if (offset
+ size
> fsize
)
197 size
= fsize
- offset
;
199 memcpy(buf
, m_data
+ offset
, size
);
204 ReadCache::ReadCache(UniConf _cfg
)
205 : ICache(_cfg
), fdhash(100), diskcache(WvString("%s/diskcache.db",
206 cfg
["DB Location"].get()), cfg
["Cache Size"].getint())
208 diskcache
.set_remove_callback(DiskCacheCallback
209 (this, &ReadCache::diskcache_callback
));
213 void ReadCache::diskcache_callback(WvStringParm path
)
215 wvcon
->print("Purging cache for %s\n", path
);
216 WvString
datapath("%s/%s", cfg
["Data Root"].get(), path
);
217 WvString
metapath("%s/%s.read", cfg
["Meta Root"].get(), path
);
219 unlink(datapath
.cstr());
220 unlink(metapath
.cstr());
// Registers an opened file with the read cache.
//   path : file path; used to look up the inode and as the diskcache key.
//   fd   : descriptor handed to the new RNode.
// Takes the size from the inode returned by maybe_get_inode(path, 0), adds
// (path, size) to diskcache and stores a new RNode in fdhash.
// NOTE(review): the declaration of `size` and the branch structure around the
// error print are not visible in this extract, so it cannot be told from here
// whether the last two statements still run after the error.
224 void ReadCache::rcache_do_open(WvStringParm path
, int fd
)
227 INode
*inode
= maybe_get_inode(path
, 0);
229 size
= inode
->get_size();
232 fprintf(stderr
, "Error getting stats in "
233 "ReadCache::do_open on file '%s'\n", path
.cstr());
237 diskcache
.add(path
, size
);
238 fdhash
.add(new RNode(this, fd
, path
, size
), true);
// Invalidation entry point keyed by file descriptor.
// Looks up the RNode for `fd` in fdhash; an error naming the fd is printed on
// a path whose condition is not visible in this extract (presumably when the
// lookup yields no node -- confirm). The remaining lines of the function,
// including whatever is done with `r`, are also not visible.
242 void ReadCache::rcache_do_invalidate(int fd
)
244 RNode
*r
= fdhash
[fd
];
247 fprintf(stderr
, "Error invalidating read cache fd = %d!\n", fd
);
// Invalidation entry point keyed by path.
// NOTE(review): the two statements below use `fd`, which is not a parameter
// of this overload. Unlike the rest of the file they were not split per
// identifier by the extraction, which suggests they sit inside a comment or
// disabled region in the original -- confirm before relying on this overload.
255 void ReadCache::rcache_do_invalidate(WvStringParm path
)
258 RNode *r = fdhash[fd];
261 fprintf(stderr, "Error invalidating read cache fd = %d!\n", fd);
270 void ReadCache::rcache_do_close(int fd
)
272 fdhash
.remove(fdhash
[fd
]);
// Serves a read through the cache: schedules the needed blocks plus
// READAHEAD_BLOCKS extra ones, waits for the needed blocks, then copies the
// data out with RNode::do_read and returns its result.
//   waiter : stream suspended in RNode::wait_on_block.
//   fd     : key used to find the RNode in fdhash.
//   buf, size : destination buffer and requested byte count.
// The end of the parameter list (the body also uses `offset`) and the handling
// of a failed lookup or failed wait are not visible in this extract.
276 int ReadCache::rcache_do_read(WvStream
*waiter
, int fd
, char *buf
, size_t size
,
279 RNode
*r
= fdhash
[fd
];
// NOTE(review): rblocks is derived from `size` alone. When `offset` is not
// block-aligned the byte range can reach one block beyond
// startblock + rblocks - 1; in the visible code that block is only covered by
// the read-ahead scheduling loop and is never waited on.
285 size_t rblocks
= NUMBLOCKSRUP(size
);
286 size_t startblock
= offset
>> FUSE_BLOCK_SHIFT
;
// Schedule the requested blocks plus the read-ahead window.
288 for (size_t index
= startblock
; index
< startblock
+ rblocks
+
289 READAHEAD_BLOCKS
; index
++)
// `this` is passed with a C-style cast to CacheFS *.
292 r
->maybe_schedule_readblock((CacheFS
*)this, index
);
// Wait only for the blocks counted in rblocks; wait_on_block returns non-zero
// on error.
295 for (size_t index
= startblock
; index
< startblock
+ rblocks
; index
++)
297 if (r
->wait_on_block(index
, waiter
))
301 return r
->do_read(buf
, size
, offset
);
// Forwards a block-read result to the RNode registered for `fd`.
//   fd     : key used to find the RNode in fdhash.
//   index  : zero-based block number.
//   status : read result, interpreted by RNode::update_page_status.
// NOTE(review): the lines between the lookup and the call are not visible in
// this extract, so any null check on `r` cannot be confirmed from here.
305 void ReadCache::update_page_status(int fd
, size_t index
, int status
)
307 RNode
*r
= fdhash
[fd
];
310 r
->update_page_status(index
, status
);