Scippy

SCIP

Solving Constraint Integer Programs

tpi_openmp.c
Go to the documentation of this file.
1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2 /* */
3 /* This file is part of the program and library */
4 /* SCIP --- Solving Constraint Integer Programs */
5 /* */
6 /* Copyright (C) 2002-2019 Konrad-Zuse-Zentrum */
7 /* fuer Informationstechnik Berlin */
8 /* */
9 /* SCIP is distributed under the terms of the ZIB Academic License. */
10 /* */
11 /* You should have received a copy of the ZIB Academic License */
12 /* along with SCIP; see the file COPYING. If not visit scip.zib.de. */
13 /* */
14 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
15 
16 /**@file tpi_openmp.c
17  * @ingroup TASKINTERFACE
18  * @brief the interface functions for openmp
19  * @author Stephen J. Maher
20  * @author Robert Lion Gottwald
21  */
22 
23 /*---+----1----+----2----+----3----+----4----+----5----+----6----+----7----+----8----+----9----+----0----+----1----+----2*/
24 
25 #include "tpi/tpi.h"
26 #include "blockmemshell/memory.h"
27 
28 /** A job added to the queue; jobs are chained into singly linked lists through nextjob */
29 struct SCIP_Job
30 {
31  int jobid; /**< id to identify jobs from a common process */
32  struct SCIP_Job* nextjob; /**< pointer to the next job in the queue */
33  SCIP_RETCODE (*jobfunc)(void* args);/**< pointer to the job function */
34  void* args; /**< pointer to the function arguments, passed to jobfunc when the job is executed */
35  SCIP_RETCODE retcode; /**< return code of the job, i.e., the value returned by jobfunc */
36 };
37 
38 /** the thread pool job queue: a singly linked list of jobs stored as head pointer, tail pointer and length
 *
 *  NOTE(review): the struct header (listing line 39) is missing from this extract; the rest of the file refers
 *  to this type as SCIP_JOBQUEUE.
 */
40 {
41  SCIP_JOB* firstjob; /**< pointer to the first job in the queue */
42  SCIP_JOB* lastjob; /**< pointer to the last job in the queue */
43  int njobs; /**< number of jobs in the queue */
44 };
46 
/* NOTE(review): the doc comment and the struct header (listing lines 45 and 47) are missing from this extract;
 * the rest of the file refers to this type as SCIP_JOBQUEUES. It bundles the queue of unprocessed jobs, one
 * "currently running" slot per thread, and the queue of finished jobs, together with one lock and one condition.
 */
48 {
49  SCIP_JOBQUEUE jobqueue; /**< queue of unprocessed jobs */
50  SCIP_JOB** currentjobs; /**< array with slot for each thread to store the currently running job */
51  int ncurrentjobs; /**< number of currently running jobs */
52  int nthreads; /**< number of threads */
53  SCIP_JOBQUEUE finishedjobs; /**< jobqueue containing the finished jobs */
54  SCIP_LOCK lock; /**< lock to protect this structure from concurrent access */
55  SCIP_CONDITION jobfinished; /**< condition to signal if a job was finished */
56 };
58 
/* file-scope job queue state used by all functions in this file; NULL until createJobQueue() allocates it */
59 static SCIP_JOBQUEUES* _jobqueues = NULL;
60 
61 
62 
/** allocates the file-scope _jobqueues structure and resets it to "no queued, running or finished jobs"
 *
 *  Returns SCIP_OKAY when the end of the function is reached.
 *
 *  NOTE(review): listing line 64 (return type and function name) is missing from this extract; the
 *  cross-reference index at the end of the page gives the signature as
 *  `static SCIP_RETCODE createJobQueue(int nthreads, int qsize, SCIP_Bool blockwhenfull)`.
 *  Listing line 93 is missing as well; presumably it initializes the `jobfinished` condition, which
 *  freeJobQueue() destroys -- confirm against the original file.
 */
63 static
65  int nthreads, /**< the number of threads; also the length of the currentjobs array */
66  int qsize, /**< the queue size (only checked by the assertion below, not stored) */
67  SCIP_Bool blockwhenfull /**< should the queue be blocked from new jobs when full (currently unused) */
68  )
69 {
70  int i;
71 
72  assert(nthreads >= 0);
73  assert(qsize >= 0);
74  SCIP_UNUSED( blockwhenfull );
75 
76  /* allocating memory for the job queue */
77  SCIP_ALLOC( BMSallocMemory(&_jobqueues) );
78  _jobqueues->jobqueue.firstjob = NULL;
79  _jobqueues->jobqueue.lastjob = NULL;
80  _jobqueues->jobqueue.njobs = 0;
81  _jobqueues->finishedjobs.firstjob = NULL;
82  _jobqueues->finishedjobs.lastjob = NULL;
83  _jobqueues->finishedjobs.njobs = 0;
84  _jobqueues->ncurrentjobs = 0;
85 
86  _jobqueues->nthreads = nthreads;
87  SCIP_ALLOC( BMSallocMemoryArray(&_jobqueues->currentjobs, nthreads) );
88 
 /* no thread is running a job yet */
89  for( i = 0; i < nthreads; ++i )
90  _jobqueues->currentjobs[i] = NULL;
91 
92  SCIP_CALL( SCIPtpiInitLock(&_jobqueues->lock) );
94 
95  return SCIP_OKAY;
96 }
97 
98 
99 
/** destroys the lock and the condition of the file-scope _jobqueues structure and frees its memory
 *
 *  The jobs themselves are not freed here; SCIPtpiExit() asserts that all queues are empty before calling this.
 *
 *  NOTE(review): listing line 101 (return type and function name) is missing from this extract; the
 *  cross-reference index at the end of the page gives the signature as `static SCIP_RETCODE freeJobQueue(void)`.
 */
100 static
102  void
103  )
104 {
105  assert(_jobqueues != NULL);
106 
107  SCIPtpiDestroyLock(&_jobqueues->lock);
108  SCIPtpiDestroyCondition(&_jobqueues->jobfinished);
109  BMSfreeMemoryArray(&_jobqueues->currentjobs);
110 
111  BMSfreeMemory(&_jobqueues);
112 
113  return SCIP_OKAY;
114 }
115 
116 
/** runs a single job on the calling thread and moves it to the list of finished jobs
 *
 *  The job is registered in the calling thread's slot of currentjobs, its job function is called without
 *  holding the lock, and afterwards the slot is cleared, ncurrentjobs is decremented and the job is appended
 *  to finishedjobs. ncurrentjobs is not incremented here; both callers in this file increment it before
 *  calling this function.
 *
 *  NOTE(review): listing line 118 (return type and function name) is missing from this extract; the
 *  cross-reference index at the end of the page gives the signature as `static void executeJob(SCIP_JOB *job)`.
 *  Listing line 150 is missing as well; presumably it signals or broadcasts the `jobfinished` condition that
 *  the waiting loops in this file depend on -- confirm against the original file.
 */
117 static
119  SCIP_JOB* job /**< the job to be executed in parallel */
120  )
121 {
122  int threadnum;
123 
124  threadnum = SCIPtpiGetThreadNum();
125 
 /* publish the job as running in this thread's slot */
126  SCIP_CALL_ABORT( SCIPtpiAcquireLock(&_jobqueues->lock) );
127  _jobqueues->currentjobs[threadnum] = job;
128  SCIP_CALL_ABORT( SCIPtpiReleaseLock(&_jobqueues->lock) );
129 
 /* the job function runs without the lock being held; its return value is stored in the job */
130  job->retcode = (*(job->jobfunc))(job->args);
131 
132  SCIP_CALL_ABORT( SCIPtpiAcquireLock(&_jobqueues->lock) );
133  _jobqueues->ncurrentjobs--;
134  _jobqueues->currentjobs[threadnum] = NULL;
135 
136  /* insert job into finished jobs; job->nextjob is left unchanged, the list traversals in this file stop at
 * lastjob instead of relying on a NULL link */
137  if( _jobqueues->finishedjobs.njobs == 0 )
138  {
139  _jobqueues->finishedjobs.firstjob = job;
140  _jobqueues->finishedjobs.lastjob = job;
141  }
142  else
143  {
144  _jobqueues->finishedjobs.lastjob->nextjob = job;
145  _jobqueues->finishedjobs.lastjob = job;
146  }
147 
148  ++_jobqueues->finishedjobs.njobs;
149 
151 
152  SCIP_CALL_ABORT( SCIPtpiReleaseLock(&_jobqueues->lock) );
153 }
154 
155 
156 /** this is a task that takes the first job from the job queue and executes it
157  *
158  * The job will only be added when the number of active jobs is equal to the number of threads.
159  * As such, there will always be number of threads + 1 tasks available for the scheduler to run.
 *
 * The task first waits on the jobfinished condition while the number of running jobs equals
 * SCIPtpiGetNumThreads(), then removes the first job from the queue and passes it to executeJob().
 *
 * NOTE(review): listing line 162 (return type and function name) is missing from this extract; the
 * cross-reference index at the end of the page gives the signature as `static void jobQueueProcessJob(void)`.
160  */
161 static
163  void
164  )
165 {
166  SCIP_JOB* job;
167  SCIP_CALL_ABORT( SCIPtpiAcquireLock(&_jobqueues->lock) );
168 
 /* wait until a running job has finished */
169  while( _jobqueues->ncurrentjobs == SCIPtpiGetNumThreads() )
170  {
171  SCIP_CALL_ABORT( SCIPtpiWaitCondition(&_jobqueues->jobfinished, &_jobqueues->lock) );
172  }
173 
 /* remove the first job from the queue; the queue becomes empty if it held exactly one job */
174  if( _jobqueues->jobqueue.njobs == 1 )
175  {
176  job = _jobqueues->jobqueue.firstjob;
177  _jobqueues->jobqueue.firstjob = NULL;
178  _jobqueues->jobqueue.lastjob = NULL;
179  --_jobqueues->jobqueue.njobs;
180  }
181  else if( _jobqueues->jobqueue.njobs > 1 )
182  {
183  job = _jobqueues->jobqueue.firstjob;
184  _jobqueues->jobqueue.firstjob = job->nextjob;
185  --_jobqueues->jobqueue.njobs;
186  }
187  else
188  {
189  job = NULL;
190  }
191 
 /* NOTE(review): the counter is incremented even when no job was taken from the queue; in that case
 * executeJob(), which performs the matching decrement, is not called below -- confirm this cannot happen */
192  ++_jobqueues->ncurrentjobs;
193  SCIP_CALL_ABORT( SCIPtpiReleaseLock(&_jobqueues->lock) );
194 
195  if( job )
196  {
197  executeJob(job);
198  }
199 }
200 
201 
202 
203 
204 /** adding a job to the job queue.
205  *
206  * This gives some more flexibility in the handling of new jobs.
207  * IMPORTANT: This function MUST be called from within a mutex.
 *
 * If the number of running jobs equals SCIPtpiGetNumThreads(), the job is appended to the queue of unprocessed
 * jobs and an OpenMP task is created; otherwise the counter of running jobs is incremented and an OpenMP task
 * that calls executeJob() on the new job is created. Returns SCIP_OKAY when the end of the function is reached.
 *
 * NOTE(review): the function acquires and releases _jobqueues->lock itself, so the "mutex" in the sentence
 * above presumably does not refer to that lock -- confirm what is meant.
 * NOTE(review): listing line 210 (return type and function name) is missing from this extract; the
 * cross-reference index at the end of the page gives the signature as
 * `static SCIP_RETCODE jobQueueAddJob(SCIP_JOB *newjob)`. Listing line 243, the statement governed by the
 * first `#pragma omp task`, is missing as well; presumably it is the call to jobQueueProcessJob() -- confirm.
208  */
209 static
211  SCIP_JOB* newjob /**< the job to add; its nextjob pointer is reset to NULL */
212  )
213 {
214  /* @todo we want to work out what to do with a full job queue. Is there a problem if the limit is hit? */
215  /* @note it is important to have a queuesize. This will stop the code submitting infinitely many jobs. */
216  assert(newjob != NULL);
217 
218  newjob->nextjob = NULL;
219 
220  /* this function queries the current job list. This could change by other threads writing to the list. So a lock is
221  * required to ensure that the current joblist remains static. */
222  SCIP_CALL( SCIPtpiAcquireLock(&_jobqueues->lock) );
223 
224  /* checking the status of the job queue */
225  if( _jobqueues->ncurrentjobs == SCIPtpiGetNumThreads() )
226  {
 /* all threads are busy: append the job to the queue of unprocessed jobs */
227  if( _jobqueues->jobqueue.njobs == 0 )
228  {
229  _jobqueues->jobqueue.firstjob = newjob;
230  _jobqueues->jobqueue.lastjob = newjob;
231  }
232  else /* it is assumed that the jobqueue is not full */
233  {
234  _jobqueues->jobqueue.lastjob->nextjob = newjob;
235  _jobqueues->jobqueue.lastjob = newjob;
236  }
237 
238  _jobqueues->jobqueue.njobs++;
239 
240  SCIP_CALL( SCIPtpiReleaseLock(&_jobqueues->lock) );
241 
242  #pragma omp task
244  }
245  else
246  {
247  assert(_jobqueues->ncurrentjobs < SCIPtpiGetNumThreads());
248 
 /* count the job as running before the task is created; executeJob() decrements the counter again */
249  _jobqueues->ncurrentjobs++;
250 
251  SCIP_CALL( SCIPtpiReleaseLock(&_jobqueues->lock) );
252  /* running the new job */
253  #pragma omp task firstprivate(newjob)
254  executeJob(newjob);
255  }
256 
257  return SCIP_OKAY;
258 }
259 
260 
/** signals the condition: raises the signal counter by one if more waiters have registered than signals exist
 *
 *  The counters are modified while holding the condition's own lock. Returns SCIP_OKAY when the end of the
 *  function is reached.
 *
 *  NOTE(review): listing line 261 (return type and function name) is missing from this extract; the
 *  cross-reference index at the end of the page gives the signature as
 *  `SCIP_RETCODE SCIPtpiSignalCondition(SCIP_CONDITION *condition)`.
 */
262  SCIP_CONDITION* condition /**< the condition to signal */
263  )
264 {
265  SCIP_CALL( SCIPtpiAcquireLock(&condition->_lock) );
266 
 /* a signal is only recorded if there is a waiter that has not been signaled yet */
267  if( condition->_waitnum > condition->_signals )
268  ++condition->_signals;
269 
270  SCIP_CALL( SCIPtpiReleaseLock(&condition->_lock) );
271 
272  return SCIP_OKAY;
273 }
274 
/** broadcasts the condition: sets the signal counter to the number of registered waiters
 *
 *  The counter is modified while holding the condition's own lock. Returns SCIP_OKAY when the end of the
 *  function is reached.
 *
 *  NOTE(review): listing line 275 (return type and function name) is missing from this extract; the
 *  cross-reference index at the end of the page gives the signature as
 *  `SCIP_RETCODE SCIPtpiBroadcastCondition(SCIP_CONDITION *condition)`.
 */
276  SCIP_CONDITION* condition /**< the condition to broadcast */
277  )
278 {
279 
280  SCIP_CALL( SCIPtpiAcquireLock(&condition->_lock) );
281  condition->_signals = condition->_waitnum;
282  SCIP_CALL( SCIPtpiReleaseLock(&condition->_lock) );
283 
284  return SCIP_OKAY;
285 }
286 
/** waits on the condition: releases the given lock, waits until enough signals were recorded, and reacquires
 *  the lock before returning
 *
 *  Each waiter takes a ticket (the incremented _waitnum) and repeatedly yields the OpenMP task until _signals
 *  has reached that ticket. The last waiter to leave resets both counters to zero. Returns SCIP_OKAY when the
 *  end of the function is reached.
 *
 *  NOTE(review): listing line 287 (return type and function name) is missing from this extract; the
 *  cross-reference index at the end of the page gives the signature as
 *  `SCIP_RETCODE SCIPtpiWaitCondition(SCIP_CONDITION *condition, SCIP_LOCK *lock)`.
 */
288  SCIP_CONDITION* condition, /**< the condition to wait on */
289  SCIP_LOCK* lock /**< lock that is released while waiting and reacquired before returning */
290  )
291 {
292  int waitnum;
293 
294  SCIP_CALL( SCIPtpiReleaseLock(lock) );
295 
296  SCIP_CALL( SCIPtpiAcquireLock(&condition->_lock) );
 /* take a ticket; this waiter may proceed once _signals has reached it */
297  waitnum = ++condition->_waitnum;
298 
299  ++condition->_waiters;
300 
 /* the condition's lock is released around the yield; the loop body runs at least once */
301  do
302  {
303  SCIP_CALL( SCIPtpiReleaseLock(&condition->_lock) );
304  #pragma omp taskyield
305  SCIP_CALL( SCIPtpiAcquireLock(&condition->_lock) );
306  }
307  while( condition->_signals < waitnum );
308 
309  --condition->_waiters;
310 
 /* the last waiter resets the counters */
311  if( condition->_waiters == 0 )
312  {
313  condition->_signals = 0;
314  condition->_waitnum = 0;
315  }
316 
317  SCIP_CALL( SCIPtpiReleaseLock(&condition->_lock) );
318 
319  SCIP_CALL( SCIPtpiAcquireLock(lock) );
320 
321  return SCIP_OKAY;
322 }
323 
324 /** Returns the number of threads, as reported by omp_get_num_threads()
 *
 *  NOTE(review): listing line 325 (return type and function name) is missing from this extract; the
 *  cross-reference index at the end of the page gives the signature as `int SCIPtpiGetNumThreads()`.
 */
326  )
327 {
328  return omp_get_num_threads();
329 }
330 
331 /** Returns the thread number of the calling thread, as reported by omp_get_thread_num()
 *
 *  The value is used in this file as index into the currentjobs array.
 *
 *  NOTE(review): listing line 332 (return type and function name) is missing from this extract; the
 *  cross-reference index at the end of the page gives the signature as `int SCIPtpiGetThreadNum()`.
 */
333  )
334 {
335  return omp_get_thread_num();
336 }
337 
338 /** creates a job for parallel processing
 *
 *  Allocates the job and stores the id, the job function and its argument; nextjob is set to NULL. The retcode
 *  field is not initialized here. Returns SCIP_OKAY when the end of the function is reached.
 *
 *  NOTE(review): listing line 339 (return type and function name) is missing from this extract; the
 *  cross-reference index at the end of the page gives the signature as
 *  `SCIP_RETCODE SCIPtpiCreateJob(SCIP_JOB **job, int jobid, SCIP_RETCODE(*jobfunc)(void *args), void *jobarg)`.
 */
340  SCIP_JOB** job, /**< pointer to the job that will be created */
341  int jobid, /**< the id for the current job */
342  SCIP_RETCODE (*jobfunc)(void* args),/**< pointer to the job function */
343  void* jobarg /**< the job's argument */
344  )
345 {
346  SCIP_ALLOC( BMSallocMemory(job) );
347 
348  (*job)->jobid = jobid;
349  (*job)->jobfunc = jobfunc;
350  (*job)->args = jobarg;
351  (*job)->nextjob = NULL;
352 
353  return SCIP_OKAY;
354 }
355 
356 /** get a new job id for the new set of submitted jobs
 *
 *  The ids come from a function-local static counter that starts at 0 and is incremented before it is read,
 *  so the first id returned is 1. The increment and read are done in an OpenMP atomic capture.
 *
 *  NOTE(review): listing line 357 (return type and function name) is missing from this extract; the
 *  cross-reference index at the end of the page gives the signature as `int SCIPtpiGetNewJobID(void)`.
 */
358  void
359  )
360 {
361  static int currentjobid = 0;
362  int jobid;
363 
364  #pragma omp atomic capture
365  jobid = ++currentjobid;
366 
367  return jobid;
368 }
369 
370 /** submit a job for parallel processing
371  *
372  * the return is a globally defined status
 *
 * The status is set to SCIP_SUBMIT_SUCCESS before the job is handed to jobQueueAddJob(); no other status
 * value is assigned in this function.
 *
 * NOTE(review): listing line 374 (return type and function name) is missing from this extract; the
 * cross-reference index at the end of the page gives the signature as
 * `SCIP_RETCODE SCIPtpiSumbitJob(SCIP_JOB *job, SCIP_SUBMITSTATUS *status)`.
373  */
375  SCIP_JOB* job, /**< pointer to the job to be submitted */
376  SCIP_SUBMITSTATUS* status /**< pointer to store the submit status */
377 
378  )
379 {
380  assert(_jobqueues != NULL);
381 
382  *status = SCIP_SUBMIT_SUCCESS;
383  SCIP_CALL( jobQueueAddJob(job) );
384 
385  return SCIP_OKAY;
386 }
387 
/** returns TRUE if a job with the given id is stored in one of the per-thread currentjobs slots, FALSE otherwise
 *
 *  The function does not acquire _jobqueues->lock itself; its caller in this file, SCIPtpiCollectJobs(),
 *  holds the lock while calling it.
 *
 *  NOTE(review): listing line 389 (return type and function name) is missing from this extract; the
 *  cross-reference index at the end of the page gives the signature as `static SCIP_Bool isJobRunning(int jobid)`.
 */
388 static
390  int jobid /**< the job id to look for */
391  )
392 {
393  int i;
394 
 /* the slots are only scanned if at least one job is counted as running */
395  if( _jobqueues->ncurrentjobs > 0 )
396  {
397  for( i = 0; i < _jobqueues->nthreads; ++i )
398  {
399  if( _jobqueues->currentjobs[i] != NULL && _jobqueues->currentjobs[i]->jobid == jobid )
400  return TRUE;
401  }
402  }
403 
404  return FALSE;
405 }
406 
/** returns TRUE if a job with the given id is in the queue of unprocessed jobs, FALSE otherwise
 *
 *  The function does not acquire _jobqueues->lock itself; its caller in this file, SCIPtpiCollectJobs(),
 *  holds the lock while calling it.
 *
 *  NOTE(review): listing line 408 (return type and function name) is missing from this extract; the
 *  cross-reference index at the end of the page gives the signature as `static SCIP_Bool isJobWaiting(int jobid)`.
 */
407 static
409  int jobid /**< the job id to look for */
410  )
411 {
412  if( _jobqueues->jobqueue.njobs > 0 )
413  {
414  SCIP_JOB* currjob;
415  currjob = _jobqueues->jobqueue.firstjob;
416 
 /* walk the list from firstjob; the walk ends at lastjob rather than at a NULL nextjob pointer */
417  do
418  {
419  if( currjob->jobid == jobid )
420  return TRUE;
421 
422  if( currjob == _jobqueues->jobqueue.lastjob )
423  break;
424 
425  currjob = currjob->nextjob;
426  }
427  while( TRUE ); /*lint !e506*/
428  }
429 
430  return FALSE;
431 }
432 
433 
434 /** Blocks until all jobs of the given jobid have finished
435  * and then returns the smallest SCIP_RETCODE of all the jobs
 *
 * While a job with the given id is running or waiting, the function waits on the jobfinished condition.
 * Afterwards every job with that id is removed from the list of finished jobs and freed. If the list of
 * finished jobs is empty at that point, "err1" is printed and SCIP_ERROR is returned. If the list is not
 * empty but contains no job with the given id, the result stays SCIP_OKAY.
 *
 * NOTE(review): listing line 436 (return type and function name) is missing from this extract; the
 * cross-reference index at the end of the page gives the signature as `SCIP_RETCODE SCIPtpiCollectJobs(int jobid)`.
 * Listing line 440 is missing as well; presumably it declares the local `retcode` used below -- confirm.
 */
437  int jobid /**< the id of the jobs to collect */
438  )
439 {
441 
442  retcode = SCIP_OKAY;
443  SCIP_CALL( SCIPtpiAcquireLock(&_jobqueues->lock) );
444 
 /* both checks read the shared queues and are made while the lock is held */
445  while( isJobRunning(jobid) || isJobWaiting(jobid) )
446  {
447  SCIP_CALL( SCIPtpiWaitCondition(&_jobqueues->jobfinished, &_jobqueues->lock) );
448  }
449 
450  if( _jobqueues->finishedjobs.njobs > 0 )
451  {
452  SCIP_JOB* currjob = _jobqueues->finishedjobs.firstjob;
453  SCIP_JOB* prevjob = NULL;
454 
455  /* walk the list of finished jobs and collect every job with the given jobid */
456  do
457  {
458  if( currjob->jobid == jobid )
459  {
460  SCIP_JOB* nextjob;
461 
462  /* if the job has the right jobid collect its retcode, remove it from the finished job list, and free it */
463  retcode = MIN(retcode, currjob->retcode);
464 
465  /* removing the finished job from finished jobs list */
466  if( currjob == _jobqueues->finishedjobs.firstjob )
467  _jobqueues->finishedjobs.firstjob = currjob->nextjob;
468  else
469  prevjob->nextjob = currjob->nextjob; /*lint !e613*/
470 
471  if( currjob == _jobqueues->finishedjobs.lastjob )
472  _jobqueues->finishedjobs.lastjob = prevjob;
473 
474  _jobqueues->finishedjobs.njobs--;
475 
476  /* update currjob and free finished job; prevjob stays the same */
477  nextjob = currjob->nextjob;
478  BMSfreeMemory(&currjob);
479  currjob = nextjob;
480  }
481  else
482  {
483  prevjob = currjob;
484  currjob = prevjob->nextjob;
485  }
486  }
 /* the walk ends once the job last kept (prevjob) is the list's last job; this also holds when the list
 * has become empty, because lastjob is then set to the unchanged prevjob (NULL) */
487  while( prevjob != _jobqueues->finishedjobs.lastjob );
488  }
489  else
490  {
491  /* given jobid was not submitted */
492  printf("err1");
493  retcode = SCIP_ERROR;
494  }
495 
496  SCIP_CALL_ABORT( SCIPtpiReleaseLock(&_jobqueues->lock) );
497 
498  return retcode;
499 }
500 
501 /** initializes tpi: sets the number of OpenMP threads and creates the file-scope job queue
 *
 *  Asserts that no job queue exists yet. Returns SCIP_OKAY when the end of the function is reached.
 *
 *  NOTE(review): listing line 502 (return type and function name) is missing from this extract; the
 *  cross-reference index at the end of the page gives the signature as
 *  `SCIP_RETCODE SCIPtpiInit(int nthreads, int queuesize, SCIP_Bool blockwhenfull)`.
 */
503  int nthreads, /**< the number of threads, passed to omp_set_num_threads() and createJobQueue() */
504  int queuesize, /**< the queue size, passed to createJobQueue() */
505  SCIP_Bool blockwhenfull /**< passed to createJobQueue(), which does not use it */
506  )
507 {
508  omp_set_num_threads(nthreads);
509  assert(_jobqueues == NULL);
510 
511  SCIP_CALL( createJobQueue(nthreads, queuesize, blockwhenfull) );
512 
513  return SCIP_OKAY;
514 }
515 
516 /** deinitializes tpi: frees the file-scope job queue
 *
 *  Asserts that the job queue exists and that no jobs are queued, running or waiting to be collected.
 *  Returns SCIP_OKAY when the end of the function is reached.
 *
 *  NOTE(review): listing line 517 (return type and function name) is missing from this extract; the
 *  cross-reference index at the end of the page gives the signature as `SCIP_RETCODE SCIPtpiExit(void)`.
 */
518  void
519  )
520 {
521  assert(_jobqueues != NULL);
522  assert(_jobqueues->finishedjobs.njobs == 0);
523  assert(_jobqueues->jobqueue.njobs == 0);
524  assert(_jobqueues->ncurrentjobs == 0);
525 
526  SCIP_CALL( freeJobQueue() );
527 
528  return SCIP_OKAY;
529 }
#define NULL
Definition: def.h:253
enum SCIP_Submitstatus SCIP_SUBMITSTATUS
Definition: type_tpi.h:45
SCIP_JOB * firstjob
Definition: tpi_openmp.c:41
SCIP_RETCODE SCIPtpiCreateJob(SCIP_JOB **job, int jobid, SCIP_RETCODE(*jobfunc)(void *args), void *jobarg)
Definition: tpi_openmp.c:339
#define FALSE
Definition: def.h:73
void * args
Definition: tpi_openmp.c:34
#define TRUE
Definition: def.h:72
enum SCIP_Retcode SCIP_RETCODE
Definition: type_retcode.h:53
#define SCIP_UNUSED(x)
Definition: def.h:419
#define BMSallocMemoryArray(ptr, num)
Definition: memory.h:113
SCIP_RETCODE SCIPtpiCollectJobs(int jobid)
Definition: tpi_openmp.c:436
SCIP_JOB * lastjob
Definition: tpi_openmp.c:42
static SCIP_Bool isJobRunning(int jobid)
Definition: tpi_openmp.c:389
SCIP_RETCODE SCIPtpiInit(int nthreads, int queuesize, SCIP_Bool blockwhenfull)
Definition: tpi_openmp.c:502
#define BMSfreeMemory(ptr)
Definition: memory.h:135
static void jobQueueProcessJob(void)
Definition: tpi_openmp.c:162
static SCIP_RETCODE createJobQueue(int nthreads, int qsize, SCIP_Bool blockwhenfull)
Definition: tpi_openmp.c:64
static SCIP_RETCODE jobQueueAddJob(SCIP_JOB *newjob)
Definition: tpi_openmp.c:210
SCIP_LOCK lock
Definition: tpi_openmp.c:54
int SCIPtpiGetThreadNum()
Definition: tpi_openmp.c:332
SCIP_RETCODE SCIPtpiSignalCondition(SCIP_CONDITION *condition)
Definition: tpi_openmp.c:261
SCIP_EXPORT SCIP_RETCODE SCIPtpiInitCondition(SCIP_LOCK *lock)
#define BMSfreeMemoryArray(ptr)
Definition: memory.h:137
the type definitions for the SCIP parallel interface
SCIP_RETCODE retcode
Definition: tpi_openmp.c:35
SCIP_EXPORT void SCIPtpiDestroyCondition(SCIP_LOCK *lock)
struct SCIP_Job * nextjob
Definition: tpi_openmp.c:32
SCIP_RETCODE SCIPtpiExit(void)
Definition: tpi_openmp.c:517
SCIP_EXPORT SCIP_RETCODE SCIPtpiInitLock(SCIP_LOCK *lock)
SCIP_RETCODE SCIPtpiSumbitJob(SCIP_JOB *job, SCIP_SUBMITSTATUS *status)
Definition: tpi_openmp.c:374
#define SCIP_CALL(x)
Definition: def.h:365
static SCIP_Bool isJobWaiting(int jobid)
Definition: tpi_openmp.c:408
int jobid
Definition: tpi_openmp.c:31
#define SCIP_Bool
Definition: def.h:70
#define MIN(x, y)
Definition: def.h:223
SCIP_CONDITION jobfinished
Definition: tpi_openmp.c:55
SCIP_EXPORT void SCIPtpiDestroyLock(SCIP_LOCK *lock)
SCIP_JOBQUEUE jobqueue
Definition: tpi_openmp.c:49
SCIP_EXPORT SCIP_RETCODE SCIPtpiAcquireLock(SCIP_LOCK *lock)
int SCIPtpiGetNumThreads()
Definition: tpi_openmp.c:325
#define BMSallocMemory(ptr)
Definition: memory.h:109
SCIP_RETCODE(* jobfunc)(void *args)
Definition: tpi_openmp.c:33
SCIP_JOB ** currentjobs
Definition: tpi_openmp.c:50
SCIP_RETCODE SCIPtpiBroadcastCondition(SCIP_CONDITION *condition)
Definition: tpi_openmp.c:275
SCIP_RETCODE SCIPtpiWaitCondition(SCIP_CONDITION *condition, SCIP_LOCK *lock)
Definition: tpi_openmp.c:287
static void executeJob(SCIP_JOB *job)
Definition: tpi_openmp.c:118
#define SCIP_CALL_ABORT(x)
Definition: def.h:344
int SCIPtpiGetNewJobID(void)
Definition: tpi_openmp.c:357
static SCIP_RETCODE freeJobQueue(void)
Definition: tpi_openmp.c:101
#define SCIP_ALLOC(x)
Definition: def.h:376
SCIP_EXPORT SCIP_RETCODE SCIPtpiReleaseLock(SCIP_LOCK *lock)
SCIP_JOBQUEUE finishedjobs
Definition: tpi_openmp.c:53
memory allocation routines