treewide: replace cache.h with more direct headers, where possible
[git.git] / bulk-checkin.c
blobd64cd5c52d01b1ccbd72a062c0dea963a8a3dfd2
1 /*
2 * Copyright (c) 2011, Google Inc.
3 */
4 #include "git-compat-util.h"
5 #include "alloc.h"
6 #include "bulk-checkin.h"
7 #include "hex.h"
8 #include "lockfile.h"
9 #include "repository.h"
10 #include "csum-file.h"
11 #include "pack.h"
12 #include "strbuf.h"
13 #include "string-list.h"
14 #include "tmp-objdir.h"
15 #include "packfile.h"
16 #include "object-store.h"
18 static int odb_transaction_nesting;
20 static struct tmp_objdir *bulk_fsync_objdir;
22 static struct bulk_checkin_packfile {
23 char *pack_tmp_name;
24 struct hashfile *f;
25 off_t offset;
26 struct pack_idx_option pack_idx_opts;
28 struct pack_idx_entry **written;
29 uint32_t alloc_written;
30 uint32_t nr_written;
31 } bulk_checkin_packfile;
/*
 * Move a finished temporary pack and its freshly generated index into
 * place under the name prefix held in "basename".
 *
 * stage_tmp_packfiles() is given the written entries and index options
 * (NULL is passed in the slot it uses for additional packing data) and
 * reports the temporary .idx path, which rename_tmp_packfile_idx() then
 * renames. The temporary path string is owned and released here.
 */
static void finish_tmp_packfile(struct strbuf *basename,
				const char *pack_tmp_name,
				struct pack_idx_entry **written_list,
				uint32_t nr_written,
				struct pack_idx_option *pack_idx_opts,
				unsigned char hash[])
{
	char *staged_idx_path = NULL;

	stage_tmp_packfiles(basename, pack_tmp_name,
			    written_list, nr_written, NULL,
			    pack_idx_opts, hash, &staged_idx_path);
	rename_tmp_packfile_idx(basename, &staged_idx_path);

	free(staged_idx_path);
}
49 static void flush_bulk_checkin_packfile(struct bulk_checkin_packfile *state)
51 unsigned char hash[GIT_MAX_RAWSZ];
52 struct strbuf packname = STRBUF_INIT;
53 int i;
55 if (!state->f)
56 return;
58 if (state->nr_written == 0) {
59 close(state->f->fd);
60 unlink(state->pack_tmp_name);
61 goto clear_exit;
62 } else if (state->nr_written == 1) {
63 finalize_hashfile(state->f, hash, FSYNC_COMPONENT_PACK,
64 CSUM_HASH_IN_STREAM | CSUM_FSYNC | CSUM_CLOSE);
65 } else {
66 int fd = finalize_hashfile(state->f, hash, FSYNC_COMPONENT_PACK, 0);
67 fixup_pack_header_footer(fd, hash, state->pack_tmp_name,
68 state->nr_written, hash,
69 state->offset);
70 close(fd);
73 strbuf_addf(&packname, "%s/pack/pack-%s.", get_object_directory(),
74 hash_to_hex(hash));
75 finish_tmp_packfile(&packname, state->pack_tmp_name,
76 state->written, state->nr_written,
77 &state->pack_idx_opts, hash);
78 for (i = 0; i < state->nr_written; i++)
79 free(state->written[i]);
81 clear_exit:
82 free(state->written);
83 memset(state, 0, sizeof(*state));
85 strbuf_release(&packname);
86 /* Make objects we just wrote available to ourselves */
87 reprepare_packed_git(the_repository);
91 * Cleanup after batch-mode fsync_object_files.
93 static void flush_batch_fsync(void)
95 struct strbuf temp_path = STRBUF_INIT;
96 struct tempfile *temp;
98 if (!bulk_fsync_objdir)
99 return;
102 * Issue a full hardware flush against a temporary file to ensure
103 * that all objects are durable before any renames occur. The code in
104 * fsync_loose_object_bulk_checkin has already issued a writeout
105 * request, but it has not flushed any writeback cache in the storage
106 * hardware or any filesystem logs. This fsync call acts as a barrier
107 * to ensure that the data in each new object file is durable before
108 * the final name is visible.
110 strbuf_addf(&temp_path, "%s/bulk_fsync_XXXXXX", get_object_directory());
111 temp = xmks_tempfile(temp_path.buf);
112 fsync_or_die(get_tempfile_fd(temp), get_tempfile_path(temp));
113 delete_tempfile(&temp);
114 strbuf_release(&temp_path);
117 * Make the object files visible in the primary ODB after their data is
118 * fully durable.
120 tmp_objdir_migrate(bulk_fsync_objdir);
121 bulk_fsync_objdir = NULL;
124 static int already_written(struct bulk_checkin_packfile *state, struct object_id *oid)
126 int i;
128 /* The object may already exist in the repository */
129 if (has_object_file(oid))
130 return 1;
132 /* Might want to keep the list sorted */
133 for (i = 0; i < state->nr_written; i++)
134 if (oideq(&state->written[i]->oid, oid))
135 return 1;
137 /* This is a new object we need to keep */
138 return 0;
142 * Read the contents from fd for size bytes, streaming it to the
143 * packfile in state while updating the hash in ctx. Signal a failure
144 * by returning a negative value when the resulting pack would exceed
145 * the pack size limit and this is not the first object in the pack,
146 * so that the caller can discard what we wrote from the current pack
147 * by truncating it and opening a new one. The caller will then call
148 * us again after rewinding the input fd.
150 * The already_hashed_to pointer is kept untouched by the caller to
151 * make sure we do not hash the same byte when we are called
152 * again. This way, the caller does not have to checkpoint its hash
153 * status before calling us just in case we ask it to call us again
154 * with a new pack.
156 static int stream_to_pack(struct bulk_checkin_packfile *state,
157 git_hash_ctx *ctx, off_t *already_hashed_to,
158 int fd, size_t size, enum object_type type,
159 const char *path, unsigned flags)
161 git_zstream s;
162 unsigned char ibuf[16384];
163 unsigned char obuf[16384];
164 unsigned hdrlen;
165 int status = Z_OK;
166 int write_object = (flags & HASH_WRITE_OBJECT);
167 off_t offset = 0;
169 git_deflate_init(&s, pack_compression_level);
171 hdrlen = encode_in_pack_object_header(obuf, sizeof(obuf), type, size);
172 s.next_out = obuf + hdrlen;
173 s.avail_out = sizeof(obuf) - hdrlen;
175 while (status != Z_STREAM_END) {
176 if (size && !s.avail_in) {
177 ssize_t rsize = size < sizeof(ibuf) ? size : sizeof(ibuf);
178 ssize_t read_result = read_in_full(fd, ibuf, rsize);
179 if (read_result < 0)
180 die_errno("failed to read from '%s'", path);
181 if (read_result != rsize)
182 die("failed to read %d bytes from '%s'",
183 (int)rsize, path);
184 offset += rsize;
185 if (*already_hashed_to < offset) {
186 size_t hsize = offset - *already_hashed_to;
187 if (rsize < hsize)
188 hsize = rsize;
189 if (hsize)
190 the_hash_algo->update_fn(ctx, ibuf, hsize);
191 *already_hashed_to = offset;
193 s.next_in = ibuf;
194 s.avail_in = rsize;
195 size -= rsize;
198 status = git_deflate(&s, size ? 0 : Z_FINISH);
200 if (!s.avail_out || status == Z_STREAM_END) {
201 if (write_object) {
202 size_t written = s.next_out - obuf;
204 /* would we bust the size limit? */
205 if (state->nr_written &&
206 pack_size_limit_cfg &&
207 pack_size_limit_cfg < state->offset + written) {
208 git_deflate_abort(&s);
209 return -1;
212 hashwrite(state->f, obuf, written);
213 state->offset += written;
215 s.next_out = obuf;
216 s.avail_out = sizeof(obuf);
219 switch (status) {
220 case Z_OK:
221 case Z_BUF_ERROR:
222 case Z_STREAM_END:
223 continue;
224 default:
225 die("unexpected deflate failure: %d", status);
228 git_deflate_end(&s);
229 return 0;
232 /* Lazily create backing packfile for the state */
233 static void prepare_to_stream(struct bulk_checkin_packfile *state,
234 unsigned flags)
236 if (!(flags & HASH_WRITE_OBJECT) || state->f)
237 return;
239 state->f = create_tmp_packfile(&state->pack_tmp_name);
240 reset_pack_idx_option(&state->pack_idx_opts);
242 /* Pretend we are going to write only one object */
243 state->offset = write_pack_header(state->f, 1);
244 if (!state->offset)
245 die_errno("unable to write pack header");
248 static int deflate_to_pack(struct bulk_checkin_packfile *state,
249 struct object_id *result_oid,
250 int fd, size_t size,
251 enum object_type type, const char *path,
252 unsigned flags)
254 off_t seekback, already_hashed_to;
255 git_hash_ctx ctx;
256 unsigned char obuf[16384];
257 unsigned header_len;
258 struct hashfile_checkpoint checkpoint = {0};
259 struct pack_idx_entry *idx = NULL;
261 seekback = lseek(fd, 0, SEEK_CUR);
262 if (seekback == (off_t) -1)
263 return error("cannot find the current offset");
265 header_len = format_object_header((char *)obuf, sizeof(obuf),
266 type, size);
267 the_hash_algo->init_fn(&ctx);
268 the_hash_algo->update_fn(&ctx, obuf, header_len);
270 /* Note: idx is non-NULL when we are writing */
271 if ((flags & HASH_WRITE_OBJECT) != 0)
272 CALLOC_ARRAY(idx, 1);
274 already_hashed_to = 0;
276 while (1) {
277 prepare_to_stream(state, flags);
278 if (idx) {
279 hashfile_checkpoint(state->f, &checkpoint);
280 idx->offset = state->offset;
281 crc32_begin(state->f);
283 if (!stream_to_pack(state, &ctx, &already_hashed_to,
284 fd, size, type, path, flags))
285 break;
287 * Writing this object to the current pack will make
288 * it too big; we need to truncate it, start a new
289 * pack, and write into it.
291 if (!idx)
292 BUG("should not happen");
293 hashfile_truncate(state->f, &checkpoint);
294 state->offset = checkpoint.offset;
295 flush_bulk_checkin_packfile(state);
296 if (lseek(fd, seekback, SEEK_SET) == (off_t) -1)
297 return error("cannot seek back");
299 the_hash_algo->final_oid_fn(result_oid, &ctx);
300 if (!idx)
301 return 0;
303 idx->crc32 = crc32_end(state->f);
304 if (already_written(state, result_oid)) {
305 hashfile_truncate(state->f, &checkpoint);
306 state->offset = checkpoint.offset;
307 free(idx);
308 } else {
309 oidcpy(&idx->oid, result_oid);
310 ALLOC_GROW(state->written,
311 state->nr_written + 1,
312 state->alloc_written);
313 state->written[state->nr_written++] = idx;
315 return 0;
318 void prepare_loose_object_bulk_checkin(void)
321 * We lazily create the temporary object directory
322 * the first time an object might be added, since
323 * callers may not know whether any objects will be
324 * added at the time they call begin_odb_transaction.
326 if (!odb_transaction_nesting || bulk_fsync_objdir)
327 return;
329 bulk_fsync_objdir = tmp_objdir_create("bulk-fsync");
330 if (bulk_fsync_objdir)
331 tmp_objdir_replace_primary_odb(bulk_fsync_objdir, 0);
334 void fsync_loose_object_bulk_checkin(int fd, const char *filename)
337 * If we have an active ODB transaction, we issue a call that
338 * cleans the filesystem page cache but avoids a hardware flush
339 * command. Later on we will issue a single hardware flush
340 * before renaming the objects to their final names as part of
341 * flush_batch_fsync.
343 if (!bulk_fsync_objdir ||
344 git_fsync(fd, FSYNC_WRITEOUT_ONLY) < 0) {
345 if (errno == ENOSYS)
346 warning(_("core.fsyncMethod = batch is unsupported on this platform"));
347 fsync_or_die(fd, filename);
351 int index_bulk_checkin(struct object_id *oid,
352 int fd, size_t size, enum object_type type,
353 const char *path, unsigned flags)
355 int status = deflate_to_pack(&bulk_checkin_packfile, oid, fd, size, type,
356 path, flags);
357 if (!odb_transaction_nesting)
358 flush_bulk_checkin_packfile(&bulk_checkin_packfile);
359 return status;
362 void begin_odb_transaction(void)
364 odb_transaction_nesting += 1;
367 void flush_odb_transaction(void)
369 flush_batch_fsync();
370 flush_bulk_checkin_packfile(&bulk_checkin_packfile);
373 void end_odb_transaction(void)
375 odb_transaction_nesting -= 1;
376 if (odb_transaction_nesting < 0)
377 BUG("Unbalanced ODB transaction nesting");
379 if (odb_transaction_nesting)
380 return;
382 flush_odb_transaction();