import os, logging, random, socket, time, select
from .base import NBNS, NotConnectedError
from .nmb_constants import TYPE_CLIENT, TYPE_SERVER, TYPE_WORKSTATION
[docs]
class NetBIOS(NBNS):
log = logging.getLogger('NMB.NetBIOS')
[docs]
def __init__(self, broadcast = True, listen_port = 0):
"""
Instantiate a NetBIOS instance, and creates a IPv4 UDP socket to listen/send NBNS packets.
:param boolean broadcast: A boolean flag to indicate if we should setup the listening UDP port in broadcast mode
:param integer listen_port: Specifies the UDP port number to bind to for listening. If zero, OS will automatically select a free port number.
"""
self.broadcast = broadcast
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
if self.broadcast:
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
if listen_port:
self.sock.bind(( '', listen_port ))
[docs]
def close(self):
"""
Close the underlying and free resources.
The NetBIOS instance should not be used to perform any operations after this method returns.
:return: None
"""
self.sock.close()
self.sock = None
def write(self, data, ip, port):
assert self.sock, 'Socket is already closed'
self.sock.sendto(data, ( ip, port ))
[docs]
def queryName(self, name, ip = '', port = 137, timeout = 30):
"""
Send a query on the network and hopes that if machine matching the *name* will reply with its IP address.
:param string ip: If the NBNSProtocol instance was instianted with broadcast=True, then this parameter can be an empty string. We will leave it to the OS to determine an appropriate broadcast address.
If the NBNSProtocol instance was instianted with broadcast=False, then you should provide a target IP to send the query.
:param integer port: The NetBIOS-NS port (IANA standard defines this port to be 137). You should not touch this parameter unless you know what you are doing.
:param integer/float timeout: Number of seconds to wait for a reply, after which the method will return None
:return: A list of IP addresses in dotted notation (aaa.bbb.ccc.ddd). On timeout, returns None.
"""
assert self.sock, 'Socket is already closed'
trn_id = random.randint(1, 0xFFFF)
data = self.prepareNameQuery(trn_id, name)
if self.broadcast and not ip:
ip = '<broadcast>'
elif not ip:
self.log.warning('queryName: ip parameter is empty. OS might not transmit this query to the network')
self.write(data, ip, port)
return self._pollForNetBIOSPacket(trn_id, timeout)
[docs]
def queryIPForName(self, ip, port = 137, timeout = 30):
"""
Send a query to the machine with *ip* and hopes that the machine will reply back with its name.
The implementation of this function is contributed by Jason Anderson.
:param string ip: If the NBNSProtocol instance was instianted with broadcast=True, then this parameter can be an empty string. We will leave it to the OS to determine an appropriate broadcast address.
If the NBNSProtocol instance was instianted with broadcast=False, then you should provide a target IP to send the query.
:param integer port: The NetBIOS-NS port (IANA standard defines this port to be 137). You should not touch this parameter unless you know what you are doing.
:param integer/float timeout: Number of seconds to wait for a reply, after which the method will return None
:return: A list of string containing the names of the machine at *ip*. On timeout, returns None.
"""
assert self.sock, 'Socket is already closed'
trn_id = random.randint(1, 0xFFFF)
data = self.prepareNetNameQuery(trn_id, False)
self.write(data, ip, port)
ret = self._pollForQueryPacket(trn_id, timeout)
if ret:
return list(map(lambda s: s[0], filter(lambda s: s[1] == TYPE_SERVER, ret)))
else:
return None
#
# Protected Methods
#
def _pollForNetBIOSPacket(self, wait_trn_id, timeout):
end_time = time.time() - timeout
while True:
try:
_timeout = time.time()-end_time
if _timeout <= 0:
return None
ready, _, _ = select.select([ self.sock.fileno() ], [ ], [ ], _timeout)
if not ready:
return None
data, _ = self.sock.recvfrom(0xFFFF)
if len(data) == 0:
raise NotConnectedError
trn_id, ret = self.decodePacket(data)
if trn_id == wait_trn_id:
return ret
except select.error as ex:
if type(ex) is tuple:
if ex[0] != errno.EINTR and ex[0] != errno.EAGAIN:
raise ex
else:
raise ex
#
# Contributed by Jason Anderson
#
def _pollForQueryPacket(self, wait_trn_id, timeout):
end_time = time.time() - timeout
while True:
try:
_timeout = time.time()-end_time
if _timeout <= 0:
return None
ready, _, _ = select.select([ self.sock.fileno() ], [ ], [ ], _timeout)
if not ready:
return None
data, _ = self.sock.recvfrom(0xFFFF)
if len(data) == 0:
raise NotConnectedError
trn_id, ret = self.decodeIPQueryPacket(data)
if trn_id == wait_trn_id:
return ret
except select.error as ex:
if type(ex) is tuple:
if ex[0] != errno.EINTR and ex[0] != errno.EAGAIN:
raise ex
else:
raise ex