Kafka Operations
Description: This function is used to set the bootstrap servers for establishing the Kafka connection.
Input Format : @Expected server details
ObjectName | Action | Input | Condition | Reference
Kafka | setBootstrapServers | @value | |
Kafka | setBootstrapServers | Sheet:Column | |
Kafka | setBootstrapServers | %dynamicVar% | |
@Action(object = ObjectType.KAFKA, desc = "Set Bootstrap Servers", input = InputType.YES, condition = InputType.NO)
public void setBootstrapServers() {
    try {
        kafkaServers.put(key, Data);
        Report.updateTestLog(Action, "Bootstrap Servers have been set successfully", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Bootstrap Servers setup", ex);
        Report.updateTestLog(Action, "Error in setting Bootstrap Servers: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}
Description: This function is used to set the Producer Topic name
Input Format : @Expected Producer Topic Name
ObjectName | Action | Input | Condition | Reference
Kafka | setProducerTopic | @value | |
Kafka | setProducerTopic | Sheet:Column | |
Kafka | setProducerTopic | %dynamicVar% | |
@Action(object = ObjectType.KAFKA, desc = "Set Producer Topic", input = InputType.YES, condition = InputType.NO)
public void setProducerTopic() {
    try {
        kafkaProducerTopic.put(key, Data);
        Report.updateTestLog(Action, "Topic has been set successfully", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Topic setup", ex);
        Report.updateTestLog(Action, "Error in setting Topic: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}
Description: This function is used to set the key for the Kafka connection.
Input Format : @Expected Key
ObjectName | Action | Input | Condition | Reference
Kafka | setKey | @value | |
Kafka | setKey | Sheet:Column | |
Kafka | setKey | %dynamicVar% | |
@Action(object = ObjectType.KAFKA, desc = "Set Key", input = InputType.YES, condition = InputType.NO)
public void setKey() {
    try {
        kafkaKey.put(key, Data);
        Report.updateTestLog(Action, "Key has been set successfully", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Key setup", ex);
        Report.updateTestLog(Action, "Error in setting Key: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}
Description: This function is used to set the Key Serializer for the Kafka Producer.
Input Format : @Expected Key Serializer. Expected values are: string / bytearray / avro
ObjectName | Action | Input | Condition | Reference
Kafka | setKeySerializer | @value | |
Kafka | setKeySerializer | Sheet:Column | |
Kafka | setKeySerializer | %dynamicVar% | |
@Action(object = ObjectType.KAFKA, desc = "Set Key Serializer", input = InputType.YES, condition = InputType.NO)
public void setKeySerializer() {
    try {
        kafkaKeySerializer.put(key, Data);
        Report.updateTestLog(Action, "Key Serializer has been set successfully", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Key Serializer setup", ex);
        Report.updateTestLog(Action, "Error in setting Key Serializer: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}
Description: This function is used to set the Value Serializer for the Kafka Producer.
Input Format : @Expected Value Serializer. Expected values are: string / bytearray / avro
ObjectName | Action | Input | Condition | Reference
Kafka | setValueSerializer | @value | |
Kafka | setValueSerializer | Sheet:Column | |
Kafka | setValueSerializer | %dynamicVar% | |
@Action(object = ObjectType.KAFKA, desc = "Set Value Serializer", input = InputType.YES, condition = InputType.NO)
public void setValueSerializer() {
    try {
        kafkaValueSerializer.put(key, Data);
        Report.updateTestLog(Action, "Value Serializer has been set successfully", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Value Serializer setup", ex);
        Report.updateTestLog(Action, "Error in setting Value Serializer: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}
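The serializer actions only store the alias (string, bytearray or avro); how the framework turns that alias into a serializer class is not shown here. The sketch below is one plausible mapping, not the framework's implementation. The class SerializerAliases and its method are hypothetical; the returned class names are the standard Kafka serializers and Confluent's Avro serializer.

```java
import java.util.Locale;

// Hypothetical helper: maps the documented aliases to serializer class names.
final class SerializerAliases {

    static String serializerClassFor(String alias) {
        switch (alias.trim().toLowerCase(Locale.ROOT)) {
            case "string":
                return "org.apache.kafka.common.serialization.StringSerializer";
            case "bytearray":
                return "org.apache.kafka.common.serialization.ByteArraySerializer";
            case "avro":
                // Shipped in Confluent's kafka-avro-serializer artifact, not in kafka-clients.
                return "io.confluent.kafka.serializers.KafkaAvroSerializer";
            default:
                throw new IllegalArgumentException("Unsupported serializer alias: " + alias);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializerClassFor("string"));
    }
}
```

Rejecting unknown aliases early gives a clearer error than letting the producer fail later with a class-loading problem.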
Description: This function is used to set the Schema Registry URL (mainly needed when producing Avro-serialized values).
Input Format : @Expected Schema Registry URL
ObjectName | Action | Input | Condition | Reference
Kafka | setSchemaRegistryURL | @value | |
Kafka | setSchemaRegistryURL | Sheet:Column | |
Kafka | setSchemaRegistryURL | %dynamicVar% | |
@Action(object = ObjectType.KAFKA, desc = "Set Schema Registry URL", input = InputType.YES, condition = InputType.NO)
public void setSchemaRegistryURL() {
    try {
        kafkaSchemaRegistryURL.put(key, Data);
        Report.updateTestLog(Action, "Schema Registry URL has been set successfully", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Schema Registry URL setup", ex);
        Report.updateTestLog(Action, "Error in setting Schema Registry URL: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}
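To show where the values collected so far typically end up, the sketch below assembles them into producer properties. It is an illustration under assumptions, not the framework's code: the class ProducerConfigSketch and its parameters are hypothetical. The property keys bootstrap.servers, key.serializer and value.serializer are standard Kafka producer settings, and schema.registry.url is the key read by Confluent's Avro serializer.

```java
import java.util.Properties;

// Hypothetical sketch: turns the stored settings into Kafka producer properties.
final class ProducerConfigSketch {

    static Properties build(String bootstrapServers, String keySerializerClass,
                            String valueSerializerClass, String schemaRegistryUrl) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer", keySerializerClass);
        props.setProperty("value.serializer", valueSerializerClass);
        // Only the Avro serializer needs the registry, so the URL is optional here.
        if (schemaRegistryUrl != null && !schemaRegistryUrl.isEmpty()) {
            props.setProperty("schema.registry.url", schemaRegistryUrl);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092",
                "org.apache.kafka.common.serialization.StringSerializer",
                "org.apache.kafka.common.serialization.StringSerializer",
                null);
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```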
Description: This function is used to set the Partition for the Kafka Producer.
Input Format : @Expected Partition value. Set either an integer or null.
ObjectName | Action | Input | Condition | Reference
Kafka | setPartition | @value | |
Kafka | setPartition | Sheet:Column | |
Kafka | setPartition | %dynamicVar% | |
@Action(object = ObjectType.KAFKA, desc = "Set Partition", input = InputType.YES, condition = InputType.NO)
public void setPartition() {
    try {
        if (Data.toLowerCase().equals("null")) {
            kafkaPartition.put(key, null);
        } else {
            kafkaPartition.put(key, Integer.valueOf(Data));
        }
        Report.updateTestLog(Action, "Partition has been set successfully", Status.DONE);
    } catch (NumberFormatException ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Partition setup", ex);
        Report.updateTestLog(Action, "Error in setting Partition: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}
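The partition rule above (the literal text null, in any letter case, means no explicit partition; anything else must parse as an integer) can be isolated in a small helper. This is a sketch with a hypothetical class name; unlike the action above it also trims surrounding whitespace, which is an addition of this example.

```java
// Hypothetical helper mirroring the setPartition input rule.
final class PartitionInput {

    /** Returns null for the text "null" (any case), otherwise the parsed integer. */
    static Integer parse(String data) {
        String trimmed = data.trim(); // trimming is an assumption of this sketch
        if (trimmed.equalsIgnoreCase("null")) {
            return null;
        }
        return Integer.valueOf(trimmed); // throws NumberFormatException for non-numeric input
    }

    public static void main(String[] args) {
        System.out.println(parse("3"));
        System.out.println(parse("NULL"));
    }
}
```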
Description: This function is used to set the Time Stamp for the Kafka Producer. It stores System.currentTimeMillis(), taken at the moment this action runs, as the timestamp of the producer record.
ObjectName | Action | Input | Condition | Reference
Kafka | setTimeStamp | | |
@Action(object = ObjectType.KAFKA, desc = "Set TimeStamp", input = InputType.NO, condition = InputType.NO)
public void setTimeStamp() {
    try {
        kafkaTimeStamp.put(key, System.currentTimeMillis());
        Report.updateTestLog(Action, "Time Stamp has been set successfully", Status.DONE);
    } catch (NumberFormatException ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Time Stamp setup", ex);
        Report.updateTestLog(Action, "Error in setting Time Stamp: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}
Description: This function is used to add a header to the Kafka Producer Record.
Input Format : @HeaderName=HeaderValue
ObjectName | Action | Input | Condition | Reference
Kafka | addKafkaHeader | @HeaderName=HeaderValue | |
Kafka | addKafkaHeader | Sheet:Column containing HeaderName=HeaderValue | |
Kafka | addKafkaHeader | %dynamicVar% containing HeaderName=HeaderValue | |
Kafka | addKafkaHeader | @HeaderName={SheetName:ColumnName} | |
Kafka | addKafkaHeader | @HeaderName=%dynamicVar% | |
This function adds each header to a map of header lists (kafkaHeaders in the code below). The collected headers are attached to the producer record when the message is produced:
@Action(object = ObjectType.KAFKA, desc = "Add Kafka Header", input = InputType.YES)
public void addKafkaHeader() {
    try {
        // Resolve {Sheet:Column} placeholders from the test data sheets.
        List<String> sheetlist = Control.getCurrentProject().getTestData().getTestDataFor(Control.exe.runEnv())
        for (int sheet = 0; sheet < sheetlist.size(); sheet++) {
            if (Data.contains("{" + sheetlist.get(sheet) + ":")) {
                com.ing.datalib.testdata.model.TestDataModel tdModel = Control.getCurrentProject().getTestData()
                List<String> columns = tdModel.getColumns();
                for (int col = 0; col < columns.size(); col++) {
                    if (Data.contains("{" + sheetlist.get(sheet) + ":" + columns.get(col) + "}")) {
                        Data = Data.replace("{" + sheetlist.get(sheet) + ":" + columns.get(col) + "}",
                                userData.getData(sheetlist.get(sheet), columns.get(col)));
                    }
                }
            }
        }
        // Resolve {property} placeholders from the user-defined settings.
        Collection<Object> valuelist = Control.getCurrentProject().getProjectSettings().getUserDefinedSettings()
        for (Object prop : valuelist) {
            if (Data.contains("{" + prop + "}")) {
                Data = Data.replace("{" + prop + "}", prop.toString());
            }
        }
        // Split on the first '=' only, so the header value may itself contain '='.
        String headerKey = Data.split("=", 2)[0];
        String headerValue = Data.split("=", 2)[1];
        if (kafkaHeaders.containsKey(key)) {
            kafkaHeaders.get(key).add(new RecordHeader(headerKey, headerValue.getBytes()));
        } else {
            ArrayList<Header> toBeAdded = new ArrayList<Header>();
            toBeAdded.add(new RecordHeader(headerKey, headerValue.getBytes()));
            kafkaHeaders.put(key, toBeAdded);
        }
        Report.updateTestLog(Action, "Header added " + Data, Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
        Report.updateTestLog(Action, "Error adding Header :" + "\n" + ex.getMessage(), Status.DEBUG);
    }
}
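The header handling above boils down to two steps: split the input on the first = sign, then append the pair to a per-key list. The following stand-alone sketch shows that pattern with JDK types only. HeaderInputSketch is a hypothetical class, a plain map entry stands in for Kafka's RecordHeader, and the explicit UTF-8 charset is a choice of this example (the action above uses the platform default).

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the "HeaderName=HeaderValue" handling, using JDK types only.
final class HeaderInputSketch {

    // scenario key -> ordered list of (header name, header value bytes)
    private final Map<String, List<Map.Entry<String, byte[]>>> headers = new HashMap<>();

    void add(String scenarioKey, String data) {
        // Limit 2 keeps any further '=' characters inside the value.
        String[] parts = data.split("=", 2);
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected HeaderName=HeaderValue but got: " + data);
        }
        headers.computeIfAbsent(scenarioKey, k -> new ArrayList<>())
                .add(new SimpleImmutableEntry<>(parts[0], parts[1].getBytes(StandardCharsets.UTF_8)));
    }

    List<Map.Entry<String, byte[]>> headersFor(String scenarioKey) {
        return headers.getOrDefault(scenarioKey, new ArrayList<>());
    }

    public static void main(String[] args) {
        HeaderInputSketch sketch = new HeaderInputSketch();
        sketch.add("scenario-1", "traceId=abc=123");
        System.out.println(sketch.headersFor("scenario-1").get(0).getKey());
    }
}
```

Using computeIfAbsent replaces the containsKey / else branch with a single call, and the length check turns a missing = sign into a readable error instead of an ArrayIndexOutOfBoundsException.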
Description: This function is used to Produce the Kafka Producer Record.
Input Format : @Expected Payload
ObjectName | Action | Input | Condition | Reference
Kafka | produceMessage | @Payload (from Editor) | |
Kafka | produceMessage | Sheet:Column | |
Kafka | produceMessage | %dynamicVar% | |
Inputs in the Input column can be hardcoded, passed inside the Payload editor, which can parameterise the Payload (press Ctrl+Space to see the list of available variables), passed from the data sheet (datasheet name:column name), or passed from a variable (%variable name%), as shown in the example above.
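Parameterising a payload means replacing placeholders with data sheet values before the message is sent. The sketch below illustrates the idea for {Sheet:Column} placeholders, the syntax that also appears in the header action. It is an assumption-laden illustration: PayloadPlaceholders is a hypothetical class, the nested map stands in for the data sheets, and the framework's own handleDataSheetVariables may behave differently.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch: resolves {Sheet:Column} placeholders against in-memory "sheets".
final class PayloadPlaceholders {

    // A brace pair containing exactly one ':' and no nested braces.
    private static final Pattern SHEET_COLUMN = Pattern.compile("\\{([^{}:]+):([^{}:]+)\\}");

    static String resolve(String payload, Map<String, Map<String, String>> sheets) {
        Matcher m = SHEET_COLUMN.matcher(payload);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            Map<String, String> row = sheets.get(m.group(1));
            String value = (row == null) ? null : row.get(m.group(2));
            // Unknown placeholders are left untouched rather than replaced with "null".
            String replacement = (value == null) ? m.group() : value;
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> sheets =
                java.util.Collections.singletonMap("Customer", java.util.Collections.singletonMap("Id", "42"));
        System.out.println(resolve("{\"id\":\"{Customer:Id}\"}", sheets));
    }
}
```

Leaving unknown placeholders in place matters for JSON payloads, where an ordinary object such as {"a":"b"} also matches the brace-and-colon shape.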
@Action(object = ObjectType.KAFKA, desc = "Produce Kafka Message", input = InputType.YES, condition = InputType.NO)
public void produceMessage() {
    try {
        String value = Data;
        value = handleDataSheetVariables(value);
        value = handleuserDefinedVariables(value);
        kafkaValue.put(key, value);
        // Pick the overload that matches the settings configured so far.
        if (kafkaHeaders.get(key) != null && kafkaTimeStamp.get(key) != null) {
            produceMessage(kafkaProducerTopic.get(key), kafkaPartition.get(key), kafkaTimeStamp.get(key), kafkaKey.get(key), kafkaValue.get(key), kafkaHeaders.get(key));
        } else if (kafkaHeaders.get(key) != null) {
            produceMessage(kafkaProducerTopic.get(key), kafkaPartition.get(key), kafkaKey.get(key), kafkaValue.get(key), kafkaHeaders.get(key));
        } else if (kafkaTimeStamp.get(key) != null) {
            produceMessage(kafkaProducerTopic.get(key), kafkaPartition.get(key), kafkaTimeStamp.get(key), kafkaKey.get(key), kafkaValue.get(key));
        } else if (kafkaPartition.containsKey(key)) {
            produceMessage(kafkaProducerTopic.get(key), kafkaPartition.get(key), kafkaKey.get(key), kafkaValue.get(key));
        } else if (kafkaKey.get(key) != null) {
            produceMessage(kafkaProducerTopic.get(key), kafkaKey.get(key), kafkaValue.get(key));
        } else {
            produceMessage(kafkaProducerTopic.get(key), kafkaValue.get(key));
        }
        Report.updateTestLog(Action, "Message has been produced. ", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
        Report.updateTestLog(Action, "Something went wrong in producing the message" + "\n" + ex.getMessage(),
                Status.DEBUG);
    }
}
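The branch order in produceMessage decides which fields end up on the record, and the six argument lists appear to mirror the public constructors of Kafka's ProducerRecord. The sketch below restates that order as a pure function so the precedence is easy to check. ProduceBranchOrder is a hypothetical class and the boolean flags are stand-ins for the map lookups.

```java
// Hypothetical sketch: restates the produceMessage branch order as a pure function.
final class ProduceBranchOrder {

    /** Returns the argument list of the overload that the branch order would pick. */
    static String selectedArguments(boolean hasHeaders, boolean hasTimestamp,
                                    boolean partitionConfigured, boolean hasKey) {
        if (hasHeaders && hasTimestamp) {
            return "topic, partition, timestamp, key, value, headers";
        } else if (hasHeaders) {
            return "topic, partition, key, value, headers";
        } else if (hasTimestamp) {
            return "topic, partition, timestamp, key, value";
        } else if (partitionConfigured) {
            return "topic, partition, key, value";
        } else if (hasKey) {
            return "topic, key, value";
        }
        return "topic, value";
    }

    public static void main(String[] args) {
        System.out.println(selectedArguments(true, false, false, false));
    }
}
```

Note that the partition branch tests containsKey rather than a non-null value, so a partition explicitly set to null still selects the four-argument form, with null passed as the partition.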
Description: This function is used to send the Kafka Record to the Producer Topic.
ObjectName | Action | Input | Condition | Reference
Kafka | sendKafkaMessage | | |
@Action(object = ObjectType.KAFKA, desc = "Send Message", input = InputType.NO, condition = InputType.NO)
public void sendKafkaMessage() {
    try {
        //before.put(key, Instant.now());
        Report.updateTestLog(Action, "Record sent", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception while sending record", ex);
        Report.updateTestLog(Action, "Error in sending record: " + "\n" + ex.getMessage(), Status.DEBUG);
    } finally {
    }
}
Description: This function is used to set the Consumer Topic name
Input Format : @Expected Consumer Topic Name
ObjectName | Action | Input | Condition | Reference
Kafka | setConsumerTopic | @value | |
Kafka | setConsumerTopic | Sheet:Column | |
Kafka | setConsumerTopic | %dynamicVar% | |
@Action(object = ObjectType.KAFKA, desc = "Set Consumer Topic", input = InputType.YES, condition = InputType.NO)
public void setConsumerTopic() {
    try {
        kafkaConsumerTopic.put(key, Data);
        Report.updateTestLog(Action, "Topic has been set successfully", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Topic setup", ex);
        Report.updateTestLog(Action, "Error in setting Topic: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}
Description: This function is used to set the number of times the Kafka Consumer will retry polling for the target record.
Input Format : @Expected number of retries
ObjectName | Action | Input | Condition | Reference
Kafka | setConsumerPollRetries | @value | |
Kafka | setConsumerPollRetries | Sheet:Column | |
Kafka | setConsumerPollRetries | %dynamicVar% | |
@Action(object = ObjectType.KAFKA, desc = "Set Consumer Retries", input = InputType.YES, condition = InputType.NO)
public void setConsumerPollRetries() {
    try {
        kafkaConsumerPollRetries.put(key, Integer.parseInt(Data));
        Report.updateTestLog(Action, "Poll Retries has been set successfully", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Poll Retries setup", ex);
        Report.updateTestLog(Action, "Error in setting Poll Retries: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}
Description: This function is used to set the interval (in milliseconds) at which the Kafka Consumer will retry polling for the target record.
Input Format : @Expected interval value (in milliseconds)
ObjectName | Action | Input | Condition | Reference
Kafka | setConsumerPollInterval | @value | |
Kafka | setConsumerPollInterval | Sheet:Column | |
Kafka | setConsumerPollInterval | %dynamicVar% | |
@Action(object = ObjectType.KAFKA, desc = "Set Consumer Retries", input = InputType.YES, condition = InputType.NO)
public void setConsumerPollInterval() {
    try {
        kafkaConsumerPollDuration.put(key, Long.valueOf(Data));
        Report.updateTestLog(Action, "Poll interval has been set successfully", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Poll interval setup", ex);
        Report.updateTestLog(Action, "Error in setting Poll interval: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}
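The poll retries and poll interval work together: the consumer keeps polling until the target record shows up or the retries are used up, waiting the interval between attempts. The framework's pollKafkaConsumer is not shown in this section, so the following is only a sketch of that general pattern. PollRetrySketch is a hypothetical class, the Supplier stands in for a single consumer poll, and treating retries as attempts after the first one is an assumption.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical sketch of a retry-with-interval polling loop.
final class PollRetrySketch {

    /** Tries once, then up to 'retries' more times, sleeping between attempts. */
    static <T> Optional<T> pollWithRetries(Supplier<Optional<T>> pollOnce, int retries, long intervalMillis) {
        for (int attempt = 0; attempt <= retries; attempt++) {
            Optional<T> result = pollOnce.get();
            if (result.isPresent()) {
                return result;
            }
            if (attempt < retries) {
                try {
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    // Preserve the interrupt flag and give up polling.
                    Thread.currentThread().interrupt();
                    return Optional.empty();
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Optional<String> record = pollWithRetries(() -> Optional.of("payload"), 3, 500L);
        System.out.println(record.isPresent());
    }
}
```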
Description: This function is used to set the Group Id for the Kafka Consumer.
Input Format : @Expected Group Id
ObjectName | Action | Input | Condition | Reference
Kafka | setConsumerGroupId | @value | |
Kafka | setConsumerGroupId | Sheet:Column | |
Kafka | setConsumerGroupId | %dynamicVar% | |
@Action(object = ObjectType.KAFKA, desc = "Set Consumer GroupId", input = InputType.YES, condition = InputType.NO)
public void setConsumerGroupId() {
    try {
        kafkaConsumerGroupId.put(key, Data);
        Report.updateTestLog(Action, "Consumer GroupId has been set successfully", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Consumer GroupId setup", ex);
        Report.updateTestLog(Action, "Error in setting Consumer GroupId: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}
Description: This function is used to set the Value Deserializer for the Kafka Consumer.
Input Format : @Expected Value Deserializer. Expected values are: string / bytearray / avro
ObjectName | Action | Input | Condition | Reference
Kafka | setValueDeserializer | @value | |
Kafka | setValueDeserializer | Sheet:Column | |
Kafka | setValueDeserializer | %dynamicVar% | |
@Action(object = ObjectType.KAFKA, desc = "Set Value Deserializer", input = InputType.YES, condition = InputType.NO)
public void setValueDeserializer() {
    try {
        kafkaValueDeserializer.put(key, Data);
        Report.updateTestLog(Action, "Value Deserializer has been set successfully", Status.DONE);
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Value Deserializer setup", ex);
        Report.updateTestLog(Action, "Error in setting Value Deserializer: " + "\n" + ex.getMessage(), Status.DEBUG);
    }
}
Description: This function is used to consume the Kafka Consumer Record.
ObjectName | Action | Input | Condition | Reference
Kafka | consumeKafkaMessage | | |
@Action(object = ObjectType.KAFKA, desc = "Consume Kafka Message", input = InputType.NO)
public void consumeKafkaMessage() {
    try {
        ConsumerRecord record = pollKafkaConsumer();
        if (record != null) {
            Report.updateTestLog(Action, "Kafka message consumed successfully. ", Status.DONE);
        } else {
            Report.updateTestLog(Action, "Kafka message not received. ", Status.FAIL);
        }
    } catch (Exception e) {
        Report.updateTestLog(Action, "Error while consuming Kafka message: " + e.getMessage(), Status.FAIL);
    } finally {
    }
}
Description: This function is used to store a certain XML tag value into a respective column of a given datasheet.
Input Format : @Expected datasheet name:column name
Condition Format: XPath
ObjectName | Action | Input | Condition | Reference
Kafka | storeKafkaXMLtagInDataSheet | Sheet:Column | XPath |
Note: Ensure that your data sheet doesn't contain column names with spaces.
@Action(object = ObjectType.KAFKA, desc = "Store XML tag In DataSheet ", input = InputType.YES, condition = InputType.NO)
public void storeKafkaXMLtagInDataSheet() {
    try {
        String strObj = Input;
        if (strObj.matches(".*:.*")) {
            try {
                System.out.println("Updating value in SubIteration " + userData.getSubIteration());
                String sheetName = strObj.split(":", 2)[0];
                String columnName = strObj.split(":", 2)[1];
                // Parse the consumed record value as XML.
                String xmlText = kafkaConsumerRecord.get(key).value().toString();
                DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
                DocumentBuilder dBuilder;
                InputSource inputSource = new InputSource();
                inputSource.setCharacterStream(new StringReader(xmlText));
                dBuilder = dbFactory.newDocumentBuilder();
                Document doc = dBuilder.parse(inputSource);
                // Evaluate the XPath from the Condition column and store the result.
                XPath xPath = XPathFactory.newInstance().newXPath();
                String expression = Condition;
                String value = (String) xPath.compile(expression).evaluate(doc);
                userData.putData(sheetName, columnName, value);
                Report.updateTestLog(Action, "Element text [" + value + "] is stored in " + strObj, Status.DONE);
            } catch (IOException | ParserConfigurationException | XPathExpressionException | DOMException
                    | SAXException ex) {
                Logger.getLogger(this.getClass().getName()).log(Level.OFF, ex.getMessage(), ex);
                Report.updateTestLog(Action, "Error Storing XML element in datasheet :" + "\n" + ex.getMessage(),
                        Status.DEBUG);
            }
        } else {
            Report.updateTestLog(Action,
                    "Given input [" + Input + "] format is invalid. It should be [sheetName:ColumnName]",
                    Status.DEBUG);
        }
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
        Report.updateTestLog(Action, "Error Storing XML element in datasheet :" + "\n" + ex.getMessage(),
                Status.DEBUG);
    }
}
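The XML actions all follow the same core step: parse the record value as XML and evaluate the XPath from the Condition column as a string. The stand-alone sketch below isolates that step using only JDK classes, so an XPath can be tried out against a sample message before it goes into a test case. XmlTagReader is a hypothetical class, not part of the framework.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

// Hypothetical helper: evaluates an XPath expression against an XML string.
final class XmlTagReader {

    static String read(String xml, String xpathExpression) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
            // String evaluation returns "" when the expression matches no node.
            return XPathFactory.newInstance().newXPath().evaluate(xpathExpression, doc);
        } catch (Exception ex) {
            throw new IllegalArgumentException("Could not evaluate XPath: " + xpathExpression, ex);
        }
    }

    public static void main(String[] args) {
        String xml = "<order><id>42</id><status>NEW</status></order>";
        System.out.println(read(xml, "/order/status"));
    }
}
```

Because a non-matching XPath yields an empty string rather than an error, an equals assertion against a missing tag fails with an empty element text instead of raising an exception.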
Description: This function is used to validate whether a certain XML tag equals an expected text or not.
Input Format : @Expected Text
Condition Format : XPath
ObjectName | Action | Input | Condition | Reference
Kafka | assertKafkaXMLtagEquals | @value | XPath |
Kafka | assertKafkaXMLtagEquals | Sheet:Column | XPath |
Kafka | assertKafkaXMLtagEquals | %dynamicVar% | XPath |
@Action(object = ObjectType.KAFKA, desc = "Assert XML Tag Equals ", input = InputType.YES, condition = InputType.YES)
public void assertKafkaXMLtagEquals() {
    try {
        DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
        DocumentBuilder dBuilder;
        InputSource inputSource = new InputSource();
        inputSource.setCharacterStream(new StringReader(kafkaConsumerRecord.get(key).value().toString()));
        dBuilder = dbFactory.newDocumentBuilder();
        Document doc = dBuilder.parse(inputSource);
        XPath xPath = XPathFactory.newInstance().newXPath();
        String expression = Condition;
        String value = (String) xPath.compile(expression).evaluate(doc);
        if (value.equals(Data)) {
            Report.updateTestLog(Action, "Element text [" + value + "] is as expected", Status.PASSNS);
        } else {
            Report.updateTestLog(Action, "Element text [" + value + "] is not as expected", Status.FAILNS);
        }
    } catch (IOException | ParserConfigurationException | XPathExpressionException | DOMException
            | SAXException ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
        Report.updateTestLog(Action, "Error validating XML element :" + "\n" + ex.getMessage(), Status.DEBUG);
    }
}
Description: This function is used to validate whether a certain XML tag contains an expected text or not.
Input Format : @Expected Text
Condition Format : XPath
ObjectName | Action | Input | Condition | Reference
Kafka | assertKafkaXMLtagContains | @value | XPath |
Kafka | assertKafkaXMLtagContains | Sheet:Column | XPath |
Kafka | assertKafkaXMLtagContains | %dynamicVar% | XPath |
@Action(object = ObjectType.KAFKA, desc = "Assert XML Tag Contains ", input = InputType.YES, condition = InputType.YES)
public void assertKafkaXMLtagContains() {
    try {
        DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
        DocumentBuilder dBuilder;
        InputSource inputSource = new InputSource();
        inputSource.setCharacterStream(new StringReader(kafkaConsumerRecord.get(key).value().toString()));
        dBuilder = dbFactory.newDocumentBuilder();
        Document doc = dBuilder.parse(inputSource);
        XPath xPath = XPathFactory.newInstance().newXPath();
        String expression = Condition;
        String value = (String) xPath.compile(expression).evaluate(doc);
        if (value.contains(Data)) {
            Report.updateTestLog(Action, "Element text contains [" + Data + "] is as expected", Status.PASSNS);
        } else {
            Report.updateTestLog(Action, "Element text [" + value + "] does not contain [" + Data + "]",
                    Status.FAILNS);
        }
    } catch (IOException | ParserConfigurationException | XPathExpressionException | DOMException
            | SAXException ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
        Report.updateTestLog(Action, "Error validating XML element :" + "\n" + ex.getMessage(), Status.DEBUG);
    }
}
Description: This function is used to store the Kafka message into a respective column of a given datasheet.
Input Format : @Expected datasheet name:column name
ObjectName | Action | Input | Condition | Reference
Kafka | storeKafkaResponseInDataSheet | Sheet:Column | |
Note: Ensure that your data sheet doesn't contain column names with spaces.
@Action(object = ObjectType.KAFKA, desc = "Store Response In DataSheet ", input = InputType.YES, condition = InputType.NO)
public void storeKafkaResponseInDataSheet() {
    try {
        String strObj = Input;
        if (strObj.matches(".*:.*")) {
            try {
                System.out.println("Updating value in SubIteration " + userData.getSubIteration());
                String sheetName = strObj.split(":", 2)[0];
                String columnName = strObj.split(":", 2)[1];
                // The whole record value is stored; no Condition is evaluated.
                String response = kafkaConsumerRecord.get(key).value().toString();
                userData.putData(sheetName, columnName, response);
                Report.updateTestLog(Action, "Response is stored in " + strObj, Status.DONE);
            } catch (Exception ex) {
                Logger.getLogger(this.getClass().getName()).log(Level.OFF, ex.getMessage(), ex);
                Report.updateTestLog(Action, "Error storing Response in datasheet :" + "\n" + ex.getMessage(),
                        Status.DEBUG);
            }
        } else {
            Report.updateTestLog(Action,
                    "Given input [" + Input + "] format is invalid. It should be [sheetName:ColumnName]",
                    Status.DEBUG);
        }
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
        Report.updateTestLog(Action, "Error storing Response in datasheet :" + "\n" + ex.getMessage(),
                Status.DEBUG);
    }
}
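The store actions expect their input as sheetName:ColumnName and split it on the first colon. The sketch below shows that parsing step in isolation. SheetColumnInput is a hypothetical class, and it is deliberately stricter than the matches(".*:.*") check above: it also rejects an empty sheet or column name, which the pattern would let through.

```java
// Hypothetical helper: parses the "sheetName:ColumnName" input used by the store actions.
final class SheetColumnInput {

    final String sheet;
    final String column;

    private SheetColumnInput(String sheet, String column) {
        this.sheet = sheet;
        this.column = column;
    }

    static SheetColumnInput parse(String input) {
        // Limit 2: only the first ':' separates sheet from column.
        String[] parts = input.split(":", 2);
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            throw new IllegalArgumentException(
                    "Given input [" + input + "] format is invalid. It should be [sheetName:ColumnName]");
        }
        return new SheetColumnInput(parts[0], parts[1]);
    }

    public static void main(String[] args) {
        SheetColumnInput target = parse("Orders:OrderId");
        System.out.println(target.sheet + " / " + target.column);
    }
}
```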
Description: This function is used to validate whether Kafka message contains an expected text or not.
Input Format : @Expected Text
ObjectName | Action | Input | Condition | Reference
Kafka | assertKafkaResponseMessageContains | @value | |
Kafka | assertKafkaResponseMessageContains | Sheet:Column | |
Kafka | assertKafkaResponseMessageContains | %dynamicVar% | |
@Action(object = ObjectType.KAFKA, desc = "Assert Response Message contains ", input = InputType.YES)
public void assertKafkaResponseMessageContains() {
    try {
        if (kafkaConsumerRecord.get(key).value().toString().contains(Data)) {
            Report.updateTestLog(Action, "Response Message contains : " + Data, Status.PASSNS);
        } else {
            Report.updateTestLog(Action, "Response Message does not contain : " + Data, Status.FAILNS);
        }
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
        Report.updateTestLog(Action, "Error in validating response body :" + "\n" + ex.getMessage(), Status.DEBUG);
    }
}
Description: This function is used to validate whether a certain JSON tag equals an expected text or not.
Input Format : @Expected Text
Condition Format : JSON Path of the tag
ObjectName | Action | Input | Condition | Reference
Kafka | assertKafkaJSONtagEquals | @value | JSONPath |
Kafka | assertKafkaJSONtagEquals | Sheet:Column | JSONPath |
Kafka | assertKafkaJSONtagEquals | %dynamicVar% | JSONPath |
@Action(object = ObjectType.KAFKA, desc = "Assert JSON Tag Equals ", input = InputType.YES, condition = InputType.YES)
public void assertKafkaJSONtagEquals() {
    try {
        String response = kafkaConsumerRecord.get(key).value().toString();
        String jsonpath = Condition;
        String value = JsonPath.read(response, jsonpath).toString();
        if (value.equals(Data)) {
            Report.updateTestLog(Action, "Element text [" + value + "] is as expected", Status.PASSNS);
        } else {
            Report.updateTestLog(Action, "Element text is [" + value + "] but is expected to be [" + Data + "]",
                    Status.FAILNS);
        }
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
        Report.updateTestLog(Action, "Error in validating JSON element :" + "\n" + ex.getMessage(), Status.DEBUG);
    }
}
Description: This function is used to validate whether a certain JSON tag contains an expected text or not.
Input Format : @Expected Text
Condition Format : JSON Path of the tag
ObjectName | Action | Input | Condition | Reference
Kafka | assertKafkaJSONtagContains | @value | JSONPath |
Kafka | assertKafkaJSONtagContains | Sheet:Column | JSONPath |
Kafka | assertKafkaJSONtagContains | %dynamicVar% | JSONPath |
@Action(object = ObjectType.KAFKA, desc = "Assert JSON Tag Contains ", input = InputType.YES, condition = InputType.YES)
public void assertKafkaJSONtagContains() {
    try {
        String response = kafkaConsumerRecord.get(key).value().toString();
        String jsonpath = Condition;
        String value = JsonPath.read(response, jsonpath).toString();
        if (value.contains(Data)) {
            Report.updateTestLog(Action, "Element text contains [" + Data + "] is as expected", Status.PASSNS);
        } else {
            Report.updateTestLog(Action, "Element text [" + value + "] does not contain [" + Data + "]",
                    Status.FAILNS);
        }
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
        Report.updateTestLog(Action, "Error in validating JSON element :" + "\n" + ex.getMessage(), Status.DEBUG);
    }
}
Description: This function is used to store a certain JSON tag value into a respective column of a given datasheet.
Input Format : @Expected datasheet name:column name
Condition Format: JSONPath of the tag
ObjectName | Action | Input | Condition | Reference
Kafka | storeKafkaJSONtagInDataSheet | Sheet:Column | JSONPath |
Note: Ensure that your data sheet doesn't contain column names with spaces.
@Action(object = ObjectType.KAFKA, desc = "Store JSON Tag In DataSheet ", input = InputType.YES, condition = InputType.YES)
public void storeKafkaJSONtagInDataSheet() {
    try {
        String strObj = Input;
        if (strObj.matches(".*:.*")) {
            try {
                System.out.println("Updating value in SubIteration " + userData.getSubIteration());
                String sheetName = strObj.split(":", 2)[0];
                String columnName = strObj.split(":", 2)[1];
                String response = kafkaConsumerRecord.get(key).value().toString();
                String jsonpath = Condition;
                String value = JsonPath.read(response, jsonpath).toString();
                userData.putData(sheetName, columnName, value);
                Report.updateTestLog(Action, "Element text [" + value + "] is stored in " + strObj, Status.DONE);
            } catch (Exception ex) {
                Logger.getLogger(this.getClass().getName()).log(Level.OFF, ex.getMessage(), ex);
                Report.updateTestLog(Action, "Error Storing JSON element in datasheet :" + "\n" + ex.getMessage(),
                        Status.DEBUG);
            }
        } else {
            Report.updateTestLog(Action,
                    "Given input [" + Input + "] format is invalid. It should be [sheetName:ColumnName]",
                    Status.DEBUG);
        }
    } catch (Exception ex) {
        Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
        Report.updateTestLog(Action, "Error Storing JSON element in datasheet :" + "\n" + ex.getMessage(),
                Status.DEBUG);
    }
}