hustle – Hustle Distributed OLAP Database

The hustle module contains everything a client application typically needs to create Hustle Tables, insert data into them and query them.

Hustle Tables are stored in multiple LMDB memory-mapped files, which are replicated into Disco’s Distributed File System (DDFS). Queries are written in a Python DSL, which is dynamically translated into a Disco pipelined map/reduce job.

Hustle is ideal for low-latency OLAP queries over massive data sets.

class hustle.Table(name=None, fields=(), partition=None, **kwargs)

The fundamental data type supporting Hustle’s relational model. A Table contains a number of named Columns, each of which is decorated with schema information. Note that the table is stored in Disco’s DDFS distributed file system as a series of replicated sub-database files, encapsulated by LMDB memory-mapped B+ tree files called Marbles. Each Marble contains the data for the rows and columns of the Table.

Normally, a table is created using create(), which creates the appropriately named DDFS tag and attributes. To instantiate an existing Table (to use in a query, for example), the from_tag() method is used.

See the Hustle Schema Design Guide for a detailed look at Hustle’s schema design language and features.

classmethod from_tag(name, **kwargs)

Instantiate a named Table based on metadata from a DDFS tag.

Parameters: name (string) – the name of the table

classmethod create(name, columns=(), fields=(), partition=None, force=False, **kwargs)

Create a new Table, replacing any existing table with the same name if force=True.

Parameters:
  • name (string) – the name of the table to create
  • columns (sequence of string) – the list of columns and their extended index/type information
  • fields (sequence of string) – the list of columns and their encoded index/type information
  • partition (string) – the name of the column to act as the partition for this table
  • force (bool) – overwrite the existing DDFS base tag with this schema

If columns is set, the fields parameter is ignored.

Example:

from hustle import Table

pixels = Table.create('pixels',
                      columns=['index string token', 'index uint8 isActive', 'index site_id', 'uint32 amount',
                               'index int32 account_id', 'index city', 'index trie16 state', 'index int16 metro',
                               'string ip', 'lz4 keyword', 'index string date'],
                      partition='date',
                      force=True)

Warning

This function will not delete or update existing data in any way. If you use force=True to change the schema, make sure you either make the change backward compatible (by only adding new columns) or delete and reload your data.

See also

For a good example of creating a partitioned Hustle database, see the Hustle Integration Test Suite. For detailed schema design docs, look no further than the Hustle Schema Design Guide.

classmethod base_tag(name, partition=None)

Return the DDFS tag name for a given Hustle table name.

Parameters:
  • name (string) – the name of the table
  • partition (string) – the value of the partition
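
The shape of such a tag name can be pictured with the following minimal sketch. It is a hypothetical re-creation, not Hustle’s source. It assumes the tag starts with the hustle: prefix mentioned under get_tables(), followed by the table name. For a partition, it assumes a ':' separator and then the partition value. Check Table.base_tag() itself for the exact format.

```python
def base_tag(name, partition=None):
    """Hypothetical re-creation of Table.base_tag().

    Assumption: tags look like 'hustle:<name>' or 'hustle:<name>:<partition>'.
    """
    tag = "hustle:" + name
    if partition is not None:
        # Assumed separator between the table name and the partition value
        tag += ":" + str(partition)
    return tag


print(base_tag("pixels"))                # hustle:pixels
print(base_tag("pixels", "2014-01-27"))  # hustle:pixels:2014-01-27
```
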
hustle.insert(table, phile=None, streams=None, preprocess=None, maxsize=104857600, tmpdir='/tmp', decoder=None, lru_size=10000, **kwargs)

Insert data into a Hustle Table.

Create Marble file(s) from the input file or streams according to the schema of the table, then push the resulting file(s) into DDFS under the appropriate (possibly partitioned) DDFS tags.

Note that a call to insert() may create and push more than one file, depending on how many distinct partition values exist in the input. Be careful when inserting input that spans many partition values.

For a good example of inserting into a partitioned Hustle database, see Inserting Data To Hustle.

Parameters:
  • table (Table) – the table to perform the insert on
  • phile (string) – the file path to open
  • streams (sequence of iterable) – as an alternative to the phile argument, you can specify a list of generators as input
  • preprocess (function) –

    a function that accepts and returns a dict()

    The input is transformed into a dict by the decoder param, then the preprocess function is called for every record. This gives you the opportunity to transform, filter or otherwise clean your data before it is inserted into the Marble.

  • maxsize (int) –

    the initial size in bytes of the LMDB memory mapped file

    Note that the actual underlying LMDB file will grow as data is added to it - this setting is just for its initial size.

  • tmpdir (string) –

    the temporary directory to write the LMDB memory mapped file

    Note that choosing a directory on an SSD drive can significantly increase insert throughput.

  • decoder (function) –

    accepts a line of raw input and returns a dict

    The dict is expected to have keys that correspond to the column names in the table you are inserting to. There are two built-in decoders in Hustle: json_decoder() (default) and kv_decoder() for processing JSON and Disco chain input files, respectively.

  • lru_size (int) –

    the size in records of the LRU cache for holding bitmapped indexes

    You probably won’t have to worry about this unless you find your insert is running out of memory or is too slow when inserting gigantic files or on nodes with limited memory resources.
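
The decode, preprocess and partition steps described above can be simulated in plain Python to see how records flow. This sketch does not call Hustle. The helpers json_lines_decoder(), normalize() and group_by_partition() are hypothetical, and the sample records are made up. The sketch assumes that insert() produces one Marble per distinct partition value, which is why the input below would yield two files.

```python
import json
from collections import defaultdict


def json_lines_decoder(line):
    """Hypothetical decoder: one JSON object per input line -> dict."""
    return json.loads(line)


def normalize(record):
    """Example preprocess hook: accepts a dict and returns a (cleaned) dict."""
    record["keyword"] = record.get("keyword", "").strip().lower()
    return record


def group_by_partition(lines, partition, decoder=json_lines_decoder, preprocess=None):
    """Decode and preprocess each line, then bucket the records by partition value.

    Assumption: each bucket corresponds to one Marble file that insert() would push.
    """
    buckets = defaultdict(list)
    for line in lines:
        record = decoder(line)
        if preprocess is not None:
            record = preprocess(record)
        buckets[record[partition]].append(record)
    return dict(buckets)


raw = [
    '{"date": "2014-01-27", "amount": 10, "keyword": " Shoes "}',
    '{"date": "2014-01-27", "amount": 5, "keyword": "hats"}',
    '{"date": "2014-01-28", "amount": 7, "keyword": "Socks"}',
]
buckets = group_by_partition(raw, "date", preprocess=normalize)
print(sorted(buckets))                      # ['2014-01-27', '2014-01-28']
print(buckets["2014-01-27"][0]["keyword"])  # shoes
```
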

hustle.select(*project, **kwargs)

Perform a relational query, by selecting rows and columns from one or more tables.

The return value is one of:

* an iterator over the resulting tuples when nest==False
* a Table instance when nest==True
* None when nest==False and dump==True (this is the default CLI interaction)

For all of the examples below, imps and pix are instances of Table.

Parameters:
  • project (list of Column | Aggregation) –

    a positional argument list of columns and aggregate expressions to return in the result

    A simple projection:

    select(imps.ad_id, imps.date, imps.cpm_millis, where=imps)
    

    Selects three columns from the imps table.

    Hustle also provides aggregation functions such as h_sum(), h_count(), h_min(), h_max() and h_avg(), as in this example, which sums the imps.cpm_millis column:

    select(imps.ad_id, h_sum(imps.cpm_millis), h_count(), where=imps.date == '2014-01-27')
    

    Note that Hustle doesn’t have a group by clause. In this query, the output is implicitly grouped by the imps.ad_id column. In general, if an aggregation function is present in the project param, the query results are grouped by all of the non-aggregation columns present.

  • where ((optional) sequence of Table | Expr) –

    the Tables to fetch data from, as well as the conditions in the where clause

    This parameter serves two purposes: to specify the tables to be queried, and to select data under specific criteria using our Python DSL selection syntax, much like SQL’s where clause:

    # simple projection with restriction
    select(imps.ad_id, imps.date, imps.cpm_millis, where=imps.date == '2014-01-27')
    

    Note the == operation between the imps.date column and the date string. The Column class overrides all of Python’s comparison operators, which, along with the &, | and ~ logical operators, allow you to build arbitrarily complex column selection expressions like this:

    select(imps.ad_id, imps.date, imps.cpm_millis,
            where=((imps.date >= '2014-01-21') & (imps.date <= '2014-01-23')) |
                  ~(imps.site_id == 'google.com'))
    

    Note that for these expressions, the column must come first. This means that the following expression is illegal:

    select(imps.ad_id, imps.date, imps.cpm_millis, where='2014-01-27' == imps.date)
    

    The where clause also supports in and not in tests through the special operators << and >>, respectively:

    select(imps.ad_id, imps.date, imps.cpm_millis, where=imps.ad_id << [1000, 1005])
    select(imps.ad_id, imps.date, imps.cpm_millis, where=imps.ad_id >> [1000, 1005])
    

    Note that the right-hand operand of << and >> can be any iterable, as long as each element is itself a valid single right-hand value.

    In addition, multiple tables can be specified in the where clause like this:

    select(imps.ad_id, pix.amount, where=(imps.date < '2014-01-13', pix))
    

    which specifies a tuple containing an expression, imps.date < '2014-01-13', and a Table. This query will simply return all of the ad_id values in imps for dates before January 13th, followed by all of the amount values in the pix table.

    Specifying multiple tables is typically reserved for queries that use a join clause.

  • join (string | sequence of exactly length 2 of Column) –

    specifies the columns to perform a relational join operation on for the query

    The join columns can be specified either as a list of 2 Columns or as a list of 2 strings. If the two columns have the same name, a single string is valid as well.

    Here’s an example of a Hustle join:

    select(imps.ad_id, imps.site_id, h_sum(pix.amount), h_count(),
           where=(imps.date < '2014-01-13', pix.date < '2014-01-13'),
           join=(imps.site_id, pix.site_id))
    

    or equivalently:

    select(imps.ad_id, imps.site_id, h_sum(pix.amount), h_count(),
           where=(imps.date < '2014-01-13', pix.date < '2014-01-13'),
           join='site_id')
    

    which joins the imps and pix tables on their common site_id column, then returns the sum of the pix.amount column and a count, grouped by ad_id and site_id. The equivalent query in SQL is:

    select i.ad_id, i.site_id, sum(p.amount), count(*)
    from imps i
    join pix p on i.site_id = p.site_id
    where i.date < '2014-01-13' and p.date < '2014-01-13'
    group by i.ad_id, i.site_id
    
  • full_join (bool) – if True, joins the tables specified in the where clause as a full cross-product. Note that if both full_join and join are specified, join is ignored.
  • order_by (string | Column | int | (sequence of string | Column | int)) –

    the column(s) to sort the result by

    The sort columns can be specified either as a Column or a list of Columns. Alternatively, you can specify a column by using a string with either the name of the column or the table.column string notation. Furthermore, you can also represent the column using a zero based index of the projected columns. This last case would be used for Aggregations. Here are a few examples:

    select(imps.ad_id, imps.date, imps.cpm_millis, where=imps, order_by=imps.date)
    select(imps.ad_id, imps.date, imps.cpm_millis, where=imps, order_by=(imps.date, imps.ad_id))
    select(imps.ad_id, imps.date, imps.cpm_millis, where=imps, order_by='date')
    select(imps.ad_id, imps.date, imps.cpm_millis, where=imps, order_by='imps.date')
    select(imps.ad_id, imps.date, imps.cpm_millis, where=imps, order_by=('date', imps.ad_id))
    select(imps.ad_id, imps.date, imps.cpm_millis, where=imps, order_by=('date', 2))
    select(imps.ad_id, imps.date, h_sum(imps.cpm_millis), where=imps, order_by=2)
    
  • desc (boolean) – if True, sorts by the order_by column(s) in descending order (default ascending)
  • distinct (boolean) – indicates whether to remove duplicates in results
  • limit (int) – limits the total number of records in the output
  • block (boolean) – makes the select call either blocking (default) or non-blocking. If False, select() returns a Future object
  • nest (boolean (default = False)) –

    specify that the return value is a Table to be used in another query

    This allows us to build nested queries. You may want to do this to join more than two tables, or to reuse the results of a query in more than one subsequent query. For example:

    active_pix = select(*star(pix), where=pix.isActive > 0, nest=True)
    select(h_sum(active_pix.amount), where=active_pix)
    
  • kwargs (dict) – custom settings for this query; see hustle.core.settings

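
Column comparisons can be combined with &, | and ~ because they do not evaluate to plain booleans. The following toy model of such a DSL shows how that works. Col and Expr are hypothetical classes, not Hustle’s Column and Expr. Each operator returns a predicate object that is evaluated later against a row, represented here as an in-memory dict. Hustle evaluates its expressions inside Disco jobs, which this sketch does not attempt to model.

```python
import operator


class Expr:
    """A deferred predicate over a row (a dict) that composes with &, | and ~."""

    def __init__(self, fn):
        self.fn = fn

    def __call__(self, row):
        return self.fn(row)

    def __and__(self, other):
        return Expr(lambda row: self(row) and other(row))

    def __or__(self, other):
        return Expr(lambda row: self(row) or other(row))

    def __invert__(self):
        return Expr(lambda row: not self(row))


class Col:
    """Hypothetical column: comparison operators build Expr objects, not bools."""

    def __init__(self, name):
        self.name = name

    def _compare(self, op, value):
        return Expr(lambda row: op(row[self.name], value))

    def __eq__(self, value):
        return self._compare(operator.eq, value)

    def __ne__(self, value):
        return self._compare(operator.ne, value)

    def __lt__(self, value):
        return self._compare(operator.lt, value)

    def __le__(self, value):
        return self._compare(operator.le, value)

    def __gt__(self, value):
        return self._compare(operator.gt, value)

    def __ge__(self, value):
        return self._compare(operator.ge, value)

    def __lshift__(self, values):
        # "in": materialize once so a generator can be the right-hand operand
        allowed = list(values)
        return Expr(lambda row: row[self.name] in allowed)

    def __rshift__(self, values):
        # "not in"
        excluded = list(values)
        return Expr(lambda row: row[self.name] not in excluded)


ad_id, date, site_id = Col("ad_id"), Col("date"), Col("site_id")
rows = [
    {"ad_id": 1000, "date": "2014-01-22", "site_id": "google.com"},
    {"ad_id": 1001, "date": "2014-01-25", "site_id": "example.org"},
    {"ad_id": 1005, "date": "2014-01-25", "site_id": "google.com"},
]

where = ((date >= "2014-01-21") & (date <= "2014-01-23")) | ~(site_id == "google.com")
in_range = [r["ad_id"] for r in rows if where(r)]
in_list = [r["ad_id"] for r in rows if (ad_id << [1000, 1005])(r)]
not_in_list = [r["ad_id"] for r in rows if (ad_id >> [1000, 1005])(r)]
print(in_range, in_list, not_in_list)  # [1000, 1001] [1000, 1005] [1001]
```

Python’s and, or and not keywords cannot be overloaded, so &, | and ~ are used to compose conditions. Because & and | bind more tightly than comparison operators, each comparison has to be wrapped in parentheses, as in the where examples above.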
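
The join example can be checked against a small in-memory version. This sketch uses a hash join on site_id. That is just one way to compute an inner equi-join and is not necessarily how Hustle’s map/reduce pipeline performs it. The imps and pix rows and the join_sum_count() helper are hypothetical. The sketch applies each where condition to its own table and joins on site_id. It then groups by the non-aggregated columns ad_id and site_id, matching the SQL above.

```python
from collections import defaultdict

# Hypothetical sample rows for the two tables.
imps = [
    {"ad_id": 1, "site_id": "a.com", "date": "2014-01-10"},
    {"ad_id": 2, "site_id": "b.com", "date": "2014-01-11"},
    {"ad_id": 1, "site_id": "a.com", "date": "2014-01-20"},  # fails the date filter
]
pix = [
    {"site_id": "a.com", "amount": 3, "date": "2014-01-12"},
    {"site_id": "a.com", "amount": 4, "date": "2014-01-12"},
    {"site_id": "b.com", "amount": 5, "date": "2014-01-15"},  # fails the date filter
]


def join_sum_count(imps, pix, cutoff):
    """Inner join imps and pix on site_id; return (ad_id, site_id, sum(amount), count) rows."""
    # Apply each table's where condition before joining.
    left = [i for i in imps if i["date"] < cutoff]
    right = [p for p in pix if p["date"] < cutoff]

    # Build phase: index the filtered pix rows by the join key.
    pix_by_site = defaultdict(list)
    for p in right:
        pix_by_site[p["site_id"]].append(p)

    # Probe phase: aggregate every joined pair into its (ad_id, site_id) group.
    groups = defaultdict(lambda: [0, 0])  # (ad_id, site_id) -> [amount sum, row count]
    for i in left:
        for p in pix_by_site.get(i["site_id"], []):
            acc = groups[(i["ad_id"], i["site_id"])]
            acc[0] += p["amount"]
            acc[1] += 1

    return sorted((ad, site, total, count) for (ad, site), (total, count) in groups.items())


result = join_sum_count(imps, pix, "2014-01-13")
print(result)  # [(1, 'a.com', 7, 2)]
```

The b.com impression drops out because its only pixel fails the date filter, just as it would in an SQL inner join.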
hustle.h_sum(col)

Return an aggregation for the sum of the given column, like the SQL sum() function. This is used in hustle.select() calls to specify the sum aggregation over a column in a query:

select(h_sum(employee.salary), employee.department, where=employee.age > 25)

returns the total salary of employees over 25 years old in each department.

Parameters: col (hustle.core.marble.Column) – the column to aggregate

hustle.h_count()

Return an aggregation for the count of each grouped key in a query, like the SQL count() function:

select(h_count(), employee.department, where=employee)

returns a count of the number of employees in each department.

hustle.h_max(col)

Return an aggregation for the maximum of the given column. Like the SQL max() function:

select(h_max(employee.salary), employee.department, where=employee)

returns the highest salary for each department.

Parameters: col (hustle.core.marble.Column) – the column to aggregate

hustle.h_min(col)

Return an aggregation for the minimum of the given column. Like the SQL min() function:

select(h_min(employee.salary), employee.department, where=employee)

returns the lowest salary in each department.

Parameters: col (hustle.core.marble.Column) – the column to aggregate

hustle.h_avg(col)

Return an aggregation for the average of the given column. Like the SQL avg() function:

select(h_avg(employee.salary), employee.department, where=employee)

returns the average salary in each department.

Parameters: col (hustle.core.marble.Column) – the column to aggregate

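
For reference, the following plain-Python sketch computes what the five aggregation examples above return. Like select(), it groups implicitly by the one non-aggregated column, department. The employee rows and the aggregate_by() helper are hypothetical. The sketch says nothing about how Hustle evaluates aggregations in its map/reduce jobs.

```python
from collections import defaultdict

# Hypothetical employee rows.
employees = [
    {"name": "ann", "department": "Finance", "salary": 100, "age": 30},
    {"name": "bob", "department": "Finance", "salary": 80, "age": 24},
    {"name": "cat", "department": "Sales", "salary": 60, "age": 41},
    {"name": "dan", "department": "Sales", "salary": 90, "age": 35},
]


def aggregate_by(rows, key, value):
    """Group rows by `key` and compute sum/count/max/min/avg of `value` per group."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row[value])
    return {
        k: {"sum": sum(v), "count": len(v), "max": max(v), "min": min(v), "avg": sum(v) / len(v)}
        for k, v in groups.items()
    }


# Like h_sum/h_count/h_max/h_min/h_avg(employee.salary) grouped by employee.department
stats = aggregate_by(employees, "department", "salary")
print(stats["Finance"])  # {'sum': 180, 'count': 2, 'max': 100, 'min': 80, 'avg': 90.0}

# Like the h_sum() example with where=employee.age > 25
over_25 = aggregate_by([e for e in employees if e["age"] > 25], "department", "salary")
print(over_25["Finance"]["sum"])  # 100
```
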
hustle.star(table)

Return the list of all columns in a table. This is used much like the * notation in SQL:

select(*star(employee), where=employee.department == 'Finance')

returns all of the columns from the employee table for the Finance department.

Parameters: table – the table to extract the column names from

hustle.cat(result, width=80)

Pretty print the results of a query or table.

Parameters:
  • result (iterator over tuples) – result of a query or a table
  • width (int) – the number of columns to constrain output to
hustle.get_tables(**kwargs)

Return the visible Hustle tables in the currently configured DDFS server. Hustle finds tables by looking for DDFS tags that have a hustle: prefix.

Parameters: kwargs (dict) – custom settings for this query; see hustle.core.settings

hustle.tables(**kwargs)

Print all available tables.

Parameters: kwargs (dict) – custom settings for this query; see hustle.core.settings

hustle.schema(tab, index_only=False, **kwargs)

Print the schema for a given table.

Parameters: kwargs (dict) – custom settings for this query; see hustle.core.settings

hustle.get_partitions(table, **kwargs)

Get partitions for a given table.

Parameters: kwargs (dict) – custom settings for this query; see hustle.core.settings

hustle.partitions(table, **kwargs)

Print the partitions for a given table.

Parameters: kwargs (dict) – custom settings for this query; see hustle.core.settings

hustle.delete(table_or_expr, **kwargs)

Delete the data and partitions of a given table, but keep the table definition.

Parameters:
  • table_or_expr (Table | Expr) – A table object or an expression with only a partition column
  • kwargs (dict) – custom settings for this query; see hustle.core.settings

Warning

Given a table object, all partitions will be deleted. Use a Hustle expression to delete a specific range of partitions, e.g. impression.date < '2014-01-01'.
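
A partition expression passed to delete() selects whole partitions, not individual rows. The sketch below shows which date partitions the expression above would match. The partitions_to_delete() helper and the partition values are hypothetical. The sketch relies on ISO-8601 date strings (YYYY-MM-DD) sorting correctly as plain strings.

```python
def partitions_to_delete(partition_values, predicate):
    """Return, in sorted order, the partition values matched by predicate."""
    return sorted(value for value in partition_values if predicate(value))


# Hypothetical date partitions of an impression table.
dates = ["2014-01-02", "2013-12-30", "2014-01-01", "2013-12-31"]

# Stand-in for the Hustle expression impression.date < '2014-01-01';
# ISO-8601 date strings compare correctly as plain strings.
doomed = partitions_to_delete(dates, lambda d: d < "2014-01-01")
print(doomed)  # ['2013-12-30', '2013-12-31']
```
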

hustle.drop(table, **kwargs)

Drop all data, partitions, and table definition for a given table.

Parameters:
  • table (Table) – a table object
  • kwargs (dict) – custom settings for this query; see hustle.core.settings

class hustle.Future(name, job, disco, nest, *project)

Return value of the non-blocking function select_nb().

It provides methods to check the query’s status and fetch its results.

status()

Return the status of the attached query.

The status of a query is one of the following strings:
  • ‘dead’: the query is dead
  • ‘active’: the query is still being executed
  • ‘ready’: the query is finished and all results are available
  • ‘unknown’: the status is unknown, e.g. the connection to the server is busy

wait()

Block until the query finishes. If the query is already finished, return the result immediately.

The return value is the same as for select(): if the query is nested, it returns a Table; otherwise, a list of URLs.

done

A helper that shows whether the query is done.

table

A helper that returns the Table result of a nested query.

If the query is not nested or is not finished, an exception is raised.
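
Because wait() blocks, a caller that needs a timeout, or wants to do other work in the meantime, can poll status() instead. The sketch below relies only on the status() and wait() methods and the status strings documented above. wait_until_ready() and StubFuture are hypothetical. The stub exists solely so the example runs without a Disco cluster. With Hustle, the future would come from a non-blocking select() call (block=False).

```python
import time


def wait_until_ready(future, poll_interval=1.0, timeout=None):
    """Poll future.status() until it is 'ready', then return future.wait().

    'active' and 'unknown' keep polling; 'dead' raises RuntimeError, and an
    expired timeout (in seconds) raises TimeoutError.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        state = future.status()
        if state == "ready":
            return future.wait()
        if state == "dead":
            raise RuntimeError("query died")
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError("query still %r after %s seconds" % (state, timeout))
        time.sleep(poll_interval)


class StubFuture:
    """Hypothetical stand-in for hustle.Future, used only to exercise the helper."""

    def __init__(self, states, result):
        self._states = list(states)
        self._result = result

    def status(self):
        # Report the next scripted state, then keep repeating the last one.
        if len(self._states) > 1:
            return self._states.pop(0)
        return self._states[0]

    def wait(self):
        return self._result


# With a real setup: future = select(imps.ad_id, where=imps, block=False)
print(wait_until_ready(StubFuture(["active", "unknown", "ready"], ["url1"]), poll_interval=0))  # ['url1']
```
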