只用120行Java代码写一个自己的区块链-4实现真正的p2p网络

/ Java / 没有评论 / 1434浏览

在之前的文章中,我们模拟了节点网络通讯,很多朋友反馈说,他们想看真正的节点网络通讯而不是单节点的模拟。本章将满足你们。😌

我将本章的内容放在了com.v5ent.real.p2p包中,大家可以在源码中找到我更新的代码。

通过本文,你将可以做到:

学习本章至少需要两台可ping通的机器(虚拟机也可以),并且安装了jdk8(如果只有jdk6,可以把代码改为jdk6支持的形式)。

基于之前的文章,本章将重写p2p部分的内容。我们首先要理清思路,提出区块链通讯需要解决的问题。

1.如果我第一次作为全节点启动,我需要干什么

2.如果我已经启动,别的节点和我通讯,我需要交互哪些信息

3.如果我已经启动,我怎么和自己的客户端交互

这些问题有一个很好的参考对象,就是比特币,我们先来看看比特币的通讯过程

一、比特币节点连接建立

1.寻找比特币网络中的有效节点,此步骤通常有两种方法:

(1)使用“DNS种子”(DNS seeds),DNS种子提供比特币节点的IP地址列表,Bitcoin Core客户端提供五种不同的DNS种子,通常使用默认方式。这里不展开谈论。

(2)手动通过-seednode命令指定一个比特币节点的IP地址作为比特币种子节点(为什么叫种子,我的理解就是,根据种子,得到更多)

这里我们使用简单的方式来处理节点,用一个文件来存储地址列表-peers.list,第一次连接的时候更新这个文件,获取更多连接时也更新这个文件。简单演示我们直接把要测试的机器ip和端口加进去。

我有两台机,我把端口8015用作新的p2p网络的默认监听端口,那么我的peers.list的内容是:

10.16.0.205:8015

10.16.3.77:8015

2.与发现的有效比特币节点进行初始“握手”,建立连接 1

节点发送一条包含基本认证内容的version消息开始“握手”通信过程,该消息包括如下内容:

我们简化一下,这里我们最关心的就是区块高度bestHeight,我们就传递这个好了。区块高度就是区块链的长度。

if ("VERSION".equalsIgnoreCase(cmd)) {
  // 对方发来握手信息,我方发给对方区块高度和最新区块的hash
  pt.peerWriter.write("VERACK " + blockChain.size() + " " + blockChain.get(blockChain.size() - 1).getHash());
}else if ("VERACK".equalsIgnoreCase(cmd)) {
  // 获取区块高度
  String[] parts = payload.split(" ");
  bestHeight = Integer.parseInt(parts[0]);
  //哈希暂时不校验
}

3.新节点建立更多的连接,使节点在网络中被更多节点接收,保证连接更稳定 2

这里我们就两台机,如果你有更多机器,可以实现一下这个通讯,本章我们简单实现一下。

if ("ADDR".equalsIgnoreCase(cmd)) {
  // 对方发来地址,建立连接并保存
  if (!peers.contains(payload)) {
    String peerAddr = payload.substring(0, payload.indexOf(":"));
    int peerPort = Integer.parseInt(payload.substring(payload.indexOf(":") + 1));
    peerNetwork.connect(peerAddr, peerPort);
    peers.add(payload);
    PrintWriter out = new PrintWriter(peerFile);
    for (int k = 0; k < peers.size(); k++) {
      out.println(peers.get(k));
    }
    out.close();
  }
} else if ("GET_ADDR".equalsIgnoreCase(cmd)) {
  //对方请求更多peer地址,随机给一个
  Random random = new Random();
  pt.peerWriter.write("ADDR " + peers.get(random.nextInt(peers.size())));
}

4.交换“区块清单”(注:该步骤仅在全节点上会执行,且从与节点建立连接就开始进行)本系列内容只使用全节点。

全节点

全节点沿着区块链按时间倒叙一直追溯到创世区块,建立一个完整的UTXO数据库,通过查询UTXO是否未被支付来验证交易的有效性。

SPV节点

SPV节点通过向其他节点请求某笔交易的Merkle路径(Merkle树我可能会在后续章节讲到),如果路径正确无误,并且该交易之上已有6个或以上区块被确认,则证明该交易不是双重支付。

全节点在连接到其他节点后,需要构建完整的区块链,如果是新节点,它仅包含静态植入客户端中的0号区块(创世区块)。注意了,创世区块是静态的(硬编码)。 3

如前文所言,我们在区块链中取最长的链,区块高度比我高,我就向对方获取区块。

else if ("BLOCK".equalsIgnoreCase(cmd)) {
  //把对方给的块存进链中
  LOGGER.info("Attempting to add block...");
  LOGGER.info("Block: " + payload);
  Block newBlock = gson.fromJson(payload, Block.class);
  if (!blockChain.contains(newBlock)) {
    // 校验区块,如果成功,将其写入本地区块链
    if (BlockUtils.isBlockValid(newBlock, blockChain.get(blockChain.size() - 1))) {
      if (blockChain.add(newBlock) && !catchupMode) {
        LOGGER.info("Added block " + newBlock.getIndex() + " with hash: ["+ newBlock.getHash() + "]");
        peerNetwork.broadcast("BLOCK " + payload);
      }
    }
  }
} else if ("GET_BLOCK".equalsIgnoreCase(cmd)) {
  //把对方请求的块给对方
  LOGGER.info("Sending block[" + payload + "] to peer");
  Block block = blockChain.get(Integer.parseInt(payload));
  if (block != null) {
    LOGGER.info("Sending block " + payload + " to peer");
    pt.peerWriter.write("BLOCK " + gson.toJson(block));
  }
}

到这里,我们基本上完成了p2p网络中关于区块的通讯。

其实比特币等虚拟货币中还有很多通讯,关于交易的,这里我们不需要,不做讨论。

让我们开始编码吧!

整合上文提到的所有通讯,Node.java的代码如下

private static final Logger LOGGER = LoggerFactory.getLogger(Node.class);

/** 本地区块链 */
private static List<Block> blockChain = new LinkedList<Block>();

public static void main(String[] args) throws IOException, InterruptedException {
  int port = 8015;
  LOGGER.info("Starting peer network...  ");
  PeerNetwork peerNetwork = new PeerNetwork(port);
  peerNetwork.start();
  LOGGER.info("[  Node is Started in port:"+port+"  ]"); 17         ArrayList<String> peers = new ArrayList<>();
  File peerFile = new File("peers.list");
  if (!peerFile.exists()) {
    String host = InetAddress.getLocalHost().toString();
    FileUtils.writeStringToFile(peerFile, host+":"+port);
  }
  for (Object peer : FileUtils.readLines(peerFile)) {
    String[] addr = peer.toString().split(":");
    peerNetwork.connect(addr[0], Integer.parseInt(addr[1]));
  }
  TimeUnit.SECONDS.sleep(2);

  peerNetwork.broadcast("VERSION");

  // hard code genesisBlock
  Block genesisBlock = new Block();
  genesisBlock.setIndex(0);
  genesisBlock.setTimestamp("2017-07-13 22:32:00");//my son's birthday
  genesisBlock.setVac(0);
  genesisBlock.setPrevHash("");
  genesisBlock.setHash(BlockUtils.calculateHash(genesisBlock));
  blockChain.add(genesisBlock);

  final Gson gson = new GsonBuilder().create();
  LOGGER.info(gson.toJson(blockChain));
  int bestHeight = 0;
  boolean catchupMode = true;

  /**
* p2p 通讯
*/
  while (true) {
    //对新连接过的peer写入文件,下次启动直接连接
    for (String peer : peerNetwork.peers) {
      if (!peers.contains(peer)) {
        peers.add(peer);
        FileUtils.writeStringToFile(peerFile, peer);
      }
    }
    peerNetwork.peers.clear();

    // 处理通讯
    for (PeerThread pt : peerNetwork.peerThreads) {
      if (pt == null || pt.peerReader == null) {
        break;
      }
      List<String> dataList = pt.peerReader.readData();
      if (dataList == null) {
        LOGGER.info("Null ret retry.");
        System.exit(-5);
        break;
      }

      for (String data:dataList) {
        LOGGER.info("Got data: " + data);
        int flag = data.indexOf(' ');
        String cmd = flag >= 0 ? data.substring(0, flag) : data;
        String payload = flag >= 0 ? data.substring(flag + 1) : "";
        if (StringUtils.isNotBlank(cmd)) {
          if ("VERSION".equalsIgnoreCase(cmd)) {
            // 对方发来握手信息,我方发给对方区块高度和最新区块的hash
            pt.peerWriter.write("VERACK " + blockChain.size() + " " + blockChain.get(blockChain.size() - 1).getHash());
          }else if ("VERACK".equalsIgnoreCase(cmd)) {
            // 获取区块高度
            String[] parts = payload.split(" ");
            bestHeight = Integer.parseInt(parts[0]);
            //哈希暂时不校验
          } else if ("GET_BLOCK".equalsIgnoreCase(cmd)) {
            //把对方请求的块给对方
            LOGGER.info("Sending block[" + payload + "] to peer");
            Block block = blockChain.get(Integer.parseInt(payload));
            if (block != null) {
              LOGGER.info("Sending block " + payload + " to peer");
              pt.peerWriter.write("BLOCK " + gson.toJson(block));
            }
          } else if ("BLOCK".equalsIgnoreCase(cmd)) {
            //把对方给的块存进链中
            LOGGER.info("Attempting to add block...");
            LOGGER.info("Block: " + payload);
            Block newBlock = gson.fromJson(payload, Block.class);
            if (!blockChain.contains(newBlock)) {
              // 校验区块,如果成功,将其写入本地区块链
              if (BlockUtils.isBlockValid(newBlock, blockChain.get(blockChain.size() - 1))) {
                if (blockChain.add(newBlock) && !catchupMode) {
                  LOGGER.info("Added block " + newBlock.getIndex() + " with hash: ["+ newBlock.getHash() + "]");
                  peerNetwork.broadcast("BLOCK " + payload);
                }
              }
            }
          }else if ("GET_ADDR".equalsIgnoreCase(cmd)) {
            //对方请求更多peer地址,随机给一个
            Random random = new Random();
            pt.peerWriter.write("ADDR " + peers.get(random.nextInt(peers.size())));
          } else if ("ADDR".equalsIgnoreCase(cmd)) {
            // 对方发来地址,建立连接并保存
            if (!peers.contains(payload)) {
              String peerAddr = payload.substring(0, payload.indexOf(":"));
              int peerPort = Integer.parseInt(payload.substring(payload.indexOf(":") + 1));
              peerNetwork.connect(peerAddr, peerPort);
              peers.add(payload);
              PrintWriter out = new PrintWriter(peerFile);
              for (int k = 0; k < peers.size(); k++) {
                out.println(peers.get(k));
              }
              out.close();
            }
          } 
        }
      }
    }

    // ********************************
    //         比较区块高度,同步区块
    // ********************************

    int localHeight = blockChain.size();

    if (bestHeight > localHeight) {
      catchupMode = true;
      LOGGER.info("Local chain height: " + localHeight);
      LOGGER.info("Best chain Height: " + bestHeight);
      TimeUnit.MILLISECONDS.sleep(300);

      for (int i = localHeight; i < bestHeight; i++) {
        LOGGER.info("请求块 " + i + "...");
        peerNetwork.broadcast("GET_BLOCK " + i);
      }
    } else {
      if (catchupMode) {
        LOGGER.info("[p2p] - Caught up with network.");
      }
      catchupMode = false;
    }



    // ****************
    // 循环结束
    // ****************
    TimeUnit.MILLISECONDS.sleep(200);
  }
}

PeerNetwork简单封装了一下p2p通讯的细节,篇幅有限我这里只列出核心交互,具体实现可以去看我的github源码中的类:PeerThread、PeerReader、PeerWriter。

RPC

接下来,我们将讨论关于本地节点客户端的概念。在比特币中,除了bitcoin-core,还有bitcoin-cli,这是做什么的呢。

它其实是用来做本地节点交互的,比如我作为本地节点,我需要发起交易,需要查看我的资产等等,后来发展出gui界面,就是大家俗称的钱包。

在本章中,我们也需要这样一个客户端通讯,用来将我们的vac写入链中(之前的文章,我们是用控制台输入的,实际的做法是提供加密的rpc调用)我们接下来实现RPC服务

首先我们要在Node.java中加入通讯逻辑

LOGGER.info("Starting RPC daemon... ");	
RpcServer rpcAgent = new RpcServer(port+1);		
rpcAgent.start();		
LOGGER.info("[  RPC agent is Started in port:"+(port+1)+"  ]");
// for循环体中增加,处理RPC服务
for (RpcThread th:rpcAgent.rpcThreads) {
  String request = th.req;
  if (request != null) {
    String[] parts = request.split(" ");
    parts[0] = parts[0].toLowerCase();
    if ("getinfo".equals(parts[0])) {
      String res = gson.toJson(blockChain);
      th.res = res;
    } else if ("send".equals(parts[0])) {
      try {
        int vac = Integer.parseInt(parts[1]);
        // 根据vac创建区块
        Block newBlock = BlockUtils.generateBlock(blockChain.get(blockChain.size() - 1), vac);
        if (BlockUtils.isBlockValid(newBlock, blockChain.get(blockChain.size() - 1))) {
          blockChain.add(newBlock);
          th.res = "write Success!";
          peerNetwork.broadcast("BLOCK " + gson.toJson(newBlock));
        } else {
          th.res = "RPC 500: Invalid vac Error\n";
        }
      } catch (Exception e) {
        th.res = "Syntax (no '<' or '>'): send <vac> <privateKey>";
        LOGGER.error("invalid vac", e);
      }
    } else {
      th.res = "Unknown command: "" + parts[0] + """;
    }
  }
}

独立线程处理

RpcThread.java

package com.v5ent.real.p2p;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * 处理单个rpc连接
 * @author Mignet
 */
public class RpcThread extends Thread {
    
    private static final Logger LOGGER = LoggerFactory.getLogger(RpcThread.class);
    
    private Socket socket;
    String res;
    String req;

    /**
     * 默认构造函数
     * @param socket
     */
    public RpcThread(Socket socket){
        this.socket = socket;
    }

    @Override
    public void run(){
        try{
            req = null;
            res = null;
            PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
            BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String input;
            out.println("================Welcome RPC Daemon==============");
            while((input = in.readLine()) != null){
                if ("HELP".equalsIgnoreCase(input)){
                    out.println("############################################## COMMANDS ###############################################");
                    out.println("#     1) getinfo       - Gets block chain infomations.                                                #");
                    out.println("#     2) send <vac>    - Write <vac> to blockChain                                                    #");
                    out.println("#######################################################################################################");
                } else {
                    req = input;
                    while (res == null){
                        TimeUnit.MILLISECONDS.sleep(25);
                    }
                    out.println(res);
                    req = null;
                    res = null;
                }
            }
        } catch (Exception e){
            LOGGER.info("An RPC client has disconnected.",e);
        }
    }
}

RpcServer.java

package com.v5ent.real.p2p;

import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * RPC服务
 *
 * 注意:不要把这个端口开放给外网
 * @author Mignet
 */
public class RpcServer extends Thread
{
    private static final Logger LOGGER = LoggerFactory.getLogger(RpcServer.class);
    private int port;
    private boolean runFlag = true;

    List<RpcThread> rpcThreads;

    /**
     * 默认配置
     */
    public RpcServer()
    {
        this.port = 8016;
        this.rpcThreads = new ArrayList<>();
    }

    /**
     * 指定端口
     * @param port Port to listen on
     */
    public RpcServer(int port)
    {
        this.port = port;
        this.rpcThreads = new ArrayList<>();
    }

    @Override
    public void run()
    {
        try
        {
            ServerSocket socket = new ServerSocket(port);
            while (runFlag)
            {
                RpcThread thread = new RpcThread(socket.accept());
                rpcThreads.add(thread);
                thread.start();
            }
            socket.close();
        } catch (Exception e){
            LOGGER.error("rpc error in port:" + port,e);
        }
    }
}

跑起来

1.使用mvn的install命令打包

2.新建peers.list,把要组建网络的ip地址填入去,在本机执行jar命令,启动第一个节点。注意,这时候它会尝试连接别的节点,连接不上

4

3.把jar包和peers.list上传到其他机器,启动第二个节点

5

4.我们用cmd在本机打开新的窗口,执行nc 127.0.0.1 8016,连接到本机节点的rpc服务,输入help查看支持的命令:

6

5.节点1(本机)增加一个区块:在rpc命令中,我们实现了1,查看区块链;2,写入vac数据,来验证一下

7

我们先输入getinfo查看一下,然后send 88,看到写入成功了,再输入getinfo,果然看到了新的块在链中。

我们也可以看到节点控制台的输出中关于新增区块的信息

8

6.验证是不是同步了:我们看一下另一台机器

9

我们在这台机器上也使用nc localhost 8016来连接看看区块

10

7.我们再从这个结点写入一个块(send 666)

11

看看本机接收到了没

12

很完美。

到此基本演示了区块链通讯真实的样子。当然,这里有很多可以改进的地方,比如安全性,比如命令的模式,比如不用sleep而是用Future,比如使用netty等更高效更成熟的通讯框架。

如果想利用区块链来发行数字货币,那么在此基础上,还要有公私钥签名交易,交易通讯,校验,使用共识算法来选举及奖励货币等。

还有什么区块链知识是本系列没有提到的吗?有的,使用默克尔树来快速验证区块和整个链.

关于币的问题也可以问,比如UTXO模型可以讲一讲吗?如果有问题请大家留言