Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/flyscrape/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"

"github.com/philippta/flyscrape/cmd"

_ "github.com/philippta/flyscrape/modules/browser"
_ "github.com/philippta/flyscrape/modules/cache"
_ "github.com/philippta/flyscrape/modules/cookies"
Expand All @@ -19,6 +20,7 @@ import (
_ "github.com/philippta/flyscrape/modules/followlinks"
_ "github.com/philippta/flyscrape/modules/headers"
_ "github.com/philippta/flyscrape/modules/output/json"
_ "github.com/philippta/flyscrape/modules/output/mongodb"
_ "github.com/philippta/flyscrape/modules/output/ndjson"
_ "github.com/philippta/flyscrape/modules/proxy"
_ "github.com/philippta/flyscrape/modules/ratelimit"
Expand Down
20 changes: 20 additions & 0 deletions examples/coinmarketcap_mongodb.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
export const config = {
url: "https://coinmarketcap.com/",
follow: ["a[href]"],
depth: 1,
output: {
mongodb: {
uri: "mongodb://localhost:27017",
database: "test",
collection: "coinmarketcap",
},
},
};

export default function ({ doc }) {
const title = doc.find("title");

return {
title: title.text(),
};
}
11 changes: 11 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,16 @@ require (
golang.org/x/sync v0.9.0
)

require (
github.com/golang/snappy v0.0.4 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
)

require (
github.com/Velocidex/json v0.0.0-20220224052537-92f3c0326e5a // indirect
github.com/Velocidex/ordereddict v0.0.0-20230909174157-2aa49cc5d11d // indirect
Expand Down Expand Up @@ -48,6 +58,7 @@ require (
github.com/ysmood/gson v0.7.3 // indirect
github.com/ysmood/leakless v0.8.0 // indirect
github.com/zalando/go-keyring v0.2.5 // indirect
go.mongodb.org/mongo-driver v1.17.2
golang.org/x/crypto v0.29.0 // indirect
golang.org/x/net v0.31.0 // indirect
golang.org/x/sys v0.27.0 // indirect
Expand Down
18 changes: 18 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,21 @@ github.com/go-sqlite/sqlite3 v0.0.0-20180313105335-53dd8e640ee7 h1:ow5vK9Q/DSKkx
github.com/go-sqlite/sqlite3 v0.0.0-20180313105335-53dd8e640ee7/go.mod h1:JxSQ+SvsjFb+p8Y+bn+GhTkiMfKVGBD0fq43ms2xw04=
github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk=
github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gonuts/binary v0.2.0 h1:caITwMWAoQWlL0RNvv2lTU/AHqAJlVuu6nZmNgfbKW4=
github.com/gonuts/binary v0.2.0/go.mod h1:kM+CtBrCGDSKdv8WXTuCUsw+loiy8f/QEI8YCCC0M/E=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20230207041349-798e818bf904 h1:4/hN5RUoecvl+RmJRE2YxKWtnnQls6rQjjW5oV7qg2U=
github.com/google/pprof v0.0.0-20230207041349-798e818bf904/go.mod h1:uglQLonpP8qtYCYyzA+8c/9qtqgA3qsXGYqCPKARAFg=
github.com/ianlancetaylor/demangle v0.0.0-20220319035150-800ac71e25c2/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w=
github.com/inancgumus/screen v0.0.0-20190314163918-06e984b86ed3 h1:fO9A67/izFYFYky7l1pDP5Dr0BTCRkaQJUG6Jm5ehsk=
github.com/inancgumus/screen v0.0.0-20190314163918-06e984b86ed3/go.mod h1:Ey4uAp+LvIl+s5jRbOHLcZpUDnkjLBROl15fZLwPlTM=
github.com/keybase/go-keychain v0.0.0-20231219164618-57a3676c3af6 h1:IsMZxCuZqKuao2vNdfD82fjjgPLfyHLpR41Z88viRWs=
github.com/keybase/go-keychain v0.0.0-20231219164618-57a3676c3af6/go.mod h1:3VeWNIJaW+O5xpRQbPp0Ybqu1vJd/pm7s2F473HRrkw=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
Expand All @@ -73,6 +79,8 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE=
github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
github.com/nlnwa/whatwg-url v0.4.0 h1:B3kFb5EL7KILeBkhrlQvFi41Ex0p4ropVA9brt5ungI=
github.com/nlnwa/whatwg-url v0.4.0/go.mod h1:pLzpJjFPtA+n7RCLvp0GBxvDHa/2ckNCBK9mfEeNOMQ=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
Expand Down Expand Up @@ -106,6 +114,14 @@ github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4=
github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM=
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI=
github.com/ysmood/fetchup v0.2.3 h1:ulX+SonA0Vma5zUFXtv52Kzip/xe7aj4vqT5AJwQ+ZQ=
github.com/ysmood/fetchup v0.2.3/go.mod h1:xhibcRKziSvol0H1/pj33dnKrYyI2ebIvz5cOOkYGns=
github.com/ysmood/goob v0.4.0 h1:HsxXhyLBeGzWXnqVKtmT9qM7EuVs/XOgkX7T6r1o1AQ=
Expand All @@ -125,6 +141,8 @@ github.com/zalando/go-keyring v0.2.5 h1:Bc2HHpjALryKD62ppdEzaFG6VxL6Bc+5v0LYpN8L
github.com/zalando/go-keyring v0.2.5/go.mod h1:HL4k+OXQfJUWaMnqyuSOc0drfGPX2b51Du6K+MRgZMk=
go.etcd.io/bbolt v1.3.11 h1:yGEzV1wPz2yVCLsD8ZAiGHhHVlczyC9d1rP43/VCRJ0=
go.etcd.io/bbolt v1.3.11/go.mod h1:dksAq7YMXoljX0xu6VF5DMZGbhYYoLUalEiSySYAS4I=
go.mongodb.org/mongo-driver v1.17.2 h1:gvZyk8352qSfzyZ2UMWcpDpMSGEr1eqE4T793SqyhzM=
go.mongodb.org/mongo-driver v1.17.2/go.mod h1:Hy04i7O2kC4RS06ZrhPRqj/u4DTYkFDAAccj+rVKqgQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ=
Expand Down
231 changes: 231 additions & 0 deletions modules/output/mongodb/mongodb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package mongodb

import (
"context"
"errors"
"log"
"os"
"sync"
"time"

"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"

"github.com/philippta/flyscrape"
)

var (
DefaultMaxPoolSize = 100
DefaultBatchSize = 100
DefaultFlushInterval = 10 * time.Second
DefaultTimeout = 30 * time.Second
DefaultMaxRetries = 3
)

func init() {
flyscrape.RegisterModule(&Module{})
}

type Module struct {
SourceId string `json:"sourceId"`
Output struct {
MongoDB struct {
URI string `json:"uri"`
Database string `json:"database"`
Collection string `json:"collection"`
MaxPoolSize int `json:"maxPoolSize,omitempty"`
} `json:"mongodb"`
} `json:"output"`
Concurrency int `json:"concurrency"`

client *mongo.Client
collection *mongo.Collection
maxPoolSize int

buf []interface{}
mu *sync.Mutex

ticker *time.Ticker
done chan struct{}
concurrency chan struct{}
}

func (m *Module) ModuleInfo() flyscrape.ModuleInfo {
return flyscrape.ModuleInfo{
ID: "output.mongodb",
New: func() flyscrape.Module { return new(Module) },
}
}

func (m *Module) Provision(ctx flyscrape.Context) {
if m.disabled() {
return
}

m.mu = &sync.Mutex{}

m.maxPoolSize = DefaultMaxPoolSize
if m.Output.MongoDB.MaxPoolSize != 0 {
m.maxPoolSize = m.Output.MongoDB.MaxPoolSize
}

if m.concurrencyEnabled() {
m.concurrency = make(chan struct{}, m.Concurrency)
for i := 0; i < m.Concurrency; i++ {
m.concurrency <- struct{}{}
}
}

ctxTimeout, cancel := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancel()
client, err := mongo.Connect(ctxTimeout, options.Client().ApplyURI(m.Output.MongoDB.URI).SetMaxPoolSize(uint64(m.maxPoolSize)))

if err != nil {
log.Printf("failed to connect to MongoDB: %v", err)
os.Exit(1)
}

if err := client.Ping(ctxTimeout, readpref.Primary()); err != nil {
log.Printf("failed to ping MongoDB: %v", err)
os.Exit(1)
}

m.client = client
m.collection = client.Database(m.Output.MongoDB.Database).Collection(m.Output.MongoDB.Collection)
m.buf = make([]interface{}, 0, DefaultBatchSize)
m.done = make(chan struct{})
m.ticker = time.NewTicker(DefaultFlushInterval)

go func() {
for {
select {
case <-m.ticker.C:
m.flushBuffer()
case <-m.done:
return
}
}
}()
}

func (m *Module) ReceiveResponse(resp *flyscrape.Response) {
if m.disabled() {
return
}

if resp.Data == nil && resp.Error == nil {
return
}

o := output{
URL: resp.Request.URL,
Data: resp.Data,
Timestamp: time.Now(),
}
if resp.Error != nil {
o.Error = resp.Error.Error()
}

m.mu.Lock()
defer m.mu.Unlock()

m.buf = append(m.buf, o)
if len(m.buf) >= DefaultBatchSize {
go m.flushBuffer()
}
}

func (m *Module) Finalize() {
if m.disabled() {
return
}

m.ticker.Stop()

ctxTimeout, cancel := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancel()

done := make(chan struct{})
go func() {
m.flushBuffer()
close(done)
}()
select {
case <-done:
case <-ctxTimeout.Done():
}

if err := m.client.Disconnect(ctxTimeout); err != nil {
log.Printf("failed to disconnect from MongoDB: %v", err)
}

close(m.done)
}

func (m *Module) disabled() bool {
return m.Output.MongoDB.URI == "" || m.Output.MongoDB.Database == "" || m.Output.MongoDB.Collection == ""
}

type output struct {
URL string `json:"url,omitempty"`
Data any `json:"data,omitempty"`
Error string `json:"error,omitempty"`
Timestamp time.Time `json:"timestamp,omitempty"`
}

func (m *Module) flushBuffer() {
if m.concurrencyEnabled() {
<-m.concurrency
defer func() { m.concurrency <- struct{}{} }()
}
m.mu.Lock()
defer m.mu.Unlock()

if len(m.buf) == 0 {
return
}

var err error
var res *mongo.InsertManyResult

for i := 0; i < DefaultMaxRetries; i++ {
ctx, cancel := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancel()

log.Printf("attempt %d to insert %d documents to MongoDB", i+1, len(m.buf))
res, err = m.collection.InsertMany(ctx, m.buf)
if err == nil {
log.Printf("successfully inserted %d documents to MongoDB", len(res.InsertedIDs))
m.buf = m.buf[:0]
return
}

if errors.Is(err, context.DeadlineExceeded) {
log.Printf("operation timed out, retrying...")
continue
}

log.Printf("failed to insert documents to MongoDB: %v", err)
break
}

if err != nil {
log.Printf("failed to insert %d documents after %d retries: %v", len(m.buf), DefaultMaxRetries, err)
}
m.buf = m.buf[:0]
}

func (m *Module) concurrencyEnabled() bool {
return m.Concurrency > 0
}

var (
_ flyscrape.Provisioner = (*Module)(nil)
_ flyscrape.ResponseReceiver = (*Module)(nil)
_ flyscrape.Finalizer = (*Module)(nil)
)