Firebird/Concepts: Difference between revisions

From stonehomewiki
Jump to navigationJump to search
 
(41 intermediate revisions by the same user not shown)
Line 17: Line 17:
<div class="mw-collapsible-preview">wiring</div>
<div class="mw-collapsible-preview">wiring</div>
<div class="mw-collapsible-content">
<div class="mw-collapsible-content">
You can connect an output port to one ore more input port to set the data flow direction.
* You can connect an output port to one or more input ports.
* When you emit data via an output port, the data gets sends to all the input port that is connected, and the node owns those input will receive the data.
</div>
</div>
</div>
</div>
Line 28: Line 29:
Represent a node in the pipeline. A node can have bunch of input ports and bunch of output ports. Each port has a unique id within the node.
Represent a node in the pipeline. A node can have bunch of input ports and bunch of output ports. Each port has a unique id within the node.


Properties:
<b>Properties</b>
<pre><nowiki>
<pre><nowiki>
* id:              The id of the node, it is unique within a pipeline
id:              The id of the node, it is unique within a pipeline
* title:          A human readable name of this node
title:          A human readable name of this node
* description:    Detailed description of this node
description:    Detailed description of this node
* pipeline:        The pipeline this node belongs to
pipeline:        The pipeline this node belongs to
* input_port_ids:  tuple of all input port ids
input_port_ids:  tuple of all input port ids
* output_port_ids: tuple of all output port ids
output_port_ids: tuple of all output port ids
* input:          The default input port
input:          The default input port
* output:          The default output port
output:          The default output port
</nowiki></pre>
</nowiki></pre>


 
<b>method: get_port</b>
 
Methods:
<pre><nowiki>
<pre><nowiki>
def get_port(self, id:str) -> Optional["Port"]
def get_port(self, id:str) -> Optional["Port"]
# each port has a unique id within the node.
# return a port given port id


# return a port given id
# You can also use [] to get port by name
node["foo"]  # same as node.get_port("foo")
</nowiki></pre>
</nowiki></pre>


<b>method: on_message</b>
<pre><nowiki>
<pre><nowiki>
on_message(self, name:str, data:Any)
on_message(self, port_id:str, data:Any)


This method will be called for this node to handle data point, data is a json object, name represent from which port the data is received.
Called when this node receives data. port_id is the id of the port which receive the data, data is the JSON payload of the data.
</nowiki></pre>
</nowiki></pre>


<b>method: <nowiki>>> and <<</nowiki></b>
<pre><nowiki>
<pre><nowiki>
def connect(self, *dest_ports:Union["Node", "Port"]) -> "Port"
# You can use >>, << to connect nodes, here are examples:
nodeA >> nodeB              # connect default output port of nodeA to default input port of node B
nodeA["foo"] >> nodeB        # connect port "foo" of nodeA to default input port of node B
nodeA["foo"] >> nodeB["bar"] # connect port "foo" of nodeA to port "bar" of node B


# connect the default output port to a input port of a node
# You can also use << as a reverse of >>
# if dest_port element is a Node, then the default input port is used
</nowiki></pre>
</nowiki></pre>


<b>method: emit</b>
<pre><nowiki>
<pre><nowiki>
def emit(self, data:Any, name:str=DEFAULT_PORT_NAME)
def emit(self, data:Any, port_id:str=DEFAULT_OUTPUT_PORT_ID)


# emit data to it's output port specified by name
# emit data to it's output port specified by port_id
</nowiki></pre>
</nowiki></pre>
</div>
</div>
Line 74: Line 81:
<div class="mw-collapsible-preview">Generator</div>
<div class="mw-collapsible-preview">Generator</div>
<div class="mw-collapsible-content">
<div class="mw-collapsible-content">
Represent a node that does not have input ports.
Represent a node that does not have input ports, it is derived from Node.


Methods:
<b>methods: pump</b>
<pre><nowiki>
<pre><nowiki>
def pump(self)
def pump(self)
Line 82: Line 89:
# derived class must override this method to pump data to the pipeline.
# derived class must override this method to pump data to the pipeline.
</nowiki></pre>
</nowiki></pre>
</div>
</div>
</div>
</div>
Line 90: Line 96:
<div class="mw-collapsible-preview">Sink</div>
<div class="mw-collapsible-preview">Sink</div>
<div class="mw-collapsible-content">
<div class="mw-collapsible-content">
Represent a node that does not have output ports.
Represent a node that does not have output ports, it is derived from Node.
</div>
</div>
</div>
</div>
Line 98: Line 104:
<div class="mw-collapsible-preview">Port</div>
<div class="mw-collapsible-preview">Port</div>
<div class="mw-collapsible-content">
<div class="mw-collapsible-content">
Represent a input port or output port
Represent an input port or output port of a node.


Properties:
<b>Properties</b>
* type: either PortType.INPUT or PortType.OUTPUT, represent it is a input port or output port
<pre><nowiki>
* name: name of the port, it is unique within the node
type: Either PortType.INPUT or PortType.OUTPUT, represent it is a input port or output port
* owner: the node which this port belongs to
id:   ID of the port, it is unique within the node
* connected_ports: other ports connected to this port
node: The node which this port belongs to
</nowiki></pre>


Methods:
<b>method: <nowiki>>> and <<</nowiki></b>
<pre><nowiki>
<pre><nowiki>
def connect(self, *ports: Union["Port", Node])
# connect this port to other port
# connect this port to other port
# see Node document.
</nowiki></pre>
</nowiki></pre>


<b>method: emit</b>
<pre><nowiki>
<pre><nowiki>
def emit(self, json_data:Any)
def emit(self, json_data:Any)
 
# emit JSON data to this port
# emit data to this port
</nowiki></pre>
</nowiki></pre>


Line 130: Line 136:
* A node's output port can connect to another node's input port
* A node's output port can connect to another node's input port


Properties:
<b>Properties</b>
* mq: message queue
<pre><nowiki>
* node_dict: dictionary, key is node name, value is node
id:          The id of the pipeline, every pipeline has a unique id.
title:       A human readable short description of this pipeline.
description: Detailed description of this pipeline.
mq:         RabbitMQ instance
nodes:       Containing all node belongs to this pipeline.
</nowiki></pre>


Methods:
<b>method: message_loop</b>
<pre><nowiki>
<pre><nowiki>
def message_loop(self):
def message_loop(self):
Line 141: Line 152:
</nowiki></pre>
</nowiki></pre>


<b>method: <nowiki>[]</nowiki></b>
<pre><nowiki>
<pre><nowiki>
def add_node(self, name:str, node_class, *argc, **kwargs)
# You can use [] to get node by id
 
pipeline["foo"]  # get node with id "foo" of this pipeline
# add a node to the pipeline, *argc, and **kwargs will be passed to the constructor
</nowiki></pre>
 
<pre><nowiki>
def connect(self, *, src_node_name, src_port_name=DEFAULT_PORT_NAME, dst_node_name, dst_port_name=DEFAULT_PORT_NAME)
 
# connect one node's output port to another node's input port
</nowiki></pre>
</nowiki></pre>


Line 160: Line 165:
<div class="mw-collapsible-preview">RabbitMQ</div>
<div class="mw-collapsible-preview">RabbitMQ</div>
<div class="mw-collapsible-content">
<div class="mw-collapsible-content">
Represent a RabbitMQ connection
</div>
</div>
<p></p>
= Exception Handling =
<div class="toccolours mw-collapsible mw-collapsed expandable">
<div class="mw-collapsible-preview"></div>
<div class="mw-collapsible-content">
* When you register a pipeline, 3 message queue will be created, they are
* ${pipeline_name}: all the message gets routed here
* ${pipeline_name}-error: when a message failed to process, it will be posted here, it has following fields: error_count, recent_errors: array of {failed_time, error_message}
* ${pipeline_critical}-critical: when a message failed too many times, it will be moved here.
</div>
</div>
</div>
</div>
<p></p>
<p></p>

Latest revision as of 22:46, 16 September 2023

Firebird

Overview

Models

Exception Handling