Changeset 3571

Show
Ignore:
Timestamp:
10/26/2009 09:39:33 AM (1 month ago)
Author:
brian
Message:

SGE now takes an informed guess of which queue pending jobs will go into.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • gip/trunk/gip/lib/python/gip/batch_systems/sge.py

    r3561 r3571  
    4444        super(SgeBatchSystem, self).__init__(cp) 
    4545        self._version = None 
     46        self._qconf_cache = {} 
     47        self._nodes_cache = None 
     48        self._voqueues_cache = None 
     49        self._sge_job_cache = None 
     50        self._jobs_cache = None 
     51        self._queue_cache = None 
     52        self._pending_cache = {} 
    4653 
    4754    def getLrmsInfo(self): 
     
    5360        raise Exception("Unable to determine LRMS version info.") 
    5461 
    55     def parseNodes(cp): 
     62    def parseNodes(self): 
    5663        """ 
    5764        Parse the node information from SGE.  Using the output from qhost,  
     
    6370                (totalCPUs, freeCPUs). 
    6471        """ 
     72        if self._nodes_cache != None: 
     73            return self._nodes_cache 
    6574        xml = runCommand(sge_qhost_cmd) 
    6675        handler = HostInfoParser() 
     
    7685                pass 
    7786 
    78         xml = runCommand(sge_job_info_cmd) 
    79         handler = JobInfoParser() 
    80         parseXmlSax(xml, handler) 
    81         job_info = handler.getJobInfo() 
     87        job_info = self._getJobsInfo() 
    8288        free = total 
    8389        for job in job_info: 
     
    9197        free = max(free, 0) 
    9298        log.info("There were %i cores total, %i free." % (total, free)) 
     99        self._nodes_cache = total, free, {} 
    93100        return total, free, {} 
    94101 
     
    107114 
    108115    def getVoQueues(self): 
     116        if self._voqueues_cache != None: 
     117            return self._voqueues_cache 
    109118        voMap = self.vo_map 
    110119        try: 
     
    154163                    continue 
    155164                vo_queues.append((vo, queue)) 
     165        self._voqueues_cache = vo_queues 
    156166        return vo_queues 
    157167 
    158168    def getJobsInfo(self): 
    159         xml = runCommand(sge_job_info_cmd) 
    160         handler = JobInfoParser() 
    161         parseXmlSax(xml, handler) 
    162         job_info = handler.getJobInfo() 
     169        if self._jobs_cache != None: 
     170            return self._jobs_cache 
     171        job_info = self._getJobsInfo() 
    163172        queue_jobs = {} 
    164173 
     
    185194            info["total"] += 1 
    186195            info["vo"] = vo 
     196 
     197        pending_jobs = self._getPendingInfo(queue_jobs.keys()) 
     198        for queue, data in pending_jobs.items(): 
     199            voinfo = queue_jobs.setdefault(queue, {}) 
     200            for user, pending in data.items(): 
     201                try: 
     202                    vo = self.vo_map[user].lower() 
     203                except: 
     204                    continue 
     205                info = voinfo.setdefault(vo, {"running":0, "wait":0, "total":0}) 
     206                info['wait'] += pending 
     207 
    187208        log.debug("SGE job info: %s" % str(queue_jobs)) 
     209        self._jobs_cache = queue_jobs 
    188210        return queue_jobs 
    189211 
     
    195217        @returns: A dictionary of queue data and a dictionary of job data. 
    196218        """ 
     219        if self._queue_cache != None: 
     220            return self._queue_cache 
    197221        queue_list = {} 
    198222        xml = runCommand(sge_queue_info_cmd) 
     
    242266 
    243267            # How do you handle queues with no limit? 
    244             sqc = SGEQueueConfig(sgeCommand(sge_queue_config_cmd % name, 
    245                 self.cp)) 
     268            if name not in self._qconf_cache: 
     269                sqc = SGEQueueConfig(sgeCommand(sge_queue_config_cmd % name, 
     270                    self.cp)) 
     271                self._qconf_cache[name] = sqc 
     272            sqc = self._qconf_cache[name] 
    246273 
    247274            try: 
     
    272299            queue_list[name] = q 
    273300 
    274         waiting_jobs = 0 
    275         for job in queue_info['waiting']
    276             waiting_jobs += 1 
    277         queue_list['waiting'] = {'waiting': waiting_jobs} 
    278  
     301        pending_info = self._getPendingInfo(queue_list.keys()) 
     302        for queue, data in pending_info.items()
     303            queue_list[queue]['wait'] += sum(data.values()) 
     304 
     305        self._queue_cache = queue_list 
    279306        return queue_list #, queue_info 
    280307 
     308    def _maxQueue(self, queue_jobs): 
     309        """ 
     310        Given a dictionary of queues -> # of jobs, determine which queue has 
     311        the most active queue. 
     312        This will throw an exception if the queue_jobs parameter is empty 
     313        """ 
     314        most_active_queue = queue_jobs.keys()[0] 
     315        max_queue_size = queue_jobs[most_active_queue] 
     316        for qname, slots in queue_jobs.items(): 
     317            if slots > max_queue_size: 
     318                most_active_queue = qname 
     319                max_queue_size = slots 
     320        return most_active_queue 
     321 
     322    def _getJobsInfo(self): 
     323        if self._sge_job_cache != None: 
     324            return self._sge_job_cache 
     325        xml = runCommand(sge_job_info_cmd) 
     326        handler = JobInfoParser() 
     327        parseXmlSax(xml, handler) 
     328        self._sge_job_cache = handler.getJobInfo() 
     329        return self._sge_job_cache 
     330 
     331    def _getPendingInfo(self, queue_list): 
     332        """ 
     333        SGE doesn't let us know what queue the jobs belong to until the job 
     334        execution starts.  Hence, we look at what queue the user has the 
     335        most jobs in and arbitrarily assign the pending jobs there. 
     336 
     337        This method takes in a list of valid queue names and returns a dict; 
     338        the dictionary maps a queue_name to a dictionary mapping usernames to 
     339        number of pending jobs 
     340        """ 
     341        queue_set = sets.ImmutableSet(queue_list) 
     342        if queue_set in self._pending_cache: 
     343            return self._pending_cache[queue_set] 
     344        job_info = self._getJobsInfo() 
     345        pending_jobs = {} 
     346        running_jobs = {} 
     347        queue_jobs = {} 
     348        for job in job_info: 
     349            user = job['JB_owner'] 
     350            state = job['state'] 
     351            try: 
     352                slots = int(job['slots']) 
     353            except: 
     354                slots = 1 
     355            queue = job.get('queue_name', '') 
     356            if queue.strip() == '': 
     357                queue = 'waiting' 
     358            queue = queue.split('@')[0] 
     359            if state == "qw": 
     360                jobs = pending_jobs.setdefault(user, 0) 
     361                pending_jobs[user] = jobs + slots 
     362            else: 
     363                user_jobs = running_jobs.setdefault(user, {}) 
     364                jobs = user_jobs.setdefault(queue, 0) 
     365                user_jobs[queue] = jobs + slots 
     366                jobs = queue_jobs.setdefault(queue, 0) 
     367                queue_jobs[queue] = jobs + slots 
     368 
     369        if not queue_jobs: 
     370            if not queue_list: 
     371                log.warning("No queues configured, but pending jobs; " \ 
     372                    "will return nothing.") 
     373                self._queue_cache = {} 
     374                return self._queue_cache 
     375            most_active_queue = queue_list[0] 
     376            log.warning("No active queues but there are pending jobs; will" \ 
     377                " guess the jobs belong to an arbitrary queue, %s." % \ 
     378                most_active_queue) 
     379        else: 
     380            most_active_queue = self._maxQueue(queue_jobs) 
     381            if most_active_queue not in queue_list: 
     382                if not queue_list: 
     383                    log.warning("No queues configured, but pending jobs; " \ 
     384                        "will return nothing.") 
     385                    self._queue_cache = {} 
     386                    return self._queue_cache 
     387                tmp_queue = queue_list[0] 
     388                log.info("Most active queue %s is not already in queue list; " \ 
     389                    "this may indicate an error.  Arbitrarily picking %s as " \ 
     390                    "the most active queue." % (most_active_queue, tmp_queue)) 
     391                most_active_queue = tmp_queue 
     392        log.info("Most active queue is %s; any user with only pending jobs " \ 
     393            "will be assigned to this one." % most_active_queue) 
     394 
     395        results = {} 
     396        for user, pending in pending_jobs.items(): 
     397            if user not in running_jobs or not running_jobs[user]: 
     398                log.info("User %s will have their %i pending jobs assigned to "\ 
     399                    "queue %s because they are not running in any other " \ 
     400                    "queue." % (user, pending, most_active_queue)) 
     401                user_active_queue = most_active_queue 
     402            else: 
     403                user_active_queue = self._maxQueue(running_jobs[user]) 
     404                if len(running_jobs[user]) == 1: 
     405                    log.info("User %s will have their %i pending jobs " \ 
     406                        "assigned to queue %s because they already have %i " \ 
     407                        "jobs running there." % (user, pending, 
     408                        user_active_queue, 
     409                        running_jobs[user][user_active_queue])) 
     410                else: 
     411                    log.info("User %s will have their %i pending jobs " \ 
     412                        "assigned to queue %s because they already have %i " \ 
     413                        "jobs running there, more than any other queue." % \ 
     414                        (user, pending, user_active_queue, 
     415                        running_jobs[user][user_active_queue])) 
     416            if user_active_queue in queue_list: 
     417                tmp = results.setdefault(user_active_queue, {}) 
     418                tmp.setdefault(user, 0) 
     419                tmp[user] += pending 
     420            else: 
     421                log.warning("Most active user queue is %s, but it wasn't found"\ 
     422                    " in the queue list; this is probably an internal error." \ 
     423                    "  Assigning user %s's %i pending jobs to most active " \ 
     424                    "overall queue, %s" % (user_active_queue, user, pending, 
     425                    most_active_queue)) 
     426                tmp = results.setdefault(most_active_queue, {})  
     427                tmp.setdefault(user, 0)  
     428                tmp[user] += pending 
     429        self._pending_cache[queue_set] = results 
     430        return results 
     431