Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -139,9 +140,16 @@ protected List<AstarteTransport> reloadTransports(
} catch (NullPointerException e) {
throw new AstartePairingException(
"Null Pointer exception - probably got a wrong payload?", e);
} catch (IOException e) {
// here it is possible that we couldn't connect because a network problem is blocking us
// we will setup an unknown transport to handle data sent by the device and store it locally
logger.warning("IOException while calling Pairing API at: " + requestUrl);
logger.warning(e.getMessage());

return Collections.singletonList(
AstarteTransportFactory.createAstarteUnknownTransport(m_astarteRealm, deviceId));
} catch (Exception e) {
throw new AstartePairingException(
"Failure in calling Pairing API to " + requestUrl.toString(), e);
throw new AstartePairingException("Failure in calling Pairing API to " + requestUrl, e);
}

// Iterate Transports and make them available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import org.astarteplatform.devicesdk.protocol.AstarteProtocolType;
import org.astarteplatform.devicesdk.transport.mqtt.AstarteMqttV1Transport;
import org.astarteplatform.devicesdk.transport.mqtt.MutualSSLAuthenticationMqttConnectionInfo;
import org.astarteplatform.devicesdk.transport.unknown.AstarteUnknownTransport;
import org.astarteplatform.devicesdk.transport.unknown.UnknownTransportConnectionInfo;
import org.json.JSONObject;

public class AstarteTransportFactory {
Expand Down Expand Up @@ -34,4 +36,9 @@ public static AstarteTransport createAstarteTransportFromPairing(
return null;
}
}

public static AstarteUnknownTransport createAstarteUnknownTransport(
String astarteRealm, String deviceId) {
return new AstarteUnknownTransport(new UnknownTransportConnectionInfo(astarteRealm, deviceId));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package org.astarteplatform.devicesdk.transport.unknown;

import java.util.Map;
import org.astarteplatform.devicesdk.crypto.AstarteCryptoException;
import org.astarteplatform.devicesdk.protocol.AstarteAggregateDatastreamInterface;
import org.astarteplatform.devicesdk.protocol.AstarteDatastreamInterface;
import org.astarteplatform.devicesdk.protocol.AstarteInterface;
import org.astarteplatform.devicesdk.protocol.AstarteInterfaceDatastreamMapping;
import org.astarteplatform.devicesdk.protocol.AstarteInterfaceMappingNotFoundException;
import org.astarteplatform.devicesdk.protocol.AstarteProtocolType;
import org.astarteplatform.devicesdk.transport.AstarteTransport;
import org.astarteplatform.devicesdk.transport.AstarteTransportException;
import org.astarteplatform.devicesdk.util.AstartePayload;
import org.joda.time.DateTime;

public class AstarteUnknownTransport extends AstarteTransport {
private final String baseTopic;

public AstarteUnknownTransport(UnknownTransportConnectionInfo connectionInfo) {
super(AstarteProtocolType.UNKNOWN_PROTOCOL);

baseTopic = connectionInfo.getClientId();
}

@Override
public void connect() throws AstarteTransportException, AstarteCryptoException {
// we don't need to do nothing
}

@Override
public void disconnect() throws AstarteTransportException {
// we don't need to do nothing
}

@Override
public boolean isConnected() {
return false;
}

@Override
public void sendIntrospection() {
// we won't do nothing
}

@Override
public void sendEmptyCache() {
// we won't do nothing
}

@Override
public void resendAllProperties() {
// we won't do nothing
}

@Override
public void sendIndividualValue(
AstarteInterface astarteInterface, String path, Object value, DateTime timestamp)
throws AstarteTransportException {
AstarteInterfaceDatastreamMapping mapping = null;
int qos = 2;

if (astarteInterface instanceof AstarteDatastreamInterface) {
try {
// Find a matching mapping
mapping = (AstarteInterfaceDatastreamMapping) astarteInterface.findMappingInInterface(path);
} catch (AstarteInterfaceMappingNotFoundException e) {
throw new AstarteTransportException("Mapping not found", e);
}

qos = qosFromReliability(mapping);
}

String topic = baseTopic + "/" + astarteInterface.getInterfaceName() + path;
byte[] payload =
AstartePayload.serialize(value, (timestamp != null) ? timestamp.toDate() : null);

if (astarteInterface instanceof AstarteDatastreamInterface) {
handleDatastreamFailedPublish(mapping, topic, payload, qos);
} else {
handlePropertiesFailedPublish(topic, payload, qos);
}
}

@Override
public void sendAggregate(
AstarteAggregateDatastreamInterface astarteInterface,
String path,
Map<String, Object> value,
DateTime timestamp)
throws AstarteTransportException {
int qos;
AstarteInterfaceDatastreamMapping mapping;
try {
// Find a matching mapping
mapping =
(AstarteInterfaceDatastreamMapping) astarteInterface.getMappings().values().toArray()[0];
qos = qosFromReliability(mapping);
} catch (Exception e) {
throw new AstarteTransportException("Mapping not found", e);
}

String topic = baseTopic + "/" + astarteInterface.getInterfaceName() + path;
byte[] payload = AstartePayload.serialize(value, timestamp.toDate());

// Aggregate can only be Datastream
handleDatastreamFailedPublish(mapping, topic, payload, qos);
}

private void handlePropertiesFailedPublish(String topic, byte[] payload, int qos)
throws AstarteTransportException {
// We store everything since we are not connected to a proper transport
m_failedMessageStorage.insertStored(topic, payload, qos);
}

private void handleDatastreamFailedPublish(
AstarteInterfaceDatastreamMapping mapping, String topic, byte[] payload, int qos)
throws AstarteTransportException {
int expiry = mapping.getExpiry();

switch (mapping.getRetention()) {
case DISCARD:
// Message won't be retried, so we throw to notify the user
// FIXME we expect to discard messages and we will just drop them eventually a log is enough
// FIXME replace with log
throw new AstarteTransportException("Cannot send value");

case VOLATILE:
{
if (expiry > 0) {
m_failedMessageStorage.insertVolatile(topic, payload, qos, expiry);
} else {
m_failedMessageStorage.insertVolatile(topic, payload, qos);
}
break;
}

case STORED:
{
if (expiry > 0) {
m_failedMessageStorage.insertStored(topic, payload, qos, expiry);
} else {
m_failedMessageStorage.insertStored(topic, payload, qos);
}
break;
}
}
}

// FIXME merge this function with the one in MqttV1Transport
private int qosFromReliability(AstarteInterfaceDatastreamMapping mapping) {
switch (mapping.getReliability()) {
case UNIQUE:
return 2;
case GUARANTEED:
return 1;
case UNRELIABLE:
return 0;
}

return 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package org.astarteplatform.devicesdk.transport.unknown;

public class UnknownTransportConnectionInfo {
private final String m_clientId;

public UnknownTransportConnectionInfo(String astarteRealm, String deviceId) {
m_clientId = astarteRealm + "/" + deviceId;
}

public String getClientId() {
return m_clientId;
}
}
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
# The setting is particularly useful for tweaking memory settings.
org.gradle.jvmargs=-Xmx1536m
java.util.logging.MemoryHandler.push=ALL
version=1.1.0
version=1.1.1
group=org.astarte-platform
org.gradle.parallel=false
# AndroidX package structure to make it clearer which packages are bundled with the
Expand Down