Fluo Tour: Memory limits and self notify code

Tour page 25 of 26

  static Column NC = new Column("ntyf", "sum");
  static Column TOTAL_COL = new Column("sum", "total");
  static Column UPDATE_COL = new Column("sum", "update");
  static Column CONTINUE_COL = new Column("sum", "continue");

  public static class SummingObserver implements StringObserver {

    private int maxToProcess;

    SummingObserver(int maxToProcess) {
      this.maxToProcess = maxToProcess;
    }

    @Override
    public void process(TransactionBase tx, String row, Column col) throws Exception {

      Map<Column, String> colVals = tx.gets(row, TOTAL_COL, CONTINUE_COL);

      int sum = Integer.parseInt(colVals.getOrDefault(TOTAL_COL, "0"));

      // construct a scan range that uses the continue row
      String startRow = colVals.getOrDefault(CONTINUE_COL, row + "/");
      String endRow = row + "/:"; // after the character '9' comes ':'
      CellScanner scanner = tx.scanner().over(new Span(startRow, true, endRow, false)).build();

      int processed = 0;

      for (RowColumnValue rcv : scanner) {
        if (processed >= maxToProcess) {
          // stop processing and set the continue row
          tx.set(row, CONTINUE_COL, rcv.getsRow());
          tx.setWeakNotification(row, col);
          break;
        }
        sum += Integer.parseInt(rcv.getsValue());
        tx.delete(rcv.getRow(), rcv.getColumn());
        processed++;
      }

      System.out.println("sum : " + sum + "  start: " + startRow + "  processed: " + processed);

      tx.set(row, TOTAL_COL, "" + sum);

      // if did not set the continue column and it exists, then delete it
      if (processed < maxToProcess && colVals.containsKey(CONTINUE_COL)) {
        tx.delete(row, CONTINUE_COL);
        // need to start over at the beginning and see if there is new data before the continue
        // column
        tx.setWeakNotification(row, col);
      }
    }
  }

  public static class TourObserverProvider implements ObserverProvider {
    @Override
    public void provide(Registry obsRegistry, Context ctx) {
      int maxToProcess = ctx.getAppConfiguration().getInt("maxToProcess");
      obsRegistry.forColumn(NC, NotificationType.WEAK)
          .useObserver(new SummingObserver(maxToProcess));
    }
  }

  private static void preInit(FluoConfiguration fluoConfig) {
    fluoConfig.getAppConfiguration().setProperty("maxToProcess", 500);
    fluoConfig.setObserverProvider(TourObserverProvider.class);
  }

  private static void exercise(MiniFluo mini, FluoClient client) {
    try (LoaderExecutor le = client.newLoaderExecutor()) {
      Random r = new Random(42);
      for (int i = 0; i < 5000; i++) {
        // The Loader interface only has one function and can therefore be written as a lambda
        // below.
        le.execute((tx, ctx) -> {
          String row = "counter001/" + String.format("%07d", r.nextInt(10000000));
          int curVal = Integer.parseInt(tx.gets(row, UPDATE_COL, "0"));
          tx.set(row, UPDATE_COL, curVal + 1 + "");
          tx.setWeakNotification("counter001", NC);
        });
      }
    }

    mini.waitForObservers();

    try (Snapshot snap = client.newSnapshot()) {
      System.out.println("final sum : " + snap.gets("counter001", TOTAL_COL));
    }
  }

The code above will print something like the following.

$ mvn -q clean compile exec:java
Starting MiniFluo ... started.
sum : 500  start: counter001/  processed: 500
sum : 891  start: counter001/7945963  processed: 390
sum : 1391  start: counter001/  processed: 500
sum : 1891  start: counter001/2938489  processed: 500
sum : 2391  start: counter001/5210523  processed: 500
sum : 2892  start: counter001/6912090  processed: 500
sum : 3392  start: counter001/8410312  processed: 500
sum : 3398  start: counter001/9991522  processed: 6
sum : 3898  start: counter001/  processed: 500
sum : 4398  start: counter001/1824962  processed: 500
sum : 4899  start: counter001/4076664  processed: 500
sum : 5000  start: counter001/6993690  processed: 101
sum : 5000  start: counter001/  processed: 0
final sum : 5000

< 25 / 26 >