| 124 | | for line in pbsCommand(batch_system_info_cmd, cp): |
|---|
| 125 | | m = version_re.search(line) |
|---|
| 126 | | if m: |
|---|
| 127 | | return m.groups()[0] |
|---|
| 128 | | raise Exception("Unable to determine LRMS version info.") |
|---|
| 129 | | |
|---|
| 130 | | def getJobsInfo(vo_map, cp): |
|---|
| 131 | | """ |
|---|
| 132 | | Return information about the jobs currently running in PBS |
|---|
| 133 | | |
|---|
| 134 | | The return value is a dictionary of dictionaries; the keys for the |
|---|
| 135 | | top-level dictionary are queue names; the values are queuedata dictionaries |
|---|
| 136 | | |
|---|
| 137 | | The queuedata dicts have key:val pairs of voname: voinfo, where voinfo is |
|---|
| 138 | | a dictionary with the following keys: |
|---|
| 139 | | - running: Number of VO running jobs in this queue. |
|---|
| 140 | | - wait: Number of VO waiting jobs in this queue. |
|---|
| 141 | | - total: Number of VO total jobs in this queue. |
|---|
| 142 | | |
|---|
| 143 | | @param vo_map: A VoMapper object which is used to map user names to VOs. |
|---|
| 144 | | @param cp: Site configuration object |
|---|
| 145 | | @return: A dictionary containing queue job information. |
|---|
| 146 | | """ |
|---|
| 147 | | queue_jobs = {} |
|---|
| 148 | | for orig_line in pbsCommand(jobs_cmd, cp): |
|---|
| | 123 | |
|---|
| | 124 | def getLrmsInfo(self): |
|---|
| | 125 | """ |
|---|
| | 126 | Return the version string from the PBS batch system. |
|---|
| | 127 | """ |
|---|
| | 128 | for line in pbsCommand(batch_system_info_cmd, self.cp): |
|---|
| | 129 | m = self.version_re.search(line) |
|---|
| | 130 | if m: |
|---|
| | 131 | self.version = m.groups()[0] |
|---|
| | 132 | return "pbs", m.groups()[0] |
|---|
| | 133 | raise Exception("Unable to determine LRMS version info.") |
|---|
| | 134 | |
|---|
| | 135 | def getJobsInfo(self): |
|---|
| | 136 | """ |
|---|
| | 137 | Return information about the jobs currently running in PBS. |
|---|
| | 138 | Refer to the BatchSystem documentation for information about return |
|---|
| | 139 | values. |
|---|
| | 140 | """ |
|---|
| | 141 | queue_jobs = {} |
|---|
| | 142 | for orig_line in pbsCommand(jobs_cmd, self.cp): |
|---|
| | 143 | try: |
|---|
| | 144 | job, _, user, _, status, queue = orig_line.split() |
|---|
| | 145 | except (KeyboardInterrupt, SystemExit): |
|---|
| | 146 | raise |
|---|
| | 147 | except: |
|---|
| | 148 | continue |
|---|
| | 149 | if job.startswith("-"): |
|---|
| | 150 | continue |
|---|
| | 151 | queue_data = queue_jobs.get(queue, {}) |
|---|
| | 152 | try: |
|---|
| | 153 | vo = self.vo_map[user].lower() |
|---|
| | 154 | except: |
|---|
| | 155 | # Most likely, this means that the user is local and not |
|---|
| | 156 | # associated with a VO, so we skip the job. |
|---|
| | 157 | continue |
|---|
| | 158 | info = queue_data.get(vo, {"running":0, "wait":0, "total":0}) |
|---|
| | 159 | if status == "R": |
|---|
| | 160 | info["running"] += 1 |
|---|
| | 161 | else: |
|---|
| | 162 | info["wait"] += 1 |
|---|
| | 163 | info["total"] += 1 |
|---|
| | 164 | queue_data[vo] = info |
|---|
| | 165 | queue_jobs[queue] = queue_data |
|---|
| | 166 | return queue_jobs |
|---|
| | 167 | |
|---|
| | 168 | def getQueueInfo(self): |
|---|
| | 169 | """ |
|---|
| | 170 | Looks up the queue information from PBS. |
|---|
| | 171 | Refer to BatchSystem documentation for details. |
|---|
| | 172 | """ |
|---|
| | 173 | queueInfo = {} |
|---|
| | 174 | queue_data = None |
|---|
| | 175 | for orig_line in pbsCommand(queue_info_cmd, self.cp): |
|---|
| | 176 | line = orig_line.strip() |
|---|
| | 177 | if line.startswith("Queue: "): |
|---|
| | 178 | if queue_data != None: |
|---|
| | 179 | if queue_data["started"] and queue_data["enabled"]: |
|---|
| | 180 | queue_data["status"] = "Production" |
|---|
| | 181 | elif queue_data["enabled"]: |
|---|
| | 182 | queue_data["status"] = "Queueing" |
|---|
| | 183 | elif queue_data["started"]: |
|---|
| | 184 | queue_data["status"] = "Draining" |
|---|
| | 185 | else: |
|---|
| | 186 | queue_data["status"] = "Closed" |
|---|
| | 187 | del queue_data["started"] |
|---|
| | 188 | del queue_data['enabled'] |
|---|
| | 189 | queue_data = {} |
|---|
| | 190 | queue_name = line[7:] |
|---|
| | 191 | queueInfo[queue_name] = queue_data |
|---|
| | 192 | continue |
|---|
| | 193 | if queue_data == None: |
|---|
| | 194 | continue |
|---|
| | 195 | if len(line) == 0: |
|---|
| | 196 | continue |
|---|
| | 197 | attr, val = line.split(" = ") |
|---|
| | 198 | if attr == "Priority": |
|---|
| | 199 | queue_data['priority'] = int(val) |
|---|
| | 200 | elif attr == "total_jobs": |
|---|
| | 201 | queue_data["total"] = int(val) |
|---|
| | 202 | elif attr == "state_count": |
|---|
| | 203 | info = val.split() |
|---|
| | 204 | for entry in info: |
|---|
| | 205 | state, count = entry.split(':') |
|---|
| | 206 | count = int(count) |
|---|
| | 207 | if state == 'Queued': |
|---|
| | 208 | queue_data['wait'] = queue_data.get('wait', 0) + count |
|---|
| | 209 | #elif state == 'Waiting': |
|---|
| | 210 | # queue_data['wait'] = queue_data.get('wait', 0) + count |
|---|
| | 211 | elif state == 'Running': |
|---|
| | 212 | queue_data['running'] = count |
|---|
| | 213 | elif attr == "resources_max.walltime": |
|---|
| | 214 | queue_data["max_wall"] = HMSToMin(val) |
|---|
| | 215 | elif attr == "enabled": |
|---|
| | 216 | queue_data["enabled"] = val == "True" |
|---|
| | 217 | elif attr == "started": |
|---|
| | 218 | queue_data["started"] = val == "True" |
|---|
| | 219 | elif attr == "max_running": |
|---|
| | 220 | queue_data["max_running"] = int(val) |
|---|
| | 221 | elif attr == "resources_max.nodect": |
|---|
| | 222 | queue_data["job_slots"] = int(val) |
|---|
| | 223 | elif attr == "max_queuable" or attr == 'max_queueable': |
|---|
| | 224 | try: |
|---|
| | 225 | queue_data["max_waiting"] = int(val) |
|---|
| | 226 | queue_data["max_queuable"] = int(val) |
|---|
| | 227 | except: |
|---|
| | 228 | log.warning("Invalid input for max_queuable: %s" % str(val)) |
|---|
| | 229 | elif attr == "acl_group_enable" and val.lower() == 'true': |
|---|
| | 230 | queue_data["groups"] = sets.Set() |
|---|
| | 231 | elif attr == "acl_groups" and 'groups' in queue_data: |
|---|
| | 232 | queue_data["groups"].update(val.split(',')) |
|---|
| | 233 | elif attr == "acl_user_enable" and val.lower() == 'true': |
|---|
| | 234 | queue_data["users"] = sets.Set() |
|---|
| | 235 | elif attr == "acl_users" and 'users' in queue_data: |
|---|
| | 236 | queue_data["users"].update(val.split(',')) |
|---|
| | 237 | if queue_data != None: |
|---|
| | 238 | if queue_data["started"] and queue_data["enabled"]: |
|---|
| | 239 | queue_data["status"] = "Production" |
|---|
| | 240 | elif queue_data["enabled"]: |
|---|
| | 241 | queue_data["status"] = "Queueing" |
|---|
| | 242 | elif queue_data["started"]: |
|---|
| | 243 | queue_data["status"] = "Draining" |
|---|
| | 244 | else: |
|---|
| | 245 | queue_data["status"] = "Closed" |
|---|
| | 246 | del queue_data["started"] |
|---|
| | 247 | del queue_data['enabled'] |
|---|
| | 248 | |
|---|
| | 249 | return queueInfo |
|---|
| | 250 | |
|---|
| | 251 | def parseNodes(self): |
|---|
| | 252 | """ |
|---|
| | 253 | Parse the node information from PBS. |
|---|
| | 254 | See BatchSystem documentation for more information. |
|---|
| | 255 | """ |
|---|
| | 256 | totalCpu = 0 |
|---|
| | 257 | freeCpu = 0 |
|---|
| | 258 | queueCpu = {} |
|---|
| | 259 | queue = None |
|---|
| | 260 | avail_cpus = None |
|---|
| | 261 | used_cpus = None |
|---|
| | 262 | if self.version.find("PBSPro") >= 0: |
|---|
| | 263 | for line in pbsCommand(pbsnodes_cmd, self.cp): |
|---|
| | 264 | if len(line.strip()) == 0: |
|---|
| | 265 | continue |
|---|
| | 266 | if not line.startswith(' ') and avail_cpus != None: |
|---|
| | 267 | if queue != None: |
|---|
| | 268 | info = queueCpu.get(queue, [0, 0]) |
|---|
| | 269 | info[0] += avail_cpus |
|---|
| | 270 | info[1] += avail_cpus - used_cpus |
|---|
| | 271 | queueCpu[queue] = info |
|---|
| | 272 | else: |
|---|
| | 273 | totalCpu += avail_cpus |
|---|
| | 274 | freeCpu += avail_cpus - used_cpus |
|---|
| | 275 | queue = None |
|---|
| | 276 | continue |
|---|
| | 277 | line = line.strip() |
|---|
| | 278 | try: |
|---|
| | 279 | attr, val = line.split(" = ") |
|---|
| | 280 | except: |
|---|
| | 281 | continue |
|---|
| | 282 | if attr == "resources_available.ncpus": |
|---|
| | 283 | avail_cpus = int(val) |
|---|
| | 284 | elif attr == "resources_assigned.ncpus": |
|---|
| | 285 | used_cpus = int(val) |
|---|
| | 286 | else: |
|---|
| | 287 | for line in pbsCommand(pbsnodes_cmd, self.cp): |
|---|
| | 288 | try: |
|---|
| | 289 | attr, val = line.split(" = ") |
|---|
| | 290 | except: |
|---|
| | 291 | continue |
|---|
| | 292 | val = val.strip() |
|---|
| | 293 | attr = attr.strip() |
|---|
| | 294 | if attr == "state": |
|---|
| | 295 | state = val |
|---|
| | 296 | if attr == "np": |
|---|
| | 297 | try: |
|---|
| | 298 | np = int(val) |
|---|
| | 299 | except: |
|---|
| | 300 | np = 1 |
|---|
| | 301 | if not (state.find("down") >= 0 or \ |
|---|
| | 302 | state.find("offline") >= 0): |
|---|
| | 303 | totalCpu += np |
|---|
| | 304 | if state.find("free") >= 0: |
|---|
| | 305 | freeCpu += np |
|---|
| | 306 | if attr == "jobs" and state == "free": |
|---|
| | 307 | freeCpu -= val.count(',') |
|---|
| | 308 | |
|---|
| | 309 | return totalCpu, freeCpu, queueCpu |
|---|
| | 310 | |
|---|
| | 311 | def getVoQueues(self): |
|---|
| | 312 | """ |
|---|
| | 313 | Determine the (vo, queue) tuples for this site. This allows for central |
|---|
| | 314 | configuration of which VOs are advertised. |
|---|
| | 315 | |
|---|
| | 316 | Refer to BatchSystem documentation for more info |
|---|
| | 317 | """ |
|---|
| | 318 | voMap = self.vo_map |
|---|
| 154 | | continue |
|---|
| 155 | | if job.startswith("-"): |
|---|
| 156 | | continue |
|---|
| 157 | | queue_data = queue_jobs.get(queue, {}) |
|---|
| 158 | | try: |
|---|
| 159 | | vo = vo_map[user].lower() |
|---|
| 160 | | except: |
|---|
| 161 | | # Most likely, this means that the user is local and not |
|---|
| 162 | | # associated with a VO, so we skip the job. |
|---|
| 163 | | continue |
|---|
| 164 | | info = queue_data.get(vo, {"running":0, "wait":0, "total":0}) |
|---|
| 165 | | if status == "R": |
|---|
| 166 | | info["running"] += 1 |
|---|
| 167 | | else: |
|---|
| 168 | | info["wait"] += 1 |
|---|
| 169 | | info["total"] += 1 |
|---|
| 170 | | queue_data[vo] = info |
|---|
| 171 | | queue_jobs[queue] = queue_data |
|---|
| 172 | | return queue_jobs |
|---|
| 173 | | |
|---|
| 174 | | def getQueueInfo(cp): |
|---|
| 175 | | """ |
|---|
| 176 | | Looks up the queue information from PBS. |
|---|
| 177 | | |
|---|
| 178 | | The returned dictionary contains the following keys: |
|---|
| 179 | | |
|---|
| 180 | | - B{status}: Production, Queueing, Draining, Closed |
|---|
| 181 | | - B{priority}: The priority of the queue. |
|---|
| 182 | | - B{max_wall}: Maximum wall time. |
|---|
| 183 | | - B{max_running}: Maximum number of running jobs. |
|---|
| 184 | | - B{running}: Number of running jobs in this queue. |
|---|
| 185 | | - B{wait}: Waiting jobs in this queue. |
|---|
| 186 | | - B{total}: Total number of jobs in this queue. |
|---|
| 187 | | |
|---|
| 188 | | @param cp: Configuration of site. |
|---|
| 189 | | @returns: A dictionary of queue data. The keys are the queue names, and |
|---|
| 190 | | the value is the queue data dictionary. |
|---|
| 191 | | """ |
|---|
| 192 | | queueInfo = {} |
|---|
| 193 | | queue_data = None |
|---|
| 194 | | for orig_line in pbsCommand(queue_info_cmd, cp): |
|---|
| 195 | | line = orig_line.strip() |
|---|
| 196 | | if line.startswith("Queue: "): |
|---|
| 197 | | if queue_data != None: |
|---|
| 198 | | if queue_data["started"] and queue_data["enabled"]: |
|---|
| 199 | | queue_data["status"] = "Production" |
|---|
| 200 | | elif queue_data["enabled"]: |
|---|
| 201 | | queue_data["status"] = "Queueing" |
|---|
| 202 | | elif queue_data["started"]: |
|---|
| 203 | | queue_data["status"] = "Draining" |
|---|
| 204 | | else: |
|---|
| 205 | | queue_data["status"] = "Closed" |
|---|
| 206 | | del queue_data["started"] |
|---|
| 207 | | del queue_data['enabled'] |
|---|
| 208 | | queue_data = {} |
|---|
| 209 | | queue_name = line[7:] |
|---|
| 210 | | queueInfo[queue_name] = queue_data |
|---|
| 211 | | continue |
|---|
| 212 | | if queue_data == None: |
|---|
| 213 | | continue |
|---|
| 214 | | if len(line) == 0: |
|---|
| 215 | | continue |
|---|
| 216 | | attr, val = line.split(" = ") |
|---|
| 217 | | if attr == "Priority": |
|---|
| 218 | | queue_data['priority'] = int(val) |
|---|
| 219 | | elif attr == "total_jobs": |
|---|
| 220 | | queue_data["total"] = int(val) |
|---|
| 221 | | elif attr == "state_count": |
|---|
| 222 | | info = val.split() |
|---|
| 223 | | for entry in info: |
|---|
| 224 | | state, count = entry.split(':') |
|---|
| 225 | | count = int(count) |
|---|
| 226 | | if state == 'Queued': |
|---|
| 227 | | queue_data['wait'] = queue_data.get('wait', 0) + count |
|---|
| 228 | | #elif state == 'Waiting': |
|---|
| 229 | | # queue_data['wait'] = queue_data.get('wait', 0) + count |
|---|
| 230 | | elif state == 'Running': |
|---|
| 231 | | queue_data['running'] = count |
|---|
| 232 | | elif attr == "resources_max.walltime": |
|---|
| 233 | | queue_data["max_wall"] = HMSToMin(val) |
|---|
| 234 | | elif attr == "enabled": |
|---|
| 235 | | queue_data["enabled"] = val == "True" |
|---|
| 236 | | elif attr == "started": |
|---|
| 237 | | queue_data["started"] = val == "True" |
|---|
| 238 | | elif attr == "max_running": |
|---|
| 239 | | queue_data["max_running"] = int(val) |
|---|
| 240 | | elif attr == "resources_max.nodect": |
|---|
| 241 | | queue_data["job_slots"] = int(val) |
|---|
| 242 | | elif attr == "max_queuable" or attr == 'max_queueable': |
|---|
| 243 | | try: |
|---|
| 244 | | queue_data["max_waiting"] = int(val) |
|---|
| 245 | | queue_data["max_queuable"] = int(val) |
|---|
| 246 | | except: |
|---|
| 247 | | log.warning("Invalid input for max_queuable: %s" % str(val)) |
|---|
| 248 | | elif attr == "acl_group_enable" and val.lower() == 'true': |
|---|
| 249 | | queue_data["groups"] = sets.Set() |
|---|
| 250 | | elif attr == "acl_groups" and 'groups' in queue_data: |
|---|
| 251 | | queue_data["groups"].update(val.split(',')) |
|---|
| 252 | | elif attr == "acl_user_enable" and val.lower() == 'true': |
|---|
| 253 | | queue_data["users"] = sets.Set() |
|---|
| 254 | | elif attr == "acl_users" and 'users' in queue_data: |
|---|
| 255 | | queue_data["users"].update(val.split(',')) |
|---|
| 256 | | if queue_data != None: |
|---|
| 257 | | if queue_data["started"] and queue_data["enabled"]: |
|---|
| 258 | | queue_data["status"] = "Production" |
|---|
| 259 | | elif queue_data["enabled"]: |
|---|
| 260 | | queue_data["status"] = "Queueing" |
|---|
| 261 | | elif queue_data["started"]: |
|---|
| 262 | | queue_data["status"] = "Draining" |
|---|
| 263 | | else: |
|---|
| 264 | | queue_data["status"] = "Closed" |
|---|
| 265 | | del queue_data["started"] |
|---|
| 266 | | del queue_data['enabled'] |
|---|
| 267 | | |
|---|
| 268 | | return queueInfo |
|---|
| 269 | | |
|---|
| 270 | | def parseNodes(cp, version): |
|---|
| 271 | | """ |
|---|
| 272 | | Parse the node information from PBS. Using the output from pbsnodes, |
|---|
| 273 | | determine: |
|---|
| 274 | | |
|---|
| 275 | | - The number of total CPUs in the system. |
|---|
| 276 | | - The number of free CPUs in the system. |
|---|
| 277 | | - A dictionary mapping PBS queue names to a tuple containing the |
|---|
| 278 | | (totalCPUs, freeCPUs). |
|---|
| 279 | | """ |
|---|
| 280 | | totalCpu = 0 |
|---|
| 281 | | freeCpu = 0 |
|---|
| 282 | | queueCpu = {} |
|---|
| 283 | | queue = None |
|---|
| 284 | | avail_cpus = None |
|---|
| 285 | | used_cpus = None |
|---|
| 286 | | if version.find("PBSPro") >= 0: |
|---|
| 287 | | for line in pbsCommand(pbsnodes_cmd, cp): |
|---|
| 288 | | if len(line.strip()) == 0: |
|---|
| 289 | | continue |
|---|
| 290 | | if not line.startswith(' ') and avail_cpus != None: |
|---|
| 291 | | if queue != None: |
|---|
| 292 | | info = queueCpu.get(queue, [0, 0]) |
|---|
| 293 | | info[0] += avail_cpus |
|---|
| 294 | | info[1] += avail_cpus - used_cpus |
|---|
| 295 | | queueCpu[queue] = info |
|---|
| 296 | | else: |
|---|
| 297 | | totalCpu += avail_cpus |
|---|
| 298 | | freeCpu += avail_cpus - used_cpus |
|---|
| 299 | | queue = None |
|---|
| 300 | | continue |
|---|
| 301 | | line = line.strip() |
|---|
| 302 | | try: |
|---|
| 303 | | attr, val = line.split(" = ") |
|---|
| | 323 | queue_exclude = [] |
|---|
| | 324 | vo_queues = [] |
|---|
| | 325 | queueInfo = self.getQueueInfo() |
|---|
| | 326 | rvf_info = parseRvf('pbs.rvf') |
|---|
| | 327 | rvf_queue_list = rvf_info.get('queue', {}).get('Values', None) |
|---|
| | 328 | if rvf_queue_list: |
|---|
| | 329 | rvf_queue_list = rvf_queue_list.split() |
|---|
| | 330 | log.info("The RVF lists the following queues: %s." % ', '.join( \ |
|---|
| | 331 | rvf_queue_list)) |
|---|
| | 332 | log.debug("All queues to consider: %s" % ", ".join(queueInfo)) |
|---|
| | 333 | for queue, qinfo in queueInfo.items(): |
|---|
| | 334 | if rvf_queue_list and queue not in rvf_queue_list: |
|---|
| | 335 | log.debug("Skipping %s because it is not in the RVF." % queue) |
|---|
| | 336 | continue |
|---|
| | 337 | if queue in queue_exclude: |
|---|
| | 338 | log.debug("Skipping %s because it is in the queue_exclude." % \ |
|---|
| | 339 | queue) |
|---|
| | 340 | continue |
|---|
| | 341 | volist = sets.Set(voList(self.cp, voMap)) |
|---|
| | 342 | try: |
|---|
| | 343 | whitelist = [i.strip() for i in self.cp.get("pbs", |
|---|
| | 344 | "%s_whitelist" % queue).split(',')] |
|---|
| 315 | | continue |
|---|
| 316 | | val = val.strip() |
|---|
| 317 | | attr = attr.strip() |
|---|
| 318 | | if attr == "state": |
|---|
| 319 | | state = val |
|---|
| 320 | | if attr == "np": |
|---|
| | 352 | blacklist = [] |
|---|
| | 353 | blacklist = sets.Set(blacklist) |
|---|
| | 354 | if 'users' in qinfo or 'groups' in qinfo: |
|---|
| | 355 | acl_vos = self.parseAclInfo(queue, qinfo) |
|---|
| | 356 | volist.intersection_update(acl_vos) |
|---|
| | 357 | # Force any VO in the whitelist to show up in the volist, even if it |
|---|
| | 358 | # isn't in the acl_users / acl_groups |
|---|
| | 359 | for vo in whitelist: |
|---|
| | 360 | if vo not in volist: |
|---|
| | 361 | volist.add(vo) |
|---|
| | 362 | # Apply white and black lists |
|---|
| | 363 | log.debug("All VOs to consider for queue %s before black/white " |
|---|
| | 364 | "list: %s" % (queue, ", ".join(volist))) |
|---|
| | 365 | for vo in volist: |
|---|
| | 366 | if (vo in blacklist or "*" in blacklist) and ((len(whitelist)\ |
|---|
| | 367 | == 0) or vo not in whitelist): |
|---|
| | 368 | if log.isEnabledFor(10): # 10=logging.DEBUG |
|---|
| | 369 | log.debug("Skipping VO %s" % vo) |
|---|
| | 370 | if whitelist and vo not in whitelist: |
|---|
| | 371 | log.debug("VO %s not in whitelist" % vo) |
|---|
| | 372 | elif vo in blacklist: |
|---|
| | 373 | log.debug("VO %s is in blacklist" % vo) |
|---|
| | 374 | else: |
|---|
| | 375 | log.debug("Blacklist contains *") |
|---|
| | 376 | continue |
|---|
| | 377 | log.debug("Adding VO %s to queue %s" % (vo, queue)) |
|---|
| | 378 | vo_queues.append((vo, queue)) |
|---|
| | 379 | return vo_queues |
|---|
| | 380 | |
|---|
| | 381 | def parseAclInfo(self, queue, qinfo): |
|---|
| | 382 | """ |
|---|
| | 383 | Take a queue information dictionary and determine which VOs are in the |
|---|
| | 384 | ACL list. The used keys are: |
|---|
| | 385 | |
|---|
| | 386 | - users: A set of all user names allowed to access this queue. |
|---|
| | 387 | - groups: A set of all group names allowed to access this queue. |
|---|
| | 388 | |
|---|
| | 389 | @param queue: Queue name (for logging purposes). |
|---|
| | 390 | @param qinfo: Queue info dictionary |
|---|
| | 391 | @returns: A set of allowed VOs |
|---|
| | 392 | """ |
|---|
| | 393 | users = qinfo.get('users', sets.Set()) |
|---|
| | 394 | if 'groups' in qinfo: |
|---|
| | 395 | all_groups = grp.getgrall() |
|---|
| | 396 | all_users = pwd.getpwall() |
|---|
| | 397 | group_dict = {} |
|---|
| | 398 | for group in all_groups: |
|---|
| | 399 | if group[0] in qinfo['groups'] or group[2] in qinfo['groups']: |
|---|
| | 400 | users.add(group[0]) |
|---|
| | 401 | group_dict[group[2]] = group[0] |
|---|
| | 402 | for user in all_users: |
|---|
| 324 | | np = 1 |
|---|
| 325 | | if not (state.find("down") >= 0 or \ |
|---|
| 326 | | state.find("offline") >= 0): |
|---|
| 327 | | totalCpu += np |
|---|
| 328 | | if state.find("free") >= 0: |
|---|
| 329 | | freeCpu += np |
|---|
| 330 | | if attr == "jobs" and state == "free": |
|---|
| 331 | | freeCpu -= val.count(',') |
|---|
| 332 | | |
|---|
| 333 | | return totalCpu, freeCpu, queueCpu |
|---|
| 334 | | |
|---|
| 335 | | def getQueueList(cp): |
|---|
| 336 | | """ |
|---|
| 337 | | Returns a list of all the queue names that are supported. |
|---|
| 338 | | |
|---|
| 339 | | @param cp: Site configuration |
|---|
| 340 | | @returns: List of strings containing the queue names. |
|---|
| 341 | | """ |
|---|
| 342 | | queues = [] |
|---|
| 343 | | try: |
|---|
| 344 | | queue_exclude = [i.strip() for i in cp.get("pbs", "queue_exclude").\ |
|---|
| 345 | | split(',')] |
|---|
| 346 | | except: |
|---|
| 347 | | queue_exclude = [] |
|---|
| 348 | | rvf_info = parseRvf('pbs.rvf') |
|---|
| 349 | | rvf_queue_list = rvf_info.get('queue', {}).get('Values', None) |
|---|
| 350 | | if rvf_queue_list: |
|---|
| 351 | | rvf_queue_list = rvf_queue_list.split() |
|---|
| 352 | | log.info("The RVF lists the following queues: %s." % ', '.join( \ |
|---|
| 353 | | rvf_queue_list)) |
|---|
| 354 | | for queue in getQueueInfo(cp): |
|---|
| 355 | | if queue not in queue_exclude: |
|---|
| 356 | | queues.append(queue) |
|---|
| 357 | | if rvf_queue_list and queue not in rvf_queue_list: |
|---|
| 358 | | continue |
|---|
| 359 | | if queue not in queue_exclude: |
|---|
| 360 | | queues.append(queue) |
|---|
| 361 | | return queues |
|---|
| 362 | | |
|---|
| 363 | | def getVoQueues(cp): |
|---|
| 364 | | """ |
|---|
| 365 | | Determine the (vo, queue) tuples for this site. This allows for central |
|---|
| 366 | | configuration of which VOs are advertised. |
|---|
| 367 | | |
|---|
| 368 | | Sites will be able to blacklist queues they don't want to advertise, |
|---|
| 369 | | whitelist certain VOs for a particular queue, and blacklist VOs from queues. |
|---|
| 370 | | |
|---|
| 371 | | @param cp: Site configuration |
|---|
| 372 | | @returns: A list of (vo, queue) tuples representing the queues each VO |
|---|
| 373 | | is allowed to run in. |
|---|
| 374 | | """ |
|---|
| 375 | | voMap = VoMapper(cp) |
|---|
| 376 | | try: |
|---|
| 377 | | queue_exclude = [i.strip() for i in cp.get("pbs", "queue_exclude").\ |
|---|
| 378 | | split(',')] |
|---|
| 379 | | except: |
|---|
| 380 | | queue_exclude = [] |
|---|
| 381 | | vo_queues = [] |
|---|
| 382 | | queueInfo = getQueueInfo(cp) |
|---|
| 383 | | rvf_info = parseRvf('pbs.rvf') |
|---|
| 384 | | rvf_queue_list = rvf_info.get('queue', {}).get('Values', None) |
|---|
| 385 | | if rvf_queue_list: |
|---|
| 386 | | rvf_queue_list = rvf_queue_list.split() |
|---|
| 387 | | log.info("The RVF lists the following queues: %s." % ', '.join( \ |
|---|
| 388 | | rvf_queue_list)) |
|---|
| 389 | | for queue, qinfo in queueInfo.items(): |
|---|
| 390 | | if rvf_queue_list and queue not in rvf_queue_list: |
|---|
| 391 | | continue |
|---|
| 392 | | if queue in queue_exclude: |
|---|
| 393 | | continue |
|---|
| 394 | | volist = sets.Set(voList(cp, voMap)) |
|---|
| 395 | | try: |
|---|
| 396 | | whitelist = [i.strip() for i in cp.get("pbs", "%s_whitelist" % \ |
|---|
| 397 | | queue).split(',')] |
|---|
| 398 | | except: |
|---|
| 399 | | whitelist = [] |
|---|
| 400 | | whitelist = sets.Set(whitelist) |
|---|
| 401 | | try: |
|---|
| 402 | | blacklist = [i.strip() for i in cp.get("pbs", "%s_blacklist" % \ |
|---|
| 403 | | queue).split(',')] |
|---|
| 404 | | except: |
|---|
| 405 | | blacklist = [] |
|---|
| 406 | | blacklist = sets.Set(blacklist) |
|---|
| 407 | | if 'users' in qinfo or 'groups' in qinfo: |
|---|
| 408 | | acl_vos = parseAclInfo(queue, qinfo, voMap) |
|---|
| 409 | | volist.intersection_update(acl_vos) |
|---|
| 410 | | # Force any VO in the whitelist to show up in the volist, even if it |
|---|
| 411 | | # isn't in the acl_users / acl_groups |
|---|
| 412 | | for vo in whitelist: |
|---|
| 413 | | if vo not in volist: |
|---|
| 414 | | volist.add(vo) |
|---|
| 415 | | # Apply white and black lists |
|---|
| 416 | | for vo in volist: |
|---|
| 417 | | if (vo in blacklist or "*" in blacklist) and ((len(whitelist) == 0)\ |
|---|
| 418 | | or vo not in whitelist): |
|---|
| 419 | | continue |
|---|
| 420 | | vo_queues.append((vo, queue)) |
|---|
| 421 | | return vo_queues |
|---|
| 422 | | |
|---|
| 423 | | def parseAclInfo(queue, qinfo, vo_mapper): |
|---|
| 424 | | """ |
|---|
| 425 | | Take a queue information dictionary and determine which VOs are in the ACL |
|---|
| 426 | | list. The used keys are: |
|---|
| 427 | | |
|---|
| 428 | | - users: A set of all user names allowed to access this queue. |
|---|
| 429 | | - groups: A set of all group names allowed to access this queue. |
|---|
| 430 | | |
|---|
| 431 | | @param queue: Queue name (for logging purposes). |
|---|
| 432 | | @param qinfo: Queue info dictionary |
|---|
| 433 | | @param vo_mapper: VO mapper object |
|---|
| 434 | | @returns: A set of allowed VOs |
|---|
| 435 | | """ |
|---|
| 436 | | users = qinfo.get('users', sets.Set()) |
|---|
| 437 | | if 'groups' in qinfo: |
|---|
| 438 | | all_groups = grp.getgrall() |
|---|
| 439 | | all_users = pwd.getpwall() |
|---|
| 440 | | group_dict = {} |
|---|
| 441 | | for group in all_groups: |
|---|
| 442 | | if group[0] in qinfo['groups'] or group[2] in qinfo['groups']: |
|---|
| 443 | | users.add(group[0]) |
|---|
| 444 | | group_dict[group[2]] = group[0] |
|---|
| 445 | | for user in all_users: |
|---|
| 446 | | try: |
|---|
| 447 | | group = group_dict[user[3]] |
|---|
| | 406 | continue |
|---|
| | 407 | if group[0] in qinfo['groups'] or user[3] in qinfo['groups']: |
|---|
| | 408 | users.add(group[0]) |
|---|
| | 409 | vos = sets.Set() |
|---|
| | 410 | for user in users: |
|---|
| | 411 | try: |
|---|
| | 412 | vos.add(self.vo_map[user]) |
|---|