import re from datetime import datetime from typing import Optional, Dict, Any, List from elasticsearch import Elasticsearch, AsyncElasticsearch from .schemas import BuscaDiariosResponseSchema, ResultadoSchema, PaginaSchema import unicodedata import asyncio from django.conf import settings async def is_fuzzy_appropriate(term: str) -> bool: """ Determina se a fuzziness é apropriada para o termo de busca. Args: term: Termo de busca a ser avaliado Returns: bool: True se fuzziness é apropriada, False caso contrário """ return not re.match(r"^\d+/\d+$", term.strip()) async def parse_date(date_str: Optional[str]) -> Optional[str]: """ Converte string de data para formato ISO para ElasticSearch. Args: date_str: String de data no formato YYYY-MM-DD Returns: Optional[str]: Data formatada ou None se inválida """ if not date_str: return None try: dt = datetime.strptime(date_str, "%Y-%m-%d") return dt.strftime("%Y-%m-%d") except ValueError: print(f"Alerta: Formato de data inválido recebido: {date_str}") return None async def buscar_diarios( query: Optional[str] = None, data_inicio: Optional[str] = None, data_fim: Optional[str] = None, tipo_diario: Optional[str] = None, page: int = 1, page_size: int = 10, ) -> Dict[str, Any]: """ Realiza busca nos diários oficiais com os parâmetros fornecidos. Args: query: Termo de busca data_inicio: Data inicial no formato YYYY-MM-DD data_fim: Data final no formato YYYY-MM-DD tipo_diario: Tipo de diário a ser filtrado page: Número da página de resultados page_size: Quantidade de resultados por página Returns: Dict[str, Any]: Dicionário com resultados da busca """ try: es = AsyncElasticsearch( "http://elasticsearch:9200", request_timeout=30, basic_auth=( settings.ELASTICSEARCH_USER, settings.ELASTICSEARCH_PASSWORD, ), ) if not await es.ping(): raise ConnectionError("Não foi possível conectar ao Elasticsearch") except Exception as e: print(f"Erro ao conectar com Elasticsearch: {e}") return {"total": 0, "resultados": [], "pagina": page, "por_pagina": page_size} es_body = { "query": {"bool": {"must": [], "filter": []}}, "size": page_size, "from": (page - 1) * page_size, "_source": [ "numero", "data", "link", "tipo", "paginas.numero", "paginas.conteudo", ], } parsed_dt_inicio = await parse_date(data_inicio) parsed_dt_fim = await parse_date(data_fim) if parsed_dt_inicio or parsed_dt_fim: date_range_filter = {} if parsed_dt_inicio: date_range_filter["gte"] = parsed_dt_inicio if parsed_dt_fim: date_range_filter["lte"] = parsed_dt_fim es_body["query"]["bool"]["filter"].append( {"range": {"data": date_range_filter}} ) if tipo_diario: es_body["query"]["bool"]["filter"].append({"term": {"tipo.nome": tipo_diario}}) if query: aplicar_fuzziness = await is_fuzzy_appropriate(query) text_query_bool = { "bool": { "should": [ { "match_phrase": { "paginas.conteudo.search": { "query": query, "slop": 4, "boost": 5.0, } } }, { "match": { "paginas.conteudo.search": { "query": query, "fuzziness": "AUTO", "operator": "and", "prefix_length": 3, } } }, ], "minimum_should_match": "75%", } } nested_query = { "nested": { "path": "paginas", "query": text_query_bool, "inner_hits": { "highlight": { "fields": {"paginas.conteudo.search": {}}, "fragment_size": 500, "number_of_fragments": 1, "pre_tags": [""], "post_tags": [""], }, "_source": ["paginas.numero"], }, } } es_body["query"]["bool"]["must"].append(nested_query) else: es_body["query"]["bool"]["must"].append({"match_all": {}}) # Adicionar min_score if query: es_body["min_score"] = 2.5 * len(query.split()) try: response = await es.search(index="diario_oficial", body=es_body) except Exception as e: print(f"Erro ao executar busca no Elasticsearch: {e}") return {"total": 0, "resultados": [], "pagina": page, "por_pagina": page_size} finally: await es.close() hits = response.get("hits", {}) total = hits.get("total", {}).get("value", 0) resultados_formatados = [] for hit in hits.get("hits", []): source = hit.get("_source", {}) resultado_data = { "id": hit.get("_id"), "numero": source.get("numero", ""), "data": source.get("data"), "link": source.get("link", ""), "tipo": source.get("tipo", {}).get("nome", "Sem Tipo"), "score": hit.get("_score"), "paginas": [], } pagina_match = None if query and "inner_hits" in hit and "paginas" in hit["inner_hits"]: paginas_inner_hits = hit["inner_hits"]["paginas"]["hits"]["hits"] if paginas_inner_hits: inner_hit = paginas_inner_hits[0] inner_source = inner_hit.get("_source", {}) page_num = inner_source.get("numero", "N/A") highlights = inner_hit.get("highlight", {}).get( "paginas.conteudo.search", [] ) highlight_content = " ... ".join(highlights) if highlights else "" if page_num != "N/A" and highlight_content: pagina_match = PaginaSchema( numero=page_num, conteudo=highlight_content ) if pagina_match: resultado_data["paginas"].append(pagina_match) elif not query and "paginas" in source and source["paginas"]: primeira_pagina_source = source["paginas"][0] conteudo_orig = primeira_pagina_source.get("conteudo", "") resultado_data["paginas"].append( PaginaSchema( numero=primeira_pagina_source.get("numero", 0), conteudo=conteudo_orig, ) ) resultados_formatados.append(ResultadoSchema(**resultado_data)) return { "total": total, "resultados": resultados_formatados, "pagina": page, "por_pagina": page_size, } async def remover_acentos(texto: str) -> str: """ Remove acentos de uma string para comparações mais neutras. Args: texto: Texto a ser normalizado Returns: str: Texto sem acentos """ return "".join( c for c in unicodedata.normalize("NFD", texto) if unicodedata.category(c) != "Mn" ) async def processar_query(query: str) -> str: """ Faz pré-processamento da query para separar termos como 'processonº404'. Args: query: Texto da consulta original Returns: str: Consulta processada """ return re.sub(r"([a-zA-Z]+)(nº|n°|no)(\d+)", r"\1 \2 \3", query) async def montar_suggest_body(query_processada: str) -> dict: """ Monta o corpo da sugestão para a requisição ao Elasticsearch. Args: query_processada: Query processada para sugestão Returns: dict: Corpo da requisição para suggest """ return { "suggest": { "text": query_processada, "correcao": { "phrase": { "field": "paginas.conteudo.search", "size": 3, "gram_size": 2, "confidence": 0.8, "max_errors": 4, "highlight": {"pre_tag": "**", "post_tag": "**"}, "collate": { "query": { "source": { "nested": { "path": "paginas", "query": { "bool": { "should": [ { "match_phrase": { "paginas.conteudo.search": { "query": "{{suggestion}}", "slop": 1, } } } ] } }, } } }, "params": {"field_name": "paginas.conteudo.search"}, "prune": True, }, } }, }, "size": 0, } async def sugestao_termo(query: str) -> Optional[str]: """ Oferece sugestões de correção para a query, verificando se a sugestão realmente retorna resultados antes de apresentá-la ao usuário. Args: query: Texto da consulta original Returns: Optional[str]: Sugestão para a consulta ou None """ es = await conectar_elasticsearch() if not es: return None query_processada = await processar_query(query) suggest_body = await montar_suggest_body(query_processada) try: response = await es.search(index="diario_oficial", body=suggest_body) suggestions = response.get("suggest", {}).get("correcao", []) for sug in suggestions: for option in sug.get("options", []): if option.get("collate_match", False): sugestao = option["text"] # Verifica se a sugestão é idêntica à query (ignorando acentos e case) if sugestao.lower() == query.lower(): continue if await remover_acentos(sugestao.lower()) == await remover_acentos( query.lower() ): continue return sugestao return None except Exception as e: print(f"Erro ao buscar sugestão: {e}") return None finally: await es.close() async def conectar_elasticsearch() -> Optional[AsyncElasticsearch]: """ Conecta ao Elasticsearch e retorna o cliente. Returns: Optional[AsyncElasticsearch]: Cliente Elasticsearch ou None se falhar """ try: es = AsyncElasticsearch( "http://elasticsearch:9200", request_timeout=30, basic_auth=( settings.ELASTICSEARCH_USER, settings.ELASTICSEARCH_PASSWORD, ), ) if not await es.ping(): raise ConnectionError("Não foi possível conectar ao Elasticsearch") return es except Exception as e: print(f"Erro ao conectar com Elasticsearch: {e}") return None async def construir_ordenacao(ordenar_por: str) -> List[Dict[str, Any]]: """ Constrói a cláusula de ordenação para a consulta. Args: ordenar_por: Critério de ordenação Returns: List[Dict[str, Any]]: Lista de ordenação para Elasticsearch """ if ordenar_por == "data_asc": return [{"data": {"order": "asc"}}] elif ordenar_por == "data_desc": return [{"data": {"order": "desc"}}] else: # relevancia return ["_score"] async def preencher_numero_do_diario_com_zeros(numero_diario: str) -> str: """ Preenche o número do diário com zeros à esquerda. Args: numero_diario: Número do diário Returns: str: Número formatado com zeros à esquerda """ return numero_diario.zfill(4) async def construir_filtros( data_inicio: Optional[str], data_fim: Optional[str], tipo_diario: Optional[str], numero_diario: Optional[str], ) -> List[Dict[str, Any]]: """ Constrói os filtros de data, tipo e número do diário. Args: data_inicio: Data inicial no formato YYYY-MM-DD data_fim: Data final no formato YYYY-MM-DD tipo_diario: Tipo de diário a ser filtrado numero_diario: Número do diário Returns: List[Dict[str, Any]]: Lista de filtros para Elasticsearch """ filtros = [] parsed_dt_inicio = await parse_date(data_inicio) parsed_dt_fim = await parse_date(data_fim) if parsed_dt_inicio or parsed_dt_fim: date_range = {} if parsed_dt_inicio: date_range["gte"] = parsed_dt_inicio if parsed_dt_fim: date_range["lte"] = parsed_dt_fim filtros.append({"range": {"data": date_range}}) if tipo_diario: filtros.append({"term": {"tipo.nome": tipo_diario}}) if numero_diario: # numero_diario = await preencher_numero_do_diario_com_zeros(numero_diario) filtros.append({"wildcard": {"numero": f"*{numero_diario}*"}}) return filtros async def construir_query_busca( query: Optional[str], modo_busca: str ) -> Dict[str, Any]: """ Constrói a query de busca com base no termo e modo de busca. Args: query: Termo de busca modo_busca: Modo de busca (exata ou qualquer) Returns: Dict[str, Any]: Query de busca para Elasticsearch """ if not query: return { "nested": { "path": "paginas", "query": {"match_all": {}}, "inner_hits": { "_source": ["paginas.numero", "paginas.conteudo"], "size": 100, }, } } should_queries = [] # Busca exata (boost maior) should_queries.append( { "match_phrase": { "paginas.conteudo.search": {"query": query, "slop": 3, "boost": 5.0} } } ) # Busca mais leve (separando termos), se modo_busca permitir if modo_busca == "qualquer": should_queries.append( { "match": { "paginas.conteudo.search": { "query": query, "operator": "or", "fuzziness": "AUTO", "prefix_length": 3, "boost": 1.0, } } } ) return { "nested": { "path": "paginas", "query": {"bool": {"should": should_queries, "minimum_should_match": 1}}, "inner_hits": { "highlight": { "fields": {"paginas.conteudo.search": {}}, "fragment_size": 500, "number_of_fragments": 1, "pre_tags": [""], "post_tags": [""], }, "_source": ["paginas.numero"], "size": 100, # Aumentar para retornar mais páginas correspondentes }, } } async def construir_request_body( query: Optional[str], modo_busca: str, ordenar_por: str, data_inicio: Optional[str], data_fim: Optional[str], tipo_diario: Optional[str], numero_diario: Optional[str], page: int, page_size: int, ) -> Dict[str, Any]: """ Constrói o corpo da requisição para o Elasticsearch. Args: query: Termo de busca modo_busca: Modo de busca (exata ou qualquer) ordenar_por: Critério de ordenação data_inicio: Data inicial data_fim: Data final tipo_diario: Tipo de diário numero_diario: Número do diário page: Número da página page_size: Tamanho da página Returns: Dict[str, Any]: Corpo da requisição para Elasticsearch """ es_body = { "query": {"bool": {"must": [], "filter": []}}, "size": page_size, "from": (page - 1) * page_size, "_source": [ "numero", "data", "link", "tipo", "paginas.numero", "paginas.conteudo", ], } # Adicionar ordenação es_body["sort"] = await construir_ordenacao(ordenar_por) # Adicionar filtros es_body["query"]["bool"]["filter"] = await construir_filtros( data_inicio, data_fim, tipo_diario, numero_diario ) # Adicionar query principal query_principal = await construir_query_busca(query, modo_busca) es_body["query"]["bool"]["must"].append(query_principal) return es_body async def processar_paginas_encontradas( hit: Dict[str, Any], query: Optional[str] ) -> List[PaginaSchema]: """ Processa as páginas encontradas em um hit, retornando todas as correspondências. Args: hit: Item de resultado do Elasticsearch query: Termo de busca original Returns: List[PaginaSchema]: Lista de páginas encontradas """ if not query: return [] paginas = [] source = hit.get("_source", {}) #if not query and "paginas" in source: # for pagina in source["paginas"]: # paginas.append( # PaginaSchema( # numero=pagina.get("numero", 0), conteudo=pagina.get("conteudo", "") # ) # ) # Se temos uma query e há inner_hits, processamos todas as páginas correspondentes if query and "inner_hits" in hit and "paginas" in hit["inner_hits"]: paginas_inner_hits = hit["inner_hits"]["paginas"]["hits"]["hits"] # Processar todas as páginas encontradas, não apenas a primeira for inner_hit in paginas_inner_hits: inner_source = inner_hit.get("_source", {}) page_num = inner_source.get("numero", "N/A") highlights = inner_hit.get("highlight", {}).get( "paginas.conteudo.search", [] ) highlight_content = " ... ".join(highlights) if highlights else "" if page_num != "N/A" and highlight_content: paginas.append( PaginaSchema(numero=page_num, conteudo=highlight_content) ) # Se não há query ou não encontramos páginas nos inner_hits, usamos a primeira página do documento if not paginas and "paginas" in source and source["paginas"]: primeira_pagina_source = source["paginas"][0] conteudo_orig = primeira_pagina_source.get("conteudo", "") paginas.append( PaginaSchema( numero=primeira_pagina_source.get("numero", 0), conteudo=conteudo_orig, ) ) return paginas async def processar_resultados( response: Dict[str, Any], query: Optional[str], page: int, page_size: int ) -> Dict[str, Any]: """ Processa os resultados da busca no Elasticsearch. Args: response: Resposta do Elasticsearch query: Termo de busca original page: Número da página atual page_size: Tamanho da página Returns: Dict[str, Any]: Resultados processados """ hits = response.get("hits", {}) total = hits.get("total", {}).get("value", 0) resultados_formatados = [] for hit in hits.get("hits", []): source = hit.get("_source", {}) resultado_data = { "id": hit.get("_id"), "numero": source.get("numero", ""), "data": source.get("data"), "link": source.get("link", ""), "tipo": source.get("tipo", {}).get("nome", "Sem Tipo"), "score": hit.get("_score"), "paginas": [], } # Processar todas as páginas encontradas resultado_data["paginas"] = await processar_paginas_encontradas(hit, query) resultados_formatados.append(ResultadoSchema(**resultado_data)) return { "total": total, "resultados": resultados_formatados, "pagina": page, "por_pagina": page_size, } async def buscar_diarios_simples( query: Optional[str] = None, numero_diario: Optional[str] = None, modo_busca: str = "exata", # "exata" ou "qualquer" ordenar_por: str = "data_asc", # "relevancia" ou "data" data_inicio: Optional[str] = None, data_fim: Optional[str] = None, tipo_diario: Optional[str] = None, page: int = 1, page_size: int = 10, ) -> Dict[str, Any]: """ Função principal para buscar diários oficiais. Args: query: Termo de busca numero_diario: Número do diário oficial modo_busca: Modo de busca (exata ou qualquer) ordenar_por: Critério de ordenação data_inicio: Data inicial data_fim: Data final tipo_diario: Tipo de diário page: Número da página page_size: Tamanho da página Returns: Dict[str, Any]: Dicionário com os resultados da busca """ # Conectar ao Elasticsearch es = await conectar_elasticsearch() if not es: return {"total": 0, "resultados": [], "pagina": page, "por_pagina": page_size} # Construir o corpo da requisição es_body = await construir_request_body( query, modo_busca, ordenar_por, data_inicio, data_fim, tipo_diario, numero_diario, page, page_size, ) # Executar a busca try: response = await es.search(index="diario_oficial", body=es_body) except Exception as e: print(f"Erro ao executar busca no Elasticsearch: {e}") return {"total": 0, "resultados": [], "pagina": page, "por_pagina": page_size} finally: await es.close() # Processar e retornar os resultados return await processar_resultados(response, query, page, page_size) async def list_diarios( es_client: AsyncElasticsearch, page: int = 1, page_size: int = 10, data_inicio: Optional[str] = None, data_fim: Optional[str] = None, tipo_diario: Optional[str] = None, numero_diario: Optional[str] = None, ordenar_por: str = "data_desc" ) -> Dict[str, any]: """ Lista diários com paginação e filtros opcionais Args: es_client: Cliente do Elasticsearch page: Número da página page_size: Itens por página data_inicio: Data inicial (YYYY-MM-DD) data_fim: Data final (YYYY-MM-DD) tipo_diario: Filtro por tipo de diário numero_diario: Filtro por número do diário ordenar_por: Campo para ordenação (data_desc, data_asc) Returns: Dict com total de itens e lista de diários """ # Construir query de filtros filters = [] # Filtro por data if data_inicio or data_fim: date_range = {} if data_inicio: try: datetime.strptime(data_inicio, "%Y-%m-%d") date_range["gte"] = data_inicio except ValueError: pass if data_fim: try: datetime.strptime(data_fim, "%Y-%m-%d") date_range["lte"] = data_fim except ValueError: pass if date_range: filters.append({"range": {"data": date_range}}) # Filtro por tipo if tipo_diario: filters.append({"term": {"tipo.nome": tipo_diario}}) # Filtro por número if numero_diario: filters.append({"wildcard": {"numero": f"*{numero_diario}*"}}) # Construir query completa query = { "bool": { "must": [{"match_all": {}}], "filter": filters } } # Definir ordenação sort = [{"data": {"order": "asc" if ordenar_por == "data_asc" else "desc"}}] # Executar consulta try: response = await es_client.search( index="diario_oficial", body={ "query": query, "sort": sort, "from": (page - 1) * page_size, "size": page_size, "_source": ["numero", "data", "tipo", "link"] } ) hits = response["hits"]["hits"] total = response["hits"]["total"]["value"] # Processar resultados diarios = [] for hit in hits: source = hit["_source"] diarios.append(BuscaDiariosResponseSchema( id=hit["_id"], numero=source.get("numero"), data=source.get("data"), tipo=source.get("tipo", {}).get("nome"), link=source.get("link") )) return { "total": total, "page": page, "page_size": page_size, "results": diarios } except Exception as e: print(f"Erro ao listar diários: {e}") return { "total": 0, "page": page, "page_size": page_size, "results": [] }