Scippy

SCIP

Solving Constraint Integer Programs

tpi_tnycthrd.c
Go to the documentation of this file.
1/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
2/* */
3/* This file is part of the program and library */
4/* SCIP --- Solving Constraint Integer Programs */
5/* */
6/* Copyright (c) 2002-2024 Zuse Institute Berlin (ZIB) */
7/* */
8/* Licensed under the Apache License, Version 2.0 (the "License"); */
9/* you may not use this file except in compliance with the License. */
10/* You may obtain a copy of the License at */
11/* */
12/* http://www.apache.org/licenses/LICENSE-2.0 */
13/* */
14/* Unless required by applicable law or agreed to in writing, software */
15/* distributed under the License is distributed on an "AS IS" BASIS, */
16/* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. */
17/* See the License for the specific language governing permissions and */
18/* limitations under the License. */
19/* */
20/* You should have received a copy of the Apache-2.0 license */
21/* along with SCIP; see the file LICENSE. If not visit scipopt.org. */
22/* */
23/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
24
25/**@file tpi_tnycthrd.c
26 * @ingroup TASKINTERFACE
27 * @brief a TPI implementation using tinycthreads
28 * @author Stephen J. Maher
29 * @author Leona Gottwald
30 * @author Marc Pfetsch
31 */
32
33/*---+----1----+----2----+----3----+----4----+----5----+----6----+----7----+----8----+----9----+----0----+----1----+----2*/
34
35#include "tpi/tpi.h"
37#include "tinycthread/tinycthread.h"
38#include "scip/pub_message.h"
39
/* macros for direct access
 *
 * Each macro wraps one tinycthread call. The calls that report a status are mapped to SCIP_OKAY when the call
 * returns thrd_success and to SCIP_ERROR otherwise; the destroy calls have no status. Every macro argument is
 * parenthesized so that arbitrary pointer expressions can be passed safely.
 */

/* lock */
#define SCIPtnyInitLock(lock)                ( mtx_init((lock), mtx_plain) == thrd_success ? SCIP_OKAY : SCIP_ERROR )
#define SCIPtnyDestroyLock(lock)             ( mtx_destroy((lock)) )
#define SCIPtnyAcquireLock(lock)             ( mtx_lock((lock)) == thrd_success ? SCIP_OKAY : SCIP_ERROR )
#define SCIPtnyReleaseLock(lock)             ( mtx_unlock((lock)) == thrd_success ? SCIP_OKAY : SCIP_ERROR )

/* condition */
#define SCIPtnyInitCondition(condition)      ( cnd_init((condition)) == thrd_success ? SCIP_OKAY : SCIP_ERROR )
#define SCIPtnyDestroyCondition(condition)   ( cnd_destroy((condition)) )
#define SCIPtnySignalCondition(condition)    ( cnd_signal((condition)) == thrd_success ? SCIP_OKAY : SCIP_ERROR )
#define SCIPtnyBroadcastCondition(condition) ( cnd_broadcast((condition)) == thrd_success ? SCIP_OKAY : SCIP_ERROR )
#define SCIPtnyWaitCondition(condition, lock) ( cnd_wait((condition), (lock)) == thrd_success ? SCIP_OKAY : SCIP_ERROR )
54
55/** struct containing lock */
56struct SCIP_Lock
57{
58 mtx_t lock;
59};
60
61/** struct containing condition */
62struct SCIP_Condition
63{
64 cnd_t condition;
65};
66
67
69static SCIP_THREADPOOL* _threadpool = NULL;
70_Thread_local int _threadnumber; /*lint !e129*/
71
72/** A job added to the queue */
73struct SCIP_Job
74{
75 int jobid; /**< id to identify jobs from a common process */
76 struct SCIP_Job* nextjob; /**< pointer to the next job in the queue */
77 SCIP_RETCODE (*jobfunc)(void* args);/**< pointer to the job function */
78 void* args; /**< pointer to the function arguments */
79 SCIP_RETCODE retcode; /**< return code of the job */
80};
81
82/** the thread pool job queue */
83struct SCIP_JobQueue
84{
85 SCIP_JOB* firstjob; /**< pointer to the first job in the queue */
86 SCIP_JOB* lastjob; /**< pointer to the last job in the queue */
87 int njobs; /**< number of jobs in the queue */
88};
90
91/** The thread pool */
93{
94 /* Pool Characteristics */
95 int nthreads; /**< number of threads in the pool */
96 int queuesize; /**< the total number of items to enter the queue */
97
98 /* Current pool state */
99 thrd_t* threads; /**< the threads included in the pool */
100 SCIP_JOBQUEUE* jobqueue; /**< the job queue */
101 SCIP_JOBQUEUE* currentjobs; /**< the jobs currently being processed on a thread;
102 * only a single job is allowed per thread. */
103 SCIP_JOBQUEUE* finishedjobs; /**< finished jobs that are not yet collected */
104 int currworkingthreads; /**< the threads currently processing jobs */
105 SCIP_Bool blockwhenfull; /**< indicates that the queue can only be as large as nthreads */
106 int currentid; /**< current job id */
107
108 /* Control indicators */
109 SCIP_Bool shutdown; /**< indicates whether the pool needs to be shut down */
110 SCIP_Bool queueopen; /**< indicates whether the queue is open */
111
112 /* mutex and locks for the thread pool */
113 mtx_t poollock; /**< mutex to allow read and write of the pool features */
114 cnd_t queuenotempty; /**< condition to broadcast the queue has jobs */
115 cnd_t queuenotfull; /**< condition to broadcast the queue is not full */
116 cnd_t queueempty; /**< condition to broadcast that the queue is empty */
117 cnd_t jobfinished; /**< condition to broadcast that a job has been finished */
118};
119
120/** this function controls the execution of each of the threads */
121static
123 void* threadnum /**< thread number is passed in as argument stored inside a void pointer */
124 )
125{
126 SCIP_JOB* newjob;
127 SCIP_JOB* prevjob;
128 SCIP_JOB* currjob;
129
130 _threadnumber = (int)(uintptr_t) threadnum;
131
132 /* Increase the number of active threads */
133 SCIP_CALL( SCIPtnyAcquireLock(&(_threadpool->poollock)) );
134 _threadpool->currworkingthreads += 1;
135 SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
136
137 /* this is an endless loop that runs until the thrd_exit function is called */
138 while( TRUE ) /*lint !e716*/
139 {
140 SCIP_CALL( SCIPtnyAcquireLock(&(_threadpool->poollock)) );
141
142 /* the queue is empty but the shutdown command has not been given */
143 while( _threadpool->jobqueue->njobs == 0 && !_threadpool->shutdown )
144 {
145 SCIP_CALL( SCIPtnyWaitCondition(&(_threadpool->queuenotempty), &(_threadpool->poollock)) );
146 }
147
148 /* if the shutdown command has been given, then exit the thread */
149 if( _threadpool->shutdown )
150 {
151 /* Decrease the thread count when execution of job queue has completed */
152 _threadpool->currworkingthreads -= 1;
153 SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
154
155 thrd_exit((int)SCIP_OKAY);
156 }
157
158 /* getting the next job in the queue */
159 newjob = _threadpool->jobqueue->firstjob;
160 _threadpool->jobqueue->njobs--; /* decreasing the number of jobs in the queue */
161
162 if( _threadpool->jobqueue->njobs == 0 )
163 {
164 _threadpool->jobqueue->firstjob = NULL;
165 _threadpool->jobqueue->lastjob = NULL;
166 }
167 else
168 _threadpool->jobqueue->firstjob = newjob->nextjob; /* updating the queue */
169
170 /* if we want to wait when the queue is full, then we broadcast that the queue can now take new jobs */
171 if( _threadpool->blockwhenfull &&
172 _threadpool->jobqueue->njobs == _threadpool->queuesize - 1 )
173 {
175 }
176
177 /* indicating that the queue is empty */
178 if( _threadpool->jobqueue->njobs == 0 )
179 {
181 }
182
183 /* updating the current job list */
184 if( _threadpool->currentjobs->njobs == 0 )
185 {
186 _threadpool->currentjobs->firstjob = newjob;
187 _threadpool->currentjobs->lastjob = newjob;
188 }
189 else
190 {
191 _threadpool->currentjobs->lastjob->nextjob = newjob;
192 _threadpool->currentjobs->lastjob = newjob;
193 }
194
195 _threadpool->currentjobs->njobs++;
196
197 SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
198
199 /* setting the job to run on this thread */
200 newjob->retcode = (*(newjob->jobfunc))(newjob->args);
201
202 /* setting the current job on this thread to NULL */
203 SCIP_CALL( SCIPtnyAcquireLock(&(_threadpool->poollock)) );
204
205 /* finding the location of the processed job in the currentjobs queue */
206 currjob = _threadpool->currentjobs->firstjob;
207 prevjob = NULL;
208
209 while( currjob != newjob )
210 {
211 prevjob = currjob;
212 currjob = prevjob->nextjob;
213 }
214
215 /* removing the processed job from current jobs list */
216 if( currjob == _threadpool->currentjobs->firstjob )
217 _threadpool->currentjobs->firstjob = currjob->nextjob;
218 else
219 prevjob->nextjob = currjob->nextjob; /*lint !e794*/
220
221 if( currjob == _threadpool->currentjobs->lastjob )
222 _threadpool->currentjobs->lastjob = prevjob;
223
224 _threadpool->currentjobs->njobs--;
225
226 /* updating the finished job list */
227 if( _threadpool->finishedjobs->njobs == 0 )
228 {
229 _threadpool->finishedjobs->firstjob = newjob;
230 _threadpool->finishedjobs->lastjob = newjob;
231 }
232 else
233 {
234 _threadpool->finishedjobs->lastjob->nextjob = newjob;
235 _threadpool->finishedjobs->lastjob = newjob;
236 }
237
238 _threadpool->finishedjobs->njobs++;
239
240 /* signalling that a job has been finished */
241 SCIP_CALL( SCIPtnyBroadcastCondition(&(_threadpool)->jobfinished) );
242
243 SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
244 }
245}
246
247/** this function controls the execution of each of the threads */
248static
250 void* threadnum /**< thread number is passed in as argument stored inside a void pointer */
251 )
252{
253 return (int) threadPoolThreadRetcode(threadnum);
254}
255
256/** creates a threadpool */
257static
259 SCIP_THREADPOOL** thrdpool, /**< pointer to store threadpool */
260 int nthreads, /**< number of threads in the threadpool */
261 int qsize, /**< maximum size of the jobqueue */
262 SCIP_Bool blockwhenfull /**< should the jobqueue block if it is full */
263 )
264{
265 uintptr_t i;
266
267 assert(nthreads >= 0);
268 assert(qsize >= 0);
269
270 /* @todo think about the correct memory here */
271 SCIP_ALLOC( BMSallocMemory(thrdpool) );
272 (*thrdpool)->currentid = 0;
273 (*thrdpool)->queuesize = qsize;
274 (*thrdpool)->nthreads = nthreads;
275 (*thrdpool)->blockwhenfull = blockwhenfull;
276 (*thrdpool)->shutdown = FALSE;
277 (*thrdpool)->queueopen = TRUE;
278
279 /* allocating memory for the job queue */
280 SCIP_ALLOC( BMSallocMemory(&(*thrdpool)->jobqueue) );
281 (*thrdpool)->jobqueue->firstjob = NULL;
282 (*thrdpool)->jobqueue->lastjob = NULL;
283 (*thrdpool)->jobqueue->njobs = 0;
284
285 /* allocating memory for the job queue */
286 SCIP_ALLOC( BMSallocMemory(&(*thrdpool)->currentjobs) );
287 (*thrdpool)->currentjobs->firstjob = NULL;
288 (*thrdpool)->currentjobs->lastjob = NULL;
289 (*thrdpool)->currentjobs->njobs = 0;
290
291 /* allocating memory for the job queue */
292 SCIP_ALLOC( BMSallocMemory(&(*thrdpool)->finishedjobs) );
293 (*thrdpool)->finishedjobs->firstjob = NULL;
294 (*thrdpool)->finishedjobs->lastjob = NULL;
295 (*thrdpool)->finishedjobs->njobs = 0;
296
297 /* initialising the mutex */
298 SCIP_CALL( SCIPtnyInitLock(&(*thrdpool)->poollock) ); /*lint !e2482*/
299
300 /* initialising the conditions */
301 SCIP_CALL( SCIPtnyInitCondition(&(*thrdpool)->queuenotempty) );
302 SCIP_CALL( SCIPtnyInitCondition(&(*thrdpool)->queuenotfull) );
303 SCIP_CALL( SCIPtnyInitCondition(&(*thrdpool)->queueempty) );
304 SCIP_CALL( SCIPtnyInitCondition(&(*thrdpool)->jobfinished) );
305
306 /* creating the threads */
307 (*thrdpool)->currworkingthreads = 0;
308
309 /* allocating memory for the threads */
310 SCIP_ALLOC( BMSallocMemoryArray(&((*thrdpool)->threads), nthreads) );
311
312 /* create the threads */
313 for( i = 0; i < (unsigned)nthreads; i++ )
314 {
315 if( thrd_create(&((*thrdpool)->threads[i]), threadPoolThread, (void*)i) != thrd_success )
316 return SCIP_ERROR;
317 }
318
319 _threadnumber = nthreads;
320 /* halt while all threads are not active TODO: is synchronization required here ? */
321 /*TODO: this caused a deadlock, is it important to wait for all threads to start?
322 * while( (*thrdpool)->currworkingthreads != nthreads )
323 {}*/
324
325 return SCIP_OKAY;
326}
327
328/** adding a job to the job queue.
329 *
330 * This gives some more flexibility in the handling of new jobs.
331 * This function needs to be called from within a mutex.
332 */
333static
335 SCIP_THREADPOOL* threadpool, /**< pointer to store threadpool */
336 SCIP_JOB* newjob /**< pointer to new job */
337 )
338{
339 /* @todo we want to work out what to do with a full job queue. Is there a problem if the limit is hit? */
340 /* @note it is important to have a queuesize. This will stop the code submitting infinitely many jobs. */
341 assert(threadpool->jobqueue->njobs < threadpool->queuesize);
342
343 newjob->nextjob = NULL;
344
345 /* checking the status of the job queue */
346 if( threadpool->jobqueue->njobs == 0 )
347 {
348 threadpool->jobqueue->firstjob = newjob;
349 threadpool->jobqueue->lastjob = newjob;
350 }
351 else /* it is assumed that the jobqueue is not full */
352 {
353 threadpool->jobqueue->lastjob->nextjob = newjob;
354 threadpool->jobqueue->lastjob = newjob;
355 }
356
357 /* signalling to all threads that the queue has jobs using the signal instead of broadcast because only one thread
358 * should be awakened */
360
361 threadpool->jobqueue->njobs++;
362}
363
364/** adds a job to the threadpool */
365static
367 SCIP_JOB* newjob, /**< job to add to threadpool */
368 SCIP_SUBMITSTATUS* status /**< pointer to store the job's submit status */
369 )
370{
371 assert(newjob != NULL);
372 assert(_threadpool != NULL);
373
374 SCIP_CALL( SCIPtnyAcquireLock(&(_threadpool->poollock)) );
375
376 /* if the queue is full and we are blocking, then return an error. */
377 if( _threadpool->jobqueue->njobs == _threadpool->queuesize && _threadpool->blockwhenfull )
378 {
379 SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
380 *status = SCIP_SUBMIT_QUEUEFULL;
381 return SCIP_OKAY;
382 }
383
384 /* Wait until the job queue is not full. If the queue is closed or the thread pool is shut down, then stop waiting. */
385 /* @todo this needs to be checked. It is possible that a job can be submitted and then the queue is closed or the
386 * thread pool is shut down. Need to work out the best way to handle this. */
387 while( _threadpool->jobqueue->njobs == _threadpool->queuesize && !(_threadpool->shutdown || !_threadpool->queueopen) )
388 {
389 SCIP_CALL( SCIPtnyWaitCondition(&(_threadpool->queuenotfull), &(_threadpool->poollock)) );
390 }
391
392 /* if the thread pool is shut down or the queue is closed, then we need to leave the job submission */
393 if( !_threadpool->queueopen )
394 {
395 SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
396 *status = SCIP_SUBMIT_QUEUECLOSED;
397 return SCIP_OKAY;
398 }
399 else if( _threadpool->shutdown )
400 {
401 SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
402 *status = SCIP_SUBMIT_SHUTDOWN;
403 return SCIP_OKAY;
404 }
405
406 /* creating the job for submission */
407 newjob->nextjob = NULL;
408
409 /* adding the job to the queue */
410 /* this can only happen if the queue is not full */
411 assert(_threadpool->jobqueue->njobs != _threadpool->queuesize);
412 jobQueueAddJob(_threadpool, newjob);
413
414 SCIP_CALL( SCIPtnyReleaseLock(&(_threadpool->poollock)) );
415
416 *status = SCIP_SUBMIT_SUCCESS;
417
418 return SCIP_OKAY;
419}
420
421/** frees the jobqueue of the threadpool */
422static
424 SCIP_THREADPOOL* thrdpool /**< pointer to thread pool */
425 )
426{
427 SCIP_JOB* currjob;
428
429 assert(!thrdpool->queueopen);
430 assert(thrdpool->shutdown);
431
432 /* iterating through all jobs until all have been freed */
433 while( thrdpool->jobqueue->firstjob != NULL )
434 {
435 currjob = thrdpool->jobqueue->firstjob->nextjob;
436 thrdpool->jobqueue->firstjob = thrdpool->jobqueue->firstjob->nextjob;
437 BMSfreeMemory(&currjob);
438 }
439
440 assert(thrdpool->jobqueue->firstjob == NULL);
441 assert(thrdpool->jobqueue->lastjob == NULL);
442
443 BMSfreeMemory(&thrdpool->jobqueue);
444}
445
446/** free the thread pool */
447static
449 SCIP_THREADPOOL** thrdpool, /**< pointer to thread pool */
450 SCIP_Bool finishjobs, /**< currently unused */
451 SCIP_Bool completequeue /**< Wait until the queue has complete? */
452 )
453{
454 int i;
455 SCIP_RETCODE retcode;
456
457 /*TODO remove argument? */
458 SCIP_UNUSED( finishjobs );
459
460 SCIP_CALL( SCIPtnyAcquireLock(&((*thrdpool)->poollock)) );
461
462 /* if the shutdown is already in progress, then we don't need to complete this function */
463 if( !(*thrdpool)->queueopen || (*thrdpool)->shutdown )
464 {
465 SCIP_CALL( SCIPtnyReleaseLock(&((*thrdpool)->poollock)) );
466
467 return SCIP_OKAY;
468 }
469
470 /* indicating that the job queue is now closed for new jobs */
471 (*thrdpool)->queueopen = FALSE;
472
473 /* if the jobs in the queue should be completed, then we wait until the queueempty condition is set */
474 if( completequeue )
475 {
476 while( (*thrdpool)->jobqueue->njobs > 0 )
477 {
478 SCIP_CALL( SCIPtnyWaitCondition(&((*thrdpool)->queueempty), &((*thrdpool)->poollock)) );
479 }
480 }
481
482 /* indicating that the tpi has commenced the shutdown process */
483 (*thrdpool)->shutdown = TRUE;
484
485 SCIP_CALL( SCIPtnyReleaseLock(&((*thrdpool)->poollock)) );
486
487 /* waking up all threads so that they can check the shutdown condition;
488 * this requires that the conditions queuenotempty and queuenotfull is broadcast
489 */
490 SCIP_CALL( SCIPtnyBroadcastCondition(&((*thrdpool)->queuenotempty)) );
491 SCIP_CALL( SCIPtnyBroadcastCondition(&((*thrdpool)->queuenotfull)) );
492
493 retcode = SCIP_OKAY;
494
495 /* calling a join to ensure that all worker finish before the thread pool is closed */
496 for( i = 0; i < (*thrdpool)->nthreads; i++ )
497 {
498 int thrdretcode;
499
500 if( thrd_join((*thrdpool)->threads[i], &thrdretcode) != thrd_success )
501 retcode = (SCIP_RETCODE) MIN((int)SCIP_ERROR, (int)retcode);
502 else
503 retcode = (SCIP_RETCODE) MIN(thrdretcode, (int)retcode);
504 }
505
506 /* freeing memory and data structures */
507 BMSfreeMemoryArray(&(*thrdpool)->threads);
508
509 /* Freeing the current jobs list. This assumes that all jobs complete before the tpi is closed. */
510 assert((*thrdpool)->currentjobs->njobs == 0);
511 BMSfreeMemory(&(*thrdpool)->currentjobs);
512 assert((*thrdpool)->finishedjobs->njobs == 0);
513 BMSfreeMemory(&(*thrdpool)->finishedjobs);
514
515 freeJobQueue(*thrdpool);
516
517 /* destroying the conditions */
518 SCIPtnyDestroyCondition(&(*thrdpool)->jobfinished);
519 SCIPtnyDestroyCondition(&(*thrdpool)->queueempty);
520 SCIPtnyDestroyCondition(&(*thrdpool)->queuenotfull);
521 SCIPtnyDestroyCondition(&(*thrdpool)->queuenotempty);
522
523 /* destroying the mutex */
524 SCIPtnyDestroyLock(&(*thrdpool)->poollock);
525
526 BMSfreeMemory(thrdpool);
527
528 return retcode;
529}
530
531
532/* checking a job queue */
533static
535 SCIP_JOBQUEUE* jobqueue, /**< pointer to the job queue */
536 int jobid /**< id of job to check */
537 )
538{
539 SCIP_JOB* currjob = jobqueue->firstjob;
540
541 /* checking the job ids */
542 if( currjob != NULL )
543 {
544 while( currjob != jobqueue->lastjob )
545 {
546 if( currjob->jobid == jobid )
547 return SCIP_JOB_INQUEUE;
548
549 currjob = currjob->nextjob;
550 }
551
552 if( currjob->jobid == jobid )
553 return SCIP_JOB_INQUEUE;
554 }
555
557}
558
559/** returns whether the job id is running */
560static
562 SCIP_JOBQUEUE* currentjobs, /**< queue of current jobs */
563 int jobid /**< id of job to check */
564 )
565{
566 if( checkJobQueue(currentjobs, jobid) == SCIP_JOB_INQUEUE )
567 return TRUE;
568 else
569 return FALSE;
570}
571
572/** returns the number of threads */
574 void
575 )
576{
577 return _threadpool != NULL ? _threadpool->nthreads : 0;
578}
579
580/** initializes tpi */
582 int nthreads, /**< the number of threads to be used */
583 int queuesize, /**< the size of the queue */
584 SCIP_Bool blockwhenfull /**< should the queue block when full */
585 )
586{
587 assert(_threadpool == NULL);
588 SCIP_CALL( createThreadPool(&_threadpool, nthreads, queuesize, blockwhenfull) );
589 return SCIP_OKAY;
590}
591
592/** deinitializes tpi */
594 void
595 )
596{
597 assert(_threadpool != NULL);
598
599 SCIP_CALL( freeThreadPool(&_threadpool, TRUE, TRUE) );
600
601 return SCIP_OKAY;
602}
603
604/** creates a job for parallel processing */
606 SCIP_JOB** job, /**< pointer to the job that will be created */
607 int jobid, /**< the id for the current job */
608 SCIP_RETCODE (*jobfunc)(void* args),/**< pointer to the job function */
609 void* jobarg /**< the job's argument */
610 )
611{
613
614 (*job)->jobid = jobid;
615 (*job)->jobfunc = jobfunc;
616 (*job)->args = jobarg;
617 (*job)->nextjob = NULL;
618
619 return SCIP_OKAY;
620}
621
622/** get a new job id for the new set of submitted jobs */
624 void
625 )
626{
627 int id;
628 assert(_threadpool != NULL);
629
631 id = ++_threadpool->currentid;
633
634 return id;
635}
636
637/** submit a job for parallel processing; the return value is a globally defined status */
639 SCIP_JOB* job, /**< pointer to the job to be submitted */
640 SCIP_SUBMITSTATUS* status /**< pointer to store the job's submit status */
641 )
642{
643 assert(job != NULL);
644
645 /* the job id must be set before submitting the job. The submitter controls whether a new id is required. */
646 assert(job->jobid == _threadpool->currentid);
647 SCIP_CALL( threadPoolAddWork(job, status) );
648
649 return SCIP_OKAY;
650}
651
652/** blocks until all jobs of the given jobid have finished
653 * and then returns the smallest SCIP_RETCODE of all the jobs
654 */
656 int jobid /**< the jobid of the jobs to wait for */
657 )
658{
659 SCIP_RETCODE retcode;
660 SCIP_JOB* currjob;
661 SCIP_JOB* prevjob;
662
663 SCIP_CALL( SCIPtnyAcquireLock(&(_threadpool->poollock)) );
664
665 while( isJobRunning(_threadpool->currentjobs, jobid) || isJobRunning(_threadpool->jobqueue, jobid) )
666 {
667 SCIP_CALL( SCIPtnyWaitCondition(&_threadpool->jobfinished, &_threadpool->poollock) );
668 }
669
670 /* finding the location of the processed job in the currentjobs queue */
671 retcode = SCIP_OKAY;
672 currjob = _threadpool->finishedjobs->firstjob;
673 prevjob = NULL;
674
675 while( currjob )
676 {
677 if( currjob->jobid == jobid )
678 {
679 SCIP_JOB* nextjob;
680
681 /* if the job has the right jobid collect its retcode,
682 * remove it from the finished job list, and free it
683 */
684 retcode = MIN(retcode, currjob->retcode);
685
686 /* removing the finished job from finished jobs list */
687 if( currjob == _threadpool->finishedjobs->firstjob )
688 {
689 _threadpool->finishedjobs->firstjob = currjob->nextjob;
690 }
691 else
692 {
693 assert(prevjob != NULL);
694 prevjob->nextjob = currjob->nextjob;
695 }
696
697 if( currjob == _threadpool->finishedjobs->lastjob )
698 _threadpool->finishedjobs->lastjob = prevjob;
699
700 _threadpool->finishedjobs->njobs--;
701
702 /* update currjob and free finished job; prevjob stays the same */
703 nextjob = currjob->nextjob;
704 BMSfreeMemory(&currjob);
705 currjob = nextjob;
706 }
707 else
708 {
709 /* otherwise leave job untouched */
710 prevjob = currjob;
711 currjob = prevjob->nextjob;
712 }
713 }
714
715 SCIP_CALL( SCIPtnyReleaseLock(&_threadpool->poollock) );
716
717 return retcode;
718}
719
720
721/*
722 * locks
723 */
724
725/** initializes the given lock */
727 SCIP_LOCK** lock /**< the lock */
728 )
729{
730 assert(lock != NULL);
731
732 SCIP_ALLOC( BMSallocMemory(lock) );
733
734 if( mtx_init(&(*lock)->lock, mtx_plain) == thrd_success )
735 return SCIP_OKAY;
736 else
737 {
738 BMSfreeMemory(lock);
739 return SCIP_ERROR;
740 }
741}
742
743/** destroys the given lock */
745 SCIP_LOCK** lock /**< the lock */
746 )
747{
748 assert(lock != NULL);
749
750 mtx_destroy(&(*lock)->lock);
751 BMSfreeMemory(lock);
752}
753
754/** acquires the given lock */
756 SCIP_LOCK* lock /**< the lock */
757 )
758{
759 if( mtx_lock(&lock->lock) == thrd_success )
760 return SCIP_OKAY;
761 return SCIP_ERROR;
762}
763
764/** releases the given lock */
766 SCIP_LOCK* lock /**< the lock */
767 )
768{
769 if( mtx_unlock(&lock->lock) == thrd_success )
770 return SCIP_OKAY;
771 return SCIP_ERROR;
772}
773
774
775/*
776 * conditions
777 */
778
779/** initializes the given condition variable */
781 SCIP_CONDITION** condition /**< condition to be created and initialized */
782 )
783{
784 assert(condition != NULL);
785
786 SCIP_ALLOC( BMSallocMemory(condition) );
787
788 if( cnd_init(&(*condition)->condition) == thrd_success )
789 return SCIP_OKAY;
790 return SCIP_ERROR;
791}
792
793/** destroys the given condition variable */
795 SCIP_CONDITION** condition /**< condition to be destroyed and freed */
796 )
797{
798 cnd_destroy(&(*condition)->condition);
799 BMSfreeMemory(condition);
800}
801
802/** signals one waiting thread */
804 SCIP_CONDITION* condition /**< the condition variable to signal */
805 )
806{
807 if( cnd_signal(&condition->condition) == thrd_success )
808 return SCIP_OKAY;
809 return SCIP_ERROR;
810}
811
812/** signals all waiting threads */
813SCIP_EXPORT
815 SCIP_CONDITION* condition /**< the condition variable to broadcast */
816 )
817{
818 if( cnd_broadcast(&condition->condition) == thrd_success )
819 return SCIP_OKAY;
820 return SCIP_ERROR;
821}
822
823/** waits on a condition variable. The given lock must be held by the caller and will
824 * be held when this function returns.
825 */
827 SCIP_CONDITION* condition, /**< the condition variable to wait on */
828 SCIP_LOCK* lock /**< the lock that is held by the caller */
829 )
830{
831 if( cnd_wait(&condition->condition, &lock->lock) == thrd_success )
832 return SCIP_OKAY;
833 return SCIP_ERROR;
834}
835
836/** returns the thread number */
838 void
839 )
840{
841 return _threadnumber;
842}
#define NULL
Definition: def.h:267
#define SCIP_UNUSED(x)
Definition: def.h:428
#define SCIP_Bool
Definition: def.h:91
#define MIN(x, y)
Definition: def.h:243
#define SCIP_ALLOC(x)
Definition: def.h:385
#define TRUE
Definition: def.h:93
#define FALSE
Definition: def.h:94
#define SCIP_CALL_ABORT(x)
Definition: def.h:353
#define SCIP_CALL(x)
Definition: def.h:374
memory allocation routines
#define BMSfreeMemory(ptr)
Definition: memory.h:145
#define BMSallocMemoryArray(ptr, num)
Definition: memory.h:123
#define BMSfreeMemoryArray(ptr)
Definition: memory.h:147
#define BMSallocMemory(ptr)
Definition: memory.h:118
public methods for message output
SCIP_JOB * lastjob
Definition: tpi_openmp.c:84
SCIP_JOB * firstjob
Definition: tpi_openmp.c:83
SCIP_RETCODE retcode
Definition: tpi_openmp.c:77
struct SCIP_Job * nextjob
Definition: tpi_openmp.c:74
void * args
Definition: tpi_openmp.c:76
SCIP_RETCODE(* jobfunc)(void *args)
Definition: tpi_openmp.c:75
int jobid
Definition: tpi_openmp.c:73
mtx_t lock
Definition: tpi_tnycthrd.c:58
omp_lock_t lock
Definition: tpi_openmp.c:57
SCIP_Bool queueopen
Definition: tpi_tnycthrd.c:110
SCIP_JOBQUEUE * jobqueue
Definition: tpi_tnycthrd.c:100
SCIP_Bool shutdown
Definition: tpi_tnycthrd.c:109
SCIP_Bool blockwhenfull
Definition: tpi_tnycthrd.c:105
SCIP_JOBQUEUE * currentjobs
Definition: tpi_tnycthrd.c:101
thrd_t * threads
Definition: tpi_tnycthrd.c:99
SCIP_JOBQUEUE * finishedjobs
Definition: tpi_tnycthrd.c:103
the type definitions for the SCIP parallel interface
static SCIP_RETCODE threadPoolAddWork(SCIP_JOB *newjob, SCIP_SUBMITSTATUS *status)
Definition: tpi_tnycthrd.c:366
static SCIP_JOBSTATUS checkJobQueue(SCIP_JOBQUEUE *jobqueue, int jobid)
Definition: tpi_tnycthrd.c:534
SCIP_RETCODE SCIPtpiWaitCondition(SCIP_CONDITION *condition, SCIP_LOCK *lock)
Definition: tpi_tnycthrd.c:826
SCIP_RETCODE SCIPtpiCreateJob(SCIP_JOB **job, int jobid, SCIP_RETCODE(*jobfunc)(void *args), void *jobarg)
Definition: tpi_tnycthrd.c:605
static SCIP_RETCODE threadPoolThreadRetcode(void *threadnum)
Definition: tpi_tnycthrd.c:122
SCIP_RETCODE SCIPtpiSignalCondition(SCIP_CONDITION *condition)
Definition: tpi_tnycthrd.c:803
SCIP_RETCODE SCIPtpiAcquireLock(SCIP_LOCK *lock)
Definition: tpi_tnycthrd.c:755
#define SCIPtnyInitCondition(condition)
Definition: tpi_tnycthrd.c:49
static SCIP_RETCODE createThreadPool(SCIP_THREADPOOL **thrdpool, int nthreads, int qsize, SCIP_Bool blockwhenfull)
Definition: tpi_tnycthrd.c:258
static void jobQueueAddJob(SCIP_THREADPOOL *threadpool, SCIP_JOB *newjob)
Definition: tpi_tnycthrd.c:334
SCIP_RETCODE SCIPtpiExit(void)
Definition: tpi_tnycthrd.c:593
#define SCIPtnyInitLock(lock)
Definition: tpi_tnycthrd.c:43
SCIP_RETCODE SCIPtpiBroadcastCondition(SCIP_CONDITION *condition)
Definition: tpi_tnycthrd.c:814
#define SCIPtnyBroadcastCondition(condition)
Definition: tpi_tnycthrd.c:52
SCIP_RETCODE SCIPtpiSubmitJob(SCIP_JOB *job, SCIP_SUBMITSTATUS *status)
Definition: tpi_tnycthrd.c:638
static SCIP_Bool isJobRunning(SCIP_JOBQUEUE *currentjobs, int jobid)
Definition: tpi_tnycthrd.c:561
void SCIPtpiDestroyLock(SCIP_LOCK **lock)
Definition: tpi_tnycthrd.c:744
SCIP_RETCODE SCIPtpiCollectJobs(int jobid)
Definition: tpi_tnycthrd.c:655
#define SCIPtnyWaitCondition(condition, lock)
Definition: tpi_tnycthrd.c:53
#define SCIPtnyReleaseLock(lock)
Definition: tpi_tnycthrd.c:46
#define SCIPtnyDestroyCondition(condition)
Definition: tpi_tnycthrd.c:50
#define SCIPtnySignalCondition(condition)
Definition: tpi_tnycthrd.c:51
int SCIPtpiGetThreadNum(void)
Definition: tpi_tnycthrd.c:837
int SCIPtpiGetNumThreads(void)
Definition: tpi_tnycthrd.c:573
void SCIPtpiDestroyCondition(SCIP_CONDITION **condition)
Definition: tpi_tnycthrd.c:794
int SCIPtpiGetNewJobID(void)
Definition: tpi_tnycthrd.c:623
#define SCIPtnyDestroyLock(lock)
Definition: tpi_tnycthrd.c:44
static int threadPoolThread(void *threadnum)
Definition: tpi_tnycthrd.c:249
SCIP_RETCODE SCIPtpiInitLock(SCIP_LOCK **lock)
Definition: tpi_tnycthrd.c:726
static SCIP_RETCODE freeThreadPool(SCIP_THREADPOOL **thrdpool, SCIP_Bool finishjobs, SCIP_Bool completequeue)
Definition: tpi_tnycthrd.c:448
static void freeJobQueue(SCIP_THREADPOOL *thrdpool)
Definition: tpi_tnycthrd.c:423
SCIP_RETCODE SCIPtpiReleaseLock(SCIP_LOCK *lock)
Definition: tpi_tnycthrd.c:765
SCIP_RETCODE SCIPtpiInitCondition(SCIP_CONDITION **condition)
Definition: tpi_tnycthrd.c:780
#define SCIPtnyAcquireLock(lock)
Definition: tpi_tnycthrd.c:45
SCIP_RETCODE SCIPtpiInit(int nthreads, int queuesize, SCIP_Bool blockwhenfull)
Definition: tpi_tnycthrd.c:581
@ SCIP_OKAY
Definition: type_retcode.h:42
@ SCIP_ERROR
Definition: type_retcode.h:43
enum SCIP_Retcode SCIP_RETCODE
Definition: type_retcode.h:63
enum SCIP_Submitstatus SCIP_SUBMITSTATUS
Definition: type_tpi.h:50
@ SCIP_JOB_DOESNOTEXIST
Definition: type_tpi.h:60
@ SCIP_JOB_INQUEUE
Definition: type_tpi.h:61
enum SCIP_Jobstatus SCIP_JOBSTATUS
Definition: type_tpi.h:65
@ SCIP_SUBMIT_SUCCESS
Definition: type_tpi.h:48
@ SCIP_SUBMIT_SHUTDOWN
Definition: type_tpi.h:47
@ SCIP_SUBMIT_QUEUEFULL
Definition: type_tpi.h:45
@ SCIP_SUBMIT_QUEUECLOSED
Definition: type_tpi.h:46