diff --git a/build/docker/sharding/README.md b/build/docker/sharding/README.md new file mode 100644 index 000000000..cc2d6aba0 --- /dev/null +++ b/build/docker/sharding/README.md @@ -0,0 +1,90 @@ +## Components +A MongoDB sharded cluster consists of the following components. +1. shard: Each shard contains a subset of the sharded data. +2. mongos: The mongos acts as a query router, providing an interface between client applications and the sharded cluster. +3. config servers: Config servers store metadata and configuration settings for the cluster. + +For a production deployment, consider the following to ensure data redundancy and system availability. +* Config Server (3-member replica set): `config1`, `config2`, `config3` +* 3 Shards (each a 3-member replica set): + * `shard1-1`, `shard1-2`, `shard1-3` + * `shard2-1`, `shard2-2`, `shard2-3` + * `shard3-1`, `shard3-2`, `shard3-3` +* 2 Mongos: `mongos1`, `mongos2` + +## How to deploy +Go to the `prod` or `dev` directory and execute the following command. +It takes 1-2 minutes to fully deploy the cluster. +```bash +docker-compose up -d +``` + +## How to examine +You can examine the configuration and status using the following commands. +### Config server +```bash +docker-compose exec config1 mongosh --port 27017 +rs.status() +``` + +### Shards +```bash +docker-compose exec shard1-1 mongosh --port 27017 +rs.status() +``` + +### Mongos +```bash +docker-compose exec mongos1 mongosh --port 27017 +sh.status() +``` + +## How to connect +You can use the sharded cluster by passing the `--mongo-connection-uri` option to Yorkie. +Note that the Yorkie server should be on the same network as the cluster. +For production: +``` +--mongo-connection-uri "mongodb://localhost:27020,localhost:27021" +``` +For development: +``` +--mongo-connection-uri "mongodb://localhost:27020" +``` + + +## Details +Since mongos determines which shard a document is located on and routes queries accordingly, it must be configured to identify the shards and the sharding rules. +Therefore, the following commands should be run on the primary mongos. +* The `sh.addShard()` method adds each shard to the cluster. +* The `sh.shardCollection()` method shards each collection with the specified shard key and strategy. + +Two strategies are available for sharding collections. +1. Hashed sharding: It uses a hashed index of a single field as the shard key to partition data across the sharded cluster. +2. Range-based sharding: It can use multiple fields as the shard key and divides data into contiguous ranges determined by the shard key values. + +```javascript +sh.addShard("shard-rs-1/shard1-1:27017,shard1-2:27017,shard1-3:27017") +sh.addShard("shard-rs-2/shard2-1:27017,shard2-2:27017,shard2-3:27017") +sh.addShard("shard-rs-3/shard3-1:27017,shard3-2:27017,shard3-3:27017") + +sh.enableSharding("yorkie-meta") +sh.shardCollection("yorkie-meta.projects", { _id: "hashed" }) +sh.shardCollection("yorkie-meta.users", { username: "hashed" }) +sh.shardCollection("yorkie-meta.clients", { project_id: "hashed" }) +sh.shardCollection("yorkie-meta.documents", { project_id: "hashed" }) +sh.shardCollection("yorkie-meta.changes", { doc_id: "hashed", server_seq: 1 }) +sh.shardCollection("yorkie-meta.snapshots", { doc_id: "hashed" }) +sh.shardCollection("yorkie-meta.syncedseqs", { doc_id: "hashed" }) +``` + +Considering the common query patterns used in Yorkie, it's also possible to employ range-based sharding using a command of the following form; a hypothetical example is sketched in the comment inside the block below. +```javascript +sh.shardCollection("<database>.<collection>", { <shard key field>: 1, ...
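+// For example (hypothetical, not applied by the init scripts in this directory): the +// changes collection, which Yorkie reads per doc_id over a range of server_seq values, +// could use a compound range key such as: +// sh.shardCollection("yorkie-meta.changes", { doc_id: 1, server_seq: 1 })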
} ) +``` + +## Considerations + + +## References +* https://www.mongodb.com/docs/v6.0/tutorial/deploy-shard-cluster/ +* https://www.mongodb.com/docs/manual/core/sharded-cluster-components/ diff --git a/build/docker/sharding/dev/docker-compose.yml b/build/docker/sharding/dev/docker-compose.yml new file mode 100644 index 000000000..484f58b6c --- /dev/null +++ b/build/docker/sharding/dev/docker-compose.yml @@ -0,0 +1,126 @@ +version: '3' +services: + + # Config Server + config1: + image: mongo:7.0.1 + container_name: mongo-config1 + command: + - /bin/sh + - -c + - | # run the init script for the primary replica after mongod has been started. + sh -c "sleep 5 && mongosh < /scripts/init-config1.js" & + mongod --port 27017 --configsvr --replSet config-rs --bind_ip_all + volumes: + - ./scripts:/scripts + ports: + - 27100:27017 + restart: always + networks: + - sharding + + # Shards + # Shards 1 + shard1-1: + image: mongo:7.0.1 + container_name: mongo-shard1-1 + command: + - /bin/sh + - -c + - | # run the init script for the primary replica after mongod has been started. + sh -c "sleep 5 && mongosh < /scripts/init-shard1-1.js" & + mongod --port 27017 --shardsvr --replSet shard-rs-1 --bind_ip_all + volumes: + - ./scripts:/scripts + ports: + - 27110:27017 + restart: always + networks: + - sharding + + shard1-2: + image: mongo:7.0.1 + container_name: mongo-shard1-2 + command: mongod --port 27017 --shardsvr --replSet shard-rs-1 --bind_ip_all + volumes: + - ./scripts:/scripts + ports: + - 27111:27017 + restart: always + networks: + - sharding + + shard1-3: + image: mongo:7.0.1 + container_name: mongo-shard1-3 + command: mongod --port 27017 --shardsvr --replSet shard-rs-1 --bind_ip_all + volumes: + - ./scripts:/scripts + ports: + - 27112:27017 + restart: always + networks: + - sharding + + # Shards 2 + shard2-1: + image: mongo:7.0.1 + container_name: mongo-shard2-1 + command: + - /bin/sh + - -c + - | # run the init script for the primary replica after mongod has been started. 
+ sh -c "sleep 5 && mongosh < /scripts/init-shard2-1.js" & + mongod --port 27017 --shardsvr --replSet shard-rs-2 --bind_ip_all + volumes: + - ./scripts:/scripts + ports: + - 27113:27017 + restart: always + networks: + - sharding + + shard2-2: + image: mongo:7.0.1 + container_name: mongo-shard2-2 + command: mongod --port 27017 --shardsvr --replSet shard-rs-2 --bind_ip_all + volumes: + - ./scripts:/scripts + ports: + - 27114:27017 + restart: always + networks: + - sharding + + shard2-3: + image: mongo:7.0.1 + container_name: mongo-shard2-3 + command: mongod --port 27017 --shardsvr --replSet shard-rs-2 --bind_ip_all + volumes: + - ./scripts:/scripts + ports: + - 27115:27017 + restart: always + networks: + - sharding + + # Mongos + mongos1: + image: mongo:7.0.1 + container_name: mongos1 + command: + - /bin/sh + - -c + - | # run the init script for the primary replica after the config servers and the shards has been configured + sh -c "sleep 40 && mongosh < /scripts/init-mongos1.js" & + mongos --port 27017 --configdb config-rs/config1:27017 --bind_ip_all + ports: + - 27017:27017 + restart: always + volumes: + - ./scripts:/scripts + networks: + - sharding + +networks: + sharding: diff --git a/build/docker/sharding/dev/scripts/init-config1.js b/build/docker/sharding/dev/scripts/init-config1.js new file mode 100644 index 000000000..544cf2072 --- /dev/null +++ b/build/docker/sharding/dev/scripts/init-config1.js @@ -0,0 +1,9 @@ +rs.initiate( + { + _id: "config-rs", + configsvr: true, + members: [ + { _id : 0, host : "config1:27017" }, + ] + } + ) diff --git a/build/docker/sharding/dev/scripts/init-mongos1.js b/build/docker/sharding/dev/scripts/init-mongos1.js new file mode 100644 index 000000000..6ef0edef1 --- /dev/null +++ b/build/docker/sharding/dev/scripts/init-mongos1.js @@ -0,0 +1,11 @@ +sh.addShard("shard-rs-1/shard1-1:27017,shard1-2:27017,shard1-3:27017") +sh.addShard("shard-rs-2/shard2-1:27017,shard2-2:27017,shard2-3:27017") + +sh.enableSharding("yorkie-meta") +sh.shardCollection("yorkie-meta.projects", { _id: "hashed" }) +sh.shardCollection("yorkie-meta.users", { username: "hashed" }) +sh.shardCollection("yorkie-meta.clients", { project_id: "hashed" }) +sh.shardCollection("yorkie-meta.documents", { project_id: "hashed" }) +sh.shardCollection("yorkie-meta.changes", { doc_id: "hashed", server_seq: 1 }) +sh.shardCollection("yorkie-meta.snapshots", { doc_id: "hashed" }) +sh.shardCollection("yorkie-meta.syncedseqs", { doc_id: "hashed" }) diff --git a/build/docker/sharding/dev/scripts/init-shard1-1.js b/build/docker/sharding/dev/scripts/init-shard1-1.js new file mode 100644 index 000000000..e63dc0670 --- /dev/null +++ b/build/docker/sharding/dev/scripts/init-shard1-1.js @@ -0,0 +1,10 @@ +rs.initiate( + { + _id : "shard-rs-1", + members: [ + { _id : 0, host : "shard1-1:27017" }, + { _id : 1, host : "shard1-2:27017" }, + { _id : 2, host : "shard1-3:27017" } + ] + } + ) diff --git a/build/docker/sharding/dev/scripts/init-shard2-1.js b/build/docker/sharding/dev/scripts/init-shard2-1.js new file mode 100644 index 000000000..ebc893ed6 --- /dev/null +++ b/build/docker/sharding/dev/scripts/init-shard2-1.js @@ -0,0 +1,10 @@ +rs.initiate( + { + _id : "shard-rs-2", + members: [ + { _id : 0, host : "shard2-1:27017" }, + { _id : 1, host : "shard2-2:27017" }, + { _id : 2, host : "shard2-3:27017" } + ] + } + ) diff --git a/build/docker/sharding/prod/docker-compose.yml b/build/docker/sharding/prod/docker-compose.yml new file mode 100644 index 000000000..163e06144 --- /dev/null +++ 
b/build/docker/sharding/prod/docker-compose.yml @@ -0,0 +1,201 @@ +version: '3' +services: + + # Config Server + config1: + image: mongo:7.0.1 + container_name: mongo-config1 + command: + - /bin/sh + - -c + - | # run the init script for the primary replica after mongod has been started. + sh -c "sleep 5 && mongosh < /scripts/init-config1.js" & + mongod --port 27017 --configsvr --replSet config-rs --bind_ip_all + volumes: + - ./scripts:/scripts + ports: + - 27100:27017 + restart: always + networks: + - sharding + config2: + image: mongo:7.0.1 + container_name: mongo-config2 + command: mongod --port 27017 --configsvr --replSet config-rs --bind_ip_all + volumes: + - ./scripts:/scripts + ports: + - 27101:27017 + restart: always + networks: + - sharding + config3: + image: mongo:7.0.1 + container_name: mongo-config3 + command: mongod --port 27017 --configsvr --replSet config-rs --bind_ip_all + volumes: + - ./scripts:/scripts + ports: + - 27102:27017 + restart: always + networks: + - sharding + + # Shards + # Shards 1 + shard1-1: + image: mongo:7.0.1 + container_name: mongo-shard1-1 + command: + - /bin/sh + - -c + - | # run the init script for the primary replica after mongod has been started. + sh -c "sleep 5 && mongosh < /scripts/init-shard1-1.js" & + mongod --port 27017 --shardsvr --replSet shard-rs-1 --bind_ip_all + volumes: + - ./scripts:/scripts + ports: + - 27110:27017 + restart: always + networks: + - sharding + + shard1-2: + image: mongo:7.0.1 + container_name: mongo-shard1-2 + command: mongod --port 27017 --shardsvr --replSet shard-rs-1 --bind_ip_all + volumes: + - ./scripts:/scripts + ports: + - 27111:27017 + restart: always + networks: + - sharding + + shard1-3: + image: mongo:7.0.1 + container_name: mongo-shard1-3 + command: mongod --port 27017 --shardsvr --replSet shard-rs-1 --bind_ip_all + volumes: + - ./scripts:/scripts + ports: + - 27112:27017 + restart: always + networks: + - sharding + + # Shards 2 + shard2-1: + image: mongo:7.0.1 + container_name: mongo-shard2-1 + command: + - /bin/sh + - -c + - | # run the init script for the primary replica after mongod has been started. + sh -c "sleep 5 && mongosh < /scripts/init-shard2-1.js" & + mongod --port 27017 --shardsvr --replSet shard-rs-2 --bind_ip_all + volumes: + - ./scripts:/scripts + ports: + - 27113:27017 + restart: always + networks: + - sharding + + shard2-2: + image: mongo:7.0.1 + container_name: mongo-shard2-2 + command: mongod --port 27017 --shardsvr --replSet shard-rs-2 --bind_ip_all + volumes: + - ./scripts:/scripts + ports: + - 27114:27017 + restart: always + networks: + - sharding + + shard2-3: + image: mongo:7.0.1 + container_name: mongo-shard2-3 + command: mongod --port 27017 --shardsvr --replSet shard-rs-2 --bind_ip_all + volumes: + - ./scripts:/scripts + ports: + - 27115:27017 + restart: always + networks: + - sharding + + # Shards 3 + shard3-1: + image: mongo:7.0.1 + container_name: mongo-shard3-1 + command: + - /bin/sh + - -c + - | # run the init script for the primary replica after mongod has been started. 
+ sh -c "sleep 5 && mongosh < /scripts/init-shard3-1.js" & + mongod --port 27017 --shardsvr --replSet shard-rs-3 --bind_ip_all + volumes: + - ./scripts:/scripts + ports: + - 27116:27017 + restart: always + networks: + - sharding + + shard3-2: + image: mongo:7.0.1 + container_name: mongo-shard3-2 + command: mongod --port 27017 --shardsvr --replSet shard-rs-3 --bind_ip_all + volumes: + - ./scripts:/scripts + ports: + - 27117:27017 + restart: always + networks: + - sharding + + shard3-3: + image: mongo:7.0.1 + container_name: mongo-shard3-3 + command: mongod --port 27017 --shardsvr --replSet shard-rs-3 --bind_ip_all + volumes: + - ./scripts:/scripts + ports: + - 27118:27017 + restart: always + networks: + - sharding + + # Mongos + mongos1: + image: mongo:7.0.1 + container_name: mongos1 + command: + - /bin/sh + - -c + - | # run the init script for the primary replica after the config servers and the shards has been configured + sh -c "sleep 40 && mongosh < /scripts/init-mongos1.js" & + mongos --port 27017 --configdb config-rs/config1:27017,config2:27017,config3:27017 --bind_ip_all + ports: + - 27017:27017 + restart: always + volumes: + - ./scripts:/scripts + networks: + - sharding + mongos2: + image: mongo:7.0.1 + container_name: mongos2 + command: mongos --port 27017 --configdb config-rs/config1:27017,config2:27017,config3:27017 --bind_ip_all + ports: + - 27018:27017 + restart: always + volumes: + - ./scripts:/scripts + networks: + - sharding + +networks: + sharding: diff --git a/build/docker/sharding/prod/scripts/init-config1.js b/build/docker/sharding/prod/scripts/init-config1.js new file mode 100644 index 000000000..b87cd4f53 --- /dev/null +++ b/build/docker/sharding/prod/scripts/init-config1.js @@ -0,0 +1,12 @@ +rs.initiate( + { + _id: "config-rs", + configsvr: true, + members: [ + { _id : 0, host : "config1:27017" }, + { _id : 1, host : "config2:27017" }, + { _id : 2, host : "config3:27017" }, + ] + } + ) + \ No newline at end of file diff --git a/build/docker/sharding/prod/scripts/init-mongos1.js b/build/docker/sharding/prod/scripts/init-mongos1.js new file mode 100644 index 000000000..539ee3650 --- /dev/null +++ b/build/docker/sharding/prod/scripts/init-mongos1.js @@ -0,0 +1,12 @@ +sh.addShard("shard-rs-1/shard1-1:27017,shard1-2:27017,shard1-3:27017") +sh.addShard("shard-rs-2/shard2-1:27017,shard2-2:27017,shard2-3:27017") +sh.addShard("shard-rs-3/shard3-1:27017,shard3-2:27017,shard3-3:27017") + +sh.enableSharding("yorkie-meta") +sh.shardCollection("yorkie-meta.projects", { _id: "hashed" }) +sh.shardCollection("yorkie-meta.users", { username: "hashed" }) +sh.shardCollection("yorkie-meta.clients", { project_id: "hashed" }) +sh.shardCollection("yorkie-meta.documents", { project_id: "hashed" }) +sh.shardCollection("yorkie-meta.changes", { doc_id: "hashed", server_seq: 1 }) +sh.shardCollection("yorkie-meta.snapshots", { doc_id: "hashed" }) +sh.shardCollection("yorkie-meta.syncedseqs", { doc_id: "hashed" }) diff --git a/build/docker/sharding/prod/scripts/init-shard1-1.js b/build/docker/sharding/prod/scripts/init-shard1-1.js new file mode 100644 index 000000000..e63dc0670 --- /dev/null +++ b/build/docker/sharding/prod/scripts/init-shard1-1.js @@ -0,0 +1,10 @@ +rs.initiate( + { + _id : "shard-rs-1", + members: [ + { _id : 0, host : "shard1-1:27017" }, + { _id : 1, host : "shard1-2:27017" }, + { _id : 2, host : "shard1-3:27017" } + ] + } + ) diff --git a/build/docker/sharding/prod/scripts/init-shard2-1.js b/build/docker/sharding/prod/scripts/init-shard2-1.js new file mode 100644 index 
000000000..ebc893ed6 --- /dev/null +++ b/build/docker/sharding/prod/scripts/init-shard2-1.js @@ -0,0 +1,10 @@ +rs.initiate( + { + _id : "shard-rs-2", + members: [ + { _id : 0, host : "shard2-1:27017" }, + { _id : 1, host : "shard2-2:27017" }, + { _id : 2, host : "shard2-3:27017" } + ] + } + ) diff --git a/build/docker/sharding/prod/scripts/init-shard3-1.js b/build/docker/sharding/prod/scripts/init-shard3-1.js new file mode 100644 index 000000000..7d75c5790 --- /dev/null +++ b/build/docker/sharding/prod/scripts/init-shard3-1.js @@ -0,0 +1,10 @@ +rs.initiate( + { + _id : "shard-rs-3", + members: [ + { _id : 0, host : "shard3-1:27017" }, + { _id : 1, host : "shard3-2:27017" }, + { _id : 2, host : "shard3-3:27017" } + ] + } + ) diff --git a/server/backend/database/mongo/client.go b/server/backend/database/mongo/client.go index 65c319ef8..c3c0e9a64 100644 --- a/server/backend/database/mongo/client.go +++ b/server/backend/database/mongo/client.go @@ -37,13 +37,16 @@ import ( "github.com/yorkie-team/yorkie/pkg/document/key" "github.com/yorkie-team/yorkie/pkg/document/time" "github.com/yorkie-team/yorkie/server/backend/database" + "github.com/yorkie-team/yorkie/server/backend/sync" + "github.com/yorkie-team/yorkie/server/backend/sync/memory" "github.com/yorkie-team/yorkie/server/logging" ) // Client is a client that connects to Mongo DB and reads or saves Yorkie data. type Client struct { - config *Config - client *mongo.Client + config *Config + client *mongo.Client + coordinator sync.Coordinator } // Dial creates an instance of Client and dials the given MongoDB. @@ -75,9 +78,15 @@ func Dial(conf *Config) (*Client, error) { logging.DefaultLogger().Infof("MongoDB connected, URI: %s, DB: %s", conf.ConnectionURI, conf.YorkieDatabase) + // TODO(sejongk): Implement the coordinator for a shard. For now, we + // distribute workloads to all shards per document. In the future, we + // will need to distribute workloads of a document. 
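+ // NOTE: memory.NewCoordinator provides in-memory locking, so the locks taken + // through this coordinator only guard requests handled by this server process; + // they do not give cluster-wide mutual exclusion across multiple Yorkie servers.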
+ coordinator := memory.NewCoordinator(nil) + return &Client{ - config: conf, - client: client, + config: conf, + client: client, + coordinator: coordinator, }, nil } @@ -214,24 +223,49 @@ func (c *Client) CreateProjectInfo( return nil, err } - info := database.NewProjectInfo(name, owner, clientDeactivateThreshold) - result, err := c.collection(colProjects).InsertOne(ctx, bson.M{ - "name": info.Name, - "owner": encodedOwner, - "client_deactivate_threshold": info.ClientDeactivateThreshold, - "public_key": info.PublicKey, - "secret_key": info.SecretKey, - "created_at": info.CreatedAt, - }) + locker, err := c.coordinator.NewLocker(ctx, ProjectKey(owner, name)) if err != nil { - if mongo.IsDuplicateKeyError(err) { - return nil, database.ErrProjectAlreadyExists + return nil, err + } + + if err := locker.Lock(ctx); err != nil { + return nil, err + } + defer func() { + if err := locker.Unlock(ctx); err != nil { + logging.From(ctx).Error(err) + return } + }() + + info := database.NewProjectInfo(name, owner, clientDeactivateThreshold) + result, err := c.collection(colProjects).UpdateOne(ctx, bson.M{ + "$or": bson.A{ + bson.M{"name": info.Name, "owner": encodedOwner}, + bson.M{"public_key": info.PublicKey}, + bson.M{"secret_key": info.SecretKey}, + }, + }, bson.M{ + "$setOnInsert": bson.M{ + "name": info.Name, + "owner": encodedOwner, + "client_deactivate_threshold": info.ClientDeactivateThreshold, + "public_key": info.PublicKey, + "secret_key": info.SecretKey, + "created_at": info.CreatedAt, + }, + }, options.Update().SetUpsert(true)) + if err != nil { return nil, fmt.Errorf("create project info: %w", err) } - info.ID = types.ID(result.InsertedID.(primitive.ObjectID).Hex()) + if result.UpsertedCount == 0 { + return nil, database.ErrProjectAlreadyExists + } + + info.ID = types.ID(result.UpsertedID.(primitive.ObjectID).Hex()) + println("infoID!!", info.ID, result.UpsertedID) return info, nil } @@ -384,6 +418,37 @@ func (c *Client) UpdateProjectInfo( } updatableFields["updated_at"] = gotime.Now() + if name, ok := updatableFields["name"]; ok { + locker, err := c.coordinator.NewLocker(ctx, ProjectKey(owner, name.(string))) + if err != nil { + return nil, err + } + + if err := locker.Lock(ctx); err != nil { + return nil, err + } + defer func() { + if err := locker.Unlock(ctx); err != nil { + logging.From(ctx).Error(err) + return + } + }() + + // check if the combination of owner and name already exists. 
+ result := c.collection(colProjects).FindOne(ctx, bson.M{ + "owner": encodedOwner, + "name": name, + }) + + info := database.ProjectInfo{} + if err = result.Decode(&info); err != mongo.ErrNoDocuments { + if err == nil { + return nil, database.ErrProjectNameAlreadyExists + } + return nil, fmt.Errorf("decode project info: %w", err) + } + } + res := c.collection(colProjects).FindOneAndUpdate(ctx, bson.M{ "_id": encodedID, "owner": encodedOwner, @@ -396,9 +461,6 @@ func (c *Client) UpdateProjectInfo( if err == mongo.ErrNoDocuments { return nil, fmt.Errorf("%s: %w", id, database.ErrProjectNotFound) } - if mongo.IsDuplicateKeyError(err) { - return nil, fmt.Errorf("%s: %w", *fields.Name, database.ErrProjectNameAlreadyExists) - } return nil, fmt.Errorf("decode project info: %w", err) } @@ -412,20 +474,40 @@ func (c *Client) CreateUserInfo( hashedPassword string, ) (*database.UserInfo, error) { info := database.NewUserInfo(username, hashedPassword) - result, err := c.collection(colUsers).InsertOne(ctx, bson.M{ - "username": info.Username, - "hashed_password": info.HashedPassword, - "created_at": info.CreatedAt, - }) + + locker, err := c.coordinator.NewLocker(ctx, UserKey(username)) if err != nil { - if mongo.IsDuplicateKeyError(err) { - return nil, database.ErrUserAlreadyExists + return nil, err + } + + if err := locker.Lock(ctx); err != nil { + return nil, err + } + defer func() { + if err := locker.Unlock(ctx); err != nil { + logging.From(ctx).Error(err) + return } + }() + result, err := c.collection(colUsers).UpdateOne(ctx, bson.M{ + "username": info.Username, + }, bson.M{ + "$setOnInsert": bson.M{ + "hashed_password": info.HashedPassword, + "created_at": info.CreatedAt, + }, + }, options.Update().SetUpsert(true)) + + if err != nil { return nil, fmt.Errorf("create user info: %w", err) } - info.ID = types.ID(result.InsertedID.(primitive.ObjectID).Hex()) + if result.UpsertedCount == 0 { + return nil, database.ErrUserAlreadyExists + } + + info.ID = types.ID(result.UpsertedID.(primitive.ObjectID).Hex()) return info, nil } @@ -470,6 +552,21 @@ func (c *Client) ActivateClient(ctx context.Context, projectID types.ID, key str return nil, err } + locker, err := c.coordinator.NewLocker(ctx, ClientKey(projectID, key)) + if err != nil { + return nil, err + } + + if err := locker.Lock(ctx); err != nil { + return nil, err + } + defer func() { + if err := locker.Unlock(ctx); err != nil { + logging.From(ctx).Error(err) + return + } + }() + now := gotime.Now() res, err := c.collection(colClients).UpdateOne(ctx, bson.M{ "project_id": encodedProjectID, @@ -487,7 +584,8 @@ func (c *Client) ActivateClient(ctx context.Context, projectID types.ID, key str var result *mongo.SingleResult if res.UpsertedCount > 0 { result = c.collection(colClients).FindOneAndUpdate(ctx, bson.M{ - "_id": res.UpsertedID, + "_id": res.UpsertedID, + "project_id": encodedProjectID, }, bson.M{ "$set": bson.M{ "created_at": now, @@ -581,6 +679,11 @@ func (c *Client) UpdateClientInfoAfterPushPull( return err } + encodedProjectID, err := encodeID(docInfo.ProjectID) + if err != nil { + return err + } + clientDocInfoKey := "documents." + docInfo.ID.String() + "." 
clientDocInfo, ok := clientInfo.Documents[docInfo.ID] if !ok { @@ -615,7 +718,8 @@ func (c *Client) UpdateClientInfoAfterPushPull( } result := c.collection(colClients).FindOneAndUpdate(ctx, bson.M{ - "_id": encodedClientID, + "_id": encodedClientID, + "project_id": encodedProjectID, }, updater) if result.Err() != nil { @@ -714,6 +818,21 @@ func (c *Client) FindDocInfoByKeyAndOwner( return nil, err } + locker, err := c.coordinator.NewLocker(ctx, DocumentKey(projectID, docKey.String())) + if err != nil { + return nil, err + } + + if err := locker.Lock(ctx); err != nil { + return nil, err + } + defer func() { + if err := locker.Unlock(ctx); err != nil { + logging.From(ctx).Error(err) + return + } + }() + filter := bson.M{ "project_id": encodedProjectID, "key": docKey, @@ -734,7 +853,8 @@ func (c *Client) FindDocInfoByKeyAndOwner( var result *mongo.SingleResult if res.UpsertedCount > 0 { result = c.collection(colDocuments).FindOneAndUpdate(ctx, bson.M{ - "_id": res.UpsertedID, + "_id": res.UpsertedID, + "project_id": encodedProjectID, }, bson.M{ "$set": bson.M{ "owner": encodedOwnerID, @@ -878,6 +998,7 @@ func (c *Client) CreateChangeInfos( } var models []mongo.WriteModel + var lockers []sync.Locker for _, cn := range changes { encodedOperations, err := database.EncodeOperations(cn.Operations()) if err != nil { @@ -888,6 +1009,18 @@ func (c *Client) CreateChangeInfos( return err } + locker, err := c.coordinator.NewLocker( + ctx, ChangeKey(docInfo.ID, cn.ServerSeq())) + if err != nil { + return err + } + + if err := locker.Lock(ctx); err != nil { + return err + } + + lockers = append(lockers, locker) + models = append(models, mongo.NewUpdateOneModel().SetFilter(bson.M{ "doc_id": encodedDocID, "server_seq": cn.ServerSeq(), @@ -901,6 +1034,15 @@ func (c *Client) CreateChangeInfos( }}).SetUpsert(true)) } + defer func() { + for _, locker := range lockers { + if err := locker.Unlock(ctx); err != nil { + logging.From(ctx).Error(err) + return + } + } + }() + // TODO(hackerwins): We need to handle the updates for the two collections // below atomically. if len(changes) > 0 { @@ -1055,6 +1197,24 @@ func (c *Client) CreateSnapshotInfo( return err } + locker, err := c.coordinator.NewLocker( + ctx, SnapshotKey(docID, doc.Checkpoint().ServerSeq)) + if err != nil { + return err + } + + // NOTE: If the project is already being created by another, it is + // not necessary to recreate it, so we can skip it. 
+ if err := locker.TryLock(ctx); err != nil { + return err + } + defer func() { + if err := locker.Unlock(ctx); err != nil { + logging.From(ctx).Error(err) + return + } + }() + if _, err := c.collection(colSnapshots).InsertOne(ctx, bson.M{ "doc_id": encodedDocID, "server_seq": doc.Checkpoint().ServerSeq, @@ -1359,6 +1519,22 @@ func (c *Client) UpdateSyncedSeq( return nil } + locker, err := c.coordinator.NewLocker( + ctx, SyncedSeqKey(docID, clientInfo.ID)) + if err != nil { + return err + } + + if err := locker.TryLock(ctx); err != nil { + return err + } + defer func() { + if err := locker.Unlock(ctx); err != nil { + logging.From(ctx).Error(err) + return + } + }() + if _, err = c.collection(colSyncedSeqs).UpdateOne(ctx, bson.M{ "doc_id": encodedDocID, "client_id": encodedClientID, @@ -1367,8 +1543,7 @@ func (c *Client) UpdateSyncedSeq( "lamport": ticket.Lamport(), "actor_id": encodeActorID(ticket.ActorID()), "server_seq": serverSeq, - }, - }, options.Update().SetUpsert(true)); err != nil { + }}, options.Update().SetUpsert(true)); err != nil { return fmt.Errorf("upsert synced seq: %w", err) } @@ -1466,6 +1641,24 @@ func (c *Client) collection( Collection(name, opts...) } +func (c *Client) deleteProjectProxyInfo( + ctx context.Context, + filters map[string]bson.M, + targetCols []string, +) error { + for _, col := range targetCols { + if _, err := c.collection(col).DeleteOne( + ctx, + filters[col], + options.Delete(), + ); err != nil && err != mongo.ErrNoDocuments { + return fmt.Errorf("delete proxy info in %s : %w", col, err) + } + } + + return nil +} + // escapeRegex escapes special characters by putting a backslash in front of it. // NOTE(chacha912): (https://github.com/cxr29/scrud/blob/1039f8edaf5eef522275a5a848a0fca0f53224eb/query/util.go#L31-L47) func escapeRegex(str string) string { @@ -1483,3 +1676,38 @@ func escapeRegex(str string) string { } return buf.String() } + +// ProjectKey creates a new sync.Key of the project for the given owner and name. +func ProjectKey(owner types.ID, name string) sync.Key { + return sync.NewKey(fmt.Sprintf("project-%s-%s", owner, name)) +} + +// UserKey creates a new sync.Key of the user for the given username. +func UserKey(username string) sync.Key { + return sync.NewKey(fmt.Sprintf("user-%s", username)) +} + +// ClientKey creates a new sync.Key of the client for the given project id and key. +func ClientKey(projectID types.ID, key string) sync.Key { + return sync.NewKey(fmt.Sprintf("client-%s-%s", projectID, key)) +} + +// DocumentKey creates a new sync.Key of the document for the given project id and key. +func DocumentKey(projectID types.ID, key string) sync.Key { + return sync.NewKey(fmt.Sprintf("document-%s-%s", projectID, key)) +} + +// ChangeKey creates a new sync.Key of the change for the given doc id and server seq. +func ChangeKey(docID types.ID, serverSeq int64) sync.Key { + return sync.NewKey(fmt.Sprintf("change-%s-%d", docID, serverSeq)) +} + +// SnapshotKey creates a new sync.Key of the snapshot for the given doc id and server seq. +func SnapshotKey(docID types.ID, serverSeq int64) sync.Key { + return sync.NewKey(fmt.Sprintf("snapshot-%s-%d", docID, serverSeq)) +} + +// SyncedSeqKey creates a new sync.Key of the synced seq for the given doc id and client ID. 
+func SyncedSeqKey(docID types.ID, clientID types.ID) sync.Key { + return sync.NewKey(fmt.Sprintf("syncedseq-%s-%s", docID, clientID)) +} diff --git a/server/backend/database/mongo/indexes.go b/server/backend/database/mongo/indexes.go index 953659b3f..f26295863 100644 --- a/server/backend/database/mongo/indexes.go +++ b/server/backend/database/mongo/indexes.go @@ -42,22 +42,6 @@ type collectionInfo struct { // Below are names and indexes information of collections that stores Yorkie data. var collectionInfos = []collectionInfo{ - { - name: colProjects, - indexes: []mongo.IndexModel{{ - Keys: bsonx.Doc{ - {Key: "owner", Value: bsonx.Int32(1)}, - {Key: "name", Value: bsonx.Int32(1)}, - }, - Options: options.Index().SetUnique(true), - }, { - Keys: bsonx.Doc{{Key: "public_key", Value: bsonx.Int32(1)}}, - Options: options.Index().SetUnique(true), - }, { - Keys: bsonx.Doc{{Key: "secret_key", Value: bsonx.Int32(1)}}, - Options: options.Index().SetUnique(true), - }}, - }, { name: colUsers, indexes: []mongo.IndexModel{{ diff --git a/server/backend/database/testcases/testcases.go b/server/backend/database/testcases/testcases.go index 3cc19121c..f5a37d732 100644 --- a/server/backend/database/testcases/testcases.go +++ b/server/backend/database/testcases/testcases.go @@ -108,6 +108,16 @@ func RunFindProjectInfoByNameTest( assert.NoError(t, err) }) + t.Run("duplicate project name test", func(t *testing.T) { + ctx := context.Background() + _, err := db.CreateProjectInfo(ctx, t.Name(), dummyOwnerID, clientDeactivateThreshold) + assert.NoError(t, err) + + // create a new project with the duplicate name + _, err = db.CreateProjectInfo(ctx, t.Name(), dummyOwnerID, clientDeactivateThreshold) + assert.ErrorIs(t, err, database.ErrProjectAlreadyExists) + }) + t.Run("FindProjectInfoByName test", func(t *testing.T) { ctx := context.Background()