Scippy

SCIP

Solving Constraint Integer Programs

tpi_openmp.c
Go to the documentation of this file.
1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2 /* */
3 /* This file is part of the program and library */
4 /* SCIP --- Solving Constraint Integer Programs */
5 /* */
6 /* Copyright (C) 2002-2017 Konrad-Zuse-Zentrum */
7 /* fuer Informationstechnik Berlin */
8 /* */
9 /* SCIP is distributed under the terms of the ZIB Academic License. */
10 /* */
11 /* You should have received a copy of the ZIB Academic License */
12 /* along with SCIP; see the file COPYING. If not email to scip@zib.de. */
13 /* */
14 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
15 
/**@file   tpi_openmp.c
 * @ingroup TASKINTERFACE
 * @brief  the interface functions for openmp
 * @author Stephen J. Maher
 * @author Robert Lion Gottwald
 */

/*---+----1----+----2----+----3----+----4----+----5----+----6----+----7----+----8----+----9----+----0----+----1----+----2*/
24 
25 #include "tpi/tpi.h"
26 #include "blockmemshell/memory.h"
27 
28 /** A job added to the queue */
29 struct SCIP_Job
30 {
31  int jobid; /**< id to identify jobs from a common process */
32  struct SCIP_Job* nextjob; /**< pointer to the next job in the queue */
33  SCIP_RETCODE (*jobfunc)(void* args);/**< pointer to the job function */
34  void* args; /**< pointer to the function arguements */
35  SCIP_RETCODE retcode; /**< return code of the job */
36 };
37 
38 /** the thread pool job queue */
40 {
41  SCIP_JOB* firstjob; /**< pointer to the first job in the queue */
42  SCIP_JOB* lastjob; /**< pointer to the last job in the queue */
43  int njobs; /**< number of jobs in the queue */
44 };
46 
48 {
49  SCIP_JOBQUEUE jobqueue; /**< queue of unprocessed jobs */
50  SCIP_JOB** currentjobs; /**< array with slot for each thread to store the currently running job */
51  int ncurrentjobs; /**< number of currently running jobs */
52  int nthreads; /**< number of threads */
53  SCIP_JOBQUEUE finishedjobs; /**< jobqueue containing the finished jobs */
54  SCIP_LOCK lock; /**< lock to protect this stucture from concurrent access */
55  SCIP_CONDITION jobfinished; /**< condition to signal if a job was finished */
56 };
58 
59 static SCIP_JOBQUEUES* _jobqueues = NULL;
60 
61 
62 
63 static
65  int nthreads, /**< the number of threads */
66  int qsize, /**< the queue size */
67  SCIP_Bool blockwhenfull /**< should the queue be blocked from new jobs when full */
68  )
69 {
70  int i;
71 
72  assert(nthreads >= 0);
73  assert(qsize >= 0);
74  SCIP_UNUSED( blockwhenfull );
75 
76  /* allocting memory for the job queue */
77  SCIP_ALLOC( BMSallocMemory(&_jobqueues) );
78  _jobqueues->jobqueue.firstjob = NULL;
79  _jobqueues->jobqueue.lastjob = NULL;
80  _jobqueues->jobqueue.njobs = 0;
81  _jobqueues->finishedjobs.firstjob = NULL;
82  _jobqueues->finishedjobs.lastjob = NULL;
83  _jobqueues->finishedjobs.njobs = 0;
84  _jobqueues->ncurrentjobs = 0;
85 
86  _jobqueues->nthreads = nthreads;
87  SCIP_ALLOC( BMSallocMemoryArray(&_jobqueues->currentjobs, nthreads) );
88 
89  for( i = 0; i < nthreads; ++i )
90  _jobqueues->currentjobs[i] = NULL;
91 
92  SCIP_CALL( SCIPtpiInitLock(&_jobqueues->lock) );
94 
95  return SCIP_OKAY;
96 }
97 
98 
99 
100 static
102  void
103  )
104 {
105  assert(_jobqueues != NULL);
106 
107  SCIPtpiDestroyLock(&_jobqueues->lock);
108  SCIPtpiDestroyCondition(&_jobqueues->jobfinished);
109  BMSfreeMemoryArray(&_jobqueues->currentjobs);
110 
111  BMSfreeMemory(&_jobqueues);
112 
113  return SCIP_OKAY;
114 }
115 
116 
117 static
119  SCIP_JOB* job /**< the job to be executed in parallel */
120  )
121 {
122  int threadnum;
123 
124  threadnum = SCIPtpiGetThreadNum();
125 
126  SCIP_CALL_ABORT( SCIPtpiAcquireLock(&_jobqueues->lock) );
127  _jobqueues->currentjobs[threadnum] = job;
128  SCIP_CALL_ABORT( SCIPtpiReleaseLock(&_jobqueues->lock) );
129 
130  job->retcode = (*(job->jobfunc))(job->args);
131 
132  SCIP_CALL_ABORT( SCIPtpiAcquireLock(&_jobqueues->lock) );
133  _jobqueues->ncurrentjobs--;
134  _jobqueues->currentjobs[threadnum] = NULL;
135 
136  /* insert job into finished jobs */
137  if( _jobqueues->finishedjobs.njobs == 0 )
138  {
139  _jobqueues->finishedjobs.firstjob = job;
140  _jobqueues->finishedjobs.lastjob = job;
141  }
142  else
143  {
144  _jobqueues->finishedjobs.lastjob->nextjob = job;
145  _jobqueues->finishedjobs.lastjob = job;
146  }
147 
148  ++_jobqueues->finishedjobs.njobs;
149 
151 
152  SCIP_CALL_ABORT( SCIPtpiReleaseLock(&_jobqueues->lock) );
153 
154 }
155 
156 
157 
158 
159 /** this is a job that will be executed on to process the job queue */
160 /* the job will only be added when the number of active jobs is equal to the number of threads.
161  * As such, there will always be number of threads + 1 tasks available for the scheduler to run. */
162 static
164  void
165  )
166 {
167  SCIP_JOB* job;
168  SCIP_CALL_ABORT( SCIPtpiAcquireLock(&_jobqueues->lock) );
169 
170  while( _jobqueues->ncurrentjobs == SCIPtpiGetNumThreads() )
171  {
172  SCIP_CALL_ABORT( SCIPtpiWaitCondition(&_jobqueues->jobfinished, &_jobqueues->lock) );
173  }
174 
175  if( _jobqueues->jobqueue.njobs == 1 )
176  {
177  job = _jobqueues->jobqueue.firstjob;
178  _jobqueues->jobqueue.firstjob = NULL;
179  _jobqueues->jobqueue.lastjob = NULL;
180  --_jobqueues->jobqueue.njobs;
181  }
182  else if( _jobqueues->jobqueue.njobs > 1 )
183  {
184  job = _jobqueues->jobqueue.firstjob;
185  _jobqueues->jobqueue.firstjob = job->nextjob;
186  --_jobqueues->jobqueue.njobs;
187  }
188  else
189  {
190  job = NULL;
191  }
192 
193  ++_jobqueues->ncurrentjobs;
194  SCIP_CALL_ABORT( SCIPtpiReleaseLock(&_jobqueues->lock) );
195 
196  if( job )
197  {
198  executeJob(job);
199  }
200 }
201 
202 
203 
204 
205 /** adding a job to the job queue.
206  * This gives some more flexibility in the handling of new jobs.
207  * IMPORTANT: This function MUST be called from within a mutex. */
208 static
210  SCIP_JOB* newjob
211  )
212 {
213  /* @todo we want to work out what to do with a full job queue. Is there a problem if the limit is hit? */
214  /* @note it is important to have a queuesize. This will stop the code submitting infinitely many jobs. */
215  assert(newjob != NULL);
216 
217  newjob->nextjob = NULL;
218 
219  /* this function queries the current job list. This could change by other threads writing to the list. So a lock is
220  * required to ensure that the current joblist remains static. */
221  SCIP_CALL( SCIPtpiAcquireLock(&_jobqueues->lock) );
222 
223  /* checking the status of the job queue */
224  if( _jobqueues->ncurrentjobs == SCIPtpiGetNumThreads() )
225  {
226  if( _jobqueues->jobqueue.njobs == 0 )
227  {
228  _jobqueues->jobqueue.firstjob = newjob;
229  _jobqueues->jobqueue.lastjob = newjob;
230  }
231  else /* it is assumed that the jobqueue is not full */
232  {
233  _jobqueues->jobqueue.lastjob->nextjob = newjob;
234  _jobqueues->jobqueue.lastjob = newjob;
235  }
236 
237  _jobqueues->jobqueue.njobs++;
238 
239  SCIP_CALL( SCIPtpiReleaseLock(&_jobqueues->lock) );
240 
241  #pragma omp task
243  }
244  else
245  {
246  assert(_jobqueues->ncurrentjobs < SCIPtpiGetNumThreads());
247 
248  _jobqueues->ncurrentjobs++;
249 
250  SCIP_CALL( SCIPtpiReleaseLock(&_jobqueues->lock) );
251  /* running the new job */
252  #pragma omp task firstprivate(newjob)
253  executeJob(newjob);
254  }
255 
256  return SCIP_OKAY;
257 }
258 
259 
261  SCIP_CONDITION* condition
262  )
263 {
264  SCIP_CALL( SCIPtpiAcquireLock(&condition->_lock) );
265 
266  if( condition->_waitnum > condition->_signals )
267  ++condition->_signals;
268 
269  SCIP_CALL( SCIPtpiReleaseLock(&condition->_lock) );
270 
271  return SCIP_OKAY;
272 }
273 
275  SCIP_CONDITION* condition
276  )
277 {
278 
279  SCIP_CALL( SCIPtpiAcquireLock(&condition->_lock) );
280  condition->_signals = condition->_waitnum;
281  SCIP_CALL( SCIPtpiReleaseLock(&condition->_lock) );
282 
283  return SCIP_OKAY;
284 }
285 
287  SCIP_CONDITION* condition,
288  SCIP_LOCK* lock
289  )
290 {
291  int waitnum;
292 
293  SCIP_CALL( SCIPtpiReleaseLock(lock) );
294 
295  SCIP_CALL( SCIPtpiAcquireLock(&condition->_lock) );
296  waitnum = ++condition->_waitnum;
297 
298  ++condition->_waiters;
299 
300  do
301  {
302  SCIP_CALL( SCIPtpiReleaseLock(&condition->_lock) );
303  #pragma omp taskyield
304  SCIP_CALL( SCIPtpiAcquireLock(&condition->_lock) );
305  }
306  while( condition->_signals < waitnum );
307 
308  --condition->_waiters;
309 
310  if( condition->_waiters == 0 )
311  {
312  condition->_signals = 0;
313  condition->_waitnum = 0;
314  }
315 
316  SCIP_CALL( SCIPtpiReleaseLock(&condition->_lock) );
317 
318  SCIP_CALL( SCIPtpiAcquireLock(lock) );
319 
320  return SCIP_OKAY;
321 }
322 
/** Returns the number of threads */
int SCIPtpiGetNumThreads(
   void
   )
{
   return omp_get_num_threads();
}
329 
/** Returns the thread number */
int SCIPtpiGetThreadNum(
   void
   )
{
   return omp_get_thread_num();
}
336 
337 /** creates a job for parallel processing*/
339  SCIP_JOB** job, /**< pointer to the job that will be created */
340  int jobid, /**< the id for the current job */
341  SCIP_RETCODE (*jobfunc)(void* args),/**< pointer to the job function */
342  void* jobarg /**< the job's argument */
343  )
344 {
345  SCIP_ALLOC( BMSallocMemory(job) );
346 
347  (*job)->jobid = jobid;
348  (*job)->jobfunc = jobfunc;
349  (*job)->args = jobarg;
350  (*job)->nextjob = NULL;
351 
352  return SCIP_OKAY;
353 }
354 
/** gets a new job id for the new set of submitted jobs */
int SCIPtpiGetNewJobID(
   void
   )
{
   /* shared counter across all threads; incremented atomically below */
   static int currentjobid = 0;
   int jobid;

   #pragma omp atomic capture
   jobid = ++currentjobid;

   return jobid;
}
368 
369 /** submit a job for parallel processing */
370 /* the return is a globally defined status */
372  SCIP_JOB* job, /**< pointer to the job to be submitted */
373  SCIP_SUBMITSTATUS* status /**< pointer to store the submit status */
374 
375  )
376 {
377  assert(_jobqueues != NULL);
378 
379  *status = SCIP_SUBMIT_SUCCESS;
380  SCIP_CALL( jobQueueAddJob(job) );
381 
382  return SCIP_OKAY;
383 }
384 
385 static
387  int jobid
388  )
389 {
390  int i;
391 
392  if( _jobqueues->ncurrentjobs > 0 )
393  {
394  for( i = 0; i < _jobqueues->nthreads; ++i )
395  {
396  if( _jobqueues->currentjobs[i] != NULL && _jobqueues->currentjobs[i]->jobid == jobid )
397  return TRUE;
398  }
399  }
400 
401  return FALSE;
402 }
403 
404 static
406  int jobid
407  )
408 {
409 
410  if( _jobqueues->jobqueue.njobs > 0 )
411  {
412  SCIP_JOB* currjob;
413  currjob = _jobqueues->jobqueue.firstjob;
414 
415  do
416  {
417  if( currjob->jobid == jobid )
418  return TRUE;
419 
420  if( currjob == _jobqueues->jobqueue.lastjob )
421  break;
422 
423  currjob = currjob->nextjob;
424  }
425  while( TRUE ); /*lint !e506*/
426  }
427 
428  return FALSE;
429 }
430 
431 
432 /** Blocks until all jobs of the given jobid have finished
433  * and then returns the smallest SCIP_RETCODE of all the jobs */
435  int jobid
436  )
437 {
439 
440  retcode = SCIP_OKAY;
441  SCIP_CALL( SCIPtpiAcquireLock(&_jobqueues->lock) );
442 
443  while( isJobRunning(jobid) || isJobWaiting(jobid) )
444  {
445  SCIP_CALL( SCIPtpiWaitCondition(&_jobqueues->jobfinished, &_jobqueues->lock) );
446  }
447 
448  if( _jobqueues->finishedjobs.njobs > 0 )
449  {
450  SCIP_JOB* currjob = _jobqueues->finishedjobs.firstjob;
451  SCIP_JOB* prevjob = NULL;
452 
453  /* finding the location of the processed job in the currentjobs queue */
454  do
455  {
456  if( currjob->jobid == jobid )
457  {
458  SCIP_JOB* nextjob;
459 
460  /** if the job has the right jobid collect its retcode,
461  * remove it from the finished job list, and free it */
462  retcode = MIN(retcode, currjob->retcode);
463 
464  /* removing the finished job from finished jobs list */
465  if( currjob == _jobqueues->finishedjobs.firstjob )
466  _jobqueues->finishedjobs.firstjob = currjob->nextjob;
467  else
468  prevjob->nextjob = currjob->nextjob; /*lint !e613*/
469 
470  if( currjob == _jobqueues->finishedjobs.lastjob )
471  _jobqueues->finishedjobs.lastjob = prevjob;
472 
473  _jobqueues->finishedjobs.njobs--;
474 
475  /* update currjob and free finished job; prevjob stays the same */
476  nextjob = currjob->nextjob;
477  BMSfreeMemory(&currjob);
478  currjob = nextjob;
479  }
480  else
481  {
482  prevjob = currjob;
483  currjob = prevjob->nextjob;
484  }
485  }
486  while( prevjob != _jobqueues->finishedjobs.lastjob );
487 
488  }
489  else
490  {
491  /* given jobid was not submitted */
492  printf("err1");
493  retcode = SCIP_ERROR;
494  }
495 
496  SCIP_CALL_ABORT( SCIPtpiReleaseLock(&_jobqueues->lock) );
497 
498  return retcode;
499 }
500 
501 /** initializes tpi */
503  int nthreads,
504  int queuesize,
505  SCIP_Bool blockwhenfull
506  )
507 {
508  omp_set_num_threads(nthreads);
509  assert(_jobqueues == NULL);
510 
511  SCIP_CALL( createJobQueue(nthreads, queuesize, blockwhenfull) );
512  return SCIP_OKAY;
513 }
514 
515 /** deinitializes tpi */
517  void
518  )
519 {
520  assert(_jobqueues != NULL);
521  assert(_jobqueues->finishedjobs.njobs == 0);
522  assert(_jobqueues->jobqueue.njobs == 0);
523  assert(_jobqueues->ncurrentjobs == 0);
524 
525  SCIP_CALL( freeJobQueue() );
526  return SCIP_OKAY;
527 }
enum SCIP_Submitstatus SCIP_SUBMITSTATUS
Definition: type_tpi.h:45
SCIP_JOB * firstjob
Definition: tpi_openmp.c:41
SCIP_RETCODE SCIPtpiCreateJob(SCIP_JOB **job, int jobid, SCIP_RETCODE(*jobfunc)(void *args), void *jobarg)
Definition: tpi_openmp.c:338
#define FALSE
Definition: def.h:64
void * args
Definition: tpi_openmp.c:34
#define TRUE
Definition: def.h:63
enum SCIP_Retcode SCIP_RETCODE
Definition: type_retcode.h:53
SCIP_RETCODE SCIPtpiAcquireLock(SCIP_LOCK *lock)
#define SCIP_UNUSED(x)
Definition: def.h:404
#define BMSallocMemoryArray(ptr, num)
Definition: memory.h:105
SCIP_RETCODE SCIPtpiInitCondition(SCIP_LOCK *lock)
SCIP_RETCODE SCIPtpiCollectJobs(int jobid)
Definition: tpi_openmp.c:434
SCIP_JOB * lastjob
Definition: tpi_openmp.c:42
static SCIP_Bool isJobRunning(int jobid)
Definition: tpi_openmp.c:386
SCIP_RETCODE SCIPtpiInit(int nthreads, int queuesize, SCIP_Bool blockwhenfull)
Definition: tpi_openmp.c:502
#define BMSfreeMemory(ptr)
Definition: memory.h:127
static void jobQueueProcessJob(void)
Definition: tpi_openmp.c:163
static SCIP_RETCODE createJobQueue(int nthreads, int qsize, SCIP_Bool blockwhenfull)
Definition: tpi_openmp.c:64
static SCIP_RETCODE jobQueueAddJob(SCIP_JOB *newjob)
Definition: tpi_openmp.c:209
SCIP_LOCK lock
Definition: tpi_openmp.c:54
int SCIPtpiGetThreadNum()
Definition: tpi_openmp.c:331
SCIP_RETCODE SCIPtpiSignalCondition(SCIP_CONDITION *condition)
Definition: tpi_openmp.c:260
#define BMSfreeMemoryArray(ptr)
Definition: memory.h:129
SCIP_RETCODE SCIPtpiReleaseLock(SCIP_LOCK *lock)
the type definitions for the SCIP parallel interface
SCIP_RETCODE retcode
Definition: tpi_openmp.c:35
struct SCIP_Job * nextjob
Definition: tpi_openmp.c:32
SCIP_RETCODE SCIPtpiExit(void)
Definition: tpi_openmp.c:516
SCIP_RETCODE SCIPtpiSumbitJob(SCIP_JOB *job, SCIP_SUBMITSTATUS *status)
Definition: tpi_openmp.c:371
#define SCIP_CALL(x)
Definition: def.h:350
static SCIP_Bool isJobWaiting(int jobid)
Definition: tpi_openmp.c:405
int jobid
Definition: tpi_openmp.c:31
#define SCIP_Bool
Definition: def.h:61
SCIP_RETCODE SCIPtpiInitLock(SCIP_LOCK *lock)
SCIP_CONDITION jobfinished
Definition: tpi_openmp.c:55
SCIP_JOBQUEUE jobqueue
Definition: tpi_openmp.c:49
void SCIPtpiDestroyLock(SCIP_LOCK *lock)
void SCIPtpiDestroyCondition(SCIP_LOCK *lock)
int SCIPtpiGetNumThreads()
Definition: tpi_openmp.c:324
#define BMSallocMemory(ptr)
Definition: memory.h:101
SCIP_RETCODE(* jobfunc)(void *args)
Definition: tpi_openmp.c:33
SCIP_JOB ** currentjobs
Definition: tpi_openmp.c:50
SCIP_RETCODE SCIPtpiBroadcastCondition(SCIP_CONDITION *condition)
Definition: tpi_openmp.c:274
SCIP_RETCODE SCIPtpiWaitCondition(SCIP_CONDITION *condition, SCIP_LOCK *lock)
Definition: tpi_openmp.c:286
static void executeJob(SCIP_JOB *job)
Definition: tpi_openmp.c:118
#define SCIP_CALL_ABORT(x)
Definition: def.h:329
int SCIPtpiGetNewJobID(void)
Definition: tpi_openmp.c:356
static SCIP_RETCODE freeJobQueue(void)
Definition: tpi_openmp.c:101
#define SCIP_ALLOC(x)
Definition: def.h:361
SCIP_JOBQUEUE finishedjobs
Definition: tpi_openmp.c:53
memory allocation routines