python实现主机存活探测程序

您所在的位置:网站首页 内网ip探测 python实现主机存活探测程序

python实现主机存活探测程序

2024-07-09 14:13:31| 来源: 网络整理| 查看: 265

主机存活探测程序

目录主机存活探测程序0x01:ICMP协议原理一、概念二、ICMP作用三、ICMP报文格式四、ping命令0x02:使用ICMP实现主机探活一、ping命令的实现01、导入必要的模块02、创建并返回一个子进程03、与子进程交互,获取命令执行结果04、完整代码05、执行结果二、批量执行命令,完成对网段的扫描01、解析网段02、完整代码03、执行结果三、使用协程来提高效率01、创建队列02、开启多协程03、执行ping命令函数04、完整代码05、执行结果:

0x01:ICMP协议原理 一、概念

ICMP(Internet Control Message Protocol)Internet控制报文协议:是IP协议的附属协议。通常被认为是IP层的一部分,因为它需要在所有IP实现中存在。它使用IP协议进行传输。因此,确切地说,它既不是一个网络层协议,也不是一个传输层协议,而是位于两者之间。ICMP主要功能就是ICMP差错报文、ICMP查询报文。

二、ICMP作用 确认IP包是否成功达到目标IP。 通知在发送过程中的IP包被丢弃的原因。

ICMP是基于IP协议工作的,但是它并不是传输层的功能,因此人们仍把它归结于网络层协议。

ICMP只能搭配IPv4使用。如果是IPv6的情况下,需要使用ICMPv6。

三、ICMP报文格式

image

类型和代码是一起表示改报文的作用的,例如:类型为3、代码为3,则该报文想表达意思是端口不可达,也就是说该端口不开放。校验和顾名思义,就是检验报文在传输过程中是否出现了差错,每个数据报都有相应的校验和。如果校验和是错误的,则该报文将会被丢弃。

image

四、ping命令

ping (Packet Internet Groper),因特网包探索器,用于测试网络连接量的程序。Ping发送一个ICMP;回声请求消息给目的地并报告是否收到所希望的ICMP echo (ICMP回声应答)。它是用来检查网络是否通畅或者网络连接速度的命令。

ping命令通常用来作为网络可用性的检查。ping命令可以对一个网络地址发送测试数据包,看该网络地址是否有响应并统计响应时间,以此测试网络。

ping和ICMP的关系:ping命令发送数据使用的是ICMP协议。

0x02:使用ICMP实现主机探活

参考代码:https://github.com/Mianshatest/pyScan

一、ping命令的实现

主要使用python的subprocess 模块来实现系统的命令执行。

subprocess 模块允许我们启动一个新进程,并连接到它们的输入/输出/错误管道,从而获取返回值。 

01、导入必要的模块 import platform import subprocess 02、创建并返回一个子进程

使用subprocess模块的Popen类创建并返回一个子进程,并在这个进程中执行指定的程序。

ip = '220.181.38.251' # 百度的IP if (platform.system() == 'Windows') : # 判断操作系统类型,根据操作系统的不同执行不同的命令。 ping = subprocess.Popen( 'ping -n 1 {}'.format(ip), # 要执行的命令。 shell=False, # 布尔型变量,明确要求使用shell运行程序。 close_fds=True, # 布尔型变量,为 True 时,在子进程执行前强制关闭所有除 stdin,stdout和stderr外的文件。 stdout=subprocess.PIPE, # 指定子进程的标准输出。 stderr=subprocess.PIPE # 指定子进程的标准错误输出。 # subprocess.PIPE 是调用本模块提供的若干函数时,作为 std* 参数的值,为标准流文件打开一个管道。 ) else: ping = subprocess.Popen( 'ping -c 1 {}'.format(ip), shell=False, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) 03、与子进程交互,获取命令执行结果

通过判断命令返回的结果中是否包含“ttl”字符,来判断主机是否存活。

try: out, err = ping.communicate(timeout=8) # 从子进程的 stdout 和 stderr 读取数据,直到EOF。 if 'ttl' in out.decode('GBK').lower(): print("ip {} is alive".format(ip)) except: pass ping.kill() 04、完整代码 import platform import subprocess ip = '220.181.38.251' def ping_func(): if (platform.system() == 'Windows') : ping = subprocess.Popen( 'ping -n 1 {}'.format(ip), shell=False, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) else: ping = subprocess.Popen( 'ping -c 1 {}'.format(ip), shell=False, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) try: out, err = ping.communicate(timeout=8) if 'ttl' in out.decode('GBK').lower(): print("ip {} is alive".format(ip)) except: pass ping.kill() ping_func(ip) 05、执行结果

image

二、批量执行命令,完成对网段的扫描 01、解析网段

这一步主要是使程序可以识别例如192.168.0.1/24样式的IP地址网段,使用了模块IPy。

IPy是Python的第三方包,主要提供了包括网段、网络掩码、广播地址、子网数、IP类型的处理等等功能。

def list_ip(ip): newIplist = [] if "/" in ip: if int(ip.split("/")[1]) >= 24: num = ip.split('/')[1] length = len(IPy.IP('127.0.0.0/{}'.format(num))) # 计算网段的IP个数 endiplists = list_of_groups(range(0, 256), length) # 将整个C段按子网掩码划分成多个列表 for endiplist in endiplists: # 判断输入IP所在的子网 if int(ip.split('/')[0].split('.')[-1].strip()) in endiplist: for endip in endiplist: # 以.为连接符,组合IP。 newIplist.append('.'.join(ip.split('/')[0].split('.')[:-1]) + '.{}'.format(endip)) break elif int(ip.split("/")[1]) >= 16: new_ip = ip.split(".")[0] + "." + ip.split(".")[1] + ".0.0/{}".format(ip.split("/")[1]) ips = IPy.IP(new_ip) for i in ips: print(str(i)) newIplist.append(str(i)) else: new_ip = ip.split(".")[0] + ".0.0.0/{}".format(ip.split("/")[1]) ips = IPy.IP(new_ip) for i in ips: newIplist.append(str(i)) elif re.match(r'^\d{0,3}.\d{0,3}.\d{0,3}.\d{0,3}$', ip) != None: newIplist.append(ip) return newIplist

这里使用了一个函数list_of_groups,该函数主要作用是把一个列表按指定数目分成多个列表。

def list_of_groups(init_list, childern_list_len): list_of_groups = zip(*(iter(init_list),) * childern_list_len) # 使用zip函数将列表按照网段长度分成多个列表 end_list = [list(i) for i in list_of_groups] # 转换成列表 count = len(init_list) % childern_list_len end_list.append(init_list[-count:]) if count != 0 else end_list return end_list 02、完整代码

将其与前面的ping命令代码整合,可以得到一个对网段的探活程序。

import platform import subprocess import IPy def ping_func(ip): if (platform.system() == 'Windows') : ping = subprocess.Popen( 'ping -n 1 {}'.format(ip), shell=False, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) else: ping = subprocess.Popen( 'ping -c 1 {}'.format(ip), shell=False, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) try: out, err = ping.communicate(timeout=8) if 'ttl' in out.decode('GBK').lower(): print("ip {} is alive".format(ip)) except: pass ping.kill() def list_of_groups(init_list, childern_list_len): list_of_groups = zip(*(iter(init_list),) * childern_list_len) # 使用zip函数将列表按照网段长度分成多个列表 end_list = [list(i) for i in list_of_groups] # 转换成列表 count = len(init_list) % childern_list_len end_list.append(init_list[-count:]) if count != 0 else end_list return end_list def list_ip(ip): newIplist = [] if "/" in ip: if int(ip.split("/")[1]) >= 24: num = ip.split('/')[1] length = len(IPy.IP('127.0.0.0/{}'.format(num))) # 计算网段的IP个数 endiplists = list_of_groups(range(0, 256), length) # 将整个C段按子网掩码划分成多个列表 for endiplist in endiplists: # 判断输入IP所在的子网 if int(ip.split('/')[0].split('.')[-1].strip()) in endiplist: for endip in endiplist: newIplist.append('.'.join(ip.split('/')[0].split('.')[:-1]) + '.{}'.format(endip)) # 以.为连接符,组合IP。 break elif int(ip.split("/")[1]) >= 16: new_ip = ip.split(".")[0] + "." + ip.split(".")[1] + ".0.0/{}".format(ip.split("/")[1]) ips = IPy.IP(new_ip) for i in ips: newIplist.append(str(i)) else: new_ip = ip.split(".")[0] + ".0.0.0/{}".format(ip.split("/")[1]) ips = IPy.IP(new_ip) for i in ips: newIplist.append(str(i)) elif re.match(r'^\d{0,3}.\d{0,3}.\d{0,3}.\d{0,3}$', ip) != None: newIplist.append(ip) return newIplist def main(): ip = '192.168.247.0/30' newIplist = list_ip(ip) for perip in newIplist : print('ping {}'.format(perip)) ping_func(perip) if __name__ == "__main__": main() 03、执行结果

可以看到,仅仅ping了4个IP,耗时12.74s,效率是很低的,接下来就要使用协程来提高效率。

image

三、使用协程来提高效率

这里主要使用了两个库:gevent和queue来实现。

01、创建队列

首先使用queue库创建队列,queue提供的队列实现了锁原语,能够在多线程中直接使用,可以使用队列来实现线程间的同步。

from queue import Queue ip_que = Queue() 02、开启多协程 创建一个用来存放协程对象的列表。 使用for循环逐个创建协程对象,并加入协程对象列表。 使用joinall()方法,使主线程等待协程执行完成以后程序再退出。 def star_ping(ip_list): # 将IP列表中的IP加入队列 for ip in ip_list: ip_que.put(ip) # 开启多协程 cos = [] # 创建一个列表,用来存放协程对象。 for i in range(len(ip_list)): # gevent.spawn()方法会创建一个新的greenlet协程对象,并运行它。 c = gevent.spawn(ping_func) cos.append(c) # gevent.joinall()方法的参数是一个协程对象列表,它会等待所有的协程都执行完毕后再退出。 gevent.joinall(cos) 03、执行ping命令函数

修改之前的ping_func()函数,将之前获取IP的方式改为从队列中获取,使用get()方法。

def ping_func(): while True: if ip_que.qsize() == 0: break ip = ip_que.get() if (platform.system() == 'Windows') : print('ping -n 1 {}'.format(ip)) ping = subprocess.Popen( 'ping -n 1 {}'.format(ip), shell=False, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) else: ping = subprocess.Popen( 'ping -c 1 {}'.format(ip), shell=False, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) try: out, err = ping.communicate(timeout=8) if 'ttl' in out.decode('GBK').lower(): print("ip {} is alive".format(ip)) except: pass ping.kill() 04、完整代码

和前面解析IP网段的代码整合,得到最终的代码如下:

import gevent from gevent import monkey monkey.patch_all(thread=False) import platform import subprocess from queue import Queue import IPy import re ip_que = Queue() def star_ping(ip_list): for ip in ip_list: ip_que.put(ip) # 开启多协程 cos = [] for i in range(len(ip_list)): # 调用工作函数 c = gevent.spawn(ping_func) cos.append(c) gevent.joinall(cos) def ping_func(): while True: if ip_que.qsize() == 0: break ip = ip_que.get() if (platform.system() == 'Windows') : print('ping -n 1 {}'.format(ip)) ping = subprocess.Popen( 'ping -n 1 {}'.format(ip), shell=False, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) else: ping = subprocess.Popen( 'ping -c 1 {}'.format(ip), shell=False, close_fds=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) try: out, err = ping.communicate(timeout=8) if 'ttl' in out.decode('GBK').lower(): print("ip {} is alive".format(ip)) except: pass ping.kill() def list_of_groups(init_list, childern_list_len): list_of_groups = zip(*(iter(init_list),) * childern_list_len) # 使用zip函数将列表按照网段长度分成多个列表 end_list = [list(i) for i in list_of_groups] # 转换成列表 count = len(init_list) % childern_list_len end_list.append(init_list[-count:]) if count != 0 else end_list return end_list def list_ip(ip): newIplist = [] if "/" in ip: if int(ip.split("/")[1]) >= 24: num = ip.split('/')[1] length = len(IPy.IP('127.0.0.0/{}'.format(num))) # 计算网段的IP个数 endiplists = list_of_groups(range(0, 256), length) # 将整个C段按子网掩码划分成多个列表 for endiplist in endiplists: # 判断输入IP所在的子网 if int(ip.split('/')[0].split('.')[-1].strip()) in endiplist: for endip in endiplist: newIplist.append('.'.join(ip.split('/')[0].split('.')[:-1]) + '.{}'.format(endip)) # 以.为连接符,组合IP。 break elif int(ip.split("/")[1]) >= 16: new_ip = ip.split(".")[0] + "." + ip.split(".")[1] + ".0.0/{}".format(ip.split("/")[1]) ips = IPy.IP(new_ip) for i in ips: newIplist.append(str(i)) else: new_ip = ip.split(".")[0] + ".0.0.0/{}".format(ip.split("/")[1]) ips = IPy.IP(new_ip) for i in ips: newIplist.append(str(i)) elif re.match(r'^\d{0,3}.\d{0,3}.\d{0,3}.\d{0,3}$', ip) != None: newIplist.append(ip) return newIplist def main(): ip = '192.168.247.0/30' newIplist = list_ip(ip) star_ping(newIplist) if __name__ == "__main__": main() 05、执行结果:

可以看到,执行时间大大缩短,并且是先执行所有的ping命令,再得到的结果。

image



【本文地址】

公司简介

联系我们

今日新闻


点击排行

实验室常用的仪器、试剂和
说到实验室常用到的东西,主要就分为仪器、试剂和耗
不用再找了,全球10大实验
01、赛默飞世尔科技(热电)Thermo Fisher Scientif
三代水柜的量产巅峰T-72坦
作者:寞寒最近,西边闹腾挺大,本来小寞以为忙完这
通风柜跟实验室通风系统有
说到通风柜跟实验室通风,不少人都纠结二者到底是不
集消毒杀菌、烘干收纳为一
厨房是家里细菌较多的地方,潮湿的环境、没有完全密
实验室设备之全钢实验台如
全钢实验台是实验室家具中较为重要的家具之一,很多

推荐新闻


图片新闻

实验室药品柜的特性有哪些
实验室药品柜是实验室家具的重要组成部分之一,主要
小学科学实验中有哪些教学
计算机 计算器 一般 打孔器 打气筒 仪器车 显微镜
实验室各种仪器原理动图讲
1.紫外分光光谱UV分析原理:吸收紫外光能量,引起分
高中化学常见仪器及实验装
1、可加热仪器:2、计量仪器:(1)仪器A的名称:量
微生物操作主要设备和器具
今天盘点一下微生物操作主要设备和器具,别嫌我啰嗦
浅谈通风柜使用基本常识
 众所周知,通风柜功能中最主要的就是排气功能。在

专题文章

    CopyRight 2018-2019 实验室设备网 版权所有 win10的实时保护怎么永久关闭