Sunday, September 30, 2012

Coherence Cluster Capacity

Hello! Understanding the capacity of a cluster is very important, as an overloaded cluster will not perform well.

The overall capacity of a cluster is determined by how many members it has. Two physical resources determine the number of members that can be available at a given time: memory and CPU cores.

CPU Considerations:
It is important to understand that Coherence is a series of services running in a JVM. These services are asynchronous named threads, all of which require CPU cycles to work, as do the JVM and the OS. So, when determining your capacity, it is very important to remember that each member adds this overhead, limiting the number of members per box.

Memory Considerations:
When deciding how much data to store, it is vital to remember that not all of the heap space in a storage node is available for primary storage; in fact, most of it is not. There are several overheads that need to be accounted for:

  • For Partitioned caches, each piece of data is backed up on a different JVM, so we must account for twice the serialized object size per object.
  • For Replicated caches, the objects are kept deserialized after the first access and are present in every member, and must be sized accordingly.
  • The keys of the objects and indexed data are kept deserialized.
  • Every named cache has an overhead of 250 KB.
  • Each entry has an overhead of 144 bytes.
  • Coherence needs some scrap space; that is, space to deserialize and work with objects when executing listeners, entry processors and aggregators.
  • The JVM has its own overhead and, to perform optimally, should not be run above 75 to 80% of its capacity.

Sizing your members:
Taking all of these overheads into consideration, we can start to size our members, the sum of which will be our total capacity.
  • ¼ of the capacity should remain unused to ensure optimal performance by the JVM.
  • ¼ of the capacity should not be used for primary storage and will be used for the overheads Coherence needs.
  • This leaves us with ½ of the JVM heap to be used as storage, in a mix of Partitioned and Replicated caches; however, it is important to take into consideration that:
    • For Partitioned caches you must consider twice the size of the POF-serialized object, as it will be stored twice.
    • For Replicated caches you must take into account the native size of the objects.
Taking this into account, once you know the size of your objects it becomes trivial to determine how much data you can store per storage node.

Enforcing your cache sizing:
In order to ensure that Coherence enforces your cache sizing, you must specify on the backing map of each cache: Binary as the unit calculator, the maximum size in bytes of primary storage for that cache (remembering that for Partitioned caches the actual memory spent is twice that, due to the backup), and the eviction strategy.
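As a sketch, a size-limited backing map might look like the fragment below (the scheme and service names are illustrative; the element names are the standard local-scheme configuration elements):

```xml
<distributed-scheme>
  <scheme-name>limited-distributed</scheme-name>
  <service-name>DistributedCache</service-name>
  <backing-map-scheme>
    <local-scheme>
      <!-- count units as bytes of serialized (Binary) data -->
      <unit-calculator>BINARY</unit-calculator>
      <!-- maximum primary storage for this cache, per member: 100 MB -->
      <high-units>104857600</high-units>
      <!-- evict least-recently-used entries once the limit is reached -->
      <eviction-policy>LRU</eviction-policy>
    </local-scheme>
  </backing-map-scheme>
  <autostart>true</autostart>
</distributed-scheme>
```

Remember that with backups the real per-member memory cost of this cache is twice the high-units value.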

The dangers of wildcards:
As explained above, sizing is enforced per cache, so wildcards break this contract: if you are using wildcards there is no way to directly enforce a sizing policy.
For instance, suppose you have a cache mapping of my-cache-* and set its high-units to 100MB. When you create a cache named my-cache-one, the maximum amount of data that can be added to it is 100MB. The problem is that you can create an unlimited number of caches using that mapping, and each of them will allow 100MB, potentially resulting in an out of memory error.

Sizing your cluster:
Based on these considerations we can determine how many storage nodes we will need to store the data required by applying a simple formula where x is the number of nodes required:

In essence, this means that you must add up the size of each object that you aim to place into a distributed cache multiplied by the number of objects in that cache, multiply the total by two to account for the backups, and divide that by the storage capacity of the JVM, which is, as discussed in the previous section, half of the JVM heap, minus the size times the number of objects of each replicated cache, as replicated data takes up space in every JVM.
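The original formula image is missing from the page; written out, the sentence above amounts to the following (a reconstruction, with H the JVM heap size, s_i and n_i the serialized size and count of the objects in each distributed cache, and s_j and n_j those of each replicated cache):

```latex
x = \left\lceil \frac{2 \sum_i s_i n_i}{\frac{H}{2} - \sum_j s_j n_j} \right\rceil
```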
After calculating the number of members necessary, you must add headroom to account for the loss of one physical machine; so, if each physical machine holds 3 members, you must add 3.
For instance, suppose you have three distributed caches, one with 100,000 5k objects, one with 50,000 1Mb objects and one with 1,000,000 2k objects, plus one replicated cache with 4,000 50k objects, and we are hosting 3 processes per physical machine. Making the calculation:

Next, we convert everything to the same unit and get our final value:

So now we know we will need 35 storage-enabled members to store the data.
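As a sanity check, the arithmetic can be sketched in plain Java. The object counts and sizes are those of the example; the 7 GB heap is an assumption, since the post does not state the JVM size used, so the final number here differs slightly from the post's 35:

```java
public class ClusterSizing {
    public static void main(String[] args) {
        // Distributed caches: serialized size (KB) times object count
        long distributedKb = 100_000L * 5        // 100,000 5k objects
                           + 50_000L * 1024      // 50,000 1Mb objects
                           + 1_000_000L * 2;     // 1,000,000 2k objects
        long primaryAndBackupKb = distributedKb * 2;   // backups double the footprint

        // Replicated cache: lives in every JVM, so it is subtracted per node
        long replicatedKb = 4_000L * 50;               // 4,000 50k objects

        long heapKb = 7L * 1024 * 1024;                // ASSUMED 7 GB heap per member
        long perNodeStorageKb = heapKb / 2 - replicatedKb;  // half the heap, minus replicated data

        long nodes = (primaryAndBackupKb + perNodeStorageKb - 1) / perNodeStorageKb; // ceiling division
        long withFailover = nodes + 3;                 // + one machine's worth (3 members per machine)

        System.out.println(nodes + " nodes, " + withFailover + " with failover headroom");
    }
}
```

With a different heap size the division, and therefore the result, changes, which is how the post arrives at its own figure.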

Calculating the number of Members per physical server:
Once the number of members has been determined, you can decide how many members will run on each physical box. However, there are some more things to consider: the server hosting the members still needs memory and CPU cycles for the OS to work optimally, and paging/swapping should never occur, so we need to make sure not to use all of the available physical memory for Coherence members.
CPU usage, as explained above, is also very important; if it is not correctly managed, performance will be greatly degraded by threads fighting over cores. The exact number of members per core will depend on the load, but one core per member is a reasonable starting point. This should be profiled and tuned by running tests, checking the task backlog MBean and the CPU usage in the OS, making sure the box is running neither too hot nor too idle, and adjusting the number of members per box accordingly.

Calculating the number of extend members per cluster
Members running the Coherence*Extend service should be storage-disabled and should have JVMs the same size as the other members because, even though they will not store data, they will potentially be doing multiple aggregations for different clients. It is important to remember that you will need one thread per active concurrent connection, set via the thread-count on the proxy configuration, and you should derive the number of proxies you need from that, keeping in mind the CPU considerations discussed earlier and that you should have at the very least two proxies on different physical machines for reliability.
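A proxy scheme with an explicit thread pool might be sketched like this (scheme name, port and thread count are illustrative; size the thread-count to your expected concurrent connections):

```xml
<proxy-scheme>
  <scheme-name>extend-proxy</scheme-name>
  <service-name>ExtendTcpProxyService</service-name>
  <!-- one worker thread per expected active concurrent client connection -->
  <thread-count>20</thread-count>
  <acceptor-config>
    <tcp-acceptor>
      <local-address>
        <address>0.0.0.0</address>
        <port>9099</port>
      </local-address>
    </tcp-acceptor>
  </acceptor-config>
  <autostart>true</autostart>
</proxy-scheme>
```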


Saturday, August 25, 2012

Entry Processors part 2 - Deadlocking

This is Part two of my entry processor post and deals with the advantages and disadvantages of cross cache entry processors.

Cross cache Entry processors
As mentioned in the previous post, entry processors allow us to update values in the most efficient manner; however, you will often want to update values in multiple caches, or update a value that is derived from related entries. To do this, all you would need is a call to CacheFactory.getCache from inside the entry processor, for instance:

By looking at the diagram we can see the problem straight away: our code will perform an extra network hop to get one of the blue objects, as nothing guarantees the object will be within the same node. So how can we solve this?

Coherence offers something called key association, which allows us to ensure that two or more related entries that share the same business key are co-located; this requires both caches to share a service. I have previously shown some benefits of key association in the versioning post, and the documentation is here, so I'll assume everyone is familiar with it; if not, take a look at the mechanism and I'll wait and check my emails.
Welcome back. Now, after we implement key association, our cache would look like this:

Which is exactly what we wanted, but when we run the code we get this exception:
2012-08-07 11:49:41,510 ERROR [Logger@9252516] Coherence(3) - 2012-08-07 11:49:41.507/14.742 Oracle Coherence GE <Error> (thread=DistributedCache:first-service, member=1): Assertion failed: poll() is a blocking call and cannot be called on the Service thread

This is very dangerous because, if we have multiple threads for the service, instead of an exception all you will get is this warning:
2012-08-07 11:40:50,040 WARN  [Logger@9243192] Coherence(3) - 2012-08-07 11:40:50.040/5.833 Oracle Coherence GE <Warning> (thread=first-serviceWorker:1, member=1): Application code running on "first-service" service thread(s) should not call ensureCache as this may result in deadlock. The most common case is a CacheFactory call from a custom CacheStore implementation.

This happens because you are accessing the service from a thread of that same service, and here is where the behavior starts getting tricky: if you have two or more free threads it will work as expected, barring the warning; however, if you have only one free thread or only one extra thread on your service, the execution of the entry processor will deadlock until it is killed by the service guardian. You should never do this, so how should we go cross cache properly?

Direct backing map cross cache access 
As the title, which is a mouthful, explains, we should access the entries directly in the backing map. Because we are using key association, we are sure the entries will be co-located, as explained in figure 2.

Entry Processors in coherence 3.7

Entry processors have gotten better in 3.7: their transactionality and locking have been extended to include cross cache access, so all of the wonderful transactional guarantees and locking explained in my previous post now extend to cross cache access. This, however, creates quite a big problem, as you can potentially deadlock your application. It might not be clear at first glance, but the diagram below should make it clearer:

As all access locks, the following example will deadlock: the entry processor that was invoked last will fail due to deadlocking, while the one that arrived first will continue its execution. This is a deceptively simple problem to avoid with good design: create and document the expected flow through the caches, i.e. all cross cache entry processors should go from the first to the second cache; this way they will queue and provide the expected behavior of entry processors.
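This "fixed flow" rule is just the classic lock-ordering discipline. As an analogy outside Coherence (plain ReentrantLocks, not Coherence API): if every thread takes the first cache's lock before the second's, no two threads can ever each hold one lock while waiting on the other.

```java
import java.util.concurrent.locks.ReentrantLock;

public class LockOrdering {
    static final ReentrantLock firstCache = new ReentrantLock();
    static final ReentrantLock secondCache = new ReentrantLock();
    static int balance = 0;

    // Every "cross cache entry processor" takes the locks in the same order:
    // first cache, then second cache. Concurrent callers queue instead of deadlocking.
    static void crossCacheUpdate(int delta) {
        firstCache.lock();
        try {
            secondCache.lock();
            try {
                balance += delta; // stands in for updating entries in both caches
            } finally {
                secondCache.unlock();
            }
        } finally {
            firstCache.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(() -> { for (int i = 0; i < 1000; i++) crossCacheUpdate(1); });
        Thread t2 = new Thread(() -> { for (int i = 0; i < 1000; i++) crossCacheUpdate(1); });
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println(balance); // 2000: both threads finished, no deadlock
    }
}
```

Reversing the acquisition order in just one of the callers is exactly the diagrammed deadlock.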


Tuesday, August 21, 2012

Entry Processors - Part 1

Hello! This is the first of a two part series on entry processors:
Entry processors are a very powerful tool in our Coherence toolbox, and Coherence 3.7 improved the entry processor API even further. In case you haven't looked at the documentation, below is a quick description:

Entry Processors:
Entry processors process entries. I say this because I cannot be the only one who interpreted the name as something that processes things as they enter, which is the job of the trigger, but I digress.
Entry processors allow us to execute code server side against entries or sets of entries, either passed as a parameter or matching a filter that has been passed as a parameter to the processor. This is the preferred method of updating the value of entries in a cache: the work is performed in parallel on each node, an implicit lock is acquired on the object, and multiple entry processors changing the same object are queued, guaranteeing the ordering of updates and thus ensuring data consistency.

Why use entry processors?
The main reason is that Coherence is essentially a hashmap, so all operations are puts and gets, which makes it quite easy to end up in an inconsistent state. The diagram below shows the most common scenario.

●      At T0 the object has its fields with values of “Initial” and “Initial”.
●      At T4 the first field has a value of “Changed” and the second one of “Initial”.
●      At T6 the object will have the values “Initial” and “Changed”.

From this we can see that the changes made by client 1 were overridden by client 2, because client 2 had a stale version of the object.
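The lost update can be reproduced with a plain map and value copies (an illustration, not Coherence code): both clients read the same version, each changes one field, and the second put erases the first change.

```java
import java.util.HashMap;
import java.util.Map;

public class LostUpdate {
    // A tiny two-field value, copied on "get" like a deserialized cache entry
    record Value(String first, String second) {}

    public static void main(String[] args) {
        Map<String, Value> cache = new HashMap<>();
        cache.put("key", new Value("Initial", "Initial"));

        Value client1Copy = cache.get("key");   // T1: client 1 reads
        Value client2Copy = cache.get("key");   // T3: client 2 reads the same, soon-stale state

        cache.put("key", new Value("Changed", client1Copy.second())); // T4: client 1 writes
        cache.put("key", new Value(client2Copy.first(), "Changed"));  // T6: client 2 writes

        // Client 1's change to the first field has been silently lost:
        System.out.println(cache.get("key")); // Value[first=Initial, second=Changed]
    }
}
```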
The simplest, and most heavy-handed, way to solve this is:

●      Before actually getting the objects the clients attempt to get a lock on them.
●      At T3 client 2 will ask for a lock; this will either return false, or block until the lock is released if the client asked for it passing a long for the maximum time to wait (this avoids multiple network calls to attempt to get the lock, but you must set a value smaller than the service guardian timeout, or risk it killing your thread).
●      At T7 Client 2 finally has the lock and can get the object.

This will protect you from stale data; however, we are not doing government work here: our implementations shouldn't only work, they need to be efficient and easy to maintain, and this solution has quite a few problems. For starters, it slows the cache as a whole, as the clients are constantly locking the data. Secondly, there are extra network hops required for each locking step, further slowing down the application.

Simple Entry Processor solution:
In order to solve this problem when writing a processor, we need to ensure it is as simple as possible and that the code is deployed on the member that holds the data, as the entry processor will be serialized and sent over the wire. Below is the example code for the firstField entry processor; the one for the second field looks exactly the same, except that it changes a different value. In production it would be more efficient to use a POF updater, like the one used in the versioning post, but I wanted to keep this example as simple as possible.

And the graph:

●      If the entry processor at T2 arrives before the one at T1 has finished, it will be queued for execution after the lock is released, as entry processors take an implicit lock on the objects.
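This queue-on-the-key behavior is the same guarantee java.util.concurrent gives for per-key atomic updates. As an analogy (not Coherence API), ConcurrentHashMap.compute runs each mutation atomically per key, so neither field change is lost, whatever the arrival order:

```java
import java.util.concurrent.ConcurrentHashMap;

public class AtomicPerKeyUpdate {
    record Value(String first, String second) {}

    public static void main(String[] args) throws InterruptedException {
        ConcurrentHashMap<String, Value> cache = new ConcurrentHashMap<>();
        cache.put("key", new Value("Initial", "Initial"));

        // Each "entry processor" mutates one field; compute() holds the per-key
        // lock for the duration of the function, queueing concurrent updates.
        Thread t1 = new Thread(() ->
                cache.compute("key", (k, v) -> new Value("Changed", v.second())));
        Thread t2 = new Thread(() ->
                cache.compute("key", (k, v) -> new Value(v.first(), "Changed")));
        t1.start(); t2.start();
        t1.join(); t2.join();

        // Both updates survive, unlike the get/put race above:
        System.out.println(cache.get("key")); // Value[first=Changed, second=Changed]
    }
}
```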

Hopefully this has made the advantages of using entry processors clear.
Entry processor Particulars:
As explained above, locking is very important to the correct functioning of the entry processor. I found the documentation a bit confusing in this regard; the description is: “EntryProcessors are individually executed atomically, however multiple EntryProcessor invocations by using InvocableMap.invokeAll() do not execute as one atomic unit.” A simpler way to think about it is that each group of keys on a member will be executed atomically, and will hold its locks for the entire duration of the execution of that group. For instance, let's say our invokeAll evaluates to keys 1, 3, 4, 7 and 9 (for these purposes it is inconsequential whether it evaluates to those keys through a filter or because you passed in a set of keys). The partitions that contain keys 1, 3 and 7 are on member 1, and those with keys 4 and 9 are on member 2. The changes made to each of those groups will lock and be transactional for the duration of the execution of all of the keys on that member.

Entry Processors on a replicated cache:
Although not very common, as any write to a replicated cache is expensive, entry processors are sometimes executed on replicated caches. Unlike partitioned caches, where the entry processor is executed on the nodes that own the entries, in a replicated cache the execution takes place on the member that invoked the entry processor. It is important to note that locking still takes place; however, the locks are implicit, without the extra network calls that would be needed if we were using the methods of the ConcurrentMap interface.


Saturday, June 23, 2012

Filtering Filters

Hello! (Who needs a greeting anyway?) Distributed queries are a very powerful part of Coherence, but they can introduce show-stopping performance degradation, so it is vital to get your filters right.

The filters provided by Coherence take an extractor as a parameter; that extractor is executed across all of the nodes, and the evaluate method runs on its result.
This means that if you are using a normal reflection extractor (what is used by default when you call a filter with just a method name), as in new EqualsFilter(“getName”, ”some_name”), the filter is quite slow and inefficient, as every object will be deserialized in order to evaluate it. There are two ways to avoid this.

The simplest way to avoid this is to use an index to store those values. Indexed values are kept deserialized and are matched in the same way as regular database indexes. The filter classes provided by Coherence implement IndexAwareFilter and know how to look up those indexes, therefore avoiding the deserialization.

The other way is to simply pass a POF extractor as a parameter; POF extractors do not deserialize an object in order to extract the desired value. This approach is slower than having indexes, as each object will still be evaluated, as opposed to simply looking up the index table; however, it provides more flexibility, as you can decide at run time which fields will be used.
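The difference between the two paths can be sketched outside Coherence (illustrative stdlib code, not the IndexAwareFilter API): an inverted index answers an equality filter with one lookup, while the filter path must evaluate every entry.

```java
import java.util.*;

public class IndexLookup {
    public static void main(String[] args) {
        Map<Integer, String> cache = Map.of(1, "alice", 2, "bob", 3, "alice");

        // Build an index: value -> keys, kept "deserialized" like a Coherence index
        Map<String, Set<Integer>> index = new HashMap<>();
        cache.forEach((k, v) -> index.computeIfAbsent(v, x -> new TreeSet<>()).add(k));

        // Filter path: every entry is visited (every object would be deserialized)
        Set<Integer> scanned = new TreeSet<>();
        cache.forEach((k, v) -> { if (v.equals("alice")) scanned.add(k); });

        // Index path: a single map lookup, no per-entry evaluation
        Set<Integer> indexed = index.getOrDefault("alice", Set.of());

        System.out.println(scanned + " " + indexed); // [1, 3] [1, 3]
    }
}
```

Both paths return the same keys; the index just skips the per-entry work, which is why IndexAwareFilter implementations consult indexes first.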

Custom Filters:
Sometimes you need more complex logic than the provided filters give you, so you may decide to code your own. The simplest way to do that is to implement the Filter interface; however, a filter created in this manner will not use indexes even when they are relevant, and will deserialize every object in that particular cache, resulting in catastrophic performance.

There are a few ways to create a custom filter efficiently:

Implementing EntryFilter instead of Filter allows you to implement the evaluateEntry method, which receives a BinaryEntry as a parameter, allowing you to extract the values yourself in a clever manner, avoiding deserializing the whole entry; however, the extraction will still occur for every entry, which can be time consuming.

Extending ExtractorFilter allows you, through its superclass's constructor, to take advantage of the created indexes and, by implementing the evaluateExtracted method, to write your complex custom logic to evaluate that object.


Saturday, May 12, 2012

Server Side Versioning

Hello World! (Nothing yet on the new greeting front.) I was reading a very good post about versioned puts on Ben Stopford's blog, which stimulated me to run some tests and write something to perform server side versioning.

As Ben explained quite well on his blog, many caches will need some concept of versioning; for instance, you might want not only the latest version of a piece of market data for a given instrument, but also the last 30 versions, to have a view of the overall movement of that instrument.

Why Server Side Versioning

I wanted to write about server side versioning because that is the type of versioning I prefer to use; I find that client side versioning makes distributed puts non-trivial, can create bottlenecks, and misses the opportunity to take advantage of Coherence's distributed infrastructure.

The Goals

  • Multiple distributed clients must be able to push data at the same time and this should not affect the versioning.
  • Multiple Objects with the same business key must be kept in the cache in the same partition.
  • At any given time a client, using the well known latest-version marker (-1), must be able to access the latest version of an object with a simple get.

The steps

  • The first step was to create a key class (in this example called VersionedKey) that wraps the business key and version, and that implements KeyAssociation, using the business key as the associated key. This will ensure that all of the objects for that same business key will share a partition.
  • My model class needed a version placeholder field; this will be explained in the map trigger step, and is one of the main drawbacks of this approach.
  • Creation of a map trigger that takes advantage of the entry's getPreviousBinaryEntry() method to access the previous version of an entry, update its version and put it directly in the backing map.
  • Whenever a new VersionedKey object is created, its version is set to the well known latest-version marker, in this example -1.
  • When an Entity for that key is added to the cache, the VersioningMapTrigger will be executed, with two different flows: one if it is the first object to be added, and another if there are already other versions of that Entity in the cache.
  • If there are no other entries with that key in the cache (getPreviousBinaryEntry is null), all you need to do is set the version in the Entity (not its key) to 1, using a POF updater to avoid deserializing it.
  • If another entry exists in the cache for that same key (getPreviousBinaryEntry is not null), several steps are required:
    • The first step is to get the previous real version (not the version in the key, as it will be -1 since it was the latest) from the Entity that was already in the cache, and set the version in the key of that Entity to the same value.
    • We then set the version on the Entity that was just added to that value +1, again using a POF updater.
    • Finally, we put the previous entry and previous key directly in the backing map.
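The bookkeeping in those steps can be simulated with plain collections (an illustration of the algorithm only; the real code lives in a MapTrigger and uses POF updaters and the backing map, and the names below are mine):

```java
import java.util.HashMap;
import java.util.Map;

public class VersioningSketch {
    static final int LATEST = -1; // well known latest-version marker

    record VersionedKey(String businessKey, int version) {}
    // The value carries its own version field, as in the post's model class
    record Entity(String data, int version) {}

    static final Map<VersionedKey, Entity> cache = new HashMap<>();

    // Mimics the trigger: new entries always arrive under the LATEST key
    static void put(String businessKey, String data) {
        VersionedKey latestKey = new VersionedKey(businessKey, LATEST);
        Entity previous = cache.get(latestKey);              // getPreviousBinaryEntry()
        int newVersion = (previous == null) ? 1 : previous.version() + 1;
        if (previous != null) {
            // demote the old latest entry to its real version
            cache.put(new VersionedKey(businessKey, previous.version()), previous);
        }
        cache.put(latestKey, new Entity(data, newVersion));
    }

    public static void main(String[] args) {
        put("EURUSD", "1.25");
        put("EURUSD", "1.26");
        put("EURUSD", "1.27");
        // A simple get with the marker returns the latest version:
        System.out.println(cache.get(new VersionedKey("EURUSD", LATEST))); // Entity[data=1.27, version=3]
        System.out.println(cache.get(new VersionedKey("EURUSD", 1)));      // Entity[data=1.25, version=1]
    }
}
```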
This approach meets all of my requirements, and to be honest there is only one thing I do not like about it: having the version on my model objects. However, since I cannot easily change the value of the key that will be added after a map trigger, I decided it was worth it. I also tried some different approaches, such as my own implementation of a local cache and a backing map listener; however, both of them required more backing map calls and were less performant, so I decided against them.

Wednesday, April 11, 2012

Unit testing POF

Hello world! (Still looking for that better greeting...) It has become a recurrent topic here on the blog; ok, this is the first post and nothing is recurrent yet, but I hope it will be.

POF is probably the most used type of serialization for Coherence because its performance is far better than normal Java serialization: it is around 10-12 times faster, which saves a lot of CPU cycles, and the generated binaries are about six times smaller than with normal serialization. However, POF requires some configuration and implementation, as explained in the documentation.

Another factor that needs to be considered is the maintenance cost to the development process. It is important to make sure that mappings are up to date to avoid, at best, wasting time fixing something and, at worst, ending up with missing data or two inverted fields.

Proposed solutions:
The easiest and crudest way to do this is to simply have a unit test that puts all of your objects into their respective caches, then gets them and compares them to assert that they are equal. This, however, will take some time, as you must start a cache server, and for these tests to be effective they must always be run at build time; as you add more domain classes this time will increase even more, and your co-workers will no doubt plot to murder you.

When you are testing POF there is something simpler you can do, something I picked up from Dr. Alberto Forti on a project we worked on together a while ago.

You can easily create and use the ConfigurablePofContext directly. The ability to do that also opens up some interesting possibilities for using POF with external systems, a topic I will cover in a later post.

Below is an example of a unit test using Configurable POF context:
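The original snippet is not reproduced here, but the shape of the test is a simple round trip: build a ConfigurablePofContext from your pof-config.xml, serialize and deserialize each domain object with it (for example via ExternalizableHelper.toBinary and fromBinary), and assert the copy equals the original. The same pattern is sketched below with plain Java serialization so it runs standalone; with Coherence on the classpath you would swap the serializer for the POF context:

```java
import java.io.*;

public class RoundTripTest {
    // Stand-in for a POF-mapped domain class; with POF this would implement PortableObject
    record Trade(String instrument, int quantity) implements Serializable {}

    // Serialize then deserialize; with Coherence this pair of calls becomes
    // ExternalizableHelper.toBinary(obj, pofContext) / fromBinary(bin, pofContext)
    static Object roundTrip(Serializable original) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Trade original = new Trade("EURUSD", 100);
        Object copy = roundTrip(original);
        // This equality assertion is what catches missing or swapped field mappings:
        System.out.println(original.equals(copy)); // true
    }
}
```

No cache server is needed, so the test is cheap enough to run on every build.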