MacGyver Scaling

MacGyver Scaling

Big Data gehört zu den heißen Eisen der IT und ist längst nicht mehr ausschließlich Thema für große Unternehmen. Wir sammeln immer mehr Daten, um immer komplexere Analyseabfragen fahren zu können. Archiviert oder gar gelöscht wird nur noch selten, da Speicherplatz und Rechenleistung nicht mehr als entscheidende Kostenfaktoren wahrgenommen werden. Ein wichtiger Aspekt im Umgang mit großen Datenmengen ist die Skalierbarkeit.

Skalierung einer gewachsenen 3-Schichten-Architektur

Neben den schier unbegrenzten Möglichkeiten werden jedoch die Schattenseiten schnell deutlich: Das Erfassen, Verwalten und Auswerten großer Nutzerzahlen und Datenmengen stellt hohe Anforderungen an Hardware und Software, insbesondere aber an die gewählte Lösungsarchitektur. Die Frage nach der Skalierbarkeit der unterliegenden IT-Systeme stellt eine der größten Herausforderungen von Big Data dar.

Das ultimative Ziel: eine lineare Skalierbarkeit, die jederzeit eine Anpassung an die aktuelle Situation und Größe eines Unternehmens ermöglicht. Eine Anforderung, die für gewachsene Systeme auf Basis klassischer Datenbanken nur mit erheblichen Aufwand und schwierig einzuschätzendem Risiko nachträglich erreicht werden kann.

Mittlerweile existieren eine Vielzahl komplexer Produkte, die als Panacea aller Skalierungsprobleme angepriesen werden. Diese finden sich sowohl im kommerziellen als auch im Open Source Bereich oft unter dem Namen NoSQL. Dass man aber auch wie MacGuyver mit einem Schweizer Taschenmesser und einer Kleberolle bzw. in unserem Fall mit reinen Java-Bordmitteln und einer handelsüblichen SQL-Datenbank eine vorhandene Architektur zu einer Big-Data-fähigen Lösung erweitern kann, wird im Folgenden gezeigt.

Ausgangsarchitektur und gewählter Ansatz

Ausgangspunkt unserer Betrachtungen bildet ein gewachsenes Softwaresystem mit einer relationalen Persistenzschicht. Die konkrete Ausprägung (Technologieprojektion) der Präsentations- und Logikschicht spielt keine Rolle. Beispielhaft gehen wir von einem Mix aus Rich- und Webclients, eingesetzt wahlweise auf einem Java Enterprise Server oder z.B. einer Spring/Tomcat-Kombination aus.

Als Blueprint für den Umbau der bestehenden Architektur dient ein Ansatz, der Sharding genannt wird. Beim Sharding wird die Datenmenge über mehrere so genannte Shards (engl. (Glas-)Splitter) bzw. Partitionen aufgeteilt. Die jeweiligen Shards haben die gleiche Datenstruktur, beinhalten aber unterschiedliche Daten. Technisch gesehen ist jeder Shard unabhängig von den anderen Shards, logisch gesehen bilden die Shards aber eine Einheit (vgl. Abbildung 1) und der Zugriff darauf sollte sich aus Sicht der Applikation/Applikationsentwicklung möglichst transparent gestalten. Der Ansatz kommt vor allem in Szenarien zum Einsatz bei denen mit sehr hohem Datenaufkommen bei gleichzeitig hohen Anforderungen an den erwarteten Datendurchsatz gerechnet wird.

Abbildung 1: Aufteilung einer monolithischen Datenstruktur in Shards
Abbildung 1: Aufteilung einer monolithischen Datenstruktur in Shards

Die Aufteilung selbst kann dabei anhand verschiedener Strategien erfolgen. Zu den wichtigsten Vertretern zählen hier Hash-basierte sowie Range- bzw. List-basierte Verfahren.

Hash-basierte Aufteilung

Bei Hash-basierten Verfahren wird ein Hash (Streuwert) eines Datensatzes bzw. Datensatz-Attributes berechnet und jedem Shard eine Menge von Hashwerten zugeordnet, für die er zuständig ist. Verwendet man gute Hashfunktionen, sind diese Werte schnell zu berechnen, ordnen jedem Datensatz eindeutig einen Shard zu und verteilen die Datensätze gleichmäßig auf den zur Verfügung stehenden "Shardpool".

Range- und List-basierte Aufteilung

Ein typischer Kandidat für eine Range-basierte Aufteilung ist die zeitraumbezogene Aufteilung. In Abbildung 2 ist sie exemplarisch dargestellt. Hier werden Bestellungen nach Jahren aufgeteilt und alle Bestellungen eines Jahrgangs werden eigens gehalten.

Abbildung 2: Zeitraumbezogene Aufteilung der Datensätze Sharding_Timeslice
Abbildung 2: Zeitraumbezogene Aufteilung der Datensätze Sharding_Timeslice

Eine mandantenbezogene Aufteilung ist ein Beispiel für eine List-basierte Aufteilung. Abbildung 3 veranschaulicht diese Strategie anhand der Buchungen eines oder mehrerer Mandanten, die getrennt von anderen Mandanten gehalten werden.

Abbildung 3: Mandantenbezogene Aufteilung der Datensätze Sharding_Tenant
Abbildung 3: Mandantenbezogene Aufteilung der Datensätze Sharding_Tenant

In unserem Fall haben wir uns für den mandantenbezogenen Ansatz entschieden, da die Daten schon entsprechend geclustered waren.

Weiterhin war ein wesentliches Kriterium bei unseren Überlegungen die Skalierung der bestehenden Architektur mit möglichst minimal-invasiven Maßnahmen. Das Sharding findet daher zwischen Präsentations- und Logikschicht statt und die einzelnen Shards bestehen somit aus Kombinationen von Applikationsserver- und Datenbank-Instanzen (vgl. Abbildung 4).

Abbildung 4: Shardinglogik zwischen Präsentations- und Logikschicht Sharding_Appserver
Abbildung 4: Shardinglogik zwischen Präsentations- und Logikschicht Sharding_Appserver

Für diesen Ansatz haben wir uns vor allem wegen folgender Aspekte entschieden:

  • Sowohl die zum Einsatz kommende Datenbank als auch der Applikationsserver benötigen keine Shardingfähigkeit. Somit lässt sich auch eine bestehende Architektur weiter verwenden bzw. zu der hier vorgestellten Architektur weiterentwickeln.
  • Damit kann sich nicht nur die Persistenzschicht, sondern auch die Logikschicht auf ihre eigenen Daten konzentrieren und diese wesentlich effizienter/platzsparender cachen, weil der Cache nur einen Teil der Daten enthält und nicht synchronisiert werden muss.
  • Die Skalierbarkeit des Systems lässt sich auf einfach Weise durch das Erstellen weiterer Kopien des Datenbank/Applikationsserver-Setups erreichen.
  • Da man sich zwischen der Präsentations- und Logikschicht im gewohnten Programmierkontext befindet, fällt es leicht dort die Shardinglogik anwendungsbezogen mit der gesamten Mächtigkeit von Java zu implementieren.

Der Ansatz ist flexibel genug, um dann nach und nach auf die identifizierten verbliebenen Schwachstellen reagieren zu können. Tabelle 1 zeigt die wichtigsten Pros und Contras des Ansatzes noch einmal im Überblick.

Pro Kontra
  • bestehende Architektur kann (inkl. Infrastruktur) erhalten bleiben
  • annähernd lineare Skalierung
  • Massendaten besser handhabbar
  • kein Totalausfall bei Verlust eines Knotens
  • geographische Verteilung von Shards möglich
  • Compiletime-Sicherheit der Mapper und Reducer
  • höhere Programmier-Komplexität durch totale Unabhängigkeit der Shards
  • keine Unterstützung von Replikation & Failver out-of-the-box
  • (aktuell noch) händische Rebalancierung notwenig
  • (aktuell noch) Verhältnis von Logik- zu Persistenzschicht 1:1

Tabelle 1: Ausgewählte Trade-offs des gewählten Architekturansatzes

Umsetzung mittels MultiServerProxy

Für den Client verlief die Umstellung auf eine geshardete Serverlandschaft transparent, da alle Aufrufe über einen neu dazwischen geschalteten Proxy abgefangen werden und ausschließlich dieser sharding-spezifischen Code enthält.

Der so genannte MultiServerProxy (kurz MSP, siehe Listing 1) erfüllt folgende Aufgaben:

  • Bestimmen der oder des richtigen Shard-Nodes für aufgerufenen Methode und ihre Parameter (map)
  • Paralleles Ausführen der Remote-Aufrufe und Sammeln der Rückgabewerte
  • Zusammenführen der Rückgabewerte zu einem einzigen Rückgabewert, falls mehre Nodes angesprochen wurden (reduce)

Listing 1

public class MultiServerProxy implements InvocationHandler { Class interface, mapperClass, reducerClass; public Object invoke (final Object proxy, final Method method, final Object[] args) throws Throwable { // reflect mapper & reducer if (reducerClass == null) { interface = proxy.getClass().getInterfaces()[0]; // depends on architecture mapperClass = Class.forName(interface.getName() + “Mapper”); reducerClass = Class.forName(interface.getName() + “Reducer”); } // map AbstractMapper mapper = (AbstractMapper) mapperClass.newInstance(); method.invoke(mapper, args); List shards = mapper.getMappedShards(); // execute final Map results = new ConcurrentHashMap(); final CountDown countdown = new CountDown(shards.size()); for (final String shard : shards) { new Thread() { public void run() { Object service = getService(interface, shard); // depends on architecture try { Object result = method.invoke(service, args); results.put(shard, result); } catch (Exception e) { results.put(shard, e); } finally { countdown.release(); } } }.start(); } countdown.acquire(); // wait for all // merge & return AbstractReducer = (AbstractReducer) reducerClass.newInstance(); reducer.setResults(results); Object result = method.invoke(reducer, args); if (result instanceof Throwable) throw (Throwable) result; else return result; } }

Verwendung des Proxies am Beispiel „Kundenverwaltung“

Schauen wir uns nun die Schritte anhand des in Abbildung 5 ersichtlichen Beispiels einer Kundenverwaltung genauer an:

Für beide Phasen (map & reduce) wird pro Interface der Logikschicht eine implementierende Klasse des Interfaces realisiert. Der MSP sucht diese Klassen anhand des Namens des Logik-Interface, indem er per Konvention Klassen mit demselben Namen und dem Suffix "Mapper" bzw. "Reducer" versucht zu instanziieren.

Abbildung 5: Klassendiagramm des MSP inkl. eines Beispiel-Interface Class_diagram_MSP
Abbildung 5: Klassendiagramm des MSP inkl. eines Beispiel-Interface Class_diagram_MSP

Er ruft vor dem eigentlichen Aufruf der Shards zunächst die Methode des Mappers auf, beispielsweise findCustomer(100). Da der Mapper zunächst keine Ahnung hat, auf welchem Shard sich der Kunde mit der ID 100 befindet (Listing 2), selektiert er alle Shards. Der Rückgabewert der Methode wird ignoriert. Stattdessen holt sich der MSP mittels getMappedShards das Ergebnis des Mapping-Schrittes, befragt jeweils in einem eigenen Thread die entsprechenden Shards und wartet bis alle ein Ergebnis geliefert haben.

Listing 2

public class CustomerControllerMapper extends AbstractMapper implements CustomerController { static Map customerShards = new ConcurrentHashMap(); public Customer findCustomer(long id) { String shard = customerShards.get(id); if (shard != null) mappedShards.add(shard); else mappedShards.addAll(ALL_SHARDS); return null; // will be ignored } public List findCustomers(String lastName) { mappedShards.addAll(ALL_SHARDS); return null; // will be ignored } public void saveCustomer(Customer c) { if (c.getId() == null) { // new customer c.setId(UUID.generate()); int rand = Random.nextInt(ALL_SHARDS.size()); // depends on sharding rule String shard = ALL_SHARDS.get(rand); mappedShards.add(shard); CustomerControllerMapper.customerShards.put(c.getId(), shard); } else findCustomer(c.getId()); } public void removeCustomer(long id) { findCustomer(id); } }

Die Ergebnisse stellt er als results-Map dem Reducer zur Verfügung und ruft dann auch bei ihm findCustomer(100) auf (Listing 3). Dieser verwertet die Ergebnisse. Einer der Shards wird vermutlich den richtigen Kunden geliefert haben, die anderen null, weil sie den Kunden nicht kennen. Nachdem beim Mapper die ID des Kunden für immer auf die Adresse des Shards gemappt wird, wird dieser Kunde zurück gegeben und vom MSP danach dem Client als Ergebnis zurück geliefert. Für diesen war der Aufruf also transparent.

Beim nächsten Aufruf weiß der CustomerControllerMapper schon auf welchem Shard sich der Kunde mit der ID 100 befindet und braucht nicht mehr alle Shards zu befragen. Wie die anderen Methoden funktionieren erfährt der geübte Javacode-Leser ebenfalls aus Listing 2 und 3.

Listing 3

public class CustomerControllerReducer extends AbstractReducer implements CustomerController { public Customer findCustomer(long id) { for (MapEntry result : results) { if (result.getValue() instanceof Customer) { Customer c = (Customer) result.getValue(); String shard = result.getKey(); CustomerControllerMapper.customerShards.put(c.getId(), shard); return c; } } return null; // customer not found } public List findCustomers(String lastName) { List ret = new ArrayList(); for (MapEntry result : results) { List part = (List) result.getValue(); String shard = result.getKey(); for (Customer c : part) CustomerControllerMapper.customerShards.put(c.getId(), shard); ret.addAll(part); } ret.sort(); // sort the same way the server did for all parts return ret; } }

Fazit

Wir haben einen Ansatz skizziert mit denen man eine „typisch“ gewachsene Java-EE-Architektur zu einer skalierbaren Architektur weiterentwickeln kann ohne neue Produkte einsetzen zu müssen. Damit entgeht man so manchen Risiken bei einer Umstellung. Der Ansatz ist aufgrund der Unabhängigkeit der Shards untereinander fast linear skalierbar.

  • Hochverfügbarkeit, d.h. Failover und Recovery leistet der Ansatz nicht, hier setzen wir eine asynchrone Replikation der Datenbank ein.
  • Scripten zur Verwaltung u.U. vieler Shards sind zu erstellen, damit sie nicht einzeln administriert werden müssen.
  • U.U. müssen foreign key constraints im RDBMS zugunsten des Shardings aufgegeben werden. Dies hätte allerdings auch jeder andere Ansatz zur Folge.