A pipeline definition should have the following attributes:
Keys
pipeline
This is the top-level key in a pipeline configuration file: a configuration map that defines the name, default settings, nodes, and datasets for a pipeline.
| Key | Required | Type | Description |
| --- | --- | --- | --- |
| `name` | Y | string | Name of the pipeline. |
| `default_node_settings` | N | map | Defines common default values for node attributes, which can be overridden at the node level. |
| `nodes` | Y | map | Defines the compute nodes for a pipeline, mapping node names to node definitions. |
| `datasets` | Y | map | Defines the datasets for a pipeline, mapping dataset names to dataset definitions. |
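Putting these keys together, a complete pipeline configuration might look like the following sketch (the pipeline, node, and dataset names here are hypothetical):

```yaml
pipeline:
  name: example-pipeline

  default_node_settings:
    num_cpus: 0.5

  nodes:
    sequencer:
      class: my_awesome_pipeline.nodes.MySequencerNode
      outputs:
        - test_sequence
      node_params:
        initial_state: 0
        increment: 1

  datasets:
    test_sequence:
      type: kafka_stream
```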
default_node_settings
This optional section sets common default settings for all nodes in the pipeline. These settings are passed into Ray actors as parameters and accept any of the arguments found here. The most commonly used is num_cpus.

| Key | Required | Type | Description |
| --- | --- | --- | --- |
| `num_cpus` | N | float | Defines the default number of CPUs for a node. Can be less than one. |

These defaults can be overridden at the node level.
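For instance, a fractional CPU default could be declared like this (the value shown is only illustrative):

```yaml
default_node_settings:
  num_cpus: 0.5
```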
nodes
This section defines the compute nodes for a pipeline.
| Key | Required | Type | Description |
| --- | --- | --- | --- |
| `<name of node>` | Y | map | Defines a map of node names to node definitions in the pipeline. |
<node_name>
A particular node instance in the pipeline, defined by a unique name. Any parameters defined at the individual node level locally override any defaults defined under default_node_settings.
| Key | Required | Type | Description |
| --- | --- | --- | --- |
| `class` | Y | string | Python class to run for the node. This should exist within the Python module in the same repository. |
| `inputs` | N | list of strings | Defines which datasets to consume from, if applicable. |
| `outputs` | N | list of strings | Defines which datasets to produce to, if applicable. |
| `node_params` | N | map | Defines any arbitrary parameters relevant to the node's application logic. For example, you might define initial_state and increment parameters, both integers. Environment variables can be injected into node_params by passing strings that match the pattern {$ENV_VAR}, where ENV_VAR is the environment variable to inject. |
| `num_cpus` | Y | float | Number of CPUs allocated to the node. Required either in each node definition or at the default_node_settings level. |
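A node definition using these keys might look like the following sketch (the class path, dataset names, and parameters are hypothetical, including the MY_API_KEY environment variable shown being injected into node_params):

```yaml
nodes:
  sequencer:
    class: my_awesome_pipeline.nodes.MySequencerNode
    inputs:
      - input_sequence
    outputs:
      - test_sequence
    num_cpus: 1
    node_params:
      initial_state: 0
      increment: 1
      api_key: "{$MY_API_KEY}"
```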
How are environment variables injected?
The following method is used to inject environment variables into node_params:
This function is used to recursively inject environment variables into strings passed through node params via the pipeline config. Only strings, dicts, and lists are parsed recursively, as these are the only types that can contain environment variables (i.e. excluding ints, floats, and Nones). Environment variables are identified in strings by the pattern {$ENV_VAR}, where ENV_VAR is the name of the environment variable to inject. For example, given the following environment variables:

```
$ export SECRET1=secret1
$ export SECRET2=secret2
```
The following node params dict:
```
{
    "key1": "A string with a {$SECRET1} and a {$SECRET2}.",
    "key2": {
        "key3": "A string with a {$SECRET1} and a {$SECRET2}.",
        "key4": [
            "A string with a {$SECRET1} and a {$SECRET2}.",
            "A string with a {$SECRET1} and a {$SECRET2}."
        ]
    }
}
```
Will be transformed to:
```
{
    "key1": "A string with a secret1 and a secret2.",
    "key2": {
        "key3": "A string with a secret1 and a secret2.",
        "key4": [
            "A string with a secret1 and a secret2.",
            "A string with a secret1 and a secret2."
        ]
    }
}
```
```python
def inject_env_vars(
    self,
    node_params: Optional[Union[Dict, List, str, int, float, bool]] = None,
) -> Optional[Union[Dict, List, str, int, float, bool]]:
    """Inject environment variables into node params.

    See above for the injection pattern and example transformations.
    """
    if isinstance(node_params, dict):
        for k, v in list(node_params.items()):
            node_params[k] = self.inject_env_vars(v)
    elif isinstance(node_params, list):
        for i, v in enumerate(node_params):
            node_params[i] = self.inject_env_vars(v)
    elif isinstance(node_params, str):
        env_var_pattern = r"\{\$.*?\}"
        env_var_match = re.search(env_var_pattern, node_params, re.DOTALL)
        if env_var_match:
            env_var_env_str = env_var_match.group()
            env_var_value = os.getenv(env_var_env_str[2:][:-1], default=None)
            if env_var_value is None:
                raise ValueError(
                    "Failed to inject environment variable. "
                    f"{env_var_env_str[2:][:-1]} was not found."
                )
            node_params = node_params.replace(env_var_env_str, env_var_value)
            return self.inject_env_vars(node_params)
    return node_params
```
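To try the injection logic outside the pipeline class, the same recursion can be sketched as a standalone free function (a hypothetical simplification for illustration; the real method lives on the config loader and uses the signature above):

```python
import os
import re


def inject_env_vars(node_params):
    """Recursively replace {$ENV_VAR} tokens in strings, dicts, and lists."""
    if isinstance(node_params, dict):
        for key, value in list(node_params.items()):
            node_params[key] = inject_env_vars(value)
    elif isinstance(node_params, list):
        for i, value in enumerate(node_params):
            node_params[i] = inject_env_vars(value)
    elif isinstance(node_params, str):
        match = re.search(r"\{\$.*?\}", node_params, re.DOTALL)
        if match:
            token = match.group()           # e.g. "{$SECRET1}"
            value = os.getenv(token[2:-1])  # strip the "{$" and "}"
            if value is None:
                raise ValueError(f"{token[2:-1]} was not found.")
            # Re-run on the result to catch any remaining variables.
            return inject_env_vars(node_params.replace(token, value))
    return node_params


os.environ["SECRET1"] = "secret1"
print(inject_env_vars({"key1": "A string with a {$SECRET1}."}))
# → {'key1': 'A string with a secret1.'}
```

Note that ints, floats, bools, and None pass through untouched, and an unset variable raises a ValueError rather than silently leaving the token in place.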
datasets
This section defines the datasets for a pipeline.
| Key | Required | Type | Description |
| --- | --- | --- | --- |
| `<name of dataset>` | Y | map | Defines a map of dataset names to dataset definitions in the pipeline. |
<name of dataset>
A particular dataset instance in the pipeline, defined by a unique name. Each dataset is defined by a type.
| Key | Required | Type | Description |
| --- | --- | --- | --- |
| `type` | Y | string | Defines which type of dataset to use. Currently, only `kafka_stream` is supported. |
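A datasets section might therefore look like the following sketch (the dataset name is hypothetical):

```yaml
datasets:
  test_sequence:
    type: kafka_stream
```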
Note
Aineko is currently in the Beta release stage and is constantly improving.
If you have any feedback, questions, or suggestions, please reach out to us.