Thursday, January 04, 2018

A nice animation to show how Raft works

This animation explains the Raft protocol so clearly I cannot help but recommend it:

Saturday, December 30, 2017

Some notes on microservice design

The key to designing a cloud-native platform is the microservice architecture. I was investigating some microservice architectures for my work, and here are some notes with my thoughts and understanding.

- Service Scoping:
We need to decide how much functionality to fit into one service. There are several considerations: a) Partition the services along logical function lines. A function unit called by many others should be wrapped as its own service. b) Mirror the business logic, so that each logic unit is a service. c) The 2-week rule: a service should be small enough that the team could re-implement it in a two-week period. In general the services are loosely coupled and updated independently, with a separate build for each of them.

When rewriting a working service, it's also suggested to keep the old one running while iterating on the design and testing of the new one until it's stable.

- API:
Typically services are exposed through REST APIs. Individual services should be able to reliably send and receive data, and should be tested thoroughly beforehand. One big challenge is keeping the API stable and forward compatible across versions, which requires careful design.

- Traffic management
Calling and called services need to communicate status and coordinate traffic loads. Calls should be tracked, response delays monitored, and requests redirected when a service is under backpressure. Service instances should support autoscaling with respect to service load.

- Data Store
It's recommended to create a separate data store for each microservice. The challenge is how to maintain data consistency across these data stores. On the other hand, the overall service must stay available even though individual service instances may be transient. This requires migrating user-specific data off the service instances and into a shared, redundant storage system that's accessible from all service instances, so that a user session survives an instance failure. A suggested solution is to insert a shared, memory-based cache system between a given service and the storage associated with that service. The caching system then becomes another service itself.

- Deploying
It's suggested to deploy microservices in containers, so that one tool can deploy everything. Also treat the servers as stateless: replace one when it fails, and use autoscaling to adjust the number of instances up and down as needed.

- Monitoring
The monitoring system for a microservices-based application must allow for ongoing resource change, be able to capture monitoring data in a central location, and display information that reflects the frequently changing nature of microservices applications. As an end-user action triggers application work, API calls and service work cascade down the application topology, and a single action may result in tens, or hundreds, of monitorable events. Most microservices monitoring systems place a monitoring agent on each service instance, where it can track specific instance data.

To handle the complex and massive amount of service-to-service communication, the most recent approach is to add a service mesh layer that reliably delivers requests. Check out the open source Linkerd and Istio projects. It's worth mentioning that this also fits the nature of serverless computing.


Sunday, November 26, 2017

Nice Java Builder pattern example

I have to say I mostly used plain bean-style setters and getters in the past, but I came across this blog online:
It's a really neat explanation. I think in the future, when we want to build immutable objects with optional fields, we should definitely use the builder pattern instead. I especially like the invariant checking part, where the check is done on the object fields rather than the builder fields, which makes it thread safe.

Saturday, September 30, 2017

Hadoop is obsolete, cloud and data lake are the next

An interesting article. Refer to the link here:

Hadoop is not dying, but it's obsolete and no longer pioneering. It just marks the 10-year tech cycle. Key takeaways:
1. The workloads that need optimizing will run mostly on emerging cloud architectures, for example getting Spark/HDFS working on Kubernetes.
2. Hadoop’s main architectural concept, that data should be centralized and that application workloads should be moved to the data, is still strong.
3. Cloud + data lake is next, and schema on read is the answer to heterogeneous and dynamic data.

Saturday, August 13, 2016

Comparison of all the stream processing engines

Found a nice graph comparing all the stream processing engines on the market now, from this link:

Monday, November 30, 2015

Apache Kudu, a mimic of SAP Hana?

Cloudera released Apache Kudu in late September. The technical paper is published here.
The primary strength of Kudu is its fast data access for both sequential scans and random access. This is similar to what SAP Hana has claimed. Reading the details of the paper above, we can see the implementation is also similar: it keeps in-memory versions of the table/tablet RowSets, plus delta stores for modifications. Maintenance consists of flushing and compacting the deltas and RowSets. The market will become interesting now ... I guess Kudu's focus and application scenarios are in the NoSQL world, working together with the Hadoop/Spark stack, while Hana is still an RDBMS solution.

Friday, November 13, 2015

Taobao's distributed file system

Taobao, which is China’s biggest online shopping website, similar to Amazon/eBay, just broke its sales record with $14.3bn in a single day on Nov. 11. It has its own file system called TFS (Taobao File System), open sourced at

From the description on this Chinese website, I can see it's a classic HDFS-style file system, integrated with ZooKeeper's leader election algorithm for the data nodes, and Hive's MySQL metadata store. It also says it's going to integrate erasure coding, which I believe is the version just open sourced by Cloudera this September. These Chinese companies are really good at integrating the latest open source technologies and putting them to scale!

Monday, December 15, 2014

Hadoop native-hadoop library problem

If you install Hadoop using the Apache pre-built distribution and run it on a 64-bit platform, you may get the following warning message every time you run a command. It doesn't affect the query result, but it is quite annoying to some:
WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

If we raise the logging level to DEBUG through HADOOP_ROOT_LOGGER, we can see it's complaining about the word width of the native library:

14/12/15 16:40:05 DEBUG util.NativeCodeLoader: Failed to load native-hadoop with error: java.lang.UnsatisfiedLinkError: /local/home/mzhang/hadoop-2.4.1/lib/native/ /local/home/mzhang/hadoop-2.4.1/lib/native/ wrong ELF class: ELFCLASS32 (Possible cause: architecture word width mismatch)
14/12/15 16:40:05 DEBUG util.NativeCodeLoader: java.library.path=/local/home/mzhang/hadoop-2.4.1/lib/native
14/12/15 16:40:05 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Go to $HADOOP_HOME/lib/native:

[root@pool-12 native]# file *
libhadoop.a:        current ar archived
libhadooppipes.a:   current ar archive       symbolic link to `' ELF 32-bit LSB shared object, Intel 80386, version 1 (SYSV), dynamically linked, not stripped
libhadooputils.a:   current ar archive
libhdfs.a:          current ar archive         symbolic link to `'   ELF 32-bit LSB shared object, Intel 80386, version 1 (SYSV), dynamically linked, not stripped

We can see the .so files are 32-bit. I don't know why Apache still provides them in 32-bit nowadays, when most production systems run on 64-bit. But anyway, we can easily rebuild the Hadoop source on a 64-bit platform, then replace these two .so files in your $HADOOP_HOME. We’re good to go! This time you'll see log messages like:

14/12/15 16:26:43 DEBUG util.NativeCodeLoader: Trying to load the custom-built native-hadoop library...
14/12/15 16:26:43 DEBUG util.NativeCodeLoader: Loaded the native-hadoop library

Remember to set this library path in the setup file:
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib/native"

Finally, switch HADOOP_ROOT_LOGGER back to its default:

The default value setup can be found in: $HADOOP_HOME/etc/hadoop/

Friday, October 31, 2014

Linux RPM database recovery

Sometimes you may get an error message like the following when you run yum list/yum install/yum clean:
[root@pool-02 home]# yum list
rpmdb: Thread/process 2417/139877686400768 failed: Thread died in Berkeley DB library
error: db3 error(-30974) from dbenv->failchk: DB_RUNRECOVERY: Fatal error, run database recovery
error: cannot open Packages index using db3 -  (-30974)
error: cannot open Packages database in /var/lib/rpm
Or a message like: 
rpmdb: PANIC: fatal region error detected; run recovery
error: db4 error(-30977) from dbenv->open: DB_RUNRECOVERY 
It's because the rpm db is corrupted. A quick fix is to issue the following commands:
rm -f /var/lib/rpm/_*
rpm --rebuilddb 

Monday, January 13, 2014

Setup MySQL as Hive metastore

The default Derby database only allows one active user at a time. You may get the error message:
Another instance of Derby may have already booted the database /home/.../metastore_db
in your log if you want to access it using multiple users or JDBC/ODBC connections.

The most popular solution is to use MySQL as the underlying metastore for Hive. It's pretty simple.

1. install mysql

2. download and copy mysql-connector-java-xxxxx.jar to $HIVEHOME/lib folder

3. create hive user/password:
mysql -u root
CREATE USER 'hiveuser'@'pool-08' IDENTIFIED BY 'hivepass';

4. add the following configuration into hive-site.xml:






5. // Starting Hive Metastore
hive --service metastore &

// Starting Hive Server
hive --service hiveserver &

If you get the error:
org.apache.thrift.transport.TTransportException: Could not create ServerSocket on address
That means some earlier process is already using the port. Find the process and kill it:
sudo lsof -i -P | grep -i "listen"
kill -9 pid

Then retry and you should be fine.
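The configuration for step 4 normally consists of the four JDBC connection properties that Hive reads from hive-site.xml. Here is a minimal Python sketch that renders them. The property names are the standard javax.jdo.option.* ones; the MySQL host and the database name are hypothetical placeholders, and the driver class is the one used by older Connector/J versions, so adjust it to match your jar.

```python
import xml.etree.ElementTree as ET

MYSQL_HOST = "localhost"     # hypothetical: host running MySQL
METASTORE_DB = "metastore"   # hypothetical: database name for the metastore

properties = {
    "javax.jdo.option.ConnectionURL":
        f"jdbc:mysql://{MYSQL_HOST}/{METASTORE_DB}?createDatabaseIfNotExist=true",
    # Driver class of older Connector/J releases; newer jars use a different class.
    "javax.jdo.option.ConnectionDriverName": "com.mysql.jdbc.Driver",
    "javax.jdo.option.ConnectionUserName": "hiveuser",
    "javax.jdo.option.ConnectionPassword": "hivepass",
}

def render_hive_site(props):
    """Build a <configuration> element with one <property> per entry."""
    root = ET.Element("configuration")
    for name, value in props.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = value
    return ET.tostring(root, encoding="unicode")

hive_site_xml = render_hive_site(properties)
print(hive_site_xml)
```

Paste the generated property elements into the existing configuration element of hive-site.xml rather than replacing the whole file.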

Sunday, October 27, 2013

Column-oriented Database System, and query execution

Column databases are evolving very fast these days, driven by the need to store and query "big data". I've made some notes here based on Daniel Abadi's doctoral dissertation, which won the SIGMOD Jim Gray Doctoral Dissertation Award. They also contain material from the VLDB '09 tutorial on columnar DBMSs.

In a column store db, queries only read in relevant attributes, but tuple writes need multiple seeks. So generally it's good for read-intensive data warehouses, which are characterized by lots of column scans and batch writes.

The simplest storage solution is to use tuple IDs: each column is a separate table, with a table header and a tuple ID for each value. Optimization is very expensive with this representation. Instead, we can remove the tuple IDs and store the data in its original order, with the table header stored in a separate column as well. For each query, we first join all accessed attributes into traditional row tuples, then apply row optimization.
Observing that I/O cost is the most expensive, the opportunity for a column store is to exploit the high value locality of the data and apply compression. For example, sorted data can be saved as (value, start idx, length) tuples to avoid repetition.
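A minimal Python sketch of this run-length scheme, assuming the column is already sorted (or at least has long runs). The column contents and function names are made up for illustration.

```python
from itertools import groupby

def rle_encode(column):
    """Encode a column as (value, start_idx, length) runs."""
    runs = []
    start = 0
    for value, group in groupby(column):
        length = sum(1 for _ in group)
        runs.append((value, start, length))
        start += length
    return runs

def rle_decode(runs):
    """Expand the runs back into the original column."""
    out = []
    for value, _start, length in runs:
        out.extend([value] * length)
    return out

def count_equal(runs, target):
    """COUNT(*) WHERE col = target, answered without decompressing."""
    return sum(length for value, _start, length in runs if value == target)

quarter = ["Q1", "Q1", "Q1", "Q2", "Q2", "Q3"]
runs = rle_encode(quarter)
print(runs)                     # [('Q1', 0, 3), ('Q2', 3, 2), ('Q3', 5, 1)]
print(count_equal(runs, "Q2"))  # 2
```

The count is computed on the runs themselves, which is the "operate on compressed data directly" idea in its simplest form.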
We can apply encoding for values within a limited domain. In general, the values in a column come from the same domain, so they contain significantly less entropy than a row store and compress better: for a row store the compression ratio is often 1:3, while for a column store we can reach 1:10. We can also use the extra space to store multiple copies of the data in different sort orders, to further speed up query execution. Sorted columns compress better and are faster for range queries. We can also store column data in a B-tree, while compacting the values in the leaf nodes.
Column stores offer more opportunities for block and vectorized processing. Queries can operate on compressed data directly and avoid decompression, thereby trading I/O for CPU. Here's the example query on the above data:
A tuple can be materialized either at the beginning of the query plan or later on. However, the most efficient way is to operate on the columns as long as possible. Basically we can
- first compute the predicates on the selected columns,
- then combine the resulting bit arrays or Bloom filters,
- then do projection on the selection attribute,
- finally do aggregation on the projection.
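The four steps above can be sketched in Python as follows. This is only an illustration with a made-up three-column table; a real engine would use packed bitmaps and vectorized operators rather than Python lists.

```python
# Hypothetical column-store table: each attribute is stored as its own list.
columns = {
    "region": ["east", "west", "east", "east", "west"],
    "year":   [2012, 2012, 2013, 2013, 2013],
    "sales":  [10, 20, 30, 40, 50],
}

def select_bitmap(column, predicate):
    """Step 1: evaluate a predicate on one column, producing a bit array."""
    return [predicate(v) for v in column]

def and_bitmaps(a, b):
    """Step 2: combine the per-column bit arrays position by position."""
    return [x and y for x, y in zip(a, b)]

def project(column, bitmap):
    """Step 3: read only the qualifying positions of the projected column."""
    return [v for v, keep in zip(column, bitmap) if keep]

# SELECT SUM(sales) WHERE region = 'east' AND year = 2013
region_hits = select_bitmap(columns["region"], lambda r: r == "east")
year_hits = select_bitmap(columns["year"], lambda y: y == 2013)
hits = and_bitmaps(region_hits, year_hits)
total_sales = sum(project(columns["sales"], hits))  # Step 4: aggregate
print(total_sales)  # 70
```

No row tuple is ever built here; the sales column is touched only at the positions that survived both predicates.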

Sunday, October 20, 2013

Dremel: PB analysis in 3 seconds

Google published its paper "Dremel: Interactive Analysis of Web-Scale Datasets" in VLDB 2010. The purpose of this work is to perform interactive data analysis at scale, on shared clusters of commodity machines, with high speed and fault tolerance. It accesses data in situ (in place) and executes queries on such data in a fraction of the time needed by MapReduce. But it's not a replacement for MR; instead it's used to analyze the outputs of MR pipelines. A typical use case is to first use MR to pre-process the input data containing signals, then run some trial interactive queries in Dremel, in seconds, to verify the signals and improve the algorithm. Finally, this yields a mature long-term job.

Dremel builds on ideas from web search and parallel DBMSs: a query gets pushed down the tree and is rewritten at each step. Then the replies are aggregated and assembled into the result. It also provides a SQL-like language to express ad hoc queries, and executes them natively without translating them into MR jobs as Pig or Hive do.

Dremel adopts a nested columnar storage model, claiming the properties of lossless representation, fast encoding, and efficient record assembly. It uses Protocol Buffers as the serialization approach. The syntax is as:
where * denotes repeated fields and ? denotes optional fields. The column-striped representation is based on the notion of repetition levels and definition levels:
- Repetition level: at which repeated field in the field's path the value has repeated.
- Definition level: how many fields in the path that could be undefined (because they are optional or repeated) are actually present.
Here's an example from the paper:

The paper gives formulas and works through some example entries. I'd like to give a complete explanation of the values in Name.Language.Code and Links.Backward here. In the path Name.Language.Code, Name is the 1st repeated field and Language is the 2nd; Code itself is required.
- Name.Language.Code:
> (en-us, 0, 2): it's the first Code in document r1, so r=0. Code is required, so we only need to look at its parents for d. Both Name and Language are present, so d=2.
> (en, 2, 2): it's a new Language within the same Name, so the repetition happened at Language and r=2. d=2 as above.
> (null, 1, 1): the next Name has only a Url field, so Code is null. The repetition happened at Name, so r=1. Only Name is present, so d=1.
> (en-gb, 1, 2): it's the first Language of yet another Name, so the repetition again happened at Name and r=1. Name.Language exists, so d=2.
> (null, 0, 1): this is document r2, whose first Name has no Language, so Code is null. r=0 denotes a new record. Only Name exists, so d=1.

- Links.Backward:
> (null, 0, 1): r=0 as it's the first entry in doc r1. Its Links has no Backward value, so it's null. d=1 as only Links exists.
> (10, 0, 2): r=0 as we've finished doc r1 and started doc r2. Both Links (optional) and Backward (repeated) are present, so d=2.
> (30, 1, 2): r=1 as 30 repeats at Backward, the only repeated field in the path. d=2 as above.
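To double-check the Name.Language.Code column, here is a small Python sketch that computes the (value, r, d) triples. It is hard-wired to this one path and is not the paper's general striping algorithm. The two records are reduced to the shape the walkthrough implies (three Names in r1, one Name in r2), with the other fields left out.

```python
def stripe_name_language_code(doc):
    """Emit (value, r, d) triples for the path Name.Language.Code.

    Name is the 1st repeated field in the path, Language the 2nd, and
    Code is required, so the maximum definition level is 2.
    """
    out = []
    first_in_record = True
    for name in doc.get("Name", []):
        # First value of the record gets r=0; otherwise a new Name means r=1.
        r_name = 0 if first_in_record else 1
        first_in_record = False
        languages = name.get("Language", [])
        if not languages:
            out.append((None, r_name, 1))   # only Name is defined
            continue
        for i, language in enumerate(languages):
            r = r_name if i == 0 else 2      # a further Language repeats at level 2
            out.append((language["Code"], r, 2))
    if first_in_record:
        out.append((None, 0, 0))             # record without any Name
    return out

r1 = {"Name": [
    {"Language": [{"Code": "en-us"}, {"Code": "en"}]},
    {},                                      # this Name has only a Url
    {"Language": [{"Code": "en-gb"}]},
]}
r2 = {"Name": [{}]}

column = stripe_name_language_code(r1) + stripe_name_language_code(r2)
for entry in column:
    print(entry)
```

The printed triples match the five entries explained above, in order.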

When storing it, each column is stored as a set of blocks containing the r and d values and the compressed field values. Many compression techniques can be used here to reduce the storage cost, by exploiting the properties of the encoding algorithm.

To assemble the records efficiently, the key idea is to use an FSM labeled with repetition levels. The FSM is traversed once for each record. Record assembly is needed for record-oriented data processing such as MR. Dremel's own query execution can bypass this step.

Dremel adopts a SQL-like language. It takes one or more tables with their schemas as input, and outputs a nested table and its output schema. The selection operator prunes away branches of the tree, and the projection operator emits a value at the same level of nesting as the most repeated input field.

In the query execution phase, Dremel adopts a multi-level serving tree. The root server receives incoming queries, reads metadata from the tables, and routes the queries to the next level. The root determines all horizontal partitions of the table, and the intermediate servers perform parallel aggregation of partial results. The query dispatcher maintains a histogram of tablet processing times, and can re-dispatch work based on the system load. Each server has an internal execution tree, which corresponds to a physical query execution plan. Selection-projection-aggregate queries are specially optimized, as they're commonly used. They can do within-record and cross-record aggregation directly on column data, and bypass record assembly entirely.

Monday, July 01, 2013

Column statistics and Histogram in Sybase ASE

Some notes on the column level statistics and histogram, and their usage in Search Argument and Join:

Histogram: When you create an index, a histogram is created on the first column of the index. It represents the distribution of values in a column. It contains cells with weights that represent the percentage of rows of the column occupied by the values within the cell.
- Frequency cell: equality relationship; the weight of the cell is the portion of rows that have this value.
- Range cell: the portion of rows whose values fall in this range. It can be used for estimating the number of rows to be returned when the search argument value falls within a range cell.

When merging histograms from several partitions: we order the histograms from different partitions by the lower bound, then interpolate other values into the range.

Density (range cell and total): represents the average number of duplicates in the column.
- Range cell density: measures the average number of duplicates of all values that are represented by range cells, excluding frequency cells. It's used for search arguments.
- Total density: measures the average number of duplicates in the entire column, including both range and frequency cells. It's used for estimating the number of matching rows for joins.

Search argument (SARG) estimation:
- For a single SARG, the weights of the range/frequency cells it covers are added up to get the estimated selectivity, which is multiplied by the total number of rows in the table to get the estimated row count.
- For multiple SARGs, the selectivities on the different columns are computed, then combined (this can be a simple multiplication of the weights) to get the total selectivity estimate.
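A rough Python sketch of this arithmetic. The histogram layout, the cell values, and the function names are all made up for illustration and are not ASE's internal format; it also counts only fully covered cells, whereas the real optimizer interpolates within partially covered range cells.

```python
# Hypothetical histogram: (kind, low, high, weight); a frequency cell has
# low == high, and the weights of all cells add up to 1.0.
histogram = [
    ("range",      0,  10, 0.20),
    ("frequency", 10,  10, 0.30),
    ("range",     10,  50, 0.40),
    ("range",     50, 100, 0.10),
]

def sarg_selectivity(cells, lo, hi):
    """Add up the weights of the cells that lie entirely within [lo, hi]."""
    return sum(w for _kind, low, high, w in cells if lo <= low and high <= hi)

def estimated_rows(selectivity, table_rows):
    """Selectivity times the table's row count gives the row estimate."""
    return selectivity * table_rows

def combine(selectivities):
    """Multiple SARGs: multiply the selectivities, assuming independence."""
    result = 1.0
    for s in selectivities:
        result *= s
    return result

single = sarg_selectivity(histogram, 10, 50)   # frequency cell + one range cell
print(round(single, 2))                        # 0.7
print(round(estimated_rows(single, 10000)))    # 7000
print(round(combine([single, 0.1]), 3))        # 0.07
```

The plain multiplication in combine() is the "simple weight multiplication" case; it is only accurate when the columns are uncorrelated.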

Join estimation:
When statistics are available, use the total density to estimate the number of rows matching the join key. For multi-column join, use the composite total density.

Use join density estimates derived from join histograms that give accurate estimates of qualifying joining rows and the rows to be scanned in the outer and inner tables.

Use table-normalized histograms of the joining attributes. This technique gives an exact value for the skewed values (that is, frequency count) and uses the range cell densities from each histogram to estimate the cell counts of corresponding range cells.

The join density is dynamically computed from the join histogram. The first histogram join occurs typically between two base tables when both attributes have histograms. Every histogram join creates a new histogram on the corresponding attribute of the parent join's projection.

Thursday, May 23, 2013

Intelligent scan selectivity reduction: Sybase optimized index selection

Sybase ASE has a nice optimization feature called "Intelligent scan selectivity reduction", which optimizes index selection when the predicate columns do not include the prefix columns of a composite index. There doesn't seem to be much documentation about it, so I'll give a simple introduction to this hidden feature here.

This feature is only available for data page lock (DPL)/data row lock (DRL) tables, when the following conditions are met:
- the table has a composite index
- index prefix column is not in predicate
- cardinality of the prefix column is fairly low

Without this feature, we may need to do a full table scan, or create a new index on the predicate columns.
With this feature, the optimizer treats the composite index as a number of small sub-indexes from which to retrieve the row IDs:
- determine the domain of distinct values for the index prefix column
- iterate through each distinct value in this domain
- for each distinct value, the optimizer will perform a regular index scan

The Intelligent Scan feature functions as if you had issued a sequence of SQL statements, each specifying a single value for the index prefix column. Probing the table several times using an index scan is often more efficient than performing a table scan.

A simple example:
create table t1(c1 int, c2 int, c3 varchar(10), c4 varchar(100)) lock datarows
create index t1_i1 on t1(c1, c2, c3)
select c4 from t1 where c2 = 2
If c1 only has 10 distinct values, the optimizer will still use index t1_i1. For each c1 value, it'll use t1_i1 to find the matching c2 values. For a large table this is often better than a full table scan on t1.
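To make the probing pattern concrete, here is a Python sketch that models the composite index t1_i1 as a sorted list and answers "where c2 = 2" by probing once per distinct c1 value. This only illustrates the access pattern, not how ASE implements it, and the index entries are made up.

```python
from bisect import bisect_left

# Composite index on (c1, c2, c3), modeled as sorted (c1, c2, c3, row_id) entries.
index = sorted([
    (1, 2, "a", 100), (1, 5, "b", 101),
    (2, 1, "c", 102), (2, 2, "d", 103), (2, 2, "e", 104),
    (3, 7, "f", 105),
])

def distinct_prefix_values(idx):
    """Hop from one c1 value to the next using the index order itself."""
    values, pos = [], 0
    while pos < len(idx):
        c1 = idx[pos][0]
        values.append(c1)
        pos = bisect_left(idx, (c1 + 1,), pos)  # first entry with a larger c1
    return values

def probe(idx, c1, c2):
    """Regular index range scan for c1 = ? and c2 = ?."""
    lo = bisect_left(idx, (c1, c2))
    hi = bisect_left(idx, (c1, c2 + 1))
    return [entry[3] for entry in idx[lo:hi]]

def intelligent_scan(idx, c2):
    """One index probe per distinct prefix value, instead of a table scan."""
    row_ids = []
    for c1 in distinct_prefix_values(idx):
        row_ids.extend(probe(idx, c1, c2))
    return row_ids

matching_rows = intelligent_scan(index, 2)
print(matching_rows)  # [100, 103, 104]
```

The cost is roughly one index descent per distinct c1 value, which is why the feature only pays off when the prefix column has low cardinality.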

You can always turn off this feature by using trace flag 15353.

Thursday, May 09, 2013

Scaling Memcache at Facebook

Facebook has a new publication at this year's NSDI '13 conference. You can find the original paper by searching for the title online. Here are my detailed notes on this paper:

System properties of FB that affect the design:
- Read intensive: users consume much more data than they create.
- Data is fetched from heterogeneous data sources.
These call for a flexible cache design.

Memcache is used as a query cache to take read load off the databases.
- on a read miss, select from the db, then set(k, v) in the cache
- for write requests, delete the cache entry instead of updating it.
Memcache can also be used as a generic cache for applications.
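The read and write paths above can be sketched in a few lines of Python. Plain dicts stand in for the database and for memcache here, and the class and key names are made up.

```python
class LookAsideCache:
    def __init__(self, database):
        self.db = database   # dict standing in for the database
        self.cache = {}      # dict standing in for memcache
        self.db_reads = 0    # counts how often we had to hit the database

    def read(self, key):
        if key in self.cache:
            return self.cache[key]
        # Read miss: select from the db, then set(k, v) in the cache.
        self.db_reads += 1
        value = self.db[key]
        self.cache[key] = value
        return value

    def write(self, key, value):
        # Update the db, then delete the cache entry instead of updating it.
        self.db[key] = value
        self.cache.pop(key, None)

store = LookAsideCache({"user:1": "alice"})
store.read("user:1")
store.read("user:1")          # served from the cache
store.write("user:1", "bob")  # invalidates the cached entry
print(store.read("user:1"), store.db_reads)  # bob 2
```

Deleting on write is idempotent, so a repeated or reordered invalidation cannot leave a wrong value in the cache the way a repeated update could.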

In Cluster Performance
- Reduce latency on fetching
In memcache, items are distributed across servers through consistent hashing. Upon a request, web servers employ an all-to-all communication pattern, which may cause incast congestion or a single-server bottleneck. The solution focuses on the memcache client (like a job control node) residing on each web server.
> parallel requests and batching: use a DAG to represent dependencies between data, and maximize the number of items fetched concurrently.
> client-server communication: keep the client stateless. Use UDP for get requests to avoid establishing connections, and TCP for set/delete to maintain reliability.
> incast congestion: use a sliding window on the client side to control the total number of outstanding requests.

- Reduce load on cache miss
> Leases: a lease is a token bound to the key that the client requested, which the client presents when setting the value. It addresses 2 problems: (1) stale sets: setting a value that's not the latest; (2) thundering herds: heavy read/write activity on a specific key.
Additionally, when a key is deleted, it's transferred to a data structure that holds it for a short time before it is flushed. A request can then return either a lease token or data marked as stale.
> Memcache pools: when used as a general cache, we partition a cluster's servers into separate pools, depending on the characteristics of the key access patterns.
> Replication: under some conditions we may choose to replicate instead of further dividing the key space, if (1) keys are fetched together, (2) the entire data set fits in one or two memcached servers, and (3) the request rate is heavy.
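Here is a simplified Python sketch of the lease idea. It is a single-process toy with made-up names: the real server also rate-limits how often it hands out a token per key and can return stale values, both of which are left out here.

```python
import itertools

class LeasingCache:
    def __init__(self):
        self.data = {}
        self.leases = {}                  # key -> outstanding lease token
        self._tokens = itertools.count(1)

    def get(self, key):
        """Return (value, token). Only the first client to miss gets a token."""
        if key in self.data:
            return self.data[key], None
        if key in self.leases:
            return None, None             # someone else is refilling: retry later
        token = next(self._tokens)
        self.leases[key] = token
        return None, token

    def set(self, key, value, token):
        """Accept the value only if the lease is still valid."""
        if token is None or self.leases.get(key) != token:
            return False                  # stale set rejected
        del self.leases[key]
        self.data[key] = value
        return True

    def delete(self, key):
        """A write invalidates both the value and any outstanding lease."""
        self.data.pop(key, None)
        self.leases.pop(key, None)

cache = LeasingCache()
_, t1 = cache.get("k")                    # miss: this client gets the lease
_, t2 = cache.get("k")                    # second client gets no token (herd control)
cache.delete("k")                         # a concurrent write invalidates the lease
print(cache.set("k", "old", t1))          # False: the stale set is rejected
_, t3 = cache.get("k")
print(cache.set("k", "new", t3))          # True
```

The rejected set is the stale-set protection, and the missing second token is the thundering-herd protection.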

- Failure Handling
Dedicate a small set of machines, named Gutter, to take over for failed servers. When a client receives no response, it assumes the server has failed and sends a second request to the Gutter pool. If that misses too, it queries the db and inserts the result into Gutter.

In Region Replication

Web and memcached servers are split into multiple frontend clusters. Along with a storage cluster containing the db, this defines a region. Multiple frontend clusters share the same storage cluster.

- Regional invalidations
The storage cluster is responsible for invalidation. A web server also invalidates its own cluster after a modification. On every database we deploy invalidation daemons, which inspect the SQL statements, extract the deletes, and broadcast them to the memcache in every frontend cluster.
Doing this directly may result in high packet rates. To address this, the invalidation daemons batch deletes into fewer packets and send them to a set of servers running mcrouter instances. These mcrouters then unpack each batch and route the deletes to their destinations.

- Regional pools
If users' requests are randomly routed to all frontend clusters, the cached data will be roughly the same in each. For large and rarely accessed items, we can reduce the number of replicas by having multiple frontend clusters share the same set of memcache servers, called a regional pool.

- Cold cluster warmup
Allow clients in the cold cluster to retrieve data from the warm ones rather than from persistent storage.

Across Region Consistency

As defined earlier, each region consists of a storage cluster and several frontend clusters. In a multi-region deployment, we designate one region to hold the master database, while the others contain read-only replicas. Consistency is the challenge here, as a replica may lag behind the master.

- write from master
Same as discussed above: storage cluster invalidates data via daemons.

- write from non-master region
Employ a remote marker to indicate that data in the local replica db is potentially stale and that queries should be directed to the master region. When a web server updates data with key k, it (1) sets a marker r_k in the region, (2) writes to the master, embedding k and r_k to be invalidated, and (3) deletes k in the local cluster. When a request for k gets a cache miss, the server checks for the existence of r_k to decide whether to direct the query to the master or the local region.

- operational consideration
Share the same communication channel for deletes and replication, to gain network efficiency.

Single Server Improvements

- Performance optimization
> allow automatic expansion of the hash table
> make the server multi-threaded, first with a global lock and later with fine-grained locking
> giving each thread its own UDP port to reduce contention

- Adaptive slab allocator
The allocator organizes memory into slab classes containing pre-allocated, uniformly sized chunks of memory. Items are stored in the smallest chunk that fits. Each slab class maintains a free list of available chunks and requests more memory when the list is empty. When the server cannot allocate free memory, it evicts the LRU item within that slab class. Slab assignments are adaptively re-balanced if the items currently being evicted are more recent than the average of the LRU items in the other classes. Then the slab holding the overall LRU item is freed and transferred to the needy class.

- Transient Item Cache
A hybrid scheme: lazy eviction for most keys, and proactive eviction of short-lived keys when they expire. The short-lived keys are put into a circular buffer of linked lists, called the transient item cache, indexed by seconds until expiration. Every second, all the items in the bucket at the head of the buffer are evicted and the head advances by one.
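A small Python sketch of such a circular buffer of buckets. Sets stand in for the linked lists, tick() must be called once per second by some timer, and all names are made up for illustration.

```python
class TransientItemCache:
    def __init__(self, horizon_seconds):
        # One bucket per second of remaining lifetime, arranged as a ring.
        self.buckets = [set() for _ in range(horizon_seconds)]
        self.head = 0
        self.items = {}
        self.slot_of = {}   # key -> bucket index, so a re-put can move the key

    def put(self, key, value, ttl_seconds):
        if not 1 <= ttl_seconds <= len(self.buckets):
            raise ValueError("ttl outside the buffer horizon")
        old = self.slot_of.get(key)
        if old is not None:
            self.buckets[old].discard(key)
        slot = (self.head + ttl_seconds) % len(self.buckets)
        self.buckets[slot].add(key)
        self.slot_of[key] = slot
        self.items[key] = value

    def tick(self):
        """Advance the head by one second and evict everything in its bucket."""
        self.head = (self.head + 1) % len(self.buckets)
        expired = self.buckets[self.head]
        self.buckets[self.head] = set()
        for key in expired:
            del self.items[key]
            del self.slot_of[key]
        return expired

transient = TransientItemCache(horizon_seconds=5)
transient.put("a", 1, ttl_seconds=2)
transient.put("b", 2, ttl_seconds=3)
print(transient.tick())  # set()
print(transient.tick())  # {'a'}
print(transient.tick())  # {'b'}
```

Eviction costs nothing per key until its second arrives, which is the point of indexing the buffer by expiration time.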

Tuesday, May 07, 2013

Some random notes on large data system design

Early stage:
Considerations in early stage are mostly focused on the architecture of
- DB: permanent storage and prevention of data loss at backend.
- Cache: increasing of response speed in front end.
- Sharding: load distribution over distributed network.

New techniques for low concurrency systems:
cons: single point failure, all metadata on master so capacity limited for many small files.
- Map/Reduce
- BigTable (HBase, Cassandra, etc)
- Dynamo:
Distributed Hash Table(DHT) based sharding to resolve single failure. (Amazon, Cassandra)
- Virtual nodes: an extension of the traditional mod-based hash. When a node changes, there is no need to remap all the data. Instead, use a bigger mod value and give each node a range of it. Remapping then only moves a portion of the keys.
(Cassandra details it here:
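A rough Python sketch of the virtual node idea, comparing it with plain "key mod number-of-nodes" sharding. The slot count, node names, and the rebalancing rule (take slots from the most loaded node) are assumptions for illustration, not how Cassandra or Dynamo does it.

```python
NUM_SLOTS = 64   # the "bigger mod value"; hypothetical

def slot_of(key):
    return key % NUM_SLOTS

def initial_assignment(nodes):
    """Give each node one contiguous range of slots."""
    return [nodes[s * len(nodes) // NUM_SLOTS] for s in range(NUM_SLOTS)]

def add_node(owners, new_node):
    """Hand slots to the newcomer until it owns a fair share."""
    owners = list(owners)
    old_nodes = sorted(set(owners))
    share = NUM_SLOTS // (len(old_nodes) + 1)
    for _ in range(share):
        donor = max(old_nodes, key=owners.count)           # most loaded node
        last = len(owners) - 1 - owners[::-1].index(donor)  # end of its range
        owners[last] = new_node
    return owners

keys = range(1000)
before = initial_assignment(["A", "B", "C"])
after = add_node(before, "D")

moved_slots = sum(1 for b, a in zip(before, after) if b != a)
moved_keys = sum(1 for k in keys if before[slot_of(k)] != after[slot_of(k)])
naive_moved = sum(1 for k in keys if k % 3 != k % 4)   # plain mod sharding

print(moved_slots, moved_keys, naive_moved)
```

With the slot table, only about a quarter of the keys change owner when the fourth node joins, while plain mod sharding remaps about three quarters of them.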

Considerations for high concurrency systems:
- Cache
- Index
- Sharding (horizontal vs vertical)
- Locking granularity

More general considerations:
- replication storage
the famous 3-replication: (local, local cluster, remote)
- random write vs. sequential write:
write concurrent large data into memory, then sequentially write to disk.

Wednesday, March 20, 2013

Parallelism in SAP Sybase ASE

Here are some random notes on parallelism in SAP Sybase ASE.

Parallel execution in ASE is implemented by the EXCHANGE operator, which marks the boundary between producer and consumer. The consumer process reads data from a pipe.

Parallelism implementations:

Table Scan:
For an unpartitioned table, a hash-based table scan is used. Each worker process examines 1/k of the rows on the page.
For a partitioned table, each partition is processed by a worker process.

Scalar aggregation:
The aggregation operation can be performed in two phases or serial:
- Two-phase: using two scalar aggregate operators. The lower scalar aggregation operator performs aggregation on the data stream in parallel. The result of scalar aggregation is merged and aggregated a second time.
- Serial:  the result of the parallel scan is merged and aggregated.
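The difference between the two strategies can be sketched in Python with an average over three made-up partitions. Threads stand in for ASE's worker processes here.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical table, already split into three partitions.
partitions = [[3, 1, 4], [1, 5, 9, 2], [6, 5]]

def partial_aggregate(rows):
    """Lower scalar aggregate: one (sum, count) pair per partition."""
    return sum(rows), len(rows)

def two_phase_avg(parts):
    # Phase 1: aggregate each partition in parallel.
    with ThreadPoolExecutor(max_workers=len(parts)) as pool:
        partials = list(pool.map(partial_aggregate, parts))
    # Phase 2: merge the partial results and aggregate a second time.
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count

def serial_avg(parts):
    # The parallel scans are merged first, then aggregated once.
    merged = [row for part in parts for row in part]
    return sum(merged) / len(merged)

print(two_phase_avg(partitions), serial_avg(partitions))  # 4.0 4.0
```

In the two-phase plan only one small (sum, count) pair per partition crosses the exchange, instead of every row.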

Union all:

- parallel: the operands must all have the same degree; the union all is then done in parallel, and the result is merged.
- serial: when selective predicates make the amount of data sent through the union all operator small enough, only the scans are done in parallel. The merged scan result is sent to the union all.

Join:
- Two tables with the same useful partitioning
This is called an equipartitioned join. Joins are done in each partition in parallel, then merged. (nested loop join is often used)

- One of the tables(tb2) with useful partitioning
The query optimizer dynamically repartitions the other table (tb1) to match the partitioning of tb2, using hash partitioning on the join column. So the 1st exchange operator does the parallel repartitioning of tb1, and the 2nd exchange operator does the join. (merge join is often used)

- Both tables with useless partitioning
Both tables will be repartitioned. The repartitioned operands of the join are equipartitioned with respect to the join predicate. (hash join is often used)

- Replicated join:
Used when a large table has a useful index on the joining column but useless partitioning, and joins to a small table. The small table can be replicated N ways to match the inner table, where N is the number of partitions of the large table. Each partition of the large table is joined with the small table and, because no exchange operator is needed on the inner side of the join, an index nested-loop join is allowed.

- Parallel reformatting
Used when there is no useful index on the joining column or a nested-loop join is the only viable option.
The outer side may have useful partitioning, or we can repartition it to create that. But for the inner side of a nested-loop join, any repartitioning means that the table must be reformatted into a worktable that uses the new partitioning strategy, and an index must then be created on the joining predicate. The inner scan of the nested-loop join then accesses the worktable.
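As an illustration of the "both tables with useless partitioning" case above, here is a Python sketch that hash-repartitions both inputs on the join column and then joins matching partitions independently. The tables and function names are made up, and the per-partition joins run in a plain loop where ASE would use worker processes.

```python
def repartition(rows, key_index, degree):
    """Hash-partition rows on the join column."""
    parts = [[] for _ in range(degree)]
    for row in rows:
        parts[hash(row[key_index]) % degree].append(row)
    return parts

def hash_join(left, right, lkey, rkey):
    """Join one pair of partitions with a simple in-memory hash join."""
    table = {}
    for row in right:
        table.setdefault(row[rkey], []).append(row)
    return [l + r for l in left for r in table.get(l[lkey], [])]

def repartitioned_join(tb1, tb2, lkey, rkey, degree):
    parts1 = repartition(tb1, lkey, degree)
    parts2 = repartition(tb2, rkey, degree)
    result = []
    # After repartitioning, the operands are equipartitioned on the join key,
    # so partition i of tb1 only needs to meet partition i of tb2.
    for p1, p2 in zip(parts1, parts2):
        result.extend(hash_join(p1, p2, lkey, rkey))
    return result

tb1 = [(1, "a"), (2, "b"), (3, "c"), (4, "d")]
tb2 = [(2, "x"), (3, "y"), (3, "z"), (5, "w")]
joined = sorted(repartitioned_join(tb1, tb2, 0, 0, degree=3))
print(joined)  # [(2, 'b', 2, 'x'), (3, 'c', 3, 'y'), (3, 'c', 3, 'z')]
```

Because equal keys always hash to the same partition number, no matching pair can be missed by joining partitions pairwise.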

Vector aggregation (group-by):
- In-partitioned vector aggregation
If the data is partitioned on a subset of, or on the same columns as, the columns in the group by clause, the grouping operation can be done in parallel on each of the partitions.

- Repartitioned vector aggregation
If the partitioning is not useful, repartition the source data to match the grouping columns, then apply the parallel vector aggregation.

same as group by.

Queries with an in list
The in (...) list is transformed into an or list. The values in the in list are put into a special in-memory table and sorted to remove duplicates. The table is then joined back with the base table using an index nested-loop join.

Queries with or clauses
The set of conjunctive predicates on each side of the disjunction must be indexable. We apply each side of the disjunction separately to qualify a set of row IDs (RIDs). We can use different indexes here. The predicates may qualify an overlapping set of data rows. These row IDs are then merged to eliminate duplicates. Finally, the RIDs are joined back to the base table to get the results.
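The RID-based strategy can be sketched as below. This is only an illustration under simple assumptions: the table is a dict from RID to row, each index is a dict from column value to a list of RIDs, and the names build_index and or_query are invented for the example.

```python
def build_index(table, col):
    """Build a value -> list of RIDs index for one column (stand-in for a real index)."""
    index = {}
    for rid, row in table.items():
        index.setdefault(row[col], []).append(rid)
    return index

def or_query(table, index_a, a_val, index_b, b_val):
    """Evaluate `a = a_val or b = b_val` through two separate indexes."""
    rids_a = set(index_a.get(a_val, ()))   # qualify RIDs for the first disjunct
    rids_b = set(index_b.get(b_val, ()))   # qualify RIDs for the second disjunct
    rids = rids_a | rids_b                 # merge and eliminate duplicate RIDs
    return [table[rid] for rid in sorted(rids)]  # join back to the base table

table = {
    0: {"a": 1, "b": 9},
    1: {"a": 2, "b": 7},
    2: {"a": 1, "b": 7},   # qualifies on both sides, must appear only once
    3: {"a": 3, "b": 3},
}
matches = or_query(table, build_index(table, "a"), 1, build_index(table, "b"), 7)
```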

Queries with an order by clause
If no inherent ordering is available, the optimizer either repartitions an existing data stream or uses the existing partitioning scheme, and then applies the sort to each of the constituent streams. The sorted streams are then merged into the final result.
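The sort-then-merge step can be sketched with the standard library, assuming each partition is a plain list; parallel_order_by is a name chosen for this example.

```python
import heapq

def parallel_order_by(partitions, key):
    """Sort each stream independently, then merge the sorted streams."""
    sorted_streams = [sorted(p, key=key) for p in partitions]  # one sort per worker
    return list(heapq.merge(*sorted_streams, key=key))          # order-preserving merge

ordered = parallel_order_by([[5, 1, 9], [4, 8], [7, 2]], key=lambda x: x)
```

The merge only ever compares the current head of each stream, so it does not need to re-sort the combined data.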

Select into clauses
- Creates the new table using the columns specified in the select into statement.
- Creates N partitions in the new table, where N is the degree of parallelism that the optimizer chooses for the insert operation in the query.
- Populates the new table with query results, using N worker processes.
- Unpartitions the new table, if no specific destination partitioning is required.
If destination partitioning is specified:
- If the destination table has the same partition as the source data, and there is enough data to insert, the insert operator executes in parallel.
- If the source partitioning does not match that of the destination table, the source data must be repartitioned. Insert is done in parallel.

insert, delete, and update operations are done in serial.
However, tables other than the destination table can be accessed in parallel.

Partition elimination
The query processor is able to disqualify range, hash, and list partitions at compile time. With hash partitions, only equality predicates can be used, whereas for range and list partitions, both equality and inequality predicates can be used to eliminate partitions.
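A small sketch of the difference between range and hash elimination. The partition layout (name, inclusive low bound, inclusive high bound), the supported operators, and the function names are all assumptions for illustration; Python's built-in hash stands in for whatever hash function the server uses.

```python
def eliminate_range(partitions, op, value):
    """Return names of range partitions that may hold rows satisfying `col op value`.

    partitions: list of (name, low, high) with inclusive bounds.
    """
    keep = []
    for name, low, high in partitions:
        if op == "=":
            ok = low <= value <= high
        elif op == "<":
            ok = low < value    # some value in the partition is below the bound
        elif op == ">":
            ok = high > value   # some value in the partition is above the bound
        else:
            raise ValueError(f"unsupported operator: {op}")
        if ok:
            keep.append(name)
    return keep

def eliminate_hash(n_partitions, op, value):
    """Hash partitions can only be eliminated for equality predicates."""
    if op == "=":
        return [hash(value) % n_partitions]
    return list(range(n_partitions))  # any other predicate must scan every partition

ranges = [("p1", 0, 99), ("p2", 100, 199), ("p3", 200, 299)]
```

For example, eliminate_range(ranges, ">", 150) keeps only p2 and p3, while eliminate_hash(4, ">", 10) has to keep all four partitions.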

Thursday, March 14, 2013

SAP DKOM 2013 talk: HANA Query Federation

At the SAP DKOM 2013 conference, most topics were about the HANA platform, which clearly reflects SAP's determination to become the 2nd largest database company. Of all the topics, the one I found most interesting is HANA federation.

The SAP real-time data platform includes four major components: Transactional Data Management (Sybase ASE), In-Memory Innovations (HANA DB), Analytics EDW Data Management (Sybase IQ), and Mobile Data Management (SQL Anywhere). It may also contain a Complex Event Processing (CEP) module as the data source. Built on top of that are SAP's solutions, including Business Suites and Cloud/Mobile/Analytics Apps. Of course, the core of this platform is the HANA DB, which provides real-time analytics.

With so many different DBMSs underneath, an obvious and challenging question is how to federate access to all that data. SAP is developing solutions so that in the HANA studio we can issue queries not only on HANA tables, but also on any kind of data source as long as ODBC is supported. We can join HANA tables with tables from other sources such as Sybase ASE, IQ, or even Hadoop, and everything is transparent to the customers.

In order to achieve this, we first create proxy tables that link to the other underlying data sources, for example Hadoop. Then the query plan ships the corresponding operators to those servers, where they get translated into Hive scripts and run remotely. The results are transmitted back to HANA studio. In particular, filtering operators are also passed to the remote server to minimize the data transmitted.

Sometimes the remote server, for example Hadoop, may take quite some time to finish the job, and the optimizer in the main server cannot get any estimates at this time. As ongoing and future work, SAP is collaborating with Intel to estimate and monitor the running time of jobs on the remote server, so that the main server won't need to hang there waiting.

Saturday, March 09, 2013

Brief notes on Map Reduce 2 (YARN)

YARN stands for “Yet-Another-Resource-Negotiator”. The fundamental idea of MRv2 is to split up the two major functionalities of the JobTracker: resource management and job scheduling/monitoring, into separate daemons:
- A ResourceManager (RM) that manages the global assignment of compute resources to applications.
- A per-application ApplicationMaster (AM) that manages the application’s life cycle.
It is a more isolated and scalable model than the MR1 system.

The ResourceManager has two main components:
- Scheduler
Responsible for allocating resources to the various running applications, subject to the familiar constraints of capacities, queues, etc.

- ApplicationsManager (AsM)
Responsible for accepting job submissions, negotiating the first container for executing the application-specific AM, and providing the service for restarting the AM container on failure.

NodeManager (NM): per-node
This is the per-node slave of the ResourceManager, responsible for containers, monitoring their resource usage (cpu, memory, disk, network) and reporting the same to the ResourceManager/Scheduler. The design also allows plugging long-running application-specific auxiliary services into the NMs during startup. Shuffle is a typical auxiliary service loaded by the NMs.

ApplicationMaster (AM): per-application(job)
Responsible for negotiating appropriate resource containers from the Scheduler, tracking their status and monitoring for progress. Each AM manages the application’s individual tasks, and each task runs within a Container on a node.

Resource Allocation Process:
Resources are requested in the form of containers, where each container has a number of non-static attributes.

Client - RM
The client submits a request to the RM/AsM and, upon the response from the RM, sends the application submission context.
The client also asks the RM/AsM for an application report and gets it in response.

The AM registers itself with the RM, and the RM gives it the resource status of the cluster.
The AM sends a resource allocation request to the RM. The RM/Scheduler responds with a list of containers.

The AM sends a container start request to the NM.
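To make the sequence of calls easier to follow, here is a toy simulation of the flow. It is not the Hadoop API: the classes, method names, state strings, and container IDs are all hypothetical and exist only to mirror the steps listed above (submit, report, register, allocate, start).

```python
class NodeManager:
    """Toy per-node agent that starts containers on request."""
    def __init__(self, name):
        self.name = name
        self.running = []

    def start_container(self, container_id, task):
        self.running.append((container_id, task))


class ResourceManager:
    """Toy RM that plays both the Scheduler and ApplicationsManager roles."""
    def __init__(self, node_managers, containers_per_node):
        # Free containers as (node manager, container id) pairs.
        self.free = [(nm, f"{nm.name}-c{i}")
                     for nm in node_managers
                     for i in range(containers_per_node)]
        self.apps = {}

    def submit_application(self, submission_context):
        app_id = len(self.apps) + 1
        self.apps[app_id] = {"context": submission_context, "state": "ACCEPTED"}
        return app_id

    def application_report(self, app_id):
        return self.apps[app_id]["state"]

    def register_am(self, app_id):
        self.apps[app_id]["state"] = "RUNNING"
        return {"free_containers": len(self.free)}  # cluster resource status

    def allocate(self, app_id, count):
        granted, self.free = self.free[:count], self.free[count:]
        return granted


nms = [NodeManager("node1"), NodeManager("node2")]
rm = ResourceManager(nms, containers_per_node=2)

app_id = rm.submit_application({"name": "wordcount"})  # Client -> RM
state_before = rm.application_report(app_id)           # Client -> RM
cluster = rm.register_am(app_id)                       # AM -> RM
containers = rm.allocate(app_id, 3)                    # AM -> RM
for i, (nm, container_id) in enumerate(containers):    # AM -> NM
    nm.start_container(container_id, f"task-{i}")
```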

Friday, March 08, 2013

Watch out QR code security

Attacks are everywhere! Just noticed that there are malicious QR code attacks that might trick you into visiting some fake website, or steal information from you. Due to the limited amount of information encoded, most of the attacks are either phishing or code injection. You really need to pay attention to the website you open and see if it's what you expected. Be especially cautious if you get a form prompting you to fill in some private info, or asking you to install some package. The best prevention is: only scan the codes you trust, and ignore those fancy ads on the wall in the streets :)

Thursday, February 28, 2013

SQL on Hadoop, for real now

One of this week's big news items is EMC's Project Hawq. The trend is pretty clear now: the future of big data solutions is the fusion of Hadoop platforms with SQL and complex analytics from traditional RDBMSs. Many companies have started their efforts, and we're sure to see yet more. What Hadapt claimed early last year, Why Database-to-Hadoop Connectors are Fundamentally Flawed and Entirely Unnecessary, is being proved by many other companies.

Tuesday, January 15, 2013

Statement Cache vs Procedure Cache for ad hoc queries in Sybase ASE

Statement Cache
For ad hoc SQL queries, ASE wraps the SQL statement with a lightweight stored procedure (LWP) and changes any local variables into procedure parameters. ASE will assign an object ID to this LWP. The statement cache will save the SQL text and the corresponding LWP object ID.

For an incoming statement, ASE computes a hash value from the statement, and uses this hash value to search for a matching statement in the statement cache.
- If a match is found in the statement cache, then search for the object ID in the procedure cache.
- If a match is not found, ASE first wraps up the statement in LWP, assigns an object ID, and caches this statement in the statement cache. Then the LWP is transferred to procedure cache.

Procedure cache
The procedure cache is a MRU/LRU (most recently used/least recently used) chain of stored procedure query plans. The entries are identified by the object ID.

When there is no plan for the corresponding object ID in the procedure cache, ASE compiles the procedure and caches the plan. The plan is compiled using the assigned runtime values for the local variables. If the plan exists but is invalid, ASE will build a new LWP using the text of the cached SQL statement.
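The two-level lookup can be sketched as follows. This is a simplified model, not ASE code: the class name AdHocCache, the use of SHA-1 as the hash, the integer object IDs, and the compile_plan callback are all assumptions for illustration, and plan invalidation is left out.

```python
import hashlib
from itertools import count

class AdHocCache:
    """Toy model: statement cache keyed by text hash, procedure cache keyed by object ID."""
    def __init__(self, compile_plan):
        self.statement_cache = {}   # hash of SQL text -> (SQL text, LWP object ID)
        self.procedure_cache = {}   # LWP object ID -> compiled plan
        self._next_id = count(1)
        self._compile = compile_plan
        self.compilations = 0

    def execute(self, sql):
        key = hashlib.sha1(sql.encode()).hexdigest()
        entry = self.statement_cache.get(key)
        if entry is None:
            # No match: wrap the statement in an LWP and assign an object ID.
            entry = (sql, next(self._next_id))
            self.statement_cache[key] = entry
        text, object_id = entry
        plan = self.procedure_cache.get(object_id)
        if plan is None:
            # No plan for this object ID yet: compile it and cache the plan.
            plan = self._compile(text)
            self.procedure_cache[object_id] = plan
            self.compilations += 1
        return object_id, plan

cache = AdHocCache(lambda sql: f"plan({sql})")
first = cache.execute("select * from tb1")
second = cache.execute("select * from tb1")
other = cache.execute("select * from tb2")
```

The second execution of the same text finds both the statement entry and the plan, so nothing is compiled again.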

Statement Cache Matching
ASE matches an ad hoc SQL statement to a cached statement by the SQL text and by login (particularly if both users have sa_role), user ID, database ID, and session state settings. So if two users submit the same query but the bindings are different (for example, both may have a table tb1, but these are obviously different objects, since they belong to different databases), they're still treated as different queries.

It's also possible that two queries are identical except for one or more literal values. By default they're treated as different queries. With "enable literal autoparam" turned on, the 2nd query can reuse the plan generated for the 1st query. For example, the query "select * from tb1 where col =1" will be saved internally as "select * from tb1 where col = @@@v0_INT". In this way we can reduce the compilation time, as well as the number of entries and the storage used in the statement and procedure caches.
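A minimal sketch of the literal replacement, assuming only standalone integer literals need handling. The regular expression and the autoparam function are illustrative, not how ASE parses SQL; decimals, string literals, and other datatypes are deliberately ignored. The placeholder format follows the @@@v0_INT example above.

```python
import re

def autoparam(sql):
    """Replace standalone integer literals with @@@vN_INT placeholders.

    Returns the parameterized text and the list of extracted values.
    """
    values = []

    def replace(match):
        values.append(int(match.group()))
        return f"@@@v{len(values) - 1}_INT"

    # The lookbehind skips digits that are part of an identifier such as tb1.
    text = re.sub(r"(?<![\w@])\d+\b", replace, sql)
    return text, values

text1, vals1 = autoparam("select * from tb1 where col = 1")
text2, vals2 = autoparam("select * from tb1 where col = 42")
```

Both statements produce the same parameterized text, so they would map to a single statement cache entry, with the literal passed as a parameter value.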

Why wrap up statement in LWP?
Surely we could use a single statement cache that includes everything: statement text, plan, literal form, etc. In ASE, the design decision is that the statement cache should only cache the text and perform object binding, while the procedure cache takes care of all plan-related info. With this separation of functionality, we have a clearer framework, and we can do memory allocation in a more consistent way.

When to enable literal parametrization?
For OLTP, queries are often simple and homogeneous. We may consider using the "enable literal autoparam" configuration and reusing previous plans, to save compilation time and storage.
For Business Warehouse, queries are often more complex and skewed. The plans may be quite different for different parameters after optimization, so it is desirable that queries be recompiled and optimized for each set of different parameters.

Streamlined dynamic SQL:
The statement cache can also be used to store dynamic SQL statements (prepared statements) converted to LWPs. Because the statement cache is shared among all connections, dynamic SQL statements can be reused across connections.
To enable using the statement cache to store dynamic SQL statements, set the "streamlined dynamic SQL" configuration option.

Saturday, January 12, 2013

Stored procedure processing in Sybase ASE

When a stored procedure is created:
- the ASCII text of the procedure is stored in the syscomments table
- the normalized form of the procedure (query tree) is stored in the sysprocedures table.

Resolution phase: Query Tree generation
Parses the SQL into a query tree, and resolves all objects involved into their internal representations.
- table names are resolved into their object IDs
- column names are resolved into their column IDs.

Compilation phase: building query plan
A query plan is built on the first execution of a stored procedure.
- SQL Server reads the corresponding query tree from the sysprocedures table
- loads it into the procedure cache
- creates a query plan and places it in the procedure cache.

The query plan is the optimized data access path that SQL Server uses to execute the procedure, based on the following:
- The SQL stored in the query tree
- Statistics for each table and index referenced in the procedure
- The values of any parameters passed to the procedure on the first execution

Query plans are held only in the procedure cache and not on disk, so they must be rebuilt if the SQL Server is restarted.

Multi-copy Query Plan in cache

- Only one user at a time can execute a given copy of a procedure's query plan.
- If two or more users try to execute the same procedure at the same time, SQL Server creates an additional query plan based on the parameters used in the latter execution. When a user finishes using the procedure, the query plan is available in cache for reuse by anyone with execute permissions.

- The second query plan may not be the same as the first one
  > a different set of parameters
  > an index was added to a referenced table
  > the statistics were updated

Adding an index or updating statistics does not force recompilation

When there are multiple copies of query plans, there's no control over which execution plan will be chosen for a given execution. You may see unexpectedly different execution times for the same procedure given the same data and parameters. To remove this uncertainty,
- Dropping and re-creating the procedure will cause all existing plans to be flushed out of cache.
- To ensure that you always get your own plan, you can use exec with recompile or create with recompile. Creating a procedure with the with recompile option decreases performance, because every execution causes a compilation.

Recompilation takes place whenever one of the following events occurs:
- The procedure is loaded from disk to the procedure cache.
- An index on any table referred to in the procedure is dropped.
- All copies of the execution plan in cache are currently in use,
  and a new user wants to execute the procedure.
- A procedure is executed using the "with recompile" option.
- A table is flagged with the sp_recompile stored procedure.
  This causes SQL Server to re-resolve and then recompile any procedures or triggers that access that table, at execution time.

Dropping an index or a table referenced by a query causes SQL Server to mark the affected procedure as needing re-resolution and recompilation at execution time.

Neither the update statistics nor the create index command causes an automatic recompilation of stored procedures.

Re-resolution causes the generation of a new plan, and updates the existing query tree in the sysprocedures table.

Re-resolution occurs when one of the tables changes in such a way that the query tree stored in the sysprocedures table may be invalid. The datatypes, column offsets, object IDs, or other parts of the table may have changed.

Deferred compilation
Previously, all statements in a stored procedure were compiled before they were executed. This meant that the actual values of local variables, or knowledge of temporary tables created within the stored procedure, were not available during optimization, so the plan just used magic numbers.

When using deferred compilation, stored procedures that reference local variables or temporary tables are not compiled until they are ready to be executed. The optimizer can then select a better plan for executing the stored procedure for the given data set.

The same plan can be reused for subsequent executions of the stored procedure. Since the plan is optimized specifically for the values and data set used in the first execution, it may not be a good plan for subsequent executions of the stored procedure with different values and data sets.

With the option of "with recompile", the stored procedure will be recompiled for each execution rather than using the plan from a previous execution.

Deferred name resolution
The deferred name resolution feature allows objects, except for user-defined datatype objects, to be resolved when the stored procedure is executed for the first time, instead of at creation time. Using deferred_name_resolution allows creating a procedure that references a table that does not yet exist.

Procedure cache
The procedure cache is a MRU/LRU (most recently used/least recently used) chain of stored procedure query plans. As users execute stored procedures, Adaptive Server looks in the procedure cache for a query plan to use. If a query plan is available, it is placed on the MRU end of the chain, and execution begins.

If no plan is in memory, or if all copies are in use, the query tree for the procedure is read from the sysprocedures table. It is then optimized, using the parameters provided to the procedure, and put on the MRU end of the chain, and execution begins. Plans at the LRU end of the page chain that are not in use are aged out of the cache.
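The chain behaviour, including multiple plan copies for the same procedure, can be modelled roughly like this. It is a toy sketch, not ASE's implementation: the class name ProcedureCache, the capacity limit, the integer plan IDs, and the compile_plan callback are assumptions for illustration.

```python
from collections import OrderedDict
from itertools import count

class ProcedureCache:
    """Toy MRU/LRU chain of plans; the last item is the MRU end."""
    def __init__(self, capacity, compile_plan):
        self.capacity = capacity
        self.compile_plan = compile_plan
        self.chain = OrderedDict()  # plan id -> [procedure name, plan, in_use]
        self._ids = count(1)

    def acquire(self, proc, params):
        # Reuse a cached copy of the plan if one exists and is not in use.
        for pid, entry in list(self.chain.items()):
            if entry[0] == proc and not entry[2]:
                entry[2] = True
                self.chain.move_to_end(pid)  # place it on the MRU end
                return pid
        # No plan, or all copies in use: build a new copy from the current parameters.
        pid = next(self._ids)
        self.chain[pid] = [proc, self.compile_plan(proc, params), True]
        self._age_out()
        return pid

    def release(self, pid):
        self.chain[pid][2] = False

    def _age_out(self):
        # Walk from the LRU end and drop plans that are not in use.
        for pid in list(self.chain):
            if len(self.chain) <= self.capacity:
                break
            if not self.chain[pid][2]:
                del self.chain[pid]

pc = ProcedureCache(2, lambda proc, params: (proc, params))
a = pc.acquire("p", (1,))   # first user compiles a plan
b = pc.acquire("p", (2,))   # second concurrent user gets a second copy
pc.release(a)
c = pc.acquire("p", (3,))   # reuses copy a, which was built for parameters (1,)
```

Note that the third caller runs with a plan optimized for the first caller's parameters, which is exactly the uncertainty described in the multi-copy section above.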

Monday, November 19, 2012

Reset the IP address of your Ubuntu VM after changing host network

If you move your host machine to a different network, your previous VM network setup might stop working, as the gateway and IP address configured for the VM might have become invalid. So you need to reconfigure the VM network and assign a new IP address. I'm using VMware Workstation 6.5 as an illustration, and I'm showing only the NAT connection as an example here:

Note the default setup of the following virtual ethernet cards:
VMnet0 is bridged to eth0
VMnet1: for private host-only network
VMnet8: for NAT

Suppose I'm using the NAT connection, then I will change the VMnet8 setup. Before you start, you need to take a look at your host network setup, identify the gateway, and decide your desired IP address range for your VM.

//setup VMnet8 address:
control panel--> Networking settings --> VMnet8 --> TCP/IP --> Properties
Suppose I choose the address as

//next setup VM WorkStation network configuration:

Edit --> Virtual Network Editor --> Host Virtual Network Mapping--> VMnet8
> subnet
IP Address:
Subnet Mask:

Start IP Address:
End IP Address:

Gateway IP:  (this should match your host gateway)

//double check if the DHCP tab is correct
Edit --> Virtual Network Editor --> DHCP -->

//in ubuntu vm:
1. change /etc/network/interfaces
 update the ip address and gateway

2. change /etc/resolv.conf
update the nameserver

//finally, restart network, and we are done!
sudo /etc/init.d/networking restart

Tuesday, November 13, 2012

Taobao's big data rocks!

Taobao, the Chinese version of Amazon, hit a record $3.2B in online sales revenue in a single day on 11.11.2012 (the so-called "Singles' Day", from the look of 11.11), which tops the sum of the Black Friday sales in the US over the past 4 years! On the same day, Alipay, the Chinese version of PayPal, reported 4.1B transactions, 28.4B SQL queries, 15.TB log, 193.1B visits to in-memory data blocks, and 1.3B physical reads.

A quick study tells us that Taobao uses MSSQL clusters for transaction processing, and they designed their own Hadoop-based data analytics platform called OceanBase. Alipay also uses MSSQL clusters as the core for their transaction processing.

Wednesday, October 03, 2012

SAP HANA can support 250T DRAM now!

Just learned from Vishal's blog today that the biggest SAP HANA system now has 100T of DRAM, is expandable to 250T, and can be doubled by early next year!
Blog: Setting The Record Straight -  SAP H... | Experience SAP HANA
Moore's Law is still effective in hardware!

Saturday, September 29, 2012

SAP vs ORACLE, the database market is reshuffling?

In the past several days, ERP on HANA and ERP on ASE have successfully taken some big orders which were previously on the Oracle platform. The HANA-Sybase Database Platform (HSDP, as SAP officially refers to it) has shown its impact on the market, and the future of the database market looks rather interesting from this point.

After SAP brought up HANA, and bought and integrated Sybase successfully, SAP announced that its goal is to be the No. 2 database company in the world by 2015. Oracle and SAP are now competing head to head in both the application and the database markets. However, I think these two companies differ fundamentally in their view of the relationship between application and database. Oracle is essentially a database company, and it thinks applications should be developed and adjusted given what the underlying database can offer. SAP, on the other hand, puts more emphasis on the application side, and requires that the database meet the application's requirements. The good thing with Oracle is that it can focus on the database development itself, adding new features, improving its performance and making it more powerful. However, SAP's major goal currently is to package its ERP and business analytics products, which already account for a big share of the overall database market, with its own database platforms. Making the database seamlessly integrated with its applications obviously gives a big advantage. In fact, the current development process of HANA and Sybase has already shown this: a feature is first evaluated to decide whether it should be put on the application side or on the database side. In this view, Oracle may be better at providing a generic data platform at this time, but in no way can it provide as good performance for SAP applications.

The HANA system may look expensive now. But as claimed by the HANA team: we're making the best product, not the cheapest, and pricing is not an issue given its superior performance, especially for the SAP ERP products. For those customers who cannot afford the high cost of memory, SAP also has an alternative plan: hook up HANA with Sybase IQ, which is a low-cost and yet powerful data analytics system. Overall, the full stack of database/warehouse products SAP now has in hand gives it great flexibility in offering customers the right package.

Thursday, May 24, 2012

Big data talk by Dheeraj from Zynga

Went to the meetup "Big Data - Challenges, Solutions and A Comparative Analysis" organized by Taleo this evening. Dheeraj Soti, principal engineer at Zynga, gave the talk. An interesting fact: Zynga is using Vertica as their big data analytics platform (they're actually the largest customer of Vertica, and their system has grown from a single-digit number of nodes to 1000 nodes nowadays). I was just wondering why they didn't use any of the popular NoSQL solutions such as Hadoop. As Dheeraj explained, the data are collected from many game centers and then bulk loaded into Vertica. General reports are run overnight. I had thought that, since Zynga is a social game company, NoSQL would be very effective for its social data analysis; it is actually deployed by most leading social companies such as Facebook. Maybe I misunderstood something in his talk. But can it be true that Zynga didn't use any NoSQL?

Sunday, May 13, 2012

Hadoop install on Ubuntu Linux

Some friends asked me how to set up Hadoop on Ubuntu Linux. I would strongly suggest Michael G. Noll's online tutorial: running-hadoop-on-ubuntu-linux-single-node-cluster
It's a very clearly written step-by-step tutorial and especially good for new users.

For his multi-node cluster setup, I do have some additional explanations on the network setup that's omitted in his tutorial. So if you're done with his single-node tutorial and run into problems with the cluster setup, you may try examining the setup steps here:

1. Setup multiple single nodes
You can simply follow Michael's tutorial in the above link.

2. Cluster Networking setup
2.1 Update hostname if needed:
//to display hostname:
//to change host name:
 vi /etc/hostname
 vi /etc/hosts

2.2  networking setup
2.2.1 set up a static IP address in /etc/network/interfaces on each node
Example contents in the interfaces file:
auto lo
iface lo inet loopback

auto eth0
iface eth0 inet static

2.2.2 Update /etc/hosts on all machines. Example:    master    slave    slave2

2.3 SSH access control
2.3.1 Truncate all files in $HOME/.ssh,  and regenerate SSH keys as in the single node
2.3.2 Add master's key to slave's authorized_keys file, to enable password-less SSH login
ssh-copy-id -i $HOME/.ssh/ hduser@slave
2.3.3 On slave machines do the same and add their keys to master's authorized_keys file
ssh-copy-id -i $HOME/.ssh/ hduser@master
2.3.4 Try ssh from both master and slave to master/slave

3. Hadoop setup
You need to modify the namespaceID of the datanodes to match that of the namenode on the master.
The namespaceIDs of the namenode and datanode are defined in:

Remember that here the {hadoop.tmp.dir} is set up in conf/core-site.xml

If you don't do this, you might get the error: Incompatible namespaceIDs in logs/hadoop-hduser-datanode-master(or: slave).log

After all these are done, you can then follow the multinode cluster setup tutorial in Michael's link.


Friday, May 04, 2012

A nice trick to truncate a file in linux

I always use /dev/null to truncate a file. However, I just found that you can use a simple command:
to truncate a file to zero size in linux.