How to Run a Method Asynchronously in a Reactive Chain in Spring WebFlux?

RMAG news

I’m trying to execute a method asynchronously within an existing reactive chain in my Project Reactor-based application. The method doUpdateLayoutInAsync is intended to perform a heavy background task, but it seems like my approach isn’t working as expected. Here’s my current implementation:

public Mono<Boolean> publishPackage(String branchedPackageId) {
PackagePublishingMetaDTO publishingMetaDTO = new PackagePublishingMetaDTO();
publishingMetaDTO.setPublishEvent(true);

return packageRepository
.findById(branchedPackageId, packagePermission.getPublishPermission())
.switchIfEmpty(Mono.error(new AppsmithException(
AppsmithError.ACL_NO_RESOURCE_FOUND, FieldName.PACKAGE_ID, branchedPackageId)))
.flatMap(originalPackage -> {
String nextVersion = PackageUtils.getNextVersion(originalPackage.getVersion());

Package packageToBePublished = constructPackageToBePublished(originalPackage);

originalPackage.setVersion(nextVersion);
originalPackage.setLastPublishedAt(packageToBePublished.getLastPublishedAt());
publishingMetaDTO.setOriginPackageId(branchedPackageId);
publishingMetaDTO.setWorkspaceId(originalPackage.getWorkspaceId());

Mono<Void> unsetCurrentLatestMono = packageRepository.unsetLatestPackageByOriginId(originalPackage.getId(), null);
Mono<Package> saveOriginalPackage = packageRepository.save(originalPackage);
Mono<Package> savePackageToBePublished = packageRepository.save(packageToBePublished);

return unsetCurrentLatestMono
.then(Mono.zip(saveOriginalPackage, savePackageToBePublished))
.flatMap(tuple2 -> {
Package publishedPackage = tuple2.getT2();
publishingMetaDTO.setPublishedPackage(publishedPackage);

return modulePackagePublishableService
.publishEntities(publishingMetaDTO)
.flatMap(publishedModules -> {
if (publishedModules.isEmpty()) {
return Mono.error(new AppsmithException(
AppsmithError.PACKAGE_CANNOT_BE_PUBLISHED,
originalPackage.getUnpublishedPackage().getName()));
}
return moduleInstancePackagePublishableService
.publishEntities(publishingMetaDTO)
.then(Mono.defer(() ->
newActionPackagePublishableService.publishEntities(publishingMetaDTO))
.then(Mono.defer(() ->
actionCollectionPackagePublishableService
.publishEntities(publishingMetaDTO))));
})
.then(Mono.defer(() -> autoUpgradeService.handleAutoUpgrade(publishingMetaDTO)));
})
.as(transactionalOperator::transactional)
.then(Mono.defer(() -> doUpdateLayoutInAsync(publishingMetaDTO)));
});
}

private Mono<Boolean> doUpdateLayoutInAsync(PackagePublishingMetaDTO publishingMetaDTO) {
Mono<List<String>> updateLayoutsMono = Flux.fromIterable(publishingMetaDTO.getAutoUpgradedPageIds())
.flatMap(pageId -> updateLayoutService
.updatePageLayoutsByPageId(pageId)
.onErrorResume(throwable -> {
log.warn(“Update layout failed for pageId: {} with error: {}”, pageId, throwable.getMessage());
return Mono.just(pageId);
}))
.collectList();

// Running the updateLayoutsMono task asynchronously
updateLayoutsMono.subscribeOn(Schedulers.boundedElastic()).subscribe();

return Mono.just(Boolean.TRUE);
}

Issue: I want doUpdateLayoutInAsync to run in the background while the rest of the reactive chain completes. However, the method seems to execute synchronously, and the reactive chain does not continue as expected.

Question: How can I ensure that doUpdateLayoutInAsync runs asynchronously and does not block the continuation of the reactive chain?

Please follow and like us:
Pin Share