Skip to content
This repository was archived by the owner on Feb 22, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ var (
verbose bool
raw bool
maxStreams int
filter string
)

// Error messages
Expand All @@ -67,6 +68,7 @@ func init() {
fetchCmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose log output (includes log context in data fields)")
fetchCmd.Flags().BoolVarP(&raw, "raw", "r", false, "Raw JSON output")
fetchCmd.Flags().IntVarP(&maxStreams, "max-streams", "m", 100, "Maximum number of streams to fetch from (for prefix search)")
fetchCmd.Flags().StringVarP(&filter, "query", "q", "", "Filter and Pattern Syntax")
}

func fetch(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -121,7 +123,7 @@ func fetch(cmd *cobra.Command, args []string) error {
ctx, cancel := events.WithSignals(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

eventChan := logReader.StreamEvents(ctx, follow)
eventChan := logReader.StreamEvents(ctx, follow, filter)

ticker := time.After(7 * time.Second)

Expand Down
13 changes: 7 additions & 6 deletions lib/cwreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,20 @@ func (c *CloudwatchLogsReader) ListStreams() ([]*cloudwatchlogs.LogStream, error
// given in the readers constructor. The channel will be closed once
// all events are read or an error occurs. You can check for errors
// after the channel is closed by calling Error()
func (c *CloudwatchLogsReader) StreamEvents(ctx context.Context, follow bool) <-chan Event {
func (c *CloudwatchLogsReader) StreamEvents(ctx context.Context, follow bool, filter string) <-chan Event {
eventChan := make(chan Event)
go c.pumpEvents(ctx, eventChan, follow)
go c.pumpEvents(ctx, eventChan, follow, filter)

return eventChan
}

func (c *CloudwatchLogsReader) pumpEvents(ctx context.Context, eventChan chan<- Event, follow bool) {
func (c *CloudwatchLogsReader) pumpEvents(ctx context.Context, eventChan chan<- Event, follow bool, filter string) {
startTime := c.start.Unix() * 1e3
params := &cloudwatchlogs.FilterLogEventsInput{
Interleaved: aws.Bool(true),
LogGroupName: aws.String(c.logGroupName),
StartTime: aws.Int64(startTime),
Interleaved: aws.Bool(true),
LogGroupName: aws.String(c.logGroupName),
StartTime: aws.Int64(startTime),
FilterPattern: aws.String(filter),
}

if !follow && c.end.IsZero() {
Expand Down
2 changes: 2 additions & 0 deletions lib/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ func NewEvent(cwEvent cloudwatchlogs.FilteredLogEvent, group string) Event {
if err := json.Unmarshal([]byte(*cwEvent.Message), &ecsLogsEvent); err != nil {
ecsLogsEvent = ecslogs.MakeEvent(ecslogs.INFO, *cwEvent.Message)
ecsLogsEvent.Time = ParseAWSTimestamp(cwEvent.Timestamp)
} else {
ecsLogsEvent.Message = *cwEvent.Message
}

return Event{
Expand Down