diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..fe63811
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,3 @@
+.vscode/
+.git/
+
diff --git a/.gitignore b/.gitignore
index 05343b2..2f21926 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,4 +5,6 @@
*.o
*.so
*.DS_Store
-fRanz/inst/librdkafka*
+franz/src/librdkafa
+franz/src/pkgconfig
+librdkafka*
diff --git a/.gitkeep b/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/.travis.yml b/.travis.yml
index 34c4509..2b38878 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -2,14 +2,12 @@ services:
- docker
before_install:
+
- docker-compose up -d
- sleep 20
+ - cd fRanz
language: r
r:
- release
-cache: packages
-
-os:
- - linux
- - osx
+cache: packages
\ No newline at end of file
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 0098e12..fcc807a 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -56,29 +56,43 @@ To submit a PR, please follow these steps:
1. Fork `fRanz` to your GitHub account
2. Create a branch on your fork and add your changes
3. If you are changing or adding to the R code in the package, add unit tests and integration tests confirming that your code works as expected
-3. When you are ready, click "Compare & Pull Request". Open A PR comparing your branch to the `master` branch in this repo
-4. In the description section on your PR, please indicate the following:
+4. Any commits added should ideally follow [conventional commits](https://conventionalcommits.org). See the Conventional Commits section below for more detail
+5. When you are ready, click "Compare & Pull Request". Open A PR comparing your branch to the `master` branch in this repo
+6. In the description section on your PR, please indicate the following:
- description of what the PR is trying to do and how it improves `fRanz`
- links to any open [issues](https://github.com/UptakeOpenSource/fRanz/issues) that your PR is addressing
We will try to review PRs promptly and get back to you within a few days.
+### Conventional Commits
+
+We strive to follow conventional commits to make creating NEWS.md and other files easy to maintain. Additionally it provides ease to PR reviewers to semantically check which commit they want to review and sift through the changes. Though we are not terribly strict around this rule since it will cause lots of friction for first time contributors to have to re-author commits, and that is more important than havea pure git history, we do ask frequent commiters to follow this convention.
+
+In other words, this is a good useful convention but not at the expense of reducing interest or involvement in the project. This will come at the expense of automation, but that's ok.
+
+For more details see [conventional commits](https://conventionalcommits.org)
+
## Running Tests Locally
### Development
-> WIP: Document Local Development Setup
+`NOTE TO DEVELOPERS: Please add tips as you find them`
+- You need to have g++ to work with this package properly. The clang compiler is not supported because of differences in static linking in the namespace. See this issue for more details
+
+#### Speeding up development times
-### Running Unit Tests
-> WIP: Document How to Run Unit Tests
+Currently `fRanz` installs the entire pulls and compiles the entire `librdkafka` source each time it is installed. This is good for portability but adds significant install time each time. In order to improve the experience developing a guard is put in ./configure which might cause caching issues. Use `make clean` to purge your install.
-### Running Integration Tests
-> WIP: Document How to Run Integration Tests in Docker
+### Running Unit and Integration Tests
+> You must have `docker` and `docker-compose` configured correctly
+> run `docker-compose up -d` from the root of the repository. This will start kafka running on your machine
+> run `make test` or other R testing tools
+> when done, make sure to call `docker-compose down` to shutdown any running docker instances
### Creating Releases
-> WIP: Document How to Create Releases and Versioning
+> Currently the best practice known is under `make check`
+> When we figure this out we will document it!
## Package Versioning
-> WIP: Document Package Versioning format
### Version Format
We follow semantic versioning for `fRanz` releases, `MAJOR`.`MINOR`.`PATCH`:
diff --git a/Makefile b/Makefile
index a778cb6..c99b842 100644
--- a/Makefile
+++ b/Makefile
@@ -1,15 +1,12 @@
PACKAGE = fRanz
-INSTALLDIR = $(HOME)/.$(PACKAGE)/local
-OS = $(shell uname -s)
+.PHONY: install test docs clean distclean cleanRcpp unlock build check
-.PHONY: install smoke test docs roxygen pdf version clean distclean cleanRcpp unlock
+install: docs
+ R CMD INSTALL $(PACKAGE)
-install: unlock clean cleanRcpp
- # Install fRanz R package
- Rscript -e 'if (!"Rcpp" %in% rownames(installed.packages())) {install.packages("Rcpp", repos = "https://cran.rstudio.com")}' && \
- Rscript -e "Rcpp::compileAttributes(file.path(getwd(),'$(PACKAGE)'))" && \
- Rscript -e 'if (!"devtools" %in% rownames(installed.packages())) {install.packages("devtools", repos = "https://cran.rstudio.com")}' && \
- Rscript -e "devtools::install(file.path(getwd(),'$(PACKAGE)'), force = TRUE, upgrade = FALSE)"
+check: docs clean
+ R CMD build fRanz
+ R CMD check --as-cran `ls | grep fRanz*.tar.gz`
cleanRcpp:
rm -f fRanz/R/RcppExport.R fRanz/src/RcppExport.cpp
@@ -18,20 +15,22 @@ clean:
# Remove cpp object files
find $(PACKAGE)/src -name '*.o' -delete
find $(PACKAGE)/src -name '*.so' -delete
+ find $(PACKAGE)/src -name '*.a' -delete
+ find $(PACKAGE)/src -name '*.dylib' -delete
+ rm -r $(PACKAGE)/src/librdkafka
distclean: clean cleanRcpp
unlock:
- # Remove 00LOCK-cpproll directory
+ # Remove 00LOCK directory
for libpath in $$(Rscript -e "noquote(paste(.libPaths(), collapse = ' '))"); do \
echo "Unlocking $$libpath..." && \
rm -rf $$libpath/00LOCK-$(PACKAGE); \
done
-docs roxygen:
+docs:
# Regenerate documentation with roxygen
Rscript -e "roxygen2::roxygenize('$(PACKAGE)')"
test:
# Run unit tests
Rscript -e "devtools::test('$(PACKAGE)')"
-
diff --git a/README.md b/README.md
index aa9a3dc..b72bb9c 100644
--- a/README.md
+++ b/README.md
@@ -48,3 +48,5 @@ result
# [[1]]$payload
# [1] "My First Message"
```
+
+
diff --git a/devel/Dockerfile b/devel/Dockerfile
new file mode 100644
index 0000000..d424426
--- /dev/null
+++ b/devel/Dockerfile
@@ -0,0 +1,12 @@
+FROM rocker/r-base
+ARG test
+
+# For roxygen2
+RUN apt-get update && apt-get install -y libxml2-dev
+RUN Rscript -e "install.packages(c('Rcpp','R6','testthat','uuid','roxygen2'))"
+
+RUN echo "$test"
+COPY ./fRanz fRanz
+
+RUN R CMD build fRanz
+RUN R CMD check --install-args='--no-clean-on-error' fRanz_0.1.0.tar.gz || cat fRanz.Rcheck/00install.out
\ No newline at end of file
diff --git a/fRanz/DESCRIPTION b/fRanz/DESCRIPTION
index 932cb82..77986c5 100644
--- a/fRanz/DESCRIPTION
+++ b/fRanz/DESCRIPTION
@@ -1,17 +1,16 @@
Package: fRanz
Type: Package
Title: An R Kafka Client
+Description: This is an R package for interacting with Kafka. It provides a wrapper around librdkafka with idiomatic R classes to consume and produce to Kafka topics.
Version: 0.1.0
Date: 2019-05-13
Authors@R: c(
- person("Patrick", "Boueri", email = "patrick.boueri@uptake.com", role = c("aut")),
- person("Mike", "Jermann", email = "mike.jermann@uptake.com", role = c("cre"))
+ person("Patrick", "Boueri", email = "patrick.boueri@uptake.com", role = c("cre")),
+ person("Mike", "Jermann", email = "mike.jermann@uptake.com", role = c("aut"))
)
Maintainer: Patrick Boueri
-Description: An R Kafka Client
License: BSD_3_clause + file LICENSE
Imports:
- jsonlite,
Rcpp,
R6,
methods
diff --git a/fRanz/NAMESPACE b/fRanz/NAMESPACE
index b040fef..a92293e 100644
--- a/fRanz/NAMESPACE
+++ b/fRanz/NAMESPACE
@@ -1,8 +1,5 @@
# Generated by roxygen2: do not edit by hand
-export(KafkaBroker)
-export(KafkaConsumer)
-export(KafkaProducer)
importFrom(R6,R6Class)
importFrom(Rcpp,sourceCpp)
useDynLib(fRanz)
diff --git a/fRanz/R/KafkaBroker.R b/fRanz/R/KafkaBroker.R
index a920bc5..3d9bb06 100644
--- a/fRanz/R/KafkaBroker.R
+++ b/fRanz/R/KafkaBroker.R
@@ -13,7 +13,6 @@
#' BROKER_HOST <- 'localhost'
#' BROKER_PORT <- 9092
#' TOPIC_NAME <- 'myTestTopic'
-
#' # KafkaBroker
#' broker <- KafkaBroker$new(host=BROKER_HOST, port=BROKER_PORT)
#'
@@ -27,7 +26,9 @@
#'
#'
#' # KafkaConsumer
-#' consumer <- KafkaConsumer$new(brokers = list(broker), groupId = "test", extraOptions=list(`auto.offset.reset`="earliest"))
+#' consumer <- KafkaConsumer$new(brokers = list(broker),
+#' groupId = "test",
+#' extraOptions=list(`auto.offset.reset`="earliest"))
#' consumer$subscribe(topics = c(TOPIC_NAME))
#' result <- consumer$consume(topic=TOPIC_NAME)
#'
diff --git a/fRanz/R/KafkaConsumer.R b/fRanz/R/KafkaConsumer.R
index 93de516..9f5b9d0 100644
--- a/fRanz/R/KafkaConsumer.R
+++ b/fRanz/R/KafkaConsumer.R
@@ -35,7 +35,9 @@
#'
#'
#' # KafkaConsumer
-#' consumer <- KafkaConsumer$new(brokers = list(broker), groupId = "test", extraOptions=list(`auto.offset.reset`="earliest"))
+#' consumer <- KafkaConsumer$new(brokers = list(broker),
+#' groupId = "test",
+#' extraOptions=list(`auto.offset.reset`="earliest"))
#' consumer$subscribe(topics = c(TOPIC_NAME))
#' result <- consumer$consume(topic=TOPIC_NAME)
#'
@@ -71,7 +73,7 @@ KafkaConsumer <- R6::R6Class(
}
, consume = function(topic, numResults=100) {
- Filter(function(msg) !is.null(msg), KafkaConsume(private$consumerPtr, numResults))
+ return(KafkaConsume(private$consumerPtr, numResults))
}
, getTopics = function() {
diff --git a/fRanz/R/RcppExports.R b/fRanz/R/RcppExports.R
index d058f7f..71bc446 100644
--- a/fRanz/R/RcppExports.R
+++ b/fRanz/R/RcppExports.R
@@ -3,7 +3,7 @@
#' @title GetRdConsumer
#' @name GetRdConsumer
-#' @description Creates an Rcpp::XPtr. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}
+#' @description Creates an Rcpp::XPtr. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration}
#' @param keys a character vector indicating option keys to parameterize the RdKafka::Consumer
#' @param values a character vector indicating option values to parameterize the RdKafka::Consumer. Must be of same length as keys.
#' @return a Rcpp::XPtr
@@ -28,14 +28,15 @@ RdSubscribe <- function(consumerPtr, Rtopics) {
#' the provided consumer is subscribed to.
#' @param consumerPtr a reference to a Rcpp::XPtr
#' @param numResults how many results should be consumed before returning. Will return early if offset is at maximum
+#' @param timeout the timeout in milliseconds. Default is 10000
#' @return a list of length numResults with values list(key=key,value=value)
-KafkaConsume <- function(consumerPtr, numResults) {
- .Call('_fRanz_KafkaConsume', PACKAGE = 'fRanz', consumerPtr, numResults)
+KafkaConsume <- function(consumerPtr, numResults, timeout = 10000L) {
+ .Call('_fRanz_KafkaConsume', PACKAGE = 'fRanz', consumerPtr, numResults, timeout)
}
#' @title GetRdProducer
#' @name GetRdProducer
-#' @description Creates an Rcpp::XPtr. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}
+#' @description Creates an Rcpp::XPtr. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration}
#' @param keys a character vector indicating option keys to parameterize the RdKafka::Producer
#' @param values a character vector indicating option values to parameterize the RdKafka::Producer. Must be of same length as keys.
#' @return a Rcpp::XPtr
diff --git a/fRanz/cleanup b/fRanz/cleanup
new file mode 100755
index 0000000..71c25ab
--- /dev/null
+++ b/fRanz/cleanup
@@ -0,0 +1,5 @@
+#!/bin/bash
+set -eo
+
+rm -r librdkafka-1.0.0 librdkafka-1.0.0.tar.gz
+rm -r src/librdkafka
\ No newline at end of file
diff --git a/fRanz/configure b/fRanz/configure
new file mode 100755
index 0000000..1fb435d
--- /dev/null
+++ b/fRanz/configure
@@ -0,0 +1,21 @@
+#!/bin/bash
+
+set -eo
+
+
+### Install librdkafka. Check simply if the library exists if need be
+if [ ! -d "src/librdkafka" ]; then
+ wget https://github.com/edenhill/librdkafka/archive/v1.0.0.tar.gz -O librdkafka-1.0.0.tar.gz && \
+ tar xzf librdkafka-1.0.0.tar.gz && \
+ INSTALL_PATH="$PWD/src/librdkafka" && \
+ mkdir $INSTALL_PATH &&\
+ mv librdkafka-1.0.0/* $INSTALL_PATH && \
+ cd $INSTALL_PATH && \
+ ./configure --prefix=. && make && make install && \
+ cd ../.. &&
+ cp -r src/librdkafka/lib inst/ && \
+ cp -r inst/lib inst/libs && \
+ rm librdkafka-1.0.0.tar.gz;
+else
+ echo "Detected librdkafka in src"
+fi
\ No newline at end of file
diff --git a/fRanz/man/GetRdConsumer.Rd b/fRanz/man/GetRdConsumer.Rd
index 3abe841..8cb2e5c 100644
--- a/fRanz/man/GetRdConsumer.Rd
+++ b/fRanz/man/GetRdConsumer.Rd
@@ -15,5 +15,5 @@ GetRdConsumer(keys, values)
a Rcpp::XPtr
}
\description{
-Creates an Rcpp::XPtr. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}
+Creates an Rcpp::XPtr. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration}
}
diff --git a/fRanz/man/GetRdProducer.Rd b/fRanz/man/GetRdProducer.Rd
index cf02b05..a7f11df 100644
--- a/fRanz/man/GetRdProducer.Rd
+++ b/fRanz/man/GetRdProducer.Rd
@@ -15,5 +15,5 @@ GetRdProducer(keys, values)
a Rcpp::XPtr
}
\description{
-Creates an Rcpp::XPtr. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}
+Creates an Rcpp::XPtr. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration}
}
diff --git a/fRanz/man/Kafka-Broker.Rd b/fRanz/man/KafkaBroker.Rd
similarity index 83%
rename from fRanz/man/Kafka-Broker.Rd
rename to fRanz/man/KafkaBroker.Rd
index 9fa2fd1..8fc081e 100644
--- a/fRanz/man/Kafka-Broker.Rd
+++ b/fRanz/man/KafkaBroker.Rd
@@ -1,10 +1,9 @@
% Generated by roxygen2: do not edit by hand
% Please edit documentation in R/KafkaBroker.R
\docType{data}
-\name{Kafka Broker}
-\alias{Kafka Broker}
+\name{KafkaBroker}
\alias{KafkaBroker}
-\title{KafkaBroker}
+\title{Kafka Broker}
\format{An object of class \code{R6ClassGenerator} of length 24.}
\usage{
KafkaBroker
@@ -35,7 +34,9 @@ producer$produce(topic = TOPIC_NAME,
# KafkaConsumer
-consumer <- KafkaConsumer$new(brokers = list(broker), groupId = "test", extraOptions=list(`auto.offset.reset`="earliest"))
+consumer <- KafkaConsumer$new(brokers = list(broker),
+ groupId = "test",
+ extraOptions=list(`auto.offset.reset`="earliest"))
consumer$subscribe(topics = c(TOPIC_NAME))
result <- consumer$consume(topic=TOPIC_NAME)
diff --git a/fRanz/man/KafkaConsume.Rd b/fRanz/man/KafkaConsume.Rd
index b096c53..a54f838 100644
--- a/fRanz/man/KafkaConsume.Rd
+++ b/fRanz/man/KafkaConsume.Rd
@@ -4,12 +4,14 @@
\alias{KafkaConsume}
\title{KafkaConsume}
\usage{
-KafkaConsume(consumerPtr, numResults)
+KafkaConsume(consumerPtr, numResults, timeout = 10000L)
}
\arguments{
\item{consumerPtr}{a reference to a Rcpp::XPtr}
\item{numResults}{how many results should be consumed before returning. Will return early if offset is at maximum}
+
+\item{timeout}{the timeout in milliseconds. Default is 10000}
}
\value{
a list of length numResults with values list(key=key,value=value)
diff --git a/fRanz/man/KafkaConsumer.Rd b/fRanz/man/KafkaConsumer.Rd
index 05787d5..a90c030 100644
--- a/fRanz/man/KafkaConsumer.Rd
+++ b/fRanz/man/KafkaConsumer.Rd
@@ -41,7 +41,9 @@ producer$produce(topic = TOPIC_NAME,
# KafkaConsumer
-consumer <- KafkaConsumer$new(brokers = list(broker), groupId = "test", extraOptions=list(`auto.offset.reset`="earliest"))
+consumer <- KafkaConsumer$new(brokers = list(broker),
+ groupId = "test",
+ extraOptions=list(`auto.offset.reset`="earliest"))
consumer$subscribe(topics = c(TOPIC_NAME))
result <- consumer$consume(topic=TOPIC_NAME)
diff --git a/fRanz/src/Makevars b/fRanz/src/Makevars
index e557180..ef0e28f 100644
--- a/fRanz/src/Makevars
+++ b/fRanz/src/Makevars
@@ -1,23 +1,5 @@
-INSTALLDIR = $(HOME)/.fRanz/librdkafka
-LIBRDKAFKADIR = $(PWD)/../inst/librdkafka-0.11.6
-
-PKG_LIBS = -L$(INSTALLDIR)/src-cpp -lrdkafka++
-PKG_CXXFLAGS = -std=c++11 -I$(INSTALLDIR)/src-cpp
-
-.PHONY: all install_librdkadka
-
-all: install_librdkadka
-
-install_librdkadka:
- if [ ! -s $(INSTALLDIR)/src-cpp/librdkafka++.a ] ; \
- then \
- mkdir -p $(INSTALLDIR) && \
- cd $(shell dirname $(LIBRDKAFKADIR)) && \
- tar xzf $(LIBRDKAFKADIR).tar.gz && \
- cd $(LIBRDKAFKADIR) && \
- ./configure && \
- $(MAKE) && \
- $(MAKE) install && \
- mv * $(INSTALLDIR) ; \
- fi
+LIBRDKAFKA_LOC = librdkafka
+PKG_LIBS = -L$(LIBRDKAFKA_LOC)/lib -lrdkafka++
+PKG_CXXFLAGS = -I$(LIBRDKAFKA_LOC)/include
+CXX_STD=CXX11
\ No newline at end of file
diff --git a/fRanz/src/RcppExports.cpp b/fRanz/src/RcppExports.cpp
index 912569c..b608c23 100644
--- a/fRanz/src/RcppExports.cpp
+++ b/fRanz/src/RcppExports.cpp
@@ -30,14 +30,15 @@ BEGIN_RCPP
END_RCPP
}
// KafkaConsume
-Rcpp::List KafkaConsume(SEXP consumerPtr, int numResults);
-RcppExport SEXP _fRanz_KafkaConsume(SEXP consumerPtrSEXP, SEXP numResultsSEXP) {
+Rcpp::List KafkaConsume(SEXP consumerPtr, int numResults, int timeout);
+RcppExport SEXP _fRanz_KafkaConsume(SEXP consumerPtrSEXP, SEXP numResultsSEXP, SEXP timeoutSEXP) {
BEGIN_RCPP
Rcpp::RObject rcpp_result_gen;
Rcpp::RNGScope rcpp_rngScope_gen;
Rcpp::traits::input_parameter< SEXP >::type consumerPtr(consumerPtrSEXP);
Rcpp::traits::input_parameter< int >::type numResults(numResultsSEXP);
- rcpp_result_gen = Rcpp::wrap(KafkaConsume(consumerPtr, numResults));
+ Rcpp::traits::input_parameter< int >::type timeout(timeoutSEXP);
+ rcpp_result_gen = Rcpp::wrap(KafkaConsume(consumerPtr, numResults, timeout));
return rcpp_result_gen;
END_RCPP
}
@@ -72,7 +73,7 @@ END_RCPP
static const R_CallMethodDef CallEntries[] = {
{"_fRanz_GetRdConsumer", (DL_FUNC) &_fRanz_GetRdConsumer, 2},
{"_fRanz_RdSubscribe", (DL_FUNC) &_fRanz_RdSubscribe, 2},
- {"_fRanz_KafkaConsume", (DL_FUNC) &_fRanz_KafkaConsume, 2},
+ {"_fRanz_KafkaConsume", (DL_FUNC) &_fRanz_KafkaConsume, 3},
{"_fRanz_GetRdProducer", (DL_FUNC) &_fRanz_GetRdProducer, 2},
{"_fRanz_KafkaProduce", (DL_FUNC) &_fRanz_KafkaProduce, 5},
{NULL, NULL, 0}
diff --git a/fRanz/src/consumer.cpp b/fRanz/src/consumer.cpp
index 64ccaa7..63576b1 100644
--- a/fRanz/src/consumer.cpp
+++ b/fRanz/src/consumer.cpp
@@ -1,19 +1,11 @@
-#include
+#include
#include
#include "utils.h"
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
////////////////////////////////////////////////////////////////////////////////////////
//' @title GetRdConsumer
//' @name GetRdConsumer
-//' @description Creates an Rcpp::XPtr. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}
+//' @description Creates an Rcpp::XPtr. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration}
//' @param keys a character vector indicating option keys to parameterize the RdKafka::Consumer
//' @param values a character vector indicating option values to parameterize the RdKafka::Consumer. Must be of same length as keys.
//' @return a Rcpp::XPtr
@@ -54,14 +46,15 @@ int RdSubscribe(SEXP consumerPtr, const Rcpp::StringVector Rtopics) {
//' the provided consumer is subscribed to.
//' @param consumerPtr a reference to a Rcpp::XPtr
//' @param numResults how many results should be consumed before returning. Will return early if offset is at maximum
+//' @param timeout the timeout in milliseconds. Default is 10000
//' @return a list of length numResults with values list(key=key,value=value)
// [[Rcpp::export]]
-Rcpp::List KafkaConsume(SEXP consumerPtr, int numResults) {
+Rcpp::List KafkaConsume(SEXP consumerPtr, int numResults, int timeout = 10000) {
Rcpp::XPtr consumer(consumerPtr);
Rcpp::List messages(numResults);
for(int i = 0; i < numResults; i++) {
- RdKafka::Message *msg = consumer->consume(10000);
+ RdKafka::Message *msg = consumer->consume(timeout);
switch(msg->err()){
case RdKafka::ERR_NO_ERROR: {
Rcpp::List message = Rcpp::List::create(Rcpp::Named("key") = *msg->key(),
@@ -69,11 +62,11 @@ Rcpp::List KafkaConsume(SEXP consumerPtr, int numResults) {
messages[i] = message;
break;
} case RdKafka::ERR__PARTITION_EOF: {
- printf("No additional messages available\n");
+ //Trim the messages to the appropriate size
+ messages = messages[Rcpp::Range(0,i-1)];
goto exit_loop;
} default: {
- /* Errors */
- printf("Consume failed: %s", msg->errstr().c_str());
+ Rcpp::stop("Consume failed: %s", msg->errstr().c_str());
goto exit_loop;
}
}
diff --git a/fRanz/src/producer.cpp b/fRanz/src/producer.cpp
index 8929a07..70295fc 100644
--- a/fRanz/src/producer.cpp
+++ b/fRanz/src/producer.cpp
@@ -1,16 +1,10 @@
-#include
+#include
#include
#include "utils.h"
-#include
-#include
-#include
-#include
-#include
-#include
//' @title GetRdProducer
//' @name GetRdProducer
-//' @description Creates an Rcpp::XPtr. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}
+//' @description Creates an Rcpp::XPtr. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration}
//' @param keys a character vector indicating option keys to parameterize the RdKafka::Producer
//' @param values a character vector indicating option values to parameterize the RdKafka::Producer. Must be of same length as keys.
//' @return a Rcpp::XPtr
@@ -45,8 +39,7 @@ int KafkaProduce(SEXP producer_pointer,
std::string s_topic = Rcpp::as(topic);
if (keys.size() != values.size()) {
- std::cout << "keys and values must be same size" << std::endl;
- return -1;
+ Rcpp::stop("keys and values must be same size");
}
int numMsgs = keys.size();
int numSent = 0;
diff --git a/fRanz/src/utils.cpp b/fRanz/src/utils.cpp
index 8144693..0ba3627 100644
--- a/fRanz/src/utils.cpp
+++ b/fRanz/src/utils.cpp
@@ -1,5 +1,5 @@
#include
-#include
+#include
diff --git a/fRanz/src/utils.h b/fRanz/src/utils.h
index c4b9156..16e9f93 100644
--- a/fRanz/src/utils.h
+++ b/fRanz/src/utils.h
@@ -1,4 +1,4 @@
#include
-#include
+#include
RdKafka::Conf* MakeKafkaConfig(Rcpp::StringVector keys, Rcpp::StringVector values);
\ No newline at end of file
diff --git a/fRanz/tests/testthat/test-KafkaConsumer.R b/fRanz/tests/testthat/test-KafkaConsumer.R
index 32a7db3..6299eea 100644
--- a/fRanz/tests/testthat/test-KafkaConsumer.R
+++ b/fRanz/tests/testthat/test-KafkaConsumer.R
@@ -2,6 +2,7 @@ context("KafkaConsumer and KafkaProducer")
testthat::test_that("Testing KafkaConsumer and KafkaProducer work consuming 1 message",{
+ testthat::skip_on_cran()
## Standard Set Up
topic <- uuid::UUIDgenerate()
group <- uuid::UUIDgenerate()
@@ -25,6 +26,7 @@ testthat::test_that("Testing KafkaConsumer and KafkaProducer work consuming 1 me
testthat::test_that("Testing KafkaConsumer and KafkaProducer work consuming random number of messages",{
+ testthat::skip_on_cran()
## Standard Set Up
topic <- uuid::UUIDgenerate()
group <- uuid::UUIDgenerate()