Scippy

SCIP

Solving Constraint Integer Programs

tpi_openmp.c
Go to the documentation of this file.
1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2 /* */
3 /* This file is part of the program and library */
4 /* SCIP --- Solving Constraint Integer Programs */
5 /* */
6 /* Copyright (C) 2002-2021 Konrad-Zuse-Zentrum */
7 /* fuer Informationstechnik Berlin */
8 /* */
9 /* SCIP is distributed under the terms of the ZIB Academic License. */
10 /* */
11 /* You should have received a copy of the ZIB Academic License */
12 /* along with SCIP; see the file COPYING. If not visit scipopt.org. */
13 /* */
14 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
15 
16 /**@file tpi_openmp.c
17  * @ingroup TASKINTERFACE
18  * @brief the interface functions for openmp
19  * @author Stephen J. Maher
20  * @author Leona Gottwald
21  */
22 
23 /*---+----1----+----2----+----3----+----4----+----5----+----6----+----7----+----8----+----9----+----0----+----1----+----2*/
24 
25 #include "tpi/tpi.h"
26 #include "blockmemshell/memory.h"
27 
28 /** A job added to the queue */
29 struct SCIP_Job
30 {
31  int jobid; /**< id to identify jobs from a common process */
32  struct SCIP_Job* nextjob; /**< pointer to the next job in the queue */
33  SCIP_RETCODE (*jobfunc)(void* args);/**< pointer to the job function */
34  void* args; /**< pointer to the function arguments */
35  SCIP_RETCODE retcode; /**< return code of the job */
36 };
37 
38 /** the thread pool job queue */
40 {
41  SCIP_JOB* firstjob; /**< pointer to the first job in the queue */
42  SCIP_JOB* lastjob; /**< pointer to the last job in the queue */
43  int njobs; /**< number of jobs in the queue */
44 };
46 
48 {
49  SCIP_JOBQUEUE jobqueue; /**< queue of unprocessed jobs */
50  SCIP_JOB** currentjobs; /**< array with slot for each thread to store the currently running job */
51  int ncurrentjobs; /**< number of currently running jobs */
52  int nthreads; /**< number of threads */
53  SCIP_JOBQUEUE finishedjobs; /**< jobqueue containing the finished jobs */
54  SCIP_LOCK lock; /**< lock to protect this stucture from concurrent access */
55  SCIP_CONDITION jobfinished; /**< condition to signal if a job was finished */
56 };
58 
59 static SCIP_JOBQUEUES* _jobqueues = NULL;
60 
61 
62 /** create job queue */
63 static
65  int nthreads, /**< the number of threads */
66  int qsize, /**< the queue size */
67  SCIP_Bool blockwhenfull /**< should the queue be blocked from new jobs when full */
68  )
69 {
70  int i;
71 
72  assert(nthreads >= 0);
73  assert(qsize >= 0);
74  SCIP_UNUSED( blockwhenfull );
75 
76  /* allocting memory for the job queue */
77  SCIP_ALLOC( BMSallocMemory(&_jobqueues) );
78  _jobqueues->jobqueue.firstjob = NULL;
79  _jobqueues->jobqueue.lastjob = NULL;
80  _jobqueues->jobqueue.njobs = 0;
81  _jobqueues->finishedjobs.firstjob = NULL;
82  _jobqueues->finishedjobs.lastjob = NULL;
83  _jobqueues->finishedjobs.njobs = 0;
84  _jobqueues->ncurrentjobs = 0;
85 
86  _jobqueues->nthreads = nthreads;
87  SCIP_ALLOC( BMSallocMemoryArray(&_jobqueues->currentjobs, nthreads) );
88 
89  for( i = 0; i < nthreads; ++i )
90  _jobqueues->currentjobs[i] = NULL;
91 
92  SCIP_CALL( SCIPtpiInitLock(&_jobqueues->lock) );
94 
95  return SCIP_OKAY;
96 }
97 
98 
99 /** free job queue */
100 static
102  void
103  )
104 {
105  assert(_jobqueues != NULL);
106 
107  SCIPtpiDestroyLock(&_jobqueues->lock);
108  SCIPtpiDestroyCondition(&_jobqueues->jobfinished);
109  BMSfreeMemoryArray(&_jobqueues->currentjobs);
110 
111  BMSfreeMemory(&_jobqueues);
112 
113  return SCIP_OKAY;
114 }
115 
116 
117 /** execute job */
118 static
120  SCIP_JOB* job /**< the job to be executed in parallel */
121  )
122 {
123  int threadnum;
124 
125  threadnum = SCIPtpiGetThreadNum();
126 
127  SCIP_CALL_ABORT( SCIPtpiAcquireLock(&_jobqueues->lock) );
128  _jobqueues->currentjobs[threadnum] = job;
129  SCIP_CALL_ABORT( SCIPtpiReleaseLock(&_jobqueues->lock) );
130 
131  job->retcode = (*(job->jobfunc))(job->args);
132 
133  SCIP_CALL_ABORT( SCIPtpiAcquireLock(&_jobqueues->lock) );
134  _jobqueues->ncurrentjobs--;
135  _jobqueues->currentjobs[threadnum] = NULL;
136 
137  /* insert job into finished jobs */
138  if( _jobqueues->finishedjobs.njobs == 0 )
139  {
140  _jobqueues->finishedjobs.firstjob = job;
141  _jobqueues->finishedjobs.lastjob = job;
142  }
143  else
144  {
145  _jobqueues->finishedjobs.lastjob->nextjob = job;
146  _jobqueues->finishedjobs.lastjob = job;
147  }
148 
149  ++_jobqueues->finishedjobs.njobs;
150 
152 
153  SCIP_CALL_ABORT( SCIPtpiReleaseLock(&_jobqueues->lock) );
154 }
155 
156 
157 /** process jobs from job queue
158  *
159  * The job will only be added when the number of active jobs is equal to the number of threads.
160  * As such, there will always be number of threads + 1 tasks available for the scheduler to run.
161  */
162 static
164  void
165  )
166 {
167  SCIP_JOB* job;
168 
169  SCIP_CALL_ABORT( SCIPtpiAcquireLock(&_jobqueues->lock) );
170 
171  while( _jobqueues->ncurrentjobs == SCIPtpiGetNumThreads() )
172  {
173  SCIP_CALL_ABORT( SCIPtpiWaitCondition(&_jobqueues->jobfinished, &_jobqueues->lock) );
174  }
175 
176  if( _jobqueues->jobqueue.njobs == 1 )
177  {
178  job = _jobqueues->jobqueue.firstjob;
179  _jobqueues->jobqueue.firstjob = NULL;
180  _jobqueues->jobqueue.lastjob = NULL;
181  --(_jobqueues->jobqueue.njobs);
182  }
183  else if( _jobqueues->jobqueue.njobs > 1 )
184  {
185  job = _jobqueues->jobqueue.firstjob;
186  _jobqueues->jobqueue.firstjob = job->nextjob;
187  --_jobqueues->jobqueue.njobs;
188  }
189  else
190  {
191  job = NULL;
192  }
193 
194  ++(_jobqueues->ncurrentjobs);
195  SCIP_CALL_ABORT( SCIPtpiReleaseLock(&_jobqueues->lock) );
196 
197  if( job )
198  {
199  executeJob(job);
200  }
201 }
202 
203 
204 /** adding a job to the job queue
205  *
206  * This gives some more flexibility in the handling of new jobs.
207  * IMPORTANT: This function MUST be called from within a mutex.
208  */
209 static
211  SCIP_JOB* newjob
212  )
213 {
214  /* @todo we want to work out what to do with a full job queue. Is there a problem if the limit is hit? */
215  /* @note it is important to have a queuesize. This will stop the code submitting infinitely many jobs. */
216  assert(newjob != NULL);
217 
218  newjob->nextjob = NULL;
219 
220  /* This function queries the current job list. This could change by other threads writing to the list. So a lock is
221  * required to ensure that the current joblist remains static. */
222  SCIP_CALL( SCIPtpiAcquireLock(&_jobqueues->lock) );
223 
224  /* checking the status of the job queue */
225  if( _jobqueues->ncurrentjobs == SCIPtpiGetNumThreads() )
226  {
227  if( _jobqueues->jobqueue.njobs == 0 )
228  {
229  _jobqueues->jobqueue.firstjob = newjob;
230  _jobqueues->jobqueue.lastjob = newjob;
231  }
232  else /* it is assumed that the jobqueue is not full */
233  {
234  _jobqueues->jobqueue.lastjob->nextjob = newjob;
235  _jobqueues->jobqueue.lastjob = newjob;
236  }
237 
238  _jobqueues->jobqueue.njobs++;
239 
240  SCIP_CALL( SCIPtpiReleaseLock(&_jobqueues->lock) );
241 
242  #pragma omp task
244  }
245  else
246  {
247  assert(_jobqueues->ncurrentjobs < SCIPtpiGetNumThreads());
248 
249  _jobqueues->ncurrentjobs++;
250 
251  SCIP_CALL( SCIPtpiReleaseLock(&_jobqueues->lock) );
252  /* running the new job */
253  #pragma omp task firstprivate(newjob)
254  executeJob(newjob);
255  }
256 
257  return SCIP_OKAY;
258 }
259 
260 
261 /** signal a condition */
263  SCIP_CONDITION* condition /**< condition to signal */
264  )
265 {
266  assert( condition != NULL );
267 
268  SCIP_CALL( SCIPtpiAcquireLock(&condition->_lock) );
269 
270  if( condition->_waitnum > condition->_signals )
271  ++condition->_signals;
272 
273  SCIP_CALL( SCIPtpiReleaseLock(&condition->_lock) );
274 
275  return SCIP_OKAY;
276 }
277 
278 
279 /** broadcase a condition */
281  SCIP_CONDITION* condition /**< broadcast a condition */
282  )
283 {
284  assert( condition != NULL );
285 
286  SCIP_CALL( SCIPtpiAcquireLock(&condition->_lock) );
287  condition->_signals = condition->_waitnum;
288  SCIP_CALL( SCIPtpiReleaseLock(&condition->_lock) );
289 
290  return SCIP_OKAY;
291 }
292 
293 
294 /** wait for a condition */
296  SCIP_CONDITION* condition, /**< condition to wait for */
297  SCIP_LOCK* lock /**< corresponding lock */
298  )
299 {
300  int waitnum;
301 
302  SCIP_CALL( SCIPtpiReleaseLock(lock) );
303 
304  SCIP_CALL( SCIPtpiAcquireLock(&condition->_lock) );
305  waitnum = ++condition->_waitnum;
306 
307  ++condition->_waiters;
308 
309  do
310  {
311  SCIP_CALL( SCIPtpiReleaseLock(&condition->_lock) );
312  #pragma omp taskyield
313  SCIP_CALL( SCIPtpiAcquireLock(&condition->_lock) );
314  }
315  while( condition->_signals < waitnum );
316 
317  --condition->_waiters;
318 
319  if( condition->_waiters == 0 )
320  {
321  condition->_signals = 0;
322  condition->_waitnum = 0;
323  }
324 
325  SCIP_CALL( SCIPtpiReleaseLock(&condition->_lock) );
326 
327  SCIP_CALL( SCIPtpiAcquireLock(lock) );
328 
329  return SCIP_OKAY;
330 }
331 
332 /** returns the number of threads */
334  )
335 {
336  return omp_get_num_threads();
337 }
338 
339 /** returns the thread number */
341  )
342 {
343  return omp_get_thread_num();
344 }
345 
346 /** creates a job for parallel processing */
348  SCIP_JOB** job, /**< pointer to the job that will be created */
349  int jobid, /**< the id for the current job */
350  SCIP_RETCODE (*jobfunc)(void* args),/**< pointer to the job function */
351  void* jobarg /**< the job's argument */
352  )
353 {
354  SCIP_ALLOC( BMSallocMemory(job) );
355 
356  (*job)->jobid = jobid;
357  (*job)->jobfunc = jobfunc;
358  (*job)->args = jobarg;
359  (*job)->nextjob = NULL;
360 
361  return SCIP_OKAY;
362 }
363 
364 /** get a new job id for the new set of submitted jobs */
366  void
367  )
368 {
369  static int currentjobid = 0;
370  int jobid;
371 
372  #pragma omp atomic capture
373  jobid = ++currentjobid;
374 
375  return jobid;
376 }
377 
378 /** submit a job for parallel processing; the return value is a globally defined status */
380  SCIP_JOB* job, /**< pointer to the job to be submitted */
381  SCIP_SUBMITSTATUS* status /**< pointer to store the submit status */
382  )
383 {
384  assert(_jobqueues != NULL);
385 
386  *status = SCIP_SUBMIT_SUCCESS;
387  SCIP_CALL( jobQueueAddJob(job) );
388 
389  return SCIP_OKAY;
390 }
391 
392 
393 /** check whether a job is running */
394 static
396  int jobid /**< job id to check */
397  )
398 {
399  int i;
400 
401  if( _jobqueues->ncurrentjobs > 0 )
402  {
403  for( i = 0; i < _jobqueues->nthreads; ++i )
404  {
405  if( _jobqueues->currentjobs[i] != NULL && _jobqueues->currentjobs[i]->jobid == jobid )
406  return TRUE;
407  }
408  }
409 
410  return FALSE;
411 }
412 
413 
414 /** check whether a job is waiting */
415 static
417  int jobid /**< job id to check */
418  )
419 {
420  if( _jobqueues->jobqueue.njobs > 0 )
421  {
422  SCIP_JOB* currjob;
423  currjob = _jobqueues->jobqueue.firstjob;
424 
425  do
426  {
427  if( currjob->jobid == jobid )
428  return TRUE;
429 
430  if( currjob == _jobqueues->jobqueue.lastjob )
431  break;
432 
433  currjob = currjob->nextjob;
434  }
435  while( TRUE ); /*lint !e506*/
436  }
437 
438  return FALSE;
439 }
440 
441 
442 /** blocks until all jobs of the given jobid have finished
443  * and then returns the smallest SCIP_RETCODE of all the jobs */
445  int jobid /**< the jobid of the jobs to wait for */
446  )
447 {
449 
450  retcode = SCIP_OKAY;
451  SCIP_CALL( SCIPtpiAcquireLock(&_jobqueues->lock) );
452 
453  while( isJobRunning(jobid) || isJobWaiting(jobid) )
454  {
455  SCIP_CALL( SCIPtpiWaitCondition(&_jobqueues->jobfinished, &_jobqueues->lock) );
456  }
457 
458  if( _jobqueues->finishedjobs.njobs > 0 )
459  {
460  SCIP_JOB* currjob = _jobqueues->finishedjobs.firstjob;
461  SCIP_JOB* prevjob = NULL;
462 
463  /* finding the location of the processed job in the currentjobs queue */
464  do
465  {
466  if( currjob->jobid == jobid )
467  {
468  SCIP_JOB* nextjob;
469 
470  /* if the job has the right jobid collect its retcode, remove it from the finished job list, and free it */
471  retcode = MIN(retcode, currjob->retcode);
472 
473  /* removing the finished job from finished jobs list */
474  if( currjob == _jobqueues->finishedjobs.firstjob )
475  _jobqueues->finishedjobs.firstjob = currjob->nextjob;
476  else
477  {
478  if( prevjob != NULL )
479  prevjob->nextjob = currjob->nextjob; /*lint !e613*/
480  }
481 
482  if( currjob == _jobqueues->finishedjobs.lastjob )
483  _jobqueues->finishedjobs.lastjob = prevjob;
484 
485  _jobqueues->finishedjobs.njobs--;
486 
487  /* update currjob and free finished job; prevjob stays the same */
488  nextjob = currjob->nextjob;
489  BMSfreeMemory(&currjob);
490  currjob = nextjob;
491  }
492  else
493  {
494  prevjob = currjob;
495  currjob = prevjob->nextjob;
496  }
497  }
498  while( prevjob != _jobqueues->finishedjobs.lastjob );
499  }
500  else
501  {
502  /* given jobid was not submitted */
503  printf("err1");
504  retcode = SCIP_ERROR;
505  }
506 
507  SCIP_CALL_ABORT( SCIPtpiReleaseLock(&_jobqueues->lock) );
508 
509  return retcode;
510 }
511 
512 /** initializes tpi */
514  int nthreads, /**< the number of threads to be used */
515  int queuesize, /**< the size of the queue */
516  SCIP_Bool blockwhenfull /**< should the queue block when full */
517  )
518 {
519  omp_set_num_threads(nthreads);
520  assert(_jobqueues == NULL);
521 
522  SCIP_CALL( createJobQueue(nthreads, queuesize, blockwhenfull) );
523 
524  return SCIP_OKAY;
525 }
526 
527 /** deinitializes tpi */
529  void
530  )
531 {
532  assert(_jobqueues != NULL);
533  assert(_jobqueues->finishedjobs.njobs == 0);
534  assert(_jobqueues->jobqueue.njobs == 0);
535  assert(_jobqueues->ncurrentjobs == 0);
536 
537  SCIP_CALL( freeJobQueue() );
538 
539  return SCIP_OKAY;
540 }
enum SCIP_Submitstatus SCIP_SUBMITSTATUS
Definition: type_tpi.h:45
SCIP_JOB * firstjob
Definition: tpi_openmp.c:41
SCIP_RETCODE SCIPtpiCreateJob(SCIP_JOB **job, int jobid, SCIP_RETCODE(*jobfunc)(void *args), void *jobarg)
Definition: tpi_openmp.c:347
#define FALSE
Definition: def.h:73
void * args
Definition: tpi_openmp.c:34
#define TRUE
Definition: def.h:72
enum SCIP_Retcode SCIP_RETCODE
Definition: type_retcode.h:54
#define SCIP_UNUSED(x)
Definition: def.h:424
#define BMSallocMemoryArray(ptr, num)
Definition: memory.h:115
SCIP_RETCODE SCIPtpiCollectJobs(int jobid)
Definition: tpi_openmp.c:444
SCIP_JOB * lastjob
Definition: tpi_openmp.c:42
static SCIP_Bool isJobRunning(int jobid)
Definition: tpi_openmp.c:395
SCIP_RETCODE SCIPtpiInit(int nthreads, int queuesize, SCIP_Bool blockwhenfull)
Definition: tpi_openmp.c:513
#define BMSfreeMemory(ptr)
Definition: memory.h:137
static void jobQueueProcessJob(void)
Definition: tpi_openmp.c:163
static SCIP_RETCODE createJobQueue(int nthreads, int qsize, SCIP_Bool blockwhenfull)
Definition: tpi_openmp.c:64
static SCIP_RETCODE jobQueueAddJob(SCIP_JOB *newjob)
Definition: tpi_openmp.c:210
SCIP_LOCK lock
Definition: tpi_openmp.c:54
int SCIPtpiGetThreadNum()
Definition: tpi_openmp.c:340
SCIP_RETCODE SCIPtpiSignalCondition(SCIP_CONDITION *condition)
Definition: tpi_openmp.c:262
SCIP_EXPORT SCIP_RETCODE SCIPtpiInitCondition(SCIP_LOCK *lock)
#define BMSfreeMemoryArray(ptr)
Definition: memory.h:139
the type definitions for the SCIP parallel interface
SCIP_RETCODE retcode
Definition: tpi_openmp.c:35
SCIP_EXPORT void SCIPtpiDestroyCondition(SCIP_LOCK *lock)
struct SCIP_Job * nextjob
Definition: tpi_openmp.c:32
SCIP_RETCODE SCIPtpiExit(void)
Definition: tpi_openmp.c:528
SCIP_EXPORT SCIP_RETCODE SCIPtpiInitLock(SCIP_LOCK *lock)
#define NULL
Definition: lpi_spx1.cpp:155
SCIP_RETCODE SCIPtpiSumbitJob(SCIP_JOB *job, SCIP_SUBMITSTATUS *status)
Definition: tpi_openmp.c:379
#define SCIP_CALL(x)
Definition: def.h:370
static SCIP_Bool isJobWaiting(int jobid)
Definition: tpi_openmp.c:416
int jobid
Definition: tpi_openmp.c:31
#define SCIP_Bool
Definition: def.h:70
SCIP_CONDITION jobfinished
Definition: tpi_openmp.c:55
SCIP_EXPORT void SCIPtpiDestroyLock(SCIP_LOCK *lock)
SCIP_JOBQUEUE jobqueue
Definition: tpi_openmp.c:49
SCIP_EXPORT SCIP_RETCODE SCIPtpiAcquireLock(SCIP_LOCK *lock)
int SCIPtpiGetNumThreads()
Definition: tpi_openmp.c:333
#define BMSallocMemory(ptr)
Definition: memory.h:111
SCIP_RETCODE(* jobfunc)(void *args)
Definition: tpi_openmp.c:33
SCIP_JOB ** currentjobs
Definition: tpi_openmp.c:50
SCIP_RETCODE SCIPtpiBroadcastCondition(SCIP_CONDITION *condition)
Definition: tpi_openmp.c:280
SCIP_RETCODE SCIPtpiWaitCondition(SCIP_CONDITION *condition, SCIP_LOCK *lock)
Definition: tpi_openmp.c:295
static void executeJob(SCIP_JOB *job)
Definition: tpi_openmp.c:119
#define SCIP_CALL_ABORT(x)
Definition: def.h:349
int SCIPtpiGetNewJobID(void)
Definition: tpi_openmp.c:365
static SCIP_RETCODE freeJobQueue(void)
Definition: tpi_openmp.c:101
#define SCIP_ALLOC(x)
Definition: def.h:381
SCIP_EXPORT SCIP_RETCODE SCIPtpiReleaseLock(SCIP_LOCK *lock)
SCIP_JOBQUEUE finishedjobs
Definition: tpi_openmp.c:53
memory allocation routines