2 * Copyright (c) 2011, Google Inc.
5 #include "environment.h"
7 #include "repository.h"
8 #include "object-store.h"
9 #include "replace-object.h"
/*
 * Function-pointer types for the per-source operations of a git_istream.
 * They back the ->open, ->close and ->read members that are assigned by the
 * open_istream_*() helpers and invoked by open_istream(), close_istream()
 * and read_istream(). A read callback fills a char buffer of at most the
 * given size_t length and returns a ssize_t.
 */
13 typedef int (*open_istream_fn
)(struct git_istream
*,
15 const struct object_id
*,
17 typedef int (*close_istream_fn
)(struct git_istream
*);
18 typedef ssize_t (*read_istream_fn
)(struct git_istream
*, char *, size_t);
/* Size in bytes (16 KiB) of each staging buffer of a filtered stream. */
20 #define FILTER_BUFFER (1024*16)
/*
 * State of a stream that passes the bytes of an upstream git_istream
 * through a stream_filter (see read_istream_filtered()).
 */
22 struct filtered_istream
{
/* Stream whose output is fed into the filter. */
23 struct git_istream
*upstream
;
/* Filter applied to the upstream bytes. */
24 struct stream_filter
*filter
;
/* Bytes read from upstream that the filter has not consumed yet (i_ptr..i_end). */
25 char ibuf
[FILTER_BUFFER
];
/* Filter output not yet handed to the reader (o_ptr..o_end). */
26 char obuf
[FILTER_BUFFER
];
/* Per-source callback that releases resources; invoked by close_istream(). */
34 close_istream_fn close
;
37 unsigned long size
; /* inflated size of full object */
/*
 * Lifecycle of the zlib stream in st->z: z_unused (not initialized yet),
 * z_used (inflating), z_done (Z_STREAM_END reached), z_error (inflate
 * failed). git_inflate_end() has already been called once the state is
 * z_done or z_error.
 */
39 enum { z_unused
, z_used
, z_done
, z_error
} z_state
;
/* u.incore: whole object contents and offset of the next byte to return. */
43 char *buf
; /* from oid_object_info_extended() */
44 unsigned long read_ptr
;
/* u.loose: length of the mapping returned by map_loose_object(). */
49 unsigned long mapsize
;
/* u.in_pack: pack holding a non-delta object (its offset is u.in_pack.pos). */
56 struct packed_git
*pack
;
/* u.filtered: state for streams created by attach_stream_filter(). */
60 struct filtered_istream filtered
;
64 /*****************************************************************
68 *****************************************************************/
/*
 * Release the zlib inflate state of st, but only while it is in use
 * (z_state == z_used). Streams in z_done or z_error already ran
 * git_inflate_end(), and z_unused streams never initialized zlib.
 */
70 static void close_deflated_stream(struct git_istream
*st
)
72 if (st
->z_state
== z_used
)
73 git_inflate_end(&st
->z
);
77 /*****************************************************************
81 *****************************************************************/
/*
 * Close callback for filtered streams: free the stream_filter, then close
 * the wrapped upstream stream and return that close_istream() result.
 */
83 static int close_istream_filtered(struct git_istream
*st
)
85 free_stream_filter(st
->u
.filtered
.filter
);
86 return close_istream(st
->u
.filtered
.upstream
);
/*
 * Read callback for filtered streams. It copies already-filtered bytes
 * from obuf into the caller's buffer. Once obuf is exhausted, it feeds
 * pending ibuf bytes through the filter, asks the filter to drain once
 * upstream input has finished, and refills ibuf from the upstream stream.
 * NOTE(review): this excerpt omits several lines of the function (part of
 * the signature, the loop structure, error returns); the comments below
 * describe only the visible statements.
 */
89 static ssize_t
read_istream_filtered(struct git_istream
*st
, char *buf
,
92 struct filtered_istream
*fs
= &(st
->u
.filtered
);
96 /* do we already have filtered output? */
97 if (fs
->o_ptr
< fs
->o_end
) {
98 size_t to_move
= fs
->o_end
- fs
->o_ptr
;
/* Append after the `filled` bytes presumably already placed in buf. */
101 memcpy(buf
+ filled
, fs
->obuf
+ fs
->o_ptr
, to_move
);
102 fs
->o_ptr
+= to_move
;
/* obuf has been fully handed out; reset its indices. */
107 fs
->o_end
= fs
->o_ptr
= 0;
109 /* do we have anything to feed the filter with? */
110 if (fs
->i_ptr
< fs
->i_end
) {
111 size_t to_feed
= fs
->i_end
- fs
->i_ptr
;
112 size_t to_receive
= FILTER_BUFFER
;
113 if (stream_filter(fs
->filter
,
114 fs
->ibuf
+ fs
->i_ptr
, &to_feed
,
115 fs
->obuf
, &to_receive
))
/*
 * stream_filter() presumably leaves in to_feed the input bytes it did not
 * consume and in to_receive the unused space in obuf; the two assignments
 * below convert those counts back into buffer offsets.
 */
117 fs
->i_ptr
= fs
->i_end
- to_feed
;
118 fs
->o_end
= FILTER_BUFFER
- to_receive
;
122 /* tell the filter to drain upon no more input */
123 if (fs
->input_finished
) {
124 size_t to_receive
= FILTER_BUFFER
;
125 if (stream_filter(fs
->filter
,
127 fs
->obuf
, &to_receive
))
129 fs
->o_end
= FILTER_BUFFER
- to_receive
;
/* ibuf has been fully consumed by the filter; reset its indices. */
134 fs
->i_end
= fs
->i_ptr
= 0;
136 /* refill the input from the upstream */
137 if (!fs
->input_finished
) {
138 fs
->i_end
= read_istream(fs
->upstream
, fs
->ibuf
, FILTER_BUFFER
);
/* Upstream is exhausted (the guarding condition is not shown here). */
144 fs
->input_finished
= 1;
/*
 * Allocate a new git_istream that uses the filtered read/close callbacks.
 * Its filtered_istream state starts with empty input and output buffers
 * and input_finished cleared. Its size is set to -1 because the length
 * after filtering is not known in advance.
 * NOTE(review): the lines storing st/filter into fs->upstream/fs->filter
 * and returning ifs are not visible in this excerpt; confirm against the
 * full source.
 */
149 static struct git_istream
*attach_stream_filter(struct git_istream
*st
,
150 struct stream_filter
*filter
)
152 struct git_istream
*ifs
= xmalloc(sizeof(*ifs
));
153 struct filtered_istream
*fs
= &(ifs
->u
.filtered
);
155 ifs
->close
= close_istream_filtered
;
156 ifs
->read
= read_istream_filtered
;
159 fs
->i_end
= fs
->i_ptr
= 0;
160 fs
->o_end
= fs
->o_ptr
= 0;
161 fs
->input_finished
= 0;
162 ifs
->size
= -1; /* unknown */
166 /*****************************************************************
168 * Loose object stream
170 *****************************************************************/
/*
 * Read callback for loose objects.
 *
 * It first hands out any bytes that were inflated into u.loose.hdr along
 * with the object header but lie past the header's terminating NUL
 * (hdr[hdr_used..hdr_avail)). It then inflates directly into buf while
 * total_read < sz. Z_STREAM_END moves the stream to z_done, and an inflate
 * failure moves it to z_error; both paths call git_inflate_end() first.
 */
172 static ssize_t
read_istream_loose(struct git_istream
*st
, char *buf
, size_t sz
)
174 size_t total_read
= 0;
176 switch (st
->z_state
) {
/* Leftover body bytes that were inflated together with the header. */
185 if (st
->u
.loose
.hdr_used
< st
->u
.loose
.hdr_avail
) {
186 size_t to_copy
= st
->u
.loose
.hdr_avail
- st
->u
.loose
.hdr_used
;
189 memcpy(buf
, st
->u
.loose
.hdr
+ st
->u
.loose
.hdr_used
, to_copy
);
190 st
->u
.loose
.hdr_used
+= to_copy
;
191 total_read
+= to_copy
;
/* Inflate straight into the unused tail of the caller's buffer. */
194 while (total_read
< sz
) {
197 st
->z
.next_out
= (unsigned char *)buf
+ total_read
;
198 st
->z
.avail_out
= sz
- total_read
;
199 status
= git_inflate(&st
->z
, Z_FINISH
);
201 total_read
= st
->z
.next_out
- (unsigned char *)buf
;
203 if (status
== Z_STREAM_END
) {
204 git_inflate_end(&st
->z
);
205 st
->z_state
= z_done
;
/*
 * Z_BUF_ERROR is tolerated only when buf was completely filled. With space
 * still left in buf it means zlib could make no progress on the mapped
 * input, which presumably would otherwise spin forever (compare the comment
 * in read_istream_pack_non_delta(), where use_pack() can replenish input).
 */
208 if (status
!= Z_OK
&& (status
!= Z_BUF_ERROR
|| total_read
< sz
)) {
209 git_inflate_end(&st
->z
);
210 st
->z_state
= z_error
;
/*
 * Close callback for loose objects: end the inflate state if it is still
 * in use, then unmap the object mapping created by open_istream_loose().
 */
217 static int close_istream_loose(struct git_istream
*st
)
219 close_deflated_stream(st
);
220 munmap(st
->u
.loose
.mapped
, st
->u
.loose
.mapsize
);
/*
 * Open callback for loose objects.
 *
 * It maps the object with map_loose_object() and inflates its beginning
 * into the fixed-size u.loose.hdr buffer. It parses the header, which
 * stores the object size in st->size through oi.sizep, and checks that
 * *type is valid. On success it records where the inflated body bytes
 * start inside hdr, marks the zlib stream z_used and installs the loose
 * read/close callbacks. The trailing git_inflate_end() + munmap() pair
 * appears to be the cleanup path for header errors (its label is not shown
 * in this excerpt).
 */
224 static int open_istream_loose(struct git_istream
*st
, struct repository
*r
,
225 const struct object_id
*oid
,
226 enum object_type
*type
)
228 struct object_info oi
= OBJECT_INFO_INIT
;
229 oi
.sizep
= &st
->size
;
232 st
->u
.loose
.mapped
= map_loose_object(r
, oid
, &st
->u
.loose
.mapsize
);
/* Mapping failed. */
233 if (!st
->u
.loose
.mapped
)
235 switch (unpack_loose_header(&st
->z
, st
->u
.loose
.mapped
,
236 st
->u
.loose
.mapsize
, st
->u
.loose
.hdr
,
237 sizeof(st
->u
.loose
.hdr
), NULL
)) {
/* Reject an unparsable header or an invalid (negative) object type. */
244 if (parse_loose_header(st
->u
.loose
.hdr
, &oi
) < 0 || *type
< 0)
/*
 * Everything after the header's NUL, up to the bytes zlib has produced so
 * far, is object body that read_istream_loose() returns first.
 */
247 st
->u
.loose
.hdr_used
= strlen(st
->u
.loose
.hdr
) + 1;
248 st
->u
.loose
.hdr_avail
= st
->z
.total_out
;
249 st
->z_state
= z_used
;
250 st
->close
= close_istream_loose
;
251 st
->read
= read_istream_loose
;
255 git_inflate_end(&st
->z
);
256 munmap(st
->u
.loose
.mapped
, st
->u
.loose
.mapsize
);
261 /*****************************************************************
263 * Non-delta packed object stream
265 *****************************************************************/
/*
 * Read callback for non-delta packed objects.
 *
 * The zlib stream is initialized lazily on the first read: it is zeroed,
 * git_inflate_init() is called and the state becomes z_used. Each loop
 * iteration then maps the pack window at u.in_pack.pos with use_pack(),
 * inflates into the unused tail of buf, and advances pos by the number of
 * compressed bytes consumed. Z_STREAM_END moves the stream to z_done;
 * statuses other than Z_OK and Z_BUF_ERROR move it to z_error.
 */
267 static ssize_t
read_istream_pack_non_delta(struct git_istream
*st
, char *buf
,
270 size_t total_read
= 0;
272 switch (st
->z_state
) {
274 memset(&st
->z
, 0, sizeof(st
->z
));
275 git_inflate_init(&st
->z
);
276 st
->z_state
= z_used
;
286 while (total_read
< sz
) {
288 struct pack_window
*window
= NULL
;
289 unsigned char *mapped
;
/* use_pack() also reports how many input bytes are available via avail_in. */
291 mapped
= use_pack(st
->u
.in_pack
.pack
, &window
,
292 st
->u
.in_pack
.pos
, &st
->z
.avail_in
);
294 st
->z
.next_out
= (unsigned char *)buf
+ total_read
;
295 st
->z
.avail_out
= sz
- total_read
;
296 st
->z
.next_in
= mapped
;
297 status
= git_inflate(&st
->z
, Z_FINISH
);
/* Advance the pack offset by the compressed bytes zlib consumed. */
299 st
->u
.in_pack
.pos
+= st
->z
.next_in
- mapped
;
300 total_read
= st
->z
.next_out
- (unsigned char *)buf
;
303 if (status
== Z_STREAM_END
) {
304 git_inflate_end(&st
->z
);
305 st
->z_state
= z_done
;
310 * Unlike the loose object case, we do not have to worry here
311 * about running out of input bytes and spinning infinitely. If
312 * we get Z_BUF_ERROR due to too few input bytes, then we'll
313 * replenish them in the next use_pack() call when we loop. If
314 * we truly hit the end of the pack (i.e., because it's corrupt
315 * or truncated), then use_pack() catches that and will die().
317 if (status
!= Z_OK
&& status
!= Z_BUF_ERROR
) {
318 git_inflate_end(&st
->z
);
319 st
->z_state
= z_error
;
/*
 * Close callback for non-delta packed objects: release the zlib inflate
 * state if it is still in use. Nothing is mapped or allocated per stream
 * in the visible code.
 */
326 static int close_istream_pack_non_delta(struct git_istream
*st
)
328 close_deflated_stream(st
);
/*
 * Open callback for non-delta packed objects; r, oid and type are unused.
 *
 * It reads the in-pack object header at u.in_pack.pos and returns -1 for
 * entry types it cannot stream (per the comment on that return, deltas).
 * A failed open makes open_istream() fall back to open_istream_incore().
 * Otherwise it leaves the zlib stream z_unused, so it is initialized on the
 * first read, and installs the pack read/close callbacks.
 */
332 static int open_istream_pack_non_delta(struct git_istream
*st
,
333 struct repository
*r UNUSED
,
334 const struct object_id
*oid UNUSED
,
335 enum object_type
*type UNUSED
)
337 struct pack_window
*window
;
338 enum object_type in_pack_type
;
342 in_pack_type
= unpack_object_header(st
->u
.in_pack
.pack
,
347 switch (in_pack_type
) {
349 return -1; /* we do not do deltas for now */
356 st
->z_state
= z_unused
;
357 st
->close
= close_istream_pack_non_delta
;
358 st
->read
= read_istream_pack_non_delta
;
364 /*****************************************************************
368 *****************************************************************/
/* Close callback for in-core streams: free the buffer holding the object. */
370 static int close_istream_incore(struct git_istream
*st
)
372 free(st
->u
.incore
.buf
);
/*
 * Read callback for in-core streams. It copies up to sz bytes of the
 * in-memory object, starting at read_ptr, into buf and advances read_ptr.
 * The copy is clamped to the bytes remaining (st->size - read_ptr).
 */
376 static ssize_t
read_istream_incore(struct git_istream
*st
, char *buf
, size_t sz
)
378 size_t read_size
= sz
;
379 size_t remainder
= st
->size
- st
->u
.incore
.read_ptr
;
/* Never read past the end of the object. */
381 if (remainder
<= read_size
)
382 read_size
= remainder
;
384 memcpy(buf
, st
->u
.incore
.buf
+ st
->u
.incore
.read_ptr
, read_size
);
385 st
->u
.incore
.read_ptr
+= read_size
;
/*
 * Open callback that loads the whole object into memory.
 *
 * It installs the in-core read/close callbacks with read_ptr at 0, then
 * calls oid_object_info_extended() with OBJECT_INFO_DIE_IF_CORRUPT. That
 * call stores the object size in st->size and the contents in
 * st->u.incore.buf. Its result is returned.
 *
 * This is the default choice in istream_source(), and open_istream() also
 * uses it as a fallback when the selected open callback fails.
 */
390 static int open_istream_incore(struct git_istream
*st
, struct repository
*r
,
391 const struct object_id
*oid
, enum object_type
*type
)
393 struct object_info oi
= OBJECT_INFO_INIT
;
395 st
->u
.incore
.read_ptr
= 0;
396 st
->close
= close_istream_incore
;
397 st
->read
= read_istream_incore
;
400 oi
.sizep
= &st
->size
;
401 oi
.contentp
= (void **)&st
->u
.incore
.buf
;
402 return oid_object_info_extended(r
, oid
, &oi
,
403 OBJECT_INFO_DIE_IF_CORRUPT
);
406 /*****************************************************************************
407 * Static helper variables and functions for users of the streaming interface
408 *****************************************************************************/
/*
 * Choose the open callback for streaming oid, based on the object info
 * returned by oid_object_info_extended():
 *  - open_istream_loose, presumably for objects stored loose (the switch
 *    labels are not visible in this excerpt);
 *  - open_istream_pack_non_delta for packed objects that are not deltas
 *    and whose size exceeds big_file_threshold; the pack and offset are
 *    recorded in st->u.in_pack;
 *  - open_istream_incore otherwise.
 */
410 static int istream_source(struct git_istream
*st
,
411 struct repository
*r
,
412 const struct object_id
*oid
,
413 enum object_type
*type
)
417 struct object_info oi
= OBJECT_INFO_INIT
;
421 status
= oid_object_info_extended(r
, oid
, &oi
, 0);
427 st
->open
= open_istream_loose
;
/* Only large, non-delta pack entries are worth inflating incrementally. */
430 if (!oi
.u
.packed
.is_delta
&& big_file_threshold
< size
) {
431 st
->u
.in_pack
.pack
= oi
.u
.packed
.pack
;
432 st
->u
.in_pack
.pos
= oi
.u
.packed
.offset
;
433 st
->open
= open_istream_pack_non_delta
;
438 st
->open
= open_istream_incore
;
443 /****************************************************************
444 * Users of streaming interface
445 ****************************************************************/
/*
 * Close a stream by dispatching to its per-source close callback; r holds
 * that callback's status (the remainder of the function is not shown).
 */
447 int close_istream(struct git_istream
*st
)
449 int r
= st
->close(st
);
/*
 * Read up to sz bytes of the stream into buf by dispatching to its
 * per-source read callback, and return that callback's ssize_t result.
 */
454 ssize_t
read_istream(struct git_istream
*st
, void *buf
, size_t sz
)
456 return st
->read(st
, buf
, sz
);
/*
 * Open a stream for oid.
 *
 * The oid is first resolved through lookup_replace_object(), and
 * istream_source() picks the backend. If that backend's open callback
 * fails, the in-core reader is tried as a fallback. The stream is then
 * wrapped with attach_stream_filter() for the given filter (the guarding
 * condition and the return statements are not visible in this excerpt).
 */
459 struct git_istream
*open_istream(struct repository
*r
,
460 const struct object_id
*oid
,
461 enum object_type
*type
,
463 struct stream_filter
*filter
)
465 struct git_istream
*st
= xmalloc(sizeof(*st
));
466 const struct object_id
*real
= lookup_replace_object(r
, oid
);
467 int ret
= istream_source(st
, r
, real
, type
);
/* Fall back to reading the whole object into memory. */
474 if (st
->open(st
, r
, real
, type
)) {
475 if (open_istream_incore(st
, r
, real
, type
)) {
481 /* Add "&& !is_null_stream_filter(filter)" for performance */
482 struct git_istream
*nst
= attach_stream_filter(st
, filter
);
494 int stream_blob_to_fd(int fd
, const struct object_id
*oid
, struct stream_filter
*filter
,
497 struct git_istream
*st
;
498 enum object_type type
;
503 st
= open_istream(the_repository
, oid
, &type
, &sz
, filter
);
506 free_stream_filter(filter
);
509 if (type
!= OBJ_BLOB
)
513 ssize_t wrote
, holeto
;
514 ssize_t readlen
= read_istream(st
, buf
, sizeof(buf
));
520 if (can_seek
&& sizeof(buf
) == readlen
) {
521 for (holeto
= 0; holeto
< readlen
; holeto
++)
524 if (readlen
== holeto
) {
530 if (kept
&& lseek(fd
, kept
, SEEK_CUR
) == (off_t
) -1)
534 wrote
= write_in_full(fd
, buf
, readlen
);
539 if (kept
&& (lseek(fd
, kept
- 1, SEEK_CUR
) == (off_t
) -1 ||
540 xwrite(fd
, "", 1) != 1))