root/gip/trunk/test/test_topology.py

Revision 2783, 2.7 kB (checked in by brian, 10 months ago)

Test case for ticket #20.

Line 
1
2 import os
3 import sys
4 import unittest
5
6 sys.path.append(os.path.expandvars("$GIP_LOCATION/lib/python"))
7 from gip_common import config, cp_getBoolean, voList
8 from gip_testing import runTest, streamHandler
9 from gip_ldap import read_ldap
10
11 class TestSiteTopology(unittest.TestCase):
12
13     def setUp(self):
14         filename = 'test/test_configs/red.conf'
15         if not os.path.exists(filename):
16             filename = 'test_configs/red.conf'
17         self.filename = filename
18         self.cp = config(filename)
19
20     def setUpLDAP(self, multi=False):
21         command = "$GIP_LOCATION/providers/site_topology.py --config=%s" % \
22             self.filename
23         stdout = os.popen(command)
24         entries = read_ldap(stdout, multi=multi)
25         return entries
26
27     def test_exitcode(self):
28         command = "$GIP_LOCATION/providers/site_topology.py --config=%s" % \
29             self.filename
30         stdout = os.popen(command)
31         stdout.read()
32         self.failUnless(stdout.close() == None, msg="Site topology provider " \
33             "failed with nonzero exit code.")
34
35     def test_site(self):
36         has_site = False
37         for entry in self.setUpLDAP():
38             if 'GlueSite' in entry.objectClass:
39                 self.failUnless(entry.glue['SiteUniqueID'] == 'Nebraska')
40                 self.failUnless(entry.glue['SiteName'] == 'Nebraska')
41                 self.failUnless(entry.glue['SiteDescription'] == 'OSG Site')
42                 self.failUnless(entry.glue['SiteLatitude'] == '40.82')
43                 has_site = True
44         self.failUnless(has_site, msg="No site LDAP entry present!")
45
46     def test_cluster(self):
47         has_cluster = False
48         for entry in self.setUpLDAP(multi=True):
49             if 'GlueCluster' in entry.objectClass:
50                 self.failUnless(entry.glue['ClusterName'][0] == 'red.unl.edu')
51                 self.failUnless(entry.glue['ClusterTmpDir'][0] == \
52                     '/opt/osg/data')
53                 self.failUnless('GlueSiteUniqueID=Nebraska' in \
54                     entry.glue['ForeignKey'])
55                 self.failUnless('GlueCEUniqueID=red.unl.edu:2119/jobmanager-' \
56                     'pbs-cmsprod' in entry.glue['ForeignKey'])
57                 has_cluster = True
58         self.failUnless(has_cluster, msg="No cluster LDAP entry present!")
59
60     def test_subcluster(self):
61         has_subcluster = False
62         for entry in self.setUpLDAP():
63             if 'GlueSubcluster' in entry.objectClass:
64                 has_subcluster = True
65         self.failUnless(has_subcluster, msg="No subcluster LDAP entry present!")
66
67
68 def main():
69     cp = config()
70     stream = streamHandler(cp)
71     runTest(cp, TestSiteTopology, stream, per_site=False)
72
73 if __name__ == '__main__':
74     main()
75
Note: See TracBrowser for help on using the browser.