Python 脚本部分实例:几个常用的 Python 运维脚本,包括企业微信告警、FTP 客户端、SSH 客户端、Saltstack 客户端、vCenter 客户端、获取域名 ssl 证书过期时间、发送今天的天气预报以及未来的天气趋势图
企业微信告警 此脚本通过企业微信应用,进行微信告警,可用于 Zabbix 监控。
import json

import requests


class DLF:
    """Small WeCom (WeChat Work) application client for sending alerts.

    The access token is fetched once when the object is created. The public
    ``send_*`` methods never raise for network or API failures: they return
    ``None`` on success and a human-readable error string on failure.
    """

    # Seconds to wait for each HTTP call; without a timeout a stalled
    # connection would block the calling alert script forever.
    timeout = 10

    def __init__(self, corpid, corpsecret):
        self.url = "https://qyapi.weixin.qq.com/cgi-bin"
        self.corpid = corpid
        self.corpsecret = corpsecret
        # Description of the most recent token/media failure, if any.
        self._last_error = None
        self._token = self._get_token()

    def _get_token(self):
        """Fetch the access_token for the WeCom API.

        :return: the token string, or None when it could not be obtained
                 (the reason is stored in ``self._last_error``)
        """
        try:
            res = requests.get(
                self.url + "/gettoken",
                params={"corpid": self.corpid, "corpsecret": self.corpsecret},
                timeout=self.timeout,
            ).json()
        except (requests.RequestException, ValueError) as e:
            self._last_error = str(e)
            return None
        token = res.get("access_token")
        if not token:
            # The API answered but refused to issue a token; keep its message
            # instead of storing an error string as if it were a token.
            self._last_error = "gettoken failed: %s" % res.get("errmsg", res)
            return None
        return token

    def _get_media_id(self, file_obj):
        """Upload ``file_obj`` as temporary media.

        :param file_obj: file object opened in binary mode
        :return: the media_id string, or None on failure (the reason is stored
                 in ``self._last_error``)
        """
        if not self._token:
            return None
        try:
            res = requests.post(
                url=self.url + "/media/upload",
                params={"access_token": self._token, "type": "file"},
                files={"media": file_obj},
                timeout=self.timeout,
            ).json()
        except (requests.RequestException, ValueError) as e:
            self._last_error = str(e)
            return None
        media_id = res.get("media_id")
        if not media_id:
            self._last_error = "media upload failed: %s" % res.get("errmsg", res)
            return None
        return media_id

    def _send(self, agentid, msgtype, body, touser, toparty):
        """Post one message; return None on success or an error string."""
        if not self._token:
            return self._last_error or "no access_token"
        send_data = {"msgtype": msgtype, "agentid": agentid, msgtype: body}
        # Only include the recipients that were actually given, so that no
        # JSON null is sent for the unused one.
        if touser is not None:
            send_data["touser"] = touser
        if toparty is not None:
            send_data["toparty"] = toparty
        try:
            res = requests.post(
                self.url + "/message/send",
                params={"access_token": self._token},
                data=json.dumps(send_data),
                timeout=self.timeout,
            ).json()
        except (requests.RequestException, ValueError) as e:
            return str(e)
        # A non-zero errcode in the reply body means the message was rejected
        # even though the HTTP request itself succeeded.
        if res.get("errcode"):
            return "errcode %s: %s" % (res.get("errcode"), res.get("errmsg"))
        return None

    def send_text(self, agentid, content, touser=None, toparty=None):
        """Send a text message.

        :param agentid: id of the WeCom application that sends the message
        :param content: message text
        :param touser: recipient user id(s), optional
        :param toparty: recipient department id(s), optional
        :return: None on success, an error string on failure
        """
        return self._send(agentid, "text", {"content": content}, touser, toparty)

    def send_image(self, agentid, file_obj, touser=None, toparty=None):
        """Upload ``file_obj`` and send it as an image message.

        :param agentid: id of the WeCom application that sends the message
        :param file_obj: image file object opened in binary mode
        :param touser: recipient user id(s), optional
        :param toparty: recipient department id(s), optional
        :return: None on success, an error string on failure
        """
        media_id = self._get_media_id(file_obj)
        if media_id is None:
            return self._last_error or "media upload failed"
        return self._send(agentid, "image", {"media_id": media_id}, touser, toparty)
FTP 客户端 通过 ftplib 模块操作 ftp 服务器,进行上传下载等操作。
import copy
from ftplib import FTP, all_errors
from os import path


class FTPClient:
    """Thin wrapper around ``ftplib.FTP`` for uploads, downloads and
    directory housekeeping.

    The connection is opened in the constructor. A failed login does not
    raise; the exception is kept in ``self.login_error`` (None on success).
    """

    def __init__(self, host, user, passwd, port=21):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.port = port
        # Template for the result dicts returned by the housekeeping methods;
        # it is copied per call and never modified itself.
        self.res = {'status': True, 'msg': None}
        self._ftp = None
        self.login_error = self._login()

    def _login(self):
        """Connect and log in to the FTP server.

        :return: the exception raised while connecting or logging in, or
                 None on success
        """
        try:
            self._ftp = FTP()
            self._ftp.connect(self.host, self.port, timeout=30)
            self._ftp.login(self.user, self.passwd)
        except Exception as e:
            return e

    def _run(self, method_name, *args):
        """Call ``self._ftp.<method_name>(*args)`` and wrap the outcome in a
        ``{'status': bool, 'msg': ...}`` dict instead of raising."""
        res = copy.deepcopy(self.res)
        try:
            res['msg'] = getattr(self._ftp, method_name)(*args)
        except Exception as e:
            res['status'] = False
            res['msg'] = str(e)
        return res

    def upload(self, localpath, remotepath=None):
        """Upload a local file.

        :param localpath: local file path
        :param remotepath: remote file path; defaults to the local file name
        :return: an error message when ``localpath`` is empty, otherwise None
        """
        if not localpath:
            return 'Please select a local file. '
        if not remotepath:
            remotepath = path.basename(localpath)
        # storbinary() reads from an open binary file object, not from a path.
        with open(localpath, 'rb') as fp:
            self._ftp.storbinary('STOR ' + remotepath, fp)

    def download(self, remotepath, localpath=None):
        """Download a remote file.

        :param remotepath: remote file path
        :param localpath: local file path or directory; defaults to the
                          remote file name in the current directory
        :return: an error message when ``remotepath`` is empty, otherwise None
        """
        if not remotepath:
            return 'Please select a remote file. '
        if not localpath:
            localpath = path.basename(remotepath)
        if path.isdir(localpath):
            localpath = path.join(localpath, path.basename(remotepath))
        # The context manager closes the file even if the transfer fails.
        with open(localpath, 'wb') as fp:
            self._ftp.retrbinary('RETR ' + remotepath, fp.write)

    def nlst(self, dir='/'):
        """List the contents of a directory.

        :param dir: directory to list
        :return: list with the names found in the directory
        """
        return self._ftp.nlst(dir)

    def rmd(self, dir=None):
        """Remove a directory.

        :param dir: directory name
        :return: result dict, or an error message when ``dir`` is empty
        """
        if not dir:
            return 'Please input dirname'
        return self._run('rmd', dir)

    def mkd(self, dir=None):
        """Create a directory.

        :param dir: directory name
        :return: result dict, or an error message when ``dir`` is empty
        """
        if not dir:
            return 'Please input dirname'
        return self._run('mkd', dir)

    def del_file(self, filename=None):
        """Delete a file.

        :param filename: file name
        :return: result dict, or an error message when ``filename`` is empty
        """
        if not filename:
            return 'Please input filename'
        return self._run('delete', filename)

    def get_file_size(self, filenames=None):
        """Report size (in bytes) and kind for each given name.

        A name whose SIZE request fails is reported as a directory
        (type ``'d'``, size ``'-'``, name suffixed with ``/``).

        :param filenames: iterable of remote names
        :return: list of ``{'filename', 'size', 'type'}`` dicts, or a message
                 dict when ``filenames`` is empty
        """
        if not filenames:
            return {'msg': 'This is an empty directory'}
        res_l = []
        for name in filenames:
            try:
                size = self._ftp.size(name)
                kind = 'f'
            except all_errors:
                size = '-'
                kind = 'd'
                name = name + '/'
            res_l.append({'filename': name, 'size': size, 'type': kind})
        return res_l

    def rename(self, old_name=None, new_name=None):
        """Rename a file or directory.

        :param old_name: current file or directory name
        :param new_name: new file or directory name
        :return: result dict, or an error message when a name is missing
        """
        if not old_name or not new_name:
            return 'Please input old_name and new_name'
        return self._run('rename', old_name, new_name)

    def close(self):
        """Quit the FTP session.

        :return: an error message when the server does not answer QUIT,
                 otherwise None
        """
        try:
            self._ftp.quit()
        except Exception:
            return 'No response from server'
        finally:
            self._ftp.close()
SSH 客户端 此脚本仅用于通过 key 连接,如需要密码连接,简单修改下即可
import paramiko


class SSHClient:
    """Key-based SSH client built on paramiko.

    The connection is attempted in the constructor. A failed connection does
    not raise; the reason is kept in ``self.connect_error`` (None on success).
    """

    def __init__(self, host, port, user, pkey):
        self.ssh_host = host
        self.ssh_port = port
        self.ssh_user = user
        self.private_key = paramiko.RSAKey.from_private_key_file(pkey)
        self.ssh = None
        self.connect_error = None
        self._connect()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def _connect(self):
        """Open the SSH connection.

        :return: ``'ssh connect fail'`` when the connection fails,
                 otherwise None
        """
        self.ssh = paramiko.SSHClient()
        # NOTE(security): AutoAddPolicy accepts unknown host keys without
        # verification; review before using this against untrusted networks.
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            self.ssh.connect(
                hostname=self.ssh_host,
                port=self.ssh_port,
                username=self.ssh_user,
                pkey=self.private_key,
                timeout=10,
            )
        except Exception as e:
            # Keep the reason instead of discarding it; KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            self.connect_error = str(e)
            return 'ssh connect fail'

    def execute_command(self, command):
        """Run ``command`` on the remote host.

        :param command: command line to execute
        :return: ``(stdout, stderr)`` exactly as read from the channel
        """
        stdin, stdout, stderr = self.ssh.exec_command(command)
        out = stdout.read()
        err = stderr.read()
        return out, err

    def close(self):
        """Close the SSH connection if one was created."""
        if self.ssh is not None:
            self.ssh.close()
Saltstack 客户端 通过 api 对 Saltstack 服务端进行操作,执行命令。
import copy
import json

import requests


class SaltApi:
    """Client for the salt-api REST interface.

    A token is requested when the object is created and sent with every
    later request in the ``X-Auth-Token`` header.
    """

    def __init__(self, url="http://172.85.10.21:8000/", username="saltapi",
                 password="saltapi", eauth="pam", timeout=None):
        """
        :param url: base URL of salt-api
        :param username: salt-api user
        :param password: salt-api password
        :param eauth: external auth backend passed to the login call
        :param timeout: optional ``requests`` timeout in seconds; None waits
                        indefinitely, which synchronous commands may need
        """
        # Endpoint paths are appended directly, so make sure the base URL
        # ends with a slash.
        self.url = url if url.endswith("/") else url + "/"
        self.username = username
        self.password = password
        self.timeout = timeout
        self.headers = {"Content-type": "application/json"}
        self.params = {'client': 'local', 'fun': None, 'tgt': None, 'arg': None}
        self.login_url = self.url + "login"
        self.login_params = {'username': self.username, 'password': self.password, 'eauth': eauth}
        self.token = self.get_data(self.login_url, self.login_params)['token']
        self.headers['X-Auth-Token'] = self.token

    def get_data(self, url, params):
        """POST ``params`` as JSON to ``url``.

        :param url: request URL
        :param params: dict sent as the JSON body
        :return: first element of the ``return`` list in the JSON reply
        """
        response = requests.post(
            url, data=json.dumps(params), headers=self.headers, timeout=self.timeout
        )
        return response.json()['return'][0]

    def get_auth_keys(self):
        """List all accepted minion keys.

        :return: list of minion ids, or an error string when the reply does
                 not have the expected structure
        """
        data = copy.deepcopy(self.params)
        data['client'] = 'wheel'
        data['fun'] = 'key.list_all'
        result = self.get_data(self.url, data)
        try:
            return result['data']['return']['minions']
        except (KeyError, TypeError) as e:
            return str(e)

    def get_grains(self, tgt, arg='id'):
        """Fetch grains for the target hosts.

        :param tgt: target hosts; a false value selects ``'*'``
        :param arg: grain name(s) passed to ``grains.item``
        :return: request result
        """
        data = copy.deepcopy(self.params)
        data['tgt'] = tgt if tgt else '*'
        data['fun'] = 'grains.item'
        data['arg'] = arg
        return self.get_data(self.url, data)

    def execute_command(self, tgt, fun='cmd.run', arg=None, tgt_type='list', salt_async=False):
        """Run a salt module function, like ``salt '*' cmd.run 'command'``.

        :param tgt: target hosts
        :param fun: module function
        :param arg: arguments for the function, may be empty
        :param tgt_type: targeting type, applied unless ``tgt`` is ``'*'``
        :param salt_async: use the ``local_async`` client when true
        :return: request result, or an error dict when ``tgt`` is empty
        """
        if not tgt:
            return {'status': False, 'msg': 'target host not exist'}
        data = copy.deepcopy(self.params)
        if not arg:
            data.pop('arg')
        else:
            data['arg'] = arg
        if tgt != '*':
            data['tgt_type'] = tgt_type
        if salt_async:
            data['client'] = 'local_async'
        data['fun'] = fun
        data['tgt'] = tgt
        return self.get_data(self.url, data)

    def jobs(self, fun='detail', jid=None):
        """Query jobs through the runner client.

        :param fun: ``'active'`` for running jobs, ``'detail'`` for the result
                    of one job
        :param jid: job id, required for ``'detail'``
        :return: request result, or an error dict for invalid arguments
        """
        data = {'client': 'runner'}
        if fun == 'detail':
            if not jid:
                return {'success': False, 'msg': 'job id is none'}
            data['fun'] = 'jobs.lookup_jid'
            data['jid'] = jid
        elif fun == 'active':
            # Previously every value other than 'detail' was rejected, so the
            # documented 'active' mode could never be used.
            data['fun'] = 'jobs.active'
        else:
            return {'success': False, 'msg': 'fun is active or detail'}
        return self.get_data(self.url, data)
[SaltStack](http://www.saltstack.com/) 是一个服务器基础架构集中化管理平台,具备配置管理、远程执行、监控等功能,一般可以理解为简化版的 [puppet](http://puppetlabs.com) 和加强版的 [func](https://fedorahosted.org/func/)、Ansible。 SaltStack 基于 Python 语言实现,结合轻量级消息队列(ZeroMQ)与 Python 第三方模块(Pyzmq、PyCrypto、Jinja2、python-msgpack 和 PyYAML 等)构建。有如下特性:
部署简单、方便;
支持大部分UNIX/Linux及Windows环境;
主从集中化管理;
配置简单、功能强大、扩展性强;
主控端(master)和被控端(minion)基于证书认证,安全可靠;
支持API及自定义模块,可通过Python轻松扩展。
vCenter 客户端 通过官方 SDK 对 vCenter 进行日常操作,此脚本是我用于 cmdb 平台的,自动获取主机信息,存入数据库
import atexit

from pyVim.connect import SmartConnect, Disconnect, SmartConnectNoSSL
from pyVmomi import vim

from asset import models


class Vmware:
    """Collect ESXi host and VM inventory from a vCenter and store it
    through the ``asset`` models."""

    # Resources one new VM is assumed to need: CPU threads, memory in GB and
    # disk in GB. Used to estimate how many more VMs fit on a host.
    default_configure = {'cpu': 4, 'memory': 8, 'disk': 100}

    def __init__(self, ip, user, password, port, idc, vcenter_id):
        self.ip = ip
        self.user = user
        self.password = password
        self.port = port
        self.idc_id = idc
        self.vcenter_id = vcenter_id

    def get_obj(self, content, vimtype, name=None):
        """Return the managed objects of the given types as a list.

        :param content: service content from ``si.RetrieveContent()``
        :param vimtype: list of vim types to collect
        :param name: when given, only objects with this name are returned
        :return: list of matching objects
        """
        container = content.viewManager.CreateContainerView(content.rootFolder, vimtype, True)
        try:
            objs = list(container.view)
        finally:
            # Release the server-side view instead of leaving it to the
            # session teardown.
            container.Destroy()
        if name is not None:
            objs = [obj for obj in objs if obj.name == name]
        return objs

    @staticmethod
    def _vm_summary(vm, config):
        """Build the info dict for one VM; return it with the VM's total
        virtual disk size in GB."""
        disk_total = 0
        disk_parts = []
        for device in config.hardware.device:
            if isinstance(device, vim.vm.device.VirtualDisk):
                size_gb = device.capacityInKB / 1024 / 1024
                disk_total += size_gb
                disk_parts.append(device.deviceInfo.label + ": " + str(size_gb) + ' GB' + ',')
        host_info = {
            'vm_name': vm.name,
            'power_status': vm.runtime.powerState,
            'cpu_total_kernel': str(config.hardware.numCPU) + '核',
            'memory_total': str(config.hardware.memoryMB) + 'MB',
            'system_info': config.guestFullName,
            'disk_info': ''.join(disk_parts),
        }
        return host_info, disk_total

    def _host_summary(self, esxi):
        """Build the info dict for one ESXi host, including its VMs and an
        estimate of the remaining capacity."""
        hardware = esxi.summary.hardware
        info = {
            'idc_id': self.idc_id,
            'vcenter_id': self.vcenter_id,
            'server_ip': esxi.name,
            'manufacturer': hardware.vendor,
            'server_model': hardware.model,
        }
        for ident in hardware.otherIdentifyingInfo:
            if isinstance(ident, vim.host.SystemIdentificationInfo):
                info['server_sn'] = ident.identifierValue
        info['system_name'] = esxi.summary.config.product.fullName

        cpu_total = hardware.numCpuThreads
        memory_total = hardware.memorySize / 1024 / 1024 / 1024
        disk_total = sum(ds.summary.capacity / 1024 / 1024 / 1024 for ds in esxi.datastore)

        info['vm_host'] = []
        used_cpu = 0
        used_memory = 0
        used_disk = 0
        for vm in esxi.vm:
            config = vm.config
            if config is None:
                # NOTE(review): presumably an inaccessible or orphaned VM;
                # it has no hardware details to report, so it is skipped
                # instead of aborting the whole scan with AttributeError.
                continue
            host_info, vm_disk = self._vm_summary(vm, config)
            info['vm_host'].append(host_info)
            # Only powered-on VMs count against the host's capacity.
            if host_info['power_status'] == 'poweredOn':
                used_cpu += config.hardware.numCPU
                used_disk += vm_disk
                used_memory += config.hardware.memoryMB / 1024

        cpu_free = cpu_total - used_cpu
        memory_free = memory_total - used_memory
        disk_free = disk_total - used_disk
        info['cpu_info'] = 'Total: %d核, Free: %d核' % (cpu_total, cpu_free)
        info['memory_info'] = 'Total: %dGB, Free: %dGB' % (memory_total, memory_free)
        info['disk_info'] = 'Total: %dGB, Free: %dGB' % (disk_total, disk_free)

        need = self.default_configure
        if cpu_free < need['cpu'] or memory_free < need['memory'] or disk_free < need['disk']:
            info['free_allocation_vm_host'] = 0
        else:
            # The scarcest resource decides how many more default-sized VMs fit.
            info['free_allocation_vm_host'] = int(min(
                cpu_free / need['cpu'],
                memory_free / need['memory'],
                disk_free / need['disk'],
            ))
        return info

    def get_esxi_info(self):
        """Collect information about every ESXi host and its VMs.

        :return: on success a dict mapping each host name to its info dict,
                 plus ``'connect_status': True``; on connection failure
                 ``{'connect_status': False, 'msg': <reason>}``
        """
        try:
            # NOTE(review): newer pyVmomi releases are reported to replace
            # SmartConnectNoSSL with SmartConnect(disableSslCertValidation=True)
            # - confirm against the installed version.
            si = SmartConnectNoSSL(host=self.ip, user=self.user, pwd=self.password,
                                   port=self.port, connectionPoolTimeout=60)
        except Exception as e:
            fault = getattr(e, 'msg', None)
            if fault:
                # Format the fault as an argument so a '%' inside the fault
                # text cannot break the message.
                msg = '%s Caught vmodl fault : %s' % (self.ip, fault)
            else:
                msg = '%s: connection error' % (self.ip)
            return {"connect_status": False, "msg": msg}

        try:
            content = si.RetrieveContent()
            esxi_host = {
                esxi.name: self._host_summary(esxi)
                for esxi in self.get_obj(content, [vim.HostSystem])
            }
        finally:
            # Everything needed has been copied into plain values, so the
            # session is released now rather than at interpreter exit.
            Disconnect(si)
        esxi_host['connect_status'] = True
        return esxi_host

    def write_to_db(self):
        """Collect the inventory and store hosts and their VMs.

        :return: the failure dict from ``get_esxi_info`` when the connection
                 failed, otherwise None
        """
        esxi_host = self.get_esxi_info()
        if not esxi_host['connect_status']:
            return esxi_host
        del esxi_host['connect_status']
        for host_fields in esxi_host.values():
            virtual_hosts = host_fields.pop('vm_host')
            # NOTE(review): assumes Django-style managers, where
            # objects.create() already saves the row, so no extra save()
            # call is made.
            obj = models.EsxiHost.objects.create(**host_fields)
            for host_info in virtual_hosts:
                host_info['management_host_id'] = obj.id
                models.virtualHost.objects.create(**host_info)
获取域名 ssl 证书过期时间 用于 zabbix 告警
import re
import sys
import time
import subprocess
from datetime import datetime, timezone
from io import StringIO

# Value reported when no expiry date could be determined.
NO_CERT_INFO = 999999999

# Matches the "expire date: ..." line printed by curl's verbose TLS output.
_EXPIRE_DATE_RE = re.compile(r"expire date: (.*)")
_CURL_DATE_FORMAT = "%b %d %H:%M:%S %Y GMT"


def main(domain):
    """Return the number of days until the TLS certificate of ``domain`` expires.

    The certificate dates are read from the verbose output of ``curl``.

    :param domain: host name to check
    :return: remaining days as an int (negative once expired), or
             999999999 when curl cannot be run or its output contains no
             parsable expiry date
    """
    # Pass the arguments as a list so the domain is never interpreted by a
    # shell.
    command = ["curl", "-Ivs", f"https://{domain}", "--connect-timeout", "10"]
    try:
        proc = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # curl writes the TLS details to stderr
            universal_newlines=True,
            errors="replace",
            timeout=30,
        )
    except (OSError, subprocess.TimeoutExpired):
        return NO_CERT_INFO

    # Only the expiry date is needed for the result, so the other
    # certificate fields are not required to be present.
    match = _EXPIRE_DATE_RE.search(proc.stdout or "")
    if match is None:
        return NO_CERT_INFO
    try:
        expire_date = datetime.strptime(match.group(1).strip(), _CURL_DATE_FORMAT)
    except ValueError:
        return NO_CERT_INFO

    # The certificate time is GMT, so compare against the current UTC time
    # rather than local time.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    return (expire_date - now).days


if __name__ == "__main__":
    if len(sys.argv) != 2:
        sys.exit("usage: %s <domain>" % sys.argv[0])
    domain = sys.argv[1]
    remaining_days = main(domain)
    print(remaining_days)
# Send today's weather forecast and the upcoming weather trend chart
import datetime
import json

import requests


def weather(city):
    """Fetch today's forecast for ``city`` and format it as a message.

    :param city: city name passed to the weather service
    :return: ``{"source_data": <service data>, "res": <message text>}`` on
             success, or an error string when the request fails or the reply
             does not have the expected structure
    """
    url = "http://wthrcdn.etouch.cn/weather_mini"
    try:
        # Let requests encode the city name, and do not wait forever for the
        # service to answer.
        data = requests.get(url, params={"city": city}, timeout=10).json()['data']
        city = data['city']
        ganmao = data['ganmao']
        today_weather = data['forecast'][0]
        res = (
            "今天是{}\n今天天气概况\n城市: {:<10}\n时间: {:<10}\n高温: {:<10}\n"
            "低温: {:<10}\n风力: {:<10}\n风向: {:<10}\n天气: {:<10}\n\n"
            "稍后会发送近期温度趋势图,请注意查看。"
        ).format(
            ganmao,
            city,
            datetime.datetime.now().strftime('%Y-%m-%d'),
            today_weather['high'].split()[1],
            today_weather['low'].split()[1],
            # Takes the text between the second '[' and the next ']'.
            today_weather['fengli'].split('[')[2].split(']')[0],
            today_weather['fengxiang'],
            today_weather['type'],
        )
        return {"source_data": data, "res": res}
    except (requests.RequestException, ValueError, KeyError, IndexError,
            TypeError, AttributeError) as e:
        return str(e)
+ 获取天气预报趋势图
import datetime
import re

import matplotlib.pyplot as plt


def Future_weather_states(forecast, save_path, day_num=5):
    """Draw the temperature trend of the coming days and save it as an image.

    :param forecast: list of forecast entries, each with ``date``, ``high``
                     and ``low`` text fields
    :param save_path: path of the image file to write
    :param day_num: number of days to plot; fewer are plotted when
                    ``forecast`` is shorter
    :return: None
    """
    # Accept a leading minus sign so sub-zero temperatures keep their sign.
    number = re.compile(r"-?\d+")
    day_labels = []
    high_temperature = []
    low_temperature = []
    for entry in forecast[:day_num]:
        day_labels.append(int(re.findall(r"\d+", entry["date"])[0]))
        high_temperature.append(int(number.findall(entry["high"])[0]))
        low_temperature.append(int(number.findall(entry["low"])[0]))

    # Plot against the position in the forecast and use the day numbers only
    # as tick labels: sorting by day-of-month would scramble a series that
    # crosses a month boundary (30, 31, 1, 2 ...).
    positions = list(range(len(day_labels)))

    # Font settings must be in place before any text is created.
    plt.rcParams['font.sans-serif'] = ['SimHei']
    plt.rcParams['axes.unicode_minus'] = False

    current_date = datetime.datetime.now().strftime('%Y-%m')
    fig, ax = plt.subplots()
    try:
        ax.plot(positions, high_temperature, "r", positions, low_temperature, "b")
        ax.set_xlabel(current_date)
        ax.set_ylabel("℃")
        ax.legend(["高温", "低温"])
        ax.set_xticks(positions)
        ax.set_xticklabels(day_labels)
        ax.set_title("最近几天温度变化趋势")
        fig.savefig(save_path)
    finally:
        # Close the figure so repeated calls do not draw on top of each other.
        plt.close(fig)
+ 发送到企业微信
# ---------- WeCom sender (imported by the main script as plugins.send_wechat) ----------
import json

import requests


class DLF:
    """Small WeCom (WeChat Work) application client for sending alerts.

    The access token is fetched once when the object is created. The public
    ``send_*`` methods never raise for network or API failures: they return
    ``None`` on success and a human-readable error string on failure.
    """

    # Seconds to wait for each HTTP call; without a timeout a stalled
    # connection would block the calling script forever.
    timeout = 10

    def __init__(self, corpid, corpsecret):
        self.url = "https://qyapi.weixin.qq.com/cgi-bin"
        self.corpid = corpid
        self.corpsecret = corpsecret
        # Description of the most recent token/media failure, if any.
        self._last_error = None
        self._token = self._get_token()

    def _get_token(self):
        """Fetch the access_token for the WeCom API.

        :return: the token string, or None when it could not be obtained
                 (the reason is stored in ``self._last_error``)
        """
        try:
            res = requests.get(
                self.url + "/gettoken",
                params={"corpid": self.corpid, "corpsecret": self.corpsecret},
                timeout=self.timeout,
            ).json()
        except (requests.RequestException, ValueError) as e:
            self._last_error = str(e)
            return None
        token = res.get("access_token")
        if not token:
            # The API answered but refused to issue a token; keep its message
            # instead of storing an error string as if it were a token.
            self._last_error = "gettoken failed: %s" % res.get("errmsg", res)
            return None
        return token

    def _get_media_id(self, file_obj):
        """Upload ``file_obj`` as temporary media.

        :param file_obj: file object opened in binary mode
        :return: the media_id string, or None on failure (the reason is stored
                 in ``self._last_error``)
        """
        if not self._token:
            return None
        try:
            res = requests.post(
                url=self.url + "/media/upload",
                params={"access_token": self._token, "type": "file"},
                files={"media": file_obj},
                timeout=self.timeout,
            ).json()
        except (requests.RequestException, ValueError) as e:
            self._last_error = str(e)
            return None
        media_id = res.get("media_id")
        if not media_id:
            self._last_error = "media upload failed: %s" % res.get("errmsg", res)
            return None
        return media_id

    def _send(self, agentid, msgtype, body, touser, toparty):
        """Post one message; return None on success or an error string."""
        if not self._token:
            return self._last_error or "no access_token"
        send_data = {"msgtype": msgtype, "agentid": agentid, msgtype: body}
        # Only include the recipients that were actually given, so that no
        # JSON null is sent for the unused one.
        if touser is not None:
            send_data["touser"] = touser
        if toparty is not None:
            send_data["toparty"] = toparty
        try:
            res = requests.post(
                self.url + "/message/send",
                params={"access_token": self._token},
                data=json.dumps(send_data),
                timeout=self.timeout,
            ).json()
        except (requests.RequestException, ValueError) as e:
            return str(e)
        # A non-zero errcode in the reply body means the message was rejected
        # even though the HTTP request itself succeeded.
        if res.get("errcode"):
            return "errcode %s: %s" % (res.get("errcode"), res.get("errmsg"))
        return None

    def send_text(self, agentid, content, touser=None, toparty=None):
        """Send a text message.

        :param agentid: id of the WeCom application that sends the message
        :param content: message text
        :param touser: recipient user id(s), optional
        :param toparty: recipient department id(s), optional
        :return: None on success, an error string on failure
        """
        return self._send(agentid, "text", {"content": content}, touser, toparty)

    def send_image(self, agentid, file_obj, touser=None, toparty=None):
        """Upload ``file_obj`` and send it as an image message.

        :param agentid: id of the WeCom application that sends the message
        :param file_obj: image file object opened in binary mode
        :param touser: recipient user id(s), optional
        :param toparty: recipient department id(s), optional
        :return: None on success, an error string on failure
        """
        media_id = self._get_media_id(file_obj)
        if media_id is None:
            return self._last_error or "media upload failed"
        return self._send(agentid, "image", {"media_id": media_id}, touser, toparty)


# ---------- main script ----------
import os

from plugins.weather_forecast import weather
from plugins.trend_chart import Future_weather_states
from plugins.send_wechat import DLF

corpid = "xxx"
corpsecret = "xxx"
agentid = "xxx"

_path = os.path.dirname(os.path.abspath(__file__))
save_path = os.path.join(_path, './tmp/weather_forecast.jpg')


def main():
    """Send today's forecast as text, then the temperature trend chart."""
    content = weather("大兴")
    if not isinstance(content, dict):
        # weather() returns an error string on failure; stop with that
        # message instead of failing later with a TypeError.
        raise SystemExit("weather lookup failed: %s" % content)

    dlf = DLF(corpid, corpsecret)
    dlf.send_text(agentid=agentid, content=content['res'], toparty='1')

    # The chart is written below ./tmp, which may not exist yet.
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    Future_weather_states(content['source_data']['forecast'], save_path)
    # Close the image file once it has been uploaded.
    with open(save_path, 'rb') as file_obj:
        dlf.send_image(agentid=agentid, toparty='1', file_obj=file_obj)


if __name__ == "__main__":
    main()