hustle.core.marble – The Hustle Database Core

class hustle.core.marble.Marble(name=None, fields=(), partition=None)

The Marble is the smallest unit of distribution and replication in Hustle. The Marble is a wrapper around a standalone LMDB key/value store. An LMDB key/value store may have many sub key/value stores, which are called DBs in LMDB terminology. Each Hustle column is represented by one LMDB DB (called the column DB), plus another LMDB DB if that column is indexed (called the index DB) (see Hustle Schema Design Guide).

The Marble file is also the unit of insertion and replication of data into the Hustle system. Marbles can be built on remote systems, in a completely distributed manner. They are then pushed into the cluster’s DDFS file system, which is a relatively inexpensive operation. This is Hustle’s distributed insert functionality.

In addition, each Marble contains several LMDB meta DBs that hold metadata for the prefix Tries, the schema, and the table statistics used by the query optimizer.

The column DB is a key/value store that stores the data for a particular column. Its key is a locally unique row identifier (RID), and its value is the actual value for that column's data, encoded according to the schema data type of the column. All integer types are encoded directly in LMDB as integers, whereas the Trie compression types are encoded as integers (called VIDs) that are keys into the two dedicated Trie meta DBs (one for 32 bit and one for 16 bit Tries). Uncompressed strings, as well as lz4 and binary style data, are simply encoded as byte string values.

The index DB for a column is a key/value store that inverts the key and the value of the column DB. It is used to perform the identity and range queries required for Hustle’s where clause. The key in the index DB is the actual value for that column, and the value is a bitmap index of the RIDs where that value is present. This is a very efficient and compact way to store the index in an append-only database like Hustle.
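
As an illustration, the two DBs backing an indexed site_id column can be pictured as inverse mappings. This is only a conceptual sketch in plain Python; the real Marble stores these structures in LMDB and encodes the RID sets as bitmap indexes:

# conceptual layout of the column DB and index DB for 'site_id'
# (illustrative Python dicts; the actual store is LMDB)

# column DB: RID -> column value (a VID if the column is Trie-compressed)
site_id_column_db = {
    0: 'google.com',
    1: 'yahoo.com',
    2: 'google.com',
}

# index DB: column value -> set of RIDs containing that value
# (Hustle stores this set as a compact bitmap index)
site_id_index_db = {
    'google.com': {0, 2},
    'yahoo.com': {1},
}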

Parameters:
  • name (basestring) – the name of the Marble to create
  • fields – the schema specification of the columns (see the Hustle Schema Design Guide)
  • partition (basestring) – the column that will serve as the partition for this Marble
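
For example, a Marble for an impressions table might be built like this. The field-specification strings below are illustrative only; the exact schema syntax is covered in the Hustle Schema Design Guide:

from hustle.core.marble import Marble

# build a Marble partitioned by the 'date' column
# (field strings are placeholders; see the Hustle Schema Design Guide)
imps_marble = Marble(name='impressions',
                     fields=('wide index string site_id',
                             'index string date',
                             'uint32 amount'),
                     partition='date')
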
classmethod from_file(filename)

Instantiate a Marble from an LMDB file.

class hustle.core.marble.Column(name, table, index_indicator=0, partition=False, type_indicator=0, compression_indicator=0, rtrie_indicator=2, alias=None, boolean=False)

A Column is the named, typed field of a Marble. Columns are typically created automatically by parsing the fields of the Marble during instantiation.

The Column overrides Python’s relational operators > < <= >= ==, which form the basis of the Query DSL. All of these operators expect a Python literal as their second (right-hand side) argument, which should be the same type as the Column. These Column Expressions are represented by the Expr class.

Note that the Marble and Table classes expose their Columns as Python attributes:

# instantiate a table
imps = Table.from_tag('impressions')

# access the date column
date_column = imps.date

# create a Column Expression
site_column_expression = imps.site_id == 'google.com'

# create another Column Expression
date_column_expression = date_column > '2014-03-07'

# query
select(date_column, where=date_column_expression & site_column_expression)
named(alias)

Return a new Column that has an alias which will be used in the resulting schema.

Parameters:
  • alias (str) – the name of the alias
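
For example (a sketch that assumes the imps table from the example above):

# alias site_id so the result schema names the column 'site'
select(imps.site_id.named('site'), where=imps.date == '2014-03-07')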

schema_string()

Return the schema string for this column. This is used to build the schema of a query result, so we need to use the alias.

description()

Return a human-readable type description for this column.

class hustle.core.marble.Aggregation(name, column, f=None, g=<lambda>, h=<lambda>, default=<lambda>, is_numeric=None, is_binary=None)

An Aggregation is a Column Function that represents some aggregating computation over the values of that column. It is exclusively used in the project section of the select() function.

An Aggregation object holds onto four distinct function references which are called at specific times during the group_by stage of the query pipeline.

Parameters:
  • f (func(accumulator, value)) – the function called for every value of the column, returns a new accumulator (the MAP aggregator)
  • g (func(accumulator)) – the function called to produce the final value (the REDUCE aggregator)
  • h (func(accumulator)) – the function called to produce intermediate values (the COMBINE aggregator)
  • default (func()) – the function called to produce the initial aggregation accumulator
  • column (Column) – the column to aggregate
  • name (basestring) – the unique name of the aggregator. Used to assign a column name to the result

Note

Here is the actual implementation of the h_avg() Aggregation which will give us the average value for a numeric column in a select() statement:

def h_avg(col):
    return Aggregation("avg",
                       col,
                       f=lambda (accum, count), val: (accum + val, count + 1),
                       g=lambda (accum, count): float(accum) / count,
                       h=lambda (accum, count): (accum, count),
                       default=lambda: (0, 0))

First, look at the default() function, which returns the tuple (0, 0). This sets the accum and count values that we will be tracking both to zero. Next, let’s see what’s happening in the f() function. Note that it performs two computations: accum + val builds a running sum of the values, and count + 1 counts the total number of values. The difference between the g() and h() functions is when they are called. The h() function is used to summarize intermediate results; it should always return an accumulator that can be fed back into the f() function. The g() function is used at the very end of the computation to produce the final value returned to the client.
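
To see how these pieces fit together, here is a rough simulation of the call sequence the group_by stage performs with h_avg(). This is only a sketch, not Hustle’s actual pipeline code, and it assumes the four functions are reachable as attributes f, g, h and default on the Aggregation object:

avg = h_avg(imps.amount)

# MAP: fold every value into an accumulator, starting from default()
accum = avg.default()              # (0, 0)
for val in (10, 20, 30):
    accum = avg.f(accum, val)      # ends as (60, 3)

# COMBINE: h() emits an intermediate accumulator that f() can keep consuming
intermediate = avg.h(accum)        # (60, 3)

# REDUCE: g() turns the final accumulator into the value returned to the client
result = avg.g(intermediate)       # 20.0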

Note that most aggregations are much simpler than h_avg(). Consider the implementation of h_sum():

def h_sum(col):
    return Aggregation("sum",
                       col,
                       f=lambda accum, val: accum + val,
                       default=lambda: 0)

We don’t have to implement the h() or g() functions, as they simply default to functions that return the accumulator, which is normally sufficient.

See also

h_sum(), h_count(), h_avg()
Some of Hustle’s aggregation functions
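
A user-defined aggregation follows the same pattern. As a sketch (not one of Hustle’s built-ins), a maximum aggregator could look like this:

def h_max_example(col):
    # keep the largest value seen so far; g() and h() default to
    # returning the accumulator, so only f() and default are needed
    return Aggregation("max",
                       col,
                       f=lambda accum, val: val if accum is None or val > accum else accum,
                       default=lambda: None)
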
class hustle.core.marble.Expr(table, f=None, part_f=None, is_partition=False)

The Expr is returned by the overloaded relational operators of the Column class.

The Expr is a recursive class that can be composed of other Exprs, all connected with the logical & | ~ operators (and, or, not).

Each Expr instance must be aware of whether its sub-expressions have partitions or are partitions. This is because the & and | operators optimize expressions over partitioned columns differently. Consider the following query:

select(impressions.site_id, where=(impressions.date == '2014-02-20') & (impressions.amount > 10))

Let’s assume that the impressions.date column is a partition column. It should be clear that we can optimize this query by executing it only against the ‘2014-02-20’ partition, which, if we had many dates, would vastly improve our query execution.

On the other hand consider the following, almost identical query:

select(impressions.site_id, where=(impressions.date == '2014-02-20') | (impressions.amount > 10))

In this case, we cannot optimize according to our partition. We need to visit all partitions in impressions and execute the OR operation across those rows for the amount expression.
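
Expressions also compose with the ~ (not) operator. For example (a sketch using the same impressions table):

# negate one sub-expression and combine it with a partition expression
expr = (impressions.date == '2014-02-20') & ~(impressions.site_id == 'google.com')
select(impressions.site_id, where=expr)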

Note

It is important to realize that all Column Expressions in an Expr must refer to the same Table.

Parameters:
  • table (Table) – the Table this Expr queries
  • f (func(MarbleStream)) – the function to execute to actually perform the expression on data in the Marble
  • part_f (func(list of strings)) – the function to execute to perform the expression on data in the partition
  • is_partition (bool) – indicates if this Expr only has partition columns
hustle.core.marble.check_query(select, join, order_by, limit, wheres)

Query checker for Hustle. Validates the components of a query (the selected columns, joins, order_by, limit, and where clauses) before it is executed.