版权声明:本文为博主原创文章,转载注明“龙棠博客”字样和原文链接。
事件管理交互图V0.6.1
节点启动并serve
func serve(args []string) error {
// Parameter overrides must be processed before any paramaters are
// cached. Failures to cache cause the server to terminate immediately.
if chaincodeDevMode {
logger.Info("Running in chaincode development mode")
logger.Info("Set consensus to NOOPS and user starts chaincode")
logger.Info("Disable loading validity system chaincode")
// 节点验证开启
viper.Set("peer.validator.enabled", "true")
// 开发模式默认共识机制采用noops
viper.Set("peer.validator.consensus", "noops")
// 如果定义dev模式,则chaincode在命令行中运行,若为net则在容器中运行
viper.Set("chaincode.mode", chaincode.DevModeUserRunsChaincode)
}
if err := peer.CacheConfiguration(); err != nil {
return err
}
peerEndpoint, err := peer.GetPeerEndpoint()
if err != nil {
err = fmt.Errorf("Failed to get Peer Endpoint: %s", err)
return err
}
// 配置文件core.yaml中的配置
// peer service端口7051
listenAddr := viper.GetString("peer.listenAddress")
if "" == listenAddr {
logger.Debug("Listen address not specified, using peer endpoint address")
listenAddr = peerEndpoint.Address
}
lis, err := net.Listen("tcp", listenAddr)
if err != nil {
grpclog.Fatalf("Failed to listen: %v", err)
}
// 创建事件交换服务
// ehubLis 7053 注册了BLOCK,CHAINCODE,REJECTION,REGISTER
ehubLis, ehubGrpcServer, err := createEventHubServer()
if err != nil {
grpclog.Fatalf("Failed to create ehub server: %v", err)
}
// 输出安全设置值
logger.Infof("Security enabled status: %t", core.SecurityEnabled())
if viper.GetBool("security.privacy") {
// 隐私启用状态
if core.SecurityEnabled() {
logger.Infof("Privacy enabled status: true")
} else {
panic(errors.New("Privacy cannot be enabled as requested because security is disabled"))
}
} else {
logger.Infof("Privacy enabled status: false")
}
//数据库启动,打开/var/hyperledger/production/db目录
db.Start()
var opts []grpc.ServerOption
// 如果TLS启用,读取证书文件,在docker-compose.yml中CORE_SECURITY_ENABLED=true
if comm.TLSEnabled() {
// core.yaml中定义了了访问membersrvc的接口 localhost:7054, 获取配置
creds, err := credentials.NewServerTLSFromFile(viper.GetString("peer.tls.cert.file"),
viper.GetString("peer.tls.key.file"))
if err != nil {
grpclog.Fatalf("Failed to generate credentials %v", err)
}
opts = []grpc.ServerOption{grpc.Creds(creds)}
}
// 获取gRPC服务器实例
grpcServer := grpc.NewServer(opts...)
// 获取安全帮助函数
secHelper, err := getSecHelper()
if err != nil {
return err
}
// 安全帮助函数
secHelperFunc := func() crypto.Peer {
return secHelper
}
// 注册链码支持
registerChaincodeSupport(chaincode.DefaultChain, grpcServer, secHelper)
var peerServer *peer.Impl
// Create the peerServer
// 如果开启验证则载入共识机制插件,未开启则不加载,默认开启加载noops共识机制
// 当前节点是否是验证节点,是验证节点加载共识机制引擎,非验证节点不加载共识引擎,设置针对单点作用非全局
// 节点启动默认为验证节点, 节点是否加载共识引擎,决定了节点的消息处理方法peer.handlerFactory被不同的值初始化
if peer.ValidatorEnabled() {
logger.Debug("Running as validating peer - making genesis block if needed")
// 生成创世块
makeGenesisError := genesis.MakeGenesis()
if makeGenesisError != nil {
return makeGenesisError
}
logger.Debugf("Running as validating peer - installing consensus %s",
viper.GetString("peer.validator.consensus"))
// helper.GetEngine引擎工厂方法,调用这个获取共识引擎
peerServer, err = peer.NewPeerWithEngine(secHelperFunc, helper.GetEngine)
} else {
// 不采用共识机制,不加载engine,消息处理方法为peer.NewPeerHandler
logger.Debug("Running as non-validating peer")
peerServer, err = peer.NewPeerWithHandler(secHelperFunc, peer.NewPeerHandler)
}
if err != nil {
logger.Fatalf("Failed creating new peer with handler %v", err)
return err
}
// Register the Peer server
// 在gRPC服务器中注册peer服务
pb.RegisterPeerServer(grpcServer, peerServer)
// Register the Admin server
// 注册Admin服务器端
pb.RegisterAdminServer(grpcServer, core.NewAdminServer())
// Register Devops server
// 注册Devops服务器,peer devops client会通过gPRC连接调用这里实现的方法
serverDevops := core.NewDevopsServer(peerServer)
pb.RegisterDevopsServer(grpcServer, serverDevops)
// Register the ServerOpenchain server
serverOpenchain, err := rest.NewOpenchainServerWithPeerInfo(peerServer)
if err != nil {
err = fmt.Errorf("Error creating OpenchainServer: %s", err)
return err
}
pb.RegisterOpenchainServer(grpcServer, serverOpenchain)
// Create and register the REST service if configured
// 如果开启REST则注册该服务器
if viper.GetBool("rest.enabled") {
go rest.StartOpenchainRESTServer(serverOpenchain, serverDevops)
}
logger.Infof("Starting peer with ID=%s, network ID=%s, address=%s, rootnodes=%v, validator=%v",
peerEndpoint.ID, viper.GetString("peer.networkId"), peerEndpoint.Address,
viper.GetString("peer.discovery.rootnode"), peer.ValidatorEnabled())
// Start the grpc server. Done in a goroutine so we can deploy the
// genesis block if needed.
serve := make(chan error)
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
fmt.Println()
fmt.Println(sig)
serve <- nil
}()
go func() {
var grpcErr error
if grpcErr = grpcServer.Serve(lis); grpcErr != nil {
grpcErr = fmt.Errorf("grpc server exited with error: %s", grpcErr)
} else {
logger.Info("grpc server exited")
}
serve <- grpcErr
}()
if err := writePid(viper.GetString("peer.fileSystemPath")+"/peer.pid", os.Getpid()); err != nil {
return err
}
// Start the event hub server
if ehubGrpcServer != nil && ehubLis != nil {
go ehubGrpcServer.Serve(ehubLis)
}
// peer profile 占用端口6060
if viper.GetBool("peer.profile.enabled") {
go func() {
profileListenAddress := viper.GetString("peer.profile.listenAddress")
logger.Infof("Starting profiling server with listenAddress = %s", profileListenAddress)
if profileErr := http.ListenAndServe(profileListenAddress, nil); profileErr != nil {
logger.Errorf("Error starting profiler: %s", profileErr)
}
}()
}
// Block until grpc server exits
return <-serve
}
初始化引擎
// GetEngine returns initialized peer.Engine
// 共识机制引擎工程,返回一个共识引擎
func GetEngine(coord peer.MessageHandlerCoordinator) (peer.Engine, error) {
var err error
engineOnce.Do(func() {
engine = new(EngineImpl)
// 引擎helper和其它stack交互
// coord 为peer Impl
engine.helper = NewHelper(coord)
// 构建批准者consenter,投票者,有发言权的人
// consenter 和 helper 互相交换句柄, 目的外部调用内部,内部调用外部
engine.consenter = controller.NewConsenter(engine.helper)
engine.helper.setConsenter(engine.consenter)
engine.peerEndpoint, err = coord.GetPeerEndpoint()
engine.consensusFan = util.NewMessageFan()
go func() {
logger.Debug("Starting up message thread for consenter")
// The channel never closes, so this should never break
// 从consensusFan中获取一个制度Channel,并进行消息处理
// for + channel 循环读,当fan.out 关闭时,循环自动结束
// channel永不关闭,所以读取从不停止
for msg := range engine.consensusFan.GetOutChannel() {
engine.consenter.RecvMsg(msg.Msg, msg.Sender)
}
}()
})
return engine, err
}
共识处理器
// NewConsensusHandler constructs a new MessageHandler for the plugin.
// Is instance of peer.HandlerFactory
// 新的共识机制处理器, 验证节点启动时会配置其消息处理方法,如果是验证节点则会根据不同的共识机制加载不同engine的处理方法
// initiatedStream bool,流是否初始化,在节点准备chat时,会先用false来初始化流
// 在handleChat时会获取处理器
func NewConsensusHandler(coord peer.MessageHandlerCoordinator,
stream peer.ChatStream, initiatedStream bool) (peer.MessageHandler, error) {
// 先构建节点处理器,后续赋给共识机制处理器
// peer 的gPRC Chat()实现中initiatedStream 为false
peerHandler, err := peer.NewPeerHandler(coord, stream, initiatedStream)
if err != nil {
return nil, fmt.Errorf("Error creating PeerHandler: %s", err)
}
// 共识处理器,同时把上述构建的节点处理器作为其成员
handler := &ConsensusHandler{
MessageHandler: peerHandler,
coordinator: coord,
}
// 共识插件每个连接的最大通信消息数,默认为1000,超过1000则拒绝分发
consensusQueueSize := viper.GetInt("peer.validator.consensus.buffersize")
if consensusQueueSize <= 0 {
logger.Errorf("peer.validator.consensus.buffersize is set to %d, but this must be a positive integer, defaulting to %d", consensusQueueSize, DefaultConsensusQueueSize)
consensusQueueSize = DefaultConsensusQueueSize
}
// 共识机制的处理chan, 初始化1000个消息的channel
handler.consenterChan = make(chan *util.Message, consensusQueueSize)
// fanin 扇入(端数)
// EngineImpl 是一个consensus.Consenter, PeerEndpoint and MessageFan 的结构
// getEngineImpl() 返回一个 EngineImpl 实例
getEngineImpl().consensusFan.AddFaninChannel(handler.consenterChan)
return handler, nil
}
通用处理器
func NewPeerHandler(coord MessageHandlerCoordinator, stream ChatStream, initiatedStream bool) (MessageHandler, error) {
d := &Handler{
ChatStream: stream,
initiatedStream: initiatedStream,
Coordinator: coord,
}
d.doneChan = make(chan struct{})
if dur := viper.GetDuration("peer.sync.state.snapshot.writeTimeout"); dur == 0 {
d.syncSnapshotTimeout = DefaultSyncSnapshotTimeout
} else {
d.syncSnapshotTimeout = dur
}
d.snapshotRequestHandler = newSyncStateSnapshotRequestHandler()
d.syncStateDeltasRequestHandler = newSyncStateDeltasHandler()
d.syncBlocksRequestHandler = newSyncBlocksRequestHandler()
// 有穷状态机
d.FSM = fsm.NewFSM(
"created",
fsm.Events{
{Name: pb.Message_DISC_HELLO.String(), Src: []string{"created"}, Dst: "established"},
{Name: pb.Message_DISC_GET_PEERS.String(), Src: []string{"established"}, Dst: "established"},
{Name: pb.Message_DISC_PEERS.String(), Src: []string{"established"}, Dst: "established"},
{Name: pb.Message_SYNC_BLOCK_ADDED.String(), Src: []string{"established"}, Dst: "established"},
{Name: pb.Message_SYNC_GET_BLOCKS.String(), Src: []string{"established"}, Dst: "established"},
{Name: pb.Message_SYNC_BLOCKS.String(), Src: []string{"established"}, Dst: "established"},
{Name: pb.Message_SYNC_STATE_GET_SNAPSHOT.String(), Src: []string{"established"}, Dst: "established"},
{Name: pb.Message_SYNC_STATE_SNAPSHOT.String(), Src: []string{"established"}, Dst: "established"},
{Name: pb.Message_SYNC_STATE_GET_DELTAS.String(), Src: []string{"established"}, Dst: "established"},
{Name: pb.Message_SYNC_STATE_DELTAS.String(), Src: []string{"established"}, Dst: "established"},
},
fsm.Callbacks{
"enter_state": func(e *fsm.Event) { d.enterState(e) },
"before_" + pb.Message_DISC_HELLO.String(): func(e *fsm.Event) { d.beforeHello(e) },
"before_" + pb.Message_DISC_GET_PEERS.String(): func(e *fsm.Event) { d.beforeGetPeers(e) },
"before_" + pb.Message_DISC_PEERS.String(): func(e *fsm.Event) { d.beforePeers(e) },
"before_" + pb.Message_SYNC_BLOCK_ADDED.String(): func(e *fsm.Event) { d.beforeBlockAdded(e) },
"before_" + pb.Message_SYNC_GET_BLOCKS.String(): func(e *fsm.Event) { d.beforeSyncGetBlocks(e) },
"before_" + pb.Message_SYNC_BLOCKS.String(): func(e *fsm.Event) { d.beforeSyncBlocks(e) },
"before_" + pb.Message_SYNC_STATE_GET_SNAPSHOT.String(): func(e *fsm.Event) { d.beforeSyncStateGetSnapshot(e) },
"before_" + pb.Message_SYNC_STATE_SNAPSHOT.String(): func(e *fsm.Event) { d.beforeSyncStateSnapshot(e) },
"before_" + pb.Message_SYNC_STATE_GET_DELTAS.String(): func(e *fsm.Event) { d.beforeSyncStateGetDeltas(e) },
"before_" + pb.Message_SYNC_STATE_DELTAS.String(): func(e *fsm.Event) { d.beforeSyncStateDeltas(e) },
},
)
// If the stream was initiated from this Peer, send an Initial HELLO message
// Chat()时该值为false, 为执行初始化不发送HELLO消息
if d.initiatedStream {
// Send intiial Hello
helloMessage, err := d.Coordinator.NewOpenchainDiscoveryHello()
if err != nil {
return nil, fmt.Errorf("Error getting new HelloMessage: %s", err)
}
if err := d.SendMessage(helloMessage); err != nil {
return nil, fmt.Errorf("Error creating new Peer Handler, error returned sending %s: %s", pb.Message_DISC_HELLO, err)
}
}
return d, nil
}
节点处理通信
func (p *Impl) handleChat(ctx context.Context, stream ChatStream, initiatedStream bool) error {
deadline, ok := ctx.Deadline()
peerLogger.Debugf("Current context deadline = %s, ok = %v", deadline, ok)
// 调用节点的处理器,可能为带共识引擎的,也可能为普通的
// stream作为处理器的处理对象
// initiatedStream 流是否初始化
// handlerFactory 为处理器工厂方法,在peer启动时已经被设置好,此时相当于调用获得一个处理器;
// 如果为验证节点此处相当于调用了,共识引擎的处理器工厂方法NewConsensusHandler
// 如果是Chat()调用此时p为stream.Context(), 这里为Impl, stream 为 background 后续会参数构建peer.Handler
handler, err := p.handlerFactory(p, stream, initiatedStream)
if err != nil {
return fmt.Errorf("Error creating handler during handleChat initiation: %s", err)
}
defer handler.Stop()
for {
// 从流中读取数据
// stream 为 ConsensusHandler内的peer.MessageHandler即peer.Handler的ChatStream
in, err := stream.Recv()
// 异常判断
if err == io.EOF {
peerLogger.Debug("Received EOF, ending Chat")
return nil
}
if err != nil {
e := fmt.Errorf("Error during Chat, stopping handler: %s", err)
peerLogger.Error(e.Error())
return e
}
// 节点消息处理调用,共识机制的验证点击采用ConsensusHandler处理,不同的采用peer.Handler处理
err = handler.HandleMessage(in)
if err != nil {
peerLogger.Errorf("Error handling message: %s", err)
//return err
}
}
}