diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..fe63811 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,3 @@ +.vscode/ +.git/ + diff --git a/.gitignore b/.gitignore index 05343b2..2f21926 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,6 @@ *.o *.so *.DS_Store -fRanz/inst/librdkafka* +franz/src/librdkafa +franz/src/pkgconfig +librdkafka* diff --git a/.gitkeep b/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/.travis.yml b/.travis.yml index 34c4509..2b38878 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,14 +2,12 @@ services: - docker before_install: + - docker-compose up -d - sleep 20 + - cd fRanz language: r r: - release -cache: packages - -os: - - linux - - osx +cache: packages \ No newline at end of file diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 0098e12..fcc807a 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -56,29 +56,43 @@ To submit a PR, please follow these steps: 1. Fork `fRanz` to your GitHub account 2. Create a branch on your fork and add your changes 3. If you are changing or adding to the R code in the package, add unit tests and integration tests confirming that your code works as expected -3. When you are ready, click "Compare & Pull Request". Open A PR comparing your branch to the `master` branch in this repo -4. In the description section on your PR, please indicate the following: +4. Any commits added should ideally follow [conventional commits](https://conventionalcommits.org). See the Conventional Commits section below for more detail +5. When you are ready, click "Compare & Pull Request". Open A PR comparing your branch to the `master` branch in this repo +6. In the description section on your PR, please indicate the following: - description of what the PR is trying to do and how it improves `fRanz` - links to any open [issues](https://github.com/UptakeOpenSource/fRanz/issues) that your PR is addressing We will try to review PRs promptly and get back to you within a few days. +### Conventional Commits + +We strive to follow conventional commits to make creating NEWS.md and other files easy to maintain. Additionally it provides ease to PR reviewers to semantically check which commit they want to review and sift through the changes. Though we are not terribly strict around this rule since it will cause lots of friction for first time contributors to have to re-author commits, and that is more important than havea pure git history, we do ask frequent commiters to follow this convention. + +In other words, this is a good useful convention but not at the expense of reducing interest or involvement in the project. This will come at the expense of automation, but that's ok. + +For more details see [conventional commits](https://conventionalcommits.org) + ## Running Tests Locally ### Development -> WIP: Document Local Development Setup +`NOTE TO DEVELOPERS: Please add tips as you find them` +- You need to have g++ to work with this package properly. The clang compiler is not supported because of differences in static linking in the namespace. See this issue for more details + +#### Speeding up development times -### Running Unit Tests -> WIP: Document How to Run Unit Tests +Currently `fRanz` installs the entire pulls and compiles the entire `librdkafka` source each time it is installed. This is good for portability but adds significant install time each time. In order to improve the experience developing a guard is put in ./configure which might cause caching issues. Use `make clean` to purge your install. -### Running Integration Tests -> WIP: Document How to Run Integration Tests in Docker +### Running Unit and Integration Tests +> You must have `docker` and `docker-compose` configured correctly +> run `docker-compose up -d` from the root of the repository. This will start kafka running on your machine +> run `make test` or other R testing tools +> when done, make sure to call `docker-compose down` to shutdown any running docker instances ### Creating Releases -> WIP: Document How to Create Releases and Versioning +> Currently the best practice known is under `make check` +> When we figure this out we will document it! ## Package Versioning -> WIP: Document Package Versioning format ### Version Format We follow semantic versioning for `fRanz` releases, `MAJOR`.`MINOR`.`PATCH`: diff --git a/Makefile b/Makefile index a778cb6..c99b842 100644 --- a/Makefile +++ b/Makefile @@ -1,15 +1,12 @@ PACKAGE = fRanz -INSTALLDIR = $(HOME)/.$(PACKAGE)/local -OS = $(shell uname -s) +.PHONY: install test docs clean distclean cleanRcpp unlock build check -.PHONY: install smoke test docs roxygen pdf version clean distclean cleanRcpp unlock +install: docs + R CMD INSTALL $(PACKAGE) -install: unlock clean cleanRcpp - # Install fRanz R package - Rscript -e 'if (!"Rcpp" %in% rownames(installed.packages())) {install.packages("Rcpp", repos = "https://cran.rstudio.com")}' && \ - Rscript -e "Rcpp::compileAttributes(file.path(getwd(),'$(PACKAGE)'))" && \ - Rscript -e 'if (!"devtools" %in% rownames(installed.packages())) {install.packages("devtools", repos = "https://cran.rstudio.com")}' && \ - Rscript -e "devtools::install(file.path(getwd(),'$(PACKAGE)'), force = TRUE, upgrade = FALSE)" +check: docs clean + R CMD build fRanz + R CMD check --as-cran `ls | grep fRanz*.tar.gz` cleanRcpp: rm -f fRanz/R/RcppExport.R fRanz/src/RcppExport.cpp @@ -18,20 +15,22 @@ clean: # Remove cpp object files find $(PACKAGE)/src -name '*.o' -delete find $(PACKAGE)/src -name '*.so' -delete + find $(PACKAGE)/src -name '*.a' -delete + find $(PACKAGE)/src -name '*.dylib' -delete + rm -r $(PACKAGE)/src/librdkafka distclean: clean cleanRcpp unlock: - # Remove 00LOCK-cpproll directory + # Remove 00LOCK directory for libpath in $$(Rscript -e "noquote(paste(.libPaths(), collapse = ' '))"); do \ echo "Unlocking $$libpath..." && \ rm -rf $$libpath/00LOCK-$(PACKAGE); \ done -docs roxygen: +docs: # Regenerate documentation with roxygen Rscript -e "roxygen2::roxygenize('$(PACKAGE)')" test: # Run unit tests Rscript -e "devtools::test('$(PACKAGE)')" - diff --git a/README.md b/README.md index aa9a3dc..b72bb9c 100644 --- a/README.md +++ b/README.md @@ -48,3 +48,5 @@ result # [[1]]$payload # [1] "My First Message" ``` + + diff --git a/devel/Dockerfile b/devel/Dockerfile new file mode 100644 index 0000000..d424426 --- /dev/null +++ b/devel/Dockerfile @@ -0,0 +1,12 @@ +FROM rocker/r-base +ARG test + +# For roxygen2 +RUN apt-get update && apt-get install -y libxml2-dev +RUN Rscript -e "install.packages(c('Rcpp','R6','testthat','uuid','roxygen2'))" + +RUN echo "$test" +COPY ./fRanz fRanz + +RUN R CMD build fRanz +RUN R CMD check --install-args='--no-clean-on-error' fRanz_0.1.0.tar.gz || cat fRanz.Rcheck/00install.out \ No newline at end of file diff --git a/fRanz/DESCRIPTION b/fRanz/DESCRIPTION index 932cb82..77986c5 100644 --- a/fRanz/DESCRIPTION +++ b/fRanz/DESCRIPTION @@ -1,17 +1,16 @@ Package: fRanz Type: Package Title: An R Kafka Client +Description: This is an R package for interacting with Kafka. It provides a wrapper around librdkafka with idiomatic R classes to consume and produce to Kafka topics. Version: 0.1.0 Date: 2019-05-13 Authors@R: c( - person("Patrick", "Boueri", email = "patrick.boueri@uptake.com", role = c("aut")), - person("Mike", "Jermann", email = "mike.jermann@uptake.com", role = c("cre")) + person("Patrick", "Boueri", email = "patrick.boueri@uptake.com", role = c("cre")), + person("Mike", "Jermann", email = "mike.jermann@uptake.com", role = c("aut")) ) Maintainer: Patrick Boueri -Description: An R Kafka Client License: BSD_3_clause + file LICENSE Imports: - jsonlite, Rcpp, R6, methods diff --git a/fRanz/NAMESPACE b/fRanz/NAMESPACE index b040fef..a92293e 100644 --- a/fRanz/NAMESPACE +++ b/fRanz/NAMESPACE @@ -1,8 +1,5 @@ # Generated by roxygen2: do not edit by hand -export(KafkaBroker) -export(KafkaConsumer) -export(KafkaProducer) importFrom(R6,R6Class) importFrom(Rcpp,sourceCpp) useDynLib(fRanz) diff --git a/fRanz/R/KafkaBroker.R b/fRanz/R/KafkaBroker.R index a920bc5..3d9bb06 100644 --- a/fRanz/R/KafkaBroker.R +++ b/fRanz/R/KafkaBroker.R @@ -13,7 +13,6 @@ #' BROKER_HOST <- 'localhost' #' BROKER_PORT <- 9092 #' TOPIC_NAME <- 'myTestTopic' - #' # KafkaBroker #' broker <- KafkaBroker$new(host=BROKER_HOST, port=BROKER_PORT) #' @@ -27,7 +26,9 @@ #' #' #' # KafkaConsumer -#' consumer <- KafkaConsumer$new(brokers = list(broker), groupId = "test", extraOptions=list(`auto.offset.reset`="earliest")) +#' consumer <- KafkaConsumer$new(brokers = list(broker), +#' groupId = "test", +#' extraOptions=list(`auto.offset.reset`="earliest")) #' consumer$subscribe(topics = c(TOPIC_NAME)) #' result <- consumer$consume(topic=TOPIC_NAME) #' diff --git a/fRanz/R/KafkaConsumer.R b/fRanz/R/KafkaConsumer.R index 93de516..9f5b9d0 100644 --- a/fRanz/R/KafkaConsumer.R +++ b/fRanz/R/KafkaConsumer.R @@ -35,7 +35,9 @@ #' #' #' # KafkaConsumer -#' consumer <- KafkaConsumer$new(brokers = list(broker), groupId = "test", extraOptions=list(`auto.offset.reset`="earliest")) +#' consumer <- KafkaConsumer$new(brokers = list(broker), +#' groupId = "test", +#' extraOptions=list(`auto.offset.reset`="earliest")) #' consumer$subscribe(topics = c(TOPIC_NAME)) #' result <- consumer$consume(topic=TOPIC_NAME) #' @@ -71,7 +73,7 @@ KafkaConsumer <- R6::R6Class( } , consume = function(topic, numResults=100) { - Filter(function(msg) !is.null(msg), KafkaConsume(private$consumerPtr, numResults)) + return(KafkaConsume(private$consumerPtr, numResults)) } , getTopics = function() { diff --git a/fRanz/R/RcppExports.R b/fRanz/R/RcppExports.R index d058f7f..71bc446 100644 --- a/fRanz/R/RcppExports.R +++ b/fRanz/R/RcppExports.R @@ -3,7 +3,7 @@ #' @title GetRdConsumer #' @name GetRdConsumer -#' @description Creates an Rcpp::XPtr. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md} +#' @description Creates an Rcpp::XPtr. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration} #' @param keys a character vector indicating option keys to parameterize the RdKafka::Consumer #' @param values a character vector indicating option values to parameterize the RdKafka::Consumer. Must be of same length as keys. #' @return a Rcpp::XPtr @@ -28,14 +28,15 @@ RdSubscribe <- function(consumerPtr, Rtopics) { #' the provided consumer is subscribed to. #' @param consumerPtr a reference to a Rcpp::XPtr #' @param numResults how many results should be consumed before returning. Will return early if offset is at maximum +#' @param timeout the timeout in milliseconds. Default is 10000 #' @return a list of length numResults with values list(key=key,value=value) -KafkaConsume <- function(consumerPtr, numResults) { - .Call('_fRanz_KafkaConsume', PACKAGE = 'fRanz', consumerPtr, numResults) +KafkaConsume <- function(consumerPtr, numResults, timeout = 10000L) { + .Call('_fRanz_KafkaConsume', PACKAGE = 'fRanz', consumerPtr, numResults, timeout) } #' @title GetRdProducer #' @name GetRdProducer -#' @description Creates an Rcpp::XPtr. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md} +#' @description Creates an Rcpp::XPtr. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration} #' @param keys a character vector indicating option keys to parameterize the RdKafka::Producer #' @param values a character vector indicating option values to parameterize the RdKafka::Producer. Must be of same length as keys. #' @return a Rcpp::XPtr diff --git a/fRanz/cleanup b/fRanz/cleanup new file mode 100755 index 0000000..71c25ab --- /dev/null +++ b/fRanz/cleanup @@ -0,0 +1,5 @@ +#!/bin/bash +set -eo + +rm -r librdkafka-1.0.0 librdkafka-1.0.0.tar.gz +rm -r src/librdkafka \ No newline at end of file diff --git a/fRanz/configure b/fRanz/configure new file mode 100755 index 0000000..1fb435d --- /dev/null +++ b/fRanz/configure @@ -0,0 +1,21 @@ +#!/bin/bash + +set -eo + + +### Install librdkafka. Check simply if the library exists if need be +if [ ! -d "src/librdkafka" ]; then + wget https://github.com/edenhill/librdkafka/archive/v1.0.0.tar.gz -O librdkafka-1.0.0.tar.gz && \ + tar xzf librdkafka-1.0.0.tar.gz && \ + INSTALL_PATH="$PWD/src/librdkafka" && \ + mkdir $INSTALL_PATH &&\ + mv librdkafka-1.0.0/* $INSTALL_PATH && \ + cd $INSTALL_PATH && \ + ./configure --prefix=. && make && make install && \ + cd ../.. && + cp -r src/librdkafka/lib inst/ && \ + cp -r inst/lib inst/libs && \ + rm librdkafka-1.0.0.tar.gz; +else + echo "Detected librdkafka in src" +fi \ No newline at end of file diff --git a/fRanz/man/GetRdConsumer.Rd b/fRanz/man/GetRdConsumer.Rd index 3abe841..8cb2e5c 100644 --- a/fRanz/man/GetRdConsumer.Rd +++ b/fRanz/man/GetRdConsumer.Rd @@ -15,5 +15,5 @@ GetRdConsumer(keys, values) a Rcpp::XPtr } \description{ -Creates an Rcpp::XPtr. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md} +Creates an Rcpp::XPtr. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration} } diff --git a/fRanz/man/GetRdProducer.Rd b/fRanz/man/GetRdProducer.Rd index cf02b05..a7f11df 100644 --- a/fRanz/man/GetRdProducer.Rd +++ b/fRanz/man/GetRdProducer.Rd @@ -15,5 +15,5 @@ GetRdProducer(keys, values) a Rcpp::XPtr } \description{ -Creates an Rcpp::XPtr. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md} +Creates an Rcpp::XPtr. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration} } diff --git a/fRanz/man/Kafka-Broker.Rd b/fRanz/man/KafkaBroker.Rd similarity index 83% rename from fRanz/man/Kafka-Broker.Rd rename to fRanz/man/KafkaBroker.Rd index 9fa2fd1..8fc081e 100644 --- a/fRanz/man/Kafka-Broker.Rd +++ b/fRanz/man/KafkaBroker.Rd @@ -1,10 +1,9 @@ % Generated by roxygen2: do not edit by hand % Please edit documentation in R/KafkaBroker.R \docType{data} -\name{Kafka Broker} -\alias{Kafka Broker} +\name{KafkaBroker} \alias{KafkaBroker} -\title{KafkaBroker} +\title{Kafka Broker} \format{An object of class \code{R6ClassGenerator} of length 24.} \usage{ KafkaBroker @@ -35,7 +34,9 @@ producer$produce(topic = TOPIC_NAME, # KafkaConsumer -consumer <- KafkaConsumer$new(brokers = list(broker), groupId = "test", extraOptions=list(`auto.offset.reset`="earliest")) +consumer <- KafkaConsumer$new(brokers = list(broker), + groupId = "test", + extraOptions=list(`auto.offset.reset`="earliest")) consumer$subscribe(topics = c(TOPIC_NAME)) result <- consumer$consume(topic=TOPIC_NAME) diff --git a/fRanz/man/KafkaConsume.Rd b/fRanz/man/KafkaConsume.Rd index b096c53..a54f838 100644 --- a/fRanz/man/KafkaConsume.Rd +++ b/fRanz/man/KafkaConsume.Rd @@ -4,12 +4,14 @@ \alias{KafkaConsume} \title{KafkaConsume} \usage{ -KafkaConsume(consumerPtr, numResults) +KafkaConsume(consumerPtr, numResults, timeout = 10000L) } \arguments{ \item{consumerPtr}{a reference to a Rcpp::XPtr} \item{numResults}{how many results should be consumed before returning. Will return early if offset is at maximum} + +\item{timeout}{the timeout in milliseconds. Default is 10000} } \value{ a list of length numResults with values list(key=key,value=value) diff --git a/fRanz/man/KafkaConsumer.Rd b/fRanz/man/KafkaConsumer.Rd index 05787d5..a90c030 100644 --- a/fRanz/man/KafkaConsumer.Rd +++ b/fRanz/man/KafkaConsumer.Rd @@ -41,7 +41,9 @@ producer$produce(topic = TOPIC_NAME, # KafkaConsumer -consumer <- KafkaConsumer$new(brokers = list(broker), groupId = "test", extraOptions=list(`auto.offset.reset`="earliest")) +consumer <- KafkaConsumer$new(brokers = list(broker), + groupId = "test", + extraOptions=list(`auto.offset.reset`="earliest")) consumer$subscribe(topics = c(TOPIC_NAME)) result <- consumer$consume(topic=TOPIC_NAME) diff --git a/fRanz/src/Makevars b/fRanz/src/Makevars index e557180..ef0e28f 100644 --- a/fRanz/src/Makevars +++ b/fRanz/src/Makevars @@ -1,23 +1,5 @@ -INSTALLDIR = $(HOME)/.fRanz/librdkafka -LIBRDKAFKADIR = $(PWD)/../inst/librdkafka-0.11.6 - -PKG_LIBS = -L$(INSTALLDIR)/src-cpp -lrdkafka++ -PKG_CXXFLAGS = -std=c++11 -I$(INSTALLDIR)/src-cpp - -.PHONY: all install_librdkadka - -all: install_librdkadka - -install_librdkadka: - if [ ! -s $(INSTALLDIR)/src-cpp/librdkafka++.a ] ; \ - then \ - mkdir -p $(INSTALLDIR) && \ - cd $(shell dirname $(LIBRDKAFKADIR)) && \ - tar xzf $(LIBRDKAFKADIR).tar.gz && \ - cd $(LIBRDKAFKADIR) && \ - ./configure && \ - $(MAKE) && \ - $(MAKE) install && \ - mv * $(INSTALLDIR) ; \ - fi +LIBRDKAFKA_LOC = librdkafka +PKG_LIBS = -L$(LIBRDKAFKA_LOC)/lib -lrdkafka++ +PKG_CXXFLAGS = -I$(LIBRDKAFKA_LOC)/include +CXX_STD=CXX11 \ No newline at end of file diff --git a/fRanz/src/RcppExports.cpp b/fRanz/src/RcppExports.cpp index 912569c..b608c23 100644 --- a/fRanz/src/RcppExports.cpp +++ b/fRanz/src/RcppExports.cpp @@ -30,14 +30,15 @@ BEGIN_RCPP END_RCPP } // KafkaConsume -Rcpp::List KafkaConsume(SEXP consumerPtr, int numResults); -RcppExport SEXP _fRanz_KafkaConsume(SEXP consumerPtrSEXP, SEXP numResultsSEXP) { +Rcpp::List KafkaConsume(SEXP consumerPtr, int numResults, int timeout); +RcppExport SEXP _fRanz_KafkaConsume(SEXP consumerPtrSEXP, SEXP numResultsSEXP, SEXP timeoutSEXP) { BEGIN_RCPP Rcpp::RObject rcpp_result_gen; Rcpp::RNGScope rcpp_rngScope_gen; Rcpp::traits::input_parameter< SEXP >::type consumerPtr(consumerPtrSEXP); Rcpp::traits::input_parameter< int >::type numResults(numResultsSEXP); - rcpp_result_gen = Rcpp::wrap(KafkaConsume(consumerPtr, numResults)); + Rcpp::traits::input_parameter< int >::type timeout(timeoutSEXP); + rcpp_result_gen = Rcpp::wrap(KafkaConsume(consumerPtr, numResults, timeout)); return rcpp_result_gen; END_RCPP } @@ -72,7 +73,7 @@ END_RCPP static const R_CallMethodDef CallEntries[] = { {"_fRanz_GetRdConsumer", (DL_FUNC) &_fRanz_GetRdConsumer, 2}, {"_fRanz_RdSubscribe", (DL_FUNC) &_fRanz_RdSubscribe, 2}, - {"_fRanz_KafkaConsume", (DL_FUNC) &_fRanz_KafkaConsume, 2}, + {"_fRanz_KafkaConsume", (DL_FUNC) &_fRanz_KafkaConsume, 3}, {"_fRanz_GetRdProducer", (DL_FUNC) &_fRanz_GetRdProducer, 2}, {"_fRanz_KafkaProduce", (DL_FUNC) &_fRanz_KafkaProduce, 5}, {NULL, NULL, 0} diff --git a/fRanz/src/consumer.cpp b/fRanz/src/consumer.cpp index 64ccaa7..63576b1 100644 --- a/fRanz/src/consumer.cpp +++ b/fRanz/src/consumer.cpp @@ -1,19 +1,11 @@ -#include +#include #include #include "utils.h" -#include -#include -#include -#include -#include -#include -#include -#include //////////////////////////////////////////////////////////////////////////////////////// //' @title GetRdConsumer //' @name GetRdConsumer -//' @description Creates an Rcpp::XPtr. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md} +//' @description Creates an Rcpp::XPtr. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration} //' @param keys a character vector indicating option keys to parameterize the RdKafka::Consumer //' @param values a character vector indicating option values to parameterize the RdKafka::Consumer. Must be of same length as keys. //' @return a Rcpp::XPtr @@ -54,14 +46,15 @@ int RdSubscribe(SEXP consumerPtr, const Rcpp::StringVector Rtopics) { //' the provided consumer is subscribed to. //' @param consumerPtr a reference to a Rcpp::XPtr //' @param numResults how many results should be consumed before returning. Will return early if offset is at maximum +//' @param timeout the timeout in milliseconds. Default is 10000 //' @return a list of length numResults with values list(key=key,value=value) // [[Rcpp::export]] -Rcpp::List KafkaConsume(SEXP consumerPtr, int numResults) { +Rcpp::List KafkaConsume(SEXP consumerPtr, int numResults, int timeout = 10000) { Rcpp::XPtr consumer(consumerPtr); Rcpp::List messages(numResults); for(int i = 0; i < numResults; i++) { - RdKafka::Message *msg = consumer->consume(10000); + RdKafka::Message *msg = consumer->consume(timeout); switch(msg->err()){ case RdKafka::ERR_NO_ERROR: { Rcpp::List message = Rcpp::List::create(Rcpp::Named("key") = *msg->key(), @@ -69,11 +62,11 @@ Rcpp::List KafkaConsume(SEXP consumerPtr, int numResults) { messages[i] = message; break; } case RdKafka::ERR__PARTITION_EOF: { - printf("No additional messages available\n"); + //Trim the messages to the appropriate size + messages = messages[Rcpp::Range(0,i-1)]; goto exit_loop; } default: { - /* Errors */ - printf("Consume failed: %s", msg->errstr().c_str()); + Rcpp::stop("Consume failed: %s", msg->errstr().c_str()); goto exit_loop; } } diff --git a/fRanz/src/producer.cpp b/fRanz/src/producer.cpp index 8929a07..70295fc 100644 --- a/fRanz/src/producer.cpp +++ b/fRanz/src/producer.cpp @@ -1,16 +1,10 @@ -#include +#include #include #include "utils.h" -#include -#include -#include -#include -#include -#include //' @title GetRdProducer //' @name GetRdProducer -//' @description Creates an Rcpp::XPtr. For more details on options \link{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md} +//' @description Creates an Rcpp::XPtr. For more details on options \href{https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md}{Configuration} //' @param keys a character vector indicating option keys to parameterize the RdKafka::Producer //' @param values a character vector indicating option values to parameterize the RdKafka::Producer. Must be of same length as keys. //' @return a Rcpp::XPtr @@ -45,8 +39,7 @@ int KafkaProduce(SEXP producer_pointer, std::string s_topic = Rcpp::as(topic); if (keys.size() != values.size()) { - std::cout << "keys and values must be same size" << std::endl; - return -1; + Rcpp::stop("keys and values must be same size"); } int numMsgs = keys.size(); int numSent = 0; diff --git a/fRanz/src/utils.cpp b/fRanz/src/utils.cpp index 8144693..0ba3627 100644 --- a/fRanz/src/utils.cpp +++ b/fRanz/src/utils.cpp @@ -1,5 +1,5 @@ #include -#include +#include diff --git a/fRanz/src/utils.h b/fRanz/src/utils.h index c4b9156..16e9f93 100644 --- a/fRanz/src/utils.h +++ b/fRanz/src/utils.h @@ -1,4 +1,4 @@ #include -#include +#include RdKafka::Conf* MakeKafkaConfig(Rcpp::StringVector keys, Rcpp::StringVector values); \ No newline at end of file diff --git a/fRanz/tests/testthat/test-KafkaConsumer.R b/fRanz/tests/testthat/test-KafkaConsumer.R index 32a7db3..6299eea 100644 --- a/fRanz/tests/testthat/test-KafkaConsumer.R +++ b/fRanz/tests/testthat/test-KafkaConsumer.R @@ -2,6 +2,7 @@ context("KafkaConsumer and KafkaProducer") testthat::test_that("Testing KafkaConsumer and KafkaProducer work consuming 1 message",{ + testthat::skip_on_cran() ## Standard Set Up topic <- uuid::UUIDgenerate() group <- uuid::UUIDgenerate() @@ -25,6 +26,7 @@ testthat::test_that("Testing KafkaConsumer and KafkaProducer work consuming 1 me testthat::test_that("Testing KafkaConsumer and KafkaProducer work consuming random number of messages",{ + testthat::skip_on_cran() ## Standard Set Up topic <- uuid::UUIDgenerate() group <- uuid::UUIDgenerate()