在之前的文章中,我们模拟了节点网络通讯,很多朋友反馈说,他们想看真正的节点网络通讯而不是单节点的模拟。本章将满足你们。😌
我将本章的内容放在了com.v5ent.real.p2p包中,大家可以在源码中找到我更新的代码。
通过本文,你将可以做到:
- 创建自己的真实peer-to-peer网络
- 多个节点通过p2p网络同步区块内容
- 在自我节点实现RPC通讯,并向区块中写数据
- 在自我节点查看整个区块内容
- 不含虚拟货币在内的其他区块链知识
学习本章至少需要两台可ping通的机器(虚拟机也可以),并且安装了jdk8(如果只有jdk6,可以把代码改为jdk6支持的形式)。
基于之前的文章,本章将重写p2p部分的内容。我们首先要理清思路,提出区块链通讯需要解决的问题。
1.如果我第一次作为全节点启动,我需要干什么
2.如果我已经启动,别的节点和我通讯,我需要交互哪些信息
3.如果我已经启动,我怎么和自己的客户端交互
这些问题有一个很好的参考对象,就是比特币,我们先来看看比特币的通讯过程
一、比特币节点连接建立
1.寻找比特币网络中的有效节点,此步骤通常有两种方法:
(1)使用“DNS种子”(DNS seeds),DNS种子提供比特币节点的IP地址列表,Bitcoin Core客户端提供五种不同的DNS种子,通常使用默认方式。这里不展开谈论。
(2)手动通过-seednode命令指定一个比特币节点的IP地址作为比特币种子节点(为什么叫种子,我的理解就是,根据种子,得到更多)
这里我们使用简单的方式来处理节点,用一个文件来存储地址列表-peers.list,第一次连接的时候更新这个文件,获取更多连接时也更新这个文件。简单演示我们直接把要测试的机器ip和端口加进去。
我有两台机,我把端口8015用作新的p2p网络的默认监听端口,那么我的peers.list的内容是:
10.16.0.205:8015
10.16.3.77:8015
2.与发现的有效比特币节点进行初始“握手”,建立连接
节点发送一条包含基本认证内容的version消息开始“握手”通信过程,该消息包括如下内容:
- nVersion:客户端的比特币P2P协议所采用的版本(例如:70002)。
- nLocalServices:一组该节点支持的本地服务列表,当前仅支持NODE_NETWORK
- nTime:当前时间
- addrYou:当前节点可见的远程节点的IP地址(上例中NodeB IP)
- addrMe:当前节点的IP地址(上例中NodeA IP)
- subver:指示当前节点运行的软件类型的子版本号(例如:”/Satoshi:0.9.2.1/”)
- BestHeight:当前节点区块链的区块高度(初始为0,即只包含创世区块)
我们简化一下,这里我们最关心的就是区块高度bestHeight,我们就传递这个好了。区块高度就是区块链的长度。
if ("VERSION".equalsIgnoreCase(cmd)) {
// 对方发来握手信息,我方发给对方区块高度和最新区块的hash
pt.peerWriter.write("VERACK " + blockChain.size() + " " + blockChain.get(blockChain.size() - 1).getHash());
}else if ("VERACK".equalsIgnoreCase(cmd)) {
// 获取区块高度
String[] parts = payload.split(" ");
bestHeight = Integer.parseInt(parts[0]);
//哈希暂时不校验
}
3.新节点建立更多的连接,使节点在网络中被更多节点接收,保证连接更稳定
这里我们就两台机,如果你有更多机器,可以实现一下这个通讯,本章我们简单实现一下。
if ("ADDR".equalsIgnoreCase(cmd)) {
// 对方发来地址,建立连接并保存
if (!peers.contains(payload)) {
String peerAddr = payload.substring(0, payload.indexOf(":"));
int peerPort = Integer.parseInt(payload.substring(payload.indexOf(":") + 1));
peerNetwork.connect(peerAddr, peerPort);
peers.add(payload);
PrintWriter out = new PrintWriter(peerFile);
for (int k = 0; k < peers.size(); k++) {
out.println(peers.get(k));
}
out.close();
}
} else if ("GET_ADDR".equalsIgnoreCase(cmd)) {
//对方请求更多peer地址,随机给一个
Random random = new Random();
pt.peerWriter.write("ADDR " + peers.get(random.nextInt(peers.size())));
}
4.交换“区块清单”(注:该步骤仅在全节点上会执行,且从与节点建立连接就开始进行)本系列内容只使用全节点。
全节点
全节点沿着区块链按时间倒叙一直追溯到创世区块,建立一个完整的UTXO数据库,通过查询UTXO是否未被支付来验证交易的有效性。
SPV节点
SPV节点通过向其他节点请求某笔交易的Merkle路径(Merkle树我可能会在后续章节讲到),如果路径正确无误,并且该交易之上已有6个或以上区块被确认,则证明该交易不是双重支付。
全节点在连接到其他节点后,需要构建完整的区块链,如果是新节点,它仅包含静态植入客户端中的0号区块(创世区块)。注意了,创世区块是静态的(硬编码)。
如前文所言,我们在区块链中取最长的链,区块高度比我高,我就向对方获取区块。
else if ("BLOCK".equalsIgnoreCase(cmd)) {
//把对方给的块存进链中
LOGGER.info("Attempting to add block...");
LOGGER.info("Block: " + payload);
Block newBlock = gson.fromJson(payload, Block.class);
if (!blockChain.contains(newBlock)) {
// 校验区块,如果成功,将其写入本地区块链
if (BlockUtils.isBlockValid(newBlock, blockChain.get(blockChain.size() - 1))) {
if (blockChain.add(newBlock) && !catchupMode) {
LOGGER.info("Added block " + newBlock.getIndex() + " with hash: ["+ newBlock.getHash() + "]");
peerNetwork.broadcast("BLOCK " + payload);
}
}
}
} else if ("GET_BLOCK".equalsIgnoreCase(cmd)) {
//把对方请求的块给对方
LOGGER.info("Sending block[" + payload + "] to peer");
Block block = blockChain.get(Integer.parseInt(payload));
if (block != null) {
LOGGER.info("Sending block " + payload + " to peer");
pt.peerWriter.write("BLOCK " + gson.toJson(block));
}
}
到这里,我们基本上完成了p2p网络中关于区块的通讯。
其实比特币等虚拟货币中还有很多通讯,关于交易的,这里我们不需要,不做讨论。
让我们开始编码吧!
整合上文提到的所有通讯,Node.java的代码如下
private static final Logger LOGGER = LoggerFactory.getLogger(Node.class);
/** 本地区块链 */
private static List<Block> blockChain = new LinkedList<Block>();
public static void main(String[] args) throws IOException, InterruptedException {
int port = 8015;
LOGGER.info("Starting peer network... ");
PeerNetwork peerNetwork = new PeerNetwork(port);
peerNetwork.start();
LOGGER.info("[ Node is Started in port:"+port+" ]"); 17 ArrayList<String> peers = new ArrayList<>();
File peerFile = new File("peers.list");
if (!peerFile.exists()) {
String host = InetAddress.getLocalHost().toString();
FileUtils.writeStringToFile(peerFile, host+":"+port);
}
for (Object peer : FileUtils.readLines(peerFile)) {
String[] addr = peer.toString().split(":");
peerNetwork.connect(addr[0], Integer.parseInt(addr[1]));
}
TimeUnit.SECONDS.sleep(2);
peerNetwork.broadcast("VERSION");
// hard code genesisBlock
Block genesisBlock = new Block();
genesisBlock.setIndex(0);
genesisBlock.setTimestamp("2017-07-13 22:32:00");//my son's birthday
genesisBlock.setVac(0);
genesisBlock.setPrevHash("");
genesisBlock.setHash(BlockUtils.calculateHash(genesisBlock));
blockChain.add(genesisBlock);
final Gson gson = new GsonBuilder().create();
LOGGER.info(gson.toJson(blockChain));
int bestHeight = 0;
boolean catchupMode = true;
/**
* p2p 通讯
*/
while (true) {
//对新连接过的peer写入文件,下次启动直接连接
for (String peer : peerNetwork.peers) {
if (!peers.contains(peer)) {
peers.add(peer);
FileUtils.writeStringToFile(peerFile, peer);
}
}
peerNetwork.peers.clear();
// 处理通讯
for (PeerThread pt : peerNetwork.peerThreads) {
if (pt == null || pt.peerReader == null) {
break;
}
List<String> dataList = pt.peerReader.readData();
if (dataList == null) {
LOGGER.info("Null ret retry.");
System.exit(-5);
break;
}
for (String data:dataList) {
LOGGER.info("Got data: " + data);
int flag = data.indexOf(' ');
String cmd = flag >= 0 ? data.substring(0, flag) : data;
String payload = flag >= 0 ? data.substring(flag + 1) : "";
if (StringUtils.isNotBlank(cmd)) {
if ("VERSION".equalsIgnoreCase(cmd)) {
// 对方发来握手信息,我方发给对方区块高度和最新区块的hash
pt.peerWriter.write("VERACK " + blockChain.size() + " " + blockChain.get(blockChain.size() - 1).getHash());
}else if ("VERACK".equalsIgnoreCase(cmd)) {
// 获取区块高度
String[] parts = payload.split(" ");
bestHeight = Integer.parseInt(parts[0]);
//哈希暂时不校验
} else if ("GET_BLOCK".equalsIgnoreCase(cmd)) {
//把对方请求的块给对方
LOGGER.info("Sending block[" + payload + "] to peer");
Block block = blockChain.get(Integer.parseInt(payload));
if (block != null) {
LOGGER.info("Sending block " + payload + " to peer");
pt.peerWriter.write("BLOCK " + gson.toJson(block));
}
} else if ("BLOCK".equalsIgnoreCase(cmd)) {
//把对方给的块存进链中
LOGGER.info("Attempting to add block...");
LOGGER.info("Block: " + payload);
Block newBlock = gson.fromJson(payload, Block.class);
if (!blockChain.contains(newBlock)) {
// 校验区块,如果成功,将其写入本地区块链
if (BlockUtils.isBlockValid(newBlock, blockChain.get(blockChain.size() - 1))) {
if (blockChain.add(newBlock) && !catchupMode) {
LOGGER.info("Added block " + newBlock.getIndex() + " with hash: ["+ newBlock.getHash() + "]");
peerNetwork.broadcast("BLOCK " + payload);
}
}
}
}else if ("GET_ADDR".equalsIgnoreCase(cmd)) {
//对方请求更多peer地址,随机给一个
Random random = new Random();
pt.peerWriter.write("ADDR " + peers.get(random.nextInt(peers.size())));
} else if ("ADDR".equalsIgnoreCase(cmd)) {
// 对方发来地址,建立连接并保存
if (!peers.contains(payload)) {
String peerAddr = payload.substring(0, payload.indexOf(":"));
int peerPort = Integer.parseInt(payload.substring(payload.indexOf(":") + 1));
peerNetwork.connect(peerAddr, peerPort);
peers.add(payload);
PrintWriter out = new PrintWriter(peerFile);
for (int k = 0; k < peers.size(); k++) {
out.println(peers.get(k));
}
out.close();
}
}
}
}
}
// ********************************
// 比较区块高度,同步区块
// ********************************
int localHeight = blockChain.size();
if (bestHeight > localHeight) {
catchupMode = true;
LOGGER.info("Local chain height: " + localHeight);
LOGGER.info("Best chain Height: " + bestHeight);
TimeUnit.MILLISECONDS.sleep(300);
for (int i = localHeight; i < bestHeight; i++) {
LOGGER.info("请求块 " + i + "...");
peerNetwork.broadcast("GET_BLOCK " + i);
}
} else {
if (catchupMode) {
LOGGER.info("[p2p] - Caught up with network.");
}
catchupMode = false;
}
// ****************
// 循环结束
// ****************
TimeUnit.MILLISECONDS.sleep(200);
}
}
PeerNetwork简单封装了一下p2p通讯的细节,篇幅有限我这里只列出核心交互,具体实现可以去看我的github源码中的类:PeerThread、PeerReader、PeerWriter。
RPC
接下来,我们将讨论关于本地节点客户端的概念。在比特币中,除了bitcoin-core,还有bitcoin-cli,这是做什么的呢。
它其实是用来做本地节点交互的,比如我作为本地节点,我需要发起交易,需要查看我的资产等等,后来发展出gui界面,就是大家俗称的钱包。
在本章中,我们也需要这样一个客户端通讯,用来将我们的vac写入链中(之前的文章,我们是用控制台输入的,实际的做法是提供加密的rpc调用)我们接下来实现RPC服务
首先我们要在Node.java中加入通讯逻辑
LOGGER.info("Starting RPC daemon... ");
RpcServer rpcAgent = new RpcServer(port+1);
rpcAgent.start();
LOGGER.info("[ RPC agent is Started in port:"+(port+1)+" ]");
// for循环体中增加,处理RPC服务
for (RpcThread th:rpcAgent.rpcThreads) {
String request = th.req;
if (request != null) {
String[] parts = request.split(" ");
parts[0] = parts[0].toLowerCase();
if ("getinfo".equals(parts[0])) {
String res = gson.toJson(blockChain);
th.res = res;
} else if ("send".equals(parts[0])) {
try {
int vac = Integer.parseInt(parts[1]);
// 根据vac创建区块
Block newBlock = BlockUtils.generateBlock(blockChain.get(blockChain.size() - 1), vac);
if (BlockUtils.isBlockValid(newBlock, blockChain.get(blockChain.size() - 1))) {
blockChain.add(newBlock);
th.res = "write Success!";
peerNetwork.broadcast("BLOCK " + gson.toJson(newBlock));
} else {
th.res = "RPC 500: Invalid vac Error\n";
}
} catch (Exception e) {
th.res = "Syntax (no '<' or '>'): send <vac> <privateKey>";
LOGGER.error("invalid vac", e);
}
} else {
th.res = "Unknown command: "" + parts[0] + """;
}
}
}
独立线程处理
RpcThread.java
package com.v5ent.real.p2p;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 处理单个rpc连接
* @author Mignet
*/
public class RpcThread extends Thread {
private static final Logger LOGGER = LoggerFactory.getLogger(RpcThread.class);
private Socket socket;
String res;
String req;
/**
* 默认构造函数
* @param socket
*/
public RpcThread(Socket socket){
this.socket = socket;
}
@Override
public void run(){
try{
req = null;
res = null;
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String input;
out.println("================Welcome RPC Daemon==============");
while((input = in.readLine()) != null){
if ("HELP".equalsIgnoreCase(input)){
out.println("############################################## COMMANDS ###############################################");
out.println("# 1) getinfo - Gets block chain infomations. #");
out.println("# 2) send <vac> - Write <vac> to blockChain #");
out.println("#######################################################################################################");
} else {
req = input;
while (res == null){
TimeUnit.MILLISECONDS.sleep(25);
}
out.println(res);
req = null;
res = null;
}
}
} catch (Exception e){
LOGGER.info("An RPC client has disconnected.",e);
}
}
}
RpcServer.java
package com.v5ent.real.p2p;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* RPC服务
*
* 注意:不要把这个端口开放给外网
* @author Mignet
*/
public class RpcServer extends Thread
{
private static final Logger LOGGER = LoggerFactory.getLogger(RpcServer.class);
private int port;
private boolean runFlag = true;
List<RpcThread> rpcThreads;
/**
* 默认配置
*/
public RpcServer()
{
this.port = 8016;
this.rpcThreads = new ArrayList<>();
}
/**
* 指定端口
* @param port Port to listen on
*/
public RpcServer(int port)
{
this.port = port;
this.rpcThreads = new ArrayList<>();
}
@Override
public void run()
{
try
{
ServerSocket socket = new ServerSocket(port);
while (runFlag)
{
RpcThread thread = new RpcThread(socket.accept());
rpcThreads.add(thread);
thread.start();
}
socket.close();
} catch (Exception e){
LOGGER.error("rpc error in port:" + port,e);
}
}
}
跑起来
1.使用mvn的install命令打包
2.新建peers.list,把要组建网络的ip地址填入去,在本机执行jar命令,启动第一个节点。注意,这时候它会尝试连接别的节点,连接不上
3.把jar包和peers.list上传到其他机器,启动第二个节点
4.我们用cmd在本机打开新的窗口,执行nc 127.0.0.1 8016,连接到本机节点的rpc服务,输入help查看支持的命令:
5.节点1(本机)增加一个区块:在rpc命令中,我们实现了1,查看区块链;2,写入vac数据,来验证一下
我们先输入getinfo查看一下,然后send 88,看到写入成功了,再输入getinfo,果然看到了新的块在链中。
我们也可以看到节点控制台的输出中关于新增区块的信息
6.验证是不是同步了:我们看一下另一台机器
我们在这台机器上也使用nc localhost 8016来连接看看区块
7.我们再从这个结点写入一个块(send 666)
看看本机接收到了没
很完美。
到此基本演示了区块链通讯真实的样子。当然,这里有很多可以改进的地方,比如安全性,比如命令的模式,比如不用sleep而是用Future,比如使用netty等更高效更成熟的通讯框架。
如果想利用区块链来发行数字货币,那么在此基础上,还要有公私钥签名交易,交易通讯,校验,使用共识算法来选举及奖励货币等。
还有什么区块链知识是本系列没有提到的吗?有的,使用默克尔树来快速验证区块和整个链.
关于币的问题也可以问,比如UTXO模型可以讲一讲吗?如果有问题请大家留言
本文由 创作,采用 知识共享署名4.0 国际许可协议进行许可。本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名。最后编辑时间为: 2022/03/17 01:36