diff --git a/.bumpversion.cfg b/.bumpversion.cfg index 175c725..6efeb18 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.1.3 +current_version = 0.1.4-b1 commit = True tag = False parse = (?P\d+)\.(?P\d+)\.(?P\d+)(\-(?P[a-z]+)(?P\d+))? diff --git a/build.gradle.kts b/build.gradle.kts index f898574..2c154d7 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -5,8 +5,8 @@ plugins { } group = "ai.whylabs" -version = "0.1.3" -//version = "0.1.3-${project.properties.getOrDefault("versionType", "SNAPSHOT")}" +version = "0.1.4-b1" +//version = "0.1.4-b1-${project.properties.getOrDefault("versionType", "SNAPSHOT")}" extra["isReleaseVersion"] = !version.toString().endsWith("SNAPSHOT") allprojects { diff --git a/spark-bundle/build.gradle.kts b/spark-bundle/build.gradle.kts index 4e42cbc..2ea206a 100644 --- a/spark-bundle/build.gradle.kts +++ b/spark-bundle/build.gradle.kts @@ -53,6 +53,7 @@ shadowJar.apply { // okio is consumed by songbird relocate("okio", "com.shaded.whylabs.okio") + relocate("okhttp3", "com.shaded.whylabs.okhttp3") archiveFileName.set("$artifactBaseName-${versionString}.jar") } diff --git a/spark/src/main/scala/com/whylogs/spark/RetryUtil.scala b/spark/src/main/scala/com/whylogs/spark/RetryUtil.scala index 45ad0e7..8452093 100644 --- a/spark/src/main/scala/com/whylogs/spark/RetryUtil.scala +++ b/spark/src/main/scala/com/whylogs/spark/RetryUtil.scala @@ -1,5 +1,6 @@ package com.whylogs.spark +import ai.whylabs.service.invoker.ApiException import org.slf4j.LoggerFactory import scala.concurrent.ExecutionContext.Implicits.global @@ -31,7 +32,12 @@ object RetryUtil { }.recoverWith { case t: Throwable => if (context.retries >= config.maxTries) { - throw new PermanentFailure("Failed too many times.", context.lastCause) + val lastCause = context.lastCause + lastCause match { + case apiException: ApiException => + throw new PermanentFailure("Failed too many times", new ApiException(s"Error code: ${apiException.getCode}. Headers: ${apiException.getResponseHeaders}. Body: ${apiException.getResponseBody}")) + case _ => throw new PermanentFailure("Failed too many times.", lastCause) + } } completeAfter(context.lastWaitMillis) .flatMap { _ => diff --git a/spark/src/main/scala/com/whylogs/spark/WhyLogs.scala b/spark/src/main/scala/com/whylogs/spark/WhyLogs.scala index 72c45e5..902cab4 100644 --- a/spark/src/main/scala/com/whylogs/spark/WhyLogs.scala +++ b/spark/src/main/scala/com/whylogs/spark/WhyLogs.scala @@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory import java.net.{HttpURLConnection, URL} import java.nio.file.{Files, StandardOpenOption} import java.time.Instant +import java.util.concurrent.TimeUnit import scala.collection.JavaConverters._ import scala.concurrent.Await import scala.concurrent.duration.Duration @@ -202,13 +203,13 @@ case class WhyProfileSession(private val dataFrame: DataFrame, Files.write(tmp, profileData, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING) // Create the upload url + val req = new LogAsyncRequest() + req.setSegmentTags(segmentTags) + req.datasetTimestamp(timestamp) val uploadResultFuture = RetryUtil.withRetries() { - val req = new LogAsyncRequest() - req.setSegmentTags(segmentTags) - req.datasetTimestamp(timestamp) logApi.logAsync(orgId, modelId, req) } - val uploadResult = Await.result(uploadResultFuture, Duration.create(10, "s")) + val uploadResult = Await.result(uploadResultFuture, Duration.create(10, TimeUnit.SECONDS)) // Write the profile to the upload url val profileUploadResult = RetryUtil.withRetries() {