Skip to content

Commit

Permalink
fabric batch operations
Browse files Browse the repository at this point in the history
Signed-off-by: Fedor Partanskiy <[email protected]>
  • Loading branch information
pfi79 committed Oct 5, 2024
1 parent 02e04e2 commit b80c4ba
Show file tree
Hide file tree
Showing 61 changed files with 3,002 additions and 328 deletions.
232 changes: 169 additions & 63 deletions core/chaincode/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ func (h *Handler) handleMessageReadyState(msg *pb.ChaincodeMessage) error {
go h.HandleTransaction(msg, h.HandlePutStateMetadata)
case pb.ChaincodeMessage_PURGE_PRIVATE_DATA:
go h.HandleTransaction(msg, h.HandlePurgePrivateData)
case pb.ChaincodeMessage_CHANGE_STATE_BATCH:
go h.HandleTransaction(msg, h.HandleChangeStateBatch)
default:
return fmt.Errorf("[%s] Fabric side handler cannot handle message (%s) while in ready state", msg.Txid, msg.Type)
}
Expand Down Expand Up @@ -441,10 +443,22 @@ func (h *Handler) ProcessStream(stream ccintf.ChaincodeStream) error {
// sendReady sends READY to chaincode serially (just like REGISTER)
func (h *Handler) sendReady() error {
chaincodeLogger.Debugf("sending READY for chaincode %s", h.chaincodeID)
ccMsg := &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_READY}

chaincodeAdditionalParams := &pb.ChaincodeAdditionalParams{
UsePutStateBatch: false,
MaxSizePutStateBatch: 1000,
}
payloadBytes, err := proto.Marshal(chaincodeAdditionalParams)
if err != nil {
return errors.WithStack(err)
}
ccMsg := &pb.ChaincodeMessage{
Type: pb.ChaincodeMessage_READY,
Payload: payloadBytes,
}

// if error in sending tear down the h
if err := h.serialSend(ccMsg); err != nil {
if err = h.serialSend(ccMsg); err != nil {
chaincodeLogger.Errorf("error sending READY (%s) for chaincode %s", err, h.chaincodeID)
return err
}
Expand Down Expand Up @@ -504,7 +518,7 @@ func (h *Handler) HandleRegister(msg *pb.ChaincodeMessage) {
}

chaincodeLogger.Debugf("Got %s for chaincodeID = %s, sending back %s", pb.ChaincodeMessage_REGISTER, h.chaincodeID, pb.ChaincodeMessage_REGISTERED)
if err := h.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTERED}); err != nil {
if err = h.serialSend(&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_REGISTERED}); err != nil {
chaincodeLogger.Errorf("error sending %s: %s", pb.ChaincodeMessage_REGISTERED, err)
h.notifyRegistry(err)
return
Expand Down Expand Up @@ -555,10 +569,10 @@ func (h *Handler) registerTxid(msg *pb.ChaincodeMessage) bool {
return false
}

func (h *Handler) checkMetadataCap(msg *pb.ChaincodeMessage) error {
ac, exists := h.AppConfig.GetApplicationConfig(msg.ChannelId)
func (h *Handler) checkMetadataCap(channelId string) error {
ac, exists := h.AppConfig.GetApplicationConfig(channelId)
if !exists {
return errors.Errorf("application config does not exist for %s", msg.ChannelId)
return errors.Errorf("application config does not exist for %s", channelId)
}

if !ac.Capabilities().KeyLevelEndorsement() {
Expand All @@ -567,10 +581,10 @@ func (h *Handler) checkMetadataCap(msg *pb.ChaincodeMessage) error {
return nil
}

func (h *Handler) checkPurgePrivateDataCap(msg *pb.ChaincodeMessage) error {
ac, exists := h.AppConfig.GetApplicationConfig(msg.ChannelId)
func (h *Handler) checkPurgePrivateDataCap(channelId string) error {
ac, exists := h.AppConfig.GetApplicationConfig(channelId)
if !exists {
return errors.Errorf("application config does not exist for %s", msg.ChannelId)
return errors.Errorf("application config does not exist for %s", channelId)
}

if !ac.Capabilities().PurgePvtData() {
Expand Down Expand Up @@ -642,7 +656,7 @@ func (h *Handler) HandleGetState(msg *pb.ChaincodeMessage, txContext *Transactio
if txContext.IsInitTransaction {
return nil, errors.New("private data APIs are not allowed in chaincode Init()")
}
if err := errorIfCreatorHasNoReadPermission(namespaceID, collection, txContext); err != nil {
if err = errorIfCreatorHasNoReadPermission(namespaceID, collection, txContext); err != nil {
return nil, err
}
res, err = txContext.TXSimulator.GetPrivateData(namespaceID, collection, getState.Key)
Expand Down Expand Up @@ -687,7 +701,7 @@ func (h *Handler) HandleGetPrivateDataHash(msg *pb.ChaincodeMessage, txContext *

// Handles query to ledger to get state metadata
func (h *Handler) HandleGetStateMetadata(msg *pb.ChaincodeMessage, txContext *TransactionContext) (*pb.ChaincodeMessage, error) {
err := h.checkMetadataCap(msg)
err := h.checkMetadataCap(msg.ChannelId)
if err != nil {
return nil, err
}
Expand All @@ -707,7 +721,7 @@ func (h *Handler) HandleGetStateMetadata(msg *pb.ChaincodeMessage, txContext *Tr
if txContext.IsInitTransaction {
return nil, errors.New("private data APIs are not allowed in chaincode Init()")
}
if err := errorIfCreatorHasNoReadPermission(namespaceID, collection, txContext); err != nil {
if err = errorIfCreatorHasNoReadPermission(namespaceID, collection, txContext); err != nil {
return nil, err
}
metadata, err = txContext.TXSimulator.GetPrivateDataMetadata(namespaceID, collection, getStateMetadata.Key)
Expand Down Expand Up @@ -754,7 +768,7 @@ func (h *Handler) HandleGetStateByRange(msg *pb.ChaincodeMessage, txContext *Tra
if txContext.IsInitTransaction {
return nil, errors.New("private data APIs are not allowed in chaincode Init()")
}
if err := errorIfCreatorHasNoReadPermission(namespaceID, collection, txContext); err != nil {
if err = errorIfCreatorHasNoReadPermission(namespaceID, collection, txContext); err != nil {
return nil, err
}
rangeIter, err = txContext.TXSimulator.GetPrivateDataRangeScanIterator(namespaceID, collection,
Expand Down Expand Up @@ -877,7 +891,7 @@ func (h *Handler) HandleGetQueryResult(msg *pb.ChaincodeMessage, txContext *Tran
if txContext.IsInitTransaction {
return nil, errors.New("private data APIs are not allowed in chaincode Init()")
}
if err := errorIfCreatorHasNoReadPermission(namespaceID, collection, txContext); err != nil {
if err = errorIfCreatorHasNoReadPermission(namespaceID, collection, txContext); err != nil {
return nil, err
}
executeIter, err = txContext.TXSimulator.ExecuteQueryOnPrivateData(namespaceID, collection, getQueryResult.Query)
Expand Down Expand Up @@ -1033,123 +1047,215 @@ func (h *Handler) getTxContextForInvoke(channelID string, txid string, payload [

func (h *Handler) HandlePutState(msg *pb.ChaincodeMessage, txContext *TransactionContext) (*pb.ChaincodeMessage, error) {
putState := &pb.PutState{}
err := proto.Unmarshal(msg.Payload, putState)
if err != nil {
if err := proto.Unmarshal(msg.Payload, putState); err != nil {
return nil, errors.Wrap(err, "unmarshal failed")
}

if err := h.putState(putState, txContext); err != nil {
return nil, errors.WithStack(err)
}

return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Txid: msg.Txid, ChannelId: msg.ChannelId}, nil
}

func (h *Handler) putState(msg *pb.PutState, txContext *TransactionContext) error {
var err error

namespaceID := txContext.NamespaceID
collection := putState.Collection
collection := msg.Collection
if isCollectionSet(collection) {
if txContext.IsInitTransaction {
return nil, errors.New("private data APIs are not allowed in chaincode Init()")
return errors.New("private data APIs are not allowed in chaincode Init()")
}
if err := errorIfCreatorHasNoWritePermission(namespaceID, collection, txContext); err != nil {
return nil, err
if err = errorIfCreatorHasNoWritePermission(namespaceID, collection, txContext); err != nil {
return err
}
err = txContext.TXSimulator.SetPrivateData(namespaceID, collection, putState.Key, putState.Value)
err = txContext.TXSimulator.SetPrivateData(namespaceID, collection, msg.Key, msg.Value)
} else {
err = txContext.TXSimulator.SetState(namespaceID, putState.Key, putState.Value)
err = txContext.TXSimulator.SetState(namespaceID, msg.Key, msg.Value)
}
if err != nil {
return nil, errors.WithStack(err)
return errors.WithStack(err)
}

return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Txid: msg.Txid, ChannelId: msg.ChannelId}, nil
return nil
}

func (h *Handler) HandlePutStateMetadata(msg *pb.ChaincodeMessage, txContext *TransactionContext) (*pb.ChaincodeMessage, error) {
err := h.checkMetadataCap(msg)
if err != nil {
return nil, err
putStateMetadata := &pb.PutStateMetadata{}
if err := proto.Unmarshal(msg.Payload, putStateMetadata); err != nil {
return nil, errors.Wrap(err, "unmarshal failed")
}

putStateMetadata := &pb.PutStateMetadata{}
err = proto.Unmarshal(msg.Payload, putStateMetadata)
if err := h.putStateMetadata(putStateMetadata, txContext, msg.ChannelId); err != nil {
return nil, errors.WithStack(err)
}

return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Txid: msg.Txid, ChannelId: msg.ChannelId}, nil
}

func (h *Handler) putStateMetadata(msg *pb.PutStateMetadata, txContext *TransactionContext, channelId string) error {
err := h.checkMetadataCap(channelId)
if err != nil {
return nil, errors.Wrap(err, "unmarshal failed")
return err
}

metadata := make(map[string][]byte)
metadata[putStateMetadata.Metadata.Metakey] = putStateMetadata.Metadata.Value
metadata[msg.Metadata.Metakey] = msg.Metadata.Value

namespaceID := txContext.NamespaceID
collection := putStateMetadata.Collection
collection := msg.Collection
if isCollectionSet(collection) {
if txContext.IsInitTransaction {
return nil, errors.New("private data APIs are not allowed in chaincode Init()")
return errors.New("private data APIs are not allowed in chaincode Init()")
}
if err := errorIfCreatorHasNoWritePermission(namespaceID, collection, txContext); err != nil {
return nil, err
if err = errorIfCreatorHasNoWritePermission(namespaceID, collection, txContext); err != nil {
return err
}
err = txContext.TXSimulator.SetPrivateDataMetadata(namespaceID, collection, putStateMetadata.Key, metadata)
err = txContext.TXSimulator.SetPrivateDataMetadata(namespaceID, collection, msg.Key, metadata)
} else {
err = txContext.TXSimulator.SetStateMetadata(namespaceID, putStateMetadata.Key, metadata)
err = txContext.TXSimulator.SetStateMetadata(namespaceID, msg.Key, metadata)
}
if err != nil {
return nil, errors.WithStack(err)
return errors.WithStack(err)
}

return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Txid: msg.Txid, ChannelId: msg.ChannelId}, nil
return nil
}

func (h *Handler) HandleDelState(msg *pb.ChaincodeMessage, txContext *TransactionContext) (*pb.ChaincodeMessage, error) {
delState := &pb.DelState{}
err := proto.Unmarshal(msg.Payload, delState)
if err != nil {
if err := proto.Unmarshal(msg.Payload, delState); err != nil {
return nil, errors.Wrap(err, "unmarshal failed")
}

if err := h.delState(delState, txContext); err != nil {
return nil, errors.WithStack(err)
}

// Send response msg back to chaincode.
return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Txid: msg.Txid, ChannelId: msg.ChannelId}, nil
}

func (h *Handler) delState(msg *pb.DelState, txContext *TransactionContext) error {
var err error

namespaceID := txContext.NamespaceID
collection := delState.Collection
collection := msg.Collection
if isCollectionSet(collection) {
if txContext.IsInitTransaction {
return nil, errors.New("private data APIs are not allowed in chaincode Init()")
return errors.New("private data APIs are not allowed in chaincode Init()")
}
if err := errorIfCreatorHasNoWritePermission(namespaceID, collection, txContext); err != nil {
return nil, err
if err = errorIfCreatorHasNoWritePermission(namespaceID, collection, txContext); err != nil {
return err
}
err = txContext.TXSimulator.DeletePrivateData(namespaceID, collection, delState.Key)
err = txContext.TXSimulator.DeletePrivateData(namespaceID, collection, msg.Key)
} else {
err = txContext.TXSimulator.DeleteState(namespaceID, delState.Key)
err = txContext.TXSimulator.DeleteState(namespaceID, msg.Key)
}
if err != nil {
return nil, errors.WithStack(err)
return errors.WithStack(err)
}

// Send response msg back to chaincode.
return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Txid: msg.Txid, ChannelId: msg.ChannelId}, nil
return nil
}

func (h *Handler) HandlePurgePrivateData(msg *pb.ChaincodeMessage, txContext *TransactionContext) (*pb.ChaincodeMessage, error) {
err := h.checkPurgePrivateDataCap(msg)
if err != nil {
return nil, err
}
delState := &pb.DelState{}
if err := proto.Unmarshal(msg.Payload, delState); err != nil {
return nil, errors.Wrap(err, "unmarshal failed")
}

if err := h.purgePrivateData(delState, txContext, msg.ChannelId); err != nil {
return nil, errors.WithStack(err)
}

// Send response msg back to chaincode.
return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Txid: msg.Txid, ChannelId: msg.ChannelId}, nil
}

func (h *Handler) purgePrivateData(msg *pb.DelState, txContext *TransactionContext, channelId string) error {
err := h.checkPurgePrivateDataCap(channelId)
if err != nil {
return err
}

namespaceID := txContext.NamespaceID
collection := delState.Collection
collection := msg.Collection
if collection == "" {
return nil, errors.New("only applicable for private data")
return errors.New("only applicable for private data")
}

if txContext.IsInitTransaction {
return nil, errors.New("private data APIs are not allowed in chaincode Init()")
return errors.New("private data APIs are not allowed in chaincode Init()")
}

if err := errorIfCreatorHasNoWritePermission(namespaceID, collection, txContext); err != nil {
return nil, err
if err = errorIfCreatorHasNoWritePermission(namespaceID, collection, txContext); err != nil {
return err
}

if err := txContext.TXSimulator.PurgePrivateData(namespaceID, collection, delState.Key); err != nil {
return nil, errors.WithStack(err)
if err = txContext.TXSimulator.PurgePrivateData(namespaceID, collection, msg.Key); err != nil {
return errors.WithStack(err)
}

return nil
}

func (h *Handler) HandleChangeStateBatch(msg *pb.ChaincodeMessage, txContext *TransactionContext) (*pb.ChaincodeMessage, error) {
batch := &pb.ChangeStateBatch{}
err := proto.Unmarshal(msg.Payload, batch)
if err != nil {
return nil, errors.Wrap(err, "unmarshal failed")
}

for _, kv := range batch.GetKvs() {
switch kv.GetType() {
case pb.StateKV_PUT_STATE:
putState := &pb.PutState{
Key: kv.GetKey(),
Value: kv.GetValue(),
Collection: kv.GetCollection(),
}
if err = h.putState(putState, txContext); err != nil {
return nil, errors.WithStack(err)
}

case pb.StateKV_DEL_STATE:
delState := &pb.DelState{
Key: kv.GetKey(),
Collection: kv.GetCollection(),
}
if err = h.delState(delState, txContext); err != nil {
return nil, errors.WithStack(err)
}

case pb.StateKV_PURGE_PRIVATE_DATA:
delState := &pb.DelState{
Key: kv.GetKey(),
Collection: kv.GetCollection(),
}
if err = h.purgePrivateData(delState, txContext, msg.ChannelId); err != nil {
return nil, err
}

case pb.StateKV_PUT_STATE_METADATA:
putStateMetadata := &pb.PutStateMetadata{
Key: kv.GetKey(),
Collection: kv.GetCollection(),
Metadata: &pb.StateMetadata{
Metakey: kv.GetMetadata().GetMetakey(),
Value: kv.GetMetadata().GetValue(),
},
}
if err = h.putStateMetadata(putStateMetadata, txContext, msg.ChannelId); err != nil {
return nil, err
}

default:
return nil, errors.New("unknown operation type")
}
}

// Send response msg back to chaincode.
return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_RESPONSE, Txid: msg.Txid, ChannelId: msg.ChannelId}, nil
}

Expand Down Expand Up @@ -1252,7 +1358,7 @@ func (h *Handler) Execute(txParams *ccprovider.TransactionParams, namespace stri
}
defer h.TXContexts.Delete(msg.ChannelId, msg.Txid)

if err := h.setChaincodeProposal(txParams.SignedProp, txParams.Proposal, msg); err != nil {
if err = h.setChaincodeProposal(txParams.SignedProp, txParams.Proposal, msg); err != nil {
return nil, err
}

Expand Down
Loading

0 comments on commit b80c4ba

Please sign in to comment.