Skip to content

Commit

Permalink
[#1085] Add getter for event schema to AdapterDescription
Browse files Browse the repository at this point in the history
  • Loading branch information
tenthe committed Jan 12, 2023
1 parent 71e5bca commit 4981f12
Show file tree
Hide file tree
Showing 21 changed files with 329 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.streampipes.extensions.api.connect;

import org.apache.streampipes.model.schema.EventSchema;

import java.util.List;
import java.util.Map;

Expand All @@ -31,4 +33,6 @@ public interface IAdapterPipeline {
void changePipelineSink(IAdapterPipelineElement pipelineSink);

IAdapterPipelineElement getPipelineSink();

EventSchema getResultingEventSchema();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.streampipes.extensions.api.connect.exception.ParseException;
import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.schema.EventSchema;

public interface IProtocol extends Connector {

Expand All @@ -42,6 +41,4 @@ IProtocol getInstance(ProtocolDescription protocolDescription,

String getId();

//TODO remove
void setEventSchema(EventSchema eventSchema);
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ public AdapterPipeline generatePipeline(AdapterDescription adapterDescription) {
}
pipelineElements.add(transformStreamAdapterElement);

// TODO decide what was meant with this comment
// Needed when adapter is (
if (adapterDescription.getEventGrounding() != null
&& adapterDescription.getEventGrounding().getTransportProtocol() != null
&& adapterDescription.getEventGrounding().getTransportProtocol().getBrokerHostname() != null) {
Expand All @@ -84,7 +82,7 @@ public AdapterPipeline generatePipeline(AdapterDescription adapterDescription) {
return new AdapterPipeline(pipelineElements, new DebugAdapterSink());
}

return new AdapterPipeline(pipelineElements);
return new AdapterPipeline(pipelineElements, adapterDescription.getEventSchema());
}

public List<IAdapterPipelineElement> makeAdapterPipelineElements(List<TransformationRuleDescription> rules) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.streampipes.model.connect.adapter.GenericAdapterDescription;
import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.schema.EventSchema;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -69,8 +68,8 @@ public void startAdapter() throws AdapterException {
this.protocol = protocolInstance;

//TODO remove
EventSchema eventSchema = adapterDescription.getEventSchema();
this.protocol.setEventSchema(eventSchema);
// EventSchema eventSchema = adapterDescription.getEventSchema();
// this.protocol.setEventSchema(eventSchema);

logger.debug("Start adatper with format: " + format.getId() + " and " + protocol.getId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,12 @@
import org.apache.streampipes.extensions.api.connect.IFormat;
import org.apache.streampipes.extensions.api.connect.IParser;
import org.apache.streampipes.extensions.api.connect.IProtocol;
import org.apache.streampipes.model.schema.EventSchema;

public abstract class Protocol implements IProtocol {

protected IParser parser;
protected IFormat format;

//TODO remove
protected EventSchema eventSchema;

public Protocol() {

}
Expand All @@ -39,10 +35,4 @@ public Protocol(IParser parser, IFormat format) {
this.parser = parser;
this.format = format;
}

//TODO remove
@Override
public void setEventSchema(EventSchema eventSchema) {
this.eventSchema = eventSchema;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.streampipes.extensions.api.connect.IAdapterPipeline;
import org.apache.streampipes.extensions.api.connect.IAdapterPipelineElement;
import org.apache.streampipes.model.schema.EventSchema;

import java.util.List;
import java.util.Map;
Expand All @@ -29,9 +30,11 @@ public class AdapterPipeline implements IAdapterPipeline {
private List<IAdapterPipelineElement> pipelineElements;
private IAdapterPipelineElement pipelineSink;

private EventSchema resultingEventSchema;

public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements) {
public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements, EventSchema resultingEventSchema) {
this.pipelineElements = pipelineElements;
this.resultingEventSchema = resultingEventSchema;
}

public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements, IAdapterPipelineElement pipelineSink) {
Expand All @@ -42,12 +45,6 @@ public AdapterPipeline(List<IAdapterPipelineElement> pipelineElements, IAdapterP
@Override
public void process(Map<String, Object> event) {

// TODO remove, just for performance tests
if ("true".equals(System.getenv("SP_DEBUG_CONNECT"))) {
event.put("internal_t1", System.currentTimeMillis());
}


for (IAdapterPipelineElement pipelineElement : pipelineElements) {
event = pipelineElement.process(event);
}
Expand Down Expand Up @@ -76,4 +73,9 @@ public void changePipelineSink(IAdapterPipelineElement pipelineSink) {
public IAdapterPipelineElement getPipelineSink() {
return pipelineSink;
}

@Override
public EventSchema getResultingEventSchema() {
return resultingEventSchema;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public FileStreamProtocol(IParser parser, IFormat format, String selectedFileNam

@Override
public void run(IAdapterPipeline adapterPipeline) {
String timestampKey = getTimestampKey(eventSchema.getEventProperties(), "");
String timestampKey = getTimestampKey(adapterPipeline.getResultingEventSchema().getEventProperties(), "");

// exchange adapter pipeline sink with special purpose replay sink for file replay
if (adapterPipeline.getPipelineSink() instanceof SendToKafkaAdapterSink) {
Expand Down
5 changes: 5 additions & 0 deletions streampipes-model/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@
<artifactId>streampipes-model-shared</artifactId>
<version>0.91.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-test-utils</artifactId>
<version>0.91.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.streampipes</groupId>
<artifactId>streampipes-logging</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.streampipes.model.grounding.MqttTransportProtocol;
import org.apache.streampipes.model.grounding.SimpleTopicDefinition;
import org.apache.streampipes.model.grounding.TransportProtocol;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.shared.annotation.TsModel;
import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.model.util.Cloner;
Expand Down Expand Up @@ -252,4 +253,6 @@ public String getCorrespondingDataStreamElementId() {
public void setCorrespondingDataStreamElementId(String correspondingDataStreamElementId) {
this.correspondingDataStreamElementId = correspondingDataStreamElementId;
}

public abstract EventSchema getEventSchema();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.streampipes.model.connect.grounding.FormatDescription;
import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
import org.apache.streampipes.model.connect.rules.TransformationRuleDescription;
import org.apache.streampipes.model.schema.EventSchema;

import java.util.List;

Expand All @@ -32,8 +31,5 @@ public interface GenericAdapterDescription {
FormatDescription getFormatDescription();

List<TransformationRuleDescription> getRules();

EventSchema getEventSchema();

}

Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ public class GenericAdapterSetDescription extends AdapterSetDescription implemen

public static final String ID = ElementIdGenerator.makeFixedElementId(GenericAdapterSetDescription.class);

// private String sourceType = "org.apache.streampipes.model.connect.adapter.GenericAdapterSetDescription";

private FormatDescription formatDescription;

private ProtocolDescription protocolDescription;
Expand Down Expand Up @@ -66,14 +64,6 @@ public void setFormatDescription(FormatDescription formatDescription) {
this.formatDescription = formatDescription;
}

@Override
public EventSchema getEventSchema() {
if (this.getDataSet() != null) {
return this.getDataSet().getEventSchema();
}
return null;
}

public ProtocolDescription getProtocolDescription() {
return protocolDescription;
}
Expand All @@ -82,11 +72,8 @@ public void setProtocolDescription(ProtocolDescription protocolDescription) {
this.protocolDescription = protocolDescription;
}

// public String getSourceType() {
// return sourceType;
// }
//
// public void setSourceType(String sourceType) {
// this.sourceType = sourceType;
// }
public EventSchema getEventSchema() {
return this.getDataSet().getEventSchema();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,14 @@ public void setFormatDescription(FormatDescription formatDescription) {
this.formatDescription = formatDescription;
}

@Override
public EventSchema getEventSchema() {
if (this.getDataStream() != null) {
return this.getDataStream().getEventSchema();
}
return null;
}

public ProtocolDescription getProtocolDescription() {
return protocolDescription;
}

public void setProtocolDescription(ProtocolDescription protocolDescription) {
this.protocolDescription = protocolDescription;
}

public EventSchema getEventSchema() {
return this.getDataStream().getEventSchema();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.streampipes.model.connect.adapter;

import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.shared.annotation.TsModel;

@TsModel
Expand All @@ -29,4 +30,8 @@ public SpecificAdapterSetDescription() {
public SpecificAdapterSetDescription(AdapterSetDescription other) {
super(other);
}

public EventSchema getEventSchema() {
return this.getDataSet().getEventSchema();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.streampipes.model.connect.adapter;

import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.shared.annotation.TsModel;

@TsModel
Expand All @@ -29,4 +30,8 @@ public SpecificAdapterStreamDescription() {
public SpecificAdapterStreamDescription(AdapterStreamDescription other) {
super(other);
}

public EventSchema getEventSchema() {
return this.getDataStream().getEventSchema();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@
package org.apache.streampipes.model.util;

import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyNested;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.vocabulary.SO;

import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class SchemaUtils {

Expand All @@ -48,4 +54,33 @@ public static List<String> toPropertyList(List<EventProperty> eps) {
}
return properties;
}

/**
* Returns the timestamp property of an event schema as an {@code Optional}.
*
* <p> The method checks all properties if they are of type {@code EventPropertyPrimitive} and if their domain
* properties contains the uri http://schema.org/DateTime </p>
*
* @param eventSchema the event schema for which the timestamp property is to be returned
* @return an {@code Optional} containing the timestamp property, or an empty {@code Optional} if
* no such property was found
*/
public static Optional<EventPropertyPrimitive> getTimestampProperty(EventSchema eventSchema) {
return getTimstampProperty(eventSchema.getEventProperties());
}


private static Optional<EventPropertyPrimitive> getTimstampProperty(List<EventProperty> eventProperties) {
for (EventProperty ep : eventProperties) {
if (ep instanceof EventPropertyPrimitive && ep.getDomainProperties().contains(URI.create(SO.DATE_TIME))) {
return Optional.of((EventPropertyPrimitive) ep);
}

if (ep instanceof EventPropertyNested) {
return getTimstampProperty(((EventPropertyNested) ep).getEventProperties());
}
}

return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class AdapterDescriptionTest {

@Before
public void init() {
adapterDescription = new AdapterDescription() {
adapterDescription = new SpecificAdapterStreamDescription() {
};

List rules = new ArrayList<>();
Expand Down
Loading

0 comments on commit 4981f12

Please sign in to comment.