Firebird/Introduction
From stonehomewiki
Revision as of 10:10, 15 September 2023 by Stonezhong (talk | contribs) (→Make it easy to create streaming processing data pipeline using python)
Brief
Firebird is a Python-based stream processing framework. It:
- Makes it easy to create stream processing data pipelines in Python.
- Makes it easy to deploy your pipeline to a Kubernetes cluster, so it can run at scale.
- Provides a management web UI to visualize your pipeline topology.
- Encourages you to write reusable stream processing units, each encapsulated in a Generator, Node, or Sink.
Use Python to build a streaming pipeline
Your Python code can be as simple as the example below. All you need to do is create Generators, Nodes, and Sinks, and assemble them with the >> or << operator. If you have experience with Apache Airflow, this model should feel familiar.
from typing import Any
from firebird import Pipeline, RabbitMQ, Generator, Sink, Node
import time
import os


class MyGenerator(Generator):
    def pump(self, quit_requested):
        # assuming volume checkpoint is mounted at /checkpoint
        next_number = 0
        # resume from the last saved number, if a checkpoint exists
        if os.path.isfile("/checkpoint/state"):
            with open("/checkpoint/state", "rt") as f:
                next_number = int(f.read())
        while not quit_requested.value:
            self.emit(next_number)
            print(f"{self.id}: {next_number}")
            next_number += 1
            time.sleep(1)
            # save the next number to emit
            with open("/checkpoint/state", "wt") as f:
                f.write(f"{next_number}")


class MySink(Sink):
    def on_message(self, port_id: str, data: Any):
        print(f"{self.id}: {data}")


class Square(Node):
    def on_message(self, port_id: str, data: Any):
        print(f"{self.id}: {data} -> {data*data}")
        self.emit(data * data)


def get_pipeline(mq: RabbitMQ):
    pipeline_id = "test"
    pipeline = Pipeline(id=pipeline_id, title="Sample Pipeline", description="This is a sample firebird pipeline", mq=mq)
    src = MyGenerator(id="src", pipeline=pipeline, title="Generate numbers", description="Generate numbers")
    calc = Square(id="calc", pipeline=pipeline, title="Calculate square", description="output = input*input")
    end = MySink(id="end", pipeline=pipeline, title="Display calculation result", description="Display calculation result!")
    # wire the topology: src feeds calc, calc feeds end
    src >> calc >> end
    return pipeline
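Chaining such as src >> calc >> end relies on Python operator overloading. The following is a minimal sketch of how operators of this kind can be implemented. The Step class and its downstream attribute are hypothetical names used for illustration only; this is not Firebird's actual implementation, and it follows the convention Apache Airflow uses, where both operators return the right-hand operand.

```python
class Step:
    """Hypothetical stand-in for a pipeline element (not a Firebird class)."""

    def __init__(self, id):
        self.id = id
        self.downstream = []  # steps that receive this step's output

    def __rshift__(self, other):
        # a >> b: send a's output to b; return b so chains read left to right
        self.downstream.append(other)
        return other

    def __lshift__(self, other):
        # a << b: send b's output to a; return b so a << b << c wires c -> b -> a
        other.downstream.append(self)
        return other


src, calc, end = Step("src"), Step("calc"), Step("end")
src >> calc >> end  # evaluated as (src >> calc) >> end

for step in (src, calc, end):
    print(step.id, "->", [s.id for s in step.downstream])
```

Because each operator returns its right-hand operand, a chain of any length wires each neighbouring pair exactly once.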
Easy to deploy to Kubernetes Cluster
Brief
To deploy your pipeline, run the command below. Use the -r parameter to specify how many pods should run the pipeline; the default is 1. Here is an example:
# Start pipeline "test", using 10 pods to run it simultaneously.
/usr/src/app # pipeline start -pid test -r 10
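If you start pipelines from a script, the same command line can be assembled programmatically. This is a small sketch that only builds the argument list and uses only the -pid and -r flags shown above. The helper name build_start_command is hypothetical and is not part of Firebird.

```python
import shlex


def build_start_command(pipeline_id, replicas=1):
    """Build the argument list for "pipeline start" (hypothetical helper)."""
    if replicas < 1:
        raise ValueError("replicas must be at least 1")
    return ["pipeline", "start", "-pid", pipeline_id, "-r", str(replicas)]


command = build_start_command("test", replicas=10)
print(shlex.join(command))  # prints: pipeline start -pid test -r 10
```

The resulting list can be passed to subprocess.run in an environment where the pipeline command is available.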
Web UI
Home Page
The home page shows all pipelines. For each pipeline, you can see:
- ID: each pipeline has a unique ID.
- Namespace: each pipeline is deployed in a Kubernetes namespace.
- Image: the name of the Docker image for the pipeline.
- Module: the name of the Python module that contains the main pipeline entry.
- Running: whether the pipeline is currently running.
- Description: the description of the pipeline.
Pipeline Page
The pipeline page shows the details of a given pipeline:
- ID: each pipeline has a unique ID.
- Namespace: each pipeline is deployed in a Kubernetes namespace.
- Image: the name of the Docker image for the pipeline.
- Module: the name of the Python module that contains the main pipeline entry.
- Running: whether the pipeline is currently running.
- Description: the description of the pipeline.
- Diagram: the topology of the pipeline. You can click each node to see the node details.
- Executors: if the pipeline is running, you can see the status of each executor.
Node dialog box:
It shows the id, title, and description of the node. It also shows the connected ports, so you can see the upstream and downstream nodes.
- You can click the "Start" button to start the pipeline.
- You can click the "Stop" button to stop the pipeline.