cloudmarker.stores package

A package for store plugins packaged with this project.

This package contains store plugins that are packaged as part of this project. The store plugins implement a function named write() that accepts input records and typically stores them in a persistent data store. The store plugins also implement a function named done() that performs cleanup work when called.
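The write()/done() contract can be illustrated with a minimal sketch. MemoryStore and feed() are hypothetical names used only for illustration; they are not part of cloudmarker, and the real framework's driver loop may differ.

```python
class MemoryStore:
    """Hypothetical store plugin that keeps records in a list.

    It follows the write()/done() convention described above but is
    not part of cloudmarker.
    """

    def __init__(self):
        self.records = []
        self.closed = False

    def write(self, record):
        # Called once per input record.
        self.records.append(record)

    def done(self):
        # Called once after the last record; a real store would flush
        # buffers or close connections here.
        self.closed = True


def feed(store, records):
    """Hypothetical driver: write every record, then call done()."""
    for record in records:
        store.write(record)
    store.done()


store = MemoryStore()
feed(store, [{'record_type': 'compute'}, {'record_type': 'firewall'}])
print(len(store.records), store.closed)  # 2 True
```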

Submodules

cloudmarker.stores.esstore module

Elasticsearch store plugin.

class cloudmarker.stores.esstore.EsStore(host='localhost', port=9200, index='cloudmarker', buffer_size=5000000)

Bases: object

Elasticsearch adapter to index cloud data in Elasticsearch.

Create an instance of EsStore plugin.

The plugin uses the default port for Elasticsearch if not specified.

The buffer_size parameter is the maximum number of bytes of data sent to Elasticsearch in a single bulk API request.

Parameters:
  • host (str) – Elasticsearch host
  • port (int) – Elasticsearch port
  • index (str) – Elasticsearch index
  • buffer_size (int) – Maximum number of bytes of data to hold in the in-memory buffer.
done()

Flush pending records to Elasticsearch.

write(record)

Write JSON records to the Elasticsearch index.

Flush the buffer by saving its content to Elasticsearch when the buffer size exceeds the configured size.

Parameters: record (dict) – Data to save to Elasticsearch.
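A byte-limited bulk buffer of the kind described for EsStore might look like the sketch below. BulkBuffer and its send callable are hypothetical; the body uses the Elasticsearch bulk format (an action line followed by a source line, newline-delimited JSON). This sketch flushes before a new document would push the body past buffer_size, so that each request stays within the limit; the actual plugin may check the threshold differently.

```python
import json


class BulkBuffer:
    """Hypothetical byte-limited buffer for Elasticsearch bulk requests."""

    def __init__(self, index, buffer_size, send):
        self.index = index
        self.buffer_size = buffer_size  # maximum bytes per bulk request body
        self.send = send                # callable that receives one NDJSON body
        self.lines = []
        self.size = 0

    def write(self, record):
        # Each document is an action line followed by its source line.
        action = json.dumps({'index': {'_index': self.index}})
        chunk = action + '\n' + json.dumps(record) + '\n'
        chunk_size = len(chunk.encode('utf-8'))
        # Flush first if this document would push the body past the limit.
        if self.lines and self.size + chunk_size > self.buffer_size:
            self.flush()
        self.lines.append(chunk)
        self.size += chunk_size

    def flush(self):
        if self.lines:
            self.send(''.join(self.lines))
        self.lines = []
        self.size = 0

    def done(self):
        # Send whatever is still pending.
        self.flush()


sent = []
buf = BulkBuffer('cloudmarker', buffer_size=200, send=sent.append)
for i in range(5):
    buf.write({'id': i, 'record_type': 'compute'})
buf.done()
print(len(sent))  # 3
```

In a real deployment, send would POST each body to the cluster's _bulk endpoint with the application/x-ndjson content type.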

cloudmarker.stores.filestore module

Filesystem store plugin.

class cloudmarker.stores.filestore.FileStore(path='/tmp/cloudmarker')

Bases: object

A plugin to store records on the filesystem.

Create an instance of FileStore plugin.

Parameters: path (str) – Path of the directory where files are written.
done()

Perform final cleanup tasks.

This method is called after all records have been written. In this example implementation, we properly terminate the JSON array in the .tmp file. Then we rename the .tmp file to a .json file.

Note that other implementations of a store may perform tasks like closing a connection to a remote store or flushing any remaining records in a buffer.

write(record)

Write JSON records to the file system.

This method is called once for every record read from a cloud. In this example implementation of a store, we simply write the record in JSON format to a file. The list of records is maintained as a JSON array in the file. The origin worker name in record['com']['origin_worker'] is used to determine the filename.

The records are written to a .tmp file because we don’t want to delete the existing complete and useful .json file prematurely.

Note that other implementations of a store may choose to buffer the records in memory instead of writing each record to the store immediately. They may then flush the buffer to the store based on certain conditions such as buffer size, time interval, etc.

Parameters: record (dict) – Data to write to the file system.
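The write() and done() behavior described for FileStore can be sketched as follows. JsonArrayFileStore is a hypothetical class, not cloudmarker's code; it assumes origin worker names are safe to use as filenames. Each worker gets a .tmp file holding an open JSON array, and done() closes the array and renames the file with os.replace(), which overwrites any previous .json file only at that point.

```python
import json
import os
import tempfile


class JsonArrayFileStore:
    """Hypothetical FileStore-like plugin (illustrative only)."""

    def __init__(self, path):
        self.path = path
        self.files = {}  # origin worker name -> open .tmp file handle
        os.makedirs(path, exist_ok=True)

    def write(self, record):
        worker = record['com']['origin_worker']
        f = self.files.get(worker)
        if f is None:
            # First record for this worker: start a new JSON array.
            tmp_path = os.path.join(self.path, worker + '.tmp')
            f = open(tmp_path, 'w', encoding='utf-8')
            f.write('[\n')
            self.files[worker] = f
        else:
            # Later records are separated from the previous one by a comma.
            f.write(',\n')
        f.write(json.dumps(record))

    def done(self):
        for worker, f in self.files.items():
            f.write('\n]\n')  # terminate the JSON array
            f.close()
            tmp_path = os.path.join(self.path, worker + '.tmp')
            # Replace the previous complete .json file only now.
            os.replace(tmp_path, os.path.join(self.path, worker + '.json'))
        self.files = {}


out = tempfile.mkdtemp()
store = JsonArrayFileStore(out)
store.write({'com': {'origin_worker': 'w1'}, 'id': 1})
store.write({'com': {'origin_worker': 'w1'}, 'id': 2})
store.done()
with open(os.path.join(out, 'w1.json'), encoding='utf-8') as f:
    print([r['id'] for r in json.load(f)])  # [1, 2]
```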

cloudmarker.stores.mongodbstore module

MongoDB store plugin.

class cloudmarker.stores.mongodbstore.MongoDBStore(host='localhost', port=27017, db='cloudmarker', collection='cloudmarker', username=None, password=None, buffer_size=1000)

Bases: object

A plugin to store records on MongoDB.

Create an instance of MongoDBStore plugin.

It uses the default MongoDB port, 27017, if no port is specified. With MongoDB 4.0 and later, the authentication mechanism is negotiated between the server and the client and defaults to SCRAM-SHA-1 or SCRAM-SHA-256.

Parameters:
  • host (str) – Hostname of the database server.
  • port (int) – Port on which MongoDB is listening.
  • db (str) – Name of the database.
  • collection (str) – Name of MongoDB collection.
  • username (str) – Username for the database.
  • password (str) – Password used to authenticate the username with the database.
  • buffer_size (int) – Maximum number of records to buffer.
done()

Flush pending records to MongoDB and close MongoDB client.

write(record)

Write JSON records to the MongoDB collections.

This method is called once for every record read from a cloud. This method saves the records into in-memory buffers. A separate buffer is created and maintained for each record type found in record['record_type']. When the number of records in a buffer equals or exceeds the buffer size specified while creating an instance of MongoDBStore plugin, the records in the buffer are flushed (saved into a MongoDB collection).

The record type, i.e., record['record_type'] is used to determine the collection name in MongoDB.

Parameters: record (dict) – Data to save in MongoDB.
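The per-record-type buffering described for MongoDBStore.write() can be sketched as below. TypedBuffer and flush_fn are hypothetical stand-ins for the database write so the sketch runs without a server; with PyMongo, flush_fn could call client[db][collection_name].insert_many(records).

```python
from collections import defaultdict


class TypedBuffer:
    """Hypothetical per-record-type buffer modeled on the description above."""

    def __init__(self, buffer_size, flush_fn):
        self.buffer_size = buffer_size
        self.flush_fn = flush_fn          # called as flush_fn(collection_name, records)
        self.buffers = defaultdict(list)  # record_type -> pending records

    def write(self, record):
        record_type = record['record_type']
        self.buffers[record_type].append(record)
        # Flush once this type's buffer reaches the configured size.
        if len(self.buffers[record_type]) >= self.buffer_size:
            self.flush(record_type)

    def flush(self, record_type):
        records = self.buffers.pop(record_type, [])
        if records:
            # The record type doubles as the collection name.
            self.flush_fn(record_type, records)

    def done(self):
        for record_type in list(self.buffers):
            self.flush(record_type)


calls = []
buf = TypedBuffer(2, lambda name, recs: calls.append((name, len(recs))))
for rt in ['compute', 'firewall', 'compute', 'compute']:
    buf.write({'record_type': rt})
buf.done()
print(calls)  # [('compute', 2), ('firewall', 1), ('compute', 1)]
```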

cloudmarker.stores.splunkhecstore module

SplunkStore plugin to index data in Splunk using an HEC token.

class cloudmarker.stores.splunkhecstore.SplunkHECStore(uri, token, index, ca_cert, buffer_size=1000)

Bases: object

SplunkHECStore plugin to index cloud data in Splunk using an HEC token.

Create an instance of SplunkHECStore plugin.

Parameters:
  • uri (str) – Splunk collector service URI.
  • token (str) – Splunk HEC token.
  • index (str) – Splunk index that the HEC token has access to.
  • ca_cert (str) – Location of the certificate file used to verify the identity of the host in the URI, or False to disable verification.
  • buffer_size (int) – Maximum number of records to hold in in-memory buffer for each record type.
done()

Flush any remaining records.

write(record)

Save the record in a bulk-buffer.

Also flush the buffer by saving its content to Splunk when the buffer size exceeds the configured self._buffer_size.

Parameters: record (dict) – Data to save to Splunk.
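When a SplunkHECStore buffer is flushed, the buffered records have to be sent as one batched HEC request. The sketch below only builds such a request with the standard library. build_hec_request is a hypothetical helper, and the URI and token are placeholders. Splunk HEC accepts several JSON event objects concatenated in one body, with the token in an "Authorization: Splunk <token>" header.

```python
import json
import urllib.request


def build_hec_request(uri, token, index, records):
    """Build one batched Splunk HEC request (hypothetical helper).

    Each record becomes the "event" payload of one JSON object; the
    objects are concatenated into a single request body.
    """
    body = ''.join(json.dumps({'event': r, 'index': index}) for r in records)
    return urllib.request.Request(
        uri,
        data=body.encode('utf-8'),
        headers={'Authorization': 'Splunk ' + token},
        method='POST',
    )


# Placeholder URI and token; 8088 is the default HEC port.
req = build_hec_request(
    'https://splunk.example.com:8088/services/collector/event',
    'TOKEN', 'cloudmarker', [{'id': 1}, {'id': 2}])
print(req.get_method(), req.get_header('Authorization'))  # POST Splunk TOKEN
```

To actually send it, one could call urllib.request.urlopen(req, context=ssl.create_default_context(cafile=ca_cert)), which verifies the server against the certificate file passed as ca_cert.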