Sistema de eventos em Python

195

Qual sistema de eventos para Python você usa? Eu já estou ciente do pydispatcher , mas queria saber o que mais pode ser encontrado ou é comumente usado?

Não estou interessado em gerenciadores de eventos que fazem parte de grandes estruturas; prefiro usar uma pequena solução básica que eu possa estender facilmente.

Josip
fonte

Respostas:

178

Pacotes PyPI

Em junho de 2020, esses são os pacotes relacionados ao evento disponíveis no PyPI, ordenados pela data de lançamento mais recente.

Tem mais

É possível escolher entre várias bibliotecas, usando terminologias muito diferentes (eventos, sinais, manipuladores, envio de métodos, ganchos, ...).

Estou tentando manter uma visão geral dos pacotes acima, além das técnicas mencionadas nas respostas aqui.

Primeiro, alguma terminologia ...

Padrão de observador

O estilo mais básico do sistema de eventos é o 'método bag of handler', que é uma implementação simples do padrão Observer .

Basicamente, os métodos do manipulador (chamadas) são armazenados em uma matriz e são chamados quando o evento é acionado.

Publicar-Assinar

A desvantagem dos sistemas de eventos do Observer é que você só pode registrar os manipuladores no objeto Event (ou lista de manipuladores) real. Portanto, no momento da inscrição, o evento já precisa existir.

É por isso que existe o segundo estilo de sistemas de eventos: o padrão de publicação-assinatura . Aqui, os manipuladores não se registram em um objeto de evento (ou lista de manipuladores), mas em um distribuidor central. Os notificadores também conversam apenas com o despachante. O que ouvir ou o que publicar é determinado por 'sinal', que nada mais é do que um nome (string).

Padrão do mediador

Também pode ser interessante: o padrão do Mediador .

Hooks

Um sistema de 'gancho' é usado geralmente no contexto de plugins de aplicativos. O aplicativo contém pontos de integração fixos (ganchos), e cada plug-in pode se conectar a esse gancho e executar determinadas ações.

Outros eventos'

Nota: threading.Event não é um 'sistema de eventos' no sentido acima. É um sistema de sincronização de threads em que um thread aguarda até que outro thread 'sinalize' o objeto Event.

As bibliotecas de mensagens de rede geralmente também usam o termo 'eventos'; às vezes, esses conceitos são semelhantes; às vezes não. É claro que eles podem atravessar fronteiras de processos, processos e computadores. Veja, por exemplo , pyzmq , pymq , Twisted , Tornado , gevent , eventlet .

Referências fracas

No Python, manter uma referência a um método ou objeto garante que ele não seja excluído pelo coletor de lixo. Isso pode ser desejável, mas também pode levar a vazamentos de memória: os manipuladores vinculados nunca são limpos.

Alguns sistemas de eventos usam referências fracas em vez de regulares para resolver isso.

Algumas palavras sobre as várias bibliotecas

Sistemas de eventos no estilo observador:

  • O zope.event mostra o básico de como isso funciona (veja a resposta de Lennart ). Nota: este exemplo nem suporta argumentos do manipulador.
  • A implementação da lista de chamadas do LongPoke mostra que esse sistema de eventos pode ser implementado de forma muito minimalista por subclassificação list.
  • A variação de Felk EventHook também garante as assinaturas de callees e chamadores.
  • O EventHook do spassig (padrão de evento de Michael Foord) é uma implementação direta.
  • A aula de Josip's Valued Lessons Event é basicamente a mesma, mas usa um em setvez de um listpara armazenar a sacola e implementos __call__que são adições razoáveis.
  • O PyNotify é similar em conceito e também fornece conceitos adicionais de variáveis ​​e condições ('evento alterado da variável'). A página inicial não está funcional.
  • axel é basicamente um saco de manipuladores com mais recursos relacionados a rosqueamento, tratamento de erros, ...
  • O python-dispatch exige que as classes de origem pares sejam derivadas pydispatch.Dispatcher.
  • O buslane é baseado em classes, suporta manipuladores únicos ou múltiplos e facilita dicas de tipo extensas.
  • O Observador / Evento do Pithikos é um design leve.

Bibliotecas de publicação / assinatura:

  • O pisca - pisca possui alguns recursos interessantes, como desconexão automática e filtragem com base no remetente.
  • O PyPubSub é um pacote estável e promete "recursos avançados que facilitam a depuração e manutenção de tópicos e mensagens".
  • pymitter é uma porta Python do Node.js EventEmitter2 e oferece namespaces, curingas e TTL.
  • O PyDispatcher parece enfatizar a flexibilidade em relação à publicação muitos-para-muitos etc. Suporta referências fracas.
  • louie é um PyDispatcher reformulado e deve funcionar "em uma ampla variedade de contextos".
  • O pypydispatcher é baseado no PyDispatcher (você adivinhou ...) e também funciona no PyPy.
  • O django.dispatch é um PyDispatcher reescrito "com uma interface mais limitada, mas com maior desempenho".
  • pyeventdispatcher é baseado no event-dispatcher da estrutura PHP do Symfony.
  • O dispatcher foi extraído do django.dispatch, mas está ficando bastante antigo.
  • O EventManger de Cristian Garcia é uma implementação muito curta.

Outras:

  • O pluggy contém um sistema de gancho usado pelos pytestplugins.
  • O RxPy3 implementa o padrão Observável e permite mesclar eventos, repetir etc.
  • Os sinais e slots do Qt estão disponíveis no PyQt ou PySide2 . Eles funcionam como retorno de chamada quando usados ​​no mesmo encadeamento ou como eventos (usando um loop de eventos) entre dois encadeamentos diferentes. Sinais e Slots têm a limitação de que eles só funcionam em objetos de classes derivadas QObject.
florisla
fonte
2
Há também louie, que é baseado no PyDispatcher: pypi.python.org/pypi/Louie/1.1
the979kid 30/08/2015
@ the979kid louie parece estar mal mantido, a página pypi tem links para 404 no GitHub: 11craft.github.io/louie ; github.com/gldnspud/louie . Deve ser github.com/11craft/louie .
Florisla 14/03/16
1
ouvintes de eventos com fraqueza são uma necessidade comum. Caso contrário, o uso no mundo real se torna árduo. Uma observação de quais soluções oferecem suporte que podem ser úteis.
Kxr # 11/17
O Pypubsub 4 é muitos para muitos e possui poderosas ferramentas de depuração para mensagens, além de várias maneiras de restringir a carga útil das mensagens, para que você saiba mais cedo quando enviou dados inválidos ou faltam dados. O PyPubSub 4 suporta o Python 3 (e o PyPubSub 3.x suporta o Python 2).
Oliver
Publiquei recentemente uma biblioteca chamada pymq github.com/thrau/pymq, que pode ser uma boa opção para esta lista.
thrau 14/02
98

Eu tenho feito desta maneira:

class Event(list):
    """Event subscription.

    A list of callable objects. Calling an instance of this will cause a
    call to each item in the list in ascending order by index.

    Example Usage:
    >>> def f(x):
    ...     print 'f(%s)' % x
    >>> def g(x):
    ...     print 'g(%s)' % x
    >>> e = Event()
    >>> e()
    >>> e.append(f)
    >>> e(123)
    f(123)
    >>> e.remove(f)
    >>> e()
    >>> e += (f, g)
    >>> e(10)
    f(10)
    g(10)
    >>> del e[0]
    >>> e(2)
    g(2)

    """
    def __call__(self, *args, **kwargs):
        for f in self:
            f(*args, **kwargs)

    def __repr__(self):
        return "Event(%s)" % list.__repr__(self)

No entanto, como em tudo o que vi, não há pydoc gerado automaticamente para isso, nem assinaturas, o que é realmente péssimo.

L̲̳o̲̳̳n̲̳̳g̲̳̳p̲̳o̲̳̳k̲̳̳e̲̳̳
fonte
3
Acho esse estilo bastante intrigante. É docemente esquecido. Gosto do fato de permitir manipular eventos e seus assinantes como operações autônomas. Vou ver como se sai em um projeto real.
Rudy Lattae
2
Estilo minimalista muito bonito! super!
akaRem
2
Não posso aprovar isso o suficiente, isso é realmente simples e fácil.
2
grande favor, alguém poderia explicar isso como se eu tivesse 10 anos? Essa classe é herdada pela classe principal? Não vejo um init, então super () não seria usado. Não está clicando para mim por algum motivo.
Omgimdrunk 28/09
1
@omgimdrunk Um manipulador de eventos simples dispara uma ou mais funções que podem ser chamadas sempre que um evento é disparado. Uma classe para "gerenciar" isso para você exigiria os seguintes métodos, no mínimo - add & fire. Dentro dessa classe, você precisaria manter uma lista de manipuladores a serem executados. Vamos colocar isso na variável de instância _bag_of_handlersque é uma lista. O método add da classe seria simplesmente self._bag_of_handlers.append(some_callable). O método de disparo da classe passaria por _bag_of_handlers` passando os argumentos e kwargs fornecidos aos manipuladores e executaria cada um em sequência.
Gabe Spradlin
68

Usamos um EventHook como sugerido por Michael Foord em seu Event Pattern :

Basta adicionar EventHooks às suas aulas com:

class MyBroadcaster()
    def __init__():
        self.onChange = EventHook()

theBroadcaster = MyBroadcaster()

# add a listener to the event
theBroadcaster.onChange += myFunction

# remove listener from the event
theBroadcaster.onChange -= myFunction

# fire event
theBroadcaster.onChange.fire()

Adicionamos a funcionalidade para remover todos os ouvintes de um objeto à classe Michaels e terminamos com isso:

class EventHook(object):

    def __init__(self):
        self.__handlers = []

    def __iadd__(self, handler):
        self.__handlers.append(handler)
        return self

    def __isub__(self, handler):
        self.__handlers.remove(handler)
        return self

    def fire(self, *args, **keywargs):
        for handler in self.__handlers:
            handler(*args, **keywargs)

    def clearObjectHandlers(self, inObject):
        for theHandler in self.__handlers:
            if theHandler.im_self == inObject:
                self -= theHandler
spassig
fonte
Uma desvantagem de usar isso é que você precisa primeiro adicionar um evento antes de se registrar como assinante. Se apenas os editores adicionar seus eventos (não é necessário, apenas uma boa prática), então você deve inicializar os editores antes que os assinantes que é uma dor em grandes projetos
Jonathan
6
o último método foi corrigido porque os manipuladores self .__ são modificados durante as iterações. CORRECÇÃO: `self .__ manipuladores = [h para h em self .__ manipuladores se h.im_self = obj!]`
Simon Bergot
1
@ Simon está certo, mas apresenta um bug porque podemos ter funções não acopladas nos manipuladores self .__. Correção:self.__handlers = [h for h in self._handlers if getattr(h, 'im_self', False) != obj]
Eric Marcos
20

Eu uso o zope.event . São os ossos mais vazios que você pode imaginar. :-) De fato, aqui está o código fonte completo:

subscribers = []

def notify(event):
    for subscriber in subscribers:
        subscriber(event)

Observe que você não pode enviar mensagens entre processos, por exemplo. Não é um sistema de mensagens, apenas um sistema de eventos, nada mais, nada menos.

Lennart Regebro
fonte
17
pypi.python.org/pypi/zope.event ... para salvar os pobres Google alguma largura de banda ;-)
Boldewyn
Eu ainda gostaria de poder enviar mensagens. Eu estaria usando o sistema de eventos no aplicativo construído no Tkinter. Não estou usando o sistema de eventos porque ele não suporta mensagens.
Josip
Você pode enviar o que quiser com o zope.event. Mas o que quero dizer é que não é um sistema de mensagens adequado, pois você não pode enviar eventos / mensagens para outros processos ou outros computadores. Você provavelmente deve ser mais específico com seus requisitos.
Lennart Regebro 08/07/2009
15

Encontrei este pequeno script em Lições valorizadas . Parece ter a proporção certa de simplicidade / potência que eu estou procurando. Peter Thatcher é o autor do código a seguir (nenhum licenciamento é mencionado).

class Event:
    def __init__(self):
        self.handlers = set()

    def handle(self, handler):
        self.handlers.add(handler)
        return self

    def unhandle(self, handler):
        try:
            self.handlers.remove(handler)
        except:
            raise ValueError("Handler is not handling this event, so cannot unhandle it.")
        return self

    def fire(self, *args, **kargs):
        for handler in self.handlers:
            handler(*args, **kargs)

    def getHandlerCount(self):
        return len(self.handlers)

    __iadd__ = handle
    __isub__ = unhandle
    __call__ = fire
    __len__  = getHandlerCount

class MockFileWatcher:
    def __init__(self):
        self.fileChanged = Event()

    def watchFiles(self):
        source_path = "foo"
        self.fileChanged(source_path)

def log_file_change(source_path):
    print "%r changed." % (source_path,)

def log_file_change2(source_path):
    print "%r changed!" % (source_path,)

watcher              = MockFileWatcher()
watcher.fileChanged += log_file_change2
watcher.fileChanged += log_file_change
watcher.fileChanged -= log_file_change2
watcher.watchFiles()
Josip
fonte
1
Usar um set () em vez de uma lista é bom para evitar que os manipuladores sejam registrados duas vezes. Uma conseqüência é que os manipuladores não estão sendo chamados na ordem em que foram registrados. Não necessariamente uma coisa ruim embora ...
florisla
1
@florisla poderia trocar por OrderedSet, se assim o desejar.
Robino 22/03/19
9

Aqui está um design minimalista que deve funcionar bem. O que você precisa fazer é simplesmente herdar Observeruma classe e depois usar observe(event_name, callback_fn)para escutar um evento específico. Sempre que esse evento específico for acionado em qualquer lugar do código (ou seja Event('USB connected')), o retorno de chamada correspondente será acionado .

class Observer():
    _observers = []
    def __init__(self):
        self._observers.append(self)
        self._observed_events = []
    def observe(self, event_name, callback_fn):
        self._observed_events.append({'event_name' : event_name, 'callback_fn' : callback_fn})


class Event():
    def __init__(self, event_name, *callback_args):
        for observer in Observer._observers:
            for observable in observer._observed_events:
                if observable['event_name'] == event_name:
                    observable['callback_fn'](*callback_args)

Exemplo:

class Room(Observer):
    def __init__(self):
        print("Room is ready.")
        Observer.__init__(self) # DON'T FORGET THIS
    def someone_arrived(self, who):
        print(who + " has arrived!")

# Observe for specific event
room = Room()
room.observe('someone arrived',  room.someone_arrived)

# Fire some events
Event('someone left',    'John')
Event('someone arrived', 'Lenard') # will output "Lenard has arrived!"
Event('someone Farted',  'Lenard')
Pithikos
fonte
Gosto do seu design, é minimalista e fácil de entender. e seria leve por não precisar importar alguns módulos.
Atreyagaurav
8

Eu criei uma EventManagerclasse (código no final). A sintaxe é a seguinte:

#Create an event with no listeners assigned to it
EventManager.addEvent( eventName = [] )

#Create an event with listeners assigned to it
EventManager.addEvent( eventName = [fun1, fun2,...] )

#Create any number event with listeners assigned to them
EventManager.addEvent( eventName1 = [e1fun1, e1fun2,...], eventName2 = [e2fun1, e2fun2,...], ... )

#Add or remove listener to an existing event
EventManager.eventName += extra_fun
EventManager.eventName -= removed_fun

#Delete an event
del EventManager.eventName

#Fire the event
EventManager.eventName()

Aqui está um exemplo:

def hello(name):
    print "Hello {}".format(name)
    
def greetings(name):
    print "Greetings {}".format(name)

EventManager.addEvent( salute = [greetings] )
EventManager.salute += hello

print "\nInitial salute"
EventManager.salute('Oscar')

print "\nNow remove greetings"
EventManager.salute -= greetings
EventManager.salute('Oscar')

Resultado:

Saudação inicial
Saudações Oscar
Olá Oscar

Agora remova saudações
Olá Oscar

Código EventManger:

class EventManager:
    
    class Event:
        def __init__(self,functions):
            if type(functions) is not list:
                raise ValueError("functions parameter has to be a list")
            self.functions = functions
            
        def __iadd__(self,func):
            self.functions.append(func)
            return self
            
        def __isub__(self,func):
            self.functions.remove(func)
            return self
            
        def __call__(self,*args,**kvargs):
            for func in self.functions : func(*args,**kvargs)
            
    @classmethod
    def addEvent(cls,**kvargs):
        """
        addEvent( event1 = [f1,f2,...], event2 = [g1,g2,...], ... )
        creates events using **kvargs to create any number of events. Each event recieves a list of functions,
        where every function in the list recieves the same parameters.
        
        Example:
        
        def hello(): print "Hello ",
        def world(): print "World"
        
        EventManager.addEvent( salute = [hello] )
        EventManager.salute += world
        
        EventManager.salute()
        
        Output:
        Hello World
        """
        for key in kvargs.keys():
            if type(kvargs[key]) is not list:
                raise ValueError("value has to be a list")
            else:
                kvargs[key] = cls.Event(kvargs[key])
        
        cls.__dict__.update(kvargs)
Cristian Garcia
fonte
8

Você pode dar uma olhada no pymitter ( pypi ). É uma abordagem pequena de arquivo único (~ 250 loc) "fornecendo namespaces, curingas e TTL".

Aqui está um exemplo básico:

from pymitter import EventEmitter

ee = EventEmitter()

# decorator usage
@ee.on("myevent")
def handler1(arg):
   print "handler1 called with", arg

# callback usage
def handler2(arg):
    print "handler2 called with", arg
ee.on("myotherevent", handler2)

# emit
ee.emit("myevent", "foo")
# -> "handler1 called with foo"

ee.emit("myotherevent", "bar")
# -> "handler2 called with bar"
Dalailirium
fonte
6

Fiz uma variação da abordagem minimalista de Longpoke, que também garante as assinaturas de chamadas e chamadores:

class EventHook(object):
    '''
    A simple implementation of the Observer-Pattern.
    The user can specify an event signature upon inizializazion,
    defined by kwargs in the form of argumentname=class (e.g. id=int).
    The arguments' types are not checked in this implementation though.
    Callables with a fitting signature can be added with += or removed with -=.
    All listeners can be notified by calling the EventHook class with fitting
    arguments.

    >>> event = EventHook(id=int, data=dict)
    >>> event += lambda id, data: print("%d %s" % (id, data))
    >>> event(id=5, data={"foo": "bar"})
    5 {'foo': 'bar'}

    >>> event = EventHook(id=int)
    >>> event += lambda wrong_name: None
    Traceback (most recent call last):
        ...
    ValueError: Listener must have these arguments: (id=int)

    >>> event = EventHook(id=int)
    >>> event += lambda id: None
    >>> event(wrong_name=0)
    Traceback (most recent call last):
        ...
    ValueError: This EventHook must be called with these arguments: (id=int)
    '''
    def __init__(self, **signature):
        self._signature = signature
        self._argnames = set(signature.keys())
        self._handlers = []

    def _kwargs_str(self):
        return ", ".join(k+"="+v.__name__ for k, v in self._signature.items())

    def __iadd__(self, handler):
        params = inspect.signature(handler).parameters
        valid = True
        argnames = set(n for n in params.keys())
        if argnames != self._argnames:
            valid = False
        for p in params.values():
            if p.kind == p.VAR_KEYWORD:
                valid = True
                break
            if p.kind not in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY):
                valid = False
                break
        if not valid:
            raise ValueError("Listener must have these arguments: (%s)"
                             % self._kwargs_str())
        self._handlers.append(handler)
        return self

    def __isub__(self, handler):
        self._handlers.remove(handler)
        return self

    def __call__(self, *args, **kwargs):
        if args or set(kwargs.keys()) != self._argnames:
            raise ValueError("This EventHook must be called with these " +
                             "keyword arguments: (%s)" % self._kwargs_str())
        for handler in self._handlers[:]:
            handler(**kwargs)

    def __repr__(self):
        return "EventHook(%s)" % self._kwargs_str()
Felk
fonte
3

Se eu codifico no pyQt eu uso o paradigma de soquetes / sinais QT, o mesmo é para django

Se eu estiver fazendo E / S assíncrona, use o módulo de seleção nativo

Se eu estiver usando um analisador python do SAX, estou usando a API de eventos fornecida pelo SAX. Parece que sou vítima da API subjacente :-)

Talvez você deva se perguntar o que você espera da estrutura / módulo de eventos. Minha preferência pessoal é usar o paradigma Socket / Signal do QT. mais informações sobre isso podem ser encontradas aqui

SashaN
fonte
2

Aqui está outro módulo para consideração. Parece uma opção viável para aplicativos mais exigentes.

Py-notify é um pacote Python que fornece ferramentas para implementar o padrão de programação do Observer. Essas ferramentas incluem sinais, condições e variáveis.

Sinais são listas de manipuladores que são chamados quando o sinal é emitido. As condições são basicamente variáveis ​​booleanas acopladas a um sinal emitido quando o estado da condição muda. Eles podem ser combinados usando operadores lógicos padrão (não, e etc.) em condições compostas. Variáveis, diferentemente das condições, podem conter qualquer objeto Python, não apenas booleanos, mas não podem ser combinadas.

Josip
fonte
1
A página inicial está fora de serviço para esta, talvez não sendo mais suportada?
David Parks
1

Se você quiser fazer coisas mais complicadas, como mesclar eventos ou tentar novamente, poderá usar o padrão Observable e uma biblioteca madura que implementa isso. https://github.com/ReactiveX/RxPY . Observáveis ​​são muito comuns em Javascript e Java e muito convenientes para usar em algumas tarefas assíncronas.

from rx import Observable, Observer


def push_five_strings(observer):
        observer.on_next("Alpha")
        observer.on_next("Beta")
        observer.on_next("Gamma")
        observer.on_next("Delta")
        observer.on_next("Epsilon")
        observer.on_completed()


class PrintObserver(Observer):

    def on_next(self, value):
        print("Received {0}".format(value))

    def on_completed(self):
        print("Done!")

    def on_error(self, error):
        print("Error Occurred: {0}".format(error))

source = Observable.create(push_five_strings)

source.subscribe(PrintObserver())

SAÍDA :

Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!
David Dehghan
fonte
1

Se você precisar de um eventbus que funcione através dos limites do processo ou da rede, poderá experimentar o PyMQ . Atualmente, ele suporta pub / sub, filas de mensagens e RPC síncrono. A versão padrão funciona em cima de um back-end do Redis, portanto, você precisa de um servidor Redis em execução. Há também um back-end na memória para teste. Você também pode escrever seu próprio back-end.

import pymq

# common code
class MyEvent:
    pass

# subscribe code
@pymq.subscriber
def on_event(event: MyEvent):
    print('event received')

# publisher code
pymq.publish(MyEvent())

# you can also customize channels
pymq.subscribe(on_event, channel='my_channel')
pymq.publish(MyEvent(), channel='my_channel')

Para inicializar o sistema:

from pymq.provider.redis import RedisConfig

# starts a new thread with a Redis event loop
pymq.init(RedisConfig())

# main application control loop

pymq.shutdown()

Isenção de responsabilidade: eu sou o autor desta biblioteca

thrau
fonte
0

Você pode tentar o buslanemódulo.

Essa biblioteca facilita a implementação do sistema baseado em mensagens. Ele suporta comandos (manipulador único) e abordagem de eventos (0 ou múltiplos manipuladores). O Buslane usa anotações do tipo Python para registrar corretamente o manipulador.

Exemplo simples:

from dataclasses import dataclass

from buslane.commands import Command, CommandHandler, CommandBus


@dataclass(frozen=True)
class RegisterUserCommand(Command):
    email: str
    password: str


class RegisterUserCommandHandler(CommandHandler[RegisterUserCommand]):

    def handle(self, command: RegisterUserCommand) -> None:
        assert command == RegisterUserCommand(
            email='[email protected]',
            password='secret',
        )


command_bus = CommandBus()
command_bus.register(handler=RegisterUserCommandHandler())
command_bus.execute(command=RegisterUserCommand(
    email='[email protected]',
    password='secret',
))

Para instalar o buslane, basta usar o pip:

$ pip install buslane
Konrad Hałas
fonte
0

Algum tempo atrás, escrevi uma biblioteca que pode ser útil para você. Ele permite que você tenha ouvintes locais e globais, várias maneiras diferentes de registrá-los, prioridade de execução e assim por diante.

from pyeventdispatcher import register

register("foo.bar", lambda event: print("second"))
register("foo.bar", lambda event: print("first "), -100)

dispatch(Event("foo.bar", {"id": 1}))
# first second

Dê uma olhada pyeventdispatcher

Daniel Ancuta
fonte