In this workshop we will use Java & Spring Cloud Stream (SCS) to create event-driven applications with PubSub+ in Pivotal Web Services (PWS).
After creating a few by hand, we will see how the AsyncAPI specification can be used to define your event-driven microservices and even generate a microservice!



Developer IDE & Code Access

IDE Setup

The recommended IDE for the workshop is Spring Tool Suite (STS), which can be downloaded from the Spring website. STS comes with some niceties, such as auto-deploy, when used with spring-boot-devtools. Participants can of course use another IDE if preferred. It is also recommended that you begin the workshop with an empty STS workspace to avoid unforeseen issues with existing projects, configurations, or code.

Required libraries:

Code Access

After importing everything you should see the following projects in STS:

Create and/or Verify access to a Solace PubSub+ Service

PubSub+ Service in Solace Cloud

If you want to stand up your Solace PubSub+ Service in Solace Cloud, go ahead and log in or sign up at the Cloud Signup Page. Note that a free tier is available and will work for this workshop. THIS IS THE PREFERRED OPTION FOR THIS WORKSHOP.


Local Solace PubSub+ Instance

When developing your application, you may want to test using a local instance of the Solace PubSub+ Event Broker. Refer to the Solace Docker Getting Started Guide to get up and running quickly with a broker instance in Docker. IF CHOOSING THIS OPTION, YOU WILL HAVE TO RUN YOUR APPS LOCALLY TO USE THE LOCAL BROKER.


Application Architecture

At the end of this section we will have created the apps below!
The Source will send out tweets that will be received by the marketing Sink.

1 Application Architecture

Deploying a Source

Before our company can do anything with the tweets we have to start to receive an incoming stream of them! Let's get started! Please navigate to the "02-scs-source-tweets" project in your IDE.

Learn the Project Structure

Before we take a look at the code, let's take a quick look at the structure of a Spring Cloud Stream project.

SCS Project Structure

SCS Maven Dependencies

SCS Sample Implementation

SCS Application Config File

Deploy our 02-scs-source-tweets app


To do this we will deploy a sink app. Recall that a sink app binds to an INPUT channel.

Deploying a Sink


Configure a Cloud Foundry Target in STS

STS provides integrated support for deploying, running and debugging your SCS services in PWS. In the Boot Dashboard view, configure a connection to your PWS deployment by clicking the "+" button as seen in the image below.

Configuring a connection to Cloud Foundry in STS (use the button circled in red)

Follow the dialog prompts and fill in the username / password associated with your PWS account.

Cloud Foundry CLI Setup

Stop local microservices!

Stop the source and sink that you currently have running locally. We are going to deploy them to PWS using the same PubSub+ instance, and leaving the local copies running would produce duplicate messages.

Deploy the Source to PWS

Deploy the Sink to PWS


Application Architecture

At the end of this section we will have added the Factory Tweet Board Sink.

2 Application Architecture

Creating the Tweet Board Sink

We obviously don't have a giant LED board that we can use so we're going to settle for logging the tweets as they come in.
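As a rough illustration of what "logging the tweets as they come in" can look like, a sink can be expressed as a java.util.function.Consumer. The sketch below is plain Java with no Spring dependencies. The class name TweetBoardSketch, the "BOARD: " prefix and the in-memory list standing in for the LED board are all assumptions made for this example, not the workshop's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class TweetBoardSketch {

    // Stand-in for the LED board: every displayed line is kept in memory.
    private final List<String> displayed = new ArrayList<>();

    // A sink only consumes messages; it has no output channel.
    Consumer<String> tweetBoard() {
        return text -> {
            String line = "BOARD: " + text;
            displayed.add(line);
            System.out.println(line);
        };
    }

    List<String> displayed() {
        return displayed;
    }

    public static void main(String[] args) {
        TweetBoardSketch board = new TweetBoardSketch();
        board.tweetBoard().accept("Loving the new factory tour!");
    }
}
```

In a real SCS application the framework would invoke the consumer for each message arriving on the input binding; here it is called by hand.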

Deploying the Tweet Board

At this point we have created our "04-scs-sink-tweetboard" application and it needs to be deployed.


So far in this workshop we have created source or sink applications. In this section we will create our first processor.
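For orientation, the three roles can be sketched with plain java.util.function types: a source only produces, a sink only consumes, and a processor does both. The example below is a minimal in-memory sketch, not SCS code. The class name PipelineSketch, the message text and the "#workshop" suffix are hypothetical, and in a real deployment an event broker sits between the three pieces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

class PipelineSketch {

    // Source: produces messages and has no input.
    static Supplier<String> source() {
        return () -> "Hello from the factory ";
    }

    // Processor: receives a message on its input and emits a new one on its output.
    static Function<String, String> processor() {
        return text -> text.trim() + " #workshop";
    }

    // Wires the three roles together in memory and returns what the sink received.
    static List<String> runOnce() {
        List<String> received = new ArrayList<>();
        Consumer<String> sink = received::add; // Sink: consumes messages, no output.
        sink.accept(processor().apply(source().get()));
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runOnce());
    }
}
```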


Application Architecture

In order to meet our new goal we will add the Features processor and a new Sink as seen below.

3 Application Architecture

Create the Feature Processor


Processor with a Custom Binding Interface

Processor using Dynamic Destinations

Create the Feature Sink for the Boss

Update the Tweet Board Subscription

Note that the processor we created earlier in this lab publishes to multiple topics, essentially splitting our feed in two. Because of the new requirement not to show new features on the tweet board, we need to update that sink's subscription accordingly.
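The idea of splitting a feed by topic can be sketched in a few lines of plain Java. Everything specific here is an assumption for illustration only: the two topic names, the class name FeatureRoutingSketch, and the rule that a tweet counts as a feature request when its text mentions "feature". With a split like this, the tweet board would subscribe only to the non-feature topic.

```java
import java.util.Locale;

class FeatureRoutingSketch {

    // Hypothetical topic names; substitute the ones configured in your processor.
    static final String FEATURES_TOPIC = "tweets/stream/features";
    static final String NO_FEATURES_TOPIC = "tweets/stream/nofeatures";

    // Assumed rule: a tweet is a feature request if its text mentions "feature".
    static String destinationFor(String tweetText) {
        boolean isFeature = tweetText.toLowerCase(Locale.ROOT).contains("feature");
        return isFeature ? FEATURES_TOPIC : NO_FEATURES_TOPIC;
    }

    public static void main(String[] args) {
        System.out.println(destinationFor("Please add a dark mode FEATURE"));
        System.out.println(destinationFor("Great coffee in the lobby"));
    }
}
```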

In this section we are going to take a glimpse into the future of event-driven microservices by using AsyncAPI to generate an SCS Processor microservice. AsyncAPI is the industry standard for defining asynchronous APIs; more info can be found on the specification website.


Application Architecture

We're going to add a "No Yelling" processor to our event-driven architecture in order to meet this new need.

4 Application Architecture

Clone the Spring Cloud Stream AsyncAPI Code Generator

Use https

$ git clone https://github.com/jschabowsky/AsyncAPI-Spring-Cloud-Streams-Generator.git

OR Use SSH

$ git clone git@github.com:jschabowsky/AsyncAPI-Spring-Cloud-Streams-Generator.git

OR Navigate to https://github.com/jschabowsky/AsyncAPI-Spring-Cloud-Streams-Generator, click "Clone or download" -> "Download ZIP" & unzip in your desired directory

Find the NoYellingProcessor AsyncAPI Specification File

This is an example AsyncAPI Specification file that defines our event-driven microservice.
It specifies information about the application such as servers where you can interact with the microservice, channels which messages are exchanged over, and components such as the messages that are expected and the schemas which define them.

In the git repo that you cloned you'll find a 08-yellingArtifacts directory. Navigate to that directory (keep in mind that it is not a Maven project, so it won't have been imported into your IDE as one) and you should find a file called NoYellingProcessor.yaml.
Open the file up and take a look at how the microservice is defined using the AsyncAPI specification.

Generate your SCS Project Skeleton

Now that you have cloned the necessary artifacts let's go ahead and generate the project skeleton!

Insert Business Logic using Spring Cloud Function

Open the ScsprocessoryellingApplication class

Add the spring.cloud.stream.function.definition argument to the SpringApplication.run command in the main method as seen below. This specifies which functional bean to bind to the external destination(s) exposed by the bindings.

SpringApplication.run(ScsprocessoryellingApplication.class, "--spring.cloud.stream.function.definition=handleInboundTweet");

Update the handleInboundTweet method to change uppercase letters to lowercase letters in the tweet text using the reactive programming model; an example of this can be seen in the code snippet below.

Note that although we still have the @EnableBinding(Processor.class) annotation we are now binding a bean of type "java.util.function.Function" to the external destinations exposed by the bindings by providing the spring.cloud.stream.function.definition property.

package com.solace.spring.cloud.streams.test;

import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.context.annotation.Bean;

import com.solace.spring.cloud.streams.test.types.Tweet;

import reactor.core.publisher.Flux;

@SpringBootApplication
@EnableBinding(Processor.class)
public class ScsprocessoryellingApplication {

    private static final Logger logger = LoggerFactory.getLogger(ScsprocessoryellingApplication.class);

    public static void main(String[] args) {
        // Select the functional bean to bind to the channels exposed by the Processor binding
        SpringApplication.run(ScsprocessoryellingApplication.class, "--spring.cloud.stream.function.definition=handleInboundTweet");
    }

    // Reactive function: lower-cases the text of every inbound tweet
    @Bean
    public Function<Flux<Tweet>, Flux<Tweet>> handleInboundTweet() {
        return flux -> flux
                .doOnNext(t -> logger.info("====Tweet BEFORE mapping: " + t.toString()))
                .map(t -> {
                    t.setText(t.getText().toLowerCase());
                    return t;
                })
                .doOnNext(t -> logger.info("++++Tweet AFTER mapping: " + t.toString()));
    }
}
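Because the business logic is just a function, it can be exercised without a broker or the reactive pipeline. The following is a minimal sketch under stated assumptions: the nested Tweet class is a hypothetical stand-in for the generated Tweet type (only the text field is modelled), and the function works on single tweets rather than on a Flux so that it needs nothing beyond the JDK.

```java
import java.util.Locale;
import java.util.function.Function;

class NoYellingSketch {

    // Hypothetical stand-in for the generated Tweet type; only the text is modelled.
    static final class Tweet {
        private String text;

        Tweet(String text) { this.text = text; }

        String getText() { return text; }

        void setText(String text) { this.text = text; }
    }

    // Lower-cases the tweet text, mirroring the map step of the processor.
    // Locale.ROOT keeps the result independent of the JVM's default locale.
    static Function<Tweet, Tweet> lowerCase() {
        return t -> {
            t.setText(t.getText().toLowerCase(Locale.ROOT));
            return t;
        };
    }

    public static void main(String[] args) {
        Tweet result = lowerCase().apply(new Tweet("STOP YELLING AT ME"));
        System.out.println(result.getText());
    }
}
```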

Deploy to PWS

At this point our microservice is ready to run, but in order to deploy to PWS we need to create a manifest.yml file at the top level of the project.
Create a manifest.yml file with the contents below - make sure to change <ATTENDEE_NAME> to your username.

---
applications:
- name: scsprocessoryelling-<ATTENDEE_NAME>
  memory: 1024M
  path: target/scsprocessoryelling-0.0.1-SNAPSHOT.jar

At this point we have created our no yelling microservice and it needs to be deployed.

Pretty cool huh? With AsyncAPI & the Spring Cloud Stream generator all you had to do was enter your business logic and your event-driven microservice was ready to go!


Application Architecture

A processor will be added to our architecture in order to convert negative words to positive ones.

5 Application Architecture

Create the Processor

Let's get started and hopefully have a bit of fun!
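One possible shape for the core logic is a function that swaps whole words using a lookup table. This is only a sketch: the class name PositiveSketch and the word pairs are hypothetical, replacements are always emitted in lowercase, and the keys are assumed to be plain words without regular-expression metacharacters.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

class PositiveSketch {

    // Hypothetical word pairs; the real list is up to you.
    static final Map<String, String> REPLACEMENTS = new LinkedHashMap<>();
    static {
        REPLACEMENTS.put("hate", "love");
        REPLACEMENTS.put("terrible", "wonderful");
        REPLACEMENTS.put("sad", "happy");
    }

    // Replaces each negative word with its positive counterpart.
    static Function<String, String> makePositive() {
        return text -> {
            String result = text;
            for (Map.Entry<String, String> e : REPLACEMENTS.entrySet()) {
                // (?i) ignores case; \b restricts the match to whole words.
                result = result.replaceAll("(?i)\\b" + e.getKey() + "\\b", e.getValue());
            }
            return result;
        };
    }

    public static void main(String[] args) {
        System.out.println(makePositive().apply("I HATE this terrible queue"));
    }
}
```

The word-boundary check means that a word such as "sadness" is left alone even though it contains "sad".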

Update the Tweet Board Subscription


Application Architecture

To meet this new requirement we are going to add the MQTT Web App shown in the diagram below:

6 Application Architecture

Obtain PubSub+ Credentials for an App that can't use the Cloud Connector & Auto-config

IF PARTICIPATING IN AN INSTRUCTOR LED WORKSHOP THE INSTRUCTOR WILL PERFORM THIS SECTION. YOU ARE WELCOME TO RUN LOCALLY IF YOU WOULD LIKE

Create the Web App

IF PARTICIPATING IN AN INSTRUCTOR LED WORKSHOP THE INSTRUCTOR WILL PERFORM THIS SECTION. YOU ARE WELCOME TO RUN LOCALLY IF YOU WOULD LIKE

Review


Continued learning topics:

This course was just an introduction to Spring Cloud Stream, but we've included some resources below if you're interested in learning more about it or some of the features that complement it! Happy Learning :)