Saturday, January 18, 2014

Indexing in Cassandra to Support Events/Activity Search in WSO2 BAM

The latest WSO2 BAM release (WSO2 BAM 2.4.0) supports events and activity search functionality. Here, I'm going to discuss the Cassandra indexing techniques we considered in order to facilitate our new WSO2 BAM big data search feature.

Indexing is essential to support the events and activity search functionality. Since we store data in Cassandra column families, the columns on which search operations are to be performed have to be indexed. First, we thought of using projects like Solandra/Lucandra to implement our big data search feature. But since those projects are no longer active, and big data search is somewhat beyond the scope of WSO2 BAM, we agreed to provide limited search capabilities using Cassandra's built-in secondary indexes. However, after we finalized the basic requirements, we felt that secondary indexes alone might not be enough to support them.

The following were identified as the basic requirements of our search feature.
  1. Equality comparisons on indexed properties.
ex: bam_mediation_stats_data_publisher.host = '192.168.2.8'
  2. Inequality comparisons (greater-than/less-than operations).
ex: bam_mediation_stats_data_publisher.max_processing_time > 1000
  3. AND operation support.
ex: bam_mediation_stats_data_publisher.max_processing_time > 1000 AND bam_mediation_stats_data_publisher.host = '192.168.2.8'
  4. OR operation support.
ex: bam_mediation_stats_data_publisher.host = '192.168.2.8' OR bam_mediation_stats_data_publisher.host = '192.168.2.7'
  5. Getting search results within a specified time interval (understood to be the most common kind of search query).
ex: bam_mediation_stats_data_publisher.timestamp > start_time AND timestamp < end_time
  6. Pagination support.

Even though we initially thought of using secondary indexes, they have a lot of limitations with respect to the above requirements. Therefore, I looked into some other Cassandra custom indexing techniques as well. The details of the indexing models I came up with are given below.

1. Native Secondary Indexes

Secondary indexes in Cassandra refer to indexes on column values. Cassandra implements secondary indexes as a hidden table, separate from the table that contains the values being indexed.

Pros
  • The row and index updates are one atomic operation.
  • When we create a secondary index on an existing column, it indexes the existing data in the background (without blocking reads or writes).
  • Querying can be done easily using the IndexedSlicesQuery provided by Hector.
  • Pagination can also be done fairly easily (setRowCount, get a batch, then setStartKey to the last key of the previous batch, and get the next batch; see the sketch after this list). But if OR operations have to be performed, things get complex, because only AND is supported in queries.
  • A query can include operations on non-indexed properties as well (it requires at least one equality comparison on an indexed property; the other operations are performed in memory).
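
As a rough illustration, here is a minimal Hector sketch of such a paginated query. The fetchPage helper and the choice of meta_host as the indexed column are my own assumptions for illustration; only the column family name comes from this post.

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.OrderedRows;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.IndexedSlicesQuery;

// Fetch one page of rows where meta_host = '192.168.2.8'. Pass startKey = ""
// for the first page; for later pages pass the last key of the previous page
// (available via OrderedRows.peekLast().getKey()) and drop the first row,
// because the start key is inclusive.
public static OrderedRows<String, String, String> fetchPage(Keyspace keyspace,
                                                            String startKey,
                                                            int pageSize) {
    StringSerializer s = StringSerializer.get();
    IndexedSlicesQuery<String, String, String> query =
            HFactory.createIndexedSlicesQuery(keyspace, s, s, s);
    query.setColumnFamily("bam_mediation_stats_data_publisher");
    query.addEqualsExpression("meta_host", "192.168.2.8"); // mandatory equality clause
    query.setRange("", "", false, 100); // up to 100 columns per returned row
    query.setRowCount(pageSize);
    query.setStartKey(startKey);
    return query.execute().get();
}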

Cons
  • Not recommended for high-cardinality values (e.g. timestamps, message IDs, keywords).
  • Requires at least one equality comparison in a query.
  • Unsorted - results are in token order, not query-value order.
  • Not great for querying a huge volume of records for a small number of results.
  • Not great for less-than/greater-than/range queries.
  • Range operations are performed in memory by the coordinator node.

2. Keep a separate index column family per indexed column of the primary column family - a skinny row model

In this model, a separate index column family is created per indexed column. The values of the indexed column (of the primary CF) become the row keys of the new index CF. The column keys of those index rows are composite keys created from the timestamp and the original row key (the comparator type should be DynamicCompositeType). The column value is the original row key. There is also a special row in the index CF which keeps all the row keys of the index CF as its column keys (we need this in order to support GT/LT operations on the data). The diagram below depicts how the 'host' column is indexed using this model.


  1. Read an event from the primary column family (basically a row of the primary column family).
  2. If the event contains a 'host' (indexed column) value, that value becomes the row key of the index CF, and the column key for that row is a composite of the timestamp and the original row key.
  3. Insert the new row into the index CF (if the row key already exists, add the new column key to the existing row).
  4. Add the newly added row key as a column key to indexRow, as sketched below.
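
A minimal Hector sketch of this write path follows. The index CF name host_index_cf and the indexHostValue helper are hypothetical; the index CF is assumed to use DynamicCompositeType as its comparator, as described above.

import me.prettyprint.cassandra.serializers.DynamicCompositeSerializer;
import me.prettyprint.cassandra.serializers.LongSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.DynamicComposite;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;

// Index one event: hostValue is the indexed column value, originalRowKey is
// the row key of the primary CF.
public static void indexHostValue(Keyspace keyspace, String hostValue,
                                  long timestamp, String originalRowKey) {
    StringSerializer s = StringSerializer.get();
    DynamicCompositeSerializer dcs = new DynamicCompositeSerializer();
    Mutator<String> mutator = HFactory.createMutator(keyspace, s);

    // Steps 2 and 3: column key = timestamp + original row key.
    DynamicComposite colKey = new DynamicComposite();
    colKey.addComponent(timestamp, LongSerializer.get());
    colKey.addComponent(originalRowKey, s);
    mutator.addInsertion(hostValue, "host_index_cf",
            HFactory.createColumn(colKey, originalRowKey, dcs, s));

    // Step 4: register the new index row key in the special indexRow,
    // wrapped in a one-component composite to satisfy the comparator.
    DynamicComposite indexRowCol = new DynamicComposite();
    indexRowCol.addComponent(hostValue, s);
    mutator.addInsertion("indexRow", "host_index_cf",
            HFactory.createColumn(indexRowCol, "", dcs, s));

    mutator.execute();
}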

Let's see how the search can be performed in the above model.

Scenario 1
Search query --> 'bam_mediation_stats_data_publisher.host = '192.168.2.7' AND bam_mediation_stats_data_publisher.timestamp > start_time AND timestamp < end_time'

  1. To support the above search we need an index CF created for the 'host' column, as explained above.
  2. Since this is an equality comparison, we can do a range query on the row (in the index CF) whose key is '192.168.2.7' and get the original row keys within the given time range (the columns are sorted by timestamp, so a range query returns the row keys between the given time interval). A sketch of this lookup is given after this list.
  3. Perform another lookup in the primary column family and get the actual rows using the row keys fetched above.
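
A sketch of step 2 with Hector's SliceQuery (host_index_cf and the helper name are hypothetical, as before). GREATER_THAN_EQUAL on the end composite is assumed to make the end bound cover every (end_time, rowkey) column.

import java.util.ArrayList;
import java.util.List;
import me.prettyprint.cassandra.serializers.DynamicCompositeSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.AbstractComposite.ComponentEquality;
import me.prettyprint.hector.api.beans.DynamicComposite;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.SliceQuery;

// Slice the host row (e.g. '192.168.2.7') of the index CF between start_time
// and end_time; the returned column values are primary-CF row keys.
public static List<String> hostRowKeysInRange(Keyspace keyspace, String host,
                                              long startTime, long endTime) {
    StringSerializer s = StringSerializer.get();
    SliceQuery<String, DynamicComposite, String> q = HFactory.createSliceQuery(
            keyspace, s, new DynamicCompositeSerializer(), s);
    q.setColumnFamily("host_index_cf");
    q.setKey(host);

    // Columns are (timestamp, rowKey) composites sorted by timestamp.
    DynamicComposite start = new DynamicComposite();
    start.addComponent(0, startTime, ComponentEquality.EQUAL);
    DynamicComposite finish = new DynamicComposite();
    finish.addComponent(0, endTime, ComponentEquality.GREATER_THAN_EQUAL);
    q.setRange(start, finish, false, 1000); // up to 1000 matches per call

    List<String> rowKeys = new ArrayList<String>();
    for (HColumn<DynamicComposite, String> col : q.execute().get().getColumns()) {
        rowKeys.add(col.getValue());
    }
    return rowKeys;
}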

Scenario 2
Search query --> 'bam_mediation_stats_data_publisher.count > 5 AND count < 10 AND bam_mediation_stats_data_publisher.timestamp > start_time AND timestamp < end_time'

  1. To support the above query we need an index CF created for the 'count' column, as explained above.
  2. Since this is an inequality comparison, we first need to perform a range query on our special indexRow (in the index CF) and get the index row keys between 5 and 10 (the column keys are sorted). See the sketch after this list.
  3. Then we need to perform range queries on the individual resulting index row keys (similar to step 2 in scenario 1).
  4. Perform another lookup in the primary column family and get the actual rows using the row keys fetched above.
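
A sketch of step 2, slicing the special indexRow (count_index_cf is a hypothetical index CF for 'count'; imports are as in the previous sketch, plus LongSerializer). Note that the slice bounds here are inclusive, so strict > / < semantics would need the boundary values filtered out.

// Find the index row keys (count values) between low and high by slicing
// the special indexRow, which holds every index row key as a column key.
public static List<Long> countIndexRowKeys(Keyspace keyspace, long low, long high) {
    StringSerializer s = StringSerializer.get();
    SliceQuery<String, DynamicComposite, String> q = HFactory.createSliceQuery(
            keyspace, s, new DynamicCompositeSerializer(), s);
    q.setColumnFamily("count_index_cf");
    q.setKey("indexRow"); // special row holding all index row keys

    DynamicComposite start = new DynamicComposite();
    start.addComponent(0, low, ComponentEquality.EQUAL);
    DynamicComposite finish = new DynamicComposite();
    finish.addComponent(0, high, ComponentEquality.GREATER_THAN_EQUAL);
    q.setRange(start, finish, false, 1000);

    List<Long> indexRowKeys = new ArrayList<Long>();
    for (HColumn<DynamicComposite, String> col : q.execute().get().getColumns()) {
        indexRowKeys.add(col.getName().get(0, LongSerializer.get()));
    }
    return indexRowKeys; // e.g. {6, 8}; slice each of these rows as in scenario 1
}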

Scenario 3
Search query --> 'bam_mediation_stats_data_publisher.host = '192.168.2.7' AND bam_mediation_stats_data_publisher.count > 5 AND count < 10 AND bam_mediation_stats_data_publisher.timestamp > start_time AND timestamp < end_time'

  1. Here we have to follow the steps described in both of the above scenarios and get the intersection of the two result sets, as sketched below.
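
In code, the intersection itself is just a set operation over the row keys returned by the two lookups (a sketch; the input lists are assumed to come from the scenario 1 and scenario 2 helpers above).

import java.util.HashSet;
import java.util.List;
import java.util.Set;

// hostMatches and countMatches are the primary-CF row key lists produced by
// the scenario 1 and scenario 2 lookups respectively.
public static Set<String> intersect(List<String> hostMatches, List<String> countMatches) {
    Set<String> result = new HashSet<String>(hostMatches);
    result.retainAll(countMatches); // keep only keys satisfying both predicates
    return result;
}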

Pros
  • Can overcome the limitations of Cassandra's native secondary indexes (basically, this model overcomes all the cons mentioned above for native secondary indexes).

Cons
  • Index CF inserts have to be done manually.
  • It is difficult to index existing data (in case we create the index on CFs that already contain data).
  • Index data is spread over many Cassandra nodes. The cluster needs to fetch it all from different nodes and merge it, which has a cost.
  • Pagination can be done only up to a point. It can be supported for queries like those in scenarios 1 and 2 above (by providing the last row key of the previous page and the page size), but it has to be implemented manually for queries similar to scenario 3 (there we have to fetch all the results, or a limited result set, into memory or an RDBMS, and support pagination ourselves).

3. Keep a separate index column family per indexed column of the primary column family - a wide row model

In this model, one row keeps all the index data as its column keys (one row can hold up to 2 billion columns). A composite key (DynamicComposite) is created from the value of the indexed column, the timestamp, and the row key of the primary CF. These composite keys are inserted as the column keys of the index row of the index CF (composite columns are ordered first by their first component, then by their second component, and so on). The column value is the original row key. There is also a special row in the index CF which keeps all the unique index values (we need this in order to support GT/LT operations on the data). The diagram below depicts how the 'host' column is indexed using this model.


  1. Read an event from the primary column family (basically a row of the primary column family).
  2. If the event contains a host (indexed column) value, create a composite key (DynamicComposite) using the host value, the timestamp, and the row key.
  3. Add that column key to the index row ('INDEX_ROW') in the index CF. Also add the host value as a column key to the special INDEX_VALUE_ROW. A sketch of this write path is given below.
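
A minimal Hector sketch of this write path (host_index_cf and the helper name are hypothetical; INDEX_ROW and INDEX_VALUE_ROW are the special rows described above).

import me.prettyprint.cassandra.serializers.DynamicCompositeSerializer;
import me.prettyprint.cassandra.serializers.LongSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.DynamicComposite;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;

// Wide row variant: everything goes into the single INDEX_ROW of the index CF.
public static void indexHostValueWideRow(Keyspace keyspace, String hostValue,
                                         long timestamp, String originalRowKey) {
    StringSerializer s = StringSerializer.get();
    DynamicCompositeSerializer dcs = new DynamicCompositeSerializer();
    Mutator<String> mutator = HFactory.createMutator(keyspace, s);

    // Step 2: column key = indexed value + timestamp + primary row key.
    DynamicComposite colKey = new DynamicComposite();
    colKey.addComponent(hostValue, s);
    colKey.addComponent(timestamp, LongSerializer.get());
    colKey.addComponent(originalRowKey, s);
    mutator.addInsertion("INDEX_ROW", "host_index_cf",
            HFactory.createColumn(colKey, originalRowKey, dcs, s));

    // Step 3: track the unique index value in the special INDEX_VALUE_ROW.
    DynamicComposite valueCol = new DynamicComposite();
    valueCol.addComponent(hostValue, s);
    mutator.addInsertion("INDEX_VALUE_ROW", "host_index_cf",
            HFactory.createColumn(valueCol, "", dcs, s));

    mutator.execute();
}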

Let's see how the search can be performed with this model.

Scenario 1
Search query --> 'bam_mediation_stats_data_publisher.host = '192.168.2.7' AND bam_mediation_stats_data_publisher.timestamp > start_time AND timestamp < end_time'

  1. Results are retrieved by performing a range query on the index row (INDEX_ROW). Here the column keys are composite keys. Since this is an equality comparison, the first component of the composite is fixed and we can filter by the second component (the timestamp). For example, range start = 192.168.2.7:start_time and range end = 192.168.2.7:end_time. A sketch of this lookup is given after this list.
  2. Perform another lookup in the primary column family and get the actual rows using the row keys fetched above.
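
A sketch of step 1 (same hypothetical names as in the earlier sketches). Note how both the start and end composites fix the host as the first component, while the timestamp component bounds the slice.

import java.util.ArrayList;
import java.util.List;
import me.prettyprint.cassandra.serializers.DynamicCompositeSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.AbstractComposite.ComponentEquality;
import me.prettyprint.hector.api.beans.DynamicComposite;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.SliceQuery;

// Equality lookup on the wide index row: first component (host) fixed,
// second component (timestamp) bounded.
public static List<String> wideRowHostLookup(Keyspace keyspace, String host,
                                             long startTime, long endTime) {
    StringSerializer s = StringSerializer.get();
    SliceQuery<String, DynamicComposite, String> q = HFactory.createSliceQuery(
            keyspace, s, new DynamicCompositeSerializer(), s);
    q.setColumnFamily("host_index_cf");
    q.setKey("INDEX_ROW");

    DynamicComposite start = new DynamicComposite();
    start.addComponent(0, host, ComponentEquality.EQUAL);
    start.addComponent(1, startTime, ComponentEquality.EQUAL);
    DynamicComposite finish = new DynamicComposite();
    finish.addComponent(0, host, ComponentEquality.EQUAL);
    finish.addComponent(1, endTime, ComponentEquality.GREATER_THAN_EQUAL);
    q.setRange(start, finish, false, 1000);

    List<String> rowKeys = new ArrayList<String>();
    for (HColumn<DynamicComposite, String> col : q.execute().get().getColumns()) {
        rowKeys.add(col.getValue());
    }
    return rowKeys;
}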

Scenario 2
Search query --> 'bam_mediation_stats_data_publisher.count > 5 AND count < 10 AND bam_mediation_stats_data_publisher.timestamp > start_time AND timestamp < end_time'

  1. Since this is an inequality comparison, we first need to perform a range query on the special INDEX_VALUE_ROW (in the index CF) and get the index values between 5 and 10.
  2. Then we need to perform range queries on the individual resulting index values (similar to step 1 in scenario 1). For example, let's assume that the index values retrieved in step 1 are {6, 8}. Then two range queries need to be performed on INDEX_ROW, as sketched after this list:
    i.  range start = 6:start_time and range end = 6:end_time
    ii. range start = 8:start_time and range end = 8:end_time
  3. Perform another lookup in the primary column family and get the actual rows using the row keys fetched above.
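
A sketch of step 2, performing one composite range query per index value retrieved in step 1 (count_index_cf is hypothetical; imports are as in the previous sketch).

// One slice of INDEX_ROW per index value (e.g. 6 and 8) from INDEX_VALUE_ROW.
public static List<String> wideRowCountLookup(Keyspace keyspace, List<Long> countValues,
                                              long startTime, long endTime) {
    StringSerializer s = StringSerializer.get();
    DynamicCompositeSerializer dcs = new DynamicCompositeSerializer();
    List<String> rowKeys = new ArrayList<String>();

    for (Long count : countValues) {
        SliceQuery<String, DynamicComposite, String> q =
                HFactory.createSliceQuery(keyspace, s, dcs, s);
        q.setColumnFamily("count_index_cf");
        q.setKey("INDEX_ROW");

        DynamicComposite start = new DynamicComposite();
        start.addComponent(0, count, ComponentEquality.EQUAL);
        start.addComponent(1, startTime, ComponentEquality.EQUAL);
        DynamicComposite finish = new DynamicComposite();
        finish.addComponent(0, count, ComponentEquality.EQUAL);
        finish.addComponent(1, endTime, ComponentEquality.GREATER_THAN_EQUAL);
        q.setRange(start, finish, false, 1000);

        for (HColumn<DynamicComposite, String> col : q.execute().get().getColumns()) {
            rowKeys.add(col.getValue());
        }
    }
    return rowKeys;
}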

Scenario 3
Search query --> 'bam_mediation_stats_data_publisher.host = '192.168.2.7' AND bam_mediation_stats_data_publisher.count > 5 AND count < 10 AND bam_mediation_stats_data_publisher.timestamp > start_time AND timestamp < end_time'

  1. Here we have to follow the steps described in both of the above scenarios and get the intersection of the two result sets, exactly as in scenario 3 of the skinny row model.

Pros
  • Can overcome the limitations of Cassandra's native secondary indexes.
  • All columns of a row are stored on the same node, in the same data block, and sorted on disk, so accessing and scanning these columns is extremely fast.

Cons
  • Index CF inserts have to be done manually.
  • It is difficult to index existing data (in case we create the index on CFs that already contain data).
  • Pagination can be done only up to a point. It can be supported for queries like those in scenarios 1 and 2 above (by providing the last row key of the previous page and the page size), but it has to be implemented manually for queries similar to scenario 3 (there we have to fetch all the results, or a limited result set, into memory or an RDBMS, and support pagination ourselves).

You can see that we can overcome most of the limitations of native secondary indexes by using the other two custom indexing models. But index insertion still has to be done manually, and it is very difficult to create an index for a CF that already contains data. Pagination support is also a concern.

Considering all the pros and cons, we finally chose indexing model 3 described above (the wide row model) to facilitate our Cassandra data search feature. Arguably, it is the best of the models described.


References
http://www.datastax.com/docs/1.1/ddl/indexes
http://chamibuddhika.wordpress.com/2011/11/27/cassandra-lessons-learnt/

Connecting to Cassandra Keyspaces Created by WSO2 BAM using CQL3 Commands

Here, I'm going to explain how to connect to Cassandra keyspaces created by WSO2 BAM using CQL3 commands.

Most people are able to connect to Cassandra keyspaces using CQL2 commands, but fail when using CQL3 commands. That is because, in CQL3, names are case-insensitive by default, while they were case-sensitive in CQL2. So you have to force whatever case you want in CQL3 by using double quotes.


You will be able to connect to Cassandra keyspaces by using the following sequence of commands.


..:/apache-cassandra-1.2.4/bin$ ./cqlsh -3 localhost 9160 -u admin -p admin
Connected to Test Cluster at localhost:9160.
[cqlsh 2.3.0 | Cassandra 1.1.3 | CQL spec 3.0.0 | Thrift protocol 19.32.0]
Use HELP for help.
cqlsh> use EVENT_KS;
cqlsh:EVENT_KS> select * from bam_mediation_stats_data_publisher;
Bad Request: Keyspace event_ks does not exist
Perhaps you meant to use CQL 2? Try using the -2 option when starting cqlsh.
cqlsh:EVENT_KS> USE "EVENT_KS";
cqlsh:EVENT_KS> select * from bam_mediation_stats_data_publisher;

key                                    | Description                      | Name                               | Nick_Name               | StreamId                                 | Timestamp     | Version | meta_host | payload_avg_processing_time | payload_count | payload_direction | payload_fault_count | payload_max_processing_time | payload_min_processing_time | payload_resource_id                 | payload_stats_type | payload_timestamp
----------------------------------------+----------------------------------+------------------------------------+-------------------------+------------------------------------------+---------------+---------+-----------+-----------------------------+---------------+-------------------+---------------------+-----------------------------+-----------------------------+-------------------------------------+--------------------+-------------------
1384321342409::10.100.0.40::9443::1232 | A sample for Mediator Statistics | bam_mediation_stats_data_publisher | MediationStatsDataAgent | bam_mediation_stats_data_publisher:1.0.0 | 1384321342409 |   1.0.0 | 127.0.0.3 |                       915.5 |             1 |                In |                   0 |                        1270 |                         561 |    Simple_Stock_Quote_Service_Proxy |              Proxy |     1386174113944