Added OSYNC_TEST_EXPORT to export symbols in test mode - required for windows port
[opensync.git] / opensync / engine / opensync_obj_engine.c
blob0ba4876c1c8918b94989e910eec0bd960df0f10d
1 /*
2 * libosengine - A synchronization engine for the opensync framework
3 * Copyright (C) 2004-2005 Armin Bauer <armin.bauer@opensync.org>
4 *
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
9 *
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
21 #include "opensync.h"
22 #include "opensync_internals.h"
24 #include "opensync-archive.h"
25 #include "opensync-group.h"
26 #include "opensync-engine.h"
27 #include "opensync-client.h"
28 #include "opensync-data.h"
29 #include "opensync-mapping.h"
30 #include "opensync-format.h"
31 #include "opensync-merger.h"
32 #include "opensync-plugin.h"
33 #include "opensync-xmlformat.h"
35 #include "opensync_engine_internals.h"
36 #include "opensync_sink_engine_internals.h"
37 #include "opensync_mapping_entry_engine_internals.h"
38 #include "opensync_status_internals.h"
40 #include "opensync_mapping_engine.h"
41 #include "opensync_mapping_engine_internals.h"
43 #include "opensync_obj_engine.h"
44 #include "opensync_obj_engine_internals.h"
46 #include "archive/opensync_archive_internals.h"
47 #include "data/opensync_change_internals.h"
48 #include "client/opensync_client_proxy_internals.h"
50 OSyncMappingEngine *_osync_obj_engine_create_mapping_engine(OSyncObjEngine *engine, OSyncError **error)
52 /* If there is none, create one */
53 OSyncMapping *mapping = osync_mapping_new(error);
54 GList *s = NULL;
55 OSyncMappingEngine *mapping_engine = NULL;
56 if (!mapping)
57 goto error;
59 osync_mapping_set_id(mapping, osync_mapping_table_get_next_id(engine->mapping_table));
60 osync_mapping_table_add_mapping(engine->mapping_table, mapping);
62 for (s = engine->sink_engines; s; s = s->next) {
63 OSyncSinkEngine *sink_engine = s->data;
65 OSyncMember *member = osync_client_proxy_get_member(sink_engine->proxy);
67 OSyncMappingEntry *mapping_entry = osync_mapping_entry_new(error);
68 osync_mapping_entry_set_member_id(mapping_entry, osync_member_get_id(member));
69 osync_mapping_add_entry(mapping, mapping_entry);
70 osync_mapping_entry_unref(mapping_entry);
73 mapping_engine = osync_mapping_engine_new(engine, mapping, error);
74 if (!mapping_engine)
75 goto error_free_mapping;
76 osync_mapping_unref(mapping);
78 return mapping_engine;
80 error_free_mapping:
81 osync_mapping_unref(mapping);
82 error:
83 return NULL;
86 static void _osync_obj_engine_connect_callback(OSyncClientProxy *proxy, void *userdata, osync_bool slowsync, OSyncError *error)
88 OSyncSinkEngine *sinkengine = userdata;
89 OSyncObjEngine *engine = sinkengine->engine;
90 OSyncError *locerror = NULL;
92 osync_trace(TRACE_ENTRY, "%s(%p, %p, %i, %p)", __func__, proxy, userdata, slowsync, error);
94 if (error) {
95 osync_trace(TRACE_INTERNAL, "Obj Engine received connect error: %s", osync_error_print(&error));
96 osync_obj_engine_set_error(engine, error);
97 engine->sink_errors = engine->sink_errors | (0x1 << sinkengine->position);
98 osync_status_update_member(engine->parent, osync_client_proxy_get_member(proxy), OSYNC_CLIENT_EVENT_ERROR, engine->objtype, error);
99 } else {
100 engine->sink_connects = engine->sink_connects | (0x1 << sinkengine->position);
101 osync_status_update_member(engine->parent, osync_client_proxy_get_member(proxy), OSYNC_CLIENT_EVENT_CONNECTED, engine->objtype, NULL);
104 if (slowsync) {
105 osync_obj_engine_set_slowsync(engine, TRUE);
106 osync_trace(TRACE_INTERNAL, "SlowSync requested during connect.");
109 if (osync_bitcount(engine->sink_errors | engine->sink_connects) == g_list_length(engine->sink_engines)) {
110 if (osync_bitcount(engine->sink_errors)) {
111 osync_error_set(&locerror, OSYNC_ERROR_GENERIC, "At least one sink_engine failed while connecting");
112 osync_obj_engine_set_error(engine, locerror);
115 osync_obj_engine_event(engine, OSYNC_ENGINE_EVENT_CONNECTED, locerror ? locerror : error);
116 } else
117 osync_trace(TRACE_INTERNAL, "Not yet: %i", osync_bitcount(engine->sink_errors | engine->sink_connects));
119 osync_trace(TRACE_EXIT, "%s", __func__);
122 static void _osync_obj_engine_generate_event_disconnected(OSyncObjEngine *engine, OSyncError *error)
124 OSyncError *locerror = NULL;
125 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
127 if (osync_bitcount(engine->sink_errors | engine->sink_disconnects) == g_list_length(engine->sink_engines)) {
128 if (osync_bitcount(engine->sink_disconnects) < osync_bitcount(engine->sink_connects)) {
129 osync_error_set(&locerror, OSYNC_ERROR_GENERIC, "Fewer sink_engines disconnected than connected");
130 osync_obj_engine_set_error(engine, locerror);
131 osync_error_unref(&locerror);
134 /* Since disconnect errors don't affect the data integrity keep the sync successful, even on
135 a disconnect error. So we have to avoid OSyncEngine->error got set,
136 just keep this ObjEngine disconnect errors at this engine. */
137 osync_obj_engine_event(engine, OSYNC_ENGINE_EVENT_DISCONNECTED, NULL);
138 } else
139 osync_trace(TRACE_INTERNAL, "Not yet: %i", osync_bitcount(engine->sink_errors | engine->sink_disconnects));
141 osync_trace(TRACE_EXIT, "%s", __func__);
144 static void _osync_obj_engine_disconnect_callback(OSyncClientProxy *proxy, void *userdata, OSyncError *error)
146 OSyncSinkEngine *sinkengine = userdata;
147 OSyncObjEngine *engine = sinkengine->engine;
149 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, proxy, userdata, error);
151 if (error) {
152 osync_obj_engine_set_error(engine, error);
153 engine->sink_errors = engine->sink_errors | (0x1 << sinkengine->position);
154 osync_status_update_member(engine->parent, osync_client_proxy_get_member(proxy), OSYNC_CLIENT_EVENT_ERROR, engine->objtype, error);
155 } else {
156 engine->sink_disconnects = engine->sink_disconnects | (0x1 << sinkengine->position);
157 osync_status_update_member(engine->parent, osync_client_proxy_get_member(proxy), OSYNC_CLIENT_EVENT_DISCONNECTED, engine->objtype, NULL);
160 _osync_obj_engine_generate_event_disconnected(engine, error);
162 osync_trace(TRACE_EXIT, "%s", __func__);
165 /* Finds the mapping to which the entry should belong. The
166 * return value is MISMATCH if no mapping could be found,
167 * SIMILAR if a mapping has been found but its not completely the same
168 * SAME if a mapping has been found and is the same */
169 static OSyncConvCmpResult _osync_obj_engine_mapping_find(OSyncObjEngine *engine, OSyncChange *change, OSyncSinkEngine *sinkengine, OSyncMappingEngine **mapping_engine)
171 GList *m = NULL;
172 GList *e = NULL;
173 OSyncConvCmpResult result = OSYNC_CONV_DATA_MISMATCH;
174 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p, %p)", __func__, engine, change, sinkengine, mapping_engine);
176 for (m = engine->mapping_engines; m; m = m->next) {
177 *mapping_engine = m->data;
179 /* Go through the already existing mapping entries. We only consider mappings
180 * which dont have a entry on our side and where the data comparsion does not
181 * return MISMATCH */
182 for (e = (*mapping_engine)->entries; e; e = e->next) {
183 OSyncMappingEntryEngine *entry_engine = e->data;
184 OSyncChange *mapping_change = NULL;
185 /* if the mapping already has a entry on our side, its not worth looking */
186 if (entry_engine->sink_engine == sinkengine) {
187 *mapping_engine = NULL;
188 break;
191 mapping_change = osync_entry_engine_get_change(entry_engine);
192 if (!mapping_change)
193 continue;
195 result = osync_change_compare(mapping_change, change);
196 if (result == OSYNC_CONV_DATA_MISMATCH)
197 *mapping_engine = NULL;
199 break;
202 if (*mapping_engine) {
203 osync_trace(TRACE_EXIT, "%s: Found %p", __func__, *mapping_engine);
204 return result;
208 osync_trace(TRACE_EXIT, "%s: Mismatch", __func__);
209 return OSYNC_CONV_DATA_MISMATCH;
212 osync_bool osync_obj_engine_map_changes(OSyncObjEngine *engine, OSyncError **error)
214 OSyncMappingEngine *mapping_engine = NULL;
215 GList *new_mappings = NULL, *v = NULL;
217 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
218 //osync_trace_disable();
220 /* Go through all sink engines that are available */
221 for (v = engine->sink_engines; v; v = v->next) {
222 OSyncSinkEngine *sinkengine = v->data;
224 /* We use a temp list to speed things up. We dont have to compare with newly created mappings for
225 * the current sinkengine, since there will be only one entry (for the current sinkengine) so there
226 * is no need to compare */
227 new_mappings = NULL;
229 /* For each sinkengine, go through all unmapped changes */
230 while (sinkengine->unmapped) {
231 OSyncChange *change = sinkengine->unmapped->data;
232 OSyncConvCmpResult result = 0;
233 OSyncMappingEntryEngine *entry_engine = NULL;
235 osync_trace(TRACE_INTERNAL, "Looking for mapping for change %s, changetype %i from member %lli", osync_change_get_uid(change), osync_change_get_changetype(change), osync_member_get_id(osync_client_proxy_get_member(sinkengine->proxy)));
237 /* See if there is an exisiting mapping, which fits the unmapped change */
238 result = _osync_obj_engine_mapping_find(engine, change, sinkengine, &mapping_engine);
239 if (result == OSYNC_CONV_DATA_MISMATCH) {
240 /* If there is none, create one */
241 mapping_engine = _osync_obj_engine_create_mapping_engine(engine, error);
242 if (!mapping_engine)
243 goto error;
245 osync_trace(TRACE_INTERNAL, "Unable to find mapping. Creating new mapping with id %lli", osync_mapping_get_id(mapping_engine->mapping));
247 new_mappings = g_list_append(new_mappings, mapping_engine);
248 } else if (result == OSYNC_CONV_DATA_SIMILAR) {
249 mapping_engine->conflict = TRUE;
251 /* Update the entry which belongs to our sinkengine with the the change */
252 entry_engine = osync_mapping_engine_get_entry(mapping_engine, sinkengine);
253 osync_assert(entry_engine);
255 osync_entry_engine_update(entry_engine, change);
256 sinkengine->unmapped = g_list_remove(sinkengine->unmapped, sinkengine->unmapped->data);
257 osync_change_unref(change);
260 engine->mapping_engines = g_list_concat(engine->mapping_engines, new_mappings);
263 //osync_trace_enable();
264 osync_trace(TRACE_EXIT, "%s", __func__);
265 return TRUE;
267 error:
268 osync_trace_enable();
269 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
270 return FALSE;
273 static void _osync_obj_engine_read_ignored_callback(OSyncClientProxy *proxy, void *userdata, OSyncError *error)
275 /* TODO: Share _generate_read_event fucntion with _osync_obj_engine_read_callback?
276 To report errors .. and handle _timeout problems of _read_ignored call.
281 static void _osync_obj_engine_read_callback(OSyncClientProxy *proxy, void *userdata, OSyncError *error)
283 OSyncSinkEngine *sinkengine = userdata;
284 OSyncObjEngine *engine = sinkengine->engine;
285 OSyncError *locerror = NULL;
287 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, proxy, userdata, error);
289 if (error) {
290 osync_obj_engine_set_error(engine, error);
291 engine->sink_errors = engine->sink_errors | (0x1 << sinkengine->position);
292 osync_status_update_member(engine->parent, osync_client_proxy_get_member(proxy), OSYNC_CLIENT_EVENT_ERROR, engine->objtype, error);
293 } else {
294 engine->sink_get_changes = engine->sink_get_changes | (0x1 << sinkengine->position);
295 osync_status_update_member(engine->parent, osync_client_proxy_get_member(proxy), OSYNC_CLIENT_EVENT_READ, engine->objtype, NULL);
298 if (osync_bitcount(engine->sink_errors | engine->sink_get_changes) == g_list_length(engine->sink_engines)) {
300 if (osync_bitcount(engine->sink_get_changes) < osync_bitcount(engine->sink_connects)) {
301 osync_error_set(&locerror, OSYNC_ERROR_GENERIC, "Fewer sink_engines reported get_changes than connected");
302 osync_obj_engine_set_error(engine, locerror);
303 } else {
304 /* We are now done reading the changes. so we can now start to create the mappings, conflicts etc */
305 if (!osync_obj_engine_map_changes(engine, &locerror)) {
306 osync_obj_engine_set_error(engine, locerror);
307 } else {
308 GList *m;
309 for (m = engine->mapping_engines; m; m = m->next) {
310 OSyncMappingEngine *mapping_engine = m->data;
311 if (!mapping_engine->synced)
312 osync_mapping_engine_check_conflict(mapping_engine);
318 osync_obj_engine_event(engine, OSYNC_ENGINE_EVENT_READ, locerror ? locerror : error);
319 } else
320 osync_trace(TRACE_INTERNAL, "Not yet: %i", osync_bitcount(engine->sink_errors | engine->sink_get_changes));
322 osync_trace(TRACE_EXIT, "%s", __func__);
325 osync_bool osync_obj_engine_receive_change(OSyncObjEngine *objengine, OSyncClientProxy *proxy, OSyncChange *change, OSyncError **error)
327 OSyncSinkEngine *sinkengine = NULL;
328 GList *s = NULL, *e = NULL;
330 osync_assert(objengine);
332 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p, %p)", __func__, objengine, proxy, change, error);
334 /* Find the sinkengine for the proxy */
335 for (s = objengine->sink_engines; s; s = s->next) {
336 sinkengine = s->data;
337 if (sinkengine->proxy == proxy)
338 break;
339 sinkengine = NULL;
342 if (!sinkengine) {
343 osync_error_set(error, OSYNC_ERROR_GENERIC, "Unable to find sinkengine");
344 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
345 return FALSE;
348 /* We now have to see if the change matches one of the already existing mappings */
349 for (e = sinkengine->entries; e; e = e->next) {
350 OSyncMappingEntryEngine *mapping_engine = e->data;
352 if (osync_entry_engine_matches(mapping_engine, change)) {
353 osync_entry_engine_update(mapping_engine, change);
355 osync_status_update_change(sinkengine->engine->parent, change, osync_client_proxy_get_member(proxy), mapping_engine->mapping_engine->mapping, OSYNC_CHANGE_EVENT_READ, NULL);
357 osync_trace(TRACE_EXIT, "%s: Updated", __func__);
358 return TRUE;
362 osync_status_update_change(sinkengine->engine->parent, change, osync_client_proxy_get_member(proxy), NULL, OSYNC_CHANGE_EVENT_READ, NULL);
364 /* If we couldnt find a match entry, we will append it the unmapped changes
365 * and take care of it later */
366 sinkengine->unmapped = g_list_append(sinkengine->unmapped, change);
367 osync_change_ref(change);
369 osync_trace(TRACE_EXIT, "%s: Unmapped", __func__);
370 return TRUE;
373 /* Note: This function got shared between _osync_obj_engine_commit_change_callback() and
374 osync_obj_engine_written_callback(). Those function call _osync_obj_engine_generate_written_event()
375 with the most recent error and pass it to this function as last argument "error". If no error
376 appears this function got called with NULL as error.
378 It's quite important that this function only get called with the most recent error. This function
379 MUST NOT get called with (obj)engine->error.
381 If this functions doesn't get called with the most recent commit/committed_all error OSyncEngine
382 will get stuck. (testcases: dual_commit_error, dual_commit_timeout, *_commit_*, *_committed_all_*)
384 static void _osync_obj_engine_generate_written_event(OSyncObjEngine *engine, OSyncError *error)
386 osync_bool dirty = FALSE;
387 GList *p = NULL;
388 GList *e = NULL;
389 OSyncSinkEngine *sinkengine = NULL;
390 OSyncError *locerror = NULL;
392 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, engine, error);
393 /* We need to make sure that all entries are written ... */
395 for (p = engine->sink_engines; p; p = p->next) {
396 OSyncMember *member = NULL;
397 OSyncObjTypeSink *objtype_sink = NULL;
399 sinkengine = p->data;
400 member = osync_client_proxy_get_member(sinkengine->proxy);
401 objtype_sink = osync_member_find_objtype_sink(member, engine->objtype);
403 /* If the sink engine isn't able/allowed to write we don't care if everything got written ("how dirty is it!?") */
404 if (!objtype_sink || !osync_objtype_sink_get_write(objtype_sink))
405 break;
407 for (e = sinkengine->entries; e; e = e->next) {
408 OSyncMappingEntryEngine *entry_engine = e->data;
409 if (osync_entry_engine_is_dirty(entry_engine) == TRUE) {
410 dirty = TRUE;
411 break;
414 if (dirty) {
415 osync_trace(TRACE_EXIT, "%s: Still dirty", __func__);
416 return;
419 osync_trace(TRACE_INTERNAL, "%s: Not dirty anymore", __func__);
421 /* And that we received the written replies from all sinks */
422 if (osync_bitcount(engine->sink_errors | engine->sink_written) == g_list_length(engine->sink_engines)) {
423 if (osync_bitcount(engine->sink_written) < osync_bitcount(engine->sink_connects)) {
424 osync_error_set(&locerror, OSYNC_ERROR_GENERIC, "Fewer sink_engines reported committed all than connected");
425 osync_obj_engine_set_error(engine, locerror);
426 } else if (osync_bitcount(engine->sink_errors)) {
427 /* Emit engine-wide error if one of the sinks got an error (tests: single_commit_error, ...) */
428 osync_error_set(&locerror, OSYNC_ERROR_GENERIC, "At least one Sink Engine failed while committing");
429 osync_obj_engine_set_error(engine, locerror);
432 osync_obj_engine_event(engine, OSYNC_ENGINE_EVENT_WRITTEN, locerror ? locerror : error);
434 } else
435 osync_trace(TRACE_INTERNAL, "Not yet: %i", osync_bitcount(engine->sink_errors | engine->sink_written));
437 osync_trace(TRACE_EXIT, "%s", __func__);
440 static void _osync_obj_engine_commit_change_callback(OSyncClientProxy *proxy, void *userdata, const char *uid, OSyncError *error)
442 OSyncMappingEntryEngine *entry_engine = userdata;
443 OSyncObjEngine *engine = entry_engine->objengine;
444 OSyncSinkEngine *sinkengine = entry_engine->sink_engine;
445 OSyncError *locerror = NULL;
446 OSyncMapping *mapping = NULL;
447 OSyncMember *member = NULL;
448 OSyncMappingEntry *entry = NULL;
449 const char *objtype = NULL;
450 long long int id = 0;
453 osync_trace(TRACE_ENTRY, "%s(%p, %p, %s, %p)", __func__, proxy, userdata, uid, error);
455 osync_entry_engine_set_dirty(entry_engine, FALSE);
457 mapping = entry_engine->mapping_engine->mapping;
458 member = osync_client_proxy_get_member(proxy);
459 entry = entry_engine->entry;
460 objtype = osync_change_get_objtype(entry_engine->change);
461 id = osync_mapping_entry_get_id(entry);
463 if (error) {
464 /* Error handling (tests: single_commit_error, ...) */
466 /* TODO: Review differences between Mapping and Change status events - Are both really needed?! */
467 osync_status_update_change(engine->parent, entry_engine->change, osync_client_proxy_get_member(proxy), entry_engine->mapping_engine->mapping, OSYNC_CHANGE_EVENT_ERROR, error);
468 osync_status_update_mapping(engine->parent, entry_engine->mapping_engine, OSYNC_MAPPING_EVENT_ERROR, error);
470 osync_obj_engine_set_error(engine, error);
471 engine->sink_errors = engine->sink_errors | (0x1 << sinkengine->position);
472 goto error;
475 if (uid)
476 osync_change_set_uid(entry_engine->change, uid);
478 if (engine->archive) {
479 if (osync_change_get_changetype(entry_engine->change) == OSYNC_CHANGE_TYPE_DELETED) {
480 /* TODO error handling */
481 osync_archive_delete_change(engine->archive, id, objtype, &locerror);
482 } else {
484 /* TODO error handling */
485 osync_archive_save_change(engine->archive, id, osync_change_get_uid(entry_engine->change), objtype, osync_mapping_get_id(mapping), osync_member_get_id(member), &locerror);
489 osync_assert(entry_engine->mapping_engine);
490 osync_status_update_change(engine->parent, entry_engine->change, osync_client_proxy_get_member(proxy), entry_engine->mapping_engine->mapping, OSYNC_CHANGE_EVENT_WRITTEN, NULL);
491 osync_entry_engine_update(entry_engine, NULL);
493 osync_trace(TRACE_EXIT, "%s", __func__);
494 return;
496 error:
497 _osync_obj_engine_generate_written_event(engine, error);
498 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(&error));
501 static void _osync_obj_engine_written_callback(OSyncClientProxy *proxy, void *userdata, OSyncError *error)
503 OSyncSinkEngine *sinkengine = userdata;
504 OSyncObjEngine *engine = sinkengine->engine;
506 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, proxy, userdata, error);
508 if (error) {
509 osync_obj_engine_set_error(engine, error);
510 engine->sink_errors = engine->sink_errors | (0x1 << sinkengine->position);
511 osync_status_update_member(engine->parent, osync_client_proxy_get_member(proxy), OSYNC_CLIENT_EVENT_ERROR, engine->objtype, error);
512 } else {
513 engine->sink_written = engine->sink_written | (0x1 << sinkengine->position);
514 osync_status_update_member(engine->parent, osync_client_proxy_get_member(proxy), OSYNC_CLIENT_EVENT_WRITTEN, engine->objtype, NULL);
517 _osync_obj_engine_generate_written_event(engine, error);
519 osync_trace(TRACE_EXIT, "%s", __func__);
522 static void _osync_obj_engine_sync_done_callback(OSyncClientProxy *proxy, void *userdata, OSyncError *error)
524 OSyncSinkEngine *sinkengine = userdata;
525 OSyncObjEngine *engine = sinkengine->engine;
526 OSyncError *locerror = NULL;
528 osync_trace(TRACE_ENTRY, "%s(%p, %p, %p)", __func__, proxy, userdata, error);
530 if (error) {
531 osync_obj_engine_set_error(engine, error);
532 engine->sink_errors = engine->sink_errors | (0x1 << sinkengine->position);
533 osync_status_update_member(engine->parent, osync_client_proxy_get_member(proxy), OSYNC_CLIENT_EVENT_ERROR, engine->objtype, error);
534 } else {
535 engine->sink_sync_done = engine->sink_sync_done | (0x1 << sinkengine->position);
536 osync_status_update_member(engine->parent, osync_client_proxy_get_member(proxy), OSYNC_CLIENT_EVENT_SYNC_DONE, engine->objtype, NULL);
539 if (osync_bitcount(engine->sink_errors | engine->sink_sync_done) == g_list_length(engine->sink_engines)) {
540 if (osync_bitcount(engine->sink_sync_done) < osync_bitcount(engine->sink_connects)) {
541 osync_error_set(&locerror, OSYNC_ERROR_GENERIC, "Fewer sink_engines reported sync_done than connected");
542 osync_obj_engine_set_error(engine, locerror);
545 osync_obj_engine_event(engine, OSYNC_ENGINE_EVENT_SYNC_DONE, locerror ? locerror : error);
546 } else
547 osync_trace(TRACE_INTERNAL, "Not yet: %i", osync_bitcount(engine->sink_errors | engine->sink_sync_done));
549 osync_trace(TRACE_EXIT, "%s", __func__);
552 static osync_bool _create_mapping_engines(OSyncObjEngine *engine, OSyncError **error)
554 int i = 0;
555 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, engine, error);
557 for (i = 0; i < osync_mapping_table_num_mappings(engine->mapping_table); i++) {
558 OSyncMapping *mapping = osync_mapping_table_nth_mapping(engine->mapping_table, i);
560 OSyncMappingEngine *mapping_engine = osync_mapping_engine_new(engine, mapping, error);
561 if (!mapping_engine)
562 goto error;
564 engine->mapping_engines = g_list_append(engine->mapping_engines, mapping_engine);
567 osync_trace(TRACE_EXIT, "%s", __func__);
568 return TRUE;
570 error:
571 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
572 return FALSE;
575 static osync_bool _inject_changelog_entries(OSyncObjEngine *engine, OSyncError **error) {
576 OSyncList *ids = NULL;
577 OSyncList *changetypes = NULL;
578 OSyncList *j = NULL, *t = NULL;
580 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
582 osync_assert(engine);
583 osync_assert(engine->archive);
584 osync_assert(engine->objtype);
586 if (!osync_archive_load_ignored_conflicts(engine->archive, engine->objtype, &ids, &changetypes, error)) {
587 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
588 return FALSE;
591 t = changetypes;
592 for (j = ids; j; j = j->next) {
593 long long int id = (long long int)GPOINTER_TO_INT(j->data);
595 OSyncMapping *ignored_mapping = osync_mapping_table_find_mapping(engine->mapping_table, id);
597 GList *e;
598 for (e = engine->mapping_engines; e; e = e->next) {
599 OSyncMappingEngine *mapping_engine = e->data;
601 if (mapping_engine->mapping == ignored_mapping) {
602 GList *m;
603 for (m = mapping_engine->entries; m; m = m->next) {
604 OSyncMappingEntryEngine *entry = m->data;
605 OSyncChangeType changetype = (OSyncChangeType) t->data;
606 OSyncChange *ignored_change = osync_change_new(error);
607 OSyncObjFormat *dummyformat = NULL;
608 OSyncData *data = NULL;
610 osync_change_set_changetype(ignored_change, changetype);
611 osync_entry_engine_update(entry, ignored_change);
613 dummyformat = osync_objformat_new("plain", engine->objtype, NULL);
614 data = osync_data_new(NULL, 0, dummyformat, NULL);
615 osync_change_set_data(ignored_change, data);
616 osync_objformat_unref(dummyformat);
618 osync_change_set_uid(ignored_change, osync_mapping_entry_get_uid(entry->entry));
620 osync_trace(TRACE_INTERNAL, "CHANGE: %p", entry->change);
622 break;
626 t = t->next;
629 osync_list_free(ids);
630 osync_list_free(changetypes);
632 osync_trace(TRACE_EXIT, "%s", __func__);
633 return TRUE;
637 OSyncObjEngine *osync_obj_engine_new(OSyncEngine *parent, const char *objtype, OSyncFormatEnv *formatenv, OSyncError **error)
639 OSyncObjEngine *engine = NULL;
640 osync_assert(parent);
641 osync_assert(objtype);
643 osync_trace(TRACE_ENTRY, "%s(%p, %s, %p, %p)", __func__, parent, objtype, formatenv, error);
645 engine = osync_try_malloc0(sizeof(OSyncObjEngine), error);
646 if (!engine)
647 goto error;
648 engine->ref_count = 1;
649 engine->slowsync = FALSE;
650 engine->written = FALSE;
652 /* we dont reference the parent to avoid circular dependencies. This object is completely
653 * dependent on the engine anyways */
654 engine->parent = parent;
656 engine->objtype = g_strdup(objtype);
657 engine->formatenv = formatenv;
659 engine->mapping_table = osync_mapping_table_new(error);
660 if (!engine->mapping_table)
661 goto error_free_engine;
663 engine->archive = osync_engine_get_archive(parent);
665 osync_trace(TRACE_EXIT, "%s: %p", __func__, engine);
666 return engine;
668 error_free_engine:
669 osync_obj_engine_unref(engine);
670 error:
671 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
672 return NULL;
675 OSyncObjEngine *osync_obj_engine_ref(OSyncObjEngine *engine)
677 osync_assert(engine);
679 g_atomic_int_inc(&(engine->ref_count));
681 return engine;
684 void osync_obj_engine_unref(OSyncObjEngine *engine)
686 osync_assert(engine);
688 if (g_atomic_int_dec_and_test(&(engine->ref_count))) {
689 while (engine->sink_engines) {
690 OSyncSinkEngine *sinkengine = engine->sink_engines->data;
691 osync_sink_engine_unref(sinkengine);
693 engine->sink_engines = g_list_remove(engine->sink_engines, sinkengine);
696 while (engine->mapping_engines) {
697 OSyncMappingEngine *mapping_engine = engine->mapping_engines->data;
698 osync_mapping_engine_unref(mapping_engine);
700 engine->mapping_engines = g_list_remove(engine->mapping_engines, mapping_engine);
703 if (engine->error)
704 osync_error_unref(&engine->error);
706 if (engine->objtype)
707 g_free(engine->objtype);
709 if (engine->mapping_table)
710 osync_mapping_table_unref(engine->mapping_table);
712 g_free(engine);
716 static int _osync_obj_engine_num_write_sinks(OSyncObjEngine *objengine) {
717 int num = 0;
718 GList *p = NULL;
719 OSyncSinkEngine *sink;
721 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, objengine);
723 for (p = objengine->sink_engines; p; p = p->next) {
724 sink = p->data;
729 osync_trace(TRACE_EXIT, "%s: %i", __func__, num);
730 return num;
733 osync_bool osync_obj_engine_initialize(OSyncObjEngine *engine, OSyncError **error)
735 const char *objtype = NULL;
736 int num = 0;
737 int i = 0;
739 osync_trace(TRACE_ENTRY, "%s(%p, %p)", __func__, engine, error);
741 osync_trace(TRACE_INTERNAL, "Loaded %i mappings", osync_mapping_table_num_mappings(engine->mapping_table));
743 objtype = osync_obj_engine_get_objtype(engine);
745 num = osync_engine_num_proxies(engine->parent);
746 for (i = 0; i < num; i++) {
747 OSyncClientProxy *proxy = osync_engine_nth_proxy(engine->parent, i);
748 OSyncObjTypeSink *sink = osync_client_proxy_find_objtype_sink(proxy, objtype);
749 OSyncSinkEngine *sinkengine = NULL;
750 if (!sink) {
751 /* "data" sink engine counts also as valid. */
752 sink = osync_client_proxy_find_objtype_sink(proxy, "data");
753 if (!sink)
754 continue;
757 sinkengine = osync_sink_engine_new(i, proxy, engine, error);
758 if (!sinkengine)
759 goto error;
761 engine->sink_engines = g_list_append(engine->sink_engines, sinkengine);
764 if (engine->archive && engine->slowsync) {
765 if (!osync_mapping_table_flush(engine->mapping_table, engine->archive, engine->objtype, error))
766 goto error;
769 if (engine->archive) {
770 if (!osync_mapping_table_load(engine->mapping_table, engine->archive, engine->objtype, error))
771 goto error;
774 if (!_create_mapping_engines(engine, error))
775 goto error;
777 osync_trace(TRACE_INTERNAL, "Created %i mapping engine", g_list_length(engine->mapping_engines));
779 if (engine->archive) {
780 /* inject ignored conflicts from previous syncs */
781 if (!_inject_changelog_entries(engine, error))
782 goto error;
785 osync_trace(TRACE_EXIT, "%s", __func__);
786 return TRUE;
787 error:
789 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
790 return FALSE;
793 void osync_obj_engine_finalize(OSyncObjEngine *engine)
795 OSyncMappingEngine *mapping_engine;
796 osync_trace(TRACE_ENTRY, "%s(%p)", __func__, engine);
798 engine->slowsync = FALSE;
799 engine->written = FALSE;
801 engine->sink_errors = 0;
802 engine->sink_connects = 0;
803 engine->sink_disconnects = 0;
804 engine->sink_get_changes = 0;
805 engine->sink_sync_done = 0;
806 engine->sink_written = 0;
808 while (engine->sink_engines) {
809 OSyncSinkEngine *sinkengine = engine->sink_engines->data;
810 osync_sink_engine_unref(sinkengine);
812 engine->sink_engines = g_list_remove(engine->sink_engines, sinkengine);
815 while (engine->conflicts) {
816 mapping_engine = engine->conflicts->data;
817 /* No need to unref the mapping engine. They get unref while emptying
818 the mapping_engines list. See next loop. */
819 engine->conflicts = g_list_remove(engine->conflicts, mapping_engine);
822 while (engine->mapping_engines) {
823 mapping_engine = engine->mapping_engines->data;
824 osync_mapping_engine_unref(mapping_engine);
826 engine->mapping_engines = g_list_remove(engine->mapping_engines, mapping_engine);
829 if (engine->mapping_table)
830 osync_mapping_table_close(engine->mapping_table);
832 osync_trace(TRACE_EXIT, "%s", __func__);
835 const char *osync_obj_engine_get_objtype(OSyncObjEngine *engine)
837 osync_assert(engine);
838 return engine->objtype;
841 void osync_obj_engine_set_slowsync(OSyncObjEngine *engine, osync_bool slowsync)
843 osync_assert(engine);
844 engine->slowsync = slowsync;
847 osync_bool osync_obj_engine_get_slowsync(OSyncObjEngine *engine)
849 osync_assert(engine);
850 return engine->slowsync;
853 osync_bool osync_obj_engine_command(OSyncObjEngine *engine, OSyncEngineCmd cmd, OSyncError **error)
855 GList *p = NULL;
856 GList *m = NULL;
857 GList *e = NULL;
858 OSyncSinkEngine *sinkengine = NULL;
861 osync_trace(TRACE_ENTRY, "%s(%p, %i, %p)", __func__, engine, cmd, error);
862 osync_assert(engine);
864 switch (cmd) {
865 int write_sinks = 0;
866 osync_bool proxy_disconnect = FALSE;
867 case OSYNC_ENGINE_COMMAND_CONNECT:
868 for (p = engine->sink_engines; p; p = p->next) {
869 sinkengine = p->data;
871 if (!osync_client_proxy_connect(sinkengine->proxy, _osync_obj_engine_connect_callback, sinkengine, engine->objtype, engine->slowsync, error))
872 goto error;
874 break;
875 case OSYNC_ENGINE_COMMAND_READ:
876 for (p = engine->sink_engines; p; p = p->next) {
877 sinkengine = p->data;
878 for (m = sinkengine->entries; m; m = m->next) {
879 OSyncMappingEntryEngine *entry = m->data;
880 OSyncChange *change = entry->change;
882 if (!change)
883 continue;
885 if (!osync_client_proxy_read(sinkengine->proxy, _osync_obj_engine_read_ignored_callback, sinkengine, change, error))
886 goto error;
890 if (engine->archive) {
891 /* Flush the changelog - to avoid double entries of ignored entries */
892 if (!osync_archive_flush_ignored_conflict(engine->archive, engine->objtype, error))
893 goto error;
896 write_sinks = _osync_obj_engine_num_write_sinks(engine);
898 /* Get change entries since last sync. (get_changes) */
899 for (p = engine->sink_engines; p; p = p->next) {
900 OSyncMember *member = NULL;
901 OSyncObjTypeSink *objtype_sink = NULL;
903 sinkengine = p->data;
905 member = osync_client_proxy_get_member(sinkengine->proxy);
906 objtype_sink = osync_member_find_objtype_sink(member, engine->objtype);
908 /* Is there at least one other writeable sink? */
909 if (objtype_sink && osync_objtype_sink_get_write(objtype_sink) && write_sinks) {
910 _osync_obj_engine_read_callback(sinkengine->proxy, sinkengine, *error);
911 osync_trace(TRACE_INTERNAL, "no other writable sinks .... SKIP");
912 continue;
915 if (!osync_client_proxy_get_changes(sinkengine->proxy, _osync_obj_engine_read_callback, sinkengine, engine->objtype, engine->slowsync, error))
916 goto error;
919 break;
920 case OSYNC_ENGINE_COMMAND_WRITE:
921 if (engine->conflicts) {
922 osync_trace(TRACE_INTERNAL, "We still have conflict. Delaying write");
923 break;
926 if (engine->written) {
927 osync_trace(TRACE_INTERNAL, "Already written");
928 break;
931 engine->written = TRUE;
933 /* Write the changes. First, we can multiply the winner in the mapping */
934 osync_trace(TRACE_INTERNAL, "Preparing write. multiplying %i mappings", g_list_length(engine->mapping_engines));
935 for (m = engine->mapping_engines; m; m = m->next) {
936 OSyncMappingEngine *mapping_engine = m->data;
937 if (!osync_mapping_engine_multiply(mapping_engine, error))
938 goto error;
941 osync_trace(TRACE_INTERNAL, "Starting to write");
942 for (p = engine->sink_engines; p; p = p->next) {
943 OSyncMember *member = NULL;
944 long long int memberid = 0;
945 OSyncObjTypeSink *objtype_sink = NULL;
946 OSyncFormatConverterPath *path = NULL;
948 sinkengine = p->data;
949 member = osync_client_proxy_get_member(sinkengine->proxy);
950 memberid = osync_member_get_id(member);
951 objtype_sink = osync_member_find_objtype_sink(member, engine->objtype);
953 /* If sink could not be found use "data" sink if available */
954 if (!objtype_sink)
955 objtype_sink = osync_member_find_objtype_sink(member, "data");
956 /* TODO: Review if objtype_sink = NULL is valid at all. */
958 for (e = sinkengine->entries; e; e = e->next) {
959 OSyncMappingEntryEngine *entry_engine = e->data;
960 osync_assert(entry_engine);
962 /* Merger - Save the entire xml and demerge */
963 /* TODO: is here the right place to save the xml???? */
964 if (osync_group_get_merger_enabled(osync_engine_get_group(engine->parent)) &&
965 osync_group_get_converter_enabled(osync_engine_get_group(engine->parent)) &&
966 entry_engine->change &&
967 (osync_change_get_changetype(entry_engine->change) != OSYNC_CHANGE_TYPE_DELETED) &&
968 !strncmp(osync_objformat_get_name(osync_change_get_objformat(entry_engine->change)), "xmlformat-", 10) )
970 char *buffer = NULL;
971 unsigned int xmlformat_size = 0, size = 0;
972 OSyncXMLFormat *xmlformat = NULL;
973 const char *objtype = NULL;
974 OSyncMapping *mapping = NULL;
975 OSyncMerger *merger = NULL;
977 osync_trace(TRACE_INTERNAL, "Entry %s for member %lli: Dirty: %i", osync_change_get_uid(entry_engine->change), memberid, osync_entry_engine_is_dirty(entry_engine));
979 osync_trace(TRACE_INTERNAL, "Save the entire XMLFormat and demerge.");
980 objtype = osync_change_get_objtype(entry_engine->change);
981 mapping = entry_engine->mapping_engine->mapping;
983 osync_data_get_data(osync_change_get_data(entry_engine->change), (char **) &xmlformat, &xmlformat_size);
984 osync_assert(xmlformat_size == osync_xmlformat_size());
986 if(!osync_xmlformat_assemble(xmlformat, &buffer, &size)) {
987 osync_error_set(error, OSYNC_ERROR_GENERIC, "Could not assamble the xmlformat");
988 goto error;
991 if(!osync_archive_save_data(engine->archive, osync_mapping_get_id(mapping), objtype, buffer, size, error)) {
992 g_free(buffer);
993 goto error;
995 g_free(buffer);
997 merger = osync_member_get_merger(osync_client_proxy_get_member(sinkengine->proxy));
998 if(merger)
999 osync_merger_demerge(merger, xmlformat);
1003 /* Only commit change if the objtype sink is able/allowed to write. */
1004 if (objtype_sink && osync_objtype_sink_get_write(objtype_sink) && osync_entry_engine_is_dirty(entry_engine)) {
1005 OSyncChange *change = entry_engine->change;
1006 osync_assert(entry_engine->change);
1008 /* Convert to requested target format if the changetype is not DELETED */
1009 if (osync_group_get_converter_enabled(osync_engine_get_group(engine->parent)) && (osync_change_get_changetype(change) != OSYNC_CHANGE_TYPE_DELETED)) {
1011 char *objtype = NULL;
1012 OSyncList *format_sinks = NULL;
1013 unsigned int length = 0;
1014 OSyncFormatConverter *converter = NULL;
1016 osync_trace(TRACE_INTERNAL, "Starting to convert from objtype %s and format %s", osync_change_get_objtype(entry_engine->change), osync_objformat_get_name(osync_change_get_objformat(entry_engine->change)));
1017 /* We have to save the objtype of the change so that it does not get
1018 * overwritten by the conversion */
1019 objtype = g_strdup(osync_change_get_objtype(change));
1021 /* Now we have to convert to one of the formats
1022 * that the client can understand */
1023 format_sinks = osync_objtype_sink_get_objformat_sinks(objtype_sink);
1024 if (!format_sinks) {
1025 osync_error_set(error, OSYNC_ERROR_GENERIC, "There are no available format sinks.");
1026 goto error;
1029 /* We cache the converter path for each sink/member couple */
1030 if (!path) {
1031 path = osync_format_env_find_path_formats_with_detectors(engine->formatenv, osync_change_get_data(entry_engine->change), format_sinks, osync_objtype_sink_get_preferred_format(objtype_sink), error);
1033 if (!path)
1034 goto error;
1036 length = osync_converter_path_num_edges(path);
1037 converter = osync_converter_path_nth_edge(path, length - 1);
1038 if (converter) {
1039 OSyncObjFormat *format = osync_converter_get_targetformat(converter);
1040 OSyncObjFormatSink *formatsink = osync_objtype_sink_find_objformat_sink(objtype_sink, format);
1041 osync_converter_path_set_config(path, osync_objformat_sink_get_config(formatsink));
1044 if (!osync_format_env_convert(engine->formatenv, path, osync_change_get_data(entry_engine->change), error)) {
1045 osync_converter_path_unref(path);
1046 goto error;
1048 osync_trace(TRACE_INTERNAL, "converted to format %s", osync_objformat_get_name(osync_change_get_objformat(entry_engine->change)));
1051 osync_change_set_objtype(change, objtype);
1052 g_free(objtype);
1055 osync_trace(TRACE_INTERNAL, "Writing change %s, changetype %i, format %s , objtype %s from member %lli",
1056 osync_change_get_uid(change),
1057 osync_change_get_changetype(change),
1058 osync_objformat_get_name(osync_change_get_objformat(change)),
1059 osync_change_get_objtype(change),
1060 osync_member_get_id(osync_client_proxy_get_member(sinkengine->proxy)));
1062 if (!osync_client_proxy_commit_change(sinkengine->proxy, _osync_obj_engine_commit_change_callback, entry_engine, osync_entry_engine_get_change(entry_engine), error))
1063 goto error;
1064 } else if (entry_engine->change) {
1065 OSyncMapping *mapping = entry_engine->mapping_engine->mapping;
1066 OSyncMember *member = osync_client_proxy_get_member(sinkengine->proxy);
1067 OSyncMappingEntry *entry = entry_engine->entry;
1068 const char *objtype = osync_change_get_objtype(entry_engine->change);
1070 if (engine->archive) {
1071 if (osync_change_get_changetype(entry_engine->change) == OSYNC_CHANGE_TYPE_DELETED) {
1072 if (!osync_archive_delete_change(engine->archive, osync_mapping_entry_get_id(entry), objtype, error))
1073 goto error;
1074 } else {
1075 if (!osync_archive_save_change(engine->archive, osync_mapping_entry_get_id(entry), osync_change_get_uid(entry_engine->change), objtype, osync_mapping_get_id(mapping), osync_member_get_id(member), error))
1076 goto error;
1082 if (path)
1083 osync_converter_path_unref(path);
1085 if (!osync_client_proxy_committed_all(sinkengine->proxy, _osync_obj_engine_written_callback, sinkengine, engine->objtype, error))
1086 goto error;
1088 break;
1089 case OSYNC_ENGINE_COMMAND_SYNC_DONE:
1090 for (p = engine->sink_engines; p; p = p->next) {
1091 sinkengine = p->data;
1092 if (!osync_client_proxy_sync_done(sinkengine->proxy, _osync_obj_engine_sync_done_callback, sinkengine, engine->objtype, error))
1093 goto error;
1095 break;
1096 case OSYNC_ENGINE_COMMAND_DISCONNECT:;
1097 for (p = engine->sink_engines; p; p = p->next) {
1098 sinkengine = p->data;
1100 /* Don't call client disconnect functions if the sink is already disconnected.
1101 This avoids unintended disconnect calls of clients/plugins which might not prepared
1102 for a disconnect call when their never got connected. (testcases: *_connect_error, *_connect_timeout ..) */
1103 if (!osync_sink_engine_is_connected(sinkengine))
1104 continue;
1106 proxy_disconnect = TRUE;
1108 if (!osync_client_proxy_disconnect(sinkengine->proxy, _osync_obj_engine_disconnect_callback, sinkengine, engine->objtype, error))
1109 goto error;
1112 /* If no client needs to be disconnected, we MUST NOT expected any
1113 disconnected_callback which generates an OSYNC_ENGINE_EVENT_DISCONNECTED event.
1114 So we directly generate such event on our own. (testcases: double_connect_*, triple_connnect_*) */
1115 if (!proxy_disconnect)
1116 _osync_obj_engine_generate_event_disconnected(engine, NULL);
1118 break;
1119 case OSYNC_ENGINE_COMMAND_SOLVE:
1120 case OSYNC_ENGINE_COMMAND_DISCOVER:
1121 case OSYNC_ENGINE_COMMAND_ABORT:
1122 break;
1125 osync_trace(TRACE_EXIT, "%s", __func__);
1126 return TRUE;
1128 error:
1129 osync_trace(TRACE_EXIT_ERROR, "%s: %s", __func__, osync_error_print(error));
1130 return FALSE;
1134 void osync_obj_engine_event(OSyncObjEngine *engine, OSyncEngineEvent event, OSyncError *error)
1136 osync_trace(TRACE_ENTRY, "%s(%p, %i, %p)", __func__, engine, event, error);
1137 osync_assert(engine);
1139 /* TODO: Create own enum OSyncObjEngine for objengine events. */
1140 osync_assert_msg(event != OSYNC_ENGINE_EVENT_ERROR, "OSyncObjEngine isn't supposed to emit OSYNC_ENGINE_EVENT_ERROR events!");
1142 /* engine event callback gets called with most recent OSyncError or NULL.
1143 Don't use engine->error for the engine event callback. Previous appears errors
1144 in this objengine would get passed to this engine, which will be interpeted by the
1145 engine as error for the current event.
1147 Example:
1149 EVENT_CONNECTED (obj)engine->error: NULL
1150 EVENT_READ **ERROR** (obj)engine->error: 0x....
1151 # OSyncEngine aborts sync and emit disconnect event, we reply with:
1152 EVENT_DISCONNECTED (obj)engine->error: 0x.....
1154 If we would pass in this case enigne->error instead of the most recent
1155 OSyncError, OSyncEngien would interpret this as the disconnect failed as well.
1156 So we just pass the most recent OSyncError pointer, which could be NULL -> no error.
1159 engine->callback(engine, event, error, engine->callback_userdata);
1161 osync_trace(TRACE_EXIT, "%s", __func__);
1162 return;
1165 void osync_obj_engine_set_callback(OSyncObjEngine *engine, OSyncObjEngineEventCallback callback, void *userdata)
1167 osync_assert(engine);
1168 engine->callback = callback;
1169 engine->callback_userdata = userdata;
1172 void osync_obj_engine_set_error(OSyncObjEngine *engine, OSyncError *error)
1174 osync_assert(engine);
1175 if (engine->error) {
1176 osync_error_stack(&error, &engine->error);
1177 osync_error_unref(&engine->error);
1179 engine->error = error;
1180 osync_error_ref(&error);