Skip to content

Commit

Permalink
Merge pull request magda-io#3297 from magda-io/upgrade-es-6.8
Browse files Browse the repository at this point in the history
Upgrade es 6.8
  • Loading branch information
t83714 committed Jan 14, 2022
2 parents 3ed0844 + 910a486 commit 7dc1265
Show file tree
Hide file tree
Showing 32 changed files with 1,853 additions and 378 deletions.
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
## 1.2.0

- #3291 Remove log4j from scala codebase dependency list
- #3293 Upgrade elasticsearch to 6.8.22
- Upgrade logback to 1.2.10
- #3294 Indexer will auto-fix non-topologically closed Polygons / MultiPolygons
- Crawler will now wait for 3 seconds (used to be 1 second) before performing the trim action to avoid timeout issues
- Refactor Indexer code to remove the redundant index queue
- Allow overriding publisher / format index version number for the search API

## 1.1.0

Expand Down
2 changes: 1 addition & 1 deletion deploy/helm/internal-charts/elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ Kubernetes: `>= 1.14.0-0`
| kibanaImage.pullPolicy | string | `"IfNotPresent"` | |
| kibanaImage.pullSecrets | bool | `false` | |
| kibanaImage.repository | string | `"docker.elastic.co/kibana"` | |
| kibanaImage.tag | string | `"6.5.4"` | |
| kibanaImage.tag | string | `"6.8.22"` | |
| master.pluginsInstall | string | `""` | |
| master.replicas | int | `3` | |
| master.resources.limits.cpu | string | `"100m"` | |
Expand Down
2 changes: 1 addition & 1 deletion deploy/helm/internal-charts/elasticsearch/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ initContainerImage:
kibanaImage:
name: kibana-oss
repository: docker.elastic.co/kibana
tag: "6.5.4"
tag: "6.8.22"
pullPolicy: IfNotPresent
pullSecrets: false

Expand Down
2 changes: 2 additions & 0 deletions deploy/helm/internal-charts/search-api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ Kubernetes: `>= 1.14.0-0`
| defaultImage.pullPolicy | string | `"IfNotPresent"` | |
| defaultImage.pullSecrets | bool | `false` | |
| defaultImage.repository | string | `"docker.io/data61"` | |
| formatsIndexVersion | string | `nil` | Manually set the format index version. If not specified, the default version will be used. You may want to set this manually when upgrading to a Magda version that involves format index version changes. As it takes time to rebuild the index, you can use this setting to make the search API query the existing old version index until the new version index is built. |
| image.name | string | `"magda-search-api"` | |
| publishersIndexVersion | string | `nil` | Manually set the publisher index version. If not specified, the default version will be used. You may want to set this manually when upgrading to a Magda version that involves publisher index version changes. As it takes time to rebuild the index, you can use this setting to make the search API query the existing old version index until the new version index is built. |
| regionsIndexVersion | string | `nil` | Manually set the region index version. If not specified, the default version will be used. You may want to set this manually when upgrading to a Magda version that involves region index version changes. As it takes time to rebuild the index, you can use this setting to make the search API query the existing old version index until the new version index is built. |
| resources.limits.cpu | string | `"200m"` | |
| resources.requests.cpu | string | `"50m"` | |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ spec:
{{- if .Values.regionsIndexVersion }}
- "-DelasticSearch.indices.regions.version={{ .Values.regionsIndexVersion }}"
{{- end }}
{{- if .Values.publishersIndexVersion }}
- "-DelasticSearch.indices.publishers.version={{ .Values.publishersIndexVersion }}"
{{- end }}
{{- if .Values.formatsIndexVersion }}
- "-DelasticSearch.indices.formats.version={{ .Values.formatsIndexVersion }}"
{{- end }}
{{- if .Values.global.enableLivenessProbes }}
livenessProbe:
httpGet:
Expand Down
16 changes: 15 additions & 1 deletion deploy/helm/internal-charts/search-api/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,18 @@ datasetsIndexVersion:
# If not specified, the default version will be used.
# You may want to set this manually when upgrading to a Magda version that involves region index version changes.
# As it takes time to rebuild the index, you can use this setting to make the search API query the existing old version index until the new version index is built.
regionsIndexVersion:
regionsIndexVersion:


# -- Manually set the publisher index version.
# If not specified, the default version will be used.
# You may want to set this manually when upgrading to a Magda version that involves publisher index version changes.
# As it takes time to rebuild the index, you can use this setting to make the search API query the existing old version index until the new version index is built.
publishersIndexVersion:

# -- Manually set the format index version.
# If not specified, the default version will be used.
# You may want to set this manually when upgrading to a Magda version that involves format index version changes.
# As it takes time to rebuild the index, you can use this setting to make the search API query the existing old version index until the new version index is built.
formatsIndexVersion:

17 changes: 17 additions & 0 deletions docs/docs/migration/1.2.0.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Migrate from Magda v1.0.0 / v1.1.0 to v1.2.0

## Elasticsearch Upgrade

In v1.2.0, we upgraded elasticsearch to 6.8.22 (from 6.5.4). Once you upgrade Magda to v1.2.0, the indexer will auto-recreate all elasticsearch indexes for the newer elasticsearch engine version. Depending on your database size, this might take some time.

To avoid showing incomplete result till re-index is completed, you can manually set the index version that search API uses to the existing versions via [helm chart options](https://github.com/magda-io/magda/blob/944ae887842b98c51698d567435003be2e9dbefd/deploy/helm/internal-charts/search-api/values.yaml#L29).

e.g. you can set the following in your helm deployment values file / config:

```yaml
search-api:
datasetsIndexVersion: 48
regionsIndexVersion: 25
publishersIndexVersion: 6
formatsIndexVersion: 2
```
6 changes: 2 additions & 4 deletions magda-elastic-search/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
FROM --platform=linux/amd64 docker.elastic.co/elasticsearch/elasticsearch:6.5.4 as stage-amd64
FROM --platform=linux/amd64 docker.elastic.co/elasticsearch/elasticsearch:6.8.22 as stage-amd64
RUN yum -y install sudo zip
# Delete all x-pack modules
RUN find modules -type d -name "x-pack-*" -exec rm -r {} +
COPY --chown=elasticsearch:elasticsearch component/elasticsearch.yml /usr/share/elasticsearch/config/
RUN cd /usr/share/elasticsearch/lib && zip -q -d log4j-core-2.11.1.jar org/apache/logging/log4j/core/lookup/JndiLookup.class

FROM --platform=linux/arm64 data61/elasticsearch:6.5.4 as stage-arm64
FROM --platform=linux/arm64 data61/elasticsearch:6.8.22 as stage-arm64
RUN apt-get update && apt-get install -y --no-install-recommends sudo zip && rm -rf /var/lib/apt/lists/*
COPY --chown=elasticsearch:elasticsearch component/elasticsearch-arm64.yml /usr/share/elasticsearch/config/elasticsearch.yml
RUN cd /usr/share/elasticsearch/lib && zip -q -d log4j-core-2.11.1.jar org/apache/logging/log4j/core/lookup/JndiLookup.class

ARG TARGETARCH

Expand Down
2 changes: 1 addition & 1 deletion magda-elastic-search/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: "3"
services:
test-es:
image: docker.elastic.co/elasticsearch/elasticsearch:6.5.4
image: docker.elastic.co/elasticsearch/elasticsearch:6.8.22
ports:
- 9200:9200
- 9300:9300
Expand Down
4 changes: 2 additions & 2 deletions magda-indexer/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ scalacOptions := Seq("-unchecked", "-deprecation", "-encoding", "utf8")
resolvers += Resolver.bintrayRepo("monsanto", "maven")

libraryDependencies ++= {
val akkaV = "2.5.23"
val akkaHttpV = "10.1.8"
val akkaV = "2.5.32"
val akkaHttpV = "10.2.7"
val scalaTestV = "3.0.8"
Seq(
"com.typesafe.akka" %% "akka-actor" % akkaV,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ class RegistryCrawler(
.index(interfaceSource)
.flatMap { result =>
log.info(
"Indexed {} datasets with {} failures",
"Indexed datasets result: {} successes, {} successes with retry and {} failures",
result.successes,
result.failures.length
result.warns,
result.failures
)

// By default ElasticSearch index is refreshed every second. Let the trimming operation
Expand All @@ -66,11 +67,11 @@ class RegistryCrawler(
// visible to search. Defaults to 1s. Can be set to -1 to disable refresh.
// Ref: https://www.elastic.co/guide/en/elasticsearch/reference/6.5/index-modules.html#dynamic-index-settings
Future {
val delay = 1000
val delay = 3000
Thread.sleep(delay)
}.flatMap(_ => {
val futureOpt: Option[Future[Unit]] =
if (result.failures.isEmpty) { // does this need to be tunable?
if (result.failures == 0) { // does this need to be tunable?
log.info("Trimming datasets indexed before {}", startInstant)
Some(indexer.trim(startInstant))
} else {
Expand All @@ -84,25 +85,23 @@ class RegistryCrawler(
.recover {
case e: Throwable =>
log.error(e, "Failed while indexing {}")
SearchIndexer.IndexResult(0, Seq())
SearchIndexer.IndexResult()
}
.map(result => (result.successes, result.failures.length))
.map {
case (successCount, failureCount) =>
if (successCount > 0) {
if (config.getBoolean("indexer.makeSnapshots")) {
log.info("Snapshotting...")
indexer.snapshot()
}
} else {
log.info(
"Did not successfully index anything, no need to snapshot either."
)
.map { result =>
if (result.successes > 0) {
if (config.getBoolean("indexer.makeSnapshots")) {
log.info("Snapshotting...")
indexer.snapshot()
}
} else {
log.info(
"Did not successfully index anything, no need to snapshot either."
)
}

if (failureCount > 0) {
log.warning("Failed to index {} datasets", failureCount)
}
if (result.failures > 0) {
log.warning("Failed to index {} datasets", result.failures)
}
}
.recover {
case e: Throwable =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,43 @@ package au.csiro.data61.magda.indexer.search

import au.csiro.data61.magda.model.misc._

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future, Promise}
import akka.stream.Materializer
import akka.actor.ActorSystem
import au.csiro.data61.magda.search.elasticsearch.{ClientProvider, Indices}
import com.typesafe.config.Config

import java.time.Instant
import akka.stream.scaladsl.Source
import akka.NotUsed

import java.time.OffsetDateTime
import au.csiro.data61.magda.indexer.search.elasticsearch.ElasticSearchIndexer
import au.csiro.data61.magda.indexer.crawler.RegistryCrawler
import au.csiro.data61.magda.search.elasticsearch.IndexDefinition

trait SearchIndexer {

/**
* Provides a simple / alternative interface; calls `performIndex` to perform the index action
* @param dataSetStream
* @return
*/
def index(
dataSetStream: Source[DataSet, NotUsed]
): Future[SearchIndexer.IndexResult]

/**
* The actual method to perform the index action
* @param dataSetStream
* @param retryFailedDatasets
* @return
*/
def performIndex(
dataSetStream: Source[(DataSet, Promise[Unit]), NotUsed],
retryFailedDatasets: Boolean = true
): Future[SearchIndexer.IndexResult]

def delete(identifiers: Seq[String]): Future[Unit]
def snapshot(): Future[Unit]
def ready: Future[Unit]
Expand All @@ -29,7 +48,16 @@ trait SearchIndexer {
}

object SearchIndexer {
case class IndexResult(successes: Long, failures: Seq[String])
case class IndexResult(
// no. of successfully indexed datasets
successes: Long = 0,
// no. of failures
failures: Long = 0,
// no. of datasets that were indexed with warnings (e.g. after retry)
warns: Long = 0,
failureReasons: Seq[String] = Seq(),
warnReasons: Seq[String] = Seq()
)

def apply(clientProvider: ClientProvider, indices: Indices)(
implicit config: Config,
Expand Down
Loading

0 comments on commit 7dc1265

Please sign in to comment.