Scippy

SCIP

Solving Constraint Integer Programs

tpi_tnycthrd.c
Go to the documentation of this file.
1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2 /* */
3 /* This file is part of the program and library */
4 /* SCIP --- Solving Constraint Integer Programs */
5 /* */
6 /* Copyright (C) 2002-2020 Konrad-Zuse-Zentrum */
7 /* fuer Informationstechnik Berlin */
8 /* */
9 /* SCIP is distributed under the terms of the ZIB Academic License. */
10 /* */
11 /* You should have received a copy of the ZIB Academic License */
12 /* along with SCIP; see the file COPYING. If not visit scipopt.org. */
13 /* */
14 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
15 
16 /**@file tpi_tnycthrd.c
17  * @ingroup TASKINTERFACE
18  * @brief a TPI implementation using tinycthreads
19  * @author Stephen J. Maher
20  * @author Leona Gottwald
21  */
22 
23 /*---+----1----+----2----+----3----+----4----+----5----+----6----+----7----+----8----+----9----+----0----+----1----+----2*/
24 
25 #include "tpi/tpi.h"
26 #include "blockmemshell/memory.h"
27 
29 static SCIP_THREADPOOL* _threadpool = NULL;
30 _Thread_local int _threadnumber; /*lint !e129*/
31 
32 /** A job added to the queue */
33 struct SCIP_Job
34 {
35  int jobid; /**< id to identify jobs from a common process */
36  struct SCIP_Job* nextjob; /**< pointer to the next job in the queue */
37  SCIP_RETCODE (*jobfunc)(void* args);/**< pointer to the job function */
38  void* args; /**< pointer to the function arguements */
39  SCIP_RETCODE retcode; /**< return code of the job */
40 };
41 
42 /** the thread pool job queue */
43 struct SCIP_JobQueue
44 {
45  SCIP_JOB* firstjob; /**< pointer to the first job in the queue */
46  SCIP_JOB* lastjob; /**< pointer to the last job in the queue */
47  int njobs; /**< number of jobs in the queue */
48 };
50 
51 /** The thread pool */
53 {
54  /* Pool Characteristics */
55  int nthreads; /**< number of threads in the pool */
56  int queuesize; /**< the total number of items to enter the queue */
57 
58  /* Current pool state */
59  thrd_t* threads; /**< the threads included in the pool */
60  SCIP_JOBQUEUE* jobqueue; /**< the job queue */
61  SCIP_JOBQUEUE* currentjobs; /**< the jobs currently being processed on a thread.
62  Only a single job is allowed per thread. */
63  SCIP_JOBQUEUE* finishedjobs; /**< finished jobs that are not yet collected */
64  int currworkingthreads; /**< the threads currently processing jobs */
65  SCIP_Bool blockwhenfull; /**< indicates that the queue can only be as large as nthreads */
66  int currentid; /**< current job id */
67 
68  /* Control indicators */
69  SCIP_Bool shutdown; /**< indicates whether the pool needs to be shutdown */
70  SCIP_Bool queueopen; /**< indicates whether the queue is open */
71 
72  /* mutex and locks for the thread pool */
73  SCIP_LOCK poollock; /**< mutex to allow read and write of the pool features */
74  SCIP_CONDITION queuenotempty; /**< condition to broadcast the queue has jobs */
75  SCIP_CONDITION queuenotfull; /**< condition to broadcast the queue is not full */
76  SCIP_CONDITION queueempty; /**< condition to broadcast that the queue is empty */
77  SCIP_CONDITION jobfinished; /**< condition to broadcast that a job has been finished */
78 };
79 
80 /** this function controls the execution of each of the threads */
81 static
83  void* threadnum /**< thread number is passed in as argument stored inside a void pointer */
84  )
85 {
86  SCIP_JOB* newjob;
87  SCIP_JOB* prevjob;
88  SCIP_JOB* currjob;
89 
90  _threadnumber = (int)(uintptr_t) threadnum;
91 
92  /* Increase the number of active threads */
93  SCIP_CALL( SCIPtpiAcquireLock(&(_threadpool->poollock)) );
94  _threadpool->currworkingthreads += 1;
95  SCIP_CALL( SCIPtpiReleaseLock(&(_threadpool->poollock)) );
96 
97  /* this is an endless loop that runs until the thrd_exit function is called */
98  while( TRUE ) /*lint !e716*/
99  {
100  SCIP_CALL( SCIPtpiAcquireLock(&(_threadpool->poollock)) );
101 
102  /* the queue is empty but the shutdown command has not been given */
103  while( _threadpool->jobqueue->njobs == 0 && !_threadpool->shutdown )
104  {
105  SCIP_CALL( SCIPtpiWaitCondition(&(_threadpool->queuenotempty), &(_threadpool->poollock)) );
106  }
107 
108  /* if the shutdown command has been given, the exit the thread */
109  if( _threadpool->shutdown )
110  {
111  /* Decrease the thread count when execution of job queue has completed */
112  _threadpool->currworkingthreads -= 1;
113  SCIP_CALL( SCIPtpiReleaseLock(&(_threadpool->poollock)) );
114 
115  thrd_exit((int)SCIP_OKAY);
116  }
117 
118  /* getting the next job in the queue */
119  newjob = _threadpool->jobqueue->firstjob;
120  _threadpool->jobqueue->njobs--; /* decreasing the number of jobs in the queue */
121 
122  if( _threadpool->jobqueue->njobs == 0 )
123  {
124  _threadpool->jobqueue->firstjob = NULL;
125  _threadpool->jobqueue->lastjob = NULL;
126  }
127  else
128  _threadpool->jobqueue->firstjob = newjob->nextjob; /* updating the queue */
129 
130  /* if we want to wait when the queue is full, then we broadcast that the queue can now take new jobs */
131  if( _threadpool->blockwhenfull &&
132  _threadpool->jobqueue->njobs == _threadpool->queuesize - 1 )
133  {
135  }
136 
137  /* indicating that the queue is empty */
138  if( _threadpool->jobqueue->njobs == 0 )
139  {
140  SCIP_CALL( SCIPtpiBroadcastCondition(&(_threadpool->queueempty)) );
141  }
142 
143  /* updating the current job list */
144  if( _threadpool->currentjobs->njobs == 0 )
145  {
146  _threadpool->currentjobs->firstjob = newjob;
147  _threadpool->currentjobs->lastjob = newjob;
148  }
149  else
150  {
151  _threadpool->currentjobs->lastjob->nextjob = newjob;
152  _threadpool->currentjobs->lastjob = newjob;
153  }
154 
155  _threadpool->currentjobs->njobs++;
156 
157  SCIP_CALL( SCIPtpiReleaseLock(&(_threadpool->poollock)) );
158 
159  /* setting the job to run on this thread */
160  newjob->retcode = (*(newjob->jobfunc))(newjob->args);
161 
162  /* setting the current job on this thread to NULL */
163  SCIP_CALL( SCIPtpiAcquireLock(&(_threadpool->poollock)) );
164 
165  /* finding the location of the processed job in the currentjobs queue */
166  currjob = _threadpool->currentjobs->firstjob;
167  prevjob = NULL;
168 
169  while( currjob != newjob )
170  {
171  prevjob = currjob;
172  currjob = prevjob->nextjob;
173  }
174 
175  /* removing the processed job from current jobs list */
176  if( currjob == _threadpool->currentjobs->firstjob )
177  _threadpool->currentjobs->firstjob = currjob->nextjob;
178  else
179  prevjob->nextjob = currjob->nextjob; /*lint !e794*/
180 
181  if( currjob == _threadpool->currentjobs->lastjob )
182  _threadpool->currentjobs->lastjob = prevjob;
183 
184  _threadpool->currentjobs->njobs--;
185 
186  /* updating the finished job list */
187  if( _threadpool->finishedjobs->njobs == 0 )
188  {
189  _threadpool->finishedjobs->firstjob = newjob;
190  _threadpool->finishedjobs->lastjob = newjob;
191  }
192  else
193  {
194  _threadpool->finishedjobs->lastjob->nextjob = newjob;
195  _threadpool->finishedjobs->lastjob = newjob;
196  }
197 
198  _threadpool->finishedjobs->njobs++;
199 
200  /* signalling that a job has been finished */
201  SCIP_CALL( SCIPtpiBroadcastCondition(&(_threadpool)->jobfinished) );
202 
203  SCIP_CALL( SCIPtpiReleaseLock(&(_threadpool->poollock)) );
204  }
205 }
206 
207 /** this function controls the execution of each of the threads */
208 static
210  void* threadnum /**< thread number is passed in as argument stored inside a void pointer */
211  )
212 {
213  return (int) threadPoolThreadRetcode(threadnum);
214 }
215 
216 /** creates a threadpool */
217 static
219  SCIP_THREADPOOL** thrdpool, /**< pointer to store threadpool */
220  int nthreads, /**< number of threads in the threadpool */
221  int qsize, /**< maximum size of the jobqueue */
222  SCIP_Bool blockwhenfull /**< should the jobqueue block if it is full */
223  )
224 {
225  uintptr_t i;
226 
227  assert(nthreads >= 0);
228  assert(qsize >= 0);
229 
230  /* @todo think about the correct memory here */
231  SCIP_ALLOC( BMSallocMemory(thrdpool) );
232  (*thrdpool)->currentid = 0;
233  (*thrdpool)->queuesize = qsize;
234  (*thrdpool)->nthreads = nthreads;
235  (*thrdpool)->blockwhenfull = blockwhenfull;
236  (*thrdpool)->shutdown = FALSE;
237  (*thrdpool)->queueopen = TRUE;
238 
239  /* allocating memory for the job queue */
240  SCIP_ALLOC( BMSallocMemory(&(*thrdpool)->jobqueue) );
241  (*thrdpool)->jobqueue->firstjob = NULL;
242  (*thrdpool)->jobqueue->lastjob = NULL;
243  (*thrdpool)->jobqueue->njobs = 0;
244 
245  /* allocating memory for the job queue */
246  SCIP_ALLOC( BMSallocMemory(&(*thrdpool)->currentjobs) );
247  (*thrdpool)->currentjobs->firstjob = NULL;
248  (*thrdpool)->currentjobs->lastjob = NULL;
249  (*thrdpool)->currentjobs->njobs = 0;
250 
251  /* allocating memory for the job queue */
252  SCIP_ALLOC( BMSallocMemory(&(*thrdpool)->finishedjobs) );
253  (*thrdpool)->finishedjobs->firstjob = NULL;
254  (*thrdpool)->finishedjobs->lastjob = NULL;
255  (*thrdpool)->finishedjobs->njobs = 0;
256 
257  /* initialising the mutex */
258  SCIP_CALL( SCIPtpiInitLock(&(*thrdpool)->poollock) );
259 
260  /* initialising the conditions */
261  SCIP_CALL( SCIPtpiInitCondition(&(*thrdpool)->queuenotempty) );
262  SCIP_CALL( SCIPtpiInitCondition(&(*thrdpool)->queuenotfull) );
263  SCIP_CALL( SCIPtpiInitCondition(&(*thrdpool)->queueempty) );
264  SCIP_CALL( SCIPtpiInitCondition(&(*thrdpool)->jobfinished) );
265 
266  /* creating the threads */
267  (*thrdpool)->currworkingthreads = 0;
268 
269  /* allocating memory for the threads */
270  SCIP_ALLOC( BMSallocMemoryArray(&((*thrdpool)->threads), nthreads) );
271 
272  /* create the threads */
273  for( i = 0; i < (unsigned)nthreads; i++ )
274  {
275  if( thrd_create(&((*thrdpool)->threads[i]), threadPoolThread, (void*)i) != thrd_success )
276  return SCIP_ERROR;
277  }
278 
279  _threadnumber = nthreads;
280  /* halt while all threads are not active TODO: is synchronization required here ? */
281  /*TODO: this caused a deadlock, is it important to wait for all threads to start?
282  * while( (*thrdpool)->currworkingthreads != nthreads )
283  {}*/
284 
285  return SCIP_OKAY;
286 }
287 
288 /** adding a job to the job queue.
289  * This gives some more flexibility in the handling of new jobs.
290  * This function needs to be called from within a mutex. */
291 static
293  SCIP_THREADPOOL* threadpool,
294  SCIP_JOB* newjob
295  )
296 {
297  /* @todo we want to work out what to do with a full job queue. Is there a problem if the limit is hit? */
298  /* @note it is important to have a queuesize. This will stop the code submitting infinitely many jobs. */
299  assert(threadpool->jobqueue->njobs < threadpool->queuesize);
300 
301  newjob->nextjob = NULL;
302 
303  /* checking the status of the job queue */
304  if( threadpool->jobqueue->njobs == 0 )
305  {
306  threadpool->jobqueue->firstjob = newjob;
307  threadpool->jobqueue->lastjob = newjob;
308  }
309  else /* it is assumed that the jobqueue is not full */
310  {
311  threadpool->jobqueue->lastjob->nextjob = newjob;
312  threadpool->jobqueue->lastjob = newjob;
313  }
314 
315  SCIP_CALL_ABORT( SCIPtpiSignalCondition(&(threadpool->queuenotempty)) ); /* signalling to all threads that the queue has jobs
316  * using the signal instead of broadcast because only one
317  * thread should be awakened */
318 
319  threadpool->jobqueue->njobs++;
320 }
321 
322 /** adds a job to the threadpool */
323 static
325  SCIP_JOB* newjob, /**< job to add to threadpool */
326  SCIP_SUBMITSTATUS* status /**< pointer to store the job's submit status */
327  )
328 {
329  assert(newjob != NULL);
330  assert(_threadpool != NULL);
331 
332  SCIP_CALL( SCIPtpiAcquireLock(&(_threadpool->poollock)) );
333 
334  /* if the queue is full and we are blocking, then return an error. */
335  if( _threadpool->jobqueue->njobs == _threadpool->queuesize && _threadpool->blockwhenfull )
336  {
337  SCIP_CALL( SCIPtpiReleaseLock(&(_threadpool->poollock)) );
338  *status = SCIP_SUBMIT_QUEUEFULL;
339  return SCIP_OKAY;
340  }
341 
342  /* Wait until the job queue is not full. If the queue is closed or the thread pool is shutdown, then stop waiting */
343  /* @todo this needs to be checked. It is possible that a job can be submitted and then the queue is closed or the
344  * thread pool is shutdown. Need to work out the best way to handle this. */
345  while( _threadpool->jobqueue->njobs == _threadpool->queuesize && !(_threadpool->shutdown || !_threadpool->queueopen) )
346  {
347  SCIP_CALL( SCIPtpiWaitCondition(&(_threadpool->queuenotfull), &(_threadpool->poollock)) );
348  }
349 
350  /* if the thread pool is shutdown or the queue is closed, then we need to leave the job submission */
351  if( !_threadpool->queueopen )
352  {
353  SCIP_CALL( SCIPtpiReleaseLock(&(_threadpool->poollock)) );
354  *status = SCIP_SUBMIT_QUEUECLOSED;
355  return SCIP_OKAY;
356  }
357  else if( _threadpool->shutdown )
358  {
359  SCIP_CALL( SCIPtpiReleaseLock(&(_threadpool->poollock)) );
360  *status = SCIP_SUBMIT_SHUTDOWN;
361  return SCIP_OKAY;
362  }
363 
364  /* creating the job for submission */
365  newjob->nextjob = NULL;
366 
367  /* adding the job to the queue */
368  /* this can only happen if the queue is not full
369  */
370  assert(_threadpool->jobqueue->njobs != _threadpool->queuesize);
371  jobQueueAddJob(_threadpool, newjob);
372 
373  SCIP_CALL( SCIPtpiReleaseLock(&(_threadpool->poollock)) );
374 
375  *status = SCIP_SUBMIT_SUCCESS;
376  return SCIP_OKAY;
377 }
378 
379 /** frees the jobqueue of the threadpool */
380 static
382  SCIP_THREADPOOL* thrdpool
383  )
384 {
385  SCIP_JOB* currjob;
386 
387  assert(!thrdpool->queueopen);
388  assert(thrdpool->shutdown);
389 
390  /* iterating through all jobs until all have been freed. */
391  while( thrdpool->jobqueue->firstjob != NULL )
392  {
393  currjob = thrdpool->jobqueue->firstjob->nextjob;
394  thrdpool->jobqueue->firstjob = thrdpool->jobqueue->firstjob->nextjob;
395  BMSfreeMemory(&currjob);
396  }
397 
398  assert(thrdpool->jobqueue->firstjob == NULL);
399  assert(thrdpool->jobqueue->lastjob == NULL);
400 
401  BMSfreeMemory(&thrdpool->jobqueue);
402 }
403 
404 
405 
406 
407 static
409  SCIP_THREADPOOL** thrdpool,
410  SCIP_Bool finishjobs,
411  SCIP_Bool completequeue
412  )
413 {
414  int i;
416 
417  /*TODO remove argument? */
418  SCIP_UNUSED( finishjobs );
419 
420  SCIP_CALL( SCIPtpiAcquireLock(&((*thrdpool)->poollock)) );
421 
422  /* if the shutdown is already in progress, then we don't need to completed this function */
423  if( !(*thrdpool)->queueopen || (*thrdpool)->shutdown )
424  {
425  SCIP_CALL( SCIPtpiReleaseLock(&((*thrdpool)->poollock)) );
426 
427  return SCIP_OKAY;
428  }
429 
430  /* indicating that the job queue is now closed for new jobs */
431  (*thrdpool)->queueopen = FALSE;
432 
433  /* if the jobs in the queue should be completed, then we wait until the queueempty condition is set */
434  if( completequeue )
435  {
436  while( (*thrdpool)->jobqueue->njobs > 0 )
437  {
438  SCIP_CALL( SCIPtpiWaitCondition(&((*thrdpool)->queueempty), &((*thrdpool)->poollock)) );
439  }
440  }
441 
442  /* indicating that the tpi has commenced the shutdown process */
443  (*thrdpool)->shutdown = TRUE;
444 
445  SCIP_CALL( SCIPtpiReleaseLock(&((*thrdpool)->poollock)) );
446 
447  /* waking up all threads so that they can check the shutdown condition
448  * this requires that the conditions queuenotempty and queuenotfull is broadcast
449  */
450  SCIP_CALL( SCIPtpiBroadcastCondition(&((*thrdpool)->queuenotempty)) );
451  SCIP_CALL( SCIPtpiBroadcastCondition(&((*thrdpool)->queuenotfull)) );
452 
453  retcode = SCIP_OKAY;
454 
455  /* calling a join to ensure that all worker finish before the thread pool is closed */
456  for( i = 0; i < (*thrdpool)->nthreads; i++ )
457  {
458  int thrdretcode;
459 
460  if( thrd_join((*thrdpool)->threads[i], &thrdretcode) != thrd_success )
461  retcode = (SCIP_RETCODE) MIN((int)SCIP_ERROR, (int)retcode);
462  else
463  retcode = (SCIP_RETCODE) MIN(thrdretcode, (int)retcode);
464  }
465 
466  /* freeing memory and data structures */
467  BMSfreeMemoryArray(&(*thrdpool)->threads);
468 
469  /* freeing the current jobs list. This assumes that all jobs complete before the tpi is closed. */
470  assert((*thrdpool)->currentjobs->njobs == 0);
471  BMSfreeMemory(&(*thrdpool)->currentjobs);
472  assert((*thrdpool)->finishedjobs->njobs == 0);
473  BMSfreeMemory(&(*thrdpool)->finishedjobs);
474 
475  freeJobQueue(*thrdpool);
476 
477  /* destroying the conditions */
478  SCIPtpiDestroyCondition(&(*thrdpool)->jobfinished);
479  SCIPtpiDestroyCondition(&(*thrdpool)->queueempty);
480  SCIPtpiDestroyCondition(&(*thrdpool)->queuenotfull);
481  SCIPtpiDestroyCondition(&(*thrdpool)->queuenotempty);
482 
483  /* destroying the mutex */
484  SCIPtpiDestroyLock(&(*thrdpool)->poollock);
485 
486  BMSfreeMemory(thrdpool);
487 
488  return retcode;
489 }
490 
491 
492 /* checking a job queue */
493 static
495  SCIP_JOBQUEUE* jobqueue,
496  int jobid
497  )
498 {
499  SCIP_JOB* currjob = jobqueue->firstjob;
500 
501  /* checking the job ids */
502  if( currjob != NULL )
503  {
504  while( currjob != jobqueue->lastjob )
505  {
506  if( currjob->jobid == jobid )
507  return SCIP_JOB_INQUEUE;
508 
509  currjob = currjob->nextjob;
510  }
511 
512  if( currjob->jobid == jobid )
513  return SCIP_JOB_INQUEUE;
514  }
515 
516  return SCIP_JOB_DOESNOTEXIST;
517 }
518 
519 /** returns whether the job id is running */
520 static
522  SCIP_JOBQUEUE* currentjobs,
523  int jobid
524  )
525 {
526  if( checkJobQueue(currentjobs, jobid) == SCIP_JOB_INQUEUE )
527  return TRUE;
528  else
529  return FALSE;
530 }
531 
532 /** returns the number of threads */
534  void
535  )
536 {
537  return _threadpool->nthreads;
538 }
539 
540 /** initializes tpi */
542  int nthreads,
543  int queuesize,
544  SCIP_Bool blockwhenfull
545  )
546 {
547  assert(_threadpool == NULL);
548  SCIP_CALL( createThreadPool(&_threadpool, nthreads, queuesize, blockwhenfull) );
549  return SCIP_OKAY;
550 }
551 
552 /** deinitializes tpi */
554  void
555  )
556 {
557  assert(_threadpool != NULL);
558 
559  SCIP_CALL( freeThreadPool(&_threadpool, TRUE, TRUE) );
560 
561  return SCIP_OKAY;
562 }
563 
564 /** creates a job for parallel processing */
566  SCIP_JOB** job, /**< pointer to the job that will be created */
567  int jobid, /**< the id for the current job */
568  SCIP_RETCODE (*jobfunc)(void* args),/**< pointer to the job function */
569  void* jobarg /**< the job's argument */
570  )
571 {
572  SCIP_ALLOC( BMSallocMemory(job) );
573 
574  (*job)->jobid = jobid;
575  (*job)->jobfunc = jobfunc;
576  (*job)->args = jobarg;
577  (*job)->nextjob = NULL;
578 
579  return SCIP_OKAY;
580 }
581 
582 /** get a new job id for the new set of submitted jobs */
584  void
585  )
586 {
587  int id;
588  assert(_threadpool != NULL);
589 
590  SCIP_CALL_ABORT( SCIPtpiAcquireLock(&_threadpool->poollock) );
591  id = ++_threadpool->currentid;
592  SCIP_CALL_ABORT( SCIPtpiReleaseLock(&_threadpool->poollock) );
593 
594  return id;
595 }
596 
597 /** submit a job for parallel processing the return is a globally defined status */
599  SCIP_JOB* job, /**< pointer to the job to be submitted */
600  SCIP_SUBMITSTATUS* status /**< pointer to store the job's submit status */
601  )
602 {
603  assert(job != NULL);
604 
605  assert(job->jobid == _threadpool->currentid); /* the job id must be set before submitting the job. The submitter controls
606  whether a new id is required. */
607 
608  SCIP_CALL( threadPoolAddWork(job, status) );
609 
610  return SCIP_OKAY;
611 }
612 
613 /** blocks until all jobs of the given jobid have finished
614  * and then returns the smallest SCIP_RETCODE of all the jobs
615  */
617  int jobid /**< the jobid of the jobs to wait for */
618  )
619 {
621  SCIP_JOB* currjob;
622  SCIP_JOB* prevjob;
623 
624  SCIP_CALL( SCIPtpiAcquireLock(&(_threadpool->poollock)) );
625 
626  while( isJobRunning(_threadpool->currentjobs, jobid) || isJobRunning(_threadpool->jobqueue, jobid) )
627  {
628  SCIP_CALL( SCIPtpiWaitCondition(&_threadpool->jobfinished, &_threadpool->poollock) );
629  }
630 
631  /* finding the location of the processed job in the currentjobs queue */
632  retcode = SCIP_OKAY;
633  currjob = _threadpool->finishedjobs->firstjob;
634  prevjob = NULL;
635 
636  while( currjob )
637  {
638  if( currjob->jobid == jobid )
639  {
640  SCIP_JOB* nextjob;
641 
642  /* if the job has the right jobid collect its retcode,
643  * remove it from the finished job list, and free it
644  */
645  retcode = MIN(retcode, currjob->retcode);
646 
647  /* removing the finished job from finished jobs list */
648  if( currjob == _threadpool->finishedjobs->firstjob )
649  {
650  _threadpool->finishedjobs->firstjob = currjob->nextjob;
651  }
652  else
653  {
654  assert(prevjob != NULL);
655  prevjob->nextjob = currjob->nextjob;
656  }
657 
658  if( currjob == _threadpool->finishedjobs->lastjob )
659  _threadpool->finishedjobs->lastjob = prevjob;
660 
661  _threadpool->finishedjobs->njobs--;
662 
663  /* update currjob and free finished job; prevjob stays the same */
664  nextjob = currjob->nextjob;
665  BMSfreeMemory(&currjob);
666  currjob = nextjob;
667  }
668  else
669  {
670  /* otherwise leave job untouched */
671  prevjob = currjob;
672  currjob = prevjob->nextjob;
673  }
674  }
675 
676  SCIP_CALL( SCIPtpiReleaseLock(&_threadpool->poollock) );
677 
678  return retcode;
679 }
SCIP_EXPORT SCIP_RETCODE SCIPtpiSignalCondition(SCIP_CONDITION *condition)
Definition: tpi_openmp.c:261
static int threadPoolThread(void *threadnum)
Definition: tpi_tnycthrd.c:209
SCIP_CONDITION queuenotempty
Definition: tpi_tnycthrd.c:74
enum SCIP_Submitstatus SCIP_SUBMITSTATUS
Definition: type_tpi.h:45
SCIP_JOB * firstjob
Definition: tpi_openmp.c:41
#define FALSE
Definition: def.h:73
enum SCIP_Jobstatus SCIP_JOBSTATUS
Definition: type_tpi.h:60
void * args
Definition: tpi_openmp.c:34
static SCIP_JOBSTATUS checkJobQueue(SCIP_JOBQUEUE *jobqueue, int jobid)
Definition: tpi_tnycthrd.c:494
#define TRUE
Definition: def.h:72
enum SCIP_Retcode SCIP_RETCODE
Definition: type_retcode.h:54
#define SCIP_UNUSED(x)
Definition: def.h:418
#define BMSallocMemoryArray(ptr, num)
Definition: memory.h:115
SCIP_EXPORT SCIP_RETCODE SCIPtpiWaitCondition(SCIP_CONDITION *condition, SCIP_LOCK *lock)
Definition: tpi_openmp.c:287
SCIP_JOB * lastjob
Definition: tpi_openmp.c:42
SCIP_JOBQUEUE * finishedjobs
Definition: tpi_tnycthrd.c:63
static SCIP_RETCODE createThreadPool(SCIP_THREADPOOL **thrdpool, int nthreads, int qsize, SCIP_Bool blockwhenfull)
Definition: tpi_tnycthrd.c:218
#define BMSfreeMemory(ptr)
Definition: memory.h:137
SCIP_JOBQUEUE * jobqueue
Definition: tpi_tnycthrd.c:60
thrd_t * threads
Definition: tpi_tnycthrd.c:59
SCIP_CONDITION queueempty
Definition: tpi_tnycthrd.c:76
SCIP_RETCODE SCIPtpiCreateJob(SCIP_JOB **job, int jobid, SCIP_RETCODE(*jobfunc)(void *args), void *jobarg)
Definition: tpi_tnycthrd.c:565
SCIP_RETCODE SCIPtpiInit(int nthreads, int queuesize, SCIP_Bool blockwhenfull)
Definition: tpi_tnycthrd.c:541
SCIP_EXPORT SCIP_RETCODE SCIPtpiInitCondition(SCIP_LOCK *lock)
SCIP_JOBQUEUE * currentjobs
Definition: tpi_tnycthrd.c:61
#define BMSfreeMemoryArray(ptr)
Definition: memory.h:139
the type definitions for the SCIP parallel interface
SCIP_Bool blockwhenfull
Definition: tpi_tnycthrd.c:65
SCIP_RETCODE retcode
Definition: tpi_openmp.c:35
SCIP_EXPORT void SCIPtpiDestroyCondition(SCIP_LOCK *lock)
SCIP_Bool queueopen
Definition: tpi_tnycthrd.c:70
static SCIP_RETCODE threadPoolThreadRetcode(void *threadnum)
Definition: tpi_tnycthrd.c:82
struct SCIP_Job * nextjob
Definition: tpi_openmp.c:32
SCIP_CONDITION queuenotfull
Definition: tpi_tnycthrd.c:75
SCIP_EXPORT SCIP_RETCODE SCIPtpiInitLock(SCIP_LOCK *lock)
#define NULL
Definition: lpi_spx1.cpp:155
SCIP_RETCODE SCIPtpiExit(void)
Definition: tpi_tnycthrd.c:553
#define SCIP_CALL(x)
Definition: def.h:364
int jobid
Definition: tpi_openmp.c:31
int SCIPtpiGetNumThreads(void)
Definition: tpi_tnycthrd.c:533
#define SCIP_Bool
Definition: def.h:70
SCIP_EXPORT SCIP_RETCODE SCIPtpiBroadcastCondition(SCIP_CONDITION *condition)
Definition: tpi_openmp.c:275
SCIP_LOCK poollock
Definition: tpi_tnycthrd.c:73
SCIP_Bool shutdown
Definition: tpi_tnycthrd.c:69
SCIP_EXPORT void SCIPtpiDestroyLock(SCIP_LOCK *lock)
static SCIP_RETCODE threadPoolAddWork(SCIP_JOB *newjob, SCIP_SUBMITSTATUS *status)
Definition: tpi_tnycthrd.c:324
SCIP_EXPORT SCIP_RETCODE SCIPtpiAcquireLock(SCIP_LOCK *lock)
static SCIP_RETCODE freeThreadPool(SCIP_THREADPOOL **thrdpool, SCIP_Bool finishjobs, SCIP_Bool completequeue)
Definition: tpi_tnycthrd.c:408
static void freeJobQueue(SCIP_THREADPOOL *thrdpool)
Definition: tpi_tnycthrd.c:381
#define BMSallocMemory(ptr)
Definition: memory.h:111
SCIP_RETCODE(* jobfunc)(void *args)
Definition: tpi_openmp.c:33
SCIP_RETCODE SCIPtpiCollectJobs(int jobid)
Definition: tpi_tnycthrd.c:616
#define SCIP_CALL_ABORT(x)
Definition: def.h:343
SCIP_RETCODE SCIPtpiSumbitJob(SCIP_JOB *job, SCIP_SUBMITSTATUS *status)
Definition: tpi_tnycthrd.c:598
#define SCIP_ALLOC(x)
Definition: def.h:375
static SCIP_Bool isJobRunning(SCIP_JOBQUEUE *currentjobs, int jobid)
Definition: tpi_tnycthrd.c:521
int SCIPtpiGetNewJobID(void)
Definition: tpi_tnycthrd.c:583
static void jobQueueAddJob(SCIP_THREADPOOL *threadpool, SCIP_JOB *newjob)
Definition: tpi_tnycthrd.c:292
SCIP_CONDITION jobfinished
Definition: tpi_tnycthrd.c:77
SCIP_EXPORT SCIP_RETCODE SCIPtpiReleaseLock(SCIP_LOCK *lock)
memory allocation routines