# Source code for advutils.eventqueue
"""
This module defines event classes for queueing tasks.
"""
from __future__ import print_function
from builtins import object
class Event(object):
    """
    Declaration of an event that objects can emit.

    An ``Event`` follows the descriptor protocol: looking it up without an
    instance gives back the ``Event`` itself, while looking it up through an
    instance gives an ``EventHandler`` tying the event to that instance.
    """

    def __init__(self, doc=None):
        """
        :param doc: optional text stored as this event's ``__doc__``.
        """
        self.__doc__ = doc

    def sender(self, obj, objtype=None):
        """
        Bind this event to the object that emits it.

        :param obj: emitting object, or None when there is no instance.
        :param objtype: owner class passed by the descriptor protocol; it is
            accepted but not used.
        :return: this event when *obj* is None, otherwise a new
            ``EventHandler`` built from this event and *obj*.
        """
        return self if obj is None else EventHandler(self, obj)

    def __set__(self, obj, value):
        """
        Discard any assignment made to the event attribute.

        Augmented assignment such as ``obj.event += func`` ends with an
        assignment that is routed here; doing nothing keeps the descriptor in
        place (presumably the reason this method exists).
        """

    # Attribute access uses exactly the same binding logic as sender().
    __get__ = sender
class EventHandler(object):
    """
    Bind an event to the object that emits it and manage its callbacks.

    Callbacks are stored on the sender itself, inside a ``__eventhandler__``
    dictionary keyed by event, so every handler created for the same
    (event, sender) pair works on the same callback list.
    """

    def __init__(self, event, sender):
        """
        :param event: key identifying the event; it is used as a dictionary
            key, so it must be hashable.
        :param sender: object emitting the event; a ``__eventhandler__``
            attribute is created on it the first time callbacks are needed.
        """
        self.event = event
        self.sender = sender

    def _getfunctionlist(self):
        """(internal use) Return the live callback list for this event.

        The per-sender registry and the per-event list are created on first
        access.
        """
        try:
            registry = self.sender.__eventhandler__
        except AttributeError:
            # First event used on this sender: attach an empty registry.
            registry = self.sender.__eventhandler__ = {}
        return registry.setdefault(self.event, [])

    def append(self, func):
        """Add new event handler function.

        Event handler function must be defined like func(sender, earg).
        You can add handler also by using '+=' operator.

        :param func: callable to register; registering it twice makes it run
            twice per fire.
        :return: this handler, so that '+=' rebinds to the same object.
        """
        self._getfunctionlist().append(func)
        return self

    def remove(self, func):
        """Remove existing event handler function.

        You can remove handler also by using '-=' operator.

        :param func: callable to unregister; only its first registration is
            removed.
        :return: this handler, so that '-=' rebinds to the same object.
        :raises ValueError: if *func* is not currently registered.
        """
        self._getfunctionlist().remove(func)
        return self

    def fire(self, *args, **kwargs):
        """
        Fire event and call all handler functions. You can call EventHandler
        object itself like `self(*args,**kwargs)` instead of
        `self.fire(*args,**kwargs)`.

        Each callback is invoked as ``func(sender, *args, **kwargs)`` in
        registration order. The set of callbacks is fixed when firing starts:
        callbacks added or removed while the event is being fired take effect
        from the next fire.
        """
        # Iterate over a snapshot. Walking the live list would skip the next
        # callback whenever a callback unregisters itself during dispatch.
        for func in list(self._getfunctionlist()):
            func(self.sender, *args, **kwargs)

    __iadd__ = append
    __isub__ = remove
    __call__ = fire
if __name__ == "__main__":
    # Small demonstration: an object exposing an event and two listeners.

    class MockFileWatcher(object):
        """Demo object that announces every change of its source path."""

        # Alternative declaration: ``fileChanged = Event()`` at class level.
        def __init__(self, source_path):
            # Bind the event to this instance explicitly through sender().
            self.fileChanged = Event("this is a event").sender(self)
            self.source_path = source_path

        def changePath(self, source_path):
            """Notify listeners about *source_path*, then store it."""
            # Fire before updating so listeners still see the previous path.
            self.fileChanged(source_path)
            self.source_path = source_path

    def log_file_change(sender, source_path):
        """Print the old and new path; stay silent if the path is unchanged."""
        if sender.source_path == source_path:
            return
        template = "{} >>> Source path '{}' changed to '{}'."
        print(template.format(sender, sender.source_path, source_path))

    def log_file_change2(sender, source_path):
        """Print every path handed to the watcher, changed or not."""
        message = "{} >>> {} inserted!".format(sender, source_path)
        print(message)

    watcher = MockFileWatcher("Toro")

    # log_file_change is registered twice and then removed once, which leaves
    # a single registration behind log_file_change2.
    for callback in (log_file_change2, log_file_change, log_file_change):
        watcher.fileChanged += callback
    watcher.fileChanged -= log_file_change

    # The repeated "foo" only triggers the "inserted" message the second time.
    for new_path in ("foo", "foo", "cabra"):
        watcher.changePath(new_path)