KafkaDataset

The KafkaDataset class is a subclass of the AbstractDataset class.

aineko.datasets.kafka.KafkaDataset

KafkaDataset(
    name: str, params: Dict[str, Any], test: bool = False
)

Bases: AbstractDataset

Kafka dataset.

Dataset Storage Layer is a Kafka topic.

Dataset Query Layer is a Kafka Consumer and Producer.

read method consumes from a Kafka topic.

write method produces to a Kafka topic.

create method creates the dataset topic in the Kafka cluster.

initialize method can be used to create a consumer or producer.

delete method deletes the dataset topic in the Kafka cluster.

exists method checks if the dataset topic exists.

Parameters:

    name (str): name of the dataset. Required.
    params (Dict[str, Any]): dataset configuration parameters. Required.
    test (bool): whether the dataset should be initialized in test mode. Default: False.

Attributes:

    name (str): name of the dataset
    topic_name (str): name of the Kafka topic
    params (dict): dataset configuration parameters
    credentials (KafkaCredentials): Kafka credentials
    _consumer (Consumer): Kafka consumer
    _producer (Producer): Kafka producer
    _admin_client (AdminClient): Kafka AdminClient
    cached (bool): True if the consumer has been polled, False otherwise
    location (str): location of the dataset
    consumer_name (str): name of the consumer

Raises:

    KafkaDatasetError: if an error occurs while creating the dataset

Initialize the dataset.

Source code in aineko/datasets/kafka.py
def __init__(
    self,
    name: str,
    params: Dict[str, Any],
    test: bool = False,
):
    """Initialize the dataset."""
    self.name = name
    self.topic_name = name
    self.params = params
    self.consumer_name: Optional[str] = None
    self.credentials = KafkaCredentials(
        **params.get("kafka_credentials", {})
    )
    self.cached = False
    self.source_node: str
    self.source_pipeline: str
    self._consumer: Consumer
    self._producer: Producer
    self._test = test
    self._empty = True
    self._input_values: List[Dict] = []
    self._output_values: List[Dict] = []

    if self._test is False:
        self.location = self._update_location()
        self._create_admin_client()
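
Example usage (a minimal sketch; the dataset name is a placeholder, and the empty kafka_credentials dict assumes the KafkaCredentials defaults are valid for your broker):

from aineko.datasets.kafka import KafkaDataset

# Placeholder dataset name; empty credentials fall back to KafkaCredentials defaults.
dataset = KafkaDataset(
    name="user_events",
    params={"kafka_credentials": {}},
)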

cached instance-attribute

cached = False

consumer_name instance-attribute

consumer_name: Optional[str] = None

credentials instance-attribute

credentials = KafkaCredentials(
    **params.get("kafka_credentials", {})
)

location instance-attribute

location = self._update_location()

name instance-attribute

name = name

params instance-attribute

params = params

source_node instance-attribute

source_node: str

source_pipeline instance-attribute

source_pipeline: str

topic_name instance-attribute

topic_name = name

consume_all

consume_all(end_message: Union[str, bool] = False) -> list

Reads all messages from the dataset until a specific one is found.

Parameters:

    end_message (Union[str, bool]): message to trigger the completion of consumption. Default: False.

Returns:

    list: list of messages from the dataset

Source code in aineko/datasets/kafka.py
def consume_all(self, end_message: Union[str, bool] = False) -> list:
    """Reads all messages from the dataset until a specific one is found.

    Args:
        end_message: Message to trigger the completion of consumption

    Returns:
        list of messages from the dataset
    """
    messages = []
    while True:
        message = self._consume()
        if message is None:
            continue
        if message["message"] == end_message:
            break
        messages.append(message)
    return messages
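
Example usage (a sketch that assumes a consumer has already been created via initialize and that a writer produces the "DONE" sentinel payload, an illustrative value, when it is finished):

# Drain the dataset until the sentinel payload is seen.
messages = dataset.consume_all(end_message="DONE")
for record in messages:
    print(record["message"])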

create

create(
    dataset_prefix: Optional[str] = None,
) -> DatasetCreationStatus

Create the dataset storage layer kafka topic.

Parameters:

    dataset_prefix (Optional[str]): optional prefix for the dataset name. Default: None.

Returns:

    DatasetCreationStatus: status of dataset creation.

Source code in aineko/datasets/kafka.py
def create(
    self,
    dataset_prefix: Optional[str] = None,
) -> DatasetCreationStatus:
    """Create the dataset storage layer kafka topic.

    Args:
        dataset_prefix: Optional prefix for dataset name.

    Returns:
      status of dataset creation.
    """
    return self._create_topic(
        dataset_name=self.name, dataset_prefix=dataset_prefix
    )
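
Example usage (a sketch; the prefix value is illustrative):

# Create the backing Kafka topic, optionally namespaced with a prefix.
status = dataset.create(dataset_prefix="test")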

delete

delete() -> None

Delete the dataset topic from the Kafka cluster.

Raises:

Type Description
KafkaDatasetError

if an error occurs while deleting the topic

Source code in aineko/datasets/kafka.py
def delete(self) -> None:
    """Delete the dataset topic from the Kafka cluster.

    Raises:
        KafkaDatasetError: if an error occurs while deleting the topic
    """
    try:
        self._admin_client.delete_topics([self.topic_name])
    except Exception as err:
        raise KafkaDatasetError(
            f"Error deleting topic {self.topic_name}: {str(err)}"
        ) from err
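
Example usage showing the documented error handling (a sketch; the import path for KafkaDatasetError is an assumption):

# Assumption: KafkaDatasetError is importable from aineko.datasets.kafka.
from aineko.datasets.kafka import KafkaDatasetError

try:
    dataset.delete()
except KafkaDatasetError as err:
    print(f"Failed to delete topic: {err}")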

exists

exists() -> bool

Check if the dataset exists.

Returns:

    bool: True if the dataset topic exists, False otherwise

Source code in aineko/datasets/kafka.py
def exists(self) -> bool:
    """Check if the dataset exists.

    Returns:
        True if the dataset topic exists, False otherwise
    """
    return self.topic_name in self._admin_client.list_topics().topics
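
Example usage (a sketch combining exists with delete):

# Only attempt deletion if the backing topic is present.
if dataset.exists():
    dataset.delete()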

initialize

initialize(
    create: Literal["consumer", "producer"],
    node_name: str,
    pipeline_name: str,
    prefix: Optional[str] = None,
    has_pipeline_prefix: bool = False,
) -> None

Create query layer reader or writer for the dataset.

This method initializes a producer or consumer for the Kafka dataset, depending on the value of the create parameter.

Parameters:

    create (Literal['consumer', 'producer']): whether to create a consumer or producer for the dataset. Required.
    node_name (str): name of the node. Required.
    pipeline_name (str): name of the pipeline that the node belongs to. Required.
    prefix (Optional[str]): prefix for the dataset topic. Default: None.
    has_pipeline_prefix (bool): whether the dataset topic has a pipeline prefix. Default: False.

Raises:

    KafkaDatasetError: if an error occurs while creating the consumer or producer

Source code in aineko/datasets/kafka.py
def initialize(
    self,
    create: Literal["consumer", "producer"],
    node_name: str,
    pipeline_name: str,
    prefix: Optional[str] = None,
    has_pipeline_prefix: bool = False,
) -> None:
    """Create query layer reader or writer for the dataset.

    This method initializes a producer or consumer for the Kafka dataset,
    depending on the value of the `create` parameter.

    Args:
        create: whether to create a consumer or producer for the dataset
        node_name: name of the node
        pipeline_name: name of the pipeline that the node belongs to
        prefix: prefix for the dataset topic
        has_pipeline_prefix: Whether the dataset topic has a pipeline prefix

    Raises:
        KafkaDatasetError: if an error occurs while creating the consumer
            or producer
    """
    if create == "consumer":
        try:
            self._create_consumer(
                consumer_params=ConsumerParams(
                    dataset_name=self.name,
                    node_name=node_name,
                    pipeline_name=pipeline_name,
                    prefix=prefix,
                    has_pipeline_prefix=has_pipeline_prefix,
                    consumer_config=DEFAULT_KAFKA_CONFIG.get(
                        "CONSUMER_CONFIG"
                    ),
                )
            )
            logger.info("Consumer for %s created.", self.topic_name)
        except KafkaError as err:
            raise KafkaDatasetError(
                f"Error creating consumer for {self.topic_name}: {str(err)}"
            ) from err
        return
    elif create == "producer":
        try:
            self._create_producer(
                producer_params=ProducerParams(
                    dataset_name=self.name,
                    node_name=node_name,
                    pipeline_name=pipeline_name,
                    prefix=prefix,
                    has_pipeline_prefix=has_pipeline_prefix,
                    producer_config=DEFAULT_KAFKA_CONFIG.get(
                        "PRODUCER_CONFIG"
                    ),
                )
            )
            logger.info("Producer for %s created.", self.topic_name)
        except KafkaError as err:
            raise KafkaDatasetError(
                f"Error creating producer for {self.topic_name}: {str(err)}"
            ) from err
        return
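
Example usage (a sketch; node and pipeline names are placeholders):

# Create a producer for writing and a consumer for reading.
dataset.initialize(
    create="producer",
    node_name="my_node",
    pipeline_name="my_pipeline",
)
dataset.initialize(
    create="consumer",
    node_name="my_node",
    pipeline_name="my_pipeline",
)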

last

last(timeout: int = 1) -> Dict

Consumes the last message from the dataset.

Wraps the _consume_message(how="last") method and blocks until a message is received before returning it. This method ensures that the consumed message is always the most recent one. If the consumer is slower than the producer, messages might be skipped; if the consumer is faster than the producer, messages might be repeated.

This is useful when the timeout is short and you expect the consumer to often return None.

Note: The timeout must be greater than 0 to prevent overwhelming the broker with requests to update the offset.

Parameters:

    timeout (int): seconds to poll for a response from the Kafka broker. Must be > 0. Default: 1.

Returns:

    Dict: message from the dataset

Raises:

    ValueError: if timeout is <= 0

Source code in aineko/datasets/kafka.py
def last(self, timeout: int = 1) -> Dict:
    """Consumes the last message from the dataset.

    Wraps the `_consume_message(how="last")` method. It implements a
    block that waits until a message is received before returning it.
    This method ensures that the consumed message is always the most
    recent message. If the consumer is slower than the producer, messages
    might be skipped. If the consumer is faster than the producer,
    messages might be repeated.

    This is useful when the timeout is short and you expect the consumer
    to often return `None`.

    Note: The timeout must be greater than 0 to prevent
    overwhelming the broker with requests to update the offset.

    Args:
        timeout: seconds to poll for a response from kafka broker.
            Must be >0.

    Returns:
        message from the dataset

    Raises:
        ValueError: if timeout is <= 0
    """
    if timeout <= 0:
        raise ValueError(
            "Timeout must be > 0 when consuming the last message."
        )
    return self._consume_message(how="last", timeout=timeout)
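
Example usage (a sketch; the 2-second timeout is an arbitrary value greater than 0):

# Block until the most recent message is available.
latest = dataset.last(timeout=2)
print(latest["message"])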

next

next() -> Dict

Consumes the next message from the dataset.

Wraps the _consume_message(how="next") method and blocks until a message is received before returning it. This method ensures that every message is consumed, but the consumed message may not be the most recent one if the consumer is slower than the producer.

This is useful when the timeout is short and you expect the consumer to often return None.

Returns:

    Dict: message from the dataset

Source code in aineko/datasets/kafka.py
def next(self) -> Dict:
    """Consumes the next message from the dataset.

    Wraps the `_consume_message(how="next")` method. It implements a
    block that waits until a message is received before returning it.
    This method ensures that every message is consumed, but the consumed
    message may not be the most recent message if the consumer is slower
    than the producer.

    This is useful when the timeout is short and you expect the consumer
    to often return `None`.

    Returns:
        message from the dataset
    """
    return self._consume_message(how="next")
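
Example usage (a sketch; assumes a consumer has been created via initialize):

# Block until the next unread message arrives.
record = dataset.next()
payload = record["message"]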

read

read(
    how: Literal["next", "last"],
    timeout: Optional[float] = None,
    block: bool = False,
) -> Optional[Dict]

Read the dataset message via the query layer.

Parameters:

    how (Literal['next', 'last']): how to read the message. "next": read the next message in the queue; "last": read the last message in the queue. Required.
    timeout (Optional[float]): seconds to poll for a response from the Kafka broker. If using how="last", use a timeout greater than 0. Default: None.
    block (bool): if True, block until a message is received. Default: False.

Returns:

    Optional[Dict]: message from the dataset

Raises:

    ValueError: if how is not "next" or "last"

Source code in aineko/datasets/kafka.py
def read(
    self,
    how: Literal["next", "last"],
    timeout: Optional[float] = None,
    block: bool = False,
) -> Optional[Dict]:
    """Read the dataset message via the query layer.

    Args:
        how: how to read the message
            "next": read the next message in the queue
            ":last": read the last message in the queue
        timeout: seconds to poll for a response from kafka broker.
            If using how="last", set to bigger than 0.
        block: if True, block until a message is received

    Returns:
        message from the dataset

    Raises:
        ValueError: if how is not "next" or "last"
    """
    if block:
        return self._consume_message(how=how, timeout=timeout)
    else:
        return self._consume(how=how, timeout=timeout)
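
Example usage contrasting non-blocking and blocking reads (a sketch; timeout values are illustrative):

# Non-blocking: may return None if no message arrives within the timeout.
maybe_record = dataset.read(how="next", timeout=0.1)

# Blocking: wait until the latest message is received.
record = dataset.read(how="last", timeout=1, block=True)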

setup_test_mode

setup_test_mode(
    source_node: str,
    source_pipeline: str,
    input_values: Optional[List[dict]] = None,
) -> None

Sets up the dataset for testing.

The dataset is set up to return the input values when any reading method is called. Input values should be a list of dicts, where each dict is the actual message payload. The dataset will handle the message metadata (timestamp, source_node, source_pipeline, etc.).

Parameters:

    source_node (str): name of the source node. Required.
    source_pipeline (str): name of the source pipeline. Required.
    input_values (Optional[List[dict]]): list of input values to be used for testing. Default: None.

Raises:

    DatasetError: if the dataset is not initialized with the test flag

Source code in aineko/datasets/kafka.py
def setup_test_mode(
    self,
    source_node: str,
    source_pipeline: str,
    input_values: Optional[List[dict]] = None,
) -> None:
    """Sets up the dataset for testing.

    The dataset is set up to return the input values when any reading
    method is called. Input values should be a list of dicts where the dict
    is the actual message payload. The dataset will handle the metadata for
    the messages. (timestamp, source_node, source_pipeline, etc.)

    Args:
        source_node: name of the source node
        source_pipeline: name of the source pipeline
        input_values: list of input values to be used for testing

    Raises:
        DatasetError: if the dataset is not initialized with the test flag
    """
    if self._test is False:
        raise DatasetError(
            "Cannot set up test mode if the dataset is not initialized "
            "with the test flag."
        )

    self.source_node = source_node
    self.source_pipeline = source_pipeline
    if input_values is not None:
        for input_value in input_values.copy():
            self._input_values.append(
                {
                    "timestamp": datetime.datetime.now().strftime(
                        AINEKO_CONFIG.get("MSG_TIMESTAMP_FORMAT")
                    ),
                    "message": input_value,
                    "source_node": self.source_node,
                    "source_pipeline": self.source_pipeline,
                }
            )
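
Example usage (a sketch; node, pipeline, and payload values are placeholders, and the dataset must have been constructed with test=True):

from aineko.datasets.kafka import KafkaDataset

# Placeholder names and payloads; the empty params assume KafkaCredentials defaults are valid.
test_dataset = KafkaDataset(name="user_events", params={}, test=True)
test_dataset.setup_test_mode(
    source_node="my_node",
    source_pipeline="my_pipeline",
    input_values=[{"value": 1}, {"value": 2}],
)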

write

write(msg: Dict, key: Optional[str] = None) -> None

Produce a message to the dataset.

Parameters:

    msg (Dict): message to produce to the dataset. Required.
    key (Optional[str]): key to use for the message. Default: None.

Raises:

    KafkaDatasetError: if an error occurs while writing to the topic

Source code in aineko/datasets/kafka.py
def write(self, msg: Dict, key: Optional[str] = None) -> None:
    """Produce a message to the dataset.

    Args:
        msg: message to produce to the dataset
        key: key to use for the message

    Raises:
        KafkaDatasetError: if an error occurs while writing to the topic
    """
    # Note, this will be re-written to use the dataset's schema,
    # without added metadata.
    message = {
        "timestamp": datetime.datetime.now().strftime(
            AINEKO_CONFIG.get("MSG_TIMESTAMP_FORMAT")
        ),
        "dataset": self.name,
        "source_pipeline": self.source_pipeline,
        "source_node": self.source_node,
        "message": msg,
    }

    if self._test:
        if msg is not None:
            self._output_values.append(message)
        return None

    self._producer.poll(0)

    key_bytes = str(key).encode("utf-8") if key is not None else None

    self._producer.produce(
        topic=self.topic_name,
        key=key_bytes,
        value=json.dumps(message).encode("utf-8"),
        callback=self._delivery_report,
    )
    self._producer.flush()
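
Example usage (a sketch; the payload and key are placeholders, and a producer must already have been created via initialize(create="producer", ...)):

# Produce one message to the dataset's topic; the key is optional.
dataset.write({"value": 42}, key="user_123")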