diff --git a/bddtests/steps/orderer_impl.py b/bddtests/steps/orderer_impl.py index 9630d1d36d2..29242a05a2d 100644 --- a/bddtests/steps/orderer_impl.py +++ b/bddtests/steps/orderer_impl.py @@ -30,46 +30,46 @@ @given(u'user "{enrollId}" is an authorized user of the ordering service') def step_impl(context, enrollId): - secretMsg = { - "enrollId": enrollId, - "enrollSecret" : enrollId - } - orderer_util.registerUser(context, secretMsg, "N/A") + secretMsg = { + "enrollId": enrollId, + "enrollSecret" : enrollId + } + orderer_util.registerUser(context, secretMsg, "N/A") @when(u'user "{enrollId}" broadcasts "{numMsgsToBroadcast}" unique messages on "{composeService}"') def step_impl(context, enrollId, numMsgsToBroadcast, composeService): - userRegistration = orderer_util.getUserRegistration(context, enrollId) - userRegistration.broadcastMessages(context, numMsgsToBroadcast, composeService) + userRegistration = orderer_util.getUserRegistration(context, enrollId) + userRegistration.broadcastMessages(context, numMsgsToBroadcast, composeService) @when(u'user "{enrollId}" connects to deliver function on "{composeService}"') def step_impl(context, enrollId, composeService): - # First get the properties - assert 'table' in context, "table (Start | End) not found in context" - userRegistration = orderer_util.getUserRegistration(context, enrollId) - streamHelper = userRegistration.connectToDeliverFunction(context, composeService) + # First get the properties + assert 'table' in context, "table (Start | End) not found in context" + userRegistration = orderer_util.getUserRegistration(context, enrollId) + streamHelper = userRegistration.connectToDeliverFunction(context, composeService) @when(u'user "{enrollId}" waits "{waitTime}" seconds') def step_impl(context, enrollId, waitTime): - time.sleep(float(waitTime)) + time.sleep(float(waitTime)) @then(u'user "{enrollId}" should get a delivery from "{composeService}" of "{expectedBlocks}" blocks with "{numMsgsToBroadcast}" messages within "{batchTimeout}" seconds') def step_impl(context, enrollId, expectedBlocks, numMsgsToBroadcast, batchTimeout, composeService): - userRegistration = orderer_util.getUserRegistration(context, enrollId) - streamHelper = userRegistration.getDelivererStreamHelper(context, composeService) - blocks = streamHelper.getBlocks() - # Verify block count - assert len(blocks) == int(expectedBlocks), "Expected {0} blocks, received {1}".format(expectedBlocks, len(blocks)) + userRegistration = orderer_util.getUserRegistration(context, enrollId) + streamHelper = userRegistration.getDelivererStreamHelper(context, composeService) + blocks = streamHelper.getBlocks() + # Verify block count + assert len(blocks) == int(expectedBlocks), "Expected {0} blocks, received {1}".format(expectedBlocks, len(blocks)) @when(u'user "{enrollId}" sends deliver a seek request on "{composeService}" with properties') def step_impl(context, enrollId, composeService): - row = context.table.rows[0] - start, end, = orderer_util.convertSeek(row['Start']), orderer_util.convertSeek(row['End']) + row = context.table.rows[0] + start, end, = orderer_util.convertSeek(row['Start']), orderer_util.convertSeek(row['End']) - userRegistration = orderer_util.getUserRegistration(context, enrollId) - streamHelper = userRegistration.getDelivererStreamHelper(context, composeService) + userRegistration = orderer_util.getUserRegistration(context, enrollId) + streamHelper = userRegistration.getDelivererStreamHelper(context, composeService) streamHelper.seekToRange(start = start, end = end) diff --git a/bddtests/steps/orderer_util.py b/bddtests/steps/orderer_util.py index a2b99168ec3..b1e58694650 100644 --- a/bddtests/steps/orderer_util.py +++ b/bddtests/steps/orderer_util.py @@ -149,7 +149,6 @@ def __init__(self, userName): def getUserName(self): return self.userName - def connectToDeliverFunction(self, context, composeService, timeout=1): 'Connect to the deliver function and drain messages to associated orderer queue' assert not composeService in self.abDeliversStreamHelperDict, "Already connected to deliver stream on {0}".format(composeService) @@ -157,38 +156,36 @@ def connectToDeliverFunction(self, context, composeService, timeout=1): self.abDeliversStreamHelperDict[composeService] = streamHelper return streamHelper - def getDelivererStreamHelper(self, context, composeService): assert composeService in self.abDeliversStreamHelperDict, "NOT connected to deliver stream on {0}".format(composeService) return self.abDeliversStreamHelperDict[composeService] - - def broadcastMessages(self, context, numMsgsToBroadcast, composeService, chainID=TEST_CHAIN_ID, dataFunc=_defaultDataFunction, chainHeaderType=common_pb2.ENDORSER_TRANSACTION): - abStub = self.getABStubForComposeService(context, composeService) - replyGenerator = abStub.Broadcast(generateBroadcastMessages(chainID=chainID, numToGenerate = int(numMsgsToBroadcast), dataFunc=dataFunc, chainHeaderType=chainHeaderType), 2) - counter = 0 - try: - for reply in replyGenerator: - counter += 1 - print("{0} received reply: {1}, counter = {2}".format(self.getUserName(), reply, counter)) - if counter == int(numMsgsToBroadcast): - break - except Exception as e: - print("Got error: {0}".format(e) ) - print("Got error") - print("Done") - assert counter == int(numMsgsToBroadcast), "counter = {0}, expected {1}".format(counter, numMsgsToBroadcast) + abStub = self.getABStubForComposeService(context, composeService) + replyGenerator = abStub.Broadcast(generateBroadcastMessages(chainID=chainID, numToGenerate = int(numMsgsToBroadcast), dataFunc=dataFunc, chainHeaderType=chainHeaderType), 2) + counter = 0 + try: + for reply in replyGenerator: + counter += 1 + print("{0} received reply: {1}, counter = {2}".format(self.getUserName(), reply, counter)) + if counter == int(numMsgsToBroadcast): + break + except Exception as e: + print("Got error: {0}".format(e) ) + print("Got error") + print("Done") + assert counter == int(numMsgsToBroadcast), "counter = {0}, expected {1}".format(counter, numMsgsToBroadcast) def getABStubForComposeService(self, context, composeService): - 'Return a Stub for the supplied composeService, will cache' - if composeService in self.atomicBroadcastStubsDict: - return self.atomicBroadcastStubsDict[composeService] - # Get the IP address of the server that the user registered on - channel = getGRPCChannel(*bdd_test_util.getPortHostMapping(context.compose_containers, composeService, 7050)) - newABStub = ab_pb2.beta_create_AtomicBroadcast_stub(channel) - self.atomicBroadcastStubsDict[composeService] = newABStub - return newABStub + 'Return a Stub for the supplied composeService, will cache' + if composeService in self.atomicBroadcastStubsDict: + return self.atomicBroadcastStubsDict[composeService] + # Get the IP address of the server that the user registered on + channel = getGRPCChannel(*bdd_test_util.getPortHostMapping(context.compose_containers, composeService, 7050)) + newABStub = ab_pb2.beta_create_AtomicBroadcast_stub(channel) + self.atomicBroadcastStubsDict[composeService] = newABStub + return newABStub + # Registerses a user on a specific composeService def registerUser(context, secretMsg, composeService): @@ -245,7 +242,6 @@ def createSeekInfo(chainID = TEST_CHAIN_ID, start = 'Oldest', end = 'Newest', b ) - def generateBroadcastMessages(chainID = TEST_CHAIN_ID, numToGenerate = 3, timeToHoldOpen = 1, dataFunc =_defaultDataFunction, chainHeaderType=common_pb2.ENDORSER_TRANSACTION ): messages = [] for i in range(0, numToGenerate):