0%

什么是Transformer模型并且他们的工作原理是什么?

长话短说:
Transformer 是机器学习领域的一项新发展,最近引起了很大轰动。 他们非常擅长跟踪上下文,这就是为什么他们写的文本有意义。 在这篇博文中,我们将介绍它们的架构及其工作原理。

Transformer 模型是机器学习中最激动人心的新发展之一。 它们是在论文 Attention is All You Need 中介绍的。 Transformer 可以用来写故事、散文、诗歌、回答问题、翻译语言、与人类聊天,甚至可以通过对人类来说很难的考试! 但它们是什么? 你会很高兴知道 transformer 模型的架构并不那么复杂,它只是一些非常有用的组件的串联,每个组件都有自己的功能。 在本文中,您将学习所有这些组件。

这篇博文包含一个简单的概念介绍。 如需更详细地描述Transformer 模型及其工作原理,请查看来自 Jay Alammar 的两篇优秀文章:

简而言之,Transformer 的作用是什么?想象一下,您正在用手机写一条短信。 在每个单词之后,您可能会得到三个单词的建议。 例如,如果您输入“Hello, how are”,手机可能会建议下一个词,例如“you”或“your”。当然,如果你继续选择手机中的建议词,你很快就会发现这些词组成的信息毫无意义。如果你看 3 或 4 个连续的单词,它可能是有道理的,但这些单词并没有连接成任何有意义的东西。这是因为手机中使用的模型不包含消息的整体上下文,它只是预测哪个词更有可能出现在最后几个词之后。 另一方面,Transformer 会跟踪所写内容的上下文,这就是它们编写的文本有意义的原因。

The phone can suggest the next word to use in a text message, but does not have the power to generate coherent text.

我必须对你说实话,当我第一次发现变压器是一个字一个字地构建文本时,我无法相信它。首先,这不是人类形成句子和思想的方式。我们首先形成一个基本的思想,然后开始完善它并向它添加词语。这也不是ML模型做其他事情的方式。例如,图像就不是这样构建的。大多数基于神经网络的图形模型形成一个粗略的图像版本,然后慢慢完善它或添加细节,直到它变得完美。那么,为什么一个Transformer 模型会逐字建立文本呢?一个答案是,因为这样做效果非常好。一个更令人满意的答案是,因为Transformer 在跟踪上下文方面是如此令人难以置信的好,他们挑选的下一个词正是它所需要的,以继续推进一个想法。

那么Transformer 是如何训练的呢?通过大量的数据,事实上是互联网上的所有数据。因此,当你把 "你好,你好 "这句话输入转化器时,它只是知道,根据互联网上的所有文本,最好的下一个词是 "你"。如果你给它一个更复杂的命令,比如说,"写一个故事",它可能会发现,一个好的下一个词是 "Once(曾经)"。然后它将这个词添加到命令中,并计算出下一个好的词是“upon”,(once upon的意思是“从前”)依此类推。一个字一个字地写下去,直到写成一个故事。

Command: Write a story.
Response: Once

Next command: Write a story. Once
Response: upon

Next command: Write a story. Once upon
Response: a

Next command: Write a story. Once upon a
Response: time

Next command: Write a story. Once upon a time
Response: there

etc.

现在我们知道了Transformer 的作用,让我们来看看它们的架构。如果你看过 Transformer 模型的架构,你可能会像我第一次看到它时那样惊叹不已,它看起来相当复杂!然而,当你把它分解成最重要的部分时,它并没有那么糟糕。Transformer 有4个主要部分:

  1. Tokenization(标记化)
  2. Embedding(嵌入)
  3. Positional encoding(位置编码)
  4. Transformer block (several of these)
  5. Softmax(软最大)

第四个,Transformer 块,是最复杂的。 其中许多可以串联起来,每个都包含两个主要部分:attention 和前馈组件。

The architecture of a transformer model

让我们一一研究这些部分。

Tokenization

标记化是最基本的步骤。 它由一个大的标记数据集组成,包括所有单词、标点符号等。标记化步骤获取每个单词、前缀、后缀和标点符号,并将它们发送到库中的已知标记。

Tokenization: Turning words into tokens

例如,如果句子是“Write a story”,那么对应的 4 个 token 将是

1
<Write>, <a>, <story>, and <.>.

Embedding

一旦输入被标记化,就该把单词变成数字了。为此,我们使用一个嵌入。嵌入是任何大型语言模型中最重要的部分之一;它是橡胶与道路的结合处。其原因是,它是将文本变成数字的桥梁。由于人类擅长处理文本,而计算机擅长处理数字,这座桥梁越强大,语言模型就越强大。

简而言之,文本嵌入将每段文本发送到一个数字向量(列表)。 如果两段文本相似,那么它们对应的向量中的数字彼此相似(从分量上来说,意味着相同位置的每对数字相似)。否则,如果两段文本不同,则它们对应的向量中的数字也不同。 如果您想了解更多信息,请查看这篇关于文本嵌入的帖子及其相应的视频

尽管嵌入是数字的,但我喜欢用几何来想象它们。 想象一下,有一个非常简单的嵌入,它将每个单词发送到长度为 2 的向量(即 2 个数字的列表)。如果我们将每个词定位在这两个数字给出的坐标中(想象一下数字在一条街道和一条大道上的位置),那么我们就有所有的词站在一个大平面上。在这个平面上,相似的词出现在彼此之间,而不同的词则出现在彼此之间的远方。例如,在下面的嵌入中,樱桃的坐标是[6,4],与草莓[5,4]很接近,但与城堡[1,2]很远。

Embedding: Turning words (tokens) into vectors (lists of numbers)

在更大的嵌入的情况下,每个词被发送到一个更长的向量(例如,长度为 4096),那么这些词不再存在于二维平面中,而是存在于一个大的 4096 维空间中。然而,即使在这个大空间里,我们也可以认为词与词之间有近有远,所以嵌入的概念仍然有意义。

词嵌入泛化为文本嵌入,其中整个句子、段落甚至更长的文本被发送到一个向量。 然而,在 transformers 的情况下,我们将使用词嵌入,这意味着句子中的每个词都被发送到相应的向量。 更具体地说,输入文本中的每个标记都将被发送到嵌入中的相应向量。

例如,如果我们正在考虑的句子是“写一个故事”。 标记是 <Write><a><story><.>,然后这些标记中的每一个都将被发送到一个长向量,我们将有四个向量。

In general embeddings send every word (token) to a long list of numbers

Positional encoding

一旦我们有了句子中每个标记对应的向量,下一步就是将所有这些都变成一个向量来处理。将一堆向量变成一个向量的最常见方法是按分量相加。这意味着,我们分别相加每个坐标。例如,如果向量(长度为 2)为 [1,2] 和 [3,4],则它们对应的总和为 [1+3, 2+4],等于 [4, 6]。这可以奏效,但有一个小警告。加法是可交换的,这意味着如果你以不同的顺序添加相同的数字,你会得到相同的结果。在那种情况下,“I’m not sad, I’m happy” and the sentence “I’m not happy, I’m sad”, 会产生相同的向量,因为它们有相同的词,除了 以不同的顺序。这不太好。因此,我们必须想出一些方法,为这两个句子提供不同的向量。有几种方法可行,我们将采用其中一种:位置编码(positional encoding)。位置编码包括将一系列预定义向量添加到单词的嵌入向量中。这确保我们为每个句子获得一个唯一的向量,并且具有不同顺序的相同单词的句子将被分配不同的向量。在下面的示例中,对应于单词“Write”、“a”、“story”和“.”的向量。成为携带有关其位置信息的修改向量,标记为“Write (1)”、“a (2)”、“story (3)”和“. (4)”。

Positional encoding adds a positional vector to each word, in order to keep track of the positions of the words.

现在我们知道我们有一个对应于句子的唯一向量,并且这个向量包含句子中所有单词及其顺序的信息,我们可以进入下一步。

Transformer block

让我们回顾一下到目前为止我们所拥有的。 单词进入并转化为标记(标记化),然后考虑顺序(位置编码)。这为我们输入到模型的每个标记提供了一个向量。现在,下一步是预测这句话中的下一个单词。这是通过一个非常非常大的神经网络完成的,该神经网络经过精确训练以预测句子中的下一个单词。

我们可以训练如此庞大的网络,但我们可以通过添加一个关键步骤来极大地改进它:attention 组件。在开创性论文Attention is All you Need中被介绍,它是 Transformer 模型的关键要素之一,也是它们运行良好的原因之一。下一节将解释Attention,但现在,将其想象成一种为文本中的每个单词添加上下文的方法。

在前馈网络的每个块中都添加了Attention组件。因此,如果你想象一个大型前馈神经网络,其目标是预测下一个单词,它由几个较小的神经网络块组成,那么每个块都会添加一个Attention组件。Transformer 的每个组件,称为Transformer 块,然后由两个主要组件组成:

Attention 组件.
前馈组件.

Transformer 是许多Transformer 块的串联。

The transformer is a concatenation of many transformer blocks. Each one of these is composed by an attention component followed by a feedforward component (a neural network)

Attention

Attention这一步,处理一个非常重要的问题:上下文问题。如您所知,有时同一个词可以有不同的含义。这往往会混淆语言模型,因为嵌入只是将单词发送到向量,而不知道它们使用的是哪个单词的定义。

Attention 是一种非常有用的技术,可以帮助语言模型理解上下文。为了理解注意力是如何工作的,请考虑以下两个句子:

Sentence 1: The bank of the river.
Sentence 2: Money in the bank.

如您所见,“银行”一词出现在两者中,但定义不同。在第一句中,我们指的是河边的土地,在第二句中指的是持有货币的机构。计算机对此一无所知,因此我们需要以某种方式将这些知识注入其中。什么可以帮我们?好吧,看起来句子中的其他词可以拯救我们。对于第一句话,“the”和“of”这两个词对我们没有用处。但是“river”这个词让我们知道我们在谈论河边的土地。同样,在句子 2 中,“Money ”一词帮助我们理解“bank”一词现在指的是持有货币的机构。

Attention helps give context to each word, based on the other words in the sentece (or text)

简而言之,Attention 的作用是将句子(或一段文本)中的词的词义更贴近句子中的其他词。这样,“Money in the bank”这句话中的“bank”就会离“money”更近一些。同样,在“The bank of the river”这句话中,“bank”这个词会被移近“river”这个词。这样,两个句子中的修饰词“bank”都会携带一些相邻词的信息,增加上下文。

Transformer 模型中使用的Attention 步骤实际上要强大得多,它被称为多头Attention 。在多头Attention 中,使用了几种不同的嵌入来修改向量并为其添加上下文。多头Attention 帮助语言模型在处理和生成文本时达到更高的效率水平。如果您想更详细地了解Attention 机制,请查看这篇博文(blog post)及其相应的视频(video)。

The Softmax Layer

既然您已经知道 Transformer 由许多层 Transformer 块组成,每层都包含一个Attention 层和一个前馈层,您可以将其视为一个大型神经网络,用于预测句子中的下一个单词。Transformer 输出所有单词的分数,最有可能出现在句子中的单词会被给出最高的分数。

Transformer 的最后一步是 softmax 层,它将这些分数转化为概率(加起来为 1),其中最高分数对应于最高概率。然后,我们可以从这些概率中抽取下一个单词。在下面的示例中,Transformer 将 0.5 的最高概率提供给“Once”,将 0.3 和 0.2 的概率提供给“Somewhere”和“There”。一旦我们抽取,“Once”这个词就会被选中,这就是Transformer 的输出。

The softmax layer turns the scores into probabilities, and these are used to pick the next word in the text.

然后呢?好吧,我们重复这个步骤。我们现在输入文本“Write a story. Once“到模型,输出很可能是“upon”。一次又一次地重复这个步骤,Transformer 最终会写出一个故事,比如“从前,有一个……”。

Summary

在这篇博文中,您了解了Transformer 的工作原理。它们由几个块组成,每个块都有自己的功能,它们一起工作以理解文本并生成下一个单词。 这些块如下:

Tokenizer: 把单词转变成tokens。

Embedding: 把tokens转变成数字(向量)。

Positional encoding: 为文本中的单词添加顺序。

Transformer block: 猜测下一个单词。 它由一个Attention 块和一个前馈块组成。
Attention: 为文本添加上下文。
Feedforward: 是Transformer 神经网络中的一个块,猜测下一个词。

Softmax: 将分数转化为概率,以便对下一个单词进行采样。

这些步骤的重复就是写出你所见过的Transformer 创造的惊人文本的原因。

Post Training(训练后)

既然您知道transformers 是如何工作的,我们还有一些工作要做。想象一下:您作为transformers“What is the capital of Algeria?”。 我们希望它回答“Algiers”,然后继续前进。然而,transformer 是在整个互联网上训练的。互联网是一个很大的地方,它不一定是最好的问答库。例如,许多页面会有一长串没有答案的问题。在这种情况下,“What is the capital of Algeria?”之后的下一句 可能是另一个问题,例如“What is the population of Algeria?”,或“What is the capital of Burkina Faso?”。transformer 不是考虑这些响应的人,它只是模仿它在互联网上看到的(或已提供的任何数据集)。那么我们如何让transformer回答问题呢?

答案是训练后(post-training)。就像你教一个人完成某些任务一样,你可以让一个transformer 来执行任务。一旦在整个互联网上训练了一个transformer,它就会在一个大型数据集上再次训练,该数据集对应于许多问题及其各自的答案。transformer(与人类一样)对他们最近学到的东西有偏见,因此post-training已被证明是帮助transformer成功完成任务的非常有用的步骤。

Post-training还有助于完成许多其他任务。 例如,可以使用大型对话数据集对 Transformer 进行post-training,以帮助它作为聊天机器人表现良好,或者帮助我们编写故事、诗歌甚至代码。

参考:https://txt.cohere.com/what-are-transformer-models/

一、固定窗口限流算法

1.什么是固定窗口限流算法

固定窗口限流算法(Fixed Window Rate Limiting Algorithm)是一种最简单的限流算法,其原理是在固定时间窗口(单位时间)内限制请求的数量。该算法将时间分成固定的窗口,并在每个窗口内限制请求的数量。具体来说,算法将请求按照时间顺序放入时间窗口中,并计算该时间窗口内的请求数量,如果请求数量超出了限制,则拒绝该请求。

假设单位时间(固定时间窗口)是1秒,限流阀值为3。在单位时间1秒内,每来一个请求,计数器就加1,如果计数器累加的次数超过限流阀值3,后续的请求全部拒绝。等到1s结束后,计数器清0,重新开始计数。如下图:

Image

2.固定窗口算法的优缺点

  • 优点:固定窗口算法非常简单,易于实现和理解。
  • 缺点:存在明显的临界问题,比如: 假设限流阀值为5个请求,单位时间窗口是1s,如果我们在单位时间内的前0.8-1s1-1.2s,分别并发5个请求。虽然都没有超过阀值,但是如果算0.8-1.2s,则并发数高达10,已经超过单位时间1s不超过5阀值的定义啦。

Image

二、滑动窗口限流算法

1.什么是滑动窗口限流算法

滑动窗口限流算法是一种常用的限流算法,用于控制系统对外提供服务的速率,防止系统被过多的请求压垮。它将单位时间周期分为n个小周期,分别记录每个小周期内接口的访问次数,并且根据时间滑动删除过期的小周期。它可以解决固定窗口临界值的问题

用一张图解释滑动窗口算法,如下:

Image

假设单位时间还是1s,滑动窗口算法把它划分为5个小周期,也就是滑动窗口(单位时间)被划分为5个小格子。每格表示0.2s。每过0.2s,时间窗口就会往右滑动一格。然后呢,每个小周期,都有自己独立的计数器,如果请求是0.83s到达的,0.8~1.0s对应的计数器就会加1

我们来看下,滑动窗口,去解决固定窗口限流算法的临界问题,思想是怎样

假设我们1s内的限流阀值还是5个请求,0.8~1.0s内(比如0.9s的时候)来了5个请求,落在黄色格子里。时间过了1.0s这个点之后,又来5个请求,落在紫色格子里。如果是固定窗口算法,是不会被限流的,但是滑动窗口的话,每过一个小周期,它会右移一个小格。过了1.0s这个点后,会右移一小格,当前的单位时间段是0.2~1.2s,这个区域的请求已经超过限定的5了,已触发限流啦,实际上,紫色格子的请求都被拒绝啦。

当滑动窗口的格子周期划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确

2.滑动窗口限流算法的优缺点

优点

  • 简单易懂
  • 精度高(通过调整时间窗口的大小来实现不同的限流效果)
  • 可扩展性强(可以非常容易地与其他限流算法结合使用)

缺点

  • 突发流量无法处理(无法应对短时间内的大量请求,但是一旦到达限流后,请求都会直接暴力被拒绝。酱紫我们会损失一部分请求,这其实对于产品来说,并不太友好),需要合理调整时间窗口大小。

三、漏桶限流算法

1.什么是漏桶限流算法

漏桶限流算法(Leaky Bucket Algorithm)是一种流量控制算法,用于控制流入网络的数据速率,以防止网络拥塞。它的思想是将数据包看作是水滴,漏桶看作是一个固定容量的水桶,数据包像水滴一样从桶的顶部流入桶中,并通过桶底的一个小孔以一定的速度流出,从而限制了数据包的流量。

漏桶限流算法的基本工作原理是:对于每个到来的数据包,都将其加入到漏桶中,并检查漏桶中当前的水量是否超过了漏桶的容量。如果超过了容量,就将多余的数据包丢弃。如果漏桶中还有水,就以一定的速率从桶底输出数据包,保证输出的速率不超过预设的速率,从而达到限流的目的。

Image

  • 流入的水滴,可以看作是访问系统的请求,这个流入速率是不确定的。
  • 桶的容量一般表示系统所能处理的请求数。
  • 如果桶的容量满了,就达到限流的阀值,就会丢弃水滴(拒绝请求)
  • 流出的水滴,是恒定过滤的,对应服务按照固定的速率处理请求。

2.漏桶限流算法的优缺点

优点

  • 可以平滑限制请求的处理速度,避免瞬间请求过多导致系统崩溃或者雪崩。
  • 可以控制请求的处理速度,使得系统可以适应不同的流量需求,避免过载或者过度闲置。
  • 可以通过调整桶的大小和漏出速率来满足不同的限流需求,可以灵活地适应不同的场景。

缺点

  • 需要对请求进行缓存,会增加服务器的内存消耗。
  • 对于流量波动比较大的场景,需要较为灵活的参数配置才能达到较好的效果。
  • 但是面对突发流量的时候,漏桶算法还是循规蹈矩地处理请求,这不是我们想看到的。流量变突发时,我们肯定希望系统尽量快点处理请求,提升用户体验。

四、令牌桶算法

1.什么是令牌桶算法

令牌桶算法是一种常用的限流算法,可以用于限制单位时间内请求的数量。该算法维护一个固定容量的令牌桶,每秒钟会向令牌桶中放入一定数量的令牌。当有请求到来时,如果令牌桶中有足够的令牌,则请求被允许通过并从令牌桶中消耗一个令牌,否则请求被拒绝。

Image

2.令牌桶算法的优缺点

优点:

  • 稳定性高:令牌桶算法可以控制请求的处理速度,可以使系统的负载变得稳定。
  • 精度高:令牌桶算法可以根据实际情况动态调整生成令牌的速率,可以实现较高精度的限流。
  • 弹性好:令牌桶算法可以处理突发流量,可以在短时间内提供更多的处理能力,以处理突发流量。

GuavaRateLimiter限流组件,就是基于令牌桶算法实现的。

缺点:

  • 实现复杂:相对于固定窗口算法等其他限流算法,令牌桶算法的实现较为复杂。对短时请求难以处理:在短时间内有大量请求到来时,可能会导致令牌桶中的令牌被快速消耗完,从而限流。这种情况下,可以考虑使用漏桶算法。
  • 时间精度要求高:令牌桶算法需要在固定的时间间隔内生成令牌,因此要求时间精度较高,如果系统时间不准确,可能会导致限流效果不理想。

总体来说,令牌桶算法具有较高的稳定性和精度,但实现相对复杂,适用于对稳定性和精度要求较高的场景。

一、CAP

简介

CAP 也就是 Consistency(一致性)Availability(可用性)Partition Tolerance(分区容错性) 这三个单词首字母组合。

CAP 定理(CAP theorem)指出对于一个分布式系统来说,只能同时满足以下三点中的两个:

  • 一致性(Consistency) : 所有节点访问同一份最新的数据副本
  • 可用性(Availability): 非故障的节点在合理的时间内返回合理的响应(不是错误或者超时的响应)。
  • 分区容错性(Partition Tolerance) : 分布式系统出现网络分区的时候,仍然能够对外提供服务。

什么是网络分区?

分布式系统中,多个节点之前的网络本来是连通的,但是因为某些故障(比如部分节点网络出了问题)某些节点之间不连通了,整个网络就分成了几块区域,这就叫 网络分区

当发生网络分区的时候,如果我们要继续服务,那么强一致性和可用性只能 2 选 1。也就是说当网络分区之后 P 是前提,决定了 P 之后才有 C 和 A 的选择。也就是说分区容错性(Partition tolerance)我们是必须要实现的。

因此,分布式系统理论上不可能选择 CA 架构,只能选择 CP 或者 AP 架构。比如 ZooKeeper、HBase 就是 CP 架构,Cassandra、Eureka 就是 AP 架构,Nacos 不仅支持 CP 架构也支持 AP 架构。

为啥不可能选择 CA 架构呢?

举个例子:若系统出现“分区”,系统中的某个节点在进行写操作。为了保证 C, 必须要禁止其他节点的读写操作,这就和 A 发生冲突了。如果为了保证 A,其他节点的读写操作正常的话,那就和 C 发生冲突了。

选择 CP 还是 AP 的关键在于当前的业务场景,没有定论,比如对于需要确保强一致性的场景如银行一般会选择保证 CP 。

另外,需要补充说明的一点是: 如果网络分区正常的话(系统在绝大部分时候所处的状态),也就说不需要保证 P 的时候,C 和 A 能够同时保证。

总结

如果系统发生“分区”,我们要考虑选择 CP 还是 AP。如果系统没有发生“分区”的话,我们要思考如何保证 CA 。

二、BASE

简介

BASEBasically Available(基本可用)Soft-state(软状态)Eventually Consistent(最终一致性) 三个短语的缩写。

BASE 理论的核心思想

即使无法做到强一致性,但每个应用都可以根据自身业务特点,采用适当的方式来使系统达到最终一致性。

BASE 理论本质上是对 CAP 的延伸和补充,更具体地说,是对 CAP 中 AP 方案的一个补充。

为什么这样说呢?

AP 方案只是在系统发生分区的时候放弃一致性,而不是永远放弃一致性。在分区故障恢复后,系统应该达到最终一致性。这一点其实就是 BASE 理论延伸的地方。

BASE 理论三要素

基本可用

基本可用是指分布式系统在出现不可预知故障的时候,允许损失部分可用性。但是,这绝不等价于系统不可用。

什么叫允许损失部分可用性呢?

  • 响应时间上的损失: 正常情况下,处理用户请求需要 0.5s 返回结果,但是由于系统出现故障,处理用户请求的时间变为 3 s。
  • 系统功能上的损失:正常情况下,用户可以使用系统的全部功能,但是由于系统访问量突然剧增,系统的部分非核心功能无法使用。
软状态

软状态指允许系统中的数据存在中间状态(CAP 理论中的数据不一致),并认为该中间状态的存在不会影响系统的整体可用性,即允许系统在不同节点的数据副本之间进行数据同步的过程存在延时。

最终一致性

最终一致性强调的是系统中所有的数据副本,在经过一段时间的同步后,最终能够达到一个一致的状态。因此,最终一致性的本质是需要系统保证最终数据能够达到一致,而不需要实时保证系统数据的强一致性。

总结

ACID 是数据库事务完整性的理论,CAP 是分布式系统设计理论,BASE 是 CAP 理论中 AP 方案的延伸。

三、Paxos

介绍

Paxos 算法是第一个被证明完备的分布式系统共识算法。共识算法的作用是让分布式系统中的多个节点之间对某个提案(Proposal)达成一致的看法。

兰伯特当时提出的 Paxos 算法主要包含 2 个部分:

  • Basic Paxos 算法 : 描述的是多节点之间如何就某个值(提案 Value)达成共识。
  • Multi-Paxos 思想 : 描述的是执行多个 Basic Paxos 实例,就一系列值达成共识。Multi-Paxos 说白了就是执行多次 Basic Paxos ,核心还是 Basic Paxos 。

总结

Paxos 算法是兰伯特在 1990 年提出的一种分布式系统共识算法。

兰伯特当时提出的 Paxos 算法主要包含 2 个部分: Basic Paxos 算法和 Multi-Paxos 思想。

Raft 算法、ZAB 协议、 Fast Paxos 算法都是基于 Paxos 算法改进而来。

Basic Paxos 算法

Basic Paxos 中存在 3 个重要的角色:

  1. 提议者(Proposer):也可以叫做协调者(coordinator),提议者负责接受客户端的请求并发起提案。提案信息通常包括提案编号 (Proposal ID) 和提议的值 (Value)。
  2. 接受者(Acceptor):也可以叫做投票员(voter),负责对提议者的提案进行投票,同时需要记住自己的投票历史;
  3. 学习者(Learner):如果有超过半数接受者就某个提议达成了共识,那么学习者就需要接受这个提议,并就该提议作出运算,然后将运算结果返回给客户端。

为了减少实现该算法所需的节点数,一个节点可以身兼多个角色。并且,一个提案被选定需要被半数以上的 Acceptor 接受。这样的话,Basic Paxos 算法还具备容错性,在少于一半的节点出现故障时,集群仍能正常工作。

Multi Paxos 思想

Basic Paxos 算法的仅能就单个值达成共识,为了能够对一系列的值达成共识,我们需要用到 Multi Paxos 思想。

四、Raft 算法

Raft 集群中每个节点都处于以下三种角色之一:

  • Leader: 所有请求的处理者,接收客户端发起的操作请求,写入本地日志后同步至集群其它节点。
  • Follower: 请求的被动更新者,从 leader 接收更新请求,写入本地文件。如果客户端的操作请求发送给了 follower,会首先由 follower 重定向给 leader。
  • Candidate: 如果 follower 在一定时间内没有收到 leader 的心跳,则判断 leader 可能已经故障,此时启动 leader election 过程,本节点切换为 candidate 直到选主结束。

参考:https://juejin.cn/post/6907151199141625870#heading-7

为什么需要分布式锁?

与分布式锁相对应的是「单机锁」,我们在写多线程程序时,避免同时操作一个共享变量产生数据问题,通常会使用一把锁来「互斥」,以保证共享变量的正确性,其使用范围是在「同一个进程」中。

如果换做是多个进程,需要同时操作一个共享资源,如何互斥呢?

例如,现在的业务应用通常都是微服务架构,这也意味着一个应用会部署多个进程,那这多个进程如果需要修改 MySQL 中的同一行记录时,为了避免操作乱序导致数据错误,此时,我们就需要引入「分布式锁」来解决这个问题了。

想要实现分布式锁,必须借助一个外部系统,所有进程都去这个系统上申请「加锁」。

而这个外部系统,必须要实现「互斥」的能力,即两个请求同时进来,只会给一个进程返回成功,另一个返回失败(或等待)。

Redis分布式锁怎么实现?

想要实现分布式锁,必须要求 Redis 有「互斥」的能力,我们可以使用 SETNX 命令,这个命令表示SET if Not eXists,即如果 key 不存在,才会设置它的值,否则什么也不做。

两个客户端进程可以执行这个命令,达到互斥,就可以实现一个分布式锁。

客户端 1 申请加锁,加锁成功:

1
2
127.0.0.1:6379> SETNX lock 1
(integer) 1 // 客户端1,加锁成功

客户端 2 申请加锁,因为它后到达,加锁失败:

1
2
127.0.0.1:6379> SETNX lock 1
(integer) 0 // 客户端2,加锁失败

此时,加锁成功的客户端,就可以去操作「共享资源」,例如,修改 MySQL 的某一行数据,或者调用一个 API 请求。

操作完成后,还要及时释放锁,给后来者让出操作共享资源的机会。如何释放锁呢?

也很简单,直接使用 DEL 命令删除这个 key 即可:

1
2
127.0.0.1:6379> DEL lock // 释放锁
(integer) 1

这个逻辑非常简单,整体的路程就是这样:

但是,它存在一个很大的问题,当客户端 1 拿到锁后,如果发生下面的场景,就会造成「死锁」:

  1. 程序处理业务逻辑异常,没及时释放锁
  2. 进程挂了,没机会释放锁

这时,这个客户端就会一直占用这个锁,而其它客户端就「永远」拿不到这把锁了。

如何避免死锁?

很容易想到的方案是,在申请锁时,给这把锁设置一个「租期」。

在 Redis 中实现时,就是给这个 key 设置一个「过期时间」。这里我们假设,操作共享资源的时间不会超过 10s,那么在加锁时,给这个 key 设置 10s 过期即可:

1
2
3
4
127.0.0.1:6379> SETNX lock 1    // 加锁
(integer) 1
127.0.0.1:6379> EXPIRE lock 10 // 10s后自动过期
(integer) 1

这样一来,无论客户端是否异常,这个锁都可以在 10s 后被「自动释放」,其它客户端依旧可以拿到锁。

但这样还是有问题。

现在的操作,加锁、设置过期是 2 条命令,有没有可能只执行了第一条,第二条却「来不及」执行的情况发生呢?例如:

  1. SETNX 执行成功,执行 EXPIRE 时由于网络问题,执行失败
  2. SETNX 执行成功,Redis 异常宕机,EXPIRE 没有机会执行
  3. SETNX 执行成功,客户端异常崩溃,EXPIRE 也没有机会执行

总之,这两条命令不能保证是原子操作(一起成功),就有潜在的风险导致过期时间设置失败,依旧发生「死锁」问题。

在 Redis 2.6.12 版本之前,我们需要想尽办法,保证 SETNX 和 EXPIRE 原子性执行,还要考虑各种异常情况如何处理。

但在 Redis 2.6.12 之后,Redis 扩展了 SET 命令的参数,用这一条命令就可以了:

1
2
3
// 一条命令保证原子性执行
127.0.0.1:6379> SET lock 1 EX 10 NX
OK

这样就解决了死锁问题,也比较简单。

再来分析下,它还有什么问题?

试想这样一种场景:

  1. 客户端 1 加锁成功,开始操作共享资源
  2. 客户端 1 操作共享资源的时间,「超过」了锁的过期时间,锁被「自动释放」
  3. 客户端 2 加锁成功,开始操作共享资源
  4. 客户端 1 操作共享资源完成,释放锁(但释放的是客户端 2 的锁)

看到了么,这里存在两个严重的问题:

  1. 锁过期:客户端 1 操作共享资源耗时太久,导致锁被自动释放,之后被客户端 2 持有
  2. 释放别人的锁:客户端 1 操作共享资源完成后,却又释放了客户端 2 的锁

先从第二个问题说起。

第二个问题在于,一个客户端释放了其它客户端持有的锁。

导致这个问题的关键点在于,每个客户端在释放锁时,都是「无脑」操作,并没有检查这把锁是否还「归自己持有」,所以就会发生释放别人锁的风险。

解决办法是:客户端在加锁时,设置一个只有自己知道的「唯一标识」进去。

1
2
3
// 锁的VALUE设置为UUID
127.0.0.1:6379> SET lock $uuid EX 20 NX
OK

之后,在释放锁时,要先判断这把锁是否还归自己持有,伪代码可以这么写:

1
2
3
// 锁是自己的,才释放
if redis.get("lock") == $uuid:
redis.del("lock")

在这里,判断是不是当前线程加的锁释放锁不是一个原子操作。在释放锁的时候,可能这把锁已经不属于当前客户端,会解除他人加的锁。这时就需要 Lua 脚本来保证解锁的原子性。

安全释放锁的 Lua 脚本如下:

1
2
3
4
5
6
7
// 判断锁是自己的,才释放
if redis.call("GET",KEYS[1]) == ARGV[1]
then
return redis.call("DEL",KEYS[1])
else
return 0
end

基于 Redis 实现的分布式锁,一个严谨的的流程如下:

  1. 加锁:SET lock_key $unique_id EX $expire_time NX
  2. 操作共享资源
  3. 释放锁:Lua 脚本,先 GET 判断锁是否归属自己,再 DEL 释放锁

第一个问题,可能是我们评估操作共享资源的时间不准确导致的。

增大冗余时间,确实可以「缓解」这个问题,降低出问题的概率,但依旧无法「彻底解决」问题。

原因在于,客户端在拿到锁之后,在操作共享资源时,遇到的场景有可能是很复杂的,例如,程序内部发生异常、网络请求超时等等。

既然是「预估」时间,也只能是大致计算,除非你能预料并覆盖到所有导致耗时变长的场景,但这其实很难。

那怎么办呢?

是否可以设计这样的方案:加锁时,先设置一个过期时间,然后我们开启一个「守护线程」,定时去检测这个锁的失效时间,如果锁快要过期了,操作共享资源还未完成,那么就自动对锁进行「续期」,重新设置过期时间。

这确实一种比较好的方案。幸运的是,已经有一个库把这些工作都封装好了:Redisson

Redisson 是一个 Java 语言实现的 Redis SDK 客户端,在使用分布式锁时,它就采用了「自动续期」的方案来避免锁过期,这个守护线程我们一般也把它叫做「看门狗」线程。

除此之外,这个 SDK 还封装了很多易用的功能:

  • 可重入锁
  • 乐观锁
  • 公平锁
  • 读写锁
  • Redlock

这个 SDK 提供的 API 非常友好,它可以像操作本地锁的方式,操作分布式锁。如果你是 Java 技术栈,可以直接把它用起来。

总结

基于 Redis 的实现分布式锁,前面遇到的问题,以及对应的解决方案:

  • 死锁:设置过期时间
  • 过期时间评估不好,锁提前过期:守护线程,自动续期
  • 锁被别人释放:锁写入唯一标识,释放锁先检查标识,再释放(检查并释放需要使用Lua脚本)

1. 什么是幂等?

幂等是一个数学与计算机科学概念。

  • 在数学中,幂等用函数表达式就是:f(x) = f(f(x))。比如求绝对值的函数,就是幂等的,abs(x) = abs(abs(x))
  • 计算机科学中,幂等表示一次和多次请求某一个资源应该具有同样的副作用,或者说,多次请求所产生的影响与一次请求执行的影响效果相同。

2. 为什么需要幂等

举个例子:

我们开发一个转账功能,假设我们调用下游接口超时了。一般情况下,超时可能是网络传输丢包的问题,也可能是请求时没送到,还有可能是请求到了,返回结果却丢了。这时候我们是否可以重试呢?如果重试的话,是否会多转了一笔钱呢?

转账超时

当前互联网的系统几乎都是解耦隔离后,会存在各个不同系统的相互远程调用。调用远程服务会有三个状态:成功,失败,或者超时。前两者都是明确的状态,而超时则是未知状态。我们转账超时的时候,如果下游转账系统做好幂等控制,我们发起重试,那即可以保证转账正常进行,又可以保证不会多转一笔

其实除了转账这个例子,日常开发中,还有很多很多例子需要考虑幂等。比如:

  • MQ(消息中间件)消费者读取消息时,有可能会读取到重复消息。(重复消费
  • 比如提交form表单时,如果快速点击提交按钮,可能产生了两条一样的数据(前端重复提交

3. 接口超时了,到底如何处理?

两种方案处理:

  • 方案一:就是下游系统提供一个对应的查询接口。如果接口超时了,先查下对应的记录,如果查到是成功,就走成功流程,如果是失败,就按失败处理。

拿我们的转账例子来说,转账系统提供一个查询转账记录的接口,如果渠道系统调用转账系统超时时,渠道系统先去查询一下这笔记录,看下这笔转账记录成功还是失败,如果成功就走成功流程,失败再重试发起转账。

图片

  • 方案二:下游接口支持幂等,上游系统如果调用超时,发起重试即可。

图片

两种方案都是挺不错的,但是如果是MQ重复消费的场景,方案一处理并不是很妥,所以,我们还是要求下游系统对外接口支持幂等

4. 如何设计幂等

既然这么多场景需要考虑幂等,那我们如何设计幂等呢?

幂等意味着一条请求的唯一性。不管是你哪个方案去设计幂等,都需要一个全局唯一的ID,去标记这个请求是独一无二的。

  • 如果你是利用唯一索引控制幂等,那唯一索引是唯一的
  • 如果你是利用数据库主键控制幂等,那主键是唯一的
  • 如果你是悲观锁的方式,底层标记还是全局唯一的ID

4.1 全局的唯一性ID

全局唯一性ID,我们怎么去生成呢?你可以回想下,数据库主键Id怎么生成的呢?

是的,我们可以使用UUID,但是UUID的缺点比较明显,它字符串占用的空间比较大,生成的ID过于随机,可读性差,而且没有递增。

我们还可以使用雪花算法(Snowflake) 生成唯一性ID。

雪花算法是一种生成分布式全局唯一ID的算法,生成的ID称为Snowflake IDs。这种算法由Twitter创建,并用于推文的ID。

一个Snowflake ID有64位。

  • 第1位:Java中long的最高位是符号位代表正负,正数是0,负数是1,一般生成ID都为正数,所以默认为0。
  • 接下来前41位是时间戳,表示了自选定的时期以来的毫秒数。
  • 接下来的10位代表计算机ID,防止冲突。
  • 其余12位代表每台机器上生成ID的序列号,这允许在同一毫秒内创建多个Snowflake ID。

雪花算法

当然,全局唯一性的ID,还可以使用百度的Uidgenerator,或者美团的Leaf

4.2 幂等设计的基本流程

幂等处理的过程,说到底其实就是过滤一下已经收到的请求,当然,请求一定要有一个全局唯一的ID标记哈。然后,怎么判断请求是否之前收到过呢?把请求储存起来,收到请求时,先查下存储记录,记录存在就返回上次的结果,不存在就处理请求。

一般的幂等处理就是这样啦,如下:

图片

5. 实现幂等的8种方案

幂等设计的基本流程都是类似的

5.1 select+insert+主键/唯一索引冲突

日常开发中,为了实现交易接口幂等,我是这样实现的:

交易请求过来,我会先根据请求的唯一流水号 bizSeq字段,先select一下数据库的流水表

  • 如果数据已经存在,就拦截是重复请求,直接返回成功;
  • 如果数据不存在,就执行insert插入,如果insert成功,则直接返回成功,如果insert产生主键冲突异常,则捕获异常,接着直接返回成功。

流程图如下

图片

伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 幂等处理
*/
Rsp idempotent(Request req){
Object requestRecord =selectByBizSeq(bizSeq);

if(requestRecord !=null){
//拦截是重复请求
log.info("重复请求,直接返回成功,流水号:{}",bizSeq);
return rsp;
}

try{
insert(req);
}catch(DuplicateKeyException e){
//拦截是重复请求,直接返回成功
log.info("主键冲突,是重复请求,直接返回成功,流水号:{}",bizSeq);
return rsp;
}

//正常处理请求
dealRequest(req);

return rsp;
}

为什么前面已经select查询了,还需要try...catch...捕获重复异常呢?

是因为高并发场景下,两个请求去select的时候,可能都没查到,然后都走到insert的地方啦。

当然,用唯一索引代替数据库主键也是可以的哈,都是全局唯一的ID即可。

5.2. 直接insert + 主键/唯一索引冲突

在5.1方案中,都会先查一下流水表的交易请求,判断是否存在,然后不存在再插入请求记录。如果重复请求的概率比较低的话,我们可以直接插入请求,利用主键/唯一索引冲突,去判断是重复请求

流程图如下:

图片

伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 幂等处理
*/
Rsp idempotent(Request req){

try{
insert(req);
}catch(DuplicateKeyException e){
//拦截是重复请求,直接返回成功
log.info("主键冲突,是重复请求,直接返回成功,流水号:{}",bizSeq);
return rsp;
}

//正常处理请求
dealRequest(req);
return rsp;
}

温馨提示 :

大家别搞混哈,防重和幂等设计其实是有区别的。防重主要为了避免产生重复数据,把重复请求拦截下来即可。而幂等设计除了拦截已经处理的请求,还要求每次相同的请求都返回一样的效果。不过呢,很多时候,它们的处理流程可以是类似的。

5.3 状态机幂等

很多业务表,都是有状态的,比如转账流水表,就会有0-待处理,1-处理中、2-成功、3-失败状态。转账流水更新的时候,都会涉及流水状态更新,即涉及**状态机 (即状态变更图)**。我们可以利用状态机实现幂等,一起来看下它是怎么实现的。

比如转账成功后,把处理中的转账流水更新为成功状态,SQL这么写:

1
update transfr_flow set status=2 where biz_seq=666and status=1;

简要流程图如下:

图片

伪代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Rsp idempotentTransfer(Request req){
String bizSeq = req.getBizSeq();
int rows= "update transfr_flow set status=2 where biz_seq=#{bizSeq} and status=1;"
if(rows==1){
log.info(“更新成功,可以处理该请求”);
//其他业务逻辑处理
return rsp;
}else if(rows==0){
log.info(“更新不成功,不处理该请求”);
//不处理,直接返回
return rsp;
}

log.warn("数据异常")
return rsp:
}

状态机是怎么实现幂等的呢?

  • 第1次请求来时,bizSeq流水号是 666,该流水的状态是处理中,值是 1,要更新为2-成功的状态,所以该update语句可以正常更新数据,sql执行结果的影响行数是1,流水状态最后变成了2。
  • 第2请求也过来了,如果它的流水号还是 666,因为该流水状态已经2-成功的状态了,所以更新结果是0,不会再处理业务逻辑,接口直接返回。

5.4 建防重表

有时候表中并非所有的场景都不允许产生重复的数据,只有某些特定场景才不允许。这时候,直接在表中加唯一索引,显然是不太合适的。

针对这种情况,我们可以通过建防重表来解决问题。

该表可以只包含两个字段:id唯一索引,唯一索引可以是多个字段比如:name、code等组合起来的唯一标识,例如:susan_0001。

具体流程图如下:

图片

具体步骤:

  1. 用户通过浏览器发起请求,服务端收集数据。
  2. 将该数据插入mysql防重表
  3. 判断是否执行成功,如果成功,则做mysql其他的数据操作(可能还有其他的业务逻辑)。
  4. 如果执行失败,捕获唯一索引冲突异常,直接返回成功。

需要特别注意的是:防重表和业务表必须在同一个数据库中,并且操作要在同一个事务中。

5.5 token令牌

token 令牌方案一般包括两个请求阶段:

  1. 客户端请求申请获取token,服务端生成token返回
  2. 客户端带着token请求,服务端校验token

流程图如下:

图片

  1. 客户端发起请求,申请获取token。
  2. 服务端生成全局唯一的token,保存到redis中(一般会设置一个过期时间),然后返回给客户端。
  3. 客户端带着token,发起请求。
  4. 服务端去redis确认token是否存在,一般用 redis.del(token)的方式,如果存在会删除成功,即处理业务逻辑,如果删除失败不处理业务逻辑,直接返回结果。

5.6 悲观锁(如select for update)

什么是悲观锁

通俗点讲就是很悲观,每次去操作数据时,都觉得别人中途会修改,所以每次在拿数据的时候都会上锁。官方点讲就是,共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程。

悲观锁如何控制幂等的呢?就是加锁呀,一般配合事务来实现。

举个更新订单的业务场景:

假设先查出订单,如果查到的是处理中状态,就处理完业务,再然后更新订单状态为完成。如果查到订单,并且是不是处理中的状态,则直接返回

整体的伪代码如下:

1
2
3
4
5
6
7
8
9
begin;  # 1.开始事务
select * from order where order_id='666' # 查询订单,判断状态
if(status !=处理中){
//非处理中状态,直接返回;
return ;
}
## 处理业务逻辑
update order set status='完成' where order_id='666' # 更新完成
commit; # 5.提交事务

这种场景是非原子操作的,在高并发环境下,可能会造成一个业务被执行两次的问题:

当一个请求A在执行中时,而另一个请求B也开始状态判断的操作。因为请求A还未来得及更改状态,所以请求B也能执行成功,这就导致一个业务被执行了两次。

可以使用数据库悲观锁(select ...for update)解决这个问题.

1
2
3
4
5
6
7
8
9
begin;  # 1.开始事务
select * from order where order_id='666' for update # 查询订单,判断状态,锁住这条记录
if(status !=处理中){
//非处理中状态,直接返回;
return ;
}
## 处理业务逻辑
update order set status='完成' where order_id='666' # 更新完成
commit; # 5.提交事务
  • 这里面order_id需要是索引主键哈,要锁住这条记录就好,如果不是索引或者主键,会锁表的!
  • 悲观锁在同一事务操作过程中,锁住了一行数据。别的请求过来只能等待,如果当前事务耗时比较长,就很影响接口性能。所以一般不建议用悲观锁做这个事情。

5.7 乐观锁

悲观锁有性能问题,可以试下乐观锁

什么是乐观锁

乐观锁在操作数据时,则非常乐观,认为别人不会同时在修改数据,因此乐观锁不会上锁。只是在执行更新的时候判断一下,在此期间别人是否修改了数据。

怎样实现乐观锁呢?

就是给表的加多一列version版本号,每次更新记录version都升级一下(version=version+1)。具体流程就是先查出当前的版本号version,然后去更新修改数据时,确认下是不是刚刚查出的版本号,如果是才执行更新

比如,我们更新前,先查下数据,查出的版本号是version =1

1
select order_id,version from order where order_id='666';

然后使用version =1订单Id一起作为条件,再去更新

1
update order set version = version +1,status='P' where  order_id='666' and version =1

最后更新成功,才可以处理业务逻辑,如果更新失败,默认为重复请求,直接返回。

流程图如下:

图片

为什么版本号建议自增的呢?

因为乐观锁存在ABA的问题,如果version版本一直是自增的就不会出现ABA的情况啦。

5.8 分布式锁

分布式锁实现幂等性的逻辑就是,请求过来时,先去尝试获得分布式锁,如果获得成功,就执行业务逻辑,反之获取失败的话,就舍弃请求直接返回成功。执行流程如下图所示:

图片

  • 分布式锁可以使用Redis,也可以使用ZooKeeper,不过还是Redis相对好点,因为较轻量级。
  • Redis分布式锁,可以使用命令SET EX PX NX + 唯一流水号实现,分布式锁的key必须为业务的唯一标识哈
  • Redis执行设置key的动作时,要设置过期时间哈,这个过期时间不能太短,太短拦截不了重复请求,也不能设置太长,会占存储空间。

6. HTTP的幂等

我们的接口,一般都是基于http的,所以我们再来聊聊Http的幂等吧。HTTP 请求方法主要有以下这几种,我们看下各个接口是否都是幂等的。

  • GET方法
  • HEAD方法
  • OPTIONS方法
  • DELETE方法
  • POST 方法
  • PUT方法

6.1 GET 方法

HTTP 的GET方法用于获取资源,可以类比于数据库的select查询,不应该有副作用,所以是幂等的。它不会改变资源的状态,不论你调用一次还是调用多次,效果一样的,都没有副作用。

如果你的GET方法是获取最近最新的新闻,不同时间点调用,返回的资源内容虽然不一样,但是最终对资源本质是没有影响的哈,所以还是幂等的。

6.2 HEAD 方法

HTTP HEAD和GET有点像,主要区别是HEAD不含有呈现数据,而仅仅是HTTP的头信息,所以它也是幂等的。如果想判断某个资源是否存在,很多人会使用GET,实际上用HEAD则更加恰当。即HEAD方法通常用来做探活使用。

6.3 OPTIONS方法

HTTP OPTIONS 主要用于获取当前URL所支持的方法,也是有点像查询,因此也是幂等的。

6.4 DELETE方法

HTTP DELETE 方法用于删除资源,它是的幂等的。比如我们要删除id=666的帖子,一次执行和多次执行,影响的效果是一样的呢。

6.5 POST 方法

HTTP POST 方法用于创建资源,可以类比于提交信息,显然一次和多次提交是有副作用,执行效果是不一样的,不满足幂等性

比如:POST http://www.tianluo.com/articles的语义是在http://www.tianluo.com/articles下创建一篇帖子,HTTP 响应中应包含帖子的创建状态以及帖子的 URI。两次相同的POST请求会在服务器端创建两份资源,它们具有不同的 URI;所以,POST方法不具备幂等性

6.6 PUT 方法

HTTP PUT 方法用于创建或更新操作,所对应的URI是要创建或更新的资源本身,有副作用,它应该满足幂等性。

比如:PUT http://www.tianluo.com/articles/666的语义是创建或更新 ID 为666的帖子。对同一 URI 进行多次 PUT 的副作用和一次 PUT 是相同的;因此,PUT 方法具有幂等性。

一、IO模型

1.I/O 模型基本说明

(1)I/O 模型简单的理解:就是用什么样的通道进行数据的发送和接收,很大程度上决定了程序通信的性能

(2)Java 共支持 3 种网络编程模型/IO 模式:BIO、NIO、AIO

(3)Java BIO :同步并阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销【简单示意图】

image-20230126104041687

(4)Java NIO : 同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有 I/O 请求就进行处理 【简单示意图】

image-20230126104146221

(5)Java AIO(NIO.2) : 异步非阻塞,AIO 引入异步通道的概念,采用了 Proactor 模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用

2.BIO、NIO、AIO 适用场景分析

(1)BIO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序简单易理解。

(2)NIO 方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕系统,服务器间通讯等。编程比较复杂,JDK1.4 开始支持。

(2)AIO 方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS 参与并发操作,编程比较复杂,JDK7 开始支持。

二、Java BIO 编程

1.Java BIO 基本介绍

(1)Java BIO 就是传统的 java io 编程,其相关的类和接口在 java.io

(2)BIO(blocking I/O) : 同步阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善(实现多个客户连接服务器)。 【后有应用实例】

(3)BIO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,程序简单易理解

2.Java BIO 工作机制

image-20230126104948264

对 BIO 编程流程的梳理

(1)服务器端启动一个 ServerSocket

(2)客户端启动 Socket 对服务器进行通信,默认情况下服务器端需要对每个客户 建立一个线程与之通讯

(3)客户端发出请求后, 先咨询服务器是否有线程响应,如果没有则会等待,或者被拒绝

(4)如果有响应,客户端线程会等待请求结束后,在继续执行

3.Java BIO 应用实例

实例说明:

(1)使用 BIO 模型编写一个服务器端,监听 6666 端口,当有客户端连接时,就启动一个线程与之通讯.

(2)要求使用线程池机制改善,可以连接多个客户端.

(3)服务器端可以接收客户端发送的数据(telnet 方式即可).

(4)代码演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BIOServer {

public static void main(String[] args) throws Exception {
//线程池机制
//思路
//1. 创建一个线程池
//2. 如果有客户端连接,就创建一个线程,与之通讯(单独写一个方法)
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
//创建ServerSocket
ServerSocket serverSocket = new ServerSocket(6666);
System.out.println("服务器启动了");
while (true) {
System.out.println("线程信息id = " + Thread.currentThread().getId() + "名字 = " + Thread.currentThread().getName());
//监听,等待客户端连接
System.out.println("等待连接....");
final Socket socket = serverSocket.accept();
System.out.println("连接到一个客户端");
//就创建一个线程,与之通讯(单独写一个方法)
newCachedThreadPool.execute(new Runnable() {
public void run() {//我们重写
//可以和客户端通讯
handler(socket);
}
});
}
}

//编写一个handler方法,和客户端通讯
public static void handler(Socket socket) {
try {
System.out.println("线程信息id = " + Thread.currentThread().getId() + "名字 = " + Thread.currentThread().getName());
byte[] bytes = new byte[1024];
//通过socket获取输入流
InputStream inputStream = socket.getInputStream();
//循环的读取客户端发送的数据
while (true) {
System.out.println("线程信息id = " + Thread.currentThread().getId() + "名字 = " + Thread.currentThread().getName());
System.out.println("read....");
int read = inputStream.read(bytes);
if (read != -1) {
System.out.println(new String(bytes, 0, read));//输出客户端发送的数据
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("关闭和client的连接");
try {
socket.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

4.Java BIO 问题分析

(1)每个请求都需要创建独立的线程,与对应的客户端进行数据Read,业务处理,数据Write

(2)当并发数较大时,需要创建大量线程来处理连接,系统资源占用较大

(3)连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在 Read 操作上,造成线程资源浪费

三、Java NIO 编程

1.Java NIO 基本介绍

(1)Java NIO 全称 java non-blocking IO,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改进的输入/输出的新特性,被统称为 NIO(即 New IO),是同步非阻塞的

(2)NIO 相关类都被放在 java.nio 包及子包下,并且对原 java.io 包中的很多类进行改写。【基本案例】

(3)NIO 有三大核心部分:Channel(通道),Buffer(缓冲区), Selector(选择器)

(4)NIO 是面向缓冲区 ,或者面向块编程的。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络

(5)Java NIO 的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此,一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。【后面有案例说明】

(6)通俗理解:NIO 是可以做到用一个线程来处理多个操作的。假设有 10000 个请求过来,根据实际情况,可以分配50 或者 100 个线程来处理。不像之前的阻塞 IO 那样,非得分配 10000 个。

(7)HTTP2.0 使用了多路复用的技术,做到同一个连接并发处理多个请求,而且并发请求的数量比HTTP1.1大了好几个数量级

(8)案例说明 NIO 的 Buffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.atguigu.nio;

import java.nio.IntBuffer;

public class BasicBuffer {

public static void main(String[] args) {

//举例说明 Buffer 的使用(简单说明)
//创建一个 Buffer,大小为 5,即可以存放 5 个 int
IntBuffer intBuffer = IntBuffer.allocate(5);

//向buffer存放数据
//intBuffer.put(10);
//intBuffer.put(11);
//intBuffer.put(12);
//intBuffer.put(13);
//intBuffer.put(14);
for (int i = 0; i < intBuffer.capacity(); i++) {
intBuffer.put(i * 2);
}
//如何从 buffer 读取数据
//将 buffer 转换,读写切换(!!!)
intBuffer.flip();
while (intBuffer.hasRemaining()) {
System.out.println(intBuffer.get());
}
}
}

2.NIO 和 BIO 的比较

(1)BIO 以流的方式处理数据,而 NIO 以块的方式处理数据,块 I/O 的效率比流 I/O 高很多

(2)BIO 是阻塞的,NIO 则是非阻塞的

(3)BIO 基于字节流和字符流进行操作,而 NIO 基于 Channel(通道)和 Buffer(缓冲区)进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。Selector(选择器)用于监听多个通道的事件(比如:连接请求,数据到达等),因此使用单个线程就可以监听多个客户端通道

3.NIO 三大核心原理示意图

一张图描述 NIOSelectorChannelBuffer 的关系。

Selector 、 Channel 和 Buffer 的关系图(简单版)

关系图的说明:

img

  1. 每个 Channel 都会对应一个 Buffer
  2. Selector 对应一个线程,一个线程对应多个 Channel(连接)。
  3. 该图反应了有三个 Channel 注册到该 Selector //程序
  4. 程序切换到哪个 Channel 是由事件决定的,Event 就是一个重要的概念。
  5. Selector 会根据不同的事件,在各个通道上切换。
  6. Buffer 就是一个内存块,底层是有一个数组。
  7. 数据的读取写入是通过 Buffer,这个和 BIO不同,BIO 中要么是输入流,或者是输出流,不能双向,但是 NIOBuffer 是可以读也可以写,需要 flip 方法切换
  8. Channel 是双向的,可以反映底层操作系统的情况,比如 Linux,底层的操作系统通道就是双向的。

4.缓冲区(Buffer)

基本介绍

缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。Channel 提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer,如图:【后面举例说明】

img

Buffer 类及其子类

(1)在 NIO 中,Buffer 是一个顶层父类,它是一个抽象类, 类的层级关系图:

img

(2)Buffer 类定义了所有的缓冲区都具有的四个属性来提供关于其所包含的数据元素的信息:

img

(3)Buffer 类相关方法一览

img

ByteBuffer

从前面可以看出对于 Java 中的基本数据类型(boolean 除外),都有一个 Buffer 类型与之相对应,最常用的自然是 ByteBuffer 类(二进制数据),该类的主要方法如下:

img

5.通道(Channel)

基本介绍

(1)NIO 的通道类似于流,但有些区别如下:

  • 通道可以同时进行读写,而流只能读或者只能写
  • 通道可以实现异步读写数据
  • 通道可以从缓冲读数据,也可以写数据到缓冲

(2)BIO 中的 stream 是单向的,例如 FileInputStream 对象只能进行读取数据的操作,而NIO 中的通道(Channel)是双向的,可以读操作,也可以写操作。

(3)Channel 在 NIO 中是一个接口public interface Channel extends Closeable{}

(4)常 用 的 Channel 类 有 : FileChannel 、 DatagramChannel 、 ServerSocketChannel 和SocketChannel 。【ServerSocketChanne 类似 ServerSocket , SocketChannel 类似 Socket】

(5)FileChannel 用于文件的数据读写,DatagramChannel 用于 UDP 的数据读写,ServerSocketChannel 和SocketChannel 用于 TCP 的数据读写。

(6)图示

img

6.Selector(选择器)

基本介绍

  1. JavaNIO,用非阻塞的 IO 方式。可以用一个线程,处理多个的客户端连接,就会使用到 Selector(选择器)。
  2. Selector 能够检测多个注册的通道上是否有事件发生(注意:多个 Channel 以事件的方式可以注册到同一个 Selector),如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。【示意图】
  3. 只有在连接/通道真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程。
  4. 避免了多线程之间的上下文切换导致的开销。

Selector 示意图和特点说明

img

说明如下:

  1. NettyIO 线程 NioEventLoop 聚合了 Selector(选择器,也叫多路复用器),可以同时并发处理成百上千个客户端连接。
  2. 当线程从某客户端 Socket 通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。
  3. 线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。
  4. 由于读写操作都是非阻塞的,这就可以充分提升 IO 线程的运行效率,避免由于频繁 I/O 阻塞导致的线程挂起。
  5. 一个 I/O 线程可以并发处理 N 个客户端连接和读写操作,这从根本上解决了传统同步阻塞 I/O 一个连接一个线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。

Selector 类相关方法

Selector 类是一个抽象类, 常用方法和说明如下:

img

注意事项

1.NIO 中的 ServerSocketChannel 功能类似 ServerSocket,SocketChannel 功能类似 Socket

2.selector 相关方法说明

  • selector.select(); //阻塞
  • selector.select(1000); //阻塞 1000 毫秒,在 1000 毫秒后返回
  • selector.wakeup(); //唤醒 selector
  • selector.selectNow(); //不阻塞,立马返还

7.NIO 非阻塞网络编程原理分析图

NIO 非阻塞网络编程相关的(SelectorSelectionKeyServerScoketChannelSocketChannel)关系梳理图

img

对上图的说明:

  1. 当客户端连接时,会通过 ServerSocketChannel 得到 SocketChannel
  2. Selector 进行监听 select 方法,返回有事件发生的通道的个数。
  3. socketChannel 注册到 Selector 上,register(Selector sel, int ops),一个 Selector 上可以注册多个 SocketChannel
  4. 注册后返回一个 SelectionKey,会和该 Selector 关联(集合)。
  5. 进一步得到各个 SelectionKey(有事件发生)。
  6. 在通过 SelectionKey 反向获取 SocketChannel,方法 channel()
  7. 可以通过得到的 channel,完成业务处理。
  8. 代码撑腰。。。

8.NIO 非阻塞网络编程快速入门

案例要求:

  1. 编写一个 NIO 入门案例,实现服务器端和客户端之间的数据简单通讯(非阻塞)
  2. 目的:理解 NIO 非阻塞网络编程机制
  3. 看老师代码演示

NIOServer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package com.example.netty.discard.nio;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;

public class NIOServer {
public static void main(String[] args) throws Exception{

//创建ServerSocketChannel -> ServerSocket

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

//得到一个Selecor对象
Selector selector = Selector.open();

//绑定一个端口6666, 在服务器端监听
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
//设置为非阻塞
serverSocketChannel.configureBlocking(false);

//把 serverSocketChannel 注册到 selector 关心 事件为 OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

System.out.println("注册后的selectionkey 数量=" + selector.keys().size()); // 1



//循环等待客户端连接
while (true) {

//这里我们等待1秒,如果没有事件发生, 返回
if(selector.select(1000) == 0) { //没有事件发生
System.out.println("服务器等待了1秒,无连接");
continue;
}

//如果返回的>0, 就获取到相关的 selectionKey集合
//1.如果返回的>0, 表示已经获取到关注的事件
//2. selector.selectedKeys() 返回关注事件的集合
// 通过 selectionKeys 反向获取通道
Set<SelectionKey> selectionKeys = selector.selectedKeys();
System.out.println("selectionKeys 数量 = " + selectionKeys.size());

//遍历 Set<SelectionKey>, 使用迭代器遍历
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();

while (keyIterator.hasNext()) {
//获取到SelectionKey
SelectionKey key = keyIterator.next();
//根据key 对应的通道发生的事件做相应处理
if(key.isAcceptable()) { //如果是 OP_ACCEPT, 有新的客户端连接
//该该客户端生成一个 SocketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("客户端连接成功 生成了一个 socketChannel " + socketChannel.hashCode());
//将 SocketChannel 设置为非阻塞
socketChannel.configureBlocking(false);
//将socketChannel 注册到selector, 关注事件为 OP_READ, 同时给socketChannel
//关联一个Buffer
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));

System.out.println("客户端连接后 ,注册的selectionkey 数量=" + selector.keys().size()); //2,3,4..


}
if(key.isReadable()) { //发生 OP_READ

//通过key 反向获取到对应channel
SocketChannel channel = (SocketChannel)key.channel();

//获取到该channel关联的buffer
ByteBuffer buffer = (ByteBuffer)key.attachment();
channel.read(buffer);
System.out.println("form 客户端 " + new String(buffer.array()));

}

//手动从集合中移动当前的selectionKey, 防止重复操作
keyIterator.remove();

}

}

}
}

NIOClient:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.example.netty.discard.nio;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class NIOClient {
public static void main(String[] args) throws Exception{

//得到一个网络通道
SocketChannel socketChannel = SocketChannel.open();
//设置非阻塞
socketChannel.configureBlocking(false);
//提供服务器端的ip 和 端口
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
//连接服务器
if (!socketChannel.connect(inetSocketAddress)) {

while (!socketChannel.finishConnect()) {
System.out.println("因为连接需要时间,客户端不会阻塞,可以做其它工作..");
}
}

//...如果连接成功,就发送数据
String str = "hello, 尚硅谷~";
//Wraps a byte array into a buffer
ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
//发送数据,将 buffer 数据写入 channel
socketChannel.write(buffer);
System.in.read();

}
}

9.SelectionKey

1.SelectionKey,表示 Selector 和网络通道的注册关系, 共四种:

  • int OP_ACCEPT:有新的网络连接可以 accept,值为 16
  • int OP_CONNECT:代表连接已经建立,值为 8
  • int OP_READ:代表读操作,值为 1
  • int OP_WRITE:代表写操作,值为 4

源码中:

1
2
3
4
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;

2.SelectionKey 相关方法

img

10.ServerSocketChannel

  1. ServerSocketChannel 在服务器端监听新的客户端 Socket 连接
  2. 相关方法如下

img

11.SocketChannel

  1. SocketChannel,网络 IO 通道,具体负责进行读写操作。NIO 把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区。
  2. 相关方法如下

img

12.NIO 网络编程应用实例-群聊系统

实例要求:

  1. 编写一个 NIO 群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)
  2. 实现多人群聊
  3. 服务器端:可以监测用户上线,离线,并实现消息转发功能
  4. 客户端:通过 Channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(有服务器转发得到)
  5. 目的:进一步理解 NIO 非阻塞网络编程机制
  6. 示意图分析和代码

img

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
// 服务端:

package com.atguigu.nio.groupchat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class GroupChatServer {

//定义属性
private Selector selector;
private ServerSocketChannel listenChannel;

private static final int PORT = 6667;

//构造器
//初始化工作
public GroupChatServer() {
try {
//得到选择器
selector = Selector.open();
//ServerSocketChannel
listenChannel = ServerSocketChannel.open();
//绑定端口
listenChannel.socket().bind(new InetSocketAddress(PORT));
//设置非阻塞模式
listenChannel.configureBlocking(false);
//将该 listenChannel 注册到 selector
listenChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}

public void listen() {
try {
//循环处理
while (true) {
int count = selector.select();
if (count > 0) { //有事件处理
// 遍历得到 selectionKey 集合
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
//取出 selectionkey
SelectionKey key = iterator.next();
//监听到 accept
if (key.isAcceptable()) {
SocketChannel sc = listenChannel.accept();
sc.configureBlocking(false);
//将该 sc 注册到 seletor
sc.register(selector, SelectionKey.OP_READ);
//提示
System.out.println(sc.getRemoteAddress() + " 上线 ");
}
if (key.isReadable()) {//通道发送read事件,即通道是可读的状态
// 处理读(专门写方法..)
readData(key);
}
//当前的 key 删除,防止重复处理
iterator.remove();
}
} else {
System.out.println("等待....");
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//发生异常处理....
}
}

//读取客户端消息
public void readData(SelectionKey key) {
SocketChannel channel = null;
try {
//得到 channel
channel = (SocketChannel) key.channel();
//创建 buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = channel.read(buffer);
//根据 count 的值做处理
if (count > 0) {
//把缓存区的数据转成字符串
String msg = new String(buffer.array());
//输出该消息
System.out.println("form客户端:" + msg);
//向其它的客户端转发消息(去掉自己),专门写一个方法来处理
sendInfoToOtherClients(msg, channel);
}
} catch (IOException e) {
try {
System.out.println(channel.getRemoteAddress() + "离线了..");
//取消注册
key.cancel();
//关闭通道
channel.close();
} catch (IOException e2) {
e2.printStackTrace();
}
}
}

//转发消息给其它客户(通道)
private void sendInfoToOtherClients(String msg, SocketChannel self) throws IOException {

System.out.println("服务器转发消息中...");
//遍历所有注册到 selector 上的 SocketChannel,并排除 self
for (SelectionKey key : selector.keys()) {
//通过 key 取出对应的 SocketChannel
Channel targetChannel = key.channel();
//排除自己
if (targetChannel instanceof SocketChannel && targetChannel != self) {
//转型
SocketChannel dest = (SocketChannel) targetChannel;
//将 msg 存储到 buffer
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
//将 buffer 的数据写入通道
dest.write(buffer);
}
}
}

public static void main(String[] args) {
//创建服务器对象
GroupChatServer groupChatServer = new GroupChatServer();
groupChatServer.listen();
}
}

// 客户端:

package com.atguigu.nio.groupchat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;

public class GroupChatClient {

//定义相关的属性
private final String HOST = "127.0.0.1";//服务器的ip
private final int PORT = 6667;//服务器端口
private Selector selector;
private SocketChannel socketChannel;
private String username;

//构造器,完成初始化工作
public GroupChatClient() throws IOException {

selector = Selector.open();
//连接服务器
socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
//设置非阻塞
socketChannel.configureBlocking(false);
//将 channel 注册到selector
socketChannel.register(selector, SelectionKey.OP_READ);
//得到 username
username = socketChannel.getLocalAddress().toString().substring(1);
System.out.println(username + " is ok...");
}

//向服务器发送消息
public void sendInfo(String info) {
info = username + " 说:" + info;
try {
socketChannel.write(ByteBuffer.wrap(info.getBytes()));
} catch (IOException e) {
e.printStackTrace();
}
}

//读取从服务器端回复的消息
public void readInfo() {
try {
int readChannels = selector.select();
if (readChannels > 0) {//有可以用的通道
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isReadable()) {
//得到相关的通道
SocketChannel sc = (SocketChannel) key.channel();
//得到一个 Buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取
sc.read(buffer);
//把读到的缓冲区的数据转成字符串
String msg = new String(buffer.array());
System.out.println(msg.trim());
}
}
iterator.remove(); //删除当前的 selectionKey,防止重复操作
} else {
//System.out.println("没有可以用的通道...");
}
} catch (Exception e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws Exception {

//启动我们客户端
GroupChatClient chatClient = new GroupChatClient();
//启动一个线程,每个 3 秒,读取从服务器发送数据
new Thread() {
public void run() {
while (true) {
chatClient.readInfo();
try {
Thread.currentThread().sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();

//发送数据给服务器端
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String s = scanner.nextLine();
chatClient.sendInfo(s);
}
}
}

12.NIO 与零拷贝

零拷贝基本介绍

  1. 零拷贝是网络编程的关键,很多性能优化都离不开。
  2. Java 程序中,常用的零拷贝有 mmap(内存映射)和 sendFile。那么,他们在 OS 里,到底是怎么样的一个的设计?我们分析 mmapsendFile 这两个零拷贝
  3. 另外我们看下 NIO 中如何使用零拷贝

传统 IO 数据读写

Java 传统 IO 和网络编程的一段代码

1
2
3
4
5
6
7
8
File file = new File("test.txt");
RandomAccessFile raf = new RandomAccessFile(file, "rw");

byte[] arr = new byte[(int) file.length()];
raf.read(arr);

Socket socket = new ServerSocket(8080).accept();
socket.getOutputStream().write(arr);

传统 IO 模型

img

DMAdirect memory access 直接内存拷贝(不使用 CPU

mmap 优化

  1. mmap 通过内存映射,将文件映射到内核缓冲区,同时,用户空间可以共享内核空间的数据。这样,在进行网络传输时,就可以减少内核空间到用户空间的拷贝次数。如下图
  2. mmap 示意图

img

sendFile 优化

1.Linux2.1 版本提供了 sendFile 函数,其基本原理如下:数据根本不经过用户态,直接从内核缓冲区进入到 SocketBuffer,同时,由于和用户态完全无关,就减少了一次上下文切换

2.示意图和小结

img

3.提示:零拷贝从操作系统角度,是没有 cpu 拷贝

4.Linux在2.4 版本中,做了一些修改,避免了从内核缓冲区拷贝到 Socketbuffer 的操作,直接拷贝到协议栈,从而再一次减少了数据拷贝。具体如下图和小结:

5.这里其实有一次 cpu 拷贝 kernel buffer -> socket buffer 但是,拷贝的信息很少,比如 lenghtoffset 消耗低,可以忽略

零拷贝的再次理解

  1. 我们说零拷贝,是从操作系统的角度来说的。因为内核缓冲区之间,没有数据是重复的(只有 kernel buffer 有一份数据)。
  2. 零拷贝不仅仅带来更少的数据复制,还能带来其他的性能优势,例如更少的上下文切换,更少的 CPU 缓存伪共享以及无 CPU 校验和计算。

mmap 和 sendFile 的区别

  1. mmap 适合小数据量读写,sendFile 适合大文件传输。
  2. mmap 需要 4 次上下文切换,3 次数据拷贝;sendFile 需要 3 次上下文切换,最少 2 次数据拷贝。
  3. sendFile 可以利用 DMA 方式,减少 CPU 拷贝,mmap 则不能(必须从内核拷贝到 Socket缓冲区)。

NIO 零拷贝案例

案例要求:

  1. 使用传统的 IO 方法传递一个大文件
  2. 使用 NIO 零拷贝方式传递(transferTo)一个大文件
  3. 看看两种传递方式耗时时间分别是多少
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
NewIOServer.java

package com.atguigu.nio.zerocopy;

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

//服务器
public class NewIOServer {

public static void main(String[] args) throws Exception {
InetSocketAddress address = new InetSocketAddress(7001);
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(address);

//创建buffer
ByteBuffer byteBuffer = ByteBuffer.allocate(4096);

while (true) {
SocketChannel socketChannel = serverSocketChannel.accept();
int readcount = 0;
while (-1 != readcount) {
try {
readcount = socketChannel.read(byteBuffer);
} catch (Exception ex) {
// ex.printStackTrace();
break;
}
//
byteBuffer.rewind(); //倒带 position = 0 mark 作废
}
}
}
}

NewIOClient.java

package com.atguigu.nio.zerocopy;

import java.io.FileInputStream;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

public class NewIOClient {

public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 7001));
String filename = "protoc-3.6.1-win32.zip";
//得到一个文件channel
FileChannel fileChannel = new FileInputStream(filename).getChannel();
//准备发送
long startTime = System.currentTimeMillis();
//在 linux 下一个 transferTo 方法就可以完成传输
//在 windows 下一次调用 transferTo 只能发送 8m, 就需要分段传输文件,而且要主要
//传输时的位置=》课后思考...
//transferTo 底层使用到零拷贝
long transferCount = fileChannel.transferTo(0, fileChannel.size(), socketChannel);
System.out.println("发送的总的字节数 = " + transferCount + " 耗时: " + (System.currentTimeMillis() - startTime));

//关闭
fileChannel.close();
}
}

四、Java AIO 基本介绍

  1. JDK7 引入了 AsynchronousI/O,即 AIO。在进行 I/O 编程中,常用到两种模式:ReactorProactorJavaNIO 就是 Reactor,当有事件触发时,服务器端得到通知,进行相应的处理
  2. AIONIO2.0,叫做异步不阻塞的 IOAIO 引入异步通道的概念,采用了 Proactor 模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用
  3. 目前 AIO 还没有广泛应用,Netty 也是基于 NIO,而不是 AIO,因此我们就不详解 AIO 了,有兴趣的同学可以参考《Java新一代网络编程模型AIO原理及Linux系统AIO介绍》

五、BIO、NIO、AIO 对比表

BIO NIO AIO
IO模型 同步阻塞 同步非阻塞(多路复用) 异步非阻塞
编程难度 简单 复杂 复杂
可靠性
吞吐量

例子讲解

海底捞很好吃,但是经常要排队。我们就以生活中的这个例子进行讲解。

  • A顾客去吃海底捞,就这样干坐着等了一小时,然后才开始吃火锅。(BIO)
  • B顾客去吃海底捞,他一看要等挺久,于是去逛商场,每次逛一会就跑回来看有没有排到他。于是他最后既购了物,又吃上海底捞了。(NIO)
  • C顾客去吃海底捞,由于他是高级会员,所以店长说,你去商场随便玩吧,等下有位置,我立马打电话给你。于是C顾客不用干坐着等,也不用每过一会儿就跑回来看有没有等到,最后也吃上了海底捞(AIO)

参考:https://juejin.cn/post/6844903985158045703#heading-3

一、HTTP

首先浏览器做的第一步工作就是要对 URL 进行解析,从而生成发送给 Web 服务器的HTTP请求消息。

二、DNS解析

但在发送之前,还有一项工作需要完成,那就是查询服务器域名对应的 IP 地址,因为委托操作系统发送消息时,必须提供通信对象的 IP 地址。

查找域名对应的IP地址

通过域名查找IP过程:浏览器缓存 -> 系统缓存 -> 本地DNS服务器缓存

  • 浏览器搜索自己的DNS缓存(维护一张域名与IP地址对应表)
  • 搜索操作系统中的DNS缓存(维护一张域名与IP地址对应表)
  • 搜索操作系统的hosts文件(windows环境下,维护一张域名与IP地址对应表)
  • 操作系统将域名发送到本地DNS服务器,进行查找,成功则返回结果;失败,本地DNS会去问它的根域名服务器(具体过程如下图)
  • 本地DNS服务器将得到的IP地址返回给操作系统,同时也将IP地址缓存起来
  • 操作系统将IP地址返回给浏览器,同时将IP地址缓存起来

域名的层级关系类似一个树状结构:

  • 根 DNS 服务器(.)
  • 顶级域 DNS 服务器(.com)
  • 权威 DNS 服务器(server.com)

三、TCP

在HTTP传输数据之前,首先需要TCP建立连接,TCP连接的建立,通常称为三次握手

三次握手目的是保证双方都有发送和接收的能力

TCP 分割数据

如果 HTTP 请求消息比较长,超过了 MSS 的长度,这时 TCP 就需要把 HTTP 的数据拆解成一块块的数据发送,而不是一次性发送所有数据。

  • MTU:一个网络包的最大长度,以太网中一般为 1500 字节。
  • MSS:除去 IP 和 TCP 头部之后,一个网络包所能容纳的 TCP 数据的最大长度。

数据会被以 MSS 的长度为单位进行拆分,拆分出来的每一块数据都会被放进单独的网络包中。也就是在每个被拆分的数据加上 TCP 头信息,然后交给 IP 模块来发送数据。

四、IP

TCP模块在执行连接、收发、断开等各阶段操作时,都需要委托IP模块将数据封装成网络包发送给通信对象。

五、MAC

生成了IP头部之后,接下来网络包还需要在IP头部的前面加上MAC头部

在将IP地址转换为MAC地址的过程中,需要使用ARP协议,即地址解析协议(Address Resolution Protocol)

六、各种协议与HTTP协议的关系

参考:

https://xiaolincoding.com/network/1_base/what_happen_url.html

《图解HTTP》

一、什么是Elasticsearch

Elasticsearch是基于 Lucene 的 Restful 的分布式实时全文搜索引擎,每个字段都被索引并可被搜索,可以快速存储、搜索、分析海量的数据。

全文检索是指对每一个词建立一个索引,指明该词在文章中出现的次数和位置。当查询时,根据事先建立的索引进行查找,并将查找的结果反馈给用户的检索方式。这个过程类似于通过字典中的检索字表查字的过程。

二、结构化数据和非结构化数据

结构化数据

结构化数据是在放入数据存储之前已经预定义并格式化为集合结构的数据,这通常被称为写时模式。 结构化数据的最佳示例是关系数据库:数据已被格式化为精确定义的字段,例如信用卡号或地址,以便使用 SQL 轻松查询。

非结构化数据

非结构化数据是以其初始格式存储的数据,在使用之前不会对其进行处理,这称为读取模式。 它有多种文件格式,如媒体、图像、音频、传感器数据、文本数据等。

三、为什么不直接使用Lucene?

Lucene可以说是当下最先进、高性能、全功能的搜索引擎库。但是 Lucene 仅仅只是一个库并且Lucene 非常复杂。

Elasticsearch通过隐藏Lucene的复杂性,取而代之的提供一套简单一致的RESTful API。

Elasticsearch不仅仅是Lucene,并且也不仅仅只是一个全文搜索引擎。它可以被下面这样准确的形容:

四、为什么不用MySQL模糊查询

1
select * from user where name like '%yuanli%'

这不就可以把yuanli相关的内容搜索出来了吗?

的确,这样做的确可以。但是要明白的是:name like %yuanli%这类的查询是不走索引的,不走索引意味着:只要你的数据库的量很大(1亿条),你的查询肯定会是级别的。

而且,即便给你从数据库根据模糊匹配查出相应的记录了,那往往会返回大量的数据给你,往往你需要的数据量并没有这么多,可能50条记录就足够了。

还有一个就是:用户输入的内容往往并没有这么的精确,比如我从Google输入ElastcSeach(打错字),但是Google还是能估算我想输入的是Elasticsearch

而Elasticsearch是专门做搜索的,就是为了解决上面所讲的问题而生的,换句话说:

  • Elasticsearch对模糊搜索非常擅长(搜索速度很快)
  • 从Elasticsearch搜索到的数据可以根据评分过滤掉大部分的,只要返回评分高的给用户就好了(原生就支持排序)
  • 没有那么准确的关键字也能搜出相关的结果(能匹配有相关性的记录)

五、Elasticsearch的基础概念

Elasticsearch 是面向文档型数据库,一条数据在这里就是一个文档。

  • Near Realtime(NRT) 近实时。数据提交索引后,立马就可以搜索到。
  • Cluster 集群,一个集群由一个唯一的名字标识,默认为“elasticsearch”。集群名称非常重要,具有相同集群名的节点才会组成一个集群。集群名称可以在配置文件中指定。
  • Node 节点:存储集群的数据,参与集群的索引和搜索功能。像集群有名字,节点也有自己的名称,默认在启动时会以一个随机的UUID的前七个字符作为节点的名字,你可以为其指定任意的名字。通过集群名在网络中发现同伴组成集群。一个节点也可是集群。
  • Index 索引: 一个索引是一个文档的集合(等同于solr中的集合)。每个索引有唯一的名字,通过这个名字来操作它。一个集群中可以有任意多个索引。
  • Type 类型:指在一个索引中,可以索引不同类型的文档,如用户数据、博客数据。从6.0.0 版本起已废弃,一个索引中只存放一类数据。
  • Document 文档:被索引的一条数据,索引的基本信息单元,以JSON格式来表示。
  • Shard 分片:在创建一个索引时可以指定分成多少个分片来存储。每个分片本身也是一个功能完善且独立的“索引”,可以被放置在集群的任意节点上。
  • Replication 备份: 一个分片可以有多个备份(副本)

一、进程

进程是一个具有一定独立功能的程序,是操作系统进行资源分配和调度的独立单位,是应用程序运行的载体。

二、线程

线程是程序执行中一个单一的顺序控制流程,是程序执行流的最小单元,是处理器调度和分派的基本单位。

进程与线程的区别

1.线程是程序执行的最小单位,而进程是操作系统分配资源的最小单位;

2.一个进程由一个或多个线程组成,线程是一个进程中代码的不同执行路线;

3.进程之间相互独立,但同一进程下的各个线程之间共享程序的内存空间及一些进程级的资源,某进程内的线程在其它进程不可见;

4.调度和切换:线程上下文切换比进程上下文切换要快得多。

三、协程

协程在线程内执行。 一个线程内部可以有多个协程,但在给定时间一个线程中只能执行一条指令。 这意味着如果在同一个线程中有十个协程,那么在给定的时间点只会运行其中一个。

协程的优点

1.线程的切换由操作系统负责调度,协程由用户自己进行调度,因此减少了上下文切换,提高了效率。

2.由于在同一个线程上,因此可以避免竞争关系而使用锁。

参考:

https://www.cnblogs.com/Survivalist/p/11527949.html#%E5%8D%8F%E7%A8%8B

https://www.liaoxuefeng.com/wiki/1016959663602400/1017968846697824

一、归并排序(Merge sort)

  • 基本思路:借助额外空间,合并两个有序数组,得到更长的有序数组。

  • 算法思想:分而治之(分治思想)。

算法步骤:

归并排序算法是一个递归过程,边界条件为当输入序列仅有一个元素时,直接返回,具体过程如下:

1.如果输入内只有一个元素,则直接返回,否则将长度为 n 的输入序列分成两个长度为 n/2 的子序列;

2.分别对这两个子序列进行归并排序,使子序列变为有序状态;

3.设定两个指针,分别指向两个已经排序子序列的起始位置;

4.比较两个指针所指向的元素,选择相对小的元素放入到合并空间(用于存放排序结果),并移动指针到下一位置;

5.重复步骤 3 ~4 直到某一指针达到序列尾;

6.将另一序列剩下的所有元素直接复制到合并序列尾。

7.将合并空间排好序的数据复制到原数组的对应位置。

图解算法:

代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/* 归并排序 - 递归实现 */

/* L = 左边起始位置, R = 右边起始位置, RightEnd = 右边终点位置*/
void Merge( ElementType A[], ElementType TmpA[], int L, int R, int RightEnd )
{ /* 将有序的A[L]~A[R-1]和A[R]~A[RightEnd]归并成一个有序序列 */
int LeftEnd, NumElements, Tmp;
int i;

LeftEnd = R - 1; /* 左边终点位置 */
Tmp = L; /* 有序序列的起始位置 */
NumElements = RightEnd - L + 1;

while( L <= LeftEnd && R <= RightEnd ) {
if ( A[L] <= A[R] )
TmpA[Tmp++] = A[L++]; /* 将左边元素复制到TmpA */
else
TmpA[Tmp++] = A[R++]; /* 将右边元素复制到TmpA */
}

while( L <= LeftEnd )
TmpA[Tmp++] = A[L++]; /* 直接复制左边剩下的 */
while( R <= RightEnd )
TmpA[Tmp++] = A[R++]; /* 直接复制右边剩下的 */

for( i = 0; i < NumElements; i++, RightEnd -- )
A[RightEnd] = TmpA[RightEnd]; /* 将有序的TmpA[]复制回A[] */
}

void Msort( ElementType A[], ElementType TmpA[], int L, int RightEnd )
{ /* 核心递归排序函数 */
int Center;

if ( L < RightEnd ) {
Center = (L+RightEnd) / 2;
Msort( A, TmpA, L, Center ); /* 递归解决左边 */
Msort( A, TmpA, Center+1, RightEnd ); /* 递归解决右边 */
Merge( A, TmpA, L, Center+1, RightEnd ); /* 合并两段有序序列 */
}
}

void MergeSort( ElementType A[], int N )
{ /* 归并排序 */
ElementType *TmpA;
TmpA = (ElementType *)malloc(N*sizeof(ElementType));

if ( TmpA != NULL ) {
Msort( A, TmpA, 0, N-1 );
free( TmpA );
}
else printf( "空间不足" );
}
算法分析:
  • 稳定性:稳定
  • 时间复杂度 :最佳:O(nlogn), 最差:O(nlogn), 平均:O(nlogn)
  • 空间复杂度 :O(n)

二、快速排序(Quicksort)

算法步骤:

快速排序使用分治法(Divide and conquer)策略来把一个序列分为较小和较大的 2 个子序列,然后递回地排序两个子序列。具体算法描述如下:

1.从序列中随机挑出一个元素,做为"基准"(pivot);

2.重新排列序列,将所有比基准值小的元素摆放在基准前面,所有比基准值大的摆在基准的后面(相同的数可以到任一边)。在这个操作结束之后,该基准就处于数列的中间位置。这个称为分区(partition)操作;

3.递归地把小于基准值元素的子序列和大于基准值元素的子序列进行快速排序。

实现细节:

1.怎么选取pivot:

  • 调用随件函数,获得随机的pivot
  • 取数组头,中,尾三个数的中位数作为pivot

2.子集划分过程:

  • 每趟排序后,将与分界元素相等的元素聚集在分界元素周围,这样可以避免极端数据(如序列中大部分元素都相等)带来的退化。

3.当序列较短时,使用插入排序的效率更高。

图解算法:

代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public class QuickSort {

// 快速排序,a是数组,n表示数组的大小
public static void quickSort(int[] a, int n) {
quickSortInternally(a, 0, n-1);
}

// 快速排序递归函数,p,r为下标
private static void quickSortInternally(int[] a, int p, int r) {
if (p >= r) return;

int q = partition(a, p, r); // 获取分区点
quickSortInternally(a, p, q-1);
quickSortInternally(a, q+1, r);
}

private static int partition(int[] a, int p, int r) {
int pivot = a[r];
int i = p;
for(int j = p; j < r; ++j) {
if (a[j] < pivot) {
if (i == j) {
++i;
} else {
int tmp = a[i];
a[i] = a[j];
a[j] = tmp;
++i;
}
}
}

int tmp = a[i];
a[i] = a[r];
a[r] = tmp;

System.out.println("i=" + i);
return i;
}
}

三、两者的区别

归并排序的处理过程是由下到上的,先处理子问题,然后再合并。而快排正好相反,它的处理过程是由上到下的,先分区,然后再处理子问题。归并排序虽然是稳定的、时间复杂度为 O(nlogn) 的排序算法,但是它是非原地排序算法。我们前面讲过,归并之所以是非原地排序算法,主要原因是合并函数无法在原地执行。快速排序通过设计巧妙的原地分区函数,可以实现原地排序,解决了归并排序占用太多内存的问题。