如何在10分钟内创建分布式数据存储

发布于:2021-01-25 10:59:01

0

255

0

分布式数据存储 Copycat 教程

在一个典型的系统中,您可以在安全的地方(如ZooKeeper或etcd)进出数据。然后在不一定安全的地方对数据进行操作,例如应用程序。尽管etcd和ZooKeeper提供了对其拥有的数据的一致性的保证,但它们不能保证可能涉及应用程序其他部分的更广泛的状态转换和事务的一致性。为此,我们需要另一种方法。

改变范式

Copycat是一个框架,它避开了依赖于外部系统构建应用程序以实现数据一致性的标准范例,允许您将作为状态机编写的应用程序逻辑直接嵌入Copycat中,在那里,一致性和容错性由您负责。其结果是能够以相对简单和简洁的方式实现复杂分布式协调问题的解决方案,并封装应用程序的逻辑和语义,而不必担心可靠性保证。

我们可以用模仿者建立什么样的东西?这取决于你。从低级分布式原语(如锁、组和映射)到成熟的分布式系统(用于调度、消息传递、服务发现或数据存储),几乎任何事情都是可能的。

从零到分布式数据存储

从Copycat开始的一个好地方是用它构建一些东西,所以让我们创建一个分布式的键值数据存储。不过,我们的目标并不是只创建任何数据存储,我们需要的是具有强大一致性保证、网络分区容错性、节点故障耐久性以及数据更改通知的数据存储—类似于etcd。真的有可能在10分钟内建立一个etcd克隆吗?好吧,不,但我们可以惊人地关闭,建立一个数据存储与相同的基本功能,更重要的是,相同的可靠性保证,在这段时间内。

状态机

构建数据存储的第一步是定义一个状态机来包含数据存储的状态和逻辑。因为我们的数据存储存储的是键值对,所以我们将使用一个简单的HashMap将数据封装到内存中。说真的,哈希图?线程安全怎么办?耐用性如何?模仿者将为我们处理这些事情,因为我们稍后将了解更多。但首先,让我们定义状态机:

public class KeyValueStore extends StateMachine {
 private Mapstorage = new HashMap<>();
}

为了在状态机上操作,我们需要声明一些操作。Copycat支持两种类型的操作:用于写入的命令和用于读取的查询。让我们从定义一些基本的etcd样式操作开始:putgetdelete

public class Put implements Command{
 public Object key;
 public Object value;

 public Put(Object key, Object value) {
   this.key = key;
   this.value = value;
 }
}

public class Get implements Query{
 public Object key;

 public Get(Object key) {
   this.key = key;
 }
}

public class Delete implements Command{
 public Object key;

 public Delete(Object key) {
   this.key = key;
 }
}

定义了操作之后,让我们在StateMachine中实现对它们的处理:

public class KeyValueStore extends StateMachine {
 private Mapstorage = new HashMap<>();

 public Object put(Commitcommit) {
   Commitput = storage.put(commit.operation().key, commit);
   return put == null ? null : put.operation().value;
 }

 public Object get(Commitcommit) {
   try {
     Commitput = map.get(commit.operation().key);
     return put == null ? null : put.operation().value;
   } finally {
     commit.release();
   }
 }

 public Object delete(Commitcommit) {
   Commitput = null;
   try {
     put = storage.remove(commit.operation().key);
     return put == null ? null : put.operation().value;
   } finally {
     if (put != null)
       put.release();
     commit.release();
   }
 }
}

如您所见,putgetdelete实现处理包含提交到状态机的操作的对象。操作是在一个线程上执行的,因此线程安全不是问题,在处理之后,操作返回一个反映机器内部状态的结果。

除了状态机的存储之外,Copycat还存储状态机处理的每个命令的内部日志及其结果,用于故障处理和其他目的。定期对日志执行压缩,以便删除不再需要的提交。为了帮助Copycat知道何时可以安全地从其日志中删除提交,状态机应该release不影响机器状态的提交。例如,Put操作只有在接收到同一按键的Delete操作后才会释放。另一方面,Get操作会立即释放,因为它不会影响机器的状态。

有了它,我们的基本键值存储现在就实现了!稍后我们将添加一些更高级的操作,但现在让我们准备尝试一下。

创建服务器

为了管理状态机,我们需要构建一个CopycatServer实例。服务器必须用一个地址初始化,以便在其上侦听通信:

Address address = new Address("123.456.789.0", 5000);
CopycatServer.Builder builder = CopycatServer.builder(address);

我们将配置服务器以使用状态机:

builder.withStateMachine(KeyValueStore::new);

并为服务器配置一个Transport,以便在与群集中的其他服务器进行通信时使用:

builder.withTransport(NettyTransport.builder()
 .withThreads(4)
 .build());

我们将为状态机的日志配置Storage实现,在本例中使用磁盘存储:

builder.withStorage(Storage.builder()
 .withDirectory(new File("logs"))
 .withStorageLevel(StorageLevel.DISK)
 .build());

最后我们将创建服务器:

CopycatServer server = builder.build();

引导群集

一旦建立了一个服务器,我们就可以使用它来引导一个新的集群:

server.bootstrap().thenAccept(srvr -> System.out.println(srvr + " has bootstrapped a cluster"));

此时,我们的状态机已启动并运行,但让我们将一些其他服务器加入群集:

Address clusterAddress = new Address("123.456.789.0", 5000);
server.join(clusterAddress).thenAccept(srvr -> System.out.println(srvr + " has joined the cluster"));

就这样,我们创建了一个集群键值存储!

执行操作

为了将操作提交到数据存储,我们需要创建一个CopycatClient。我们将确保为客户端配置与为服务器配置相同的Transport

CopycatClient client = CopycatClient.builder()
 .withTransport(NettyTransport.builder()
   .withThreads(2)
   .build())
 .build();

然后,我们将客户端指向集群中的任何服务器,并connect

Address clusterAddress = new Address("123.456.789.0", 5000);
client.connect(clusterAddress).join();

我们的客户端已连接,让我们提交一个put操作:

CompletableFuturefuture = client.submit(new Put("foo", "Hello world!"));
Object result = future.get();

我们也可以提交getdelete操作,提交方式与提交put相同:

client.submit(new Get("foo")).thenAccept(result -> System.out.println("foo is: " + result));
client.submit(new Delete("foo")).thenRun(() -> System.out.println("foo has been deleted"));

从这里开始,我们可以将客户机包装在CLI或restapi中,以允许其他类型的访问,但我们将把它作为练习留到下次使用。

实现一致性

现在我们已经有了一个初始的系统并在运行,让我们退一步来讨论一下在幕后发生了什么。请记住,在一开始我们就说过,仅仅构建自己的键值存储是不够的,我们希望它能够完全复制、持久、高度一致,并且能够处理失败。我们该怎么做?原来,我们已经有了。

Copycat利用Raft一致性算法的复杂实现来确保以安全的方式将针对状态机的每个操作复制到集群的每个成员。为了实现这一点,集群中的每台服务器都维护一个单独的状态机副本以及在状态机上执行的所有操作及其结果的日志。日志可以根据配置的StorageLevel持久存储,并用于在发生故障时恢复机器的状态。

为了实现强一致性,Copycat利用多数仲裁来确保写入操作在生效之前得到集群中大多数节点的批准。如果网络分区或系统出现故障,无法再实现仲裁,Copycat将停止处理写操作,以防止发生数据不一致。

复制集群选择一个领导者作为处理操作的焦点。当客户机向服务器提交一个命令时,它将被转发给leader,leader将命令发送给集群的其余部分。然后,每个服务器将该命令应用于其状态机,将结果附加到其日志中,并将响应返回给领队。一旦leader接收到来自集群大多数成员(包括它自己)的响应,它就会将命令应用到自己的状态机,然后log将响应发送回客户机。

Copycat支持每个查询操作的可配置一致性级别。当客户机向服务器提交查询时,如果需要线性一致性,则可以将其转发给leader;如果顺序一致性足够,则可以由任何服务器响应。

实现容错

Copycat利用心跳和超时来维护服务器之间的健康连接。如果一个领导者未能在配置的超时时间内发出心跳信号,集群的其余成员将选择一个新的领导者来协调操作的处理。同样,如果跟随者未能响应心跳,则该服务器可能会从集群中删除。

由于Copycat需要多数仲裁才能保持一致性并保持可用性,因此Copycat支持被动服务器和保留服务器,以便在发生故障时替换主动服务器。当一个新服务器加入集群时,领导者将其日志流式传输到服务器,然后服务器将记录的操作应用到其状态机。一旦服务器被完全捕获,领导者将把新服务器提升为集群的活动成员。

现在我们了解了一点Copycat是如何将我们的基本状态机转变成一个健壮的、分布式的键值存储的,让我们回到我们的实现,并添加一些更高级的功能。

活下去的时间

etcd支持的一个很好的特性是按键的生存时间。这允许在特定时间段后自动删除密钥。让我们将TTL支持添加到我们的数据存储中。我们首先定义一个新的PutWithTtl命令:

public class PutWithTtl implements Command{
 public Object key;
 public Object value;
 public long ttl;

 public PutWithTtl(Object key, Object value, long ttl) {
   this.key = key;
   this.value = value;
   this.ttl = ttl;
 }

 @Override
 public CompactionMode compaction() {
   return CompactionMode.EXPIRING;
 }
}

由于一个PutWithTtl命令应该会在一段时间后删除状态,因此我们需要向Copycat指出这一点,以便它可以从日志中正确压缩这些提交。为此,我们提供了一个返回CompactionMode.EXPIRINGcompaction()实现。

接下来,我们需要在状态机中实现对PutWithTtl命令的处理:

public Object putWithTtl(Commitcommit) {
 Object result = storage.put(commit.operation().key, commit);
 executor.schedule(Duration.ofMillis(commit.operation().ttl), () -> {
   storage.remove(commit.operation().key);
   commit.release();
 });
 return result;
}

在这里,我们计划在超过TTL之后执行一个将来的操作,这将从存储中删除commit并释放它,类似于前面的delete实现。我们使用状态机的内部执行器来安排条目删除,因为这样可以确保在状态机内部不会遇到任何线程安全问题。

看看会发生什么

随着TTL的实现,让我们添加最后一个特性:观察者。etcd和ZooKeeper中的观察者允许客户端在访问密钥时接收通知。这是实现各种协调模式的一个重要特性,但它通常带有各种警告,包括严格的语义和较低的可靠性保证。

另一方面,Copycat提供了一种会话事件处理功能,允许从状态机中的任何位置将任意数据直接发布到客户端。这种灵活性使我们能够轻松地建模复杂的分布式原语,如组、领导人选举和消息传递,其中服务器端信息以高效和语义适当的方式发布到客户端。会话事件保证在服务器发生故障时不会丢失,并且始终按顺序传递。

为了利用数据存储的会话事件,我们将首先定义一个新的Listen命令,该命令将指示客户端对从状态机接收事件的兴趣:

public class Listen implements Command{
}

接下来,我们将增强KeyValueStore实现以处理Listen命令:

public class KeyValueStore extends StateMachine {
 private Mapstorage = new HashMap<>();
 private Setlisteners = new HashSet<>();
 
 public void listen(Commitcommit) {
   listeners.add(commit);
 }

listen方法只存储客户端提交的commit,稍后我们将使用它将事件发布回客户端。我们需要定义一个EntryEvent类型来封装我们的事件数据:

public class EntryEventimplements Serializable {
 public Object key;
 public Object oldValue;
 public Object newValue;

 public EntryEvent(Object key, Object oldValue, Object newValue) {
   this.key = key;
   this.oldValue = oldValue;
   this.newValue = newValue;
 }
 
 public String toString() {
   return String.format("EntryEvent [key=%s, oldValue=%s, newValue=%s]", key, oldValue, newValue);
 }
}

最后,我们将增强KeyValueStore以使用与任何Listen命令关联的客户端会话,从现有的命令处理程序中发布EntryEvent

private void publish(String event, Object key, Object oldValue, Object newValue) {
 listeners.forEach(commit -> {
   commit.session().publish(event, new EntryEvent(key, oldValue, newValue));
 });
}

public Object put(Commitcommit) {
 Commitput = storage.put(commit.operation().key, commit);
 Object oldValue = put == null ? null : put.operation().value;
 publish("put", commit.operation().key, oldValue, commit.operation().value);
 return oldValue;
}

public Object putWithTtl(Commitcommit) {
 Object result = storage.put(commit.operation().key, commit);
 executor.schedule(Duration.ofMillis(commit.operation().ttl), () -> {
   Commitput = storage.remove(commit.operation().key);
   Object oldValue = put == null ? null : put.operation().value;
   publish("expire", commit.operation().key, oldValue, null);
   commit.release();
 });
 return result;
}

public Object delete(Commitcommit) {
 Commitput = null;
 try {
   put = storage.remove(commit.operation().key);
   Object oldValue = put == null ? null : put.operation().value;
   publish("delete", commit.operation().key, oldValue, null);
   return oldValue;
 } finally {
   if (put != null)
     put.release();
   commit.release();
 }
}

在客户端,我们将发布一个Listen命令来表示我们对接收事件的兴趣:

client.submit(new Listen()).thenRun(() -> LOG.info("Now listening for events")).join();

然后我们可以为特定事件注册事件侦听器:

client.onEvent("put", (EntryEvent event) -> System.out.println("Put: " + event));
client.onEvent("delete", (EntryEvent event) -> System.out.println("Delete: " + event));
client.onEvent("expire", (EntryEvent event) -> System.out.println("Expire: " + event));

现在,当数据存储中发生状态更改时,将通知客户机。

总结

好吧,就这样。我们的10分钟结束了,在Copycat的帮助下,我们从头开始创建了一个生产就绪、高度一致的集群键值存储。我们还学习了一些关于分布式系统中的一致性和容错性的知识,希望现在我们可以用我们的新知识创建一些其他的东西。

Copycat和它的姐妹项目Atomix的目标并不是要构建一个像etcd这样的特定技术的克隆,就像现在看起来的那样。其目标是授权用户构建适合自己需要的系统。

模仿可以让我们比以前更快、更安全、更容易地构建复杂的系统。那么,既然你已经看到了它能做什么,你将建立什么?