Compare commits

..

2 Commits

Author SHA1 Message Date
Gilles MOREL 983e4adee6
Mise en cache des données des capteurs
J'ai ajouté une classe qui permet de stocker et restituer le contenu des données des capteurs pour une minute. Cette classe met automatiquement à jour à la prochaine requête sur les données à l'expiration de la durée de vie du cache.
2020-10-20 17:02:34 +02:00
Gilles MOREL 5cc3b483ed
Récupération de l'état et la puissance des prises 2020-10-20 10:28:22 +02:00
1 changed files with 154 additions and 38 deletions

View File

@ -8,12 +8,13 @@ from urllib import request
from http.cookiejar import CookieJar
from urllib.parse import urlencode
from urllib.error import HTTPError
from time import time
authors = (
'Gilles "Almtesh" Émilien MOREL',
)
name = 'homesfr pour Python 3'
version = '1.3'
version = '1.4'
# Modes utilisables
MODE_OFF = 0
@ -28,6 +29,8 @@ SIREN = 'SIREN' # https://boutique.home.sfr.fr/sirene-interieure (et peut-
REMOTE_CONTROLER = 'REMOTE' # https://boutique.home.sfr.fr/telecommande
KEYPAD_CONTROLER = 'KEYPAD' # https://boutique.home.sfr.fr/clavier-de-commande
PRESENCE_CAMERA_DETECTOR = 'PIR_CAMERA' # https://boutique.home.sfr.fr/camera
TEMPHUM_SENSOR = 'TEMP_HUM' # https://boutique.home.sfr.fr/thermometre
ONOFF_PLUG = 'ON_OFF_PLUG' # https://boutique.home.sfr.fr/prise-commandee-connectee-legrand
base_url = 'https://home.sfr.fr'
@ -48,7 +51,6 @@ sensors_list = '/mysensors'
alerts_path = '/listalert'
# Détection
mode_get_path = '/mysensors'
mode_get_label = 'alarm_mode'
mode_set_path = '/alarmmode'
mode_set_field = 'action' # Name for GET field
@ -56,7 +58,7 @@ mode_off = 'OFF' # Value for off
mode_custom = 'CUSTOM' # Value for custom
mode_on = 'ON' # Value for on
# Cémera
# Caméra
cameras_list = '/homescope/mycams'
camera_snapshot = '/homescope/snapshot?size=4'
camera_snapshot_mac = 'mac'
@ -107,6 +109,11 @@ sensors_temp_text = 'Temperature'
sensors_hum_name = 'name'
sensors_hum_text = 'Humidity'
# Prise connectée (ON_OFF_PLUG)
sensors_oop_stateroot = 'automation'
sensors_oop_state = 'on_off'
sensors_oop_power = 'power_level'
# Fil d'événements
logs_path = '/getlog?page=1&nbparpage=10000' # je pense qu'on récupère tous les événements avec cette valeur
logs_labels = 'LOG'
@ -152,6 +159,98 @@ def get_xml_tree (fp):
class HomeSFR ():
class MySensors ():
'''
Sous-classe gérant la récupération et le stockage temporaire des données issue du fichier https://home.sfr.fr/mysensors
'''
def __init__ (self, __parent__, max_age = 60):
self.__parent__ = __parent__
self.max_age = max_age
self.data = None
self.date = None
def __add__ (self, value):
self.autorefresh ()
return (self.data + value)
def __contains__ (self, key):
self.autorefresh ()
return (key in self.data)
def __eq__ (self, value):
self.autorefresh ()
return (self.data == value)
def __ge__ (self, value):
self.autorefresh ()
return (self.data >= value)
def __getitem__ (self, key):
self.autorefresh ()
return (self.data [key])
def __gt__ (self, value):
self.autorefresh ()
return (self.data > value)
def __hash__ (self):
self.autorefresh ()
return (hash (self.data))
def __iter__ (self):
self.autorefresh ()
return (iter (self.data))
def __le__ (self, value):
self.autorefresh ()
return (self.data <= value)
def __len__ (self):
self.autorefresh ()
return (len (self.data))
def __lt__ (self, value):
self.autorefresh ()
return (self.data < value)
def __mul__ (self, value):
self.autorefresh ()
return (self.data * value)
def __ne__ (self, value):
self.autorefresh ()
return (self.data != value)
def __repr__ (self):
self.autorefresh ()
return (repr (self.data))
def __rmul__ (self, value):
self.autorefresh ()
return (value * self.data)
def count (self, value):
self.autorefresh ()
return (self.data.count (value))
def get_age (self):
if self.date is None:
return (None)
return (time () - self.date)
def refresh (self):
r = base_url + sensors_list
self.data = get_xml_tree (bytes2file (self.__parent__.get_or_autologin (r).read ()))
self.date = time ()
def autorefresh (self):
if self.date is None or self.max_age != 0 and self.get_age () > self.max_age:
self.refresh ()
def get_data (self):
self.autorefresh ()
return (self.data)
def __init__ (self, username = None, password = None, cookies = None, debug = False, autologin = True):
'''
Instancie la classe avec un identifiant et un mot de passe, ou des cookies
@ -186,6 +285,7 @@ class HomeSFR ():
else:
raise TypeError ('Les cookies doivent être de type CookieJar !')
self.opener = request.build_opener (request.HTTPCookieProcessor (self.cookies))
self.mysensors = self.MySensors (self)
def __str__ (self):
'''
@ -239,10 +339,16 @@ class HomeSFR ():
Fais une requête, si une erreur 403 est retourné, tente une authentification automatiquement (si réglé dans le paramètre autologin), sinon lève une exception
'''
try:
if self.DEBUG:
print ('Getting ' + url)
return (self.opener.open (url, data = data))
except HTTPError as e:
if '403' in str (e) and self.autologin:
if self.DEBUG:
print ('Not logged, logging…')
self.login ()
if self.DEBUG:
print ('Getting ' + url)
return (self.opener.open (url, data = data))
else:
raise e
@ -292,11 +398,7 @@ class HomeSFR ():
'''
Retourne le mode de détection
'''
r = base_url + mode_get_path
if self.DEBUG:
print ('Demande ' + r)
a = bytes2file (self.get_or_autologin (r).read ())
b = get_xml_tree (a) [1]
b = self.mysensors [1]
c = b [mode_get_label]
if self.DEBUG:
print ('Mode de détection ' + c)
@ -327,9 +429,7 @@ class HomeSFR ():
'''
Retourne un objet Sensor à partir de l'ID
'''
r = Sensor (id, self.get_or_autologin)
r.refresh ()
return (r)
return (Sensor (id, self.mysensors, self.get_or_autologin))
def get_all_sensors (self):
'''
@ -342,99 +442,101 @@ class HomeSFR ():
class Sensor ():
def __init__ (self, id, get_or_autologin):
def __init__ (self, id, mysensors, get_or_autologin):
self.id = id
self.sensor_dict = None
self.mysensors = mysensors
self.get_or_autologin = get_or_autologin
def refresh (self):
'''
Mets à jour les données du capteur
'''
r = base_url + sensors_list
self.sensor_dict = None
for i in get_xml_tree (bytes2file (self.get_or_autologin (r).read ())) [2]:
if i [0] == sensors_label and i [1] [sensors_label_id] == self.id:
self.sensor_dict = i [2]
break
self.mysensors.refresh ()
def get_raw (self):
'''
Retourne les données brutes du capteur
'''
return (self.sensor_dict)
for i in self.mysensors [2]:
if i [0] == sensors_label and i [1] [sensors_label_id] == self.id:
return (i [2])
def get_attributes (self, lst, key):
for i in lst:
if i [0] == key:
return (i [1])
raise KeyError ('no key ' + key)
def get_value (self, lst, key):
for i in lst:
if i [0] == key:
return (i [2])
raise KeyError ('no value ' + key)
raise KeyError ('no key ' + key)
def get_mac (self):
'''
Retourne l'adresse matérielle du capteur, s'il en a une
'''
return (self.get_value (self.sensor_dict, sensors_mac_field))
return (self.get_value (self.get_raw (), sensors_mac_field))
def get_type (self):
'''
Retourne le type du capteur
Les types sont ceux définis dans les constantes
'''
return (self.get_value (self.sensor_dict, sensors_type_field))
return (self.get_value (self.get_raw (), sensors_type_field))
def get_model (self):
'''
Retourne le modèle du capteur
'''
return (self.get_value (self.sensor_dict, sensors_model_field))
return (self.get_value (self.get_raw (), sensors_model_field))
def get_version (self):
'''
Retourne la version du capteur
'''
return (self.get_value (self.sensor_dict, sensors_version_field))
return (self.get_value (self.get_raw (), sensors_version_field))
def get_name (self):
'''
Retourne le nom du capteur
'''
return (self.get_value (self.sensor_dict, sensors_name_field))
return (self.get_value (self.get_raw (), sensors_name_field))
def get_longname (self):
'''
Retourne un nom long du capteur composé de son type en français et de son nom
'''
return (self.get_value (self.sensor_dict, sensors_longname_field))
return (self.get_value (self.get_raw (), sensors_longname_field))
def get_namegender (self):
'''
Retourne le genre du nom du type de capteur en français
M pour masculin et F pour féminin
'''
return (self.get_value (self.sensor_dict, sensors_namegender_field))
return (self.get_value (self.get_raw (), sensors_namegender_field))
def get_batterylevel (self):
'''
Retourne le niveau de batterie sur 10
Toute autre valeur doit être considérée comme venant d'un capteur n'ayant pas de batterie
'''
return (int (self.get_value (self.sensor_dict, sensors_batterylevel_field)))
return (int (self.get_value (self.get_raw (), sensors_batterylevel_field)))
def get_signal (self):
'''
Retourne le niveau de signal sur 10
Tout autre valeur est pour un capteur connecté par câble
'''
return (int (self.get_value (self.sensor_dict, sensors_signal_field)))
return (int (self.get_value (self.get_raw (), sensors_signal_field)))
def get_status (self):
'''
Retourne True si le capteur est considéré comme opérationnel par le système
'''
return (self.get_value (self.sensor_dict, sensors_status_field) == sensors_status_value_ok)
return (self.get_value (self.get_raw (), sensors_status_field) == sensors_status_value_ok)
def get_camera_snapshot (self):
'''
@ -449,25 +551,25 @@ class Sensor ():
Retourne l'état du mode animaux domestiques
Ce mode réduit la sensibilité du capteur pour éviter des déclanchements d'alarme dus aux animaux
'''
return (self.sensor_dict [camera_get_config_petmode] == '1')
return (self.get_raw () [camera_get_config_petmode] == '1')
def get_camera_recording (self):
'''
Retourne l'état de l'enregistrement vidéo 24/24
'''
return (self.sensor_dict [camera_get_config_recording] == '1')
return (self.get_raw () [camera_get_config_recording] == '1')
def get_camera_privacy (self):
'''
Si cette méthode retourne True, la caméra est paramétrée pour ne pas capture d'image
'''
return (self.sensor_dict [camera_get_config_privacy] == '1')
return (self.get_raw () [camera_get_config_privacy] == '1')
def get_temperature (self):
'''
Retourne la température donnée par le capteur
'''
a = self.get_value (self.sensor_dict, sensors_temphum_root_field)
a = self.get_value (self.get_raw (), sensors_temphum_root_field)
for i in a:
if i [1] [sensors_temp_name] == sensors_temp_text:
return (float (i [2].replace ('°C', '')))
@ -476,7 +578,21 @@ class Sensor ():
'''
Retourne l'humidité donnée par le capteur
'''
a = self.get_value (self.sensor_dict, sensors_temphum_root_field)
a = self.get_value (self.get_raw (), sensors_temphum_root_field)
for i in a:
if i [1] [sensors_hum_name] == sensors_hum_text:
return (int (i [2].replace ('%', '')))
return (int (i [2].replace ('%', '')))
def get_on_off_state (self):
'''
Retourne l'état d'une prise connectée, True sur la prise est fermée
'''
a = self.get_attributes (self.get_raw (), sensors_oop_stateroot)
return (True if a [sensors_oop_state] == '1' else False)
def get_on_off_power (self):
'''
Retourne la puissance active qui traverse la prise, en watts
'''
a = self.get_attributes (self.get_raw (), sensors_oop_stateroot)
return (int (a [sensors_oop_power]))