diff --git a/pom.xml b/pom.xml index 9569ead7..d0e420bd 100644 --- a/pom.xml +++ b/pom.xml @@ -189,7 +189,7 @@ org.yaml snakeyaml - 1.19 + 1.20 diff --git a/src/main/java/org/hyperledger/fabric/sdk/Channel.java b/src/main/java/org/hyperledger/fabric/sdk/Channel.java index d82d40d7..530ff42f 100644 --- a/src/main/java/org/hyperledger/fabric/sdk/Channel.java +++ b/src/main/java/org/hyperledger/fabric/sdk/Channel.java @@ -94,7 +94,6 @@ import org.hyperledger.fabric.sdk.exception.CryptoException; import org.hyperledger.fabric.sdk.exception.EventHubException; import org.hyperledger.fabric.sdk.exception.InvalidArgumentException; -import org.hyperledger.fabric.sdk.exception.PeerException; import org.hyperledger.fabric.sdk.exception.ProposalException; import org.hyperledger.fabric.sdk.exception.TransactionEventException; import org.hyperledger.fabric.sdk.exception.TransactionException; @@ -653,6 +652,29 @@ private Collection getLedgerQueryPeers() { public Channel joinPeer(Peer peer, PeerOptions peerOptions) throws ProposalException { + try { + return joinPeer(getRandomOrderer(), peer, peerOptions); + } catch (ProposalException e) { + throw e; + } catch (Exception e) { + throw new ProposalException(e); + + } + + } + + /** + * Join peer to channel + * + * @param orderer The orderer to get the genesis block. + * @param peer the peer to join the channel. + * @param peerOptions see {@link PeerOptions} + * @return + * @throws ProposalException + */ + + public Channel joinPeer(Orderer orderer, Peer peer, PeerOptions peerOptions) throws ProposalException { + logger.debug(format("Channel %s joining peer %s, url: %s", name, peer.getName(), peer.getUrl())); if (shutdown) { @@ -671,7 +693,7 @@ public Channel joinPeer(Peer peer, PeerOptions peerOptions) throws ProposalExcep } try { - genesisBlock = getGenesisBlock(getRandomOrderer()); + genesisBlock = getGenesisBlock(orderer); logger.debug(format("Channel %s got genesis block", name)); final Channel systemChannel = newSystemChannel(client); //channel is not really created and this is targeted to system channel @@ -715,17 +737,22 @@ public Channel joinPeer(Peer peer, PeerOptions peerOptions) throws ProposalExcep return this; } - private Block getConfigBlock(Peer peer) throws ProposalException { + private Block getConfigBlock(List peers) throws ProposalException { - logger.debug(format("getConfigBlock for channel %s with peer %s, url: %s", name, peer.getName(), peer.getUrl())); + // logger.debug(format("getConfigBlock for channel %s with peer %s, url: %s", name, peer.getName(), peer.getUrl())); if (shutdown) { throw new ProposalException(format("Channel %s has been shutdown.", name)); } - try { + if (peers.isEmpty()) { + throw new ProposalException("No peers go get config block"); + } - TransactionContext transactionContext = getTransactionContext(); + TransactionContext transactionContext = null; + SignedProposal signedProposal = null; + try { + transactionContext = getTransactionContext(); transactionContext.verify(false); // can't verify till we get the config block. FabricProposal.Proposal proposal = GetConfigBlockBuilder.newBuilder() @@ -734,30 +761,43 @@ private Block getConfigBlock(Peer peer) throws ProposalException { .build(); logger.debug("Getting signed proposal."); - SignedProposal signedProposal = getSignedProposal(transactionContext, proposal); + signedProposal = getSignedProposal(transactionContext, proposal); logger.debug("Got signed proposal."); + } catch (Exception e) { + throw new ProposalException(e); + } + ProposalException lastException = new ProposalException(format("getConfigBlock for channel %s failed.", name)); - Collection resp = sendProposalToPeers(new ArrayList<>(Collections.singletonList(peer)), - signedProposal, transactionContext); + for (Peer peer : peers) { + try { - ProposalResponse pro = resp.iterator().next(); + Collection resp = sendProposalToPeers(new ArrayList<>(Collections.singletonList(peer)), + signedProposal, transactionContext); - if (pro.getStatus() == ProposalResponse.Status.SUCCESS) { - logger.trace(format("getConfigBlock from peer %s on channel %s success", peer.getName(), name)); - return Block.parseFrom(pro.getProposalResponse().getResponse().getPayload().toByteArray()); - } else { - throw new ProposalException(format("getConfigBlock for channel %s failed with peer %s. Status %s, details: %s", - name, peer.getName(), pro.getStatus().toString(), pro.getMessage())); + if (!resp.isEmpty()) { + ProposalResponse pro = resp.iterator().next(); + + if (pro.getStatus() == ProposalResponse.Status.SUCCESS) { + logger.trace(format("getConfigBlock from peer %s on channel %s success", peer.getName(), name)); + return Block.parseFrom(pro.getProposalResponse().getResponse().getPayload().toByteArray()); + } else { + lastException = new ProposalException(format("getConfigBlock for channel %s failed with peer %s. Status %s, details: %s", + name, peer.getName(), pro.getStatus().toString(), pro.getMessage())); + logger.warn(lastException.getMessage()); + + } + } else { + logger.warn(format("Got empty proposals from %s", peer)); + } + } catch (Exception e) { + lastException = new ProposalException(format("getConfigBlock for channel %s failed with peer %s.", name, peer.getName()), e); + logger.warn(lastException.getMessage()); } - } catch (ProposalException e) { - logger.error(format("getConfigBlock for channel %s failed with peer %s.", name, peer.getName()), e); - throw e; - } catch (Exception e) { - logger.error(format("getConfigBlock for channel %s failed with peer %s.", name, peer.getName()), e); - throw new ProposalException(e.getMessage(), e); } + throw lastException; + } /** @@ -944,9 +984,12 @@ public Channel initialize() throws InvalidArgumentException, TransactionExceptio userContextCheck(client.getUserContext()); try { - parseConfigBlock(); // Parse config block for this channel to get it's information. + loadCACertificates(); // put all MSP certs into cryptoSuite if this fails here we'll try again later. + } catch (Exception e) { + logger.warn(format("Channel %s could not load peer CA certificates from any peers.", name)); + } - loadCACertificates(); // put all MSP certs into cryptoSuite + try { logger.debug(format("Eventque started %s", "" + eventQueueThread)); @@ -969,9 +1012,9 @@ public Channel initialize() throws InvalidArgumentException, TransactionExceptio logger.debug(format("Channel %s initialized", name)); return this; - } catch (TransactionException e) { - logger.error(e.getMessage(), e); - throw e; +// } catch (TransactionException e) { +// logger.error(e.getMessage(), e); +// throw e; } catch (Exception e) { TransactionException exp = new TransactionException(e); @@ -988,9 +1031,15 @@ public Channel initialize() throws InvalidArgumentException, TransactionExceptio * @throws InvalidArgumentException * @throws CryptoException */ - protected void loadCACertificates() throws InvalidArgumentException, CryptoException { + protected synchronized void loadCACertificates() throws InvalidArgumentException, CryptoException, TransactionException { + + if (msps != null && !msps.isEmpty()) { + return; + } logger.debug(format("Channel %s loadCACertificates", name)); + parseConfigBlock(); + if (msps == null || msps.isEmpty()) { throw new InvalidArgumentException("Unable to load CA certificates. Channel " + name + " does not have any MSPs."); } @@ -1144,9 +1193,16 @@ ExecutorService getExecutorService() { protected void parseConfigBlock() throws TransactionException { + Map lmsps = msps; + + if (lmsps != null && !lmsps.isEmpty()) { + return; + + } + try { - Block parseFrom = getConfigBlock(getRandomPeer()); + Block parseFrom = getConfigBlock(getShuffledPeers()); // final Block configBlock = getConfigurationBlock(); @@ -1262,7 +1318,7 @@ private Block getConfigurationBlock() throws TransactionException { public byte[] getChannelConfigurationBytes() throws TransactionException { try { - final Block configBlock = getConfigBlock(getRandomPeer()); + final Block configBlock = getConfigBlock(getShuffledPeers()); Envelope envelopeRet = Envelope.parseFrom(configBlock.getData().getData(0)); @@ -1277,7 +1333,7 @@ public byte[] getChannelConfigurationBytes() throws TransactionException { } - private long getLastConfigIndex(Orderer orderer) throws CryptoException, TransactionException, InvalidArgumentException, InvalidProtocolBufferException { + private long getLastConfigIndex(Orderer orderer) throws TransactionException, InvalidProtocolBufferException { Block latestBlock = getLatestBlock(orderer); BlockMetadata blockMetadata = latestBlock.getMetadata(); @@ -1641,9 +1697,25 @@ private SignedProposal getSignedProposal(TransactionContext transactionContext, } + private void checkChannelState() throws InvalidArgumentException { + if (shutdown) { + throw new InvalidArgumentException(format("Channel %s has been shutdown.", name)); + } + + if (!initialized) { + throw new InvalidArgumentException(format("Channel %s has not been initialized.", name)); + } + + userContextCheck(client.getUserContext()); + + } + /** * query this channel for a Block by the block hash. - * The request is sent to a random peer in the channel. + * The request is retried on each peer on the channel till successful. + *

+ * This method may not be thread safe if client context is changed! + *

* * @param blockHash the hash of the Block in the chain * @return the {@link BlockInfo} with the given block Hash @@ -1651,87 +1723,94 @@ private SignedProposal getSignedProposal(TransactionContext transactionContext, * @throws ProposalException */ public BlockInfo queryBlockByHash(byte[] blockHash) throws InvalidArgumentException, ProposalException { + return queryBlockByHash(getShuffledPeers(EnumSet.of(PeerRole.LEDGER_QUERY)), blockHash); + } - checkChannelState(); - - if (blockHash == null) { - throw new InvalidArgumentException("blockHash parameter is null."); - } - return queryBlockByHash(getRandomLedgerQueryPeer(), blockHash); + /** + * query this channel for a Block by the block hash. + * The request is tried on multiple peers. + * + * @param blockHash the hash of the Block in the chain + * @param userContext the user context. + * @return the {@link BlockInfo} with the given block Hash + * @throws InvalidArgumentException + * @throws ProposalException + */ + public BlockInfo queryBlockByHash(byte[] blockHash, User userContext) throws InvalidArgumentException, ProposalException { + return queryBlockByHash(getShuffledPeers(EnumSet.of(PeerRole.LEDGER_QUERY)), blockHash, userContext); } - private void checkChannelState() throws InvalidArgumentException { - if (shutdown) { - throw new InvalidArgumentException(format("Channel %s has been shutdown.", name)); - } + /** + * Query a peer in this channel for a Block by the block hash. + *

+ * This method may not be thread safe if client context is changed! + *

+ * + * @param peer the Peer to query. + * @param blockHash the hash of the Block in the chain. + * @return the {@link BlockInfo} with the given block Hash + * @throws InvalidArgumentException if the channel is shutdown or any of the arguments are not valid. + * @throws ProposalException if an error occurred processing the query. + */ + public BlockInfo queryBlockByHash(Peer peer, byte[] blockHash) throws InvalidArgumentException, ProposalException { + return queryBlockByHash(Collections.singleton(peer), blockHash); + } - if (!initialized) { - throw new InvalidArgumentException(format("Channel %s has not been initialized.", name)); - } + /** + * Query a peer in this channel for a Block by the block hash. + * Each peer is tried until successful response. + *

+ * This method may not be thread safe if client context is changed! + *

+ * + * @param peers the Peers to query. + * @param blockHash the hash of the Block in the chain. + * @return the {@link BlockInfo} with the given block Hash + * @throws InvalidArgumentException if the channel is shutdown or any of the arguments are not valid. + * @throws ProposalException if an error occurred processing the query. + */ + public BlockInfo queryBlockByHash(Collection peers, byte[] blockHash) throws InvalidArgumentException, ProposalException { - userContextCheck(client.getUserContext()); + return queryBlockByHash(peers, blockHash, client.getUserContext()); } /** * Query a peer in this channel for a Block by the block hash. * - * @param peer the Peer to query. - * @param blockHash the hash of the Block in the chain. + * @param peers the Peers to query. + * @param blockHash the hash of the Block in the chain. + * @param userContext the user context * @return the {@link BlockInfo} with the given block Hash * @throws InvalidArgumentException if the channel is shutdown or any of the arguments are not valid. * @throws ProposalException if an error occurred processing the query. */ - public BlockInfo queryBlockByHash(Peer peer, byte[] blockHash) throws InvalidArgumentException, ProposalException { + public BlockInfo queryBlockByHash(Collection peers, byte[] blockHash, User userContext) throws InvalidArgumentException, ProposalException { checkChannelState(); - checkPeer(peer); + checkPeers(peers); + userContextCheck(userContext); if (blockHash == null) { throw new InvalidArgumentException("blockHash parameter is null."); } - ProposalResponse proposalResponse; - BlockInfo responseBlock; try { - logger.debug("queryBlockByHash with hash : " + Hex.encodeHexString(blockHash) + "\n to peer " + peer.getName() + " on channel " + name); - QuerySCCRequest querySCCRequest = new QuerySCCRequest(client.getUserContext()); + + logger.trace("queryBlockByHash with hash : " + Hex.encodeHexString(blockHash) + " on channel " + name); + QuerySCCRequest querySCCRequest = new QuerySCCRequest(userContext); querySCCRequest.setFcn(QuerySCCRequest.GETBLOCKBYHASH); querySCCRequest.setArgs(name); querySCCRequest.setArgBytes(new byte[][] {blockHash}); - Collection proposalResponses = sendProposal(querySCCRequest, Collections.singletonList(peer)); - proposalResponse = proposalResponses.iterator().next(); + ProposalResponse proposalResponse = sendProposalSerially(querySCCRequest, peers); - if (proposalResponse.getStatus().getStatus() != 200) { - throw new PeerException(format("Unable to query block by hash %s %n.... for channel %s from peer %s \n with message %s", - Hex.encodeHexString(blockHash), - name, - peer.getName(), - proposalResponse.getMessage())); - } - responseBlock = new BlockInfo(Block.parseFrom(proposalResponse.getProposalResponse().getResponse().getPayload())); - } catch (Exception e) { - String emsg = format("queryBlockByHash hash: %s peer %s channel %s error: %s", - Hex.encodeHexString(blockHash), peer.getName(), name, e.getMessage()); - logger.error(emsg, e); - throw new ProposalException(emsg, e); + return new BlockInfo(Block.parseFrom(proposalResponse.getProposalResponse().getResponse().getPayload())); + } catch (InvalidProtocolBufferException e) { + ProposalException proposalException = new ProposalException(e); + logger.error(proposalException); + throw proposalException; } - - return responseBlock; - } - - /** - * query this channel for a Block by the blockNumber. - * The request is sent to a random peer in the channel. - * - * @param blockNumber index of the Block in the chain - * @return the {@link BlockInfo} with the given blockNumber - * @throws InvalidArgumentException - * @throws ProposalException - */ - public BlockInfo queryBlockByNumber(long blockNumber) throws InvalidArgumentException, ProposalException { - return queryBlockByNumber(getRandomLedgerQueryPeer(), blockNumber); } private Peer getRandomLedgerQueryPeer() throws InvalidArgumentException { @@ -1755,6 +1834,20 @@ private Peer getRandomPeer() throws InvalidArgumentException { return randPicks.get(RANDOM.nextInt(randPicks.size())); } + private List getShuffledPeers() { + + ArrayList peers = new ArrayList<>(getPeers()); + Collections.shuffle(peers); + return peers; + } + + private List getShuffledPeers(EnumSet roles) { + + ArrayList peers = new ArrayList<>(getPeers(roles)); + Collections.shuffle(peers); + return peers; + } + private Orderer getRandomOrderer() throws InvalidArgumentException { final ArrayList randPicks = new ArrayList<>(new HashSet<>(getOrderers())); //copy to avoid unlikely changes @@ -1817,7 +1910,40 @@ private void checkPeers(Collection peers) throws InvalidArgumentException } /** - * query a peer in this channel for a Block by the blockNumber + * query this channel for a Block by the blockNumber. + * The request is retried on all peers till successful + *

+ * This method may not be thread safe if client context is changed! + *

. + * + * @param blockNumber index of the Block in the chain + * @return the {@link BlockInfo} with the given blockNumber + * @throws InvalidArgumentException + * @throws ProposalException + */ + public BlockInfo queryBlockByNumber(long blockNumber) throws InvalidArgumentException, ProposalException { + return queryBlockByNumber(getShuffledPeers(EnumSet.of(PeerRole.LEDGER_QUERY)), blockNumber); + } + + /** + * query this channel for a Block by the blockNumber. + * The request is sent to a random peer in the channel. + * + * @param blockNumber index of the Block in the chain + * @param userContext the user context to be used. + * @return the {@link BlockInfo} with the given blockNumber + * @throws InvalidArgumentException + * @throws ProposalException + */ + public BlockInfo queryBlockByNumber(long blockNumber, User userContext) throws InvalidArgumentException, ProposalException { + return queryBlockByNumber(getShuffledPeers(EnumSet.of(PeerRole.LEDGER_QUERY)), blockNumber, userContext); + } + + /** + * Query a peer in this channel for a Block by the blockNumber + *

+ * This method may not be thread safe if client context is changed! + *

* * @param peer the peer to send the request to * @param blockNumber index of the Block in the chain @@ -1827,44 +1953,80 @@ private void checkPeers(Collection peers) throws InvalidArgumentException */ public BlockInfo queryBlockByNumber(Peer peer, long blockNumber) throws InvalidArgumentException, ProposalException { + return queryBlockByNumber(Collections.singleton(peer), blockNumber); + + } + + /** + * query a peer in this channel for a Block by the blockNumber + * + * @param peer the peer to send the request to + * @param blockNumber index of the Block in the chain + * @param userContext the user context. + * @return the {@link BlockInfo} with the given blockNumber + * @throws InvalidArgumentException + * @throws ProposalException + */ + public BlockInfo queryBlockByNumber(Peer peer, long blockNumber, User userContext) throws InvalidArgumentException, ProposalException { + + return queryBlockByNumber(Collections.singleton(peer), blockNumber, userContext); + + } + + /** + * query a peer in this channel for a Block by the blockNumber + *

+ * This method may not be thread safe if client context is changed! + *

+ * + * @param peers the peers to try and send the request to + * @param blockNumber index of the Block in the chain + * @return the {@link BlockInfo} with the given blockNumber + * @throws InvalidArgumentException + * @throws ProposalException + */ + public BlockInfo queryBlockByNumber(Collection peers, long blockNumber) throws InvalidArgumentException, ProposalException { + return queryBlockByNumber(peers, blockNumber, client.getUserContext()); + + } + + /** + * query a peer in this channel for a Block by the blockNumber + * + * @param peers the peers to try and send the request to + * @param blockNumber index of the Block in the chain + * @param userContext the user context to use. + * @return the {@link BlockInfo} with the given blockNumber + * @throws InvalidArgumentException + * @throws ProposalException + */ + public BlockInfo queryBlockByNumber(Collection peers, long blockNumber, User userContext) throws InvalidArgumentException, ProposalException { + checkChannelState(); - checkPeer(peer); + checkPeers(peers); + userContextCheck(userContext); - ProposalResponse proposalResponse; - BlockInfo responseBlock; try { - logger.debug("queryBlockByNumber with blockNumber " + blockNumber + " to peer " + peer.getName() + " on channel " + name); - QuerySCCRequest querySCCRequest = new QuerySCCRequest(client.getUserContext()); + logger.debug("queryBlockByNumber with blockNumber " + blockNumber + " on channel " + name); + QuerySCCRequest querySCCRequest = new QuerySCCRequest(userContext); querySCCRequest.setFcn(QuerySCCRequest.GETBLOCKBYNUMBER); querySCCRequest.setArgs(name, Long.toUnsignedString(blockNumber)); - Collection proposalResponses = sendProposal(querySCCRequest, Collections.singletonList(peer)); - proposalResponse = proposalResponses.iterator().next(); + ProposalResponse proposalResponse = sendProposalSerially(querySCCRequest, peers); - if (proposalResponse.getStatus().getStatus() != 200) { - throw new PeerException(format("Unable to query block by number %d for channel %s from peer %s with message %s", - blockNumber, - name, - peer.getName(), - proposalResponse.getMessage())); - } - responseBlock = new BlockInfo(Block.parseFrom(proposalResponse.getProposalResponse().getResponse().getPayload())); - } catch (Exception e) { - String emsg = format("queryBlockByNumber blockNumber %d peer %s channel %s error %s", - blockNumber, - peer.getName(), - name, - e.getMessage()); - logger.error(emsg, e); - throw new ProposalException(emsg, e); + return new BlockInfo(Block.parseFrom(proposalResponse.getProposalResponse().getResponse().getPayload())); + } catch (InvalidProtocolBufferException e) { + logger.error(e); + throw new ProposalException(e); } - - return responseBlock; } /** * query this channel for a Block by a TransactionID contained in the block - * The request is sent to a random peer in the channel + * The request is tried on on each peer till successful. + *

+ * This method may not be thread safe if client context is changed! + *

* * @param txID the transactionID to query on * @return the {@link BlockInfo} for the Block containing the transaction @@ -1873,11 +2035,29 @@ public BlockInfo queryBlockByNumber(Peer peer, long blockNumber) throws InvalidA */ public BlockInfo queryBlockByTransactionID(String txID) throws InvalidArgumentException, ProposalException { - return queryBlockByTransactionID(getRandomLedgerQueryPeer(), txID); + return queryBlockByTransactionID(getShuffledPeers(EnumSet.of(PeerRole.LEDGER_QUERY)), txID); + } + + /** + * query this channel for a Block by a TransactionID contained in the block + * The request is sent to a random peer in the channel + * + * @param txID the transactionID to query on + * @param userContext the user context. + * @return the {@link BlockInfo} for the Block containing the transaction + * @throws InvalidArgumentException + * @throws ProposalException + */ + public BlockInfo queryBlockByTransactionID(String txID, User userContext) throws InvalidArgumentException, ProposalException { + + return queryBlockByTransactionID(getShuffledPeers(EnumSet.of(PeerRole.LEDGER_QUERY)), txID, userContext); } /** * query a peer in this channel for a Block by a TransactionID contained in the block + *

+ * This method may not be thread safe if client context is changed! + *

* * @param peer the peer to send the request to * @param txID the transactionID to query on @@ -1886,49 +2066,82 @@ public BlockInfo queryBlockByTransactionID(String txID) throws InvalidArgumentEx * @throws ProposalException */ public BlockInfo queryBlockByTransactionID(Peer peer, String txID) throws InvalidArgumentException, ProposalException { + return queryBlockByTransactionID(Collections.singleton(peer), txID); + } + + /** + * query a peer in this channel for a Block by a TransactionID contained in the block + * + * @param peer the peer to send the request to + * @param txID the transactionID to query on + * @param userContext the user context. + * @return the {@link BlockInfo} for the Block containing the transaction + * @throws InvalidArgumentException + * @throws ProposalException + */ + public BlockInfo queryBlockByTransactionID(Peer peer, String txID, User userContext) throws InvalidArgumentException, ProposalException { + return queryBlockByTransactionID(Collections.singleton(peer), txID, userContext); + } + + /** + * query a peer in this channel for a Block by a TransactionID contained in the block + *

+ * This method may not be thread safe if client context is changed! + *

+ * + * @param peers the peers to try to send the request to. + * @param txID the transactionID to query on + * @return the {@link BlockInfo} for the Block containing the transaction + * @throws InvalidArgumentException + * @throws ProposalException + */ + public BlockInfo queryBlockByTransactionID(Collection peers, String txID) throws InvalidArgumentException, ProposalException { + return queryBlockByTransactionID(peers, txID, client.getUserContext()); + } + + /** + * query a peer in this channel for a Block by a TransactionID contained in the block + * + * @param peers the peer to try to send the request to + * @param txID the transactionID to query on + * @param userContext the user context. + * @return the {@link BlockInfo} for the Block containing the transaction + * @throws InvalidArgumentException + * @throws ProposalException + */ + public BlockInfo queryBlockByTransactionID(Collection peers, String txID, User userContext) throws InvalidArgumentException, ProposalException { checkChannelState(); - checkPeer(peer); + checkPeers(peers); + User.userContextCheck(userContext); if (txID == null) { throw new InvalidArgumentException("TxID parameter is null."); } - ProposalResponse proposalResponse; - BlockInfo responseBlock; try { - logger.debug("queryBlockByTransactionID with txID " + txID + " \n to peer" + peer.getName() + " on channel " + name); - QuerySCCRequest querySCCRequest = new QuerySCCRequest(client.getUserContext()); + logger.debug("queryBlockByTransactionID with txID " + txID + " \n " + " on channel " + name); + QuerySCCRequest querySCCRequest = new QuerySCCRequest(userContext); querySCCRequest.setFcn(QuerySCCRequest.GETBLOCKBYTXID); querySCCRequest.setArgs(name, txID); - Collection proposalResponses = sendProposal(querySCCRequest, Collections.singletonList(peer)); - proposalResponse = proposalResponses.iterator().next(); + ProposalResponse proposalResponse = sendProposalSerially(querySCCRequest, peers); - if (proposalResponse.getStatus().getStatus() != 200) { - throw new PeerException(format("Unable to query block by TxID %s%n for channel %s from peer %s with message %s", - txID, - name, - peer.getName(), - proposalResponse.getMessage())); - } - responseBlock = new BlockInfo(Block.parseFrom(proposalResponse.getProposalResponse().getResponse().getPayload())); - } catch (Exception e) { - String emsg = format("QueryBlockByTransactionID TxID %s%n peer %s channel %s error %s", - txID, - peer.getName(), - name, - e.getMessage()); - logger.error(emsg, e); - throw new ProposalException(emsg, e); + return new BlockInfo(Block.parseFrom(proposalResponse.getProposalResponse().getResponse().getPayload())); + } catch (InvalidProtocolBufferException e) { + + throw new ProposalException(e); } - return responseBlock; } /** * query this channel for chain information. * The request is sent to a random peer in the channel + *

+ *

+ * This method may not be thread safe if client context is changed! + *

* * @return a {@link BlockchainInfo} object containing the chain info requested * @throws InvalidArgumentException @@ -1936,11 +2149,29 @@ public BlockInfo queryBlockByTransactionID(Peer peer, String txID) throws Invali */ public BlockchainInfo queryBlockchainInfo() throws ProposalException, InvalidArgumentException { - return queryBlockchainInfo(getRandomLedgerQueryPeer()); + return queryBlockchainInfo(getShuffledPeers(EnumSet.of(PeerRole.LEDGER_QUERY)), client.getUserContext()); + } + + /** + * query this channel for chain information. + * The request is sent to a random peer in the channel + * + * @param userContext the user context to use. + * @return a {@link BlockchainInfo} object containing the chain info requested + * @throws InvalidArgumentException + * @throws ProposalException + */ + public BlockchainInfo queryBlockchainInfo(User userContext) throws ProposalException, InvalidArgumentException { + + return queryBlockchainInfo(getShuffledPeers(EnumSet.of(PeerRole.LEDGER_QUERY)), userContext); } /** * query for chain information + *

+ *

+ * This method may not be thread safe if client context is changed! + *

* * @param peer The peer to send the request to * @return a {@link BlockchainInfo} object containing the chain info requested @@ -1949,41 +2180,62 @@ public BlockchainInfo queryBlockchainInfo() throws ProposalException, InvalidArg */ public BlockchainInfo queryBlockchainInfo(Peer peer) throws ProposalException, InvalidArgumentException { + return queryBlockchainInfo(Collections.singleton(peer), client.getUserContext()); + + } + + /** + * query for chain information + * + * @param peer The peer to send the request to + * @param userContext the user context to use. + * @return a {@link BlockchainInfo} object containing the chain info requested + * @throws InvalidArgumentException + * @throws ProposalException + */ + public BlockchainInfo queryBlockchainInfo(Peer peer, User userContext) throws ProposalException, InvalidArgumentException { + + return queryBlockchainInfo(Collections.singleton(peer), userContext); + + } + + /** + * query for chain information + * + * @param peers The peers to try send the request. + * @param userContext the user context. + * @return a {@link BlockchainInfo} object containing the chain info requested + * @throws InvalidArgumentException + * @throws ProposalException + */ + public BlockchainInfo queryBlockchainInfo(Collection peers, User userContext) throws ProposalException, InvalidArgumentException { + checkChannelState(); - checkPeer(peer); + checkPeers(peers); + User.userContextCheck(userContext); - BlockchainInfo response; try { - logger.debug("queryBlockchainInfo to peer " + peer.getName() + " on channel " + name); - QuerySCCRequest querySCCRequest = new QuerySCCRequest(client.getUserContext()); + logger.debug("queryBlockchainInfo to peer " + " on channel " + name); + QuerySCCRequest querySCCRequest = new QuerySCCRequest(userContext); querySCCRequest.setFcn(QuerySCCRequest.GETCHAININFO); querySCCRequest.setArgs(name); - Collection proposalResponses = sendProposal(querySCCRequest, Collections.singletonList(peer)); - ProposalResponse proposalResponse = proposalResponses.iterator().next(); + ProposalResponse proposalResponse = sendProposalSerially(querySCCRequest, peers); - if (proposalResponse.getStatus().getStatus() != 200) { - throw new PeerException(format("Unable to query block channel info for channel %s from peer %s with message %s", - name, - peer.getName(), - proposalResponse.getMessage())); - } - response = new BlockchainInfo(Ledger.BlockchainInfo.parseFrom(proposalResponse.getProposalResponse().getResponse().getPayload())); + return new BlockchainInfo(Ledger.BlockchainInfo.parseFrom(proposalResponse.getProposalResponse().getResponse().getPayload())); } catch (Exception e) { - String emsg = format("queryBlockchainInfo peer %s channel %s error %s", - peer.getName(), - name, - e.getMessage()); - logger.error(emsg, e); - throw new ProposalException(emsg, e); + logger.error(e); + throw new ProposalException(e); } - - return response; } /** * Query this channel for a Fabric Transaction given its transactionID. * The request is sent to a random peer in the channel. + *

+ *

+ * This method may not be thread safe if client context is changed! + *

* * @param txID the ID of the transaction * @return a {@link TransactionInfo} @@ -1991,12 +2243,33 @@ public BlockchainInfo queryBlockchainInfo(Peer peer) throws ProposalException, I * @throws InvalidArgumentException */ public TransactionInfo queryTransactionByID(String txID) throws ProposalException, InvalidArgumentException { + return queryTransactionByID(getShuffledPeers(EnumSet.of(PeerRole.LEDGER_QUERY)), txID, client.getUserContext()); + } - return queryTransactionByID(getRandomLedgerQueryPeer(), txID); + /** + * Query this channel for a Fabric Transaction given its transactionID. + * The request is sent to a random peer in the channel. + *

+ *

+ * This method may not be thread safe if client context is changed! + *

+ * + * @param txID the ID of the transaction + * @param userContext the user context used. + * @return a {@link TransactionInfo} + * @throws ProposalException + * @throws InvalidArgumentException + */ + public TransactionInfo queryTransactionByID(String txID, User userContext) throws ProposalException, InvalidArgumentException { + return queryTransactionByID(getShuffledPeers(EnumSet.of(PeerRole.LEDGER_QUERY)), txID, userContext); } /** * Query for a Fabric Transaction given its transactionID + *

+ *

+ * This method may not be thread safe if client context is changed! + *

* * @param txID the ID of the transaction * @param peer the peer to send the request to @@ -2005,9 +2278,38 @@ public TransactionInfo queryTransactionByID(String txID) throws ProposalExceptio * @throws InvalidArgumentException */ public TransactionInfo queryTransactionByID(Peer peer, String txID) throws ProposalException, InvalidArgumentException { + return queryTransactionByID(Collections.singleton(peer), txID, client.getUserContext()); + } + + /** + * Query for a Fabric Transaction given its transactionID + * + * @param peer the peer to send the request to + * @param txID the ID of the transaction + * @param userContext the user context + * @return a {@link TransactionInfo} + * @throws ProposalException + * @throws InvalidArgumentException + */ + public TransactionInfo queryTransactionByID(Peer peer, String txID, User userContext) throws ProposalException, InvalidArgumentException { + return queryTransactionByID(Collections.singleton(peer), txID, userContext); + } + + /** + * Query for a Fabric Transaction given its transactionID + * + * @param txID the ID of the transaction + * @param peers the peers to try to send the request. + * @param userContext the user context + * @return a {@link TransactionInfo} + * @throws ProposalException + * @throws InvalidArgumentException + */ + public TransactionInfo queryTransactionByID(Collection peers, String txID, User userContext) throws ProposalException, InvalidArgumentException { checkChannelState(); - checkPeer(peer); + checkPeers(peers); + User.userContextCheck(userContext); if (txID == null) { throw new InvalidArgumentException("TxID parameter is null."); @@ -2015,33 +2317,20 @@ public TransactionInfo queryTransactionByID(Peer peer, String txID) throws Propo TransactionInfo transactionInfo; try { - logger.debug("queryTransactionByID with txID " + txID + "\n from peer " + peer.getName() + " on channel " + name); - QuerySCCRequest querySCCRequest = new QuerySCCRequest(client.getUserContext()); + logger.debug("queryTransactionByID with txID " + txID + "\n from peer " + " on channel " + name); + QuerySCCRequest querySCCRequest = new QuerySCCRequest(userContext); querySCCRequest.setFcn(QuerySCCRequest.GETTRANSACTIONBYID); querySCCRequest.setArgs(name, txID); - Collection proposalResponses = sendProposal(querySCCRequest, Collections.singletonList(peer)); - ProposalResponse proposalResponse = proposalResponses.iterator().next(); + ProposalResponse proposalResponse = sendProposalSerially(querySCCRequest, peers); - if (proposalResponse.getStatus().getStatus() != 200) { - throw new PeerException(format("Unable to query transaction info for ID %s%n for channel %s from peer %s with message %s", - txID, - name, - peer.getName(), - proposalResponse.getMessage())); - } - transactionInfo = new TransactionInfo(txID, ProcessedTransaction.parseFrom(proposalResponse.getProposalResponse().getResponse().getPayload())); + return new TransactionInfo(txID, ProcessedTransaction.parseFrom(proposalResponse.getProposalResponse().getResponse().getPayload())); } catch (Exception e) { - String emsg = format("queryTransactionByID TxID %s%n peer %s channel %s error %s", - txID, - peer.getName(), - name, - e.getMessage()); - logger.error(emsg, e); - throw new ProposalException(emsg, e); - } - return transactionInfo; + logger.error(e); + + throw new ProposalException(e); + } } ///////////////////////////////////////////////////////// @@ -2177,6 +2466,9 @@ List queryInstalledChaincodes(Peer peer) throws InvalidArgumentEx /** * Query peer for chaincode that has been instantiated + *

+ * This method may not be thread safe if client context is changed! + *

* * @param peer The peer to query. * @return A list of ChaincodeInfo @see {@link ChaincodeInfo} @@ -2185,13 +2477,29 @@ List queryInstalledChaincodes(Peer peer) throws InvalidArgumentEx */ public List queryInstantiatedChaincodes(Peer peer) throws InvalidArgumentException, ProposalException { + return queryInstantiatedChaincodes(peer, client.getUserContext()); + + } + + /** + * Query peer for chaincode that has been instantiated + * + * @param peer The peer to query. + * @param userContext the user context. + * @return A list of ChaincodeInfo @see {@link ChaincodeInfo} + * @throws InvalidArgumentException + * @throws ProposalException + */ + + public List queryInstantiatedChaincodes(Peer peer, User userContext) throws InvalidArgumentException, ProposalException { checkChannelState(); checkPeer(peer); + User.userContextCheck(userContext); try { - TransactionContext context = getTransactionContext(); + TransactionContext context = getTransactionContext(userContext); FabricProposal.Proposal q = QueryInstantiatedChaincodesBuilder.newBuilder().context(context).build(); @@ -2295,7 +2603,61 @@ public Collection queryByChaincode(QueryByChaincodeRequest que } //////////////// Channel Block monitoring ////////////////////////////////// - private Collection sendProposal(TransactionRequest proposalRequest, Collection peers) throws InvalidArgumentException, ProposalException { + private ProposalResponse sendProposalSerially(TransactionRequest proposalRequest, Collection peers) throws + ProposalException { + + ProposalException lastException = new ProposalException("ProposalRequest failed."); + + for (Peer peer : peers) { + + try { + + Collection proposalResponses = sendProposal(proposalRequest, Collections.singletonList(peer)); + + if (proposalResponses.isEmpty()) { + logger.warn(format("Proposal request to peer %s failed", peer)); + } + ProposalResponse proposalResponse = proposalResponses.iterator().next(); + ChaincodeResponse.Status status = proposalResponse.getStatus(); + + if (status.getStatus() < 400) { + return proposalResponse; + + } else if (status.getStatus() > 499) { // server error may work on other peer. + + lastException = new ProposalException(format("Channel %s got exception on peer %s %d. %s ", + name, + peer, + status.getStatus(), + proposalResponse.getMessage())); + + } else { // 400 to 499 + + throw new ProposalException(format("Channel %s got exception on peer %s %d. %s ", + name, + peer, + status.getStatus(), + proposalResponse.getMessage())); + } + + } catch (Exception e) { + + lastException = new ProposalException(format("Channel %s failed proposal on peer %s %s", + name, + peer.getName(), + + e.getMessage()), e); + logger.warn(lastException.getMessage()); + } + + } + + throw lastException; + + } + + private Collection sendProposal(TransactionRequest proposalRequest, Collection peers) throws + InvalidArgumentException, ProposalException { checkChannelState(); checkPeers(peers); @@ -2341,6 +2703,14 @@ private Collection sendProposalToPeers(Collection peers, TransactionContext transactionContext) throws InvalidArgumentException, ProposalException { checkPeers(peers); + if (transactionContext.getVerify()) { + try { + loadCACertificates(); + } catch (Exception e) { + throw new ProposalException(e); + } + } + class Pair { private final Peer peer; private final Future future; @@ -2551,7 +2921,12 @@ public CompletableFuture sendTransaction(Collection sendTransaction(Collection sendTransaction(Collection sendProposalAsy return futureStub.processProposal(proposal); } - public FabricProposalResponse.ProposalResponse sendProposal(FabricProposal.SignedProposal proposal) throws PeerException { - - if (shutdown) { - throw new PeerException("Shutdown"); - } - - try { - return blockingStub.processProposal(proposal); - - } catch (StatusRuntimeException e) { - logger.warn(String.format("RPC failed: %s", e.getStatus())); - throw new PeerException("Sending transaction to peer failed", e); - } - } boolean isChannelActive() { ManagedChannel lchannel = managedChannel; - return lchannel != null && !lchannel.isShutdown() && !lchannel.isTerminated(); + return lchannel != null && !lchannel.isShutdown() && !lchannel.isTerminated() && ConnectivityState.READY.equals(lchannel.getState(true)); } @Override diff --git a/src/main/java/org/hyperledger/fabric/sdk/EventHub.java b/src/main/java/org/hyperledger/fabric/sdk/EventHub.java index d97c851a..15435cb9 100644 --- a/src/main/java/org/hyperledger/fabric/sdk/EventHub.java +++ b/src/main/java/org/hyperledger/fabric/sdk/EventHub.java @@ -74,6 +74,9 @@ public class EventHub implements Serializable { private Channel channel; private transient TransactionContext transactionContext; private transient byte[] clientTLSCertificateDigest; + private transient long reconnectCount; + private transient long lastBlockNumber; + private transient BlockEvent lastBlockEvent; /** * Get disconnected time. @@ -167,19 +170,13 @@ public Properties getProperties() { return properties == null ? null : (Properties) properties.clone(); } - boolean connect() throws EventHubException { - - if (transactionContext == null) { - throw new EventHubException("Eventhub reconnect failed with no user context"); - } - - return connect(transactionContext); - - } - private transient StreamObserver eventStream = null; // Saved here to avoid potential garbage collection synchronized boolean connect(final TransactionContext transactionContext) throws EventHubException { + return connect(transactionContext, false); + } + + synchronized boolean connect(final TransactionContext transactionContext, final boolean reconnection) throws EventHubException { if (connected) { logger.warn(format("%s already connected.", toString())); return true; @@ -209,7 +206,11 @@ public void onNext(PeerEvents.Event event) { if (event.getEventCase() == PeerEvents.Event.EventCase.BLOCK) { try { - eventQue.addBEvent(new BlockEvent(EventHub.this, event)); //add to channel queue + + BlockEvent blockEvent = new BlockEvent(EventHub.this, event); + setLastBlockSeen(blockEvent); + + eventQue.addBEvent(blockEvent); //add to channel queue } catch (InvalidProtocolBufferException e) { EventHubException eventHubException = new EventHubException(format("%s onNext error %s", this, e.getMessage()), e); logger.error(eventHubException.getMessage()); @@ -217,56 +218,61 @@ public void onNext(PeerEvents.Event event) { } } else if (event.getEventCase() == PeerEvents.Event.EventCase.REGISTER) { + if (reconnectCount > 1) { + logger.info(format("Eventhub %s has reconnecting after %d attempts", name, reconnectCount)); + } + connected = true; connectedTime = System.currentTimeMillis(); + reconnectCount = 0L; + finishLatch.countDown(); } } @Override public void onError(Throwable t) { + connected = false; + eventStream = null; + disconnectedTime = System.currentTimeMillis(); if (shutdown) { //IF we're shutdown don't try anything more. logger.trace(format("%s was shutdown.", EventHub.this.toString())); - connected = false; - eventStream = null; + finishLatch.countDown(); return; } - final boolean isTerminated = managedChannel.isTerminated(); - final boolean isChannelShutdown = managedChannel.isShutdown(); + final ManagedChannel lmanagedChannel = managedChannel; + + final boolean isTerminated = lmanagedChannel == null ? true : lmanagedChannel.isTerminated(); + final boolean isChannelShutdown = lmanagedChannel == null ? true : lmanagedChannel.isShutdown(); + + if (reconnectCount % 50 == 1) { + logger.warn(format("%s terminated is %b shutdown is %b, retry count %d has error %s.", EventHub.this.toString(), isTerminated, isChannelShutdown, + reconnectCount, t.getMessage())); + } else { + logger.trace(format("%s terminated is %b shutdown is %b, retry count %d has error %s.", EventHub.this.toString(), isTerminated, isChannelShutdown, + reconnectCount, t.getMessage())); + } - logger.error(format("%s terminated is %b shutdown is %b has error %s ", EventHub.this.toString(), isTerminated, isChannelShutdown, - t.getMessage()), new EventHubException(t)); - threw.add(t); finishLatch.countDown(); // logger.error("Error in stream: " + t.getMessage(), new EventHubException(t)); if (t instanceof StatusRuntimeException) { StatusRuntimeException sre = (StatusRuntimeException) t; Status sreStatus = sre.getStatus(); - logger.error(format("%s :StatusRuntimeException Status %s. Description %s ", EventHub.this, sreStatus + "", sreStatus.getDescription())); - if (sre.getStatus().getCode() == Status.Code.INTERNAL || sre.getStatus().getCode() == Status.Code.UNAVAILABLE) { - - connected = false; - eventStream = null; - disconnectedTime = System.currentTimeMillis(); - try { - if (!isChannelShutdown) { - managedChannel.shutdownNow(); - } - if (null != disconnectedHandler) { - try { - disconnectedHandler.disconnected(EventHub.this); - } catch (Exception e) { - logger.warn(format("Eventhub %s %s", EventHub.this.name, e.getMessage()), e); - eventQue.eventError(e); - } - } - } catch (Exception e) { - logger.warn(format("Eventhub %s Failed shutdown msg: %s", EventHub.this.name, e.getMessage()), e); - } + if (reconnectCount % 50 == 1) { + logger.warn(format("%s :StatusRuntimeException Status %s. Description %s ", EventHub.this, sreStatus + "", sreStatus.getDescription())); + } else { + logger.trace(format("%s :StatusRuntimeException Status %s. Description %s ", EventHub.this, sreStatus + "", sreStatus.getDescription())); } + + try { + reconnect(); + } catch (Exception e) { + logger.warn(format("Eventhub %s Failed shutdown msg: %s", EventHub.this.name, e.getMessage())); + } + } } @@ -274,7 +280,7 @@ public void onError(Throwable t) { @Override public void onCompleted() { - logger.warn(format("Stream completed %s", EventHub.this.toString())); + logger.debug(format("Stream completed %s", EventHub.this.toString())); finishLatch.countDown(); } @@ -288,27 +294,19 @@ public void onCompleted() { } try { - if (!finishLatch.await(EVENTHUB_CONNECTION_WAIT_TIME, TimeUnit.MILLISECONDS)) { - EventHubException evh = new EventHubException(format("EventHub %s failed to connect in %s ms.", name, EVENTHUB_CONNECTION_WAIT_TIME)); - logger.debug(evh.getMessage(), evh); - throw evh; - } - logger.trace(format("Eventhub %s Done waiting for reply!", name)); + //On reconnection don't wait here. - } catch (InterruptedException e) { - logger.error(e); - } + if (!reconnection && !finishLatch.await(EVENTHUB_CONNECTION_WAIT_TIME, TimeUnit.MILLISECONDS)) { - if (!threw.isEmpty()) { - eventStream = null; - connected = false; - Throwable t = threw.iterator().next(); + logger.warn(format("EventHub %s failed to connect in %s ms.", name, EVENTHUB_CONNECTION_WAIT_TIME)); - EventHubException evh = new EventHubException(t.getMessage(), t); - logger.error(format("EventHub %s Error in stream. error: " + t.getMessage(), toString()), evh); - throw evh; + } else { + logger.trace(format("Eventhub %s Done waiting for reply!", name)); + } + } catch (InterruptedException e) { + logger.error(e); } logger.debug(format("Eventhub %s connect is done with connect status: %b ", name, connected)); @@ -321,6 +319,24 @@ public void onCompleted() { } + private void reconnect() throws EventHubException { + + final ManagedChannel lmanagedChannel = managedChannel; + + if (lmanagedChannel != null) { + managedChannel = null; + lmanagedChannel.shutdownNow(); + } + + EventHubDisconnected ldisconnectedHandler = disconnectedHandler; + if (!shutdown && null != ldisconnectedHandler) { + ++reconnectCount; + ldisconnectedHandler.disconnected(this); + + } + + } + private void blockListen(TransactionContext transactionContext) throws CryptoException { this.transactionContext = transactionContext; @@ -371,11 +387,17 @@ public String toString() { public void shutdown() { shutdown = true; + lastBlockEvent = null; + lastBlockNumber = 0; connected = false; disconnectedHandler = null; channel = null; eventStream = null; - managedChannel.shutdownNow(); + final ManagedChannel lmanagedChannel = managedChannel; + managedChannel = null; + if (lmanagedChannel != null) { + lmanagedChannel.shutdownNow(); + } } void setChannel(Channel channel) throws InvalidArgumentException { @@ -391,6 +413,15 @@ void setChannel(Channel channel) throws InvalidArgumentException { this.channel = channel; } + synchronized void setLastBlockSeen(BlockEvent lastBlockSeen) { + long newLastBlockNumber = lastBlockSeen.getBlockNumber(); + // overkill but make sure. + if (lastBlockNumber < newLastBlockNumber) { + lastBlockNumber = newLastBlockNumber; + this.lastBlockEvent = lastBlockSeen; + } + } + /** * Eventhub disconnection notification interface */ @@ -412,16 +443,9 @@ public interface EventHubDisconnected { protected transient EventHubDisconnected disconnectedHandler = new EventHub.EventHubDisconnected() { @Override - public synchronized void disconnected(final EventHub eventHub) throws EventHubException { - logger.info(format("Detected disconnect %s", eventHub.toString())); - - if (eventHub.connectedTime == 0) { //means event hub never connected - logger.error(format("%s failed on first connect no retries", eventHub.toString())); - - eventHub.setEventHubDisconnectedHandler(null); //don't try again - - //event hub never connected. - throw new EventHubException(format("%s never connected.", eventHub.toString())); + public synchronized void disconnected(final EventHub eventHub) { + if (reconnectCount == 1) { + logger.warn(format("Channel %s detected disconnect on event hub %s (%s)", channel.getName(), eventHub.toString(), url)); } executorService.execute(() -> { @@ -429,15 +453,16 @@ public synchronized void disconnected(final EventHub eventHub) throws EventHubEx try { Thread.sleep(500); - if (eventHub.connect()) { - logger.info(format("Successful reconnect %s", eventHub.toString())); - } else { - logger.info(format("Failed reconnect %s", eventHub.toString())); + if (transactionContext == null) { + logger.warn("Eventhub reconnect failed with no user context"); + return; } + eventHub.connect(transactionContext, true); + } catch (Exception e) { - logger.debug(format("Failed %s to reconnect. %s", toString(), e.getMessage())); + logger.warn(format("Failed %s to reconnect. %s", toString(), e.getMessage())); } diff --git a/src/main/java/org/hyperledger/fabric/sdk/Orderer.java b/src/main/java/org/hyperledger/fabric/sdk/Orderer.java index 396058cc..2fa1c92f 100644 --- a/src/main/java/org/hyperledger/fabric/sdk/Orderer.java +++ b/src/main/java/org/hyperledger/fabric/sdk/Orderer.java @@ -205,4 +205,9 @@ protected void finalize() throws Throwable { shutdown(true); super.finalize(); } + + @Override + public String toString() { + return "Orderer: " + name + "(" + url + ")"; + } } // end Orderer diff --git a/src/main/java/org/hyperledger/fabric/sdk/OrdererClient.java b/src/main/java/org/hyperledger/fabric/sdk/OrdererClient.java index 4848711e..84c3f9ed 100644 --- a/src/main/java/org/hyperledger/fabric/sdk/OrdererClient.java +++ b/src/main/java/org/hyperledger/fabric/sdk/OrdererClient.java @@ -20,6 +20,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import io.grpc.ConnectivityState; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.grpc.stub.StreamObserver; @@ -326,6 +327,6 @@ public void onCompleted() { boolean isChannelActive() { ManagedChannel lchannel = managedChannel; - return lchannel != null && !lchannel.isShutdown() && !lchannel.isTerminated(); + return lchannel != null && !lchannel.isShutdown() && !lchannel.isTerminated() && ConnectivityState.READY.equals(lchannel.getState(true)); } } diff --git a/src/main/java/org/hyperledger/fabric/sdk/Peer.java b/src/main/java/org/hyperledger/fabric/sdk/Peer.java index 43624ddb..b82a6e64 100644 --- a/src/main/java/org/hyperledger/fabric/sdk/Peer.java +++ b/src/main/java/org/hyperledger/fabric/sdk/Peer.java @@ -3,7 +3,7 @@ * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * You may obtain a copy of the License at` * http://www.apache.org/licenses/LICENSE-2.0 * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -14,6 +14,8 @@ package org.hyperledger.fabric.sdk; +import java.io.IOException; +import java.io.ObjectInputStream; import java.io.Serializable; import java.util.EnumSet; import java.util.Objects; @@ -30,6 +32,7 @@ import org.hyperledger.fabric.sdk.exception.InvalidArgumentException; import org.hyperledger.fabric.sdk.exception.PeerException; import org.hyperledger.fabric.sdk.exception.TransactionException; +import org.hyperledger.fabric.sdk.helper.Config; import org.hyperledger.fabric.sdk.transaction.TransactionContext; import static java.lang.String.format; @@ -42,6 +45,8 @@ public class Peer implements Serializable { private static final Log logger = LogFactory.getLog(Peer.class); private static final long serialVersionUID = -5273194649991828876L; + private static final Config config = Config.getConfig(); + private static final long PEER_EVENT_RETRY_WAIT_TIME = config.getPeerRetryWaitTime(); private final Properties properties; private final String name; private final String url; @@ -49,8 +54,11 @@ public class Peer implements Serializable { private transient PeerEventServiceClient peerEventingClient; private transient boolean shutdown = false; private Channel channel; - private String channelName; private transient TransactionContext transactionContext; + private transient long lastConnectTime; + private transient long reconnectCount; + private transient BlockEvent lastBlockEvent; + private transient long lastBlockNumber; Peer(String name, String grpcURL, Properties properties) throws InvalidArgumentException { @@ -67,6 +75,7 @@ public class Peer implements Serializable { this.url = grpcURL; this.name = name; this.properties = properties == null ? null : (Properties) properties.clone(); //keep our own copy. + reconnectCount = 0L; } @@ -93,9 +102,14 @@ public Properties getProperties() { void unsetChannel() { channel = null; - channelName = null; } + BlockEvent getLastBlockEvent() { + return lastBlockEvent; + } + + + ExecutorService getExecutorService() { return channel.getExecutorService(); } @@ -143,7 +157,6 @@ void setChannel(Channel channel) throws InvalidArgumentException { } this.channel = channel; - channelName = channel.getName(); } @@ -204,28 +217,8 @@ ListenableFuture sendProposalAsync(Fabr } } - FabricProposalResponse.ProposalResponse sendProposal(FabricProposal.SignedProposal proposal) - throws PeerException, InvalidArgumentException { - checkSendProposal(proposal); - - logger.debug(format("peer.sendProposalAsync name: %s, url: %s", name, url)); - - EndorserClient localEndorserClient = endorserClent; //work off thread local copy. - - if (null == localEndorserClient || !localEndorserClient.isChannelActive()) { - endorserClent = new EndorserClient(new Endpoint(url, properties).getChannelBuilder()); - localEndorserClient = endorserClent; - } - - try { - return localEndorserClient.sendProposal(proposal); - } catch (Throwable t) { - endorserClent = null; - throw t; - } - } - - private void checkSendProposal(FabricProposal.SignedProposal proposal) throws PeerException, InvalidArgumentException { + private void checkSendProposal(FabricProposal.SignedProposal proposal) throws + PeerException, InvalidArgumentException { if (shutdown) { throw new PeerException(format("Peer %s was shutdown.", name)); @@ -246,6 +239,8 @@ synchronized void shutdown(boolean force) { } shutdown = true; channel = null; + lastBlockEvent = null; + lastBlockNumber = 0; EndorserClient lendorserClent = endorserClent; @@ -274,54 +269,194 @@ protected void finalize() throws Throwable { super.finalize(); } - void reconnectPeerEventServiceClient(final PeerEventServiceClient failedPeerEventServiceClient, final Throwable t) { + void reconnectPeerEventServiceClient(final PeerEventServiceClient failedPeerEventServiceClient, + final Throwable throwable) { if (shutdown) { logger.debug("Not reconnecting PeerEventServiceClient shutdown "); return; + } + PeerEventingServiceDisconnected ldisconnectedHandler = disconnectedHandler; + if (null == ldisconnectedHandler) { + + return; // just wont reconnect. + } TransactionContext ltransactionContext = transactionContext; if (ltransactionContext == null) { - logger.debug("Not reconnecting PeerEventServiceClient no transaction available "); + logger.warn("Not reconnecting PeerEventServiceClient no transaction available "); + return; } + final TransactionContext fltransactionContext = ltransactionContext.retryTransactionSameContext(); final ExecutorService executorService = getExecutorService(); + final PeerOptions peerOptions = null != failedPeerEventServiceClient.getPeerOptions() ? failedPeerEventServiceClient.getPeerOptions() : + PeerOptions.createPeerOptions(); if (executorService != null && !executorService.isShutdown() && !executorService.isTerminated()) { - executorService.execute(() -> { - try { - Thread.sleep(6000); //wait for retry. - } catch (InterruptedException e) { - e.printStackTrace(); + executorService.execute(() -> ldisconnectedHandler.disconnected(new PeerEventingServiceDisconnectEvent() { + @Override + public BlockEvent getLatestBLockReceived() { + return lastBlockEvent; } - if (shutdown) { - logger.debug("Not reconnecting PeerEventServiceClient shutdown "); - return; + @Override + public long getLastConnectTime() { + return lastConnectTime; } - logger.debug(t); + @Override + public long getReconnectCount() { + return reconnectCount; + } - PeerEventServiceClient lpeerEventingClient = new PeerEventServiceClient(this, - new Endpoint(url, properties), properties, failedPeerEventServiceClient.getPeerOptions()); + @Override + public Throwable getExceptionThrown() { + return throwable; + } - try { + @Override + public void reconnect(Long startBLockNumber) throws TransactionException { + logger.trace("reconnecting startBLockNumber" + startBLockNumber); + ++reconnectCount; + + if (startBLockNumber == null) { + peerOptions.startEventsNewest(); + } else { + peerOptions.startEvents(startBLockNumber); + } + + + + PeerEventServiceClient lpeerEventingClient = new PeerEventServiceClient(Peer.this, + new Endpoint(url, properties), properties, peerOptions); lpeerEventingClient.connect(fltransactionContext); - if (lpeerEventingClient.isChannelActive()) { - logger.info(format("Channel %s PeerEventing Service %s reconnected to url %s ", channelName, name, url)); - peerEventingClient = lpeerEventingClient; + peerEventingClient = lpeerEventingClient; + + } + })); + + } + + } + + void setLastConnectTime(long lastConnectTime) { + this.lastConnectTime = lastConnectTime; + } + + void resetReconnectCount() { + reconnectCount = 0L; + } + + long getReconnectCount() { + return reconnectCount; + } + + public interface PeerEventingServiceDisconnected { + + /** + * Called when a disconnect is detected in peer eventing service. + * + * @param event + */ + void disconnected(PeerEventingServiceDisconnectEvent event); + + } + + public interface PeerEventingServiceDisconnectEvent { + + /** + * The latest BlockEvent received by peer eventing service. + * + * @return The latest BlockEvent. + */ + + BlockEvent getLatestBLockReceived(); + + /** + * Last connect time + * + * @return Last connect time as reported by System.currentTimeMillis() + */ + long getLastConnectTime(); + + /** + * Number reconnection attempts since last disconnection. + * + * @return reconnect attempts. + */ + + long getReconnectCount(); + + /** + * Last exception throw for failing connection + * + * @return + */ + + Throwable getExceptionThrown(); + + void reconnect(Long startEvent) throws TransactionException; + + } + + private transient PeerEventingServiceDisconnected disconnectedHandler = getDefaultDisconnectHandler(); + + private static PeerEventingServiceDisconnected getDefaultDisconnectHandler() { + return new PeerEventingServiceDisconnected() { //default. + @Override + public synchronized void disconnected(final PeerEventingServiceDisconnectEvent event) { + + BlockEvent lastBlockEvent = event.getLatestBLockReceived(); + + Long startBlockNumber = null; + + if (null != lastBlockEvent) { + + startBlockNumber = lastBlockEvent.getBlockNumber(); + } + + if (0 != event.getReconnectCount()) { + try { + Thread.sleep(PEER_EVENT_RETRY_WAIT_TIME); + } catch (InterruptedException e) { } + } + try { + event.reconnect(startBlockNumber); } catch (TransactionException e) { - logger.debug(e); + e.printStackTrace(); } - }); - } + } + }; + } + + /** + * Set class to handle Event hub disconnects + * + * @param newPeerEventingServiceDisconnectedHandler New handler to replace. If set to null no retry will take place. + * @return the old handler. + */ + + public PeerEventingServiceDisconnected setPeerEventingServiceDisconnected(PeerEventingServiceDisconnected newPeerEventingServiceDisconnectedHandler) { + PeerEventingServiceDisconnected ret = disconnectedHandler; + disconnectedHandler = newPeerEventingServiceDisconnectedHandler; + return ret; + } + + synchronized void setLastBlockSeen(BlockEvent lastBlockSeen) { + long newLastBlockNumber = lastBlockSeen.getBlockNumber(); + // overkill but make sure. + if (lastBlockNumber < newLastBlockNumber) { + lastBlockNumber = newLastBlockNumber; + this.lastBlockEvent = lastBlockSeen; + } } /** @@ -369,4 +504,11 @@ public String toString() { return "Peer " + name + " url: " + url; } + + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + + in.defaultReadObject(); + disconnectedHandler = getDefaultDisconnectHandler(); + + } } // end Peer diff --git a/src/main/java/org/hyperledger/fabric/sdk/PeerEventServiceClient.java b/src/main/java/org/hyperledger/fabric/sdk/PeerEventServiceClient.java index 260b20a4..5d4c1c4b 100644 --- a/src/main/java/org/hyperledger/fabric/sdk/PeerEventServiceClient.java +++ b/src/main/java/org/hyperledger/fabric/sdk/PeerEventServiceClient.java @@ -21,7 +21,6 @@ import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; @@ -67,7 +66,7 @@ class PeerEventServiceClient { StreamObserver so = null; private Channel.ChannelEventQue channelEventQue; private boolean shutdown = false; - private ManagedChannel managedChannel = null; + private transient ManagedChannel managedChannel = null; private transient TransactionContext transactionContext; private transient Peer peer; @@ -159,7 +158,13 @@ public void finalize() { shutdown(true); } - DeliverResponse[] connectEnvelope(Envelope envelope) throws TransactionException { + /** + * Get the last block received by this peer. + * + * @return The last block received by this peer. May return null if no block has been received since first reactivated. + */ + + void connectEnvelope(Envelope envelope) throws TransactionException { if (shutdown) { throw new TransactionException("Peer eventing client is shutdown"); @@ -179,13 +184,11 @@ DeliverResponse[] connectEnvelope(Envelope envelope) throws TransactionException DeliverGrpc.DeliverStub broadcast = DeliverGrpc.newStub(lmanagedChannel); // final DeliverResponse[] ret = new DeliverResponse[1]; - final List retList = new ArrayList<>(); + // final List retList = new ArrayList<>(); final List throwableList = new ArrayList<>(); final CountDownLatch finishLatch = new CountDownLatch(1); so = new StreamObserver() { - boolean done = false; - AtomicBoolean inRecovery = new AtomicBoolean(false); @Override public void onNext(DeliverResponse resp) { @@ -194,24 +197,16 @@ public void onNext(DeliverResponse resp) { logger.trace(format("DeliverResponse channel %s peer %s resp status value:%d status %s, typecase %s ", channelName, peer.getName(), resp.getStatusValue(), resp.getStatus(), resp.getTypeCase())); - if (done) { - - // logger.info("Got Broadcast response: " + resp); - logger.trace(format("DeliverResponse channel %s peer %s ignored because done. resp status value:%d status %s, typecase %s ", - channelName, peer.getName(), resp.getStatusValue(), resp.getStatus(), resp.getTypeCase())); - - return; - } - final DeliverResponse.TypeCase typeCase = resp.getTypeCase(); if (typeCase == STATUS) { - done = true; + logger.debug(format("DeliverResponse channel %s peer %s setting done.", channelName, peer.getName())); - if (resp.getStatus() == Common.Status.SUCCESS) { - retList.add(0, resp); + if (resp.getStatus() == Common.Status.SUCCESS) { // unlike you may think this only happens when all blocks are fetched. + peer.setLastConnectTime(System.currentTimeMillis()); + peer.resetReconnectCount(); } else { throwableList.add(new TransactionException(format("Channel %s peer %s Status returned failure code %d (%s) during peer service event registration", @@ -219,11 +214,28 @@ public void onNext(DeliverResponse resp) { } } else if (typeCase == FILTERED_BLOCK || typeCase == BLOCK) { - logger.trace(format("Channel %s peer %s got event block hex hashcode: %016x, block number: %d", - channelName, peer.getName(), resp.getBlock().hashCode(), resp.getBlock().getHeader().getNumber())); - retList.add(resp); + if (typeCase == BLOCK) { + logger.trace(format("Channel %s peer %s got event block hex hashcode: %016x, block number: %d", + channelName, peer.getName(), resp.getBlock().hashCode(), resp.getBlock().getHeader().getNumber())); + } else { + logger.trace(format("Channel %s peer %s got event block hex hashcode: %016x, block number: %d", + channelName, peer.getName(), resp.getFilteredBlock().hashCode(), resp.getFilteredBlock().getNumber())); + } + + peer.setLastConnectTime(System.currentTimeMillis()); + long reconnectCount = peer.getReconnectCount(); + if (reconnectCount > 1) { + + logger.info(format("Peer eventing service reconnected after %d attempts on channel %s, peer %s, url %s", + reconnectCount, channelName, name, url)); + + } + peer.resetReconnectCount(); - channelEventQue.addBEvent(new BlockEvent(peer, resp)); + BlockEvent blockEvent = new BlockEvent(peer, resp); + peer.setLastBlockSeen(blockEvent); + + channelEventQue.addBEvent(blockEvent); } else { logger.error(format("Channel %s peer %s got event block with unknown type: %s, %d", channelName, peer.getName(), typeCase.name(), typeCase.getNumber())); @@ -238,31 +250,34 @@ public void onNext(DeliverResponse resp) { @Override public void onError(Throwable t) { - final boolean recoverymode = inRecovery.getAndSet(true); - if (recoverymode) { - return; // make sure we do this once. + ManagedChannel llmanagedChannel = managedChannel; + if (llmanagedChannel != null) { + llmanagedChannel.shutdownNow(); + managedChannel = null; } if (!shutdown) { - logger.error(format("Received error on channel %s, peer %s, url %s, %s", - channelName, name, url, t.getMessage()), t); - done = true; - throwableList.add(t); - finishLatch.countDown(); - Peer lpeer = peer; - - if (lpeer != null) { + final long reconnectCount = peer.getReconnectCount(); + if (reconnectCount % 50 == 1) { + logger.warn(format("Received error on peer eventing service on channel %s, peer %s, url %s, attempts %d. %s", + channelName, name, url, reconnectCount, t.getMessage())); - lpeer.reconnectPeerEventServiceClient(PeerEventServiceClient.this, t); + } else { + logger.trace(format("Received error on peer eventing service on channel %s, peer %s, url %s, attempts %d. %s", + channelName, name, url, reconnectCount, t.getMessage())); } + + peer.reconnectPeerEventServiceClient(PeerEventServiceClient.this, t); + } + finishLatch.countDown(); } @Override public void onCompleted() { logger.debug(format("DeliverResponse onCompleted channel %s peer %s setting done.", channelName, peer.getName())); - done = true; + // done = true; //There should have been a done before this... finishLatch.countDown(); } @@ -271,33 +286,36 @@ public void onCompleted() { nso = filterBlock ? broadcast.deliverFiltered(so) : broadcast.deliver(so); nso.onNext(envelope); - //nso.onCompleted(); - try { - if (!finishLatch.await(peerEventRegistrationWaitTimeMilliSecs, TimeUnit.MILLISECONDS)) { - TransactionException ex = new TransactionException(format( - "Channel %s connect time exceeded for peer eventing service %s, timed out at %d ms.", channelName, name, peerEventRegistrationWaitTimeMilliSecs)); - logger.error(ex.getMessage(), ex); - throw ex; - } - logger.trace("Done waiting for reply!"); + // try { + if (!finishLatch.await(peerEventRegistrationWaitTimeMilliSecs, TimeUnit.MILLISECONDS)) { + TransactionException ex = new TransactionException(format( + "Channel %s connect time exceeded for peer eventing service %s, timed out at %d ms.", channelName, name, peerEventRegistrationWaitTimeMilliSecs)); + throwableList.add(0, ex); - } catch (InterruptedException e) { - logger.error(e); } + logger.trace("Done waiting for reply!"); if (!throwableList.isEmpty()) { + ManagedChannel llmanagedChannel = managedChannel; + if (llmanagedChannel != null) { + llmanagedChannel.shutdownNow(); + managedChannel = null; + } Throwable throwable = throwableList.get(0); - TransactionException e = new TransactionException(format( - "Channel %s connect failed on peer eventing service %s. Reason: %s", channelName, name, throwable.getMessage()), throwable); - logger.error(e.getMessage(), e); - throw e; + peer.reconnectPeerEventServiceClient(this, throwable); + + } + + } catch (InterruptedException e) { + ManagedChannel llmanagedChannel = managedChannel; + if (llmanagedChannel != null) { + llmanagedChannel.shutdownNow(); + managedChannel = null; } + logger.error(e); // not likely - return retList.toArray(new DeliverResponse[retList.size()]); - } catch (Throwable t) { - managedChannel = null; - throw t; + peer.reconnectPeerEventServiceClient(this, e); } finally { if (null != nso) { diff --git a/src/main/java/org/hyperledger/fabric/sdk/helper/Config.java b/src/main/java/org/hyperledger/fabric/sdk/helper/Config.java index 1fbe7df1..47d83d1e 100644 --- a/src/main/java/org/hyperledger/fabric/sdk/helper/Config.java +++ b/src/main/java/org/hyperledger/fabric/sdk/helper/Config.java @@ -52,6 +52,7 @@ public class Config { public static final String ORDERER_RETRY_WAIT_TIME = "org.hyperledger.fabric.sdk.orderer_retry.wait_time"; public static final String ORDERER_WAIT_TIME = "org.hyperledger.fabric.sdk.orderer.ordererWaitTimeMilliSecs"; public static final String PEER_EVENT_REGISTRATION_WAIT_TIME = "org.hyperledger.fabric.sdk.peer.eventRegistration.wait_time"; + public static final String PEER_EVENT_RETRY_WAIT_TIME = "org.hyperledger.fabric.sdk.peer.retry_wait_time"; public static final String EVENTHUB_CONNECTION_WAIT_TIME = "org.hyperledger.fabric.sdk.eventhub_connection.wait_time"; public static final String GENESISBLOCK_WAIT_TIME = "org.hyperledger.fabric.sdk.channel.genesisblock_wait_time"; /** @@ -107,6 +108,7 @@ private Config() { defaultProperty(ORDERER_RETRY_WAIT_TIME, "200"); defaultProperty(ORDERER_WAIT_TIME, "10000"); defaultProperty(PEER_EVENT_REGISTRATION_WAIT_TIME, "5000"); + defaultProperty(PEER_EVENT_RETRY_WAIT_TIME, "500"); defaultProperty(EVENTHUB_CONNECTION_WAIT_TIME, "1000"); defaultProperty(GENESISBLOCK_WAIT_TIME, "5000"); /** @@ -359,6 +361,15 @@ public long getPeerEventRegistrationWaitTime() { return Long.parseLong(getProperty(PEER_EVENT_REGISTRATION_WAIT_TIME)); } + /** + * getPeerEventRegistrationWaitTime + * + * @return time in milliseconds to wait for peer eventing service to wait for event registration + */ + public long getPeerRetryWaitTime() { + return Long.parseLong(getProperty(PEER_EVENT_RETRY_WAIT_TIME)); + } + public long getEventHubConnectionWaitTime() { return Long.parseLong(getProperty(EVENTHUB_CONNECTION_WAIT_TIME)); } diff --git a/src/test/cirun.sh b/src/test/cirun.sh index df1f12ce..006411f0 100755 --- a/src/test/cirun.sh +++ b/src/test/cirun.sh @@ -25,6 +25,11 @@ export ORG_HYPERLEDGER_FABRIC_SDK_PEER_EVENTREGISTRATION_WAIT_TIME=180000 export ORG_HYPERLEDGER_FABRIC_SDK_EVENTHUB_CONNECTION_WAIT_TIME=180000 export ORG_HYPERLEDGER_FABRIC_SDK_CHANNEL_GENESISBLOCK_WAIT_TIME=180000 +# TEST TIMES +export ORG_HYPERLEDGER_FABRIC_SDKTEST_INVOKEWAITTIME=300000 +export ORG_HYPERLEDGER_FABRIC_SDKTEST_DEPLOYWAITTIME=300000 +export ORG_HYPERLEDGER_FABRIC_SDKTEST_PROPOSALWAITTIME=300000 + ORG_HYPERLEDGER_FABRIC_SDKTEST_VERSION=${ORG_HYPERLEDGER_FABRIC_SDKTEST_VERSION:-} if [ "$ORG_HYPERLEDGER_FABRIC_SDKTEST_VERSION" == "1.0.0" ]; then @@ -39,7 +44,7 @@ export IMAGE_TAG_FABRIC_CA=:x86_64-1.0.0 # set which Fabric generated configuations is used. export FAB_CONFIG_GEN_VERS="v1.0" else -#everythign just defaults for latest (v1.1) +#everything just defaults for latest (v1.1) export ORG_HYPERLEDGER_FABRIC_SDKTEST_ITSUITE="" #unset to use what's in docker's .env file. unset IMAGE_TAG_FABRIC diff --git a/src/test/java/org/hyperledger/fabric/sdk/ChannelTest.java b/src/test/java/org/hyperledger/fabric/sdk/ChannelTest.java index 22f444ee..d77e9719 100644 --- a/src/test/java/org/hyperledger/fabric/sdk/ChannelTest.java +++ b/src/test/java/org/hyperledger/fabric/sdk/ChannelTest.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.LinkedList; import java.util.concurrent.CompletableFuture; import com.google.common.util.concurrent.ListenableFuture; @@ -55,6 +56,7 @@ import static org.hyperledger.fabric.sdk.testutils.TestUtils.matchesRegex; import static org.hyperledger.fabric.sdk.testutils.TestUtils.setField; import static org.hyperledger.fabric.sdk.testutils.TestUtils.tarBytesToEntryArrayList; +import static org.junit.Assert.assertFalse; //CHECKSTYLE.ON: IllegalImport @@ -77,6 +79,7 @@ public static void setupClient() { hfclient = TestHFClient.newInstance(); shutdownChannel = new Channel("shutdown", hfclient); + shutdownChannel.addOrderer(hfclient.newOrderer("shutdow_orderer", "grpc://localhost:99")); setField(shutdownChannel, "shutdown", true); @@ -261,7 +264,7 @@ protected void loadCACertificates() { final Peer peer = hfclient.newPeer("peer_", "grpc://localhost:7051"); testChannel.addPeer(peer, createPeerOptions().setPeerRoles(Peer.PeerRole.NO_EVENT_SOURCE)); - Assert.assertFalse(testChannel.isInitialized()); + assertFalse(testChannel.isInitialized()); testChannel.initialize(); Assert.assertTrue(testChannel.isInitialized()); @@ -398,7 +401,7 @@ public void testChannelBadPeerNull() throws Exception { thrown.expectMessage("Peer value is null."); final Channel channel = createRunningChannel(null); - channel.queryBlockByHash(null, "rick".getBytes()); + channel.queryBlockByHash((Peer) null, "rick".getBytes()); } @Test @@ -501,6 +504,7 @@ public static Channel createRunningChannel(String channelName, Collection if (peers == null) { Peer peer = hfclient.newPeer("peer1", "grpc://localhost:22"); channel.addPeer(peer); + channel.addOrderer(hfclient.newOrderer("order1", "grpc://localhost:22")); } else { for (Peer peer : peers) { channel.addPeer(peer); @@ -540,6 +544,8 @@ public void testChannelPeerJoinNoOrderer() throws Exception { final Channel channel = createRunningChannel(null); + setField(channel, "orderers", new LinkedList<>()); + //Peer joining channel were no orderer is there .. not likely. channel.joinPeer(hfclient.newPeer("peerJoiningNOT", "grpc://localhost:22")); diff --git a/src/test/java/org/hyperledger/fabric/sdk/PeerTest.java b/src/test/java/org/hyperledger/fabric/sdk/PeerTest.java index 02658ca7..3d33613d 100644 --- a/src/test/java/org/hyperledger/fabric/sdk/PeerTest.java +++ b/src/test/java/org/hyperledger/fabric/sdk/PeerTest.java @@ -14,7 +14,6 @@ package org.hyperledger.fabric.sdk; -import org.hyperledger.fabric.protos.peer.FabricProposal; import org.hyperledger.fabric.sdk.exception.InvalidArgumentException; import org.hyperledger.fabric.sdk.exception.PeerException; import org.junit.Assert; @@ -67,19 +66,6 @@ public void testSetEmptyName() throws InvalidArgumentException { Assert.fail("expected set empty name to throw exception."); } - @Test (expected = PeerException.class) - public void testSendNullProposal() throws PeerException, InvalidArgumentException { - peer.sendProposal(null); - Assert.fail("Expected null proposal to throw exception."); - } - - @Test (expected = PeerException.class) - public void testSendNullChannel() throws InvalidArgumentException, PeerException { - Peer badpeer = hfclient.newPeer("badpeer", "grpc://localhost:7051"); - badpeer.sendProposal(FabricProposal.SignedProposal.newBuilder().build()); - Assert.fail("Expected peer with no channel throw exception"); - } - @Test (expected = PeerException.class) public void testSendAsyncNullProposal() throws PeerException, InvalidArgumentException { peer.sendProposalAsync(null);