3 Copyright 2001, 2002, 2003 Red Hat Inc.
5 Written by Robert Collins <rbtcollins@hotmail.com>
7 This file is part of Cygwin.
9 This software is a copyrighted work licensed under the terms of the
10 Cygwin license. Please consult the file "CYGWIN_LICENSE" for
13 #ifdef __OUTSIDE_CYGWIN__
20 #include <sys/types.h>
22 #include "threaded_queue.h"
24 /*****************************************************************************/
// Out-of-line definition of the queue_request destructor.
28 queue_request::~queue_request ()
31 /*****************************************************************************/
// Constructs a threaded_queue.
//
// The submitter and request list heads start out as NULL, the _queue_lock
// critical section is initialised, and _requests_sem is created as the
// semaphore that counts queued requests (its maximum count is explained
// in the comment next to the CreateSemaphore call).  A failure to create
// the semaphore is reported through system_printf.  Finally
// initial_workers is passed on unchanged to create_workers (), which
// asserts that it is greater than zero.
35 threaded_queue::threaded_queue (const size_t initial_workers)
38 _submitters_head (NULL),
40 _requests_head (NULL),
43 InitializeCriticalSection (&_queue_lock);
45 // This semaphore's count is the number of requests on the queue.
46 // The maximum count (129792) is calculated as MAXIMUM_WAIT_OBJECTS
47 // multiplied by max. threads per process (2028?), which is (a few)
48 // more requests than could ever be pending with the current design.
50 _requests_sem = CreateSemaphore (NULL, // SECURITY_ATTRIBUTES
52 129792, // Maximum count
57 system_printf (("failed to create the request queue semaphore, "
63 create_workers (initial_workers);
// Tears the queue down: walks the list of still-pending requests starting
// at _requests_head (the debug message states that they are deleted),
// then deletes the _queue_lock critical section and closes the
// _requests_sem handle.
66 threaded_queue::~threaded_queue ()
71 debug_printf ("deleting all pending queue requests");
72 queue_request *reqptr = _requests_head;
// Keep the current node in ptr and advance reqptr to its successor, so
// the traversal no longer needs the current node afterwards.
75 queue_request *const ptr = reqptr;
76 reqptr = reqptr->_next;
80 DeleteCriticalSection (&_queue_lock);
// The result of CloseHandle is explicitly discarded.
82 (void) CloseHandle (_requests_sem);
85 /* FIXME: return success or failure rather than quitting */
// Registers a submission loop with this queue.
//
// submitter must already point back at this queue through its _queue
// member and must not be linked into a list yet (its _next member must be
// null); both preconditions are checked with assert.
87 threaded_queue::add_submission_loop (queue_submission_loop *const submitter)
91 assert (submitter->_queue == this);
92 assert (!submitter->_next);
// Install submitter as the new _submitters_head through
// TInterlockedExchangePointer.
// NOTE(review): the value returned by the exchange is presumably the
// previous head and should end up in submitter->_next; confirm.
95 TInterlockedExchangePointer (&_submitters_head, submitter);
// Starts the queue.
//
// While holding _queue_lock, the previous value of _running and the
// current head of the submitter list are captured; the lock is released
// before the submitter list is walked.
102 threaded_queue::start ()
104 EnterCriticalSection (&_queue_lock);
// Snapshot of the running flag as it was on entry.
105 const bool was_running = _running;
107 queue_submission_loop *loopptr = _submitters_head;
108 LeaveCriticalSection (&_queue_lock);
112 debug_printf ("starting all queue submission loops");
// Keep the current loop in ptr and step loopptr to the next list entry.
116 queue_submission_loop *const ptr = loopptr;
117 loopptr = loopptr->_next;
// Stops the queue.
//
// While holding _queue_lock, the previous value of _running and the
// current head of the submitter list are captured; the lock is released
// before the submitter list is walked.  Afterwards _requests_sem is
// released _workers_count times, which lets every worker blocked in
// worker_loop () return from its wait, and the function loops until
// _workers_count has dropped to zero, logging how many workers are still
// running.
126 threaded_queue::stop ()
128 EnterCriticalSection (&_queue_lock);
// Snapshot of the running flag as it was on entry.
129 const bool was_running = _running;
131 queue_submission_loop *loopptr = _submitters_head;
132 LeaveCriticalSection (&_queue_lock);
136 debug_printf ("stopping all queue submission loops");
// Keep the current loop in ptr and step loopptr to the next list entry.
139 queue_submission_loop *const ptr = loopptr;
140 loopptr = loopptr->_next;
// One semaphore count per worker thread.  _workers_count is read here
// without holding _queue_lock; it is modified elsewhere with
// InterlockedIncrement and InterlockedDecrement.
144 ReleaseSemaphore (_requests_sem, _workers_count, NULL);
145 while (_workers_count)
147 debug_printf (("waiting for worker threads to terminate: "
148 "%lu still running"),
152 debug_printf ("all worker threads have terminated");
158 /* FIXME: return success or failure */
// Queues therequest at the end of the request list and releases the
// request semaphore by one.
//
// therequest must not already be linked into a list: its _next member is
// asserted to be null.  List and counter updates are made while holding
// _queue_lock; the semaphore is released after the lock has been left.
160 threaded_queue::add (queue_request *const therequest)
164 assert (!therequest->_next);
// Only a warning is emitted here; what should happen next is an open
// question, as the FIXME below records.
168 system_printf ("warning: no worker threads to handle request!");
169 // FIXME: And then what?
172 EnterCriticalSection (&_queue_lock);
// NOTE(review): presumably the empty-list case, where the new request
// simply becomes the head; confirm against the surrounding condition.
174 _requests_head = therequest;
177 /* Add to the queue end. */
178 queue_request *reqptr = _requests_head;
179 for (; reqptr->_next; reqptr = reqptr->_next)
182 assert (!reqptr->_next);
183 reqptr->_next = therequest;
// Account for the new request; the assert checks that the counter is
// positive after the increment.
186 _requests_count += 1;
187 assert (_requests_count > 0);
188 LeaveCriticalSection (&_queue_lock);
// Raise the semaphore count by one for the request just queued; the
// result of ReleaseSemaphore is explicitly discarded.
190 (void) ReleaseSemaphore (_requests_sem, 1, NULL);
// Thread entry point that create_workers () hands to CreateThread.
//
// lpParam is the threaded_queue passed as the thread parameter.  The
// thread runs that queue's worker_loop () and, once the loop returns,
// decrements the queue's _workers_count with InterlockedDecrement.
193 /*static*/ DWORD WINAPI
194 threaded_queue::start_routine (const LPVOID lpParam)
196 class threaded_queue *const queue = (class threaded_queue *) lpParam;
// Blocks here for the working life of the thread.
199 queue->worker_loop ();
// count is the value of _workers_count after the decrement.
201 const long count = InterlockedDecrement (&queue->_workers_count);
205 debug_printf ("worker loop has exited; thread about to terminate");
// Creates initial_workers worker threads; initial_workers is asserted to
// be greater than zero.  For every thread, _workers_count is raised with
// InterlockedIncrement before CreateThread is called with start_routine
// as the entry point and this as its parameter.  A failed CreateThread is
// reported through system_printf, and the thread handle is closed instead
// of being kept.
210 /* Called from the constructor: so no need to be thread-safe until the
211 * worker threads start to be created; thus the interlocked increment
212 * of the `_workers_count' field.
216 threaded_queue::create_workers (const size_t initial_workers)
218 assert (initial_workers > 0);
220 for (unsigned int i = 0; i != initial_workers; i++)
222 const long count = InterlockedIncrement (&_workers_count);
// count is the value of _workers_count after the increment.
226 const HANDLE hThread =
227 CreateThread (NULL, 0, start_routine, this, 0, &tid);
231 system_printf ("failed to create thread, error = %lu",
236 (void) CloseHandle (hThread);
// Body of a worker thread; called from start_routine ().
//
// Waits on _requests_sem with no timeout.  A WAIT_FAILED result is
// reported through system_printf; any result other than WAIT_OBJECT_0
// trips an assert.  Then, while holding _queue_lock, the request at
// _requests_head is unlinked from the list and _requests_count is
// decremented before the lock is released again.
241 threaded_queue::worker_loop ()
245 const DWORD rc = WaitForSingleObject (_requests_sem, INFINITE);
246 if (rc == WAIT_FAILED)
248 system_printf ("wait for request semaphore failed, error = %lu",
252 assert (rc == WAIT_OBJECT_0);
254 EnterCriticalSection (&_queue_lock);
// NOTE(review): this first LeaveCriticalSection presumably belongs to an
// early-exit path taken when the queue is being stopped; confirm against
// the surrounding condition.
257 LeaveCriticalSection (&_queue_lock);
// Unlink the request at the head of the list; the list must not be empty
// at this point.
261 assert (_requests_head);
262 queue_request *const reqptr = _requests_head;
263 _requests_head = reqptr->_next;
// Keep the counter in step with the list; it must never go negative.
265 _requests_count -= 1;
266 assert (_requests_count >= 0);
267 LeaveCriticalSection (&_queue_lock);
275 /*****************************************************************************/
277 /* queue_submission_loop */
// Constructs a submission loop for the given queue.
//
// ninterruptible is stored in _interruptible.  _interrupt_event starts out
// as NULL and is then assigned the result of CreateEvent, created
// initially non-signalled; a failed CreateEvent is reported through
// system_printf.
// NOTE(review): the event is presumably only created when the loop is
// interruptible; confirm against the surrounding condition.
279 queue_submission_loop::queue_submission_loop (threaded_queue *const queue,
280 const bool ninterruptible)
282 _interrupt_event (NULL),
284 _interruptible (ninterruptible),
291 // verbose: debug_printf ("creating an interruptible processing thread");
293 _interrupt_event = CreateEvent (NULL, // SECURITY_ATTRIBUTES
295 FALSE, // Initially non-signalled
298 if (!_interrupt_event)
300 system_printf ("failed to create interrupt event, error = %lu",
// Releases the Win32 handles owned by the loop: the interrupt event, when
// one exists, and the thread handle.  The results of CloseHandle are
// explicitly discarded.
307 queue_submission_loop::~queue_submission_loop ()
311 if (_interrupt_event)
312 (void) CloseHandle (_interrupt_event);
// NOTE(review): confirm whether _hThread is checked for validity before
// it is closed.
314 (void) CloseHandle (_hThread);
// Starts the submission loop: captures the previous value of _running and
// creates a thread that enters start_routine with this object as its
// parameter.  The thread handle is stored in _hThread and the thread id
// in _tid; a failed CreateThread is reported through system_printf.
318 queue_submission_loop::start ()
323 const bool was_running = _running;
// start_routine casts its parameter back to a queue_submission_loop.
329 _hThread = CreateThread (NULL, 0, start_routine, this, 0, &_tid);
332 system_printf ("failed to create thread, error = %lu",
// Stops the submission loop thread.
//
// Requires a valid _hThread (asserted).  Two shutdown paths are visible:
//  - the interrupt event is signalled with SetEvent and the thread is
//    given 1000 milliseconds to exit; on WAIT_TIMEOUT the thread is
//    terminated with TerminateThread;
//  - the thread is terminated with TerminateThread straight away.
// A failed TerminateThread is reported through system_printf together
// with _tid and GetLastError ().
// NOTE(review): the first path is presumably taken for interruptible
// loops and the second for all others; confirm against the surrounding
// conditions.
342 queue_submission_loop::stop ()
345 assert (_hThread && _hThread != INVALID_HANDLE_VALUE);
// Snapshot of the running flag as it was on entry.
347 const bool was_running = _running;
355 assert (_interrupt_event
356 && _interrupt_event != INVALID_HANDLE_VALUE);
// Ask the loop thread to stop by signalling its interrupt event.
358 SetEvent (_interrupt_event);
// Allow the thread one second to exit before resorting to force.
360 if (WaitForSingleObject (_hThread, 1000) == WAIT_TIMEOUT)
362 system_printf (("request loop thread %lu failed to shutdown "
363 "when asked politely: about to get heavy"),
366 if (!TerminateThread (_hThread, 0))
368 system_printf (("failed to kill request loop thread %lu"
370 _tid, GetLastError ());
377 // FIXME: could wait to see if the request loop notices that
378 // the submission loop is no longer running and shuts down
381 debug_printf ("killing request loop thread %lu", _tid);
// Forced termination without a prior request to stop.
383 if (!TerminateThread (_hThread, 0))
384 system_printf (("failed to kill request loop thread %lu"
386 _tid, GetLastError ());
// Thread entry point that start () hands to CreateThread.
//
// lpParam is the queue_submission_loop passed as the thread parameter; it
// is asserted to be non-null.  The thread runs request_loop () on it and,
// once that returns, calls stop () on the same object.
393 /*static*/ DWORD WINAPI
394 queue_submission_loop::start_routine (const LPVOID lpParam)
396 class queue_submission_loop *const submission_loop =
397 (class queue_submission_loop *) lpParam;
398 assert (submission_loop);
// Blocks here for the working life of the thread.
400 submission_loop->request_loop ();
402 debug_printf ("submission loop has exited; thread about to terminate");
// NOTE(review): stop () runs on the loop thread itself here, and stop ()
// waits on and may terminate _hThread, which is this very thread;
// confirm that its was_running check makes this call return early.
404 submission_loop->stop ();
409 /*****************************************************************************/
410 #endif /* __OUTSIDE_CYGWIN__ */