用python写一个简单预警机器人(支持微信和钉钉)

背景

线上的系统在运行中,发生故障时怎么及时的通过手机通知到相关人员?当然这是个很简单的需求,现有的方法有很多,例如:

  1. 如果我们用的云产品,那么一般都会有配套对应的监控预警功能,根据需要配置一下即可,支持短信,邮箱通知。
  2. 如果我们已经搭建了一套运维监控系统,比如zabbix之类的,那么我们学会zabbix,然后配置也即可,支持短信,邮箱通知。
  3. ...

但如果我们希望有一个比较简单轻便,能灵活定制和快速实施的方法,又能同时支持微信和钉钉通知呢?以下就介绍这样一个基于python的简单方法,暂且起个名字叫robotprobe。

上路

在开始之前,先定义两种对象,robotprobe将由这两种对象组成。

Probe:探针,用于检查检测某项功能,某个指标是否正常,并包含预警相关规则配置。

Robot:机器人,发生异常情况时,要发送通知的对象,这里特指微信机器人和钉钉机器人。

Probe

class Probe:
 def __init__(self, name, interval=10, threshold=3):
 self.name = name
 #检测间隔,单位秒
 self.interval = interval
 #当前是否有预警
 self.warning = False
 #预期消息
 self.warning_msg = None
 #预警计数器
 self.warning_counter = 0
 #阈值,预警次数达到阈值时发消息通知
 self.threshold = threshold
 #预警消息模板
 self.warning_msg_tpl = self.name + "预警:%s"
 def test(self):
 print(self.name + " testing")
 self.do_test()
 self.warning = self.warning_counter >= self.threshold
 if self.warning:
 print(self.name + " has warning:" + self.warning_msg)
 def do_test(self):
 raise NotImplementedError
 def consume_warning(self):
 warning_msg = self.warning_msg
 self.warning = False
 self.warning_msg = None
 self.warning_counter = 0
 return warning_msg

UrlProbe

class UrlProbe(Probe):
 def __init__(self, name, interval, threshold):
 super().__init__(name, interval, threshold)
 adapter = SSLAdapter('TLSv1')
 self.session = requests.Session()
 self.session.mount('https://', adapter)
 def do_test(self):
 try:
 self.session.get(self.url, timeout=10)
 except BaseException as e:
 self.warning_counter = self.warning_counter + 1
 self.warning_msg = self.warning_msg_tpl % str(e)
 print(self.warning_msg)

SqlProbe

class SqlProbe(Probe):
 def __init__(self, name, interval, threshold):
 super().__init__(name, interval, threshold)
 db = pymysql.connect(host='localhost', user='root',
 passwd='xxxx', db='demo', port=3306, charset='utf8',
 autocommit=True)
 self.cursor = db.cursor()
 def do_test(self):
 try:
 self.cursor.execute(self.sql)
 result = self.cursor.fetchone()
 #当sql查询的指标大于业务阈值时,发生预警
 if result[0] >= self.biz_threshold:
 self.warning_counter = self.warning_counter + 1
 #biz_tpl可定制业务预警消息模板
 self.warning_msg = self.warning_msg_tpl % (self.biz_tpl % (result[0], self.biz_threshold))
 print(self.warning_msg)
 except BaseException as e:
 self.warning_counter = self.warning_counter + 1
 self.warning_msg = self.warning_msg_tpl % str(e)
 print(self.warning_msg)

其他

其他类型的Probe则可以根据实际需要自由扩展实现。

Robot

class Robot:
 def __init__(self):
 pass
 def send_text(self, msg):
 raise NotImplementedError

WechatRobot

class WechatRobot(Robot):
 def __init__(self, chat_names):
 bot = Bot(console_qr=True, cache_path=True)
 self.chats = bot.search(chat_names)
 super().__init__()
 def send_text(self, msg):
 for chat in self.chats:
 chat.send_msg(msg)
 @staticmethod
 def run_embed():
 ThreadPoolExecutor(1).submit(embed)

DingTalkRobot

class DingTalkRobot(Robot):
 def __init__(self, webhook):
 self.bot = DingtalkChatbot(webhook)
 super().__init__()
 def send_text(self, msg):
 self.bot.send_text(msg, is_at_all=True)

微信机器人基于wxpy实现,wxpy功能很丰富,基本微信上收发消息相关的功能都可以用它来实现,这篇文章有个挺好的使用示例,我们这里只用到发送消息的功能,微信虽然功能,但有个缺点就是用wxpy发消息(其实是网页版微信),账号有可能被封,另外新注册的微信号也是不能用的。

钉钉机器人基于DingtalkChatbot实现,优点就是官方默认提供机器人的功能,虽然发消息有限制,但一可以控制发消息的频率,二也可加多个机器人去分流,缺点就是只能发消息,不收消息,进而根据收到的消息做定制回复。

Demo示例

import time
import schedule
from probe import UrlProbe, SqlProbe
from robot import DingTalkRobot, WechatRobot
webhook = 'https://oapi.dingtalk.com/robot/send?access_token=xxxxx'
bot = DingTalkRobot(webhook)
# bot = WechatRobot([u"研发部", u"运维部", u"someone you hate"])
# bot.run_embed()
# 每20秒检测百度网站是否正常,如果不正常超3次,则预警
baidu_probe = UrlProbe("百度", 20, 3)
baidu_probe.url = "https://www.baidux.com"
# 每30秒检测在线用户数是否太高,如果不正常达到1次,则预警
online_user_probe = SqlProbe("在线用户数", 30, 1)
online_user_probe.sql = "select count(1) from online_user"
online_user_probe.biz_threshold = 50000
online_user_probe.biz_tpl = "当前在线用户数%s,达到阈值%s"
probes = [baidu_probe, online_user_probe]
def notify_latest_state():
 for p in probes:
 if p.warning:
 bot.send_text(p.name + "异常:" + p.warning_msg)
 else:
 bot.send_text(p.name + "正常")
def send_warning(msg):
 bot.send_text(msg)
def trigger_probe(p):
 p.test()
 if p.warning:
 send_warning(p.consume_warning())
schedule.every().day.at("09:00").do(notify_latest_state)
for probe in probes:
 schedule.every(probe.interval).seconds.do(trigger_probe, probe)
if __name__ == '__main__':
 while True:
 schedule.run_pending()
 time.sleep(1)
 print("I am alive!")

依赖

certifi==2019.3.9
chardet==3.0.4
DingtalkChatbot==1.3.0
future==0.17.1
idna==2.8
itchat==1.2.32
PyMySQL==0.9.3
pypng==0.0.19
PyQRCode==1.2.1
requests==2.21.0
requests-toolbelt==0.9.1
schedule==0.6.0
urllib3==1.24.1
wxpy==0.3.9.8

源码下载

https://github.com/huangyemin/robotprobe
https://gitee.com/huangyemin/robotprobe


分享到:


相關文章: