[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[gnue] r8414 - trunk/gnue-common/src/utils
From: |
johannes |
Subject: |
[gnue] r8414 - trunk/gnue-common/src/utils |
Date: |
Tue, 18 Apr 2006 08:04:26 -0500 (CDT) |
Author: johannes
Date: 2006-04-18 08:04:25 -0500 (Tue, 18 Apr 2006)
New Revision: 8414
Modified:
trunk/gnue-common/src/utils/uuid.py
Log:
Changed to make pylint happy with it. More pep8-ification.
Modified: trunk/gnue-common/src/utils/uuid.py
===================================================================
--- trunk/gnue-common/src/utils/uuid.py 2006-04-18 11:27:11 UTC (rev 8413)
+++ trunk/gnue-common/src/utils/uuid.py 2006-04-18 13:04:25 UTC (rev 8414)
@@ -20,6 +20,9 @@
# - Suite 330, Boston, MA 02111-1307, USA.
#
# $Id$
+#
+# pylint: disable-msg=W0703
+# pylint: disable-msg=F0401
"""
This module implements an UUID generator as described in the internet draft at
'http://www.ietf.org/internet-drafts/draft-mealling-uuid-urn-05.txt' or also
@@ -40,32 +43,57 @@
from gnue.common.apps import errors
-try:
- if sys.platform == 'win32':
- from win32com.client import GetObject
- else:
- import fcntl
+if sys.platform == 'win32':
+ from win32com.client import GetObject
+else:
+ import fcntl
-except ImportError:
- pass
+# Namespace available if imported all (from uuid import *)
+__all__ = ['InvalidVersionError', 'InvalidNamespaceError',
+ 'MissingNamespaceError', 'get_hardware_addresses', 'Generator',
+ 'UUID', 'TIME', 'MD5', 'RANDOM', 'SHA1']
+
# =============================================================================
# Exceptions
# =============================================================================
class InvalidVersionError(errors.SystemError):
+ """
+ The version '<version>' is not a valid UUID version.
+
+ Raised when L{Generator.generate}() gets called with an invalid
+ UUID-version.
+ """
def __init__(self, version):
msg = u_("The version '%s' is not a valid UUID version") \
% repr(version)
errors.SystemError.__init__(self, msg)
-class InvalidNamespaceError (errors.SystemError):
- def __init__ (self, ns):
+# =============================================================================
+
+class InvalidNamespaceError(errors.SystemError):
+ """
+ 'namespace' is not recognized as valid namespace argument.
+
+ Raised whenever an invalid namespace is given to
L{Generator.set_namespace}.
+ A valid namespace for name-based UUID generation is made up of either 36 or
+ 32 characters.
+ """
+ def __init__(self, namespace):
msg = u_("'%s' is not recognized as valid namespace argument") \
- % repr(ns)
+ % repr(namespace)
errors.SystemError.__init__(self, msg)
+# =============================================================================
+
class MissingNamespaceError(errors.ApplicationError):
+ """
+ No namespace given for SHA1-/MD5-based UUIDs.
+
+ MD5- and SHA1-based UUIDs must have a namespace defined. Use
+ L{set_namespace} to define this value.
+ """
def __init__(self):
msg = u_("No namespace given for namebased UUID generation")
errors.ApplicationError.__init__(self, msg)
@@ -81,24 +109,22 @@
manualLock = sys.version_info[:2] < (2, 3)
if os.path.exists('/dev/urandom'):
- try:
- dev_random = os.open('/dev/urandom', os.O_RDONLY)
+ dev_random = os.open('/dev/urandom', os.O_RDONLY)
- except IOError:
- pass
-
# -----------------------------------------------------------------------------
# Get a sequence of random bytes
# -----------------------------------------------------------------------------
-def getRandomBytes(count):
+def get_random_bytes(count):
"""
Return a sequence of 'count' random bytes from the best random number
generator available.
@param count: number of bytes to return
+ @type count: integer
@return: sequence of 'count' random bytes
+ @rtype: list
"""
result = []
@@ -115,11 +141,15 @@
# otherwise use the random module (which is threadsafe for python 2.3+)
else:
- for i in xrange(count):
- if manualLock: lock.acquire()
- result.append (pseudoRng.randrange(0, 256))
- if manualLock: lock.release()
+ for dummy in xrange(count):
+ if manualLock:
+ lock.acquire()
+ result.append(pseudoRng.randrange(0, 256))
+
+ if manualLock:
+ lock.release()
+
return result
@@ -127,7 +157,7 @@
# Get the hardware addresses of the installed network interfaces
# =============================================================================
-class NetworkInterfaces:
+def get_hardware_addresses():
"""
Retrieve a list of all available network interfaces and their hardware
addresses. After creating an instance of this class the attribute nics is a
@@ -145,171 +175,206 @@
If no interface could be detected at all, an interface will be generated
using the random number source. It will have the name 'generated'.
+
+ @return: list of tuples (ifname, hwaddr, hwstr) as described above
+ @rtype: list of tuples
"""
- # -------------------------------------------------------------------------
- # Constructor
- # -------------------------------------------------------------------------
+ trys = {'linux2': [_load_from_proc, _load_via_ifconf],
+ 'darwin': [_load_from_plist],
+ 'win32' : [_load_from_winmgmts]}
- def __init__(self):
+ nics = []
+ if sys.platform in trys:
+ for method in trys[sys.platform]:
+ nics = [item for item in method() if sum(item[1])]
+ if nics:
+ break
- trys = {'linux2': [self.__load_from_proc, self.__load_via_ifconf],
- 'darwin': [self.__load_from_plist],
- 'win32' : [self.__load_from_winmgmts]}
+ if not nics:
+ nics = [('generated', get_random_bytes(6))]
- nics = []
- if sys.platform in trys:
- for method in trys[sys.platform]:
- nics = self.__strip_empty(method())
- if nics:
- break
+ return [(name, hwaddr, ("%02x:" * 6)[:-1] % tuple(hwaddr)) \
+ for (name, hwaddr) in nics]
- if not nics:
- nics = [('generated', getRandomBytes(6))]
- self.nics = [(n, h, ("%02x:" * 6)[:-1] % tuple(h)) for (n, h) in nics]
+# -------------------------------------------------------------------------
+# Load a list of interfaces from the proc-filesystem
+# -------------------------------------------------------------------------
+def _load_from_proc():
+ """
+ Load a list of network-interfaces from the proc-filesystem (/proc/net/dev).
+ For each interface a tuple with it's name and hardware address will be
+ returned. The hardware address is a list of decimal numbers.
- # -------------------------------------------------------------------------
- # Load a list of interfaces from the proc-filesystem
- # -------------------------------------------------------------------------
+ @return: list of tuples (ifname, hwaddr)
+ @rtype: list of tuples
+ """
- def __load_from_proc (self):
+ result = []
+ try:
+ nfhd = open('/proc/net/dev', 'r')
- result = []
try:
- nfhd = open('/proc/net/dev', 'r')
+ for line in [raw.strip() for raw in nfhd.readlines() \
+ if ':' in raw]:
+ result.append(line.split(':', 1)[0])
- try:
- for line in [l.strip() for l in nfhd.readlines() if ':' in l]:
- result.append(line.split(':', 1)[0])
+ finally:
+ nfhd.close()
- finally:
- nfhd.close()
+ result = [(name, _hw_addr_from_socket(name)) \
+ for name in result]
- result = [(r, self.__hw_addr_from_socket(r)) for r in result]
+ except IOError:
+ result = []
- except:
- pass
+ return result
- return result
+# -------------------------------------------------------------------------
+# Load a list of interfaces via IFCONF socket call
+# -------------------------------------------------------------------------
- # -------------------------------------------------------------------------
- # Load a list of interfaces via IFCONF socket call
- # -------------------------------------------------------------------------
+def _load_via_ifconf():
+ """
+ Load a list of network-interfaces using a SIOCGIFCONF (0x8912) socket
+ IO-call. For each interface a tuple with it's name and hardware address
+ will be returned. The hardware address is a list of decimal numbers.
- def __load_via_ifconf(self):
+ @return: list of tuples (ifname, hwaddr)
+ @rtype: list of tuples
+ """
- SIOCGIFCONF = 0x8912
- result = []
+ SIOCGIFCONF = 0x8912
+ result = []
+ try:
+ sfhd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+
try:
- sfhd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ dbuf = array.array('c', '\0' * 1024)
+ (addr, length) = dbuf.buffer_info()
+ ifconf = struct.pack("iP", length, addr)
+ data = fcntl.ioctl(sfhd.fileno(), SIOCGIFCONF, ifconf)
- try:
- buffer = array.array('c', '\0' * 1024)
- (addr, length) = buffer.buffer_info()
- ifconf = struct.pack ("iP", length, addr)
- data = fcntl.ioctl(sfhd.fileno(), SIOCGIFCONF, ifconf)
+ size = struct.unpack("iP", data)[0]
+ for idx in range(0, size, 32):
+ ifconf = dbuf.tostring()[idx:idx+32]
+ name = struct.unpack("16s16s", ifconf)[0].split('\0', 1)[0]
+ result.append(name)
- size, ptr = struct.unpack("iP", data)
- for idx in range (0, size, 32):
- ifconf = buffer.tostring()[idx:idx+32]
- name = struct.unpack("16s16s", ifconf)[0].split('\0', 1)[0]
- result.append(name)
+ finally:
+ sfhd.close()
- finally:
- sfhd.close()
+ result = [(name, _hw_addr_from_socket(name)) for name in result]
- result = [(r, self.__hw_addr_from_socket(r)) for r in result]
+ except Exception:
+ result = []
- except:
- pass
+ return result
- return result
+# -------------------------------------------------------------------------
+# Get a hardware address for an interface using a socket
+# -------------------------------------------------------------------------
- # -------------------------------------------------------------------------
- # Get a hardware address for an interface using a socket
- # -------------------------------------------------------------------------
+def _hw_addr_from_socket(iff):
+ """
+ Get the hardware address for a given interface name using the socket
+ IO-call SIOCGIFHWADDR (0x8927). The hardware address will be returned as a
+ list of bytes.
- def __hw_addr_from_socket(self, iff):
+ @param iff: name of the interface to get the hardware address for
+ @type iff: string
+ @return: hardware address of the requested interface
+ @rtype: list of 6 bytes
+ """
- SIOCGIFHWADDR = 0x8927
- sfhd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- ifreq = (iff + '\0' * 32)[:32]
+ SIOCGIFHWADDR = 0x8927
+ sfhd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ ifreq = (iff + '\0' * 32)[:32]
- try:
- data = fcntl.ioctl(sfhd.fileno(), SIOCGIFHWADDR, ifreq)
- result = array.array('B', data[18:24]).tolist()
+ try:
+ data = fcntl.ioctl(sfhd.fileno(), SIOCGIFHWADDR, ifreq)
+ result = array.array('B', data[18:24]).tolist()
- finally:
- sfhd.close()
+ finally:
+ sfhd.close()
- return result
+ return result
- # -------------------------------------------------------------------------
- # Get a list of interfaces using the windows management instrumentation
- # -------------------------------------------------------------------------
+# -------------------------------------------------------------------------
+# Get a list of interfaces using the windows management instrumentation
+# -------------------------------------------------------------------------
- def __load_from_winmgmts(self):
+def _load_from_winmgmts():
+ """
+ Load a list of network-interfaces using the windows management
+ instrumentation. This assumes to have the win32 modules installed. For
+ each interface a tuple with it's name and hardware address will be
+ returned. The hardware address is a list of bytes.
- result = []
- try:
- cmd = "SELECT * FROM Win32_NetworkAdapterConfiguration " \
- "WHERE IPEnabled=True"
- for iff in [i for i in GetObject('winmgmts:').ExecQuery(cmd)]:
- parts = iff.MACAddress.split(':')
- args = [16] * len(parts)
- result.append((iff.Caption, map(int, parts, args)))
+ @return: list of tuples (ifname, hwaddr)
+ @rtype: list of tuples
+ """
- except:
- pass
+ result = []
+ try:
+ cmd = "SELECT * FROM Win32_NetworkAdapterConfiguration " \
+ "WHERE IPEnabled=True"
+ for iff in [intf for intf in GetObject('winmgmts:').ExecQuery(cmd)]:
+ parts = iff.MACAddress.split(':')
+ result.append((iff.Caption, \
+ [int(value, 16) for value in parts]))
- return result
+ except Exception:
+ result = []
+ return result
- # -------------------------------------------------------------------------
- # Load the network information from the NetworkInterfaces.plist XML file
- # -------------------------------------------------------------------------
- def __load_from_plist(self):
+# -------------------------------------------------------------------------
+# Load the network information from the NetworkInterfaces.plist XML file
+# -------------------------------------------------------------------------
- result = []
- try:
- import plistlib
- path = "/Library/Preferences/SystemConfiguration/" \
- "NetworkInterfaces.plist"
+def _load_from_plist():
+ """
+ Load a list of network-interfaces from the NetworkInterfaces preference
+ file (/Library/Preferences/SystemConfiguration/NetworkInterfaces.plist).
+ For each interface a tuple with it's name and hardware address will be
+ returned. The hardware address is a list of bytes.
- pl = plistlib.Plist.fromFile(path)
- for iff in pl.Interfaces:
- name = iff ['BSD Name']
- hwaddr = [ord(c) for c in iff.IOMACAddress.data]
+ @return: list of tuples (ifname, hwaddr)
+ @rtype: list of tuples
+ """
- # FireWire devices seem to have longer hardware addresses which
- # are theirfore not usable for UUID generation.
- if len(hwaddr) == 6:
- result.append ((name, hwaddr))
+ result = []
+ try:
+ import plistlib
+ path = "/Library/Preferences/SystemConfiguration/" \
+ "NetworkInterfaces.plist"
- except:
- pass
+ pref = plistlib.Plist.fromFile(path)
+ for iff in pref.Interfaces:
+ name = iff ['BSD Name']
+ hwaddr = [ord(char) for char in iff.IOMACAddress.data]
- return result
+ # FireWire devices seem to have longer hardware addresses which
+ # are theirfore not usable for UUID generation.
+ if len(hwaddr) == 6:
+ result.append((name, hwaddr))
+ except Exception:
+ result = []
- # -------------------------------------------------------------------------
- # Throw away all entries having an 'empty' hardware address
- # -------------------------------------------------------------------------
+ return result
- def __strip_empty (self, nics):
- return [item for item in nics if sum (item[1])]
-
-
# =============================================================================
# This class implements an UUID generator
# =============================================================================
@@ -326,8 +391,6 @@
also described in the RPC-RFC.
"""
- _NIC = NetworkInterfaces ()
-
# -------------------------------------------------------------------------
# Constructor
# -------------------------------------------------------------------------
@@ -336,7 +399,7 @@
self.version = version
self.__lastTime = None
- self.__clockSeq = int("%02x%02x" % tuple(getRandomBytes(2)), 16)
+ self.__clockSeq = int("%02x%02x" % tuple(get_random_bytes(2)), 16)
self.__clockField = self.__clockSeq & 0x3FFF | 0x8000
self.__namespace = None
@@ -346,7 +409,8 @@
self.__timeFormat = u"%016x%04x%s"
self.__randFormat = u"%02x" * 16
self.__hashFormat = u"%08x%04x%04x" + "%02x" * 8
- self.__currNode = "%02x" * 6 % tuple(self._NIC.nics[0][1])
+ self.__currNode = "%02x" * 6 % tuple(get_hardware_addresses()[0][1])
+ self.__uuids_per_tick = 0
# -------------------------------------------------------------------------
@@ -378,10 +442,10 @@
return self.generate_random()
elif vers == MD5:
- return self.generate_MD5(name, namespace)
+ return self.generate_md5(name, namespace)
elif vers == SHA1:
- return self.generate_SHA1(name, namespace)
+ return self.generate_sha1(name, namespace)
else:
raise InvalidVersionError, version
@@ -402,21 +466,21 @@
timeStamp = long(time.time() * 10000000) + 122192928000000000L
if timeStamp != self.__lastTime:
- self.__uuidsPerTick = 0
+ self.__uuids_per_tick = 0
- # If the time has been set backward, we change the clock sequence
- if timeStamp < self.__lastTime:
- self.__clockSeq += 1
- if self.__clockSeq > 0xFFFF:
- self.__clockSeq = 0
+ # If the time has been set backward, we change the clock sequence
+ if timeStamp < self.__lastTime:
+ self.__clockSeq += 1
+ if self.__clockSeq > 0xFFFF:
+ self.__clockSeq = 0
- self.__clockField = self.__clockSeq & 0x3FFF | 0x8000
+ self.__clockField = self.__clockSeq & 0x3FFF | 0x8000
- self.__lastTime = timeStamp
+ self.__lastTime = timeStamp
else:
- self.__uuidsPerTick += 1
- timeStamp += self.__uuidsPerTick
+ self.__uuids_per_tick += 1
+ timeStamp += self.__uuids_per_tick
lock.release()
@@ -437,7 +501,7 @@
@return: UUID (as 32 character unicode string)
"""
- data = getRandomBytes(16)
+ data = get_random_bytes(16)
# Set the two most significant bits (bits 6, 7) of the
# clock_seq_hi_and_reserved to zero and one
data[8] = data[8] & 0x3F | 0x80
@@ -452,7 +516,7 @@
# Generate a name-based UUID using an MD5 hash
# -------------------------------------------------------------------------
- def generate_MD5(self, name, namespace=None):
+ def generate_md5(self, name, namespace=None):
"""
Generate a name based UUID using a MD5 hash.
@@ -476,7 +540,7 @@
# Generate an UUID using a SHA1 hash
# -------------------------------------------------------------------------
- def generate_SHA1(self, name, namespace=None):
+ def generate_sha1(self, name, namespace=None):
"""
Generate a name based UUID using a SHA1 hash.
@@ -500,30 +564,31 @@
# Set the namespace to be used for name based UUIDs
# -------------------------------------------------------------------------
- def set_namespace (self, newNS):
+ def set_namespace(self, new_ns):
"""
Set the namespace used for name based UUID generation.
"""
- if isinstance(newNS, basestring):
- if len (newNS) == 36:
- newNS = "".join(newNS.split('-'))
+ if isinstance(new_ns, basestring):
+ if len(new_ns) == 36:
+ new_ns = "".join(new_ns.split('-'))
- elif len (newNS) != 32:
- raise "Invalid namespace argument"
+ elif len(new_ns) != 32:
+ raise InvalidNamespaceError(new_ns)
- parts = [newNS[:8], newNS[8:12], newNS[12:16], newNS[16:]]
+ parts = [new_ns[:8], new_ns[8:12], new_ns[12:16], new_ns[16:]]
timeLow = socket.htonl(long(parts[0], 16))
timeMid = socket.htons(int(parts[1], 16))
timeHi = socket.htons(int(parts[2], 16))
- rest = [int(parts[3][i:i+2], 16) for i in range(0, 16, 2)]
+ rest = [int(parts[3][inx:inx+2], 16) for inx in range(0, 16, 2)]
self.__namespace = struct.pack('>LHH8B', timeLow, timeMid, timeHi,
- *rest)
+ rest[0], rest[1], rest[2],rest[3],rest[4],rest[5],
+ rest[6], rest[7])
else:
- raise InvalidNamespaceError(newNS)
+ raise InvalidNamespaceError(new_ns)
# Create a ready-to-use instance of the UUID generator
@@ -545,8 +610,8 @@
UUID.set_namespace('6ba7b810-9dad-11d1-80b4-00c04fd430c8')
print "Namespace: '6ba7b810-9dad-11d1-80b4-00c04fd430c8'"
print "Encoding : 'www.gnuenterprise.org'"
- print "MD5 :", repr(UUID.generate_MD5('www.gnuenterprise.org'))
- print "SHA1 :", repr(UUID.generate_SHA1('www.gnuenterprise.org'))
+ print "MD5 :", repr(UUID.generate_md5('www.gnuenterprise.org'))
+ print "SHA1 :", repr(UUID.generate_sha1('www.gnuenterprise.org'))
print "T:", repr(UUID.generate(version = TIME))
print "R:", repr(UUID.generate(version = RANDOM))
[Prev in Thread] |
Current Thread |
[Next in Thread] |
- [gnue] r8414 - trunk/gnue-common/src/utils,
johannes <=