Firebird/Introduction: Difference between revisions

From stonehomewiki
Jump to navigationJump to search
No edit summary
Line 15: Line 15:
= Make it easy to create streaming processing data pipeline using python =
= Make it easy to create streaming processing data pipeline using python =
<div class="toccolours mw-collapsible mw-collapsed expandable">
<div class="toccolours mw-collapsible mw-collapsed expandable">
<div class="mw-collapsible-preview">Brief</div>
<div class="mw-collapsible-preview"></div>
<div class="mw-collapsible-content">
<div class="mw-collapsible-content">
Firebird is a Python based Stream Processing Framework
Your python code could be as simple as below. All you need to do is, create Generates, Nodes and Sinks, and assemble them together using >> or << operator. If you have experience with Apache Airflow, you should be quite familiar with this model.
* Make it easy to create streaming processing data pipeline using python
 
* Make it easy to deploy your pipeline to Kubernete Cluster, so your pipeline can run at scale.
<pre><nowiki>
* A Management Web UI to visualize your pipeline topology.
from typing import Any
* The firebird framework encourage you to write reusable streaming processing unit and encapsulate it in Generator or Node or Sink.
from firebird import Pipeline, RabbitMQ, Generator, Sink, Node, RabbitMQ
import time
import os
 
class MyGenerator(Generator):
    def pump(self, quit_requested):
        # assuming volume checkpoint is mounted at /checkpoint
        next_number = 0
        if os.path.isfile("/checkpoint/state"):
            with open("/checkpoint/state", "rt") as f:
                next_number = int(f.read())
 
        while not quit_requested.value:
            self.emit(next_number)
            print(f"{self.id}: {next_number}")
            next_number += 1
            time.sleep(1)
       
        with open("/checkpoint/state", "wt") as f:
            f.write(f"{next_number}")
 
class MySink(Sink):
    def on_message(self, port_id:str, data:Any):
        print(f"{self.id}: {data}")
 
class Square(Node):
    def on_message(self, port_id:str, data:Any):
        print(f"{self.id}: {data} -> {data*data}")
        self.emit(data*data)
 
def get_pipeline(mq:RabbitMQ):
    pipeline_id = "test"
 
    pipeline = Pipeline(id=pipeline_id, title="Sample Pipeline", description="This is a sample firebird pipeline", mq=mq)
    src = MyGenerator(id="src", pipeline=pipeline, title="Generate numbers", description="Generate numbers")
    calc = Square(id="calc", pipeline=pipeline, title="Calculate square", description="output = input*input")
    end = MySink(id="end", pipeline = pipeline, title="Display calculation result", description="Display calculation result!")
    src >> calc >> end
    return pipeline
</nowiki></pre>
 
</div>
</div>
</div>
</div>
<p></p>
<p></p>

Revision as of 09:18, 15 September 2023

Firebird

Beirf

Make it easy to create streaming processing data pipeline using python