Configuring Kafka
Aineko uses Kafka under the hood to send messages between nodes. When running Aineko locally, it's recommended to run a local Kafka and ZooKeeper server using:
poetry run aineko service start
To use a different Kafka cluster, such as in deployment settings, Aineko allows Kafka parameters to be configured through environment variables. Typically, you would modify the consumer and producer configuration to point to the desired cluster.
See below for the default Kafka configuration that ships with Aineko and how to override it.
aineko.config
Configuration file for Aineko modules.
Kafka configuration can be set using the following environment variables:
KAFKA_CONFIG: JSON string with kafka configuration (see https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md for all options)
Additionally, the following environment variables can be used to set individual
configuration values. Each corresponds to a configuration key found in the above
link, written in upper case with dots replaced by underscores and prefixed with
KAFKA_CONFIG_. For example, KAFKA_CONFIG_BOOTSTRAP_SERVERS corresponds to bootstrap.servers.
- KAFKA_CONFIG_BOOTSTRAP_SERVERS (e.g. localhost:9092,localhost:9093)
- KAFKA_CONFIG_SASL_USERNAME
- KAFKA_CONFIG_SASL_PASSWORD
- KAFKA_CONFIG_SECURITY_PROTOCOL
- KAFKA_CONFIG_SASL_MECHANISM
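As a rough illustration of the two mechanisms described above, the sketch below builds a KAFKA_CONFIG JSON string and sets two of the prefixed variables from Python. The broker addresses and credentials are placeholders, and setting the variables through `os.environ` (rather than in a shell or a deployment manifest) is only an assumption made for the example.

```python
import json
import os

# Hypothetical settings for a remote cluster; keys follow librdkafka naming.
kafka_settings = {
    "bootstrap.servers": "broker-1.example.com:9092,broker-2.example.com:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
}

# KAFKA_CONFIG carries the whole configuration as a single JSON string.
os.environ["KAFKA_CONFIG"] = json.dumps(kafka_settings)

# Individual keys can also be supplied through the prefixed variables.
os.environ["KAFKA_CONFIG_SASL_USERNAME"] = "example-user"
os.environ["KAFKA_CONFIG_SASL_PASSWORD"] = "example-password"

print(os.environ["KAFKA_CONFIG"])
```

Because the defaults appear to be read into class attributes, the variables presumably need to be set before `aineko.config` is imported.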
DEFAULT_KAFKA_CONFIG
Bases: BaseConfig
Kafka configuration.
BROKER_CONFIG
class-attribute
instance-attribute
BROKER_CONFIG = {'bootstrap.servers': 'localhost:9092'}
CONSUMER_CONFIG
class-attribute
instance-attribute
CONSUMER_CONFIG: Dict[str, str] = {
    **BROKER_CONFIG,
    "auto.offset.reset": "earliest",
}
CONSUMER_MAX_MESSAGES
class-attribute
instance-attribute
CONSUMER_MAX_MESSAGES = 1000000
CONSUMER_TIMEOUT
class-attribute
instance-attribute
CONSUMER_TIMEOUT = 0
DATASET_PARAMS
class-attribute
instance-attribute
DATASET_PARAMS = {
    "num_partitions": 1,
    "replication_factor": 1,
    "config": {"retention.ms": 1000 * 60 * 60 * 24 * 7},
}
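The retention.ms expression is easier to read once evaluated. The short check below works out the default retention period; the `days_to_ms` helper is hypothetical and not part of Aineko.

```python
from datetime import timedelta

# Default retention from DATASET_PARAMS, in milliseconds.
retention_ms = 1000 * 60 * 60 * 24 * 7
retention = timedelta(milliseconds=retention_ms)

print(retention_ms)    # 604800000
print(retention.days)  # 7


def days_to_ms(days: int) -> int:
    """Hypothetical helper: convert whole days to milliseconds."""
    return days * 24 * 60 * 60 * 1000


print(days_to_ms(7) == retention_ms)  # True
```

In other words, datasets created with these defaults retain messages for seven days.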
OVERRIDABLES
class-attribute
instance-attribute
OVERRIDABLES = {
    "KAFKA_CONFIG_BOOTSTRAP_SERVERS": "bootstrap.servers",
    "KAFKA_CONFIG_SASL_USERNAME": "sasl.username",
    "KAFKA_CONFIG_SASL_PASSWORD": "sasl.password",
    "KAFKA_CONFIG_SECURITY_PROTOCOL": "security.protocol",
    "KAFKA_CONFIG_SASL_MECHANISM": "sasl.mechanism",
}
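One way to picture how this mapping is used is the minimal sketch below. It is not Aineko's actual code: `resolve_broker_config` is a hypothetical helper, and the precedence it applies (defaults first, then the KAFKA_CONFIG JSON, then the prefixed variables) is an assumption.

```python
import json
from typing import Dict, Mapping

OVERRIDABLES = {
    "KAFKA_CONFIG_BOOTSTRAP_SERVERS": "bootstrap.servers",
    "KAFKA_CONFIG_SASL_USERNAME": "sasl.username",
    "KAFKA_CONFIG_SASL_PASSWORD": "sasl.password",
    "KAFKA_CONFIG_SECURITY_PROTOCOL": "security.protocol",
    "KAFKA_CONFIG_SASL_MECHANISM": "sasl.mechanism",
}


def resolve_broker_config(environ: Mapping[str, str]) -> Dict[str, str]:
    """Hypothetical helper: merge defaults, KAFKA_CONFIG and overrides."""
    # Start from the documented default broker address.
    config = {"bootstrap.servers": "localhost:9092"}
    # Assumed step: apply the JSON configuration on top of the defaults.
    config.update(json.loads(environ.get("KAFKA_CONFIG", "{}")))
    # Assumed step: prefixed variables win over both of the above.
    for env_name, key in OVERRIDABLES.items():
        value = environ.get(env_name)
        if value:
            config[key] = value
    return config


example_env = {
    "KAFKA_CONFIG": '{"security.protocol": "SASL_SSL"}',
    "KAFKA_CONFIG_BOOTSTRAP_SERVERS": "localhost:9092,localhost:9093",
}
resolved = resolve_broker_config(example_env)
print(resolved)
```

Passing a plain mapping instead of reading `os.environ` directly keeps the sketch easy to exercise with different inputs.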
PRODUCER_CONFIG
class-attribute
instance-attribute
PRODUCER_CONFIG: Dict[str, str] = {**BROKER_CONFIG}
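Assuming both the consumer and producer settings start from the shared broker settings, the relationship between the three dictionaries can be sketched with dictionary unpacking. The values below are the documented defaults; the snippet is illustrative and does not import Aineko.

```python
from typing import Dict

BROKER_CONFIG: Dict[str, str] = {"bootstrap.servers": "localhost:9092"}

# Consumer: broker settings plus an offset-reset policy.
CONSUMER_CONFIG: Dict[str, str] = {
    **BROKER_CONFIG,
    "auto.offset.reset": "earliest",
}

# Producer: a copy of the broker settings only.
PRODUCER_CONFIG: Dict[str, str] = {**BROKER_CONFIG}

print(CONSUMER_CONFIG)
print(PRODUCER_CONFIG)
```

Under this reading, any change to the broker settings (for example a different bootstrap.servers value) carries over to both clients, while auto.offset.reset applies to consumers only.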
PRODUCER_OVERRIDABLES
class-attribute
instance-attribute
PRODUCER_OVERRIDABLES = []
kafka_config
class-attribute
instance-attribute
kafka_config = os.environ.get('KAFKA_CONFIG', '{}')
value
class-attribute
instance-attribute
value = os.environ.get(env)