Detecting router IP
This commit is contained in:
parent
97f91448f0
commit
0a2e9fa9f4
82
netwatch.py
82
netwatch.py
@ -12,32 +12,33 @@ import curses.wrapper
|
|||||||
from gi.repository import GLib, GObject
|
from gi.repository import GLib, GObject
|
||||||
# Pinging
|
# Pinging
|
||||||
import subprocess
|
import subprocess
|
||||||
|
import socket, struct
|
||||||
# Regex
|
# Regex
|
||||||
import re
|
import re
|
||||||
# Unicode
|
# Unicode
|
||||||
import locale
|
import locale
|
||||||
|
|
||||||
|
# Configure Unicode
|
||||||
locale.setlocale(locale.LC_ALL, '')
|
locale.setlocale(locale.LC_ALL, '')
|
||||||
encoding = locale.getpreferredencoding()
|
encoding = locale.getpreferredencoding()
|
||||||
|
|
||||||
# Variables
|
|
||||||
internet_host = "8.8.8.8"
|
|
||||||
router_host = "192.168.1.1"
|
|
||||||
ping_frequency = 5
|
|
||||||
|
|
||||||
class Pinger:
|
class Pinger:
|
||||||
internet_log = []
|
internet_log = []
|
||||||
router_log = []
|
router_log = []
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
internet_latency = self.ping(internet_host,self.internet_log)
|
self.stdscr.clear() # clear window during each run
|
||||||
self.draw("Internet:",self.internet_log,internet_latency,0)
|
|
||||||
|
|
||||||
router_latency = self.ping(router_host,self.router_log)
|
internet_latency = self.ping(self.internet_host,self.internet_log)
|
||||||
self.draw("Router:",self.router_log,router_latency,2)
|
self.draw("Internet:",self.internet_host,self.internet_log,internet_latency,0)
|
||||||
|
|
||||||
|
router_latency = self.ping(self.router_host,self.router_log)
|
||||||
|
self.draw("Router:",self.router_host,self.router_log,router_latency,2)
|
||||||
|
|
||||||
# Schedule next run
|
# Schedule next run
|
||||||
GObject.timeout_add_seconds(self.timeout, self.run)
|
GObject.timeout_add_seconds(self.timeout, self.run)
|
||||||
|
|
||||||
|
self.stdscr.refresh() # output drawing to screen
|
||||||
|
|
||||||
def ping(self,host,log):
|
def ping(self,host,log):
|
||||||
|
|
||||||
@ -67,60 +68,89 @@ class Pinger:
|
|||||||
|
|
||||||
return latency
|
return latency
|
||||||
|
|
||||||
def draw(self,title,log,latency,line):
|
def draw(self,title,subtitle,log,latency,line):
|
||||||
|
|
||||||
# Draw internet heading on specified line
|
# Draw internet heading on specified line
|
||||||
self.stdscr.addstr(line,0,title, curses.color_pair(3))
|
self.stdscr.addstr(line,0,title, self.COL_DEFAULT)
|
||||||
|
self.stdscr.addstr(line,len(title)+1,subtitle, self.COL_MUTE)
|
||||||
interpret = self.interpret_ping(latency)
|
interpret = self.interpret_ping(latency)
|
||||||
self.stdscr.addstr(line,10,interpret['subjective'], interpret['color'])
|
heading = str(round(latency/1000,3))+" second lag ("+interpret['subjective']+")"
|
||||||
right_align = self.ping_log_max_size-16
|
right_align = self.ping_log_max_size-len(heading)
|
||||||
self.stdscr.addstr(line,right_align,str(round(latency/1000,3))+" second lag", interpret['color'])
|
self.stdscr.addstr(line,right_align,heading, interpret['color'])
|
||||||
|
|
||||||
# Draw graph on next line
|
# Draw graph on next line
|
||||||
for idx,entry in enumerate(log):
|
for idx,entry in enumerate(log):
|
||||||
self.stdscr.addstr(line+1,idx,entry['graph'].encode(encoding), entry['color'])
|
self.stdscr.addstr(line+1,idx,entry['graph'].encode(encoding), entry['color'])
|
||||||
self.stdscr.refresh()
|
|
||||||
|
|
||||||
def interpret_ping(self, ping):
|
def interpret_ping(self, ping):
|
||||||
ping = float(ping)
|
ping = float(ping)
|
||||||
if ping == -1:
|
if ping == -1:
|
||||||
graph = "E "#u'\u2847' # Error
|
graph = "E "#u'\u2847' # Error
|
||||||
color = curses.color_pair(1)
|
color = self.COL_BAD
|
||||||
subjective = "Down"
|
subjective = "Down"
|
||||||
elif ping < 30:
|
elif ping < 30:
|
||||||
graph = u'\u2840'
|
graph = u'\u2840'
|
||||||
color = curses.color_pair(2)
|
color = self.COL_GOOD
|
||||||
subjective = "Great"
|
subjective = "Great"
|
||||||
elif ping < 70:
|
elif ping < 70:
|
||||||
graph = u'\u2844'
|
graph = u'\u2844'
|
||||||
color = curses.color_pair(2)
|
color = self.COL_GOOD
|
||||||
subjective = "Good"
|
subjective = "Good"
|
||||||
elif ping < 120:
|
elif ping < 120:
|
||||||
graph = u'\u2846'
|
graph = u'\u2846'
|
||||||
color = curses.color_pair(1)
|
color = self.COL_WARN
|
||||||
subjective = "Fair"
|
subjective = "Fair"
|
||||||
else:
|
else:
|
||||||
graph = u'\u2847'
|
graph = u'\u2847'
|
||||||
color = curses.color_pair(1)
|
color = self.COL_BAD
|
||||||
subjective = "Poor"
|
subjective = "Poor"
|
||||||
return {'graph': graph, 'color': color, 'subjective': subjective, 'ping': float(ping)}
|
return {'graph': graph, 'color': color, 'subjective': subjective, 'ping': float(ping)}
|
||||||
|
|
||||||
def __init__(self, stdscr):
|
# def find_router_ip(self):
|
||||||
|
# s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
|
# s.connect((self.internet_host,80))
|
||||||
|
# my_ip = s.getsockname()[0]
|
||||||
|
# s.close()
|
||||||
|
# return my_ip
|
||||||
|
|
||||||
# Initialize color pairs
|
def get_default_gateway_linux(self):
|
||||||
|
# Read the default gateway directly from /proc.
|
||||||
|
with open("/proc/net/route") as fh:
|
||||||
|
for line in fh:
|
||||||
|
fields = line.strip().split()
|
||||||
|
if fields[1] != '00000000' or not int(fields[3], 16) & 2:
|
||||||
|
continue
|
||||||
|
|
||||||
|
return socket.inet_ntoa(struct.pack("<L", int(fields[2], 16)))
|
||||||
|
|
||||||
|
def __init__(self, stdscr):
|
||||||
|
# Parameters
|
||||||
|
self.internet_host = "8.8.8.8"
|
||||||
|
self.router_host = self.get_default_gateway_linux()
|
||||||
|
self.timeout = 5 # Ping frequency
|
||||||
|
|
||||||
|
# Create color constants
|
||||||
curses.use_default_colors()
|
curses.use_default_colors()
|
||||||
|
|
||||||
curses.init_pair(1, curses.COLOR_RED, -1)
|
curses.init_pair(1, curses.COLOR_RED, -1)
|
||||||
curses.init_pair(2, curses.COLOR_GREEN, -1)
|
curses.init_pair(2, curses.COLOR_GREEN, -1)
|
||||||
curses.init_pair(3, curses.COLOR_WHITE, -1)
|
curses.init_pair(3, curses.COLOR_WHITE, -1)
|
||||||
|
curses.init_pair(4, curses.COLOR_YELLOW, -1)
|
||||||
|
curses.init_pair(5, curses.COLOR_WHITE, -1)
|
||||||
|
|
||||||
# Hide cursor
|
self.COL_BAD = curses.color_pair(1)
|
||||||
curses.curs_set(0)
|
self.COL_GOOD = curses.color_pair(2)
|
||||||
|
self.COL_DEFAULT = curses.color_pair(3)|curses.A_BOLD
|
||||||
|
self.COL_WARN = curses.color_pair(4)
|
||||||
|
self.COL_MUTE = curses.color_pair(5)|curses.A_DIM
|
||||||
|
|
||||||
# initialize screen and parameters
|
# Initialize screen
|
||||||
self.stdscr = stdscr
|
self.stdscr = stdscr
|
||||||
self.window_size = stdscr.getmaxyx() # returns an array [height,width]
|
self.window_size = stdscr.getmaxyx() # returns an array [height,width]
|
||||||
self.ping_log_max_size = self.window_size[1] # max width
|
self.ping_log_max_size = self.window_size[1] # max width
|
||||||
self.timeout = ping_frequency
|
|
||||||
|
# Hide cursor
|
||||||
|
curses.curs_set(0)
|
||||||
|
|
||||||
# start the ping process
|
# start the ping process
|
||||||
self.run()
|
self.run()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user