-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcachereader.go
More file actions
189 lines (140 loc) · 4.77 KB
/
cachereader.go
File metadata and controls
189 lines (140 loc) · 4.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package cachereader
import (
"context"
"fmt"
"io"
"net/url"
"sync"
"github.com/whosonfirst/go-cache"
"github.com/whosonfirst/go-ioutil"
"github.com/whosonfirst/go-reader/v2"
)
// CacheReaderOptions is a struct for use with the `NewCacheReaderWithOptions` function.
type CacheReaderOptions struct {
// Reader is the `whosonfirst/go-reader.Reader` instance the new `CacheReader` will use to read source documents.
Reader reader.Reader
// Cache is the `whosonfirst/go-cache.Cache` instance the new `CacheReader` will use to cache source documents.
Cache cache.Cache
}
// CacheReader implements the `whosonfirst/go-reader` interface for use with a caching layer for reading documents.
type CacheReader struct {
// NOTE(review): this embedded interface value is not assigned by `NewCacheReaderWithOptions`; the
// methods defined on `CacheReader` in this file use the `reader` field below instead. Confirm whether
// the embedding is still required.
reader.Reader
// A `whosonfirst/go-reader.Reader` instance used to read source documents.
reader reader.Reader
// A `whosonfirst/go-cache.Cache` instance used to cache source documents.
cache cache.Cache
}
// CacheStatus is a constant indicating the cache status for a read action. Values are recorded
// by the `Read` method and can be retrieved with `GetLastRead`.
type CacheStatus uint8
const (
// CacheNotFound signals that there are no recorded events for a cache lookup. It is the zero value for `CacheStatus`.
CacheNotFound CacheStatus = iota
// CacheHit signals that the last read event was a cache HIT
CacheHit
// CacheMiss signals that the last read event was a cache MISS
CacheMiss
)
// lastReadMap records the most recent `CacheStatus` for each reader and key pairing. Keys are the
// strings produced by `lastReadKey` and values are `CacheStatus` constants. It is allocated in `init`.
// NOTE(review): nothing visible in this file removes entries, so the map presumably grows with the
// number of distinct keys read — confirm whether that is acceptable for long-running processes.
var lastReadMap *sync.Map
// init registers `NewCacheReader` with the `whosonfirst/go-reader` package under the name
// "cachereader", panicking if that registration fails, and then allocates the package-level
// map used to track the last cache status for each reader and key.
func init() {

	if err := reader.RegisterReader(context.Background(), "cachereader", NewCacheReader); err != nil {
		panic(err)
	}

	lastReadMap = &sync.Map{}
}
// lastReadKey builds the string used to index `lastReadMap`: the `%p` formatting of 'cr',
// followed by a "#" separator and then 'key'.
func lastReadKey(cr reader.Reader, key string) string {
	prefix := fmt.Sprintf("%p", cr)
	return prefix + "#" + key
}
// setLastRead records 's' as the most recent cache status for the pairing of 'cr' and 'key'.
func setLastRead(cr reader.Reader, key string, s CacheStatus) {
	lastReadMap.Store(lastReadKey(cr, key), s)
}
// GetLastRead returns the `CacheStatus` recorded for the last event using 'cr' to read 'key',
// along with a boolean value indicating whether any status has been recorded. When nothing
// has been recorded the return values are `CacheNotFound` and false.
func GetLastRead(cr reader.Reader, key string) (CacheStatus, bool) {

	recorded, _ := lastReadMap.Load(lastReadKey(cr, key))

	if recorded != nil {
		return recorded.(CacheStatus), true
	}

	return CacheNotFound, false
}
// NewCacheReader will return a new `CacheReader` instance configured by 'uri' which
// is expected to take the form of:
//
//	cachereader://?reader={READER_URI}&cache={CACHE_URI}
//
// Where {READER_URI} is expected to be a valid `whosonfirst/go-reader.Reader` URI and
// {CACHE_URI} is expected to be a valid `whosonfirst/go-cache.Cache` URI. Note that multiple
// "?reader=" parameters are supported; internally the `CacheReader` implementation uses a
// `whosonfirst/go-reader.MultiReader` instance for reading documents.
//
// An error is returned if 'uri' can not be parsed, if the "?cache=" parameter is missing or
// empty, if no "?reader=" parameter is present, or if the reader or the cache can not be
// created from their respective URIs.
func NewCacheReader(ctx context.Context, uri string) (reader.Reader, error) {

	u, err := url.Parse(uri)

	if err != nil {
		return nil, fmt.Errorf("Failed to parse URI, %w", err)
	}

	q := u.Query()

	cacheURI := q.Get("cache")

	if cacheURI == "" {
		return nil, fmt.Errorf("Missing ?cache= parameter")
	}

	// Use the raw slice rather than q.Get so that every "?reader=" parameter is honoured.
	readerURIs := q["reader"]

	// Without at least one source reader there is nothing to populate the cache from, so
	// reject the URI up front in the same way a missing "?cache=" parameter is rejected.
	if len(readerURIs) == 0 {
		return nil, fmt.Errorf("Missing ?reader= parameter")
	}

	r, err := reader.NewMultiReaderFromURIs(ctx, readerURIs...)

	if err != nil {
		return nil, fmt.Errorf("Failed to create reader, %w", err)
	}

	c, err := cache.NewCache(ctx, cacheURI)

	if err != nil {
		return nil, fmt.Errorf("Failed to create new cache, %w", err)
	}

	opts := &CacheReaderOptions{
		Reader: r,
		Cache:  c,
	}

	return NewCacheReaderWithOptions(ctx, opts)
}
// NewCacheReaderWithOptions will return a new `CacheReader` instance configured by 'opts'.
//
// An error is returned if 'opts' is nil or if either its `Reader` or `Cache` property is nil,
// since the methods of the resulting `CacheReader` dereference both of them.
func NewCacheReaderWithOptions(ctx context.Context, opts *CacheReaderOptions) (reader.Reader, error) {

	// Fail here with a descriptive error rather than with a nil pointer dereference
	// the first time the reader is used.
	if opts == nil {
		return nil, fmt.Errorf("Missing CacheReaderOptions")
	}

	if opts.Reader == nil {
		return nil, fmt.Errorf("Missing CacheReaderOptions.Reader")
	}

	if opts.Cache == nil {
		return nil, fmt.Errorf("Missing CacheReaderOptions.Cache")
	}

	cr := &CacheReader{
		reader: opts.Reader,
		cache:  opts.Cache,
	}

	return cr, nil
}
// Exists returns a boolean value indicating whether 'key' already exists. The cache is
// consulted first; if it holds 'key' the result is true. If the cache reports a miss the
// question is delegated to the underlying reader. Any other cache error is returned to
// the caller, wrapped with the key and the cache type.
func (cr *CacheReader) Exists(ctx context.Context, key string) (bool, error) {

	fh, err := cr.cache.Get(ctx, key)

	if err == nil {

		// Only the presence of 'key' matters here, so release the handle returned by the
		// cache instead of leaking it. A failure to close does not change the answer and
		// is deliberately ignored.
		if fh != nil {
			_ = fh.Close()
		}

		return true, nil
	}

	if !cache.IsCacheMiss(err) {
		return false, fmt.Errorf("Failed to read from cache for %s with %T, %w", key, cr.cache, err)
	}

	return cr.reader.Exists(ctx, key)
}
// Read returns an `io.ReadSeekCloser` instance for the document identified by 'key'. If the
// document is present in the cache maintained by 'cr' it is served from there and the read
// is recorded as a `CacheHit`. Otherwise the read is recorded as a `CacheMiss`, the document
// is read from the underlying reader and stored in the cache, and the handle returned by
// the cache is handed back to the caller. Cache errors other than a miss, and failures to
// read from the source or to write to the cache, are returned as errors.
func (cr *CacheReader) Read(ctx context.Context, key string) (io.ReadSeekCloser, error) {

	fh, err := cr.cache.Get(ctx, key)

	switch {
	case err == nil:
		// Served straight from the cache.
		setLastRead(cr, key, CacheHit)
		return ioutil.NewReadSeekCloser(fh)
	case !cache.IsCacheMiss(err):
		// Anything other than a miss is a genuine cache failure.
		return nil, fmt.Errorf("Failed to read from cache for %s with %T, %w", key, cr.cache, err)
	}

	// The status is recorded before the source is consulted, so it is set even when
	// the source read below fails.
	setLastRead(cr, key, CacheMiss)

	if fh, err = cr.reader.Read(ctx, key); err != nil {
		return nil, fmt.Errorf("Failed to read %s from %T, %w", key, cr.reader, err)
	}

	if fh, err = cr.cache.Set(ctx, key, fh); err != nil {
		return nil, fmt.Errorf("Failed to set cache for %s with %T, %w", key, cr.cache, err)
	}

	// https://github.com/whosonfirst/go-cache/issues/1
	return ioutil.NewReadSeekCloser(fh)
}
// ReaderURI returns the final URI for 'key' as reported by the underlying reader wrapped by 'cr'.
func (cr *CacheReader) ReaderURI(ctx context.Context, key string) string {
	resolved := cr.reader.ReaderURI(ctx, key)
	return resolved
}