Changeset 3541

Show
Ignore:
Timestamp:
10/15/2009 09:24:55 AM (1 month ago)
Author:
brian
Message:

Migration of PBS batch system to new generic provider.

Files:

Legend:

Unmodified
Added
Removed
Modified
Copied
Moved
  • gip/trunk/gip/lib/python/batch_systems/batch_system.py

    r2841 r3541  
    11 
     2from gip_sets import Set 
    23from gip_common import VoMapper 
    34 
     
    6263    @returns: List of strings containing the queue names. 
    6364        """ 
    64         raise NotImplementedError() 
     65        vo_queues = self.getVoQueues() 
     66        queues = Set() 
     67        for vo, queue in vo_queues: 
     68            queues.add(queue) 
     69        return list(queues) 
    6570 
    6671    def getQueueInfo(self): 
  • gip/trunk/gip/lib/python/batch_systems/pbs.py

    r2860 r3541  
    1111from gip_common import HMSToMin, getLogger, VoMapper, voList, parseRvf 
    1212from gip_testing import runCommand 
     13from batch_system import BatchSystem 
    1314 
    1415log = getLogger("GIP.PBS") 
     
    117118    return pbsOutputFilter(fp) 
    118119 
    119 def getLrmsInfo(cp): 
    120     """ 
    121     Return the version string from the PBS batch system. 
    122     """ 
     120class PbsBatchSystem(BatchSystem): 
     121 
    123122    version_re = re.compile("pbs_version = (.*)\n") 
    124     for line in pbsCommand(batch_system_info_cmd, cp): 
    125         m = version_re.search(line) 
    126         if m: 
    127             return m.groups()[0] 
    128     raise Exception("Unable to determine LRMS version info.") 
    129  
    130 def getJobsInfo(vo_map, cp): 
    131     """ 
    132     Return information about the jobs currently running in PBS 
    133      
    134     The return value is a dictionary of dictionaries; the keys for the 
    135     top-level dictionary are queue names; the values are queuedata dictionaries 
    136      
    137     The queuedata dicts have key:val pairs of voname: voinfo, where voinfo is 
    138     a dictionary with the following keys: 
    139        - running: Number of VO running jobs in this queue. 
    140        - wait: Number of VO waiting jobs in this queue. 
    141        - total: Number of VO total jobs in this queue. 
    142      
    143     @param vo_map: A VoMapper object which is used to map user names to VOs. 
    144     @param cp: Site configuration object 
    145     @return: A dictionary containing queue job information. 
    146     """ 
    147     queue_jobs = {} 
    148     for orig_line in pbsCommand(jobs_cmd, cp): 
     123 
     124    def getLrmsInfo(self): 
     125        """ 
     126        Return the version string from the PBS batch system. 
     127        """ 
     128        for line in pbsCommand(batch_system_info_cmd, self.cp): 
     129            m = self.version_re.search(line) 
     130            if m: 
     131                self.version = m.groups()[0] 
     132                return "pbs", m.groups()[0] 
     133        raise Exception("Unable to determine LRMS version info.") 
     134 
     135    def getJobsInfo(self): 
     136        """ 
     137        Return information about the jobs currently running in PBS. 
     138        Refer to the BatchSystem documentation for information about return 
     139        values. 
     140        """ 
     141        queue_jobs = {} 
     142        for orig_line in pbsCommand(jobs_cmd, self.cp): 
     143            try: 
     144                job, _, user, _, status, queue = orig_line.split() 
     145            except (KeyboardInterrupt, SystemExit): 
     146                raise 
     147            except: 
     148                continue 
     149            if job.startswith("-"): 
     150                continue 
     151            queue_data = queue_jobs.get(queue, {}) 
     152            try: 
     153                vo = self.vo_map[user].lower() 
     154            except: 
     155                # Most likely, this means that the user is local and not 
     156                # associated with a VO, so we skip the job. 
     157                continue 
     158            info = queue_data.get(vo, {"running":0, "wait":0, "total":0}) 
     159            if status == "R": 
     160                info["running"] += 1 
     161            else: 
     162                info["wait"] += 1 
     163            info["total"] += 1 
     164            queue_data[vo] = info 
     165            queue_jobs[queue] = queue_data 
     166        return queue_jobs 
     167 
     168    def getQueueInfo(self): 
     169        """ 
     170        Looks up the queue information from PBS. 
     171        Refer to BatchSystem documentation for details. 
     172        """ 
     173        queueInfo = {} 
     174        queue_data = None 
     175        for orig_line in pbsCommand(queue_info_cmd, self.cp): 
     176            line = orig_line.strip() 
     177            if line.startswith("Queue: "): 
     178                if queue_data != None: 
     179                    if queue_data["started"] and queue_data["enabled"]: 
     180                        queue_data["status"] = "Production" 
     181                    elif queue_data["enabled"]: 
     182                        queue_data["status"] = "Queueing" 
     183                    elif queue_data["started"]: 
     184                        queue_data["status"] = "Draining" 
     185                    else: 
     186                        queue_data["status"] = "Closed" 
     187                    del queue_data["started"] 
     188                    del queue_data['enabled'] 
     189                queue_data = {} 
     190                queue_name = line[7:] 
     191                queueInfo[queue_name] = queue_data 
     192                continue 
     193            if queue_data == None: 
     194                continue 
     195            if len(line) == 0: 
     196                continue 
     197            attr, val = line.split(" = ") 
     198            if attr == "Priority": 
     199                queue_data['priority'] = int(val) 
     200            elif attr == "total_jobs": 
     201                queue_data["total"] = int(val) 
     202            elif attr == "state_count": 
     203                info = val.split() 
     204                for entry in info: 
     205                    state, count = entry.split(':') 
     206                    count = int(count) 
     207                    if state == 'Queued': 
     208                        queue_data['wait'] = queue_data.get('wait', 0) + count 
     209                    #elif state == 'Waiting': 
     210                    #    queue_data['wait'] = queue_data.get('wait', 0) + count 
     211                    elif state == 'Running': 
     212                        queue_data['running'] = count 
     213            elif attr == "resources_max.walltime": 
     214                queue_data["max_wall"] = HMSToMin(val) 
     215            elif attr == "enabled": 
     216                queue_data["enabled"] = val == "True" 
     217            elif attr == "started": 
     218                queue_data["started"] = val == "True" 
     219            elif attr == "max_running": 
     220                queue_data["max_running"] = int(val) 
     221            elif attr == "resources_max.nodect": 
     222                queue_data["job_slots"] = int(val) 
     223            elif attr == "max_queuable" or attr == 'max_queueable': 
     224                try: 
     225                    queue_data["max_waiting"] = int(val) 
     226                    queue_data["max_queuable"] = int(val) 
     227                except:  
     228                    log.warning("Invalid input for max_queuable: %s" % str(val)) 
     229            elif attr == "acl_group_enable" and val.lower() == 'true': 
     230                queue_data["groups"] = sets.Set() 
     231            elif attr == "acl_groups" and 'groups' in queue_data: 
     232                queue_data["groups"].update(val.split(',')) 
     233            elif attr == "acl_user_enable" and val.lower() == 'true': 
     234                queue_data["users"] = sets.Set() 
     235            elif attr == "acl_users" and 'users' in queue_data: 
     236                queue_data["users"].update(val.split(',')) 
     237        if queue_data != None: 
     238            if queue_data["started"] and queue_data["enabled"]: 
     239                queue_data["status"] = "Production" 
     240            elif queue_data["enabled"]: 
     241                queue_data["status"] = "Queueing" 
     242            elif queue_data["started"]: 
     243                queue_data["status"] = "Draining" 
     244            else: 
     245                queue_data["status"] = "Closed" 
     246            del queue_data["started"] 
     247            del queue_data['enabled'] 
     248 
     249        return queueInfo 
     250 
     251    def parseNodes(self): 
     252        """ 
     253        Parse the node information from PBS. 
     254        See BatchSystem documentation for more information. 
     255        """ 
     256        totalCpu = 0 
     257        freeCpu = 0 
     258        queueCpu = {} 
     259        queue = None 
     260        avail_cpus = None 
     261        used_cpus = None 
     262        if self.version.find("PBSPro") >= 0: 
     263            for line in pbsCommand(pbsnodes_cmd, self.cp): 
     264                if len(line.strip()) == 0: 
     265                    continue 
     266                if not line.startswith('    ') and avail_cpus != None: 
     267                    if queue != None: 
     268                        info = queueCpu.get(queue, [0, 0]) 
     269                        info[0] += avail_cpus 
     270                        info[1] += avail_cpus - used_cpus 
     271                        queueCpu[queue] = info 
     272                    else: 
     273                        totalCpu += avail_cpus 
     274                        freeCpu += avail_cpus - used_cpus 
     275                    queue = None 
     276                    continue 
     277                line = line.strip() 
     278                try: 
     279                    attr, val = line.split(" = ") 
     280                except: 
     281                    continue 
     282                if attr == "resources_available.ncpus": 
     283                    avail_cpus = int(val) 
     284                elif attr == "resources_assigned.ncpus": 
     285                    used_cpus = int(val) 
     286        else: 
     287            for line in pbsCommand(pbsnodes_cmd, self.cp): 
     288                try: 
     289                    attr, val = line.split(" = ") 
     290                except: 
     291                    continue 
     292                val = val.strip() 
     293                attr = attr.strip() 
     294                if attr == "state": 
     295                    state = val 
     296                if attr == "np": 
     297                    try: 
     298                        np = int(val) 
     299                    except: 
     300                        np = 1 
     301                    if not (state.find("down") >= 0 or \ 
     302                            state.find("offline") >= 0): 
     303                        totalCpu += np 
     304                    if state.find("free") >= 0: 
     305                        freeCpu += np 
     306                if attr == "jobs" and state == "free": 
     307                    freeCpu -= val.count(',') 
     308 
     309        return totalCpu, freeCpu, queueCpu 
     310 
     311    def getVoQueues(self): 
     312        """ 
     313        Determine the (vo, queue) tuples for this site.  This allows for central 
     314        configuration of which VOs are advertised. 
     315 
     316        Refer to BatchSystem documentation for more info 
     317        """ 
     318        voMap = self.vo_map 
    149319        try: 
    150             job, _, user, _, status, queue = orig_line.split() 
    151         except (KeyboardInterrupt, SystemExit): 
    152             raise 
     320            queue_exclude = [i.strip() for i in self.cp.get("pbs", 
     321                "queue_exclude").split(',')] 
    153322        except: 
    154             continue 
    155         if job.startswith("-"): 
    156             continue 
    157         queue_data = queue_jobs.get(queue, {}) 
    158         try: 
    159             vo = vo_map[user].lower() 
    160         except: 
    161             # Most likely, this means that the user is local and not 
    162             # associated with a VO, so we skip the job. 
    163             continue 
    164         info = queue_data.get(vo, {"running":0, "wait":0, "total":0}) 
    165         if status == "R": 
    166             info["running"] += 1 
    167         else: 
    168             info["wait"] += 1 
    169         info["total"] += 1 
    170         queue_data[vo] = info 
    171         queue_jobs[queue] = queue_data 
    172     return queue_jobs 
    173  
    174 def getQueueInfo(cp): 
    175     """ 
    176     Looks up the queue information from PBS. 
    177  
    178     The returned dictionary contains the following keys: 
    179      
    180       - B{status}: Production, Queueing, Draining, Closed 
    181       - B{priority}: The priority of the queue. 
    182       - B{max_wall}: Maximum wall time. 
    183       - B{max_running}: Maximum number of running jobs. 
    184       - B{running}: Number of running jobs in this queue. 
    185       - B{wait}: Waiting jobs in this queue. 
    186       - B{total}: Total number of jobs in this queue. 
    187  
    188     @param cp: Configuration of site. 
    189     @returns: A dictionary of queue data.  The keys are the queue names, and 
    190         the value is the queue data dictionary. 
    191     """ 
    192     queueInfo = {} 
    193     queue_data = None 
    194     for orig_line in pbsCommand(queue_info_cmd, cp): 
    195         line = orig_line.strip() 
    196         if line.startswith("Queue: "): 
    197             if queue_data != None: 
    198                 if queue_data["started"] and queue_data["enabled"]: 
    199                     queue_data["status"] = "Production" 
    200                 elif queue_data["enabled"]: 
    201                     queue_data["status"] = "Queueing" 
    202                 elif queue_data["started"]: 
    203                     queue_data["status"] = "Draining" 
    204                 else: 
    205                     queue_data["status"] = "Closed" 
    206                 del queue_data["started"] 
    207                 del queue_data['enabled'] 
    208             queue_data = {} 
    209             queue_name = line[7:] 
    210             queueInfo[queue_name] = queue_data 
    211             continue 
    212         if queue_data == None: 
    213             continue 
    214         if len(line) == 0: 
    215             continue 
    216         attr, val = line.split(" = ") 
    217         if attr == "Priority": 
    218             queue_data['priority'] = int(val) 
    219         elif attr == "total_jobs": 
    220             queue_data["total"] = int(val) 
    221         elif attr == "state_count": 
    222             info = val.split() 
    223             for entry in info: 
    224                 state, count = entry.split(':') 
    225                 count = int(count) 
    226                 if state == 'Queued': 
    227                     queue_data['wait'] = queue_data.get('wait', 0) + count 
    228                 #elif state == 'Waiting': 
    229                 #    queue_data['wait'] = queue_data.get('wait', 0) + count 
    230                 elif state == 'Running': 
    231                     queue_data['running'] = count 
    232         elif attr == "resources_max.walltime": 
    233             queue_data["max_wall"] = HMSToMin(val) 
    234         elif attr == "enabled": 
    235             queue_data["enabled"] = val == "True" 
    236         elif attr == "started": 
    237             queue_data["started"] = val == "True" 
    238         elif attr == "max_running": 
    239             queue_data["max_running"] = int(val) 
    240         elif attr == "resources_max.nodect": 
    241             queue_data["job_slots"] = int(val) 
    242         elif attr == "max_queuable" or attr == 'max_queueable': 
    243             try: 
    244                 queue_data["max_waiting"] = int(val) 
    245                 queue_data["max_queuable"] = int(val) 
    246             except:  
    247                 log.warning("Invalid input for max_queuable: %s" % str(val)) 
    248         elif attr == "acl_group_enable" and val.lower() == 'true': 
    249             queue_data["groups"] = sets.Set() 
    250         elif attr == "acl_groups" and 'groups' in queue_data: 
    251             queue_data["groups"].update(val.split(',')) 
    252         elif attr == "acl_user_enable" and val.lower() == 'true': 
    253             queue_data["users"] = sets.Set() 
    254         elif attr == "acl_users" and 'users' in queue_data: 
    255             queue_data["users"].update(val.split(',')) 
    256     if queue_data != None: 
    257         if queue_data["started"] and queue_data["enabled"]: 
    258             queue_data["status"] = "Production" 
    259         elif queue_data["enabled"]: 
    260             queue_data["status"] = "Queueing" 
    261         elif queue_data["started"]: 
    262             queue_data["status"] = "Draining" 
    263         else: 
    264             queue_data["status"] = "Closed" 
    265         del queue_data["started"] 
    266         del queue_data['enabled'] 
    267  
    268     return queueInfo 
    269  
    270 def parseNodes(cp, version): 
    271     """ 
    272     Parse the node information from PBS.  Using the output from pbsnodes,  
    273     determine: 
    274      
    275         - The number of total CPUs in the system. 
    276         - The number of free CPUs in the system. 
    277         - A dictionary mapping PBS queue names to a tuple containing the 
    278             (totalCPUs, freeCPUs). 
    279     """ 
    280     totalCpu = 0 
    281     freeCpu = 0 
    282     queueCpu = {} 
    283     queue = None 
    284     avail_cpus = None 
    285     used_cpus = None 
    286     if version.find("PBSPro") >= 0: 
    287         for line in pbsCommand(pbsnodes_cmd, cp): 
    288             if len(line.strip()) == 0: 
    289                 continue 
    290             if not line.startswith('    ') and avail_cpus != None: 
    291                 if queue != None: 
    292                     info = queueCpu.get(queue, [0, 0]) 
    293                     info[0] += avail_cpus 
    294                     info[1] += avail_cpus - used_cpus 
    295                     queueCpu[queue] = info 
    296                 else: 
    297                     totalCpu += avail_cpus 
    298                     freeCpu += avail_cpus - used_cpus 
    299                 queue = None 
    300                 continue 
    301             line = line.strip() 
    302             try: 
    303                 attr, val = line.split(" = ") 
     323            queue_exclude = [] 
     324        vo_queues = [] 
     325        queueInfo = self.getQueueInfo() 
     326        rvf_info = parseRvf('pbs.rvf') 
     327        rvf_queue_list = rvf_info.get('queue', {}).get('Values', None) 
     328        if rvf_queue_list: 
     329            rvf_queue_list = rvf_queue_list.split() 
     330            log.info("The RVF lists the following queues: %s." % ', '.join( \ 
     331                rvf_queue_list)) 
     332        log.debug("All queues to consider: %s" % ", ".join(queueInfo)) 
     333        for queue, qinfo in queueInfo.items(): 
     334            if rvf_queue_list and queue not in rvf_queue_list: 
     335                log.debug("Skipping %s because it is not in the RVF." % queue) 
     336                continue 
     337            if queue in queue_exclude: 
     338                log.debug("Skipping %s because it is in the queue_exclude." % \ 
     339                    queue) 
     340                continue 
     341            volist = sets.Set(voList(self.cp, voMap)) 
     342            try: 
     343                whitelist = [i.strip() for i in self.cp.get("pbs", 
     344                "%s_whitelist" % queue).split(',')] 
    304345            except: 
    305                 continue 
    306             if attr == "resources_available.ncpus": 
    307                 avail_cpus = int(val) 
    308             elif attr == "resources_assigned.ncpus": 
    309                 used_cpus = int(val) 
    310     else: 
    311         for line in pbsCommand(pbsnodes_cmd, cp): 
    312             try: 
    313                 attr, val = line.split(" = ") 
     346                whitelist = [] 
     347            whitelist = sets.Set(whitelist) 
     348            try: 
     349                blacklist = [i.strip() for i in self.cp.get("pbs", 
     350                    "%s_blacklist" % queue).split(',')] 
    314351            except: 
    315                 continue 
    316             val = val.strip() 
    317             attr = attr.strip() 
    318             if attr == "state": 
    319                 state = val 
    320             if attr == "np": 
     352                blacklist = [] 
     353            blacklist = sets.Set(blacklist) 
     354            if 'users' in qinfo or 'groups' in qinfo: 
     355                acl_vos = self.parseAclInfo(queue, qinfo) 
     356                volist.intersection_update(acl_vos) 
     357            # Force any VO in the whitelist to show up in the volist, even if it 
     358            # isn't in the acl_users / acl_groups 
     359            for vo in whitelist: 
     360                if vo not in volist: 
     361                    volist.add(vo) 
     362            # Apply white and black lists 
     363            log.debug("All VOs to consider for queue %s before black/white " 
     364                "list: %s" % (queue, ", ".join(volist))) 
     365            for vo in volist: 
     366                if (vo in blacklist or "*" in blacklist) and ((len(whitelist)\ 
     367                        == 0) or vo not in whitelist): 
     368                    if log.isEnabledFor(10): # 10=logging.DEBUG 
     369                        log.debug("Skipping VO %s" % vo) 
     370                        if whitelist and vo not in whitelist: 
     371                            log.debug("VO %s not in whitelist" % vo) 
     372                        elif vo in blacklist: 
     373                            log.debug("VO %s is in blacklist" % vo) 
     374                        else: 
     375                            log.debug("Blacklist contains *") 
     376                    continue 
     377                log.debug("Adding VO %s to queue %s" % (vo, queue)) 
     378                vo_queues.append((vo, queue)) 
     379        return vo_queues 
     380 
     381    def parseAclInfo(self, queue, qinfo): 
     382        """ 
     383        Take a queue information dictionary and determine which VOs are in the 
     384        ACL list.  The used keys are: 
     385 
     386           - users: A set of all user names allowed to access this queue. 
     387           - groups: A set of all group names allowed to access this queue. 
     388 
     389        @param queue: Queue name (for logging purposes). 
     390        @param qinfo: Queue info dictionary 
     391        @returns: A set of allowed VOs 
     392        """ 
     393        users = qinfo.get('users', sets.Set()) 
     394        if 'groups' in qinfo: 
     395            all_groups = grp.getgrall() 
     396            all_users = pwd.getpwall() 
     397            group_dict = {} 
     398            for group in all_groups: 
     399                if group[0] in qinfo['groups'] or group[2] in qinfo['groups']: 
     400                    users.add(group[0]) 
     401                group_dict[group[2]] = group[0] 
     402            for user in all_users: 
    321403                try: 
    322                     np = int(val) 
     404                    group = group_dict[user[3]] 
    323405                except: 
    324                     np = 1 
    325                 if not (state.find("down") >= 0 or \ 
    326                         state.find("offline") >= 0): 
    327                     totalCpu += np 
    328                 if state.find("free") >= 0: 
    329                     freeCpu += np 
    330             if attr == "jobs" and state == "free": 
    331                 freeCpu -= val.count(',') 
    332  
    333     return totalCpu, freeCpu, queueCpu 
    334  
    335 def getQueueList(cp): 
    336     """ 
    337     Returns a list of all the queue names that are supported. 
    338  
    339     @param cp: Site configuration 
    340     @returns: List of strings containing the queue names. 
    341     """ 
    342     queues = [] 
    343     try:             
    344         queue_exclude = [i.strip() for i in cp.get("pbs", "queue_exclude").\ 
    345             split(',')] 
    346     except:          
    347         queue_exclude = [] 
    348     rvf_info = parseRvf('pbs.rvf') 
    349     rvf_queue_list = rvf_info.get('queue', {}).get('Values', None) 
    350     if rvf_queue_list:  
    351         rvf_queue_list = rvf_queue_list.split() 
    352         log.info("The RVF lists the following queues: %s." % ', '.join( \ 
    353             rvf_queue_list)) 
    354     for queue in getQueueInfo(cp): 
    355         if queue not in queue_exclude: 
    356             queues.append(queue) 
    357         if rvf_queue_list and queue not in rvf_queue_list: 
    358             continue 
    359         if queue not in queue_exclude: 
    360             queues.append(queue) 
    361     return queues 
    362  
    363 def getVoQueues(cp): 
    364     """ 
    365     Determine the (vo, queue) tuples for this site.  This allows for central 
    366     configuration of which VOs are advertised. 
    367  
    368     Sites will be able to blacklist queues they don't want to advertise, 
    369     whitelist certain VOs for a particular queue, and blacklist VOs from queues. 
    370  
    371     @param cp: Site configuration 
    372     @returns: A list of (vo, queue) tuples representing the queues each VO 
    373         is allowed to run in. 
    374     """ 
    375     voMap = VoMapper(cp) 
    376     try: 
    377         queue_exclude = [i.strip() for i in cp.get("pbs", "queue_exclude").\ 
    378             split(',')] 
    379     except: 
    380         queue_exclude = [] 
    381     vo_queues = [] 
    382     queueInfo = getQueueInfo(cp) 
    383     rvf_info = parseRvf('pbs.rvf') 
    384     rvf_queue_list = rvf_info.get('queue', {}).get('Values', None) 
    385     if rvf_queue_list: 
    386         rvf_queue_list = rvf_queue_list.split() 
    387         log.info("The RVF lists the following queues: %s." % ', '.join( \ 
    388             rvf_queue_list)) 
    389     for queue, qinfo in queueInfo.items(): 
    390         if rvf_queue_list and queue not in rvf_queue_list: 
    391             continue 
    392         if queue in queue_exclude: 
    393             continue 
    394         volist = sets.Set(voList(cp, voMap)) 
    395         try: 
    396             whitelist = [i.strip() for i in cp.get("pbs", "%s_whitelist" % \ 
    397                 queue).split(',')] 
    398         except: 
    399             whitelist = [] 
    400         whitelist = sets.Set(whitelist) 
    401         try: 
    402             blacklist = [i.strip() for i in cp.get("pbs", "%s_blacklist" % \ 
    403                 queue).split(',')] 
    404         except: 
    405             blacklist = [] 
    406         blacklist = sets.Set(blacklist) 
    407         if 'users' in qinfo or 'groups' in qinfo: 
    408             acl_vos = parseAclInfo(queue, qinfo, voMap) 
    409             volist.intersection_update(acl_vos) 
    410         # Force any VO in the whitelist to show up in the volist, even if it 
    411         # isn't in the acl_users / acl_groups 
    412         for vo in whitelist: 
    413             if vo not in volist: 
    414                 volist.add(vo) 
    415         # Apply white and black lists 
    416         for vo in volist: 
    417             if (vo in blacklist or "*" in blacklist) and ((len(whitelist) == 0)\ 
    418                     or vo not in whitelist): 
    419                 continue 
    420             vo_queues.append((vo, queue)) 
    421     return vo_queues 
    422  
    423 def parseAclInfo(queue, qinfo, vo_mapper): 
    424     """ 
    425     Take a queue information dictionary and determine which VOs are in the ACL 
    426     list.  The used keys are: 
    427  
    428        - users: A set of all user names allowed to access this queue. 
    429        - groups: A set of all group names allowed to access this queue. 
    430  
    431     @param queue: Queue name (for logging purposes). 
    432     @param qinfo: Queue info dictionary 
    433     @param vo_mapper: VO mapper object 
    434     @returns: A set of allowed VOs 
    435     """ 
    436     users = qinfo.get('users', sets.Set()) 
    437     if 'groups' in qinfo: 
    438         all_groups = grp.getgrall() 
    439         all_users = pwd.getpwall() 
    440         group_dict = {} 
    441         for group in all_groups: 
    442             if group[0] in qinfo['groups'] or group[2] in qinfo['groups']: 
    443                 users.add(group[0]) 
    444             group_dict[group[2]] = group[0] 
    445         for user in all_users: 
    446             try: 
    447                 group = group_dict[user[3]] 
     406                    continue 
     407                if group[0] in qinfo['groups'] or user[3] in qinfo['groups']: 
     408                    users.add(group[0]) 
     409        vos = sets.Set() 
     410        for user in users: 
     411            try: 
     412                vos.add(self.vo_map[user]) 
    448413            except: 
    449                 continue 
    450             if group[0] in qinfo['groups'] or user[3] in qinfo['groups']: 
    451                 users.add(group[0]) 
    452     vos = sets.Set() 
    453     for user in users: 
    454         try: 
    455             vos.add(vo_mapper[user]) 
    456         except: 
    457             pass 
    458     log.info("The acl info for queue %s (users %s, groups %s) mapped to %s." % \ 
    459         (queue, ', '.join(qinfo.get('users', [])), 
    460         ', '.join(qinfo.get('groups', [])), ', '.join(vos))) 
    461     return vos 
    462  
     414                pass 
     415        log.info("The acl info for queue %s (users %s, groups %s) mapped to" \ 
     416            "%s." % (queue, ', '.join(qinfo.get('users', [])), 
     417            ', '.join(qinfo.get('groups', [])), ', '.join(vos))) 
     418        return vos 
     419 
  • gip/trunk/gip/lib/python/gip/providers/generic_batch_system.py

    r2843 r3541  
    1212from gip_sections import ce 
    1313from gip_storage import getDefaultSE 
    14 from gip.batch_systems.forwarding import Forwarding 
     14 
     15from batch_systems.pbs import PbsBatchSystem 
     16from batch_systems.forwarding import Forwarding 
    1517 
    1618log = getLogger("GIP.Batch") 
     
    8183            info['free_slots'] = min(info['free_slots'], info['max_total']) 
    8284        info['max_slots'] = 1 
     85 
     86        # Enforce invariants: 
     87        # max_total <= max_running 
     88        # free_slots <= max_running 
     89        info['max_total'] = min(info['max_total'], info['max_running']) 
     90        info['free_slots'] = min(info['free_slots'], info['max_running']) 
     91 
    8392        info['assigned'] = info['job_slots'] 
     93 
     94        # Enforce invariants: 
     95        # assigned <= max_running 
     96        info['assigned'] = min(info['assigned'], info['max_running']) 
     97 
    8498        info['lrmsType'] = system_name 
    8599        info['preemption'] = cp_get(cp, system_name, 'preemption', '0') 
     
    148162        if impl == 'forwarding': 
    149163            batch = Forwarding(cp) 
     164        if impl == 'pbs': 
     165            batch = PbsBatchSystem(cp) 
    150166        else: 
    151167            log.error("Unknown job manager: %s" % impl) 
  • gip/trunk/gip/lib/python/gip_sections.py

    r3031 r3541  
    1010sge = 'sge' 
    1111lsf = 'lsf' 
     12forwarding = 'forwarding' 
    1213 
  • gip/trunk/gip/providers/batch_system.py

    r2121 r3541  
    1010from gip.providers.sge import main as sge_main 
    1111from gip.providers.lsf import main as lsf_main 
     12from gip.providers.generic_batch_system import main as generic_main 
    1213 
    1314log = getLogger("GIP.BatchSystem") 
     
    2122       log.error("Job manager not specified!") 
    2223       sys.exit(2) 
    23     if job_manager == 'pbs': 
    24         pbs_main() 
    25     elif job_manager == 'condor': 
     24    if job_manager == 'condor': 
    2625        condor_main() 
    2726    elif job_manager == 'sge': 
     
    3029        lsf_main() 
    3130    else: 
    32         log.error("Unknown job manager: %s." % job_manager) 
    33         sys.exit(1) 
     31        generic_main() 
    3432 
    3533if __name__ == '__main__': 
  • gip/trunk/test/pbs_test.py

    r3468 r3541  
    99from gip_sets import Set 
    1010from gip_common import config, cp_get 
    11 from pbs_common import getVoQueues, getQueueList 
     11#from pbs_common import getVoQueues, getQueueList 
     12from batch_systems.pbs import PbsBatchSystem 
    1213from gip_ldap import read_ldap 
    1314from gip_testing import runTest, streamHandler 
     
    5253            del os.environ['GLOBUS_LOCATION'] 
    5354        try: 
    54             vo_queues = Set(getVoQueues(cp)) 
     55            pbs = PbsBatchSystem(cp) 
     56            vo_queues = Set(pbs.getVoQueues()) 
    5557        finally: 
    5658            if old_globus_loc != None: 
     
    184186            os.environ['GLOBUS_LOCATION'] = 'test_configs/globus' 
    185187            cp = config('test_configs/pbs_rvf.conf') 
    186             queue_set = Set(getQueueList(cp)) 
    187             vo_queues = getVoQueues(cp) 
     188            pbs = PbsBatchSystem(cp) 
     189            queue_set = Set(pbs.getQueueList()) 
     190            vo_queues = pbs.getVoQueues() 
    188191        finally: 
    189192            if old_globus_loc != None: 
  • gip/trunk/test/test_configs/gpn-husker.conf

    r2688 r3541  
     1 
     2# gpn-husker test config 
     3# Important things we are testing here: 
     4# 1) setting cluster changes the CE hosting cluster in the batch provider 
     5# 2) setting cluster changes the associated cluster in the cluster provider 
     6# 3) setting other_ces changes the foreign keys in the cluster provider 
     7 
    18[pbs] 
    29pbs_path = /usr/pbs/bin 
     
    2229 
    2330[gip] 
    24 osg_attributes=test_configs/red-osg-attributes.conf 
    25 gip_attributes=test_configs/red-gip-attributes.conf 
    26 osg_config=test_configs/red.config.ini 
     31osg_config=test_configs/gpn-husker.config.ini 
    2732osg_version_script = test_configs/osg-version 
    2833check_osg = False 
  • gip/trunk/test/test_configs/gpn-husker.config.ini

    r2277 r3541  
    66osg_location = /opt/osg/osg-100  
    77admin_email = cms@listserve.unl.edu  
    8 localhost = red.unl.edu 
     8localhost = gpn-husker.unl.edu 
    99 
    1010[Squid] 
     
    1717[PBS] 
    1818pbs_location = /opt-head/PBS 
    19 job_contact = red.unl.edu/jobmanager-pbs 
    20 util_contact = red.unl.edu/jobmanager 
     19job_contact = gpn-husker.unl.edu/jobmanager-pbs 
     20util_contact = gpn-husker.unl.edu/jobmanager 
    2121enabled = True 
    2222wsgram = True 
    2323home = /opt-head/PBS 
    2424 
     25[Subcluster SUN Nodes] 
     26name = SUN Nodes 
     27cores_per_node = 8 
     28ram_mb = 16417 
     29cpu_model = Quad-Core AMD Opteron(tm) Processor 2354 
     30cpu_speed_mhz = 2211 
     31inbound_network = FALSE 
     32cpus_per_node = 2 
     33node_count = 79 
     34outbound_network = TRUE 
     35cpu_vendor = AMD 
     36cpu_platform = x86_64 
     37 
    2538[GIP] 
    26 sc_outbound_1 = TRUE 
    27 sc_outbound_2 = TRUE 
    28 se_access_version = 2 
    29 special_vo_dir = %(unavailable)s 
    30 sc_name_1 = red.unl.edu 
    31 sc_name_2 = Dell Nodes 
    32 vo_dir = VONAME 
    33 se_control_version = 1 
    34 se_name = T2_Nebraska_Storage 
    35 se_host = srm.unl.edu 
    36 sc_numlcpus_1 = 4 
    37 gsiftp_path =  
    38 simplified_srm = True 
    39 sc_numlcpus_2 = 4 
    40 sc_nodes_2 = 53 
    41 sc_nodes_1 = 60 
    42 sc_numpcpus_2 = 2 
    43 gsiftp_host = gsiftp://srm.unl.edu:2811  
    44 sc_clock_2 = 2400 
    45 se_access_endpoints = gsiftp://srm.unl.edu:2811 
    46 srm_version = 2 
    47 sc_clock_1 = 2200 
    48 advertise_gsiftp = True 
    49 sc_numpcpus_1 = 2 
    50 sc_ramsize_1 = 4000 
    51 sc_vendor_2 = AMD 
    52 sc_model_2 = Dual-Core AMD Opteron(tm) Processor 2216 
    53 sc_model_1 = Opteron 275 
    54 sc_number = 2 
    55 sc_vendor_1 = AMD 
    56 srm_implementation = dcache 
    57 advertise_gums = True 
    58 se_root_path = /pnfs/unl.edu/data4 
    59 sc_inbound_1 = FALSE 
    60 sc_inbound_2 = FALSE 
    61 batch = pbs 
    62 srm = True 
    63 sc_ramsize_2 = 4110 
    64 dynamic_dcache = True 
    65 se_access_number = 1 
     39advertise_gums = False 
     40advertise_gsiftp = False 
     41cluster_name = red.unl.edu 
     42other_ces = red.unl.edu 
     43 
     44[Subcluster Dell Nodes] 
     45name = Dell Nodes 
     46cores_per_node = 4 
     47ram_mb = 8000 
     48cpu_model = Dual-Core AMD Opteron(tm) Processor 2216 
     49cpu_speed_mhz = 2400 
     50inbound_network = FALSE 
     51cpus_per_node = 2 
     52node_count = 53 
     53outbound_network = TRUE 
     54cpu_vendor = AMD 
     55cpu_platform = x86_64 
     56 
     57[SE Hadoop] 
     58provider_implementation = bestman 
     59srm_endpoint = httpg://dcache07.unl.edu:8443/srm/v2/server 
     60name = T2_Nebraska_Hadoop 
     61implementation = bestman 
     62enabled = True 
     63use_df = True 
     64version = 2.2.1.2.e1 
     65default_path = /mnt/hadoop/user/VONAME 
     66mount_point = /,/ 
     67 
     68[SE Hadoop2] 
     69provider_implementation = bestman 
     70srm_endpoint = httpg://srm.unl.edu:8446/srm/v2/server 
     71name = T2_Nebraska_Hadoop2 
     72implementation = bestman 
     73enabled = True 
     74use_df = True 
     75version = 2.2.1.2.e1 
     76default_path = /mnt/hadoop/user/VONAME 
     77mount_point = /,/ 
    6678 
    6779[Managed Fork] 
     
    8496group = OSG 
    8597contact = RCF 
    86 host_name = red.unl.edu 
     98host_name = gpn-husker.unl.edu 
    8799site_policy = http://t2.unl.edu/site_policy 
    88100latitude = 40.82 
     
    106118osg = /opt-head/osg/osg-100 
    107119 
    108 [Misc Services] 
    109 use_cert_updater = True 
    110 glexec_location = /usr/sbin/glexec 
    111 use_syslog_ng = False 
    112  
    113 [RSV] 
    114 ce_hosts = red.unl.edu  
    115 rsv_cert_file = /tmp/x509up_u1343 
    116 gridftp_hosts = %(default)s 
    117 srm_dir = %(unavailable)s 
    118 srm_webservice_path = %(unavailable)s 
    119 srm_hosts = %(default)s 
    120 rsv_key_file = %(default)s 
    121 enable_ce_probes = %(disable)s 
    122 enable_srm_probes = %(disable)s 
    123 setup_for_apache = %(disable)s 
    124 rsv_user = %(default)s 
    125 setup_rsv_nagios = %(disable)s 
    126 rsv_nagios_conf_file = %(unavailable)s 
    127 rsv_proxy_out_file = %(default)s 
    128 enable_gridftp_probes = %(disable)s 
    129 use_service_cert = %(disable)s 
    130 proxy_file = %(unavailable)s 
    131 enable_gums_probes = %(disable)s 
    132 gridftp_dir = %(default)s 
    133 gums_hosts = %(default)s 
    134 enabled = %(disable)s 
    135 enable_gratia = %(disable)s 
    136