Source code for dynamikontrol.Timer

import time
from datetime import datetime
import threading

[docs]class Timer(object): """General timer class. .. highlight:: python .. code-block:: python from dynamikontrol import Module, Timer import time t1 = Timer() t2 = Timer() module = Module() t1.callback_at(func=module.led.toggle, args=('r',), at='2021-03-02 19:46:30', interval=0.1) t2.callback_after(func=module.led.toggle, args=('g',), after=1, interval=0.1) time.sleep(5) t1.stop() t2.stop() module.disconnect() """ __stop_thread = False def __init__(self): pass
[docs] def callback_after(self, func, args=(), kwargs={}, after=0, interval=None): """Call the callback function after specific time. Args: func (function): Callback function. args (tuple, optional): args. Defaults to ``()``. kwargs (dict, optional): kwargs. Defaults to ``{}``. after (int, optional): Callback delay time in seconds. Defaults to ``0``. interval (int, optional): Callback interval time in seconds. Defaults to ``None``. """ self.__stop_thread = False if interval is None: self.__stop_thread = True def handler(): time.sleep(after) func(*args, **kwargs) next_call = time.time() while True: if self.__stop_thread: break next_call = next_call + interval time.sleep(max(next_call - time.time(), 0)) func(*args, **kwargs) timer_thread = threading.Thread(target=handler) timer_thread.start()
[docs] def callback_at(self, func, args=(), kwargs={}, at=None, interval=None): """Call the callback function at specific time. Args: func (function): Callback function. args (tuple, optional): args. Defaults to ``()``. kwargs (dict, optional): kwargs. Defaults to ``{}``. at (datetime str, optional): Callback time in datetime str. e.g) ``2021-03-04 21:57:30``. Defaults to ``None``. interval ([type], optional): Callback interval time in seconds. Defaults to ``None``. """ after = datetime.strptime(at, '%Y-%m-%d %H:%M:%S').timestamp() - datetime.now().timestamp() if after < 0: return False self.callback_after(func=func, args=args, kwargs=kwargs, after=after, interval=interval)
[docs] def stop(self): """Stop the timer. """ self.__stop_thread = True