OSDN Git Service

Add return code
[pysilhouette/pysilhouette.git.git] / pysilhouette / performer.py
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 #
4 # This file is part of Pysilhouette.
5 #
6 # Copyright (c) 2009 HDE, Inc.
7 #
8 # Permission is hereby granted, free of charge, to any person obtaining a copy
9 # of this software and associated documentation files (the "Software"), to deal
10 # in the Software without restriction, including without limitation the rights
11 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12 # copies of the Software, and to permit persons to whom the Software is
13 # furnished to do so, subject to the following conditions:
14 #
15 # The above copyright notice and this permission notice shall be included in
16 # all copies or substantial portions of the Software.
17 #
18 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
24 # THE SOFTWARE.
25 #
26
27 """
28 @author: Kei Funagayama <kei@karesansui-project.info>
29 """
30
31 import sys
32 import signal
33 import os
34 import traceback
35 import logging
36
37 from sqlalchemy.pool import SingletonThreadPool, QueuePool
38 from pysilhouette.log import reload_conf
39 from pysilhouette.prep import readconf, getopts, chkopts
40 from pysilhouette.db import Database
41 from pysilhouette.db.model import reload_mappers, JOBGROUP_STATUS
42 from pysilhouette.db.access import jobgroup_findbystatus, jobgroup_update
43 from pysilhouette.worker import SimpleWorker
44
45 from pysilhouette.util import kill_proc, write_pidfile, create_fifo
46
def sigterm_handler(signum, frame):
    """Record the receipt of a signal in the performer's signal log.

    signum -- number of the signal that was delivered.
    frame  -- current stack frame (unused).

    The handler only writes a log entry; it does not itself terminate
    the process.
    """
    pid = os.getpid()
    message = 'Stop the performerd with signal - pid=%s, signal=%s' % (pid, signum)
    logging.getLogger('pysilhouette.performer.signal').info(message)
50
def _prepare_fifo(cf, logger):
    """Recreate the observer FIFO when os.access() reports it unusable.

    Whatever currently sits at the configured path is removed first (best
    effort), then create_fifo() is called with the owner, group and
    permissions taken from the configuration.
    """
    path = cf["observer.mkfifo.path"]
    if os.access(path, os.F_OK|os.R_OK|os.W_OK):
        return

    try:
        os.unlink(path)
        logger.info('Deleted filo file. - file=%s' % path)
    except OSError:
        # Best effort: failing to remove the old file is not fatal here.
        # Only OSError is ignored so that interpreter-exit exceptions are
        # not silently swallowed.
        pass

    create_fifo(path,
                cf["observer.mkfifo.user.name"],
                cf["observer.mkfifo.group.name"],
                cf["observer.mkfifo.perms"],
                )

    logger.info('The fifo file was created. - file=%s' % path)


def _open_database(cf):
    """Create the Database for cf['database.url'] and reload the mappers.

    No poolclass is passed for sqlite URLs.  For any other URL, QueuePool
    (sized from the configuration) is used when cf['database.pool.status']
    is 1 and SingletonThreadPool otherwise.  Exceptions propagate to the
    caller.
    """
    kwargs = {
        'encoding': "utf-8",
        'convert_unicode': True,
        'echo': True, # TODO
        'echo_pool': True, # TODO
    }

    if cf['database.url'][:6].strip() != 'sqlite':
        if int(cf['database.pool.status']) == 1:
            kwargs['poolclass'] = QueuePool
            kwargs['pool_size'] = int(cf['database.pool.size'])
            kwargs['max_overflow'] = int(cf['database.pool.max.overflow'])
        else:
            kwargs['poolclass'] = SingletonThreadPool

    db = Database(cf['database.url'], **kwargs)
    reload_mappers(db.get_metadata())
    return db


def _run_jobgroup(cf, db, m_jg, logger):
    """Run one job group; on failure log it and mark the group APPERR."""
    t_logger = logging.getLogger('pysilhouette_traceback')
    try:
        w = SimpleWorker(cf, db, m_jg.id)
        w.run()
    except Exception:
        # sys.exc_info() is used instead of binding the exception in the
        # except clause so the block parses on old and new interpreters.
        run_err = sys.exc_info()[1]
        logger.info('Failed to perform the job group. Exceptions are not expected. - jobgroup_id=%d : %s'
                     % (m_jg.id, str(run_err.args)))
        trace = traceback.format_exc()
        sys.stderr.write(trace + '\n')
        t_logger.error(trace)

        try:
            session = db.get_session()
            try:
                jobgroup_update(session, m_jg, JOBGROUP_STATUS['APPERR'])
            finally:
                # Close the session even when the update itself fails.
                session.close()
        except Exception:
            # Report the failure of the status update itself, not the
            # worker exception that led here.
            update_err = sys.exc_info()[1]
            logger.error('Failed to change the status of the job group. - jobgroup_id=%d : %s'
                         % (m_jg.id, str(update_err.args)))
            t_logger.error(traceback.format_exc())


def performer(opts, cf):
    """Run the performer loop driven by codes read from the observer FIFO.

    opts -- parsed command-line options; ``daemon`` and ``pidfile`` are read.
    cf   -- configuration mapping (observer.mkfifo.* and database.* keys).

    Start-up: the FIFO is (re)created if it is not accessible, the pid file
    is written when running as a daemon, and the database is opened.

    Loop: each pass blocks reading a code from the FIFO and queries the job
    groups via jobgroup_findbystatus().  On the start code every returned
    job group is run with SimpleWorker; on the stop code the loop ends; any
    other code is only logged.

    Returns 1 when the pid file cannot be written or the database cannot be
    initialised.  After the stop code the function falls off the end and
    returns None.
    """
    logger = logging.getLogger('pysilhouette.performer')

    # Initialization
    _prepare_fifo(cf, logger)

    if opts.daemon is True:
        pid = os.getpid()
        if write_pidfile(opts.pidfile, pid):
            logger.info('The process file was created. - file=%s' % opts.pidfile)
        else:
            logger.error('Could not create process file. - file=%s' % opts.pidfile)
            return 1

    logger.info('performer : [started]')

    try:
        db = _open_database(cf)
    except Exception:
        e = sys.exc_info()[1]
        logger.error('Initializing a database error - %s' % str(e.args))
        t_logger = logging.getLogger('pysilhouette_traceback')
        t_logger.error(traceback.format_exc())
        return 1

    while True:
        # Opening/reading the FIFO blocks until a writer sends a code.
        fp = open(cf["observer.mkfifo.path"], 'r')
        try:
            code = fp.read()
        finally:
            fp.close()

        logger.info('Received code from the FIFO file. - code=%s' % code)
        session = db.get_session()
        try:
            m_jgs = jobgroup_findbystatus(session)
        finally:
            # Do not leak the session when the query raises.
            session.close()

        logger.info('Queued the Job Group from the database. - Number of JobGroup=%d' % len(m_jgs))

        if code == cf["observer.mkfifo.start.code"]:
            if 0 < len(m_jgs):
                for m_jg in m_jgs:
                    _run_jobgroup(cf, db, m_jg, logger)
            else:
                logger.info('No Job Group.')
        elif code == cf["observer.mkfifo.stop.code"]:
            logger.info('Received stop code from the FIFO file. - code=%s' % code)
            break
        else:
            logger.info('Received illegal code from the FIFO file. - code=%s' % code)
170
def main():
    """Command-line entry point of the performer.

    Parses and checks the options, loads the configuration and the logging
    setup, installs the SIGTERM handler and then runs performer().  When
    running as a daemon, an existing pid file is removed on the way out.

    Returns the value of performer() when it completes, or 1 when start-up
    fails or an exception (including KeyboardInterrupt) is caught.
    """
    def report(text):
        # Write text followed by a newline to standard error.
        sys.stderr.write(text + '\n')

    opts, args = getopts()
    if chkopts(opts) is True:
        return 1

    cf = readconf(opts.config)
    if cf is None:
        report('Failed to load the config file "%s". (%s)' % (opts.config, sys.argv[0]))
        return 1

    # set env=PYSILHOUETTE_CONF
    os.environ['PYSILHOUETTE_CONF'] = opts.config

    if not reload_conf(cf["env.sys.log.conf.path"]):
        report('Failed to load the log file. (%s)' % sys.argv[0])
        return 1
    logger = logging.getLogger('pysilhouette.performer')

    # The try/except is nested inside try/finally so that the pid file
    # clean-up runs on every exit path.
    try:
        try:
            signal.signal(signal.SIGTERM, sigterm_handler)
            return performer(opts, cf) # start!!
        except KeyboardInterrupt:
            detail = str(sys.exc_info()[1].args)
            logger.critical('Keyboard interrupt occurred. - %s' % detail)
            report('Keyboard interrupt occurred. - %s' % detail)
        except Exception:
            detail = str(sys.exc_info()[1].args)
            trace = traceback.format_exc()
            logger.critical('System error has occurred. - %s' % detail)
            report('System error has occurred. - %s' % detail)
            report(trace)
            logging.getLogger('pysilhouette_traceback').critical(trace)

    finally:
        pidfile = opts.pidfile
        if opts.daemon is True and os.path.isfile(pidfile):
            os.unlink(pidfile)
            logger.info('Process file has been deleted.. - pidfile=%s' % pidfile)

    # Reached only after one of the except branches above.
    return 1
211
if __name__ == '__main__':
    # Use main()'s return value as the process exit status.
    exit_status = main()
    sys.exit(exit_status)