Files
Diarios-Oficiais-ALEMS/diarios/search_service.py

849 lines
26 KiB
Python
Raw Permalink Normal View History

import re
from datetime import datetime
from typing import Optional, Dict, Any, List
from elasticsearch import Elasticsearch, AsyncElasticsearch
2025-06-16 13:11:57 -04:00
from .schemas import BuscaDiariosResponseSchema, ResultadoSchema, PaginaSchema
import unicodedata
import asyncio
from django.conf import settings
async def is_fuzzy_appropriate(term: str) -> bool:
"""
Determina se a fuzziness é apropriada para o termo de busca.
Args:
term: Termo de busca a ser avaliado
Returns:
bool: True se fuzziness é apropriada, False caso contrário
"""
return not re.match(r"^\d+/\d+$", term.strip())
async def parse_date(date_str: Optional[str]) -> Optional[str]:
"""
Converte string de data para formato ISO para ElasticSearch.
Args:
date_str: String de data no formato YYYY-MM-DD
Returns:
Optional[str]: Data formatada ou None se inválida
"""
if not date_str:
return None
try:
dt = datetime.strptime(date_str, "%Y-%m-%d")
return dt.strftime("%Y-%m-%d")
except ValueError:
print(f"Alerta: Formato de data inválido recebido: {date_str}")
return None
async def buscar_diarios(
query: Optional[str] = None,
data_inicio: Optional[str] = None,
data_fim: Optional[str] = None,
tipo_diario: Optional[str] = None,
page: int = 1,
page_size: int = 10,
) -> Dict[str, Any]:
"""
Realiza busca nos diários oficiais com os parâmetros fornecidos.
Args:
query: Termo de busca
data_inicio: Data inicial no formato YYYY-MM-DD
data_fim: Data final no formato YYYY-MM-DD
tipo_diario: Tipo de diário a ser filtrado
page: Número da página de resultados
page_size: Quantidade de resultados por página
Returns:
Dict[str, Any]: Dicionário com resultados da busca
"""
try:
es = AsyncElasticsearch(
"http://elasticsearch:9200",
request_timeout=30,
basic_auth=(
settings.ELASTICSEARCH_USER,
settings.ELASTICSEARCH_PASSWORD,
),
)
if not await es.ping():
raise ConnectionError("Não foi possível conectar ao Elasticsearch")
except Exception as e:
print(f"Erro ao conectar com Elasticsearch: {e}")
return {"total": 0, "resultados": [], "pagina": page, "por_pagina": page_size}
es_body = {
"query": {"bool": {"must": [], "filter": []}},
"size": page_size,
"from": (page - 1) * page_size,
"_source": [
"numero",
"data",
"link",
"tipo",
"paginas.numero",
"paginas.conteudo",
],
}
parsed_dt_inicio = await parse_date(data_inicio)
parsed_dt_fim = await parse_date(data_fim)
if parsed_dt_inicio or parsed_dt_fim:
date_range_filter = {}
if parsed_dt_inicio:
date_range_filter["gte"] = parsed_dt_inicio
if parsed_dt_fim:
date_range_filter["lte"] = parsed_dt_fim
es_body["query"]["bool"]["filter"].append(
{"range": {"data": date_range_filter}}
)
if tipo_diario:
es_body["query"]["bool"]["filter"].append({"term": {"tipo.nome": tipo_diario}})
if query:
aplicar_fuzziness = await is_fuzzy_appropriate(query)
text_query_bool = {
"bool": {
"should": [
{
"match_phrase": {
"paginas.conteudo.search": {
"query": query,
"slop": 4,
"boost": 5.0,
}
}
},
{
"match": {
"paginas.conteudo.search": {
"query": query,
"fuzziness": "AUTO",
"operator": "and",
"prefix_length": 3,
}
}
},
],
"minimum_should_match": "75%",
}
}
nested_query = {
"nested": {
"path": "paginas",
"query": text_query_bool,
"inner_hits": {
"highlight": {
"fields": {"paginas.conteudo.search": {}},
"fragment_size": 500,
"number_of_fragments": 1,
"pre_tags": ["<mark>"],
"post_tags": ["</mark>"],
},
"_source": ["paginas.numero"],
},
}
}
es_body["query"]["bool"]["must"].append(nested_query)
else:
es_body["query"]["bool"]["must"].append({"match_all": {}})
# Adicionar min_score
if query:
es_body["min_score"] = 2.5 * len(query.split())
try:
response = await es.search(index="diario_oficial", body=es_body)
except Exception as e:
print(f"Erro ao executar busca no Elasticsearch: {e}")
return {"total": 0, "resultados": [], "pagina": page, "por_pagina": page_size}
finally:
await es.close()
hits = response.get("hits", {})
total = hits.get("total", {}).get("value", 0)
resultados_formatados = []
for hit in hits.get("hits", []):
source = hit.get("_source", {})
resultado_data = {
"id": hit.get("_id"),
"numero": source.get("numero", ""),
"data": source.get("data"),
"link": source.get("link", ""),
"tipo": source.get("tipo", {}).get("nome", "Sem Tipo"),
"score": hit.get("_score"),
"paginas": [],
}
pagina_match = None
if query and "inner_hits" in hit and "paginas" in hit["inner_hits"]:
paginas_inner_hits = hit["inner_hits"]["paginas"]["hits"]["hits"]
if paginas_inner_hits:
inner_hit = paginas_inner_hits[0]
inner_source = inner_hit.get("_source", {})
page_num = inner_source.get("numero", "N/A")
highlights = inner_hit.get("highlight", {}).get(
"paginas.conteudo.search", []
)
highlight_content = " ... ".join(highlights) if highlights else ""
if page_num != "N/A" and highlight_content:
pagina_match = PaginaSchema(
numero=page_num, conteudo=highlight_content
)
if pagina_match:
resultado_data["paginas"].append(pagina_match)
elif not query and "paginas" in source and source["paginas"]:
primeira_pagina_source = source["paginas"][0]
conteudo_orig = primeira_pagina_source.get("conteudo", "")
resultado_data["paginas"].append(
PaginaSchema(
numero=primeira_pagina_source.get("numero", 0),
conteudo=conteudo_orig,
)
)
resultados_formatados.append(ResultadoSchema(**resultado_data))
return {
"total": total,
"resultados": resultados_formatados,
"pagina": page,
"por_pagina": page_size,
}
async def remover_acentos(texto: str) -> str:
"""
Remove acentos de uma string para comparações mais neutras.
Args:
texto: Texto a ser normalizado
Returns:
str: Texto sem acentos
"""
return "".join(
c
for c in unicodedata.normalize("NFD", texto)
if unicodedata.category(c) != "Mn"
)
async def processar_query(query: str) -> str:
"""
Faz pré-processamento da query para separar termos como 'processonº404'.
Args:
query: Texto da consulta original
Returns:
str: Consulta processada
"""
return re.sub(r"([a-zA-Z]+)(nº|n°|no)(\d+)", r"\1 \2 \3", query)
async def montar_suggest_body(query_processada: str) -> dict:
"""
Monta o corpo da sugestão para a requisição ao Elasticsearch.
Args:
query_processada: Query processada para sugestão
Returns:
dict: Corpo da requisição para suggest
"""
return {
"suggest": {
"text": query_processada,
"correcao": {
"phrase": {
"field": "paginas.conteudo.search",
"size": 3,
"gram_size": 2,
"confidence": 0.8,
"max_errors": 4,
"highlight": {"pre_tag": "**", "post_tag": "**"},
"collate": {
"query": {
"source": {
"nested": {
"path": "paginas",
"query": {
"bool": {
"should": [
{
"match_phrase": {
"paginas.conteudo.search": {
"query": "{{suggestion}}",
"slop": 1,
}
}
}
]
}
},
}
}
},
"params": {"field_name": "paginas.conteudo.search"},
"prune": True,
},
}
},
},
"size": 0,
}
async def sugestao_termo(query: str) -> Optional[str]:
"""
Oferece sugestões de correção para a query, verificando se a sugestão
realmente retorna resultados antes de apresentá-la ao usuário.
Args:
query: Texto da consulta original
Returns:
Optional[str]: Sugestão para a consulta ou None
"""
es = await conectar_elasticsearch()
if not es:
return None
query_processada = await processar_query(query)
suggest_body = await montar_suggest_body(query_processada)
try:
response = await es.search(index="diario_oficial", body=suggest_body)
suggestions = response.get("suggest", {}).get("correcao", [])
for sug in suggestions:
for option in sug.get("options", []):
if option.get("collate_match", False):
sugestao = option["text"]
# Verifica se a sugestão é idêntica à query (ignorando acentos e case)
if sugestao.lower() == query.lower():
continue
if await remover_acentos(sugestao.lower()) == await remover_acentos(
query.lower()
):
continue
return sugestao
return None
except Exception as e:
print(f"Erro ao buscar sugestão: {e}")
return None
finally:
await es.close()
async def conectar_elasticsearch() -> Optional[AsyncElasticsearch]:
"""
Conecta ao Elasticsearch e retorna o cliente.
Returns:
Optional[AsyncElasticsearch]: Cliente Elasticsearch ou None se falhar
"""
try:
es = AsyncElasticsearch(
"http://elasticsearch:9200",
request_timeout=30,
basic_auth=(
settings.ELASTICSEARCH_USER,
settings.ELASTICSEARCH_PASSWORD,
),
)
if not await es.ping():
raise ConnectionError("Não foi possível conectar ao Elasticsearch")
return es
except Exception as e:
print(f"Erro ao conectar com Elasticsearch: {e}")
return None
async def construir_ordenacao(ordenar_por: str) -> List[Dict[str, Any]]:
"""
Constrói a cláusula de ordenação para a consulta.
Args:
ordenar_por: Critério de ordenação
Returns:
List[Dict[str, Any]]: Lista de ordenação para Elasticsearch
"""
if ordenar_por == "data_asc":
return [{"data": {"order": "asc"}}]
elif ordenar_por == "data_desc":
return [{"data": {"order": "desc"}}]
else: # relevancia
return ["_score"]
async def preencher_numero_do_diario_com_zeros(numero_diario: str) -> str:
"""
Preenche o número do diário com zeros à esquerda.
Args:
numero_diario: Número do diário
Returns:
str: Número formatado com zeros à esquerda
"""
return numero_diario.zfill(4)
async def construir_filtros(
data_inicio: Optional[str],
data_fim: Optional[str],
tipo_diario: Optional[str],
numero_diario: Optional[str],
) -> List[Dict[str, Any]]:
"""
Constrói os filtros de data, tipo e número do diário.
Args:
data_inicio: Data inicial no formato YYYY-MM-DD
data_fim: Data final no formato YYYY-MM-DD
tipo_diario: Tipo de diário a ser filtrado
numero_diario: Número do diário
Returns:
List[Dict[str, Any]]: Lista de filtros para Elasticsearch
"""
filtros = []
parsed_dt_inicio = await parse_date(data_inicio)
parsed_dt_fim = await parse_date(data_fim)
if parsed_dt_inicio or parsed_dt_fim:
date_range = {}
if parsed_dt_inicio:
date_range["gte"] = parsed_dt_inicio
if parsed_dt_fim:
date_range["lte"] = parsed_dt_fim
filtros.append({"range": {"data": date_range}})
if tipo_diario:
filtros.append({"term": {"tipo.nome": tipo_diario}})
if numero_diario:
# numero_diario = await preencher_numero_do_diario_com_zeros(numero_diario)
filtros.append({"wildcard": {"numero": f"*{numero_diario}*"}})
return filtros
async def construir_query_busca(
query: Optional[str], modo_busca: str
) -> Dict[str, Any]:
"""
Constrói a query de busca com base no termo e modo de busca.
Args:
query: Termo de busca
modo_busca: Modo de busca (exata ou qualquer)
Returns:
Dict[str, Any]: Query de busca para Elasticsearch
"""
if not query:
return {
"nested": {
"path": "paginas",
"query": {"match_all": {}},
"inner_hits": {
"_source": ["paginas.numero", "paginas.conteudo"],
"size": 100,
},
}
}
should_queries = []
# Busca exata (boost maior)
should_queries.append(
{
"match_phrase": {
"paginas.conteudo.search": {"query": query, "slop": 3, "boost": 5.0}
}
}
)
# Busca mais leve (separando termos), se modo_busca permitir
if modo_busca == "qualquer":
should_queries.append(
{
"match": {
"paginas.conteudo.search": {
"query": query,
"operator": "or",
"fuzziness": "AUTO",
"prefix_length": 3,
"boost": 1.0,
}
}
}
)
return {
"nested": {
"path": "paginas",
"query": {"bool": {"should": should_queries, "minimum_should_match": 1}},
"inner_hits": {
"highlight": {
"fields": {"paginas.conteudo.search": {}},
"fragment_size": 500,
"number_of_fragments": 1,
"pre_tags": ["<mark>"],
"post_tags": ["</mark>"],
},
"_source": ["paginas.numero"],
"size": 100, # Aumentar para retornar mais páginas correspondentes
},
}
}
async def construir_request_body(
query: Optional[str],
modo_busca: str,
ordenar_por: str,
data_inicio: Optional[str],
data_fim: Optional[str],
tipo_diario: Optional[str],
numero_diario: Optional[str],
page: int,
page_size: int,
) -> Dict[str, Any]:
"""
Constrói o corpo da requisição para o Elasticsearch.
Args:
query: Termo de busca
modo_busca: Modo de busca (exata ou qualquer)
ordenar_por: Critério de ordenação
data_inicio: Data inicial
data_fim: Data final
tipo_diario: Tipo de diário
numero_diario: Número do diário
page: Número da página
page_size: Tamanho da página
Returns:
Dict[str, Any]: Corpo da requisição para Elasticsearch
"""
es_body = {
"query": {"bool": {"must": [], "filter": []}},
"size": page_size,
"from": (page - 1) * page_size,
"_source": [
"numero",
"data",
"link",
"tipo",
"paginas.numero",
"paginas.conteudo",
],
}
# Adicionar ordenação
es_body["sort"] = await construir_ordenacao(ordenar_por)
# Adicionar filtros
es_body["query"]["bool"]["filter"] = await construir_filtros(
data_inicio, data_fim, tipo_diario, numero_diario
)
# Adicionar query principal
query_principal = await construir_query_busca(query, modo_busca)
es_body["query"]["bool"]["must"].append(query_principal)
return es_body
async def processar_paginas_encontradas(
hit: Dict[str, Any], query: Optional[str]
) -> List[PaginaSchema]:
"""
Processa as páginas encontradas em um hit, retornando todas as correspondências.
Args:
hit: Item de resultado do Elasticsearch
query: Termo de busca original
Returns:
List[PaginaSchema]: Lista de páginas encontradas
"""
2025-06-16 13:11:57 -04:00
if not query:
return []
paginas = []
source = hit.get("_source", {})
2025-06-16 13:11:57 -04:00
#if not query and "paginas" in source:
# for pagina in source["paginas"]:
# paginas.append(
# PaginaSchema(
# numero=pagina.get("numero", 0), conteudo=pagina.get("conteudo", "")
# )
# )
# Se temos uma query e há inner_hits, processamos todas as páginas correspondentes
if query and "inner_hits" in hit and "paginas" in hit["inner_hits"]:
paginas_inner_hits = hit["inner_hits"]["paginas"]["hits"]["hits"]
# Processar todas as páginas encontradas, não apenas a primeira
for inner_hit in paginas_inner_hits:
inner_source = inner_hit.get("_source", {})
page_num = inner_source.get("numero", "N/A")
highlights = inner_hit.get("highlight", {}).get(
"paginas.conteudo.search", []
)
highlight_content = " ... ".join(highlights) if highlights else ""
if page_num != "N/A" and highlight_content:
paginas.append(
PaginaSchema(numero=page_num, conteudo=highlight_content)
)
# Se não há query ou não encontramos páginas nos inner_hits, usamos a primeira página do documento
if not paginas and "paginas" in source and source["paginas"]:
primeira_pagina_source = source["paginas"][0]
conteudo_orig = primeira_pagina_source.get("conteudo", "")
paginas.append(
PaginaSchema(
numero=primeira_pagina_source.get("numero", 0),
conteudo=conteudo_orig,
)
)
return paginas
async def processar_resultados(
response: Dict[str, Any], query: Optional[str], page: int, page_size: int
) -> Dict[str, Any]:
"""
Processa os resultados da busca no Elasticsearch.
Args:
response: Resposta do Elasticsearch
query: Termo de busca original
page: Número da página atual
page_size: Tamanho da página
Returns:
Dict[str, Any]: Resultados processados
"""
hits = response.get("hits", {})
total = hits.get("total", {}).get("value", 0)
resultados_formatados = []
for hit in hits.get("hits", []):
source = hit.get("_source", {})
resultado_data = {
"id": hit.get("_id"),
"numero": source.get("numero", ""),
"data": source.get("data"),
"link": source.get("link", ""),
"tipo": source.get("tipo", {}).get("nome", "Sem Tipo"),
"score": hit.get("_score"),
"paginas": [],
}
# Processar todas as páginas encontradas
resultado_data["paginas"] = await processar_paginas_encontradas(hit, query)
resultados_formatados.append(ResultadoSchema(**resultado_data))
return {
"total": total,
"resultados": resultados_formatados,
"pagina": page,
"por_pagina": page_size,
}
async def buscar_diarios_simples(
query: Optional[str] = None,
numero_diario: Optional[str] = None,
modo_busca: str = "exata", # "exata" ou "qualquer"
ordenar_por: str = "data_asc", # "relevancia" ou "data"
data_inicio: Optional[str] = None,
data_fim: Optional[str] = None,
tipo_diario: Optional[str] = None,
page: int = 1,
page_size: int = 10,
) -> Dict[str, Any]:
"""
Função principal para buscar diários oficiais.
Args:
query: Termo de busca
numero_diario: Número do diário oficial
modo_busca: Modo de busca (exata ou qualquer)
ordenar_por: Critério de ordenação
data_inicio: Data inicial
data_fim: Data final
tipo_diario: Tipo de diário
page: Número da página
page_size: Tamanho da página
Returns:
Dict[str, Any]: Dicionário com os resultados da busca
"""
# Conectar ao Elasticsearch
es = await conectar_elasticsearch()
if not es:
return {"total": 0, "resultados": [], "pagina": page, "por_pagina": page_size}
# Construir o corpo da requisição
es_body = await construir_request_body(
query,
modo_busca,
ordenar_por,
data_inicio,
data_fim,
tipo_diario,
numero_diario,
page,
page_size,
)
# Executar a busca
try:
response = await es.search(index="diario_oficial", body=es_body)
except Exception as e:
print(f"Erro ao executar busca no Elasticsearch: {e}")
return {"total": 0, "resultados": [], "pagina": page, "por_pagina": page_size}
finally:
await es.close()
# Processar e retornar os resultados
return await processar_resultados(response, query, page, page_size)
2025-06-16 13:11:57 -04:00
async def list_diarios(
es_client: AsyncElasticsearch,
page: int = 1,
page_size: int = 10,
data_inicio: Optional[str] = None,
data_fim: Optional[str] = None,
tipo_diario: Optional[str] = None,
numero_diario: Optional[str] = None,
ordenar_por: str = "data_desc"
) -> Dict[str, any]:
"""
Lista diários com paginação e filtros opcionais
Args:
es_client: Cliente do Elasticsearch
page: Número da página
page_size: Itens por página
data_inicio: Data inicial (YYYY-MM-DD)
data_fim: Data final (YYYY-MM-DD)
tipo_diario: Filtro por tipo de diário
numero_diario: Filtro por número do diário
ordenar_por: Campo para ordenação (data_desc, data_asc)
Returns:
Dict com total de itens e lista de diários
"""
# Construir query de filtros
filters = []
# Filtro por data
if data_inicio or data_fim:
date_range = {}
if data_inicio:
try:
datetime.strptime(data_inicio, "%Y-%m-%d")
date_range["gte"] = data_inicio
except ValueError:
pass
if data_fim:
try:
datetime.strptime(data_fim, "%Y-%m-%d")
date_range["lte"] = data_fim
except ValueError:
pass
if date_range:
filters.append({"range": {"data": date_range}})
# Filtro por tipo
if tipo_diario:
filters.append({"term": {"tipo.nome": tipo_diario}})
# Filtro por número
if numero_diario:
filters.append({"wildcard": {"numero": f"*{numero_diario}*"}})
# Construir query completa
query = {
"bool": {
"must": [{"match_all": {}}],
"filter": filters
}
}
# Definir ordenação
sort = [{"data": {"order": "asc" if ordenar_por == "data_asc" else "desc"}}]
# Executar consulta
try:
response = await es_client.search(
index="diario_oficial",
body={
"query": query,
"sort": sort,
"from": (page - 1) * page_size,
"size": page_size,
"_source": ["numero", "data", "tipo", "link"]
}
)
hits = response["hits"]["hits"]
total = response["hits"]["total"]["value"]
# Processar resultados
diarios = []
for hit in hits:
source = hit["_source"]
diarios.append(BuscaDiariosResponseSchema(
id=hit["_id"],
numero=source.get("numero"),
data=source.get("data"),
tipo=source.get("tipo", {}).get("nome"),
link=source.get("link")
))
return {
"total": total,
"page": page,
"page_size": page_size,
"results": diarios
}
except Exception as e:
print(f"Erro ao listar diários: {e}")
return {
"total": 0,
"page": page,
"page_size": page_size,
"results": []
}