Skip to content

Commit

Permalink
Elasticsearch has Header and HttpsContext Settings
Browse files Browse the repository at this point in the history
The current ElasticsearchAPI only supports basic auth. By including
header options as well as the HttpsContext there would be additional
options to keep previous functionality but still moving forward with
AkkaHttp.
  • Loading branch information
Jim Baugh committed Apr 15, 2021
1 parent 925786a commit 42c1ea0
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,65 @@

package akka.stream.alpakka.elasticsearch

import akka.http.scaladsl.HttpsConnectionContext
import akka.http.scaladsl.model.HttpHeader

final class ElasticsearchConnectionSettings private (
val baseUrl: String,
val port: Option[Int],
val username: Option[String],
val password: Option[String]
val password: Option[String],
val headers: Option[List[HttpHeader]],
val connectionContext: Option[HttpsConnectionContext]
) {

def withBaseUrl(value: String): ElasticsearchConnectionSettings = copy(baseUrl = value)

def withPort(value: Int): ElasticsearchConnectionSettings = copy(port = Option(value))

def hasPortDefined: Boolean = port.isDefined

def withCredentials(username: String, password: String): ElasticsearchConnectionSettings =
copy(username = Option(username), password = Option(password))

def hasCredentialsDefined: Boolean = username.isDefined && password.isDefined

def withHeaders(headers: List[HttpHeader]): ElasticsearchConnectionSettings =
copy(headers = Option(headers))

def hasHeadersDefined: Boolean = headers.isDefined

def withConnectionContext(connectionContext: HttpsConnectionContext): ElasticsearchConnectionSettings =
copy(connectionContext = Option(connectionContext))

def hasConnectionContextDefined: Boolean = connectionContext.isDefined

def copy(baseUrl: String = baseUrl,
port: Option[Int] = port,
username: Option[String] = username,
password: Option[String] = password): ElasticsearchConnectionSettings =
new ElasticsearchConnectionSettings(baseUrl = baseUrl, username = username, password = password)
password: Option[String] = password,
headers: Option[List[HttpHeader]] = headers,
connectionContext: Option[HttpsConnectionContext] = connectionContext): ElasticsearchConnectionSettings =
new ElasticsearchConnectionSettings(baseUrl = baseUrl,
port = port,
username = username,
password = password,
headers,
connectionContext)

override def toString =
s"""ElasticsearchConnectionSettings(baseUrl=$baseUrl,username=$username,password=${password.fold("")(_ => "***")})"""
s"""ElasticsearchConnectionSettings(baseUrl=$baseUrl,port=$port,username=$username,password=${password.fold("")(
_ => "***"
)})"""
}

object ElasticsearchConnectionSettings {

/** Scala API */
def apply(baseUrl: String): ElasticsearchConnectionSettings = new ElasticsearchConnectionSettings(baseUrl, None, None)
def apply(baseUrl: String): ElasticsearchConnectionSettings =
new ElasticsearchConnectionSettings(baseUrl, None, None, None, None, None)

/** Java API */
def create(baseUrl: String): ElasticsearchConnectionSettings =
new ElasticsearchConnectionSettings(baseUrl, None, None)
new ElasticsearchConnectionSettings(baseUrl, None, None, None, None, None)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import scala.concurrent.Future
request.addCredentials(BasicHttpCredentials(connectionSettings.username.get, connectionSettings.password.get))
)
} else {
http.singleRequest(request)
http.singleRequest(request,
connectionContext =
connectionSettings.connectionContext.getOrElse(http.defaultClientHttpsContext))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,16 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T](
private var pullIsWaitingForData = false
private var dataReady: Option[ScrollResponse[T]] = None

def prepareUri(path: Path): Uri = {
Uri(settings.connection.baseUrl)
.withPath(path)
}

def sendScrollScanRequest(): Unit =
try {
waitingForElasticData = true

log.info(s"sendScrollScanRequest")
if (scrollId == null) {
log.debug("Doing initial search")

Expand Down Expand Up @@ -128,12 +134,18 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T](
case ApiVersion.V5 => s"/${elasticsearchParams.indexName}/${elasticsearchParams.typeName.get}/_search"
case ApiVersion.V7 => s"/${elasticsearchParams.indexName}/_search"
}
val uri = Uri(settings.connection.baseUrl).withPath(Path(endpoint)).withQuery(Uri.Query(queryParams))

val uri = prepareUri(Path(endpoint))
.withQuery(Uri.Query(queryParams))

val request = HttpRequest(HttpMethods.POST)
.withUri(uri)
.withEntity(
HttpEntity(ContentTypes.`application/json`, searchBody)
)
.withHeaders(settings.connection.headers.getOrElse(List()))

log.info(s"ElasticsearchSourceStage Request: ${request.toString()}")

ElasticsearchApi
.executeRequest(
Expand All @@ -150,11 +162,13 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T](
failureHandler
.invoke(new RuntimeException(s"Request failed for POST $uri, got $status with body: $body"))
}
case _ => failureHandler.invoke((new RuntimeException(s"Not sure what is broken")))
}
} else {
log.debug("Fetching next scroll")

val uri = Uri(settings.connection.baseUrl).withPath(Path("/_search/scroll"))
val uri = prepareUri(Path("/_search/scroll"))

val request = HttpRequest(HttpMethods.POST)
.withUri(uri)
.withEntity(
Expand Down Expand Up @@ -276,7 +290,8 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T](
completeStage()
} else {
// Clear the scroll
val uri = Uri(settings.connection.baseUrl).withPath(Path(s"/_search/scroll/$scrollId"))
val uri = prepareUri(Path(s"/_search/scroll/$scrollId"))

val request = HttpRequest(HttpMethods.DELETE)
.withUri(uri)

Expand Down
2 changes: 2 additions & 0 deletions version.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ThisBuild / version := "3.0.0-jbaugh-M1"
ThisBuild / isSnapshot := false

0 comments on commit 42c1ea0

Please sign in to comment.