Utility class for managing Kafka consumers and producers. This class should not be instantiated.

Constructors

Methods

  • Disconnects the consumer and producer from Kafka.

    Parameters

    • kafka: Kafka

      The Kafka instance to disconnect from.

    • consumers: Consumer[]
    • producers: Producer[]
    • app: FastifyInstance<
          RawServerDefault,
          IncomingMessage,
          ServerResponse<IncomingMessage>,
          FastifyBaseLogger,
          FastifyTypeProviderDefault,
      >

      The Fastify instance used for logging.

    Returns Promise<void>

    A promise that resolves when both the consumer and producer are disconnected.

    Will log an error if there is an issue disconnecting the consumer or producer.

    await KafkaUtils.disconnectFromKafka(kafka, consumers, producers, app);
    
  • Initializes a Kafka consumer with the specified configuration and subscribes to the given topic.

    Parameters

    • kafka: Kafka

      An instance of the Kafka client.

    • kafkaConfig: KafkaConfig

      Configuration settings for the Kafka consumer, including the groupId.

    • topic: string

      The topic to which the consumer should subscribe.

    Returns Promise<Consumer>

    A promise that resolves to the initialized Kafka consumer.

    const consumer = await KafkaUtils.initializeConsumer(kafka, kafkaConfig, 'my-topic');
    consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
    console.log({
    value: message.value.toString(),
    });
    }
    });
  • Initializes and connects a Kafka producer.

    Parameters

    • kafka: Kafka

      The Kafka instance to use for creating the producer.

    Returns Promise<Producer>

    A promise that resolves to the connected Kafka producer.

    const producer = await KafkaUtils.initializeProducer(kafka);
    await producer.send({
    topic: 'my-topic',
    messages: [
    { value: 'Hello KafkaJS user!' },
    ],
    });