Added 'automatic' test/check tool
[eblob.git] / example / merge.cpp
blob89bcd2d47ef96f1d18b1f75bcafdb9887df34b5f
#include <stdlib.h>
#include <string.h>

#include <algorithm>
#include <fstream>
#include <iostream>
#include <string>
#include <stdexcept>
#include <vector>

#include <boost/shared_ptr.hpp>

#include <eblob/blob.h>
#include "common.hpp"
/*
 * Copies exactly @size bytes from @src, starting at its current read
 * position, to @dst through a bounded intermediate buffer.
 *
 * Throws std::runtime_error when @src ends before @size bytes could be read
 * or when @dst reports a write failure. Nothing from a short chunk is
 * written, so the output never receives stale buffer contents.
 */
static void copy_data(std::ifstream &src, std::ofstream &dst, size_t size)
{
	/* Upper bound for a single read/write so memory use does not grow with record size. */
	const size_t max_chunk = 1024 * 1024;

	/* The vector owns the buffer, so it is released on every exit path. */
	std::vector<char> buf(size < max_chunk ? size : max_chunk);

	while (size != 0) {
		const size_t want = size < buf.size() ? size : buf.size();

		src.read(&buf[0], want);
		if (static_cast<size_t>(src.gcount()) != want)
			throw std::runtime_error("Data read failed: input ended before the whole record was copied");

		dst.write(&buf[0], want);
		if (!dst)
			throw std::runtime_error("Data write failed");

		size -= want;
	}
}
/*
 * Prints the command line help to stderr and terminates the process with
 * exit(-1); this function never returns.
 *
 * @p is the program name shown in the usage line (main() passes argv[0]).
 */
static void em_usage(char *p)
{
	/* The space before "<options>" keeps the program name and the placeholder apart. */
	std::cerr << "Usage: " << p << " <options>" << std::endl <<
		" This utility will defragment and merge (multiple) blobs into larger one\n"
		" -i path - input blob path (can be specified multiple times)\n"
		" -o path - output blob path\n"
		" -p - print all copied IDs\n"
		" -h - this help\n"
		"" << std::endl;
	exit(-1);
}
/*
 * One input blob: the data file at path_ and its index at path_ + ".index",
 * both opened for binary reading.
 *
 * completed is set by the merge loop once the index has been fully consumed.
 *
 * Both constructors throw std::runtime_error when either file cannot be
 * opened. A default-configured ifstream does not throw on open failure, so
 * the check has to be explicit.
 */
struct em_blob {
	int			completed;
	std::ifstream		index, data;
	std::string		path_;

	em_blob(const char *path) : completed(0), path_(path) {
		open_streams();
	}

	/*
	 * Streams cannot be copied, so a copy is a fresh reader over the same
	 * files: it starts at the beginning of both and is not completed.
	 */
	em_blob(const struct em_blob &e) : completed(0), path_(e.path_) {
		open_streams();
	}

	/* No explicit destructor: the ifstream members close themselves. */

private:
	/* Opens the data and index streams for path_, throwing on failure. */
	void open_streams() {
		data.open(path_.c_str(), std::ios_base::in | std::ios_base::binary);
		if (!data.is_open())
			throw std::runtime_error("failed to open data file " + path_);

		const std::string index_path = path_ + ".index";
		index.open(index_path.c_str(), std::ios_base::in | std::ios_base::binary);
		if (!index.is_open())
			throw std::runtime_error("failed to open index file " + index_path);
	}
};
81 typedef boost::shared_ptr<em_blob> em_blob_ptr;
83 struct em_ctl {
84 struct eblob_disk_control dc;
85 em_blob_ptr blob;
87 em_ctl(em_blob_ptr b) : blob(b) {
88 memset(&dc, 0, sizeof(struct eblob_disk_control));
92 struct em_compare {
93 bool operator () (const em_ctl &s1, const em_ctl &s2) const {
94 return memcmp(s1.dc.key.id, s2.dc.key.id, EBLOB_ID_SIZE);
98 int main(int argc, char *argv[])
100 int ch;
101 int total_input = 0;
102 int print_all = 0;
103 struct eblob_disk_control ddc;
104 long long total = 0, removed = 0, written = 0, broken = 0;
105 long long position = 0;
107 std::vector<em_blob_ptr> blobs;
108 std::string output;
110 while ((ch = getopt(argc, argv, "i:o:ph")) != -1) {
111 switch (ch) {
112 case 'i':
113 try {
114 em_blob_ptr b(new em_blob(optarg));
116 blobs.push_back(b);
117 total_input++;
118 } catch (const std::exception &e) {
119 std::cerr << "could not open data or index file for blob " << optarg << ": " << e.what() << std::endl;
121 break;
122 case 'o':
123 output.assign(optarg);
124 break;
125 case 'p':
126 print_all = 1;
127 break;
128 case 'h':
129 default:
130 em_usage(argv[0]);
131 /* not reached */
135 if (!blobs.size() || !output.size()) {
136 std::cerr << "You must specify input and output parameters" << std::endl;
137 em_usage(argv[0]);
140 try {
141 std::string data_path = output;
142 std::string index_path = output + ".index";
144 std::ofstream index_out(index_path.c_str(), std::ios_base::out | std::ios_base::binary | std::ios::trunc);
145 std::ofstream data_out(data_path.c_str(), std::ios_base::out | std::ios_base::binary | std::ios::trunc);
147 while (blobs.size() != 0) {
148 std::vector<struct em_ctl> ctl;
150 for (std::vector<em_blob_ptr>::iterator b = blobs.begin(); b < blobs.end(); ++b) {
151 struct em_ctl c(*b);
153 struct em_blob *blob = b->get();
155 if (blob->completed)
156 continue;
158 do {
159 blob->index.read((char *)&c.dc, sizeof(struct eblob_disk_control));
161 if (blob->index.gcount() != sizeof(struct eblob_disk_control)) {
162 blob->completed = 1;
164 std::cout << "Completed input stream " << blob->path_ <<
165 ": total: " << total_input <<
166 ", rest: " << blobs.size() << std::endl;
167 break;
170 } while (c.dc.disk_size == 0);
172 if (blob->completed)
173 continue;
175 blob->index.seekg(-sizeof(struct eblob_disk_control), std::ios_base::cur);
177 eblob_convert_disk_control(&c.dc);
178 ctl.push_back(c);
181 if (!ctl.size()) {
182 std::cout << "Completed all blobs" << std::endl;
183 break;
187 std::sort(ctl.begin(), ctl.end(), em_compare());
189 total++;
191 struct em_ctl c = ctl[0];
192 c.blob->index.seekg(sizeof(struct eblob_disk_control), std::ios_base::cur);
194 if (print_all) {
195 std::cout << c.blob->path_ << ": " << eblob_dump_control(&c.dc, position, 1, 0) << std::endl;
198 if (c.dc.flags & BLOB_DISK_CTL_REMOVE) {
199 removed++;
200 continue;
203 c.blob->data.seekg(c.dc.position, std::ios::beg);
204 c.blob->data.read((char *)&ddc, sizeof(struct eblob_disk_control));
205 eblob_convert_disk_control(&ddc);
207 if (c.blob->data.gcount() != sizeof(struct eblob_disk_control))
208 throw std::runtime_error("Data read failed");
210 if (ddc.flags & BLOB_DISK_CTL_REMOVE) {
211 removed++;
212 continue;
215 size_t size = ddc.disk_size;
217 ddc.position = position;
219 if (print_all) {
220 std::cout << "out: " << eblob_dump_control(&ddc, position, 1, 0) << std::endl;
223 if (size > sizeof(struct eblob_disk_control)) {
224 eblob_convert_disk_control(&ddc);
226 data_out.write((char *)&ddc, sizeof(struct eblob_disk_control));
227 copy_data(c.blob->data, data_out, size - sizeof(struct eblob_disk_control));
229 index_out.write((char *)&ddc, sizeof(struct eblob_disk_control));
231 position += size;
232 written++;
233 } else {
234 broken++;
237 } catch (const std::exception &e) {
238 std::cerr << e.what() << std::endl;
241 std::cout << "Total records: " << total << std::endl;
242 std::cout << "Written records: " << written << std::endl;
243 std::cout << "Removed records: " << removed << std::endl;
244 std::cout << "Broken records: " << broken << std::endl;
246 return 0;