| | 308 | def _maxQueue(self, queue_jobs): |
|---|
| | 309 | """ |
|---|
| | 310 | Given a dictionary of queues -> # of jobs, determine which queue has |
|---|
| | 311 | the most active queue. |
|---|
| | 312 | This will throw an exception if the queue_jobs parameter is empty |
|---|
| | 313 | """ |
|---|
| | 314 | most_active_queue = queue_jobs.keys()[0] |
|---|
| | 315 | max_queue_size = queue_jobs[most_active_queue] |
|---|
| | 316 | for qname, slots in queue_jobs.items(): |
|---|
| | 317 | if slots > max_queue_size: |
|---|
| | 318 | most_active_queue = qname |
|---|
| | 319 | max_queue_size = slots |
|---|
| | 320 | return most_active_queue |
|---|
| | 321 | |
|---|
| | 322 | def _getJobsInfo(self): |
|---|
| | 323 | if self._sge_job_cache != None: |
|---|
| | 324 | return self._sge_job_cache |
|---|
| | 325 | xml = runCommand(sge_job_info_cmd) |
|---|
| | 326 | handler = JobInfoParser() |
|---|
| | 327 | parseXmlSax(xml, handler) |
|---|
| | 328 | self._sge_job_cache = handler.getJobInfo() |
|---|
| | 329 | return self._sge_job_cache |
|---|
| | 330 | |
|---|
| | 331 | def _getPendingInfo(self, queue_list): |
|---|
| | 332 | """ |
|---|
| | 333 | SGE doesn't let us know what queue the jobs belong to until the job |
|---|
| | 334 | execution starts. Hence, we look at what queue the user has the |
|---|
| | 335 | most jobs in and arbitrarily assign the pending jobs there. |
|---|
| | 336 | |
|---|
| | 337 | This method takes in a list of valid queue names and returns a dict; |
|---|
| | 338 | the dictionary maps a queue_name to a dictionary mapping usernames to |
|---|
| | 339 | number of pending jobs |
|---|
| | 340 | """ |
|---|
| | 341 | queue_set = sets.ImmutableSet(queue_list) |
|---|
| | 342 | if queue_set in self._pending_cache: |
|---|
| | 343 | return self._pending_cache[queue_set] |
|---|
| | 344 | job_info = self._getJobsInfo() |
|---|
| | 345 | pending_jobs = {} |
|---|
| | 346 | running_jobs = {} |
|---|
| | 347 | queue_jobs = {} |
|---|
| | 348 | for job in job_info: |
|---|
| | 349 | user = job['JB_owner'] |
|---|
| | 350 | state = job['state'] |
|---|
| | 351 | try: |
|---|
| | 352 | slots = int(job['slots']) |
|---|
| | 353 | except: |
|---|
| | 354 | slots = 1 |
|---|
| | 355 | queue = job.get('queue_name', '') |
|---|
| | 356 | if queue.strip() == '': |
|---|
| | 357 | queue = 'waiting' |
|---|
| | 358 | queue = queue.split('@')[0] |
|---|
| | 359 | if state == "qw": |
|---|
| | 360 | jobs = pending_jobs.setdefault(user, 0) |
|---|
| | 361 | pending_jobs[user] = jobs + slots |
|---|
| | 362 | else: |
|---|
| | 363 | user_jobs = running_jobs.setdefault(user, {}) |
|---|
| | 364 | jobs = user_jobs.setdefault(queue, 0) |
|---|
| | 365 | user_jobs[queue] = jobs + slots |
|---|
| | 366 | jobs = queue_jobs.setdefault(queue, 0) |
|---|
| | 367 | queue_jobs[queue] = jobs + slots |
|---|
| | 368 | |
|---|
| | 369 | if not queue_jobs: |
|---|
| | 370 | if not queue_list: |
|---|
| | 371 | log.warning("No queues configured, but pending jobs; " \ |
|---|
| | 372 | "will return nothing.") |
|---|
| | 373 | self._queue_cache = {} |
|---|
| | 374 | return self._queue_cache |
|---|
| | 375 | most_active_queue = queue_list[0] |
|---|
| | 376 | log.warning("No active queues but there are pending jobs; will" \ |
|---|
| | 377 | " guess the jobs belong to an arbitrary queue, %s." % \ |
|---|
| | 378 | most_active_queue) |
|---|
| | 379 | else: |
|---|
| | 380 | most_active_queue = self._maxQueue(queue_jobs) |
|---|
| | 381 | if most_active_queue not in queue_list: |
|---|
| | 382 | if not queue_list: |
|---|
| | 383 | log.warning("No queues configured, but pending jobs; " \ |
|---|
| | 384 | "will return nothing.") |
|---|
| | 385 | self._queue_cache = {} |
|---|
| | 386 | return self._queue_cache |
|---|
| | 387 | tmp_queue = queue_list[0] |
|---|
| | 388 | log.info("Most active queue %s is not already in queue list; " \ |
|---|
| | 389 | "this may indicate an error. Arbitrarily picking %s as " \ |
|---|
| | 390 | "the most active queue." % (most_active_queue, tmp_queue)) |
|---|
| | 391 | most_active_queue = tmp_queue |
|---|
| | 392 | log.info("Most active queue is %s; any user with only pending jobs " \ |
|---|
| | 393 | "will be assigned to this one." % most_active_queue) |
|---|
| | 394 | |
|---|
| | 395 | results = {} |
|---|
| | 396 | for user, pending in pending_jobs.items(): |
|---|
| | 397 | if user not in running_jobs or not running_jobs[user]: |
|---|
| | 398 | log.info("User %s will have their %i pending jobs assigned to "\ |
|---|
| | 399 | "queue %s because they are not running in any other " \ |
|---|
| | 400 | "queue." % (user, pending, most_active_queue)) |
|---|
| | 401 | user_active_queue = most_active_queue |
|---|
| | 402 | else: |
|---|
| | 403 | user_active_queue = self._maxQueue(running_jobs[user]) |
|---|
| | 404 | if len(running_jobs[user]) == 1: |
|---|
| | 405 | log.info("User %s will have their %i pending jobs " \ |
|---|
| | 406 | "assigned to queue %s because they already have %i " \ |
|---|
| | 407 | "jobs running there." % (user, pending, |
|---|
| | 408 | user_active_queue, |
|---|
| | 409 | running_jobs[user][user_active_queue])) |
|---|
| | 410 | else: |
|---|
| | 411 | log.info("User %s will have their %i pending jobs " \ |
|---|
| | 412 | "assigned to queue %s because they already have %i " \ |
|---|
| | 413 | "jobs running there, more than any other queue." % \ |
|---|
| | 414 | (user, pending, user_active_queue, |
|---|
| | 415 | running_jobs[user][user_active_queue])) |
|---|
| | 416 | if user_active_queue in queue_list: |
|---|
| | 417 | tmp = results.setdefault(user_active_queue, {}) |
|---|
| | 418 | tmp.setdefault(user, 0) |
|---|
| | 419 | tmp[user] += pending |
|---|
| | 420 | else: |
|---|
| | 421 | log.warning("Most active user queue is %s, but it wasn't found"\ |
|---|
| | 422 | " in the queue list; this is probably an internal error." \ |
|---|
| | 423 | " Assigning user %s's %i pending jobs to most active " \ |
|---|
| | 424 | "overall queue, %s" % (user_active_queue, user, pending, |
|---|
| | 425 | most_active_queue)) |
|---|
| | 426 | tmp = results.setdefault(most_active_queue, {}) |
|---|
| | 427 | tmp.setdefault(user, 0) |
|---|
| | 428 | tmp[user] += pending |
|---|
| | 429 | self._pending_cache[queue_set] = results |
|---|
| | 430 | return results |
|---|
| | 431 | |
|---|