| 1 |
|
|---|
| 2 |
|
|---|
| 3 |
import os |
|---|
| 4 |
import sys |
|---|
| 5 |
import unittest |
|---|
| 6 |
import tempfile |
|---|
| 7 |
import ConfigParser |
|---|
| 8 |
|
|---|
| 9 |
sys.path.append(os.path.expandvars("$GIP_LOCATION/lib/python")) |
|---|
| 10 |
from gip_sets import Set |
|---|
| 11 |
import gip_sets as sets |
|---|
| 12 |
from gip_common import config, cp_getBoolean, voList |
|---|
| 13 |
from gip_cluster import getOSGVersion, getApplications |
|---|
| 14 |
from gip_testing import runTest, streamHandler |
|---|
| 15 |
import gip_testing |
|---|
| 16 |
import gip_osg |
|---|
| 17 |
|
|---|
| 18 |
# Expected result of voList() in TestGipCommon.test_voList, which points
# user_vo_map at test_configs/fermigrid-osg-user-vo-map.txt and whitelists
# 'vo1'.  Every name appears once here; repeated names would collapse inside
# the Set anyway.
fermigrid_vos = sets.Set([
    'osg', 'cdms', 'lqcd', 'auger', 'i2u2', 'cdf', 'des', 'dzero',
    'nanohub', 'grase', 'cms', 'fermilab', 'astro', 'accelerator',
    'hypercp', 'ktev', 'miniboone', 'minos', 'nova', 'numi', 'mipp',
    'patriot', 'sdss', 'theory', 'fermilab-test', 'LIGO', 'glow', 'dosar',
    'star', 'geant4', 'mariachi', 'atlas', 'nwicg', 'ops', 'gugrid', 'gpn',
    'compbiogrid', 'engage', 'pragma', 'nysgrid', 'sbgrid', 'cigi', 'mis',
    'fmri', 'gridex', 'vo1',
])
|---|
| 25 |
|
|---|
| 26 |
class TestGipCommon(unittest.TestCase):
    """
    Tests for helpers imported from gip_common, gip_cluster and gip_osg.
    """

    def test_config(self):
        """
        Make sure that the ConfigParser object can load without errors
        """
        # The only check is that config() returns without raising.
        config()

    def test_gip_conf(self):
        """
        Write a gip.conf under a temporary $GIP_LOCATION/etc and make sure
        the boolean option stored in it can be read back.

        NOTE(review): the file is read back with a plain ConfigParser rather
        than through config(), so the temporary GIP_LOCATION is set but never
        consulted by the code under test -- confirm this is intended.
        """
        # Local import: only this test needs recursive directory cleanup.
        import shutil

        # GIP_LOCATION may legitimately be unset; remember that case so the
        # environment can be restored exactly.
        old_gip_location = os.environ.get('GIP_LOCATION')
        tmpdir = tempfile.mkdtemp()
        try:
            os.environ['GIP_LOCATION'] = tmpdir
            etc_dir = os.path.join(tmpdir, 'etc')
            os.mkdir(etc_dir)
            gip_conf = os.path.join(etc_dir, 'gip.conf')

            cp_orig = ConfigParser.ConfigParser()
            cp_orig.add_section("gip_test")
            cp_orig.set("gip_test", "gip_conf", "True")
            fp = open(gip_conf, 'w')
            try:
                cp_orig.write(fp)
            finally:
                # Close even if write() raises so the handle is never leaked.
                fp.close()

            cp = ConfigParser.ConfigParser()
            cp.read([gip_conf])
            result = cp_getBoolean(cp, "gip_test", "gip_conf", False)
            self.failUnless(result, msg="Failed to load $GIP_LOCATION"
                "/etc/gip.conf")
        finally:
            # Restore the environment first so a cleanup problem can never
            # leave GIP_LOCATION pointing at a deleted directory.
            if old_gip_location is None:
                if 'GIP_LOCATION' in os.environ:
                    del os.environ['GIP_LOCATION']
            else:
                os.environ['GIP_LOCATION'] = old_gip_location
            shutil.rmtree(tmpdir, ignore_errors=True)

    def test_osg_check(self):
        """
        Make sure that the checkOsgConfigured function operates as desired:
        it must not raise while the file named by gip.osg_attributes exists,
        and it must raise once that file has been removed.
        """
        cp = config()

        def check_raises():
            # Report whether checkOsgConfigured raised anything.  The catch
            # is deliberately broad (any failure mode counts), but a Ctrl-C
            # from the person running the tests must still get through.
            try:
                gip_osg.checkOsgConfigured(cp)
            except KeyboardInterrupt:
                raise
            except:
                return True
            return False

        fd, filename = tempfile.mkstemp()
        try:
            handle = os.fdopen(fd, 'w')
            try:
                handle.write('hello world!\n')
            finally:
                # Closing flushes the content before the file is inspected.
                handle.close()

            if not cp.has_section("gip"):
                cp.add_section("gip")
            cp.set("gip", "osg_attributes", filename)

            self.failIf(check_raises(),
                "Failed on a 'valid' osg-attributes.conf")
            os.unlink(filename)
            self.failUnless(check_raises(), "Did not fail on missing file.")
        finally:
            # If an assertion fired before the unlink above, the temporary
            # file is still on disk; remove it here.
            if os.path.exists(filename):
                os.unlink(filename)

    def test_voList(self):
        """
        Make sure voList does indeed load up the correct VOs.
        """
        cp = ConfigParser.ConfigParser()
        cp.add_section("vo")
        cp.set("vo", "vo_blacklist", "ilc")
        cp.set("vo", "vo_whitelist", "vo1")
        cp.set("vo", "user_vo_map",
            "test_configs/fermigrid-osg-user-vo-map.txt")
        vos = sets.Set(voList(cp))
        # Any name present on only one side is a mismatch.
        diff = vos.symmetric_difference(fermigrid_vos)
        self.failIf(diff, msg="Difference between voList output and desired "
            "output: %s." % ', '.join(diff))

    def test_osg_version(self):
        """
        Test the osg-version function and test for the contents in the software.
        """
        my_version = 'OSG-magic-version'
        cp = config("test_configs/red.conf")
        version = getOSGVersion(cp)
        self.failUnless(version == my_version, msg="Computed OSG version does"
            " not match the test case's OSG version.")
        found_osg = False
        for loc in getApplications(cp):
            if loc['locationId'] == my_version:
                self.failUnless(loc['locationName'] == my_version)
                self.failUnless(loc['version'] == my_version)
                found_osg = True
        self.failUnless(found_osg, msg="OSG version not in software list!")
|---|
| 131 |
|
|---|
| 132 |
|
|---|
| 133 |
def main():
    """
    Run the TestGipCommon test case through the gip_testing runner, using
    the default configuration and the stream handler built from it.
    """
    configuration = config()
    output_stream = streamHandler(configuration)
    runTest(configuration, TestGipCommon, output_stream, per_site=False)


# Run the tests only when this file is executed as a script.
if __name__ == '__main__':
    main()
|---|
| 140 |
|
|---|