Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions src/main/scala/com/advancedtelematic/director/Settings.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package com.advancedtelematic.director

import com.advancedtelematic.libtuf.data.TufDataType.KeyType
import com.typesafe.config.ConfigFactory

import scala.util.Try

trait Settings {
import Util._

Expand All @@ -14,11 +11,4 @@ trait Settings {
val port = _config.getInt("server.port")

val tufUri = mkUri(_config, "keyserver.uri")
val tufBinaryUri = mkUri(_config, "tuf.binary.uri")

val defaultKeyType: Try[KeyType] = {
Try(_config.getString("daemon.defaultKeyType")).map { defaultKeyTypeName =>
namedType[KeyType](defaultKeyTypeName)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,21 @@ trait DeviceRepositorySupport extends DatabaseSupport {

protected class DeviceRepository()(implicit val db: Database, val ec: ExecutionContext) {
def create(ns: Namespace, deviceId: DeviceId, primaryEcuId: EcuIdentifier, ecus: Seq[Ecu]): Future[Unit] = {
val ecusDeleteIO =
Schema.ecus
.filter(_.namespace === ns)
.filter(_.deviceId === deviceId)
.filter(_.ecuSerial.inSet(ecus.map(_.ecuSerial)))
.delete

val deviceDeleteIo =
Schema.devices
.filter(_.namespace === ns)
.filter(_.id === deviceId)
.delete

val io = for {
_ <- ecusDeleteIO.andThen(deviceDeleteIo) // This is a bad idea and will fail if device has assignments, see DeviceResourceSpec
_ <- Schema.ecus ++= ecus
_ <- Schema.devices += Device(ns, deviceId, primaryEcuId)
} yield ()
Expand Down Expand Up @@ -233,7 +247,15 @@ protected class EcuRepository()(implicit val db: Database, val ec: ExecutionCont
.filter(_.namespace === ns)
.map(_.hardwareId)
.distinct
.paginateAndSortResult(identity, offset = offset, limit = limit)
.paginateResult(offset = offset, limit = limit)
}

def findAllDeviceIds(ns: Namespace, offset: Long, limit: Long): Future[PaginationResult[DeviceId]] = db.run {
Schema.devices
.filter(_.namespace === ns)
.map(d => (d.id, d.createdAt))
.paginateAndSortResult(_._2, offset = offset, limit = limit)
.map(_.map(_._1))
}

def findFor(deviceId: DeviceId): Future[Map[EcuIdentifier, Ecu]] = db.run {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ object Schema {
def namespace = column[Namespace]("namespace")
def id = column[DeviceId]("id")
def primaryEcu = column[EcuIdentifier]("primary_ecu_id")
def createdAt = column[Instant]("created_at")

def pk = primaryKey("devices_pk", id)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,34 +6,24 @@ import akka.http.scaladsl.server._
import cats.syntax.option._
import com.advancedtelematic.director.data.AdminDataType.{FindImageCount, RegisterDevice}
import com.advancedtelematic.director.data.Codecs._
import com.advancedtelematic.director.db.{AutoUpdateDefinitionRepositorySupport, DeviceRegistration, DeviceRepositorySupport, EcuRepositorySupport, RepoNamespaceRepositorySupport}
import com.advancedtelematic.director.db.{AutoUpdateDefinitionRepositorySupport, DeviceRegistration, EcuRepositorySupport, RepoNamespaceRepositorySupport}
import com.advancedtelematic.libats.codecs.CirceCodecs._
import com.advancedtelematic.libats.data.DataType.Namespace
import com.advancedtelematic.libats.data.EcuIdentifier
import com.advancedtelematic.libats.http.UUIDKeyAkka._
import com.advancedtelematic.libats.messaging.MessageBusPublisher
import com.advancedtelematic.libats.messaging_datatype.DataType.{DeviceId, UpdateId}
import com.advancedtelematic.libats.messaging_datatype.DataType.DeviceId
import com.advancedtelematic.libtuf.data.ClientCodecs._
import com.advancedtelematic.libtuf.data.TufCodecs._
import com.advancedtelematic.libtuf.data.TufDataType.{Ed25519KeyType, RepoId, TargetName}
import com.advancedtelematic.libtuf.data.TufDataType.{RepoId, TargetName}
import com.advancedtelematic.libtuf_server.keyserver.KeyserverClient
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._
import com.advancedtelematic.libats.codecs.CirceCodecs._
import slick.jdbc.MySQLProfile.api._
import PaginationParametersDirectives._
import com.advancedtelematic.director.repo.DeviceRoleGeneration

import scala.concurrent.{ExecutionContext, Future}

class RepositoryCreation(keyserverClient: KeyserverClient)(implicit val db: Database, val ec: ExecutionContext)
extends DeviceRepositorySupport with RepoNamespaceRepositorySupport {
import scala.concurrent.ExecutionContext

def create(ns: Namespace): Future[Unit] = {
val repoId = RepoId.generate()

for {
_ <- keyserverClient.createRoot(repoId, Ed25519KeyType, forceSync = true)
_ <- repoNamespaceRepo.persist(repoId, ns)
} yield ()
}
}

class AdminResource(extractNamespace: Directive1[Namespace], val keyserverClient: KeyserverClient)
(implicit val db: Database, val ec: ExecutionContext, messageBusPublisher: MessageBusPublisher)
Expand All @@ -48,13 +38,7 @@ class AdminResource(extractNamespace: Directive1[Namespace], val keyserverClient

val deviceRegistration = new DeviceRegistration(keyserverClient)
val repositoryCreation = new RepositoryCreation(keyserverClient)

val paginationParameters: Directive[(Long, Long)] =
(parameters('limit.as[Long].?) & parameters('offset.as[Long].?)).tmap { case (mLimit, mOffset) =>
val limit = mLimit.getOrElse(50L).min(1000)
val offset = mOffset.getOrElse(0L)
(limit, offset)
}
val deviceRoleGeneration = new DeviceRoleGeneration(keyserverClient)

def repoRoute(ns: Namespace): Route =
pathPrefix("repo") {
Expand All @@ -72,7 +56,7 @@ class AdminResource(extractNamespace: Directive1[Namespace], val keyserverClient
}
}

def devicePath(ns: Namespace): Route =
def devicePath(ns: Namespace, repoId: RepoId): Route =
pathPrefix(DeviceId.Path) { device =>
pathPrefix("ecus") {
pathPrefix(EcuIdPath) { ecuId =>
Expand All @@ -95,10 +79,13 @@ class AdminResource(extractNamespace: Directive1[Namespace], val keyserverClient
}
}
} ~
(pathEnd & get) {
get {
val f = deviceRegistration.findDeviceEcuInfo(ns, device)
complete(f)
}
} ~
(path("targets.json") & put) {
complete(deviceRoleGeneration.forceTargetsRefresh(ns, repoId, device).map(StatusCodes.Created -> _))
}
}

val route: Route = extractNamespace { ns =>
Expand All @@ -125,12 +112,12 @@ class AdminResource(extractNamespace: Directive1[Namespace], val keyserverClient
}
} ~
(get & path("hardware_identifiers")) {
paginationParameters { (limit, offset) =>
PaginationParameters { (limit, offset) =>
val f = ecuRepository.findAllHardwareIdentifiers(ns, offset, limit)
complete(f)
}
} ~
devicePath(ns)
devicePath(ns, repoId)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,17 @@ import java.time.Instant
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server._
import akka.http.scaladsl.unmarshalling.PredefinedFromStringUnmarshallers.CsvSeq
import akka.http.scaladsl.util.FastFuture
import com.advancedtelematic.director.data.AdminDataType.AssignUpdateRequest
import com.advancedtelematic.director.data.AssignmentDataType.CancelAssignments
import com.advancedtelematic.director.data.Codecs._
import com.advancedtelematic.director.data.DbDataType
import com.advancedtelematic.libats.data.DataType.Namespace
import com.advancedtelematic.libats.http.UUIDKeyAkka._
import com.advancedtelematic.libats.messaging.MessageBusPublisher
import com.advancedtelematic.libats.messaging_datatype.DataType.{DeviceId, UpdateId}
import com.advancedtelematic.libats.messaging_datatype.Messages.{DeviceUpdateAssigned, DeviceUpdateCanceled, DeviceUpdateEvent}
import com.advancedtelematic.libats.messaging_datatype.MessageCodecs.deviceUpdateCanceledEncoder
import com.advancedtelematic.libats.messaging_datatype.Messages.{DeviceUpdateAssigned, DeviceUpdateEvent}
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._
import slick.jdbc.MySQLProfile.api.Database
import com.advancedtelematic.libats.messaging_datatype.MessageCodecs.deviceUpdateCanceledEncoder

import scala.concurrent.{ExecutionContext, Future}

Expand Down Expand Up @@ -63,7 +61,7 @@ class AssignmentsResource(extractNamespace: Directive1[Namespace])
}
}
} ~
pathPrefix(DeviceId.Path) { deviceId =>
path(DeviceId.Path) { deviceId =>
get { // This should be replacing /queue in /admin
val f = deviceAssignments.findDeviceAssignments(ns, deviceId)
complete(f)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.advancedtelematic.director.http

import java.time.Instant

import akka.http.scaladsl.util.FastFuture
import cats.implicits._
import com.advancedtelematic.director.data.AdminDataType.QueueResponse
import com.advancedtelematic.director.data.DbDataType.Assignment
Expand Down Expand Up @@ -65,6 +66,10 @@ class DeviceAssignments(implicit val db: Database, val ec: ExecutionContext) ext
}
}

def createForDevice(ns: Namespace, correlationId: CorrelationId, deviceId: DeviceId, mtuId: UpdateId): Future[Assignment] = {
createForDevices(ns, correlationId, List(deviceId), mtuId).map(_.head)
}

def createForDevices(ns: Namespace, correlationId: CorrelationId, devices: Seq[DeviceId], mtuId: UpdateId): Future[Seq[Assignment]] = async {
val ecus = await(findAffectedEcus(ns, devices, mtuId))

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package com.advancedtelematic.director.http

import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.{Directives, _}
import com.advancedtelematic.libats.auth.NamespaceDirectives
import com.advancedtelematic.libats.http.DefaultRejectionHandler.rejectionHandler
import com.advancedtelematic.libats.http.ErrorHandler
import com.advancedtelematic.libats.messaging.MessageBusPublisher
import com.advancedtelematic.libats.messaging_datatype.DataType.{DeviceId, UpdateId}
import com.advancedtelematic.libtuf_server.keyserver.KeyserverClient
import slick.jdbc.MySQLProfile.api._

Expand All @@ -24,7 +26,8 @@ class DirectorRoutes(keyserverClient: KeyserverClient)
new AdminResource(extractNamespace, keyserverClient).route ~
new AssignmentsResource(extractNamespace).route ~
new DeviceResource(extractNamespace, keyserverClient).route ~
new MultiTargetUpdatesResource(extractNamespace).route
new MultiTargetUpdatesResource(extractNamespace).route ~
new LegacyRoutes(extractNamespace).route
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.advancedtelematic.director.http

import java.time.Instant

import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives.{complete, delete, path, put}
import akka.http.scaladsl.server.{Directive1, Route}
import com.advancedtelematic.libats.data.DataType.{MultiTargetUpdateId, Namespace}
import com.advancedtelematic.libats.messaging.MessageBusPublisher
import com.advancedtelematic.libats.messaging_datatype.DataType.{DeviceId, UpdateId}
import com.advancedtelematic.libats.messaging_datatype.Messages.{DeviceUpdateAssigned, DeviceUpdateEvent}
import slick.jdbc.MySQLProfile.api._
import com.advancedtelematic.libats.http.UUIDKeyAkka._
import akka.http.scaladsl.server.Directives._

import scala.concurrent.{ExecutionContext, Future}
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._
import PaginationParametersDirectives._
import com.advancedtelematic.director.db.EcuRepositorySupport

// Implements routes provided by old director that ota-web-app still uses
class LegacyRoutes(extractNamespace: Directive1[Namespace])(implicit val db: Database, val ec: ExecutionContext, messageBusPublisher: MessageBusPublisher)
extends EcuRepositorySupport {
private val deviceAssignments = new DeviceAssignments()

private def createDeviceAssignment(ns: Namespace, deviceId: DeviceId, mtuId: UpdateId): Future[Unit] = {
val correlationId = MultiTargetUpdateId(mtuId.uuid)
val assignment = deviceAssignments.createForDevice(ns, correlationId, deviceId, mtuId)

assignment.map { a =>
Comment thread
simao marked this conversation as resolved.
val msg: DeviceUpdateEvent = DeviceUpdateAssigned(ns, Instant.now(), correlationId, a.deviceId)
messageBusPublisher.publishSafe(msg)
}
}

val route: Route =
extractNamespace { ns =>
path("admin" / "devices" / DeviceId.Path / "multi_target_update" / UpdateId.Path) { (deviceId, updateId) =>
put {
val f = createDeviceAssignment(ns, deviceId, updateId).map(_ => StatusCodes.Created)
complete(f)
}
} ~
path("assignments" / DeviceId.Path) { deviceId =>
delete {
val a = deviceAssignments.cancel(ns, List(deviceId))
complete(a.map(_.map(_.deviceId)))
}
} ~
(path("admin" / "devices") & PaginationParameters) { (limit, offset) =>
get {
complete(ecuRepository.findAllDeviceIds(ns, offset, limit))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._
import com.advancedtelematic.director.data.Codecs._
import com.advancedtelematic.director.db.MultiTargetUpdates
import com.advancedtelematic.libats.data.DataType.Namespace

import scala.concurrent.{ExecutionContext, Future}


import scala.concurrent.ExecutionContext
import com.advancedtelematic.libats.codecs.CirceCodecs._

class MultiTargetUpdatesResource(extractNamespace: Directive1[Namespace])(implicit val db: Database, val ec: ExecutionContext) {
import Directives._
Expand All @@ -23,7 +21,10 @@ class MultiTargetUpdatesResource(extractNamespace: Directive1[Namespace])(implic
val route = extractNamespace { ns =>
pathPrefix("multi_target_updates") {
(get & pathPrefix(UpdateId.Path)) { uid =>
complete(multiTargetUpdates.find(ns, uid))
// For some reason director-v1 accepts `{targets: ...}` but returns `{...}`
// To make app compatible with director-v2, for now we do the same, but we should be returning what we accept:
// complete(multiTargetUpdates.find(ns, uid))
complete(multiTargetUpdates.find(ns, uid).map(_.targets))
} ~
(post & pathEnd) {
entity(as[MultiTargetUpdate]) { mtuRequest =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.advancedtelematic.director.http


import akka.http.scaladsl.server.Directive
import akka.http.scaladsl.server.Directives._

object PaginationParametersDirectives {
val PaginationParameters: Directive[(Long, Long)] =
(parameters('limit.as[Long].?) & parameters('offset.as[Long].?)).tmap { case (mLimit, mOffset) =>
val limit = mLimit.getOrElse(50L).min(1000)
val offset = mOffset.getOrElse(0L)
(limit, offset)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.advancedtelematic.director.http

import com.advancedtelematic.director.db.{DeviceRepositorySupport, RepoNamespaceRepositorySupport}
import com.advancedtelematic.libats.data.DataType.Namespace
import com.advancedtelematic.libtuf.data.TufDataType.{Ed25519KeyType, RepoId}
import com.advancedtelematic.libtuf_server.keyserver.KeyserverClient
import slick.jdbc.MySQLProfile.api._
import com.advancedtelematic.libats.http.UUIDKeyAkka._

import scala.concurrent.{ExecutionContext, Future}

class RepositoryCreation(keyserverClient: KeyserverClient)(implicit val db: Database, val ec: ExecutionContext)
extends DeviceRepositorySupport with RepoNamespaceRepositorySupport {

def create(ns: Namespace): Future[Unit] = {
val repoId = RepoId.generate()

for {
_ <- keyserverClient.createRoot(repoId, Ed25519KeyType, forceSync = true)
_ <- repoNamespaceRepo.persist(repoId, ns)
} yield ()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ class DeviceRoleGeneration(keyserverClient: KeyserverClient)(implicit val db: Da
}
}

def forceTargetsRefresh(ns: Namespace, repoId: RepoId, deviceId: DeviceId): Future[JsonSignedPayload] = {
val refresher = roleRefresher(ns, deviceId)
refresher.refreshTargets(repoId).map(_.content)
}

def findFreshDeviceRole[T : TufRole](ns: Namespace, repoId: RepoId, deviceId: DeviceId): Future[JsonSignedPayload] = {
implicit val refresher = roleRefresher(ns, deviceId)
roleGeneration(ns, deviceId).findRole[T](repoId).map(_.content)
Expand Down
Loading