# YASARA BioTools # Visit www.yasara.org for more... # Copyright by Elmar Krieger # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA import os,string,shutil,fnmatch,zipfile # ====================================================================== # D I S C F U N C T I O N G R O U P # ====================================================================== # MAKE PATH ABSOLUTE # ================== def abspath(path): return(os.path.abspath(path)) # CALCULATE FILE CHECKSUM # ======================= def checksum(path): checksum=0 file=open(path) pos=1 a=0 b=0 while (1): data=file.read(1000000) datalen=len(data) i=0 while (i0 and filename[i] not in string.digits): i=i-1 while (i>=0): num=ord(filename[i]) if (num<48 or num>57): break num=num+1 if (num==58): num=48 filename=filename[:i]+chr(num)+filename[i+1:] if (num!=48): break i=i-1 return(filename) # MAKE DIRECTORY CHAIN # ==================== def makedirs(path,permissions=0755): if (not os.path.exists(path)): os.makedirs(path,permissions) # THE PERMISSIONS ARE NOT ALWAYS SET CORRECTLY, DO IT AGAIN chmod(path,permissions) # GET MD5SUM # ========== def md5sum(filename): content=open(filename).read() md5sum=md5.new(content).digest() md5str="" for ch in md5sum: hexstr=hex(ord(ch)) if (len(hexstr)==3): md5str+='0'+hexstr[-1] else: md5str+=hexstr[-2:] return(md5str) # GET THE MODIFICATION 
# NOTE(review): this line holds the module header, the imports and the first
# disc helpers (abspath, checksum, makedirs, md5sum) collapsed into one line,
# and text between '<' and '>' was stripped. checksum() breaks off at
# "while (i" and resumes inside a different helper: the surviving tail
# increments the trailing digits of a file name, but its head is lost.
# Helpers called elsewhere in this module (chmod, remove, copy, dirlist) have
# no surviving definition. They presumably lived in the lost span.
# TODO: restore them from upstream before editing this region.
# Names used in this module but not imported on this line: md5 (md5sum) and
# time, sys and types (job_list methods). Confirm that each use imports what
# it needs.
# Python 2 only as written: the md5 module and the 0755 octal literal do not
# exist in Python 3; the replacements are hashlib.md5 and 0o755.
# NOTE(review): collapsed line, and text between '<' and '>' was stripped.
# tmpfilename() breaks off at "dotpos". Everything up to
# "modtime(dstfilename)))" is only the tail of a directory-sync helper; that
# tail copies newer files and optionally removes obsolete ones, and its head
# is lost. TODO: restore from upstream before editing.
# Other issues visible on this line:
#   - havesamecontent() leaves both files open until garbage collection.
#   - zip() shadows the builtin zip() for the whole module.
#   - The print statement in zip() is Python 2 only.
#   - unzipped() passes ZIP_DEFLATED as the compression argument of a ZipFile
#     that is only read.
TIME OF A FILE # =================================== def modtime(filename): if (not os.path.exists(filename)): return(0) else: return(os.path.getmtime(filename)) # GET ALL MODIFICATION TIMES FOR A LIST OF FILES # ============================================== def modtimes(filelist): timelist=[] for filename in filelist: if (not os.path.exists(filename)): timelist.append(0) else: timelist.append(os.path.getmtime(filename)) return(timelist) # CHECK IF TWO FILES ARE THE SAME # =============================== def havesamecontent(filename1,filename2): if (not os.path.exists(filename1) or not os.path.exists(filename2)): return(0) content1=open(filename1,"r").read() content2=open(filename2,"r").read() return (content1==content2) # BUILD TEMPORARY FILE NAME # ========================= def tmpfilename(filename): dotpos=string.rfind(filename,".") slashpos=string.rfind(filename,os.sep) if (dotpos==-1 or dotposmodtime(dstfilename))): copy(srcfilename,dstfilename,0,mod) modfilelist.append(dstfilename) if (obsoleted): # ALSO DELETE OBSOLETE FILES dstfilelist=dirlist(os.path.join(dstdir,os.path.basename(srcpath))) for dstfilename in dstfilelist: dstbasename=os.path.basename(dstfilename) if (dstbasename not in srcbaselist): remove(dstfilename) return(modfilelist) # ZIP A FILE # ========== def zip(filename): print "Deflating",filename zipfilename=filename+".zip" zip=zipfile.ZipFile(zipfilename,"w",zipfile.ZIP_DEFLATED) zip.write(filename,os.path.basename(filename)) zip.close() remove(filename) # RETURN AN UNZIPPED FILE # ======================= def unzipped(filename): zipfilename=filename+".zip" zip=zipfile.ZipFile(zipfilename,"r",zipfile.ZIP_DEFLATED) data=zip.read(os.path.basename(filename)) zip.close() return(data) # ====================================================================== # J O B C L A S S # ====================================================================== class job_list: """ This class is used to add jobs to the MODELS@HOME cluster job queue Create a class
instance passing the filename of the job list and the name of an error-handling function. - Instance.error contains an error description if something went wrong - Instance.lock() locks the job list (done automatically by first call to add) - Instance.add() adds a job to the list - Instance.submit() submits the added jobs and removes the lock - Instance.waiting() returns the number of currently waiting jobs - Instance.waitresult() waits until the given list of result files have returned from the cluster Sample job list: JOB NUMB_S_P_CLIENT IP_______CLIENT NAME______TIME OF JOB SUBMISSION___CHECK____PROGRAM__OPTIONS|DIR|COPY|TEXT|RETURN|DONEFLAG 00000000 W 0 WHATIF |/mnt/home7/cc/elmar/client1| 00000001 A 5 123.234.456.789 CMBIPC1 Tue May 21 13:46:22 1991 13fed734 YASARA -nss -ncd -con -mcr|STARTUP.FIL= 00000002 W 9 WHATMB -step2 align.txt| Example: job=job_list(os.path.join(wmb_conf["CLUSTER_DIR"],wmb_conf["CLUSTER_JOB"]),error) whatifscript=job.convtext("STARTUP.FIL","build\n inibld\n ala\n %makmol\n\n ala.pdb\n all 0\n ONE ALANINE\n\nfullst y\n") for i in range(10): # ADD JOB job.add(8,"WHATIF",'|'+os.getcwd()+"||"+whatifscript+"|ala.pdb|") # SUBMIT THEM ALL job.submit() """ # A JOB LIST FILENAME MUST ALWAYS END WITH .job # IT IS RENAMED TO *.jupy (job update Python) BEFORE BEING ACCESSED (SO THAT NO OTHER # PROGRAM CAN MODIFY THE FILE) # INITIALIZE # ========== # - listname IS THE PATH AND NAME OF cluster.job # - errorfunc IS AN ERROR HANDLING FUNCTION def __init__(self,listname,errorfunc=None): self.locked=0 self.error=None self.errorfunc=errorfunc self.listname=listname self.lockname=listname[:-4]+".jupy" self.priority=5 self.list=[] self.waitinglist=[] # LOCK THE JOB LIST # ================= # THIS FUNCTION RETURNS AS SOON AS THE JOB LIST HAS BEEN LOCKED. # Instance.add CAN THEN BE USED TO ADD JOBS. 
def lock(self): if (not self.locked): # REMOVE THE LOCKED LIST, MIGHT BE A LEFTOVER if (os.path.exists(self.lockname)): os.remove(self.lockname) # IS THE LIST WITH JOBS PRESENT? while (1): # WAIT UNTIL LIST FILE IS ACCESSIBLE, THEN RENAME IT TO LOCK while (1): # TRY TO RENAME try: os.rename(self.listname,self.lockname) except: # SOMEONE ELSE IS USING THE FILE time.sleep(1) continue break if (os.path.exists(self.lockname)): break print "Job list was renamed to %s but cannot be found now. Trying again." % self.lockname # LOAD LIST self.list=open(self.lockname,"r").readlines() self.locked=1 # RAISE AN ERROR # ============== # CALLS THE ERRORFUNCTION PROVIDED BY THE USER WITH THE GIVEN STRING def raiseerror(self,errormsg): errormsg=self.__class__.__name__+'.'+errormsg self.error=errormsg if (self.errorfunc!=None): apply(self.errorfunc,[errormsg]) else: print errormsg raise SystemExit # ADD A JOB TO THE LIST # ===================== # - priority RANGES FROM 0 TO 9 # - program IS THE 6 LETTER PROGRAM ID # - options ARE PROGRAM OPTIONS, DATA FILES, RESULT FILES AND A DONE FILE. def add(self,priority,program,options): if (priority==None): priority=self.priority self.waitinglist.append([priority,program,options]) # SUBMIT THE JOBS # =============== # THE JOBS ARE SUBMITTED TO THE CLUSTER, cluster.job IS UNLOCKED # IF WAITFLAG IS SET, THIS FUNCTION WILL ONLY RETURN AFTER ALL RESULTS HAVE # BEEN RECEIVED. 
def submit(self,waitflag=1): if (self.waitinglist!=[]): self.lock() # NUMBER OF JOB = NUMBER OF LAST JOB IN LIST+1 */ if (len(self.list)==1): jobno=0 else: jobno=int(self.list[-1][0:8])+1 # ADD THE JOBS donefilelist=[] for i in range(len(self.waitinglist)): (priority,program,options)=self.waitinglist[i] options=string.rstrip(options) start=string.find(options,'|')+1 end=string.find(options,'|',start) workdir=options[start:end] donefilepos=string.rfind(options,'|')+1 donefilename=string.strip(options[donefilepos:]) if (donefilename==""): # ADD DONE FILE donefilename=tmpfilename("donefile"+str(i)) if (waitflag): options=options+donefilename if (waitflag): donefilename=os.path.join(workdir,donefilename) donefilelist.append(donefilename) remove(donefilename) if (priority<=0): # CHOOSE DEFAULT PRIORITY priority=self.priority+priority priority=max(0,priority) priority=min(9,priority) if (jobno>99999999): jobno=0 self.list.append("%08d W %1d " % (jobno,priority) +\ " %s %s\n" % (program,options)) jobno=jobno+1 # SAVE LIST listfile=open(self.lockname,"w") listfile.writelines(self.list) listfile.close() self.unlock() self.waitinglist=[] if (waitflag): # WAIT FOR ALL JOBS starttime=time.time() for i in range(len(donefilelist)): donefilename=donefilelist[i] sys.stdout.write("\r%d of %d results arrived." % (i,len(donefilelist))) sys.stdout.flush() while (not os.path.exists(donefilename)): time.sleep(1) remove(donefilename) print "\rAll %d results arrived in %.2f minutes. " % (len(donefilelist),(time.time()-starttime)/60.0) # SUBMIT THE JOBS IN CHUNKS # ========================= # THE JOBS ARE SUBMITTED IN CHUNKS, SO THAT NEVER MORE THAN jobs ARE IN THE # QUEUE TOGETHER. THIS IS HELPFUL IF DIFFERENT USERS WITH EQUAL PRIVILEGES # (AND THUS PRIORIRY) SHARE THE CLUSTER RESOURCES. 
# NOTE(review): collapsed line, and text between '<' and '>' was stripped.
# The chunked-submission loop breaks off at "while (i" and resumes at
# "99999999): jobno=0", so the chunk-size check and the job numbering are
# lost. TODO: restore from upstream before editing this method.
# submitchunks(jobs) behaviour that survives:
#   - If jobs is -1, or at least the number of queued jobs, everything is
#     handed to submit(1).
#   - Otherwise each job always gets a done file, which is removed beforehand
#     and waited for afterwards.
#   - Jobs are appended in portions, sleeping 5 s after each save.
#   - Unlike submit(), a caller-supplied done-file name is not stripped of
#     whitespace.
#   - It uses sys and time, which are not imported at the top of the file.
def submitchunks(self,jobs): if (len(self.waitinglist)<=jobs or jobs==-1): self.submit(1) else: # PREPARE THE JOBS joblist=[] donefilelist=[] for i in range(len(self.waitinglist)): (priority,program,options)=self.waitinglist[i] options=string.rstrip(options) start=string.find(options,'|')+1 end=string.find(options,'|',start) workdir=options[start:end] donefilepos=string.rfind(options,'|')+1 donefilename=options[donefilepos:] if (donefilename==""): # ADD DONE FILE donefilename=tmpfilename("donefile"+str(i)) options=options+donefilename donefilename=os.path.join(workdir,donefilename) donefilelist.append(donefilename) remove(donefilename) if (priority<=0): # CHOOSE DEFAULT PRIORITY priority=self.priority+priority priority=max(0,priority) priority=min(9,priority) joblist.append(" W %1d " % (priority) +\ " %s %s\n" % (program,options)) # SUBMIT THEM IN CHUNKS i=0 while (i99999999): jobno=0 self.list.append("%08d" % jobno +joblist[i]) jobno=jobno+1 i=i+1 # SAVE LIST listfile=open(self.lockname,"w") listfile.writelines(self.list) listfile.close() self.unlock() sys.stdout.write("\r%d of %d jobs submitted in chunks of %d." % (i,len(donefilelist),jobs)) sys.stdout.flush() time.sleep(5) self.waitinglist=[] # WAIT FOR ALL JOBS for i in range(len(donefilelist)): donefilename=donefilelist[i] sys.stdout.write("\r%d of %d results arrived. " % (i,len(donefilelist))) sys.stdout.flush() while (not os.path.exists(donefilename)): time.sleep(1) remove(donefilename) print "\rAll %d results arrived. " % len(donefilelist) # UNLOCK THE JOBLIST # ================== # THE JOBLIST IS RENAMED BACK TO cluster.job AND BECOMES # VISIBLE FOR OTHER PROGRAMS.
def unlock(self): os.rename(self.lockname,self.listname) self.locked=0 # COUNT NUMBER OF WAITING JOBS IN LIST # ==================================== def waiting(self): if (not self.locked): self.raiseerror("waiting: Waiting method can only be called after job list has been locked") return(None) wait=0 for job in self.list: if (job[9]=='W'): wait=wait+1 return(wait) # CONVERT NORMAL TEXT TO JOB LIST FORMAT (REPLACE \n WITH #) # ========================================================== def convtext(self,filename,text): # GET RID OF LINE FEEDS if (type(text)==types.ListType): text=string.join(text,"\n") if (filename=="STARTUP.FIL"): # HACK FOR WHAT IF: DISABLE DEBUGGING CRASHES ON THE CLUSTER, WHERE IT HANGS THE NODES text="setwif 1012 0\n"+text text=string.replace(text,'\n','#') # SEARCH FOR UNIQUE CHARACTERS TO QUOTE TEXT IN JOB LIST for i in range(34,126): if (string.find(text,chr(i))==-1): break else: return(None) if (string.find(text,'|')!=-1): self.raiseerror("convtext: Text to convert contains '|' - this character is reserved for use by Models@Home") return(None) return(filename+'='+chr(i)+text+chr(i)) # WAIT FOR RESULTS # ================ # resultfilelist IS A LIST OF RESULTFILES THAT MUST BE # RETURNED BY THE CLUSTER def waitresult(self,resultfilelist): results=len(resultfilelist) if (not results): return for i in range(results): resultfile=resultfilelist[i] sys.stdout.write("\r%d of %d results arrived." % (i,results)) sys.stdout.flush() while (not os.path.exists(resultfile)): time.sleep(1) print "\rAll %d results arrived. " % results