Skip to content

Commit

Permalink
feat: Long-term scheduled transactions State API (#16144)
Browse files Browse the repository at this point in the history
Signed-off-by: Iris Simon <iris.simon@swirldslabs.com>
  • Loading branch information
iwsimon authored Oct 30, 2024
1 parent 593a623 commit 3daf1bf
Show file tree
Hide file tree
Showing 20 changed files with 505 additions and 76 deletions.
20 changes: 20 additions & 0 deletions hapi/hedera-protobufs/block/stream/output/state_changes.proto
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,16 @@ enum StateIdentifier {
*/
STATE_ID_ROSTERS = 28;

/**
* A state identifier for scheduled transaction expiration.
*/
STATE_ID_SCHEDULE_IDS_BY_EXPIRY = 29;

/**
* A state identifier for scheduled transaction deduplication.
*/
STATE_ID_SCHEDULE_ID_BY_EQUALITY = 30;

/**
* A state identifier for the round receipts queue.
*/
Expand Down Expand Up @@ -795,6 +805,16 @@ message MapChangeValue {
* A roster value.
*/
com.hedera.hapi.node.state.roster.Roster roster_value = 16;

/**
* A list of scheduled ids.
*/
proto.ScheduleIdList schedule_id_list_value = 17;

/**
* A scheduled id value.
*/
proto.ScheduleID schedule_id_value = 18;
}
}

Expand Down
2 changes: 1 addition & 1 deletion hapi/hedera-protobufs/services/node_update.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import "basic_types.proto";
* This transaction body SHALL be considered a "privileged transaction".
*
* - This transaction MUST be signed by the governing council and
* - the active `admin_key` for the node.
* the active `admin_key` for the node.
* - If this transaction sets a new value for the `admin_key`, then both the
* current `admin_key`, and the new `admin_key` MUST sign this transaction.
* - This transaction SHALL NOT change any field that is not set (is null) in
Expand Down
14 changes: 14 additions & 0 deletions hapi/hedera-protobufs/services/state/schedule/schedule.proto
Original file line number Diff line number Diff line change
Expand Up @@ -146,3 +146,17 @@ message ScheduleList {
*/
repeated Schedule schedules = 1;
}

/**
* A message for storing a list of schedule identifiers in state.<br/>
* This is used to store lists of `ScheduleID` values.
* One example is all schedules that expire at a particular time.
*/
message ScheduleIdList {
/**
* A list of schedule identifiers, in no particular order.
* <p>
* While the order is not _specified_, it MUST be deterministic.
*/
repeated ScheduleID schedule_ids = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import static com.hedera.hapi.block.stream.output.StateIdentifier.STATE_ID_SCHEDULES_BY_EQUALITY;
import static com.hedera.hapi.block.stream.output.StateIdentifier.STATE_ID_SCHEDULES_BY_EXPIRY;
import static com.hedera.hapi.block.stream.output.StateIdentifier.STATE_ID_SCHEDULES_BY_ID;
import static com.hedera.hapi.block.stream.output.StateIdentifier.STATE_ID_SCHEDULE_IDS_BY_EXPIRY;
import static com.hedera.hapi.block.stream.output.StateIdentifier.STATE_ID_SCHEDULE_ID_BY_EQUALITY;
import static com.hedera.hapi.block.stream.output.StateIdentifier.STATE_ID_STAKING_INFO;
import static com.hedera.hapi.block.stream.output.StateIdentifier.STATE_ID_THROTTLE_USAGE;
import static com.hedera.hapi.block.stream.output.StateIdentifier.STATE_ID_TOKENS;
Expand Down Expand Up @@ -169,6 +171,8 @@ public static int stateIdFor(@NonNull final String serviceName, @NonNull final S
case "SCHEDULES_BY_EQUALITY" -> STATE_ID_SCHEDULES_BY_EQUALITY.protoOrdinal();
case "SCHEDULES_BY_EXPIRY_SEC" -> STATE_ID_SCHEDULES_BY_EXPIRY.protoOrdinal();
case "SCHEDULES_BY_ID" -> STATE_ID_SCHEDULES_BY_ID.protoOrdinal();
case "SCHEDULE_IDS_BY_EXPIRY_SEC" -> STATE_ID_SCHEDULE_IDS_BY_EXPIRY.protoOrdinal();
case "SCHEDULE_ID_BY_EQUALITY" -> STATE_ID_SCHEDULE_ID_BY_EQUALITY.protoOrdinal();
default -> UNKNOWN_STATE_ID;
};
case "TokenService" -> switch (stateKey) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.hedera.hapi.node.state.primitives.ProtoString;
import com.hedera.hapi.node.state.roster.Roster;
import com.hedera.hapi.node.state.schedule.Schedule;
import com.hedera.hapi.node.state.schedule.ScheduleIdList;
import com.hedera.hapi.node.state.schedule.ScheduleList;
import com.hedera.hapi.node.state.token.Account;
import com.hedera.hapi.node.state.token.AccountPendingAirdrop;
Expand Down Expand Up @@ -188,9 +189,15 @@ private static <V> MapChangeValue mapChangeValueFor(@NonNull final V value) {
case Schedule schedule -> MapChangeValue.newBuilder()
.scheduleValue(schedule)
.build();
case ScheduleID scheduleID -> MapChangeValue.newBuilder()
.scheduleIdValue(scheduleID)
.build();
case ScheduleList scheduleList -> MapChangeValue.newBuilder()
.scheduleListValue(scheduleList)
.build();
case ScheduleIdList scheduleIdList -> MapChangeValue.newBuilder()
.scheduleIdListValue(scheduleIdList)
.build();
case SlotValue slotValue -> MapChangeValue.newBuilder()
.slotValueValue(slotValue)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ private static String nameOf(@NonNull final StateIdentifier stateId) {
case STATE_ID_SCHEDULES_BY_EQUALITY -> "ScheduleService.SCHEDULES_BY_EQUALITY";
case STATE_ID_SCHEDULES_BY_EXPIRY -> "ScheduleService.SCHEDULES_BY_EXPIRY_SEC";
case STATE_ID_SCHEDULES_BY_ID -> "ScheduleService.SCHEDULES_BY_ID";
case STATE_ID_SCHEDULE_ID_BY_EQUALITY -> "ScheduleService.SCHEDULE_ID_BY_EQUALITY";
case STATE_ID_SCHEDULE_IDS_BY_EXPIRY -> "ScheduleService.SCHEDULE_IDS_BY_EXPIRY_SEC";
case STATE_ID_ACCOUNTS -> "TokenService.ACCOUNTS";
case STATE_ID_ALIASES -> "TokenService.ALIASES";
case STATE_ID_NFTS -> "TokenService.NFTS";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
import com.hedera.hapi.node.state.primitives.ProtoBytes;
import com.hedera.hapi.node.state.primitives.ProtoLong;
import com.hedera.hapi.node.state.schedule.Schedule;
import com.hedera.hapi.node.state.schedule.ScheduleList;
import com.hedera.hapi.node.state.schedule.ScheduleIdList;
import com.hedera.node.app.service.schedule.ReadableScheduleStore;
import com.hedera.node.app.service.schedule.impl.schemas.V0490ScheduleSchema;
import com.hedera.node.app.service.schedule.impl.schemas.V0570ScheduleSchema;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import com.swirlds.state.spi.ReadableKVState;
import com.swirlds.state.spi.ReadableStates;
Expand All @@ -41,8 +42,8 @@ public class ReadableScheduleStoreImpl implements ReadableScheduleStore {
"Null states instance passed to ReadableScheduleStore constructor, possible state corruption.";

private final ReadableKVState<ScheduleID, Schedule> schedulesById;
private final ReadableKVState<ProtoLong, ScheduleList> schedulesByExpirationSecond;
private final ReadableKVState<ProtoBytes, ScheduleList> schedulesByStringHash;
private final ReadableKVState<ProtoLong, ScheduleIdList> scheduleIdsByExpirationSecond;
private final ReadableKVState<ProtoBytes, ScheduleID> scheduleIdByStringHash;

/**
* Create a new {@link ReadableScheduleStore} instance.
Expand All @@ -52,8 +53,8 @@ public class ReadableScheduleStoreImpl implements ReadableScheduleStore {
public ReadableScheduleStoreImpl(@NonNull final ReadableStates states) {
Objects.requireNonNull(states, NULL_STATE_IN_CONSTRUCTOR_MESSAGE);
schedulesById = states.get(V0490ScheduleSchema.SCHEDULES_BY_ID_KEY);
schedulesByExpirationSecond = states.get(V0490ScheduleSchema.SCHEDULES_BY_EXPIRY_SEC_KEY);
schedulesByStringHash = states.get(V0490ScheduleSchema.SCHEDULES_BY_EQUALITY_KEY);
scheduleIdsByExpirationSecond = states.get(V0570ScheduleSchema.SCHEDULE_IDS_BY_EXPIRY_SEC_KEY);
scheduleIdByStringHash = states.get(V0570ScheduleSchema.SCHEDULE_ID_BY_EQUALITY_KEY);
}

/**
Expand All @@ -72,17 +73,16 @@ public Schedule get(@Nullable final ScheduleID id) {

@Override
@Nullable
public List<Schedule> getByEquality(final @NonNull Schedule scheduleToMatch) {
public ScheduleID getByEquality(final @NonNull Schedule scheduleToMatch) {
Bytes bytesHash = ScheduleStoreUtility.calculateBytesHash(scheduleToMatch);
final ScheduleList inStateValue = schedulesByStringHash.get(new ProtoBytes(bytesHash));
return inStateValue != null ? inStateValue.schedules() : null;
return scheduleIdByStringHash.get(new ProtoBytes(bytesHash));
}

@Nullable
@Override
public List<Schedule> getByExpirationSecond(final long expirationTime) {
final ScheduleList inStateValue = schedulesByExpirationSecond.get(new ProtoLong(expirationTime));
return inStateValue != null ? inStateValue.schedules() : null;
public List<ScheduleID> getByExpirationSecond(final long expirationTime) {
final ScheduleIdList inStateValue = scheduleIdsByExpirationSecond.get(new ProtoLong(expirationTime));
return inStateValue != null ? inStateValue.scheduleIds() : null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.hedera.node.app.service.schedule.ScheduleService;
import com.hedera.node.app.service.schedule.impl.schemas.V0490ScheduleSchema;
import com.hedera.node.app.service.schedule.impl.schemas.V0570ScheduleSchema;
import com.hedera.node.app.spi.RpcService;
import com.swirlds.state.spi.SchemaRegistry;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand All @@ -29,5 +30,6 @@ public final class ScheduleServiceImpl implements ScheduleService {
@Override
public void registerSchemas(@NonNull final SchemaRegistry registry) {
registry.register(new V0490ScheduleSchema());
registry.register(new V0570ScheduleSchema());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.hedera.hapi.node.base.ScheduleID;
import com.hedera.hapi.node.scheduled.SchedulableTransactionBody;
import com.hedera.hapi.node.state.schedule.Schedule;
import com.hedera.hapi.node.state.schedule.ScheduleIdList;
import com.hedera.hapi.node.state.schedule.ScheduleList;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import edu.umd.cs.findbugs.annotations.NonNull;
Expand Down Expand Up @@ -86,6 +87,10 @@ private static boolean isScheduleInList(final ScheduleID scheduleId, final Sched
.anyMatch(s -> s.scheduleIdOrThrow().equals(scheduleId));
}

private static boolean isScheduleIdInList(final ScheduleID scheduleId, final ScheduleIdList scheduleIdList) {
return scheduleIdList.scheduleIds().stream().anyMatch(id -> id.equals(scheduleId));
}

/**
* Adds a {@link Schedule} to a {@link ScheduleList}, replacing it if it already exists.
*
Expand Down Expand Up @@ -120,4 +125,30 @@ private static boolean isScheduleInList(final ScheduleID scheduleId, final Sched
}
return newScheduleList.schedules(schedules).build();
}

/**
* Adds a {@link ScheduleID} to a {@link ScheduleIdList}.
*
* <p>This method checks if the provided {@code ScheduleID} is already present in the {@code ScheduleIdList}.
* If it isn't, the {@code ScheduleID} is added to the list. This allows for updating entries within a {@code ScheduleIdList} without needing to
* manually manage duplicates.
*
* @param scheduleId The {@link ScheduleID} to add in the {@code ScheduleList}. Must not be {@code null},
* unless the {@code ScheduleList} is also {@code null}.
* @param scheduleIdList The {@link ScheduleIdList} to which the {@code Schedule} will be added. May be
* {@code null}, in which case a new {@link ScheduleIdList} containing only the provided
* {@code Schedule} is returned.
* @return A new {@link ScheduleIdList} containing the {@link ScheduleID} either added if it's not in the list
*/
static @NonNull ScheduleIdList add(final ScheduleID scheduleId, @Nullable final ScheduleIdList scheduleIdList) {
if (scheduleIdList == null) {
return new ScheduleIdList(Collections.singletonList(scheduleId));
}
final var newScheduleIdList = scheduleIdList.copyBuilder();
final var scheduleIds = new ArrayList<>(scheduleIdList.scheduleIds());
if (!isScheduleIdInList(scheduleId, scheduleIdList)) {
scheduleIds.add(scheduleId);
}
return newScheduleIdList.scheduleIds(scheduleIds).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@

package com.hedera.node.app.service.schedule.impl;

import static com.hedera.node.app.service.schedule.impl.ScheduleStoreUtility.addOrReplace;
import static com.hedera.node.app.service.schedule.impl.ScheduleStoreUtility.add;

import com.hedera.hapi.node.base.ScheduleID;
import com.hedera.hapi.node.base.Timestamp;
import com.hedera.hapi.node.state.primitives.ProtoBytes;
import com.hedera.hapi.node.state.primitives.ProtoLong;
import com.hedera.hapi.node.state.schedule.Schedule;
import com.hedera.hapi.node.state.schedule.ScheduleList;
import com.hedera.hapi.node.state.schedule.ScheduleIdList;
import com.hedera.node.app.service.schedule.WritableScheduleStore;
import com.hedera.node.app.service.schedule.impl.schemas.V0490ScheduleSchema;
import com.hedera.node.app.service.schedule.impl.schemas.V0570ScheduleSchema;
import com.hedera.node.app.spi.metrics.StoreMetricsService;
import com.hedera.node.app.spi.metrics.StoreMetricsService.StoreType;
import com.hedera.node.config.data.SchedulingConfig;
Expand All @@ -51,8 +52,8 @@ public class WritableScheduleStoreImpl extends ReadableScheduleStoreImpl impleme
private static final String SCHEDULE_MISSING_FOR_DELETE_MESSAGE =
"Schedule to be deleted, %1$s, not found in state.";
private final WritableKVState<ScheduleID, Schedule> schedulesByIdMutable;
private final WritableKVState<ProtoBytes, ScheduleList> schedulesByEqualityMutable;
private final WritableKVState<ProtoLong, ScheduleList> schedulesByExpirationMutable;
private final WritableKVState<ProtoBytes, ScheduleID> scheduleIdByEqualityMutable;
private final WritableKVState<ProtoLong, ScheduleIdList> scheduleIdsByExpirationMutable;

/**
* Create a new {@link WritableScheduleStoreImpl} instance.
Expand All @@ -67,8 +68,8 @@ public WritableScheduleStoreImpl(
@NonNull final StoreMetricsService storeMetricsService) {
super(states);
schedulesByIdMutable = states.get(V0490ScheduleSchema.SCHEDULES_BY_ID_KEY);
schedulesByEqualityMutable = states.get(V0490ScheduleSchema.SCHEDULES_BY_EQUALITY_KEY);
schedulesByExpirationMutable = states.get(V0490ScheduleSchema.SCHEDULES_BY_EXPIRY_SEC_KEY);
scheduleIdByEqualityMutable = states.get(V0570ScheduleSchema.SCHEDULE_ID_BY_EQUALITY_KEY);
scheduleIdsByExpirationMutable = states.get(V0570ScheduleSchema.SCHEDULE_IDS_BY_EXPIRY_SEC_KEY);

final long maxCapacity =
configuration.getConfigData(SchedulingConfig.class).maxNumber();
Expand Down Expand Up @@ -121,16 +122,14 @@ public void put(@NonNull final Schedule scheduleToAdd) {
schedulesByIdMutable.put(scheduleToAdd.scheduleIdOrThrow(), scheduleToAdd);

final ProtoBytes newHash = new ProtoBytes(ScheduleStoreUtility.calculateBytesHash(scheduleToAdd));
final ScheduleList inStateEquality = schedulesByEqualityMutable.get(newHash);
final var newEqualityScheduleList = addOrReplace(scheduleToAdd, inStateEquality);
schedulesByEqualityMutable.put(newHash, newEqualityScheduleList);
scheduleIdByEqualityMutable.put(newHash, scheduleToAdd.scheduleIdOrThrow());

// calculated expiration time is never null...
final ProtoLong expirationSecond = new ProtoLong(scheduleToAdd.calculatedExpirationSecond());
final ScheduleList inStateExpiration = schedulesByExpirationMutable.get(expirationSecond);
// we should not be modifying the schedules list directly. This could cause ISS
final var newExpiryScheduleList = addOrReplace(scheduleToAdd, inStateExpiration);
schedulesByExpirationMutable.put(expirationSecond, newExpiryScheduleList);
final ScheduleIdList inStateExpiration = scheduleIdsByExpirationMutable.get(expirationSecond);
// we should not be modifying the scheduleIds list directly. This could cause ISS
final var newExpiryScheduleIdList = add(scheduleToAdd.scheduleId(), inStateExpiration);
scheduleIdsByExpirationMutable.put(expirationSecond, newExpiryScheduleIdList);
}

@NonNull
Expand Down Expand Up @@ -161,16 +160,20 @@ private Schedule markDeleted(final Schedule schedule, final Instant consensusTim
public void purgeExpiredSchedulesBetween(long firstSecondToExpire, long lastSecondToExpire) {
for (long i = firstSecondToExpire; i <= lastSecondToExpire; i++) {
final var second = new ProtoLong(i);
final var scheduleList = schedulesByExpirationMutable.get(second);
if (scheduleList != null) {
for (final var schedule : scheduleList.schedules()) {
schedulesByIdMutable.remove(schedule.scheduleIdOrThrow());

final ProtoBytes hash = new ProtoBytes(ScheduleStoreUtility.calculateBytesHash(schedule));
schedulesByEqualityMutable.remove(hash);
logger.info("Purging expired schedule {} from state.", schedule.scheduleIdOrThrow());
final var scheduleIdList = scheduleIdsByExpirationMutable.get(second);
if (scheduleIdList != null) {
for (final var scheduleId : scheduleIdList.scheduleIds()) {
final var schedule = schedulesByIdMutable.get(scheduleId);
if (schedule != null) {
final ProtoBytes hash = new ProtoBytes(ScheduleStoreUtility.calculateBytesHash(schedule));
scheduleIdByEqualityMutable.remove(hash);
} else {
logger.error("Schedule {} not found in state schedulesByIdMutable.", scheduleId);
}
schedulesByIdMutable.remove(scheduleId);
logger.debug("Purging expired schedule {} from state.", scheduleId);
}
schedulesByExpirationMutable.remove(second);
scheduleIdsByExpirationMutable.remove(second);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import edu.umd.cs.findbugs.annotations.Nullable;
import java.time.InstantSource;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import javax.inject.Inject;
import javax.inject.Singleton;
Expand Down Expand Up @@ -160,8 +159,9 @@ public void handle(@NonNull final HandleContext context) throws HandleException
// we can compare those bytes to any new ScheduleCreate transaction for detecting duplicate
// ScheduleCreate transactions. SchedulesByEquality is the virtual map for that task.
final var scheduleStore = context.storeFactory().writableStore(WritableScheduleStore.class);
final var possibleDuplicates = scheduleStore.getByEquality(provisionalSchedule);
final var duplicate = maybeDuplicate(provisionalSchedule, possibleDuplicates);
final var possibleDuplicateId = scheduleStore.getByEquality(provisionalSchedule);
final var possibleDuplicate = possibleDuplicateId == null ? null : scheduleStore.get(possibleDuplicateId);
final var duplicate = maybeDuplicate(provisionalSchedule, possibleDuplicate);
if (duplicate != null) {
final var scheduledTxnId = duplicate
.originalCreateTransactionOrThrow()
Expand Down Expand Up @@ -250,15 +250,12 @@ public Fees calculateFees(@NonNull final FeeContext feeContext) {
return scheduleOpsUsage.scheduleCreateUsage(txn, sigUsage, lifetimeSecs);
}

private @Nullable Schedule maybeDuplicate(
@NonNull final Schedule schedule, @Nullable final List<Schedule> duplicates) {
if (duplicates == null) {
private @Nullable Schedule maybeDuplicate(@NonNull final Schedule schedule, @Nullable final Schedule duplicate) {
if (duplicate == null) {
return null;
}
for (final var duplicate : duplicates) {
if (areIdentical(duplicate, schedule)) {
return duplicate;
}
if (areIdentical(duplicate, schedule)) {
return duplicate;
}
return null;
}
Expand Down
Loading

0 comments on commit 3daf1bf

Please sign in to comment.