2 # -*- coding: utf-8 -*-
4 # This file is part of Pysilhouette.
6 # Copyright (c) 2009 HDE, Inc.
8 # Permission is hereby granted, free of charge, to any person obtaining a copy
9 # of this software and associated documentation files (the "Software"), to deal
10 # in the Software without restriction, including without limitation the rights
11 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12 # copies of the Software, and to permit persons to whom the Software is
13 # furnished to do so, subject to the following conditions:
15 # The above copyright notice and this permission notice shall be included in
16 # all copies or substantial portions of the Software.
18 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19 # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20 # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21 # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22 # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
28 @author: Kei Funagayama <kei@karesansui-project.info>
37 from sqlalchemy.pool import SingletonThreadPool, QueuePool
38 from pysilhouette.log import reload_conf
39 from pysilhouette.prep import readconf, getopts, chkopts
40 from pysilhouette.db import Database
41 from pysilhouette.db.model import reload_mappers, JOBGROUP_STATUS
42 from pysilhouette.db.access import jobgroup_findbystatus, jobgroup_update
43 from pysilhouette.worker import SimpleWorker
45 from pysilhouette.util import kill_proc, write_pidfile, create_fifo
47 def sigterm_handler(signum, frame):
48 logger = logging.getLogger('pysilhouette.performer.signal')
49 logger.info('Stop the performerd with signal - pid=%s, signal=%s' % (os.getpid(), signum))
51 def performer(opts, cf):
52 logger = logging.getLogger('pysilhouette.performer')
55 if os.access(cf["observer.mkfifo.path"], os.F_OK|os.R_OK|os.W_OK) is False:
57 os.unlink(cf["observer.mkfifo.path"])
58 logger.info('Deleted filo file. - file=%s' % cf["observer.mkfifo.path"])
62 create_fifo(cf["observer.mkfifo.path"],
63 cf["observer.mkfifo.user.name"],
64 cf["observer.mkfifo.group.name"],
65 cf["observer.mkfifo.perms"],
68 logger.info('The fifo file was created. - file=%s' % cf["observer.mkfifo.path"])
71 if opts.daemon is True:
73 if write_pidfile(opts.pidfile, pid):
74 logger.info('The process file was created. - file=%s' % opts.pidfile)
76 logger.error('Could not create process file. - file=%s' % opts.pidfile)
79 logger.info('performer : [started]')
82 if cf['database.url'][:6].strip() == 'sqlite':
83 db = Database(cf['database.url'],
86 #assert_unicode='warn', # DEBUG
88 #echo_pool = opts.verbose,
93 if int(cf['database.pool.status']) == 1:
94 db = Database(cf['database.url'],
97 #assert_unicode='warn', # DEBUG
99 pool_size=int(cf['database.pool.size']),
100 max_overflow=int(cf['database.pool.max.overflow']),
101 #echo = opts.verbose,
102 #echo_pool = opts.verbose,
104 echo_pool=True # TODO
107 db = Database(cf['database.url'],
109 convert_unicode=True,
110 #assert_unicode='warn', # DEBUG
111 poolclass=SingletonThreadPool,
112 #echo = opts.verbose,
113 #echo_pool = opts.verbose,
115 echo_pool=True # TODO
118 reload_mappers(db.get_metadata())
121 logger.error('Initializing a database error - %s' % str(e.args))
122 t_logger = logging.getLogger('pysilhouette_traceback')
123 t_logger.error(traceback.format_exc())
127 fp = open(cf["observer.mkfifo.path"], 'r')
133 logger.info('Received code from the FIFO file. - code=%s' % code)
134 session = db.get_session()
135 m_jgs = jobgroup_findbystatus(session)
138 logger.info('Queued the Job Group from the database. - Number of JobGroup=%d' % len(m_jgs))
140 if code == cf["observer.mkfifo.start.code"]:
144 w = SimpleWorker(cf, db, m_jg.id)
147 logger.info('Failed to perform the job group. Exceptions are not expected. - jobgroup_id=%d : %s'
148 % (m_jg.id, str(e.args)))
149 print >>sys.stderr, traceback.format_exc()
150 t_logger = logging.getLogger('pysilhouette_traceback')
151 t_logger.error(traceback.format_exc())
154 session = db.get_session()
155 jobgroup_update(session, m_jg, JOBGROUP_STATUS['APPERR'])
158 logger.error('Failed to change the status of the job group. - jobgroup_id=%d : %s'
159 % (m_jg.id, str(e.args)))
160 t_logger = logging.getLogger('pysilhouette_traceback')
161 t_logger.error(traceback.format_exc())
164 logger.info('No Job Group.')
165 elif code == cf["observer.mkfifo.stop.code"]:
166 logger.info('Received stop code from the FIFO file. - code=%s' % code)
169 logger.info('Received illegal code from the FIFO file. - code=%s' % code)
172 (opts, args) = getopts()
173 if chkopts(opts) is True:
176 cf = readconf(opts.config)
178 print >>sys.stderr, 'Failed to load the config file "%s". (%s)' % (opts.config, sys.argv[0])
181 # set env=PYSILHOUETTE_CONF
182 os.environ['PYSILHOUETTE_CONF'] = opts.config
184 if reload_conf(cf["env.sys.log.conf.path"]):
185 logger = logging.getLogger('pysilhouette.performer')
187 print >>sys.stderr, 'Failed to load the log file. (%s)' % sys.argv[0]
192 signal.signal(signal.SIGTERM, sigterm_handler)
193 ret = performer(opts, cf) # start!!
195 except KeyboardInterrupt, k:
196 logger.critical('Keyboard interrupt occurred. - %s' % str(k.args))
197 print >>sys.stderr, 'Keyboard interrupt occurred. - %s' % str(k.args)
199 logger.critical('System error has occurred. - %s' % str(e.args))
200 print >>sys.stderr, 'System error has occurred. - %s' % str(e.args)
201 print >>sys.stderr, traceback.format_exc()
202 t_logger = logging.getLogger('pysilhouette_traceback')
203 t_logger.critical(traceback.format_exc())
206 if opts.daemon is True and os.path.isfile(opts.pidfile):
207 os.unlink(opts.pidfile)
208 logger.info('Process file has been deleted.. - pidfile=%s' % opts.pidfile)
212 if __name__ == '__main__':