2016-04-27 Hristian Kirtchev <kirtchev@adacore.com>
[official-gcc.git] / liboffloadmic / runtime / offload_engine.cpp
blob4a88546877de3191fe921a28e5d9b6a577ae4218
1 /*
2 Copyright (c) 2014-2015 Intel Corporation. All Rights Reserved.
4 Redistribution and use in source and binary forms, with or without
5 modification, are permitted provided that the following conditions
6 are met:
8 * Redistributions of source code must retain the above copyright
9 notice, this list of conditions and the following disclaimer.
10 * Redistributions in binary form must reproduce the above copyright
11 notice, this list of conditions and the following disclaimer in the
12 documentation and/or other materials provided with the distribution.
13 * Neither the name of Intel Corporation nor the names of its
14 contributors may be used to endorse or promote products derived
15 from this software without specific prior written permission.
17 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20 A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
21 HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22 SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23 LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24 DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25 THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31 #include "offload_engine.h"
32 #include <signal.h>
33 #include <errno.h>
35 #include <algorithm>
36 #include <vector>
38 #include "offload_host.h"
39 #include "offload_table.h"
40 #include "offload_iterator.h"
42 // Static members of Stream class must be described somewhere.
43 // This members describe the list of all streams defined in programm
44 // via call to _Offload_stream_create.
45 uint64_t Stream::m_streams_count = 0;
46 StreamMap Stream::all_streams;
47 mutex_t Stream::m_stream_lock;
49 const char* Engine::m_func_names[Engine::c_funcs_total] =
51 "server_compute",
52 #ifdef MYO_SUPPORT
53 "server_myoinit",
54 "server_myofini",
55 #endif // MYO_SUPPORT
56 "server_init",
57 "server_var_table_size",
58 "server_var_table_copy",
59 "server_set_stream_affinity"
62 // Symbolic representation of system signals. Fix for CQ233593
63 const char* Engine::c_signal_names[Engine::c_signal_max] =
65 "Unknown SIGNAL",
66 "SIGHUP", /* 1, Hangup (POSIX). */
67 "SIGINT", /* 2, Interrupt (ANSI). */
68 "SIGQUIT", /* 3, Quit (POSIX). */
69 "SIGILL", /* 4, Illegal instruction (ANSI). */
70 "SIGTRAP", /* 5, Trace trap (POSIX). */
71 "SIGABRT", /* 6, Abort (ANSI). */
72 "SIGBUS", /* 7, BUS error (4.2 BSD). */
73 "SIGFPE", /* 8, Floating-point exception (ANSI). */
74 "SIGKILL", /* 9, Kill, unblockable (POSIX). */
75 "SIGUSR1", /* 10, User-defined signal 1 (POSIX). */
76 "SIGSEGV", /* 11, Segmentation violation (ANSI). */
77 "SIGUSR2", /* 12, User-defined signal 2 (POSIX). */
78 "SIGPIPE", /* 13, Broken pipe (POSIX). */
79 "SIGALRM", /* 14, Alarm clock (POSIX). */
80 "SIGTERM", /* 15, Termination (ANSI). */
81 "SIGSTKFLT", /* 16, Stack fault. */
82 "SIGCHLD", /* 17, Child status has changed (POSIX). */
83 "SIGCONT", /* 18, Continue (POSIX). */
84 "SIGSTOP", /* 19, Stop, unblockable (POSIX). */
85 "SIGTSTP", /* 20, Keyboard stop (POSIX). */
86 "SIGTTIN", /* 21, Background read from tty (POSIX). */
87 "SIGTTOU", /* 22, Background write to tty (POSIX). */
88 "SIGURG", /* 23, Urgent condition on socket (4.2 BSD). */
89 "SIGXCPU", /* 24, CPU limit exceeded (4.2 BSD). */
90 "SIGXFSZ", /* 25, File size limit exceeded (4.2 BSD). */
91 "SIGVTALRM", /* 26, Virtual alarm clock (4.2 BSD). */
92 "SIGPROF", /* 27, Profiling alarm clock (4.2 BSD). */
93 "SIGWINCH", /* 28, Window size change (4.3 BSD, Sun). */
94 "SIGIO", /* 29, I/O now possible (4.2 BSD). */
95 "SIGPWR", /* 30, Power failure restart (System V). */
96 "SIGSYS" /* 31, Bad system call. */
99 void Engine::init(void)
101 if (!m_ready) {
102 mutex_locker_t locker(m_lock);
104 if (!m_ready) {
105 // start process if not done yet
106 if (m_process == 0) {
107 init_process();
110 // load penging images
111 load_libraries();
113 // and (re)build pointer table
114 init_ptr_data();
116 // it is ready now
117 m_ready = true;
122 void Engine::init_process(void)
124 COIENGINE engine;
125 COIRESULT res;
126 const char **environ;
127 char buf[4096]; // For exe path name
129 // create environment for the target process
130 environ = (const char**) mic_env_vars.create_environ_for_card(m_index);
131 if (environ != 0) {
132 for (const char **p = environ; *p != 0; p++) {
133 OFFLOAD_DEBUG_TRACE(3, "Env Var for card %d: %s\n", m_index, *p);
137 // Create execution context in the specified device
138 OFFLOAD_DEBUG_TRACE(2, "Getting device %d (engine %d) handle\n", m_index,
139 m_physical_index);
140 res = COI::EngineGetHandle(COI_ISA_MIC, m_physical_index, &engine);
141 check_result(res, c_get_engine_handle, m_index, res);
143 // Get engine info on threads and cores.
144 // The values of core number and thread number will be used later at stream
145 // creation by call to _Offload_stream_create(device,number_of_cpus).
147 COI_ENGINE_INFO engine_info;
149 res = COI::EngineGetInfo(engine, sizeof(COI_ENGINE_INFO), &engine_info);
150 check_result(res, c_get_engine_info, m_index, res);
152 // m_cpus bitset has 1 for available thread. At the begining all threads
153 // are available and m_cpus(i) is set to
154 // 1 for i = [0...engine_info.NumThreads].
155 m_cpus.reset();
156 for (int i = 0; i < engine_info.NumThreads; i++) {
157 m_cpus.set(i);
160 // The following values will be used at pipeline creation for streams
161 m_num_cores = engine_info.NumCores;
162 m_num_threads = engine_info.NumThreads;
164 // Check if OFFLOAD_DMA_CHANNEL_COUNT is set to 2
165 // Only the value 2 is supported in 16.0
166 if (mic_dma_channel_count == 2) {
167 if (COI::ProcessConfigureDMA) {
168 // Set DMA channels using COI API
169 COI::ProcessConfigureDMA(2, COI::DMA_MODE_READ_WRITE);
171 else {
172 // Set environment variable COI_DMA_CHANNEL_COUNT
173 // use putenv instead of setenv as Windows has no setenv.
174 // Note: putenv requires its argument can't be freed or modified.
175 // So no free after call to putenv or elsewhere.
176 char * env_var = strdup("COI_DMA_CHANNEL_COUNT=2");
177 if (env_var == NULL)
178 LIBOFFLOAD_ERROR(c_malloc);
179 putenv(env_var);
183 // Target executable is not available then use compiler provided offload_main
184 if (__target_exe == 0) {
185 if (mic_device_main == 0)
186 LIBOFFLOAD_ERROR(c_report_no_host_exe);
188 OFFLOAD_DEBUG_TRACE(2,
189 "Loading target executable %s\n",mic_device_main);
191 res = COI::ProcessCreateFromFile(
192 engine, // in_Engine
193 mic_device_main, // in_pBinaryName
194 0, // in_Argc
195 0, // in_ppArgv
196 environ == 0, // in_DupEnv
197 environ, // in_ppAdditionalEnv
198 mic_proxy_io, // in_ProxyActive
199 mic_proxy_fs_root, // in_ProxyfsRoot
200 mic_buffer_size, // in_BufferSpace
201 mic_library_path, // in_LibrarySearchPath
202 &m_process // out_pProcess
205 else {
206 // Target executable should be available by the time when we
207 // attempt to initialize the device
209 // Need the full path of the FAT exe for VTUNE
211 #ifndef TARGET_WINNT
212 ssize_t len = readlink("/proc/self/exe", buf,1000);
213 #else
214 int len = GetModuleFileName(NULL, buf,1000);
215 #endif // TARGET_WINNT
216 if (len == -1) {
217 LIBOFFLOAD_ERROR(c_report_no_host_exe);
218 exit(1);
220 else if (len > 999) {
221 LIBOFFLOAD_ERROR(c_report_path_buff_overflow);
222 exit(1);
224 buf[len] = '\0';
227 OFFLOAD_DEBUG_TRACE(2,
228 "Loading target executable \"%s\" from %p, size %lld, host file %s\n",
229 __target_exe->name, __target_exe->data, __target_exe->size,
230 buf);
232 res = COI::ProcessCreateFromMemory(
233 engine, // in_Engine
234 __target_exe->name, // in_pBinaryName
235 __target_exe->data, // in_pBinaryBuffer
236 __target_exe->size, // in_BinaryBufferLength,
237 0, // in_Argc
238 0, // in_ppArgv
239 environ == 0, // in_DupEnv
240 environ, // in_ppAdditionalEnv
241 mic_proxy_io, // in_ProxyActive
242 mic_proxy_fs_root, // in_ProxyfsRoot
243 mic_buffer_size, // in_BufferSpace
244 mic_library_path, // in_LibrarySearchPath
245 buf, // in_FileOfOrigin
246 -1, // in_FileOfOriginOffset use -1 to indicate to
247 // COI that is is a FAT binary
248 &m_process // out_pProcess
251 check_result(res, c_process_create, m_index, res);
253 if ((mic_4k_buffer_size != 0) || (mic_2m_buffer_size !=0)) {
254 // available only in MPSS 4.2 and greater
255 if (COI::ProcessSetCacheSize != 0 ) {
256 int flags;
257 // Need compiler to use MPSS 3.2 or greater to get these
258 // definition so currently hardcoding it
259 // COI_CACHE_ACTION_GROW_NOW && COI_CACHE_MODE_ONDEMAND_SYNC;
260 flags = 0x00020002;
261 res = COI::ProcessSetCacheSize(
262 m_process, // in_Process
263 mic_2m_buffer_size, // in_HugePagePoolSize
264 flags, // inHugeFlags
265 mic_4k_buffer_size, // in_SmallPagePoolSize
266 flags, // inSmallFlags
267 0, // in_NumDependencies
268 0, // in_pDependencies
269 0 // out_PCompletion
271 OFFLOAD_DEBUG_TRACE(2,
272 "Reserve target buffers 4K pages = %d 2M pages = %d\n",
273 mic_4k_buffer_size, mic_2m_buffer_size);
274 check_result(res, c_process_set_cache_size, m_index, res);
276 else {
277 OFFLOAD_DEBUG_TRACE(2,
278 "Reserve target buffers not supported in current MPSS\n");
282 // get function handles
283 res = COI::ProcessGetFunctionHandles(m_process, c_funcs_total,
284 m_func_names, m_funcs);
285 check_result(res, c_process_get_func_handles, m_index, res);
287 // initialize device side
288 pid_t pid = init_device();
290 // For IDB
291 if (__dbg_is_attached) {
292 // TODO: we have in-memory executable now.
293 // Check with IDB team what should we provide them now?
294 if (strlen(__target_exe->name) < MAX_TARGET_NAME) {
295 strcpy(__dbg_target_exe_name, __target_exe->name);
297 __dbg_target_so_pid = pid;
298 __dbg_target_id = m_physical_index;
299 __dbg_target_so_loaded();
303 void Engine::fini_process(bool verbose)
305 if (m_process != 0) {
306 uint32_t sig;
307 int8_t ret;
309 // destroy target process
310 OFFLOAD_DEBUG_TRACE(2, "Destroying process on the device %d\n",
311 m_index);
313 COIRESULT res = COI::ProcessDestroy(m_process, -1, 0, &ret, &sig);
314 m_process = 0;
316 if (res == COI_SUCCESS) {
317 OFFLOAD_DEBUG_TRACE(3, "Device process: signal %d, exit code %d\n",
318 sig, ret);
319 if (verbose) {
320 if (sig != 0) {
321 LIBOFFLOAD_ERROR(
322 c_mic_process_exit_sig, m_index, sig,
323 c_signal_names[sig >= c_signal_max ? 0 : sig]);
325 else {
326 LIBOFFLOAD_ERROR(c_mic_process_exit_ret, m_index, ret);
330 // for idb
331 if (__dbg_is_attached) {
332 __dbg_target_so_unloaded();
335 else {
336 if (verbose) {
337 LIBOFFLOAD_ERROR(c_mic_process_exit, m_index);
343 void Engine::load_libraries()
345 // load libraries collected so far
346 for (TargetImageList::iterator it = m_images.begin();
347 it != m_images.end(); it++) {
348 OFFLOAD_DEBUG_TRACE(2,
349 "Loading library \"%s\" from %p, size %llu, host file %s\n",
350 it->name, it->data, it->size, it->origin);
352 // load library to the device
353 COILIBRARY lib;
354 COIRESULT res;
355 res = COI::ProcessLoadLibraryFromMemory(m_process,
356 it->data,
357 it->size,
358 it->name,
359 mic_library_path,
360 it->origin,
361 (it->origin) ? -1 : 0,
362 COI_LOADLIBRARY_V1_FLAGS,
363 &lib);
364 m_dyn_libs.push_front(DynLib(it->name, it->data, lib));
366 if (res != COI_SUCCESS && res != COI_ALREADY_EXISTS) {
367 check_result(res, c_load_library, m_index, res);
370 m_images.clear();
373 void Engine::unload_library(const void *data, const char *name)
375 if (m_process == 0) {
376 return;
378 for (DynLibList::iterator it = m_dyn_libs.begin();
379 it != m_dyn_libs.end(); it++) {
380 if (it->data == data) {
381 COIRESULT res;
382 OFFLOAD_DEBUG_TRACE(2,
383 "Unloading library \"%s\"\n",name);
384 res = COI::ProcessUnloadLibrary(m_process,it->lib);
385 m_dyn_libs.erase(it);
386 if (res != COI_SUCCESS) {
387 check_result(res, c_unload_library, m_index, res);
389 return;
394 static bool target_entry_cmp(
395 const VarList::BufEntry &l,
396 const VarList::BufEntry &r
399 const char *l_name = reinterpret_cast<const char*>(l.name);
400 const char *r_name = reinterpret_cast<const char*>(r.name);
401 return strcmp(l_name, r_name) < 0;
404 static bool host_entry_cmp(
405 const VarTable::Entry *l,
406 const VarTable::Entry *r
409 return strcmp(l->name, r->name) < 0;
412 void Engine::init_ptr_data(void)
414 COIRESULT res;
415 COIEVENT event;
417 // Prepare table of host entries
418 std::vector<const VarTable::Entry*> host_table(
419 Iterator(__offload_vars.get_head()),
420 Iterator());
422 // no need to do anything further is host table is empty
423 if (host_table.size() <= 0) {
424 return;
427 // Get var table entries from the target.
428 // First we need to get size for the buffer to copy data
429 struct {
430 int64_t nelems;
431 int64_t length;
432 } params;
434 res = COI::PipelineRunFunction(get_pipeline(),
435 m_funcs[c_func_var_table_size],
436 0, 0, 0,
437 0, 0,
438 0, 0,
439 &params, sizeof(params),
440 &event);
441 check_result(res, c_pipeline_run_func, m_index, res);
443 res = COI::EventWait(1, &event, -1, 1, 0, 0);
444 check_result(res, c_event_wait, res);
446 if (params.length == 0) {
447 return;
450 // create buffer for target entries and copy data to host
451 COIBUFFER buffer;
452 res = COI::BufferCreate(params.length, COI_BUFFER_NORMAL, 0, 0, 1,
453 &m_process, &buffer);
454 check_result(res, c_buf_create, m_index, res);
456 COI_ACCESS_FLAGS flags = COI_SINK_WRITE;
457 res = COI::PipelineRunFunction(get_pipeline(),
458 m_funcs[c_func_var_table_copy],
459 1, &buffer, &flags,
460 0, 0,
461 &params.nelems, sizeof(params.nelems),
462 0, 0,
463 &event);
464 check_result(res, c_pipeline_run_func, m_index, res);
466 res = COI::EventWait(1, &event, -1, 1, 0, 0);
467 check_result(res, c_event_wait, res);
469 // patch names in target data
470 VarList::BufEntry *target_table;
471 COIMAPINSTANCE map_inst;
472 res = COI::BufferMap(buffer, 0, params.length, COI_MAP_READ_ONLY, 0, 0,
473 0, &map_inst,
474 reinterpret_cast<void**>(&target_table));
475 check_result(res, c_buf_map, res);
477 VarList::table_patch_names(target_table, params.nelems);
479 // and sort entries
480 std::sort(target_table, target_table + params.nelems, target_entry_cmp);
481 std::sort(host_table.begin(), host_table.end(), host_entry_cmp);
483 // merge host and target entries and enter matching vars map
484 std::vector<const VarTable::Entry*>::const_iterator hi =
485 host_table.begin();
486 std::vector<const VarTable::Entry*>::const_iterator he =
487 host_table.end();
488 const VarList::BufEntry *ti = target_table;
489 const VarList::BufEntry *te = target_table + params.nelems;
491 while (hi != he && ti != te) {
492 int res = strcmp((*hi)->name, reinterpret_cast<const char*>(ti->name));
493 if (res == 0) {
494 bool is_new;
495 // add matching entry to var map
496 PtrData *ptr = insert_ptr_data((*hi)->addr, (*hi)->size, is_new);
498 // store address for new entries
499 if (is_new) {
500 ptr->mic_addr = ti->addr;
501 ptr->is_static = true;
503 ptr->alloc_ptr_data_lock.unlock();
504 hi++;
505 ti++;
507 else if (res < 0) {
508 hi++;
510 else {
511 ti++;
515 // cleanup
516 res = COI::BufferUnmap(map_inst, 0, 0, 0);
517 check_result(res, c_buf_unmap, res);
519 res = COI::BufferDestroy(buffer);
520 check_result(res, c_buf_destroy, res);
523 COIRESULT Engine::compute(
524 _Offload_stream stream,
525 const std::list<COIBUFFER> &buffers,
526 const void* data,
527 uint16_t data_size,
528 void* ret,
529 uint16_t ret_size,
530 uint32_t num_deps,
531 const COIEVENT* deps,
532 COIEVENT* event
533 ) /* const */
535 COIBUFFER *bufs;
536 COI_ACCESS_FLAGS *flags;
537 COIRESULT res;
539 // convert buffers list to array
540 int num_bufs = buffers.size();
541 if (num_bufs > 0) {
542 bufs = (COIBUFFER*) alloca(num_bufs * sizeof(COIBUFFER));
543 flags = (COI_ACCESS_FLAGS*) alloca(num_bufs *
544 sizeof(COI_ACCESS_FLAGS));
546 int i = 0;
547 for (std::list<COIBUFFER>::const_iterator it = buffers.begin();
548 it != buffers.end(); it++) {
549 bufs[i] = *it;
551 // TODO: this should be fixed
552 flags[i++] = COI_SINK_WRITE;
555 else {
556 bufs = 0;
557 flags = 0;
559 COIPIPELINE pipeline = (stream == no_stream) ?
560 get_pipeline() :
561 get_pipeline(stream);
562 // start computation
563 res = COI::PipelineRunFunction(pipeline,
564 m_funcs[c_func_compute],
565 num_bufs, bufs, flags,
566 num_deps, deps,
567 data, data_size,
568 ret, ret_size,
569 event);
570 return res;
573 pid_t Engine::init_device(void)
575 struct init_data {
576 int device_index;
577 int devices_total;
578 int console_level;
579 int offload_report_level;
580 } data;
581 COIRESULT res;
582 COIEVENT event;
583 pid_t pid;
585 OFFLOAD_DEBUG_TRACE_1(2, 0, c_offload_init,
586 "Initializing device with logical index %d "
587 "and physical index %d\n",
588 m_index, m_physical_index);
590 // setup misc data
591 data.device_index = m_index;
592 data.devices_total = mic_engines_total;
593 data.console_level = console_enabled;
594 data.offload_report_level = offload_report_level;
596 res = COI::PipelineRunFunction(get_pipeline(),
597 m_funcs[c_func_init],
598 0, 0, 0, 0, 0,
599 &data, sizeof(data),
600 &pid, sizeof(pid),
601 &event);
602 check_result(res, c_pipeline_run_func, m_index, res);
604 res = COI::EventWait(1, &event, -1, 1, 0, 0);
605 check_result(res, c_event_wait, res);
607 OFFLOAD_DEBUG_TRACE(2, "Device process pid is %d\n", pid);
609 return pid;
612 // data associated with each thread
613 struct Thread {
614 Thread(long* addr_coipipe_counter) {
615 m_addr_coipipe_counter = addr_coipipe_counter;
616 memset(m_pipelines, 0, sizeof(m_pipelines));
619 ~Thread() {
620 #ifndef TARGET_WINNT
621 __sync_sub_and_fetch(m_addr_coipipe_counter, 1);
622 #else // TARGET_WINNT
623 _InterlockedDecrement(m_addr_coipipe_counter);
624 #endif // TARGET_WINNT
625 for (int i = 0; i < mic_engines_total; i++) {
626 if (m_pipelines[i] != 0) {
627 COI::PipelineDestroy(m_pipelines[i]);
632 COIPIPELINE get_pipeline(int index) const {
633 return m_pipelines[index];
636 void set_pipeline(int index, COIPIPELINE pipeline) {
637 m_pipelines[index] = pipeline;
640 AutoSet& get_auto_vars() {
641 return m_auto_vars;
644 private:
645 long* m_addr_coipipe_counter;
646 AutoSet m_auto_vars;
647 COIPIPELINE m_pipelines[MIC_ENGINES_MAX];
650 COIPIPELINE Engine::get_pipeline(void)
652 Thread* thread = (Thread*) thread_getspecific(mic_thread_key);
653 if (thread == 0) {
654 thread = new Thread(&m_proc_number);
655 thread_setspecific(mic_thread_key, thread);
658 COIPIPELINE pipeline = thread->get_pipeline(m_index);
659 if (pipeline == 0) {
660 COIRESULT res;
661 int proc_num;
663 #ifndef TARGET_WINNT
664 proc_num = __sync_fetch_and_add(&m_proc_number, 1);
665 #else // TARGET_WINNT
666 proc_num = _InterlockedIncrement(&m_proc_number);
667 #endif // TARGET_WINNT
669 if (proc_num > COI_PIPELINE_MAX_PIPELINES) {
670 LIBOFFLOAD_ERROR(c_coipipe_max_number, COI_PIPELINE_MAX_PIPELINES);
671 LIBOFFLOAD_ABORT;
673 // create pipeline for this thread
674 res = COI::PipelineCreate(m_process, 0, mic_stack_size, &pipeline);
675 check_result(res, c_pipeline_create, m_index, res);
676 thread->set_pipeline(m_index, pipeline);
678 return pipeline;
681 Stream* Stream::find_stream(uint64_t handle, bool remove)
683 Stream *stream = 0;
685 m_stream_lock.lock();
687 StreamMap::iterator it = all_streams.find(handle);
688 if (it != all_streams.end()) {
689 stream = it->second;
690 if (remove) {
691 all_streams.erase(it);
695 m_stream_lock.unlock();
696 return stream;
699 COIPIPELINE Engine::get_pipeline(_Offload_stream handle)
701 Stream * stream = Stream::find_stream(handle, false);
703 if (!stream) {
704 LIBOFFLOAD_ERROR(c_offload_no_stream, m_index);
705 LIBOFFLOAD_ABORT;
708 COIPIPELINE pipeline = stream->get_pipeline();
710 if (pipeline == 0) {
711 COIRESULT res;
712 int proc_num;
713 COI_CPU_MASK in_Mask ;
715 #ifndef TARGET_WINNT
716 proc_num = __sync_fetch_and_add(&m_proc_number, 1);
717 #else // TARGET_WINNT
718 proc_num = _InterlockedIncrement(&m_proc_number);
719 #endif // TARGET_WINNT
721 if (proc_num > COI_PIPELINE_MAX_PIPELINES) {
722 LIBOFFLOAD_ERROR(c_coipipe_max_number, COI_PIPELINE_MAX_PIPELINES);
723 LIBOFFLOAD_ABORT;
726 m_stream_lock.lock();
728 // start process if not done yet
729 if (m_process == 0) {
730 init_process();
733 // create CPUmask
734 res = COI::PipelineClearCPUMask(in_Mask);
735 check_result(res, c_clear_cpu_mask, m_index, res);
737 int stream_cpu_num = stream->get_cpu_number();
739 stream->m_stream_cpus.reset();
741 int threads_per_core = m_num_threads / m_num_cores;
743 // The "stream_cpu_num" available threads is set in mask.
744 // Available threads are defined by examining of m_cpus bitset.
745 // We skip thread 0 .
746 for (int i = 1; i < m_num_threads; i++) {
747 // for available thread i m_cpus[i] is equal to 1
748 if (m_cpus[i]) {
749 res = COI::PipelineSetCPUMask(m_process,
750 i / threads_per_core,
751 i % threads_per_core,
752 in_Mask);
754 check_result(res, c_set_cpu_mask, res);
755 // mark thread i as nonavailable
756 m_cpus.set(i,0);
757 // Mark thread i as given for the stream.
758 // In case of stream destroying by call to
759 // _Offload_stream_destroy we can mark the thread i as
760 // available.
761 stream->m_stream_cpus.set(i);
762 if (--stream_cpu_num <= 0) {
763 break;
768 // if stream_cpu_num is greater than 0 there are not enough
769 // available threads
770 if (stream_cpu_num > 0) {
771 LIBOFFLOAD_ERROR(c_create_pipeline_for_stream, m_num_threads);
772 LIBOFFLOAD_ABORT;
774 // create pipeline for this thread
775 OFFLOAD_DEBUG_TRACE(2, "COIPipelineCreate Mask\n"
776 "%016lx %016lx %016lx %016lx\n%016lx %016lx %016lx %016lx\n"
777 "%016lx %016lx %016lx %016lx\n%016lx %016lx %016lx %016lx\n",
778 in_Mask[0], in_Mask[1], in_Mask[2], in_Mask[3],
779 in_Mask[4], in_Mask[5], in_Mask[6], in_Mask[7],
780 in_Mask[8], in_Mask[9], in_Mask[10], in_Mask[11],
781 in_Mask[12], in_Mask[13], in_Mask[14], in_Mask[15]);
782 res = COI::PipelineCreate(m_process, in_Mask,
783 mic_stack_size, &pipeline);
784 check_result(res, c_pipeline_create, m_index, res);
786 // Set stream's affinities
788 struct affinity_spec affinity_spec;
789 char* affinity_type;
790 int i;
792 // "compact" by default
793 affinity_spec.affinity_type = affinity_compact;
795 // Check if user has specified type of affinity
796 if ((affinity_type = getenv("OFFLOAD_STREAM_AFFINITY")) !=
797 NULL)
799 char affinity_str[16];
800 int affinity_str_len;
802 OFFLOAD_DEBUG_TRACE(2,
803 "User has specified OFFLOAD_STREAM_AFFINITY=%s\n",
804 affinity_type);
806 // Set type of affinity requested
807 affinity_str_len = strlen(affinity_type);
808 for (i=0; i<affinity_str_len && i<15; i++)
810 affinity_str[i] = tolower(affinity_type[i]);
812 affinity_str[i] = '\0';
813 if (strcmp(affinity_str, "compact") == 0) {
814 affinity_spec.affinity_type = affinity_compact;
815 OFFLOAD_DEBUG_TRACE(2, "Setting affinity=compact\n");
816 } else if (strcmp(affinity_str, "scatter") == 0) {
817 affinity_spec.affinity_type = affinity_scatter;
818 OFFLOAD_DEBUG_TRACE(2, "Setting affinity=scatter\n");
819 } else {
820 LIBOFFLOAD_ERROR(c_incorrect_affinity, affinity_str);
821 affinity_spec.affinity_type = affinity_compact;
822 OFFLOAD_DEBUG_TRACE(2, "Setting affinity=compact\n");
825 // Make flat copy of sink mask because COI's mask is opaque
826 for (i=0; i<16; i++) {
827 affinity_spec.sink_mask[i] = in_Mask[i];
829 // Set number of cores and threads
830 affinity_spec.num_cores = m_num_cores;
831 affinity_spec.num_threads = m_num_threads;
833 COIEVENT event;
834 res = COI::PipelineRunFunction(pipeline,
835 m_funcs[c_func_set_stream_affinity],
836 0, 0, 0,
837 0, 0,
838 &affinity_spec, sizeof(affinity_spec),
839 0, 0,
840 &event);
841 check_result(res, c_pipeline_run_func, m_index, res);
843 res = COI::EventWait(1, &event, -1, 1, 0, 0);
844 check_result(res, c_event_wait, res);
847 m_stream_lock.unlock();
848 stream->set_pipeline(pipeline);
850 return pipeline;
853 void Engine::stream_destroy(_Offload_stream handle)
855 // get stream
856 Stream * stream = Stream::find_stream(handle, true);
858 if (stream) {
859 // return cpus for future use
860 for (int i = 0; i < m_num_threads; i++) {
861 if (stream->m_stream_cpus.test(i)) {
862 m_cpus.set(i);
865 delete stream;
867 else {
868 LIBOFFLOAD_ERROR(c_offload_no_stream, m_index);
869 LIBOFFLOAD_ABORT;
873 uint64_t Engine::get_thread_id(void)
875 Thread* thread = (Thread*) thread_getspecific(mic_thread_key);
876 if (thread == 0) {
877 thread = new Thread(&m_proc_number);
878 thread_setspecific(mic_thread_key, thread);
881 return reinterpret_cast<uint64_t>(thread);
884 AutoSet& Engine::get_auto_vars(void)
886 Thread* thread = (Thread*) thread_getspecific(mic_thread_key);
887 if (thread == 0) {
888 thread = new Thread(&m_proc_number);
889 thread_setspecific(mic_thread_key, thread);
892 return thread->get_auto_vars();
895 void Engine::destroy_thread_data(void *data)
897 delete static_cast<Thread*>(data);