--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# This file is part of Pysilhouette.
+#
+# Copyright (c) 2009 HDE, Inc.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+
+"""
+@author: Kei Funagayama <kei@karesansui-project.info>
+"""
+import sys
+import os
+import logging
+import traceback
+import signal
+import Queue
+
+from pysilhouette import PROCERROR, PROCSUCCESS
+from pysilhouette.log import reload_conf
+from pysilhouette.prep import readconf, getopts, chkopts
+from pysilhouette.db.model import JOBGROUP_STATUS, JOBGROUP_TYPE
+from pysilhouette.db.access import jobgroup_findbytype_limit_status, jobgroup_update
+from pysilhouette.er import ER
+from pysilhouette.worker import ThreadQueue, ThreadWorker, dummy_set_job
+from pysilhouette.db import create_database, Database
+
+# var
+asynpool = []
+
+class AsynPerformer(ER):
+ """<comment-ja>
+ AsynPerformer Class
+ </comment-ja>
+ <comment-en>
+ TODO: English Comment
+ </comment-en>
+ """
+ def __init__(self, opts, cf):
+ ER.__init__(self, opts, cf)
+ self._fifo('asynperformer')
+ self._setdaemon()
+ self.db = create_database(cf)
+
+ def process(self):
+ self.logger.info('asynperformer : [started]')
+
+ # thread pool
+ request_queue = Queue.Queue()
+ #response_queue = Queue.Queue()
+ response_list = []
+ tq = ThreadQueue(request_queue, response_list)
+
+ while True:
+ fp = open(self.cf["asynperformer.mkfifo.path"], 'r')
+ try:
+ code = fp.read()
+ finally:
+ fp.close()
+
+ self.logger.info('Received code from the FIFO file. - code=%s' % code)
+ session = self.db.get_session()
+
+ # TODO:dummy data
+ dummy_set_job(self.cf,
+ int(self.cf['asynperformer.thread.pool.size']),
+ 'echo "aaaaaa"',
+ 'echo "bbbbb"',
+ 'echo "cccc"',
+ #'serial',
+ 'paralle',
+ self.db)
+
+ # Pending JobGroup search
+ if self.cf['asynperformer.thread.pool.size'] <= tq.now_alive():
+ continue
+
+ m_jgs = jobgroup_findbytype_limit_status(session,
+ JOBGROUP_TYPE['PARALLE'],
+ int(self.cf['asynperformer.thread.pool.size']) - tq.now_alive())
+
+ session.close()
+
+ self.logger.info('Queued the Job Group from the database. - Number of JobGroup=%d' \
+ % len(m_jgs))
+
+ self.logger.debug('filo code=%s, cf asynperformer.mkfifo.start.code=%s' % (code, self.cf["asynperformer.mkfifo.start.code"]))
+ if code == self.cf["asynperformer.mkfifo.start.code"]:
+ if 0 < len(m_jgs):
+ for m_jg in m_jgs:
+ try:
+ # thread worker!! start
+ tq.put(ThreadWorker(self.cf, self.db, m_jg.id))
+ except Exception, e:
+ self.logger.info('Failed to perform the job group. Exceptions are not expected. - jobgroup_id=%d : %s'
+ % (m_jg.id, str(e.args)))
+ print >>sys.stderr, traceback.format_exc()
+ t_logger = logging.getLogger('pysilhouette_traceback')
+ t_logger.error(traceback.format_exc())
+ else:
+ self.logger.info('No Job Group.')
+ elif code == self.cf["asynperformer.mkfifo.stop.code"]:
+ self.logger.info('Received stop code from the FIFO file. - code=%s' % code)
+ break
+ else:
+ self.logger.info('Received illegal code from the FIFO file. - code=%s' % code)
+
+def sigterm_handler(signum, frame):
+ logger = logging.getLogger('pysilhouette.asynperformer')
+ logger.info('Stop the AsynPerformerd with signal - pid=%s, signal=%s' % (os.getpid(), signum))
+ for x in asynpool :
+ os.kill(x.pid, signum)
+ sys.exit(os.getpid())
+
+def main():
+ (opts, args) = getopts()
+ if chkopts(opts) is True:
+ return PROCERROR
+
+ cf = readconf(opts.config)
+ if cf is None:
+ print >>sys.stderr, 'Failed to load the config file "%s". (%s)' % (opts.config, sys.argv[0])
+ return PROCERROR
+
+ # set env=PYSILHOUETTE_CONF
+ os.environ['PYSILHOUETTE_CONF'] = opts.config
+
+ if reload_conf(cf["env.sys.log.conf.path"]):
+ logger = logging.getLogger('pysilhouette.asynperformer')
+ else:
+ print >>sys.stderr, 'Failed to load the log file. (%s)' % sys.argv[0]
+ return PROCERROR
+
+ try:
+ try:
+ signal.signal(signal.SIGTERM, sigterm_handler)
+ asynperformer = AsynPerformer(opts, cf)
+ ret = asynperformer.process() # start!!
+ return ret
+ except KeyboardInterrupt, k:
+ logger.critical('Keyboard interrupt occurred. - %s' % str(k.args))
+ print >>sys.stderr, 'Keyboard interrupt occurred. - %s' % str(k.args)
+ except Exception, e:
+ logger.critical('System error has occurred. - %s' % str(e.args))
+ print >>sys.stderr, 'System error has occurred. - %s' % str(e.args)
+ print >>sys.stderr, traceback.format_exc()
+ t_logger = logging.getLogger('pysilhouette_traceback')
+ t_logger.critical(traceback.format_exc())
+
+ finally:
+ if opts.daemon is True and os.path.isfile(opts.pidfile):
+ os.unlink(opts.pidfile)
+ logger.info('Process file has been deleted.. - pidfile=%s' % opts.pidfile)
+
+ return PROCERROR
+
+
+if __name__ == '__main__':
+ sys.exit(main())
--- /dev/null
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# This file is part of Pysilhouette.
+#
+# Copyright (c) 2009 HDE, Inc.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+# THE SOFTWARE.
+#
+
+"""
+@author: Kei Funagayama <kei@karesansui-project.info>
+"""
+
+import os
+import sys
+import time
+import signal
+import traceback
+import logging
+
+from pysilhouette import PROCERROR, PROCSUCCESS
+from pysilhouette.er import ER
+from pysilhouette.db import create_database, Database
+from pysilhouette.log import reload_conf
+from pysilhouette.prep import readconf, getopts, chkopts
+
+ENTITYS = ('asynperformer',)
+
+class AsynScheduler(ER):
+ """<comment-ja>
+ AsynScheduler Class
+ </comment-ja>
+ <comment-en>
+ TODO: English Comment
+ </comment-en>
+ """
+ def __init__(self, opts, cf):
+ ER.__init__(self, opts, cf)
+ for entity in ENTITYS:
+ self._fifo(entity)
+
+ self._setdaemon()
+ create_database(cf)
+
+ def process(self):
+ self.logger.info('asynscheduler : [started]')
+ while True:
+ try:
+ for entity in ENTITYS:
+ fp = open(self.cf["%s.mkfifo.path" % entity], 'w')
+ try:
+ fp.write(self.cf['%s.mkfifo.start.code' % entity])
+ self.logger.info('Start code was written. - file=%s : code=%s'
+ % (self.cf["%s.mkfifo.path" % entity], self.cf['%s.mkfifo.start.code' % entity]))
+ finally:
+ fp.close()
+
+ self.logger.debug('interval start, interval=%s' % (self.cf['asynscheduler.interval']))
+ time.sleep(int(self.cf['asynscheduler.interval']))
+
+ except IOError, i:
+ if i.errno == 4:
+ return PROCSUCCESS # When ending with the signal
+
+ return PROCERROR # beyond expectation
+
+def sigterm_handler(signum, frame):
+ logger = logging.getLogger('pysilhouette.asynscheduler.signal')
+ logger.info('Stop the asynschedulerd with signal- pid=%s, signal=%s' % (os.getpid(), signum))
+
+
+def main():
+ (opts, args) = getopts()
+ if chkopts(opts) is True:
+ return PROCERROR
+
+ cf = readconf(opts.config)
+ if cf is None:
+ print >>sys.stderr, 'Failed to load the config file "%s". (%s)' % (opts.config, sys.argv[0])
+ return PROCERROR
+
+ if reload_conf(cf["env.sys.log.conf.path"]):
+ logger = logging.getLogger('pysilhouette.asynscheduler')
+ else:
+ print >>sys.stderr, 'Failed to load the log file. (%s)' % sys.argv[0]
+ return PROCERROR
+
+ try:
+ try:
+ signal.signal(signal.SIGTERM, sigterm_handler)
+ asynscheduler = AsynScheduler(opts, cf)
+ ret = asynscheduler.process() # start!!
+ return ret
+ except KeyboardInterrupt, k:
+ logger.critical('Keyboard interrupt occurred. - %s' % str(k.args))
+ print >>sys.stderr, 'Keyboard interrupt occurred. - %s' % str(k.args)
+ except Exception, e:
+ logger.critical('A system error has occurred. - %s' % str(e.args))
+ print >>sys.stderr, 'A system error has occurred. - %s' % str(e.args)
+ print >>sys.stderr, traceback.format_exc()
+ t_logger = logging.getLogger('pysilhouette_traceback')
+ t_logger.critical(traceback.format_exc())
+
+ finally:
+ if opts.daemon is True and os.path.isfile(opts.pidfile):
+ os.unlink(opts.pidfile)
+ logger.info('Process file has been deleted.. - pidfile=%s' % opts.pidfile)
+
+ return PROCERROR
+
+if __name__ == '__main__':
+ sys.exit(main())