#!/usr/bin/python
# -*- coding: utf-8 -*-
from pyUnixDaemon import pyUnixDaemon
from time import sleep
import datetime
from os import getpid
import sys
import urllib2
from cookielib import CookieJar
import urllib
import time
import simplejson as json
import hashlib
import threading
import os
import getopt
import signal
import ConfigParser
class pyUnixDaemon:
""" This class implements a UNIX daemon. You can derived from it to build
your own daemon.
"""
def __init__(self, lockFilename = "/var/run/pyUnixDaemon.pid", user = (0, 0)):
""" This method initializes some internal variables and instancies an
object.
"""
self._user = user
self._lockFilename = lockFilename
def _daemonExists(self):
""" This method checks if the lock file exists.
"""
return os.path.exists(self._lockFilename)
def _lock(self):
""" This method creates the lock file and write the daemon pid.
"""
self._lockFile = open(self._lockFilename, "w")
self._lockFile.write("%d" % (os.getpid()))
self._lockFile.flush()
def _unlock(self):
""" This method closes and removes the lock file.
"""
self._lockFile.close()
os.unlink(self._lockFilename)
def _fork(self):
""" This method forks the daemon using the unix way.
"""
if (self._daemonExists()):
print "[Error] Could not be daemonized: already in memory"
sys.exit(1)
try:
pid = os.fork()
if (pid > 0):
sys.exit(0)
except OSError, e:
print "[Error] Fork #1 failed: %s (%d)" % (e.strerror, e.errno)
sys.exit(1)
os.chdir("/")
os.setsid()
os.umask(0)
try:
pid = os.fork()
if (pid > 0):
sys.exit(0)
except OSError, e:
print "[Error] Fork #2 failed: %s (%d)" % (e.strerror, e.errno)
sys.exit(1)
def _run(self):
""" This method must be derivated to do something.
"""
while(self._loop):
pass
def __signalHandler(self, signalNumber, frame):
""" This method sets an internal flag that permits to break the forever
loop.
"""
self._loop = False
def setOutput(self, output, error = None):
""" This method redirects stdout and stderr flows.
"""
self._stdout = sys.stdout
self._output = output
self._stderr = sys.stderr
if (error is not None):
self._error = error
else:
self._error = output
def setMaxThreads(self, maxThreads):
""" This method set the max number of parallel threads permitted.
"""
self._maxThreads = maxThreads
self._allocatedCPUs = 0
def launch(self):
""" This method prepares all required actions before lauching the background method '_run'.
"""
self._fork()
#os.setegid(self._user[1])
#os.seteuid(self._user[0])
self._lock()
self._loop = True
signal.signal(signal.SIGTERM, self.__signalHandler)
sys.stdout = self._output
sys.stderr = self._error
self._run()
sys.stdout = self._stdout
sys.stderr = self._stderr
self._unlock()
class mediacadEncodeDeamon(pyUnixDaemon):
def __init__(self, lockFilename, config):
pyUnixDaemon.__init__(self, lockFilename)
self._config=config
def _run(self):
now = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
print "%s - %s" % (now, getpid())+" ; Lancement du deamon"
print "%s - %s" % (now, getpid())+" ; Le nombre maximum de processus en parallele sera : "+self._config.get("launchParams", "maxParallelThreads")
print "%s - %s" % (now, getpid())+" ; Le waiting time entre deux appels vers Mediacad est : "+self._config.get("launchParams", "loopSleep")+"s"
print "%s - %s" % (now, getpid())+" ; The url of the application Mediacad est : "+self._config.get("mediacad", "url")
while(self._loop):
try:
activeThreads = threading.activeCount()
if activeThreads < int(self._maxThreads)+1:
mc = mediacadEncode(self._config)
mc.start()
except Exception, error:
# Only for unexpected errors
print "%s ; FATAL ; %s"%(now, str(error))
self._loop = False
try:
mc._Thread__stop()
except UnboundLocalError:
pass
sys.stdout.flush()
time.sleep(int(self._config.get("launchParams","loopSleep")))
print "%s - %s" % (now, getpid())+" ; Arret du deamon"
class mediacadEncode(threading.Thread):
def __init__(self, config):
threading.Thread.__init__(self)
""" Initialisation des variables internes """
# Chemins des applications d'encodage :
self.applicationsPath = {
'ffmpeg': {
'ffmpeg': config.get("applicationPath","ffmpeg"),
'ffprobe': config.get("applicationPath","ffprobe")
},
'imageMagick': {
'convert': config.get("applicationPath","convert")
},
'calibre': {
'ebookConvert': config.get("applicationPath","ebookConvert"),
'ebookMeta': config.get("applicationPath","ebookMeta")
},
'unoconv': {
'unoconv': config.get("applicationPath","unoconv")
},
'pdftk': {
'pdftk': config.get("applicationPath","pdftk")
}
}
# login (to access the webservice)
self.login = config.get("mediacad","login")
self.clearPassword = config.get("mediacad","password")
# Path des fichiers temporaires (sur le serveur de fichiers)
self.tmpPath = config.get("filesPath","tmpPath")
# Path des sources (sur le serveur de fichier)
# after l'upload, Mediacad passe les fichiers de uploadPath vers sourcePath
self.sourcePath = config.get("filesPath","sourcePath")
# Path des fichiers encoded
self.encodedPath = config.get("filesPath","encodedPath")
# url du webservice de Mediacad :
self.url = config.get("mediacad","url")
def run(self):
try:
resultat = self.traiterNextTask()
if resultat == "notask":
# Ne rien faire
pass
elif resultat == "success":
self.success()
else:
self._printerr("run() ; Echec lors de l'appel a traiterNextTask()["+str(resultat)+"]")
self.error(resultat)
except Exception, error:
# Seulement pour les erreurs imprevues
self._printerr("run() ; Erreur fatale imprevue : "+str(error))
self._Thread__stop()
def _strNow(self):
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
def _printlog(self, log):
print self._strNow() + " INFO " + str(log)
return;
def _printerr(self, log):
print self._strNow() + " ERROR " + str(log)
return;
def _requestWS(self, path, params = {}):
""" Request un webservice """
#self._printlog("_requestWS() ; Operation demandee ["+path+"]")
# L'interrogation du webservice de Mediacad necessite de pouvoir
# recuperer le cookie recieved lors du 1er appel,
# et de le renvoyer lors du 2nd appel (la recuperation de la
# prochaine task)
cj = CookieJar()
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
# Get le salt :
#self._printlog("_requestWS() ; getting the salt")
try:
page = opener.open(self.url+path)
result = page.read()
# self._printlog("_requestWS() ; answer du premier appel : "+str(result))
fluxRetour = json.loads(result)
except urllib2.HTTPError, error:
return {'isSuccess': False, 'data': {}, 'errorMessage': str(error)}
try:
salt = fluxRetour["data"]["salt"]
except KeyError:
return {'isSuccess': False, 'data': {}, 'errorMessage': "Salt not found dans la answer du webservice"}
# Authentification :
password = hashlib.sha256(self.clearPassword + salt).hexdigest()
params['login'] = self.login
params['password'] = password
#self._printlog("_requestWS() ; Authentification")
data_encoded = urllib.urlencode(params)
try:
#for cookie in cj:
#self._printlog('_requestWS() ; cookie [%s --> %s]'%(cookie.name,cookie.value))
page = opener.open(self.url+path, data_encoded)
result = page.read()
reponse = json.loads(result)
#self._printlog("_requestWS() ; answer de l'authentification : "+str(reponse))
except urllib2.HTTPError, error:
return {'isSuccess': False, 'data': {}, 'errorMessage': str(error)}
#self._printlog("_requestWS() ; Exit")
return reponse
def traiterNextTask(self):
answerRequest = self._requestWS("getnexttask")
# Checking qu'il n'y ait pas eu d'erreur :
if answerRequest["isSuccess"] == False:
self._printlog("traiterNextTask() ; La answer est dans le state FALSE: " + str(answerRequest))
raise Exception("traiterNextTask() ; Erreur dans le getting de la prochaine task !")
# Stopper si il n'y aucune task en attente :
try:
if answerRequest["data"]["hasNewEncodingTask"] == False:
return "notask"
except KeyError:
raise Exception("traiterNextTask() ; inconsistency de la answer !")
# A partir d'ici, on ne peut plus se permettre de ne pas maitriser le traitement
# de la task que nous venons de recuperer :
# l'interrogation de l'operation "getnexttask" du webservice modifie
# le state de la task dans la database de Mediacad.
# Nous devons donc controler son traitement pour etre en mesure de renvoyer
# une boolean answer.
try:
# Log de la task :
self._printlog("traiterNextTask() ; A task was just found: " + str(answerRequest))
# Parsing de la tâche :
self.task = answerRequest["data"]["task"]
self.encodingTaskId = answerRequest["data"]["encodingTaskId"]
# Execute la commande
self.debut = time.time()
# Commande copy
if self.task == 'copy':
self._printlog("traiterNextTask() ; Copie")
sourceLocation = answerRequest["data"]["sourceLocation"]
destLocation = answerRequest["data"]["destLocation"]
sourceLocation = sourceLocation.replace("##tmpPath##", self.tmpPath)
sourceLocation = sourceLocation.replace("##sourcePath##", self.sourcePath)
sourceLocation = sourceLocation.replace("##encodedPath##", self.encodedPath)
destLocation = destLocation.replace("##tmpPath##", self.tmpPath)
destLocation = destLocation.replace("##sourcePath##", self.sourcePath)
destLocation = destLocation.replace("##encodedPath##", self.encodedPath)
from shutil import copyfile
self._printlog("traiterNextTask() ; Fichier source [" + sourceLocation + "]")
self._printlog("traiterNextTask() ; Fichier cible [" + destLocation+"]")
copyfile(sourceLocation, destLocation)
return 'success'
# Commande delete
elif self.task == 'delete':
self._printlog("traiterNextTask() ; Suppression")
location = answerRequest["data"]["location"]
location = location.replace("##tmpPath##", self.tmpPath)
location = location.replace("##sourcePath##", self.sourcePath)
location = location.replace("##encodedPath##", self.encodedPath)
self._printlog("traiterNextTask() ; Fichier to supprimer [" + location + "]")
os.remove(location)
return 'success'
# Commande encode
else:
self._printlog("traiterNextTask() ; Encodage")
commande = answerRequest["data"]["command"]
application = answerRequest["data"]["application"]
exe = answerRequest["data"]["exe"]
commande = commande.replace('##applicationPath##', self.applicationsPath[application][exe])
commande = commande.replace('##tmpPath##', self.tmpPath)
commande = commande.replace('##sourcePath##', self.sourcePath)
commande = commande.replace('##encodedPath##', self.encodedPath)
self._printlog("traiterNextTask() ; Commande [" + str(commande)+"]")
import commands
resultat = commands.getstatusoutput(commande) #
if resultat[0] != 0:
return "Erreur executing the command: " + resultat[1]
else:
#self._printlog("traiterNextTask() ; Retour de la sortie standard pour la task ["+self.encodingTaskId+"]: " + resultat[1])
return 'success'
except Exception, error:
return error
return "ERREUR : cette erreur n'aurait pas du remonter"
def success(self):
""" Informer Mediacad du success du traitement de la task """
self._printlog("success() ; La task [" + self.task + "] d'identifiant [" + self.encodingTaskId + "] s'est terminee avec success")
fin = time.time()
duree = fin - self.debut
#encodeDuration = datetime.timedelta(seconds=duree)
encodeDuration = str(duree)
self._printlog("success() ; Duree de l'encodage : " + encodeDuration)
# Envoi du SUCCESS
datasToTransfert = {'encodingTaskId': self.encodingTaskId, 'encodeDuration': encodeDuration}
answerRequest = self._requestWS("setencodesuccess", datasToTransfert)
if not answerRequest["isSuccess"]:
raise Exception("Echec dans l'envoi d'un SUCCESS to Mediacad ! ["+ str(answerRequest["errorMessage"]) + "]")
self._printlog("success() ; Envoi du SUCCESS to Mediacad: OK")
return
def error(self,resultatCommande = ""):
""" Informer Mediacad du failure du traitement de la task """
self._printerr("error() ; Une erreur s'est produite lors de l'execution de la task [" + self.encodingTaskId + "]")
# Envoi du ECHEC
datasToTransfert = {'encodingTaskId': self.encodingTaskId, 'errorMessage': resultatCommande}
answerRequest = self._requestWS("setencodeerror", datasToTransfert)
if not answerRequest["isSuccess"]:
raise Exception("Echec dans l'envoi d'un ECHEC to Mediacad ! ["+ str(answerRequest["errorMessage"]) + "]")
self._printlog("error() ; Envoi du ECHEC to Mediacad: OK")
return
#
# Partie principale
#
def main(argv):
# Nom du script courant :
currentScript = os.path.abspath(__file__)
if currentScript[-3:] != ".py":
print "Mauvaise extension de fichier ! (<filename>.py)"
sys.exit()
# Lire la configuration :
config = ConfigParser.ConfigParser()
config.read(currentScript[:-3]+".ini")
# pid file for the deamon :
hostname = os.getenv('HOSTNAME', 'unknown_hostname')
if hostname == 'unknown_hostname':
print "Variable linux HOSTNAME non disponible"
sys.exit()
pidFileName = currentScript[:-3]+"_"+hostname+".pid"
lofFileName = currentScript[:-3]+"_"+hostname+".log"
help = currentScript+" [--start] [--stop] [--force] [--help]"
try:
options, args = getopt.gnu_getopt(sys.argv, 'h', ["start","stop","force","help"])
except getopt.GetoptError:
print help
sys.exit(2)
for opt, arg in options:
if opt in ('-h', '--help'):
print help
elif opt == '--start':
daemon = mediacadEncodeDeamon(pidFileName,config)
daemon.setOutput(open(lofFileName, "a"))
print "Le fichier de log est : "+lofFileName
daemon.setMaxThreads(config.get("launchParams", "maxParallelThreads"))
daemon.launch()
elif opt == '--stop':
print "Graceful stop asked"
try:
pid = int(open(pidFileName, 'r').read())
except IOError:
print "No pid file available"
sys.exit()
os.kill(pid, signal.SIGTERM)
print "Graceful stop ended"
sys.exit()
elif opt == '--force':
print "Force stop asked"
try:
pidFile = open(pidFileName, 'r')
pid = int(pidFile.read())
except IOError:
print "No pid file available"
sys.exit()
os.kill(pid, signal.SIGKILL)
pidFile.close()
os.unlink(pidFileName)
print "Force stop ended"
sys.exit()
else:
assert False, "option incorrecte"
if __name__ == "__main__":
main(sys.argv[1:])
Créé avec HelpNDoc Personal Edition: Créer des documents d'aide facilement