netwatch.py/netaddr-0.7.10/netaddr/fbsocket.py

294 lines
10 KiB
Python

#-----------------------------------------------------------------------------
# Copyright (c) 2008-2012, David P. D. Moss. All rights reserved.
#
# Released under the BSD license. See the LICENSE file for details.
#-----------------------------------------------------------------------------
"""Fallback routines for Python's standard library socket module"""
from struct import unpack as _unpack, pack as _pack
from netaddr.compat import _bytes_join
AF_INET = 2
AF_INET6 = 10
#-----------------------------------------------------------------------------
def inet_ntoa(packed_ip):
"""
Convert an IP address from 32-bit packed binary format to string format.
"""
if not hasattr(packed_ip, 'split'):
raise TypeError('string type expected, not %s' % str(type(packed_ip)))
if len(packed_ip) != 4:
raise ValueError('invalid length of packed IP address string')
return '%d.%d.%d.%d' % _unpack('4B', packed_ip)
#-----------------------------------------------------------------------------
def inet_aton(ip_string):
"""
Convert an IP address in string format (123.45.67.89) to the 32-bit packed
binary format used in low-level network functions.
"""
if hasattr(ip_string, 'split'):
invalid_addr = ValueError('illegal IP address string %r' % ip_string)
# Support for hexadecimal and octal octets.
tokens = []
base = 10
for token in ip_string.split('.'):
if token.startswith('0x'):
base = 16
elif token.startswith('0') and len(token) > 1:
base = 8
elif token == '':
continue
try:
tokens.append(int(token, base))
except ValueError:
raise invalid_addr
# Zero fill missing octets.
num_tokens = len(tokens)
if num_tokens < 4:
fill_tokens = [0] * (4 - num_tokens)
if num_tokens > 1:
end_token = tokens.pop()
tokens = tokens + fill_tokens + [end_token]
else:
tokens = tokens + fill_tokens
# Pack octets.
if len(tokens) == 4:
words = []
for token in tokens:
if (token >> 8) != 0:
raise invalid_addr
words.append(_pack('B', token))
return _bytes_join(words)
else:
raise invalid_addr
raise ValueError('argument should be a string, not %s' % type(ip_string))
#-----------------------------------------------------------------------------
def _compact_ipv6_tokens(tokens):
new_tokens = []
positions = []
start_index = None
num_tokens = 0
# Discover all runs of zeros.
for idx, token in enumerate(tokens):
if token == '0':
if start_index is None:
start_index = idx
num_tokens += 1
else:
if num_tokens > 1:
positions.append((num_tokens, start_index))
start_index = None
num_tokens = 0
new_tokens.append(token)
# Store any position not saved before loop exit.
if num_tokens > 1:
positions.append((num_tokens, start_index))
# Replace first longest run with an empty string.
if len(positions) != 0:
# Locate longest, left-most run of zeros.
positions.sort(key=lambda x: x[1])
best_position = positions[0]
for position in positions:
if position[0] > best_position[0]:
best_position = position
# Replace chosen zero run.
(length, start_idx) = best_position
new_tokens = new_tokens[0:start_idx] + [''] + \
new_tokens[start_idx+length:]
# Add start and end blanks so join creates '::'.
if new_tokens[0] == '':
new_tokens.insert(0, '')
if new_tokens[-1] == '':
new_tokens.append('')
return new_tokens
#-----------------------------------------------------------------------------
def inet_ntop(af, packed_ip):
"""Convert an packed IP address of the given family to string format."""
if af == AF_INET:
# IPv4.
return inet_ntoa(packed_ip)
elif af == AF_INET6:
# IPv6.
if len(packed_ip) != 16 or not hasattr(packed_ip, 'split'):
raise ValueError('invalid length of packed IP address string')
tokens = ['%x' % i for i in _unpack('>8H', packed_ip)]
# Convert packed address to an integer value.
words = list(_unpack('>8H', packed_ip))
int_val = 0
for i, num in enumerate(reversed(words)):
word = num
word = word << 16 * i
int_val = int_val | word
if 0xffff < int_val <= 0xffffffff or int_val >> 32 == 0xffff:
# IPv4 compatible / mapped IPv6.
packed_ipv4 = _pack('>2H', *[int(i, 16) for i in tokens[-2:]])
ipv4_str = inet_ntoa(packed_ipv4)
tokens = tokens[0:-2] + [ipv4_str]
return ':'.join(_compact_ipv6_tokens(tokens))
else:
raise ValueError('unknown address family %d' % af)
#-----------------------------------------------------------------------------
def _inet_pton_af_inet(ip_string):
"""
Convert an IP address in string format (123.45.67.89) to the 32-bit packed
binary format used in low-level network functions. Differs from inet_aton
by only support decimal octets. Using octal or hexadecimal values will
raise a ValueError exception.
"""
#TODO: optimise this ... use inet_aton with mods if available ...
if hasattr(ip_string, 'split'):
invalid_addr = ValueError('illegal IP address string %r' % ip_string)
# Support for hexadecimal and octal octets.
tokens = ip_string.split('.')
# Pack octets.
if len(tokens) == 4:
words = []
for token in tokens:
if token.startswith('0x') or \
(token.startswith('0') and len(token) > 1):
raise invalid_addr
try:
octet = int(token)
except ValueError:
raise invalid_addr
if (octet >> 8) != 0:
raise invalid_addr
words.append(_pack('B', octet))
return _bytes_join(words)
else:
raise invalid_addr
raise ValueError('argument should be a string, not %s' % type(ip_string))
#-----------------------------------------------------------------------------
def inet_pton(af, ip_string):
"""
Convert an IP address from string format to a packed string suitable for
use with low-level network functions.
"""
if af == AF_INET:
# IPv4.
return _inet_pton_af_inet(ip_string)
elif af == AF_INET6:
invalid_addr = ValueError('illegal IP address string %r' % ip_string)
# IPv6.
values = []
if not hasattr(ip_string, 'split'):
raise invalid_addr
if 'x' in ip_string:
# Don't accept hextets with the 0x prefix.
raise invalid_addr
if '::' in ip_string:
if ip_string == '::':
# Unspecified address.
return '\x00'.encode() * 16
# IPv6 compact mode.
try:
prefix, suffix = ip_string.split('::')
except ValueError:
raise invalid_addr
l_prefix = []
l_suffix = []
if prefix != '':
l_prefix = prefix.split(':')
if suffix != '':
l_suffix = suffix.split(':')
# IPv6 compact IPv4 compatibility mode.
if len(l_suffix) and '.' in l_suffix[-1]:
ipv4_str = _inet_pton_af_inet(l_suffix.pop())
l_suffix.append('%x' % _unpack('>H', ipv4_str[0:2])[0])
l_suffix.append('%x' % _unpack('>H', ipv4_str[2:4])[0])
token_count = len(l_prefix) + len(l_suffix)
if not 0 <= token_count <= 8 - 1:
raise invalid_addr
gap_size = 8 - ( len(l_prefix) + len(l_suffix) )
values = [_pack('>H', int(i, 16)) for i in l_prefix] \
+ ['\x00\x00'.encode() for i in range(gap_size)] \
+ [_pack('>H', int(i, 16)) for i in l_suffix]
try:
for token in l_prefix + l_suffix:
word = int(token, 16)
if not 0 <= word <= 0xffff:
raise invalid_addr
except ValueError:
raise invalid_addr
else:
# IPv6 verbose mode.
if ':' in ip_string:
tokens = ip_string.split(':')
if '.' in ip_string:
ipv6_prefix = tokens[:-1]
if ipv6_prefix[:-1] != ['0', '0', '0', '0', '0']:
raise invalid_addr
if ipv6_prefix[-1].lower() not in ('0', 'ffff'):
raise invalid_addr
# IPv6 verbose IPv4 compatibility mode.
if len(tokens) != 7:
raise invalid_addr
ipv4_str = _inet_pton_af_inet(tokens.pop())
tokens.append('%x' % _unpack('>H', ipv4_str[0:2])[0])
tokens.append('%x' % _unpack('>H', ipv4_str[2:4])[0])
values = [_pack('>H', int(i, 16)) for i in tokens]
else:
# IPv6 verbose mode.
if len(tokens) != 8:
raise invalid_addr
try:
tokens = [int(token, 16) for token in tokens]
for token in tokens:
if not 0 <= token <= 0xffff:
raise invalid_addr
except ValueError:
raise invalid_addr
values = [_pack('>H', i) for i in tokens]
else:
raise invalid_addr
return _bytes_join(values)
else:
raise ValueError('Unknown address family %d' % af)