家里卫生间是暗卫,通风全靠抽风机,洗完澡都得开着一、两个小时。但通常晚上睡觉去了,往往得开着一宿,有点浪费电。很久很久之前肉痛地换了个aqara墙壁开关,想做个自动化解决这一痛点,但发现洗澡场景不好判断,定点的自动化又有点低端,计划就搁浅了。后来在瀚思彼岸看到过一个定时器的帖子,试用下来还算可以,考量着就先这么控制吧。考虑到这个定时器是基于HA自动化做的,限制较多,最明显就是不支持多任务以及配置麻烦,于是自己折腾做了个Appdaemon版本的。为了优(zeng)雅(jia)调(nan)度(du),采用以前在公众号上看过的一篇文章提及的环形队列法。
0. 功能说明
- 支持多个定时任务
- 可定义开/关操作
- 自动加载设备列表,无需额外配置
- 采用看起来很厉害的环形定时队列
- 可记忆设备上次设置的定时时间
1.环境
- HA 0.74.0
- Appdaemon 3.0.1(安装参考教程)
2.过程
2.1Home Assistant部分
2.1.1配置项
- common_timer.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55#Packages配置方式
#{HA配置目录}/packages/common_timer.yaml
homeassistant:
customize:
input_select.domain:
icon: mdi:format-list-bulleted-type
input_select.entity:
icon: mdi:format-list-checkbox
input_select.opera:
icon: mdi:nintendo-switch
input_text.common_timer:
friendly_name: 延迟时间
icon: mdi:timer-sand
input_boolean.timer_button:
icon: mdi:sync
group:
common_timer:
name: 通用定时器
entities:
- input_select.domain
- input_select.entity
- input_select.opera
- input_text.common_timer
- input_boolean.timer_button
input_text:
common_timer:
name: common_timer
initial: 00:00:00
pattern: '([01][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]'
input_select:
domain:
name: 设备类型
options:
- 请选择设备类型
initial: '请选择设备类型'
entity:
name: 设备名称
options:
- 请选择设备
initial: '请选择设备'
opera:
name: 操作
options:
- 开
- 关
initial: '关'
input_boolean:
timer_button:
name: '启用/暂停'
initial: off
icon: mdi:switch配置说明
增加5个entity:3个input_select用于选择设备类型、设备、操作类型;1个input_text用于输入定时;1个input_boolean用于开关按钮。
2.2Appdaemon部分
2.2.1配置项
- apps.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17#{appdaemon配置目录}/apps/apps.yaml
timer:
module: common_timer
class: Timer
domains: #设置可控制的设备类型
- light
- switch
- script
- automation
exclude: #排除设备
- light.xxx
should_zh: False #是否排除friendly_name非中文设备
input_domain: 'input_select.domain' #设备类型下拉框
input_entity: 'input_select.entity' #设备下拉框
input_opera: 'input_select.opera' #操作类型下拉框
input_duration: 'input_text.common_timer' #延时时间文本框
switch: 'input_boolean.timer_button' #控制按钮domains
目前只设置开(turn_on)和关(turn_off)方法,只有HA注册了对应的服务才能执行,目前light、switch、script、automation几类都有。
entity配置
新增的5个entity(下拉框、文本框、按钮)的id设置与2.1章节的HA配置一致即可,可自行定义。
2.2.2代码
common_timer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173#{appdaemon配置目录}/apps/common_timer.py
import appdaemon.plugins.hass.hassapi as hass
from datetime import datetime,timedelta
import DelayQueue
import re
class Timer(hass.Hass):
def initialize(self):
self._entities = {}
try:
self._input_domain = self.args['input_domain']
self._input_entity = self.args['input_entity']
self._input_opera = self.args['input_opera']
self._input_duration = self.args['input_duration']
self._switch = self.args['switch']
except KeyError as e:
self.log("初始化失败,apps.yaml中需要定义input_domain、input_entity、input_opera、input_duration、switch")
self._queue = DelayQueue.DelayQueue(60, self) #环形延时队列
self._dic_friendly_name = {} #存储friendly_name与entity_id的单向映射
self._dic_opera = {'turn_on':'开','turn_off':'关','开':'turn_on','关':'turn_off'} #存储opera与中文名的双向映射
self.register_constraint('hasTask') #判断是否有延时任务
self._domains = self.args.get('domains', ['light', 'switch']) #设置设备类型列表
self._exclude = self.args.get('exclude', []) #排除指定设备
self._should_zh = self.args.get('should_zh', True) #排除friendly_name非中文设备,默认是
self._exclude.append(self._switch) #忽略控制开关
#设置可配置延时的设备(存在friendly_name信息)
entites = self.get_state()
zhPattern = re.compile(u'[\u4e00-\u9fa5]+')
for entity in entites:
domain = entity.split('.')[0]
if entity in self._exclude or domain in self._exclude or domain not in self._domains:
pass
else:
friendly_name = entites[entity].get('attributes').get('friendly_name', None)
if friendly_name is None:
pass
elif not self._should_zh or zhPattern.search(friendly_name):
self._dic_friendly_name.setdefault(friendly_name, entity)
self._entities.setdefault(domain,{}).setdefault(entity,{}).setdefault('friendly_name', friendly_name)
self._entities[domain][entity]['id'] = entity
self._entities[domain][entity]['duration'] = '0:00:00'
self._entities[domain][entity]['remaining'] = '0:00:00'
self._entities[domain][entity]['handle'] = None
self._entities[domain][entity]['opera'] = 'turn_on' if domain == 'autonmation' or domain == 'script' else 'turn_off' #设置默认操作
else:
self.log("{}({}):无中文,忽略".format(friendly_name, entity))
options= list(self._entities.keys())
options.insert(0,'请选择设备类型')
#初始化下拉框
self.call_service('input_select/set_options', entity_id = self._input_domain, options = options)
self.call_service('input_select/set_options', entity_id = self._input_entity, options = '请选择设备')
self.select_option(self._input_domain, '请选择设备类型')
self.select_option(self._input_entity, '请选择设备')
#下拉框改变联动
self.listen_state(self.choose_entity, self._input_entity)
self.listen_state(self.choose_domain, self._input_domain)
# self.listen_state(self.choose_opera, self._input_opera) #更改选项保存,否则只在开始执行脚本才保存
#开关触发
self.listen_state(self.switch, self._switch)
#环形延时队列处理以及刷新倒计时
handle = self.run_every(self.queue_task, datetime.now(), 1, hasTask=1)
def queue_task(self, kwargs):
#环形延时队列处理事务
self._queue.read()
#刷新倒计时
if self.get_state(self._switch) == 'on' and self.get_state(self._input_entity) != '请选择设备':
domain = self.get_state(self._input_domain)
entity = self._dic_friendly_name.get(self.get_state(self._input_entity), None)
if entity is None:
self.log("Function task: friendly_name not found in dic !")
return
remaining_time = self._queue.get_remaining_time(self._entities[domain][entity]['handle'])
if remaining_time is None:
remaining_time = self._entities[domain][entity]['remaining']
if remaining_time == '0:00:00':
self.set_state(self._input_duration, state = self._entities[domain][entity]['duration'])
else:
self.set_state(self._input_duration, state = remaining_time)
self.set_state(self._switch, state = 'off')
else:
self.set_state(self._input_duration, state = remaining_time)
def choose_domain(self, entity, attribute, old, new, kwargs):
if new == '请选择设备类型':
options = '请选择设备'
else:
domain = new.split('.')[0]
options = [self._entities[domain][entity]['friendly_name'] for entity in self._entities[domain] if self._entities[domain][entity]['friendly_name'] is not None]
# self.log('加载设备列表:{}'.format(options))
options.insert(0,'请选择设备')
self.call_service('input_select/set_options', entity_id = self._input_entity, options = options)
self.select_option(self._input_entity, '请选择设备')
def choose_entity(self, entity, attribute, old, new, kwargs):
if new == '请选择设备':
self.set_state(self._input_duration, state= '0:00:00')
self.set_state(self._switch, state = 'off')
else: #加载entity的倒计时信息:先取remaining,否则取duration
entity = self._dic_friendly_name.get(new, None)
if entity is None:
self.log("Function choose_entity: friendly_name not found in dic !")
return
domain = self.get_state(self._input_domain)
remaining_time = self._queue.get_remaining_time(self._entities[domain][entity]['handle'])
if remaining_time is not None:
duration = remaining_time
self.set_state(self._input_duration, state= duration)
self.set_state(self._switch, state = 'on')
else:
duration = self._entities[domain][entity]['remaining'] if self._entities[domain][entity]['remaining'] != '0:00:00' else self._entities[domain][entity]['duration']
self.set_state(self._input_duration, state= duration)
self.set_state(self._switch, state = 'off')
# self.log("entity:{},opera:{}".format(entity,self._entities[domain][entity]['opera']))
self.select_option(self._input_opera, self._dic_opera.get(self._entities[domain][entity]['opera']))
def choose_opera(self, entity, attribute, old, new, kwargs):
entity = self._dic_friendly_name.get(self.get_state(self._input_entity), None)
domain = entity.split('.')[0]
if domain is not None and self.get_state(self._switch) == 'off': #当前不执行任务才更新opera
self._entities[domain][entity]['opera'] = self._dic_opera.get(new)
def switch(self, entity, attribute, old, new, kwargs):
domain = self.get_state(self._input_domain)
if domain != '请选择设备类型':
entity = self._dic_friendly_name.get(self.get_state(self._input_entity), None)
if entity is None:
self.log("Function switch: friendly_name not found in dic !")
self.set_state(self._switch, state = 'off')
return
if entity != '请选择设备':
duration = self.get_state(self._input_duration)
if duration == '0:00:00':
return
if new == 'on': #开操作
if self._entities[domain][entity]['handle'] is None:
if self._entities[domain][entity]['remaining'] != duration: #倒计时剩余时间有修改,则重新设置倒计时时间
self._entities[domain][entity]['duration'] = duration
opera = self._dic_opera.get(self.get_state(self._input_opera))
self._entities[domain][entity]['handle'] = self._queue.insert(entity, duration, self.handle_task, opera = opera) #队列增加任务
self._entities[domain][entity]['opera'] = opera #保存操作选项
else: #关操作
self._queue.remove(self._entities[domain][entity]['handle'])
self._entities[domain][entity]['handle'] = None
self._entities[domain][entity]['remaining'] = duration #记录倒计时剩余时间
else:
self.log("未选设备")
self.set_state(self._switch, state = 'off')
else:
self.log("未选设备类型")
self.set_state(self._switch, state = 'off')
#回调处理
def handle_task(self, entity, opera, **kwargs):
domain = entity.split('.')[0]
self._entities[domain][entity]['handle'] = None
self._entities[domain][entity]['remaining'] = '0:00:00' #倒计时剩余时间置零
if opera == 'custom':
#自定义处理方法
pass
else:
service_name = domain+'/'+opera
self.call_service(service_name, entity_id=entity)
self.log("handle_task:{}({})".format(service_name,entity))
def hasTask(self, value):
return True #注释掉的话,会根据是否有定时任务再启动队列处理,感觉然并卵所以还没测试
for domain in self._entities:
for entity in self._entities[domain]:
if self._entities[domain][entity]['handle'] is not None:
return True
return FalseTips
大部分代码都是实现界面交互。
DelayQueue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130#{appdaemon配置目录}/apps/DelayQueue.py
from datetime import datetime,timedelta
class DelayQueue(object):
__current_slot = 1
def __init__(self, slots_per_loop, app, **kwargs):
self.__slots_per_loop = slots_per_loop
self.__queue = [[] for i in range(slots_per_loop)]
self.app = app
def insert(self, task_id, duration, callback, opera = 'turn_off', **kwargs):
if duration == "0:00:00":
return None
second = time_period_str(duration).total_seconds()
loop = second / len(self.__queue)
slot = (second + self.__current_slot - 1) % len(self.__queue)
delayQueueTask = DelayQueueTask(task_id, opera, int(slot), loop, callback, kwargs = kwargs)
self.__queue[delayQueueTask.slot].append(delayQueueTask)
self.app.log("create task:{}/{}".format(delayQueueTask.slot, delayQueueTask.loop))
return delayQueueTask
def remove(self, delayQueueTask):
# self.app.log("remove task in slot {}".format(delayQueueTask.slot))
if delayQueueTask is not None:
self.__queue[delayQueueTask.slot].remove(delayQueueTask)
def get_remaining_time(self, delayQueueTask):
if delayQueueTask:
if self.__current_slot - 1 > delayQueueTask.slot and self.__current_slot - 1 < 60:
second = self.__slots_per_loop * (delayQueueTask.loop + 1) + delayQueueTask.slot - (self.__current_slot - 1)
else:
second = self.__slots_per_loop * delayQueueTask.loop + delayQueueTask.slot - (self.__current_slot - 1)
return str(timedelta(seconds = second))
else:
return None
def read(self):
if len(self.__queue) >= self.__current_slot:
tasks = self.__queue[self.__current_slot - 1]
# self.app.log("current slot:{}(has {} tasks)".format(self.__current_slot - 1,len(tasks)))
if tasks:
executed_task = []
for task in tasks:
# self.app.log("task info:{}/{},should_execute:{}".format(task.slot, task.loop, task.should_exec))
if task.should_exec:
# run
task.exec_task()
executed_task.append(task)
else:
task.nextLoop()
for task in executed_task:
tasks.remove(task)
self.__current_slot += 1
if self.__current_slot > len(self.__queue):
self.__current_slot = 1
class DelayQueueTask(object):
def __init__(self, task_id, opera:str = 'turn_off', slot:int = 0 , loop:int = 0 , exec_task = None, **kwargs):
self._task_id = task_id
self._opera = opera
self._slot = int(slot)
self._loop = int(loop)
self._exec_task = exec_task
self._kwargs = kwargs
def slot(self) -> int:
return int(self._slot)
def loop(self) -> int:
return int(self._loop)
def task_id(self):
return self._task_id
def opera(self):
return self._opera
def nextLoop(self):
self._loop -= 1
def should_exec(self) -> bool:
if self._loop == 0:
return True
else:
return False
# @callback
def exec_task(self):
self._exec_task(self._task_id, self._opera, kwargs = self._kwargs)
#copy from HA
def time_period_str(value: str) -> timedelta:
"""Validate and transform time offset."""
if isinstance(value, int):
raise vol.Invalid('Make sure you wrap time values in quotes')
elif not isinstance(value, str):
raise vol.Invalid(TIME_PERIOD_ERROR.format(value))
negative_offset = False
if value.startswith('-'):
negative_offset = True
value = value[1:]
elif value.startswith('+'):
value = value[1:]
try:
parsed = [int(x) for x in value.split(':')]
except ValueError:
raise vol.Invalid(TIME_PERIOD_ERROR.format(value))
if len(parsed) == 2:
hour, minute = parsed
second = 0
elif len(parsed) == 3:
hour, minute, second = parsed
else:
raise vol.Invalid(TIME_PERIOD_ERROR.format(value))
offset = timedelta(hours=hour, minutes=minute, seconds=second)
if negative_offset:
offset *= -1
return offsetTips
需要另外一个定时器方法进行周期性调用才能工作,用了Appdaemon的run_every()方法。