OSDN Git Service

Don't use safe_new but new throughout. Fix copyright dates
[pf3gnuchains/pf3gnuchains4x.git] / winsup / cygserver / threaded_queue.cc
1 /* threaded_queue.cc
2
3    Copyright 2001, 2002, 2003 Red Hat Inc.
4
5    Written by Robert Collins <rbtcollins@hotmail.com>
6
7 This file is part of Cygwin.
8
9 This software is a copyrighted work licensed under the terms of the
10 Cygwin license.  Please consult the file "CYGWIN_LICENSE" for
11 details. */
12
13 #ifdef __OUTSIDE_CYGWIN__
14 #include "woutsup.h"
15
16 #include <assert.h>
17 #include <errno.h>
18 #include <stdio.h>
19 #include <unistd.h>
20 #include <sys/types.h>
21 #include <stdlib.h>
22 #include "threaded_queue.h"
23
24 /*****************************************************************************/
25
26 /* queue_request */
27
28 queue_request::~queue_request ()
29 {}
30
31 /*****************************************************************************/
32
33 /* threaded_queue */
34
35 threaded_queue::threaded_queue (const size_t initial_workers)
36   : _workers_count (0),
37     _running (false),
38     _submitters_head (NULL),
39     _requests_count (0),
40     _requests_head (NULL),
41     _requests_sem (NULL)
42 {
43   InitializeCriticalSection (&_queue_lock);
44
45   // This semaphore's count is the number of requests on the queue.
46   // The maximum count (129792) is calculated as MAXIMUM_WAIT_OBJECTS
47   // multiplied by max. threads per process (2028?), which is (a few)
48   // more requests than could ever be pending with the current design.
49
50   _requests_sem = CreateSemaphore (NULL,   // SECURITY_ATTRIBUTES
51                                    0,      // Initial count
52                                    129792, // Maximum count
53                                    NULL);  // Anonymous
54
55   if (!_requests_sem)
56     {
57       system_printf (("failed to create the request queue semaphore, "
58                       "error = %lu"),
59                      GetLastError ());
60       abort ();
61     }
62
63   create_workers (initial_workers);
64 }
65
66 threaded_queue::~threaded_queue ()
67 {
68   if (_running)
69     stop ();
70
71   debug_printf ("deleting all pending queue requests");
72   queue_request *reqptr = _requests_head;
73   while (reqptr)
74     {
75       queue_request *const ptr = reqptr;
76       reqptr = reqptr->_next;
77       delete ptr;
78     }
79
80   DeleteCriticalSection (&_queue_lock);
81   if (_requests_sem)
82     (void) CloseHandle (_requests_sem);
83 }
84
85 /* FIXME: return success or failure rather than quitting */
86 void
87 threaded_queue::add_submission_loop (queue_submission_loop *const submitter)
88 {
89   assert (this);
90   assert (submitter);
91   assert (submitter->_queue == this);
92   assert (!submitter->_next);
93
94   submitter->_next =
95     TInterlockedExchangePointer (&_submitters_head, submitter);
96
97   if (_running)
98     submitter->start ();
99 }
100
101 bool
102 threaded_queue::start ()
103 {
104   EnterCriticalSection (&_queue_lock);
105   const bool was_running = _running;
106   _running = true;
107   queue_submission_loop *loopptr = _submitters_head;
108   LeaveCriticalSection (&_queue_lock);
109
110   if (!was_running)
111     {
112       debug_printf ("starting all queue submission loops");
113
114       while (loopptr)
115         {
116           queue_submission_loop *const ptr = loopptr;
117           loopptr = loopptr->_next;
118           ptr->start ();
119         }
120     }
121
122   return was_running;
123 }
124
125 bool
126 threaded_queue::stop ()
127 {
128   EnterCriticalSection (&_queue_lock);
129   const bool was_running = _running;
130   _running = false;
131   queue_submission_loop *loopptr = _submitters_head;
132   LeaveCriticalSection (&_queue_lock);
133
134   if (was_running)
135     {
136       debug_printf ("stopping all queue submission loops");
137       while (loopptr)
138         {
139           queue_submission_loop *const ptr = loopptr;
140           loopptr = loopptr->_next;
141           ptr->stop ();
142         }
143
144       ReleaseSemaphore (_requests_sem, _workers_count, NULL);
145       while (_workers_count)
146         {
147           debug_printf (("waiting for worker threads to terminate: "
148                          "%lu still running"),
149                         _workers_count);
150           Sleep (1000);
151         }
152       debug_printf ("all worker threads have terminated");
153     }
154
155   return was_running;
156 }
157
158 /* FIXME: return success or failure */
159 void
160 threaded_queue::add (queue_request *const therequest)
161 {
162   assert (this);
163   assert (therequest);
164   assert (!therequest->_next);
165
166   if (!_workers_count)
167     {
168       system_printf ("warning: no worker threads to handle request!");
169       // FIXME: And then what?
170     }
171
172   EnterCriticalSection (&_queue_lock);
173   if (!_requests_head)
174     _requests_head = therequest;
175   else
176     {
177       /* Add to the queue end. */
178       queue_request *reqptr = _requests_head;
179       for (; reqptr->_next; reqptr = reqptr->_next)
180         {}
181       assert (reqptr);
182       assert (!reqptr->_next);
183       reqptr->_next = therequest;
184     }
185
186   _requests_count += 1;
187   assert (_requests_count > 0);
188   LeaveCriticalSection (&_queue_lock);
189
190   (void) ReleaseSemaphore (_requests_sem, 1, NULL);
191 }
192
193 /*static*/ DWORD WINAPI
194 threaded_queue::start_routine (const LPVOID lpParam)
195 {
196   class threaded_queue *const queue = (class threaded_queue *) lpParam;
197   assert (queue);
198
199   queue->worker_loop ();
200
201   const long count = InterlockedDecrement (&queue->_workers_count);
202   assert (count >= 0);
203
204   if (queue->_running)
205     debug_printf ("worker loop has exited; thread about to terminate");
206
207   return 0;
208 }
209
210 /* Called from the constructor: so no need to be thread-safe until the
211  * worker threads start to be created; thus the interlocked increment
212  * of the `_workers_count' field.
213  */
214
215 void
216 threaded_queue::create_workers (const size_t initial_workers)
217 {
218   assert (initial_workers > 0);
219
220   for (unsigned int i = 0; i != initial_workers; i++)
221     {
222       const long count = InterlockedIncrement (&_workers_count);
223       assert (count > 0);
224
225       DWORD tid;
226       const HANDLE hThread =
227         CreateThread (NULL, 0, start_routine, this, 0, &tid);
228
229       if (!hThread)
230         {
231           system_printf ("failed to create thread, error = %lu",
232                          GetLastError ());
233           abort ();
234         }
235
236       (void) CloseHandle (hThread);
237     }
238 }
239
240 void
241 threaded_queue::worker_loop ()
242 {
243   while (true)
244     {
245       const DWORD rc = WaitForSingleObject (_requests_sem, INFINITE);
246       if (rc == WAIT_FAILED)
247         {
248           system_printf ("wait for request semaphore failed, error = %lu",
249                          GetLastError ());
250           return;
251         }
252       assert (rc == WAIT_OBJECT_0);
253
254       EnterCriticalSection (&_queue_lock);
255       if (!_running)
256         {
257           LeaveCriticalSection (&_queue_lock);
258           return;
259         }
260
261       assert (_requests_head);
262       queue_request *const reqptr = _requests_head;
263       _requests_head = reqptr->_next;
264
265       _requests_count -= 1;
266       assert (_requests_count >= 0);
267       LeaveCriticalSection (&_queue_lock);
268
269       assert (reqptr);
270       reqptr->process ();
271       delete reqptr;
272     }
273 }
274
275 /*****************************************************************************/
276
277 /* queue_submission_loop */
278
279 queue_submission_loop::queue_submission_loop (threaded_queue *const queue,
280                                               const bool ninterruptible)
281   : _running (false),
282     _interrupt_event (NULL),
283     _queue (queue),
284     _interruptible (ninterruptible),
285     _hThread (NULL),
286     _tid (0),
287     _next (NULL)
288 {
289   if (_interruptible)
290     {
291       // verbose: debug_printf ("creating an interruptible processing thread");
292
293       _interrupt_event = CreateEvent (NULL,  // SECURITY_ATTRIBUTES
294                                       FALSE, // Auto-reset
295                                       FALSE, // Initially non-signalled
296                                       NULL); // Anonymous
297
298       if (!_interrupt_event)
299         {
300           system_printf ("failed to create interrupt event, error = %lu",
301                          GetLastError ());
302           abort ();
303         }
304     }
305 }
306
307 queue_submission_loop::~queue_submission_loop ()
308 {
309   if (_running)
310     stop ();
311   if (_interrupt_event)
312     (void) CloseHandle (_interrupt_event);
313   if (_hThread)
314     (void) CloseHandle (_hThread);
315 }
316
317 bool
318 queue_submission_loop::start ()
319 {
320   assert (this);
321   assert (!_hThread);
322
323   const bool was_running = _running;
324
325   if (!was_running)
326     {
327       _running = true;
328
329       _hThread = CreateThread (NULL, 0, start_routine, this, 0, &_tid);
330       if (!_hThread)
331         {
332           system_printf ("failed to create thread, error = %lu",
333                          GetLastError ());
334           abort ();
335         }
336     }
337
338   return was_running;
339 }
340
341 bool
342 queue_submission_loop::stop ()
343 {
344   assert (this);
345   assert (_hThread && _hThread != INVALID_HANDLE_VALUE);
346
347   const bool was_running = _running;
348
349   if (_running)
350     {
351       _running = false;
352
353       if (_interruptible)
354         {
355           assert (_interrupt_event
356                   && _interrupt_event != INVALID_HANDLE_VALUE);
357
358           SetEvent (_interrupt_event);
359
360           if (WaitForSingleObject (_hThread, 1000) == WAIT_TIMEOUT)
361             {
362               system_printf (("request loop thread %lu failed to shutdown "
363                               "when asked politely: about to get heavy"),
364                              _tid);
365
366               if (!TerminateThread (_hThread, 0))
367                 {
368                   system_printf (("failed to kill request loop thread %lu"
369                                   ", error = %lu"),
370                                  _tid, GetLastError ());
371                   abort ();
372                 }
373             }
374         }
375       else
376         {
377           // FIXME: could wait to see if the request loop notices that
378           // the submission loop is no longer running and shuts down
379           // voluntarily.
380
381           debug_printf ("killing request loop thread %lu", _tid);
382
383           if (!TerminateThread (_hThread, 0))
384             system_printf (("failed to kill request loop thread %lu"
385                             ", error = %lu"),
386                            _tid, GetLastError ());
387         }
388     }
389
390   return was_running;
391 }
392
393 /*static*/ DWORD WINAPI
394 queue_submission_loop::start_routine (const LPVOID lpParam)
395 {
396   class queue_submission_loop *const submission_loop =
397     (class queue_submission_loop *) lpParam;
398   assert (submission_loop);
399
400   submission_loop->request_loop ();
401
402   debug_printf ("submission loop has exited; thread about to terminate");
403
404   submission_loop->stop ();
405
406   return 0;
407 }
408
409 /*****************************************************************************/
410 #endif /* __OUTSIDE_CYGWIN__ */