Firebird
Revision as of 06:18, 30 April 2023 by Stonezhong (talk | contribs)
Firebird -- Python based Streaming Framework
Introduction
Firebird is a Python-based stream processing framework.
APIs
Node
Represents a node in the pipeline. A node can have any number of input ports and output ports. Each port has an id that is unique within the node.
Properties:
- id: the id of the node, it is unique within a pipeline
- input: the default input port, e.g. node.input
- output: the default output port, e.g. node.output
- input_port_ids: tuple of all input port ids
- output_port_ids: tuple of all output port ids
Methods:
def get_port(self, id:str) -> Optional["Port"] # return the port with the given id
def on_message(self, name:str, data:Any) # called when this node receives a data point; data is a JSON object and name identifies the port the data was received from
def connect(self, *dest_ports:Union["Node", "Port"]) -> "Port" # connect the default output port to one or more input ports; if an element of dest_ports is a Node, that node's default input port is used
def emit(self, data:Any, name:str=DEFAULT_PORT_NAME) # emit data to the output port specified by name
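The way connect, emit, and on_message fit together can be illustrated with a small stand-in. This is only a sketch, not Firebird's implementation: the Port and Node classes below are toy versions written for this example, the value of DEFAULT_PORT_NAME is assumed, delivery is synchronous rather than queued, and Doubler and Collector are hypothetical node classes.

```python
from typing import Any, List, Union

DEFAULT_PORT_NAME = "default"  # assumed value; the page does not state it


class Port:
    """Toy stand-in for a port (not the real Firebird class)."""

    def __init__(self, owner: "Node", name: str):
        self.owner = owner
        self.name = name
        self.connected_ports: List["Port"] = []


class Node:
    """Toy stand-in for a node; messages are delivered synchronously."""

    def __init__(self, id: str, input_port_ids=(DEFAULT_PORT_NAME,),
                 output_port_ids=(DEFAULT_PORT_NAME,)):
        self.id = id
        self._inputs = {n: Port(self, n) for n in input_port_ids}
        self._outputs = {n: Port(self, n) for n in output_port_ids}

    @property
    def input(self) -> Port:
        return self._inputs[DEFAULT_PORT_NAME]

    @property
    def output(self) -> Port:
        return self._outputs[DEFAULT_PORT_NAME]

    def connect(self, *dest_ports: Union["Node", Port]) -> Port:
        for dest in dest_ports:
            # A Node argument stands for that node's default input port.
            port = dest.input if isinstance(dest, Node) else dest
            self.output.connected_ports.append(port)
        return self.output

    def emit(self, data: Any, name: str = DEFAULT_PORT_NAME) -> None:
        # Hand the data to the owner of every connected input port.
        for port in self._outputs[name].connected_ports:
            port.owner.on_message(port.name, data)

    def on_message(self, name: str, data: Any) -> None:
        raise NotImplementedError


class Doubler(Node):  # hypothetical node: doubles the incoming value
    def on_message(self, name: str, data: Any) -> None:
        self.emit({"value": data["value"] * 2})


class Collector(Node):  # hypothetical node: records what it receives
    def __init__(self, id: str):
        super().__init__(id, output_port_ids=())
        self.received = []

    def on_message(self, name: str, data: Any) -> None:
        self.received.append((name, data))


doubler = Doubler("doubler")
collector = Collector("collector")
doubler.connect(collector)  # default output -> collector's default input
doubler.on_message(DEFAULT_PORT_NAME, {"value": 21})
print(collector.received)  # [('default', {'value': 42})]
```

In this sketch the name passed to on_message is the name of the receiving input port, which matches the description of on_message above.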
Generator
Represents a node that has no input ports.
Methods:
def pump(self) # derived class must override this method to pump data to the pipeline.
Sink
Represents a node that has no output ports.
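A generator feeding a sink might look like the following sketch. The Generator and Sink base classes here are simplified stand-ins written for this example, not the Firebird classes; CounterGenerator, ListSink, and the value of DEFAULT_PORT_NAME are assumptions, and data is delivered by a direct call rather than through a message queue.

```python
from typing import Any, List

DEFAULT_PORT_NAME = "default"  # assumed value; the page does not state it


class Sink:
    """Toy stand-in: a node with input ports only."""

    def __init__(self, id: str):
        self.id = id

    def on_message(self, name: str, data: Any) -> None:
        raise NotImplementedError


class Generator:
    """Toy stand-in: a node with output ports only."""

    def __init__(self, id: str):
        self.id = id
        self._targets: List[Sink] = []

    def connect(self, *sinks: Sink) -> None:
        self._targets.extend(sinks)

    def emit(self, data: Any, name: str = DEFAULT_PORT_NAME) -> None:
        # Simplification: always deliver to the target's default input port.
        for target in self._targets:
            target.on_message(DEFAULT_PORT_NAME, data)

    def pump(self) -> None:
        raise NotImplementedError("derived class must override pump()")


class CounterGenerator(Generator):  # hypothetical generator
    def __init__(self, id: str, count: int):
        super().__init__(id)
        self.count = count

    def pump(self) -> None:
        # Push a fixed number of data points into the pipeline.
        for seq in range(self.count):
            self.emit({"seq": seq})


class ListSink(Sink):  # hypothetical sink
    def __init__(self, id: str):
        super().__init__(id)
        self.items: List[Any] = []

    def on_message(self, name: str, data: Any) -> None:
        self.items.append(data)


source = CounterGenerator("source", count=3)
sink = ListSink("sink")
source.connect(sink)
source.pump()
print(sink.items)  # [{'seq': 0}, {'seq': 1}, {'seq': 2}]
```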
Port
Represents an input port or an output port.
Properties:
- type: either PortType.INPUT or PortType.OUTPUT, indicating whether this is an input port or an output port
- name: name of the port, it is unique within the node
- owner: the node which this port belongs to
- connected_ports: other ports connected to this port
Methods:
def connect(self, *ports: Union["Port", "Node"]) # connect this port to other ports
def emit(self, json_data:Any) # emit data to this port
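The port-level view can be sketched as below, with one output port fanning out to two named input ports on the same node. This is a toy model, not Firebird's code: the forwarding behaviour of emit (an output port forwards to its connected ports, an input port hands the data to its owner's on_message) is an assumption, connect accepts only ports here for brevity, and Recorder is a hypothetical owner class.

```python
from enum import Enum
from typing import Any, List


class PortType(Enum):
    INPUT = "input"
    OUTPUT = "output"


class Port:
    """Toy stand-in for a port with the properties listed above."""

    def __init__(self, type: PortType, name: str, owner: Any):
        self.type = type
        self.name = name
        self.owner = owner
        self.connected_ports: List["Port"] = []

    def connect(self, *ports: "Port") -> None:
        # Record the link on both ends so either side can inspect it.
        for port in ports:
            self.connected_ports.append(port)
            port.connected_ports.append(self)

    def emit(self, json_data: Any) -> None:
        if self.type is PortType.OUTPUT:
            # Assumed behaviour: forward to every connected input port.
            for port in self.connected_ports:
                port.emit(json_data)
        else:
            # Assumed behaviour: hand the data to the owning node.
            self.owner.on_message(self.name, json_data)


class Recorder:  # hypothetical owner that records incoming messages
    def __init__(self):
        self.seen = []

    def on_message(self, name: str, data: Any) -> None:
        self.seen.append((name, data))


producer = Recorder()
consumer = Recorder()
out = Port(PortType.OUTPUT, "out", producer)
left = Port(PortType.INPUT, "left", consumer)
right = Port(PortType.INPUT, "right", consumer)

out.connect(left, right)
out.emit({"x": 1})
print(consumer.seen)  # [('left', {'x': 1}), ('right', {'x': 1})]
```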
Pipeline
Represents a pipeline, which consists of
- a set of nodes
- connections, each from one node's output port to another node's input port
Properties:
- mq: message queue
- node_dict: dictionary, key is node name, value is node
Methods:
def message_loop(self) # enter a message loop, allowing each node to process data
def add_node(self, name:str, node_class, *argc, **kwargs) # add a node to the pipeline; *argc and **kwargs are passed to the node's constructor
def connect(self, *, src_node_name, src_port_name=DEFAULT_PORT_NAME, dst_node_name, dst_port_name=DEFAULT_PORT_NAME) # connect one node's output port to another node's input port
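The methods above can be sketched end to end with an in-memory queue. Everything here is a stand-in built for illustration: collections.deque plays the role of mq, the loop stops once the queue is empty, the Node constructor signature (name, pipeline) and the return value of add_node are assumptions, route is a hypothetical helper, and Upper and Printer are hypothetical node classes.

```python
from collections import deque
from typing import Any, Dict, List, Tuple

DEFAULT_PORT_NAME = "default"  # assumed value; the page does not state it


class Node:
    """Toy node; the (name, pipeline) constructor is an assumption."""

    def __init__(self, name: str, pipeline: "Pipeline"):
        self.name = name
        self.pipeline = pipeline

    def emit(self, data: Any, name: str = DEFAULT_PORT_NAME) -> None:
        self.pipeline.route(self.name, name, data)

    def on_message(self, name: str, data: Any) -> None:
        pass


class Pipeline:
    """Toy pipeline backed by an in-memory queue."""

    def __init__(self):
        self.mq = deque()  # items: (node name, input port name, data)
        self.node_dict: Dict[str, Node] = {}
        self._links: Dict[Tuple[str, str], List[Tuple[str, str]]] = {}

    def add_node(self, name: str, node_class, *argc, **kwargs) -> Node:
        node = node_class(name, self, *argc, **kwargs)
        self.node_dict[name] = node
        return node

    def connect(self, *, src_node_name, src_port_name=DEFAULT_PORT_NAME,
                dst_node_name, dst_port_name=DEFAULT_PORT_NAME) -> None:
        key = (src_node_name, src_port_name)
        self._links.setdefault(key, []).append((dst_node_name, dst_port_name))

    def route(self, src_node_name: str, src_port_name: str, data: Any) -> None:
        # Hypothetical helper: enqueue one message per connected input port.
        for dst_node, dst_port in self._links.get((src_node_name, src_port_name), []):
            self.mq.append((dst_node, dst_port, data))

    def message_loop(self) -> None:
        # Drain the queue; this toy loop returns when no messages remain.
        while self.mq:
            node_name, port_name, data = self.mq.popleft()
            self.node_dict[node_name].on_message(port_name, data)


class Upper(Node):  # hypothetical node: upper-cases the text field
    def on_message(self, name: str, data: Any) -> None:
        self.emit({"text": data["text"].upper()})


class Printer(Node):  # hypothetical node: stores formatted lines
    def __init__(self, name: str, pipeline: Pipeline, prefix: str):
        super().__init__(name, pipeline)
        self.prefix = prefix
        self.lines: List[str] = []

    def on_message(self, name: str, data: Any) -> None:
        self.lines.append(self.prefix + data["text"])


pipeline = Pipeline()
pipeline.add_node("upper", Upper)
printer = pipeline.add_node("printer", Printer, prefix="> ")
pipeline.connect(src_node_name="upper", dst_node_name="printer")

# Seed the queue with one message for the "upper" node, then run the loop.
pipeline.mq.append(("upper", DEFAULT_PORT_NAME, {"text": "hello"}))
pipeline.message_loop()
print(printer.lines)  # ['> HELLO']
```

Routing every message through the queue, instead of calling on_message directly from emit, keeps nodes decoupled from one another; the toy loop only shows the dispatch order.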
RabbitMQ
Command Line Tools
pipeline