Background
The Export Queue Recipe provides a generic foundation for building export mechanisms to any
external data store. The AccumuloExporter provides an Exporter for writing to
Accumulo. AccumuloExporter is located in the fluo-recipes-accumulo module and provides the
following functionality:
- Safely batches writes to Accumulo made by multiple transactions exporting data.
- Stores Accumulo connection information in Fluo configuration, making it accessible by Export Observers running on other nodes.
- Provides utility code that makes common Accumulo export patterns easier and shorter to code.
Example Use
Exporting to Accumulo is easy. Follow the steps below:
- First, implement AccumuloTranslator. Your implementation translates exported objects to Accumulo Mutations. For example, the SimpleTranslator class below translates String key/values into mutations for Accumulo. This step is optional: a lambda could be used in step 3 instead of creating a class.

      public class SimpleTranslator implements AccumuloTranslator<String,String> {

        @Override
        public void translate(SequencedExport<String, String> export, Consumer<Mutation> consumer) {
          Mutation m = new Mutation(export.getKey());
          // The export sequence number is passed as the timestamp of the update.
          m.put("cf", "cq", export.getSequence(), export.getValue());
          consumer.accept(m);
        }
      }
- Configure an ExportQueue and the export table prior to initializing Fluo.

      FluoConfiguration fluoConfig = ...;

      String instance =       // Name of accumulo instance exporting to
      String zookeepers =     // Zookeepers used by Accumulo instance exporting to
      String user =           // Accumulo username, user that can write to exportTable
      String password =       // Accumulo user password
      String exportTable =    // Name of table to export to

      // Set properties for table to export to in Fluo app configuration.
      AccumuloExporter.configure(EXPORT_QID).instance(instance, zookeepers)
          .credentials(user, password).table(exportTable).save(fluoConfig);

      // Set properties for export queue in Fluo app configuration
      ExportQueue.configure(EXPORT_QID).keyType(String.class).valueType(String.class)
          .buckets(119).save(fluoConfig);

      // Initialize Fluo using fluoConfig
- In the application's ObserverProvider, register an observer that will process exports and write them to Accumulo using AccumuloExporter. Also, register observers that add to the export queue.

      public class MyObserverProvider implements ObserverProvider {

        @Override
        public void provide(Registry obsRegistry, Context ctx) {
          SimpleConfiguration appCfg = ctx.getAppConfiguration();

          ExportQueue<String, String> expQ = ExportQueue.getInstance(EXPORT_QID, appCfg);

          // Register observer that will process entries on export queue and write them to the Accumulo
          // table configured earlier. SimpleTranslator from step 1 is passed here, could have used a
          // lambda instead.
          expQ.registerObserver(obsRegistry,
              new AccumuloExporter<>(EXPORT_QID, appCfg, new SimpleTranslator()));

          // An example observer created using a lambda that adds to the export queue.
          obsRegistry.forColumn(OBS_COL, WEAK).useObserver((tx,row,col) -> {
            // Read some data and do some work

            // Add results to export queue
            String key =    // key that identifies export
            String value =  // object to export
            expQ.add(tx, key, value);
          });
        }
      }
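The steps above mention that a lambda could replace the SimpleTranslator class. The sketch below shows what that lambda might look like. So that it compiles without Fluo or Accumulo on the classpath, SequencedExport, Mutation, and AccumuloTranslator are declared here as simplified stand-ins that only mirror the method names used in step 1; they are assumptions for illustration, not the real library types, and translateOne is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class LambdaTranslatorSketch {

  // Stand-in (assumption) for the export object handed to a translator.
  interface SequencedExport<K, V> {
    K getKey();
    V getValue();
    long getSequence();
  }

  // Stand-in (assumption) for an Accumulo Mutation; it just records its updates as text.
  static final class Mutation {
    final String row;
    final List<String> updates = new ArrayList<>();

    Mutation(String row) {
      this.row = row;
    }

    void put(String cf, String cq, long timestamp, String value) {
      updates.add(cf + ":" + cq + "@" + timestamp + "=" + value);
    }
  }

  // Stand-in (assumption) with the same single method that SimpleTranslator overrides.
  @FunctionalInterface
  interface AccumuloTranslator<K, V> {
    void translate(SequencedExport<K, V> export, Consumer<Mutation> consumer);
  }

  // Lambda equivalent of the SimpleTranslator class from step 1.
  static final AccumuloTranslator<String, String> SIMPLE = (export, consumer) -> {
    Mutation m = new Mutation(export.getKey());
    m.put("cf", "cq", export.getSequence(), export.getValue());
    consumer.accept(m);
  };

  // Hypothetical helper: runs the lambda on one export and collects the mutations.
  static List<Mutation> translateOne(String key, String value, long seq) {
    SequencedExport<String, String> export = new SequencedExport<String, String>() {
      @Override public String getKey() { return key; }
      @Override public String getValue() { return value; }
      @Override public long getSequence() { return seq; }
    };
    List<Mutation> out = new ArrayList<>();
    SIMPLE.translate(export, out::add);
    return out;
  }

  public static void main(String[] args) {
    Mutation m = translateOne("row1", "hello", 7L).get(0);
    System.out.println(m.row + " " + m.updates);
  }
}
```

With the real types, the same lambda body would presumably be passed where step 3 passes new SimpleTranslator().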
Other use cases
The getTranslator() method in AccumuloReplicator creates a specialized AccumuloTranslator for replicating a Fluo table to Accumulo.
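This page does not show how that translator works. As a rough illustration of the general idea only, the sketch below turns a list of logged cell changes into one mutation per row, using the export sequence number as the timestamp. Every type here (Op, Change, Mutation) is a hypothetical stand-in written for this example, and the grouping logic is an assumption, not a description of AccumuloReplicator.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class ReplicationTranslatorSketch {

  // Hypothetical stand-in: the kind of change recorded for one cell.
  enum Op { SET, DELETE }

  // Hypothetical stand-in: one logged change to a cell of the Fluo table.
  static final class Change {
    final Op op;
    final String row;
    final String column;
    final String value; // ignored for DELETE

    Change(Op op, String row, String column, String value) {
      this.op = op;
      this.row = row;
      this.column = column;
      this.value = value;
    }
  }

  // Hypothetical stand-in for an Accumulo Mutation: all updates for one row.
  static final class Mutation {
    final String row;
    final List<String> updates = new ArrayList<>();

    Mutation(String row) {
      this.row = row;
    }

    void put(String column, long timestamp, String value) {
      updates.add("put " + column + "@" + timestamp + "=" + value);
    }

    void putDelete(String column, long timestamp) {
      updates.add("delete " + column + "@" + timestamp);
    }
  }

  // Groups changes by row (keeping first-seen row order) and emits one mutation per row.
  static void translate(long seq, List<Change> changes, Consumer<Mutation> consumer) {
    Map<String, Mutation> byRow = new LinkedHashMap<>();
    for (Change c : changes) {
      Mutation m = byRow.computeIfAbsent(c.row, Mutation::new);
      if (c.op == Op.DELETE) {
        m.putDelete(c.column, seq);
      } else {
        m.put(c.column, seq, c.value);
      }
    }
    byRow.values().forEach(consumer);
  }
}
```

Emitting one mutation per row keeps all changes to a row together in a single update, which is a design choice of this sketch rather than something the source states.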