import logging
from json import JSONDecodeError
import requests
import jmespath
class Watcher(object):
    """Base Watcher object for resource monitoring.

    Concrete watchers subclass this and override :meth:`update`.
    """

    def __init__(self, reminder, schedules, *args, **kwargs):
        """
        Create Watcher object.

        :param Reminder reminder: Reminder instance to associate watcher with.
        :param dict schedules:
            Initial job schedules for watcher to use.
            *Possibly going to be removed from base class*
        :param args: Extra positional arguments; accepted and ignored here.
        :param kwargs: Extra keyword arguments; accepted and ignored here.
        """
        self._logger = logging.getLogger(__name__)
        self.reminder = reminder
        self.schedules = schedules
        # Follow the verbosity configured on the owning reminder's logger.
        self._logger.setLevel(reminder._logger.level)
        # Lazy %-style arguments: the attribute dict is only rendered when a
        # handler actually emits the record.
        self._logger.debug('new watcher created: %s', self.__dict__)

    def update(self):
        """
        **REQUIRED**
        Return status from monitored resource. Up to concrete class to determine implementation.

        :raises NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError('update() not yet implemented')
class HTTPWatcher(Watcher):
    """Watcher object for monitoring HTTP(S) REST Resource."""

    def __init__(self, request_kwargs, json_expression, *args, **kwargs):
        """
        Create HTTPWatcher object.

        note:
            Assumes response is JSON. May require separate classes for JSON/XML/Others in future.

        :param dict request_kwargs:
            Dictionary containing keyword arguments to be passed to requests.get()
        :param str json_expression:
            JMESPath expression to be used to retrieve status from results JSON object.
        :param args: Positional arguments forwarded to :class:`Watcher`.
        :param kwargs: Keyword arguments forwarded to :class:`Watcher`.
        """
        super().__init__(*args, **kwargs)
        self.request_kwargs = request_kwargs
        self.json_expression = json_expression

    def update(self):
        """
        Return resource status for Reminder to evaluate.

        Performs a GET request using ``request_kwargs`` and applies
        ``json_expression`` to the decoded JSON body.

        :returns:
            Result of the JMESPath search, or ``None`` when the response
            body cannot be decoded as JSON (an error is logged in that case).
        """
        response = requests.get(**self.request_kwargs)
        # Only the decode step is guarded, so that errors raised while
        # evaluating the JMESPath expression are never mistaken for a decode
        # failure. ValueError is the common base of json.JSONDecodeError and
        # of the simplejson equivalent that requests may raise instead.
        try:
            payload = response.json()
        except ValueError:
            self._logger.error('Unable to decode JSON from %s', response)
            return None
        return jmespath.search(self.json_expression, payload)
class MQTTWatcher(Watcher):
    """Watcher object for monitoring MQTT Resource."""

    def __init__(self, hostname, port=1883, tls=False, topic_kwargs=None, username=None, password=None, *args, **kwargs):
        """
        Create MQTTWatcher object.

        The connection parameters (``hostname``, ``port``, ``tls``,
        ``username``, ``password``) are accepted but not used by this
        constructor; it only creates the client and registers the per-topic
        callbacks.

        :param str hostname: url for MQTT client to connect to.
        :param int port: port to be used for MQTT connection.
        :param bool tls: Use SSL/TLS for secure connection.
        :param dict topic_kwargs:
            Dictionary containing:
                * topic to monitor
                * condition to start Alerter
                * condition to cancel Alerter
            .. note:: May be replaced with just topic as `str` in future.
            ``None`` (the default) registers no callbacks.
        :param str username: Username for MQTT client authentication.
        :param str password: Password for MQTT client authentication.
        :param args: Positional arguments forwarded to :class:`Watcher`.
        :param kwargs: Keyword arguments forwarded to :class:`Watcher`.
        """
        super().__init__(*args, **kwargs)
        self.topic_kwargs = topic_kwargs
        # NOTE(review): `paho` is not imported in the visible part of this
        # module; presumably `import paho.mqtt.client as paho` - confirm.
        self._client = paho.Client()
        self.status = None
        # Tolerate the default topic_kwargs=None instead of failing on iteration.
        for topic in topic_kwargs or ():
            # Register the bound method so the callback can reach this
            # instance. Assumes the paho-mqtt client API, where the
            # registration method is named message_callback_add.
            self._client.message_callback_add(topic, self.listener_callback)

    def listener_callback(self, client, userdata, msg):
        """
        Callback to set status when msg received on monitored topic.

        Stores the payload as text in ``self.status`` (``'ERR'`` when it is
        not valid UTF-8), then asks the reminder to evaluate its condition.

        :param client: Required by callback signature.
        :param userdata: Required by callback signature.
        :param msg: Message received on topic that generated this callback.
        """
        # This will be called by configured topics
        payload = msg.payload
        try:
            self.status = payload if isinstance(payload, str) else payload.decode('utf8')
        except UnicodeDecodeError:
            self.status = 'ERR'
        # Trigger condition evaluation on callback
        self.reminder.check()

    def update(self):
        """Return status for Reminder evaluation."""
        return self.status
class NullWatcher(Watcher):
    """Empty watcher for timed reminders"""

    def __init__(self, *args, **kwargs):
        """
        Create NullWatcher object.

        :param args: Positional arguments forwarded to :class:`Watcher`.
        :param kwargs: Keyword arguments forwarded to :class:`Watcher`.
        """
        super().__init__(*args, **kwargs)
        # HACK: overwrite the reminder's condition with the literal string
        # 'True' to force evaluation to always be True.
        self.reminder.condition = 'True'

    def update(self):
        """
        Return status for Reminder evaluation.

        There is no resource to monitor, so the status is always ``None``.
        Defined so that this concrete watcher does not fall through to the
        base implementation, which raises ``NotImplementedError``.
        """
        return None