AbstractDataset

The AbstractDataset class is the base class for defining synchronous datasets.

aineko.core.dataset.AbstractDataset

Bases: ABC, Generic[T]

Base class for defining new synchronous Aineko datasets.

A dataset comprises two subcomponents: the query layer and the storage layer. The storage layer is the actual storage infrastructure that holds the data, and the query layer is an API layer that allows interaction with the storage layer.

The AbstractDataset class provides a common interface for all dataset implementations. All dataset implementations must subclass the AbstractDataset class and must implement the following methods:

  • __init__: Initialize the dataset object.
  • create: Creation of the actual storage layer.
  • delete: Delete the storage layer.
  • exists: Check if the storage layer exists.
  • initialize: Initialize the query layer.
  • read: Read an entry from the dataset by querying the storage layer.
  • write: Write an entry to the dataset by querying the storage layer.
  • setup_test_mode: Set up the dataset for testing.

Please refer to the method docstrings for more information on the implementation details of each method.
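To make the contract concrete, here is a minimal sketch of a subclass whose storage layer is a plain in-process list. The `AbstractDataset` stand-in below mirrors only the interface (the real base class lives in `aineko.core.dataset` and uses `*args: T, **kwargs: T` signatures), and `ListDataset`, its `_storage` attribute, and the simplified `create` return value are illustrative assumptions rather than aineko API:

```python
import abc
from typing import Any, Dict, List, Optional


class AbstractDataset(abc.ABC):
    """Interface-only stand-in for aineko.core.dataset.AbstractDataset."""

    name: str

    @abc.abstractmethod
    def create(self) -> Any: ...

    @abc.abstractmethod
    def delete(self) -> Any: ...

    @abc.abstractmethod
    def exists(self) -> bool: ...

    @abc.abstractmethod
    def initialize(self) -> Any: ...

    @abc.abstractmethod
    def read(self) -> Any: ...

    @abc.abstractmethod
    def write(self, entry: Dict) -> Any: ...

    @abc.abstractmethod
    def setup_test_mode(
        self,
        source_node: str,
        source_pipeline: str,
        input_values: Optional[List[dict]] = None,
    ) -> None: ...


class ListDataset(AbstractDataset):
    """Hypothetical dataset whose storage layer is an in-process list."""

    def __init__(self, name: str, params: Dict[str, Any], test: bool = False):
        self.name = name
        self.params = params
        self._test = test
        self._storage: Optional[List[Dict]] = None  # provisioned by create()
        self._input_values: List[Dict] = []
        self._output_values: List[Dict] = []

    def create(self) -> Any:
        self._storage = []  # "provision" the storage layer
        return self._storage

    def delete(self) -> Any:
        self._storage = None

    def exists(self) -> bool:
        return self._storage is not None

    def initialize(self) -> Any:
        return self  # here the query layer is simply the object itself

    def read(self) -> Any:
        # In test mode, read from the in-memory input queue instead of storage.
        queue = self._input_values if self._test else self._storage
        return queue.pop(0) if queue else None

    def write(self, entry: Dict) -> Any:
        # In test mode, capture writes in _output_values instead of storage.
        (self._output_values if self._test else self._storage).append(entry)

    def setup_test_mode(self, source_node, source_pipeline, input_values=None):
        self._test = True
        self._input_values = list(input_values or [])
        self._output_values = []


ds = ListDataset("my_dataset", params={})
ds.create()
ds.write({"message": "hello"})
print(ds.exists(), ds.read())  # True {'message': 'hello'}
```

Note how test mode swaps the storage layer for the `_input_values`/`_output_values` lists without the caller's `read`/`write` code changing.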

Source code in aineko/core/dataset.py
class AbstractDataset(abc.ABC, Generic[T]):
    """Base class for defining new synchronous Aineko datasets.

    A dataset comprises 2 subcomponents, the query layer and the storage layer.
    The storage layer refers to the actual storage infrastructure that holds the
    data, and the query layer is an API layer that allows for the interaction
    with the storage layer.

    The `AbstractDataset` class provides a common interface for all dataset
    implementations. All dataset implementations must subclass the
    `AbstractDataset` class and must implement the following methods:

    * `__init__`: Initialize the dataset object.
    * `create`: Creation of the actual storage layer.
    * `delete`: Delete the storage layer.
    * `exists`: Check if the storage layer exists.
    * `initialize`: Initialize the query layer.
    * `read`: Read an entry from the dataset by querying the storage layer.
    * `write`: Write an entry to the dataset by querying the storage layer.
    * `setup_test_mode`: Set up the dataset for testing.

    Please refer to the method docstrings for more information on the
    implementation details of each method.
    """

    name: str

    _test: bool
    _input_values: List[Dict]
    _output_values: List[Dict]

    @abc.abstractmethod
    def __init__(
        self,
        name: str,
        params: Dict[str, Any],
        test: bool = False,
    ) -> None:
        """Subclass implementation to initialize the dataset object.

        All dataset implementations must implement the `__init__` method.
        A dataset object should be initialized with the following attributes:

        * `self.name`: The name of the dataset.
        * `self.params`: A dictionary of parameters.
        * `self._test`: Whether the dataset is in test mode.

        Args:
            name: The name of the dataset.
            params: A dictionary of parameters.
            test: Whether the dataset should be initialized in test mode.
        """
        raise NotImplementedError

    def __str__(self) -> str:
        """Return the string representation of the dataset."""
        return f"{self.__class__.__name__}({self.name})"

    @classmethod
    def from_config(
        cls, name: str, config: Dict[str, Any], test: bool = False
    ) -> "AbstractDataset":
        """Create a dataset from a configuration dictionary.

        Args:
            name: The name of the dataset.
            config: The configuration dictionary.
            test: Whether the dataset should be initialized in test mode.

        Returns:
            Instance of an `AbstractDataset` subclass.

        Example:
            In some cases, it is necessary to dynamically create a dataset from
            a configuration dictionary. Since the dataset type could be any
            dataset implementation, the `from_config` method provides a way to
            properly initialize the dataset object.

            ```python
            config = {
                "type": "aineo.datasets.kafka.KafkaDataset",
                "location": "localhost:9092",
                "params": {
                    "param_1": "bar"
                }
            }
            dataset = AbstractDataset.from_config("my_dataset", config)
            ```
        """
        dataset_config = DatasetConfig(**dict(config))

        class_obj = import_from_string(dataset_config.type, kind="class")
        class_instance = class_obj(name, dict(dataset_config), test=test)
        class_instance.name = name
        return class_instance

    @abc.abstractmethod
    def read(self, *args: T, **kwargs: T) -> Any:
        """Subclass implementation to read an entry from the dataset."""
        raise NotImplementedError

    @abc.abstractmethod
    def write(self, *args: T, **kwargs: T) -> Any:
        """Subclass implementation to write an entry to the dataset."""
        raise NotImplementedError

    @abc.abstractmethod
    def create(self, *args: T, **kwargs: T) -> DatasetCreationStatus:
        """Subclass implementation to create the dataset storage layer."""
        raise NotImplementedError

    @abc.abstractmethod
    def delete(self, *args: T, **kwargs: T) -> Any:
        """Subclass implementation to delete the dataset storage layer."""
        raise NotImplementedError

    @abc.abstractmethod
    def initialize(self, *args: T, **kwargs: T) -> Any:
        """Subclass implementation to initialize the dataset query layer."""
        raise NotImplementedError

    @abc.abstractmethod
    def exists(self, *args: T, **kwargs: T) -> bool:
        """Subclass implementation to check if the dataset exists.

        This method should return True if the dataset exists, otherwise False.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def setup_test_mode(
        self,
        source_node: str,
        source_pipeline: str,
        input_values: Optional[List[dict]] = None,
    ) -> None:
        """Subclass implementation to set up the dataset for testing.

        Nodes have the ability to run in test mode, which allows them to run
        without setting up the actual dataset storage layer. All dataset
        implementations must implement this method. A dataset in test mode
        should never interact with the real storage layer. Instead, it should
        use the class attributes as the storage layer:

        * `_input_values` for input values
        * `_output_values` for output values

        Args:
            source_node: The name of the source node.
            source_pipeline: The name of the source pipeline.
            input_values: A list of input values.
        """
        raise NotImplementedError

    def get_test_input_values(self) -> List[Dict]:
        """Return the input values used for testing.

        Returns:
            A list of input values.

        Raises:
            DatasetError: If the dataset is not in test mode.
        """
        if self._test:
            return self._input_values

        raise DatasetError("Dataset is not in test mode.")

    def get_test_output_values(self) -> List[Dict]:
        """Return the output values used for testing.

        Returns:
            A list of output values.

        Raises:
            DatasetError: If the dataset is not in test mode.
        """
        if self._test:
            return self._output_values

        raise DatasetError("Dataset is not in test mode.")

    def test_is_empty(self) -> bool:
        """Return whether the dataset is empty.

        Returns:
            True if the dataset is empty, otherwise False.

        Raises:
            DatasetError: If the dataset is not in test mode.
        """
        if self._test:
            if len(self._input_values) == 0:
                return True
            return False

        raise DatasetError("Dataset is not in test mode.")

name instance-attribute

name: str

__init__ abstractmethod

__init__(
    name: str, params: Dict[str, Any], test: bool = False
) -> None

Subclass implementation to initialize the dataset object.

All dataset implementations must implement the __init__ method. A dataset object should be initialized with the following attributes:

  • self.name: The name of the dataset.
  • self.params: A dictionary of parameters.
  • self._test: Whether the dataset is in test mode.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | The name of the dataset. | *required* |
| `params` | `Dict[str, Any]` | A dictionary of parameters. | *required* |
| `test` | `bool` | Whether the dataset should be initialized in test mode. | `False` |
Source code in aineko/core/dataset.py
@abc.abstractmethod
def __init__(
    self,
    name: str,
    params: Dict[str, Any],
    test: bool = False,
) -> None:
    """Subclass implementation to initialize the dataset object.

    All dataset implementations must implement the `__init__` method.
    A dataset object should be initialized with the following attributes:

    * `self.name`: The name of the dataset.
    * `self.params`: A dictionary of parameters.
    * `self._test`: Whether the dataset is in test mode.

    Args:
        name: The name of the dataset.
        params: A dictionary of parameters.
        test: Whether the dataset should be initialized in test mode.
    """
    raise NotImplementedError

__str__

__str__() -> str

Return the string representation of the dataset.

Source code in aineko/core/dataset.py
def __str__(self) -> str:
    """Return the string representation of the dataset."""
    return f"{self.__class__.__name__}({self.name})"

create abstractmethod

create(*args: T, **kwargs: T) -> DatasetCreationStatus

Subclass implementation to create the dataset storage layer.

Source code in aineko/core/dataset.py
@abc.abstractmethod
def create(self, *args: T, **kwargs: T) -> DatasetCreationStatus:
    """Subclass implementation to create the dataset storage layer."""
    raise NotImplementedError

delete abstractmethod

delete(*args: T, **kwargs: T) -> Any

Subclass implementation to delete the dataset storage layer.

Source code in aineko/core/dataset.py
@abc.abstractmethod
def delete(self, *args: T, **kwargs: T) -> Any:
    """Subclass implementation to delete the dataset storage layer."""
    raise NotImplementedError

exists abstractmethod

exists(*args: T, **kwargs: T) -> bool

Subclass implementation to check if the dataset exists.

This method should return True if the dataset exists, otherwise False.

Source code in aineko/core/dataset.py
@abc.abstractmethod
def exists(self, *args: T, **kwargs: T) -> bool:
    """Subclass implementation to check if the dataset exists.

    This method should return True if the dataset exists, otherwise False.
    """
    raise NotImplementedError

from_config classmethod

from_config(
    name: str, config: Dict[str, Any], test: bool = False
) -> AbstractDataset

Create a dataset from a configuration dictionary.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | The name of the dataset. | *required* |
| `config` | `Dict[str, Any]` | The configuration dictionary. | *required* |
| `test` | `bool` | Whether the dataset should be initialized in test mode. | `False` |

Returns:

| Type | Description |
| --- | --- |
| `AbstractDataset` | Instance of an `AbstractDataset` subclass. |

Example

In some cases, it is necessary to dynamically create a dataset from a configuration dictionary. Since the dataset type could be any dataset implementation, the from_config method provides a way to properly initialize the dataset object.

```python
config = {
    "type": "aineko.datasets.kafka.KafkaDataset",
    "location": "localhost:9092",
    "params": {
        "param_1": "bar"
    }
}
dataset = AbstractDataset.from_config("my_dataset", config)
```
Source code in aineko/core/dataset.py
@classmethod
def from_config(
    cls, name: str, config: Dict[str, Any], test: bool = False
) -> "AbstractDataset":
    """Create a dataset from a configuration dictionary.

    Args:
        name: The name of the dataset.
        config: The configuration dictionary.
        test: Whether the dataset should be initialized in test mode.

    Returns:
        Instance of an `AbstractDataset` subclass.

    Example:
        In some cases, it is necessary to dynamically create a dataset from
        a configuration dictionary. Since the dataset type could be any
        dataset implementation, the `from_config` method provides a way to
        properly initialize the dataset object.

        ```python
        config = {
            "type": "aineo.datasets.kafka.KafkaDataset",
            "location": "localhost:9092",
            "params": {
                "param_1": "bar"
            }
        }
        dataset = AbstractDataset.from_config("my_dataset", config)
        ```
    """
    dataset_config = DatasetConfig(**dict(config))

    class_obj = import_from_string(dataset_config.type, kind="class")
    class_instance = class_obj(name, dict(dataset_config), test=test)
    class_instance.name = name
    return class_instance
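The dynamic import step inside `from_config` can be approximated with the standard library. This is a sketch only: `import_from_string` is aineko's own helper, and the stand-in below merely mirrors the dotted-path lookup it performs (resolving a string such as `"pkg.module.ClassName"` to the class object):

```python
import importlib


def import_from_string(path: str):
    """Hypothetical stand-in: resolve 'pkg.module.ClassName' to the class."""
    module_path, _, class_name = path.rpartition(".")
    module = importlib.import_module(module_path)  # import 'pkg.module'
    return getattr(module, class_name)            # fetch 'ClassName' from it


# Resolve a class from the standard library as a demonstration.
OrderedDict = import_from_string("collections.OrderedDict")
print(OrderedDict)  # <class 'collections.OrderedDict'>
```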

get_test_input_values

get_test_input_values() -> List[Dict]

Return the input values used for testing.

Returns:

| Type | Description |
| --- | --- |
| `List[Dict]` | A list of input values. |

Raises:

| Type | Description |
| --- | --- |
| `DatasetError` | If the dataset is not in test mode. |

Source code in aineko/core/dataset.py
def get_test_input_values(self) -> List[Dict]:
    """Return the input values used for testing.

    Returns:
        A list of input values.

    Raises:
        DatasetError: If the dataset is not in test mode.
    """
    if self._test:
        return self._input_values

    raise DatasetError("Dataset is not in test mode.")

get_test_output_values

get_test_output_values() -> List[Dict]

Return the output values used for testing.

Returns:

| Type | Description |
| --- | --- |
| `List[Dict]` | A list of output values. |

Raises:

| Type | Description |
| --- | --- |
| `DatasetError` | If the dataset is not in test mode. |

Source code in aineko/core/dataset.py
def get_test_output_values(self) -> List[Dict]:
    """Return the output values used for testing.

    Returns:
        A list of output values.

    Raises:
        DatasetError: If the dataset is not in test mode.
    """
    if self._test:
        return self._output_values

    raise DatasetError("Dataset is not in test mode.")

initialize abstractmethod

initialize(*args: T, **kwargs: T) -> Any

Subclass implementation to initialize the dataset query layer.

Source code in aineko/core/dataset.py
@abc.abstractmethod
def initialize(self, *args: T, **kwargs: T) -> Any:
    """Subclass implementation to initialize the dataset query layer."""
    raise NotImplementedError

read abstractmethod

read(*args: T, **kwargs: T) -> Any

Subclass implementation to read an entry from the dataset.

Source code in aineko/core/dataset.py
@abc.abstractmethod
def read(self, *args: T, **kwargs: T) -> Any:
    """Subclass implementation to read an entry from the dataset."""
    raise NotImplementedError

setup_test_mode abstractmethod

setup_test_mode(
    source_node: str,
    source_pipeline: str,
    input_values: Optional[List[dict]] = None,
) -> None

Subclass implementation to set up the dataset for testing.

Nodes have the ability to run in test mode, which allows them to run without setting up the actual dataset storage layer. All dataset implementations must implement this method. A dataset in test mode should never interact with the real storage layer. Instead, it should use the class attributes as the storage layer:

  • _input_values for input values
  • _output_values for output values

Parameters:

Name Type Description Default
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `source_node` | `str` | The name of the source node. | *required* |
| `source_pipeline` | `str` | The name of the source pipeline. | *required* |
| `input_values` | `Optional[List[dict]]` | A list of input values. | `None` |
Source code in aineko/core/dataset.py
@abc.abstractmethod
def setup_test_mode(
    self,
    source_node: str,
    source_pipeline: str,
    input_values: Optional[List[dict]] = None,
) -> None:
    """Subclass implementation to set up the dataset for testing.

    Nodes have the ability to run in test mode, which allows them to run
    without setting up the actual dataset storage layer. All dataset
    implementations must implement this method. A dataset in test mode
    should never interact with the real storage layer. Instead, it should
    use the class attributes as the storage layer:

    * `_input_values` for input values
    * `_output_values` for output values

    Args:
        source_node: The name of the source node.
        source_pipeline: The name of the source pipeline.
        input_values: A list of input values.
    """
    raise NotImplementedError

test_is_empty

test_is_empty() -> bool

Return whether the dataset is empty.

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the dataset is empty, otherwise False. |

Raises:

| Type | Description |
| --- | --- |
| `DatasetError` | If the dataset is not in test mode. |

Source code in aineko/core/dataset.py
def test_is_empty(self) -> bool:
    """Return whether the dataset is empty.

    Returns:
        True if the dataset is empty, otherwise False.

    Raises:
        DatasetError: If the dataset is not in test mode.
    """
    if self._test:
        if len(self._input_values) == 0:
            return True
        return False

    raise DatasetError("Dataset is not in test mode.")
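The test-mode guard shared by `get_test_input_values`, `get_test_output_values`, and `test_is_empty` can be exercised in isolation. The snippet below is a self-contained sketch: `DatasetError` and `TestableDataset` are stand-ins defined here so the example runs without an aineko installation, but the guard logic is copied from the methods above:

```python
from typing import Dict, List


class DatasetError(Exception):
    """Stand-in for aineko's DatasetError, so the example runs standalone."""


class TestableDataset:
    """Minimal object with the same test-mode guards as the helpers above."""

    def __init__(self, test: bool = False):
        self._test = test
        self._input_values: List[Dict] = [{"message": "hi"}]

    def get_test_input_values(self) -> List[Dict]:
        if self._test:
            return self._input_values
        raise DatasetError("Dataset is not in test mode.")

    def test_is_empty(self) -> bool:
        if self._test:
            return len(self._input_values) == 0
        raise DatasetError("Dataset is not in test mode.")


# Outside test mode, the helpers refuse to answer rather than touch storage.
live = TestableDataset(test=False)
try:
    live.get_test_input_values()
except DatasetError as err:
    print(err)  # Dataset is not in test mode.

# In test mode, the in-memory lists act as the storage layer.
testing = TestableDataset(test=True)
print(testing.test_is_empty())          # False: one queued input value
print(testing.get_test_input_values())  # [{'message': 'hi'}]
```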

write abstractmethod

write(*args: T, **kwargs: T) -> Any

Subclass implementation to write an entry to the dataset.

Source code in aineko/core/dataset.py
@abc.abstractmethod
def write(self, *args: T, **kwargs: T) -> Any:
    """Subclass implementation to write an entry to the dataset."""
    raise NotImplementedError

aineko.core.dataset.DatasetCreationStatus

DatasetCreationStatus(
    dataset_name: str, future: Optional[Future] = None
)

Status of dataset creation.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `dataset_name` | `str` | Name of the dataset. |
| `_future` | `Optional[Future]` | Future representing the creation status of the dataset. |

Usage:

```python
dataset = MyDataset("my_dataset")
creation_status = dataset.create()
if creation_status.done():
    print(f"Dataset {creation_status.dataset_name} has been created.")
else:
    print(f"Dataset {creation_status.dataset_name} is being created.")
```

Initialize the dataset creation status.

Source code in aineko/core/dataset.py
def __init__(self, dataset_name: str, future: Optional[Future] = None):
    """Initialize the dataset creation status."""
    self.dataset_name = dataset_name
    self._future = future

dataset_name instance-attribute

dataset_name = dataset_name

done

done() -> bool

Check if the dataset has been created.

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if the dataset has been created, otherwise False. |

Source code in aineko/core/dataset.py
def done(self) -> bool:
    """Check if the dataset has been created.

    Returns:
        True if the dataset has been created, otherwise False.
    """
    if self._future:
        return self._future.done()
    return True
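Because `done()` falls back to `True` when no future was supplied, the class covers both synchronous and asynchronous creation. The example below inlines a copy of `DatasetCreationStatus` (taken verbatim from the source shown above) so it runs without an aineko installation, and drives it with a `concurrent.futures.Future`:

```python
from concurrent.futures import Future
from typing import Optional


class DatasetCreationStatus:
    """Copy of the class above, inlined so the example runs standalone."""

    def __init__(self, dataset_name: str, future: Optional[Future] = None):
        self.dataset_name = dataset_name
        self._future = future

    def done(self) -> bool:
        if self._future:
            return self._future.done()
        return True


# Synchronous creation: no future, so the dataset is reported as done.
sync_status = DatasetCreationStatus("my_dataset")
print(sync_status.done())  # True

# Asynchronous creation: done() reflects the state of the pending future.
pending = Future()
async_status = DatasetCreationStatus("my_dataset", future=pending)
print(async_status.done())  # False
pending.set_result(None)   # creation finished
print(async_status.done())  # True
```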