3. Pipelines#
To create our first pipeline all we need is to define a Python function that receive parameters from a Saturn message and do some processing or returns new messages.
Let’s start by creating a new file named pipelines.py
:
from abc.collection import Iterator
from saturn_engine.core import TopicMessage
from saturn_engine.core.pipeline import PipelineResult
def upper_word(word: str) -> TopicMessage:
return TopicMessage(args={"upper": word.upper()})
What does this code does?
First it import a few object we need for typing definition. Saturn is written in modern Python and aims for a 100% typing coverage using mypy. Parameters types are very important since Saturn is going to use them to convert message into pipeline parameters.
Let’s move on to the function definition:
def upper_word(word: str) -> Iterator[PipelineResult]:
Saturn pipelines are simple Python function. They take arbitrary parameters and simply
returns new messages or events. This make testing a pipeline as simple as calling a
Python function. The typing is used by Saturn to ensure a message is converted into
valid pipeline parameters. This means that the pipeline will always receive word
as
a str
or fail otherwise. Pipeline can return new message that will be forwarded by
Saturn to the next step. The message type is TopicMessage
.
We keep the pipeline implementation as simple as possible for this example:
return TopicMessage(args={"upper": word.upper()})
In this case we simply return a new Saturn message with the given word as uppercase for its arguments.
That’s as simple as this. We built our first Saturn pipeline, now let’s plug it into a job in our next tutorial: <TODO>