Kafka Operations
setBootstrapServers
Description: This function is used to set the bootstrap servers for establishing the kafka connection.
Input Format : @Expected server details
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | setBootstrapServers |
@value | |||
Kafka | setBootstrapServers |
Sheet:Column | |||
Kafka | setBootstrapServers |
%dynamicVar% |
@Action(object = ObjectType.KAFKA, desc = "Set Bootstrap Servers", input = InputType.YES, condition = InputType.NO)
public void setBootstrapServers() {
try {
kafkaServers.put(key, Data);
Report.updateTestLog(Action, "Bootstrap Servers have been set successfully", Status.DONE);
} catch (Exception ex) {
Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Bootstrap Servers setup", ex);
Report.updateTestLog(Action, "Error in setting Bootstrap Servers: " + "\n" + ex.getMessage(), Status.DEBUG);
}
}
setProducerTopic
Description: This function is used to set the Producer Topic name
Input Format : @Expected Producer Topic Name
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | setProducerTopic |
@value | |||
Kafka | setProducerTopic |
Sheet:Column | |||
Kafka | setProducerTopic |
%dynamicVar% |
@Action(object = ObjectType.KAFKA, desc = "Set Producer Topic", input = InputType.YES, condition = InputType.NO)
public void setProducerTopic() {
try {
kafkaProducerTopic.put(key, Data);
Report.updateTestLog(Action, "Topic has been set successfully", Status.DONE);
} catch (Exception ex) {
Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Topic setup", ex);
Report.updateTestLog(Action, "Error in setting Topic: " + "\n" + ex.getMessage(), Status.DEBUG);
}
}
setKey
Description: This function is used to set the key for the Kafka connection.
Input Format : @Expected Key
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | setKey |
@value | |||
Kafka | setKey |
Sheet:Column | |||
Kafka | setKey |
%dynamicVar% |
@Action(object = ObjectType.KAFKA, desc = "Set Key", input = InputType.YES, condition = InputType.NO)
public void setKey() {
try {
kafkaKey.put(key, Data);
Report.updateTestLog(Action, "Key has been set successfully", Status.DONE);
} catch (Exception ex) {
Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Key setup", ex);
Report.updateTestLog(Action, "Error in setting Key: " + "\n" + ex.getMessage(), Status.DEBUG);
}
}
setKeySerializer
Description: This function is used to set the Key Serializer for the Kafka Producer.
Input Format : @Expected Key Serializer. Expected values are : string
/ bytearray
/ avro
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | setKeySerializer |
@value | |||
Kafka | setKeySerializer |
Sheet:Column | |||
Kafka | setKeySerializer |
%dynamicVar% |
@Action(object = ObjectType.KAFKA, desc = "Set Key Serializer", input = InputType.YES, condition = InputType.NO)
public void setKeySerializer() {
try {
kafkaKeySerializer.put(key, Data);
Report.updateTestLog(Action, "Key Serializer has been set successfully", Status.DONE);
} catch (Exception ex) {
Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Key Serializer setup", ex);
Report.updateTestLog(Action, "Error in setting Key Serializer: " + "\n" + ex.getMessage(), Status.DEBUG);
}
}
setValueSerializer
Description: This function is used to set the Value Serializer for the Kafka Producer.
Input Format : @Expected Value Serializer. Expected values are : string
/ bytearray
/ avro
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | setValueSerializer |
@value | |||
Kafka | setValueSerializer |
Sheet:Column | |||
Kafka | setValueSerializer |
%dynamicVar% |
@Action(object = ObjectType.KAFKA, desc = "Set Value Serializer", input = InputType.YES, condition = InputType.NO)
public void setValueSerializer() {
try {
kafkaValueSerializer.put(key, Data);
Report.updateTestLog(Action, "Value Serializer has been set successfully", Status.DONE);
} catch (Exception ex) {
Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Value Serializer setup", ex);
Report.updateTestLog(Action, "Error in setting Value Serializer: " + "\n" + ex.getMessage(), Status.DEBUG);
}
}
setSchemaRegistryURL
Description: This function is used to set the Schema Registry URL (mostly used when firing Avro
serialized value).
Input Format : @Expected Schema Registry URL
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | setSchemaRegistryURL |
@value | |||
Kafka | setSchemaRegistryURL |
Sheet:Column | |||
Kafka | setSchemaRegistryURL |
%dynamicVar% |
@Action(object = ObjectType.KAFKA, desc = "Set Schema Registry URL", input = InputType.YES, condition = InputType.NO)
public void setSchemaRegistryURL() {
try {
kafkaSchemaRegistryURL.put(key, Data);
Report.updateTestLog(Action, "Schema Registry URL has been set successfully", Status.DONE);
} catch (Exception ex) {
Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Schema Registry URL setup", ex);
Report.updateTestLog(Action, "Error in setting Schema Registry URL: " + "\n" + ex.getMessage(), Status.DEBUG);
}
}
setAutoRegisterSchemas
Description: When set to true, this function automatically registers the Avro
schema with the Schema Registry when producing messages. If set to false, the producer checks for the Avro
schema's existence in the registry; if not found, the message will not be published and an error will be thrown.
Input Format : @Boolean Value. Either set to true
or false
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | setAutoRegisterSchemas |
@value | |||
Kafka | setAutoRegisterSchemas |
Sheet:Column | |||
Kafka | setAutoRegisterSchemas |
%dynamicVar% |
@Action(object = ObjectType.KAFKA, desc = "Set Auto Register Schemas", input = InputType.YES, condition = InputType.NO)
public void setAutoRegisterSchemas() {
try {
kafkaAutoRegisterSchemas.put(key, Boolean.valueOf(Data.toLowerCase().trim()));
Report.updateTestLog(Action, "Auto Register Schemas has been set successfully", Status.DONE);
} catch (Exception ex) {
Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception Max Poll Record setup", ex);
Report.updateTestLog(Action, "Error in Auto Register Schemas: " + "\n" + ex.getMessage(), Status.DEBUG);
}
}
addSchema
Description: This function is used to register the Avro
schema with the Schema Registry.
Input Format : @Path to the schema file
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | addSchema |
@value | |||
Kafka | addSchema |
Sheet:Column | |||
Kafka | addSchema |
%dynamicVar% |
@Action(object = ObjectType.KAFKA, desc = "Add Avro Schema", input = InputType.YES, condition = InputType.NO)
public void addSchema() throws IOException {
try {
Schema mainSchema = null;
Schema.Parser parser = new Schema.Parser();
if (Data.contains(";")) {
String[] paths = Data.split(";");
for (int i = 0; i < paths.length - 1; i++) {
parser.parse(new File(Paths.get(paths[i]).toString()));
}
mainSchema = parser.parse(new File(Paths.get(paths[paths.length - 1]).toString()));
} else {
// Only one schema, no dependencies
mainSchema = new Schema.Parser().parse(new File(Paths.get(Data).toString()));
}
kafkaAvroSchema.put(key, mainSchema);
Report.updateTestLog(Action, "Schema added successfully", Status.DONE);
} catch (Exception e) {
Report.updateTestLog(Action, " Unable to add Schema : " + e.getMessage(), Status.FAIL);
}
}
setPartition
Description: This function is used to set the Partition for the Kafka Producer.
Input Format : @Expected Partition value. Either set an Integer
or just set null
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | setPartition |
@value | |||
Kafka | setPartition |
Sheet:Column | |||
Kafka | setPartition |
%dynamicVar% |
@Action(object = ObjectType.KAFKA, desc = "Set Partition", input = InputType.YES, condition = InputType.NO)
public void setPartition() {
try {
if (Data.toLowerCase().equals("null")) {
kafkaPartition.put(key, null);
} else {
kafkaPartition.put(key, Integer.valueOf(Data));
}
Report.updateTestLog(Action, "Partition has been set successfully", Status.DONE);
} catch (NumberFormatException ex) {
Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Partition setup", ex);
Report.updateTestLog(Action, "Error in setting Partition: " + "\n" + ex.getMessage(), Status.DEBUG);
}
}
setTimeStamp
Description: This function is used to set the Time Stamp for the Kafka Producer. This sets the System.currentTimeMillis()
to the producer record.
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | setTimeStamp |
@Action(object = ObjectType.KAFKA, desc = "Set TimeStamp", input = InputType.NO, condition = InputType.NO)
public void setTimeStamp() {
try {
kafkaTimeStamp.put(key, System.currentTimeMillis());
Report.updateTestLog(Action, "Time Stamp has been set successfully", Status.DONE);
} catch (NumberFormatException ex) {
Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Time Stamp setup", ex);
Report.updateTestLog(Action, "Error in setting Time Stamp: " + "\n" + ex.getMessage(), Status.DEBUG);
}
}
addKafkaHeader
Description: This function is used to add a header to the Kafka Producer Record.
Input Format : @HeaderName
=HeaderValue
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | addHeader |
@HeaderName =HeaderValue |
|||
Kafka | addHeader |
Sheet:Column containing HeaderName =HeaderValue |
|||
Kafka | addHeader |
%dynamicVar% containing HeaderName =HeaderValue |
|||
Kafka | addHeader |
@HeaderName ={SheetName:ColumnName} |
|||
Kafka | addHeader |
@HeaderName =%dynamicVar% |
This function adds all the Headers into a HashMap headerlist
. Then those are applied to the request as :
@Action(object = ObjectType.KAFKA, desc = "Add Kafka Header", input = InputType.YES)
public void addKafkaHeader() {
try {
List<String> sheetlist = Control.getCurrentProject().getTestData().getTestDataFor(Control.exe.runEnv())
.getTestDataNames();
for (int sheet = 0; sheet < sheetlist.size(); sheet++) {
if (Data.contains("{" + sheetlist.get(sheet) + ":")) {
com.ing.datalib.testdata.model.TestDataModel tdModel = Control.getCurrentProject().getTestData()
.getTestDataByName(sheetlist.get(sheet));
List<String> columns = tdModel.getColumns();
for (int col = 0; col < columns.size(); col++) {
if (Data.contains("{" + sheetlist.get(sheet) + ":" + columns.get(col) + "}")) {
Data = Data.replace("{" + sheetlist.get(sheet) + ":" + columns.get(col) + "}",
userData.getData(sheetlist.get(sheet), columns.get(col)));
}
}
}
}
Collection<Object> valuelist = Control.getCurrentProject().getProjectSettings().getUserDefinedSettings()
.values();
for (Object prop : valuelist) {
if (Data.contains("{" + prop + "}")) {
Data = Data.replace("{" + prop + "}", prop.toString());
}
}
String headerKey = Data.split("=", 2)[0];
String headerValue = Data.split("=", 2)[1];
if (kafkaHeaders.containsKey(key)) {
kafkaHeaders.get(key).add(new RecordHeader(headerKey, headerValue.getBytes()));
} else {
ArrayList<Header> toBeAdded = new ArrayList<Header>();
toBeAdded.add(new RecordHeader(headerKey, headerValue.getBytes()));
kafkaHeaders.put(key, toBeAdded);
}
Report.updateTestLog(Action, "Header added " + Data, Status.DONE);
} catch (Exception ex) {
Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
Report.updateTestLog(Action, "Error adding Header :" + "\n" + ex.getMessage(), Status.DEBUG);
}
}
produceMessage
Description: This function is used to Produce the Kafka Producer Record.
Input Format : @Expected Payload
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | produceMessage |
@Payload (from Editor) | |||
Kafka | produceMessage |
Sheet:Column | |||
Kafka | produceMessage |
%dynamicVar% |
Inputs in the Input column can be either hardcoded
, passed inside the Payload editor which is capable of parameterising the Payload (Press ctrl+space to see the list of variables available ), passed from the data sheet (datasheet name : column name
) or passed from a variable value (%variable name%
), as given in the above example.
@Action(object = ObjectType.KAFKA, desc = "Produce Kafka Message", input = InputType.YES, condition = InputType.NO)
public void produceMessage() {
try {
String value = Data;
value = handleDataSheetVariables(value);
value = handleuserDefinedVariables(value);
kafkaValue.put(key, value);
if (kafkaHeaders.get(key) != null && kafkaTimeStamp.get(key) != null) {
produceMessage(kafkaProducerTopic.get(key), kafkaPartition.get(key), kafkaTimeStamp.get(key), kafkaKey.get(key), kafkaValue.get(key), kafkaHeaders.get(key));
} else if (kafkaHeaders.get(key) != null) {
produceMessage(kafkaProducerTopic.get(key), kafkaPartition.get(key), kafkaKey.get(key), kafkaValue.get(key), kafkaHeaders.get(key));
} else if (kafkaTimeStamp.get(key) != null) {
produceMessage(kafkaProducerTopic.get(key), kafkaPartition.get(key), kafkaTimeStamp.get(key), kafkaKey.get(key), kafkaValue.get(key));
} else if (kafkaPartition.containsKey(key)) {
produceMessage(kafkaProducerTopic.get(key), kafkaPartition.get(key), kafkaKey.get(key), kafkaValue.get(key));
} else if (kafkaKey.get(key) != null) {
produceMessage(kafkaProducerTopic.get(key), kafkaKey.get(key), kafkaValue.get(key));
} else {
produceMessage(kafkaProducerTopic.get(key), kafkaValue.get(key));
}
Report.updateTestLog(Action, "Message has been produced. ", Status.DONE);
} catch (Exception ex) {
Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
Report.updateTestLog(Action, "Something went wrong in producing the message" + "\n" + ex.getMessage(),
Status.FAILNS);
ex.printStackTrace();
}
}
sendKafkaMessage
Description: This function is used to send the Kafka Record to the Producer Topic.
ObjectName | Action | Input | Condition | Reference |
---|---|---|---|---|
Queue | sendKafkaMessage |
@Action(object = ObjectType.KAFKA, desc = "Send Message", input = InputType.NO, condition = InputType.NO)
public void sendKafkaMessage() {
try {
createProducer(kafkaValueSerializer.get(key));
//before.put(key, Instant.now());
kafkaProducer.get(key).send(kafkaProducerRecord.get(key));
Report.updateTestLog(Action, "Record sent", Status.DONE);
kafkaProducer.get(key).close();
} catch (Exception ex) {
Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception while sending record", ex);
Report.updateTestLog(Action, "Error in sending record: " + "\n" + ex.getMessage(), Status.DEBUG);
} finally {
clearProducerDetails();
}
}
setConsumerTopic
Description: This function is used to set the Consumer Topic name
Input Format : @Expected Consumer Topic Name
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | setConsumerTopic |
@value | |||
Kafka | setConsumerTopic |
Sheet:Column | |||
Kafka | setConsumerTopic |
%dynamicVar% |
@Action(object = ObjectType.KAFKA, desc = "Set Consumer Topic", input = InputType.YES, condition = InputType.NO)
public void setConsumerTopic() {
try {
kafkaConsumerTopic.put(key, Data);
Report.updateTestLog(Action, "Topic has been set successfully", Status.DONE);
} catch (Exception ex) {
Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Topic setup", ex);
Report.updateTestLog(Action, "Error in setting Topic: " + "\n" + ex.getMessage(), Status.DEBUG);
}
}
setConsumerPollRetries
Description: This function is used to set the the number of times the Kafka Consumer will retry the polling of the target record.
Input Format : @Expected number of retries
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | setConsumerPollRetries |
@value | |||
Kafka | setConsumerPollRetries |
Sheet:Column | |||
Kafka | setConsumerPollRetries |
%dynamicVar% |
@Action(object = ObjectType.KAFKA, desc = "Set Consumer Retries", input = InputType.YES, condition = InputType.NO)
public void setConsumerPollRetries() {
try {
kafkaConsumerPollRetries.put(key, Integer.parseInt(Data));
Report.updateTestLog(Action, "Poll Retries has been set successfully", Status.DONE);
} catch (Exception ex) {
Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Poll Retries setup", ex);
Report.updateTestLog(Action, "Error in setting Poll Retries: " + "\n" + ex.getMessage(), Status.DEBUG);
}
}
setConsumerPollInterval
Description: This function is used to set the interval (in milliseconds) in which the Kafka Consumer will retry the polling of the target record.
Input Format : @Expected interval value (in milliseconds)
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | setConsumerPollInterval |
@value | |||
Kafka | setConsumerPollInterval |
Sheet:Column | |||
Kafka | setConsumerPollInterval |
%dynamicVar% |
@Action(object = ObjectType.KAFKA, desc = "Set Consumer Retries", input = InputType.YES, condition = InputType.NO)
public void setConsumerPollInterval() {
try {
kafkaConsumerPollDuration.put(key, Long.valueOf(Data));
Report.updateTestLog(Action, "Poll interval has been set successfully", Status.DONE);
} catch (Exception ex) {
Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Poll interval setup", ex);
Report.updateTestLog(Action, "Error in setting Poll interval: " + "\n" + ex.getMessage(), Status.DEBUG);
}
}
setConsumerMaxPollRecords
Description: This function is used to set the maximum number of records that can be retrieved in a single poll operation.
Input Format : @Expected maximum value. Expected value is integer
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | setConsumerMaxPollRecords |
@value | |||
Kafka | setConsumerMaxPollRecords |
Sheet:Column | |||
Kafka | setConsumerMaxPollRecords |
%dynamicVar% |
@Action(object = ObjectType.KAFKA, desc = "Set Consumer Max Poll Records", input = InputType.YES, condition = InputType.NO)
public void setConsumerMaxPollRecords() {
try {
kafkaConsumerMaxPollRecords.put(key, Integer.valueOf(Data));
Report.updateTestLog(Action, "Max Poll Records has been set successfully", Status.DONE);
} catch (Exception ex) {
Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception Max Poll Record setup", ex);
Report.updateTestLog(Action, "Error in setting Max Poll Records: " + "\n" + ex.getMessage(), Status.DEBUG);
}
}
setConsumerGroupId
Description: This function is used to set the Group Id for the Kafka Consumer.
Input Format : @Expected Group Id
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | setConsumerGroupId |
@value | |||
Kafka | setConsumerGroupId |
Sheet:Column | |||
Kafka | setConsumerGroupId |
%dynamicVar% |
@Action(object = ObjectType.KAFKA, desc = "Set Consumer GroupId", input = InputType.YES, condition = InputType.NO)
public void setConsumerGroupId() {
try {
kafkaConsumerGroupId.put(key, Data);
Report.updateTestLog(Action, "Consumer GroupId has been set successfully", Status.DONE);
} catch (Exception ex) {
Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Consumer GroupId setup", ex);
Report.updateTestLog(Action, "Error in setting Consumer GroupId: " + "\n" + ex.getMessage(), Status.DEBUG);
}
}
setValueDeserializer
Description: This function is used to set the Value Deserializer for the Kafka Consumer.
Input Format : @Expected Value Deserializer. Expected values are : string
/ bytearray
/ avro
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | setValueDeserializer |
@value | |||
Kafka | setValueDeserializer |
Sheet:Column | |||
Kafka | setValueDeserializer |
%dynamicVar% |
@Action(object = ObjectType.KAFKA, desc = "Set Value Deserializer", input = InputType.YES, condition = InputType.NO)
public void setValueDeserializer() {
try {
kafkaValueDeserializer.put(key, Data);
Report.updateTestLog(Action, "Value Deserializer has been set successfully", Status.DONE);
} catch (Exception ex) {
Logger.getLogger(this.getClass().getName()).log(Level.SEVERE, "Exception during Value Deserializer setup", ex);
Report.updateTestLog(Action, "Error in setting Value Deserializer: " + "\n" + ex.getMessage(), Status.DEBUG);
}
}
consumeKafkaMessage
Description: This function is used to consume the Kafka Consumer Record.
ObjectName | Action | Input | Condition | Reference |
---|---|---|---|---|
Queue | consumeKafkaMessage |
@Action(object = ObjectType.KAFKA, desc = "Consume Kafka Message", input = InputType.NO)
public void consumeKafkaMessage() {
createConsumer(kafkaValueDeserializer.get(key));
kafkaConsumer.get(key).subscribe(Arrays.asList(kafkaConsumerTopic.get(key)));
try {
ConsumerRecord record = pollKafkaConsumer();
if (record != null) {
Report.updateTestLog(Action, "Kafka message consumed successfully. ", Status.DONE);
} else {
Report.updateTestLog(Action, "Kafka message not received. ", Status.FAIL);
}
} catch (Exception e) {
e.printStackTrace();
Report.updateTestLog(Action, "Error while consuming Kafka message: " + e.getMessage(), Status.FAIL);
} finally {
kafkaConsumer.get(key).close();
}
}
identifyTargetMessage
Description: This function is used to locate a specific Kafka message from the consumed records based on the unique identifier provided.
Input Format : @Expected value
Condition Format: JSONPath or XPath of the unique identifier
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | identifyTargetMessage |
@value | JSONPath or XPath | ||
Kafka | identifyTargetMessage |
Sheet:Column | JSONPath or XPath | ||
Kafka | identifyTargetMessage |
%dynamicVar% | JSONPath or XPath |
@Action(object = ObjectType.KAFKA, desc = "Identify target message", input = InputType.YES, condition = InputType.YES)
public void identifyTargetMessage() {
try {
kafkaRecordIdentifierValue.put(key, Data);
kafkaRecordIdentifierPath.put(key, Condition);
Report.updateTestLog(Action,
"Target message set with tag value as [" + Data + "] and tag path as [" + Condition + "].",
Status.DONE);
} catch (Exception e) {
Report.updateTestLog(Action, "Error in target message setup : " + e.getMessage(), Status.FAIL);
}
}
storeKafkaXMLtagInDataSheet
Description: This function is used to store a certain XML tag value into a respective column of a given datasheet.
Input Format : @Expected datasheet name:column name
Condition Format: XPath
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | storeKafkaXMLtagInDataSheet |
Sheet:Column | XPath |
Note: Ensure that your data sheet doesn't contain column names with spaces.
@Action(object = ObjectType.KAFKA, desc = "Store XML tag In DataSheet ", input = InputType.YES, condition = InputType.NO)
public void storeKafkaXMLtagInDataSheet() {
try {
String strObj = Input;
if (strObj.matches(".*:.*")) {
try {
System.out.println("Updating value in SubIteration " + userData.getSubIteration());
String sheetName = strObj.split(":", 2)[0];
String columnName = strObj.split(":", 2)[1];
String xmlText = kafkaConsumerRecord.get(key).value().toString();
DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder dBuilder;
InputSource inputSource = new InputSource();
inputSource.setCharacterStream(new StringReader(xmlText));
dBuilder = dbFactory.newDocumentBuilder();
Document doc = dBuilder.parse(inputSource);
doc.getDocumentElement().normalize();
XPath xPath = XPathFactory.newInstance().newXPath();
String expression = Condition;
String value = (String) xPath.compile(expression).evaluate(doc);
userData.putData(sheetName, columnName, value);
Report.updateTestLog(Action, "Element text [" + value + "] is stored in " + strObj, Status.DONE);
} catch (IOException | ParserConfigurationException | XPathExpressionException | DOMException
| SAXException ex) {
Logger.getLogger(this.getClass().getName()).log(Level.OFF, ex.getMessage(), ex);
Report.updateTestLog(Action, "Error Storing XML element in datasheet :" + "\n" + ex.getMessage(),
Status.DEBUG);
}
} else {
Report.updateTestLog(Action,
"Given input [" + Input + "] format is invalid. It should be [sheetName:ColumnName]",
Status.DEBUG);
}
} catch (Exception ex) {
Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
Report.updateTestLog(Action, "Error Storing XML element in datasheet :" + "\n" + ex.getMessage(),
Status.DEBUG);
}
}
assertKafkaXMLtagEquals
Description: This function is used to validate whether a certain XML tag equals an expected text or not.
Input Format : @Expected Text
Condition Format : XPath
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | assertKafkaXMLtagEquals |
@value | XPath | ||
Kafka | assertKafkaXMLtagEquals |
Sheet:Column | XPath | ||
Kafka | assertKafkaXMLtagEquals |
%dynamicVar% | XPath |
@Action(object = ObjectType.KAFKA, desc = "Assert XML Tag Equals ", input = InputType.YES, condition = InputType.YES)
public void assertKafkaXMLtagEquals() {
try {
DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder dBuilder;
InputSource inputSource = new InputSource();
inputSource.setCharacterStream(new StringReader(kafkaConsumerRecord.get(key).value().toString()));
dBuilder = dbFactory.newDocumentBuilder();
Document doc = dBuilder.parse(inputSource);
doc.getDocumentElement().normalize();
XPath xPath = XPathFactory.newInstance().newXPath();
String expression = Condition;
String value = (String) xPath.compile(expression).evaluate(doc);
if (value.equals(Data)) {
Report.updateTestLog(Action, "Element text [" + value + "] is as expected", Status.PASSNS);
} else {
Report.updateTestLog(Action, "Element text [" + value + "] is not as expected", Status.FAILNS);
}
} catch (IOException | ParserConfigurationException | XPathExpressionException | DOMException
| SAXException ex) {
Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
Report.updateTestLog(Action, "Error validating XML element :" + "\n" + ex.getMessage(), Status.DEBUG);
}
}
assertKafkaXMLtagContains
Description: This function is used to validate whether a certain XML tag contains an expected text or not.
Input Format : @Expected Text
Condition Format : XPath
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | assertKafkaXMLtagContains |
@value | XPath | ||
Kafka | assertKafkaXMLtagContains |
Sheet:Column | XPath | ||
Kafka | assertKafkaXMLtagContains |
%dynamicVar% | XPath |
@Action(object = ObjectType.KAFKA, desc = "Assert XML Tag Contains ", input = InputType.YES, condition = InputType.YES)
public void assertKafkaXMLtagContains() {
try {
DocumentBuilderFactory dbFactory = DocumentBuilderFactory.newInstance();
DocumentBuilder dBuilder;
InputSource inputSource = new InputSource();
inputSource.setCharacterStream(new StringReader(kafkaConsumerRecord.get(key).value().toString()));
dBuilder = dbFactory.newDocumentBuilder();
Document doc = dBuilder.parse(inputSource);
doc.getDocumentElement().normalize();
XPath xPath = XPathFactory.newInstance().newXPath();
String expression = Condition;
String value = (String) xPath.compile(expression).evaluate(doc);
if (value.contains(Data)) {
Report.updateTestLog(Action, "Element text contains [" + Data + "] is as expected", Status.PASSNS);
} else {
Report.updateTestLog(Action, "Element text [" + value + "] does not contain [" + Data + "]",
Status.FAILNS);
}
} catch (IOException | ParserConfigurationException | XPathExpressionException | DOMException
| SAXException ex) {
Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
Report.updateTestLog(Action, "Error validating XML element :" + "\n" + ex.getMessage(), Status.DEBUG);
}
}
storeKafkaResponseInDataSheet
Description: This function is used to store the Kafka message into a respective column of a given datasheet.
Input Format : @Expected datasheet name:column name
Condition Format: XPath
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | storeKafkaResponseInDataSheet |
Sheet:Column | XPath |
Note: Ensure that your data sheet doesn't contain column names with spaces.
@Action(object = ObjectType.KAFKA, desc = "Store Response In DataSheet ", input = InputType.YES, condition = InputType.NO)
public void storeKafkaResponseInDataSheet() {
try {
String strObj = Input;
if (strObj.matches(".*:.*")) {
try {
System.out.println("Updating value in SubIteration " + userData.getSubIteration());
String sheetName = strObj.split(":", 2)[0];
String columnName = strObj.split(":", 2)[1];
String response = kafkaConsumerRecord.get(key).value().toString();
userData.putData(sheetName, columnName, response);
Report.updateTestLog(Action, "Response is stored in " + strObj, Status.DONE);
} catch (Exception ex) {
Logger.getLogger(this.getClass().getName()).log(Level.OFF, ex.getMessage(), ex);
Report.updateTestLog(Action, "Error storing Response in datasheet :" + "\n" + ex.getMessage(),
Status.DEBUG);
}
} else {
Report.updateTestLog(Action,
"Given input [" + Input + "] format is invalid. It should be [sheetName:ColumnName]",
Status.DEBUG);
}
} catch (Exception ex) {
Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
Report.updateTestLog(Action, "Error storing Response in datasheet :" + "\n" + ex.getMessage(),
Status.DEBUG);
}
}
assertKafkaResponseMessageContains
Description: This function is used to validate whether Kafka message contains an expected text or not.
Input Format : @Expected Text
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | assertKafkaResponseMessageContains |
@value | |||
Kafka | assertKafkaResponseMessageContains |
Sheet:Column | |||
Kafka | assertKafkaResponseMessageContains |
%dynamicVar% |
@Action(object = ObjectType.KAFKA, desc = "Assert Response Message contains ", input = InputType.YES)
public void assertKafkaResponseMessageContains() {
try {
if (kafkaConsumerRecord.get(key).value().toString().contains(Data)) {
Report.updateTestLog(Action, "Response Message contains : " + Data, Status.PASSNS);
} else {
Report.updateTestLog(Action, "Response Message does not contain : " + Data, Status.FAILNS);
}
} catch (Exception ex) {
Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
Report.updateTestLog(Action, "Error in validating response body :" + "\n" + ex.getMessage(), Status.DEBUG);
}
}
assertKafkaJSONtagEquals
Description: This function is used to validate whether a certain JSON tag equals an expected text or not.
Input Format : @Expected Text
Condition Format : JSON Path of the tag
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | assertKafkaJSONtagEquals |
@value | JSONPath | ||
Kafka | assertKafkaJSONtagEquals |
Sheet:Column | JSONPath | ||
Kafka | assertKafkaJSONtagEquals |
%dynamicVar% | JSONPath |
@Action(object = ObjectType.KAFKA, desc = "Assert JSON Tag Equals ", input = InputType.YES, condition = InputType.YES)
public void assertKafkaJSONtagEquals() {
try {
String response = kafkaConsumerRecord.get(key).value().toString();
String jsonpath = Condition;
String value = JsonPath.read(response, jsonpath).toString();
if (value.equals(Data)) {
Report.updateTestLog(Action, "Element text [" + value + "] is as expected", Status.PASSNS);
} else {
Report.updateTestLog(Action, "Element text is [" + value + "] but is expected to be [" + Data + "]",
Status.FAILNS);
}
} catch (Exception ex) {
Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
Report.updateTestLog(Action, "Error in validating JSON element :" + "\n" + ex.getMessage(), Status.DEBUG);
}
}
assertKafkaJSONtagContains
Description: This function is used to validate whether a certain JSON tag contains an expected text or not.
Input Format : @Expected Text
Condition Format : JSON Path of the tag
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | assertKafkaJSONtagContains |
@value | JSONPath | ||
Kafka | assertKafkaJSONtagContains |
Sheet:Column | JSONPath | ||
Kafka | assertKafkaJSONtagContains |
%dynamicVar% | JSONPath |
@Action(object = ObjectType.KAFKA, desc = "Assert JSON Tag Contains ", input = InputType.YES, condition = InputType.YES)
public void assertKafkaJSONtagContains() {
try {
String response = kafkaConsumerRecord.get(key).value().toString();
String jsonpath = Condition;
String value = JsonPath.read(response, jsonpath).toString();
if (value.contains(Data)) {
Report.updateTestLog(Action, "Element text contains [" + Data + "] is as expected", Status.PASSNS);
} else {
Report.updateTestLog(Action, "Element text [" + value + "] does not contain [" + Data + "]",
Status.FAILNS);
}
} catch (Exception ex) {
Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
Report.updateTestLog(Action, "Error in validating JSON element :" + "\n" + ex.getMessage(), Status.DEBUG);
}
}
storeKafkaJSONtagInDataSheet
Description: This function is used to store a certain JSON tag value into a respective column of a given datasheet.
Input Format : @Expected datasheet name:column name
Condition Format: JSONPath of the tag
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | storeKafkaJSONtagInDataSheet |
Sheet:Column | JSONPath |
Note: Ensure that your data sheet doesn't contain column names with spaces.
@Action(object = ObjectType.KAFKA, desc = "Store JSON Tag In DataSheet ", input = InputType.YES, condition = InputType.YES)
public void storeKafkaJSONtagInDataSheet() {
try {
String strObj = Input;
if (strObj.matches(".*:.*")) {
try {
System.out.println("Updating value in SubIteration " + userData.getSubIteration());
String sheetName = strObj.split(":", 2)[0];
String columnName = strObj.split(":", 2)[1];
String response = kafkaConsumerRecord.get(key).value().toString();
String jsonpath = Condition;
String value = JsonPath.read(response, jsonpath).toString();
userData.putData(sheetName, columnName, value);
Report.updateTestLog(Action, "Element text [" + value + "] is stored in " + strObj, Status.DONE);
} catch (Exception ex) {
Logger.getLogger(this.getClass().getName()).log(Level.OFF, ex.getMessage(), ex);
Report.updateTestLog(Action, "Error Storing JSON element in datasheet :" + "\n" + ex.getMessage(),
Status.DEBUG);
}
} else {
Report.updateTestLog(Action,
"Given input [" + Input + "] format is invalid. It should be [sheetName:ColumnName]",
Status.DEBUG);
}
} catch (Exception ex) {
Logger.getLogger(this.getClass().getName()).log(Level.OFF, null, ex);
Report.updateTestLog(Action, "Error Storing JSON element in datasheet :" + "\n" + ex.getMessage(),
Status.DEBUG);
}
}
closeConsumer
Description: This function is used to gracefully shut down a Kafka consumer.
ObjectName | Action | Input | Condition | Reference | |
---|---|---|---|---|---|
Kafka | closeConsumer |
@Action(object = ObjectType.KAFKA, desc = "Close Consumer", input = InputType.NO, condition = InputType.NO)
public void closeConsumer() {
try {
kafkaConsumerRecords.remove(key);
kafkaConsumerRecord.remove(key);
kafkaConsumeRecordValue.remove(key);
kafkaConsumerPollDuration.remove(key);
kafkaConsumerPollRetries.remove(key);
kafkaConsumerTopic.remove(key);
kafkaValueDeserializer.remove(key);
kafkaSchemaRegistryURL.remove(key);
kafkaSharedSecret.remove(key);
kafkaConsumerGroupId.remove(key);
kafkaConsumerPollRecord.remove(key);
kafkaRecordIdentifierValue.remove(key);
kafkaRecordIdentifierPath.remove(key);
Report.updateTestLog(Action, "Consumer closed successfully", Status.DONE);
} catch (Exception ex) {
Report.updateTestLog(Action, "Error in closing Consumer.", Status.DEBUG);
}
}