database: postgres implementation with tests.
Commit a5c6400 (1 parent: fb32dcf)
Showing 30 changed files with 3,444 additions and 2,838 deletions.
@@ -0,0 +1,261 @@
package pgsql

import (
	"database/sql"
	"errors"
	"fmt"
	"strings"

	"github.com/lib/pq"
	log "github.com/sirupsen/logrus"

	"github.com/coreos/clair/database"
	"github.com/coreos/clair/pkg/commonerr"
)

// UpsertAncestry replaces any existing ancestry with the same name and stores
// its layers, namespaced features, and the processors that produced it.
func (tx *pgSession) UpsertAncestry(ancestry database.Ancestry, features []database.NamespacedFeature, processedBy database.Processors) error {
	if ancestry.Name == "" {
		log.Warning("Empty ancestry name is not allowed")
		return commonerr.NewBadRequestError("could not insert an ancestry with empty name")
	}

	if len(ancestry.Layers) == 0 {
		log.Warning("Empty ancestry is not allowed")
		return commonerr.NewBadRequestError("could not insert an ancestry with 0 layers")
	}

	err := tx.deleteAncestry(ancestry.Name)
	if err != nil {
		return err
	}

	var ancestryID int64
	err = tx.QueryRow(insertAncestry, ancestry.Name).Scan(&ancestryID)
	if err != nil {
		if isErrUniqueViolation(err) {
			return handleError("insertAncestry", errors.New("Other Go-routine is processing this ancestry (skip)."))
		}
		return handleError("insertAncestry", err)
	}

	err = tx.insertAncestryLayers(ancestryID, ancestry.Layers)
	if err != nil {
		return err
	}

	err = tx.insertAncestryFeatures(ancestryID, features)
	if err != nil {
		return err
	}

	return tx.persistProcessors(persistAncestryLister,
		"persistAncestryLister",
		persistAncestryDetector,
		"persistAncestryDetector",
		ancestryID, processedBy)
}

// FindAncestry retrieves an ancestry by name together with the processors
// used to build it. The boolean reports whether the ancestry exists.
func (tx *pgSession) FindAncestry(name string) (database.Ancestry, database.Processors, bool, error) {
	ancestry := database.Ancestry{Name: name}
	processed := database.Processors{}

	var ancestryID int64
	err := tx.QueryRow(searchAncestry, name).Scan(&ancestryID)
	if err != nil {
		if err == sql.ErrNoRows {
			return ancestry, processed, false, nil
		}
		return ancestry, processed, false, handleError("searchAncestry", err)
	}

	ancestry.Layers, err = tx.findAncestryLayers(ancestryID)
	if err != nil {
		return ancestry, processed, false, err
	}

	processed.Detectors, err = tx.findProcessors(searchAncestryDetectors, "searchAncestryDetectors", "detector", ancestryID)
	if err != nil {
		return ancestry, processed, false, err
	}

	processed.Listers, err = tx.findProcessors(searchAncestryListers, "searchAncestryListers", "lister", ancestryID)
	if err != nil {
		return ancestry, processed, false, err
	}

	return ancestry, processed, true, nil
}

// FindAncestryFeatures retrieves an ancestry and the namespaced features
// associated with it.
func (tx *pgSession) FindAncestryFeatures(name string) (database.AncestryWithFeatures, bool, error) {
	var (
		awf database.AncestryWithFeatures
		ok  bool
		err error
	)
	awf.Ancestry, awf.ProcessedBy, ok, err = tx.FindAncestry(name)
	if err != nil {
		return awf, false, err
	}

	if !ok {
		return awf, false, nil
	}

	rows, err := tx.Query(searchAncestryFeatures, name)
	if err != nil {
		return awf, false, handleError("searchAncestryFeatures", err)
	}

	for rows.Next() {
		nf := database.NamespacedFeature{}
		err := rows.Scan(&nf.Namespace.Name, &nf.Namespace.VersionFormat, &nf.Feature.Name, &nf.Feature.Version)
		if err != nil {
			return awf, false, handleError("searchAncestryFeatures", err)
		}
		nf.Feature.VersionFormat = nf.Namespace.VersionFormat
		awf.Features = append(awf.Features, nf)
	}

	return awf, true, nil
}

// deleteAncestry removes any existing ancestry with the given name.
func (tx *pgSession) deleteAncestry(name string) error {
	result, err := tx.Exec(removeAncestry, name)
	if err != nil {
		return handleError("removeAncestry", err)
	}

	_, err = result.RowsAffected()
	if err != nil {
		return handleError("removeAncestry", err)
	}

	return nil
}

// findProcessors returns the names of the processors (listers or detectors)
// recorded for the given ancestry ID.
func (tx *pgSession) findProcessors(query, queryName, processorType string, id int64) ([]string, error) {
	rows, err := tx.Query(query, id)
	if err != nil {
		if err == sql.ErrNoRows {
			log.Warning("No " + processorType + " are used")
			return nil, nil
		}
		return nil, handleError(queryName, err)
	}

	var (
		processors []string
		processor  string
	)

	for rows.Next() {
		err := rows.Scan(&processor)
		if err != nil {
			return nil, handleError(queryName, err)
		}
		processors = append(processors, processor)
	}

	return processors, nil
}

// findAncestryLayers returns the layers recorded for the given ancestry ID.
func (tx *pgSession) findAncestryLayers(ancestryID int64) ([]database.Layer, error) {
	rows, err := tx.Query(searchAncestryLayer, ancestryID)
	if err != nil {
		return nil, handleError("searchAncestryLayer", err)
	}
	layers := []database.Layer{}
	for rows.Next() {
		var layer database.Layer
		err := rows.Scan(&layer.Hash)
		if err != nil {
			return nil, handleError("searchAncestryLayer", err)
		}
		layers = append(layers, layer)
	}
	return layers, nil
}

// insertAncestryLayers resolves the database IDs of the given layers and
// links them to the ancestry, preserving layer order.
func (tx *pgSession) insertAncestryLayers(ancestryID int64, layers []database.Layer) error {
	layerIDs := map[string]sql.NullInt64{}
	for _, l := range layers {
		layerIDs[l.Hash] = sql.NullInt64{}
	}

	layerHashes := []string{}
	for hash := range layerIDs {
		layerHashes = append(layerHashes, hash)
	}

	rows, err := tx.Query(searchLayerIDs, pq.Array(layerHashes))
	if err != nil {
		return handleError("searchLayerIDs", err)
	}

	for rows.Next() {
		var (
			layerID   sql.NullInt64
			layerName string
		)
		err := rows.Scan(&layerID, &layerName)
		if err != nil {
			return handleError("searchLayerIDs", err)
		}
		layerIDs[layerName] = layerID
	}

	notFound := []string{}
	for hash, id := range layerIDs {
		if !id.Valid {
			notFound = append(notFound, hash)
		}
	}

	if len(notFound) > 0 {
		return handleError("searchLayerIDs", fmt.Errorf("Layer %s is not found in database", strings.Join(notFound, ",")))
	}

	//TODO(Sida): use bulk insert.
	stmt, err := tx.Prepare(insertAncestryLayer)
	if err != nil {
		return handleError("insertAncestryLayer", err)
	}

	defer stmt.Close()
	for index, layer := range layers {
		_, err := stmt.Exec(ancestryID, index, layerIDs[layer.Hash].Int64)
		if err != nil {
			return handleError("insertAncestryLayer", commonerr.CombineErrors(err, stmt.Close()))
		}
	}

	return nil
}
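
// --- Illustrative sketch (not part of this commit) --------------------------
// The TODO above suggests replacing the per-row prepared statement with a
// single bulk INSERT. One common approach with database/sql and lib/pq is to
// build a multi-row VALUES clause; the "ancestry_layer" table and column
// names below are assumptions for illustration only.
func (tx *pgSession) insertAncestryLayersBulk(ancestryID int64, layerIDs []int64) error {
	if len(layerIDs) == 0 {
		return nil
	}

	// Build "($1, $2, $3), ($4, $5, $6), ..." and the matching argument list.
	placeholders := make([]string, 0, len(layerIDs))
	args := make([]interface{}, 0, len(layerIDs)*3)
	for index, layerID := range layerIDs {
		offset := index * 3
		placeholders = append(placeholders, fmt.Sprintf("($%d, $%d, $%d)", offset+1, offset+2, offset+3))
		args = append(args, ancestryID, index, layerID)
	}

	query := "INSERT INTO ancestry_layer (ancestry_id, ancestry_index, layer_id) VALUES " +
		strings.Join(placeholders, ", ")
	if _, err := tx.Exec(query, args...); err != nil {
		return handleError("insertAncestryLayerBulk", err)
	}
	return nil
}
// -----------------------------------------------------------------------------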

// insertAncestryFeatures links the given namespaced features to the ancestry.
func (tx *pgSession) insertAncestryFeatures(ancestryID int64, features []database.NamespacedFeature) error {
	featureIDs, err := tx.findNamespacedFeatureIDs(features)
	if err != nil {
		return err
	}

	//TODO(Sida): use bulk insert.
	stmtFeatures, err := tx.Prepare(insertAncestryFeature)
	if err != nil {
		return handleError("insertAncestryFeature", err)
	}

	defer stmtFeatures.Close()

	for _, id := range featureIDs {
		if !id.Valid {
			return errors.New("requested namespaced feature is not in database")
		}

		_, err := stmtFeatures.Exec(ancestryID, id)
		if err != nil {
			return handleError("insertAncestryFeature", err)
		}
	}

	return nil
}