[Plugin] A General-Purpose Timer Based on AppDaemon

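The bathroom at home has no windows, so ventilation depends entirely on the exhaust fan, which has to run for an hour or two after every shower. I usually go to bed in the meantime, so the fan ends up running all night, which wastes electricity. A long, long time ago I painfully paid for an Aqara wall switch, intending to solve this with an automation, but the "someone has just showered" scenario turned out to be hard to detect and a fixed-time automation felt too crude, so the plan was shelved. Later I came across a timer post on the 瀚思彼岸 (Hassbian) forum. It worked reasonably well in testing, so I decided to control the fan that way for the time being. Because that timer is built on HA automations, it has quite a few limitations, most obviously no support for multiple tasks and tedious configuration, so I tinkered together an AppDaemon version. To make the scheduling more elegant (read: more difficult), it uses the ring-queue method described in an article I once read on a WeChat official account.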


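0. Features

  • Supports multiple timer tasks
  • The action (turn on / turn off) is configurable
  • Loads the device list automatically, with no extra configuration
  • Uses an impressive-looking ring-shaped timer queue
  • Remembers the duration last set for each device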

1. Environment


2. Procedure

2.1 Home Assistant

2.1.1 Configuration

  • common_timer.yaml
    # Packages-style configuration
    # {HA config directory}/packages/common_timer.yaml
    # Note: the option strings of the input_select entities are matched
    # literally by common_timer.py, so they are kept in Chinese here.
    homeassistant:
      customize:
        input_select.domain:
          icon: mdi:format-list-bulleted-type
        input_select.entity:
          icon: mdi:format-list-checkbox
        input_select.opera:
          icon: mdi:nintendo-switch
        input_text.common_timer:
          friendly_name: Delay
          icon: mdi:timer-sand
        input_boolean.timer_button:
          icon: mdi:sync

    group:
      common_timer:
        name: Common Timer
        entities:
          - input_select.domain
          - input_select.entity
          - input_select.opera
          - input_text.common_timer
          - input_boolean.timer_button

    input_text:
      common_timer:
        name: common_timer
        initial: '00:00:00'
        pattern: '([01][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]'

    input_select:
      domain:
        name: Device type
        options:
          - 请选择设备类型  # "Please select a device type"
        initial: '请选择设备类型'
      entity:
        name: Device
        options:
          - 请选择设备  # "Please select a device"
        initial: '请选择设备'
      opera:
        name: Operation
        options:
          - '开'  # on  (inferred from _dic_opera in common_timer.py)
          - '关'  # off (inferred from _dic_opera in common_timer.py)
        initial: '关'

    input_boolean:
      timer_button:
        name: 'Start/Pause'
        initial: off
        icon: mdi:switch

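INFO: This adds three input_select entities for choosing the device type, the device and the operation; one input_text for entering the delay; and one input_boolean that serves as the start/pause button.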
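The delay is entered as text, so it can help to see what the `pattern` above accepts. The following is a minimal sketch, not part of the plugin: `parse_duration` is a hypothetical helper that applies the same regular expression and converts a valid value into a `timedelta`.

```python
import re
from datetime import timedelta

# Same regular expression as the `pattern` of input_text.common_timer.
DURATION_PATTERN = re.compile(r'([01][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]')


def parse_duration(text):
    """Return the delay as a timedelta, or None if text is not HH:MM:SS."""
    # fullmatch() requires the whole string to match, not just a prefix.
    if not DURATION_PATTERN.fullmatch(text):
        return None
    hours, minutes, seconds = (int(part) for part in text.split(':'))
    return timedelta(hours=hours, minutes=minutes, seconds=seconds)


print(parse_duration('01:30:00'))  # 1:30:00
print(parse_duration('1:30:00'))   # None (the hour needs two digits)
```

Note that `str(timedelta)` drops the leading zero of the hour, which is why the app itself works with strings such as `0:00:00` when it writes the countdown back.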

2.2 AppDaemon

2.2.1 Configuration

  • apps.yaml
    # {AppDaemon config directory}/apps/apps.yaml
    timer:
      module: common_timer
      class: Timer
      domains:  # device types that can be controlled
        - light
        - switch
        - script
        - automation
      exclude:  # entities to exclude
        - light.xxx
      should_zh: False  # whether to skip devices whose friendly_name contains no Chinese
      input_domain: 'input_select.domain'  # device-type dropdown
      input_entity: 'input_select.entity'  # device dropdown
      input_opera: 'input_select.opera'  # operation dropdown
      input_duration: 'input_text.common_timer'  # delay text box
      switch: 'input_boolean.timer_button'  # control button

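WARN: Only the on (turn_on) and off (turn_off) operations are implemented, and they work only for domains where HA has registered the corresponding services. light, switch, script and automation all have them.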

INFO: The IDs of the dropdowns, the text box and the button only need to match the HA configuration; you can name them as you like.
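To illustrate the WARN note: the app turns the label chosen in the operation dropdown into a service name of the form `domain/service`. The sketch below is a simplified model of that step. `build_service_name` and the entity `switch.bathroom_fan` are hypothetical names used only for illustration.

```python
# Label -> service mapping, mirroring half of _dic_opera in common_timer.py.
OPERA_TO_SERVICE = {'开': 'turn_on', '关': 'turn_off'}


def build_service_name(entity_id, opera_label):
    """Return 'domain/service' for an entity and a label from the dropdown."""
    domain = entity_id.split('.')[0]
    service = OPERA_TO_SERVICE[opera_label]  # raises KeyError for an unknown label
    return domain + '/' + service


print(build_service_name('switch.bathroom_fan', '关'))  # switch/turn_off
```

If a domain listed under `domains` has no `turn_on` / `turn_off` service registered in HA, the resulting service call will fail, which is why the list should be limited to domains that provide both.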

2.2.2 Code

  • common_timer.py
    # {AppDaemon config directory}/apps/common_timer.py
    # Note: the strings '请选择设备类型', '请选择设备', '开' and '关' must match the
    # option strings defined in common_timer.yaml, so they are kept in Chinese.
    import appdaemon.plugins.hass.hassapi as hass
    from datetime import datetime, timedelta
    import DelayQueue
    import re


    class Timer(hass.Hass):

        def initialize(self):
            self._entities = {}
            try:
                self._input_domain = self.args['input_domain']
                self._input_entity = self.args['input_entity']
                self._input_opera = self.args['input_opera']
                self._input_duration = self.args['input_duration']
                self._switch = self.args['switch']
            except KeyError:
                self.log("Initialization failed: apps.yaml must define input_domain, input_entity, input_opera, input_duration and switch")
                return  # nothing below can work without these settings
            self._queue = DelayQueue.DelayQueue(60, self)  # ring-shaped delay queue
            self._dic_friendly_name = {}  # one-way mapping: friendly_name -> entity_id
            self._dic_opera = {'turn_on': '开', 'turn_off': '关', '开': 'turn_on', '关': 'turn_off'}  # two-way mapping: service <-> label
            self.register_constraint('hasTask')  # constraint: is there a pending task?
            self._domains = self.args.get('domains', ['light', 'switch'])  # controllable device types
            self._exclude = self.args.get('exclude', [])  # entities to exclude
            self._should_zh = self.args.get('should_zh', True)  # skip devices without Chinese in friendly_name (default: yes)
            self._exclude.append(self._switch)  # ignore the control button itself

            # Collect the devices that can be given a delay (they need a friendly_name)
            entities = self.get_state()
            zhPattern = re.compile(u'[\u4e00-\u9fa5]+')
            for entity in entities:
                domain = entity.split('.')[0]
                if entity in self._exclude or domain in self._exclude or domain not in self._domains:
                    pass
                else:
                    friendly_name = entities[entity].get('attributes').get('friendly_name', None)
                    if friendly_name is None:
                        pass
                    elif not self._should_zh or zhPattern.search(friendly_name):
                        self._dic_friendly_name.setdefault(friendly_name, entity)
                        self._entities.setdefault(domain, {}).setdefault(entity, {}).setdefault('friendly_name', friendly_name)
                        self._entities[domain][entity]['id'] = entity
                        self._entities[domain][entity]['duration'] = '0:00:00'
                        self._entities[domain][entity]['remaining'] = '0:00:00'
                        self._entities[domain][entity]['handle'] = None
                        # Default operation: turn_on for automations and scripts, turn_off otherwise
                        self._entities[domain][entity]['opera'] = 'turn_on' if domain == 'automation' or domain == 'script' else 'turn_off'
                    else:
                        self.log("{}({}): no Chinese in friendly_name, ignored".format(friendly_name, entity))

            options = list(self._entities.keys())
            options.insert(0, '请选择设备类型')
            # Initialise the dropdowns (options must be a list)
            self.call_service('input_select/set_options', entity_id=self._input_domain, options=options)
            self.call_service('input_select/set_options', entity_id=self._input_entity, options=['请选择设备'])
            self.select_option(self._input_domain, '请选择设备类型')
            self.select_option(self._input_entity, '请选择设备')
            # Keep the dropdowns in sync when a selection changes
            self.listen_state(self.choose_entity, self._input_entity)
            self.listen_state(self.choose_domain, self._input_domain)
            # self.listen_state(self.choose_opera, self._input_opera)  # save the operation on change; otherwise it is saved only when the timer starts
            # React to the control button
            self.listen_state(self.switch, self._switch)
            # Process the ring queue and refresh the countdown once per second.
            # The start time is set slightly in the future so that it is not
            # already in the past when AppDaemon checks it.
            self.run_every(self.queue_task, datetime.now() + timedelta(seconds=1), 1, hasTask=1)

        def queue_task(self, kwargs):
            # Let the ring queue process the current slot
            self._queue.read()
            # Refresh the countdown shown in the text box
            if self.get_state(self._switch) == 'on' and self.get_state(self._input_entity) != '请选择设备':
                domain = self.get_state(self._input_domain)
                entity = self._dic_friendly_name.get(self.get_state(self._input_entity), None)
                if entity is None:
                    self.log("Function task: friendly_name not found in dic !")
                    return
                remaining_time = self._queue.get_remaining_time(self._entities[domain][entity]['handle'])
                if remaining_time is None:
                    # No task in the queue: show the stored value and switch the button off
                    remaining_time = self._entities[domain][entity]['remaining']
                    if remaining_time == '0:00:00':
                        self.set_state(self._input_duration, state=self._entities[domain][entity]['duration'])
                    else:
                        self.set_state(self._input_duration, state=remaining_time)
                    self.set_state(self._switch, state='off')
                else:
                    self.set_state(self._input_duration, state=remaining_time)

        def choose_domain(self, entity, attribute, old, new, kwargs):
            if new == '请选择设备类型':
                options = ['请选择设备']
            else:
                domain = new.split('.')[0]
                options = [self._entities[domain][entity]['friendly_name'] for entity in self._entities[domain] if self._entities[domain][entity]['friendly_name'] is not None]
                # self.log('Loaded device list: {}'.format(options))
                options.insert(0, '请选择设备')
            self.call_service('input_select/set_options', entity_id=self._input_entity, options=options)
            self.select_option(self._input_entity, '请选择设备')

        def choose_entity(self, entity, attribute, old, new, kwargs):
            if new == '请选择设备':
                self.set_state(self._input_duration, state='0:00:00')
                self.set_state(self._switch, state='off')
            else:  # load the entity's countdown: prefer remaining, fall back to duration
                entity = self._dic_friendly_name.get(new, None)
                if entity is None:
                    self.log("Function choose_entity: friendly_name not found in dic !")
                    return
                domain = self.get_state(self._input_domain)
                remaining_time = self._queue.get_remaining_time(self._entities[domain][entity]['handle'])
                if remaining_time is not None:
                    duration = remaining_time
                    self.set_state(self._input_duration, state=duration)
                    self.set_state(self._switch, state='on')
                else:
                    duration = self._entities[domain][entity]['remaining'] if self._entities[domain][entity]['remaining'] != '0:00:00' else self._entities[domain][entity]['duration']
                    self.set_state(self._input_duration, state=duration)
                    self.set_state(self._switch, state='off')
                # self.log("entity:{},opera:{}".format(entity, self._entities[domain][entity]['opera']))
                self.select_option(self._input_opera, self._dic_opera.get(self._entities[domain][entity]['opera']))

        def choose_opera(self, entity, attribute, old, new, kwargs):
            entity = self._dic_friendly_name.get(self.get_state(self._input_entity), None)
            if entity is None:  # no device selected yet
                return
            domain = entity.split('.')[0]
            if self.get_state(self._switch) == 'off':  # update the operation only while no task is running
                self._entities[domain][entity]['opera'] = self._dic_opera.get(new)

        def switch(self, entity, attribute, old, new, kwargs):
            domain = self.get_state(self._input_domain)
            if domain != '请选择设备类型':
                entity = self._dic_friendly_name.get(self.get_state(self._input_entity), None)
                if entity is None:
                    self.log("Function switch: friendly_name not found in dic !")
                    self.set_state(self._switch, state='off')
                    return
                if entity != '请选择设备':
                    duration = self.get_state(self._input_duration)
                    if duration == '0:00:00':
                        return
                    if new == 'on':  # button switched on: start the timer
                        if self._entities[domain][entity]['handle'] is None:
                            if self._entities[domain][entity]['remaining'] != duration:  # the remaining time was edited, so store it as the new duration
                                self._entities[domain][entity]['duration'] = duration
                            opera = self._dic_opera.get(self.get_state(self._input_opera))
                            self._entities[domain][entity]['handle'] = self._queue.insert(entity, duration, self.handle_task, opera=opera)  # add the task to the queue
                            self._entities[domain][entity]['opera'] = opera  # remember the chosen operation
                    else:  # button switched off: pause the timer
                        self._queue.remove(self._entities[domain][entity]['handle'])
                        self._entities[domain][entity]['handle'] = None
                        self._entities[domain][entity]['remaining'] = duration  # remember the remaining time
                else:
                    self.log("No device selected")
                    self.set_state(self._switch, state='off')
            else:
                self.log("No device type selected")
                self.set_state(self._switch, state='off')

        # Callback invoked by the queue when a task is due
        def handle_task(self, entity, opera, **kwargs):
            domain = entity.split('.')[0]
            self._entities[domain][entity]['handle'] = None
            self._entities[domain][entity]['remaining'] = '0:00:00'  # reset the remaining time
            if opera == 'custom':
                # custom handling goes here
                pass
            else:
                service_name = domain + '/' + opera
                self.call_service(service_name, entity_id=entity)
                self.log("handle_task:{}({})".format(service_name, entity))

        def hasTask(self, value):
            return True  # remove this line to process the queue only while a task is pending; it seemed pointless, so it is untested
            for domain in self._entities:
                for entity in self._entities[domain]:
                    if self._entities[domain][entity]['handle'] is not None:
                        return True
            return False

DEBUG: Most of the code implements the UI interaction.

  • DelayQueue.py
    # {AppDaemon config directory}/apps/DelayQueue.py
    from datetime import timedelta

    # Message used by time_period_str() below for malformed input
    TIME_PERIOD_ERROR = "offset {} should be format 'HH:MM' or 'HH:MM:SS'"


    class DelayQueue(object):

        def __init__(self, slots_per_loop, app, **kwargs):
            self.__slots_per_loop = slots_per_loop
            self.__queue = [[] for _ in range(slots_per_loop)]
            self.__current_slot = 1  # 1-based; the slot read next has index __current_slot - 1
            self.app = app

        def insert(self, task_id, duration, callback, opera='turn_off', **kwargs):
            second = int(time_period_str(duration).total_seconds())
            if second <= 0:  # covers both "0:00:00" and "00:00:00"
                return None
            loop = second // len(self.__queue)  # full turns of the ring to wait
            slot = (second + self.__current_slot - 1) % len(self.__queue)  # slot in which the task fires
            delayQueueTask = DelayQueueTask(task_id, opera, slot, loop, callback, kwargs=kwargs)
            self.__queue[delayQueueTask.slot].append(delayQueueTask)
            self.app.log("create task:{}/{}".format(delayQueueTask.slot, delayQueueTask.loop))
            return delayQueueTask

        def remove(self, delayQueueTask):
            # self.app.log("remove task in slot {}".format(delayQueueTask.slot))
            if delayQueueTask is not None and delayQueueTask in self.__queue[delayQueueTask.slot]:
                self.__queue[delayQueueTask.slot].remove(delayQueueTask)

        def get_remaining_time(self, delayQueueTask):
            if delayQueueTask:
                current = self.__current_slot - 1
                if current > delayQueueTask.slot:
                    # the pointer has already passed the task's slot in this turn
                    second = self.__slots_per_loop * (delayQueueTask.loop + 1) + delayQueueTask.slot - current
                else:
                    second = self.__slots_per_loop * delayQueueTask.loop + delayQueueTask.slot - current
                return str(timedelta(seconds=second))
            else:
                return None

        def read(self):
            if len(self.__queue) >= self.__current_slot:
                tasks = self.__queue[self.__current_slot - 1]
                # self.app.log("current slot:{}(has {} tasks)".format(self.__current_slot - 1, len(tasks)))
                if tasks:
                    executed_task = []
                    for task in tasks:
                        # self.app.log("task info:{}/{},should_execute:{}".format(task.slot, task.loop, task.should_exec))
                        if task.should_exec:
                            # run the task now
                            task.exec_task()
                            executed_task.append(task)
                        else:
                            # not due yet: one turn fewer to wait
                            task.nextLoop()
                    for task in executed_task:
                        tasks.remove(task)
                self.__current_slot += 1
                if self.__current_slot > len(self.__queue):
                    self.__current_slot = 1  # wrap around to the first slot


    class DelayQueueTask(object):
        def __init__(self, task_id, opera: str = 'turn_off', slot: int = 0, loop: int = 0, exec_task=None, **kwargs):
            self._task_id = task_id
            self._opera = opera
            self._slot = int(slot)
            self._loop = int(loop)
            self._exec_task = exec_task
            self._kwargs = kwargs

        @property
        def slot(self) -> int:
            return int(self._slot)

        @property
        def loop(self) -> int:
            return int(self._loop)

        @property
        def task_id(self):
            return self._task_id

        @property
        def opera(self):
            return self._opera

        def nextLoop(self):
            self._loop -= 1

        @property
        def should_exec(self) -> bool:
            return self._loop == 0

        # @callback
        def exec_task(self):
            self._exec_task(self._task_id, self._opera, kwargs=self._kwargs)


    # Copied from HA; vol.Invalid is replaced by ValueError so that the module
    # does not depend on voluptuous.
    def time_period_str(value: str) -> timedelta:
        """Validate and transform time offset."""
        if isinstance(value, int):
            raise ValueError('Make sure you wrap time values in quotes')
        elif not isinstance(value, str):
            raise ValueError(TIME_PERIOD_ERROR.format(value))

        negative_offset = False
        if value.startswith('-'):
            negative_offset = True
            value = value[1:]
        elif value.startswith('+'):
            value = value[1:]

        try:
            parsed = [int(x) for x in value.split(':')]
        except ValueError:
            raise ValueError(TIME_PERIOD_ERROR.format(value))

        if len(parsed) == 2:
            hour, minute = parsed
            second = 0
        elif len(parsed) == 3:
            hour, minute, second = parsed
        else:
            raise ValueError(TIME_PERIOD_ERROR.format(value))

        offset = timedelta(hours=hour, minutes=minute, seconds=second)

        if negative_offset:
            offset *= -1

        return offset

DEBUG: The queue only works if another timer calls it periodically; AppDaemon's run_every() method is used for that.
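The arithmetic behind the ring queue can be checked without AppDaemon. The sketch below is a simplified, stand-alone model of what `DelayQueue.insert()` and `DelayQueue.read()` do, assuming 60 slots and one tick per second. `place` and `ticks_until_fire` are hypothetical helper names, and the `while` loop stands in for the periodic `run_every()` call.

```python
SLOTS = 60  # one slot per second, as in DelayQueue(60, self)


def place(delay_seconds, current_index):
    """Return (slot, loop): the slot that holds the task and the number of
    full turns of the ring to wait before it fires."""
    slot = (delay_seconds + current_index) % SLOTS
    loop = delay_seconds // SLOTS
    return slot, loop


def ticks_until_fire(delay_seconds, current_index):
    """Simulate the once-per-second tick and count ticks until the task fires."""
    slot, loop = place(delay_seconds, current_index)
    index, ticks = current_index, 0
    while True:
        ticks += 1                      # one periodic call, reading slot `index`
        if index == slot:
            if loop == 0:
                return ticks            # task is due in this turn
            loop -= 1                   # not due yet: wait one more turn
        index = (index + 1) % SLOTS     # advance the pointer, wrapping around


print(place(70, 50))             # (0, 1)
print(ticks_until_fire(70, 50))  # 71
```

In this model a task with a delay of d seconds fires on tick d + 1, because the first tick reads the slot the pointer is currently on. Each tick touches only one slot, so the work per tick does not grow with the number of pending tasks in other slots.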