Skip to content

Commit

Permalink
Feature: refactor cache and add SPARQL queries logging (#2)
Browse files Browse the repository at this point in the history
* extract cache logic in a separate module

* add SPARQL query logging module

* rename generate_cache_key to prevent conflicts
  • Loading branch information
syphax-bouazzouni committed Jan 20, 2025
1 parent 59251e5 commit 85f01b8
Show file tree
Hide file tree
Showing 4 changed files with 251 additions and 122 deletions.
158 changes: 51 additions & 107 deletions lib/sparql/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ module SPARQL
# @see https://www.w3.org/TR/sparql11-results-csv-tsv/
class Client
autoload :Query, 'sparql/client/query'
autoload :Cache, 'sparql/client/cache'
autoload :Logging, 'sparql/client/logging'
autoload :Repository, 'sparql/client/repository'
autoload :Update, 'sparql/client/update'
autoload :VERSION, 'sparql/client/version'
Expand Down Expand Up @@ -58,6 +60,9 @@ class ServerError < StandardError; end

XMLNS = {'sparql' => 'http://www.w3.org/2005/sparql-results#'}.freeze

attr_reader :cache
attr_reader :logger

##
# The SPARQL endpoint URL, or an RDF::Queryable instance, to use the native SPARQL engine.
#
Expand Down Expand Up @@ -94,12 +99,9 @@ class ServerError < StandardError; end
# Defaults `User-Agent` header, unless one is specified.
# @option options [Hash] :read_timeout
def initialize(url, **options, &block)
@logger = options[:logger] ||= Kernel.const_defined?("LOGGER") ? Kernel.const_get("LOGGER") : Logger.new(STDOUT)
@redis_cache = nil

if options[:redis_cache]
@redis_cache = options[:redis_cache]
end
@cache = SPARQL::Client::Cache.new(redis_cache: options[:redis_cache])
@logger = Logging.new(redis: @cache.redis_cache,
logger: options[:logger])

case url
when RDF::Queryable
Expand Down Expand Up @@ -331,54 +333,32 @@ def nodes
# @raise [IOError] if connection is closed
# @see https://www.w3.org/TR/sparql11-protocol/#query-operation
def query(query, **options)
unless query.respond_to?(:options) && query.options[:bypass_cache]
if @redis_cache && (query.instance_of?(SPARQL::Client::Query) || options[:graphs])


if options[:graphs] || query.options[:graphs]
cache_key = SPARQL::Client::Query.generate_cache_key(query.to_s,
options[:graphs] || query.options[:graphs])
else
cache_key = query.cache_key
end
cached_response = nil
@logger.log(query, user: options[:user]) do
cached_response = @cache.get(query, options)
end

cache_response = @redis_cache.get(cache_key[:query])
return cached_response if cached_response

if options[:reload_cache] and options[:reload_cache] == true
@redis_cache.del(cache_key[:query])
cache_response = nil
end

if cache_response
cache_key[:graphs].each do |g|
unless @redis_cache.sismember(g, cache_key[:query])
@redis_cache.del(cache_key[:query])
cache_response = nil
break
end
end
if cache_response
return Marshal.load(cache_response)
end
end

options[:cache_key] = cache_key
end
end
@op = :query
@alt_endpoint = options[:endpoint]
case @url
when RDF::Queryable
require 'sparql' unless defined?(::SPARQL::Grammar)
begin
SPARQL.execute(query, @url, optimize: true, **options)
rescue SPARQL::MalformedQuery
$stderr.puts "error running #{query}: #{$!}"
raise
output = nil
@logger.log(query, user: options[:user], cached: false) do
case @url
when RDF::Queryable
require 'sparql' unless defined?(::SPARQL::Grammar)
begin
output = SPARQL.execute(query, @url, optimize: true, **options)
rescue SPARQL::MalformedQuery
$stderr.puts "error running #{query}: #{$!}"
raise
end
else
output = parse_response(response(query, **options), **options)
end
else
parse_response(response(query, **options), **options)
end
output
end

##
Expand All @@ -394,17 +374,21 @@ def query(query, **options)
# @see https://www.w3.org/TR/sparql11-protocol/#update-operation
def update(query, **options)
@op = :update
if @redis_cache && !query.options[:bypass_cache]
query_delete_cache(query)

if @cache.redis_cache && !query.options[:bypass_cache]
raise Exception, "Unsupported cacheable query" if query.options[:graph].nil?
@cache.invalidate(query.options[:graph].to_s)
end

@alt_endpoint = options[:endpoint]
case @url
when RDF::Queryable
require 'sparql' unless defined?(::SPARQL::Grammar)
SPARQL.execute(query, @url, update: true, optimize: true, **options)
else
response(query, **options)
@logger.log(query, user: options[:user], cached: false) do
case @url
when RDF::Queryable
require 'sparql' unless defined?(::SPARQL::Grammar)
SPARQL.execute(query, @url, update: true, optimize: true, **options)
else
response(query, **options)
end
end
self
end
Expand All @@ -424,64 +408,20 @@ def response(query, **options)
headers['Accept'] = options[:content_type] if options[:content_type]
request(query, headers) do |response|
case response
when Net::HTTPBadRequest # 400 Bad Request
when Net::HTTPBadRequest # 400 Bad Request
raise MalformedQuery.new(response.body + " Processing query #{query}")
when Net::HTTPClientError # 4xx
raise ClientError.new(response.body + " Processing query #{query}")
when Net::HTTPServerError # 5xx
raise ServerError.new(response.body + " Processing query #{query}")
when Net::HTTPSuccess # 2xx
when Net::HTTPSuccess # 2xx
response
else
# type code here
end
end
end

def query_delete_cache(update)
if update.options[:graph].nil?
raise Exception, "Unsuported cacheable query"
end
cache_invalidate_graph(update.options[:graph].to_s)
end

def cache_invalidate_graph(graphs)
return if @redis_cache.nil?
graphs = [graphs] unless graphs.instance_of?(Array)
graphs.each do |graph|
attempts = 0
begin
graph = graph.to_s
graph = "sparql:graph:#{graph}" unless graph.start_with?("sparql:graph:")
if @redis_cache.exists?(graph)
begin
@redis_cache.del(graph)
rescue => exception
puts "warning: error in cache invalidation `#{exception}`"
puts exception.backtrace
end
end
rescue Exception => e
if attempts < 3
attempts += 1
sleep(5)
retry
end
end
end
end

def query_put_cache(keys, entry)
# expiration = 1800 #1/2 hour
data = Marshal.dump(entry)
if data.length > 50e6 # 50MB of marshal object
# avoid large entries to go in the cache
return
end
keys[:graphs].each do |g|
@redis_cache.sadd(g, keys[:query])
end
@redis_cache.set(keys[:query], data)
#@redis_cache.expire(keys[:query],expiration)
end

##
# @param [Net::HTTPSuccess] response
Expand All @@ -495,9 +435,7 @@ def parse_response(response, **options)
response.body == 'true'
when RESULT_JSON
result_data = self.class.parse_json_bindings(response.body, nodes)
if options[:cache_key]
query_put_cache(options[:cache_key], result_data)
end
@cache.add(options[:cache_key], result_data) if options[:cache_key]
return result_data
when RESULT_XML
self.class.parse_xml_bindings(response.body, nodes)
Expand Down Expand Up @@ -815,7 +753,13 @@ def inspect
end

def redis_cache=(redis_cache)
@redis_cache = redis_cache
@cache.redis_cache = redis_cache
@logger.redis = redis_cache
end

def logger=(logger)
@logger.logger = logger
@logger.redis = @cache.redis_cache
end

protected
Expand Down
120 changes: 120 additions & 0 deletions lib/sparql/client/cache.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
require 'digest'

class SPARQL::Client
  ##
  # Redis-backed cache for SPARQL query responses.
  #
  # A response is stored under a key derived from the query text and the
  # graphs it reads from (see {.generate_cache_key}). For each graph, a Redis
  # set ("sparql:graph:<iri>") records which query keys depend on it, so
  # invalidating a graph lazily evicts every response that read from it.
  class Cache
    # Maximum marshalled payload size (bytes) allowed into the cache;
    # larger responses are silently skipped.
    MAX_ENTRY_BYTES = 50_000_000

    # The underlying Redis client (duck-typed: get/set/del/exists?/sadd/sismember).
    # When nil, every cache operation is a no-op.
    attr_accessor :redis_cache

    # @param redis_cache [Object, nil] Redis client; nil disables caching
    def initialize(redis_cache: nil)
      @redis_cache = redis_cache
    end

    # Store +value+ under the pre-computed cache +key+ (as produced by {#key}
    # or left in options[:cache_key] by a {#get} miss).
    def add(key, value)
      cache_query_response(key, value)
    end

    # Fetch the cached response for +query+, or nil on a miss.
    # Side effect: on a miss for a cacheable query, options[:cache_key] is
    # populated so the caller can later store the fresh response via {#add}.
    def get(query, options)
      cached_query_response(query, options)
    end

    # Drop the dependency index for the given graph(s), which invalidates
    # every cached response that read from them.
    #
    # @param graphs [String, Array<String>] graph IRI(s) or full graph keys
    def invalidate(graphs)
      cache_invalidate_graph(graphs)
    end

    # Compute the cache key for +query+ under the given call +options+.
    def key(query, options)
      query_cache_key(query, options)
    end

    ##
    # Build the cache key for a query string and the graphs it reads from.
    #
    # @param string [String] the SPARQL query text
    # @param from [Array<#to_s>] graph IRIs the query reads from
    # @return [Hash{Symbol => Object}] :graphs (Array<String> of
    #   "sparql:graph:..." set keys) and :query (String storage key)
    def self.generate_cache_key(string, from)
      graph_ids = from.map(&:to_s).uniq.sort
      digest = Digest::MD5.hexdigest(string)
      {
        graphs: graph_ids.map { |g| "sparql:graph:#{g}" },
        query: "sparql:#{graph_ids.join(':')}:#{digest}"
      }
    end

    private

    # Delete the Redis set for each graph key, retrying transient failures
    # up to 3 times. Per-query entries are not deleted here: they are lazily
    # evicted by #cached_query_response once their graph set is gone.
    def cache_invalidate_graph(graphs)
      return if @redis_cache.nil?

      graphs = [graphs] unless graphs.instance_of?(Array)
      graphs.each do |graph|
        attempts = 0
        begin
          graph_key = graph.to_s
          graph_key = "sparql:graph:#{graph_key}" unless graph_key.start_with?("sparql:graph:")
          if @redis_cache.exists?(graph_key)
            begin
              @redis_cache.del(graph_key)
            rescue => exception
              # Best-effort invalidation: log and move on.
              puts "warning: error in cache invalidation `#{exception}`"
            end
          end
        rescue StandardError
          # Transient Redis failure (e.g. connection drop): back off and retry.
          if attempts < 3
            attempts += 1
            sleep(5)
            retry
          end
        end
      end
    end

    # Persist a marshalled response and register it in each graph's
    # dependency set.
    def cache_query_response(keys, entry)
      data = Marshal.dump(entry)
      # Keep oversized payloads out of the cache.
      return if data.length > MAX_ENTRY_BYTES

      keys[:graphs].each { |g| @redis_cache.sadd(g, keys[:query]) }
      @redis_cache.set(keys[:query], data)
      # Entries carry no TTL; they live until their graph is invalidated.
    end

    # Cache key derived from a Query object's FROM clause, or nil when the
    # query declares no FROM graphs (such queries are not cacheable).
    def cache_key_from_query(query)
      from = query.options[:from]
      return nil if from.nil? || from.empty?

      from = [from] unless from.instance_of?(Array)
      self.class.generate_cache_key(query.to_s, from)
    end

    # Resolve the cache key: an explicit :graphs option (call options take
    # precedence over query options) wins over the query's own FROM clause.
    # @return [Hash, nil] nil when the query is not cacheable
    def query_cache_key(query, options)
      explicit_graphs = options[:graphs] || query.options[:graphs]
      if explicit_graphs
        self.class.generate_cache_key(query.to_s, explicit_graphs)
      else
        cache_key_from_query(query)
      end
    end

    # Look up a cached response for +query+.
    #
    # Returns the unmarshalled response on a valid hit, nil otherwise. On a
    # miss for a cacheable query, records the computed key in
    # options[:cache_key] so the caller can populate the cache afterwards.
    def cached_query_response(query, options)
      return nil if query.respond_to?(:options) && query.options[:bypass_cache]
      return nil unless @redis_cache && (query.instance_of?(SPARQL::Client::Query) || options[:graphs])

      cache_key = query_cache_key(query, options)
      # No FROM graphs and no :graphs option: not cacheable (previously this
      # fell through and raised NoMethodError on nil).
      return nil if cache_key.nil?

      cache_response = @redis_cache.get(cache_key[:query])

      if options[:reload_cache] == true
        @redis_cache.del(cache_key[:query])
        cache_response = nil
      end

      if cache_response
        # A hit is only valid while every graph it read from still lists it;
        # a missing membership means the graph was invalidated since.
        cache_key[:graphs].each do |g|
          next if @redis_cache.sismember(g, cache_key[:query])

          @redis_cache.del(cache_key[:query])
          cache_response = nil
          break
        end

        # Marshal.load is acceptable here only because the payload comes from
        # our own cache writes, never from untrusted input.
        return Marshal.load(cache_response) if cache_response
      end

      options[:cache_key] = cache_key
      nil
    end
  end
end
Loading

0 comments on commit 85f01b8

Please sign in to comment.