Macrometa Technology Overview

THE EDGE NEEDS A NEW DATA ARCHITECTURE

Edge computing and its step-sibling, "fog computing", are shrouded in the excitement and mystery of a new frontier, where trillions of dollars in newly addressable market are expected to open up, and makers of digital picks and shovels are busy lighting the fires of their software-defined forges to design and build the edge infrastructure of tomorrow. Bold predictions are being made about the disruptive new applications that edge computing will unleash on enterprises and consumers.
 

Edge computing is a distributed data problem, not simply a distributed computing problem that spinning up stateless compute at the edge will solve. Creating enterprise-grade systems that deliver geo distributed database capabilities with useful consistency guarantees is extremely challenging, and conventional approaches fail at large scale because their architectures are built around the fundamental tenet of centralizing data and the coordination of state changes.

If you can take your data (state) and put it in giant piles in one data center, it is easier to do useful things with it. But if you have a little bit of it spread everywhere, you face the horrendous problem of keeping everything consistent and coordinated across the locations where it is stored, so that you can achieve idempotent computing.

Challenges of bringing stateful data to the Edge

Distributed databases that scale out within a data center do not cleanly generalize to scaling out across geographies; they break down under the weight of their design assumptions:

  • Reliable data center class local area network
    • Low latency
    • Highly available
    • Consistent latency & jitter behavior
    • Network splits are rare
  • Accurate time keeping using physical clocks and network time protocol (NTP)
    • NTP is good enough for use cases where data ordering is handled across servers within the same rack or data center (NTP slippage is < 1ms)
  • Consensus mechanisms are good enough due to the low latencies and high availability of the data center class LAN.

The design assumptions for a geo distributed database are almost entirely the opposite:

  • Unreliable Internet Wide Area Networks
    • High and variable latency especially in inter-city or inter-continental distances.
    • Dynamic network behavior with topology changes and sporadic partitions. 
  • Lossy time keeping
    • Asymmetric routes cause inter-city and inter-continental clock coordination challenges resulting in slippage of hundreds of milliseconds across a set of geo distributed time servers.
    • Atomic clocks may be used to overcome this problem but are prohibitively expensive and complex to operate.
  • Mechanisms such as state machine replication and consensus are too expensive and too slow to coordinate with over the internet (especially with a large number of participants)
    • Physically distant participants slow everyone down as it takes more time for them to send and receive messages to prepare, accept and acknowledge consensus information due to network latency.
    • Adding more participants (i.e. edge locations) adds more overhead and slows consensus down as more participants need to vote.
    • State machine replication and consensus are brittle: they involve centralization and need to be highly available. If network splits (particularly asymmetric splits) occur, achieving reliable state machine replication and consensus becomes very challenging.
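The cost of distant participants can be made concrete with a simplified model. In a single-leader majority quorum, a write commits once the leader has heard back from enough followers to form a majority, so one commit round can be no faster than the round-trip time to the slowest follower in the fastest majority. The sketch below computes that lower bound; the round-trip times are illustrative numbers, not measurements, and real protocols add processing time, extra message rounds and retries on top.

```python
def quorum_commit_latency_ms(follower_rtts_ms: list[float]) -> float:
    """Lower bound (ms) for one commit round of a leader-based majority quorum.

    The leader votes for itself, so it needs acknowledgements from
    (majority - 1) followers; the round finishes when the slowest of the
    fastest acknowledging followers has replied.
    """
    n = len(follower_rtts_ms) + 1          # cluster size including the leader
    majority = n // 2 + 1
    acks_needed = majority - 1
    if acks_needed == 0:
        return 0.0                          # single-node "cluster"
    return float(sorted(follower_rtts_ms)[acks_needed - 1])


# Same data center: followers a fraction of a millisecond away.
lan = quorum_commit_latency_ms([0.2, 0.3])
# Five locations spread across continents (illustrative RTTs).
wan = quorum_commit_latency_ms([70, 150, 180, 250])
# Adding two more distant edge locations raises the bar further.
wan_bigger = quorum_commit_latency_ms([70, 150, 180, 250, 280, 300])
```

Under these assumed numbers, a commit round costs 0.2 ms on the LAN, 150 ms across five locations and 180 ms across seven: every extra distant voter pushes the majority further out.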

For edge computing to become a reality, we need geo-distributed databases that can scale across hundreds of locations worldwide yet act in concert to provide a single coherent multi-master database. This in turn requires a system that works on the internet with its unpredictable network topology, uses some form of time keeping that is not lossy, and avoids centralized forms of consensus while still arriving at a shared version of truth in real time.

Macrometa is designed as a serverless platform for stateful, data-intensive, real-time applications and web services that need to run in a distributed manner at the edge, closer to humans (end users) and their things (devices) than the cloud can.

Where is the ‘Edge’ in edge computing?

The edge is easily one of the most bandied-about marketing buzzwords at this time (blockchain and AI/ML complete the buzzy trinity of avant-garde marketecture terms). From our perspective, the edge comprises two physically distinct types of infrastructure and needs: (i) a device edge and (ii) an infrastructure edge. But neither of these definitions really helps the software architect or developer think about how best to build distributed applications, so we identify a third type of edge that abstracts away its physical aspects: the logical edge.

  • Device Edge: refers to edge computing capabilities on the device or user side of the network. The device usually depends on a gateway or similar device in the field to collect and process data from devices. The device edge may also use limited spare compute and data storage capacity on user devices such as smartphones, laptops and sensors to process edge computing workloads. Programming the device edge typically involves mostly stateless use cases (collecting metrics of many types and sending them to the cloud for processing). Stateful data use cases are limited to the scope of the device itself, and there are very limited ways of performing distributed state operations across these types of devices.
  • Infrastructure Edge: refers to edge computing capability, typically in the form of one or more data centers, deployed on the operator side of the last mile network. Compute, data storage and network resources positioned at the infrastructure edge allow for cloud-like capabilities similar to those found in centralized data centers, such as the elastic allocation of resources, but with lower latency and lower data transport costs due to a higher degree of locality to users than a centralized or regional data center. The infrastructure edge is a combination of public cloud elements in a dense urban area, including CDN PoPs, enterprise private cloud data centers, metro hybrid cloud data centers, or IX PoPs (anything with < 10ms one-way latency from users or devices). In the future this will extend to telecom C-RAN and base stations on 5G.
  • Logical Edge: the closest place where server-class compute, storage and network are located and where data is generated and/or used. The logical edge is defined by the application, not by the network topology.

Macrometa is designed as a platform for stateful and data intensive applications and web services that run on the logical edge.   The challenges of building distributed apps on either the device or on a more cloud like infrastructure at the infrastructure edge overlap significantly. We believe it helps to think of designing and writing apps for the logical edge as that will allow for capabilities to be exploited on both sides of the last mile network. Our longer-term goal is to abstract the infrastructure edge and the device edge completely to allow data and code orchestration across a blend of both.

 

Edge requires coordination free architectures

Coordination is difficult because participants in a large geographically distributed system need to agree that certain events happened in a particular temporal order. Conventional distributed systems implement such coordination with mechanisms like quorums. In geo distributed systems, the coordination mechanisms themselves become the constraining factor on how many participants or actors can take part and perform complex behavior in a network of coordinating nodes. For geo distributed databases and streams that need to work over the Internet, a coordination free approach is required that minimizes or even eliminates the need for coordination among participating actors.

Macrometa provides a distributed data platform composed of (in theory) an unbounded number of edge locations connected across the internet using a coordination free approach. Our overarching goal is that application designers and architects should not have to re-architect or re-design their cloud applications to scale and serve millions of users. The underlying cloud platform must provide multi-region, multi-data-center orchestration of data and code without requiring the developer to have any special knowledge of how to design, architect or build such applications.

For stateful edge computing to be viable at global scale and able to handle real-world workloads, edge locations need to work together in a way that is coordination free and able to make forward progress independently even when network partitions occur.

The Macrometa Approach

Macrometa delivers 5 important capabilities to create a coordination free architecture that enables each edge location to make forward progress independently, even in the presence of network partitions:

Geo Distributed Event Ledger (Event source)
In Macrometa, database records are stored as JSON documents in an event ledger (event source) built around an LSM tree database structure with enhancements for seeking and modifying the contents of the tree.
 
We maintain a copy of the distributed event ledger at every edge location.  This event ledger captures all mutation operations that result in state changes, causally orders them, applies them locally and then publishes them to peer edge locations using our geo distributed streams. 

Geo Distributed Streams
A highly reliable form of messaging with delivery guarantees that connects the event ledgers so that they can replicate data across multiple edge locations. These streams provide real-time replication of database operations as well as low latency global pub-sub and queue-based messaging.
 
The streams are stored in a distributed append-only log structure for persistence. The geo distributed streams publish causally ordered messages to peer edge event ledgers which subscribe to each other’s events. Therefore, each event ledger is both a publisher of its changes to other edge event ledgers and a subscriber to changes that other edge event ledgers are producing.  
 
Causal Broadcast, Causal Delivery & Message Ordering Using Vector Clocks
We use vector clocks to track logical time instead of physical time (or wall clock time).  
  • With wall clock time, events are ordered by physical time in the following way:
    • Joe arrived at the restaurant at 10:01:03:12
    • Jane left the restaurant at 10:05:45:1
  • With causal consistency, events are ordered by their vector clocks:
    • Suppose we have the following two events with these vector clocks:
      • (Joe1, Jane1) - Jane arrived at the restaurant.
      • (Joe1, Jane2) - Jane left the restaurant.
    • Comparing the vector clocks shows that Jane's entry advanced from 1 to 2 while Joe's stayed at 1, so (Joe1, Jane2) is the later event and must have occurred after (Joe1, Jane1). This gives us a means to logically order events.
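The ordering rule above is the standard vector clock comparison: clock A happened before clock B if every entry of A is less than or equal to the matching entry of B and the two clocks differ; if neither clock dominates, the events are concurrent. A minimal sketch, assuming clocks are plain dictionaries from actor name to counter (a representation chosen for illustration, not Macrometa's wire format):

```python
from typing import Dict

VectorClock = Dict[str, int]   # actor name -> logical counter


def compare(a: VectorClock, b: VectorClock) -> str:
    """Return 'before', 'after', 'equal' or 'concurrent' for clock a relative to b."""
    actors = set(a) | set(b)                      # missing entries count as 0
    a_le_b = all(a.get(k, 0) <= b.get(k, 0) for k in actors)
    b_le_a = all(b.get(k, 0) <= a.get(k, 0) for k in actors)
    if a_le_b and b_le_a:
        return "equal"
    if a_le_b:
        return "before"
    if b_le_a:
        return "after"
    return "concurrent"                           # neither dominates: no causal order


arrived = {"Joe": 1, "Jane": 1}   # Jane arrived at the restaurant
left = {"Joe": 1, "Jane": 2}      # Jane left the restaurant
print(compare(arrived, left))                                  # before
print(compare({"Joe": 2, "Jane": 1}, {"Joe": 1, "Jane": 2}))   # concurrent
```

The "concurrent" outcome is exactly the ambiguous case that the convergence engine has to untangle.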
Real-time Intelligent Convergence Engine (RICE)
A proprietary convergence engine that models data (state) as commutative and convergent operations by implementing a form of operation-based Conflict-free Replicated Data Types (CRDTs). This is a significant improvement over eventually consistent databases, which deal with data instead of operations and simply pick an arbitrary winner for all types of conflicts.
 
Typically, eventually consistent databases don’t prioritize operations because they don’t track operations; instead, they track the state and replicate that. The information about what the operation was is lost, and only the new state is replicated. This dramatically increases the surface area for conflicts.

We convert the database operations at each edge location into granular CRDT operations that are then replicated to other participants (actors) in the geo distributed database and applied to each location’s event ledger. It is important to understand that we do not replicate state.
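The difference between shipping state and shipping operations is easiest to see with a counter. The sketch below is a generic illustration of an operation-based counter, not RICE itself; the starting value and increments are arbitrary, and "the state that arrives last wins" stands in for a last-writer-wins rule in a state-replicating database.

```python
def apply_ops(initial: int, ops: list[int]) -> int:
    """Operation-based replication: apply every received increment."""
    total = initial
    for delta in ops:
        total += delta
    return total


start = 10
inc_a, inc_b = 5, 3            # concurrent local increments at locations A and B

# State replication: each location ships its resulting value and the
# receiver overwrites its copy, so the state that arrives last wins.
state_a, state_b = start + inc_a, start + inc_b
state_result = state_b         # suppose B's state happens to arrive last

# Operation replication: each location ships the increment itself.
# Addition commutes, so both delivery orders converge on the same value.
result_at_a = apply_ops(start, [inc_a, inc_b])
result_at_b = apply_ops(start, [inc_b, inc_a])
```

State replication ends at 13 and silently drops A's increment, while both replicas applying operations converge on 18.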
 
RICE uses the vector clock information in each message to construct a causal ordering of operation events across the distributed set of event ledgers. While vector clocks capture the causal relationship between events, they cannot order events that are truly concurrent. For example, if Joe's and Jane's events happened concurrently, or were observed by two separate actors, neither vector clock would dominate the other; this ambiguity about causality indicates a conflict between the events.
 
RICE provides an untangling mechanism for such truly concurrent conflicts by examining the operations and applying prioritization rules: inserts override deletes, updates override deletes, and a deterministic ordering is used when dealing with “true conflicts”.

In Macrometa, “true conflicts” can only occur between operations of the same type on the same attribute (such as two concurrent updates of the same atomic variable in the database). Unlike other databases, which detect conflicts at the object level (the JSON document or record), Macrometa manages concurrency at the attribute level, so an object may have different attributes modified by two independent writers and still have their changes converge (merge). When a true conflict occurs, i.e. two concurrent operations update the same attribute, we record the conflict in the event ledger and pick a winner by calculating a deterministic value. Thus every event ledger arrives at the same deterministic value for the attribute while concurrent changes to other attributes of the same object are preserved.
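The prioritization rules and attribute-level merging described above can be sketched as follows. This is an illustration under stated assumptions, not RICE: the `Op` record, the numeric priorities and especially the tie-break (comparing origin id, then kind, then value) are hypothetical stand-ins for Macrometa's unspecified deterministic calculation. What matters is that the winner depends only on the operations themselves, never on arrival order, so every event ledger computes the same result.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

# Rules from the text: an insert or an update overrides a concurrent delete.
PRIORITY = {"delete": 0, "insert": 1, "update": 1}


@dataclass(frozen=True)
class Op:
    kind: str               # "insert", "update" or "delete"
    attribute: str          # attribute name inside the JSON document
    value: Optional[Any]    # new value; None for deletes
    origin: str             # id of the edge location that produced the op


def rank(op: Op) -> tuple:
    # Higher rank wins: priority first, then a hypothetical deterministic
    # tie-break (origin id, kind, value) for "true conflicts".
    return (PRIORITY[op.kind], op.origin, op.kind, repr(op.value))


def resolve(x: Op, y: Op) -> Op:
    """Pick the winner between two concurrent ops on the same attribute."""
    return x if rank(x) >= rank(y) else y


def merge(doc: Dict[str, Any], concurrent_ops: List[Op]) -> Dict[str, Any]:
    """Apply mutually concurrent ops to a document attribute by attribute."""
    winners: Dict[str, Op] = {}
    for op in concurrent_ops:
        current = winners.get(op.attribute)
        winners[op.attribute] = op if current is None else resolve(current, op)
    result = dict(doc)
    for attribute, op in winners.items():
        if op.kind == "delete":
            result.pop(attribute, None)
        else:
            result[attribute] = op.value
    return result


doc = {"name": "Joe", "city": "Paris", "tier": "gold"}
ops = [
    Op("update", "name", "Joseph", "edge-a"),  # different attributes: both survive
    Op("update", "city", "Nice", "edge-a"),    # true conflict on "city" ...
    Op("update", "city", "Lyon", "edge-b"),    # ... settled by the tie-break
    Op("delete", "tier", None, "edge-b"),      # delete vs update on "tier":
    Op("update", "tier", "silver", "edge-a"),  # the update wins
]
merged = merge(doc, ops)
```

Here `merged` is `{"name": "Joseph", "city": "Lyon", "tier": "silver"}`, and applying the same operations in reverse order produces the same document.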

 

Putting it all together…

Macrometa uses the mechanisms described in the previous section to provide a coordination free geo distributed database with strong guarantees, and geo distributed streams with ordered delivery and multiple messaging semantics (at-most-once, at-least-once and effectively-once). It achieves this through coordination free convergence, in which participants always deterministically arrive at the same version of truth without requiring a quorum for consensus or any other form of coordination.
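Of the three messaging semantics, effectively-once is the least self-explanatory. A common way to obtain it, sketched below as a generic technique rather than Macrometa's implementation, is to pair at-least-once delivery with idempotent processing keyed on a unique message id. The message ids, the class name and the in-memory set of seen ids are illustrative assumptions; a real consumer would persist that set atomically together with the state it protects.

```python
from typing import Set


class EffectivelyOnceConsumer:
    """Applies each message id at most once, even if the stream redelivers it."""

    def __init__(self) -> None:
        self.seen: Set[str] = set()   # ids already applied (persist in practice)
        self.balance = 0

    def on_message(self, msg_id: str, amount: int) -> bool:
        """Return True if the message changed state, False for a duplicate."""
        if msg_id in self.seen:
            return False              # redelivery after a lost ack: skip it
        self.seen.add(msg_id)
        self.balance += amount
        return True


consumer = EffectivelyOnceConsumer()
# At-least-once delivery may hand the consumer "m1" twice.
for msg_id, amount in [("m1", 10), ("m2", 5), ("m1", 10)]:
    consumer.on_message(msg_id, amount)
```

The duplicate "m1" is acknowledged but ignored, so the balance reflects each message exactly once (15).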

Each edge location need only apply operations locally, prepare a vector clock and generate operations via RICE to broadcast the message to all applicable edge locations (using reliable causal broadcast). The receiving edge location replays the message on its local copy of the event ledger. If there are any conflicts, the operation prioritization rules are applied to resolve conflicts locally because the same rules apply to all participants.
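The receiving side of reliable causal broadcast is usually implemented with the classic vector clock delivery condition: hold a message back until every message it causally depends on has been delivered locally. A minimal in-memory sketch, assuming reliable links that deliver each message exactly once and ignoring persistence, retransmission and conflict resolution; the `CausalNode` class and the node names are hypothetical.

```python
from typing import Dict, List, Tuple

Clock = Dict[str, int]
Message = Tuple[str, Clock, str]        # (sender, vector clock, payload)


class CausalNode:
    """One edge location doing causal broadcast and causal delivery."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.clock: Clock = {}             # messages delivered per sender
        self.pending: List[Message] = []   # received but not yet deliverable
        self.delivered: List[str] = []     # payloads in delivery order

    def broadcast(self, payload: str) -> Message:
        # Local event: bump our own entry, apply locally, then ship the
        # message with a copy of the clock to every peer.
        self.clock[self.name] = self.clock.get(self.name, 0) + 1
        self.delivered.append(payload)
        return (self.name, dict(self.clock), payload)

    def _deliverable(self, sender: str, vc: Clock) -> bool:
        # Must be the next message expected from the sender, and every
        # dependency on other senders must already be delivered here.
        if vc.get(sender, 0) != self.clock.get(sender, 0) + 1:
            return False
        return all(n <= self.clock.get(k, 0) for k, n in vc.items() if k != sender)

    def receive(self, msg: Message) -> None:
        self.pending.append(msg)
        progress = True
        while progress:                    # one delivery may unblock others
            progress = False
            for m in list(self.pending):
                sender, vc, payload = m
                if self._deliverable(sender, vc):
                    self.pending.remove(m)
                    self.clock[sender] = vc[sender]
                    self.delivered.append(payload)
                    progress = True


a, b, c = CausalNode("edge-a"), CausalNode("edge-b"), CausalNode("edge-c")
post = a.broadcast("post")      # clock {"edge-a": 1}
b.receive(post)
reply = b.broadcast("reply")    # clock {"edge-a": 1, "edge-b": 1}
c.receive(reply)                # held back: it depends on "post"
c.receive(post)                 # delivers "post", then the buffered "reply"
```

Even though edge-c sees the reply first, it delivers "post" before "reply", and no location had to ask any other for permission.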

Note: Neither of these approaches (causal broadcast or rules-based prioritization) requires the edge locations to coordinate. Each edge location simply follows the same rules to deterministically arrive at the same result, providing strong serial ordering guarantees.

 

Replication between edge sites

Macrometa uses geo fenced pub/sub & streams to connect nodes and abstract location, transport and storage. As discussed before — no coordination is needed across edge locations (inter edge) for serial ordering of data. 

The underlying stream guarantees that messages are delivered regardless of network conditions (latency, jitter, loss) with strong causal ordering guarantees.  The stream architecture is designed for message delivery across an unbounded set of edge locations with high latency, high jitter and network partitions.

Macrometa’s RICE uses a proprietary high speed, seek- and insert-optimized tree data structure for storing operations and JSON document fragments. This is built on top of an LSM tree database that provides high performance key value storage. We call this combination of our seek-optimized data structure and the LSM tree database a causal tree, and we use it extensively for fast access to and traversal of the event source. The tree allows the ordered operations to be collapsed quickly into the “current state” of the fields in a JSON document. This collapsing is the basis for the materialized view created in response to a query.

The causal tree and the streams work together in a continuous exchange of live data mutations modeled as associative, commutative operations. There is no single source of truth or master replica: it is a multi-master system.

Every change to the data is an event. Every event creates one or more operations. Each operation is immutable. The operations flow from peering replicas to each other in a fully meshed architecture and are tracked using vector clocks.

Every object in the database may have an unlimited set of replicas on the network. These replicas synchronize in real time deterministically and correctly. An object’s current state is simply a reduction of its operations (a materialized view).
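Because an object's state is a reduction of its operations, computing a materialized view amounts to folding the ordered operation log into a document. The sketch below assumes the log has already been causally ordered and conflict-resolved, and uses a simplified `(kind, attribute, value)` tuple that is an illustrative assumption, not Macrometa's operation format.

```python
from functools import reduce
from typing import Any, Dict, Iterable, Tuple

Operation = Tuple[str, str, Any]      # (kind, attribute, value)


def apply(state: Dict[str, Any], op: Operation) -> Dict[str, Any]:
    kind, attribute, value = op
    new_state = dict(state)           # never mutate; each step yields a new view
    if kind == "delete":
        new_state.pop(attribute, None)
    else:                             # "insert" or "update"
        new_state[attribute] = value
    return new_state


def materialize(ops: Iterable[Operation]) -> Dict[str, Any]:
    """Current state = reduction of the operations, starting from an empty object."""
    return reduce(apply, ops, {})


log = [
    ("insert", "name", "Jane"),
    ("insert", "status", "arrived"),
    ("update", "status", "left"),
    ("insert", "table", 7),
    ("delete", "table", None),
]
view = materialize(log)   # {"name": "Jane", "status": "left"}
```

Since the operations themselves are immutable, any replica holding the same ordered log produces the same view.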

Edge nodes are simply event sources that subscribe to a stream topic for a nested collection of objects. Once a node subscribes to a collection via its topic, it receives all operation updates until the collection is deleted. Note: the message format is not optimized for human reading; it is specifically tuned for compression and transport over networks.

 

Consistency Guarantees

Macrometa delivers an adaptive consistency model in which data is guaranteed to converge deterministically.

For scenarios like the double spend problem (https://en.wikipedia.org/wiki/Double-spending), causal consistency or eventual consistency cannot guarantee that all readers see the latest write or change. To handle such scenarios, Macrometa lets you make collections strictly consistent by defining them as SPOT (Single Point Of Truth) collections via the UI, CLI or API. Unlike default collections, SPOT collections are centralized in a single data center (but replicated across two availability zones) and provide ACID (serializable) consistency. When operations mutate a SPOT collection, strict consistency guarantees are applied by ensuring global transaction isolation. This approach gives developers the flexibility to mix and match consistency levels without paying the penalty of strict consistency for every database mutation.

In CAP theorem parlance, a SPOT collection is CP while regular collections behave as follows:

  • Session Consistency: Clients with an open session to the same edge location get linearizable behavior. Clients can read their own writes and read writes from other clients to the same edge location.
  • If for any reason an edge location gets partitioned completely from the rest of the Macrometa edge locations, but outbound connectivity still exists, clients still get eventually consistent behavior, as the writes to the database will be replicated once the network partition heals.
  • If an edge location is completely disconnected, requests are routed to the next closest location, and the client gets session consistency behavior from the newly homed edge location.
  • Latency bound reads: Clients with open sessions across multiple edge locations get causal consistency with serial ordering of operations, and clients are guaranteed to see consistent data as soon as replication completes.
  • If an event ledger is partitioned from its peers but is still accessible by clients, it continues to accept changes and synchronizes and converges state with the rest of the network once the partition heals. This allows the Macrometa DB to handle offline use cases.

Programmatically speaking, the only choice developers need to make to get ACID-like serializability is to define a collection as a SPOT collection; the collection is then automatically centralized without any change to how it is accessed or modified. By default, however, a collection has copies on all edge locations that form the database and offers causal consistency.

We recommend using SPOT collections only for those elements that need very strict atomic operations and transaction isolation as SPOT collections incur latency penalties i.e., data requests must be serviced from the centralized location instead of the nearest edge location. 

Macrometa Edge Cloud Capabilities

Macrometa is a new, next generation edge data platform for creating global applications. These applications may run on edge infrastructure like CDNs, or more conventionally on the cloud, as a complement to or replacement for traditional SQL and NoSQL databases. Macrometa’s edge cloud is a hosted service and is available in 20+ regions worldwide today.

Building applications that span multiple regions, cloud providers and networks is hard. Building applications that need to handle high volumes of data, process and do useful things with it is even harder. We created Macrometa with the goal of making building new and interesting web services and apps that run globally simple, quick and fun.  

To use Macrometa, you don’t need to be an experienced distributed systems developer: you don’t have to know the intricacies of distributed data handling or even the first thing about building multi-region applications. Macrometa takes care of those details by making all the complex, distributed data and compute decisions for you. You bring your code and data model; Macrometa does the rest.

The Macrometa edge cloud service provides a combination of the following capabilities:

  1. Geo distributed database, a multi-model, multi-master, decentralized, real-time database
  2. Geo distributed streams, global pub-sub and queueing streams
  3. Compute service for deploying, orchestrating and executing containers and functions on edge

We call it edge native because it is designed to sit across 100s of worldwide locations/pops and present one global multi-master real-time data (DB & Streams) platform.

Multi Region, Multi Master

Multi-master provides high availability, local write latencies and scalability, with built-in, comprehensive and flexible conflict resolution support. These features significantly simplify the development of globally distributed applications, for which multi-master support is crucial.

With multi-master support, you can perform writes on data (for example, documents, collections, graphs, k/v pairs) distributed anywhere in the world. You can update data in any region associated with your fabric account, and these updates propagate asynchronously.

In addition to providing fast access and write latency to your data, multi-master also provides a practical solution for failover and load-balancing issues.

Multi-Model

Most database management systems are organized around a single data model that determines how data can be organized, stored, and manipulated. In contrast, Macrometa’s multi-model database is designed to support multiple data access models (document, graph, and key-value) against a single, integrated backend.

One can certainly use multiple databases in the same project, with each catering to a different interface - potentially resulting in some operational friction, data consistency and duplication issues. This is where a native multi-model database with all its benefits and flexibility comes into play.

Macrometa’s multi-model database (a collection in the geo fabric) is designed to support multiple data interfaces against a single, integrated data model. With this native multi-model approach you can build high-performance applications and scale horizontally using all three data models to their full extent. You can model your data and access collections in following ways:

Key Value interface: The key/value store data model is the easiest to scale. A regular collection always has a primary key attribute, and in the absence of further secondary indexes the document collection behaves like a simple key/value store.

  • The only operations that are possible in this context are single key lookups and key/value pair insertions and updates.

Document interface: The documents you can store in a regular collection closely follow the JSON format.

  • A document contains zero or more attributes with each of these attributes having a value. 
  • A value can either be an atomic type, i.e. number, string, boolean or null, or a compound type, i.e. an array or embedded document/object. 
  • Arrays and sub-objects can contain all of these types, which means that arbitrarily nested data structures can be represented in a single document. 
  • Documents are grouped into collections. A collection contains zero or more documents. If you are familiar with Relational Database Management Systems (RDBMS), then it is safe to compare collections to tables, and documents to rows. 
  • In a traditional RDBMS, you have to define columns before you can store records in a table. Such definitions are also known as schemas. Collections are schema-less, and there is no need to define what attributes a document must have. Documents can have a completely different structure and still be stored together with other documents in a single collection. 
  • In practice, there will be common denominators among the documents in a collection, but Macrometa itself doesn't force you to limit yourself to a certain data structure. 

Graph interface: Macrometa enables you to turn your documents into graph structures for semantic queries, with nodes, edges and properties to represent and store data. A key concept of the system is the idea of a graph, which directly relates data items in the database.

  • A graph collection is simply a regular collection but with some special attributes that enable you to create graph queries and analyze the relationships between objects.  
  • In SQL databases, you have the notion of a relation table to store n:m relationships between two data tables. An edge collection is somewhat similar to these relation tables; vertex collections resemble the data tables with the objects to connect. 
  • While simple graph queries with a fixed number of hops via the relation table may be doable in SQL with several nested joins, graph databases can handle an arbitrary number of these hops over edge collections; this is called traversal. Edges in one edge collection may also point to several vertex collections. It is common to have attributes attached to edges, e.g. a label naming the interconnection.
  • Edges have a direction, with their relations _from and _to pointing from one document to another document stored in vertex collections. In queries you can define in which directions the edge relations may be followed. 
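Traversal over an edge collection is, at its core, a graph search that follows `_from`/`_to` references a variable number of hops in a chosen direction. The sketch below illustrates the idea with a breadth-first search over in-memory edge documents; it is not Macrometa's query language or traversal engine, and the document ids and the "knows"/"lives_in" labels are made up for the example (note the edges point into two vertex collections, `users` and `cities`).

```python
from collections import deque
from typing import Dict, List


def traverse(start: str, edges: List[dict], max_depth: int,
             direction: str = "outbound") -> Dict[str, int]:
    """Breadth-first traversal; returns {vertex_id: hops} within max_depth."""
    if direction not in ("outbound", "inbound"):
        raise ValueError(f"unknown direction: {direction}")
    adjacency: Dict[str, List[str]] = {}
    for e in edges:
        src, dst = (e["_from"], e["_to"]) if direction == "outbound" else (e["_to"], e["_from"])
        adjacency.setdefault(src, []).append(dst)

    depths = {start: 0}
    queue = deque([start])
    while queue:
        vertex = queue.popleft()
        if depths[vertex] == max_depth:
            continue                       # do not expand beyond the hop limit
        for neighbor in adjacency.get(vertex, []):
            if neighbor not in depths:     # first visit is the shortest path
                depths[neighbor] = depths[vertex] + 1
                queue.append(neighbor)
    return depths


edges = [
    {"_from": "users/joe", "_to": "users/jane", "label": "knows"},
    {"_from": "users/jane", "_to": "users/ann", "label": "knows"},
    {"_from": "users/ann", "_to": "cities/paris", "label": "lives_in"},
]
two_hops = traverse("users/joe", edges, max_depth=2)
# {"users/joe": 0, "users/jane": 1, "users/ann": 2}
```

Raising `max_depth` to 3 reaches `cities/paris` without any extra join, which is the advantage over fixed-hop SQL queries described above.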

Real time capabilities

Most databases take a pull-based approach to providing data. When your application code polls for data, it becomes slow, unscalable, and cumbersome to maintain. Macrometa supports both pull- and push-based updates across multiple edge locations. This makes building real-time, globally distributed apps dramatically easier, and it is a great choice when your applications benefit from real-time feeds of your data.

The query-response database access model works well on the web because it maps directly to HTTP’s request-response. However, modern applications require sending data directly to the client in real-time. 

Use cases that can benefit from Macrometa’s real-time push architecture include:

  • Collaborative web and mobile apps
  • Streaming analytics apps
  • Multiplayer games
  • Realtime marketplaces
  • Connected devices

For example, when a user changes the position of a button in a collaborative design app, the server has to notify other users that are simultaneously working on the same project. Web browsers support these use cases via WebSockets and long-lived HTTP connections, but adapting database systems to real time needs still presents a huge engineering challenge.

Macrometa’s database is designed specifically to push data to applications in real time across multiple data centers. It dramatically reduces the time and effort necessary to build scalable real time apps.

Geo distributed streams

Streams are a type of collection that capture data in motion. Messages are sent via streams by publishers to consumers who then do something with the message. Streams can be created via client drivers (PyC8), REST API or the web console. 

Streams unify queuing and pub-sub messaging into a single converged messaging model that gives users a lot of flexibility to consume messages in the way that best suits the use case at hand.

The messaging flow is: producer → stream → subscription → consumer.

  • A stream is a named channel for sending messages. Each stream is backed by a distributed append-only log and can be local (at one edge location only) or global (across all edge locations in the global Fabric). Similarly the streams can be persistent or non-persistent.
  • Messages from publishers are only stored once on a stream, and can be consumed as many times as necessary by consumers. The stream is the source of truth for consumption. Although messages are only stored once on the stream, there can be different ways of consuming these messages. 
  • Consumers are grouped together for consuming messages. Each group of consumers is a subscription on a stream. Each consumer group can have its own way of consuming the messages: exclusive, shared, or failover.
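The three subscription modes differ in how a subscription hands messages to its attached consumers. The toy model below illustrates the usual meaning of these mode names (exclusive: only one consumer may attach; shared: messages are spread round-robin; failover: one active consumer with standbys that take over in attach order). This is an assumption-laden sketch, not the Macrometa client API, and real brokers add acknowledgements, redelivery and ordering details that are omitted here.

```python
from typing import List, Tuple


class Subscription:
    """Toy model of how one subscription dispatches messages to consumers."""

    MODES = ("exclusive", "shared", "failover")

    def __init__(self, mode: str) -> None:
        if mode not in self.MODES:
            raise ValueError(f"unknown subscription mode: {mode}")
        self.mode = mode
        self.consumers: List[str] = []   # attach order; index 0 is the active one
        self._next = 0                   # round-robin cursor for shared mode

    def attach(self, consumer: str) -> None:
        if self.mode == "exclusive" and self.consumers:
            raise RuntimeError("exclusive subscription already has a consumer")
        self.consumers.append(consumer)

    def detach(self, consumer: str) -> None:
        self.consumers.remove(consumer)

    def deliver(self, message: str) -> Tuple[str, str]:
        """Return (consumer, message) for the consumer that receives it."""
        if not self.consumers:
            raise RuntimeError("no consumers attached")
        if self.mode == "shared":
            # Shared: spread messages across all consumers, round-robin.
            consumer = self.consumers[self._next % len(self.consumers)]
            self._next += 1
        else:
            # Exclusive: the single consumer gets everything.
            # Failover: the active consumer gets everything; when it
            # detaches, the next one in attach order takes over.
            consumer = self.consumers[0]
        return consumer, message


workers = Subscription("shared")
workers.attach("worker-1")
workers.attach("worker-2")
owners = [workers.deliver(m)[0] for m in ("m1", "m2", "m3")]
# owners == ["worker-1", "worker-2", "worker-1"]
```

Shared mode trades per-consumer ordering for throughput, while exclusive and failover keep one consumer reading the whole stream in order.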

Global Compute Service with containers and functions

Application developers need robust software abstractions to simplify development, deployment and operations in the cloud. Cloud native approaches to microservices, such as containerization and serverless functions, abstract away much of the burden of managing servers. However, even with the sophisticated capabilities of modern container orchestration systems like Kubernetes and Mesosphere, managing globally distributed workloads is complex and expensive. On the one hand, microservice architectures allow a great deal of elasticity in scaling the individual component services of an application; on the other, even in a centralized single cloud, single data center, single region application, the sheer number of service instances they create adds considerable operational complexity for security, control, monitoring and debugging. This complexity increases by an order of magnitude in distributed global applications that might run on dozens if not hundreds of edge locations.

Macrometa’s global compute service is a framework for building, deploying and running serverless functions with Docker and Kubernetes on any combination of Macrometa edge locations. Any process can be packaged as a function, enabling you to consume the full suite of Macrometa’s data platform services (multi-model database, real-time updates, streams etc.) and web events using native language client drivers or REST API calls.

Global Compute Service overview

  • Allows developers to write and run code in any language they want
  • Enables long lived services – we don’t distinguish between functions and containers and will run the process till exit without preemptive termination
  • Rich CLI and GUI to create, package and deploy functions easily and quickly
  • Templating – developers need only write a handler in their chosen language and our tool chain will use a template to bundle it to a docker container and deploy it as a serverless function
  • Build rich event driven pipelines by chaining functions as asynchronous processes
    • Event driven and the processing may or may not provide a result
    • Run long jobs, such as machine learning and TensorFlow workloads, that may take time to initialize or execute
    • Run batch jobs that do large volumes of ingest
  • In-Situ processing – containers and functions are scheduled in close physical proximity to data either on the same server or in the same pod providing data locality to the multi model database and streams