用python写一个简单预警机器人(支持微信和钉钉)

背景

线上的系统在运行中,发生故障时怎么及时的通过手机通知到相关人员?当然这是个很简单的需求,现有的方法有很多,例如:

如果我们用的云产品,那么一般都会有配套对应的监控预警功能,根据需要配置一下即可,支持短信,邮箱通知。如果我们已经搭建了一套运维监控系统,比如zabbix之类的,那么我们学会zabbix,然后配置也即可,支持短信,邮箱通知。...

但如果我们希望有一个比较简单轻便,能灵活定制和快速实施的方法,又能同时支持微信和钉钉通知呢?以下就介绍这样一个基于python的简单方法,暂且起个名字叫robotprobe。

上路

在开始之前,先定义两种对象,robotprobe将由这两种对象组成。

Probe:探针,用于检查检测某项功能,某个指标是否正常,并包含预警相关规则配置。

Robot:机器人,发生异常情况时,要发送通知的对象,这里特指微信机器人和钉钉机器人。

Probe

class Probe: def __init__(self, name, interval=10, threshold=3): self.name = name #检测间隔,单位秒 self.interval = interval #当前是否有预警 self.warning = False #预期消息 self.warning_msg = None #预警计数器 self.warning_counter = 0 #阈值,预警次数达到阈值时发消息通知 self.threshold = threshold #预警消息模板 self.warning_msg_tpl = self.name + "预警:%s" def test(self): print(self.name + " testing") self.do_test() self.warning = self.warning_counter >= self.threshold if self.warning: print(self.name + " has warning:" + self.warning_msg) def do_test(self): raise NotImplementedError def consume_warning(self): warning_msg = self.warning_msg self.warning = False self.warning_msg = None self.warning_counter = 0 return warning_msg

UrlProbe

class UrlProbe(Probe): def __init__(self, name, interval, threshold): super().__init__(name, interval, threshold) adapter = SSLAdapter('TLSv1') self.session = requests.Session() self.session.mount('https://', adapter) def do_test(self): try: self.session.get(self.url, timeout=10) except BaseException as e: self.warning_counter = self.warning_counter + 1 self.warning_msg = self.warning_msg_tpl % str(e) print(self.warning_msg)

SqlProbe

class SqlProbe(Probe): def __init__(self, name, interval, threshold): super().__init__(name, interval, threshold) db = pymysql.connect(host='localhost', user='root', passwd='xxxx', db='demo', port=3306, charset='utf8', autocommit=True) self.cursor = db.cursor() def do_test(self): try: self.cursor.execute(self.sql) result = self.cursor.fetchone() #当sql查询的指标大于业务阈值时,发生预警 if result[0] >= self.biz_threshold: self.warning_counter = self.warning_counter + 1 #biz_tpl可定制业务预警消息模板 self.warning_msg = self.warning_msg_tpl % (self.biz_tpl % (result[0], self.biz_threshold)) print(self.warning_msg) except BaseException as e: self.warning_counter = self.warning_counter + 1 self.warning_msg = self.warning_msg_tpl % str(e) print(self.warning_msg)

其他

其他类型的Probe则可以根据实际需要自由扩展实现。

Robot

class Robot: def __init__(self): pass def send_text(self, msg): raise NotImplementedError

WechatRobot

class WechatRobot(Robot): def __init__(self, chat_names): bot = Bot(console_qr=True, cache_path=True) self.chats = bot.search(chat_names) super().__init__() def send_text(self, msg): for chat in self.chats: chat.send_msg(msg) @staticmethod def run_embed(): ThreadPoolExecutor(1).submit(embed)

DingTalkRobot

class DingTalkRobot(Robot): def __init__(self, webhook): self.bot = DingtalkChatbot(webhook) super().__init__() def send_text(self, msg): self.bot.send_text(msg, is_at_all=True)

微信机器人基于wxpy实现,wxpy功能很丰富,基本微信上收发消息相关的功能都可以用它来实现,这篇文章有个挺好的使用示例,我们这里只用到发送消息的功能,微信虽然功能,但有个缺点就是用wxpy发消息(其实是网页版微信),账号有可能被封,另外新注册的微信号也是不能用的。

钉钉机器人基于DingtalkChatbot实现,优点就是官方默认提供机器人的功能,虽然发消息有限制,但一可以控制发消息的频率,二也可加多个机器人去分流,缺点就是只能发消息,不收消息,进而根据收到的消息做定制回复。

Demo示例

import time import schedule from probe import UrlProbe, SqlProbe from robot import DingTalkRobot, WechatRobot webhook = 'https://oapi.dingtalk.com/robot/send?access_token=xxxxx' bot = DingTalkRobot(webhook) # bot = WechatRobot([u"研发部", u"运维部", u"someone you hate"]) # bot.run_embed() # 每20秒检测百度网站是否正常,如果不正常超3次,则预警 baidu_probe = UrlProbe("百度", 20, 3) baidu_probe.url = "https://www.baidux.com" # 每30秒检测在线用户数是否太高,如果不正常达到1次,则预警 online_user_probe = SqlProbe("在线用户数", 30, 1) online_user_probe.sql = "select count(1) from online_user" online_user_probe.biz_threshold = 50000 online_user_probe.biz_tpl = "当前在线用户数%s,达到阈值%s" probes = [baidu_probe, online_user_probe] def notify_latest_state(): for p in probes: if p.warning: bot.send_text(p.name + "异常:" + p.warning_msg) else: bot.send_text(p.name + "正常") def send_warning(msg): bot.send_text(msg) def trigger_probe(p): p.test() if p.warning: send_warning(p.consume_warning()) schedule.every().day.at("09:00").do(notify_latest_state) for probe in probes: schedule.every(probe.interval).seconds.do(trigger_probe, probe) if __name__ == '__main__': while True: schedule.run_pending() time.sleep(1) print("I am alive!")

依赖

certifi==2019.3.9 chardet==3.0.4 DingtalkChatbot==1.3.0 future==0.17.1 idna==2.8 itchat==1.2.32 PyMySQL==0.9.3 pypng==0.0.19 PyQRCode==1.2.1 requests==2.21.0 requests-toolbelt==0.9.1 schedule==0.6.0 urllib3==1.24.1 wxpy==0.3.9.8

源码下载

https://github.com/huangyemin/robotprobe https://gitee.com/huangyemin/robotprobe