
Configuring Kafka

Aineko uses Kafka under the hood to send messages between nodes. When running Aineko locally, it is recommended to start a local Kafka and ZooKeeper server with:
poetry run aineko service start

To use a different Kafka cluster, such as in a deployment, configure the Kafka parameters through environment variables. Typically, you would change the consumer and producer configuration so that both point to the desired cluster.

The default Kafka configuration that ships with Aineko, and the ways to override it, are described below.

aineko.config

Configuration file for Aineko modules.

Kafka configuration can be set using the following environment variables:

KAFKA_CONFIG: JSON string with kafka configuration (see https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md for all options)

Additionally, the following environment variables can be used to set individual configuration values. Each variable name is the corresponding configuration key from the link above, upper-cased, with dots replaced by underscores and prefixed with KAFKA_CONFIG_. For example, KAFKA_CONFIG_BOOTSTRAP_SERVERS corresponds to bootstrap.servers.

  • KAFKA_CONFIG_BOOTSTRAP_SERVERS (e.g. localhost:9092,localhost:9093)
  • KAFKA_CONFIG_SASL_USERNAME
  • KAFKA_CONFIG_SASL_PASSWORD
  • KAFKA_CONFIG_SECURITY_PROTOCOL
  • KAFKA_CONFIG_SASL_MECHANISM
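The way these variables could combine can be sketched in plain Python. This is a minimal illustration, not Aineko's actual implementation: the helper name resolve_broker_config is hypothetical, and the precedence shown (defaults first, then the KAFKA_CONFIG JSON string, then the prefixed variables) is an assumption. The mapping is copied from the OVERRIDABLES attribute documented on this page.

```python
import json
from typing import Dict, Mapping

# Mapping copied from the OVERRIDABLES attribute documented on this page.
OVERRIDABLES = {
    "KAFKA_CONFIG_BOOTSTRAP_SERVERS": "bootstrap.servers",
    "KAFKA_CONFIG_SASL_USERNAME": "sasl.username",
    "KAFKA_CONFIG_SASL_PASSWORD": "sasl.password",
    "KAFKA_CONFIG_SECURITY_PROTOCOL": "security.protocol",
    "KAFKA_CONFIG_SASL_MECHANISM": "sasl.mechanism",
}


def resolve_broker_config(environ: Mapping[str, str]) -> Dict[str, str]:
    """Hypothetical helper: merge defaults, KAFKA_CONFIG, and prefixed variables."""
    # Start from the documented default broker setting.
    config = {"bootstrap.servers": "localhost:9092"}
    # Assumption: keys in the KAFKA_CONFIG JSON string replace the defaults.
    config.update(json.loads(environ.get("KAFKA_CONFIG", "{}")))
    # Assumption: a prefixed variable, when set and non-empty, wins over both.
    for env_name, key in OVERRIDABLES.items():
        value = environ.get(env_name)
        if value:
            config[key] = value
    return config


# Example environment; the broker addresses are placeholders.
example_env = {
    "KAFKA_CONFIG": json.dumps(
        {"security.protocol": "SASL_SSL", "sasl.mechanism": "PLAIN"}
    ),
    "KAFKA_CONFIG_BOOTSTRAP_SERVERS": "broker-1:9092,broker-2:9092",
}
resolved = resolve_broker_config(example_env)
print(resolved)
```

Passing a mapping rather than reading os.environ directly keeps the sketch easy to try without touching the real process environment.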

DEFAULT_KAFKA_CONFIG

Bases: BaseConfig

Kafka configuration.

BROKER_CONFIG class-attribute instance-attribute

BROKER_CONFIG = {'bootstrap.servers': 'localhost:9092'}

CONSUMER_CONFIG class-attribute instance-attribute

CONSUMER_CONFIG: Dict[str, str] = {
    **BROKER_CONFIG,
    "auto.offset.reset": "earliest",
}
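The consumer configuration builds on the broker configuration. A minimal sketch of that relationship, assuming the broker settings are merged in with dictionary unpacking and using only the default values shown on this page:

```python
from typing import Dict

# Default broker settings as documented above.
BROKER_CONFIG: Dict[str, str] = {"bootstrap.servers": "localhost:9092"}

# Unpacking copies every broker key into the new dictionary;
# the consumer-specific key is then added alongside them.
CONSUMER_CONFIG: Dict[str, str] = {
    **BROKER_CONFIG,
    "auto.offset.reset": "earliest",
}

print(CONSUMER_CONFIG)
```

Because the unpacking makes a copy, later changes to CONSUMER_CONFIG in this sketch do not alter BROKER_CONFIG.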

CONSUMER_MAX_MESSAGES class-attribute instance-attribute

CONSUMER_MAX_MESSAGES = 1000000

CONSUMER_TIMEOUT class-attribute instance-attribute

CONSUMER_TIMEOUT = 0

DATASET_PARAMS class-attribute instance-attribute

DATASET_PARAMS = {
    "num_partitions": 1,
    "replication_factor": 1,
    "config": {"retention.ms": 1000 * 60 * 60 * 24 * 7},
}
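The retention.ms value above is written as a product of time units. A short sketch of the arithmetic, which works out to seven days in milliseconds; the helper retention_ms_for_days is hypothetical and only illustrates how a different retention period could be computed:

```python
from datetime import timedelta

# Same expression as in DATASET_PARAMS: ms * s * min * h * days.
retention_ms = 1000 * 60 * 60 * 24 * 7

# Convert back to a timedelta to check the human-readable duration.
retention = timedelta(milliseconds=retention_ms)
print(retention_ms)    # 604800000
print(retention.days)  # 7


def retention_ms_for_days(days: int) -> int:
    """Hypothetical helper: retention period in milliseconds for whole days."""
    return int(timedelta(days=days).total_seconds() * 1000)


print(retention_ms_for_days(7) == retention_ms)  # True
```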

OVERRIDABLES class-attribute instance-attribute

OVERRIDABLES = {
    "KAFKA_CONFIG_BOOTSTRAP_SERVERS": "bootstrap.servers",
    "KAFKA_CONFIG_SASL_USERNAME": "sasl.username",
    "KAFKA_CONFIG_SASL_PASSWORD": "sasl.password",
    "KAFKA_CONFIG_SECURITY_PROTOCOL": "security.protocol",
    "KAFKA_CONFIG_SASL_MECHANISM": "sasl.mechanism",
}

PRODUCER_CONFIG class-attribute instance-attribute

PRODUCER_CONFIG: Dict[str, str] = {**BROKER_CONFIG}

PRODUCER_OVERRIDABLES class-attribute instance-attribute

PRODUCER_OVERRIDABLES = []

kafka_config class-attribute instance-attribute

kafka_config = os.environ.get('KAFKA_CONFIG', '{}')

value class-attribute instance-attribute

value = os.environ.get(env)