I am running the following minimal Hadoop JobTask, which works without problems:

    import luigi
    import luigi.contrib.hadoop
    import luigi.contrib.hdfs

    class Nothing(luigi.ExternalTask):
        def output(self):
            return luigi.contrib.hdfs.HdfsTarget("column.txt")

    class HelloWorld(luigi.contrib.hadoop.JobTask):
        somefile = luigi.Parameter()

        def output(self):
            return luigi.contrib.hdfs.HdfsTarget(self.somefile)

        def requires(self):
            return Nothing()

        def mapper(self, line):
            yield "a", 1

        def reducer(self, key, values):
            yield key, sum(values)

    if __name__ == '__main__':
        luigi.run(main_task_cls=HelloWorld)

If, to the same code, I add a print statement (say, in the mapper function), it crashes:

    def mapper(self, line):
        print("Hello from Mapper")
        yield "a",1

The same happens when importing modules that use stdout or stderr:

    import my_import
    luigi.contrib.hadoop.attach(my_import)

Here, the `my_import` directory contains the following `__init__.py`:

print("Hello from my_import")
def my_func(x):
    return 3*x

If I remove the print statement, the import succeeds without problems and I can use the function `my_func` inside the `HelloWorld` class.

The problem thus seems to be related to writing to stderr and stdout. I am particularly interested in this because I would like to import modules such as numpy or scitools-iris, and they do print to stderr and stdout when imported, thus causing a crash. For example, if I add the following imports to the top of my simple code,

    import numpy
    luigi.contrib.hadoop.attach(numpy)

the error I obtain when running

    luigi --module hello_world HelloWorld --somefile somefile

is the following:

hadoop_user@hdp1:~/Luigi/HelloWorld$ luigi --module hello_world HelloWorld --somefile somefile
No handlers could be found for logger "luigi-interface"
DEBUG: Checking if HelloWorld(name=name, surname=surname) is complete
/home/hadoop_user/anaconda2/lib/python2.7/site-packages/luigi/parameter.py:259: UserWarning: Parameter None is not of type string.
  warnings.warn("Parameter {0} is not of type string.".format(str(x)))
DEBUG: Checking if Nothing() is complete
INFO: Informed scheduler that task   HelloWorld_name_surname_ef635838a3   has status   PENDING
INFO: Informed scheduler that task   Nothing__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 1449] Worker Worker(salt=358646876, workers=1, host=hdp1, username=hadoop_user, pid=1449) running   HelloWorld(name=name, surname=surname)
DEBUG: Tmp dir: /tmp/tmp9JXFW_
DEBUG: adding to tar: /home/hadoop_user/anaconda2/lib/python2.7/site-packages/luigi/util.py -> luigi/util.py
DEBUG: adding to tar: /home/hadoop_user/anaconda2/lib/python2.7/site-packages/luigi/cmdline.py -> luigi/cmdline.py
DEBUG: adding to tar: /home/hadoop_user/anaconda2/lib/python2.7/site-packages/luigi-2.3.0-py2.7.egg-info/top_level.txt -> luigi-2.3.0-py2.7.egg-info/top_level.txt
DEBUG: adding to tar: /home/hadoop_user/anaconda2/lib/python2.7/site-packages/luigi-2.3.0-py2.7.egg-info/PKG-INFO -> luigi-2.3.0-py2.7.egg-info/PKG-INFO
DEBUG: adding to tar: /home/hadoop_user/Luigi/HelloWorld/my_import/another_import.py -> my_import/another_import.py
DEBUG: adding to tar: /home/hadoop_user/Luigi/HelloWorld/my_import/imports.py -> my_import/imports.py
DEBUG: adding to tar: /home/hadoop_user/Luigi/HelloWorld/my_import/__init__.py -> my_import/__init__.py
DEBUG: adding to tar: /home/hadoop_user/anaconda2/lib/python2.7/site-packages/numpy/setup.py -> numpy/setup.py
DEBUG: adding to tar: /home/hadoop_user/anaconda2/lib/python2.7/site-packages/numpy/__config__.py -> numpy/__config__.py
DEBUG: adding to tar: /home/hadoop_user/anaconda2/lib/python2.7/site-packages/numpy/compat/__init__.py -> numpy/compat/__init__.py
DEBUG: Adding package metadata to archive for "numpy" found at "/home/hadoop_user/anaconda2/lib/python2.7/site-packages/numpy-1.10.4-py2.7.egg-info"
DEBUG: adding to tar: /home/hadoop_user/Luigi/HelloWorld/hello_world.py -> hello_world.py
INFO: hadoop jar /home/hadoop_user/hadoop-streaming-2.7.3.jar -D mapred.job.name=HelloWorld_name_surname_ef635838a3 -D mapred.reduce.tasks=25 -D stream.jobconf.truncate.limit=20000 -mapper "python mrrunner.py map" -reducer "python mrrunner.py reduce" -file /home/hadoop_user/anaconda2/lib/python2.7/site-packages/luigi/mrrunner.py -file /tmp/tmp9JXFW_/packages.tar -file /tmp/tmp9JXFW_/job-instance.pickle -input column.txt -output namesurname-temp-2016-09-30T11-44-28.145871
INFO: 16/09/30 11:44:29 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
INFO: 16/09/30 11:44:32 INFO client.RMProxy: Connecting to ResourceManager at ResourceManager/192.168.200.169:8032
INFO: 16/09/30 11:44:32 INFO client.RMProxy: Connecting to ResourceManager at ResourceManager/192.168.200.169:8032
INFO: 16/09/30 11:44:33 INFO mapred.FileInputFormat: Total input paths to process : 1
INFO: 16/09/30 11:44:33 INFO mapreduce.JobSubmitter: number of splits:2
INFO: 16/09/30 11:44:33 INFO Configuration.deprecation: mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
INFO: 16/09/30 11:44:33 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name
INFO: 16/09/30 11:44:33 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1475111258225_0016
INFO: 16/09/30 11:44:33 INFO impl.YarnClientImpl: Submitted application application_1475111258225_0016
INFO: 16/09/30 11:44:33 INFO mapreduce.Job: The url to track the job: http://ResourceManager:8088/proxy/application_1475111258225_0016/
INFO: 16/09/30 11:44:33 INFO mapreduce.Job: Running job: job_1475111258225_0016
INFO: 16/09/30 11:44:41 INFO mapreduce.Job: Job job_1475111258225_0016 running in uber mode : false
INFO: 16/09/30 11:44:41 INFO mapreduce.Job:  map 0% reduce 0%
INFO: 16/09/30 11:44:47 INFO mapreduce.Job: Task Id : attempt_1475111258225_0016_m_000000_0, Status : FAILED
INFO: Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
INFO: at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)
INFO: at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)
INFO: 16/09/30 11:45:07 ERROR streaming.StreamJob: Job not successful!
INFO: Streaming Command Failed!
DEBUG: Fetching data from http://resourcemanager:8088/proxy/application_1475111258225_0016/&cause=failed
ERROR: [pid 1449] Worker Worker(salt=358646876, workers=1, host=hdp1, username=hadoop_user, pid=1449) failed    HelloWorld(name=name, surname=surname)
Traceback (most recent call last):
  File "/home/hadoop_user/anaconda2/lib/python2.7/site-packages/luigi/worker.py", line 181, in run
    new_deps = self._run_get_new_deps()
  File "/home/hadoop_user/anaconda2/lib/python2.7/site-packages/luigi/worker.py", line 119, in _run_get_new_deps
    task_gen = self.task.run()
  File "/home/hadoop_user/anaconda2/lib/python2.7/site-packages/luigi/contrib/hadoop.py", line 720, in run
    self.job_runner().run_job(self)
  File "/home/hadoop_user/anaconda2/lib/python2.7/site-packages/luigi/contrib/hadoop.py", line 561, in run_job
    run_and_track_hadoop_job(arglist, tracking_url_callback=job.set_tracking_url)
  File "/home/hadoop_user/anaconda2/lib/python2.7/site-packages/luigi/contrib/hadoop.py", line 350, in run_and_track_hadoop_job
    return track_process(arglist, tracking_url_callback, env)
  File "/home/hadoop_user/anaconda2/lib/python2.7/site-packages/luigi/contrib/hadoop.py", line 343, in track_process
    raise HadoopJobError(message + 'Also, could not fetch output from tasks.', out, err)
HadoopJobError: Streaming job failed with exit code 1. Also, could not fetch output from tasks.
DEBUG: Removing directory /tmp/tmp9JXFW_
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Skipping error email. Set `error-email` in the `core` section of the Luigi config file or override `owner_email` in the task to receive error emails.
INFO: Informed scheduler that task   HelloWorld_name_surname_ef635838a3   has status   FAILED
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=358646876, workers=1, host=hdp1, username=hadoop_user, pid=1449) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====
Scheduled 2 tasks of which:
* 1 present dependencies were encountered:
    - 1 Nothing()
* 1 failed:
    - 1 HelloWorld(name=name, surname=surname)
This progress looks :( because there were failed tasks
===== Luigi Execution Summary =====

Do you know what is happening? Am I doing something wrong, or is this a deeper issue?
I would really appreciate any help with this. Thanks in advance!

I haven't yet solved the problem of writing to stderr or stdout. Maybe writing explicitly to a log file would do.
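For example, something along these lines might work in the mapper (only a rough sketch; the log file path and logger name are placeholders I made up):

    import logging

    # Sketch: send diagnostics to a local file on the task node instead of
    # stdout/stderr. The file path and logger name are made-up placeholders.
    logging.basicConfig(filename="/tmp/helloworld-task.log", level=logging.INFO)
    logger = logging.getLogger("helloworld")

    def mapper(self, line):
        logger.info("Hello from Mapper, got line: %r", line)
        yield "a", 1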

What I did solve is the importing of modules. The solution was to add the following lines at the top of my script, which attach everything that is needed to the Luigi tarball:

    import luigi
    import luigi.contrib.hadoop
    import luigi.contrib.hdfs
    import sys
    import numpy

    # Attach every module that has already been imported, so that it ends up
    # in the tarball that is shipped to the Hadoop nodes.
    for name in list(sys.modules.keys()):
        try:
            mod = __import__(name)
            luigi.contrib.hadoop.attach(mod)
        except Exception:
            pass  # some modules (e.g. built-ins) cannot be attached

Even if this is only a hack to get things done, I wonder if it could make sense to add a piece of code like that to the create_packages_archive function inside luigi.contrib.hadoop:

    for package in packages:
        # Put a submodule's entire package in the archive. This is the
        # magic that usually packages everything you need without
        # having to attach packages/modules explicitly
        if not getattr(package, "__path__", None) and '.' in package.__name__:
            package = __import__(package.__name__.rpartition('.')[0], None, None, 'non_empty')
        n = package.__name__.replace(".", "/")
        if getattr(package, "__path__", None):
            # TODO: (BUG) picking only the first path does not
            # properly deal with namespaced packages in different
            # directories
            p = package.__path__[0]
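To make the idea a bit more concrete, here is a rough, hypothetical sketch; the helper name and filtering rules are my own assumptions, not anything Luigi currently does. The point is just to extend the package list with every top-level module the job script has already imported before the archive is built:

    import sys

    def expand_with_loaded_modules(packages):
        """Hypothetical helper: add every already-imported top-level module
        to the list of packages that would be tarred up."""
        expanded = list(packages)
        for name, mod in list(sys.modules.items()):
            if mod is None or '.' in name:
                continue  # skip submodules and lazy import placeholders
            if not getattr(mod, '__file__', None):
                continue  # skip built-ins and modules without a file on disk
            if mod not in expanded:
                expanded.append(mod)
        return expanded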

Let me know...

It should also be said that, in any case, if the set of packages that needs to be attached is large, it is better to find another approach from the very beginning, as the time spent attaching them can become the bigger problem.

@mfcabrera As a workaround, I just installed the Python packages I needed on every node and my jobs ran fine (I'm currently working with a very small cluster). In any case, shouldn't Luigi automatically attach all the dependencies of a package? That is what the documentation claims it does. I will try the solution from your blog at a later time; it looks clever and neat. Thank you!

@charlietianx, to be honest, I followed the workaround above, yes, but I don't remember the outcome. In any case, I didn't adopt Luigi as a solution at that time. My problem was specifically about dealing with netcdf files: https://ieeexplore.ieee.org/abstract/document/8035056

I developed some home-made patchy solution: https://pipistrello.readthedocs.io/en/latest/README.html#readme , not sure if that helps you though...

Thank you for the reply. It turns out my error was also caused by missing modules. I checked the Hadoop YARN log, found the more detailed error message there, and solved it.