Scippy

SCIP

Solving Constraint Integer Programs

tpi_tnycthrd.c
Go to the documentation of this file.
1 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2 /* */
3 /* This file is part of the program and library */
4 /* SCIP --- Solving Constraint Integer Programs */
5 /* */
6 /* Copyright (c) 2002-2023 Zuse Institute Berlin (ZIB) */
7 /* */
8 /* Licensed under the Apache License, Version 2.0 (the "License"); */
9 /* you may not use this file except in compliance with the License. */
10 /* You may obtain a copy of the License at */
11 /* */
12 /* http://www.apache.org/licenses/LICENSE-2.0 */
13 /* */
14 /* Unless required by applicable law or agreed to in writing, software */
15 /* distributed under the License is distributed on an "AS IS" BASIS, */
16 /* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
17 /* See the License for the specific language governing permissions and */
18 /* limitations under the License. */
19 /* */
20 /* You should have received a copy of the Apache-2.0 license */
21 /* along with SCIP; see the file LICENSE. If not visit scipopt.org. */
22 /* */
23 /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
24 
25 /**@file tpi_tnycthrd.c
26  * @ingroup TASKINTERFACE
27  * @brief a TPI implementation using tinycthreads
28  * @author Stephen J. Maher
29  * @author Leona Gottwald
30  */
31 
32 /*---+----1----+----2----+----3----+----4----+----5----+----6----+----7----+----8----+----9----+----0----+----1----+----2*/
33 
34 #include "tpi/tpi.h"
35 #include "blockmemshell/memory.h"
36 
38 static SCIP_THREADPOOL* _threadpool = NULL;
39 _Thread_local int _threadnumber; /*lint !e129*/
40 
41 /** A job added to the queue */
42 struct SCIP_Job
43 {
44  int jobid; /**< id to identify jobs from a common process */
45  struct SCIP_Job* nextjob; /**< pointer to the next job in the queue */
46  SCIP_RETCODE (*jobfunc)(void* args);/**< pointer to the job function */
47  void* args; /**< pointer to the function arguments */
48  SCIP_RETCODE retcode; /**< return code of the job */
49 };
50 
51 /** the thread pool job queue */
52 struct SCIP_JobQueue
53 {
54  SCIP_JOB* firstjob; /**< pointer to the first job in the queue */
55  SCIP_JOB* lastjob; /**< pointer to the last job in the queue */
56  int njobs; /**< number of jobs in the queue */
57 };
59 
60 /** The thread pool */
62 {
63  /* Pool Characteristics */
64  int nthreads; /**< number of threads in the pool */
65  int queuesize; /**< the total number of items to enter the queue */
66 
67  /* Current pool state */
68  thrd_t* threads; /**< the threads included in the pool */
69  SCIP_JOBQUEUE* jobqueue; /**< the job queue */
70  SCIP_JOBQUEUE* currentjobs; /**< the jobs currently being processed on a thread;
71  * only a single job is allowed per thread. */
72  SCIP_JOBQUEUE* finishedjobs; /**< finished jobs that are not yet collected */
73  int currworkingthreads; /**< the threads currently processing jobs */
74  SCIP_Bool blockwhenfull; /**< indicates that the queue can only be as large as nthreads */
75  int currentid; /**< current job id */
76 
77  /* Control indicators */
78  SCIP_Bool shutdown; /**< indicates whether the pool needs to be shut down */
79  SCIP_Bool queueopen; /**< indicates whether the queue is open */
80 
81  /* mutex and locks for the thread pool */
82  SCIP_LOCK poollock; /**< mutex to allow read and write of the pool features */
83  SCIP_CONDITION queuenotempty; /**< condition to broadcast the queue has jobs */
84  SCIP_CONDITION queuenotfull; /**< condition to broadcast the queue is not full */
85  SCIP_CONDITION queueempty; /**< condition to broadcast that the queue is empty */
86  SCIP_CONDITION jobfinished; /**< condition to broadcast that a job has been finished */
87 };
88 
89 /** this function controls the execution of each of the threads */
90 static
92  void* threadnum /**< thread number is passed in as argument stored inside a void pointer */
93  )
94 {
95  SCIP_JOB* newjob;
96  SCIP_JOB* prevjob;
97  SCIP_JOB* currjob;
98 
99  _threadnumber = (int)(uintptr_t) threadnum;
100 
101  /* Increase the number of active threads */
102  SCIP_CALL( SCIPtpiAcquireLock(&(_threadpool->poollock)) );
103  _threadpool->currworkingthreads += 1;
104  SCIP_CALL( SCIPtpiReleaseLock(&(_threadpool->poollock)) );
105 
106  /* this is an endless loop that runs until the thrd_exit function is called */
107  while( TRUE ) /*lint !e716*/
108  {
109  SCIP_CALL( SCIPtpiAcquireLock(&(_threadpool->poollock)) );
110 
111  /* the queue is empty but the shutdown command has not been given */
112  while( _threadpool->jobqueue->njobs == 0 && !_threadpool->shutdown )
113  {
114  SCIP_CALL( SCIPtpiWaitCondition(&(_threadpool->queuenotempty), &(_threadpool->poollock)) );
115  }
116 
117  /* if the shutdown command has been given, then exit the thread */
118  if( _threadpool->shutdown )
119  {
120  /* Decrease the thread count when execution of job queue has completed */
121  _threadpool->currworkingthreads -= 1;
122  SCIP_CALL( SCIPtpiReleaseLock(&(_threadpool->poollock)) );
123 
124  thrd_exit((int)SCIP_OKAY);
125  }
126 
127  /* getting the next job in the queue */
128  newjob = _threadpool->jobqueue->firstjob;
129  _threadpool->jobqueue->njobs--; /* decreasing the number of jobs in the queue */
130 
131  if( _threadpool->jobqueue->njobs == 0 )
132  {
133  _threadpool->jobqueue->firstjob = NULL;
134  _threadpool->jobqueue->lastjob = NULL;
135  }
136  else
137  _threadpool->jobqueue->firstjob = newjob->nextjob; /* updating the queue */
138 
139  /* if we want to wait when the queue is full, then we broadcast that the queue can now take new jobs */
140  if( _threadpool->blockwhenfull &&
141  _threadpool->jobqueue->njobs == _threadpool->queuesize - 1 )
142  {
144  }
145 
146  /* indicating that the queue is empty */
147  if( _threadpool->jobqueue->njobs == 0 )
148  {
149  SCIP_CALL( SCIPtpiBroadcastCondition(&(_threadpool->queueempty)) );
150  }
151 
152  /* updating the current job list */
153  if( _threadpool->currentjobs->njobs == 0 )
154  {
155  _threadpool->currentjobs->firstjob = newjob;
156  _threadpool->currentjobs->lastjob = newjob;
157  }
158  else
159  {
160  _threadpool->currentjobs->lastjob->nextjob = newjob;
161  _threadpool->currentjobs->lastjob = newjob;
162  }
163 
164  _threadpool->currentjobs->njobs++;
165 
166  SCIP_CALL( SCIPtpiReleaseLock(&(_threadpool->poollock)) );
167 
168  /* setting the job to run on this thread */
169  newjob->retcode = (*(newjob->jobfunc))(newjob->args);
170 
171  /* setting the current job on this thread to NULL */
172  SCIP_CALL( SCIPtpiAcquireLock(&(_threadpool->poollock)) );
173 
174  /* finding the location of the processed job in the currentjobs queue */
175  currjob = _threadpool->currentjobs->firstjob;
176  prevjob = NULL;
177 
178  while( currjob != newjob )
179  {
180  prevjob = currjob;
181  currjob = prevjob->nextjob;
182  }
183 
184  /* removing the processed job from current jobs list */
185  if( currjob == _threadpool->currentjobs->firstjob )
186  _threadpool->currentjobs->firstjob = currjob->nextjob;
187  else
188  prevjob->nextjob = currjob->nextjob; /*lint !e794*/
189 
190  if( currjob == _threadpool->currentjobs->lastjob )
191  _threadpool->currentjobs->lastjob = prevjob;
192 
193  _threadpool->currentjobs->njobs--;
194 
195  /* updating the finished job list */
196  if( _threadpool->finishedjobs->njobs == 0 )
197  {
198  _threadpool->finishedjobs->firstjob = newjob;
199  _threadpool->finishedjobs->lastjob = newjob;
200  }
201  else
202  {
203  _threadpool->finishedjobs->lastjob->nextjob = newjob;
204  _threadpool->finishedjobs->lastjob = newjob;
205  }
206 
207  _threadpool->finishedjobs->njobs++;
208 
209  /* signalling that a job has been finished */
210  SCIP_CALL( SCIPtpiBroadcastCondition(&(_threadpool)->jobfinished) );
211 
212  SCIP_CALL( SCIPtpiReleaseLock(&(_threadpool->poollock)) );
213  }
214 }
215 
216 /** this function controls the execution of each of the threads */
217 static
219  void* threadnum /**< thread number is passed in as argument stored inside a void pointer */
220  )
221 {
222  return (int) threadPoolThreadRetcode(threadnum);
223 }
224 
225 /** creates a threadpool */
226 static
228  SCIP_THREADPOOL** thrdpool, /**< pointer to store threadpool */
229  int nthreads, /**< number of threads in the threadpool */
230  int qsize, /**< maximum size of the jobqueue */
231  SCIP_Bool blockwhenfull /**< should the jobqueue block if it is full */
232  )
233 {
234  uintptr_t i;
235 
236  assert(nthreads >= 0);
237  assert(qsize >= 0);
238 
239  /* @todo think about the correct memory here */
240  SCIP_ALLOC( BMSallocMemory(thrdpool) );
241  (*thrdpool)->currentid = 0;
242  (*thrdpool)->queuesize = qsize;
243  (*thrdpool)->nthreads = nthreads;
244  (*thrdpool)->blockwhenfull = blockwhenfull;
245  (*thrdpool)->shutdown = FALSE;
246  (*thrdpool)->queueopen = TRUE;
247 
248  /* allocating memory for the job queue */
249  SCIP_ALLOC( BMSallocMemory(&(*thrdpool)->jobqueue) );
250  (*thrdpool)->jobqueue->firstjob = NULL;
251  (*thrdpool)->jobqueue->lastjob = NULL;
252  (*thrdpool)->jobqueue->njobs = 0;
253 
254  /* allocating memory for the job queue */
255  SCIP_ALLOC( BMSallocMemory(&(*thrdpool)->currentjobs) );
256  (*thrdpool)->currentjobs->firstjob = NULL;
257  (*thrdpool)->currentjobs->lastjob = NULL;
258  (*thrdpool)->currentjobs->njobs = 0;
259 
260  /* allocating memory for the job queue */
261  SCIP_ALLOC( BMSallocMemory(&(*thrdpool)->finishedjobs) );
262  (*thrdpool)->finishedjobs->firstjob = NULL;
263  (*thrdpool)->finishedjobs->lastjob = NULL;
264  (*thrdpool)->finishedjobs->njobs = 0;
265 
266  /* initialising the mutex */
267  SCIP_CALL( SCIPtpiInitLock(&(*thrdpool)->poollock) ); /*lint !e2482*/
268 
269  /* initialising the conditions */
270  SCIP_CALL( SCIPtpiInitCondition(&(*thrdpool)->queuenotempty) );
271  SCIP_CALL( SCIPtpiInitCondition(&(*thrdpool)->queuenotfull) );
272  SCIP_CALL( SCIPtpiInitCondition(&(*thrdpool)->queueempty) );
273  SCIP_CALL( SCIPtpiInitCondition(&(*thrdpool)->jobfinished) );
274 
275  /* creating the threads */
276  (*thrdpool)->currworkingthreads = 0;
277 
278  /* allocating memory for the threads */
279  SCIP_ALLOC( BMSallocMemoryArray(&((*thrdpool)->threads), nthreads) );
280 
281  /* create the threads */
282  for( i = 0; i < (unsigned)nthreads; i++ )
283  {
284  if( thrd_create(&((*thrdpool)->threads[i]), threadPoolThread, (void*)i) != thrd_success )
285  return SCIP_ERROR;
286  }
287 
288  _threadnumber = nthreads;
289  /* halt while all threads are not active TODO: is synchronization required here ? */
290  /*TODO: this caused a deadlock, is it important to wait for all threads to start?
291  * while( (*thrdpool)->currworkingthreads != nthreads )
292  {}*/
293 
294  return SCIP_OKAY;
295 }
296 
297 /** adding a job to the job queue.
298  *
299  * This gives some more flexibility in the handling of new jobs.
300  * This function needs to be called from within a mutex.
301  */
302 static
304  SCIP_THREADPOOL* threadpool, /**< pointer to store threadpool */
305  SCIP_JOB* newjob /**< pointer to new job */
306  )
307 {
308  /* @todo we want to work out what to do with a full job queue. Is there a problem if the limit is hit? */
309  /* @note it is important to have a queuesize. This will stop the code submitting infinitely many jobs. */
310  assert(threadpool->jobqueue->njobs < threadpool->queuesize);
311 
312  newjob->nextjob = NULL;
313 
314  /* checking the status of the job queue */
315  if( threadpool->jobqueue->njobs == 0 )
316  {
317  threadpool->jobqueue->firstjob = newjob;
318  threadpool->jobqueue->lastjob = newjob;
319  }
320  else /* it is assumed that the jobqueue is not full */
321  {
322  threadpool->jobqueue->lastjob->nextjob = newjob;
323  threadpool->jobqueue->lastjob = newjob;
324  }
325 
326  /* signalling to all threads that the queue has jobs using the signal instead of broadcast because only one thread
327  * should be awakened */
329 
330  threadpool->jobqueue->njobs++;
331 }
332 
333 /** adds a job to the threadpool */
334 static
336  SCIP_JOB* newjob, /**< job to add to threadpool */
337  SCIP_SUBMITSTATUS* status /**< pointer to store the job's submit status */
338  )
339 {
340  assert(newjob != NULL);
341  assert(_threadpool != NULL);
342 
343  SCIP_CALL( SCIPtpiAcquireLock(&(_threadpool->poollock)) );
344 
345  /* if the queue is full and we are blocking, then return an error. */
346  if( _threadpool->jobqueue->njobs == _threadpool->queuesize && _threadpool->blockwhenfull )
347  {
348  SCIP_CALL( SCIPtpiReleaseLock(&(_threadpool->poollock)) );
349  *status = SCIP_SUBMIT_QUEUEFULL;
350  return SCIP_OKAY;
351  }
352 
353  /* Wait until the job queue is not full. If the queue is closed or the thread pool is shut down, then stop waiting. */
354  /* @todo this needs to be checked. It is possible that a job can be submitted and then the queue is closed or the
355  * thread pool is shut down. Need to work out the best way to handle this. */
356  while( _threadpool->jobqueue->njobs == _threadpool->queuesize && !(_threadpool->shutdown || !_threadpool->queueopen) )
357  {
358  SCIP_CALL( SCIPtpiWaitCondition(&(_threadpool->queuenotfull), &(_threadpool->poollock)) );
359  }
360 
361  /* if the thread pool is shut down or the queue is closed, then we need to leave the job submission */
362  if( !_threadpool->queueopen )
363  {
364  SCIP_CALL( SCIPtpiReleaseLock(&(_threadpool->poollock)) );
365  *status = SCIP_SUBMIT_QUEUECLOSED;
366  return SCIP_OKAY;
367  }
368  else if( _threadpool->shutdown )
369  {
370  SCIP_CALL( SCIPtpiReleaseLock(&(_threadpool->poollock)) );
371  *status = SCIP_SUBMIT_SHUTDOWN;
372  return SCIP_OKAY;
373  }
374 
375  /* creating the job for submission */
376  newjob->nextjob = NULL;
377 
378  /* adding the job to the queue */
379  /* this can only happen if the queue is not full */
380  assert(_threadpool->jobqueue->njobs != _threadpool->queuesize);
381  jobQueueAddJob(_threadpool, newjob);
382 
383  SCIP_CALL( SCIPtpiReleaseLock(&(_threadpool->poollock)) );
384 
385  *status = SCIP_SUBMIT_SUCCESS;
386 
387  return SCIP_OKAY;
388 }
389 
390 /** frees the jobqueue of the threadpool */
391 static
393  SCIP_THREADPOOL* thrdpool /**< pointer to thread pool */
394  )
395 {
396  SCIP_JOB* currjob;
397 
398  assert(!thrdpool->queueopen);
399  assert(thrdpool->shutdown);
400 
401  /* iterating through all jobs until all have been freed */
402  while( thrdpool->jobqueue->firstjob != NULL )
403  {
404  currjob = thrdpool->jobqueue->firstjob->nextjob;
405  thrdpool->jobqueue->firstjob = thrdpool->jobqueue->firstjob->nextjob;
406  BMSfreeMemory(&currjob);
407  }
408 
409  assert(thrdpool->jobqueue->firstjob == NULL);
410  assert(thrdpool->jobqueue->lastjob == NULL);
411 
412  BMSfreeMemory(&thrdpool->jobqueue);
413 }
414 
415 /** free the thread pool */
416 static
418  SCIP_THREADPOOL** thrdpool, /**< pointer to thread pool */
419  SCIP_Bool finishjobs, /**< currently unused */
420  SCIP_Bool completequeue /**< Wait until the queue has complete? */
421  )
422 {
423  int i;
425 
426  /*TODO remove argument? */
427  SCIP_UNUSED( finishjobs );
428 
429  SCIP_CALL( SCIPtpiAcquireLock(&((*thrdpool)->poollock)) );
430 
431  /* if the shutdown is already in progress, then we don't need to complete this function */
432  if( !(*thrdpool)->queueopen || (*thrdpool)->shutdown )
433  {
434  SCIP_CALL( SCIPtpiReleaseLock(&((*thrdpool)->poollock)) );
435 
436  return SCIP_OKAY;
437  }
438 
439  /* indicating that the job queue is now closed for new jobs */
440  (*thrdpool)->queueopen = FALSE;
441 
442  /* if the jobs in the queue should be completed, then we wait until the queueempty condition is set */
443  if( completequeue )
444  {
445  while( (*thrdpool)->jobqueue->njobs > 0 )
446  {
447  SCIP_CALL( SCIPtpiWaitCondition(&((*thrdpool)->queueempty), &((*thrdpool)->poollock)) );
448  }
449  }
450 
451  /* indicating that the tpi has commenced the shutdown process */
452  (*thrdpool)->shutdown = TRUE;
453 
454  SCIP_CALL( SCIPtpiReleaseLock(&((*thrdpool)->poollock)) );
455 
456  /* waking up all threads so that they can check the shutdown condition;
457  * this requires that the conditions queuenotempty and queuenotfull is broadcast
458  */
459  SCIP_CALL( SCIPtpiBroadcastCondition(&((*thrdpool)->queuenotempty)) );
460  SCIP_CALL( SCIPtpiBroadcastCondition(&((*thrdpool)->queuenotfull)) );
461 
462  retcode = SCIP_OKAY;
463 
464  /* calling a join to ensure that all worker finish before the thread pool is closed */
465  for( i = 0; i < (*thrdpool)->nthreads; i++ )
466  {
467  int thrdretcode;
468 
469  if( thrd_join((*thrdpool)->threads[i], &thrdretcode) != thrd_success )
470  retcode = (SCIP_RETCODE) MIN((int)SCIP_ERROR, (int)retcode);
471  else
472  retcode = (SCIP_RETCODE) MIN(thrdretcode, (int)retcode);
473  }
474 
475  /* freeing memory and data structures */
476  BMSfreeMemoryArray(&(*thrdpool)->threads);
477 
478  /* Freeing the current jobs list. This assumes that all jobs complete before the tpi is closed. */
479  assert((*thrdpool)->currentjobs->njobs == 0);
480  BMSfreeMemory(&(*thrdpool)->currentjobs);
481  assert((*thrdpool)->finishedjobs->njobs == 0);
482  BMSfreeMemory(&(*thrdpool)->finishedjobs);
483 
484  freeJobQueue(*thrdpool);
485 
486  /* destroying the conditions */
487  SCIPtpiDestroyCondition(&(*thrdpool)->jobfinished);
488  SCIPtpiDestroyCondition(&(*thrdpool)->queueempty);
489  SCIPtpiDestroyCondition(&(*thrdpool)->queuenotfull);
490  SCIPtpiDestroyCondition(&(*thrdpool)->queuenotempty);
491 
492  /* destroying the mutex */
493  SCIPtpiDestroyLock(&(*thrdpool)->poollock);
494 
495  BMSfreeMemory(thrdpool);
496 
497  return retcode;
498 }
499 
500 
501 /* checking a job queue */
502 static
504  SCIP_JOBQUEUE* jobqueue, /**< pointer to the job queue */
505  int jobid /**< id of job to check */
506  )
507 {
508  SCIP_JOB* currjob = jobqueue->firstjob;
509 
510  /* checking the job ids */
511  if( currjob != NULL )
512  {
513  while( currjob != jobqueue->lastjob )
514  {
515  if( currjob->jobid == jobid )
516  return SCIP_JOB_INQUEUE;
517 
518  currjob = currjob->nextjob;
519  }
520 
521  if( currjob->jobid == jobid )
522  return SCIP_JOB_INQUEUE;
523  }
524 
525  return SCIP_JOB_DOESNOTEXIST;
526 }
527 
528 /** returns whether the job id is running */
529 static
531  SCIP_JOBQUEUE* currentjobs, /**< queue of current jobs */
532  int jobid /**< id of job to check */
533  )
534 {
535  if( checkJobQueue(currentjobs, jobid) == SCIP_JOB_INQUEUE )
536  return TRUE;
537  else
538  return FALSE;
539 }
540 
541 /** returns the number of threads */
543  void
544  )
545 {
546  return _threadpool->nthreads;
547 }
548 
549 /** initializes tpi */
551  int nthreads, /**< the number of threads to be used */
552  int queuesize, /**< the size of the queue */
553  SCIP_Bool blockwhenfull /**< should the queue block when full */
554  )
555 {
556  assert(_threadpool == NULL);
557  SCIP_CALL( createThreadPool(&_threadpool, nthreads, queuesize, blockwhenfull) );
558  return SCIP_OKAY;
559 }
560 
561 /** deinitializes tpi */
563  void
564  )
565 {
566  assert(_threadpool != NULL);
567 
568  SCIP_CALL( freeThreadPool(&_threadpool, TRUE, TRUE) );
569 
570  return SCIP_OKAY;
571 }
572 
573 /** creates a job for parallel processing */
575  SCIP_JOB** job, /**< pointer to the job that will be created */
576  int jobid, /**< the id for the current job */
577  SCIP_RETCODE (*jobfunc)(void* args),/**< pointer to the job function */
578  void* jobarg /**< the job's argument */
579  )
580 {
581  SCIP_ALLOC( BMSallocMemory(job) );
582 
583  (*job)->jobid = jobid;
584  (*job)->jobfunc = jobfunc;
585  (*job)->args = jobarg;
586  (*job)->nextjob = NULL;
587 
588  return SCIP_OKAY;
589 }
590 
591 /** get a new job id for the new set of submitted jobs */
593  void
594  )
595 {
596  int id;
597  assert(_threadpool != NULL);
598 
599  SCIP_CALL_ABORT( SCIPtpiAcquireLock(&_threadpool->poollock) );
600  id = ++_threadpool->currentid;
601  SCIP_CALL_ABORT( SCIPtpiReleaseLock(&_threadpool->poollock) );
602 
603  return id;
604 }
605 
606 /** submit a job for parallel processing; the return value is a globally defined status */
608  SCIP_JOB* job, /**< pointer to the job to be submitted */
609  SCIP_SUBMITSTATUS* status /**< pointer to store the job's submit status */
610  )
611 {
612  assert(job != NULL);
613 
614  /* the job id must be set before submitting the job. The submitter controls whether a new id is required. */
615  assert(job->jobid == _threadpool->currentid);
616  SCIP_CALL( threadPoolAddWork(job, status) );
617 
618  return SCIP_OKAY;
619 }
620 
621 /** blocks until all jobs of the given jobid have finished
622  * and then returns the smallest SCIP_RETCODE of all the jobs
623  */
625  int jobid /**< the jobid of the jobs to wait for */
626  )
627 {
629  SCIP_JOB* currjob;
630  SCIP_JOB* prevjob;
631 
632  SCIP_CALL( SCIPtpiAcquireLock(&(_threadpool->poollock)) );
633 
634  while( isJobRunning(_threadpool->currentjobs, jobid) || isJobRunning(_threadpool->jobqueue, jobid) )
635  {
636  SCIP_CALL( SCIPtpiWaitCondition(&_threadpool->jobfinished, &_threadpool->poollock) );
637  }
638 
639  /* finding the location of the processed job in the currentjobs queue */
640  retcode = SCIP_OKAY;
641  currjob = _threadpool->finishedjobs->firstjob;
642  prevjob = NULL;
643 
644  while( currjob )
645  {
646  if( currjob->jobid == jobid )
647  {
648  SCIP_JOB* nextjob;
649 
650  /* if the job has the right jobid collect its retcode,
651  * remove it from the finished job list, and free it
652  */
653  retcode = MIN(retcode, currjob->retcode);
654 
655  /* removing the finished job from finished jobs list */
656  if( currjob == _threadpool->finishedjobs->firstjob )
657  {
658  _threadpool->finishedjobs->firstjob = currjob->nextjob;
659  }
660  else
661  {
662  assert(prevjob != NULL);
663  prevjob->nextjob = currjob->nextjob;
664  }
665 
666  if( currjob == _threadpool->finishedjobs->lastjob )
667  _threadpool->finishedjobs->lastjob = prevjob;
668 
669  _threadpool->finishedjobs->njobs--;
670 
671  /* update currjob and free finished job; prevjob stays the same */
672  nextjob = currjob->nextjob;
673  BMSfreeMemory(&currjob);
674  currjob = nextjob;
675  }
676  else
677  {
678  /* otherwise leave job untouched */
679  prevjob = currjob;
680  currjob = prevjob->nextjob;
681  }
682  }
683 
684  SCIP_CALL( SCIPtpiReleaseLock(&_threadpool->poollock) );
685 
686  return retcode;
687 }
SCIP_RETCODE SCIPtpiWaitCondition(SCIP_CONDITION *condition, SCIP_LOCK *lock)
Definition: tpi_openmp.c:304
static int threadPoolThread(void *threadnum)
Definition: tpi_tnycthrd.c:218
SCIP_CONDITION queuenotempty
Definition: tpi_tnycthrd.c:83
enum SCIP_Submitstatus SCIP_SUBMITSTATUS
Definition: type_tpi.h:54
SCIP_JOB * firstjob
Definition: tpi_openmp.c:50
#define FALSE
Definition: def.h:96
enum SCIP_Jobstatus SCIP_JOBSTATUS
Definition: type_tpi.h:69
void * args
Definition: tpi_openmp.c:43
static SCIP_JOBSTATUS checkJobQueue(SCIP_JOBQUEUE *jobqueue, int jobid)
Definition: tpi_tnycthrd.c:503
#define TRUE
Definition: def.h:95
enum SCIP_Retcode SCIP_RETCODE
Definition: type_retcode.h:63
SCIP_RETCODE SCIPtpiBroadcastCondition(SCIP_CONDITION *condition)
Definition: tpi_openmp.c:289
SCIP_RETCODE SCIPtpiAcquireLock(SCIP_LOCK *lock)
#define SCIP_UNUSED(x)
Definition: def.h:448
#define BMSallocMemoryArray(ptr, num)
Definition: memory.h:125
SCIP_RETCODE SCIPtpiInitCondition(SCIP_LOCK *lock)
SCIP_JOB * lastjob
Definition: tpi_openmp.c:51
SCIP_JOBQUEUE * finishedjobs
Definition: tpi_tnycthrd.c:72
static SCIP_RETCODE createThreadPool(SCIP_THREADPOOL **thrdpool, int nthreads, int qsize, SCIP_Bool blockwhenfull)
Definition: tpi_tnycthrd.c:227
#define BMSfreeMemory(ptr)
Definition: memory.h:147
SCIP_JOBQUEUE * jobqueue
Definition: tpi_tnycthrd.c:69
thrd_t * threads
Definition: tpi_tnycthrd.c:68
SCIP_CONDITION queueempty
Definition: tpi_tnycthrd.c:85
SCIP_RETCODE SCIPtpiCreateJob(SCIP_JOB **job, int jobid, SCIP_RETCODE(*jobfunc)(void *args), void *jobarg)
Definition: tpi_tnycthrd.c:574
SCIP_RETCODE SCIPtpiInit(int nthreads, int queuesize, SCIP_Bool blockwhenfull)
Definition: tpi_tnycthrd.c:550
SCIP_JOBQUEUE * currentjobs
Definition: tpi_tnycthrd.c:70
#define BMSfreeMemoryArray(ptr)
Definition: memory.h:149
SCIP_RETCODE SCIPtpiReleaseLock(SCIP_LOCK *lock)
the type definitions for the SCIP parallel interface
SCIP_Bool blockwhenfull
Definition: tpi_tnycthrd.c:74
SCIP_RETCODE retcode
Definition: tpi_openmp.c:44
SCIP_Bool queueopen
Definition: tpi_tnycthrd.c:79
static SCIP_RETCODE threadPoolThreadRetcode(void *threadnum)
Definition: tpi_tnycthrd.c:91
struct SCIP_Job * nextjob
Definition: tpi_openmp.c:41
SCIP_CONDITION queuenotfull
Definition: tpi_tnycthrd.c:84
#define NULL
Definition: lpi_spx1.cpp:164
SCIP_RETCODE SCIPtpiExit(void)
Definition: tpi_tnycthrd.c:562
#define SCIP_CALL(x)
Definition: def.h:394
int jobid
Definition: tpi_openmp.c:40
int SCIPtpiGetNumThreads(void)
Definition: tpi_tnycthrd.c:542
#define SCIP_Bool
Definition: def.h:93
SCIP_RETCODE SCIPtpiInitLock(SCIP_LOCK *lock)
SCIP_LOCK poollock
Definition: tpi_tnycthrd.c:82
SCIP_Bool shutdown
Definition: tpi_tnycthrd.c:78
void SCIPtpiDestroyLock(SCIP_LOCK *lock)
void SCIPtpiDestroyCondition(SCIP_LOCK *lock)
static SCIP_RETCODE threadPoolAddWork(SCIP_JOB *newjob, SCIP_SUBMITSTATUS *status)
Definition: tpi_tnycthrd.c:335
SCIP_RETCODE SCIPtpiSignalCondition(SCIP_CONDITION *condition)
Definition: tpi_openmp.c:271
static SCIP_RETCODE freeThreadPool(SCIP_THREADPOOL **thrdpool, SCIP_Bool finishjobs, SCIP_Bool completequeue)
Definition: tpi_tnycthrd.c:417
static void freeJobQueue(SCIP_THREADPOOL *thrdpool)
Definition: tpi_tnycthrd.c:392
#define BMSallocMemory(ptr)
Definition: memory.h:120
SCIP_RETCODE(* jobfunc)(void *args)
Definition: tpi_openmp.c:42
SCIP_RETCODE SCIPtpiCollectJobs(int jobid)
Definition: tpi_tnycthrd.c:624
#define SCIP_CALL_ABORT(x)
Definition: def.h:373
SCIP_RETCODE SCIPtpiSumbitJob(SCIP_JOB *job, SCIP_SUBMITSTATUS *status)
Definition: tpi_tnycthrd.c:607
#define SCIP_ALLOC(x)
Definition: def.h:405
static SCIP_Bool isJobRunning(SCIP_JOBQUEUE *currentjobs, int jobid)
Definition: tpi_tnycthrd.c:530
int SCIPtpiGetNewJobID(void)
Definition: tpi_tnycthrd.c:592
static void jobQueueAddJob(SCIP_THREADPOOL *threadpool, SCIP_JOB *newjob)
Definition: tpi_tnycthrd.c:303
SCIP_CONDITION jobfinished
Definition: tpi_tnycthrd.c:86
memory allocation routines