-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPyTranslator.py
356 lines (322 loc) · 14.3 KB
/
PyTranslator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
#!/usr/bin/env python
# $Header: /usr/local/cvsroot/stratex/rats/Snmp/PyTranslator.py,
"""
Implements a translator from MIB object names to OIDs
"""
__docformat__ = 'epytext en' #Formatted for use with Epydoc tool (http://epydoc.sourceforge.net/)
import cPickle, os, re
import Translator
class _TokenFile(file):
"""
Class for reading an input text file as a series of tokens
"""
def __init__(self, *args, **kwargs):
"""
Token file object constructor.
Initialises the instance, and opens the specified file for reading
@type *args: list
@param *args: Accepts arguments that would normally be passes to the file.open() method
@type *kwargs: list
@param *kwargs: Accepts keyword arguments that would normally be passed to the file.open() method
"""
file.__init__(self, *args, **kwargs)
self.token = "" #Token read from file
self.previousToken = "" #Previous token read from file
self.__textline = "" #Line of text read from file, & being processed.
self.lineno = 0
def getToken(self, delimiters):
"""
Gets a token from the file, separated by one of the specified delimiters
@type delimiters: string
@param delimiters: Delimiters to use for separating tokens
@rtype: string
@return: Next token from file separated by specified delimiters
"""
#If token is valid, save as previous token
if self.token:
self.previousToken = self.token
self.token = ""
#If text read from file is empty, read another line
if not self.__textline:
self.__textline = self.readline()
self.lineno += 1
#Loop until token is found, or end of file is reached
while self.__textline and not self.token:
#Tokenise the text read from the file using the specified delimiters
splittext = re.split("[" + delimiters + "]", self.__textline, 1)
#Token was found, save rest of text for next search
if len(splittext) > 1:
self.token = splittext[0]
self.__textline = splittext[1]
if not self.__textline:
self.__textline = self.readline()
self.lineno += 1
else:
#Token not found yet, read another line
self.__textline = self.readline()
self.lineno += 1
return self.token
def findToken(self, tokentofind, delimiters):
"""
Find a specified token in the file, ignoring occurrences that
are in quotes, and return the token following the one specified.
@type tokentofind: string
@param tokentofind: Specific token to search for
@type delimiters: string
@param delimiters: Delimiters to use for finding the next token after the specified one
@rtype: string
@return: Next token from file separated by specified delimiters
which follows the occurrence of the specified token
"""
quotecount = 0 #Used to count number of quotation marks
#Loop until we run out of tokens
while self.getToken(" \t\n\r"):
#Get number of quotation marks in token
quotecount += self.token.count("\"")
#Continue loop if number of quotation marks is odd
if quotecount % 2 != 0:
continue
#Exit loop if token match is found
if self.token == tokentofind:
break
return self.getToken(delimiters)
class _MibObject(object):
"""Stores information for a MIB object"""
def __init__(self):
"""MIB Object information class constructor"""
self.name = ""
"""Name of MIB object"""
self.syntax = ""
"""MIB object data type"""
self.oid = ""
"""OID of MIB object"""
self.branch = ""
"""Branch number of MIB object"""
self.parentName = ""
"""Name of MIB object's parent"""
self.parent = None
"""Instance of MIB object's parent"""
class PyTranslator(Translator.Translator):
"""
Interprets MIB definition files and provides method to
translate from an Object Name to the associated OID.
"""
def __init__(self, directory="./Snmp/mibs/"):
"""
Mib Translator Constructor.
@type directory: string
@param directory: The directory containing the MIB files
"""
self._mibDir = directory
self._objectList = []
self._objectDict = {}
self._tcDict = {}
self._objectTypes = {}
if not self._loadOidDictionary():
self.mibRefresh()
def translate(self, objectName):
"""
Returns a tuple containing the (OID, DataType) for the specified object name
@type objectName: string
@param objectName: The name of the MIB object whose OID should be returned
@rtype: tuple
@return: (OID, DataType) strings for specified MIB object if found, empty strings otherwise
"""
if objectName and self._objectDict.has_key(objectName.lower()):
return self._objectDict[objectName.lower()]
else:
return (None, None)
def mibRefresh(self, directory=""):
"""
Refreshes MIB information - Recreates OID dictionary from scratch
@type directory: string
@param directory: The directory containing the MIB files.
Only required if different from previously specified
"""
if directory: self._mibDir = directory
self._objectTypes = {
"BOOLEAN": "INTEGER",
"TruthValue": "INTEGER",
"INTEGER": "INTEGER",
"Integer32": "INTEGER",
"DisplayString": "OCTETSTRING",
"IpAddress": "IPADDRESS",
"NetworkAddress":"IPADDRESS",
"Counter32": "COUNTER32",
"Counter": "COUNTER32",
"Gauge32": "GAUGE32",
"Gauge": "GAUGE32",
"TimeTicks": "TIMETICKS",
"Unsigned": "UNSIGNED32",
"Unsigned32": "UNSIGNED32",
"OCTET": "OCTETSTRING",
"OBJECT": "OBJECTID",
"DateAndTime": "OCTETSTRING",
"none": "" }
self._createMergedMib()
self._parseMergedMib()
self._buildOidDictionary()
self._saveOidDictionary()
#Free memory used by objects that are no longer needed
del self._objectList
del self._tcDict
del self._objectTypes
def _loadOidDictionary(self):
"""
Loads previously saved OID dictionary from file
@rtype: boolean
@return: True if dictionary was loaded successfully, False otherwise
"""
result = True
oidFile = None
try:
filename = self._mibDir + "oids.dat"
oidFile = open(filename, 'rb')
self._objectDict = cPickle.load(oidFile)
except:
result = False
if oidFile:
oidFile.close()
return result
def _saveOidDictionary(self):
"""
Saves OID dictionary to a file
@rtype: boolean
@return: True if dictionary was saved successfully, False otherwise
"""
result = True
oidFile = None
try:
filename = self._mibDir + "oids.dat"
oidFile = open(filename, 'wb')
cPickle.dump(self._objectDict, oidFile, cPickle.HIGHEST_PROTOCOL)
except:
result = False
if oidFile:
oidFile.close()
return result
def _createMergedMib(self):
"""
Created a single merged file from all the MIBS in the assigned directory
Overwrites any existing merged MIB.
"""
#Extensions MUST be processed in the following particular
#order to ensure that Objects are assigned the correct OID.
mibFiles = []
for extension in (".MI2",".MIB",".mi2",".mib",".txt"):
for filename in os.listdir(self._mibDir):
if filename.endswith(extension):
mibFiles.append(self._mibDir + filename)
if mibFiles:
mergedMib = open(self._mibDir + "merged_mib.dat", "w")
for filename in mibFiles:
mibFile = open(filename, 'r')
mibData = mibFile.read()
mibFile.close()
mergedMib.write(filename)
mergedMib.write(mibData)
mergedMib.close()
def _parseMergedMib(self):
"""
Parses the merged file containing all the MIB information
Creates a list of MIB object information
"""
self._objectList = []
mibFile = _TokenFile(self._mibDir + "merged_mib.dat")
objectInfo = _MibObject()
while mibFile.getToken(" \t\n\r\{}:=\(\)"):
if mibFile.token == "OBJECT":
objectInfo.name = mibFile.previousToken
if mibFile.getToken(" \t\n\r") == "IDENTIFIER":
objectInfo.parentName = mibFile.findToken("::=", " \t\n\r\{\(")
objectInfo.branch = mibFile.getToken(" \t\n\r}")
self._objectList.append(objectInfo)
objectInfo = _MibObject()
elif mibFile.token == "OBJECT-TYPE":
objectInfo.name = mibFile.previousToken
if mibFile.getToken(" \t\n\r") == "SYNTAX":
objectInfo.syntax = mibFile.getToken(" \t\n\r\(\{")
if mibFile.token == "INTEGER":
if mibFile.getToken(" \t\n\r\(:") == "{":
while mibFile.token != "}":
if mibFile.getToken(" \t\n\r\(:") == "--":
mibFile.getToken("\t\n\r")
else:
mibFile.getToken(" \t\n\r\(\):")
if mibFile.getToken(" \t\n\r\):") != ",": break
elif mibFile.token != "STATUS": continue
objectInfo.parentName = mibFile.findToken("::=", " \t\n\r\{\(")
objectInfo.branch = mibFile.getToken(" \t\n\r}\)")
self._objectList.append(objectInfo)
objectInfo = _MibObject()
elif mibFile.token == "MODULE-IDENTITY":
objectInfo.name = mibFile.previousToken
if mibFile.getToken(" \t\n\r\{\(") == "LAST-UPDATED":
objectInfo.parentName = mibFile.findToken("::=", " \t\n\r\{\(")
objectInfo.branch = mibFile.getToken(" \t\n\r}\)")
self._objectList.append(objectInfo)
objectInfo = _MibObject()
elif mibFile.token == "OBJECT-IDENTITY":
objectInfo.name = mibFile.previousToken
if mibFile.getToken(" \t\n\r\{\(") == "STATUS":
objectInfo.parentName = mibFile.findToken("::=", " \t\n\r\{\(")
objectInfo.branch = mibFile.getToken(" \t\n\r}\)")
self._objectList.append(objectInfo)
objectInfo = _MibObject()
elif mibFile.token == "TEXTUAL-CONVENTION":
tcName = mibFile.previousToken
if mibFile.getToken(" \t\n\r\{\(") != "STATUS":
if mibFile.token != "DISPLAY-HINT": continue
mibFile.findToken("SYNTAX", " \t\n\r\{\(")
tcType = mibFile.token
if self._objectTypes.has_key(tcType):
tcType = self._objectTypes[tcType]
else:
tcType = ""
self._tcDict[tcName] = tcType
if mibFile.token == "INTEGER":
if mibFile.getToken(" \t\n\r\(:") == "{":
while mibFile.token != "}":
if mibFile.getToken(" \t\n\r\(:") == "--":
mibFile.getToken("\t\n\r");
else:
mibFile.getToken(" \t\n\r\(\):");
if mibFile.getToken(" \t\n\r\):") != ",": break
mibFile.close()
def _buildOidDictionary(self):
"""
Processes the list of MIB objects and creates a dictionary of object OIDs
"""
#Search for the parent of each node, and store the list index of the parent item
for child in self._objectList:
if child.branch == "org":
child.branch = "1"
if child.parentName:
for parent in self._objectList:
if parent.name == child.parentName:
child.parent = parent
break
#Now create an OID for each object by concatenating all ancestor branch numbers
self._objectDict.clear()
for leaf in self._objectList:
leaf.oid = leaf.branch
parent = leaf.parent
while parent:
#leaf.oid = parent.branch + "." + leaf.oid
#parent = parent.parent
if not parent.oid:
#Keep iterating to build up the OID one number at a time
leaf.oid = parent.branch + "." + leaf.oid
parent = parent.parent
else:
#Parent OID already created, so rest of leaf OID is already known
leaf.oid = parent.oid + "." + leaf.oid
break
if self._objectTypes.has_key(leaf.syntax):
leaf.syntax = self._objectTypes[leaf.syntax]
elif self._tcDict.has_key(leaf.syntax):
leaf.syntax = self._tcDict[leaf.syntax]
self._objectDict[leaf.name.lower()] = (leaf.oid, leaf.syntax)
if __name__ == "__main__":
print "RATS system support module: Cannot be run independently"
#