Skip to content

Commit

Permalink
Define EntityData
Browse files Browse the repository at this point in the history
  • Loading branch information
tigrannajaryan committed Feb 14, 2024
1 parent 552a023 commit 0b099b2
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 92 deletions.
11 changes: 5 additions & 6 deletions exporters/otlp/otlptrace/internal/tracetransform/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@ func Resource(r *resource.Resource) *resourcepb.Resource {
return nil
}

entity := r.Entity()
entityId := Iterator(entity.IdIter())
attrs := Iterator(r.Iter())
entityId := Iterator(r.EntityId().Iter())

return &resourcepb.Resource{
Attributes: ResourceAttributes(r),
DroppedAttributesCount: 0,
EntityType: entity.Type(),
EntityId: entityId,
Attributes: attrs,
EntityType: r.EntityType(),
EntityId: entityId,
}
}
10 changes: 8 additions & 2 deletions sdk/resource/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk"
"go.opentelemetry.io/otel/sdk/resource/internal"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
)

Expand Down Expand Up @@ -98,8 +99,13 @@ func (sd stringDetector) Detect(ctx context.Context) (*Resource, error) {
if !a.Valid() {
return nil, fmt.Errorf("invalid attribute: %q -> %q", a.Key, a.Value.Emit())
}
attrs := []attribute.KeyValue{sd.K.String(value)}
return NewWithEntity(sd.schemaURL, sd.entityType, attrs, attrs), nil
id := attribute.NewSet(sd.K.String(value))
entity := internal.EntityData{
Type: sd.entityType,
Id: id,
Attrs: id,
}
return NewWithEntity(sd.schemaURL, &entity), nil
}

// Detect implements Detector.
Expand Down
62 changes: 62 additions & 0 deletions sdk/resource/internal/entity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package internal

import "go.opentelemetry.io/otel/attribute"

type EntityData struct {
// Defines the producing entity type of this resource, e.g "service", "k8s.pod", etc.
// Empty for legacy Resources that are not entity-aware.
Type string
// Set of attributes that identify the entity.
// Note that a copy of identifying attributes will be also recorded in the Attrs field.
Id attribute.Set

Attrs attribute.Set
}

func MergeEntities(a, b *EntityData) *EntityData {
// Note: 'b' attributes will overwrite 'a' with last-value-wins in attribute.Key()
// Meaning this is equivalent to: append(a.Attributes(), b.Attributes()...)
mergedAttrs := mergeAttrs(&b.Attrs, &a.Attrs)

var mergedType string
var mergedId attribute.Set

if a.Type == b.Type {
mergedType = a.Type
mergedId = mergeAttrs(&b.Id, &a.Id)
} else {
if a.Type == "" {
mergedType = b.Type
mergedId = b.Id
} else if b.Type == "" {
mergedType = a.Type
mergedId = a.Id
} else {
// Different non-empty entities.
mergedId = a.Id
// TODO: merge the id of the updating Entity into the non-identifying
// attributes of the old Resource, attributes from the updating Entity
// take precedence.
panic("not implemented")
}
}

return &EntityData{
Type: mergedType,
Id: mergedId,
Attrs: mergedAttrs,
}
}

func mergeAttrs(a, b *attribute.Set) attribute.Set {
if a.Len()+b.Len() == 0 {
return *attribute.EmptySet()
}

mi := attribute.NewMergeIterator(a, b)
combine := make([]attribute.KeyValue, 0, a.Len()+b.Len())
for mi.Next() {
combine = append(combine, mi.Attribute())
}
return attribute.NewSet(combine...)
}
122 changes: 38 additions & 84 deletions sdk/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,9 @@ import (

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/resource/internal"
)

type Entity struct {
// Defines the producing entity type of this resource, e.g "service", "k8s.pod", etc.
// Empty for legacy Resources that are not entity-aware.
typ string
// Set of attributes that identify the producing entity.
// Note that the identifying attributes may be also recorded in the "attributes" field.
id attribute.Set
}

func (e *Entity) Type() string {
return e.typ
}

func (e *Entity) IdIter() attribute.Iterator {
return e.id.Iter()
}

// Resource describes an entity about which identifying information
// and metadata is exposed. Resource is an immutable object,
// equivalent to a map from key to unique value.
Expand All @@ -49,10 +33,10 @@ func (e *Entity) IdIter() attribute.Iterator {
// (`*resource.Resource`). The `nil` value is equivalent to an empty
// Resource.
type Resource struct {
attrs attribute.Set
schemaURL string

entity Entity
// Producing entity.
entity internal.EntityData
}

var (
Expand All @@ -77,7 +61,7 @@ func New(ctx context.Context, opts ...Option) (*Resource, error) {

r := &Resource{
schemaURL: cfg.schemaURL,
entity: Entity{typ: cfg.entityType, id: entityId},
entity: internal.EntityData{Type: cfg.entityType, Id: entityId},
}
return r, detect(ctx, r, cfg.detectors)
}
Expand All @@ -97,21 +81,22 @@ func NewWithAttributes(schemaURL string, attrs ...attribute.KeyValue) *Resource
// contains any invalid items those items will be dropped. The attrs and entityId are assumed to be
// in a schema identified by schemaURL.
func NewWithEntity(
schemaURL string, entityType string, entityId []attribute.KeyValue, attrs []attribute.KeyValue,
schemaURL string, entity *internal.EntityData,
) *Resource {
resource := NewSchemaless(attrs...)
resource := NewSchemaless(entity.Attrs.ToSlice()...)
resource.schemaURL = schemaURL
resource.entity.typ = entityType
resource.entity = *entity
//resource.entity.Type = entityType

// Ensure attributes comply with the specification:
// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/common/README.md#attribute
id, _ := attribute.NewSetWithFiltered(
entityId, func(kv attribute.KeyValue) bool {
return kv.Valid()
},
)

resource.entity.id = id
//id, _ := attribute.NewSetWithFiltered(
// entityId, func(kv attribute.KeyValue) bool {
// return kv.Valid()
// },
//)
//
//resource.entity.Id = id
return resource
}

Expand All @@ -137,7 +122,7 @@ func NewSchemaless(attrs ...attribute.KeyValue) *Resource {
return &Resource{}
}

return &Resource{attrs: s, entity: Entity{id: attribute.NewSet()}} //nolint
return &Resource{entity: internal.EntityData{Id: attribute.NewSet(), Attrs: s}} //nolint
}

// String implements the Stringer interface and provides a
Expand All @@ -149,7 +134,7 @@ func (r *Resource) String() string {
if r == nil {
return ""
}
return r.attrs.Encoded(attribute.DefaultEncoder())
return r.entity.Attrs.Encoded(attribute.DefaultEncoder())
}

// MarshalLog is the marshaling function used by the logging system to represent this Resource.
Expand All @@ -158,7 +143,7 @@ func (r *Resource) MarshalLog() interface{} {
Attributes attribute.Set
SchemaURL string
}{
Attributes: r.attrs,
Attributes: r.entity.Attrs,
SchemaURL: r.schemaURL,
}
}
Expand All @@ -169,7 +154,7 @@ func (r *Resource) Attributes() []attribute.KeyValue {
if r == nil {
r = Empty()
}
return r.attrs.ToSlice()
return r.entity.Attrs.ToSlice()
}

// SchemaURL returns the schema URL associated with Resource r.
Expand All @@ -180,11 +165,18 @@ func (r *Resource) SchemaURL() string {
return r.schemaURL
}

func (r *Resource) Entity() *Entity {
func (r *Resource) EntityId() *attribute.Set {
if r == nil {
return &Entity{}
return attribute.EmptySet()
}
return &r.entity
return &r.entity.Id
}

func (r *Resource) EntityType() string {
if r == nil {
return ""
}
return r.entity.Type
}

// Iter returns an iterator of the Resource attributes.
Expand All @@ -193,7 +185,7 @@ func (r *Resource) Iter() attribute.Iterator {
if r == nil {
r = Empty()
}
return r.attrs.Iter()
return r.entity.Attrs.Iter()
}

// Equal returns true when a Resource is equivalent to this Resource.
Expand Down Expand Up @@ -241,50 +233,12 @@ func Merge(a, b *Resource) (*Resource, error) {
return Empty(), errMergeConflictSchemaURL
}

// Note: 'b' attributes will overwrite 'a' with last-value-wins in attribute.Key()
// Meaning this is equivalent to: append(a.Attributes(), b.Attributes()...)
combineAttrs := mergeAttrs(b.Set(), a.Set())

var combineEntityId []attribute.KeyValue

var entityType string
if a.entity.typ == b.entity.typ {
entityType = a.entity.typ
combineEntityId = mergeAttrs(&b.entity.id, &a.entity.id)
} else {
if a.entity.typ == "" {
entityType = b.entity.typ
combineEntityId = b.entity.id.ToSlice()
} else if b.entity.typ == "" {
entityType = a.entity.typ
combineEntityId = a.entity.id.ToSlice()
} else {
// Different non-empty entities.
combineEntityId = a.entity.id.ToSlice()
// TODO: merge the id of the updating Entity into the non-identifying
// attributes of the old Resource, attributes from the updating Entity
// take precedence.
panic("not implemented")
}
}
mergedEntity := internal.MergeEntities(&a.entity, &b.entity)
merged := NewWithEntity(schemaURL, mergedEntity)

merged := NewWithEntity(schemaURL, entityType, combineEntityId, combineAttrs)
return merged, nil
}

func mergeAttrs(a, b *attribute.Set) []attribute.KeyValue {
if a.Len()+b.Len() == 0 {
return nil
}

mi := attribute.NewMergeIterator(a, b)
combine := make([]attribute.KeyValue, 0, a.Len()+b.Len())
for mi.Next() {
combine = append(combine, mi.Attribute())
}
return combine
}

// Empty returns an instance of Resource with no attributes. It is
// equivalent to a `nil` Resource.
func Empty() *Resource {
Expand Down Expand Up @@ -338,7 +292,7 @@ func (r *Resource) Set() *attribute.Set {
if r == nil {
r = Empty()
}
return &r.attrs
return &r.entity.Attrs
}

// MarshalJSON encodes the resource attributes as a JSON list of { "Key":
Expand All @@ -356,14 +310,14 @@ func (r *Resource) MarshalJSON() ([]byte, error) {
Id any
}
}{
Attributes: r.attrs.MarshalableToJSON(),
Attributes: r.entity.Attrs.MarshalableToJSON(),
SchemaURL: r.schemaURL,
Entity: struct {
Type string
Id any
}{
Type: r.entity.Type(),
Id: r.entity.id.MarshalableToJSON(),
Type: r.entity.Type,
Id: r.entity.Id.MarshalableToJSON(),
},
}

Expand All @@ -375,13 +329,13 @@ func (r *Resource) Len() int {
if r == nil {
return 0
}
return r.attrs.Len()
return r.entity.Attrs.Len()
}

// Encoded returns an encoded representation of the resource.
func (r *Resource) Encoded(enc attribute.Encoder) string {
if r == nil {
return ""
}
return r.attrs.Encoded(enc)
return r.entity.Attrs.Encoded(enc)
}

0 comments on commit 0b099b2

Please sign in to comment.