Serve a Java App

To use Ray Serve from Java, add the following dependency to your pom.xml. Here ray.version is a Maven property that you set to the Ray version you use.

<dependency>
  <groupId>io.ray</groupId>
  <artifactId>ray-serve</artifactId>
  <version>${ray.version}</version>
  <scope>provided</scope>
</dependency>

NOTE: Installing Ray with Python also installs the Ray Serve Java jar in the local environment. The provided scope lets you compile your Java code against Ray Serve without packaging the jar yourself, which avoids version conflicts when you deploy on the cluster.

Example model

This example use case is a production workflow of a financial application. The application needs to compute the best strategy to interact with different banks for a single task.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

public class Strategy {

  public List<String> calc(Long time, Map<String, List<List<String>>> banksAndIndicators) {
    List<String> results = new ArrayList<>();
    for (Entry<String, List<List<String>>> e : banksAndIndicators.entrySet()) {
      String bank = e.getKey();
      for (List<String> indicators : e.getValue()) {
        results.addAll(calcBankIndicators(time, bank, indicators));
      }
    }
    return results;
  }

  public List<String> calcBankIndicators(Long time, String bank, List<String> indicators) {
    List<String> results = new ArrayList<>();
    for (String indicator : indicators) {
      results.add(calcIndicator(time, bank, indicator));
    }
    return results;
  }

  public String calcIndicator(Long time, String bank, String indicator) {
    // do bank data calculation
    return bank + "-" + indicator + "-" + time; // demo placeholder
  }
}

This example uses the Strategy class to calculate the indicators of a number of banks.

  • The calc method is the entry point of the calculation. Its inputs are the time interval of the calculation and a map from each bank to its lists of indicators. It uses two nested for loops to traverse every indicator list of every bank, and calls calcBankIndicators for each list.

  • The calcBankIndicators method adds a third loop, which traverses each indicator in the list and calls calcIndicator for it.

  • The calcIndicator method holds the actual calculation logic for one bank, time interval, and indicator. In this demo it only concatenates its arguments.

This code uses the Strategy class:

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StrategyCalc {

  public static void main(String[] args) {
    long time = System.currentTimeMillis();
    String bank1 = "demo_bank_1";
    String bank2 = "demo_bank_2";
    String indicator1 = "demo_indicator_1";
    String indicator2 = "demo_indicator_2";
    Map<String, List<List<String>>> banksAndIndicators = new HashMap<>();
    banksAndIndicators.put(bank1, Arrays.asList(Arrays.asList(indicator1, indicator2)));
    banksAndIndicators.put(
        bank2, Arrays.asList(Arrays.asList(indicator1), Arrays.asList(indicator2)));

    Strategy strategy = new Strategy();
    List<String> results = strategy.calc(time, banksAndIndicators);

    System.out.println(results);
  }
}

As the number of banks and indicators grows, the three nested for loops slow down the calculation. Even if you use a thread pool to calculate the indicators in parallel, you may hit the performance limit of a single machine. Moreover, the Strategy object lives only inside the calling program, so you can't use it as a resident service.
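For comparison, a thread-pool version of the calculation might look like the sketch below. It is not part of Ray Serve: the class name ThreadPoolStrategy, the pool size of 4, and the shutdown method are assumptions made for illustration. All tasks still share the cores of one machine, which is the bottleneck described above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical thread-pool variant of Strategy (illustration only).
class ThreadPoolStrategy {

  // The pool size is an arbitrary choice for this sketch.
  private final ExecutorService pool = Executors.newFixedThreadPool(4);

  public List<String> calc(Long time, Map<String, List<List<String>>> banksAndIndicators) {
    // Submit one task per indicator; every task runs inside this JVM.
    List<Future<String>> futures = new ArrayList<>();
    for (Map.Entry<String, List<List<String>>> e : banksAndIndicators.entrySet()) {
      String bank = e.getKey();
      for (List<String> indicators : e.getValue()) {
        for (String indicator : indicators) {
          futures.add(pool.submit(() -> calcIndicator(time, bank, indicator)));
        }
      }
    }
    // Collect in submission order so the output matches the serial version.
    List<String> results = new ArrayList<>();
    for (Future<String> future : futures) {
      try {
        results.add(future.get());
      } catch (InterruptedException | ExecutionException ex) {
        throw new IllegalStateException("indicator calculation failed", ex);
      }
    }
    return results;
  }

  public String calcIndicator(Long time, String bank, String indicator) {
    return bank + "-" + indicator + "-" + time; // same demo logic as Strategy
  }

  // Without this call the pool's threads keep the JVM alive.
  public void shutdown() {
    pool.shutdown();
  }

  public static void main(String[] args) {
    // LinkedHashMap keeps the banks in insertion order.
    Map<String, List<List<String>>> banksAndIndicators = new LinkedHashMap<>();
    banksAndIndicators.put(
        "demo_bank_1", Arrays.asList(Arrays.asList("demo_indicator_1", "demo_indicator_2")));
    ThreadPoolStrategy strategy = new ThreadPoolStrategy();
    try {
      System.out.println(strategy.calc(System.currentTimeMillis(), banksAndIndicators));
    } finally {
      strategy.shutdown();
    }
  }
}
```

The pool spreads work across threads but not across machines, and it disappears when the program exits.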

Converting to a Ray Serve Deployment

With Ray Serve, you can deploy the core computing logic of Strategy as a scalable distributed computing service.

First, extract the indicator calculation for a single bank into a separate StrategyOnRayServe class:

public class StrategyOnRayServe {

  public String calcIndicator(Long time, String bank, String indicator) {
    // do bank data calculation
    return bank + "-" + indicator + "-" + time; // demo placeholder
  }
}

Next, start the Ray Serve runtime and deploy StrategyOnRayServe as a deployment.

  public void deploy() {
    Serve.start(null);

    Application deployment =
        Serve.deployment()
            .setName("strategy")
            .setDeploymentDef(StrategyOnRayServe.class.getName())
            .setNumReplicas(4)
            .bind();
    Serve.run(deployment);
  }

Serve.deployment() configures a deployment named strategy, and bind() turns it into an Application. After Serve.run executes, the Ray Serve instance runs the strategy deployment with four replicas, and you can access it for distributed parallel computing.

Testing the Ray Serve Deployment

You can test the strategy deployment from inside Ray through the deployment's handle:

  public List<String> calc(Long time, Map<String, List<List<String>>> banksAndIndicators) {
    Deployment deployment = Serve.getDeployment("strategy");

    List<String> results = new ArrayList<>();
    for (Entry<String, List<List<String>>> e : banksAndIndicators.entrySet()) {
      String bank = e.getKey();
      for (List<String> indicators : e.getValue()) {
        for (String indicator : indicators) {
          results.add(
              (String)
                  deployment
                      .getHandle()
                      .method("calcIndicator")
                      .remote(time, bank, indicator)
                      .result());
        }
      }
    }
    return results;
  }

This code calculates the indicators one at a time: it sends each call to Ray and then blocks on result() before sending the next one. You can make the calls concurrent, which both speeds up the calculation and removes the single-machine bottleneck.

  public List<String> parallelCalc(Long time, Map<String, List<List<String>>> banksAndIndicators) {
    Deployment deployment = Serve.getDeployment("strategy");

    List<String> results = new ArrayList<>();
    List<DeploymentResponse> responses = new ArrayList<>();
    for (Entry<String, List<List<String>>> e : banksAndIndicators.entrySet()) {
      String bank = e.getKey();
      for (List<String> indicators : e.getValue()) {
        for (String indicator : indicators) {
          responses.add(
              deployment.getHandle().method("calcIndicator").remote(time, bank, indicator));
        }
      }
    }
    for (DeploymentResponse response : responses) {
      results.add((String) response.result());
    }
    return results;
  }
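To see why sending every request before collecting any result helps, the following stand-alone sketch imitates the two patterns with the JDK only. It does not use Ray: CompletableFuture stands in for a pending response, and the class name, the 50 ms delay, and the pool of four threads (playing the four replicas) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Stand-in for remote calls; not Ray Serve code.
class BlockingVersusBatched {

  // Hypothetical remote call that takes about 50 ms to answer.
  static CompletableFuture<String> remote(ExecutorService pool, String indicator) {
    return CompletableFuture.supplyAsync(
        () -> {
          try {
            Thread.sleep(50);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
          return "done-" + indicator;
        },
        pool);
  }

  // Waits for each answer before sending the next request.
  static List<String> oneAtATime(ExecutorService pool, List<String> indicators) {
    List<String> results = new ArrayList<>();
    for (String indicator : indicators) {
      results.add(remote(pool, indicator).join());
    }
    return results;
  }

  // Sends every request first, then waits for the answers in submission order.
  static List<String> allThenCollect(ExecutorService pool, List<String> indicators) {
    List<CompletableFuture<String>> pending = new ArrayList<>();
    for (String indicator : indicators) {
      pending.add(remote(pool, indicator));
    }
    List<String> results = new ArrayList<>();
    for (CompletableFuture<String> response : pending) {
      results.add(response.join());
    }
    return results;
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<String> indicators = Arrays.asList("i1", "i2", "i3", "i4");
    try {
      long t0 = System.nanoTime();
      List<String> serial = oneAtATime(pool, indicators);
      long t1 = System.nanoTime();
      List<String> batched = allThenCollect(pool, indicators);
      long t2 = System.nanoTime();
      System.out.println("same results: " + serial.equals(batched));
      System.out.println("one at a time, ms: " + (t1 - t0) / 1_000_000);
      System.out.println("all then collect, ms: " + (t2 - t1) / 1_000_000);
    } finally {
      pool.shutdown();
    }
  }
}
```

Both methods return the results in the same order, because the second one collects the responses in the order it sent the requests. Only the waiting time differs.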

The main method of StrategyCalcOnRayServe shows how to use the deploy and parallelCalc methods together:

  public static void main(String[] args) {

    long time = System.currentTimeMillis();
    String bank1 = "demo_bank_1";
    String bank2 = "demo_bank_2";
    String indicator1 = "demo_indicator_1";
    String indicator2 = "demo_indicator_2";
    Map<String, List<List<String>>> banksAndIndicators = new HashMap<>();
    banksAndIndicators.put(bank1, Arrays.asList(Arrays.asList(indicator1, indicator2)));
    banksAndIndicators.put(
        bank2, Arrays.asList(Arrays.asList(indicator1), Arrays.asList(indicator2)));

    StrategyCalcOnRayServe strategy = new StrategyCalcOnRayServe();
    strategy.deploy();
    List<String> results = strategy.parallelCalc(time, banksAndIndicators);

    System.out.println(results);
  }

Calling Ray Serve Deployment with HTTP

Another way to test or call a deployment is through HTTP requests. However, Java deployments have two limitations:

  • Only the call method of the user class can process the HTTP requests.

  • The call method can only have one input parameter, and the type of the input parameter and the returned value can only be String.

To call the strategy deployment with HTTP, you can rewrite the class like this:

import com.google.gson.Gson;

public class HttpStrategyOnRayServe {

  static class BankIndicator {
    long time;
    String bank;
    String indicator;
  }

  private Gson gson = new Gson();

  public String call(String dataJson) {
    BankIndicator data = gson.fromJson(dataJson, BankIndicator.class);
    // do bank data calculation
    return data.bank + "-" + data.indicator + "-" + data.time; // demo placeholder
  }
}

After deploying this class, you can access it with curl. The URL path must match the route under which you serve the deployment; this command assumes the route /strategy:

curl -d '{"time":1641038674, "bank":"test_bank", "indicator":"test_indicator"}' http://127.0.0.1:8000/strategy

You can also access it from Java code with an HTTP client. This example uses the fluent API of Apache HttpClient and assumes the route /http-strategy:

  private Gson gson = new Gson();

  public String httpCalc(Long time, String bank, String indicator) {
    Map<String, Object> data = new HashMap<>();
    data.put("time", time);
    data.put("bank", bank);
    data.put("indicator", indicator);

    String result;
    try {
      result =
          Request.post("http://127.0.0.1:8000/http-strategy")
              .bodyString(gson.toJson(data), null)
              .execute()
              .returnContent()
              .asString();
    } catch (IOException e) {
      result = "error";
    }

    return result;
  }
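If you prefer not to add an HTTP client dependency, the same request can probably be sent with the java.net.http client that ships with Java 11 and later. The sketch below is an illustration, not part of Ray Serve: the class name JdkHttpStrategyClient and its post and toJson methods are hypothetical, and the URL you pass in is an assumption that must match the address and route of your own deployment.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper built on the JDK HTTP client (Java 11+).
class JdkHttpStrategyClient {

  private final HttpClient client = HttpClient.newHttpClient();

  // Sends jsonBody as the POST body and returns the response body as a String.
  public String post(String url, String jsonBody) {
    HttpRequest request =
        HttpRequest.newBuilder(URI.create(url))
            .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
            .build();
    try {
      return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    } catch (IOException e) {
      return "error"; // mirrors the error handling of httpCalc
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return "error";
    }
  }

  // Builds the JSON by hand to stay dependency-free; the values are not escaped.
  public static String toJson(long time, String bank, String indicator) {
    return "{\"time\":" + time
        + ",\"bank\":\"" + bank
        + "\",\"indicator\":\"" + indicator + "\"}";
  }
}
```

A call then looks like new JdkHttpStrategyClient().post(url, JdkHttpStrategyClient.toJson(time, bank, indicator)), where url is the address of your deployment. For real data, a JSON library such as Gson is safer than hand-built strings because it escapes special characters.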

The strategy calculation that accesses the deployment over HTTP looks like this:

  public List<String> calc(Long time, Map<String, List<List<String>>> banksAndIndicators) {

    List<String> results = new ArrayList<>();
    for (Entry<String, List<List<String>>> e : banksAndIndicators.entrySet()) {
      String bank = e.getKey();
      for (List<String> indicators : e.getValue()) {
        for (String indicator : indicators) {
          results.add(httpCalc(time, bank, indicator));
        }
      }
    }
    return results;
  }

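You can also make this code concurrent by submitting each HTTP call to a thread pool. Because the pool's threads otherwise keep the JVM alive, call executorService.shutdown() once you no longer need the pool. The concurrent version looks like this: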

  private ExecutorService executorService = Executors.newFixedThreadPool(4);

  public List<String> parallelCalc(Long time, Map<String, List<List<String>>> banksAndIndicators) {

    List<String> results = new ArrayList<>();
    List<Future<String>> futures = new ArrayList<>();
    for (Entry<String, List<List<String>>> e : banksAndIndicators.entrySet()) {
      String bank = e.getKey();
      for (List<String> indicators : e.getValue()) {
        for (String indicator : indicators) {
          futures.add(executorService.submit(() -> httpCalc(time, bank, indicator)));
        }
      }
    }
    for (Future<String> future : futures) {
      try {
        results.add(future.get());
      } catch (InterruptedException | ExecutionException e1) {
        results.add("error");
      }
    }
    return results;
  }

Finally, the main method of HttpStrategyCalcOnRayServe puts everything together:

  public static void main(String[] args) {

    long time = System.currentTimeMillis();
    String bank1 = "demo_bank_1";
    String bank2 = "demo_bank_2";
    String indicator1 = "demo_indicator_1";
    String indicator2 = "demo_indicator_2";
    Map<String, List<List<String>>> banksAndIndicators = new HashMap<>();
    banksAndIndicators.put(bank1, Arrays.asList(Arrays.asList(indicator1, indicator2)));
    banksAndIndicators.put(
        bank2, Arrays.asList(Arrays.asList(indicator1), Arrays.asList(indicator2)));

    HttpStrategyCalcOnRayServe strategy = new HttpStrategyCalcOnRayServe();
    strategy.deploy();
    List<String> results = strategy.parallelCalc(time, banksAndIndicators);

    System.out.println(results);
  }