Scippy

SCIP

Solving Constraint Integer Programs

tpi_openmp.c
Go to the documentation of this file.
1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2 /* */
3 /* This file is part of the program and library */
4 /* SCIP --- Solving Constraint Integer Programs */
5 /* */
6 /* Copyright (c) 2002-2023 Zuse Institute Berlin (ZIB) */
7 /* */
8 /* Licensed under the Apache License, Version 2.0 (the "License"); */
9 /* you may not use this file except in compliance with the License. */
10 /* You may obtain a copy of the License at */
11 /* */
12 /* http://www.apache.org/licenses/LICENSE-2.0 */
13 /* */
14 /* Unless required by applicable law or agreed to in writing, software */
15 /* distributed under the License is distributed on an "AS IS" BASIS, */
16 /* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
17 /* See the License for the specific language governing permissions and */
18 /* limitations under the License. */
19 /* */
20 /* You should have received a copy of the Apache-2.0 license */
21 /* along with SCIP; see the file LICENSE. If not visit scipopt.org. */
22 /* */
23 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
24 
25 /**@file tpi_openmp.c
26  * @ingroup TASKINTERFACE
27  * @brief the interface functions for openmp
28  * @author Stephen J. Maher
29  * @author Leona Gottwald
30  */
31 
32 /*---+----1----+----2----+----3----+----4----+----5----+----6----+----7----+----8----+----9----+----0----+----1----+----2*/
33 
34 #include "tpi/tpi.h"
35 #include "blockmemshell/memory.h"
36 
37 /** A job added to the queue */
38 struct SCIP_Job
39 {
40  int jobid; /**< id to identify jobs from a common process */
41  struct SCIP_Job* nextjob; /**< pointer to the next job in the queue */
42  SCIP_RETCODE (*jobfunc)(void* args);/**< pointer to the job function */
43  void* args; /**< pointer to the function arguments */
44  SCIP_RETCODE retcode; /**< return code of the job */
45 };
46 
47 /** the thread pool job queue */
49 {
50  SCIP_JOB* firstjob; /**< pointer to the first job in the queue */
51  SCIP_JOB* lastjob; /**< pointer to the last job in the queue */
52  int njobs; /**< number of jobs in the queue */
53 };
55 
57 {
58  SCIP_JOBQUEUE jobqueue; /**< queue of unprocessed jobs */
59  SCIP_JOB** currentjobs; /**< array with slot for each thread to store the currently running job */
60  int ncurrentjobs; /**< number of currently running jobs */
61  int nthreads; /**< number of threads */
62  SCIP_JOBQUEUE finishedjobs; /**< jobqueue containing the finished jobs */
63  SCIP_LOCK lock; /**< lock to protect this stucture from concurrent access */
64  SCIP_CONDITION jobfinished; /**< condition to signal if a job was finished */
65 };
67 
68 static SCIP_JOBQUEUES* _jobqueues = NULL;
69 
70 
71 /** create job queue */
72 static
74  int nthreads, /**< the number of threads */
75  int qsize, /**< the queue size */
76  SCIP_Bool blockwhenfull /**< should the queue be blocked from new jobs when full */
77  )
78 {
79  int i;
80 
81  assert(nthreads >= 0);
82  assert(qsize >= 0);
83  SCIP_UNUSED( blockwhenfull );
84 
85  /* allocting memory for the job queue */
86  SCIP_ALLOC( BMSallocMemory(&_jobqueues) );
87  _jobqueues->jobqueue.firstjob = NULL;
88  _jobqueues->jobqueue.lastjob = NULL;
89  _jobqueues->jobqueue.njobs = 0;
90  _jobqueues->finishedjobs.firstjob = NULL;
91  _jobqueues->finishedjobs.lastjob = NULL;
92  _jobqueues->finishedjobs.njobs = 0;
93  _jobqueues->ncurrentjobs = 0;
94 
95  _jobqueues->nthreads = nthreads;
96  SCIP_ALLOC( BMSallocMemoryArray(&_jobqueues->currentjobs, nthreads) );
97 
98  for( i = 0; i < nthreads; ++i )
99  _jobqueues->currentjobs[i] = NULL;
100 
101  SCIP_CALL( SCIPtpiInitLock(&_jobqueues->lock) );
102  SCIP_CALL( SCIPtpiInitCondition(&_jobqueues->jobfinished) );
103 
104  return SCIP_OKAY;
105 }
106 
107 
108 /** free job queue */
109 static
111  void
112  )
113 {
114  assert(_jobqueues != NULL);
115 
116  SCIPtpiDestroyLock(&_jobqueues->lock);
117  SCIPtpiDestroyCondition(&_jobqueues->jobfinished);
118  BMSfreeMemoryArray(&_jobqueues->currentjobs);
119 
120  BMSfreeMemory(&_jobqueues);
121 
122  return SCIP_OKAY;
123 }
124 
125 
126 /** execute job */
127 static
129  SCIP_JOB* job /**< the job to be executed in parallel */
130  )
131 {
132  int threadnum;
133 
134  threadnum = SCIPtpiGetThreadNum();
135 
136  SCIP_CALL_ABORT( SCIPtpiAcquireLock(&_jobqueues->lock) );
137  _jobqueues->currentjobs[threadnum] = job;
138  SCIP_CALL_ABORT( SCIPtpiReleaseLock(&_jobqueues->lock) );
139 
140  job->retcode = (*(job->jobfunc))(job->args);
141 
142  SCIP_CALL_ABORT( SCIPtpiAcquireLock(&_jobqueues->lock) );
143  _jobqueues->ncurrentjobs--;
144  _jobqueues->currentjobs[threadnum] = NULL;
145 
146  /* insert job into finished jobs */
147  if( _jobqueues->finishedjobs.njobs == 0 )
148  {
149  _jobqueues->finishedjobs.firstjob = job;
150  _jobqueues->finishedjobs.lastjob = job;
151  }
152  else
153  {
154  _jobqueues->finishedjobs.lastjob->nextjob = job;
155  _jobqueues->finishedjobs.lastjob = job;
156  }
157 
158  ++_jobqueues->finishedjobs.njobs;
159 
161 
162  SCIP_CALL_ABORT( SCIPtpiReleaseLock(&_jobqueues->lock) );
163 }
164 
165 
166 /** process jobs from job queue
167  *
168  * The job will only be added when the number of active jobs is equal to the number of threads.
169  * As such, there will always be number of threads + 1 tasks available for the scheduler to run.
170  */
171 static
173  void
174  )
175 {
176  SCIP_JOB* job;
177 
178  SCIP_CALL_ABORT( SCIPtpiAcquireLock(&_jobqueues->lock) );
179 
180  while( _jobqueues->ncurrentjobs == SCIPtpiGetNumThreads() )
181  {
182  SCIP_CALL_ABORT( SCIPtpiWaitCondition(&_jobqueues->jobfinished, &_jobqueues->lock) );
183  }
184 
185  if( _jobqueues->jobqueue.njobs == 1 )
186  {
187  job = _jobqueues->jobqueue.firstjob;
188  _jobqueues->jobqueue.firstjob = NULL;
189  _jobqueues->jobqueue.lastjob = NULL;
190  --(_jobqueues->jobqueue.njobs);
191  }
192  else if( _jobqueues->jobqueue.njobs > 1 )
193  {
194  job = _jobqueues->jobqueue.firstjob;
195  _jobqueues->jobqueue.firstjob = job->nextjob;
196  --_jobqueues->jobqueue.njobs;
197  }
198  else
199  {
200  job = NULL;
201  }
202 
203  ++(_jobqueues->ncurrentjobs);
204  SCIP_CALL_ABORT( SCIPtpiReleaseLock(&_jobqueues->lock) );
205 
206  if( job )
207  {
208  executeJob(job);
209  }
210 }
211 
212 
213 /** adding a job to the job queue
214  *
215  * This gives some more flexibility in the handling of new jobs.
216  * IMPORTANT: This function MUST be called from within a mutex.
217  */
218 static
220  SCIP_JOB* newjob
221  )
222 {
223  /* @todo we want to work out what to do with a full job queue. Is there a problem if the limit is hit? */
224  /* @note it is important to have a queuesize. This will stop the code submitting infinitely many jobs. */
225  assert(newjob != NULL);
226 
227  newjob->nextjob = NULL;
228 
229  /* This function queries the current job list. This could change by other threads writing to the list. So a lock is
230  * required to ensure that the current joblist remains static. */
231  SCIP_CALL( SCIPtpiAcquireLock(&_jobqueues->lock) );
232 
233  /* checking the status of the job queue */
234  if( _jobqueues->ncurrentjobs == SCIPtpiGetNumThreads() )
235  {
236  if( _jobqueues->jobqueue.njobs == 0 )
237  {
238  _jobqueues->jobqueue.firstjob = newjob;
239  _jobqueues->jobqueue.lastjob = newjob;
240  }
241  else /* it is assumed that the jobqueue is not full */
242  {
243  _jobqueues->jobqueue.lastjob->nextjob = newjob;
244  _jobqueues->jobqueue.lastjob = newjob;
245  }
246 
247  _jobqueues->jobqueue.njobs++;
248 
249  SCIP_CALL( SCIPtpiReleaseLock(&_jobqueues->lock) );
250 
251  #pragma omp task
253  }
254  else
255  {
256  assert(_jobqueues->ncurrentjobs < SCIPtpiGetNumThreads());
257 
258  _jobqueues->ncurrentjobs++;
259 
260  SCIP_CALL( SCIPtpiReleaseLock(&_jobqueues->lock) );
261  /* running the new job */
262  #pragma omp task firstprivate(newjob)
263  executeJob(newjob);
264  }
265 
266  return SCIP_OKAY;
267 }
268 
269 
270 /** signal a condition */
272  SCIP_CONDITION* condition /**< condition to signal */
273  )
274 {
275  assert( condition != NULL );
276 
277  SCIP_CALL( SCIPtpiAcquireLock(&condition->_lock) );
278 
279  if( condition->_waitnum > condition->_signals )
280  ++condition->_signals;
281 
282  SCIP_CALL( SCIPtpiReleaseLock(&condition->_lock) );
283 
284  return SCIP_OKAY;
285 }
286 
287 
288 /** broadcase a condition */
290  SCIP_CONDITION* condition /**< broadcast a condition */
291  )
292 {
293  assert( condition != NULL );
294 
295  SCIP_CALL( SCIPtpiAcquireLock(&condition->_lock) );
296  condition->_signals = condition->_waitnum;
297  SCIP_CALL( SCIPtpiReleaseLock(&condition->_lock) );
298 
299  return SCIP_OKAY;
300 }
301 
302 
303 /** wait for a condition */
305  SCIP_CONDITION* condition, /**< condition to wait for */
306  SCIP_LOCK* lock /**< corresponding lock */
307  )
308 {
309  int waitnum;
310 
311  SCIP_CALL( SCIPtpiReleaseLock(lock) );
312 
313  SCIP_CALL( SCIPtpiAcquireLock(&condition->_lock) );
314  waitnum = ++condition->_waitnum;
315 
316  ++condition->_waiters;
317 
318  do
319  {
320  SCIP_CALL( SCIPtpiReleaseLock(&condition->_lock) );
321  #pragma omp taskyield
322  SCIP_CALL( SCIPtpiAcquireLock(&condition->_lock) );
323  }
324  while( condition->_signals < waitnum );
325 
326  --condition->_waiters;
327 
328  if( condition->_waiters == 0 )
329  {
330  condition->_signals = 0;
331  condition->_waitnum = 0;
332  }
333 
334  SCIP_CALL( SCIPtpiReleaseLock(&condition->_lock) );
335 
336  SCIP_CALL( SCIPtpiAcquireLock(lock) );
337 
338  return SCIP_OKAY;
339 }
340 
/** returns the number of threads
 *
 *  @return the value of omp_get_num_threads() for the calling context
 */
int SCIPtpiGetNumThreads(
   void
   )
{
   return omp_get_num_threads();
}
347 
/** returns the thread number
 *
 *  @return the value of omp_get_thread_num() for the calling thread
 */
int SCIPtpiGetThreadNum(
   void
   )
{
   return omp_get_thread_num();
}
354 
355 /** creates a job for parallel processing */
357  SCIP_JOB** job, /**< pointer to the job that will be created */
358  int jobid, /**< the id for the current job */
359  SCIP_RETCODE (*jobfunc)(void* args),/**< pointer to the job function */
360  void* jobarg /**< the job's argument */
361  )
362 {
363  SCIP_ALLOC( BMSallocMemory(job) );
364 
365  (*job)->jobid = jobid;
366  (*job)->jobfunc = jobfunc;
367  (*job)->args = jobarg;
368  (*job)->nextjob = NULL;
369 
370  return SCIP_OKAY;
371 }
372 
/** get a new job id for the new set of submitted jobs
 *
 *  Increments a function-local static counter with an OpenMP atomic capture and returns the incremented value,
 *  so the first call returns 1.
 *
 *  @return the new job id
 */
int SCIPtpiGetNewJobID(
   void
   )
{
   static int currentjobid = 0;
   int jobid;

   /* increment and read in one atomic step so that concurrent callers receive distinct ids */
   #pragma omp atomic capture
   jobid = ++currentjobid;

   return jobid;
}
386 
387 /** submit a job for parallel processing; the return value is a globally defined status */
389  SCIP_JOB* job, /**< pointer to the job to be submitted */
390  SCIP_SUBMITSTATUS* status /**< pointer to store the submit status */
391  )
392 {
393  assert(_jobqueues != NULL);
394 
395  *status = SCIP_SUBMIT_SUCCESS;
396  SCIP_CALL( jobQueueAddJob(job) );
397 
398  return SCIP_OKAY;
399 }
400 
401 
402 /** check whether a job is running */
403 static
405  int jobid /**< job id to check */
406  )
407 {
408  int i;
409 
410  if( _jobqueues->ncurrentjobs > 0 )
411  {
412  for( i = 0; i < _jobqueues->nthreads; ++i )
413  {
414  if( _jobqueues->currentjobs[i] != NULL && _jobqueues->currentjobs[i]->jobid == jobid )
415  return TRUE;
416  }
417  }
418 
419  return FALSE;
420 }
421 
422 
423 /** check whether a job is waiting */
424 static
426  int jobid /**< job id to check */
427  )
428 {
429  if( _jobqueues->jobqueue.njobs > 0 )
430  {
431  SCIP_JOB* currjob;
432  currjob = _jobqueues->jobqueue.firstjob;
433 
434  do
435  {
436  if( currjob->jobid == jobid )
437  return TRUE;
438 
439  if( currjob == _jobqueues->jobqueue.lastjob )
440  break;
441 
442  currjob = currjob->nextjob;
443  }
444  while( TRUE ); /*lint !e506*/
445  }
446 
447  return FALSE;
448 }
449 
450 
451 /** blocks until all jobs of the given jobid have finished
452  * and then returns the smallest SCIP_RETCODE of all the jobs */
454  int jobid /**< the jobid of the jobs to wait for */
455  )
456 {
458 
459  retcode = SCIP_OKAY;
460  SCIP_CALL( SCIPtpiAcquireLock(&_jobqueues->lock) );
461 
462  while( isJobRunning(jobid) || isJobWaiting(jobid) )
463  {
464  SCIP_CALL( SCIPtpiWaitCondition(&_jobqueues->jobfinished, &_jobqueues->lock) );
465  }
466 
467  if( _jobqueues->finishedjobs.njobs > 0 )
468  {
469  SCIP_JOB* currjob = _jobqueues->finishedjobs.firstjob;
470  SCIP_JOB* prevjob = NULL;
471 
472  /* finding the location of the processed job in the currentjobs queue */
473  do
474  {
475  if( currjob->jobid == jobid )
476  {
477  SCIP_JOB* nextjob;
478 
479  /* if the job has the right jobid collect its retcode, remove it from the finished job list, and free it */
480  retcode = MIN(retcode, currjob->retcode);
481 
482  /* removing the finished job from finished jobs list */
483  if( currjob == _jobqueues->finishedjobs.firstjob )
484  _jobqueues->finishedjobs.firstjob = currjob->nextjob;
485  else
486  {
487  if( prevjob != NULL )
488  prevjob->nextjob = currjob->nextjob; /*lint !e613*/
489  }
490 
491  if( currjob == _jobqueues->finishedjobs.lastjob )
492  _jobqueues->finishedjobs.lastjob = prevjob;
493 
494  _jobqueues->finishedjobs.njobs--;
495 
496  /* update currjob and free finished job; prevjob stays the same */
497  nextjob = currjob->nextjob;
498  BMSfreeMemory(&currjob);
499  currjob = nextjob;
500  }
501  else
502  {
503  prevjob = currjob;
504  currjob = prevjob->nextjob;
505  }
506  }
507  while( prevjob != _jobqueues->finishedjobs.lastjob );
508  }
509  else
510  {
511  /* given jobid was not submitted */
512  printf("err1");
513  retcode = SCIP_ERROR;
514  }
515 
516  SCIP_CALL_ABORT( SCIPtpiReleaseLock(&_jobqueues->lock) );
517 
518  return retcode;
519 }
520 
521 /** initializes tpi */
523  int nthreads, /**< the number of threads to be used */
524  int queuesize, /**< the size of the queue */
525  SCIP_Bool blockwhenfull /**< should the queue block when full */
526  )
527 {
528  omp_set_num_threads(nthreads);
529  assert(_jobqueues == NULL);
530 
531  SCIP_CALL( createJobQueue(nthreads, queuesize, blockwhenfull) );
532 
533  return SCIP_OKAY;
534 }
535 
536 /** deinitializes tpi */
538  void
539  )
540 {
541  assert(_jobqueues != NULL);
542  assert(_jobqueues->finishedjobs.njobs == 0);
543  assert(_jobqueues->jobqueue.njobs == 0);
544  assert(_jobqueues->ncurrentjobs == 0);
545 
546  SCIP_CALL( freeJobQueue() );
547 
548  return SCIP_OKAY;
549 }
enum SCIP_Submitstatus SCIP_SUBMITSTATUS
Definition: type_tpi.h:54
SCIP_JOB * firstjob
Definition: tpi_openmp.c:50
SCIP_RETCODE SCIPtpiCreateJob(SCIP_JOB **job, int jobid, SCIP_RETCODE(*jobfunc)(void *args), void *jobarg)
Definition: tpi_openmp.c:356
#define FALSE
Definition: def.h:96
void * args
Definition: tpi_openmp.c:43
#define TRUE
Definition: def.h:95
enum SCIP_Retcode SCIP_RETCODE
Definition: type_retcode.h:63
SCIP_RETCODE SCIPtpiAcquireLock(SCIP_LOCK *lock)
#define SCIP_UNUSED(x)
Definition: def.h:448
#define BMSallocMemoryArray(ptr, num)
Definition: memory.h:125
SCIP_RETCODE SCIPtpiInitCondition(SCIP_LOCK *lock)
SCIP_RETCODE SCIPtpiCollectJobs(int jobid)
Definition: tpi_openmp.c:453
SCIP_JOB * lastjob
Definition: tpi_openmp.c:51
static SCIP_Bool isJobRunning(int jobid)
Definition: tpi_openmp.c:404
SCIP_RETCODE SCIPtpiInit(int nthreads, int queuesize, SCIP_Bool blockwhenfull)
Definition: tpi_openmp.c:522
#define BMSfreeMemory(ptr)
Definition: memory.h:147
static void jobQueueProcessJob(void)
Definition: tpi_openmp.c:172
static SCIP_RETCODE createJobQueue(int nthreads, int qsize, SCIP_Bool blockwhenfull)
Definition: tpi_openmp.c:73
static SCIP_RETCODE jobQueueAddJob(SCIP_JOB *newjob)
Definition: tpi_openmp.c:219
SCIP_LOCK lock
Definition: tpi_openmp.c:63
int SCIPtpiGetThreadNum()
Definition: tpi_openmp.c:349
SCIP_RETCODE SCIPtpiSignalCondition(SCIP_CONDITION *condition)
Definition: tpi_openmp.c:271
#define BMSfreeMemoryArray(ptr)
Definition: memory.h:149
SCIP_RETCODE SCIPtpiReleaseLock(SCIP_LOCK *lock)
the type definitions for the SCIP parallel interface
SCIP_RETCODE retcode
Definition: tpi_openmp.c:44
struct SCIP_Job * nextjob
Definition: tpi_openmp.c:41
SCIP_RETCODE SCIPtpiExit(void)
Definition: tpi_openmp.c:537
#define NULL
Definition: lpi_spx1.cpp:164
SCIP_RETCODE SCIPtpiSumbitJob(SCIP_JOB *job, SCIP_SUBMITSTATUS *status)
Definition: tpi_openmp.c:388
#define SCIP_CALL(x)
Definition: def.h:394
static SCIP_Bool isJobWaiting(int jobid)
Definition: tpi_openmp.c:425
int jobid
Definition: tpi_openmp.c:40
#define SCIP_Bool
Definition: def.h:93
SCIP_RETCODE SCIPtpiInitLock(SCIP_LOCK *lock)
SCIP_CONDITION jobfinished
Definition: tpi_openmp.c:64
SCIP_JOBQUEUE jobqueue
Definition: tpi_openmp.c:58
void SCIPtpiDestroyLock(SCIP_LOCK *lock)
void SCIPtpiDestroyCondition(SCIP_LOCK *lock)
int SCIPtpiGetNumThreads()
Definition: tpi_openmp.c:342
#define BMSallocMemory(ptr)
Definition: memory.h:120
SCIP_RETCODE(* jobfunc)(void *args)
Definition: tpi_openmp.c:42
SCIP_JOB ** currentjobs
Definition: tpi_openmp.c:59
SCIP_RETCODE SCIPtpiBroadcastCondition(SCIP_CONDITION *condition)
Definition: tpi_openmp.c:289
SCIP_RETCODE SCIPtpiWaitCondition(SCIP_CONDITION *condition, SCIP_LOCK *lock)
Definition: tpi_openmp.c:304
static void executeJob(SCIP_JOB *job)
Definition: tpi_openmp.c:128
#define SCIP_CALL_ABORT(x)
Definition: def.h:373
int SCIPtpiGetNewJobID(void)
Definition: tpi_openmp.c:374
static SCIP_RETCODE freeJobQueue(void)
Definition: tpi_openmp.c:110
#define SCIP_ALLOC(x)
Definition: def.h:405
SCIP_JOBQUEUE finishedjobs
Definition: tpi_openmp.c:62
memory allocation routines