AI Расширяем функционал Apache NiFi 2.0: руководство к написанию своего процессора

AI

Редактор
Регистрация
23 Август 2023
Сообщения
3 641
Лучшие ответы
0
Реакции
0
Баллы
243
Offline
#1
Привет, Хабр!

Потоки данных между системами стабильно увеличиваются, и в обозримом будущем эта тенденция вряд ли изменится, что создает постоянную потребность в инструментах для работы с данными.

Apache NiFi — программный продукт с открытым исходным кодом, написанный на языке Java, предназначенный для автоматизации потоков данных между системами. Главная его задача: организовывать ETL‑процессы. На GitHub у Apache NiFi имеется 5.9 тысяч звезд.

Для тех, кто не знает, что такое Apache NiFi советую прочитать отличную статью.

Моя статья посвящена написанию кастомного процессора для Apache NiFi на Java и требует базовых знаний в области Apache NiFi, опыт программирования на Java и IDE на борту компьютера.

Ну, начнем!

Для тех, кто немного подзабыл, что такое процессор в Apache NiFi

Процессор — это механизм, с помощью которого NiFi предоставляет доступ к FlowFiles, их атрибутам и содержимому. В Apache NiFi процессоры являются основными компоновочными блоками и используются для построения пайплайнов обработки.


В качестве примера я покажу, как создать процессор, выполняющий базовую задачу Word Count на основе существующего атрибута. Разумеется, в реальной практике такая задача вам вряд ли встретится, но в качестве обучающего примера этого более чем достаточно.

Мы будем разрабатывать процессор под Apache NiFi 2.4.0. Это практически самая свежая и стабильная версия на данный момент.

Первым делом стоит создать проект на основе Maven Archetype. В Intellij Idea для этого нужно выбрать New Project в верхней панели инструментов. Откроется окно, в котором в области Generators следует выбрать Maven Archetype.

В нашем случае в поле Name мы записываем название нашего проекта. Оно влияет лишь на имя директории, созданной в IDE.

В качестве JDK выбран Amazon Corretto 21, так как Apache NiFi рекомендует использовать Java 21 для работы с этой версией. В поле Catalog выбираем Maven Central, чтобы увидеть архетип для NiFi. После этого выбираем: org.apache.nifi:nifi-processor-bundle-archetype.

Версию архетипа выбираем в соответствии с версией Apache NiFi — в нашем случае это 2.4.0.

Также в additionalProperties следует обязательно указать artifactBaseName. Иначе там по умолчанию установится значение true, что сломает структуру пакетов. В artifactId мы устанавливаем идентичное значение. При желании можно также поправить package, если вы понимаете, как работает этот параметр. В поле Version изначально указано значение 1.0-SNAPSHOT. Однако при работе со SNAPSHOT‑версией у вас просто не пройдет сборка, потому что это не релизная версия вашего процессора.


Не собирающийся модуль

Можно игнорировать enforce, но это будет не совсем правильно

После этого жмем Create и получаем готовую структуру проекта. Выглядит она следующим образом:



Таким образом, мы получаем два модуля: один для сборки NAR и JAR. Для добавления модуля в NiFi будет использоваться как раз таки NAR.

Откроем основной файл с главный нашим классом, а именно MyProcessor (при желании его можно переименовать, это название влияет на нейминг в UI)

Код шаблонного класса

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ru.kotletkin.dev.processors.demohabrprocessor;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;

import java.util.List;
import java.util.Set;

@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class MyProcessor extends AbstractProcessor {

public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
.Builder()
.name("My Property")
.displayName("My Property")
.description("Example Property")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();

public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Example success relationship")
.build();

private List<PropertyDescriptor> descriptors;

private Set<Relationship> relationships;

@Override
protected void init(final ProcessorInitializationContext context) {
descriptors = List.of(MY_PROPERTY);

relationships = Set.of(REL_SUCCESS);
}

@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}

@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}

@OnScheduled
public void onScheduled(final ProcessContext context) {

}

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
// TODO implement

session.transfer(flowFile, REL_SUCCESS);
}
}

Видно, что у класса присутствует много аннотаций и методов, которые переопределяют родительский AbstractProcessor. AbstractProcessor реализует интерфейс Processor и предоставляет готовые реализации большинства методов, а также позволяет сосредоточиться на бизнес‑логике, а не на написании boilerplate‑кода.

Про все существующие аннотации и методы в рамках одной статьи очень тяжело уложиться, поэтому про них лучше почитать в Developer Guide. Описание тех аннотаций, которые уже существуют в нашем коде, мы поговорим. Их описание в таблице ниже:

Название аннотации

Описание аннотации

@Tags​

Используется для добавление тэгов для поиска и категоризации процессора в UI​

@CapabilityDescription​

Используется для описания процессора и производимого действия им. Помогает понять назначение без чтения документации​

@ReadsAttributes​

Сообщает системе и пользователям, какие атрибуты FlowFile читает процессор​

@WritesAttributes​

Сообщает системе и пользователям какие новые атрибуты FlowFile добавляет процессор​



Для нашего тестового процессора, определим следующие параметры для этих аннотаций:

@Tags({"word", "count", "statistics", "habr"})
@CapabilityDescription("Counts the number of words in a specified FlowFile attribute")
@ReadsAttributes({
@ReadsAttribute(attribute = "habr.wc.target", description = "The attribute that will be analyzed for word count")
})
@WritesAttributes({
@WritesAttribute(attribute = "habr.wc.result", description = "The number of words found in the specified attribute")
})

Мы добавили тэги, описание, указали, что значение целевого атрибута по‑умолчанию для подсчета слов будет habr.wc.target, а результат запишется в атрибут habr.wc.result

Пример отображения значений аннотаций

Пример отображения заполненных аннотаций: тэги, описание

Далее, рассмотрим настройки процессора. Примером в нашем коде в данном случае является параметр MY_PROPERTY с типом PropertyDescriptor. PropertyDescriptor определяет свойство, которое будет использоваться в Processor. Объект такого типа включает его имя, описание свойства, необязательное значение по умолчанию, логику проверки (валидатор) и указание на то, является ли свойство обязательным для корректной работы Processor. PropertyDescriptors создаются путем создания экземпляра класса PropertyDescriptor.Builder, вызова соответствующих методов для заполнения сведений о свойстве и, наконец, вызова метода build, то есть реализуют классический паттерн «Строитель».

Часть кода из шаблона

public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
.Builder()
.name("My Property")
.displayName("My Property")
.description("Example Property")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();

Мы добавим в наш пример три атрибута: исходный атрибут (для перезаписи нашего дефолтного), результирующий, а также разделитель для слов. Примеры описания структуры практически идентичны в базовом шаблоне. Относительно валидаторов - они бывают разные и перечислены в классе StandardValidators.

public static final PropertyDescriptor SOURCE_ATTRIBUTE = new PropertyDescriptor.Builder()
.name("Source Attribute")
.displayName("Source Attribute")
.description("Name of the attribute that contains text for word counting")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("habr.wc.target")
.build();

public static final PropertyDescriptor OUTPUT_ATTRIBUTE = new PropertyDescriptor.Builder()
.name("Output Attribute")
.displayName("Output Attribute")
.description("Name of the attribute to store the word count result")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("habr.wc.result")
.build();

public static final PropertyDescriptor WORD_DELIMITER = new PropertyDescriptor.Builder()
.name("Word Delimiter")
.displayName("Word Delimiter")
.description("Regular expression pattern used to split text into words (default: whitespace characters)")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("\\s+")
.build();

Геттеры getRelationships и getSupportedPropertyDescriptors в нашем примере, как и в большинстве случаев изменять вам не придется.

Однако, в конструкторе стоит поменять список descriptors и relationships, если они отличаются от базовых. В нашем случае это выглядит так:

@Override
protected void init(final ProcessorInitializationContext context) {
descriptors = List.of(SOURCE_ATTRIBUTE, WORD_DELIMITER, OUTPUT_ATTRIBUTE);
relationships = Set.of(REL_SUCCESS, REL_FAILURE);
}

ProcessorInitializationContext в нашем случае не нужен, он предназначен для задач, когда в моменте инициализации процессора требуется выполнить какую‑либо задачу (логирование, доступ к ControllerServices). Например, вывести идентификатор процессора при инициализации можно следующим образом:

@Override
protected void init(final ProcessorInitializationContext context) {
String processorId = context.getIdentifier();
getLogger().info("Processor ID: {}", processorId);
// Выведет что-то вроде: "Processor ID: a1b2c3d4-1234-5678-90ab-cdef12345678"
}

Метод помеченный аннотацией@OnScheduled используется для реализации логики, которая будет выполняться при запуске процессора.

Теперь перейдем к самому главному, реализацию метода onTrigger, который отвечает за работу, выполняемую с каждым FlowFile.

Сразу же, нас встречает небольшая часть кода, который позволяет прекратить выполнение, если FlowFile не существует. Выглядит это как простейшая защита от NPE:

FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}

Далее, нам уже необходимо реализовать наш код по подсчету слов.

Объяснять алгоритм по подсчету слов выглядит как не самая интересная часть статьи, поэтому подсчет будет вынесен в отдельный метод кода. Разберем код метода onTrigger:

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}

try {

// Получаем значения свойств
final String sourceAttribute = context.getProperty(SOURCE_ATTRIBUTE).getValue();
final String wordDelimiter = context.getProperty(WORD_DELIMITER).getValue();
final String outputAttribute = context.getProperty(OUTPUT_ATTRIBUTE).getValue();

// Получаем значение атрибута, в котором нужно совершить Word Count
final String attributeValue = flowFile.getAttribute(sourceAttribute);

// Простейшие проверки
if (attributeValue == null || attributeValue.trim().isEmpty()) {
getLogger().warn("Attribute '{}' does not exist or is empty for FlowFile {}",
sourceAttribute, flowFile.getId());
// Отправки файла в нужный RelationShip
session.transfer(flowFile, REL_FAILURE);
return;
}

// Подсчитываем слова
final int wordCount = countWords(attributeValue, wordDelimiter);

getLogger().debug("Found {} words in attribute '{}'", wordCount, sourceAttribute);

// Добавляем новый атрибут
final Map<String, String> attributes = new HashMap<>();
attributes.put(outputAttribute, String.valueOf(wordCount));

// Кладем все атрибуты
flowFile = session.putAllAttributes(flowFile, attributes);

// Перенаправляем в Success
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().modifyAttributes(flowFile);

getLogger().info("Successfully counted {} words for FlowFile {}", wordCount, flowFile.getId());

} catch (final Exception e) {
getLogger().error("Error processing FlowFile {}: {}", flowFile.getId(), e.getMessage(), e);
session.transfer(flowFile, REL_FAILURE);
}
}

private int countWords(final String text, final String delimiterPattern) {

if (text == null || text.trim().isEmpty()) {
return 0;
}

try {
Pattern pattern;
if (delimiterPattern != null && !delimiterPattern.trim().isEmpty()) {
pattern = Pattern.compile(delimiterPattern);
} else {
pattern = Pattern.compile("\\s+");
}

String[] words = pattern.split(text.trim());
return (int) Arrays.stream(words)
.filter(word -> !word.trim().isEmpty())
.count();

} catch (PatternSyntaxException e) {
getLogger().warn("Invalid regex pattern '{}', using default whitespace pattern", delimiterPattern);
Pattern defaultPattern = Pattern.compile("\\s+");
String[] words = defaultPattern.split(text.trim());
return (int) Arrays.stream(words)
.filter(word -> !word.trim().isEmpty())
.count();
}
}

В самом начале метод принимает два параметра с типами ProcessContext и ProcessSession.

ProcessContext предоставляет доступ к настройкам и конфигурации процессора. Позволяет получать значения свойств, идентификатор процессора или любую другую информацию о нем.

ProcessSession предоставляет методы для манипуляции FlowFiles и управления транзакциями. Позволяет получать FlowFile из входной очереди, создавать новые, читать содержимые, добавлять атрибуты и так далее

Так, в следующем участке кода мы получаем значения параметров, которые определяли ранее:

final String sourceAttribute = context.getProperty(SOURCE_ATTRIBUTE).getValue();
final String wordDelimiter = context.getProperty(WORD_DELIMITER).getValue();
final String outputAttribute = context.getProperty(OUTPUT_ATTRIBUTE).getValue();

В UI значения параметров выглядят так:


Атрибуты которые мы задали, отображаются в UI

Затем, получаем текст, который содержится в нашем атрибуте, название которого содержится в параметре Source Attribute

final String attributeValue = flowFile.getAttribute(sourceAttribute);

Далее, проводим проверки в коде и в случае, если какая‑либо из них не проходит (обычное условие if), отправляем файл в Failure Relation. За это отвечает session.transfer()

// Простейшие проверки
if (attributeValue == null || attributeValue.trim().isEmpty()) {
getLogger().warn("Attribute '{}' does not exist or is empty for FlowFile {}",
sourceAttribute, flowFile.getId());
// Отправки файла в нужный RelationShip
session.transfer(flowFile, REL_FAILURE);
return;
}

В случае, если ошибок, то продолжаем выполнять логику и отправляем FlowFile в REL_SUCCESS:

// Подсчитываем слова
final int wordCount = countWords(attributeValue, wordDelimiter);

getLogger().debug("Found {} words in attribute '{}'", wordCount, sourceAttribute);

// Добавляем новый атрибут
final Map<String, String> attributes = new HashMap<>();
attributes.put(outputAttribute, String.valueOf(wordCount));

// Кладем все атрибуты
flowFile = session.putAllAttributes(flowFile, attributes);

// Перенаправляем в Success
session.transfer(flowFile, REL_SUCCESS);
// Зафиксировать в журнале аудита, что атрибуты этого FlowFile были изменены
session.getProvenanceReporter().modifyAttributes(flowFile);

getLogger().info("Successfully counted {} words for FlowFile {}", wordCount, flowFile.getId());

Таким образом, у нас получается полноценный код для нашего процессора. Подведем итог готового кода.

Результат кода для процессора

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ru.kotletkin.dev.processors.demohabrprocessor;

import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import java.util.*;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

@Tags({"word", "count", "statistics", "habr"})
@CapabilityDescription("Counts the number of words in a specified FlowFile attribute")
@ReadsAttributes({
@ReadsAttribute(attribute = "habr.wc.target", description = "The attribute that will be analyzed for word count")
})
@WritesAttributes({
@WritesAttribute(attribute = "habr.wc.result", description = "The number of words found in the specified attribute")
})
public class MyProcessor extends AbstractProcessor {

public static final PropertyDescriptor SOURCE_ATTRIBUTE = new PropertyDescriptor.Builder()
.name("Source Attribute")
.displayName("Source Attribute")
.description("Name of the attribute that contains text for word counting")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("habr.wc.target")
.build();

public static final PropertyDescriptor OUTPUT_ATTRIBUTE = new PropertyDescriptor.Builder()
.name("Output Attribute")
.displayName("Output Attribute")
.description("Name of the attribute to store the word count result")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("habr.wc.result")
.build();


public static final PropertyDescriptor WORD_DELIMITER = new PropertyDescriptor.Builder()
.name("Word Delimiter")
.displayName("Word Delimiter")
.description("Regular expression pattern used to split text into words (default: whitespace characters)")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("\\s+")
.build();

public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles with successfully word count")
.build();

public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles with not successfully word count")
.build();

private List<PropertyDescriptor> descriptors;

private Set<Relationship> relationships;

@Override
protected void init(final ProcessorInitializationContext context) {
descriptors = List.of(SOURCE_ATTRIBUTE, WORD_DELIMITER, OUTPUT_ATTRIBUTE);
relationships = Set.of(REL_SUCCESS, REL_FAILURE);
}

@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}

@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}

@OnScheduled
public void onScheduled(final ProcessContext context) {

}

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}

try {

// Получаем значения свойств
final String sourceAttribute = context.getProperty(SOURCE_ATTRIBUTE).getValue();
final String wordDelimiter = context.getProperty(WORD_DELIMITER).getValue();
final String outputAttribute = context.getProperty(OUTPUT_ATTRIBUTE).getValue();

// Получаем значение атрибута, в котором нужно совершить Word Count
final String attributeValue = flowFile.getAttribute(sourceAttribute);

// Простейшие проверки
if (attributeValue == null || attributeValue.trim().isEmpty()) {
getLogger().warn("Attribute '{}' does not exist or is empty for FlowFile {}",
sourceAttribute, flowFile.getId());
// Отправки файла в нужный RelationShip
session.transfer(flowFile, REL_FAILURE);
return;
}

// Подсчитываем слова
final int wordCount = countWords(attributeValue, wordDelimiter);

getLogger().debug("Found {} words in attribute '{}'", wordCount, sourceAttribute);

// Добавляем новый атрибут
final Map<String, String> attributes = new HashMap<>();
attributes.put(outputAttribute, String.valueOf(wordCount));

// Кладем все атрибуты
flowFile = session.putAllAttributes(flowFile, attributes);

// Перенаправляем в Success
session.transfer(flowFile, REL_SUCCESS);
// Зафиксировать в журнале аудита, что атрибуты этого FlowFile были изменены
session.getProvenanceReporter().modifyAttributes(flowFile);

getLogger().info("Successfully counted {} words for FlowFile {}", wordCount, flowFile.getId());

} catch (final Exception e) {
getLogger().error("Error processing FlowFile {}: {}", flowFile.getId(), e.getMessage(), e);
session.transfer(flowFile, REL_FAILURE);
}
}

private int countWords(final String text, final String delimiterPattern) {

if (text == null || text.trim().isEmpty()) {
return 0;
}

try {
Pattern pattern;
if (delimiterPattern != null && !delimiterPattern.trim().isEmpty()) {
pattern = Pattern.compile(delimiterPattern);
} else {
pattern = Pattern.compile("\\s+");
}

String[] words = pattern.split(text.trim());
return (int) Arrays.stream(words)
.filter(word -> !word.trim().isEmpty())
.count();

} catch (PatternSyntaxException e) {
getLogger().warn("Invalid regex pattern '{}', using default whitespace pattern", delimiterPattern);
Pattern defaultPattern = Pattern.compile("\\s+");
String[] words = defaultPattern.split(text.trim());
return (int) Arrays.stream(words)
.filter(word -> !word.trim().isEmpty())
.count();
}
}
}

Помимо основного кода для процессора, Apache NiFi предлагает решение в виде Mock Framework, а именно TestRunner Class. С помощью него, можно не запуская Apache NiFi (добавление нового NAR требует перезагрузки) тестировать различные кейсы и покрывать свой код тестами как в привычном бэкенде или другой области разработки. Пример кода для тестирования представлен ниже:

Пример тестов для процессора

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ru.kotletkin.dev.processors.demohabrprocessor;

import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

class MyProcessorTest {

private TestRunner testRunner;

@BeforeEach
void init() {
testRunner = TestRunners.newTestRunner(MyProcessor.class);
}

@Test
void testProcessorWithDefaultAttribute() {
// Устанавливаем свойства
testRunner.setProperty(MyProcessor.SOURCE_ATTRIBUTE, "text");
testRunner.setProperty(MyProcessor.OUTPUT_ATTRIBUTE, "word.count");

// СОЗДАЕМ АТРИБУТЫ
Map<String, String> attributes = new HashMap<>();
attributes.put("text", "Hello world this is Apache NiFi for Habr");

// Добавляем FlowFile с атрибутами
testRunner.enqueue("dummy content", attributes);

// Запускаем процессор
testRunner.run(1);

// Проверяем результаты
testRunner.assertTransferCount(MyProcessor.REL_SUCCESS, 1);
testRunner.assertTransferCount(MyProcessor.REL_FAILURE, 0);

// Проверяем атрибут 'word.count'
MockFlowFile result = testRunner.getFlowFilesForRelationship(MyProcessor.REL_SUCCESS).getFirst();
result.assertAttributeEquals("word.count", "8");
result.assertAttributeExists("filename"); // Стандартный атрибут
}
}

Далее, выполним mvn clean install в консоли.


Результат процесса сборки

Результат сборки

Нам необходим собранный файл с расширением nar. Именно его мы и добавим в директорию lib.

После того, как вы добавили в директорию lib вновь собранный NAR, запустите Apache NiFi (или перезапустите его).

Далее, среди процессоров должен появиться наш новый процессор.


Самостоятельно реализованный процессор

И в нем будут видны наши заранее реализованные параметры:



Создадим небольшой пайплайн для теста процессора:


Простейший пайплайн для тестирования

Настроим процессор GenerateFlowFile, чтобы он генерировал файлы с уже имеющимся атрибутом:


Добавление атрибута для Apache NiFi

Запустим все процессоры через Run Once и увидим результат в одной из очередей перед Funnel:


Очередь с тестовым FlowFile

Проверим атрибуты:


Наши атрибуты с подсчитанным количеством слов

Таким образом, видно, что процессор штатно отработал и мы прошли все этапы, от написания кода, до сборки и добавления нашего процессора в Apache NiFi.

Результат статьи в виде кодовой базы можно найти тут.

Итоги


Мы рассмотрели, как создать простейший процессор в Apache NiFi, реализовать логику, как использовать основные методы различных классов. Естественно, этой статьей разработка модулей не ограничивается ни в коем случае, и есть много различных направлений в этой области. Для полного погружения рекомендуется изучать документацию и тестировать на практике, желательно на большой нагрузке.

Комьюнити Apache NiFi в РФ: telegram
 
Яндекс.Метрика Рейтинг@Mail.ru
Сверху Снизу