Skip to content

Kafka Operations


setBootstrapServers

Description: This function is used to set the bootstrap servers for establishing the kafka connection.

Input Format : @Expected server details

ObjectName Action Input Condition Reference
Kafka 🟢 setBootstrapServers @value ⬅ Hardcoded Input
Kafka 🟢 setBootstrapServers Sheet:Column ⬅ Input from Datasheet
Kafka 🟢 setBootstrapServers %dynamicVar% ⬅ Input from variable
@Action(object = ObjectType.KAFKA, desc = "Set Bootstrap Servers", input = InputType.YES, condition = InputType.NO)
public void setBootstrapServers() {
    try {
        kafkaServers.put(key, Data);
        Report.updateTestLog(Action, "Bootstrap Servers have been set successfully", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Bootstrap Servers setup", ex);
        Report.updateTestLog(Action, "Error in setting Bootstrap Servers: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}

setProducerTopic

Description: This function is used to set the Producer Topic name

Input Format : @Expected Producer Topic Name

ObjectName Action Input Condition Reference
Kafka 🟢 setProducerTopic @value ⬅ Hardcoded Input
Kafka 🟢 setProducerTopic Sheet:Column ⬅ Input from Datasheet
Kafka 🟢 setProducerTopic %dynamicVar% ⬅ Input from variable
@Action(object = ObjectType.KAFKA, desc = "Set Producer Topic", input = InputType.YES, condition = InputType.NO)
public void setProducerTopic() {
    try {
        kafkaProducerTopic.put(key, Data);
        Report.updateTestLog(Action, "Topic has been set successfully", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Topic setup", ex);
        Report.updateTestLog(Action, "Error in setting Topic: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}

setKey

Description: This function is used to set the key for the Kafka connection.

Input Format : @Expected Key

ObjectName Action Input Condition Reference
Kafka 🟢 setKey @value ⬅ Hardcoded Input
Kafka 🟢 setKey Sheet:Column ⬅ Input from Datasheet
Kafka 🟢 setKey %dynamicVar% ⬅ Input from variable
@Action(object = ObjectType.KAFKA, desc = "Set Key", input = InputType.YES, condition = InputType.NO)
public void setKey() {
    try {
        kafkaKey.put(key, Data);
        Report.updateTestLog(Action, "Key has been set successfully", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Key setup", ex);
        Report.updateTestLog(Action, "Error in setting Key: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}

setKeySerializer

Description: This function is used to set the Key Serializer for the Kafka Producer.

Input Format : @Expected Key Serializer. Expected values are : string / bytearray / avro

ObjectName Action Input Condition Reference
Kafka 🟢 setKeySerializer @value ⬅ Hardcoded Input
Kafka 🟢 setKeySerializer Sheet:Column ⬅ Input from Datasheet
Kafka 🟢 setKeySerializer %dynamicVar% ⬅ Input from variable
@Action(object = ObjectType.KAFKA, desc = "Set Key Serializer", input = InputType.YES, condition = InputType.NO)
public void setKeySerializer() {
    try {
        kafkaKeySerializer.put(key, Data);
        Report.updateTestLog(Action, "Key Serializer has been set successfully", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Key Serializer setup", ex);
        Report.updateTestLog(Action, "Error in setting Key Serializer: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}

setValueSerializer

Description: This function is used to set the Value Serializer for the Kafka Producer.

Input Format : @Expected Value Serializer. Expected values are : string / bytearray / avro

ObjectName Action Input Condition Reference
Kafka 🟢 setValueSerializer @value ⬅ Hardcoded Input
Kafka 🟢 setValueSerializer Sheet:Column ⬅ Input from Datasheet
Kafka 🟢 setValueSerializer %dynamicVar% ⬅ Input from variable
@Action(object = ObjectType.KAFKA, desc = "Set Value Serializer", input = InputType.YES, condition = InputType.NO)
public void setValueSerializer() {
    try {
        kafkaValueSerializer.put(key, Data);
        Report.updateTestLog(Action, "Value Serializer has been set successfully", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Value Serializer setup", ex);
        Report.updateTestLog(Action, "Error in setting Value Serializer: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}

setSchemaRegistryURL

Description: This function is used to set the Schema Registry URL (mostly used when firing Avro serialized value).

Input Format : @Expected Schema Registry URL

ObjectName Action Input Condition Reference
Kafka 🟢 setSchemaRegistryURL @value ⬅ Hardcoded Input
Kafka 🟢 setSchemaRegistryURL Sheet:Column ⬅ Input from Datasheet
Kafka 🟢 setSchemaRegistryURL %dynamicVar% ⬅ Input from variable
@Action(object = ObjectType.KAFKA, desc = "Set Schema Registry URL", input = InputType.YES, condition = InputType.NO)
public void setSchemaRegistryURL() {
    try {
        kafkaSchemaRegistryURL.put(key, Data);
        Report.updateTestLog(Action, "Schema Registry URL has been set successfully", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Schema Registry URL setup", ex);
        Report.updateTestLog(Action, "Error in setting Schema Registry URL: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}

setAutoRegisterSchemas

Description: When set to true, this function automatically registers the Avro schema with the Schema Registry when producing messages. If set to false, the producer checks for the Avro schema's existence in the registry; if not found, the message will not be published and an error will be thrown.

Input Format : @Boolean Value. Either set to true or false

ObjectName Action Input Condition Reference
Kafka 🟢 setAutoRegisterSchemas @value ⬅ Hardcoded Input
Kafka 🟢 setAutoRegisterSchemas Sheet:Column ⬅ Input from Datasheet
Kafka 🟢 setAutoRegisterSchemas %dynamicVar% ⬅ Input from variable
@Action(object = ObjectType.KAFKA, desc = "Set Auto Register Schemas", input = InputType.YES, condition = InputType.NO)
public void setAutoRegisterSchemas() {
    try {
        kafkaAutoRegisterSchemas.put(key, Boolean.valueOf(Data.toLowerCase().trim()));
        Report.updateTestLog(Action, "Auto Register Schemas has been set successfully", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception Max Poll Record setup", ex);
        Report.updateTestLog(Action, "Error in Auto Register Schemas: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}

addSchema

Description: This function is used to register the Avro schema with the Schema Registry.

Input Format : @Path to the schema file

ObjectName Action Input Condition Reference
Kafka 🟢 addSchema @value ⬅ Hardcoded Input
Kafka 🟢 addSchema Sheet:Column ⬅ Input from Datasheet
Kafka 🟢 addSchema %dynamicVar% ⬅ Input from variable
@Action(object = ObjectType.KAFKA, desc = "Add Avro Schema", input = InputType.YES, condition = InputType.NO)
public void addSchema() throws IOException {
    try {
        Schema mainSchema = null;
        Schema.Parser parser = new Schema.Parser();
        if (Data.contains(";")) {
            String[] paths = Data.split(";");
            for (int i = 0; i < paths.length - 1; i++) {

                parser.parse(new File(Paths.get(paths[i]).toString()));
            }
            mainSchema = parser.parse(new File(Paths.get(paths[paths.length - 1]).toString()));

        } else {
            // Only one schema, no dependencies
            mainSchema = new Schema.Parser().parse(new File(Paths.get(Data).toString()));
        }
        kafkaAvroSchema.put(key, mainSchema);
        Report.updateTestLog(Action, "Schema added successfully", Status.DONE);
    } catch (Exception e) {
        Report.updateTestLog(Action, " Unable to add Schema : " + e.getMessage(), Status.FAIL);
    }

}

setPartition

Description: This function is used to set the Partition for the Kafka Producer.

Input Format : @Expected Partition value. Either set an Integer or just set null

ObjectName Action Input Condition Reference
Kafka 🟢 setPartition @value ⬅ Hardcoded Input
Kafka 🟢 setPartition Sheet:Column ⬅ Input from Datasheet
Kafka 🟢 setPartition %dynamicVar% ⬅ Input from variable
@Action(object = ObjectType.KAFKA, desc = "Set Partition", input = InputType.YES, condition = InputType.NO)
public void setPartition() {
    try {
        if (Data.toLowerCase().equals("null")) {
            kafkaPartition.put(key, null);
        } else {
            kafkaPartition.put(key, Integer.valueOf(Data));
        }
        Report.updateTestLog(Action, "Partition has been set successfully", Status.DONE);
    } catch (NumberFormatException ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Partition setup", ex);
        Report.updateTestLog(Action, "Error in setting Partition: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}   

setTimeStamp

Description: This function is used to set the Time Stamp for the Kafka Producer. This sets the System.currentTimeMillis() to the producer record.

ObjectName Action Input Condition Reference
Kafka 🟢 setTimeStamp
@Action(object = ObjectType.KAFKA, desc = "Set TimeStamp", input = InputType.NO, condition = InputType.NO)
public void setTimeStamp() {
    try {
        kafkaTimeStamp.put(key, System.currentTimeMillis());
        Report.updateTestLog(Action, "Time Stamp has been set successfully", Status.DONE);
    } catch (NumberFormatException ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Time Stamp setup", ex);
        Report.updateTestLog(Action, "Error in setting Time Stamp: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}

addKafkaHeader

Description: This function is used to add a header to the Kafka Producer Record.

Input Format : @HeaderName=HeaderValue

ObjectName Action Input Condition Reference
Kafka 🟢 addHeader @HeaderName=HeaderValue ⬅ Hardcoded Input
Kafka 🟢 addHeader Sheet:Column containing HeaderName=HeaderValue ⬅ Input from Datasheet
Kafka 🟢 addHeader %dynamicVar% containing HeaderName=HeaderValue ⬅ Input from variable
Kafka 🟢 addHeader @HeaderName={SheetName:ColumnName} ⬅ Input from variable
Kafka 🟢 addHeader @HeaderName=%dynamicVar% ⬅ Input from variable

This function adds all the Headers into a HashMap headerlist. Then those are applied to the request as :

@Action(object = ObjectType.KAFKA, desc = "Add Kafka Header", input = InputType.YES)
public void addKafkaHeader() {
    try {

        List<String> sheetlist = Control.getCurrentProject().getTestData().getTestDataFor(Control.exe.runEnv())
                .getTestDataNames();
        for (int sheet = 0; sheet < sheetlist.size(); sheet++) {
            if (Data.contains("{" + sheetlist.get(sheet) + ":")) {
                com.ing.datalib.testdata.model.TestDataModel tdModel = Control.getCurrentProject().getTestData()
                        .getTestDataByName(sheetlist.get(sheet));
                List<String> columns = tdModel.getColumns();
                for (int col = 0; col < columns.size(); col++) {
                    if (Data.contains("{" + sheetlist.get(sheet) + ":" + columns.get(col) + "}")) {
                        Data = Data.replace("{" + sheetlist.get(sheet) + ":" + columns.get(col) + "}",
                                userData.getData(sheetlist.get(sheet), columns.get(col)));
                    }
                }
            }
        }

        Collection<Object> valuelist = Control.getCurrentProject().getProjectSettings().getUserDefinedSettings()
                .values();
        for (Object prop : valuelist) {
            if (Data.contains("{" + prop + "}")) {
                Data = Data.replace("{" + prop + "}", prop.toString());
            }
        }
        String headerKey = Data.split("=", 2)[0];
        String headerValue = Data.split("=", 2)[1];

        if (kafkaHeaders.containsKey(key)) {
            kafkaHeaders.get(key).add(new RecordHeader(headerKey, headerValue.getBytes()));
        } else {
            ArrayList<Header> toBeAdded = new ArrayList<Header>();
            toBeAdded.add(new RecordHeader(headerKey, headerValue.getBytes()));
            kafkaHeaders.put(key, toBeAdded);
        }

        Report.updateTestLog(Action, "Header added " + Data, Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
        Report.updateTestLog(Action, "Error adding Header :" + "\n" + ex.getMessage(), Status.DEBUG);
    }
}

produceMessage

Description: This function is used to Produce the Kafka Producer Record.

Input Format : @Expected Payload

ObjectName Action Input Condition Reference
Kafka 🟢 produceMessage @Payload (from Editor) ⬅ Hardcoded Input
Kafka 🟢 produceMessage Sheet:Column ⬅ Input from Datasheet
Kafka 🟢 produceMessage %dynamicVar% ⬅ Input from variable

Inputs in the Input column can be either hardcoded, passed inside the Payload editor which is capable of parameterising the Payload (Press ctrl+space to see the list of variables available ), passed from the data sheet (datasheet name : column name) or passed from a variable value (%variable name%), as given in the above example.

@Action(object = ObjectType.KAFKA, desc = "Produce Kafka Message", input = InputType.YES, condition = InputType.NO)
public void produceMessage() {
    try {
        String value = Data;
        value = handleDataSheetVariables(value);
        value = handleuserDefinedVariables(value);
        kafkaValue.put(key, value);

        if (kafkaHeaders.get(key) != null && kafkaTimeStamp.get(key) != null) {
            produceMessage(kafkaProducerTopic.get(key), kafkaPartition.get(key), kafkaTimeStamp.get(key), kafkaKey.get(key), kafkaValue.get(key), kafkaHeaders.get(key));
        } else if (kafkaHeaders.get(key) != null) {
            produceMessage(kafkaProducerTopic.get(key), kafkaPartition.get(key), kafkaKey.get(key), kafkaValue.get(key), kafkaHeaders.get(key));
        } else if (kafkaTimeStamp.get(key) != null) {
            produceMessage(kafkaProducerTopic.get(key), kafkaPartition.get(key), kafkaTimeStamp.get(key), kafkaKey.get(key), kafkaValue.get(key));
        } else if (kafkaPartition.containsKey(key)) {
            produceMessage(kafkaProducerTopic.get(key), kafkaPartition.get(key), kafkaKey.get(key), kafkaValue.get(key));
        } else if (kafkaKey.get(key) != null) {
            produceMessage(kafkaProducerTopic.get(key), kafkaKey.get(key), kafkaValue.get(key));
        } else {
            produceMessage(kafkaProducerTopic.get(key), kafkaValue.get(key));
        }

        Report.updateTestLog(Action, "Message has been produced. ", Status.DONE);

    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
        Report.updateTestLog(Action, "Something went wrong in producing the message" + "\n" + ex.getMessage(),
                Status.FAILNS);
        ex.printStackTrace();
    }
}

sendKafkaMessage

Description: This function is used to send the Kafka Record to the Producer Topic.

ObjectName Action Input Condition Reference
Queue 🟢 sendKafkaMessage
@Action(object = ObjectType.KAFKA, desc = "Send Message", input = InputType.NO, condition = InputType.NO)
public void sendKafkaMessage() {
    try {
        createProducer(kafkaValueSerializer.get(key));
        //before.put(key, Instant.now());
        kafkaProducer.get(key).send(kafkaProducerRecord.get(key));
        Report.updateTestLog(Action, "Record sent", Status.DONE);
        kafkaProducer.get(key).close();
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception while sending record", ex);
        Report.updateTestLog(Action, "Error in sending record: " + "\n" + ex.getMessage(), Status.DEBUG);
    } finally {
        clearProducerDetails();
    }
}

setConsumerTopic

Description: This function is used to set the Consumer Topic name

Input Format : @Expected Consumer Topic Name

ObjectName Action Input Condition Reference
Kafka 🟢 setConsumerTopic @value ⬅ Hardcoded Input
Kafka 🟢 setConsumerTopic Sheet:Column ⬅ Input from Datasheet
Kafka 🟢 setConsumerTopic %dynamicVar% ⬅ Input from variable
@Action(object = ObjectType.KAFKA, desc = "Set Consumer Topic", input = InputType.YES, condition = InputType.NO)
public void setConsumerTopic() {
    try {
        kafkaConsumerTopic.put(key, Data);
        Report.updateTestLog(Action, "Topic has been set successfully", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Topic setup", ex);
        Report.updateTestLog(Action, "Error in setting Topic: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}

setConsumerPollRetries

Description: This function is used to set the the number of times the Kafka Consumer will retry the polling of the target record.

Input Format : @Expected number of retries

ObjectName Action Input Condition Reference
Kafka 🟢 setConsumerPollRetries @value ⬅ Hardcoded Input
Kafka 🟢 setConsumerPollRetries Sheet:Column ⬅ Input from Datasheet
Kafka 🟢 setConsumerPollRetries %dynamicVar% ⬅ Input from variable
@Action(object = ObjectType.KAFKA, desc = "Set Consumer Retries", input = InputType.YES, condition = InputType.NO)
public void setConsumerPollRetries() {
    try {
        kafkaConsumerPollRetries.put(key, Integer.parseInt(Data));
        Report.updateTestLog(Action, "Poll Retries has been set successfully", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Poll Retries setup", ex);
        Report.updateTestLog(Action, "Error in setting Poll Retries: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}

setConsumerPollInterval

Description: This function is used to set the interval (in milliseconds) in which the Kafka Consumer will retry the polling of the target record.

Input Format : @Expected interval value (in milliseconds)

ObjectName Action Input Condition Reference
Kafka 🟢 setConsumerPollInterval @value ⬅ Hardcoded Input
Kafka 🟢 setConsumerPollInterval Sheet:Column ⬅ Input from Datasheet
Kafka 🟢 setConsumerPollInterval %dynamicVar% ⬅ Input from variable
@Action(object = ObjectType.KAFKA, desc = "Set Consumer Retries", input = InputType.YES, condition = InputType.NO)
public void setConsumerPollInterval() {
    try {
        kafkaConsumerPollDuration.put(key, Long.valueOf(Data));
        Report.updateTestLog(Action, "Poll interval has been set successfully", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Poll interval setup", ex);
        Report.updateTestLog(Action, "Error in setting Poll interval: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}

setConsumerMaxPollRecords

Description: This function is used to set the maximum number of records that can be retrieved in a single poll operation.

Input Format : @Expected maximum value. Expected value is integer

ObjectName Action Input Condition Reference
Kafka 🟢 setConsumerMaxPollRecords @value ⬅ Hardcoded Input
Kafka 🟢 setConsumerMaxPollRecords Sheet:Column ⬅ Input from Datasheet
Kafka 🟢 setConsumerMaxPollRecords %dynamicVar% ⬅ Input from variable
@Action(object = ObjectType.KAFKA, desc = "Set Consumer Max Poll Records", input = InputType.YES, condition = InputType.NO)
public void setConsumerMaxPollRecords() {
    try {
        kafkaConsumerMaxPollRecords.put(key, Integer.valueOf(Data));
        Report.updateTestLog(Action, "Max Poll Records has been set successfully", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception Max Poll Record setup", ex);
        Report.updateTestLog(Action, "Error in setting Max Poll Records: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}

setConsumerGroupId

Description: This function is used to set the Group Id for the Kafka Consumer.

Input Format : @Expected Group Id

ObjectName Action Input Condition Reference
Kafka 🟢 setConsumerGroupId @value ⬅ Hardcoded Input
Kafka 🟢 setConsumerGroupId Sheet:Column ⬅ Input from Datasheet
Kafka 🟢 setConsumerGroupId %dynamicVar% ⬅ Input from variable
@Action(object = ObjectType.KAFKA, desc = "Set Consumer GroupId", input = InputType.YES, condition = InputType.NO)
public void setConsumerGroupId() {
    try {
        kafkaConsumerGroupId.put(key, Data);
        Report.updateTestLog(Action, "Consumer GroupId has been set successfully", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Consumer GroupId setup", ex);
        Report.updateTestLog(Action, "Error in setting Consumer GroupId: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}

setValueDeserializer

Description: This function is used to set the Value Deserializer for the Kafka Consumer.

Input Format : @Expected Value Deserializer. Expected values are : string / bytearray / avro

ObjectName Action Input Condition Reference
Kafka 🟢 setValueDeserializer @value ⬅ Hardcoded Input
Kafka 🟢 setValueDeserializer Sheet:Column ⬅ Input from Datasheet
Kafka 🟢 setValueDeserializer %dynamicVar% ⬅ Input from variable
@Action(object = ObjectType.KAFKA, desc = "Set Value Deserializer", input = InputType.YES, condition = InputType.NO)
public void setValueDeserializer() {
    try {
        kafkaValueDeserializer.put(key, Data);
        Report.updateTestLog(Action, "Value Deserializer has been set successfully", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Value Deserializer setup", ex);
        Report.updateTestLog(Action, "Error in setting Value Deserializer: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}

consumeKafkaMessage

Description: This function is used to consume the Kafka Consumer Record.

ObjectName Action Input Condition Reference
Queue 🟢 consumeKafkaMessage
@Action(object = ObjectType.KAFKA, desc = "Consume Kafka Message", input = InputType.NO)
public void consumeKafkaMessage() {

    createConsumer(kafkaValueDeserializer.get(key));
    kafkaConsumer.get(key).subscribe(Arrays.asList(kafkaConsumerTopic.get(key)));
    try {
        ConsumerRecord record = pollKafkaConsumer();
        if (record != null) {
            Report.updateTestLog(Action, "Kafka message consumed successfully. ", Status.DONE);
        } else {
            Report.updateTestLog(Action, "Kafka message not received. ", Status.FAIL);
        }
    } catch (Exception e) {
        e.printStackTrace();
        Report.updateTestLog(Action, "Error while consuming Kafka message: " + e.getMessage(), Status.FAIL);
    } finally {
        kafkaConsumer.get(key).close();
    }
}

identifyTargetMessage

Description: This function is used to locate a specific Kafka message from the consumed records based on the unique identifier provided.

Input Format : @Expected value

Condition Format: JSONPath or XPath of the unique identifier

ObjectName Action Input Condition Reference
Kafka 🟢 identifyTargetMessage @value JSONPath or XPath ⬅ Hardcoded Input
Kafka 🟢 identifyTargetMessage Sheet:Column JSONPath or XPath ⬅ Input from Datasheet
Kafka 🟢 identifyTargetMessage %dynamicVar% JSONPath or XPath ⬅ Input from variable
@Action(object = ObjectType.KAFKA, desc = "Identify target message", input = InputType.YES, condition = InputType.YES)
public void identifyTargetMessage() {
    try {
        kafkaRecordIdentifierValue.put(key, Data);
        kafkaRecordIdentifierPath.put(key, Condition);
        Report.updateTestLog(Action,
                "Target message set with tag value as [" + Data + "] and tag path as [" + Condition + "].",
                Status.DONE);
    } catch (Exception e) {
        Report.updateTestLog(Action, "Error in target message setup : " + e.getMessage(), Status.FAIL);
    }
}

storeKafkaXMLtagInDataSheet

Description: This function is used to store a certain XML tag value into a respective column of a given datasheet.

Input Format : @Expected datasheet name:column name

Condition Format: XPath

ObjectName Action Input Condition Reference
Kafka 🟢 storeKafkaXMLtagInDataSheet Sheet:Column XPath ⬅ Datasheet to where value is supposed br stored

Note: Ensure that your data sheet doesn't contain column names with spaces.

@Action(object = ObjectType.KAFKA, desc = "Store XML tag In DataSheet ", input = InputType.YES, condition = InputType.NO)
public void storeKafkaXMLtagInDataSheet() {

    try {
        String strObj = Input;
        if (strObj.matches(".*:.*")) {
            try {
                System.out.println("Updating value in SubIteration " + userData.getSubIteration());
                String sheetName = strObj.split(":", 2)[0];
                String columnName = strObj.split(":", 2)[1];
                String xmlText = kafkaConsumerRecord.get(key).value().toString();
                DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
                DocumentBuilder dBuilder;
                InputSource inputSource = new InputSource();
                inputSource.setCharacterStream(new StringReader(xmlText));
                dBuilder = dbFactory.newDocumentBuilder();
                Document doc = dBuilder.parse(inputSource);
                doc.getDocumentElement().normalize();
                XPath xPath = XPathFactory.newInstance().newXPath();
                String expression = Condition;
                String value = (String) xPath.compile(expression).evaluate(doc);
                userData.putData(sheetName, columnName, value);
                Report.updateTestLog(Action, "Element text [" + value + "] is stored in " + strObj, Status.DONE);
            } catch (IOException | ParserConfigurationException | XPathExpressionException | DOMException
                    | SAXException ex) {
                Logger.getLogger(this.getClass().getName()).log(Level.OFF, ex.getMessage(), ex);
                Report.updateTestLog(Action, "Error Storing XML element in datasheet :" + "\n" + ex.getMessage(),
                        Status.DEBUG);
            }
        } else {
            Report.updateTestLog(Action,
                    "Given input [" + Input + "] format is invalid. It should be [sheetName:ColumnName]",
                    Status.DEBUG);
        }
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
        Report.updateTestLog(Action, "Error Storing XML element in datasheet :" + "\n" + ex.getMessage(),
                Status.DEBUG);
    }
}

assertKafkaXMLtagEquals

Description: This function is used to validate whether a certain XML tag equals an expected text or not.

Input Format : @Expected Text

Condition Format : XPath

ObjectName Action Input Condition Reference
Kafka 🟢 assertKafkaXMLtagEquals @value XPath ⬅ Hardcoded Input
Kafka 🟢 assertKafkaXMLtagEquals Sheet:Column XPath ⬅ Input from Datasheet
Kafka 🟢 assertKafkaXMLtagEquals %dynamicVar% XPath ⬅ Input from variable
@Action(object = ObjectType.KAFKA, desc = "Assert XML Tag Equals ", input = InputType.YES, condition = InputType.YES)
public void assertKafkaXMLtagEquals() {

    try {
        DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
        DocumentBuilder dBuilder;
        InputSource inputSource = new InputSource();
        inputSource.setCharacterStream(new StringReader(kafkaConsumerRecord.get(key).value().toString()));
        dBuilder = dbFactory.newDocumentBuilder();
        Document doc = dBuilder.parse(inputSource);
        doc.getDocumentElement().normalize();
        XPath xPath = XPathFactory.newInstance().newXPath();
        String expression = Condition;
        String value = (String) xPath.compile(expression).evaluate(doc);
        if (value.equals(Data)) {
            Report.updateTestLog(Action, "Element text [" + value + "] is as expected", Status.PASSNS);
        } else {
            Report.updateTestLog(Action, "Element text [" + value + "] is not as expected", Status.FAILNS);
        }
    } catch (IOException | ParserConfigurationException | XPathExpressionException | DOMException
            | SAXException ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
        Report.updateTestLog(Action, "Error validating XML element :" + "\n" + ex.getMessage(), Status.DEBUG);
    }
}

assertKafkaXMLtagContains

Description: This function is used to validate whether a certain XML tag contains an expected text or not.

Input Format : @Expected Text

Condition Format : XPath

ObjectName Action Input Condition Reference
Kafka 🟢 assertKafkaXMLtagContains @value XPath ⬅ Hardcoded Input
Kafka 🟢 assertKafkaXMLtagContains Sheet:Column XPath ⬅ Input from Datasheet
Kafka 🟢 assertKafkaXMLtagContains %dynamicVar% XPath ⬅ Input from variable
@Action(object = ObjectType.KAFKA, desc = "Assert XML Tag Contains ", input = InputType.YES, condition = InputType.YES)
public void assertKafkaXMLtagContains() {

    try {
        DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
        DocumentBuilder dBuilder;
        InputSource inputSource = new InputSource();
        inputSource.setCharacterStream(new StringReader(kafkaConsumerRecord.get(key).value().toString()));
        dBuilder = dbFactory.newDocumentBuilder();
        Document doc = dBuilder.parse(inputSource);
        doc.getDocumentElement().normalize();
        XPath xPath = XPathFactory.newInstance().newXPath();
        String expression = Condition;
        String value = (String) xPath.compile(expression).evaluate(doc);
        if (value.contains(Data)) {
            Report.updateTestLog(Action, "Element text contains [" + Data + "] is as expected", Status.PASSNS);
        } else {
            Report.updateTestLog(Action, "Element text [" + value + "] does not contain [" + Data + "]",
                    Status.FAILNS);
        }
    } catch (IOException | ParserConfigurationException | XPathExpressionException | DOMException
            | SAXException ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
        Report.updateTestLog(Action, "Error validating XML element :" + "\n" + ex.getMessage(), Status.DEBUG);
    }
}

storeKafkaResponseInDataSheet

Description: This function is used to store the Kafka message into a respective column of a given datasheet.

Input Format : @Expected datasheet name:column name

Condition Format: XPath

ObjectName Action Input Condition Reference
Kafka 🟢 storeKafkaResponseInDataSheet Sheet:Column XPath ⬅ Datasheet to where value is supposed br stored

Note: Ensure that your data sheet doesn't contain column names with spaces.

@Action(object = ObjectType.KAFKA, desc = "Store Response In DataSheet ", input = InputType.YES, condition = InputType.NO)
public void storeKafkaResponseInDataSheet() {

    try {
        String strObj = Input;
        if (strObj.matches(".*:.*")) {
            try {
                System.out.println("Updating value in SubIteration " + userData.getSubIteration());
                String sheetName = strObj.split(":", 2)[0];
                String columnName = strObj.split(":", 2)[1];
                String response = kafkaConsumerRecord.get(key).value().toString();
                userData.putData(sheetName, columnName, response);
                Report.updateTestLog(Action, "Response is stored in " + strObj, Status.DONE);
            } catch (Exception ex) {
                Logger.getLogger(this.getClass().getName()).log(Level.OFF, ex.getMessage(), ex);
                Report.updateTestLog(Action, "Error storing Response in datasheet :" + "\n" + ex.getMessage(),
                        Status.DEBUG);
            }
        } else {
            Report.updateTestLog(Action,
                    "Given input [" + Input + "] format is invalid. It should be [sheetName:ColumnName]",
                    Status.DEBUG);
        }
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
        Report.updateTestLog(Action, "Error storing Response in datasheet :" + "\n" + ex.getMessage(),
                Status.DEBUG);
    }
}

assertKafkaResponseMessageContains

Description: This function is used to validate whether Kafka message contains an expected text or not.

Input Format : @Expected Text

ObjectName Action Input Condition Reference
Kafka 🟢 assertKafkaResponseMessageContains @value ⬅ Hardcoded Input
Kafka 🟢 assertKafkaResponseMessageContains Sheet:Column ⬅ Input from Datasheet
Kafka 🟢 assertKafkaResponseMessageContains %dynamicVar% ⬅ Input from variable
@Action(object = ObjectType.KAFKA, desc = "Assert Response Message contains ", input = InputType.YES)
public void assertKafkaResponseMessageContains() {
    try {
        if (kafkaConsumerRecord.get(key).value().toString().contains(Data)) {
            Report.updateTestLog(Action, "Response Message contains : " + Data, Status.PASSNS);
        } else {
            Report.updateTestLog(Action, "Response Message does not contain : " + Data, Status.FAILNS);
        }
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
        Report.updateTestLog(Action, "Error in validating response body :" + "\n" + ex.getMessage(), Status.DEBUG);
    }
}

assertKafkaJSONtagEquals

Description: This function is used to validate whether a certain JSON tag equals an expected text or not.

Input Format : @Expected Text

Condition Format : JSON Path of the tag

ObjectName Action Input Condition Reference
Kafka 🟢 assertKafkaJSONtagEquals @value JSONPath ⬅ Hardcoded Input
Kafka 🟢 assertKafkaJSONtagEquals Sheet:Column JSONPath ⬅ Input from Datasheet
Kafka 🟢 assertKafkaJSONtagEquals %dynamicVar% JSONPath ⬅ Input from variable
@Action(object = ObjectType.KAFKA, desc = "Assert JSON Tag Equals ", input = InputType.YES, condition = InputType.YES)
public void assertKafkaJSONtagEquals() {
    try {
        String response = kafkaConsumerRecord.get(key).value().toString();
        String jsonpath = Condition;
        String value = JsonPath.read(response, jsonpath).toString();
        if (value.equals(Data)) {
            Report.updateTestLog(Action, "Element text [" + value + "] is as expected", Status.PASSNS);
        } else {
            Report.updateTestLog(Action, "Element text is [" + value + "] but is expected to be [" + Data + "]",
                    Status.FAILNS);
        }
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
        Report.updateTestLog(Action, "Error in validating JSON element :" + "\n" + ex.getMessage(), Status.DEBUG);
    }
}

assertKafkaJSONtagContains

Description: This function is used to validate whether a certain JSON tag contains an expected text or not.

Input Format : @Expected Text

Condition Format : JSON Path of the tag

ObjectName Action Input Condition Reference
Kafka 🟢 assertKafkaJSONtagContains @value JSONPath ⬅ Hardcoded Input
Kafka 🟢 assertKafkaJSONtagContains Sheet:Column JSONPath ⬅ Input from Datasheet
Kafka 🟢 assertKafkaJSONtagContains %dynamicVar% JSONPath ⬅ Input from variable
@Action(object = ObjectType.KAFKA, desc = "Assert JSON Tag Contains ", input = InputType.YES, condition = InputType.YES)
public void assertKafkaJSONtagContains() {
    try {
        String response = kafkaConsumerRecord.get(key).value().toString();
        String jsonpath = Condition;
        String value = JsonPath.read(response, jsonpath).toString();
        if (value.contains(Data)) {
            Report.updateTestLog(Action, "Element text contains [" + Data + "] is as expected", Status.PASSNS);
        } else {
            Report.updateTestLog(Action, "Element text [" + value + "] does not contain [" + Data + "]",
                    Status.FAILNS);
        }
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
        Report.updateTestLog(Action, "Error in validating JSON element :" + "\n" + ex.getMessage(), Status.DEBUG);
    }
}

storeKafkaJSONtagInDataSheet

Description: This function is used to store a certain JSON tag value into a respective column of a given datasheet.

Input Format : @Expected datasheet name:column name

Condition Format: JSONPath of the tag

ObjectName Action Input Condition Reference
Kafka 🟢 storeKafkaJSONtagInDataSheet Sheet:Column JSONPath ⬅ Datasheet to where value is supposed br stored

Note: Ensure that your data sheet doesn't contain column names with spaces.

@Action(object = ObjectType.KAFKA, desc = "Store JSON Tag In DataSheet ", input = InputType.YES, condition = InputType.YES)
public void storeKafkaJSONtagInDataSheet() {

    try {
        String strObj = Input;
        if (strObj.matches(".*:.*")) {
            try {
                System.out.println("Updating value in SubIteration " + userData.getSubIteration());
                String sheetName = strObj.split(":", 2)[0];
                String columnName = strObj.split(":", 2)[1];
                String response = kafkaConsumerRecord.get(key).value().toString();
                String jsonpath = Condition;
                String value = JsonPath.read(response, jsonpath).toString();
                userData.putData(sheetName, columnName, value);
                Report.updateTestLog(Action, "Element text [" + value + "] is stored in " + strObj, Status.DONE);
            } catch (Exception ex) {
                Logger.getLogger(this.getClass().getName()).log(Level.OFF, ex.getMessage(), ex);
                Report.updateTestLog(Action, "Error Storing JSON element in datasheet :" + "\n" + ex.getMessage(),
                        Status.DEBUG);
            }
        } else {
            Report.updateTestLog(Action,
                    "Given input [" + Input + "] format is invalid. It should be [sheetName:ColumnName]",
                    Status.DEBUG);
        }
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
        Report.updateTestLog(Action, "Error Storing JSON element in datasheet :" + "\n" + ex.getMessage(),
                Status.DEBUG);
    }
}

closeConsumer

Description: This function is used to gracefully shut down a Kafka consumer.

ObjectName Action Input Condition Reference
Kafka 🟢 closeConsumer
@Action(object = ObjectType.KAFKA, desc = "Close Consumer", input = InputType.NO, condition = InputType.NO)
public void closeConsumer() {
    try {
        kafkaConsumerRecords.remove(key);
        kafkaConsumerRecord.remove(key);
        kafkaConsumeRecordValue.remove(key);
        kafkaConsumerPollDuration.remove(key);
        kafkaConsumerPollRetries.remove(key);
        kafkaConsumerTopic.remove(key);
        kafkaValueDeserializer.remove(key);
        kafkaSchemaRegistryURL.remove(key);
        kafkaSharedSecret.remove(key);
        kafkaConsumerGroupId.remove(key);
        kafkaConsumerPollRecord.remove(key);
        kafkaRecordIdentifierValue.remove(key);
        kafkaRecordIdentifierPath.remove(key);
        Report.updateTestLog(Action, "Consumer closed successfully", Status.DONE);
    } catch (Exception ex) {
        Report.updateTestLog(Action, "Error in closing Consumer.", Status.DEBUG);
    }

}