Skip to content

Commit

Permalink
Merge branch 'master' of github.com:thingsboard/thingsboard
Browse files Browse the repository at this point in the history
  • Loading branch information
ikulikov committed Nov 7, 2023
2 parents e069b66 + f08e7e5 commit 9afc57b
Show file tree
Hide file tree
Showing 19 changed files with 423 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ private void onAlarmSubUpdate(TenantId tenantId, EntityId entityId, AlarmInfo al
if (subInfo != null) {
log.trace("[{}][{}] Handling alarm update {}: {}", tenantId, entityId, alarm, deleted);
for (Map.Entry<String, TbSubscriptionsInfo> entry : subInfo.getSubs().entrySet()) {
if (entry.getValue().notifications) {
if (entry.getValue().alarms) {
onAlarmSubUpdate(entry.getKey(), entityId, alarm, deleted);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public ListenableFuture<TbProtoQueueMsg<TransportApiResponseMsg>> handle(TbProto
final String certChain = msg.getCertificateChain();
result = handlerExecutor.submit(() -> validateOrCreateDeviceX509Certificate(certChain));
} else if (transportApiRequestMsg.hasGetOrCreateDeviceRequestMsg()) {
result = handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg());
result = handlerExecutor.submit(() -> handle(transportApiRequestMsg.getGetOrCreateDeviceRequestMsg()));
} else if (transportApiRequestMsg.hasEntityProfileRequestMsg()) {
result = handle(transportApiRequestMsg.getEntityProfileRequestMsg());
} else if (transportApiRequestMsg.hasLwM2MRequestMsg()) {
Expand Down Expand Up @@ -223,7 +223,6 @@ public ListenableFuture<TbProtoQueueMsg<TransportApiResponseMsg>> handle(TbProto
}

private TransportApiResponseMsg validateCredentials(String credentialsId, DeviceCredentialsType credentialsType) {
//TODO: Make async and enable caching
DeviceCredentials credentials = deviceCredentialsService.findDeviceCredentialsByCredentialsId(credentialsId);
if (credentials != null && credentials.getCredentialsType() == credentialsType) {
return getDeviceInfo(credentials);
Expand Down Expand Up @@ -336,76 +335,74 @@ private BasicCredentialsValidationResult validateMqttCredentials(TransportProtos
return VALID;
}

private ListenableFuture<TransportApiResponseMsg> handle(GetOrCreateDeviceFromGatewayRequestMsg requestMsg) {
private TransportApiResponseMsg handle(GetOrCreateDeviceFromGatewayRequestMsg requestMsg) {
DeviceId gatewayId = new DeviceId(new UUID(requestMsg.getGatewayIdMSB(), requestMsg.getGatewayIdLSB()));
ListenableFuture<Device> gatewayFuture = deviceService.findDeviceByIdAsync(TenantId.SYS_TENANT_ID, gatewayId);
return Futures.transform(gatewayFuture, gateway -> {
Lock deviceCreationLock = deviceCreationLocks.computeIfAbsent(requestMsg.getDeviceName(), id -> new ReentrantLock());
deviceCreationLock.lock();
try {
Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), requestMsg.getDeviceName());
if (device == null) {
TenantId tenantId = gateway.getTenantId();
device = new Device();
device.setTenantId(tenantId);
device.setName(requestMsg.getDeviceName());
device.setType(requestMsg.getDeviceType());
device.setCustomerId(gateway.getCustomerId());
DeviceProfile deviceProfile = deviceProfileCache.findOrCreateDeviceProfile(gateway.getTenantId(), requestMsg.getDeviceType());

device.setDeviceProfileId(deviceProfile.getId());
ObjectNode additionalInfo = JacksonUtil.newObjectNode();
additionalInfo.put(DataConstants.LAST_CONNECTED_GATEWAY, gatewayId.toString());
device.setAdditionalInfo(additionalInfo);
Device savedDevice = deviceService.saveDevice(device);
tbClusterService.onDeviceUpdated(savedDevice, null);
device = savedDevice;

relationService.saveRelation(TenantId.SYS_TENANT_ID, new EntityRelation(gateway.getId(), device.getId(), "Created"));

TbMsgMetaData metaData = new TbMsgMetaData();
CustomerId customerId = gateway.getCustomerId();
if (customerId != null && !customerId.isNullUid()) {
metaData.putValue("customerId", customerId.toString());
}
metaData.putValue("gatewayId", gatewayId.toString());
Device gateway = deviceService.findDeviceById(TenantId.SYS_TENANT_ID, gatewayId);
Lock deviceCreationLock = deviceCreationLocks.computeIfAbsent(requestMsg.getDeviceName(), id -> new ReentrantLock());
deviceCreationLock.lock();
try {
Device device = deviceService.findDeviceByTenantIdAndName(gateway.getTenantId(), requestMsg.getDeviceName());
if (device == null) {
TenantId tenantId = gateway.getTenantId();
device = new Device();
device.setTenantId(tenantId);
device.setName(requestMsg.getDeviceName());
device.setType(requestMsg.getDeviceType());
device.setCustomerId(gateway.getCustomerId());
DeviceProfile deviceProfile = deviceProfileCache.findOrCreateDeviceProfile(gateway.getTenantId(), requestMsg.getDeviceType());

device.setDeviceProfileId(deviceProfile.getId());
ObjectNode additionalInfo = JacksonUtil.newObjectNode();
additionalInfo.put(DataConstants.LAST_CONNECTED_GATEWAY, gatewayId.toString());
device.setAdditionalInfo(additionalInfo);
Device savedDevice = deviceService.saveDevice(device);
tbClusterService.onDeviceUpdated(savedDevice, null);
device = savedDevice;

relationService.saveRelation(TenantId.SYS_TENANT_ID, new EntityRelation(gateway.getId(), device.getId(), "Created"));

TbMsgMetaData metaData = new TbMsgMetaData();
CustomerId customerId = gateway.getCustomerId();
if (customerId != null && !customerId.isNullUid()) {
metaData.putValue("customerId", customerId.toString());
}
metaData.putValue("gatewayId", gatewayId.toString());

DeviceId deviceId = device.getId();
JsonNode entityNode = JacksonUtil.valueToTree(device);
TbMsg tbMsg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, deviceId, customerId, metaData, TbMsgDataType.JSON, JacksonUtil.toString(entityNode));
tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, null);
} else {
JsonNode deviceAdditionalInfo = device.getAdditionalInfo();
if (deviceAdditionalInfo == null) {
deviceAdditionalInfo = JacksonUtil.newObjectNode();
}
if (deviceAdditionalInfo.isObject() &&
(!deviceAdditionalInfo.has(DataConstants.LAST_CONNECTED_GATEWAY)
|| !gatewayId.toString().equals(deviceAdditionalInfo.get(DataConstants.LAST_CONNECTED_GATEWAY).asText()))) {
ObjectNode newDeviceAdditionalInfo = (ObjectNode) deviceAdditionalInfo;
newDeviceAdditionalInfo.put(DataConstants.LAST_CONNECTED_GATEWAY, gatewayId.toString());
Device savedDevice = deviceService.saveDevice(device);
tbClusterService.onDeviceUpdated(savedDevice, device);
}
DeviceId deviceId = device.getId();
JsonNode entityNode = JacksonUtil.valueToTree(device);
TbMsg tbMsg = TbMsg.newMsg(TbMsgType.ENTITY_CREATED, deviceId, customerId, metaData, TbMsgDataType.JSON, JacksonUtil.toString(entityNode));
tbClusterService.pushMsgToRuleEngine(tenantId, deviceId, tbMsg, null);
} else {
JsonNode deviceAdditionalInfo = device.getAdditionalInfo();
if (deviceAdditionalInfo == null) {
deviceAdditionalInfo = JacksonUtil.newObjectNode();
}
GetOrCreateDeviceFromGatewayResponseMsg.Builder builder = GetOrCreateDeviceFromGatewayResponseMsg.newBuilder()
.setDeviceInfo(getDeviceInfoProto(device));
DeviceProfile deviceProfile = deviceProfileCache.get(device.getTenantId(), device.getDeviceProfileId());
if (deviceProfile != null) {
builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile)));
} else {
log.warn("[{}] Failed to find device profile [{}] for device. ", device.getId(), device.getDeviceProfileId());
if (deviceAdditionalInfo.isObject() &&
(!deviceAdditionalInfo.has(DataConstants.LAST_CONNECTED_GATEWAY)
|| !gatewayId.toString().equals(deviceAdditionalInfo.get(DataConstants.LAST_CONNECTED_GATEWAY).asText()))) {
ObjectNode newDeviceAdditionalInfo = (ObjectNode) deviceAdditionalInfo;
newDeviceAdditionalInfo.put(DataConstants.LAST_CONNECTED_GATEWAY, gatewayId.toString());
Device savedDevice = deviceService.saveDevice(device);
tbClusterService.onDeviceUpdated(savedDevice, device);
}
return TransportApiResponseMsg.newBuilder()
.setGetOrCreateDeviceResponseMsg(builder.build())
.build();
} catch (JsonProcessingException e) {
log.warn("[{}] Failed to lookup device by gateway id and name: [{}]", gatewayId, requestMsg.getDeviceName(), e);
throw new RuntimeException(e);
} finally {
deviceCreationLock.unlock();
}
}, dbCallbackExecutorService);
GetOrCreateDeviceFromGatewayResponseMsg.Builder builder = GetOrCreateDeviceFromGatewayResponseMsg.newBuilder()
.setDeviceInfo(getDeviceInfoProto(device));
DeviceProfile deviceProfile = deviceProfileCache.get(device.getTenantId(), device.getDeviceProfileId());
if (deviceProfile != null) {
builder.setProfileBody(ByteString.copyFrom(dataDecodingEncodingService.encode(deviceProfile)));
} else {
log.warn("[{}] Failed to find device profile [{}] for device. ", device.getId(), device.getDeviceProfileId());
}
return TransportApiResponseMsg.newBuilder()
.setGetOrCreateDeviceResponseMsg(builder.build())
.build();
} catch (JsonProcessingException e) {
log.warn("[{}] Failed to lookup device by gateway id and name: [{}]", gatewayId, requestMsg.getDeviceName(), e);
throw new RuntimeException(e);
} finally {
deviceCreationLock.unlock();
}
}

private ListenableFuture<TransportApiResponseMsg> handle(ProvisionDeviceRequestMsg requestMsg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,47 @@ public void beforeTest() throws Exception {

loginSysAdmin();

ObjectNode config = JacksonUtil.newObjectNode();

ObjectNode http = JacksonUtil.newObjectNode();
http.put("enabled", true);
http.put("host", "");
http.put("port", 8080);
config.set("http", http);

ObjectNode https = JacksonUtil.newObjectNode();
https.put("enabled", true);
https.put("host", "");
https.put("port", 444);
config.set("https", https);

ObjectNode mqtt = JacksonUtil.newObjectNode();
mqtt.put("enabled", true);
mqtt.put("host", "");
mqtt.put("port", 1883);
config.set("mqtt", mqtt);

ObjectNode mqtts = JacksonUtil.newObjectNode();
mqtts.put("enabled", true);
mqtts.put("host", "");
mqtts.put("port", 8883);
config.set("mqtts", mqtts);

ObjectNode coap = JacksonUtil.newObjectNode();
coap.put("enabled", true);
coap.put("host", "");
coap.put("port", 5683);
config.set("coap", coap);

ObjectNode coaps = JacksonUtil.newObjectNode();
coaps.put("enabled", true);
coaps.put("host", "");
coaps.put("port", 5684);
config.set("coaps", coaps);

AdminSettings adminSettings = doGet("/api/admin/settings/connectivity", AdminSettings.class);
JsonNode connectivity = adminSettings.getJsonValue();

((ObjectNode)connectivity.get("http")).put("port", 8080);
((ObjectNode)connectivity.get("http")).put("enabled", true);
((ObjectNode)connectivity.get("https")).put("enabled", true);
((ObjectNode)connectivity.get("https")).put("port", 444);
((ObjectNode)connectivity.get("mqtt")).put("enabled", true);
((ObjectNode)connectivity.get("mqtts")).put("enabled", true);
((ObjectNode)connectivity.get("coap")).put("enabled", true);
((ObjectNode)connectivity.get("coaps")).put("enabled", true);
doPost("/api/admin/settings", adminSettings);
adminSettings.setJsonValue(config);
doPost("/api/admin/settings", adminSettings).andExpect(status().isOk());

Tenant tenant = new Tenant();
tenant.setTitle("My tenant");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.test.context.ContextConfiguration;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardExecutors;
import org.thingsboard.server.common.data.AdminSettings;
import org.thingsboard.server.common.data.Device;
Expand Down Expand Up @@ -66,16 +67,47 @@ public void beforeTest() throws Exception {

loginSysAdmin();

ObjectNode config = JacksonUtil.newObjectNode();

ObjectNode http = JacksonUtil.newObjectNode();
http.put("enabled", true);
http.put("host", "");
http.put("port", 80);
config.set("http", http);

ObjectNode https = JacksonUtil.newObjectNode();
https.put("enabled", true);
https.put("host", "");
https.put("port", 443);
config.set("https", https);

ObjectNode mqtt = JacksonUtil.newObjectNode();
mqtt.put("enabled", false);
mqtt.put("host", "");
mqtt.put("port", 1883);
config.set("mqtt", mqtt);

ObjectNode mqtts = JacksonUtil.newObjectNode();
mqtts.put("enabled", false);
mqtts.put("host", "");
mqtts.put("port", 8883);
config.set("mqtts", mqtts);

ObjectNode coap = JacksonUtil.newObjectNode();
coap.put("enabled", false);
coap.put("host", "");
coap.put("port", 5683);
config.set("coap", coap);

ObjectNode coaps = JacksonUtil.newObjectNode();
coaps.put("enabled", false);
coaps.put("host", "");
coaps.put("port", 5684);
config.set("coaps", coaps);

AdminSettings adminSettings = doGet("/api/admin/settings/connectivity", AdminSettings.class);
JsonNode connectivity = adminSettings.getJsonValue();

((ObjectNode) connectivity.get("http")).put("port", 80);
((ObjectNode) connectivity.get("https")).put("enabled", true);
((ObjectNode) connectivity.get("mqtt")).put("enabled", false);
((ObjectNode) connectivity.get("mqtts")).put("enabled", false);
((ObjectNode) connectivity.get("coaps")).put("enabled", false);
((ObjectNode) connectivity.get("coap")).put("enabled", false);
doPost("/api/admin/settings", adminSettings);
adminSettings.setJsonValue(config);
doPost("/api/admin/settings", adminSettings).andExpect(status().isOk());

Tenant tenant = new Tenant();
tenant.setTitle("My tenant");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,13 @@ public void testFindTenantWidgetsBundlesByPageLink() throws Exception {
Collections.sort(loadedWidgetsBundles2, idComparator);

Assert.assertEquals(tenantWidgetsBundles, loadedWidgetsBundles2);

// cleanup
loginSysAdmin();
for (WidgetsBundle sysWidgetsBundle : sysWidgetsBundles) {
doDelete("/api/widgetsBundle/" + sysWidgetsBundle.getId().getId().toString())
.andExpect(status().isOk());
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.thingsboard.server.common.data.User;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.notification.NotificationDeliveryMethod;
import org.thingsboard.server.common.data.notification.NotificationType;
import org.thingsboard.server.common.data.notification.rule.trigger.config.EntityActionNotificationRuleTriggerConfig;
import org.thingsboard.server.common.data.notification.targets.NotificationTarget;
import org.thingsboard.server.common.data.notification.targets.platform.AllUsersFilter;
Expand Down Expand Up @@ -144,6 +145,7 @@ public void whenDeletingTenant_thenDeleteNotificationTarget() throws Exception {
notificationTarget.setConfiguration(targetConfig);
save(notificationTarget, status().isOk());
assertThat(notificationTargetDao.findByTenantIdAndPageLink(differentTenantId, new PageLink(10)).getData()).isNotEmpty();
assertThat(notificationTargetDao.findByTenantIdAndSupportedNotificationTypeAndPageLink(differentTenantId, NotificationType.GENERAL, new PageLink(10)).getData()).isNotEmpty();

deleteDifferentTenant();
assertThat(notificationTargetDao.findByTenantIdAndPageLink(differentTenantId, new PageLink(10)).getData()).isEmpty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ public void testDelete_consumerPerPartition() {
verify(consumer2, never()).unsubscribe();
int msgCount = totalConsumedMsgs.get();

await().atLeast(4, TimeUnit.SECONDS) // based on topicDeletionDelayInSec
await().atLeast(2, TimeUnit.SECONDS) // based on topicDeletionDelayInSec(5) = 5 - ( 3 seconds the code may execute starting consumerManager.delete() call)
.atMost(7, TimeUnit.SECONDS)
.untilAsserted(() -> {
partitions.stream()
Expand Down Expand Up @@ -498,7 +498,7 @@ public void testDelete_singleConsumer() {
verify(consumer, never()).unsubscribe();
int msgCount = totalConsumedMsgs.get();

await().atLeast(4, TimeUnit.SECONDS)
await().atLeast(2, TimeUnit.SECONDS) // based on topicDeletionDelayInSec(5) = 5 - ( 3 seconds the code may execute starting consumerManager.delete() call)
.atMost(7, TimeUnit.SECONDS)
.untilAsserted(() -> {
partitions.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ public class DeviceCacheKey implements Serializable {
private final DeviceId deviceId;
private final String deviceName;

public DeviceCacheKey(DeviceId deviceId) {
this(null, deviceId, null);
}

public DeviceCacheKey(TenantId tenantId, DeviceId deviceId) {
this(tenantId, deviceId, null);
}
Expand All @@ -44,11 +48,12 @@ public DeviceCacheKey(TenantId tenantId, String deviceName) {

@Override
public String toString() {
if (deviceId != null) {
return tenantId + "_" + deviceId;
} else {
if (deviceId == null) {
return tenantId + "_n_" + deviceName;
} else if (tenantId == null) {
return deviceId.toString();
} else {
return tenantId + "_" + deviceId;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,9 @@ private DateTimeFormatOptions getDateFormattingOptions(String optionsStr) {
return opt;
}

public long now() {
public static long now() {
return Instant.now().toEpochMilli();
}

public long parseSecond() {
return instant.getEpochSecond();
}
Expand Down
Loading

0 comments on commit 9afc57b

Please sign in to comment.